@ -20,7 +20,7 @@ extern "C" {
namespace oxenmq {
void OxenMQ : : proxy_quit ( ) {
L MQ_LOG( debug , " Received quit command, shutting down proxy thread " ) ;
O MQ_LOG( debug , " Received quit command, shutting down proxy thread " ) ;
assert ( std : : none_of ( workers . begin ( ) , workers . end ( ) , [ ] ( auto & worker ) { return worker . worker_thread . joinable ( ) ; } ) ) ;
assert ( std : : none_of ( tagged_workers . begin ( ) , tagged_workers . end ( ) , [ ] ( auto & worker ) { return std : : get < 0 > ( worker ) . worker_thread . joinable ( ) ; } ) ) ;
@ -38,7 +38,7 @@ void OxenMQ::proxy_quit() {
connections . clear ( ) ;
peers . clear ( ) ;
L MQ_LOG( debug , " Proxy thread teardown complete " ) ;
O MQ_LOG( debug , " Proxy thread teardown complete " ) ;
}
void OxenMQ : : proxy_send ( bt_dict_consumer data ) {
@ -120,10 +120,10 @@ void OxenMQ::proxy_send(bt_dict_consumer data) {
if ( ! sock_route . first ) {
nowarn = true ;
if ( optional )
L MQ_LOG( debug , " Not sending: send is optional and no connection to " ,
O MQ_LOG( debug , " Not sending: send is optional and no connection to " ,
to_hex ( conn_id . pk ) , " is currently established " ) ;
else
L MQ_LOG( error , " Unable to send to " , to_hex ( conn_id . pk ) , " : no valid connection address found " ) ;
O MQ_LOG( error , " Unable to send to " , to_hex ( conn_id . pk ) , " : no valid connection address found " ) ;
break ;
}
send_to = sock_route . first ;
@ -131,20 +131,20 @@ void OxenMQ::proxy_send(bt_dict_consumer data) {
} else if ( ! conn_id . route . empty ( ) ) { // incoming non-SN connection
auto it = connections . find ( conn_id . id ) ;
if ( it = = connections . end ( ) ) {
L MQ_LOG( warn , " Unable to send to " , conn_id , " : incoming listening socket not found " ) ;
O MQ_LOG( warn , " Unable to send to " , conn_id , " : incoming listening socket not found " ) ;
break ;
}
send_to = & it - > second ;
} else {
auto pr = peers . equal_range ( conn_id ) ;
if ( pr . first = = peers . end ( ) ) {
L MQ_LOG( warn , " Unable to send: connection id " , conn_id , " is not (or is no longer) a valid outgoing connection " ) ;
O MQ_LOG( warn , " Unable to send: connection id " , conn_id , " is not (or is no longer) a valid outgoing connection " ) ;
break ;
}
auto & peer = pr . first - > second ;
auto it = connections . find ( peer . conn_id ) ;
if ( it = = connections . end ( ) ) {
L MQ_LOG( warn , " Unable to send: peer connection id " , conn_id , " is not (or is no longer) a valid outgoing connection " ) ;
O MQ_LOG( warn , " Unable to send: peer connection id " , conn_id , " is not (or is no longer) a valid outgoing connection " ) ;
break ;
}
send_to = & it - > second ;
@ -155,7 +155,7 @@ void OxenMQ::proxy_send(bt_dict_consumer data) {
} catch ( const zmq : : error_t & e ) {
if ( e . num ( ) = = EHOSTUNREACH & & ! conn_id . route . empty ( ) /*= incoming conn*/ ) {
L MQ_LOG( debug , " Incoming connection is no longer valid; removing peer details " ) ;
O MQ_LOG( debug , " Incoming connection is no longer valid; removing peer details " ) ;
auto pr = peers . equal_range ( conn_id ) ;
if ( pr . first ! = peers . end ( ) ) {
@ -174,7 +174,7 @@ void OxenMQ::proxy_send(bt_dict_consumer data) {
// The incoming connection to the SN is no longer good, but we can retry because
// we may have another active connection with the SN (or may want to open one).
if ( removed ) {
L MQ_LOG( debug , " Retrying sending to SN " , to_hex ( conn_id . pk ) , " using other sockets " ) ;
O MQ_LOG( debug , " Retrying sending to SN " , to_hex ( conn_id . pk ) , " using other sockets " ) ;
retry = true ;
}
}
@ -182,10 +182,10 @@ void OxenMQ::proxy_send(bt_dict_consumer data) {
}
if ( ! retry ) {
if ( ! conn_id . sn ( ) & & ! conn_id . route . empty ( ) ) { // incoming non-SN connection
L MQ_LOG( debug , " Unable to send message to incoming connection " , conn_id , " : " , e . what ( ) ,
O MQ_LOG( debug , " Unable to send message to incoming connection " , conn_id , " : " , e . what ( ) ,
" ; remote has probably disconnected " ) ;
} else {
L MQ_LOG( warn , " Unable to send message to " , conn_id , " : " , e . what ( ) ) ;
O MQ_LOG( warn , " Unable to send message to " , conn_id , " : " , e . what ( ) ) ;
}
nowarn = true ;
if ( callback_nosend ) {
@ -197,11 +197,11 @@ void OxenMQ::proxy_send(bt_dict_consumer data) {
}
if ( request ) {
if ( sent ) {
L MQ_LOG( debug , " Added new pending request " , to_hex ( request_tag ) ) ;
O MQ_LOG( debug , " Added new pending request " , to_hex ( request_tag ) ) ;
pending_requests . insert ( { request_tag , {
std : : chrono : : steady_clock : : now ( ) + request_timeout , std : : move ( request_callback ) } } ) ;
} else {
L MQ_LOG( debug , " Could not send request, scheduling request callback failure " ) ;
O MQ_LOG( debug , " Could not send request, scheduling request callback failure " ) ;
job ( [ callback = std : : move ( request_callback ) ] { callback ( false , { { " TIMEOUT " s } } ) ; } ) ;
}
}
@ -211,7 +211,7 @@ void OxenMQ::proxy_send(bt_dict_consumer data) {
else if ( callback_noqueue )
job ( std : : move ( callback_noqueue ) ) ;
else if ( ! nowarn )
L MQ_LOG( warn , " Unable to send message to " , conn_id , " : sending would block " ) ;
O MQ_LOG( warn , " Unable to send message to " , conn_id , " : sending would block " ) ;
}
}
@ -238,7 +238,7 @@ void OxenMQ::proxy_reply(bt_dict_consumer data) {
auto pr = peers . equal_range ( conn_id ) ;
if ( pr . first = = pr . second ) {
L MQ_LOG( warn , " Unable to send tagged reply: the connection is no longer valid " ) ;
O MQ_LOG( warn , " Unable to send tagged reply: the connection is no longer valid " ) ;
return ;
}
@ -251,18 +251,18 @@ void OxenMQ::proxy_reply(bt_dict_consumer data) {
} catch ( const zmq : : error_t & err ) {
if ( err . num ( ) = = EHOSTUNREACH ) {
if ( it - > second . outgoing ( ) ) {
L MQ_LOG( debug , " Unable to send reply to non-SN request on outgoing socket: "
O MQ_LOG( debug , " Unable to send reply to non-SN request on outgoing socket: "
" remote is no longer connected; closing connection " ) ;
proxy_close_connection ( it - > second . conn_id , CLOSE_LINGER ) ;
it = peers . erase ( it ) ;
+ + it ;
} else {
L MQ_LOG( debug , " Unable to send reply to non-SN request on incoming socket: "
O MQ_LOG( debug , " Unable to send reply to non-SN request on incoming socket: "
" remote is no longer connected; removing peer details " ) ;
it = peers . erase ( it ) ;
}
} else {
L MQ_LOG( warn , " Unable to send reply to incoming non-SN request: " , err . what ( ) ) ;
O MQ_LOG( warn , " Unable to send reply to incoming non-SN request: " , err . what ( ) ) ;
+ + it ;
}
}
@ -275,22 +275,22 @@ void OxenMQ::proxy_control_message(std::vector<zmq::message_t>& parts) {
if ( parts . size ( ) < 2 )
throw std : : logic_error ( " OxenMQ bug: Expected 2-3 message parts for a proxy control message " ) ;
auto route = view ( parts [ 0 ] ) , cmd = view ( parts [ 1 ] ) ;
L MQ_TRACE( " control message: " , cmd ) ;
O MQ_TRACE( " control message: " , cmd ) ;
if ( parts . size ( ) = = 3 ) {
L MQ_TRACE( " ...: " , parts [ 2 ] ) ;
O MQ_TRACE( " ...: " , parts [ 2 ] ) ;
auto data = view ( parts [ 2 ] ) ;
if ( cmd = = " SEND " ) {
L MQ_TRACE( " proxying message " ) ;
O MQ_TRACE( " proxying message " ) ;
return proxy_send ( data ) ;
} else if ( cmd = = " REPLY " ) {
L MQ_TRACE( " proxying reply to non-SN incoming message " ) ;
O MQ_TRACE( " proxying reply to non-SN incoming message " ) ;
return proxy_reply ( data ) ;
} else if ( cmd = = " BATCH " ) {
L MQ_TRACE( " proxy batch jobs " ) ;
O MQ_TRACE( " proxy batch jobs " ) ;
auto ptrval = bt_deserialize < uintptr_t > ( data ) ;
return proxy_batch ( reinterpret_cast < detail : : Batch * > ( ptrval ) ) ;
} else if ( cmd = = " INJECT " ) {
L MQ_TRACE( " proxy inject " ) ;
O MQ_TRACE( " proxy inject " ) ;
return proxy_inject_task ( detail : : deserialize_object < injected_task > ( bt_deserialize < uintptr_t > ( data ) ) ) ;
} else if ( cmd = = " SET_SNS " ) {
return proxy_set_active_sns ( data ) ;
@ -351,11 +351,11 @@ bool OxenMQ::proxy_bind(bind_data& b, size_t bind_index) {
b . on_bind = nullptr ;
}
if ( ! good ) {
L MQ_LOG( warn , " OxenMQ failed to listen on " , b . address ) ;
O MQ_LOG( warn , " OxenMQ failed to listen on " , b . address ) ;
return false ;
}
L MQ_LOG( info , " OxenMQ listening on " , b . address ) ;
O MQ_LOG( info , " OxenMQ listening on " , b . address ) ;
b . conn_id = next_conn_id + + ;
connections . emplace_hint ( connections . end ( ) , b . conn_id , std : : move ( listener ) ) ;
@ -368,11 +368,11 @@ bool OxenMQ::proxy_bind(bind_data& b, size_t bind_index) {
void OxenMQ : : proxy_loop ( ) {
# if defined(__linux__) || defined(__sun) || defined(__MINGW32__)
pthread_setname_np ( pthread_self ( ) , " l mq-proxy" ) ;
pthread_setname_np ( pthread_self ( ) , " o mq-proxy" ) ;
# elif defined(__FreeBSD__) || defined(__NetBSD__) || defined(__OpenBSD__)
pthread_set_name_np ( pthread_self ( ) , " l mq-proxy" ) ;
pthread_set_name_np ( pthread_self ( ) , " o mq-proxy" ) ;
# elif defined(__MACH__)
pthread_setname_np ( " l mq-proxy" ) ;
pthread_setname_np ( " o mq-proxy" ) ;
# endif
zap_auth . set ( zmq : : sockopt : : linger , 0 ) ;
@ -393,12 +393,12 @@ void OxenMQ::proxy_loop() {
}
if ( log_level ( ) > = LogLevel : : debug ) {
L MQ_LOG( debug , " Reserving space for " , max_workers , " max workers = " , general_workers , " general plus reservations for: " ) ;
O MQ_LOG( debug , " Reserving space for " , max_workers , " max workers = " , general_workers , " general plus reservations for: " ) ;
for ( const auto & cat : categories )
L MQ_LOG( debug , " - " , cat . first , " : " , cat . second . reserved_threads ) ;
L MQ_LOG( debug , " - (batch jobs): " , batch_jobs_reserved ) ;
L MQ_LOG( debug , " - (reply jobs): " , reply_jobs_reserved ) ;
L MQ_LOG( debug , " Plus " , tagged_workers . size ( ) , " tagged worker threads " ) ;
O MQ_LOG( debug , " - " , cat . first , " : " , cat . second . reserved_threads ) ;
O MQ_LOG( debug , " - (batch jobs): " , batch_jobs_reserved ) ;
O MQ_LOG( debug , " - (reply jobs): " , reply_jobs_reserved ) ;
O MQ_LOG( debug , " Plus " , tagged_workers . size ( ) , " tagged worker threads " ) ;
}
workers . reserve ( max_workers ) ;
@ -420,7 +420,7 @@ void OxenMQ::proxy_loop() {
for ( size_t i = 0 ; i < bind . size ( ) ; i + + ) {
if ( ! proxy_bind ( bind [ i ] , i ) ) {
L MQ_LOG( warn , " OxenMQ failed to listen on " , bind [ i ] . address ) ;
O MQ_LOG( warn , " OxenMQ failed to listen on " , bind [ i ] . address ) ;
throw zmq : : error_t { } ;
}
}
@ -467,25 +467,25 @@ void OxenMQ::proxy_loop() {
// and send them back a "START" to let them know to go ahead with startup. We need this
// synchronization dance to guarantee that the workers are routable before we can proceed.
if ( ! tagged_workers . empty ( ) ) {
L MQ_LOG( debug , " Waiting for tagged workers " ) ;
O MQ_LOG( debug , " Waiting for tagged workers " ) ;
std : : unordered_set < std : : string_view > waiting_on ;
for ( auto & w : tagged_workers )
waiting_on . emplace ( std : : get < run_info > ( w ) . worker_routing_id ) ;
for ( ; ! waiting_on . empty ( ) ; parts . clear ( ) ) {
recv_message_parts ( workers_socket , parts ) ;
if ( parts . size ( ) ! = 2 | | view ( parts [ 1 ] ) ! = " STARTING " sv ) {
L MQ_LOG( error , " Received invalid message on worker socket while waiting for tagged thread startup " ) ;
O MQ_LOG( error , " Received invalid message on worker socket while waiting for tagged thread startup " ) ;
continue ;
}
L MQ_LOG( debug , " Received STARTING message from " , view ( parts [ 0 ] ) ) ;
O MQ_LOG( debug , " Received STARTING message from " , view ( parts [ 0 ] ) ) ;
if ( auto it = waiting_on . find ( view ( parts [ 0 ] ) ) ; it ! = waiting_on . end ( ) )
waiting_on . erase ( it ) ;
else
L MQ_LOG( error , " Received STARTING message from unknown worker " , view ( parts [ 0 ] ) ) ;
O MQ_LOG( error , " Received STARTING message from unknown worker " , view ( parts [ 0 ] ) ) ;
}
for ( auto & w : tagged_workers ) {
L MQ_LOG( debug , " Telling tagged thread worker " , std : : get < run_info > ( w ) . worker_routing_id , " to finish startup " ) ;
O MQ_LOG( debug , " Telling tagged thread worker " , std : : get < run_info > ( w ) . worker_routing_id , " to finish startup " ) ;
route_control ( workers_socket , std : : get < run_info > ( w ) . worker_routing_id , " START " ) ;
}
}
@ -509,7 +509,7 @@ void OxenMQ::proxy_loop() {
if ( proxy_skip_one_poll )
proxy_skip_one_poll = false ;
else {
L MQ_TRACE( " polling for new messages " ) ;
O MQ_TRACE( " polling for new messages " ) ;
// We poll the control socket and worker socket for any incoming messages. If we have
// available worker room then also poll incoming connections and outgoing connections
@ -518,30 +518,30 @@ void OxenMQ::proxy_loop() {
zmq : : poll ( pollitems . data ( ) , pollitems . size ( ) , poll_timeout ) ;
}
L MQ_TRACE( " processing control messages " ) ;
O MQ_TRACE( " processing control messages " ) ;
// Retrieve any waiting incoming control messages
for ( parts . clear ( ) ; recv_message_parts ( command , parts , zmq : : recv_flags : : dontwait ) ; parts . clear ( ) ) {
proxy_control_message ( parts ) ;
}
L MQ_TRACE( " processing worker messages " ) ;
O MQ_TRACE( " processing worker messages " ) ;
for ( parts . clear ( ) ; recv_message_parts ( workers_socket , parts , zmq : : recv_flags : : dontwait ) ; parts . clear ( ) ) {
proxy_worker_message ( parts ) ;
}
L MQ_TRACE( " processing timers " ) ;
O MQ_TRACE( " processing timers " ) ;
zmq_timers_execute ( timers . get ( ) ) ;
// Handle any zap authentication
L MQ_TRACE( " processing zap requests " ) ;
O MQ_TRACE( " processing zap requests " ) ;
process_zap_requests ( ) ;
// See if we can drain anything from the current queue before we potentially add to it
// below.
L MQ_TRACE( " processing queued jobs and messages " ) ;
O MQ_TRACE( " processing queued jobs and messages " ) ;
proxy_process_queue ( ) ;
L MQ_TRACE( " processing new incoming messages " ) ;
O MQ_TRACE( " processing new incoming messages " ) ;
// We round-robin connections when pulling off pending messages one-by-one rather than
// pulling off all messages from one connection before moving to the next; thus in cases of
@ -566,7 +566,7 @@ void OxenMQ::proxy_loop() {
+ + end % = queue . size ( ) ;
if ( parts . empty ( ) ) {
L MQ_LOG( warn , " Ignoring empty (0-part) incoming message " ) ;
O MQ_LOG( warn , " Ignoring empty (0-part) incoming message " ) ;
continue ;
}
@ -576,12 +576,12 @@ void OxenMQ::proxy_loop() {
if ( connections_updated ) {
// If connections got updated then our points are stale, to restart the proxy loop;
// if there are still messages waiting we'll end up right back here.
L MQ_TRACE( " connections became stale; short-circuiting incoming message loop " ) ;
O MQ_TRACE( " connections became stale; short-circuiting incoming message loop " ) ;
break ;
}
}
L MQ_TRACE( " done proxy loop " ) ;
O MQ_TRACE( " done proxy loop " ) ;
}
}
@ -597,7 +597,7 @@ bool OxenMQ::proxy_handle_builtin(int64_t conn_id, zmq::socket_t& sock, std::vec
std : : string_view route , cmd ;
if ( parts . size ( ) < 1 + incoming ) {
L MQ_LOG( warn , " Received empty message; ignoring " ) ;
O MQ_LOG( warn , " Received empty message; ignoring " ) ;
return true ;
}
if ( incoming ) {
@ -606,18 +606,18 @@ bool OxenMQ::proxy_handle_builtin(int64_t conn_id, zmq::socket_t& sock, std::vec
} else {
cmd = view ( parts [ 0 ] ) ;
}
L MQ_TRACE( " Checking for builtins: ' " , cmd , " ' from " , peer_address ( parts . back ( ) ) ) ;
O MQ_TRACE( " Checking for builtins: ' " , cmd , " ' from " , peer_address ( parts . back ( ) ) ) ;
if ( cmd = = " REPLY " ) {
size_t tag_pos = 1 + incoming ;
if ( parts . size ( ) < = tag_pos ) {
L MQ_LOG( warn , " Received REPLY without a reply tag; ignoring " ) ;
O MQ_LOG( warn , " Received REPLY without a reply tag; ignoring " ) ;
return true ;
}
std : : string reply_tag { view ( parts [ tag_pos ] ) } ;
auto it = pending_requests . find ( reply_tag ) ;
if ( it ! = pending_requests . end ( ) ) {
L MQ_LOG( debug , " Received REPLY for pending command " , to_hex ( reply_tag ) , " ; scheduling callback " ) ;
O MQ_LOG( debug , " Received REPLY for pending command " , to_hex ( reply_tag ) , " ; scheduling callback " ) ;
std : : vector < std : : string > data ;
data . reserve ( parts . size ( ) - ( tag_pos + 1 ) ) ;
for ( auto it = parts . begin ( ) + ( tag_pos + 1 ) ; it ! = parts . end ( ) ; + + it )
@ -627,38 +627,38 @@ bool OxenMQ::proxy_handle_builtin(int64_t conn_id, zmq::socket_t& sock, std::vec
} ) ;
pending_requests . erase ( it ) ;
} else {
L MQ_LOG( warn , " Received REPLY with unknown or already handled reply tag ( " , to_hex ( reply_tag ) , " ); ignoring " ) ;
O MQ_LOG( warn , " Received REPLY with unknown or already handled reply tag ( " , to_hex ( reply_tag ) , " ); ignoring " ) ;
}
return true ;
} else if ( cmd = = " HI " ) {
if ( ! incoming ) {
L MQ_LOG( warn , " Got invalid 'HI' message on an outgoing connection; ignoring " ) ;
O MQ_LOG( warn , " Got invalid 'HI' message on an outgoing connection; ignoring " ) ;
return true ;
}
L MQ_LOG( debug , " Incoming client from " , peer_address ( parts . back ( ) ) , " sent HI, replying with HELLO " ) ;
O MQ_LOG( debug , " Incoming client from " , peer_address ( parts . back ( ) ) , " sent HI, replying with HELLO " ) ;
try {
send_routed_message ( sock , std : : string { route } , " HELLO " ) ;
} catch ( const std : : exception & e ) { L MQ_LOG( warn , " Couldn't reply with HELLO: " , e . what ( ) ) ; }
} catch ( const std : : exception & e ) { O MQ_LOG( warn , " Couldn't reply with HELLO: " , e . what ( ) ) ; }
return true ;
} else if ( cmd = = " HELLO " ) {
if ( incoming ) {
L MQ_LOG( warn , " Got invalid 'HELLO' message on an incoming connection; ignoring " ) ;
O MQ_LOG( warn , " Got invalid 'HELLO' message on an incoming connection; ignoring " ) ;
return true ;
}
auto it = std : : find_if ( pending_connects . begin ( ) , pending_connects .