Coverage for mindsdb/utilities/partitioning.py: 18%

30 statements  

« prev     ^ index     » next       coverage.py v7.13.1, created at 2026-01-21 00:36 +0000

1import os 

2from typing import Iterable, Callable 

3import pandas as pd 

4 

5from mindsdb.utilities.config import Config 

6from mindsdb.utilities.context_executor import execute_in_threads 

7 

8 

def get_max_thread_count() -> int:
    """
    Calculate the maximum number of threads allowed for the system.

    When ``Config().is_cloud`` is truthy the limit is read from the
    ``MINDSDB_MAX_PARTITIONING_THREADS`` environment variable (default 10).
    Otherwise it is derived from the CPU count minus 3.

    :return: thread limit, never less than 1
    :raises ValueError: if the environment variable is set to a non-integer value
    """
    if Config().is_cloud:
        max_threads = int(os.getenv('MINDSDB_MAX_PARTITIONING_THREADS', 10))
    else:
        # os.cpu_count() returns None when the CPU count cannot be determined;
        # fall back to a single CPU instead of failing on `None - 3`.
        max_threads = (os.cpu_count() or 1) - 3

    # always allow at least one worker
    return max(max_threads, 1)

24 

25 

def split_data_frame(df: pd.DataFrame, partition_size: int) -> Iterable[pd.DataFrame]:
    """
    Split data frame into chunks with partition_size and yield them out

    :param df: input dataframe
    :param partition_size: maximum number of rows per chunk, must be at least 1
    :return: yields consecutive positional row slices of df; the last one may be shorter
    :raises ValueError: if partition_size is less than 1 (raised when iteration starts)
    """
    if partition_size < 1:
        # a zero or negative size would never advance past the end of df
        raise ValueError(f'partition_size must be a positive integer, got {partition_size}')

    for start in range(0, len(df), partition_size):
        # create results with partition
        yield df.iloc[start:start + partition_size]

36 

37 

def process_dataframe_in_partitions(df: pd.DataFrame, callback: Callable, partition_size: int) -> Iterable:
    """
    Splits dataframe into partitions and apply callback on each partition

    With a single worker the callback is applied to the partitions one by one in
    the calling thread; otherwise the partitions are handed to execute_in_threads.

    :param df: input dataframe
    :param callback: function to apply on each partition
    :param partition_size: size of each partition, must be at least 1
    :return: yield results
    :raises ValueError: if partition_size is less than 1 (raised when iteration starts)
    """
    if partition_size < 1:
        # zero would divide by zero below, a negative size would never finish splitting
        raise ValueError(f'partition_size must be a positive integer, got {partition_size}')

    # tasks
    tasks = split_data_frame(df, partition_size)

    # number of partitions, rounded up so a trailing partial partition is counted
    chunk_count = -(-len(df) // partition_size)

    # don't exceed chunk_count, but keep at least one worker (empty dataframe)
    max_threads = max(1, min(get_max_thread_count(), chunk_count))

    if max_threads == 1:
        # don't spawn threads
        for task in tasks:
            yield callback(task)
    else:
        yield from execute_in_threads(callback, tasks, thread_count=max_threads)