Skip to content

Commit d99cc66

Browse files
author
lixiaoyu
committed
add file lock to avoid race condition
Signed-off-by: lixiaoyu <lixiaoyu2@taou.com>
1 parent bebdcf4 commit d99cc66

2 files changed

Lines changed: 120 additions & 2 deletions

File tree

prometheus_client/multiprocess.py

Lines changed: 21 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,10 +1,12 @@
11
from __future__ import unicode_literals
22

33
from collections import defaultdict
4+
from functools import wraps
45
import glob
56
import json
67
import os
78

9+
from .process_lock import lock, unlock, LOCK_EX
810
from .metrics_core import Metric
911
from .mmap_dict import MmapedDict, mmap_key
1012
from .samples import Sample
@@ -18,6 +20,21 @@
1820
MP_METRIC_HELP = 'Multiprocess metric'
1921

2022

23+
def require_metrics_lock(func):
24+
@wraps(func)
25+
def _wrap(*args, **kwargs):
26+
path = os.environ.get('prometheus_multiproc_dir')
27+
f = open(os.path.join(path, 'metrics.lock'), 'w')
28+
try:
29+
lock(f, LOCK_EX)
30+
return func(*args, **kwargs)
31+
finally:
32+
unlock(f)
33+
f.close()
34+
35+
return _wrap
36+
37+
2138
class MultiProcessCollector(object):
2239
"""Collector for files for multi-process mode."""
2340

@@ -31,6 +48,7 @@ def __init__(self, registry, path=None):
3148
registry.register(self)
3249

3350
@staticmethod
51+
@require_metrics_lock
3452
def merge(files, accumulate=True):
3553
"""Merge metrics from given mmap files.
3654
@@ -149,6 +167,7 @@ def collect(self):
149167
return self.merge(files, accumulate=True)
150168

151169

170+
@require_metrics_lock
152171
def mark_process_dead(pid, path=None):
153172
"""Do bookkeeping for when one process dies in a multi-process setup."""
154173
if path is None:
@@ -175,8 +194,8 @@ def mark_process_dead(pid, path=None):
175194
if not os.path.exists(merge_file):
176195
MmapedDict(merge_file).close()
177196

178-
# do merge
179-
metrics = MultiProcessCollector(None).merge(files + merge_files, accumulate=False)
197+
# do merge, here we use the same method to merge
198+
metrics = MultiProcessCollector.merge(files + merge_files, accumulate=False)
180199
typ_metrics_dict = defaultdict(list)
181200
for metric in metrics:
182201
typ_metrics_dict[metric.type].append(metric)

prometheus_client/process_lock.py

Lines changed: 99 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,99 @@
1+
#!/usr/bin/env python
2+
# coding=utf-8
3+
4+
import os
5+
6+
__all__ = ('LOCK_EX', 'LOCK_SH', 'LOCK_NB', 'lock', 'unlock')
7+
8+
9+
def _fd(f):
10+
return f.fileno() if hasattr(f, 'fileno') else f
11+
12+
13+
if os.name == 'nt':
14+
import msvcrt
15+
from ctypes import (sizeof, c_ulong, c_void_p, c_int64,
16+
Structure, Union, POINTER, windll, byref)
17+
from ctypes.wintypes import BOOL, DWORD, HANDLE
18+
19+
LOCK_SH = 0 # the default
20+
LOCK_NB = 0x1 # LOCKFILE_FAIL_IMMEDIATELY
21+
LOCK_EX = 0x2 # LOCKFILE_EXCLUSIVE_LOCK
22+
23+
if sizeof(c_ulong) != sizeof(c_void_p):
24+
ULONG_PTR = c_int64
25+
else:
26+
ULONG_PTR = c_ulong
27+
PVOID = c_void_p
28+
29+
30+
class _OFFSET(Structure):
31+
_fields_ = [
32+
('Offset', DWORD),
33+
('OffsetHigh', DWORD)]
34+
35+
36+
class _OFFSET_UNION(Union):
37+
_anonymous_ = ['_offset']
38+
_fields_ = [
39+
('_offset', _OFFSET),
40+
('Pointer', PVOID)]
41+
42+
43+
class OVERLAPPED(Structure):
44+
_anonymous_ = ['_offset_union']
45+
_fields_ = [
46+
('Internal', ULONG_PTR),
47+
('InternalHigh', ULONG_PTR),
48+
('_offset_union', _OFFSET_UNION),
49+
('hEvent', HANDLE)]
50+
51+
52+
LPOVERLAPPED = POINTER(OVERLAPPED)
53+
54+
LockFileEx = windll.kernel32.LockFileEx
55+
LockFileEx.restype = BOOL
56+
LockFileEx.argtypes = [HANDLE, DWORD, DWORD, DWORD, DWORD, LPOVERLAPPED]
57+
UnlockFileEx = windll.kernel32.UnlockFileEx
58+
UnlockFileEx.restype = BOOL
59+
UnlockFileEx.argtypes = [HANDLE, DWORD, DWORD, DWORD, LPOVERLAPPED]
60+
61+
62+
def lock(f, flags):
63+
hfile = msvcrt.get_osfhandle(_fd(f))
64+
overlapped = OVERLAPPED()
65+
ret = LockFileEx(hfile, flags, 0, 0, 0xFFFF0000, byref(overlapped))
66+
return bool(ret)
67+
68+
69+
def unlock(f):
70+
hfile = msvcrt.get_osfhandle(_fd(f))
71+
overlapped = OVERLAPPED()
72+
ret = UnlockFileEx(hfile, 0, 0, 0xFFFF0000, byref(overlapped))
73+
return bool(ret)
74+
else:
75+
try:
76+
import fcntl
77+
78+
LOCK_SH = fcntl.LOCK_SH # shared lock
79+
LOCK_NB = fcntl.LOCK_NB # non-blocking
80+
LOCK_EX = fcntl.LOCK_EX
81+
except (ImportError, AttributeError):
82+
LOCK_EX = LOCK_SH = LOCK_NB = 0
83+
84+
85+
def lock(f, flags):
86+
return False
87+
88+
89+
def unlock(f):
90+
return True
91+
else:
92+
def lock(f, flags):
93+
ret = fcntl.flock(_fd(f), flags)
94+
return ret == 0
95+
96+
97+
def unlock(f):
98+
ret = fcntl.flock(_fd(f), fcntl.LOCK_UN)
99+
return ret == 0

0 commit comments

Comments
 (0)