API Reference
- aio_pika.patterns.base
alias of <module 'aio_pika.patterns.base' from 'aio_pika/patterns/base.py'>
- class aio_pika.patterns.Master(channel: AbstractChannel, requeue: bool = True, reject_on_redelivered: bool = False)
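Implements the Master/Worker pattern. Usage example:
worker.py
    async def print_task(*, x):  # handlers are expected to return an awaitable
        print(x)
    master = Master(channel)
    worker = await master.create_worker('test_worker', print_task)
master.py
    master = Master(channel)
    await master.proxy.test_worker(x='foo')  # task arguments are passed as keywords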
Creates a new Master instance.
- Parameters:
channel – Initialized instance of aio_pika.Channel
- async create_task(channel_name: str, kwargs: Mapping[str, Any] = mappingproxy({}), **message_kwargs: Any) -> Ack | Nack | Reject | None
Creates a new task for the worker
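As a rough illustration of create_task(), the sketch below publishes a few tasks to a worker queue. The helper names build_task_kwargs and publish_tasks, the task_id field, the queue name, and the broker URL are assumptions made for this example and are not part of aio_pika. The aio_pika imports sit inside main() only so that the helpers can be exercised without a broker.

```python
import asyncio
from typing import Any, Dict, List


def build_task_kwargs(count: int) -> List[Dict[str, Any]]:
    """Hypothetical helper: build one kwargs mapping per task."""
    return [{"task_id": i} for i in range(count)]


async def publish_tasks(master: Any, queue_name: str, count: int) -> None:
    """Publish `count` tasks through Master.create_task()."""
    for kwargs in build_task_kwargs(count):
        # The first positional argument (channel_name) names the target queue.
        await master.create_task(queue_name, kwargs=kwargs)


async def main() -> None:
    # Imported lazily so the helpers above can be tested without aio-pika.
    from aio_pika import connect_robust
    from aio_pika.patterns import Master

    # Assumed local broker URL with default credentials.
    connection = await connect_robust("amqp://guest:guest@127.0.0.1/")
    async with connection:
        channel = await connection.channel()
        master = Master(channel)
        await publish_tasks(master, "test_worker", count=3)


# To run against a live broker: asyncio.run(main())
```

Keeping the publishing loop separate from connection setup means it can be checked against a stand-in object that records create_task() calls.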
- async create_worker(queue_name: str, func: Callable[[...], Awaitable[T]], **kwargs: Any) -> Worker
Creates a new Worker instance.
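A minimal worker-side sketch of create_worker() follows. It assumes that the kwargs of each task are delivered to the handler as keyword arguments; the handler name handle_task, the task_id field, the queue name, and the broker URL are hypothetical. The handler is a coroutine function so that it matches the Callable[[...], Awaitable[T]] signature above.

```python
import asyncio


async def handle_task(*, task_id: int) -> str:
    """Hypothetical task handler; accepts keyword arguments only."""
    await asyncio.sleep(0)  # placeholder for real asynchronous work
    return f"processed task {task_id}"


async def main() -> None:
    # Imported lazily so the handler above can be tested without aio-pika.
    from aio_pika import connect_robust
    from aio_pika.patterns import Master

    # Assumed local broker URL with default credentials.
    connection = await connect_robust("amqp://guest:guest@127.0.0.1/")
    async with connection:
        channel = await connection.channel()
        master = Master(channel)
        # Subscribe the handler to the "test_worker" queue.
        await master.create_worker("test_worker", handle_task)
        # Keep consuming until the surrounding task is cancelled.
        await asyncio.Future()


# To run against a live broker: asyncio.run(main())
```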
- class aio_pika.patterns.Worker(queue: AbstractQueue, consumer_tag: str, loop: AbstractEventLoop)
- class aio_pika.patterns.RPC(channel: AbstractChannel, host_exceptions: bool = False)
Remote Procedure Call helper.
Create an instance
    rpc = await RPC.create(channel, host_exceptions=False)
Register a Python function
    # RPC instance passes only keyword arguments
    async def multiply(*, x, y):
        return x * y
    await rpc.register("multiply", multiply)
Call the function through the proxy
    assert await rpc.proxy.multiply(x=2, y=3) == 6
Call the function explicitly
    assert await rpc.call('multiply', dict(x=2, y=3)) == 6
Show exceptions on the remote side
    rpc = await RPC.create(channel, host_exceptions=True)
- async call(method_name: str, kwargs: Dict[str, Any] | None = None, *, expiration: int | None = None, priority: int = 5, delivery_mode: DeliveryMode = DeliveryMode.NOT_PERSISTENT) -> Any
Calls the remote method and awaits the result.
- Parameters:
method_name – Name of the method
kwargs – Method kwargs
expiration – If not None, messages that stay in the queue longer than this will be returned and asyncio.TimeoutError will be raised.
priority – Message priority
delivery_mode – Call message delivery mode
- Raises:
asyncio.TimeoutError – when the message expires
CancelledError – when RPC.cancel() is called
RuntimeError – internal error
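The expiration and error behaviour of call() can be sketched with a small wrapper. The function call_with_fallback and its default parameter are hypothetical names introduced for this example; only the call() keyword arguments and the asyncio.TimeoutError case come from the reference above. CancelledError and RuntimeError are deliberately left to propagate.

```python
import asyncio
from typing import Any, Dict, Optional


async def call_with_fallback(
    rpc: Any,
    method_name: str,
    kwargs: Dict[str, Any],
    *,
    expiration: Optional[int] = None,
    default: Any = None,
) -> Any:
    """Hypothetical wrapper: return `default` if the call message expires."""
    try:
        return await rpc.call(method_name, kwargs=kwargs, expiration=expiration)
    except asyncio.TimeoutError:
        # Raised by call() when the request message expired.
        return default
```

A caller might use it as `await call_with_fallback(rpc, "multiply", dict(x=2, y=3), expiration=5)`, where rpc is an instance obtained from RPC.create().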
- async classmethod create(channel: AbstractChannel, **kwargs: Any) -> RPC
Creates a new instance of aio_pika.patterns.RPC. Use this method instead of __init__(), because create() returns a coroutine and performs the asynchronous initialization.
- Parameters:
channel – initialized instance of aio_pika.Channel
- Returns:
RPC
- async execute(func: Callable[[...], Awaitable[T]], payload: Dict[str, Any]) -> T
Executes the RPC call. May be overridden in subclasses.
- async register(method_name: str, func: Callable[[...], Awaitable[T]], **kwargs: Any) -> Any
Creates a queue whose name equals the method_name argument, then subscribes to that queue.
- Parameters:
method_name – Method name
func – Target function. The function MUST accept only keyword arguments.
kwargs – Arguments that will be passed to queue_declare
- Raises:
RuntimeError – The function is already registered in this RPC instance, or method_name is already in use.
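The callee side of register() might look like the sketch below. The broker URL is an assumption, and auto_delete=True is shown only as one example of a keyword argument forwarded to queue_declare. The aio_pika imports sit inside main() so that the handler can be tested on its own.

```python
import asyncio


async def multiply(*, x: int, y: int) -> int:
    """Keyword-only handler, as register() requires."""
    return x * y


async def main() -> None:
    # Imported lazily so the handler above can be tested without aio-pika.
    from aio_pika import connect_robust
    from aio_pika.patterns import RPC

    # Assumed local broker URL with default credentials.
    connection = await connect_robust("amqp://guest:guest@127.0.0.1/")
    async with connection:
        channel = await connection.channel()
        rpc = await RPC.create(channel)
        # Extra keyword arguments are passed on to queue_declare.
        await rpc.register("multiply", multiply, auto_delete=True)
        # Serve requests until the surrounding task is cancelled.
        await asyncio.Future()


# To run against a live broker: asyncio.run(main())
```

Declaring the parameters after a bare `*` makes Python itself reject positional calls, which mirrors the keyword-only requirement on func.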