D7net
Home
Console
Upload
information
Create File
Create Folder
About
Tools
:
/
proc
/
3023066
/
root
/
opt
/
saltstack
/
salt
/
lib
/
python3.10
/
site-packages
/
salt
/
cache
/
Filename :
redis_cache.py
back
Copy
r""" Redis ===== Redis plugin for the Salt caching subsystem. .. versionadded:: 2017.7.0 .. versionchanged:: 3005 To enable this cache plugin, the master will need the python client for redis installed. This can be easily installed with pip: .. code-block:: bash salt \* pip.install redis As Redis provides a simple mechanism for very fast key-value store, in order to provide the necessary features for the Salt caching subsystem, the following conventions are used: - A Redis key consists of the bank name and the cache key separated by ``/``, e.g.: ``$KEY_minions/alpha/stuff`` where ``minions/alpha`` is the bank name and ``stuff`` is the key name. - As the caching subsystem is organised as a tree, we need to store the caching path and identify the bank and its offspring. At the same time, Redis is linear and we need to avoid doing ``keys <pattern>`` which is very inefficient as it goes through all the keys on the remote Redis server. Instead, each bank hierarchy has a Redis SET associated which stores the list of sub-banks. By default, these keys begin with ``$BANK_``. - In addition, each key name is stored in a separate SET of all the keys within a bank. By default, these SETs begin with ``$BANKEYS_``. For example, to store the key ``my-key`` under the bank ``root-bank/sub-bank/leaf-bank``, the following hierarchy will be built: .. code-block:: text 127.0.0.1:6379> SMEMBERS $BANK_root-bank 1) "sub-bank" 127.0.0.1:6379> SMEMBERS $BANK_root-bank/sub-bank 1) "leaf-bank" 127.0.0.1:6379> SMEMBERS $BANKEYS_root-bank/sub-bank/leaf-bank 1) "my-key" 127.0.0.1:6379> GET $KEY_root-bank/sub-bank/leaf-bank/my-key "my-value" There are four types of keys stored: - ``$BANK_*`` is a Redis SET containing the list of banks under the current bank. - ``$BANKEYS_*`` is a Redis SET containing the list of keys under the current bank. - ``$KEY_*`` keeps the value of the key. - ``$TSTAMP_*`` stores the last updated timestamp of the key. These prefixes and the separator can be adjusted using the configuration options: bank_prefix: ``$BANK`` The prefix used for the name of the Redis key storing the list of sub-banks. bank_keys_prefix: ``$BANKEYS`` The prefix used for the name of the Redis key storing the list of keys under a certain bank. key_prefix: ``$KEY`` The prefix of the Redis keys having the value of the keys to be cached under a certain bank. timestamp_prefix: ``$TSTAMP`` The prefix for the last modified timestamp for keys. .. versionadded:: 3005 separator: ``_`` The separator between the prefix and the key body. The connection details can be specified using: host: ``localhost`` The hostname of the Redis server. port: ``6379`` The Redis server port. cluster_mode: ``False`` Whether cluster_mode is enabled or not cluster.startup_nodes: A list of host, port dictionaries pointing to cluster members. At least one is required but multiple nodes are better .. code-block:: yaml cache.redis.cluster.startup_nodes - host: redis-member-1 port: 6379 - host: redis-member-2 port: 6379 cluster.skip_full_coverage_check: ``False`` Some cluster providers restrict certain redis commands such as CONFIG for enhanced security. Set this option to true to skip checks that required advanced privileges. .. note:: Most cloud hosted redis clusters will require this to be set to ``True`` db: ``'0'`` The database index. .. note:: The database index must be specified as string not as integer value! password: Redis connection password. unix_socket_path: .. versionadded:: 2018.3.1 Path to a UNIX socket for access. Overrides `host` / `port`. Configuration Example: .. code-block:: yaml cache.redis.host: localhost cache.redis.port: 6379 cache.redis.db: '0' cache.redis.password: my pass cache.redis.bank_prefix: #BANK cache.redis.bank_keys_prefix: #BANKEYS cache.redis.key_prefix: #KEY cache.redis.timestamp_prefix: #TICKS cache.redis.separator: '@' Cluster Configuration Example: .. code-block:: yaml cache.redis.cluster_mode: true cache.redis.cluster.skip_full_coverage_check: true cache.redis.cluster.startup_nodes: - host: redis-member-1 port: 6379 - host: redis-member-2 port: 6379 cache.redis.db: '0' cache.redis.password: my pass cache.redis.bank_prefix: #BANK cache.redis.bank_keys_prefix: #BANKEYS cache.redis.key_prefix: #KEY cache.redis.separator: '@' """ import logging import time import salt.payload import salt.utils.stringutils from salt.exceptions import SaltCacheError # Import salt try: import redis from redis.exceptions import ConnectionError as RedisConnectionError from redis.exceptions import ResponseError as RedisResponseError HAS_REDIS = True except ImportError: HAS_REDIS = False try: from rediscluster import RedisCluster # pylint: disable=no-name-in-module HAS_REDIS_CLUSTER = True except ImportError: HAS_REDIS_CLUSTER = False # ----------------------------------------------------------------------------- # module properties # ----------------------------------------------------------------------------- __virtualname__ = "redis" __func_alias__ = {"list_": "list"} log = logging.getLogger(__file__) _BANK_PREFIX = "$BANK" _KEY_PREFIX = "$KEY" _TIMESTAMP_PREFIX = "$TSTAMP" _BANK_KEYS_PREFIX = "$BANKEYS" _SEPARATOR = "_" REDIS_SERVER = None # ----------------------------------------------------------------------------- # property functions # ----------------------------------------------------------------------------- def __virtual__(): """ The redis library must be installed for this module to work. The redis redis cluster library must be installed if cluster_mode is True """ if not HAS_REDIS: return (False, "Please install the python-redis package.") if not HAS_REDIS_CLUSTER and _get_redis_cache_opts()["cluster_mode"]: return (False, "Please install the redis-py-cluster package.") return __virtualname__ # ----------------------------------------------------------------------------- # helper functions -- will not be exported # ----------------------------------------------------------------------------- def init_kwargs(kwargs): """ Effectively a noop. Return an empty dictionary. """ return {} def _get_redis_cache_opts(): """ Return the Redis server connection details from the __opts__. """ return { "host": __opts__.get("cache.redis.host", "localhost"), "port": __opts__.get("cache.redis.port", 6379), "unix_socket_path": __opts__.get("cache.redis.unix_socket_path", None), "db": __opts__.get("cache.redis.db", "0"), "password": __opts__.get("cache.redis.password", ""), "cluster_mode": __opts__.get("cache.redis.cluster_mode", False), "startup_nodes": __opts__.get("cache.redis.cluster.startup_nodes", {}), "skip_full_coverage_check": __opts__.get( "cache.redis.cluster.skip_full_coverage_check", False ), } def _get_redis_server(opts=None): """ Return the Redis server instance. Caching the object instance. """ global REDIS_SERVER if REDIS_SERVER: return REDIS_SERVER if not opts: opts = _get_redis_cache_opts() if opts["cluster_mode"]: REDIS_SERVER = RedisCluster( startup_nodes=opts["startup_nodes"], skip_full_coverage_check=opts["skip_full_coverage_check"], ) else: REDIS_SERVER = redis.StrictRedis( opts["host"], opts["port"], unix_socket_path=opts["unix_socket_path"], db=opts["db"], password=opts["password"], ) return REDIS_SERVER def _get_redis_keys_opts(): """ Build the key opts based on the user options. """ return { "bank_prefix": __opts__.get("cache.redis.bank_prefix", _BANK_PREFIX), "bank_keys_prefix": __opts__.get( "cache.redis.bank_keys_prefix", _BANK_KEYS_PREFIX ), "key_prefix": __opts__.get("cache.redis.key_prefix", _KEY_PREFIX), "separator": __opts__.get("cache.redis.separator", _SEPARATOR), "timestamp_prefix": __opts__.get( "cache.redis.timestamp_prefix", _TIMESTAMP_PREFIX ), } def _get_bank_redis_key(bank): """ Return the Redis key for the bank given the name. """ opts = _get_redis_keys_opts() return "{prefix}{separator}{bank}".format( prefix=opts["bank_prefix"], separator=opts["separator"], bank=bank ) def _get_timestamp_key(bank, key): opts = _get_redis_keys_opts() return "{}{}{}/{}".format( opts["timestamp_prefix"], opts["separator"], {bank}, {key} ) # Use this line when we can use modern python # return f"{opts['timestamp_prefix']}{opts['separator']}{bank}/{key}" def _get_key_redis_key(bank, key): """ Return the Redis key given the bank name and the key name. """ opts = _get_redis_keys_opts() return "{prefix}{separator}{bank}/{key}".format( prefix=opts["key_prefix"], separator=opts["separator"], bank=bank, key=salt.utils.stringutils.to_str(key), ) def _get_bank_keys_redis_key(bank): """ Return the Redis key for the SET of keys under a certain bank, given the bank name. """ opts = _get_redis_keys_opts() return "{prefix}{separator}{bank}".format( prefix=opts["bank_keys_prefix"], separator=opts["separator"], bank=bank ) def _build_bank_hier(bank, redis_pipe): """ Build the bank hierarchy from the root of the tree. For each level in the bank path: - ensure a ``.`` placeholder exists in ``$BANK_<level>`` so that an empty bank still has a recognisable record; - for every non-root level, register this segment as a child of its parent in both ``$BANK_<parent>`` (consumed by the flush tree-traversal in ``_get_banks_to_remove``) and ``$BANKEYS_<parent>`` (consumed by ``list_()``). Without this, ``list_("minions")`` reads an empty set even though the data is stored correctly under ``minions/<id>``, breaking ``CkMinions.connected_ids()`` and therefore ``salt-run manage.present`` / ``manage.up`` for any deployment that uses the redis cache backend. Uses the Redis pipeline so there is only one round-trip with the server. """ parts = bank.split("/") for index, segment in enumerate(parts): bank_path = "/".join(parts[: index + 1]) bank_set = _get_bank_redis_key(bank_path) log.debug("Adding %s to %s", bank, bank_set) redis_pipe.sadd(bank_set, ".") if index > 0: parent_path = "/".join(parts[:index]) # Register the child in BOTH the parent's $BANK_ set (so # ``_get_banks_to_remove`` finds it for flush traversal) # AND the parent's $BANKEYS_ set (so ``list_()`` returns # it). Both reads exist in the codebase and both must see # the child for the cache to behave like the localfs # backend. redis_pipe.sadd(_get_bank_redis_key(parent_path), segment) redis_pipe.sadd(_get_bank_keys_redis_key(parent_path), segment) def _get_banks_to_remove(redis_server, bank, path=""): """ A simple tree traversal algorithm that builds the list of banks to remove, starting from an arbitrary node in the tree. """ current_path = bank if not path else f"{path}/{bank}" bank_paths_to_remove = [current_path] # as you got here, you'll be removed bank_key = _get_bank_redis_key(current_path) child_banks = redis_server.smembers(bank_key) if not child_banks: return bank_paths_to_remove # this bank does not have any child banks so we stop here for child_bank in child_banks: # ``smembers`` returns ``bytes`` because the cache client is not # configured with ``decode_responses=True``; decode here so that # the recursive path concatenation does not embed ``b'foo'`` in # the resulting Redis key name. if isinstance(child_bank, bytes): child_bank = child_bank.decode() # Skip the ``.`` placeholder written by ``_build_bank_hier`` -- # it marks "this bank exists" and is not itself a sub-bank. if child_bank == ".": continue bank_paths_to_remove.extend( _get_banks_to_remove(redis_server, child_bank, path=current_path) ) # go one more level deeper # and also remove the children of this child bank (if any) return bank_paths_to_remove # ----------------------------------------------------------------------------- # cache subsystem functions # ----------------------------------------------------------------------------- def store(bank, key, data): """ Store the data in a Redis key. """ redis_server = _get_redis_server() redis_pipe = redis_server.pipeline() redis_key = _get_key_redis_key(bank, key) redis_bank_keys = _get_bank_keys_redis_key(bank) try: _build_bank_hier(bank, redis_pipe) value = salt.payload.dumps(data) redis_pipe.set(redis_key, value) log.debug("Setting the value for %s under %s (%s)", key, bank, redis_key) redis_pipe.sadd(redis_bank_keys, key) # localfs cache truncates the timestamp to int only. We'll do the same. redis_pipe.set( _get_timestamp_key(bank=bank, key=key), salt.payload.dumps(int(time.time())), ) log.debug("Adding %s to %s", key, redis_bank_keys) redis_pipe.execute() except (RedisConnectionError, RedisResponseError) as rerr: mesg = "Cannot set the Redis cache key {rkey}: {rerr}".format( rkey=redis_key, rerr=rerr ) log.error(mesg) raise SaltCacheError(mesg) def fetch(bank, key): """ Fetch data from the Redis cache. """ redis_server = _get_redis_server() redis_key = _get_key_redis_key(bank, key) redis_value = None try: redis_value = redis_server.get(redis_key) except (RedisConnectionError, RedisResponseError) as rerr: mesg = "Cannot fetch the Redis cache key {rkey}: {rerr}".format( rkey=redis_key, rerr=rerr ) log.error(mesg) raise SaltCacheError(mesg) if redis_value is None: return {} return salt.payload.loads(redis_value) def flush(bank, key=None): """ Remove the key from the cache bank with all the key content. If no key is specified, remove the entire bank with all keys and sub-banks inside. This function is using the Redis pipelining for best performance. However, when removing a whole bank, in order to re-create the tree, there are a couple of requests made. In total: - one for node in the hierarchy sub-tree, starting from the bank node - one pipelined request to get the keys under all banks in the sub-tree - one pipeline request to remove the corresponding keys This is not quite optimal, as if we need to flush a bank having a very long list of sub-banks, the number of requests to build the sub-tree may grow quite big. An improvement for this would be loading a custom Lua script in the Redis instance of the user (using the ``register_script`` feature) and call it whenever we flush. This script would only need to build this sub-tree causing problems. It can be added later and the behaviour should not change as the user needs to explicitly allow Salt inject scripts in their Redis instance. """ redis_server = _get_redis_server() redis_pipe = redis_server.pipeline() if key is None: # will remove all bank keys bank_paths_to_remove = _get_banks_to_remove(redis_server, bank) # tree traversal to get all bank hierarchy for bank_to_remove in bank_paths_to_remove: bank_keys_redis_key = _get_bank_keys_redis_key(bank_to_remove) # Redis key of the SET that stores the bank keys redis_pipe.smembers(bank_keys_redis_key) # fetch these keys log.debug( "Fetching the keys of the %s bank (%s)", bank_to_remove, bank_keys_redis_key, ) try: log.debug("Executing the pipe...") subtree_keys = ( redis_pipe.execute() ) # here are the keys under these banks to be removed # this retunrs a list of sets, e.g.: # [set([]), set(['my-key']), set(['my-other-key', 'yet-another-key'])] # one set corresponding to a bank except (RedisConnectionError, RedisResponseError) as rerr: mesg = "Cannot retrieve the keys under these cache banks: {rbanks}: {rerr}".format( rbanks=", ".join(bank_paths_to_remove), rerr=rerr ) log.error(mesg) raise SaltCacheError(mesg) total_banks = len(bank_paths_to_remove) # bank_paths_to_remove and subtree_keys have the same length (see above) for index in range(total_banks): bank_keys = subtree_keys[index] # all the keys under this bank bank_path = bank_paths_to_remove[index] for key in bank_keys: redis_key = _get_key_redis_key(bank_path, key) redis_pipe.delete(redis_key) # kill 'em all! timestamp_key = _get_timestamp_key(bank=bank_path, key=key.decode()) redis_pipe.delete(timestamp_key) log.debug( "Removing the key %s under the %s bank (%s)", key, bank_path, redis_key, ) bank_keys_redis_key = _get_bank_keys_redis_key(bank_path) redis_pipe.delete(bank_keys_redis_key) log.debug( "Removing the bank-keys key for the %s bank (%s)", bank_path, bank_keys_redis_key, ) # delete the Redis key where are stored # the list of keys under this bank bank_key = _get_bank_redis_key(bank_path) redis_pipe.delete(bank_key) log.debug("Removing the %s bank (%s)", bank_path, bank_key) # delete the bank key itself # Drop this bank's own reference from its parent's index sets, # otherwise ``list_(parent)`` would still report the bank as # present after a full flush. ``_build_bank_hier`` writes into # both the parent's $BANK_ and $BANKEYS_ sets (see that # function's docstring); both must be cleaned up here. if "/" in bank: parent_path, segment = bank.rsplit("/", 1) redis_pipe.srem(_get_bank_redis_key(parent_path), segment) redis_pipe.srem(_get_bank_keys_redis_key(parent_path), segment) else: redis_key = _get_key_redis_key(bank, key) redis_pipe.delete(redis_key) # delete the key cached timestamp_key = _get_timestamp_key(bank=bank, key=key) redis_pipe.delete(timestamp_key) log.debug("Removing the key %s under the %s bank (%s)", key, bank, redis_key) bank_keys_redis_key = _get_bank_keys_redis_key(bank) redis_pipe.srem(bank_keys_redis_key, key) log.debug( "De-referencing the key %s from the bank-keys of the %s bank (%s)", key, bank, bank_keys_redis_key, ) # but also its reference from $BANKEYS list try: redis_pipe.execute() # Fluuuush except (RedisConnectionError, RedisResponseError) as rerr: mesg = "Cannot flush the Redis cache bank {rbank}: {rerr}".format( rbank=bank, rerr=rerr ) log.error(mesg) raise SaltCacheError(mesg) return True def list_(bank): """ Lists entries stored in the specified bank. """ redis_server = _get_redis_server() bank_redis_key = _get_bank_keys_redis_key(bank) try: banks = redis_server.smembers(bank_redis_key) except (RedisConnectionError, RedisResponseError) as rerr: mesg = "Cannot list the Redis cache key {rkey}: {rerr}".format( rkey=bank_redis_key, rerr=rerr ) log.error(mesg) raise SaltCacheError(mesg) if not banks: return [] return [bank.decode() for bank in banks if bank != b"."] def contains(bank, key): """ Checks if the specified bank contains the specified key. """ redis_server = _get_redis_server() bank_redis_key = _get_bank_keys_redis_key(bank) try: if key is None: return ( salt.utils.stringutils.to_str(redis_server.type(bank_redis_key)) != "none" ) else: return redis_server.sismember(bank_redis_key, key) except (RedisConnectionError, RedisResponseError) as rerr: mesg = "Cannot retrieve the Redis cache key {rkey}: {rerr}".format( rkey=bank_redis_key, rerr=rerr ) log.error(mesg) raise SaltCacheError(mesg) def updated(bank, key): """ Return the Unix Epoch timestamp of when the key was last updated. Return None if key is not found. """ redis_server = _get_redis_server() timestamp_key = _get_timestamp_key(bank=bank, key=key) value = redis_server.get(timestamp_key) if value is not None: value = salt.payload.loads(value) return value