Перейти к содержанию

QueueTasks

qtasks.py - Main module for the QueueTasks framework.

QueueTasks

Bases: BaseQueueTasks, SyncPluginMixin

QueueTasks - Фреймворк для очередей задач.

Читать больше: Первые шаги.

Пример

from qtasks import QueueTasks

app = QueueTasks()

init_starting property

init_starting - Инициализация при запуске QueueTasks.

Примеры
from qtasks import QueueTasks

app = QueueTasks()

@app.init_starting
def test(worker, broker):
    pass

init_stoping property

init_stoping - Инициализация при остановке QueueTasks.

Примеры
from qtasks import QueueTasks

app = QueueTasks()

@app.init_stoping
def test(worker, broker):
    pass

init_task_running property

init_task_running - Инициализация при запуске задачи функцией QueueTasks.worker.listen().

Примеры
from qtasks import QueueTasks

app = QueueTasks()

@app.init_task_running
def test(task_func: TaskExecSchema, task_broker: TaskPrioritySchema):
    pass

init_task_stoping property

init_task_stoping - Инициализация при завершении задачи функцией QueueTasks.worker.listen().

Примеры
from qtasks import QueueTasks

app = QueueTasks()

@app.init_task_stoping
def test(task_func: TaskExecSchema, task_broker: TaskPrioritySchema, returning: TaskStatusSuccessSchema|TaskStatusErrorSchema):
    pass

init_worker_running property

init_worker_running - Инициализация при запуске QueueTasks.worker.worker().

Примеры
from qtasks import QueueTasks

app = QueueTasks()

@app.init_worker_running
def test(worker):
    pass

init_worker_stoping property

init_worker_stoping - Инициализация при остановке QueueTasks.worker.worker().

Примеры
from qtasks import QueueTasks

app = QueueTasks()

@app.init_worker_stoping
def test(worker):
    pass

__init__(name='QueueTasks', broker_url=None, broker=None, worker=None, log=None, config=None)

Инициализация QueueTasks.

Parameters:

Name Type Description Default
name str

Имя проекта. По умолчанию: QueueTasks.

'QueueTasks'
broker_url str

URL для Брокера. Используется Брокером по умолчанию через параметр url. По умолчанию: None.

None
broker Type[BaseBroker]

Брокер. Хранит в себе обработку из очередей задач и хранилище данных. По умолчанию: qtasks.brokers.AsyncRedisBroker.

None
worker Type[BaseWorker]

Воркер. Хранит в себе обработку задач. По умолчанию: qtasks.workers.AsyncWorker.

None
log Logger

Логгер. По умолчанию: qtasks.logs.Logger.

None
config QueueConfig

Конфиг. По умолчанию: qtasks.configs.QueueConfig.

None

add_task(task_name, priority=None, args=None, kwargs=None, timeout=None)

Добавить задачу.

Parameters:

Name Type Description Default
task_name str

Имя задачи.

required
priority int

Приоритет задачи. По умолчанию: Значение приоритета у задачи.

None
args tuple

args задачи. По умолчанию ().

None
kwargs dict

kwags задачи. По умолчанию {}.

None
timeout float

Таймаут задачи. Если указан, задача возвращается через qtasks.results.SyncResult.

None

Returns:

Type Description
Task

Task|None: schemas.task.Task или None.

flush_all()

Удалить все данные.

get(uuid)

Получить задачу.

Parameters:

Name Type Description Default
uuid UUID | str

UUID Задачи.

required

Returns:

Type Description
Task | None

Task|None: Данные задачи или None.

ping(server=True)

Проверка запуска сервера.

Parameters:

Name Type Description Default
server bool

Проверка через сервер. По умолчанию True.

True

Returns:

Name Type Description
bool bool

True - Работает, False - Не работает.

run_forever(starter=None, num_workers=4, reset_config=True)

Запуск синхронно Приложение.

Parameters:

Name Type Description Default
starter BaseStarter

Стартер. По умолчанию: qtasks.starters.SyncStarter.

None
num_workers int

Количество запущенных воркеров. По умолчанию: 4.

4
reset_config bool

Обновить config у воркера и брокера. По умолчанию: True.

True

stop()

Останавливает все компоненты.

task(*args, **kwargs)

task(name: str | None = None, *, priority: int | None = None, echo: bool = False, retry: int | None = None, retry_on_exc: list[Type[Exception]] | None = None, decode: Callable | None = None, tags: list[str] | None = None, generate_handler: Callable | None = None, executor: Type[BaseTaskExecutor] | None = None, middlewares: List[TaskMiddleware] | None = None, **kwargs) -> Callable[[Callable[P, R]], SyncTask[P, R]]

Декоратор для регистрации задач.

Parameters:

Name Type Description Default
name str

Имя задачи. По умолчанию: func.__name__.

required
priority int

Приоритет у задачи по умолчанию. По умолчанию: config.default_task_priority.

required
echo bool

Включить вывод в консоль. По умолчанию: False.

required
retry int

Количество попыток повторного выполнения задачи. По умолчанию: None.

required
retry_on_exc list[Type[Exception]]

Исключения, при которых задача будет повторно выполнена. По умолчанию: None.

required
decode Callable

Декодер результата задачи. По умолчанию: None.

required
tags list[str]

Теги задачи. По умолчанию: None.

required
generate_handler Callable

Генератор обработчика. По умолчанию: None.

required
executor Type[BaseTaskExecutor]

Класс BaseTaskExecutor. По умолчанию: SyncTaskExecutor.

required
middlewares List[TaskMiddleware]

Мидлвари. По умолчанию: Пустой массив.

required

Raises:

Type Description
ValueError

Если задача с таким именем уже зарегистрирована.

ValueError

Неизвестный метод {self._method}.

Returns:

Name Type Description
SyncTask

Декоратор для регистрации задачи.