forked from knyar/nginx-lua-prometheus
-
Notifications
You must be signed in to change notification settings - Fork 2
Expand file tree
/
Copy pathprometheus_keys.lua
More file actions
179 lines (164 loc) · 5.24 KB
/
prometheus_keys.lua
File metadata and controls
179 lines (164 loc) · 5.24 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
-- Storage that keeps track of used keys. It allows keys to be atomically
-- created, deleted and listed. The keys are synchronized between nginx workers
-- using ngx.shared.dict. The whole purpose of this module is to avoid
-- using ngx.shared.dict:get_keys (see https://github.com/openresty/lua-nginx-module#ngxshareddictget_keys),
-- which blocks all workers and therefore should not be used with large
-- numbers of keys.
-- Module table. It is also used as the metatable of every object built by
-- KeyIndex.new, so setting __index to itself makes method lookups on an
-- instance fall back to the functions defined on this table.
local KeyIndex = {}
KeyIndex.__index = KeyIndex
-- Timer callback that checks for and removes expired keys.
--
-- Args:
--   premature: first argument supplied by the ngx timer API; true when the
--     timer fires early because the worker is shutting down.
--   self: the KeyIndex instance registered together with the timer.
local function remove_expired_keys(premature, self)
  -- Nothing worth cleaning up while the worker is exiting.
  if premature then
    return
  end
  self:remove_expired_keys()
end
-- Creates a new key index backed by a shared dictionary and schedules the
-- periodic removal of expired keys.
--
-- Args:
--   shared_dict: ngx.shared.DICT object used to share the keys between workers.
--   prefix: string prepended to every dictionary entry owned by this index.
--   remove_expired_keys_interval: (optional) delay handed to ngx.timer.every
--     for the expired-key check; defaults to 600.
--
-- Returns:
--   the new KeyIndex object
function KeyIndex.new(shared_dict, prefix, remove_expired_keys_interval)
  local self = setmetatable({}, KeyIndex)
  self.dict = shared_dict
  -- Names of the shared dictionary entries used by this index.
  self.key_prefix = prefix .. "key_"
  self.delete_count = prefix .. "delete_count"
  self.key_count = prefix .. "key_count"
  -- Worker-local view of the shared state.
  self.last = 0               -- last slot number covered by sync_range
  self.deleted = 0            -- delete counter value seen at the last sync
  self.not_expired_index = 1
  self.keys = {}              -- slot number -> key
  self.index = {}             -- key -> slot number
  self.expire_keys = {}       -- slot number -> true for keys that may expire

  local ok, err = ngx.timer.every(remove_expired_keys_interval or 600, remove_expired_keys, self)
  if not ok then
    -- Without the timer expired keys are never pruned from the local index,
    -- so make the failure visible instead of silently ignoring it.
    ngx.log(ngx.ERR, "key index: failed to create timer for removing expired keys: ", err)
  end
  return self
end
-- Drops from the local index every tracked key whose shared dictionary entry
-- has expired or no longer exists.
function KeyIndex:remove_expired_keys()
  local dict, prefix = self.dict, self.key_prefix
  for slot in pairs(self.expire_keys) do
    local ttl, err = dict:ttl(prefix .. slot)
    -- A non-negative ttl means the entry is still valid.
    local still_alive = ttl and ttl >= 0
    -- Any error other than "not found" tells us nothing about expiry,
    -- so the key is kept in that case.
    local lookup_failed = err and err ~= "not found"
    if not still_alive and not lookup_failed then
      local key = self.keys[slot]
      if key then
        self.index[key] = nil
        self.keys[slot] = nil
      end
      -- Clearing an existing field during pairs() traversal is allowed.
      self.expire_keys[slot] = nil
    end
  end
end
-- Brings the local index up to date with keys added or removed by other
-- workers since the previous sync.
--
-- Returns:
--   the key count currently stored in the shared dictionary (0 if unset)
function KeyIndex:sync()
  local dict = self.dict
  local remote_deleted = dict:get(self.delete_count) or 0
  local total = dict:get(self.key_count) or 0

  if remote_deleted ~= self.deleted then
    -- The delete counter moved, so something was removed elsewhere:
    -- re-read every slot.
    self:sync_range(0, total)
    self.deleted = remote_deleted
    return total
  end

  if total ~= self.last then
    -- Only new slots appeared; read just those.
    self:sync_range(self.last, total)
  end
  return total
end
-- Re-reads the slots numbered first..last from the shared dictionary, adding
-- keys that are present and dropping local entries whose slot is now empty.
--
-- Args:
--   first: first slot number to read.
--   last: last slot number to read; stored as self.last afterwards.
function KeyIndex:sync_range(first, last)
  local dict, prefix = self.dict, self.key_prefix
  for pos = first, last do
    local slot = prefix .. pos
    local key = dict:get(slot)
    if not key then
      -- Empty slot: forget whatever this worker had cached for it.
      local stale = self.keys[pos]
      if stale then
        self.index[stale] = nil
        self.keys[pos] = nil
        self.expire_keys[pos] = nil
      end
    else
      self.keys[pos] = key
      self.index[key] = pos
      -- Start tracking the slot for expiry when it has a non-zero ttl and
      -- is not tracked yet.
      if not self.expire_keys[pos] then
        local ttl = dict:ttl(slot)
        if ttl and ttl ~= 0 then
          self.expire_keys[pos] = true
        end
      end
    end
  end
  self.last = last
end
-- Syncs with the shared dictionary and returns a fresh array holding every
-- known key (in unspecified order).
function KeyIndex:list()
  self:sync()
  local result, count = {}, 0
  for _, key in pairs(self.keys) do
    count = count + 1
    result[count] = key
  end
  return result
end
-- Atomically adds one or more keys to the index.
--
-- Args:
--   key_or_keys: Single string or a list of strings containing keys to add.
--   err_msg_lru_eviction: (optional) message used as the start of the error
--     returned when the shared dictionary had to evict entries to make room.
--   exptime: (optional) expiration time passed to the shared dictionary for
--     new keys; for keys that already exist it is used to refresh their expiry.
--
-- Returns:
--   nil on success, string with error message otherwise
function KeyIndex:add(key_or_keys, err_msg_lru_eviction, exptime)
  local keys = key_or_keys
  if type(key_or_keys) == "string" then
    keys = { key_or_keys }
  end
  for _, key in pairs(keys) do
    -- Retry until the key is known to exist or a free slot has been claimed.
    while true do
      local N = self:sync()
      local idx = self.index[key]
      if idx ~= nil then
        -- Key already exists; without exptime there is nothing more to do.
        if not exptime then
          break
        end
        local refreshed = self.dict:expire(self.key_prefix .. idx, exptime)
        if refreshed then
          -- The key may have been added without a ttl originally; track it
          -- so that the periodic cleanup prunes it once it expires.
          if exptime > 0 then
            self.expire_keys[idx] = true
          end
          break
        end
        -- The entry has already expired in the shared dictionary: drop it
        -- from the local index and fall through to add it again.
        self.index[key] = nil
        self.keys[idx] = nil
        self.expire_keys[idx] = nil
      end
      N = N + 1
      local ok, err, forcible = self.dict:add(self.key_prefix .. N, key, exptime)
      if ok then
        local _, _, forcible2 = self.dict:incr(self.key_count, 1, 0)
        self.keys[N] = key
        self.index[key] = N
        if exptime and exptime > 0 then
          self.expire_keys[N] = true
        end
        if forcible or forcible2 then
          -- Fall back to a generic message so that a missing argument does
          -- not turn an eviction report into a concatenation error.
          return ((err_msg_lru_eviction or "shared dictionary forcibly evicted entries") ..
            "; key index: add key: idx=" .. self.key_prefix .. N .. ", key=" .. key)
        end
        break
      elseif err ~= "exists" then
        return "Unexpected error adding a key: " .. tostring(err)
      end
      -- "exists": another worker claimed slot N first; sync and try again.
    end
  end
end
-- Removes a key, identified by its value, from the index.
--
-- Args:
--   key: String value of the key; it must exist in this index.
--   err_msg_lru_eviction: message returned when bumping the delete counter
--     forced the shared dictionary to evict entries.
--
-- Returns:
--   nothing on success, otherwise the error reported by the shared
--   dictionary or err_msg_lru_eviction
function KeyIndex:remove(key, err_msg_lru_eviction)
  local slot = self.index[key]
  if not slot then
    ngx.log(ngx.ERR, "Trying to remove non-existent key: ", key)
    return
  end

  -- Forget the key locally, then clear its slot in the shared dictionary.
  self.index[key] = nil
  self.keys[slot] = nil
  self.expire_keys[slot] = nil
  self.dict:set(self.key_prefix .. slot, nil)
  self.deleted = self.deleted + 1

  -- Bump the shared delete counter so other workers perform a full sync.
  local _, err, forcible = self.dict:incr(self.delete_count, 1, 0)
  if err then
    return err
  end
  if forcible then
    return err_msg_lru_eviction
  end
end
-- Export the KeyIndex table as the value of this module.
return KeyIndex