AsyncKafkaBroker
Async Kafka Broker.
AsyncKafkaBroker
¶
Bases: BaseBroker
, AsyncPluginMixin
Брокер, слушающий Kafka и добавляющий задачи в очередь.
Пример¶
from qtasks.asyncio import QueueTasks
from qtasks.brokers import AsyncKafkaBroker
broker = AsyncKafkaBroker(name="QueueTasks", url="localhost:9092")
app = QueueTasks(broker=broker)
__init__(name='QueueTasks', url=None, storage=None, topic='task_queue', log=None, config=None)
¶
Инициализация AsyncKafkaBroker.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
name
|
str
|
Имя проекта. По умолчанию: "QueueTasks". |
'QueueTasks'
|
url
|
str
|
URL для подключения к Kafka. По умолчанию: None. |
None
|
storage
|
BaseStorage
|
Хранилище. По умолчанию: None. |
None
|
topic
|
str
|
Топик Kafka. По умолчанию: "task_queue". |
'task_queue'
|
log
|
Logger
|
Логгер. По умолчанию: None. |
None
|
config
|
QueueConfig
|
Конфиг. По умолчанию: None. |
None
|
add(task_name, priority=0, extra=None, args=None, kwargs=None)
async
¶
Добавляет задачу в брокер.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
task_name
|
str
|
Имя задачи. |
required |
priority
|
int
|
Приоритет задачи. По умоланию: 0. |
0
|
extra
|
dict
|
Дополнительные параметры задачи. |
None
|
args
|
tuple
|
Аргументы задачи типа args. |
None
|
kwargs
|
dict
|
Аргументы задачи типа kwargs. |
None
|
Returns:
Name | Type | Description |
---|---|---|
Task |
Task
|
|
flush_all()
async
¶
Удалить все данные.
get(uuid)
async
¶
Получение информации о задаче.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
uuid
|
UUID | str
|
UUID задачи. |
required |
Returns:
Type | Description |
---|---|
Task | None
|
Task|None: Если есть информация о задаче, возвращает |
listen(worker)
async
¶
Слушает Kafka и передаёт задачи воркеру.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
worker
|
BaseWorker
|
Класс воркера. |
required |
remove_finished_task(task_broker, model)
async
¶
Обновляет данные хранилища через функцию self.storage.remove_finished_task
.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
task_broker
|
TaskPrioritySchema
|
Схема приоритетной задачи. |
required |
model
|
TaskStatusNewSchema | TaskStatusErrorSchema
|
Модель результата задачи. |
required |
start(worker)
async
¶
Запускает брокер.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
worker
|
BaseWorker
|
Класс Воркера. |
required |
stop()
async
¶
Останавливает брокер.
update(**kwargs)
async
¶
Обновляет информацию о задаче.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
kwargs
|
dict
|
данные задачи типа kwargs. |
{}
|