Coverage for mindsdb / utilities / ml_task_queue / __init__.py: 100%
0 statements
coverage.py v7.13.1, created at 2026-01-21 00:36 +0000
"""
The task queue limits load by ordering tasks through a queue.
The current implementation uses Redis as the queue backend. To run MindsDB with the task queue:
1. Configure MindsDB to use the task queue in one of two ways:
 - fill in the 'ml_task_queue' key in config.json:
    {
        "ml_task_queue": {
            "type": "redis",    // required
            "host": "...",
            "port": "...",
            "db": "...",
            "username": "...",
            "password": "..."
        }
    }
 - or set environment variables:
    MINDSDB_ML_QUEUE_TYPE=redis     # required
    MINDSDB_ML_QUEUE_HOST=...
    MINDSDB_ML_QUEUE_PORT=...
    MINDSDB_ML_QUEUE_DB=...
    MINDSDB_ML_QUEUE_USERNAME=...
    MINDSDB_ML_QUEUE_PASSWORD=...
2. Run MindsDB with the --ml_task_queue_consumer argument.
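
The two configuration routes above can be pictured as one merged settings dict.
The sketch below is illustrative only: resolve_queue_settings is a hypothetical
helper, the precedence (environment variables override config.json) is an
assumption not stated here, and the host and port values are placeholders.

```python
# Env var names are taken from the list above; the mapping onto config keys
# and the override order are assumptions made for this sketch.
ENV_TO_KEY = {
    "MINDSDB_ML_QUEUE_TYPE": "type",
    "MINDSDB_ML_QUEUE_HOST": "host",
    "MINDSDB_ML_QUEUE_PORT": "port",
    "MINDSDB_ML_QUEUE_DB": "db",
    "MINDSDB_ML_QUEUE_USERNAME": "username",
    "MINDSDB_ML_QUEUE_PASSWORD": "password",
}


def resolve_queue_settings(config, environ):
    # Hypothetical helper: start from the 'ml_task_queue' section of the config.
    settings = dict(config.get("ml_task_queue", {}))
    # Assumed precedence: a set environment variable replaces the config value.
    for env_name, key in ENV_TO_KEY.items():
        if env_name in environ:
            settings[key] = environ[env_name]
    # 'type' is the only key marked as required above.
    if "type" not in settings:
        raise ValueError("ml_task_queue: 'type' is required")
    return settings


# Placeholder values, for illustration only.
config = {"ml_task_queue": {"type": "redis", "host": "localhost", "port": "6379"}}
environ = {"MINDSDB_ML_QUEUE_HOST": "queue.internal"}
settings = resolve_queue_settings(config, environ)
```

With these placeholder inputs, settings keeps "type" and "port" from the config
and takes "host" from the environment.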

Redis is used for two kinds of entities: streams (for distributing tasks) and regular
key-value storage with a TTL (for transferring dataframes and some other data). Dataframes are
not sent through streams, which keeps stream messages lightweight.
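
The split between streams and TTL-bound key-value storage can be sketched with
in-memory stand-ins. Nothing below is the Redis API or the MindsDB implementation:
TTLStore, submit_task, consume_one, the key names and the status strings are all
hypothetical, and a deque stands in for a stream.

```python
import itertools
import time
from collections import deque


class TTLStore:
    # In-memory stand-in for key-value storage with a TTL (not the Redis API).
    def __init__(self, clock=time.monotonic):
        self._clock = clock
        self._items = {}

    def set(self, key, value, ttl_seconds):
        self._items[key] = (value, self._clock() + ttl_seconds)

    def get(self, key):
        item = self._items.get(key)
        if item is None:
            return None
        value, expires_at = item
        if self._clock() >= expires_at:
            del self._items[key]  # expired entries are dropped on access
            return None
        return value


stream = deque()   # stand-in for a stream: ordered, lightweight messages
store = TTLStore()
_task_ids = itertools.count(1)


def submit_task(rows, ttl_seconds=60):
    # Producer side: park the heavy payload in the store, send only its key.
    task_id = next(_task_ids)
    payload_key = "dataframe:{}".format(task_id)
    store.set(payload_key, rows, ttl_seconds)
    stream.append({"task_id": task_id, "payload_key": payload_key})
    return task_id


def consume_one():
    # Consumer side: take the oldest message, then fetch the payload by key.
    message = stream.popleft()
    rows = store.get(message["payload_key"])
    status = "complete" if rows is not None else "payload expired"
    store.set("status:{}".format(message["task_id"]), status, 60)
    return message["task_id"], rows, status


task_id = submit_task([{"a": 1}, {"a": 2}])
result = consume_one()
```

The message on the stand-in stream carries only a task id and a key, so its size
does not grow with the dataframe; the payload itself expires on its own if no
consumer picks it up in time.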

The task queue may run on a single instance to limit the load on it, or it may run in a
distributed system. In that case MindsDB may be split into two modules: parser/planner/executor
(PPE) and ML.

    ┌─────────────┐ ┌─────────────┐
    │             │ │             │
    │ MindsDB PPE │ │ MindsDB PPE │
    │             │ │             │
    └───────────┬─┘ └─┬─────┬─▲───┘
                │     │     │ │
               ┌▼─────▼┐    │ │
               │       │  ┌─▼─┴─────────┐
               │ Queue │  │             │
               │       │  │    Cache    │
               ├───────┤  │             │
               │  Task │  ├─────────────┤
               ├───────┤  │  Dataframe  │
               │  Task │  ├─────────────┤
               ├───────┤  │   Status    │
               │  Task │  └─┬─▲─────────┘
               └┬─────┬┘    │ │
                │     │     │ │
                │     │     │ │
    ┌───────────▼─┐ ┌─▼─────▼─┴───┐
    │             │ │             │
    │ MindsDB ML  │ │ MindsDB ML  │
    │             │ │             │
    └─────────────┘ └─────────────┘
"""