API Reference

aio_pika.patterns.base

alias of <module ‘aio_pika.patterns.base’ from ‘/build/python-aio-pika-mQWjZH/python-aio-pika-9.5.4/aio_pika/patterns/base.py’>

class aio_pika.patterns.Master(channel: AbstractChannel, requeue: bool = True, reject_on_redelivered: bool = False)[source]

Implements Master/Worker pattern. Usage example:

worker.py

master = Master(channel)
worker = await master.create_worker('test_worker', lambda x: print(x))

master.py

master = Master(channel)
await master.proxy.test_worker('foo')

Creates a new Master instance.

Parameters:

channel – Initialized instance of aio_pika.Channel

async create_task(channel_name: str, kwargs: Mapping[str, Any] = mappingproxy({}), **message_kwargs: Any) Ack | Nack | Reject | None[source]

Creates a new task for the worker

async create_worker(queue_name: str, func: Callable[[...], Awaitable[T]], **kwargs: Any) Worker[source]

Creates a new Worker instance.

deserialize(data: bytes) Any[source]

Deserialize data from bytes. Uses pickle by default. You should overlap this method when you want to change serializer

Parameters:

data – Data which will be deserialized

Returns:

Any

serialize(data: Any) bytes[source]

Serialize data to the bytes. Uses pickle by default. You should overlap this method when you want to change serializer

Parameters:

data – Data which will be serialized

Returns:

bytes

class aio_pika.patterns.Worker(queue: AbstractQueue, consumer_tag: str, loop: AbstractEventLoop)[source]
close() Awaitable[None][source]

Cancel subscription to the channel

Returns:

asyncio.Task

class aio_pika.patterns.RPC(channel: AbstractChannel, host_exceptions: bool = False)[source]

Remote Procedure Call helper.

Create an instance

rpc = await RPC.create(channel, host_exceptions=False)

Registering python function

# RPC instance passes only keyword arguments
def multiply(*, x, y):
    return x * y

await rpc.register("multiply", multiply)

Call function through proxy

assert await rpc.proxy.multiply(x=2, y=3) == 6

Call function explicit

assert await rpc.call('multiply', dict(x=2, y=3)) == 6

Show exceptions on remote side

rpc = await RPC.create(channel, host_exceptions=True)
async call(method_name: str, kwargs: Dict[str, Any] | None = None, *, expiration: int | None = None, priority: int = 5, delivery_mode: DeliveryMode = DeliveryMode.NOT_PERSISTENT) Any[source]

Call remote method and awaiting result.

Parameters:
  • method_name – Name of method

  • kwargs – Methos kwargs

  • expiration – If not None messages which staying in queue longer will be returned and asyncio.TimeoutError will be raised.

  • priority – Message priority

  • delivery_mode – Call message delivery mode

Raises:
  • asyncio.TimeoutError – when message expired

  • CancelledError – when called RPC.cancel()

  • RuntimeError – internal error

async classmethod create(channel: AbstractChannel, **kwargs: Any) RPC[source]

Creates a new instance of aio_pika.patterns.RPC. You should use this method instead of __init__(), because create() returns coroutine and makes async initialize

Parameters:

channel – initialized instance of aio_pika.Channel

Returns:

RPC

async execute(func: Callable[[...], Awaitable[T]], payload: Dict[str, Any]) T[source]

Executes rpc call. Might be overlapped.

async register(method_name: str, func: Callable[[...], Awaitable[T]], **kwargs: Any) Any[source]

Method creates a queue with name which equal of method_name argument. Then subscribes this queue.

Parameters:
  • method_name – Method name

  • func – target function. Function MUST accept only keyword arguments.

  • kwargs – arguments which will be passed to queue_declare

Raises:

RuntimeError – Function already registered in this RPC instance or method_name already used.

serialize_exception(exception: Exception) Any[source]

Make python exception serializable

async unregister(func: Callable[[...], Awaitable[T]]) None[source]

Cancels subscription to the method-queue.

Parameters:

func – Function