User guide¶
Transport¶
RedisTransportService
supports both Redis and KeyDB, either as a single node
or in a cluster configuration.
Configuration example:
services:
  - cls: RedisTransportService
    settings:
      host: localhost
      port: 6379
      password: redis1
      db: 0
      cluster: False
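As a quick sanity check of these settings, the sketch below turns them into a standard `redis://` URL that can be passed to external tools such as `redis-cli -u`. The `settings_to_url` helper is hypothetical and not part of the library; it only illustrates how the `host`, `port`, `password` and `db` values fit together for a single-node setup.

```python
from urllib.parse import quote

# Values copied from the configuration example above.
settings = {"host": "localhost", "port": 6379, "password": "redis1", "db": 0, "cluster": False}


def settings_to_url(cfg: dict) -> str:
    """Build a redis:// URL from a settings mapping (hypothetical helper)."""
    # The password is percent-encoded so that special characters stay valid in a URL.
    auth = f":{quote(cfg['password'], safe='')}@" if cfg.get("password") else ""
    return f"redis://{auth}{cfg['host']}:{cfg['port']}/{cfg['db']}"


url = settings_to_url(settings)
print(url)
```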
Cache¶
RedisCache provides a standard cache interface
for a Redis transport with strict typing support. The class uses the Coredis
backend to execute queries.
services:
  - cls: RedisTransportService  # transport is required
  - cls: RedisCache
You can use standard cache methods such as get(),
set() and delete().
await cache.set('my_key1', 'my_data')
value = await cache.get('my_key1')
await cache.delete('my_key1', 'my_key2')
Hash and list methods are also supported.
await cache.hset('my_key1', {'key1': 'value1', 'key2': 'value2'})
value = await cache.hget('my_key1', 'key1')
await cache.lpush('my_key1', ['value1', 'value2'])
values = await cache.lpop('my_key1')
Using strict typing is recommended whenever possible. It optimizes encoding speed and storage space, and it ensures that you get back the same data type that was stored. To use strict typing you must specify the data type explicitly when getting a value.
This is unnecessary for simple types such as str or int, but it is useful with msgspec.Struct and other structured data. msgspec.Struct is the recommended choice, since the cache can optimize stored msgspec.Struct objects.
from uuid import UUID, uuid4
from msgspec import Struct
class User(Struct):
    id: UUID
    name: str
    blocked: bool
user = User(id=uuid4(), name='John', blocked=True)
await cache.set(user.id.bytes, user)  # the key comes from the instance, not from the class
cached_user = await cache.get(user.id.bytes, data_type=User)  # returns an identical `User` object
assert cached_user.name == user.name
Strict cache typing supports all the standard types and most of the types from the typing module:
await cache.set('my_key', frozenset({'dogs', 'cats', 'owls'}))
crazy_animals = await cache.get('my_key', data_type=frozenset)
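To see why the data type has to be given when reading, here is a minimal stdlib-only sketch. It uses JSON as a stand-in for the cache's real encoder, and the `dump` and `load` helpers are hypothetical, not RedisCache methods. The point is only that a serialized value does not carry its Python type, so the reader has to supply it.

```python
import json


def dump(value) -> bytes:
    """Encode a value; sets have no JSON form, so they are stored as sorted lists."""
    if isinstance(value, (set, frozenset)):
        value = sorted(value)
    return json.dumps(value).encode()


def load(raw: bytes, data_type=None):
    """Decode a value and, if a type is given, convert the result back to it."""
    value = json.loads(raw)
    return data_type(value) if data_type is not None else value


animals = frozenset({'dogs', 'cats', 'owls'})
raw = dump(animals)

untyped = load(raw)                     # a plain list: the original type is lost
typed = load(raw, data_type=frozenset)  # the original frozenset is restored
```

Without `data_type` the caller gets whatever the encoder's generic form is; with it, the value comes back in the type that was stored.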
To optimize caching further, you may pass a coredis.pipeline.Pipeline as the conn argument to the caching methods. The method will then use the pipeline instead of the standard transport to execute the Redis command.
async with await cache.transport.pipeline() as pipe:
    await cache.set('my_key', 'my_value', conn=pipe)
    results = await pipe.execute()
Note
When using a pipeline, it’s up to the developer to execute the commands and finalize the pipeline. The results of set methods won’t be returned immediately, and it’s also up to the developer to process them after executing the pipe. See the Coredis guide on pipelines for more details.
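The deferred behaviour described in the note can be illustrated with a small in-memory stand-in. `FakePipeline` below is hypothetical and does not reproduce the Coredis API; it only shows that queued commands have no effect and produce no results until `execute()` is awaited.

```python
import asyncio


class FakePipeline:
    """In-memory stand-in for a pipeline: commands are queued until execute()."""

    def __init__(self, storage: dict):
        self._storage = storage
        self._queue = []

    def set(self, key, value) -> None:
        # Nothing is written here; the command is only recorded.
        self._queue.append((key, value))

    async def execute(self) -> list:
        # All queued commands run now; results come back in command order.
        results = []
        for key, value in self._queue:
            self._storage[key] = value
            results.append(True)
        self._queue.clear()
        return results


async def main() -> tuple:
    storage = {}
    pipe = FakePipeline(storage)
    pipe.set('key1', 'value1')
    pipe.set('key2', 'value2')
    before = dict(storage)          # still empty: nothing has been sent yet
    results = await pipe.execute()  # both commands are applied here
    return before, results, storage


before, results, storage = asyncio.run(main())
```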
Locks¶
RedisLocksService provides a standard shared locks interface
for a Redis transport.
The lock methods are
acquire() and release().
All methods require namespace keys for ids.
Configuration example:
services:
  - cls: RedisTransportService  # transport is required
  - cls: RedisLocksService
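A common way to use an acquire() / release() pair is to wrap the protected work in try / finally so the lock is always released. The sketch below uses `FakeLocks`, a hypothetical in-memory stand-in; the real RedisLocksService method signatures are not shown in this guide and may take other arguments, and the key name is made up for illustration.

```python
import asyncio


class FakeLocks:
    """In-memory stand-in for a shared locks service (signatures are assumed)."""

    def __init__(self):
        self._held = set()

    async def acquire(self, key: str) -> bool:
        # Return False instead of waiting if the lock is already held.
        if key in self._held:
            return False
        self._held.add(key)
        return True

    async def release(self, key: str) -> None:
        self._held.discard(key)


async def update_report(locks: FakeLocks, key: str) -> bool:
    """Run the critical section only if the lock was acquired; always release it."""
    if not await locks.acquire(key):
        return False
    try:
        await asyncio.sleep(0)  # placeholder for the protected work
        return True
    finally:
        await locks.release(key)


async def main() -> tuple:
    locks = FakeLocks()
    key = 'my_app.locks.report'  # hypothetical namespaced key
    first = await update_report(locks, key)   # lock is free, work runs
    await locks.acquire(key)                  # another holder takes the lock
    second = await update_report(locks, key)  # work is skipped
    return first, second


first, second = asyncio.run(main())
```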
Streams¶
Server¶
RedisListener provides
a streaming interface for an RPC server. The service uses Redis streams
and requires setting a consumer group and trim / claim policies.
You can use the max_batch_size setting to customize the batch size or max_wait_time_ms to change the wait time between batches.
Note
Set max_batch_size to 1 to disable parallel processing of stream messages.
pending_messages_time_ms can be used to automatically claim pending messages that have not been acknowledged (see XAUTOCLAIM).
Note
Keep in mind that a message is considered ‘pending’ while it is being processed by a server. Do not set this value too low, or the server may process the same message twice.
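The risk of double processing can be shown with a simplified model of idle-time based claiming. The `claimable` function is a hypothetical illustration, not the listener's code: it treats a pending message as claimable once it has been idle for at least the configured time, which is the idea behind the minimum idle time in XAUTOCLAIM.

```python
def claimable(pending: dict, now_ms: int, min_idle_ms: int) -> list:
    """Return ids of pending messages that have been idle for at least min_idle_ms."""
    return [
        msg_id
        for msg_id, delivered_at_ms in pending.items()
        if now_ms - delivered_at_ms >= min_idle_ms
    ]


# message id -> time (ms) at which it was delivered to a consumer;
# both messages are still being processed and are not yet acknowledged
pending = {'1-0': 0, '2-0': 4000}
now_ms = 5000

safe = claimable(pending, now_ms, min_idle_ms=10_000)    # nothing is reclaimed
too_small = claimable(pending, now_ms, min_idle_ms=3_000)  # '1-0' is reclaimed mid-processing
```

With the smaller threshold, the message whose handler has been running for 5 seconds is handed out again while the first handler is still working on it.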
The trim_size setting can be customized to change the maximum length of the stream (see XTRIM).
Note that trim by stream size will remove all messages exceeding the trim size threshold, including the ones which haven’t been processed yet.
Alternatively, you can use trim_delivered to trim only delivered messages. This option disables trimming by size. However, you shouldn’t use trim_delivered if you plan to use your stream as a quasi-persistent data stream, because you will not be able to re-read data from it.
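The difference between the two trim policies can be sketched on a plain list standing in for a stream. Both functions are hypothetical simplifications and do not call Redis; they only model which messages survive under each policy.

```python
def trim_by_size(stream: list, max_len: int) -> list:
    """Keep only the newest max_len messages, whether or not they were processed."""
    return stream[-max_len:] if max_len > 0 else []


def trim_delivered(stream: list, delivered: set) -> list:
    """Drop only the messages that have already been delivered."""
    return [msg for msg in stream if msg not in delivered]


stream = ['m1', 'm2', 'm3', 'm4', 'm5']  # oldest first
delivered = {'m1', 'm2'}

by_size = trim_by_size(stream, 2)                # 'm3' is lost although it was never delivered
by_delivery = trim_delivered(stream, delivered)  # every undelivered message is kept
```

Trimming by size bounds memory but can drop unread messages; trimming delivered messages never drops unread ones, but the stream can no longer be replayed from the start.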
Server configuration example:
services:
  - cls: RedisTransportService  # transport, rpc service and locks are required
  - cls: JSONRPCServer
  - cls: RedisLocksService
  - cls: RedisListener
    name: redis_stream.rpc
    enabled: "[services_stream_rpc_enabled:True]"
    settings:
      group_id: "[main_name]"  # consumer group
      topic: rpc  # topic to listen
      scope: SYSTEM  # override scope to disable RPC authentication in internal streams
      trim_delivered: True
      pending_messages_time_ms: null  # set it to an integer (ms) to reclaim pending and not yet processed messages
Client¶
You can configure and use RedisStreamRPCClient to send RPC requests over a Redis
stream. The client interface is similar to the HTTP RPC client; however, it doesn’t return results because the
communication is one-way, i.e. all outgoing messages will be automatically marked as “notify” requests.
Usage example:
from kaiju_redis import RedisStreamRPCClient
class MyService(ContextableService):
    async def init(self):
        self._stream = self.discover_service(None, cls=RedisStreamRPCClient)
    async def send_rpc_request(self):
        """Send an RPC request to a method of another app (the other app must have a Redis listener and the method must be exposed)."""
        await self._stream.call(
            method='ext_service.do_something',
            params={'value': 123}, app='other_app_name')
    async def send_raw_message(self):
        """Send raw data to a stream; the data will be msgpack-encoded."""
        topic = self.app.namespace.get_key('some_stream')  #: custom topic key
        await self._stream.write(topic, body=42, headers={'Custom-Header': '1'})
The service requires a configured RedisTransportService.
Basic configuration example:
services:
  - cls: RedisTransportService
  - cls: RedisStreamRPCClient
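For readers unfamiliar with the "notify" requests mentioned in this section: in JSON-RPC 2.0 a notification is a request object without an "id" member, and the server does not reply to it. The sketch below shows only that concept. The helper names are hypothetical, and the actual message layout written to the stream by RedisStreamRPCClient (which is msgpack-encoded) may differ.

```python
import json


def make_notification(method: str, params: dict) -> dict:
    """Build a JSON-RPC 2.0 notification: a request object without an "id" member."""
    return {'jsonrpc': '2.0', 'method': method, 'params': params}


def is_notification(request: dict) -> bool:
    """A request without an "id" expects no response."""
    return 'id' not in request


notification = make_notification('ext_service.do_something', {'value': 123})
regular = {'jsonrpc': '2.0', 'id': 1, 'method': 'ext_service.do_something', 'params': {'value': 123}}

print(json.dumps(notification))
```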