Coverage for mindsdb / utilities / context_executor.py: 17%
32 statements
« prev ^ index » next coverage.py v7.13.1, created at 2026-01-21 00:36 +0000
1import time
2import types
3from concurrent.futures import ThreadPoolExecutor
4import contextvars
class ContextThreadPoolExecutor(ThreadPoolExecutor):
    """ThreadPoolExecutor that propagates ``contextvars`` into its worker threads.

    ``ThreadPoolExecutor`` does not copy the caller's context into the threads
    it spawns, so context-local state (e.g. per-request variables) is lost.
    This subclass snapshots the context at construction time and replays it in
    every worker thread via a custom initializer.
    """

    def __init__(self, max_workers=None, thread_name_prefix='', initializer=None, initargs=()):
        """
        :param max_workers: passed through to ``ThreadPoolExecutor``
        :param thread_name_prefix: passed through to ``ThreadPoolExecutor``
        :param initializer: optional caller-supplied initializer; it is invoked
            in each worker thread AFTER the context variables have been set,
            so it can rely on them
        :param initargs: arguments for ``initializer``
        """
        # Snapshot the caller's context once; every worker replays it.
        self.context = contextvars.copy_context()
        self._user_initializer = initializer
        self._user_initargs = initargs
        super().__init__(
            max_workers=max_workers,
            thread_name_prefix=thread_name_prefix,
            initializer=self._set_child_context,
        )

    def _set_child_context(self):
        """Worker-thread initializer: copy each captured var into this thread."""
        for var, value in self.context.items():
            var.set(value)
        # Chain the caller's initializer, if any, after the context is in place.
        if self._user_initializer is not None:
            self._user_initializer(*self._user_initargs)
def execute_in_threads(func, tasks, thread_count=3, queue_size_k=1.5):
    """
    Execute ``func`` over ``tasks`` in a context-propagating thread pool.

    Should be used as a generator. Accepts ``tasks`` as a generator or any
    iterable and keeps at most ``queue_size`` tasks in flight, so a lazy task
    source never gets fully materialized in RAM. Results are yielded as tasks
    complete, NOT in submission order.

    :param func: callable, function to execute in threads
    :param tasks: generator or iterable, list of inputs for the function
    :param thread_count: number of worker threads
    :param queue_size_k: multiplier — how much bigger the in-flight queue is
        than the thread count
    :return: yields results; a worker exception is re-raised here
    """
    executor = ContextThreadPoolExecutor(max_workers=thread_count)
    # Cap on simultaneously submitted tasks; at least 1 so we make progress.
    queue_size = max(1, int(thread_count * queue_size_k))

    # iter() is a no-op on generators and makes plain iterables consumable.
    tasks = iter(tasks)

    futures = []
    exhausted = False
    try:
        while futures or not exhausted:
            # Top up only the free slots. The original code submitted a full
            # queue_size batch every pass, which let the in-flight set grow
            # without bound when workers were slower than the producer.
            while not exhausted and len(futures) < queue_size:
                try:
                    futures.append(executor.submit(func, next(tasks)))
                except StopIteration:
                    exhausted = True

            # Yield finished results and keep only the still-pending futures.
            pending = []
            for future in futures:
                if future.done():
                    yield future.result()
                else:
                    pending.append(future)
            futures = pending

            # Poll interval; skip the sleep once nothing is outstanding.
            if futures:
                time.sleep(0.1)
    finally:
        # Runs even if a worker raised or the caller abandons the generator,
        # so pool threads are not leaked.
        executor.shutdown(wait=False)