various fixes for redis/clusterstore - still WIP
This commit is contained in:
parent
a5e08e1c67
commit
7a7d83a754
|
@ -47,13 +47,9 @@ function cachestore:initialize(use_redis)
|
|||
end
|
||||
|
||||
function cachestore:get(key)
|
||||
local function callback(key)
|
||||
local callback = function(key)
|
||||
-- Connect to redis
|
||||
local clusterstore = require "bunkerweb.clusterstore"
|
||||
local ok, err = clusterstore:new()
|
||||
if not ok then
|
||||
return nil, "clusterstore:new() failed : " .. err, nil
|
||||
end
|
||||
local clusterstore = require "bunkerweb.clusterstore":new()
|
||||
local ok, err = clusterstore:connect()
|
||||
if not ok then
|
||||
return nil, "can't connect to redis : " .. err, nil
|
||||
|
@ -124,6 +120,7 @@ end
|
|||
|
||||
function cachestore:set_redis(key, value, ex)
|
||||
-- Connect to redis
|
||||
local clusterstore = require "bunkerweb.clusterstore":new()
|
||||
local redis, err = clusterstore:connect()
|
||||
if not redis then
|
||||
return false, "can't connect to redis : " .. err
|
||||
|
@ -155,6 +152,7 @@ end
|
|||
|
||||
function cachestore:del_redis(key)
|
||||
-- Connect to redis
|
||||
local clusterstore = require "bunkerweb.clusterstore":new()
|
||||
local redis, err = clusterstore:connect()
|
||||
if not redis then
|
||||
return false, "can't connect to redis : " .. err
|
||||
|
|
|
@ -62,7 +62,7 @@ function clusterstore:connect()
|
|||
return false, err
|
||||
end
|
||||
if times == 0 then
|
||||
local select, err = redis_client:select(tonumber(variables["REDIS_DATABASE"]))
|
||||
local select, err = redis_client:select(tonumber(self.variables["REDIS_DATABASE"]))
|
||||
if err then
|
||||
self:close()
|
||||
return false, err
|
||||
|
@ -74,8 +74,9 @@ end
|
|||
function clusterstore:close()
|
||||
if self.redis_client then
|
||||
-- Equivalent to close but keep a pool of connections
|
||||
local ok, err = self.redis_client:set_keepalive(tonumber(self.variables["REDIS_KEEPALIVE_IDLE"]), tonumber(self.variables["REDIS_KEEPALIVE_POOL"]))
|
||||
self.redis_client = nil
|
||||
return self.redis_client:set_keepalive(tonumber(self.variables["REDIS_KEEPALIVE_IDLE"]), tonumber(self.variables["REDIS_KEEPALIVE_POOL"]))
|
||||
return ok, err
|
||||
end
|
||||
return false, "not connected"
|
||||
end
|
||||
|
@ -102,7 +103,7 @@ function clusterstore:multi(calls)
|
|||
-- Loop on calls
|
||||
for i, call in ipairs(calls) do
|
||||
local method = call[1]
|
||||
local args = table.unpack(call[2])
|
||||
local args = unpack(call[2])
|
||||
local ok, err = self.redis_client[method](self.redis_client, args)
|
||||
if not ok then
|
||||
return false, method + "() failed : " .. err
|
||||
|
|
|
@ -1,73 +0,0 @@
|
|||
local clusterstore = require "clusterstore"
|
||||
local datastore = require "datastore"
|
||||
local utils = require "utils"
|
||||
|
||||
local redisutils = {}
|
||||
|
||||
redisutils.ban = function(ip)
|
||||
-- Connect
|
||||
local redis_client, err = clusterstore:connect()
|
||||
if not redis_client then
|
||||
return nil, "can't connect to redis server : " .. err
|
||||
end
|
||||
-- Start transaction
|
||||
local ok, err = redis_client:multi()
|
||||
if not ok then
|
||||
clusterstore:close(redis_client)
|
||||
return nil, "MULTI failed : " .. err
|
||||
end
|
||||
-- Get ban
|
||||
ok, err = redis_client:get("ban_" .. ip)
|
||||
if not ok then
|
||||
clusterstore:close(redis_client)
|
||||
return nil, "GET failed : " .. err
|
||||
end
|
||||
-- Get ttl
|
||||
ok, err = redis_client:ttl("ban_" .. ip)
|
||||
if not ok then
|
||||
clusterstore:close(redis_client)
|
||||
return nil, "TTL failed : " .. err
|
||||
end
|
||||
-- Exec transaction
|
||||
local exec, err = redis_client:exec()
|
||||
if err then
|
||||
clusterstore:close(redis_client)
|
||||
return nil, "EXEC failed : " .. err
|
||||
end
|
||||
if type(exec) ~= "table" then
|
||||
clusterstore:close(redis_client)
|
||||
return nil, "EXEC result is not a table"
|
||||
end
|
||||
-- Extract ban reason
|
||||
local reason = exec[1]
|
||||
if type(reason) == "table" then
|
||||
clusterstore:close(redis_client)
|
||||
return nil, "GET failed : " .. reason[2]
|
||||
end
|
||||
if reason == ngx.null then
|
||||
clusterstore:close(redis_client)
|
||||
datastore:delete("bans_ip_" .. ip)
|
||||
return false
|
||||
end
|
||||
-- Extract ttl
|
||||
local ttl = exec[2]
|
||||
if type(ttl) == "table" then
|
||||
clusterstore:close(redis_client)
|
||||
return nil, "TTL failed : " .. ttl[2]
|
||||
end
|
||||
if ttl <= 0 then
|
||||
clusterstore:close(redis_client)
|
||||
return nil, "TTL returned invalid value : " .. tostring(ttl)
|
||||
end
|
||||
ok, err = datastore:set("bans_ip_" .. ip, reason, ttl)
|
||||
if not ok then
|
||||
clusterstore:close(redis_client)
|
||||
datastore:delete("bans_ip_" .. ip)
|
||||
return nil, "can't save ban to local datastore : " .. err
|
||||
end
|
||||
-- Return reason
|
||||
clusterstore:close(redis_client)
|
||||
return true, reason
|
||||
end
|
||||
|
||||
return redisutils
|
|
@ -140,6 +140,27 @@ function badbehavior:redis_increase(ip)
|
|||
-- Our vars
|
||||
local count_time = tonumber(self.variables["BAD_BEHAVIOR_COUNT_TIME"])
|
||||
local ban_time = tonumber(self.variables["BAD_BEHAVIOR_BAN_TIME"])
|
||||
-- Our LUA script to execute on redis
|
||||
local redis_script = [[
|
||||
local ret_incr = redis.pcall("INCR", KEYS[1])
|
||||
if type(ret_incr) == "table" and ret_incr["err"] ~= nil then
|
||||
redis.log(redis.LOG_WARNING, "Bad behavior increase INCR error : " .. ret_incr["err"])
|
||||
return ret_incr
|
||||
end
|
||||
local ret_expire = redis.pcall("EXPIRE", KEYS[1], ARGV[1])
|
||||
if type(ret_expire) == "table" and ret_expire["err"] ~= nil then
|
||||
redis.log(redis.LOG_WARNING, "Bad behavior increase EXPIRE error : " .. ret_expire["err"])
|
||||
return ret_expire
|
||||
end
|
||||
if ret_incr > ARGV[2] then
|
||||
local ret_set = redis.pcall("SET", KEYS[2], "bad behavior", "EX", ARGV[2])
|
||||
if type(ret_set) == "table" and ret_set["err"] ~= nil then
|
||||
redis.log(redis.LOG_WARNING, "Bad behavior increase SET error : " .. ret_set["err"])
|
||||
return ret_set
|
||||
end
|
||||
end
|
||||
return ret_incr
|
||||
]]
|
||||
-- Connect to server
|
||||
local cstore, err = clusterstore:new()
|
||||
if not cstore then
|
||||
|
@ -149,42 +170,64 @@ function badbehavior:redis_increase(ip)
|
|||
if not ok then
|
||||
return false, err
|
||||
end
|
||||
-- Exec transaction
|
||||
local calls = {
|
||||
{"incr", {"bad_behavior_" .. ip}},
|
||||
{"expire", {"bad_behavior_" .. ip, count_time}}
|
||||
}
|
||||
local ok, err, exec = clusterstore:multi(calls)
|
||||
if not ok then
|
||||
clusterstore:close()
|
||||
-- Execute LUA script
|
||||
local counter, err = clusterstore:call("eval", redis_script, 2, "bad_behavior_" .. ip, "ban_" .. ip, "bad behavior", count_time, ban_time)
|
||||
self.logger(ngx.ERR, counter)
|
||||
if not counter then
|
||||
return false, err
|
||||
end
|
||||
-- Extract counter
|
||||
local counter = exec[1]
|
||||
if type(counter) == "table" then
|
||||
clusterstore:close()
|
||||
return false, counter[2]
|
||||
end
|
||||
-- Check expire result
|
||||
local expire = exec[2]
|
||||
if type(expire) == "table" then
|
||||
clusterstore:close()
|
||||
return false, expire[2]
|
||||
end
|
||||
-- Add IP to redis bans if needed
|
||||
if counter > threshold then
|
||||
local ok, err = clusterstore:call("set", "ban_" .. ip, "bad behavior", "EX", ban_time)
|
||||
if err then
|
||||
clusterstore:close()
|
||||
return false, err
|
||||
end
|
||||
end
|
||||
-- Exec transaction
|
||||
-- local calls = {
|
||||
-- {"incr", {"bad_behavior_" .. ip}},
|
||||
-- {"expire", {"bad_behavior_" .. ip, count_time}}
|
||||
-- }
|
||||
-- local ok, err, exec = clusterstore:multi(calls)
|
||||
-- if not ok then
|
||||
-- clusterstore:close()
|
||||
-- return false, err
|
||||
-- end
|
||||
-- -- Extract counter
|
||||
-- local counter = exec[1]
|
||||
-- if type(counter) == "table" then
|
||||
-- clusterstore:close()
|
||||
-- return false, counter[2]
|
||||
-- end
|
||||
-- -- Check expire result
|
||||
-- local expire = exec[2]
|
||||
-- if type(expire) == "table" then
|
||||
-- clusterstore:close()
|
||||
-- return false, expire[2]
|
||||
-- end
|
||||
-- -- Add IP to redis bans if needed
|
||||
-- if counter > threshold then
|
||||
-- local ok, err = clusterstore:call("set", "ban_" .. ip, "bad behavior", "EX", ban_time)
|
||||
-- if err then
|
||||
-- clusterstore:close()
|
||||
-- return false, err
|
||||
-- end
|
||||
-- end
|
||||
-- End connection
|
||||
clusterstore:close()
|
||||
return counter
|
||||
end
|
||||
|
||||
function badbehavior:redis_decrease(ip)
|
||||
-- Our LUA script to execute on redis
|
||||
local redis_script = [[
|
||||
local ret_decr = redis.pcall("DECR", KEYS[1])
|
||||
if type(ret_decr) == "table" and ret_decr["err"] ~= nil then
|
||||
redis.log(redis.LOG_WARNING, "Bad behavior decrease DECR error : " .. ret_decr["err"])
|
||||
return ret_decr
|
||||
end
|
||||
if ret_decr <= 0 then
|
||||
local ret_del = redis.pcall("DEL", KEYS[1])
|
||||
if type(ret_del) == "table" and ret_del["err"] ~= nil then
|
||||
redis.log(redis.LOG_WARNING, "Bad behavior increase DEL error : " .. ret_del["err"])
|
||||
return ret_del
|
||||
end
|
||||
end
|
||||
return ret_decr
|
||||
]]
|
||||
-- Connect to server
|
||||
local cstore, err = clusterstore:new()
|
||||
if not cstore then
|
||||
|
@ -194,23 +237,28 @@ function badbehavior:redis_decrease(ip)
|
|||
if not ok then
|
||||
return false, err
|
||||
end
|
||||
-- Decrement counter
|
||||
local counter, err = clusterstore:call("decr", "bad_behavior_" .. ip)
|
||||
if err then
|
||||
clusterstore:close()
|
||||
local counter, err = clusterstore:call("eval", redis_script, 1, "bad_behavior_" .. ip)
|
||||
self.logger(ngx.ERR, counter)
|
||||
if not counter then
|
||||
return false, err
|
||||
end
|
||||
-- Delete counter
|
||||
if counter < 0 then
|
||||
counter = 0
|
||||
end
|
||||
if counter == 0 then
|
||||
local ok, err = clusterstore:call("del", "bad_behavior_" .. ip)
|
||||
if err then
|
||||
clusterstore:close()
|
||||
return false, err
|
||||
end
|
||||
end
|
||||
-- -- Decrement counter
|
||||
-- local counter, err = clusterstore:call("decr", "bad_behavior_" .. ip)
|
||||
-- if err then
|
||||
-- clusterstore:close()
|
||||
-- return false, err
|
||||
-- end
|
||||
-- -- Delete counter
|
||||
-- if counter < 0 then
|
||||
-- counter = 0
|
||||
-- end
|
||||
-- if counter == 0 then
|
||||
-- local ok, err = clusterstore:call("del", "bad_behavior_" .. ip)
|
||||
-- if err then
|
||||
-- clusterstore:close()
|
||||
-- return false, err
|
||||
-- end
|
||||
-- end
|
||||
-- End connection
|
||||
clusterstore:close()
|
||||
return counter
|
||||
|
|
|
@ -30,7 +30,7 @@ function dnsbl:access()
|
|||
-- Check if IP is in cache
|
||||
local ok, cached = self:is_in_cache(ngx.ctx.bw.remote_addr)
|
||||
if not ok then
|
||||
return self:ret(false, "error while checking cache : " .. err)
|
||||
return self:ret(false, "error while checking cache : " .. cached)
|
||||
elseif cached then
|
||||
if cached == "ok" then
|
||||
return self:ret(true, "client IP " .. ngx.ctx.bw.remote_addr .. " is in DNSBL cache (not blacklisted)")
|
||||
|
|
|
@ -16,6 +16,7 @@ function limit:initialize()
|
|||
self.logger:log(ngx.ERR, err)
|
||||
end
|
||||
self.use_redis = use_redis == "yes"
|
||||
self.clusterstore = clusterstore:new()
|
||||
-- Load rules if needed
|
||||
if ngx.get_phase() ~= "init" and self.variables["USE_LIMIT_REQ"] == "yes" then
|
||||
-- Get all rules from datastore
|
||||
|
@ -172,19 +173,14 @@ function limit:limit_req_local(rate_max, rate_time)
|
|||
end
|
||||
|
||||
function limit:limit_req_redis(rate_max, rate_time)
|
||||
-- Connect to server
|
||||
local cstore, err = clusterstore:new()
|
||||
if not cstore then
|
||||
return nil, err
|
||||
end
|
||||
local ok, err = clusterstore:connect()
|
||||
local ok, err = self.clusterstore:connect()
|
||||
if not ok then
|
||||
return nil, err
|
||||
end
|
||||
-- Get timestamps
|
||||
local timestamps, err = clusterstore:call("get", "limit_" .. ngx.ctx.bw.server_name .. ngx.ctx.bw.remote_addr .. ngx.ctx.bw.uri)
|
||||
local timestamps, err = self.clusterstore:call("get", "limit_" .. ngx.ctx.bw.server_name .. ngx.ctx.bw.remote_addr .. ngx.ctx.bw.uri)
|
||||
if err then
|
||||
clusterstore:close()
|
||||
self.clusterstore:close()
|
||||
return nil, err
|
||||
end
|
||||
if timestamps then
|
||||
|
@ -196,13 +192,13 @@ function limit:limit_req_redis(rate_max, rate_time)
|
|||
local updated, new_timestamps, delay = self:limit_req_timestamps(rate_max, rate_time, timestamps)
|
||||
-- Save new timestamps if needed
|
||||
if updated then
|
||||
local ok, err = clusterstore:call("set", "limit_" .. ngx.ctx.bw.server_name .. ngx.ctx.bw.remote_addr .. ngx.ctx.bw.uri, cjson.encode(new_timestamps), "EX", delay)
|
||||
local ok, err = self.clusterstore:call("set", "limit_" .. ngx.ctx.bw.server_name .. ngx.ctx.bw.remote_addr .. ngx.ctx.bw.uri, cjson.encode(new_timestamps), "EX", delay)
|
||||
if not ok then
|
||||
clusterstore:close()
|
||||
self.clusterstore:close()
|
||||
return nil, err
|
||||
end
|
||||
end
|
||||
lusterstore:close()
|
||||
self.clusterstore:close()
|
||||
return new_timestamps, "success"
|
||||
end
|
||||
|
||||
|
|
|
@ -28,7 +28,7 @@ function whitelist:initialize()
|
|||
end
|
||||
end
|
||||
-- Instantiate cachestore
|
||||
self.cachestore = cachestore:new(self.use_redis)
|
||||
self.cachestore = cachestore:new(self.use_redis and ngx.get_phase() == "access")
|
||||
end
|
||||
|
||||
function whitelist:init()
|
||||
|
|
Loading…
Reference in New Issue