import time import gevent import logging log = logging.getLogger("RateLimit") called_db = {} # Holds events last call time queue_db = {} # Commands queued to run # Register event as called # Return: None def called(event): called_db[event] = time.time() # Check if calling event is allowed # Return: True if allowed False if not def isAllowed(event, allowed_again=10): last_called = called_db.get(event) if not last_called: # Its not called before return True elif time.time()-last_called >= allowed_again: del called_db[event] # Delete last call time to save memory return True else: return False def callQueue(event): func, args, kwargs, thread = queue_db[event] log.debug("Calling: %s" % event) del called_db[event] del queue_db[event] return func(*args, **kwargs) # Rate limit and delay function call if needed, If the function called again within the rate limit interval then previous queued call will be dropped # Return: Immedietly gevent thread def callAsync(event, allowed_again=10, func=None, *args, **kwargs): if isAllowed(event, allowed_again): # Not called recently, call it now called(event) # print "Calling now" return gevent.spawn(func, *args, **kwargs) else: # Called recently, schedule it for later time_left = allowed_again-max(0, time.time()-called_db[event]) log.debug("Added to queue (%.2fs left): %s " % (time_left, event)) if not queue_db.get(event): # Function call not queued yet thread = gevent.spawn_later(time_left, lambda: callQueue(event)) # Call this function later queue_db[event] = (func, args, kwargs, thread) return thread else: # Function call already queued, just update the parameters thread = queue_db[event][3] queue_db[event] = (func, args, kwargs, thread) return thread # Rate limit and delay function call if needed # Return: Wait for execution/delay then return value def call(event, allowed_again=10, func=None, *args, **kwargs): if isAllowed(event): # Not called recently, call it now called(event) # print "Calling now" return func(*args, **kwargs) else: # Called recently, schedule it for later time_left = max(0, allowed_again-(time.time()-called_db[event])) # print "Time left: %s" % time_left, args, kwargs log.debug("Calling sync (%.2fs left): %s" % (time_left, event)) time.sleep(time_left) called(event) back = func(*args, **kwargs) if event in called_db: del called_db[event] return back # Cleanup expired events every 3 minutes def cleanup(): while 1: expired = time.time()-60*2 # Cleanup if older than 2 minutes for event in called_db.keys(): if called_db[event] < expired: del called_db[event] time.sleep(60*3) # Every 3 minutes gevent.spawn(cleanup) if __name__ == "__main__": from gevent import monkey monkey.patch_all() import random def publish(inner_path): print "Publishing %s..." % inner_path return 1 def cb(thread): print "Value:", thread.value print "Testing async spam requests rate limit to 1/sec..." for i in range(3000): thread = callAsync("publish content.json", 1, publish, "content.json %s" % i) time.sleep(float(random.randint(1,20))/100000) print thread.link(cb) print "Done" time.sleep(2) print "Testing sync spam requests rate limit to 1/sec..." for i in range(5): call("publish data.json", 1, publish, "data.json %s" % i) time.sleep(float(random.randint(1,100))/100) print "Done" print "Testing cleanup" thread = callAsync("publish content.json single", 1, publish, "content.json single") print "Needs to cleanup:", called_db, queue_db print "Waiting 3min for cleanup process..." time.sleep(60*3) print "Cleaned up:", called_db, queue_db