services - application services
- class RedisTransportService
Bases:
ContextableService
Redis transport.
It provides a service layer for the coredis client and handles its initialization.
- transport_class
alias of
Redis
- cluster_transport_class
alias of
RedisCluster
- __init__(app, *args, cluster: bool = False, tracking: bool = False, tracking_cache_settings: dict = None, connect_timeout: int = 30, max_idle_time: int = 60, logger=None, **kws)
Initialize.
- Parameters:
app – web app
args – additional positional arguments for the transport class
cluster – use the cluster connector class
tracking – use a tracking cache for this connector (see tracking cache). Do not use a tracking cache for streams or pub-sub queues.
tracking_cache_settings – tracking cache settings for coredis.cache.TrackingCache
connect_timeout – connection timeout
max_idle_time – max connection idle time
logger – optional logger instance
kws – additional keyword arguments for the transport class
- class RedisCache
Bases:
ContextableService
Redis cache service.
- __init__(*args, transport: RedisTransportService, **kws)
Initialize.
- Parameters:
app – aiohttp web application
logger – a logger instance (None - app logger)
- async set(key: _Key, obj: _CachedItem, *, conn: Pipeline = None, ex: int = None, get: bool = False, **redis_args) -> _CachedItem | None | bool
Store an object in the cache.
- Parameters:
key – object key
obj – data
conn – optional redis pipe
ex – expiration time in seconds
get – return the previous value (or None if it does not exist)
redis_args – other arguments for the Redis set method
- Returns:
If conn is provided, nothing is returned (the caller must handle the pipeline results manually). If get=True, the previous value is returned if present. Otherwise, True is returned if the value has been set successfully.
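The three return modes of set can be illustrated with a small in-memory stand-in. FakeCache and FakePipeline below are hypothetical and are not part of the library; they only mimic the return contract described above, so treat this as a sketch rather than the actual implementation.

```python
import asyncio
from typing import Any, Optional


class FakePipeline:
    """Hypothetical stand-in for a Redis pipeline: it only queues commands."""

    def __init__(self) -> None:
        self.commands: list[tuple] = []


class FakeCache:
    """Hypothetical in-memory model of the documented `set` return contract."""

    def __init__(self) -> None:
        self._data: dict[str, Any] = {}

    async def set(self, key: str, obj: Any, *, conn: Optional[FakePipeline] = None,
                  ex: Optional[int] = None, get: bool = False):
        if conn is not None:
            # Pipeline mode: the command is queued and nothing is returned.
            conn.commands.append(("SET", key, obj, ex, get))
            return None
        previous = self._data.get(key)
        self._data[key] = obj  # expiration (`ex`) is ignored in this sketch
        if get:
            # get=True: return the previous value, or None if there was none.
            return previous
        return True


async def demo() -> list:
    cache, pipe = FakeCache(), FakePipeline()
    return [
        await cache.set("user:1", {"name": "a"}),             # plain set
        await cache.set("user:1", {"name": "b"}, get=True),   # previous value
        await cache.set("user:2", {"name": "c"}, get=True),   # no previous value
        await cache.set("user:3", {"name": "d"}, conn=pipe),  # queued in the pipe
    ]


results = asyncio.run(demo())
```

With a pipeline, the caller is responsible for executing it and reading the queued results, which is why the method itself returns nothing in that mode.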
- async get(key: _Key, data_type: type[_CachedItem] = Any, *, conn: Pipeline = None) -> _CachedItem | None
Get a single item from the cache.
- Parameters:
key – cache key
data_type – item data type to convert to
conn – optional redis pipe
- Returns:
item or None if not found
- async mget(keys: Collection[_Key], data_type: type[_CachedItem] = Any, *, conn: Pipeline = None) -> list[Optional[_CachedItem]]
Get multiple items from the cache.
- Parameters:
keys – list of cache keys
data_type – item data type to convert to
conn – optional redis pipe
- Returns:
a list of cached items, None for not found items
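Since mget returns None in place of missing items, the result list lines up position by position with the requested keys. The sketch below models that behavior with a plain dict; mget_model and the sample data are hypothetical and only illustrate how a caller might pair keys with results.

```python
from typing import Any, Collection, Optional


def mget_model(store: dict[str, Any], keys: Collection[str]) -> list[Optional[Any]]:
    """Hypothetical model: one result per requested key, None when missing."""
    return [store.get(key) for key in keys]


store = {"a": 1, "c": 3}
keys = ["a", "b", "c"]
values = mget_model(store, keys)

# Pair the keys with the results and drop the ones that were not found.
found = {key: value for key, value in zip(keys, values) if value is not None}
```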
- async delete(*keys: _Key, conn: Pipeline = None) -> int | None
Delete one or more keys.
- Parameters:
keys – list of keys to delete
conn – optional redis pipe
- Returns:
If conn is provided, nothing is returned (the caller must handle the pipeline results manually). Otherwise, the number of keys deleted is returned.
- async mset(items: Mapping[_Key, _CachedItem], *, conn: Pipeline = None) -> bool | None
Set multiple items in the cache.
- Parameters:
items – a mapping from key to item
conn – optional redis pipe
- Returns:
If conn is provided, nothing is returned (the caller must handle the pipeline results manually). Otherwise, True is returned if all items were set successfully.
- async hset(key: _Key, items: Mapping[_Key, _CachedItem], *, conn: Pipeline = None) -> int | None
Set multiple items in a hash.
- Parameters:
key – cache key
items – a mapping from key to item
conn – optional redis pipe
- Returns:
If conn is provided, nothing is returned (the caller must handle the pipeline results manually). Otherwise, the number of items set is returned.
- async hget(key: _Key, item_key: _Key, data_type: type[_CachedItem] = Any, *, conn: Pipeline = None) -> _CachedItem | None
Get a single item from a hash key.
- Parameters:
key – hash key
item_key – key inside the hash
data_type – item data type to convert to
conn – optional redis pipe
- Returns:
item or None if not found
- async hgetall(key: _Key, data_type: type[_CachedItem] = Any, *, conn: Pipeline = None) -> dict[bytes, _CachedItem]
Get all values from a hash.
- Parameters:
key – hash key
data_type – item data type to convert to
conn – optional redis pipe
- Returns:
a dict mapping hash field names (bytes) to items
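Because hgetall is annotated as returning dict[bytes, _CachedItem], the field names arrive as bytes. A minimal sketch of decoding them follows; it uses a hand-written dict in place of a real hgetall result, and both the sample data and the decode_fields helper are hypothetical.

```python
from typing import Any


def decode_fields(raw: dict[bytes, Any], encoding: str = "utf-8") -> dict[str, Any]:
    """Convert bytes field names to str, leaving the values untouched."""
    return {field.decode(encoding): value for field, value in raw.items()}


# Stand-in for the result of `await cache.hgetall("profile:1")`.
raw_result = {b"name": "alice", b"age": 30}
profile = decode_fields(raw_result)
```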
- async hdel(key: _Key, *item_keys: _Key, conn: Pipeline = None) -> int | None
Delete items from a hash.
- Parameters:
key – hash key
item_keys – item keys
conn – optional redis pipe
- Returns:
If conn is provided, nothing is returned (the caller must handle the pipeline results manually). Otherwise, the number of items deleted is returned.
- async lpush(key: _Key, *items: _CachedItem, conn: Pipeline = None) -> int | None
Push multiple items to the head of the list.
- Parameters:
key – cache key
items – collection of items
conn – optional redis pipe
- Returns:
If conn is provided, nothing is returned (the caller must handle the pipeline results manually). Otherwise, the length of the list after the push operations is returned.
- async rpush(key: _Key, *items: _CachedItem, conn: Pipeline = None) -> int | None
Push multiple items to the tail of the list.
- Parameters:
key – cache key
items – collection of items
conn – optional redis pipe
- Returns:
If conn is provided, nothing is returned (the caller must handle the pipeline results manually). Otherwise, the length of the list after the push operations is returned.
- async lrange(key: _Key, start: int = 0, stop: int = -1, data_type: type[_CachedItem] = Any, *, conn: Pipeline = None) -> list[Optional[_CachedItem]]
Get multiple items from the list.
- Parameters:
key – list key
start – first index
stop – last index
data_type – item data type to convert to
conn – optional redis pipe
- Returns:
a list of cached items
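The ordering produced by lpush and rpush, and the index range read by lrange, can be modelled with a plain Python list. The sketch assumes these methods pass through to the Redis LPUSH, RPUSH and LRANGE commands, where LPUSH inserts items one at a time at the head and the LRANGE stop index is inclusive; the three helper functions are hypothetical models, not library code.

```python
def lpush(lst: list, *items) -> int:
    """Model of LPUSH: each item is inserted at the head, one after another."""
    for item in items:
        lst.insert(0, item)
    return len(lst)


def rpush(lst: list, *items) -> int:
    """Model of RPUSH: items are appended to the tail in the given order."""
    lst.extend(items)
    return len(lst)


def lrange(lst: list, start: int = 0, stop: int = -1) -> list:
    """Model of LRANGE: `stop` is inclusive, negative indices count from the end."""
    n = len(lst)
    if start < 0:
        start = max(n + start, 0)
    if stop < 0:
        stop = n + stop
    return lst[start:stop + 1]


queue: list = []
after_lpush = lpush(queue, "a", "b", "c")  # the last pushed item ends up first
after_rpush = rpush(queue, "d")
first_two = lrange(queue, 0, 1)
last_two = lrange(queue, -2, -1)
everything = lrange(queue)
```

Note that the default arguments start=0 and stop=-1 select the whole list.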
- async lpop(key: _Key, num: int = 1, data_type: type[_CachedItem] = Any, *, conn: Pipeline = None) -> list[_CachedItem]
Pop multiple items from the head of the list.
- Parameters:
key – list key
num – number of items
data_type – item data type to convert to
conn – optional redis pipe
- Returns:
a list of cached items
- async rpop(key: _Key, num: int = 1, data_type: type[_CachedItem] = Any, *, conn: Pipeline = None) -> list[_CachedItem]
Pop multiple items from the tail of the list.
- Parameters:
key – list key
num – number of items
data_type – item data type to convert to
conn – optional redis pipe
- Returns:
a list of cached items
- get_struct_array_type(struct_type: type[msgspec.Struct], /) -> type[msgspec.Struct]
Register a struct type and return its array-like type.
- service_name = None
You may define a custom service name here.
- class RedisLocksService
Bases:
BaseLocksService
Locks service with Redis backend.
- __init__(*args, transport: Service = None, refresh_interval: int = 30, scheduler: Scheduler = None, **kws)
Initialize.
- Parameters:
transport – db / redis connector
refresh_interval – how often locks will be renewed
scheduler – local scheduler
- async acquire(id: NSKey, identifier: LockId = None, ttl: int = None, wait=True, timeout: float = None) -> LockId
Wait for lock and acquire it.
- Parameters:
id – lock name
identifier – service/owner identifier; if None, app['id'] is used
ttl – optional TTL in seconds; None means no expiration (the lock is held until the app exits)
wait – wait for the lock to be released (if False, a LockExistsError is raised if a lock with this key already exists)
timeout – optional max wait time in seconds
- Returns:
a lock id to be used when releasing the lock
- Raises:
LockExistsError – when the lock already exists and wait was set to False
LockAcquireTimeout – when the lock exists and the specified timeout has been reached
LockError – internal error
- async is_owner(id: NSKey) -> bool
Return True if the current instance is the owner of this lock.
- async owner(id: NSKey) -> LockId | None
Return the current lock owner identifier, or None if the lock is not found or has no owner.
- async release(id: NSKey, identifier: LockId) -> None
Release a lock.
- Parameters:
id – lock name
identifier – service/owner identifier
- Raises:
LockError – if the lock can’t be released by this service
NotLockOwnerError – if someone who doesn’t have this lock tries to release it
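The acquire / release contract described above can be sketched with an in-memory stand-in. FakeLocks and the exception classes below are hypothetical re-declarations made for this example only: the inheritance from LockError is an assumption, TTL and timeouts are ignored, and the waiting path (wait=True on a held lock) is not modelled.

```python
import asyncio
from typing import Optional


class LockError(Exception):
    """Hypothetical base error (the real hierarchy may differ)."""


class LockExistsError(LockError):
    pass


class NotLockOwnerError(LockError):
    pass


class FakeLocks:
    """Hypothetical in-memory model of the documented locks contract."""

    def __init__(self, app_id: str) -> None:
        self._app_id = app_id
        self._owners: dict[str, str] = {}

    async def acquire(self, id: str, identifier: Optional[str] = None,
                      wait: bool = True) -> str:
        identifier = identifier or self._app_id  # default owner is the app id
        if id in self._owners:
            if not wait:
                raise LockExistsError(id)
            raise NotImplementedError("waiting is not modelled in this sketch")
        self._owners[id] = identifier
        return identifier  # the value to pass back to release()

    async def owner(self, id: str) -> Optional[str]:
        return self._owners.get(id)

    async def release(self, id: str, identifier: str) -> None:
        if self._owners.get(id) != identifier:
            raise NotLockOwnerError(id)
        del self._owners[id]


async def demo() -> list:
    locks, events = FakeLocks(app_id="app-1"), []
    lock_id = await locks.acquire("jobs.cleanup", wait=False)
    events.append(await locks.owner("jobs.cleanup"))
    try:
        await locks.acquire("jobs.cleanup", identifier="app-2", wait=False)
    except LockExistsError:
        events.append("exists")
    try:
        await locks.release("jobs.cleanup", "app-2")  # not the owner
    except NotLockOwnerError:
        events.append("not owner")
    await locks.release("jobs.cleanup", lock_id)
    events.append(await locks.owner("jobs.cleanup"))
    return events


events = asyncio.run(demo())
```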
- service_name = 'locks'
You may define a custom service name here.
- class RedisListener
Bases:
Listener
Redis stream listener.
- __init__(*args, group_id: str = None, consumer_id: str = None, max_batch_size: int = 10, max_wait_time_ms: int = 500, pending_messages_time_ms: int = None, trim_size: int = 10_000_000, trim_delivered: bool = True, **kws)
Initialize.
- Parameters:
args – listener args
group_id – consumer group, app.name by default
consumer_id – unique instance id, app.id by default
max_batch_size – max single acquired batch size
max_wait_time_ms – max wait time when waiting for a batch
pending_messages_time_ms – timeout in ms for processing pending messages: messages not acknowledged within this interval are auto-claimed by this listener. Set to None to disable this behavior.
trim_delivered – trim all delivered messages
trim_size – stream trim size: older records are removed if the stream is longer than this value. Set to None to disable this behavior. Note that this option is ignored if trim_delivered is set.
kws – additional listener keyword arguments
- class RedisStreamRPCClient
Bases:
StreamRPCClient
Redis stream client.
- async write(topic: NSKey, body, headers: dict = None, key=None) -> None
Write data to the stream.
- Parameters:
topic – topic key
body – rpc request body
headers – request headers
key – (optional) message id
- __init__(*args, app_name: str = None, topic: str = Topic.RPC, **kws)
Initialize.
- Parameters:
app_name – application (topic) name
listener_service – stream listener service instance
topic – topic name
- async call(method: str, params: dict | None = None, nowait: bool = True, request_id: int = 0, max_timeout: int = None, use_context: bool = True, retries: int = None, headers: dict = None, app: str = None, topic: str = None)
Make an RPC call.
- Parameters:
method – rpc method name
params – method call arguments
nowait – create a ‘notify’ request - do not wait for the result
request_id – optional request id (usually you don’t need to set it)
max_timeout – request timeout in sec
use_context – use app request context such as correlation id and request chain deadline
retries – optional number of retries (on the server side)
headers – additional headers
- async call_multiple(requests: Collection[RPCRequest], raise_exception: bool = True, nowait: bool = True, max_timeout: int = None, use_context: bool = True, retries: int = None, abort_on_error: bool = None, use_template: bool = None, headers: dict = None, app: str = None, topic: str = None)
Make an RPC batch call.
- Parameters:
requests – list of request dicts
nowait – create a ‘notify’ request - do not wait for the result
max_timeout – request timeout in sec
use_context – use app request context such as correlation id and request chain deadline
raise_exception – raise exception instead of returning error objects in the list
retries – optional number of retries (on the server side)
abort_on_error – abort the whole batch on the first error
use_template – use templates in batch requests
headers – additional headers
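As a rough illustration of preparing a batch for call_multiple, the sketch below builds a list of request dicts. The method and params keys are an assumption that mirrors the arguments of call; the actual RPCRequest type may use a different shape. The make_request helper, the method names and the client variable in the comment are all hypothetical.

```python
from typing import Any, Optional


def make_request(method: str, params: Optional[dict[str, Any]] = None) -> dict[str, Any]:
    """Build one request dict (the `method` / `params` keys are an assumption)."""
    return {"method": method, "params": params or {}}


batch = [
    make_request("users.get", {"id": 1}),
    make_request("users.get", {"id": 2}),
    make_request("stats.refresh"),
]

# Hypothetical usage with an initialized client instance:
#     await client.call_multiple(batch, nowait=False, abort_on_error=True)
```

Passing nowait=False would make the call wait for results instead of sending 'notify' requests, and abort_on_error=True would stop the batch at the first error, as described in the parameter list above.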