Move lmq connection & FutureJSON into lmq.py

Also delays creating the lmq instance until get_connection() is called,
which seems pretty much required if using multiple threads/processes as
we need each process to have its own connection.
This commit is contained in:
Jason Rhinelander 2020-08-19 18:53:08 -03:00
parent a14d33fc6d
commit fa9a17092c
2 changed files with 73 additions and 56 deletions

60
lmq.py Normal file
View File

@ -0,0 +1,60 @@
import pylokimq
import config
import json
import sys
from datetime import datetime, timedelta
lmq, lokid = None, None
def lmq_connection():
global lmq, lokid
if lmq is None:
lmq = pylokimq.LokiMQ(pylokimq.LogLevel.warn)
lmq.max_message_size = 10*1024*1024
lmq.start()
if lokid is None:
lokid = lmq.connect_remote(config.lokid_rpc)
return (lmq, lokid)
cached = {}
cached_args = {}
cache_expiry = {}
class FutureJSON():
"""Class for making a LMQ JSON RPC request that uses a future to wait on the result, and caches
the results for a set amount of time so that if the same endpoint with the same arguments is
requested again the cache will be used instead of repeating the request."""
def __init__(self, lmq, lokid, endpoint, cache_seconds=3, *, args=[], fail_okay=False, timeout=10):
self.endpoint = endpoint
self.fail_okay = fail_okay
if self.endpoint in cached and cached_args[self.endpoint] == args and cache_expiry[self.endpoint] >= datetime.now():
self.json = cached[self.endpoint]
self.args = None
self.future = None
else:
self.json = None
self.args = args
self.future = lmq.request_future(lokid, self.endpoint, self.args, timeout=timeout)
self.cache_seconds = cache_seconds
def get(self):
"""If the result is already available, returns it immediately (and can safely be called multiple times.
Otherwise waits for the result, parses as json, and caches it. Returns None if the request fails"""
if self.json is None and self.future is not None:
try:
result = self.future.get()
self.future = None
if result[0] != b'200':
raise RuntimeError("Request for {} failed: got {}".format(self.endpoint, result))
self.json = json.loads(result[1])
cached[self.endpoint] = self.json
cached_args[self.endpoint] = self.args
cache_expiry[self.endpoint] = datetime.now() + timedelta(seconds=self.cache_seconds)
except RuntimeError as e:
if not self.fail_okay:
print("Something getting wrong: {}".format(e), file=sys.stderr)
self.future = None
return self.json

View File

@ -1,14 +1,14 @@
#!/usr/bin/env python3
import flask
import pylokimq
from datetime import datetime, timedelta
from datetime import datetime
import babel.dates
import json
import sys
import statistics
import config
from lmq import FutureJSON, lmq_connection
# Make a dict of config.* to pass to templating
conf = {x: getattr(config, x) for x in dir(config) if not x.startswith('__')}
@ -19,50 +19,6 @@ if __name__ == '__main__':
app.config['TEMPLATES_AUTO_RELOAD'] = True
app.jinja_env.auto_reload = True
lmq = pylokimq.LokiMQ(pylokimq.LogLevel.warn)
lmq.max_message_size = 10*1024*1024
lmq.start()
lokid = lmq.connect_remote(config.lokid_rpc)
cached = {}
cached_args = {}
cache_expiry = {}
class FutureJSON():
def __init__(self, endpoint, cache_seconds=3, *, args=[], fail_okay=False, timeout=10):
self.endpoint = endpoint
self.fail_okay = fail_okay
if self.endpoint in cached and cached_args[self.endpoint] == args and cache_expiry[self.endpoint] >= datetime.now():
self.json = cached[self.endpoint]
self.args = None
self.future = None
else:
self.json = None
self.args = args
self.future = lmq.request_future(lokid, self.endpoint, self.args, timeout=timeout)
self.cache_seconds = cache_seconds
def get(self):
"""If the result is already available, returns it immediately (and can safely be called multiple times.
Otherwise waits for the result, parses as json, and caches it. Returns None if the request fails"""
if self.json is None and self.future is not None:
try:
result = self.future.get()
if result[0] != b'200':
raise RuntimeError("Request failed: got {}".format(result))
self.json = json.loads(result[1])
cached[self.endpoint] = self.json
cached_args[self.endpoint] = self.args
cache_expiry[self.endpoint] = datetime.now() + timedelta(seconds=self.cache_seconds)
except RuntimeError as e:
if not self.fail_okay:
print("Something getting wrong: {}".format(e), file=sys.stderr)
self.future = None
pass
return self.json
@app.template_filter('format_datetime')
def format_datetime(value, format='long'):
return babel.dates.format_datetime(value, format, tzinfo=babel.dates.get_timezone('UTC'))
@ -157,12 +113,13 @@ def css():
@app.route('/range/<int:first>/<int:last>')
@app.route('/autorefresh/<int:refresh>')
def main(refresh=None, page=0, per_page=None, first=None, last=None):
inforeq = FutureJSON('rpc.get_info', 1)
stake = FutureJSON('rpc.get_staking_requirement', 10)
base_fee = FutureJSON('rpc.get_fee_estimate', 10)
hfinfo = FutureJSON('rpc.hard_fork_info', 10)
mempool = FutureJSON('rpc.get_transaction_pool', 5)
sns = FutureJSON('rpc.get_service_nodes', 5,
lmq, lokid = lmq_connection()
inforeq = FutureJSON(lmq, lokid, 'rpc.get_info', 1)
stake = FutureJSON(lmq, lokid, 'rpc.get_staking_requirement', 10)
base_fee = FutureJSON(lmq, lokid, 'rpc.get_fee_estimate', 10)
hfinfo = FutureJSON(lmq, lokid, 'rpc.hard_fork_info', 10)
mempool = FutureJSON(lmq, lokid, 'rpc.get_transaction_pool', 5)
sns = FutureJSON(lmq, lokid, 'rpc.get_service_nodes', 5,
args=[json.dumps({
'all': False,
'fields': { x: True for x in ('service_node_pubkey', 'requested_unlock_height', 'last_reward_block_height',
@ -175,7 +132,7 @@ def main(refresh=None, page=0, per_page=None, first=None, last=None):
# This call is slow the first time it gets called in lokid but will be fast after that, so call
# it with a very short timeout. It's also an admin-only command, so will always fail if we're
# using a restricted RPC interface.
coinbase = FutureJSON('admin.get_coinbase_tx_sum', 10, timeout=1, fail_okay=True,
coinbase = FutureJSON(lmq, lokid, 'admin.get_coinbase_tx_sum', 10, timeout=1, fail_okay=True,
args=[json.dumps({"height":0, "count":2**31-1}).encode()])
server = dict(
timestamp=datetime.utcnow(),
@ -207,7 +164,7 @@ def main(refresh=None, page=0, per_page=None, first=None, last=None):
end_height = max(0, height - per_page*page - 1)
start_height = max(0, end_height - per_page + 1)
blocks = FutureJSON('rpc.get_block_headers_range', args=[json.dumps({
blocks = FutureJSON(lmq, lokid, 'rpc.get_block_headers_range', args=[json.dumps({
'start_height': start_height,
'end_height': end_height,
'get_tx_hashes': True,
@ -222,7 +179,7 @@ def main(refresh=None, page=0, per_page=None, first=None, last=None):
txids.append(b['miner_tx_hash'])
if 'tx_hashes' in b:
txids += b['tx_hashes']
txs = FutureJSON('rpc.get_transactions', args=[json.dumps({
txs = FutureJSON(lmq, lokid, 'rpc.get_transactions', args=[json.dumps({
"txs_hashes": txids,
"decode_as_json": True,
"tx_extra": True,
@ -245,7 +202,7 @@ def main(refresh=None, page=0, per_page=None, first=None, last=None):
blocks[i]['txs'].append(tx)
#txes = FutureJSON('rpc.get_transactions');
#txes = FutureJSON(lmq, lokid, 'rpc.get_transactions');
# mempool RPC return values are about as nasty as can be. For each mempool tx, we get back