sentinel.py 12 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298
  1. import os
  2. import random
  3. import weakref
  4. from redis.client import Redis
  5. from redis.connection import ConnectionPool, Connection
  6. from redis.exceptions import (ConnectionError, ResponseError, ReadOnlyError,
  7. TimeoutError)
  8. from redis._compat import iteritems, nativestr, xrange
  9. class MasterNotFoundError(ConnectionError):
  10. pass
  11. class SlaveNotFoundError(ConnectionError):
  12. pass
  13. class SentinelManagedConnection(Connection):
  14. def __init__(self, **kwargs):
  15. self.connection_pool = kwargs.pop('connection_pool')
  16. super(SentinelManagedConnection, self).__init__(**kwargs)
  17. def __repr__(self):
  18. pool = self.connection_pool
  19. s = '%s<service=%s%%s>' % (type(self).__name__, pool.service_name)
  20. if self.host:
  21. host_info = ',host=%s,port=%s' % (self.host, self.port)
  22. s = s % host_info
  23. return s
  24. def connect_to(self, address):
  25. self.host, self.port = address
  26. super(SentinelManagedConnection, self).connect()
  27. if self.connection_pool.check_connection:
  28. self.send_command('PING')
  29. if nativestr(self.read_response()) != 'PONG':
  30. raise ConnectionError('PING failed')
  31. def connect(self):
  32. if self._sock:
  33. return # already connected
  34. if self.connection_pool.is_master:
  35. self.connect_to(self.connection_pool.get_master_address())
  36. else:
  37. for slave in self.connection_pool.rotate_slaves():
  38. try:
  39. return self.connect_to(slave)
  40. except ConnectionError:
  41. continue
  42. raise SlaveNotFoundError # Never be here
  43. def read_response(self):
  44. try:
  45. return super(SentinelManagedConnection, self).read_response()
  46. except ReadOnlyError:
  47. if self.connection_pool.is_master:
  48. # When talking to a master, a ReadOnlyError when likely
  49. # indicates that the previous master that we're still connected
  50. # to has been demoted to a slave and there's a new master.
  51. # calling disconnect will force the connection to re-query
  52. # sentinel during the next connect() attempt.
  53. self.disconnect()
  54. raise ConnectionError('The previous master is now a slave')
  55. raise
  56. class SentinelConnectionPool(ConnectionPool):
  57. """
  58. Sentinel backed connection pool.
  59. If ``check_connection`` flag is set to True, SentinelManagedConnection
  60. sends a PING command right after establishing the connection.
  61. """
  62. def __init__(self, service_name, sentinel_manager, **kwargs):
  63. kwargs['connection_class'] = kwargs.get(
  64. 'connection_class', SentinelManagedConnection)
  65. self.is_master = kwargs.pop('is_master', True)
  66. self.check_connection = kwargs.pop('check_connection', False)
  67. super(SentinelConnectionPool, self).__init__(**kwargs)
  68. self.connection_kwargs['connection_pool'] = weakref.proxy(self)
  69. self.service_name = service_name
  70. self.sentinel_manager = sentinel_manager
  71. def __repr__(self):
  72. return "%s<service=%s(%s)" % (
  73. type(self).__name__,
  74. self.service_name,
  75. self.is_master and 'master' or 'slave',
  76. )
  77. def reset(self):
  78. super(SentinelConnectionPool, self).reset()
  79. self.master_address = None
  80. self.slave_rr_counter = None
  81. def get_master_address(self):
  82. master_address = self.sentinel_manager.discover_master(
  83. self.service_name)
  84. if self.is_master:
  85. if self.master_address is None:
  86. self.master_address = master_address
  87. elif master_address != self.master_address:
  88. # Master address changed, disconnect all clients in this pool
  89. self.disconnect()
  90. return master_address
  91. def rotate_slaves(self):
  92. "Round-robin slave balancer"
  93. slaves = self.sentinel_manager.discover_slaves(self.service_name)
  94. if slaves:
  95. if self.slave_rr_counter is None:
  96. self.slave_rr_counter = random.randint(0, len(slaves) - 1)
  97. for _ in xrange(len(slaves)):
  98. self.slave_rr_counter = (
  99. self.slave_rr_counter + 1) % len(slaves)
  100. slave = slaves[self.slave_rr_counter]
  101. yield slave
  102. # Fallback to the master connection
  103. try:
  104. yield self.get_master_address()
  105. except MasterNotFoundError:
  106. pass
  107. raise SlaveNotFoundError('No slave found for %r' % (self.service_name))
  108. def _checkpid(self):
  109. if self.pid != os.getpid():
  110. self.disconnect()
  111. self.reset()
  112. self.__init__(self.service_name, self.sentinel_manager,
  113. is_master=self.is_master,
  114. check_connection=self.check_connection,
  115. connection_class=self.connection_class,
  116. max_connections=self.max_connections,
  117. **self.connection_kwargs)
  118. class Sentinel(object):
  119. """
  120. Redis Sentinel cluster client
  121. >>> from redis.sentinel import Sentinel
  122. >>> sentinel = Sentinel([('localhost', 26379)], socket_timeout=0.1)
  123. >>> master = sentinel.master_for('mymaster', socket_timeout=0.1)
  124. >>> master.set('foo', 'bar')
  125. >>> slave = sentinel.slave_for('mymaster', socket_timeout=0.1)
  126. >>> slave.get('foo')
  127. 'bar'
  128. ``sentinels`` is a list of sentinel nodes. Each node is represented by
  129. a pair (hostname, port).
  130. ``min_other_sentinels`` defined a minimum number of peers for a sentinel.
  131. When querying a sentinel, if it doesn't meet this threshold, responses
  132. from that sentinel won't be considered valid.
  133. ``sentinel_kwargs`` is a dictionary of connection arguments used when
  134. connecting to sentinel instances. Any argument that can be passed to
  135. a normal Redis connection can be specified here. If ``sentinel_kwargs`` is
  136. not specified, any socket_timeout and socket_keepalive options specified
  137. in ``connection_kwargs`` will be used.
  138. ``connection_kwargs`` are keyword arguments that will be used when
  139. establishing a connection to a Redis server.
  140. """
  141. def __init__(self, sentinels, min_other_sentinels=0, sentinel_kwargs=None,
  142. **connection_kwargs):
  143. # if sentinel_kwargs isn't defined, use the socket_* options from
  144. # connection_kwargs
  145. if sentinel_kwargs is None:
  146. sentinel_kwargs = {
  147. k: v
  148. for k, v in iteritems(connection_kwargs)
  149. if k.startswith('socket_')
  150. }
  151. self.sentinel_kwargs = sentinel_kwargs
  152. self.sentinels = [Redis(hostname, port, **self.sentinel_kwargs)
  153. for hostname, port in sentinels]
  154. self.min_other_sentinels = min_other_sentinels
  155. self.connection_kwargs = connection_kwargs
  156. def __repr__(self):
  157. sentinel_addresses = []
  158. for sentinel in self.sentinels:
  159. sentinel_addresses.append('%s:%s' % (
  160. sentinel.connection_pool.connection_kwargs['host'],
  161. sentinel.connection_pool.connection_kwargs['port'],
  162. ))
  163. return '%s<sentinels=[%s]>' % (
  164. type(self).__name__,
  165. ','.join(sentinel_addresses))
  166. def check_master_state(self, state, service_name):
  167. if not state['is_master'] or state['is_sdown'] or state['is_odown']:
  168. return False
  169. # Check if our sentinel doesn't see other nodes
  170. if state['num-other-sentinels'] < self.min_other_sentinels:
  171. return False
  172. return True
  173. def discover_master(self, service_name):
  174. """
  175. Asks sentinel servers for the Redis master's address corresponding
  176. to the service labeled ``service_name``.
  177. Returns a pair (address, port) or raises MasterNotFoundError if no
  178. master is found.
  179. """
  180. for sentinel_no, sentinel in enumerate(self.sentinels):
  181. try:
  182. masters = sentinel.sentinel_masters()
  183. except (ConnectionError, TimeoutError):
  184. continue
  185. state = masters.get(service_name)
  186. if state and self.check_master_state(state, service_name):
  187. # Put this sentinel at the top of the list
  188. self.sentinels[0], self.sentinels[sentinel_no] = (
  189. sentinel, self.sentinels[0])
  190. return state['ip'], state['port']
  191. raise MasterNotFoundError("No master found for %r" % (service_name,))
  192. def filter_slaves(self, slaves):
  193. "Remove slaves that are in an ODOWN or SDOWN state"
  194. slaves_alive = []
  195. for slave in slaves:
  196. if slave['is_odown'] or slave['is_sdown']:
  197. continue
  198. slaves_alive.append((slave['ip'], slave['port']))
  199. return slaves_alive
  200. def discover_slaves(self, service_name):
  201. "Returns a list of alive slaves for service ``service_name``"
  202. for sentinel in self.sentinels:
  203. try:
  204. slaves = sentinel.sentinel_slaves(service_name)
  205. except (ConnectionError, ResponseError, TimeoutError):
  206. continue
  207. slaves = self.filter_slaves(slaves)
  208. if slaves:
  209. return slaves
  210. return []
  211. def master_for(self, service_name, redis_class=Redis,
  212. connection_pool_class=SentinelConnectionPool, **kwargs):
  213. """
  214. Returns a redis client instance for the ``service_name`` master.
  215. A SentinelConnectionPool class is used to retrive the master's
  216. address before establishing a new connection.
  217. NOTE: If the master's address has changed, any cached connections to
  218. the old master are closed.
  219. By default clients will be a redis.Redis instance. Specify a
  220. different class to the ``redis_class`` argument if you desire
  221. something different.
  222. The ``connection_pool_class`` specifies the connection pool to use.
  223. The SentinelConnectionPool will be used by default.
  224. All other keyword arguments are merged with any connection_kwargs
  225. passed to this class and passed to the connection pool as keyword
  226. arguments to be used to initialize Redis connections.
  227. """
  228. kwargs['is_master'] = True
  229. connection_kwargs = dict(self.connection_kwargs)
  230. connection_kwargs.update(kwargs)
  231. return redis_class(connection_pool=connection_pool_class(
  232. service_name, self, **connection_kwargs))
  233. def slave_for(self, service_name, redis_class=Redis,
  234. connection_pool_class=SentinelConnectionPool, **kwargs):
  235. """
  236. Returns redis client instance for the ``service_name`` slave(s).
  237. A SentinelConnectionPool class is used to retrive the slave's
  238. address before establishing a new connection.
  239. By default clients will be a redis.Redis instance. Specify a
  240. different class to the ``redis_class`` argument if you desire
  241. something different.
  242. The ``connection_pool_class`` specifies the connection pool to use.
  243. The SentinelConnectionPool will be used by default.
  244. All other keyword arguments are merged with any connection_kwargs
  245. passed to this class and passed to the connection pool as keyword
  246. arguments to be used to initialize Redis connections.
  247. """
  248. kwargs['is_master'] = False
  249. connection_kwargs = dict(self.connection_kwargs)
  250. connection_kwargs.update(kwargs)
  251. return redis_class(connection_pool=connection_pool_class(
  252. service_name, self, **connection_kwargs))