Skip to content

Commit 7e907a5

Browse files
authored
feat: add redis and redis-cluster in limit-conn (#10866)
1 parent abc86a5 commit 7e907a5

File tree

18 files changed

+1726
-170
lines changed

18 files changed

+1726
-170
lines changed

apisix/cli/ngx_tpl.lua

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -289,6 +289,7 @@ http {
289289
290290
{% if enabled_plugins["limit-conn"] then %}
291291
lua_shared_dict plugin-limit-conn {* http.lua_shared_dict["plugin-limit-conn"] *};
292+
lua_shared_dict plugin-limit-conn-redis-cluster-slot-lock {* http.lua_shared_dict["plugin-limit-conn-redis-cluster-slot-lock"] *};
292293
{% end %}
293294
294295
{% if enabled_plugins["limit-req"] then %}

apisix/plugins/limit-conn.lua

Lines changed: 31 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -14,15 +14,18 @@
1414
-- See the License for the specific language governing permissions and
1515
-- limitations under the License.
1616
--
17-
local core = require("apisix.core")
18-
local limit_conn = require("apisix.plugins.limit-conn.init")
17+
local core = require("apisix.core")
18+
local limit_conn = require("apisix.plugins.limit-conn.init")
19+
local redis_schema = require("apisix.utils.redis-schema")
20+
local policy_to_additional_properties = redis_schema.schema
21+
local plugin_name = "limit-conn"
22+
1923

2024

21-
local plugin_name = "limit-conn"
2225
local schema = {
2326
type = "object",
2427
properties = {
25-
conn = {type = "integer", exclusiveMinimum = 0},
28+
conn = {type = "integer", exclusiveMinimum = 0}, -- limit.conn max
2629
burst = {type = "integer", minimum = 0},
2730
default_conn_delay = {type = "number", exclusiveMinimum = 0},
2831
only_use_default_delay = {type = "boolean", default = false},
@@ -31,6 +34,11 @@ local schema = {
3134
enum = {"var", "var_combination"},
3235
default = "var",
3336
},
37+
policy = {
38+
type = "string",
39+
enum = {"redis", "redis-cluster", "local"},
40+
default = "local",
41+
},
3442
rejected_code = {
3543
type = "integer", minimum = 200, maximum = 599, default = 503
3644
},
@@ -39,7 +47,25 @@ local schema = {
3947
},
4048
allow_degradation = {type = "boolean", default = false}
4149
},
42-
required = {"conn", "burst", "default_conn_delay", "key"}
50+
required = {"conn", "burst", "default_conn_delay", "key"},
51+
["if"] = {
52+
properties = {
53+
policy = {
54+
enum = {"redis"},
55+
},
56+
},
57+
},
58+
["then"] = policy_to_additional_properties.redis,
59+
["else"] = {
60+
["if"] = {
61+
properties = {
62+
policy = {
63+
enum = {"redis-cluster"},
64+
},
65+
},
66+
},
67+
["then"] = policy_to_additional_properties["redis-cluster"],
68+
}
4369
}
4470

4571
local _M = {

apisix/plugins/limit-conn/init.lua

Lines changed: 30 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -23,6 +23,16 @@ if ngx.config.subsystem == "stream" then
2323
shdict_name = shdict_name .. "-stream"
2424
end
2525

26+
local redis_single_new
27+
local redis_cluster_new
28+
do
29+
local redis_src = "apisix.plugins.limit-conn.limit-conn-redis"
30+
redis_single_new = require(redis_src).new
31+
32+
local cluster_src = "apisix.plugins.limit-conn.limit-conn-redis-cluster"
33+
redis_cluster_new = require(cluster_src).new
34+
end
35+
2636

2737
local lrucache = core.lrucache.new({
2838
type = "plugin",
@@ -31,9 +41,26 @@ local _M = {}
3141

3242

3343
local function create_limit_obj(conf)
34-
core.log.info("create new limit-conn plugin instance")
35-
return limit_conn_new(shdict_name, conf.conn, conf.burst,
36-
conf.default_conn_delay)
44+
if conf.policy == "local" then
45+
core.log.info("create new limit-conn plugin instance")
46+
return limit_conn_new(shdict_name, conf.conn, conf.burst,
47+
conf.default_conn_delay)
48+
elseif conf.policy == "redis" then
49+
50+
core.log.info("create new limit-conn redis plugin instance")
51+
52+
return redis_single_new("plugin-limit-conn", conf, conf.conn, conf.burst,
53+
conf.default_conn_delay)
54+
55+
elseif conf.policy == "redis-cluster" then
56+
57+
core.log.info("create new limit-conn redis-cluster plugin instance")
58+
59+
return redis_cluster_new("plugin-limit-conn", conf, conf.conn, conf.burst,
60+
conf.default_conn_delay)
61+
else
62+
return nil, "policy enum not match"
63+
end
3764
end
3865

3966

Lines changed: 78 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,78 @@
1+
--
2+
-- Licensed to the Apache Software Foundation (ASF) under one or more
3+
-- contributor license agreements. See the NOTICE file distributed with
4+
-- this work for additional information regarding copyright ownership.
5+
-- The ASF licenses this file to You under the Apache License, Version 2.0
6+
-- (the "License"); you may not use this file except in compliance with
7+
-- the License. You may obtain a copy of the License at
8+
--
9+
-- http://www.apache.org/licenses/LICENSE-2.0
10+
--
11+
-- Unless required by applicable law or agreed to in writing, software
12+
-- distributed under the License is distributed on an "AS IS" BASIS,
13+
-- WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14+
-- See the License for the specific language governing permissions and
15+
-- limitations under the License.
16+
--
17+
local redis_cluster = require("apisix.utils.rediscluster")
18+
local core = require("apisix.core")
19+
local util = require("apisix.plugins.limit-conn.util")
20+
local setmetatable = setmetatable
21+
local ngx_timer_at = ngx.timer.at
22+
23+
local _M = {version = 0.1}
24+
25+
26+
local mt = {
27+
__index = _M
28+
}
29+
30+
31+
function _M.new(plugin_name, conf, max, burst, default_conn_delay)
32+
33+
local red_cli, err = redis_cluster.new(conf, "plugin-limit-conn-redis-cluster-slot-lock")
34+
if not red_cli then
35+
return nil, err
36+
end
37+
local self = {
38+
conf = conf,
39+
plugin_name = plugin_name,
40+
burst = burst,
41+
max = max + 0, -- just to ensure the param is good
42+
unit_delay = default_conn_delay,
43+
red_cli = red_cli,
44+
}
45+
return setmetatable(self, mt)
46+
end
47+
48+
49+
function _M.incoming(self, key, commit)
50+
return util.incoming(self, self.red_cli, key, commit)
51+
end
52+
53+
54+
function _M.is_committed(self)
55+
return self.committed
56+
end
57+
58+
59+
local function leaving_thread(premature, self, key, req_latency)
60+
return util.leaving(self, self.red_cli, key, req_latency)
61+
end
62+
63+
64+
function _M.leaving(self, key, req_latency)
65+
-- log_by_lua can't use cosocket
66+
local ok, err = ngx_timer_at(0, leaving_thread, self, key, req_latency)
67+
if not ok then
68+
core.log.error("failed to create timer: ", err)
69+
return nil, err
70+
end
71+
72+
return ok
73+
74+
end
75+
76+
77+
78+
return _M
Lines changed: 85 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,85 @@
1+
--
2+
-- Licensed to the Apache Software Foundation (ASF) under one or more
3+
-- contributor license agreements. See the NOTICE file distributed with
4+
-- this work for additional information regarding copyright ownership.
5+
-- The ASF licenses this file to You under the Apache License, Version 2.0
6+
-- (the "License"); you may not use this file except in compliance with
7+
-- the License. You may obtain a copy of the License at
8+
--
9+
-- http://www.apache.org/licenses/LICENSE-2.0
10+
--
11+
-- Unless required by applicable law or agreed to in writing, software
12+
-- distributed under the License is distributed on an "AS IS" BASIS,
13+
-- WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14+
-- See the License for the specific language governing permissions and
15+
-- limitations under the License.
16+
--
17+
local redis = require("apisix.utils.redis")
18+
local core = require("apisix.core")
19+
local util = require("apisix.plugins.limit-conn.util")
20+
local ngx_timer_at = ngx.timer.at
21+
22+
local setmetatable = setmetatable
23+
24+
25+
local _M = {version = 0.1}
26+
27+
28+
local mt = {
29+
__index = _M
30+
}
31+
32+
function _M.new(plugin_name, conf, max, burst, default_conn_delay)
33+
34+
local self = {
35+
conf = conf,
36+
plugin_name = plugin_name,
37+
burst = burst,
38+
max = max + 0, -- just to ensure the param is good
39+
unit_delay = default_conn_delay,
40+
}
41+
return setmetatable(self, mt)
42+
end
43+
44+
45+
function _M.incoming(self, key, commit)
46+
local conf = self.conf
47+
local red, err = redis.new(conf)
48+
if not red then
49+
return red, err
50+
end
51+
return util.incoming(self, red, key, commit)
52+
end
53+
54+
55+
function _M.is_committed(self)
56+
return self.committed
57+
end
58+
59+
60+
local function leaving_thread(premature, self, key, req_latency)
61+
62+
local conf = self.conf
63+
local red, err = redis.new(conf)
64+
if not red then
65+
return red, err
66+
end
67+
return util.leaving(self, red, key, req_latency)
68+
end
69+
70+
71+
function _M.leaving(self, key, req_latency)
72+
-- log_by_lua can't use cosocket
73+
local ok, err = ngx_timer_at(0, leaving_thread, self, key, req_latency)
74+
if not ok then
75+
core.log.error("failed to create timer: ", err)
76+
return nil, err
77+
end
78+
79+
return ok
80+
81+
end
82+
83+
84+
85+
return _M

apisix/plugins/limit-conn/util.lua

Lines changed: 81 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,81 @@
1+
--
2+
-- Licensed to the Apache Software Foundation (ASF) under one or more
3+
-- contributor license agreements. See the NOTICE file distributed with
4+
-- this work for additional information regarding copyright ownership.
5+
-- The ASF licenses this file to You under the Apache License, Version 2.0
6+
-- (the "License"); you may not use this file except in compliance with
7+
-- the License. You may obtain a copy of the License at
8+
--
9+
-- http://www.apache.org/licenses/LICENSE-2.0
10+
--
11+
-- Unless required by applicable law or agreed to in writing, software
12+
-- distributed under the License is distributed on an "AS IS" BASIS,
13+
-- WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14+
-- See the License for the specific language governing permissions and
15+
-- limitations under the License.
16+
--
17+
18+
local assert = assert
19+
local math = require "math"
20+
local floor = math.floor
21+
local _M = {version = 0.3}
22+
23+
24+
function _M.incoming(self, red, key, commit)
25+
local max = self.max
26+
self.committed = false
27+
key = "limit_conn" .. ":" .. key
28+
29+
local conn, err
30+
if commit then
31+
conn, err = red:incrby(key, 1)
32+
if not conn then
33+
return nil, err
34+
end
35+
36+
if conn > max + self.burst then
37+
conn, err = red:incrby(key, -1)
38+
if not conn then
39+
return nil, err
40+
end
41+
return nil, "rejected"
42+
end
43+
self.committed = true
44+
45+
else
46+
local conn_from_red, err = red:get(key)
47+
if err then
48+
return nil, err
49+
end
50+
conn = (conn_from_red or 0) + 1
51+
end
52+
53+
if conn > max then
54+
-- make the excessive connections wait
55+
return self.unit_delay * floor((conn - 1) / max), conn
56+
end
57+
58+
-- we return a 0 delay by default
59+
return 0, conn
60+
end
61+
62+
63+
function _M.leaving(self, red, key, req_latency)
64+
assert(key)
65+
key = "limit_conn" .. ":" .. key
66+
67+
local conn, err = red:incrby(key, -1)
68+
if not conn then
69+
return nil, err
70+
end
71+
72+
if req_latency then
73+
local unit_delay = self.unit_delay
74+
self.unit_delay = (req_latency + unit_delay) / 2
75+
end
76+
77+
return conn
78+
end
79+
80+
81+
return _M

0 commit comments

Comments
 (0)