import logging
import time

import gevent

log = logging.getLogger("RateLimit")

called_db = {}  # Holds events last call time
queue_db = {}  # Commands queued to run


def called(event, penalty=0):
    """Register ``event`` as called right now.

    Args:
        event: Key identifying the rate-limited action.
        penalty: Seconds added to the stored timestamp, which keeps the
            event blocked for that much longer.

    Returns:
        None
    """
    called_db[event] = time.time() + penalty


def isAllowed(event, allowed_again=10):
    """Check if calling ``event`` is allowed.

    Args:
        event: Key identifying the rate-limited action.
        allowed_again: Minimum number of seconds between two calls.

    Returns:
        True if the event has no stored call time, or if at least
        ``allowed_again`` seconds passed since the stored time (the stored
        time is then deleted); False otherwise.
    """
    last_called = called_db.get(event)
    if not last_called:  # It's not called before
        return True
    if time.time() - last_called >= allowed_again:
        del called_db[event]  # Delete last call time to save memory
        return True
    return False


def delayLeft(event, allowed_again=10):
    """Return how many seconds are left until ``event`` is allowed again.

    Args:
        event: Key identifying the rate-limited action.
        allowed_again: Minimum number of seconds between two calls.

    Returns:
        0 if the event has no stored call time, otherwise
        ``allowed_again`` minus the seconds elapsed since the stored time.
        The value is not clamped, so it is negative once the interval has
        already passed.
    """
    last_called = called_db.get(event)
    if not last_called:  # It's not called before
        return 0
    return allowed_again - (time.time() - last_called)


def callQueue(event):
    """Run the call queued for ``event`` and remove it from the queue.

    Args:
        event: Key of an entry in ``queue_db``.

    Returns:
        Whatever the queued function returns.

    Raises:
        KeyError: If nothing is queued for ``event``.
    """
    # The 4th tuple item is the scheduling greenlet; it is not needed here
    func, args, kwargs, _ = queue_db[event]
    log.debug("Calling: %s", event)
    called(event)
    del queue_db[event]
    return func(*args, **kwargs)


def callAsync(event, allowed_again=10, func=None, *args, **kwargs):
    """Rate limit and delay a function call if necessary.

    If called again for the same event while a call is still queued, the
    previously queued parameters are dropped and replaced by the new ones.

    Args:
        event: Key identifying the rate-limited action.
        allowed_again: Minimum number of seconds between two calls.
        func: Callable to run.
        *args: Positional arguments passed to ``func``.
        **kwargs: Keyword arguments passed to ``func``.

    Returns:
        Immediately, the gevent greenlet that runs (or will run) the call.
    """
    if isAllowed(event, allowed_again):  # Not called recently, call it now
        called(event)
        return gevent.spawn(func, *args, **kwargs)

    # Called recently, schedule it for later. Elapsed time is clamped at 0,
    # so a stored time in the future never makes the delay exceed
    # allowed_again.
    time_left = allowed_again - max(0, time.time() - called_db[event])
    log.debug("Added to queue (%.2fs left): %s ", time_left, event)

    queued = queue_db.get(event)
    if queued:
        # Function call already queued, keep its greenlet and just update
        # the parameters
        thread = queued[3]
    else:
        # Function call not queued yet, call this function later
        thread = gevent.spawn_later(time_left, callQueue, event)
    queue_db[event] = (func, args, kwargs, thread)
    return thread


# Rate limit and delay function call if needed
# Return: Wait for
# execution/delay then return value
def call(event, allowed_again=10, func=None, *args, **kwargs):
    """Run ``func`` synchronously, waiting first if the event is rate limited.

    Args:
        event: Key identifying the rate-limited action.
        allowed_again: Minimum number of seconds between two calls.
        func: Callable to run.
        *args: Positional arguments passed to ``func``.
        **kwargs: Keyword arguments passed to ``func``.

    Returns:
        The return value of ``func``, after any required delay.
    """
    # Pass allowed_again through: checking against isAllowed's default of 10
    # let the call run immediately after 10s even when a longer interval was
    # requested.
    if isAllowed(event, allowed_again):  # Not called recently, call it now
        called(event)
        return func(*args, **kwargs)

    # Called recently, wait out the remaining delay
    time_left = max(0, allowed_again - (time.time() - called_db[event]))
    log.debug("Calling sync (%.2fs left): %s", time_left, event)
    # Push the stored time forward by the wait, so the event stays blocked
    # while this call is sleeping
    called(event, time_left)
    # NOTE: time.sleep only yields to other greenlets when gevent
    # monkey-patching is active; otherwise it blocks the whole process.
    time.sleep(time_left)
    back = func(*args, **kwargs)
    called(event)
    return back


def rateLimitCleanup():
    """Drop expired entries from ``called_db`` every 3 minutes, forever.

    An entry is expired when its stored time is more than 2 minutes old.
    Meant to run in its own greenlet; it never returns.
    """
    while 1:
        expired = time.time() - 60 * 2  # Cleanup if older than 2 minutes
        # Collect first: deleting while iterating the dict is not allowed
        expired_events = [
            event for event, last_called in called_db.items()
            if last_called < expired
        ]
        for event in expired_events:
            del called_db[event]

        # gevent.sleep always yields to the hub; time.sleep would freeze the
        # whole process here when monkey-patching is not active.
        gevent.sleep(60 * 3)  # Every 3 minutes


gevent.spawn(rateLimitCleanup)


if __name__ == "__main__":
    from gevent import monkey
    monkey.patch_all()
    import random

    def publish(inner_path):
        print("Publishing %s..." % inner_path)
        return 1

    def cb(thread):
        print("Value:", thread.value)

    print("Testing async spam requests rate limit to 1/sec...")
    for i in range(3000):
        thread = callAsync("publish content.json", 1, publish, "content.json %s" % i)
        time.sleep(float(random.randint(1, 20)) / 100000)
    print(thread.link(cb))
    print("Done")

    time.sleep(2)

    print("Testing sync spam requests rate limit to 1/sec...")
    for i in range(5):
        call("publish data.json", 1, publish, "data.json %s" % i)
        time.sleep(float(random.randint(1, 100)) / 100)
    print("Done")

    print("Testing cleanup")
    thread = callAsync("publish content.json single", 1, publish, "content.json single")
    print("Needs to cleanup:", called_db, queue_db)
    print("Waiting 3min for cleanup process...")
    time.sleep(60 * 3)
    print("Cleaned up:", called_db, queue_db)