Перейти к содержанию

Роутер

Router for task execution.

Router

Bases: SyncPluginMixin

Роутер, который хранит в себе задачи, которые подключает к себе основной QueueTasks.

Example

from qtasks import QueueTasks, Router

app = QueueTasks()

router = Router()

@router.task()
async def test():
    pass

app.include_router(router)

__init__(method=None)

__init__(method: Literal['sync'] = None) -> None
__init__(method: Literal['async'] = None) -> None

Инициализация роутера.

Parameters:

Name Type Description Default
method Literal['sync', 'async']

Метод выполнения задач. По умолчанию: None.

None

task(name=None, *, priority=None, echo=False, retry=None, retry_on_exc=None, decode=None, tags=None, generate_handler=None, executor=None, middlewares=None, **kwargs)

Декоратор для регистрации задач.

Parameters:

Name Type Description Default
name str

Имя задачи. По умолчанию: func.__name__.

None
priority int

Приоритет у задачи по умолчанию. По умолчанию: config.default_task_priority.

None
echo bool

Включить вывод в консоль. По умолчанию: False.

False
retry int

Количество попыток повторного выполнения задачи. По умолчанию: None.

None
retry_on_exc list[Type[Exception]]

Исключения, при которых задача будет повторно выполнена. По умолчанию: None.

None
decode Callable

Декодер результата задачи. По умолчанию: None.

None
tags list[str]

Теги задачи. По умолчанию: None.

None
generate_handler Callable

Генератор обработчика. По умолчанию: None.

None
executor Type[BaseTaskExecutor]

Класс BaseTaskExecutor. По умолчанию: SyncTaskExecutor.

None
middlewares List[TaskMiddleware]

Мидлвари. По умолчанию: Пустой массив.

None

Raises:

Type Description
ValueError

Если задача с таким именем уже зарегистрирована.

ValueError

Неизвестный метод {self._method}.

Returns:

Type Description
Callable[[Callable[P, R]], Union[SyncTask[P, R], AsyncTask[P, R]]]

SyncTask | AsyncTask: Декоратор для регистрации задачи.