User guide
==========

Transport
---------

:py:class:`~kaiju_redis.services.RedisTransportService` supports both Redis and KeyDB,
either as a single node or in a cluster configuration.

Configuration example:

.. code-block:: yaml

    services:
      - cls: RedisTransportService
        settings:
          host: localhost
          port: 6379
          password: redis1
          db: 0
          cluster: False

Cache
-----

:py:class:`~kaiju_redis.services.RedisCache` provides a standard cache interface for a redis transport
with strict typing support. The class uses Coredis as the backend to execute queries.

.. code-block:: yaml

    services:
      - cls: RedisTransportService  # transport is required
      - cls: RedisCache

You can use standard cache methods like :py:meth:`~kaiju_redis.services.RedisCache.get`,
:py:meth:`~kaiju_redis.services.RedisCache.set` etc.

.. code-block:: python

    await cache.set('my_key1', 'my_data')
    value = await cache.get('my_key1')
    await cache.delete('my_key1', 'my_key2')

Hash and list methods are also supported.

.. code-block:: python

    await cache.hset('my_key1', {'key1': 'value1', 'key2': 'value2'})
    value = await cache.hget('my_key1', 'key1')
    # a separate key is used for the list, since a Redis key holds a single data type
    await cache.lpush('my_list1', ['value1', 'value2'])
    values = await cache.lpop('my_list1')

It is recommended to use strict typing whenever possible. It optimizes encoding speed and storage space,
and it ensures that you get back the same data type that was stored.

To use strict typing you must specify the data type explicitly when getting a value. This is unnecessary
for simple types such as `str` or `int` but can be useful with `msgspec.Struct` and other structured data.
`msgspec.Struct` is the recommended choice, since the cache can optimize stored `msgspec.Struct` objects.

.. code-block:: python

    from uuid import UUID, uuid4

    from msgspec import Struct

    class User(Struct):
        id: UUID
        name: str
        blocked: bool

    user = User(id=uuid4(), name='John', blocked=True)
    await cache.set(user.id.bytes, user)  # use the instance id, not the class attribute
    cached_user = await cache.get(user.id.bytes, data_type=User)  # returns identical `User` object
    assert cached_user.name == user.name

Strict typing supports all the standard types and most of the types from `typing`:

.. code-block:: python

    await cache.set('my_key', frozenset({'dogs', 'cats', 'owls'}))
    crazy_animals = await cache.get('my_key', data_type=frozenset)

To optimize caching further you may pass a `coredis.pipeline.Pipeline` as the `conn` argument to caching methods.
The cache then uses the pipeline instead of the standard transport to execute the Redis command.

.. code-block:: python

    async with await cache.transport.pipeline() as pipe:
        await cache.set('my_key', 'my_value', conn=pipe)
        results = await pipe.execute()

.. note::

    When using a pipeline it's up to the developer to execute commands and finalize the pipeline.
    The results of set methods won't be returned immediately, and it's also up to the developer to process
    them after executing the pipe.

See the Coredis guide on pipelines for more details.

Locks
-----

:py:class:`~kaiju_redis.services.RedisLocksService` provides a standard shared locks interface for a redis transport.

The lock methods are :py:meth:`~kaiju_redis.services.RedisLocksService.acquire` and
:py:meth:`~kaiju_redis.services.RedisLocksService.release`. All methods require namespace keys for ids.

Configuration example:

.. code-block:: yaml

    services:
      - cls: RedisTransportService  # transport is required
      - cls: RedisLocksService

Streams
-------

Server
______

:py:class:`~kaiju_redis.services.RedisListener` provides a streaming interface for an RPC server.
The service uses redis streams and requires setting a consumer group and trim / claim policies.

You can use the `max_batch_size` setting to customize the batch size or `max_wait_time_ms` to change the wait time
between batches.

.. note::

    Set `max_batch_size` to 1 to disable parallel processing of stream messages.

`pending_messages_time_ms` can be used to automatically claim pending messages that have not been acked
(see `XAUTOCLAIM`).

.. note::

    Keep in mind that a message is considered 'pending' while it is being processed by a server. Do not set this
    value too small, or the server may process the same message twice.

The `trim_size` setting changes the maximum length of the stream (see `XTRIM`). Note that trimming by stream size
removes *all* messages exceeding the trim size threshold, including the ones which haven't been processed yet.

Alternatively you can use `trim_delivered` to trim only delivered messages. This option disables trimming by size.
However, you shouldn't use `trim_delivered` if you plan to use your stream as a quasi-persistent data stream,
because you will not be able to re-read data from it.

Server configuration example:

.. code-block:: yaml

    services:
      - cls: RedisTransportService  # transport, rpc service and locks are required
      - cls: JSONRPCServer
      - cls: RedisLocksService
      - cls: RedisListener
        name: redis_stream.rpc
        enabled: "[services_stream_rpc_enabled:True]"
        settings:
          group_id: "[main_name]"  # consumer group
          topic: rpc  # topic to listen
          scope: SYSTEM  # override scope to disable RPC authentication in internal streams
          trim_delivered: True
          pending_messages_time_ms: null  # set it to an integer (ms) to reclaim pending and not yet processed messages

Client
______

You can configure and use :py:class:`~kaiju_redis.services.RedisStreamRPCClient` to send RPC requests over
a Redis stream. The client interface is similar to the HTTP RPC client, but it doesn't return results because
the communication is one-way: all outgoing messages are automatically marked as "notify" requests.

Usage example:

.. code-block:: python

    from kaiju_redis import RedisStreamRPCClient

    # `ContextableService` is the base service class of the application framework (import not shown here)
    class MyService(ContextableService):

        async def init(self):
            self._stream = self.discover_service(None, cls=RedisStreamRPCClient)

        async def send_rpc_request(self):
            """Send an RPC request to a method of another app.

            The other app must have a Redis listener and the method must be exposed.
            """
            await self._stream.call(
                method='ext_service.do_something',
                params={'value': 123},
                app='other_app_name')

        async def send_raw_message(self):
            """Send raw data to a stream, the data will be msgpack-encoded."""
            topic = self.app.namespace.get_key('some_stream')  # custom topic key
            await self._stream.write(topic, body=42, headers={'Custom-Header': '1'})

The service requires a configured :py:class:`~kaiju_redis.services.RedisTransportService`.
Basic configuration example:

.. code-block:: yaml

    services:
      - cls: RedisTransportService
      - cls: RedisStreamRPCClient
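
To illustrate what a "notify" request means, here is a minimal sketch of a JSON-RPC 2.0 notification: a request
object without an `id` member, so no response is expected. The helpers `make_notification` and `is_notification`
are hypothetical names used only for this illustration and are not part of `kaiju_redis`. The sketch uses `json`
for readability and does not reproduce the format the client actually writes to the stream.

.. code-block:: python

    import json
    from typing import Optional


    def make_notification(method: str, params: Optional[dict] = None) -> dict:
        """Build a JSON-RPC 2.0 notification (hypothetical helper, not a library call)."""
        request = {'jsonrpc': '2.0', 'method': method}
        if params is not None:
            request['params'] = params
        # no 'id' member is added: this is what makes the request a notification
        return request


    def is_notification(request: dict) -> bool:
        """Return True if the request has no 'id' and therefore expects no response."""
        return 'id' not in request


    request = make_notification('ext_service.do_something', {'value': 123})
    encoded = json.dumps(request)  # illustration only, not the stream encoding

Because a notification carries no `id`, the sender has nothing to match a reply against, which is why the stream
client cannot return results.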