ZeroNet/src/util/RateLimit.py

129 lines
4.2 KiB
Python

import time
import gevent
import logging
log = logging.getLogger("RateLimit")
called_db = {} # Holds events last call time
queue_db = {} # Commands queued to run
# Register event as called
# Return: None
def called(event, penalty=0):
called_db[event] = time.time() + penalty
# Check if calling event is allowed
# Return: True if allowed False if not
def isAllowed(event, allowed_again=10):
last_called = called_db.get(event)
if not last_called: # Its not called before
return True
elif time.time() - last_called >= allowed_again:
del called_db[event] # Delete last call time to save memory
return True
else:
return False
def delayLeft(event, allowed_again=10):
last_called = called_db.get(event)
if not last_called: # Its not called before
return 0
else:
return allowed_again - (time.time() - last_called)
def callQueue(event):
func, args, kwargs, thread = queue_db[event]
log.debug("Calling: %s" % event)
called(event)
del queue_db[event]
return func(*args, **kwargs)
# Rate limit and delay function call if necessary
# If the function called again within the rate limit interval then previous queued call will be dropped
# Return: Immediately gevent thread
def callAsync(event, allowed_again=10, func=None, *args, **kwargs):
if isAllowed(event, allowed_again): # Not called recently, call it now
called(event)
# print "Calling now"
return gevent.spawn(func, *args, **kwargs)
else: # Called recently, schedule it for later
time_left = allowed_again - max(0, time.time() - called_db[event])
log.debug("Added to queue (%.2fs left): %s " % (time_left, event))
if not queue_db.get(event): # Function call not queued yet
thread = gevent.spawn_later(time_left, lambda: callQueue(event)) # Call this function later
queue_db[event] = (func, args, kwargs, thread)
return thread
else: # Function call already queued, just update the parameters
thread = queue_db[event][3]
queue_db[event] = (func, args, kwargs, thread)
return thread
# Rate limit and delay function call if needed
# Return: Wait for execution/delay then return value
def call(event, allowed_again=10, func=None, *args, **kwargs):
if isAllowed(event): # Not called recently, call it now
called(event)
# print "Calling now", allowed_again
return func(*args, **kwargs)
else: # Called recently, schedule it for later
time_left = max(0, allowed_again - (time.time() - called_db[event]))
# print "Time left: %s" % time_left, args, kwargs
log.debug("Calling sync (%.2fs left): %s" % (time_left, event))
called(event, time_left)
time.sleep(time_left)
back = func(*args, **kwargs)
called(event)
return back
# Cleanup expired events every 3 minutes
def rateLimitCleanup():
while 1:
expired = time.time() - 60 * 2 # Cleanup if older than 2 minutes
for event in list(called_db.keys()):
if called_db[event] < expired:
del called_db[event]
time.sleep(60 * 3) # Every 3 minutes
gevent.spawn(rateLimitCleanup)
if __name__ == "__main__":
from gevent import monkey
monkey.patch_all()
import random
def publish(inner_path):
print("Publishing %s..." % inner_path)
return 1
def cb(thread):
print("Value:", thread.value)
print("Testing async spam requests rate limit to 1/sec...")
for i in range(3000):
thread = callAsync("publish content.json", 1, publish, "content.json %s" % i)
time.sleep(float(random.randint(1, 20)) / 100000)
print(thread.link(cb))
print("Done")
time.sleep(2)
print("Testing sync spam requests rate limit to 1/sec...")
for i in range(5):
call("publish data.json", 1, publish, "data.json %s" % i)
time.sleep(float(random.randint(1, 100)) / 100)
print("Done")
print("Testing cleanup")
thread = callAsync("publish content.json single", 1, publish, "content.json single")
print("Needs to cleanup:", called_db, queue_db)
print("Waiting 3min for cleanup process...")
time.sleep(60 * 3)
print("Cleaned up:", called_db, queue_db)