import json
import sys
from datetime import datetime, timedelta

# Connection state is created lazily, per process, rather than at import time:
# when running under multiple threads/processes each worker needs its own LMQ
# connection, and importing this module must stay cheap and side-effect free.
lmq, lokid = None, None


def lmq_connection():
    """Return a (lmq, lokid) pair for talking to lokid over LMQ RPC.

    The LokiMQ instance and the remote connection are created on the first
    call and cached in module globals; later calls return the same pair.
    """
    global lmq, lokid
    # Deferred imports: this module can then be imported (e.g. just for
    # FutureJSON) without pylokimq/config being available, and no connection
    # is created as an import side effect.
    import pylokimq
    import config
    if lmq is None:
        lmq = pylokimq.LokiMQ(pylokimq.LogLevel.warn)
        lmq.max_message_size = 10 * 1024 * 1024
        lmq.start()
    if lokid is None:
        lokid = lmq.connect_remote(config.lokid_rpc)
    return (lmq, lokid)


# Per-endpoint cache: last successful JSON result, the args it was requested
# with, and when the cached value expires.
cached = {}
cached_args = {}
cache_expiry = {}


class FutureJSON:
    """A LMQ JSON RPC request wrapped in a future.

    The request is issued on construction; `get()` waits for the result,
    parses it as JSON, and caches it for a set amount of time so that asking
    for the same endpoint with the same arguments again within the window
    uses the cache instead of repeating the request.
    """

    def __init__(self, lmq, lokid, endpoint, cache_seconds=3, *, args=None, fail_okay=False, timeout=10):
        """Start the request (unless a fresh cached result already exists).

        lmq/lokid     -- connection pair, as returned by lmq_connection()
        endpoint      -- RPC endpoint name, e.g. 'rpc.get_info'
        cache_seconds -- how long a successful result stays cached
        args          -- request arguments forwarded to lmq.request_future
                         (defaults to []; None avoids a mutable default arg)
        fail_okay     -- if True, a failed request is not reported to stderr
        timeout       -- request timeout, in seconds
        """
        self.endpoint = endpoint
        self.fail_okay = fail_okay
        if args is None:
            args = []
        if self.endpoint in cached and cached_args[self.endpoint] == args and cache_expiry[self.endpoint] >= datetime.now():
            # Fresh cache hit with identical args: no request needed.
            self.json = cached[self.endpoint]
            self.args = None
            self.future = None
        else:
            self.json = None
            self.args = args
            self.future = lmq.request_future(lokid, self.endpoint, self.args, timeout=timeout)
        self.cache_seconds = cache_seconds

    def get(self):
        """Return the parsed JSON result, or None if the request failed.

        If the result is already available it is returned immediately (and
        this can safely be called multiple times).  Otherwise this waits for
        the result, parses it as JSON, and caches it.
        """
        if self.json is None and self.future is not None:
            try:
                result = self.future.get()
                self.future = None
                if result[0] != b'200':
                    raise RuntimeError("Request for {} failed: got {}".format(self.endpoint, result))
                self.json = json.loads(result[1])
                cached[self.endpoint] = self.json
                cached_args[self.endpoint] = self.args
                cache_expiry[self.endpoint] = datetime.now() + timedelta(seconds=self.cache_seconds)
            except RuntimeError as e:
                # Best-effort requests (fail_okay) stay silent; self.json
                # remains None either way.
                if not self.fail_okay:
                    print("Something went wrong: {}".format(e), file=sys.stderr)
                self.future = None

        return self.json
self.future = lmq.request_future(lokid, self.endpoint, self.args, timeout=timeout) - self.cache_seconds = cache_seconds - - def get(self): - """If the result is already available, returns it immediately (and can safely be called multiple times. - Otherwise waits for the result, parses as json, and caches it. Returns None if the request fails""" - if self.json is None and self.future is not None: - try: - result = self.future.get() - if result[0] != b'200': - raise RuntimeError("Request failed: got {}".format(result)) - self.json = json.loads(result[1]) - cached[self.endpoint] = self.json - cached_args[self.endpoint] = self.args - cache_expiry[self.endpoint] = datetime.now() + timedelta(seconds=self.cache_seconds) - except RuntimeError as e: - if not self.fail_okay: - print("Something getting wrong: {}".format(e), file=sys.stderr) - self.future = None - pass - - return self.json - - @app.template_filter('format_datetime') def format_datetime(value, format='long'): return babel.dates.format_datetime(value, format, tzinfo=babel.dates.get_timezone('UTC')) @@ -157,12 +113,13 @@ def css(): @app.route('/range//') @app.route('/autorefresh/') def main(refresh=None, page=0, per_page=None, first=None, last=None): - inforeq = FutureJSON('rpc.get_info', 1) - stake = FutureJSON('rpc.get_staking_requirement', 10) - base_fee = FutureJSON('rpc.get_fee_estimate', 10) - hfinfo = FutureJSON('rpc.hard_fork_info', 10) - mempool = FutureJSON('rpc.get_transaction_pool', 5) - sns = FutureJSON('rpc.get_service_nodes', 5, + lmq, lokid = lmq_connection() + inforeq = FutureJSON(lmq, lokid, 'rpc.get_info', 1) + stake = FutureJSON(lmq, lokid, 'rpc.get_staking_requirement', 10) + base_fee = FutureJSON(lmq, lokid, 'rpc.get_fee_estimate', 10) + hfinfo = FutureJSON(lmq, lokid, 'rpc.hard_fork_info', 10) + mempool = FutureJSON(lmq, lokid, 'rpc.get_transaction_pool', 5) + sns = FutureJSON(lmq, lokid, 'rpc.get_service_nodes', 5, args=[json.dumps({ 'all': False, 'fields': { x: True for x in 
('service_node_pubkey', 'requested_unlock_height', 'last_reward_block_height', @@ -175,7 +132,7 @@ def main(refresh=None, page=0, per_page=None, first=None, last=None): # This call is slow the first time it gets called in lokid but will be fast after that, so call # it with a very short timeout. It's also an admin-only command, so will always fail if we're # using a restricted RPC interface. - coinbase = FutureJSON('admin.get_coinbase_tx_sum', 10, timeout=1, fail_okay=True, + coinbase = FutureJSON(lmq, lokid, 'admin.get_coinbase_tx_sum', 10, timeout=1, fail_okay=True, args=[json.dumps({"height":0, "count":2**31-1}).encode()]) server = dict( timestamp=datetime.utcnow(), @@ -207,7 +164,7 @@ def main(refresh=None, page=0, per_page=None, first=None, last=None): end_height = max(0, height - per_page*page - 1) start_height = max(0, end_height - per_page + 1) - blocks = FutureJSON('rpc.get_block_headers_range', args=[json.dumps({ + blocks = FutureJSON(lmq, lokid, 'rpc.get_block_headers_range', args=[json.dumps({ 'start_height': start_height, 'end_height': end_height, 'get_tx_hashes': True, @@ -222,7 +179,7 @@ def main(refresh=None, page=0, per_page=None, first=None, last=None): txids.append(b['miner_tx_hash']) if 'tx_hashes' in b: txids += b['tx_hashes'] - txs = FutureJSON('rpc.get_transactions', args=[json.dumps({ + txs = FutureJSON(lmq, lokid, 'rpc.get_transactions', args=[json.dumps({ "txs_hashes": txids, "decode_as_json": True, "tx_extra": True, @@ -245,7 +202,7 @@ def main(refresh=None, page=0, per_page=None, first=None, last=None): blocks[i]['txs'].append(tx) - #txes = FutureJSON('rpc.get_transactions'); + #txes = FutureJSON(lmq, lokid, 'rpc.get_transactions'); # mempool RPC return values are about as nasty as can be. For each mempool tx, we get back