Перейти к содержанию

AsyncWorker

Init module for async worker.

AsyncWorker

Bases: BaseWorker, AsyncPluginMixin

Воркер, Получающий из Брокера задачи и обрабатывающий их.

Пример

from qtasks import QueueTasks
from qtasks.workers import AsyncWorker

worker = AsyncWorker()
app = QueueTasks(worker=worker)

__init__(name='QueueTasks', broker=None, log=None, config=None)

Инициализация асинхронного воркера.

Parameters:

Name Type Description Default
name str

Имя проекта. По умолчанию: "QueueTasks".

'QueueTasks'
broker BaseBroker

Брокер. По умолчанию: None.

None
log Logger

Логгер. По умолчанию: None.

None
config QueueConfig

Конфиг. По умолчанию: None.

None

add(name, uuid, priority, created_at, args, kwargs) async

Добавление задачи в очередь.

Parameters:

Name Type Description Default
name str

Имя задачи.

required
uuid UUID

UUID задачи.

required
priority int

Приоритет задачи.

required
created_at float

Создание задачи в формате timestamp.

required
args tuple

Аргументы задачи типа args.

required
kwargs dict

Аргументы задачи типа kwargs.

required

init_plugins()

Инициализация плагинов.

remove_finished_task(task_func, task_broker, model) async

Обновляет данные хранилища через функцию self.storage.remove_finished_task.

Parameters:

Name Type Description Default
task_func TaskExecSchema

Схема функции задачи. По умолчанию: None.

required
task_broker TaskPrioritySchema

Схема приоритетной задачи.

required
model TaskStatusNewSchema | TaskStatusErrorSchema | TaskStatusCancelSchema

Модель результата задачи.

required

start(num_workers=4) async

Запускает несколько обработчиков задач.

Parameters:

Name Type Description Default
num_workers int

Количество воркеров. По умолчанию: 4.

4

stop() async

Останавливает воркеры.

update_config(config)

Обновляет конфиг брокера и семафору.

Parameters:

Name Type Description Default
config QueueConfig

Конфиг.

required

worker(number) async

Обработчик задач.

Parameters:

Name Type Description Default
number int

Номер Воркера.

required