Перейти к содержанию

SyncKafkaBroker

Sync Kafka Broker.

SyncKafkaBroker

Bases: BaseBroker, SyncPluginMixin

Брокер, слушающий Kafka и добавляющий задачи в очередь.

Пример

from qtasks import QueueTasks
from qtasks.brokers import SyncKafkaBroker

broker = SyncKafkaBroker(name="QueueTasks", url="localhost:9092")

app = QueueTasks(broker=broker)

__init__(name='QueueTasks', url=None, storage=None, topic='task_queue', log=None, config=None)

Инициализация SyncKafkaBroker.

Parameters:

Name Type Description Default
name str

Имя проекта. По умолчанию: "QueueTasks".

'QueueTasks'
url str

URL для подключения к Kafka. По умолчанию: None.

None
storage BaseStorage

Хранилище. По умолчанию: None.

None
topic str

Топик Kafka. По умолчанию: "task_queue".

'task_queue'
log Logger

Логгер. По умолчанию: None.

None
config QueueConfig

Конфиг. По умолчанию: None.

None

add(task_name, priority=0, extra=None, args=None, kwargs=None)

Добавляет задачу в брокер.

Parameters:

Name Type Description Default
task_name str

Имя задачи.

required
priority int

Приоритет задачи. По умоланию: 0.

0
args tuple

Аргументы задачи типа args.

None
kwargs dict

Аргументы задачи типа kwargs.

None

Returns:

Name Type Description
Task Task

schemas.task.Task

flush_all()

Удалить все данные.

get(uuid)

Получение информации о задаче.

Parameters:

Name Type Description Default
uuid UUID | str

UUID задачи.

required

Returns:

Type Description
Task | None

Task|None: Если есть информация о задаче, возвращает schemas.task.Task, иначе None.

listen(worker)

Слушает Kafka и передаёт задачи воркеру.

Parameters:

Name Type Description Default
worker BaseWorker

Класс воркера.

required

remove_finished_task(task_broker, model)

Обновляет данные хранилища через функцию self.storage.remove_finished_task.

Parameters:

Name Type Description Default
task_broker TaskPrioritySchema

Схема приоритетной задачи.

required
model TaskStatusNewSchema | TaskStatusErrorSchema

Модель результата задачи.

required

start(worker)

Запускает брокер.

Parameters:

Name Type Description Default
worker BaseWorker

Класс Воркера.

required

stop()

Останавливает брокер.

update(**kwargs)

Обновляет информацию о задаче.

Parameters:

Name Type Description Default
kwargs dict

данные задачи типа kwargs.

{}