services - application services

class RedisTransportService

Bases: ContextableService

Redis transport.

Provides a service layer for the coredis client, including its initialization.

transport_class

alias of Redis

cluster_transport_class

alias of RedisCluster

__init__(app, *args, cluster: bool = False, tracking: bool = False, tracking_cache_settings: dict = None, connect_timeout: int = 30, max_idle_time: int = 60, logger=None, **kws)

Initialize.

Parameters:
  • app – web app

  • args – additional transport cls args

  • cluster – use cluster connector class

  • tracking – use a tracking cache for this connector (see tracking cache). Do not use the tracking cache for streams / pub-sub queues.

  • tracking_cache_settings – tracking cache settings for coredis.cache.TrackingCache

  • connect_timeout – connection timeout

  • max_idle_time – max connection idle time

  • logger – optional logger instance

  • kws – additional transport cls keyword args
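The relationship between the cluster flag and the two class aliases above can be sketched as follows. This is a minimal illustration, not the service's actual code: FakeRedis, FakeRedisCluster and TransportSketch are hypothetical stand-ins, the app argument is omitted, and forwarding the timeouts to the client as keyword arguments is an assumption.

```python
class FakeRedis:
    """Hypothetical stand-in for the single-node client class."""

    def __init__(self, *args, **kws):
        self.args, self.kws = args, kws


class FakeRedisCluster(FakeRedis):
    """Hypothetical stand-in for the cluster client class."""


class TransportSketch:
    # Mirrors the two documented class attributes.
    transport_class = FakeRedis
    cluster_transport_class = FakeRedisCluster

    def __init__(self, *args, cluster=False, connect_timeout=30, max_idle_time=60, **kws):
        # The cluster flag picks which client class gets instantiated.
        cls = self.cluster_transport_class if cluster else self.transport_class
        # Assumption: timeouts are passed through to the client as keyword arguments.
        self.client = cls(
            *args,
            connect_timeout=connect_timeout,
            max_idle_time=max_idle_time,
            **kws,
        )


single = TransportSketch(host="localhost", port=6379)
clustered = TransportSketch(host="localhost", port=7000, cluster=True, connect_timeout=5)
```

Any extra positional or keyword arguments (args, kws) end up in the selected client class, which matches how both are described in the parameter list.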

class RedisCache

Bases: ContextableService

Redis cache service.

__init__(*args, transport: RedisTransportService, **kws)

Initialize.

Parameters:
  • app – aiohttp web application

  • logger – a logger instance (None - app logger)

async exists(keys: _Key) → int

Check whether the keys exist and return the number of existing keys.

async set(key: _Key, obj: _CachedItem, *, conn: Pipeline = None, ex: int = None, get: bool = False, **redis_args) → _CachedItem | None | bool

Set object to the cache.

Parameters:
  • key – object key

  • obj – data

  • conn – optional redis pipe

  • ex – expiration time in seconds

  • get – get the previous value (or None if not exists)

  • redis_args – other arguments for the Redis set method

Returns:

If conn is provided, nothing is returned (the user must handle returns manually). If get=True, the previous value is returned if present. Otherwise returns True if the value has been set successfully.
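The three return cases can be illustrated with an in-memory stand-in. This is a sketch under stated assumptions: InMemoryCache and FakePipeline are hypothetical classes that mimic only the documented return contract; they do not talk to Redis, and ex is accepted but ignored.

```python
import asyncio


class FakePipeline:
    """Hypothetical stand-in for a Redis pipeline: it only queues commands."""

    def __init__(self):
        self.commands = []


class InMemoryCache:
    """Hypothetical stand-in that mimics the documented return values of set()."""

    def __init__(self):
        self._data = {}

    async def set(self, key, obj, *, conn=None, ex=None, get=False):
        # ex (expiration in seconds) is accepted but ignored in this sketch.
        if conn is not None:
            # With a pipeline the command is only queued, so nothing is returned.
            conn.commands.append(("SET", key, obj))
            return None
        previous = self._data.get(key)
        self._data[key] = obj
        # get=True returns the previous value (None if there was none).
        return previous if get else True


async def main():
    cache = InMemoryCache()
    first = await cache.set("user:1", {"name": "Ann"})
    previous = await cache.set("user:1", {"name": "Bob"}, get=True)
    pipe = FakePipeline()
    queued = await cache.set("user:2", {"name": "Eve"}, conn=pipe)
    return first, previous, queued, pipe.commands


first, previous, queued, commands = asyncio.run(main())
```

Here first is True, previous holds the value stored by the first call, and queued is None because the caller is expected to read results from the pipeline itself.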

async get(key: _Key, data_type: type[_CachedItem] = Any, *, conn: Pipeline = None) → _CachedItem | None

Get a single item from the cache.

Parameters:
  • key – cache key

  • data_type – item data type to convert to

  • conn – optional redis pipe

Returns:

item or None if not found

async mget(keys: Collection[_Key], data_type: type[_CachedItem] = Any, *, conn: Pipeline = None) → list[Optional[_CachedItem]]

Get multiple items from the cache.

Parameters:
  • keys – list of cache keys

  • data_type – item data type to convert to

  • conn – optional redis pipe

Returns:

a list of cached items, None for not found items
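A short sketch of how a caller might separate found and missing keys. It assumes the result list is aligned with the order of the requested keys, as the Redis MGET command behaves; InMemoryCache is a hypothetical stand-in, not the real service, and the key names are illustrative.

```python
import asyncio


class InMemoryCache:
    """Hypothetical stand-in that mimics only the documented mget contract."""

    def __init__(self, data):
        self._data = dict(data)

    async def mget(self, keys):
        # One result per requested key; None marks a key that was not found.
        return [self._data.get(key) for key in keys]


keys = ["user:1", "user:2", "user:3"]
cache = InMemoryCache({"user:1": {"name": "Ann"}, "user:3": {"name": "Eve"}})
items = asyncio.run(cache.mget(keys))

# Pair each key with its result to split hits from misses.
found = {key: item for key, item in zip(keys, items) if item is not None}
missing = [key for key, item in zip(keys, items) if item is None]
```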

async delete(*keys: _Key, conn: Pipeline = None) → int | None

Delete one or more keys.

Parameters:
  • keys – list of keys to delete

  • conn – optional redis pipe

Returns:

if conn is provided, nothing is returned (the user must handle returns manually); otherwise returns the number of keys deleted

async mset(items: Mapping[_Key, _CachedItem], *, conn: Pipeline = None) → bool | None

Set multiple items to the cache.

Parameters:
  • items – a mapping from key to item

  • conn – optional redis pipe

Returns:

if conn is provided, nothing is returned (the user must handle returns manually); otherwise returns True if all items were set successfully

async hset(key: _Key, items: Mapping[_Key, _CachedItem], *, conn: Pipeline = None) → int | None

Set multiple items to the hash key.

Parameters:
  • key – cache key

  • items – a mapping from key to item

  • conn – optional redis pipe

Returns:

if conn is provided, nothing is returned (the user must handle returns manually); otherwise returns the number of items set

async hget(key: _Key, item_key: _Key, data_type: type[_CachedItem] = Any, *, conn: Pipeline = None) → _CachedItem | None

Get a single item from a hash key.

Parameters:
  • key – hash key

  • item_key – key inside the hash

  • data_type – item data type to convert to

  • conn – optional redis pipe

Returns:

item or None if not found

async hgetall(key: _Key, data_type: type[_CachedItem] = Any, *, conn: Pipeline = None) → dict[bytes, _CachedItem]

Get all values from a hash.

Parameters:
  • key – hash key

  • data_type – item data type to convert to

  • conn – optional redis pipe

Returns:

a dict mapping item keys (as bytes) to items

async hdel(key: _Key, *item_keys: _Key, conn: Pipeline = None) → int | None

Delete items from a hash.

Parameters:
  • key – hash key

  • item_keys – item keys

  • conn – optional redis pipe

Returns:

if conn is provided, nothing is returned (the user must handle returns manually); otherwise returns the number of items deleted

async lpush(key: _Key, *items: _CachedItem, conn: Pipeline = None) → int | None

Push multiple items to the head of the list.

Parameters:
  • key – cache key

  • items – collection of items

  • conn – optional redis pipe

Returns:

if conn is provided, nothing is returned (the user must handle returns manually); otherwise returns the length of the list after the push operations

async rpush(key: _Key, *items: _CachedItem, conn: Pipeline = None) → int | None

Push multiple items to the tail of the list.

Parameters:
  • key – cache key

  • items – collection of items

  • conn – optional redis pipe

Returns:

if conn is provided, nothing is returned (the user must handle returns manually); otherwise returns the length of the list after the push operations
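The difference between the two push directions is easy to get wrong, so here is a minimal sketch that models the list with collections.deque. It follows the semantics of the Redis LPUSH and RPUSH commands, on the assumption that these methods pass them through unchanged; the standalone lpush and rpush functions are illustrative, not the service's methods.

```python
from collections import deque


def lpush(lst, *items):
    # Each item is inserted at the head in turn, so the last argument ends up first.
    lst.extendleft(items)
    return len(lst)


def rpush(lst, *items):
    # Items are appended at the tail in the order given.
    lst.extend(items)
    return len(lst)


queue = deque()
length_after_rpush = rpush(queue, "a", "b")  # list is now: a, b
length_after_lpush = lpush(queue, "x", "y")  # list is now: y, x, a, b
```

Both functions return the list length after the push, which corresponds to the documented return value when no pipeline is used.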

async lrange(key: _Key, start: int = 0, stop: int = -1, data_type: type[_CachedItem] = Any, *, conn: Pipeline = None) → list[Optional[_CachedItem]]

Get multiple items from the list.

Parameters:
  • key – list key

  • start – first index

  • stop – last index (inclusive; -1 refers to the last item)

  • data_type – item data type to convert to

  • conn – optional redis pipe

Returns:

a list of cached items
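The start and stop indices differ from Python slicing. The sketch below emulates the index rules of the Redis LRANGE command (inclusive stop, negative indices counted from the end, out-of-range indices clamped) on a plain Python list. It assumes the method forwards both indices to Redis unchanged; the standalone lrange function is illustrative only.

```python
def lrange(items, start=0, stop=-1):
    """Emulate Redis LRANGE index handling on a Python list."""
    n = len(items)
    if start < 0:
        # Negative indices count from the end; clamp to the first item.
        start = max(n + start, 0)
    if stop < 0:
        stop = n + stop
    if stop < 0 or start > stop:
        return []
    # stop is inclusive, unlike a Python slice; slicing clamps an oversized stop.
    return items[start:stop + 1]


values = ["a", "b", "c", "d"]
everything = lrange(values)          # defaults 0 and -1 cover the whole list
first_two = lrange(values, 0, 1)
last_two = lrange(values, -2, -1)
clamped = lrange(values, 2, 10)
```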

async lpop(key: _Key, num: int = 1, data_type: type[_CachedItem] = Any, *, conn: Pipeline = None) → list[_CachedItem]

Pop up to num items from the head of the list.

Parameters:
  • key – list key

  • num – number of items

  • data_type – item data type to convert to

  • conn – optional redis pipe

Returns:

a list of cached items

async rpop(key: _Key, num: int = 1, data_type: type[_CachedItem] = Any, *, conn: Pipeline = None) → list[_CachedItem]

Pop up to num items from the tail of the list.

Parameters:
  • key – list key

  • num – number of items

  • data_type – item data type to convert to

  • conn – optional redis pipe

Returns:

a list of cached items

async llen(key: _Key, conn: Pipeline = None) → int

Get the length of a list.

get_struct_array_type(struct_type: type[msgspec.Struct], /) → type[msgspec.Struct]

Register a struct type and return its array-like type.

service_name = None

you may define a custom service name here

class RedisLocksService

Bases: BaseLocksService

Locks service with Redis backend.

__init__(*args, transport: Service = None, refresh_interval: int = 30, scheduler: Scheduler = None, **kws)

Initialize.

Parameters:
  • transport – db / redis connector

  • refresh_interval – how often locks will be renewed

  • scheduler – local scheduler

async acquire(id: NSKey, identifier: LockId = None, ttl: int = None, wait=True, timeout: float = None) → LockId

Wait for lock and acquire it.

Parameters:
  • id – lock name

  • identifier – service/owner identifier; if None, app['id'] will be used

  • ttl – optional ttl in seconds, None for eternal (until the app exits)

  • wait – wait for the lock to be released (if False, a LockExistsError is raised when a lock with this key already exists)

  • timeout – optional max wait time in seconds

Returns:

a lock id to be used when releasing the lock

Raises:
  • LockExistsError – when the lock already exists and wait was set to False

  • LockAcquireTimeout – when the lock exists and the specified timeout has been reached

  • LockError – internal error

async is_owner(id: NSKey) → bool

Return True if the current instance is an owner of this lock.

async owner(id: NSKey) → LockId | None

Return a current lock owner identifier or None if not found / has no owner.

async release(id: NSKey, identifier: LockId) → None

Release a lock.

Parameters:
  • id – lock name

  • identifier – service/owner identifier

Raises:
  • LockError – if the lock can’t be released by this service

  • NotLockOwnerError – if someone who doesn’t have this lock tries to release it
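The acquire / release contract can be exercised against an in-memory stand-in. This is a sketch under stated assumptions: InMemoryLocks is hypothetical and uses a polling loop instead of Redis, ttl is ignored, the exception classes are re-declared locally, and making them all subclasses of LockError is an assumption. The lock name and identifiers are illustrative.

```python
import asyncio


class LockError(Exception):
    """Local re-declaration; the hierarchy below is an assumption."""


class LockExistsError(LockError): ...
class LockAcquireTimeout(LockError): ...
class NotLockOwnerError(LockError): ...


class InMemoryLocks:
    """Hypothetical stand-in mimicking the documented acquire/release contract."""

    def __init__(self, default_identifier="app-1"):
        self._owners = {}
        self._default = default_identifier

    async def acquire(self, id, identifier=None, ttl=None, wait=True, timeout=None):
        # ttl is accepted but ignored in this sketch.
        identifier = identifier or self._default
        loop = asyncio.get_running_loop()
        deadline = None if timeout is None else loop.time() + timeout
        while id in self._owners:
            if not wait:
                raise LockExistsError(id)
            if deadline is not None and loop.time() >= deadline:
                raise LockAcquireTimeout(id)
            await asyncio.sleep(0.01)
        self._owners[id] = identifier
        return identifier

    async def owner(self, id):
        return self._owners.get(id)

    async def release(self, id, identifier):
        if self._owners.get(id) != identifier:
            raise NotLockOwnerError(id)
        del self._owners[id]


async def main():
    locks = InMemoryLocks()
    outcome = []
    lock_id = await locks.acquire("reports:daily", ttl=60)
    try:
        try:
            await locks.acquire("reports:daily", identifier="app-2", wait=False)
        except LockExistsError:
            outcome.append("exists")
        try:
            await locks.acquire("reports:daily", identifier="app-2", timeout=0.05)
        except LockAcquireTimeout:
            outcome.append("timeout")
    finally:
        # Always release with the id returned by acquire().
        await locks.release("reports:daily", lock_id)
    outcome.append(await locks.owner("reports:daily"))
    return outcome


outcome = asyncio.run(main())
```

Releasing in a finally block with the identifier returned by acquire keeps the lock from outliving the critical section when the guarded code raises.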

service_name = 'locks'

you may define a custom service name here

class RedisListener

Bases: Listener

Redis stream listener.

__init__(*args, group_id: str = None, consumer_id: str = None, max_batch_size: int = 10, max_wait_time_ms: int = 500, pending_messages_time_ms: int = None, trim_size: int = 10_000_000, trim_delivered: bool = True, **kws)

Initialize.

Parameters:
  • args – listener args

  • group_id – consumer group, app.name by default

  • consumer_id – unique instance id, app.id by default

  • max_batch_size – max single acquired batch size

  • max_wait_time_ms – max wait time when waiting for a batch

  • pending_messages_time_ms – processing timeout for pending messages in ms: messages not acked within this interval will be auto-claimed by this listener; set None to disable this behavior

  • trim_delivered – trim all delivered messages

  • trim_size – stream records trim size: older records will be removed if the stream is longer than this value; set None to disable this behavior. Note that this option is disabled if trim_delivered is set.

  • kws – listener kws

class RedisStreamRPCClient

Bases: StreamRPCClient

Redis stream client.

async write(topic: NSKey, body, headers: dict = None, key=None) → None

Write data to the stream.

Parameters:
  • topic – topic key

  • body – rpc request body

  • headers – request headers

  • key – (optional) message id

__init__(*args, app_name: str = None, topic: str = Topic.RPC, **kws)

Initialize.

Parameters:
  • app_name – application (topic) name

  • listener_service – stream listener service instance

  • topic – topic name

async call(method: str, params: dict | None = None, nowait: bool = True, request_id: int = 0, max_timeout: int = None, use_context: bool = True, retries: int = None, headers: dict = None, app: str = None, topic: str = None)

Make an RPC call.

Parameters:
  • method – rpc method name

  • params – method call arguments

  • nowait – create a ‘notify’ request - do not wait for the result

  • request_id – optional request id (usually you don’t need to set it)

  • max_timeout – request timeout in sec

  • use_context – use app request context such as correlation id and request chain deadline

  • retries – optional number of retries (on the server side)

  • headers – additional headers
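The 'notify' wording suggests JSON-RPC style notifications. The sketch below shows what the difference between a notify request and a regular request looks like under the JSON-RPC 2.0 specification, where a notification is a request without an id member. That this client uses exactly this wire format is an assumption; build_request and the method names are hypothetical.

```python
import json
from itertools import count

_request_ids = count(1)


def build_request(method, params=None, nowait=True):
    """Build a JSON-RPC 2.0 style request body (assumed format, see above)."""
    request = {"jsonrpc": "2.0", "method": method}
    if params is not None:
        request["params"] = params
    if not nowait:
        # Only requests that expect a response carry an id.
        request["id"] = next(_request_ids)
    return request


# Hypothetical method names, used for illustration only.
notify = build_request("users.update", {"id": 7, "active": False})
query = build_request("users.get", {"id": 7}, nowait=False)
payload = json.dumps([notify, query])
```

Because nowait defaults to True in the signature above, a caller that needs the result has to pass nowait=False explicitly.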

async call_multiple(requests: Collection[RPCRequest], raise_exception: bool = True, nowait: bool = True, max_timeout: int = None, use_context: bool = True, retries: int = None, abort_on_error: bool = None, use_template: bool = None, headers: dict = None, app: str = None, topic: str = None)

Make an RPC batch call.

Parameters:
  • requests – list of request dicts

  • nowait – create a ‘notify’ request - do not wait for the result

  • max_timeout – request timeout in sec

  • use_context – use app request context such as correlation id and request chain deadline

  • raise_exception – raise exception instead of returning error objects in the list

  • retries – optional number of retries (on the server side)

  • abort_on_error – abort the whole batch on the first error

  • use_template – use templates in batch requests

  • headers – additional headers