lock.py 8.2 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214
  1. import threading
  2. import time as mod_time
  3. import uuid
  4. from redis.exceptions import LockError
  5. from redis.utils import dummy
  6. class Lock(object):
  7. """
  8. A shared, distributed Lock. Using Redis for locking allows the Lock
  9. to be shared across processes and/or machines.
  10. It's left to the user to resolve deadlock issues and make sure
  11. multiple clients play nicely together.
  12. """
  13. lua_release = None
  14. lua_extend = None
  15. # KEYS[1] - lock name
  16. # ARGS[1] - token
  17. # return 1 if the lock was released, otherwise 0
  18. LUA_RELEASE_SCRIPT = """
  19. local token = redis.call('get', KEYS[1])
  20. if not token or token ~= ARGV[1] then
  21. return 0
  22. end
  23. redis.call('del', KEYS[1])
  24. return 1
  25. """
  26. # KEYS[1] - lock name
  27. # ARGS[1] - token
  28. # ARGS[2] - additional milliseconds
  29. # return 1 if the locks time was extended, otherwise 0
  30. LUA_EXTEND_SCRIPT = """
  31. local token = redis.call('get', KEYS[1])
  32. if not token or token ~= ARGV[1] then
  33. return 0
  34. end
  35. local expiration = redis.call('pttl', KEYS[1])
  36. if not expiration then
  37. expiration = 0
  38. end
  39. if expiration < 0 then
  40. return 0
  41. end
  42. redis.call('pexpire', KEYS[1], expiration + ARGV[2])
  43. return 1
  44. """
  45. def __init__(self, redis, name, timeout=None, sleep=0.1,
  46. blocking=True, blocking_timeout=None, thread_local=True):
  47. """
  48. Create a new Lock instance named ``name`` using the Redis client
  49. supplied by ``redis``.
  50. ``timeout`` indicates a maximum life for the lock.
  51. By default, it will remain locked until release() is called.
  52. ``timeout`` can be specified as a float or integer, both representing
  53. the number of seconds to wait.
  54. ``sleep`` indicates the amount of time to sleep per loop iteration
  55. when the lock is in blocking mode and another client is currently
  56. holding the lock.
  57. ``blocking`` indicates whether calling ``acquire`` should block until
  58. the lock has been acquired or to fail immediately, causing ``acquire``
  59. to return False and the lock not being acquired. Defaults to True.
  60. Note this value can be overridden by passing a ``blocking``
  61. argument to ``acquire``.
  62. ``blocking_timeout`` indicates the maximum amount of time in seconds to
  63. spend trying to acquire the lock. A value of ``None`` indicates
  64. continue trying forever. ``blocking_timeout`` can be specified as a
  65. float or integer, both representing the number of seconds to wait.
  66. ``thread_local`` indicates whether the lock token is placed in
  67. thread-local storage. By default, the token is placed in thread local
  68. storage so that a thread only sees its token, not a token set by
  69. another thread. Consider the following timeline:
  70. time: 0, thread-1 acquires `my-lock`, with a timeout of 5 seconds.
  71. thread-1 sets the token to "abc"
  72. time: 1, thread-2 blocks trying to acquire `my-lock` using the
  73. Lock instance.
  74. time: 5, thread-1 has not yet completed. redis expires the lock
  75. key.
  76. time: 5, thread-2 acquired `my-lock` now that it's available.
  77. thread-2 sets the token to "xyz"
  78. time: 6, thread-1 finishes its work and calls release(). if the
  79. token is *not* stored in thread local storage, then
  80. thread-1 would see the token value as "xyz" and would be
  81. able to successfully release the thread-2's lock.
  82. In some use cases it's necessary to disable thread local storage. For
  83. example, if you have code where one thread acquires a lock and passes
  84. that lock instance to a worker thread to release later. If thread
  85. local storage isn't disabled in this case, the worker thread won't see
  86. the token set by the thread that acquired the lock. Our assumption
  87. is that these cases aren't common and as such default to using
  88. thread local storage.
  89. """
  90. self.redis = redis
  91. self.name = name
  92. self.timeout = timeout
  93. self.sleep = sleep
  94. self.blocking = blocking
  95. self.blocking_timeout = blocking_timeout
  96. self.thread_local = bool(thread_local)
  97. self.local = threading.local() if self.thread_local else dummy()
  98. self.local.token = None
  99. if self.timeout and self.sleep > self.timeout:
  100. raise LockError("'sleep' must be less than 'timeout'")
  101. self.register_scripts()
  102. def register_scripts(self):
  103. cls = self.__class__
  104. client = self.redis
  105. if cls.lua_release is None:
  106. cls.lua_release = client.register_script(cls.LUA_RELEASE_SCRIPT)
  107. if cls.lua_extend is None:
  108. cls.lua_extend = client.register_script(cls.LUA_EXTEND_SCRIPT)
  109. def __enter__(self):
  110. # force blocking, as otherwise the user would have to check whether
  111. # the lock was actually acquired or not.
  112. if self.acquire(blocking=True):
  113. return self
  114. raise LockError("Unable to acquire lock within the time specified")
  115. def __exit__(self, exc_type, exc_value, traceback):
  116. self.release()
  117. def acquire(self, blocking=None, blocking_timeout=None):
  118. """
  119. Use Redis to hold a shared, distributed lock named ``name``.
  120. Returns True once the lock is acquired.
  121. If ``blocking`` is False, always return immediately. If the lock
  122. was acquired, return True, otherwise return False.
  123. ``blocking_timeout`` specifies the maximum number of seconds to
  124. wait trying to acquire the lock.
  125. """
  126. sleep = self.sleep
  127. token = uuid.uuid1().hex.encode()
  128. if blocking is None:
  129. blocking = self.blocking
  130. if blocking_timeout is None:
  131. blocking_timeout = self.blocking_timeout
  132. stop_trying_at = None
  133. if blocking_timeout is not None:
  134. stop_trying_at = mod_time.time() + blocking_timeout
  135. while True:
  136. if self.do_acquire(token):
  137. self.local.token = token
  138. return True
  139. if not blocking:
  140. return False
  141. if stop_trying_at is not None and mod_time.time() > stop_trying_at:
  142. return False
  143. mod_time.sleep(sleep)
  144. def do_acquire(self, token):
  145. if self.timeout:
  146. # convert to milliseconds
  147. timeout = int(self.timeout * 1000)
  148. else:
  149. timeout = None
  150. if self.redis.set(self.name, token, nx=True, px=timeout):
  151. return True
  152. return False
  153. def locked(self):
  154. """
  155. Returns True if this key is locked by any process, otherwise False.
  156. """
  157. return self.redis.get(self.name) is not None
  158. def release(self):
  159. "Releases the already acquired lock"
  160. expected_token = self.local.token
  161. if expected_token is None:
  162. raise LockError("Cannot release an unlocked lock")
  163. self.local.token = None
  164. self.do_release(expected_token)
  165. def do_release(self, expected_token):
  166. if not bool(self.lua_release(keys=[self.name],
  167. args=[expected_token],
  168. client=self.redis)):
  169. raise LockError("Cannot release a lock that's no longer owned")
  170. def extend(self, additional_time):
  171. """
  172. Adds more time to an already acquired lock.
  173. ``additional_time`` can be specified as an integer or a float, both
  174. representing the number of seconds to add.
  175. """
  176. if self.local.token is None:
  177. raise LockError("Cannot extend an unlocked lock")
  178. if self.timeout is None:
  179. raise LockError("Cannot extend a lock with no timeout")
  180. return self.do_extend(additional_time)
  181. def do_extend(self, additional_time):
  182. additional_time = int(additional_time * 1000)
  183. if not bool(self.lua_extend(keys=[self.name],
  184. args=[self.local.token, additional_time],
  185. client=self.redis)):
  186. raise LockError("Cannot extend a lock that's no longer owned")
  187. return True