@@ -5,6 +5,7 @@ extern "C" {
 #include <sodium.h>
 }
 #include "lokimq.h"
+#include "batch.h"
 #include "hex.h"

 namespace lokimq {
@@ -141,7 +142,7 @@ std::string zmtp_metadata(string_view key, string_view value) {
 }

 void check_not_started(const std::thread& proxy_thread) {
-    if (proxy_thread.get_id() != std::thread::id{})
+    if (proxy_thread.joinable())
         throw std::logic_error("Cannot add categories/commands/aliases after calling `start()`");
 }
@@ -168,14 +169,18 @@ std::string to_string(AuthLevel a) {

 /// Extracts a pubkey and SN status from zmq message properties.  Throws on failure.
 void extract_pubkey(zmq::message_t& msg, std::string& pubkey, bool& service_node) {
     string_view pubkey_hex{msg.gets("User-Id")};
-    // The ZAP handler sets the User-Id to S: (service node) or C: (non-SN) followed by the
-    // pubkey.
-    if (pubkey_hex.size() != 66 || (pubkey_hex[0] != 'S' && pubkey_hex[0] != 'C') || pubkey_hex[1] != ':')
+    if (pubkey_hex.size() != 64)
         throw std::logic_error("bad user-id");
-    assert(is_hex(pubkey_hex.begin() + 2, pubkey_hex.end()));
+    assert(is_hex(pubkey_hex.begin(), pubkey_hex.end()));
     pubkey.reserve(32);
-    from_hex(pubkey_hex.begin() + 2, pubkey_hex.end(), std::back_inserter(pubkey));
-    service_node = pubkey_hex[0] == 'S';
+    from_hex(pubkey_hex.begin(), pubkey_hex.end(), std::back_inserter(pubkey));
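+    // SN status is conveyed by the ZAP handler in a separate X-SN metadata property; gets()
+    // throws if the property was not set on this message.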
+    service_node = false;
+    try {
+        string_view is_sn{msg.gets("X-SN")};
+        if (is_sn.size() == 1 && is_sn[0] == '1')
+            service_node = true;
+    } catch (...) { /* property not set, ignore */ }
 }

 const char* peer_address(zmq::message_t& msg) {
@@ -330,11 +335,10 @@ LokiMQ::LokiMQ(
         std::vector<std::string> bind_,
         SNRemoteAddress lookup,
         AllowFunc allow,
-        Logger logger,
-        unsigned int general_workers)
+        Logger logger)
     : object_id{next_id++}, pubkey{std::move(pubkey_)}, privkey{std::move(privkey_)}, local_service_node{service_node},
         bind{std::move(bind_)}, peer_lookup{std::move(lookup)}, allow_connection{std::move(allow)}, logger{logger},
-        poll_remote_offset{poll_internal_size + (bind.empty() ? 0 : 1)}, general_workers{general_workers} {
+        poll_remote_offset{poll_internal_size + (bind.empty() ? 0 : 1)} {

     LMQ_LOG(trace, "Constructing listening LokiMQ, id=", object_id, ", this=", this);
@@ -369,7 +373,7 @@ LokiMQ::LokiMQ(
 }

 void LokiMQ::start() {
-    if (proxy_thread.get_id() != std::thread::id{})
+    if (proxy_thread.joinable())
         throw std::logic_error("Cannot call start() multiple times!");

     LMQ_LOG(info, "Initializing LokiMQ ", bind.empty() ? "remote-only" : "listener", " with pubkey ", to_hex(pubkey));
@@ -409,18 +413,28 @@ void LokiMQ::worker_thread(unsigned int index) {
     Message message{*this};
     std::vector<zmq::message_t> parts;
-    run_info& run = workers[index]; // This is our first job, and will be updated later with subsequent jobs
+    run_info& run = workers[index]; // This contains our first job, and will be updated later with subsequent jobs

     while (true) {
         try {
-            message.pubkey = {run.pubkey.data(), 32};
-            message.service_node = run.service_node;
-            message.data.clear();
-            for (auto& m : run.message_parts)
-                message.data.emplace_back(m.data<char>(), m.size());
-            LMQ_LOG(trace, "worker thread ", worker_id, " invoking ", run.command, " callback with ", message.data.size(), " message parts");
-            (*run.callback)(message);
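+            // Two kinds of jobs arrive here: internal batch jobs (run.is_batch_job), where a
+            // batch_jobno of -1 designates the batch's completion function, and ordinary command
+            // callbacks dispatched for an incoming message.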
+            if (run.is_batch_job) {
+                if (run.batch_jobno >= 0) {
+                    LMQ_LOG(trace, "worker thread ", worker_id, " running batch ", run.batch, "#", run.batch_jobno);
+                    run.batch->run_job(run.batch_jobno);
+                } else if (run.batch_jobno == -1) {
+                    LMQ_LOG(trace, "worker thread ", worker_id, " running batch ", run.batch, " completion");
+                    run.batch->job_completion();
+                }
+            } else {
+                message.pubkey = {run.pubkey.data(), 32};
+                message.service_node = run.service_node;
+                message.data.clear();
+                for (auto& m : run.data_parts)
+                    message.data.emplace_back(m.data<char>(), m.size());
+                LMQ_LOG(trace, "worker thread ", worker_id, " invoking ", run.command, " callback with ", message.data.size(), " message parts");
+                (*run.callback)(message);
+            }

             /*
              * FIXME: BYE should be handled by the proxy thread, not the worker.
@@ -428,7 +442,7 @@ void LokiMQ::worker_thread(unsigned int index) {
             /*
             if (msg.command == "BYE") {
                 LMQ_LOG(info, "peer asked us to disconnect");
-                detail::send_control(get_control_socket(), "DISCONNECT", {{"pubkey",msg.pubkey}});
+                detail::send_control(get_control_socket(), "DISCONNECT", msg.pubkey);
                 continue;
             }
             */
@@ -453,7 +467,7 @@ void LokiMQ::worker_thread(unsigned int index) {
                 LMQ_LOG(warn, worker_id, "/", object_id, " received disallowed ", cmd_type, " command ", msg.command <<
                         " from " << (msg.sn ? "non-" : "") << "SN remote " << to_hex(msg.pubkey) << "; replying with a BYE");
                 send(msg.pubkey, "BYE", send_option::incoming{});
-                detail::send_control(get_control_socket(), "DISCONNECT", {{"pubkey",msg.pubkey}});
+                detail::send_control(get_control_socket(), "DISCONNECT", msg.pubkey);
                 continue;
             }
@@ -478,7 +492,7 @@ void LokiMQ::worker_thread(unsigned int index) {
         while (true) {
             // Signal that we are ready for another job and wait for it.  (We do this down here
             // because our first job gets set up when the thread is started).
-            detail::send_control(sock, "READY");
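+            // "RAN" tells the proxy we finished the current job so that it can update batch or
+            // category accounting before marking us idle or handing us another job.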
detail : : send_control ( sock , " RAN " ) ;
LMQ_LOG ( trace , " worker " , worker_id , " waiting for requests " ) ;
parts . clear ( ) ;
recv_message_parts ( sock , std : : back_inserter ( parts ) ) ;
@@ -675,16 +689,31 @@ void LokiMQ::proxy_reply(bt_dict &&data) {
     }
 }

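+// Takes ownership of a Batch that the submitting thread heap-allocated, and queues each of its
+// jobs for dispatch to worker threads.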
+void LokiMQ::proxy_batch(detail::Batch* batchptr) {
+    auto& batch = *batches.emplace(batchptr).first;
+    const int jobs = batch->size();
+    for (int i = 0; i < jobs; i++)
+        batch_jobs.emplace(batch, i);
+}
+
 void LokiMQ::proxy_control_message(std::vector<zmq::message_t> parts) {
     if (parts.size() < 2 || parts.size() > 3)
         throw std::logic_error("Expected 2-3 message parts for a proxy control message");
     auto route = view(parts[0]), cmd = view(parts[1]);
-    bt_dict data;
-    if (parts.size() > 2) {
-        bt_deserialize(view(parts[2]), data);
-    }
     LMQ_LOG(trace, "control message: ", cmd);
-    if (cmd == "START") {
+    if (cmd == "SEND") {
+        LMQ_LOG(trace, "proxying message");
+        proxy_send(bt_deserialize<bt_dict>(view(parts.at(2))));
+    } else if (cmd == "REPLY") {
+        LMQ_LOG(trace, "proxying reply to non-SN incoming message");
+        proxy_reply(bt_deserialize<bt_dict>(view(parts.at(2))));
+    } else if (cmd == "BATCH") {
+        LMQ_LOG(trace, "proxy batch jobs");
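+        // The Batch arrives as a serialized pointer value: the submitting thread allocated it
+        // and ownership passes to the proxy here (it is deleted once all of its jobs finish).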
+        auto ptrval = bt_deserialize<uintptr_t>(view(parts.at(2)));
+        proxy_batch(reinterpret_cast<detail::Batch*>(ptrval));
+    } else if (cmd == "CONNECT") {
+        proxy_connect(bt_deserialize<bt_dict>(view(parts.at(2))));
+    } else if (cmd == "START") {
         // Command sent by the owning thread during startup; we send back a simple READY reply to
         // let it know we are running.
         route_control(command, route, "READY");
@@ -696,16 +725,6 @@ void LokiMQ::proxy_control_message(std::vector<zmq::message_t> parts) {
             for (const auto& route : idle_workers)
                 route_control(workers_socket, workers[route].routing_id, "QUIT");
             idle_workers.clear();
-    } else if (cmd == "CONNECT") {
-        proxy_connect(std::move(data));
-    } else if (cmd == "DISCONNECT") {
-        proxy_disconnect(data.at("pubkey").get<std::string>());
-    } else if (cmd == "SEND") {
-        LMQ_LOG(trace, "proxying message to ", to_hex(data.at("pubkey").get<std::string>()));
-        proxy_send(std::move(data));
-    } else if (cmd == "REPLY") {
-        LMQ_LOG(trace, "proxying reply to non-SN incoming message");
-        proxy_reply(std::move(data));
     } else {
         throw std::runtime_error("Proxy received invalid control command: " + std::string{cmd} +
                 " (" + std::to_string(parts.size()) + ")");
@@ -771,12 +790,19 @@ void LokiMQ::proxy_loop() {
     workers_socket.setsockopt<int>(ZMQ_ROUTER_MANDATORY, 1);
     workers_socket.bind(SN_ADDR_WORKERS);

-    if (!general_workers)
-        general_workers = std::thread::hardware_concurrency();
+    if (general_workers == 0)
+        general_workers = std::max(std::thread::hardware_concurrency(), 1u);

-    max_workers = general_workers;
-    for (const auto& cat : categories)
+    max_workers = general_workers + batch_jobs_reserved;
+    for (const auto& cat : categories) {
         max_workers += cat.second.reserved_threads;
+    }
+
+    if (log_level() >= LogLevel::trace) {
+        LMQ_LOG(trace, "Reserving space for ", max_workers, " max workers = ", general_workers, " general + category reserved:");
+        for (const auto& cat : categories)
+            LMQ_LOG(trace, "    - ", cat.first, ": ", cat.second.reserved_threads);
+    }

     workers.reserve(max_workers);
     if (!workers.empty())
@@ -817,13 +843,11 @@ void LokiMQ::proxy_loop() {
     constexpr auto timeout_check_interval = 10000ms; // Minimum time between checks for connections to close
     auto last_conn_timeout = std::chrono::steady_clock::now();
+    size_t last_conn_index = 0; // Index of the connection we last received a message from; see below

     std::vector<zmq::message_t> parts;

     while (true) {
         if (max_workers == 0) { // Will be 0 only if we are quitting
-            if (workers.empty()) {
+            if (std::none_of(workers.begin(), workers.end(), [](auto &w) { return w.thread.joinable(); })) {
                 // All the workers have finished, so we can finish shutting down
                 return proxy_quit();
             }
@@ -833,8 +857,7 @@ void LokiMQ::proxy_loop() {
         // available worker room then also poll incoming connections and outgoing connections for
         // messages to forward to a worker.  Otherwise, we just look for a control message or a
        // worker coming back with a ready message.
-        bool workers_available = idle_workers.size() > 0 || workers.size() < max_workers;
-        zmq::poll(pollitems.data(), workers_available ? pollitems.size() : poll_internal_size, poll_timeout);
+        zmq::poll(pollitems.data(), pollitems.size(), poll_timeout);

         LMQ_LOG(trace, "processing control messages");
         // Retrieve any waiting incoming control messages
@@ -843,81 +866,48 @@ void LokiMQ::proxy_loop() {
         }

         LMQ_LOG(trace, "processing worker messages");
-        // Process messages sent by workers
         for (parts.clear(); recv_message_parts(workers_socket, std::back_inserter(parts), zmq::recv_flags::dontwait); parts.clear()) {
-            if (parts.size() != 2) {
-                LMQ_LOG(error, "Received invalid ", parts.size(), "-part message");
-                continue;
-            }
-            auto route = view(parts[0]), cmd = view(parts[1]);
-            assert(route.size() >= 2 && route[0] == 'w' && route[1] >= '1' && route[1] <= '9');
-            string_view worker_id_str{&route[1], route.size() - 1}; // Chop off the leading "w"
-            unsigned int worker_id = detail::extract_unsigned(worker_id_str);
-            if (!worker_id_str.empty() /* didn't consume everything */ || worker_id >= workers.size()) {
-                LMQ_LOG(error, "Worker id '", route, "' is invalid, unable to process worker command");
-                continue;
-            }
-
-            LMQ_LOG(trace, "received ", cmd, " command from ", route);
-            if (cmd == "READY") {
-                LMQ_LOG(debug, "Worker ", route, " is ready");
-                if (max_workers == 0) { // Shutting down
-                    LMQ_LOG(trace, "Telling worker ", route, " to quit");
-                    route_control(workers_socket, route, "QUIT");
-                } else {
-                    idle_workers.push_back(worker_id);
-                }
-            } else if (cmd == "QUITTING") {
-                workers[worker_id].thread.join();
-                LMQ_LOG(debug, "Worker ", route, " exited normally");
-            } else {
-                LMQ_LOG(error, "Worker ", route, " sent unknown control message: `", cmd, "'");
-            }
+            proxy_worker_message(parts);
         }
         // Handle any zap authentication
         LMQ_LOG(trace, "processing zap requests");
         process_zap_requests(zap_auth);
-        workers_available = idle_workers.size() > 0 || workers.size() < max_workers; // recheck - idle_workers could have changed above
-        if (max_workers > 0 && workers_available) {
-            LMQ_LOG(trace, "processing incoming messages");
-            // FIXME process any queued messages (i.e. that couldn't run but we had reserved tasks
-            // still free) first
-
-            // We round-robin connection queues for any pending messages (as long as we have enough
-            // waiting workers), but we don't want a lot of earlier connection requests to starve
-            // later requests so each time through we continue from wherever we left off in the
-            // previous queue.
-            const size_t num_sockets = remotes.size() + listener.connected();
-            std::queue<size_t> queue_index;
-            for (size_t i = 0; i < num_sockets; i++)
-                queue_index.push(i);
-
-            for (parts.clear(); !queue_index.empty() && workers.size() < max_workers; parts.clear()) {
-                size_t i = queue_index.front();
-                queue_index.pop();
-                auto& sock = listener.connected() ? (i == 0 ? listener : remotes[i - 1].second) : remotes[i].second;
-
-                if (!recv_message_parts(sock, std::back_inserter(parts), zmq::recv_flags::dontwait))
-                    continue;
-
-                if (parts.empty()) {
-                    LMQ_LOG(warn, "Ignoring empty (0-part) incoming message");
-                    continue;
-                }
-
-                if (proxy_handle_builtin(i, parts)) continue;
-
-                // We only pull this one message now but then requeue the socket so that after we check
-                // all other sockets we come back to this one to check again.
-                queue_index.push(i);
-
-                proxy_to_worker(i, parts);
-            }
-        }
+        // See if we can drain anything from the current queue before we potentially add to it
+        // below.
+        LMQ_LOG(trace, "processing queued jobs and messages");
+        proxy_process_queue();
+
+        LMQ_LOG(trace, "processing new incoming messages");
+
+        // We round-robin connections when pulling off pending messages one-by-one rather than
+        // pulling off all messages from one connection before moving to the next; thus in cases of
+        // contention we end up fairly distributing.
+        const size_t num_sockets = remotes.size() + listener.connected();
+        if (last_conn_index >= num_sockets)
+            last_conn_index = 0;
+        std::queue<size_t> queue_index;
+        for (size_t i = 1; i <= num_sockets; i++)
+            queue_index.push((last_conn_index + i) % num_sockets);
+
+        for (parts.clear(); !queue_index.empty() && workers.size() < max_workers; parts.clear()) {
+            size_t i = queue_index.front();
+            queue_index.pop();
+            auto& sock = listener.connected() ? (i == 0 ? listener : remotes[i - 1].second) : remotes[i].second;
+
+            if (!recv_message_parts(sock, std::back_inserter(parts), zmq::recv_flags::dontwait))
+                continue;
+
+            last_conn_index = i;
+            queue_index.push(i); // We just read one, but there might be more messages waiting so requeue it at the end
+
+            if (parts.empty()) {
+                LMQ_LOG(warn, "Ignoring empty (0-part) incoming message");
+                continue;
+            }
+
+            if (!proxy_handle_builtin(i, parts))
+                proxy_to_worker(i, parts);
+        }

         // Drop idle connections (if we haven't done it in a while) but *only* if we have some idle
@@ -936,7 +926,7 @@ void LokiMQ::proxy_loop() {
     }
 }

-std::pair<const LokiMQ::category*, const LokiMQ::CommandCallback*> LokiMQ::get_command(std::string& command) {
+std::pair<LokiMQ::category*, const LokiMQ::CommandCallback*> LokiMQ::get_command(std::string& command) {
     if (command.size() > MAX_CATEGORY_LENGTH + 1 + MAX_COMMAND_LENGTH) {
         LMQ_LOG(warn, "Invalid command '", command, "': command too long");
         return {};
@@ -972,41 +962,179 @@ std::pair<const LokiMQ::category*, const LokiMQ::CommandCallback*> LokiMQ::get_c
     return {&catit->second, &callback_it->second};
 }

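+// Handles a control message from a worker: "RAN" means it finished its job, so we update the
+// batch or category accounting and mark it idle (or tell it to QUIT during shutdown);
+// "QUITTING" means the worker is exiting and its thread can be joined.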
+void LokiMQ::proxy_worker_message(std::vector<zmq::message_t>& parts) {
+    // Process messages sent by workers
+    if (parts.size() != 2) {
+        LMQ_LOG(error, "Received invalid ", parts.size(), "-part message");
+        return;
+    }
+    auto route = view(parts[0]), cmd = view(parts[1]);
+    LMQ_LOG(trace, "worker message from ", route);
+    assert(route.size() >= 2 && route[0] == 'w' && route[1] >= '0' && route[1] <= '9');
+    string_view worker_id_str{&route[1], route.size() - 1}; // Chop off the leading "w"
+    unsigned int worker_id = detail::extract_unsigned(worker_id_str);
+    if (!worker_id_str.empty() /* didn't consume everything */ || worker_id >= workers.size()) {
+        LMQ_LOG(error, "Worker id '", route, "' is invalid, unable to process worker command");
+        return;
+    }
+
+    auto& run = workers[worker_id];
+
+    LMQ_LOG(trace, "received ", cmd, " command from ", route);
+    if (cmd == "RAN") {
+        LMQ_LOG(debug, "Worker ", route, " finished ", run.command);
+        if (run.is_batch_job) {
+            assert(batch_jobs_active > 0);
+            batch_jobs_active--;
+            bool clear_job = false;
+            if (run.batch_jobno == -1) {
+                // Returned from the completion function
+                clear_job = true;
+            } else {
+                auto status = run.batch->job_finished();
+                if (status == detail::BatchStatus::complete) {
+                    batch_jobs.emplace(run.batch, -1);
+                } else if (status == detail::BatchStatus::done) {
+                    clear_job = true;
+                }
+            }
+            if (clear_job) {
+                batches.erase(run.batch);
+                delete run.batch;
+                run.batch = nullptr;
+            }
+        } else {
+            assert(run.cat->active_threads > 0);
+            run.cat->active_threads--;
+        }
+        if (max_workers == 0) { // Shutting down
+            LMQ_LOG(trace, "Telling worker ", route, " to quit");
+            route_control(workers_socket, route, "QUIT");
+        } else {
+            idle_workers.push_back(worker_id);
+        }
+    } else if (cmd == "QUITTING") {
+        workers[worker_id].thread.join();
+        LMQ_LOG(debug, "Worker ", route, " exited normally");
+    } else {
+        LMQ_LOG(error, "Worker ", route, " sent unknown control message: `", cmd, "'");
+    }
+}
+
-std::pair<std::string, LokiMQ::peer_info>& LokiMQ::proxy_lookup_peer(zmq::message_t& msg) {
+decltype(LokiMQ::peers)::iterator LokiMQ::proxy_lookup_peer(zmq::message_t& msg) {
     std::string pubkey;
     bool service_node;
     try {
         extract_pubkey(msg, pubkey, service_node);
     } catch (...) {
-        LMQ_LOG(error, "Internal error: socket User-Id not set or invalid; dropping message");
-        return;
+        LMQ_LOG(error, "Internal error: message metadata not set or invalid; dropping message");
+        throw std::out_of_range("message pubkey metadata invalid");
     }
-    auto& peer_info = peers[pubkey];
-    peer_info.service_node |= service_node;
+    auto it = peers.find(pubkey);
+    if (it == peers.end())
+        it = peers.emplace(std::move(pubkey), peer_info{}).first;
+    it->second.service_node |= service_node;
+    return it;
 }

 bool LokiMQ::proxy_handle_builtin(size_t conn_index, std::vector<zmq::message_t>& parts) {
     (void) conn_index; // FIXME
     auto cmd = view(parts.front());
     if (cmd == "BYE") {
         auto pit = proxy_lookup_peer(parts.front());
         proxy_close_outgoing(pit);
         return true;
     }
     else if (cmd == "FORBIDDEN" || cmd == "NOT_A_SERVICE_NODE") {
         return true; // FIXME - ignore these?  Log?
     }
     return false;
 }

-void LokiMQ::proxy_to_worker(size_t conn_index, std::vector<zmq::message_t>& parts) {
-    std::string pubkey;
-    bool service_node;
-    try {
-        extract_pubkey(parts.back(), pubkey, service_node);
-    } catch (...) {
-        LMQ_LOG(error, "Internal error: socket User-Id not set or invalid; dropping message");
-        return;
-    }
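+// Returns a worker slot for a new job: an idle worker if any are waiting, otherwise a freshly
+// appended workers[] entry (its thread gets started at first use; see proxy_run_worker).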
+LokiMQ::run_info& LokiMQ::get_idle_worker() {
+    if (idle_workers.empty()) {
+        size_t id = workers.size();
+        assert(workers.capacity() > id);
+        workers.emplace_back();
+        auto& r = workers.back();
+        r.worker_id = id;
+        r.routing_id = "w" + std::to_string(id);
+        return r;
+    }
+    size_t id = idle_workers.back();
+    idle_workers.pop_back();
+    return workers[id];
+}
+
+void LokiMQ::set_batch_threads(unsigned int threads) {
+    if (proxy_thread.joinable())
+        throw std::logic_error("Cannot change reserved batch threads after calling `start()`");
+    batch_jobs_reserved = threads;
+}
+
+void LokiMQ::set_general_threads(unsigned int threads) {
+    if (proxy_thread.joinable())
+        throw std::logic_error("Cannot change general thread count after calling `start()`");
+    general_workers = threads;
+}
+
+LokiMQ::run_info& LokiMQ::run_info::operator=(pending_command&& pending) {
+    is_batch_job = false;
+    cat = &pending.cat;
+    command = std::move(pending.command);
+    pubkey = std::move(pending.pubkey);
+    service_node = pending.service_node;
+    data_parts = std::move(pending.data_parts);
+    callback = pending.callback;
+    return *this;
+}
+
+LokiMQ::run_info& LokiMQ::run_info::operator=(batch_job&& bj) {
+    is_batch_job = true;
+    batch_jobno = bj.second;
+    batch = bj.first;
+    return *this;
+}
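+
+// Dispatches a job that was just assigned into a worker's run_info (via the operator= overloads
+// above): the first dispatch to a slot starts its thread, which runs the loaded job immediately;
+// an already-running idle worker is instead woken with a routed RUN message.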
+void LokiMQ::proxy_run_worker(run_info& run) {
+    if (!run.thread.joinable())
+        run.thread = std::thread{&LokiMQ::worker_thread, this, run.worker_id};
+    else
+        send_routed_message(workers_socket, run.routing_id, "RUN");
+}
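+
+// Drains as much as we currently have capacity for from the internal queues: batch jobs first
+// (they may use the batch-reserved threads or any free general thread), then queued commands
+// (which may use their category's reserved threads or free general threads).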
+void LokiMQ::proxy_process_queue() {
+    // First up: process any batch jobs; since these are internal they are given higher priority.
+    while (!batch_jobs.empty() &&
+            (batch_jobs_active < batch_jobs_reserved || workers.size() - idle_workers.size() < general_workers)) {
+        proxy_run_worker(get_idle_worker() = std::move(batch_jobs.front()));
+        batch_jobs.pop();
+        batch_jobs_active++;
+    }
+
+    for (auto it = pending_commands.begin(); it != pending_commands.end() && active_workers() < max_workers; ) {
+        auto& pending = *it;
+        if (pending.cat.active_threads < pending.cat.reserved_threads
+                || active_workers() < general_workers) {
+            proxy_run_worker(get_idle_worker() = std::move(pending));
+            pending.cat.queued--;
+            pending.cat.active_threads++;
+            assert(pending.cat.queued >= 0);
+            it = pending_commands.erase(it);
+        } else {
+            ++it; // no available general or reserved worker spots for this job right now
+        }
+    }
+}
+
+void LokiMQ::proxy_to_worker(size_t conn_index, std::vector<zmq::message_t>& parts) {
+    auto pit = proxy_lookup_peer(parts.back());
+    string_view pubkey = pit->first;
+    auto& peer_info = pit->second;

     bool is_outgoing_conn = !listener.connected() || conn_index > 0;

     size_t command_part_index = is_outgoing_conn ? 0 : 1;
@@ -1023,46 +1151,38 @@ void LokiMQ::proxy_to_worker(size_t conn_index, std::vector<zmq::message_t>& par
     auto& category = *cat_call.first;

-    auto& peer_info = peers[pubkey];
-    peer_info.service_node |= service_node;
-
     if (!proxy_check_auth(pubkey, conn_index, peer_info, command, category, parts.back()))
         return;

-    bool can_work = false;
-    if (category.active_threads < category.reserved_threads)
-        // Our category still has reserved spots so we get to run even if it would exceed
-        // general_workers.
-        can_work = true;
-    else {
-        // We don't have a reserved spot, so the only way we get to run now is if there is an idle
-        // worker *and* we don't already have >= `general_workers` threads already doing things.
-        unsigned int working_threads = workers.size() - idle_workers.size();
-        if (working_threads < general_workers)
-            can_work = true;
-    }
-
-    if (!can_work) {
-        // We can't handle this now so queue it for later consideration when some workers free up.
-        LMQ_LOG(debug, "No available free workers, queuing task for later");
-        // FIXME TODO
-        return;
-    }
+    // Steal any data message parts
+    size_t data_part_index = command_part_index + 1;
+    std::vector<zmq::message_t> data_parts;
+    data_parts.reserve(parts.size() - data_part_index);
+    for (auto it = parts.begin() + data_part_index; it != parts.end(); ++it)
+        data_parts.push_back(std::move(*it));

-    size_t index;
-    std::unique_ptr<run_info> new_run;
-    if (idle_workers.empty()) {
-        index = workers.size();
-        assert(workers.capacity() > index);
-        new_run = std::make_unique<run_info>();
-        new_run->routing_id = "w" + std::to_string(index);
-    } else {
-        index = idle_workers.back();
-    }
-    auto& run = new_run ? *new_run : workers[index];
-    run.pubkey = std::move(pubkey);
-    run.service_node = service_node;
+    if (category.active_threads >= category.reserved_threads && active_workers() >= general_workers) {
+        // No free reserved or general spots, try to queue it for later
+        if (category.max_queue >= 0 && category.queued >= category.max_queue) {
+            LMQ_LOG(warn, "No space to queue incoming command ", command, "; already have ", category.queued,
+                    " commands queued in that category (max ", category.max_queue, "); dropping message");
+            return;
+        }
+        LMQ_LOG(debug, "No available free workers, queuing ", command, " for later");
+        pending_commands.emplace_back(category, std::move(command), std::move(data_parts), cat_call.second, pubkey, peer_info.service_node);
+        category.queued++;
+        return;
+    }
+
+    auto& run = get_idle_worker();
+    run.is_batch_job = false;
+    run.cat = &category;
+    run.command = std::move(command);
+    run.pubkey = pubkey;
+    run.service_node = peer_info.service_node;
+    run.data_parts = std::move(data_parts);
+    run.callback = cat_call.second;

     if (is_outgoing_conn) {
         peer_info.activity(); // outgoing connection activity, pump the activity timer
@@ -1074,32 +1194,14 @@ void LokiMQ::proxy_to_worker(size_t conn_index, std::vector<zmq::message_t>& par
         peer_info.incoming = route;
     }

-    LMQ_LOG(trace, __FILE__, __LINE__, "Invoking incoming ", command, " from ",
-            run.service_node ? "SN " : "non-SN ", to_hex(run.pubkey), " @ ", peer_address(parts.back()),
-            " on worker ", index);
-    run.command = std::move(command);
-    run.callback = cat_call.second;
-
-    // Steal any extra argument messages (after the command name)
-    run.message_parts.clear();
-    for (size_t i = (command_part_index + 1); i < parts.size(); i++)
-        run.message_parts.push_back(std::move(parts[i]));
-
-    if (new_run) {
-        // The thread processes the first job immediately upon startup, so just start it and don't
-        // send anything.
-        workers.push_back(std::move(run));
-        workers.back().thread = std::thread{&LokiMQ::worker_thread, this, index};
-    } else {
-        // The worker is idling, send it a RUN (prefixed with the route, for the ROUTER socket)
-        // to kick it into action
-        idle_workers.pop_back();
-        send_routed_message(workers_socket, run.routing_id, "RUN");
-    }
+    LMQ_LOG(trace, "Forwarding incoming ", run.command, " from ", run.service_node ? "SN " : "non-SN ",
+            to_hex(run.pubkey), " @ ", peer_address(parts.back()), " to worker ", run.routing_id);
+
+    proxy_run_worker(run);
+    category.active_threads++;
 }

-bool LokiMQ::proxy_check_auth(const std::string& pubkey, size_t conn_index, const peer_info& peer, const std::string& command, const category& cat, zmq::message_t& msg) {
+bool LokiMQ::proxy_check_auth(string_view pubkey, size_t conn_index, const peer_info& peer, const std::string& command, const category& cat, zmq::message_t& msg) {
     bool is_outgoing_conn = !listener.connected() || conn_index > 0;

     std::string reply;
     if (peer.auth_level < cat.access.auth) {
@@ -1138,7 +1240,7 @@ bool LokiMQ::proxy_check_auth(const std::string& pubkey, size_t conn_index, cons
 void LokiMQ::process_zap_requests(zmq::socket_t& zap_auth) {
     std::vector<zmq::message_t> frames;
     for (frames.reserve(7); recv_message_parts(zap_auth, std::back_inserter(frames), zmq::recv_flags::dontwait); frames.clear()) {
-        if (LogLevel::trace >= log_level()) {
+        if (log_level() >= LogLevel::trace) {
            std::ostringstream o;
            o << "Processing ZAP authentication request:";
            for (size_t i = 0; i < frames.size(); i++) {
@@ -1255,6 +1357,13 @@ void LokiMQ::connect(const std::string &pubkey, std::chrono::milliseconds keep_a
     detail::send_control(get_control_socket(), "CONNECT", bt_serialize<bt_dict>({{"pubkey",pubkey}, {"keep-alive",keep_alive.count()}, {"hint",hint}}));
 }

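+// Runs a one-off job on a worker thread: the callable is wrapped in a single-job Batch<void>,
+// which is then handed off to the proxy thread via a BATCH control message.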
+inline void LokiMQ::job(std::function<void()> f) {
+    auto* b = new Batch<void>;
+    b->add_job(std::move(f));
+    auto* baseptr = static_cast<detail::Batch*>(b);
+    detail::send_control(get_control_socket(), "BATCH", bt_serialize(reinterpret_cast<uintptr_t>(baseptr)));
+}
+
 }