Coverage for mindsdb / utilities / partitioning.py: 18%
30 statements
« prev ^ index » next coverage.py v7.13.1, created at 2026-01-21 00:36 +0000
« prev ^ index » next coverage.py v7.13.1, created at 2026-01-21 00:36 +0000
1import os
2from typing import Iterable, Callable
3import pandas as pd
5from mindsdb.utilities.config import Config
6from mindsdb.utilities.context_executor import execute_in_threads
def get_max_thread_count() -> int:
    """
    Calculate the maximum number of threads allowed for the system.

    :return: thread count, always >= 1
    """
    # workers count
    is_cloud = Config().is_cloud
    if is_cloud:
        # cloud deployments can override the cap via env var
        max_threads = int(os.getenv('MINDSDB_MAX_PARTITIONING_THREADS', 10))
    else:
        # os.cpu_count() may return None when the count is undeterminable;
        # fall back to 4 so the subtraction below cannot raise TypeError.
        # Leave a few cores free for the rest of the process.
        max_threads = (os.cpu_count() or 4) - 3

    if max_threads < 1:
        max_threads = 1

    return max_threads
def split_data_frame(df: pd.DataFrame, partition_size: int) -> Iterable[pd.DataFrame]:
    """
    Split data frame into chunks with partition_size and yield them out
    """
    total = len(df)
    # step through the frame in partition_size strides;
    # the final slice may be shorter than partition_size
    for start in range(0, total, partition_size):
        yield df.iloc[start: start + partition_size]
def process_dataframe_in_partitions(df: pd.DataFrame, callback: Callable, partition_size: int) -> Iterable:
    """
    Splits dataframe into partitions and applies callback on each partition

    :param df: input dataframe
    :param callback: function to apply on each partition
    :param partition_size: size of each partition
    :return: yield results
    """

    # tasks
    tasks = split_data_frame(df, partition_size)

    max_threads = get_max_thread_count()

    # ceil division: a trailing partial chunk still counts as one task.
    # (floor division under-counted, e.g. len(df)=5, partition_size=10 -> 0,
    # which skipped the cap below and could spawn a thread pool for one chunk)
    chunk_count = (len(df) + partition_size - 1) // partition_size
    # don't exceed chunk_count
    if chunk_count > 0:
        max_threads = min(max_threads, chunk_count)

    if max_threads == 1:
        # don't spawn threads
        for task in tasks:
            yield callback(task)
    else:
        for result in execute_in_threads(callback, tasks, thread_count=max_threads):
            yield result