Coverage for mindsdb / utilities / context_executor.py: 17%

32 statements  

« prev     ^ index     » next       coverage.py v7.13.1, created at 2026-01-21 00:36 +0000

1import time 

2import types 

3from concurrent.futures import ThreadPoolExecutor 

4import contextvars 

5 

6 

7class ContextThreadPoolExecutor(ThreadPoolExecutor): 

8 '''Handles copying context variables to threads created by ThreadPoolExecutor''' 

9 def __init__(self, max_workers=None): 

10 self.context = contextvars.copy_context() 

11 # ThreadPoolExecutor does not propagate context to threads by default, so we need a custom initializer. 

12 super().__init__(max_workers=max_workers, initializer=self._set_child_context) 

13 

14 def _set_child_context(self): 

15 for var, value in self.context.items(): 

16 var.set(value) 

17 

18 

19def execute_in_threads(func, tasks, thread_count=3, queue_size_k=1.5): 

20 """ 

21 Should be used as generator. 

22 Can accept input tasks as generator and keep queue size the same to not overflow the RAM 

23 

24 :param func: callable, function to execute in threads 

25 :param tasks: generator or iterable, list of input for function 

26 :param thread_count: number of threads 

27 :param queue_size_k: how a queue for workers is bigger than count of threads 

28 :return: yield results 

29 """ 

30 executor = ContextThreadPoolExecutor(max_workers=thread_count) 

31 

32 queue_size = int(thread_count * queue_size_k) 

33 

34 if not isinstance(tasks, types.GeneratorType): 

35 tasks = iter(tasks) 

36 

37 futures = None 

38 while futures is None or len(futures) > 0: 

39 if futures is None: 

40 futures = [] 

41 

42 # add new portion 

43 for i in range(queue_size): 

44 try: 

45 args = next(tasks) 

46 futures.append(executor.submit(func, args)) 

47 except StopIteration: 

48 break 

49 

50 # save results 

51 for task in futures: 

52 if task.done(): 

53 yield task.result() 

54 

55 # remove completed tasks 

56 futures[:] = [t for t in futures if not t.done()] 

57 

58 time.sleep(0.1) 

59 executor.shutdown(wait=False)