Coverage for mindsdb / interfaces / agents / event_dispatch_callback_handler.py: 53%

15 statements  

« prev     ^ index     » next       coverage.py v7.13.1, created at 2026-01-21 00:36 +0000

1import queue 

2from typing import Any, Dict, List, Optional, Sequence 

3from uuid import UUID 

4 

5from langchain_core.callbacks import BaseCallbackHandler 

6from langchain_core.documents import Document 

7 

8 

class EventDispatchCallbackHandler(BaseCallbackHandler):
    """Callback handler that forwards dispatched events to an event queue.

    Each event is enqueued as a dict with the keys ``'type'`` (always
    ``'event'``), ``'name'`` and ``'data'`` so that it can be processed as a
    streaming chunk by whatever reads the queue.
    """

    def __init__(self, queue: queue.Queue):
        """Remember the queue that events will be put on.

        Args:
            queue: Destination queue; every handled event is ``put`` on it.
        """
        # NOTE: the parameter name shadows the imported ``queue`` module
        # inside this method body.
        self.queue = queue

    def on_custom_event(
        self,
        name: str,
        data: Any,
        *,
        run_id: UUID,
        tags: Optional[List[str]] = None,
        metadata: Optional[Dict[str, Any]] = None,
        **kwargs
    ):
        """Enqueue a custom event under its own name.

        Args:
            name: Event name, copied to the ``'name'`` key.
            data: Event payload, copied unchanged to the ``'data'`` key.
            run_id: Accepted but not used by this handler.
            tags: Accepted but not used by this handler.
            metadata: Accepted but not used by this handler.
            **kwargs: Accepted but not used by this handler.
        """
        event = {'type': 'event', 'name': name, 'data': data}
        self.queue.put(event)

    def on_retriever_end(
        self,
        documents: Sequence[Document],
        *,
        run_id: UUID,
        parent_run_id: Optional[UUID] = None,
        **kwargs: Any,
    ) -> Any:
        """Enqueue a ``'retriever_end'`` event describing the given documents.

        Each document is reduced to a dict holding its ``page_content`` (as
        ``'content'``) and its ``metadata`` (as ``'metadata'``); the resulting
        list is sent under ``data['documents']``.

        Args:
            documents: Documents to report.
            run_id: Accepted but not used by this handler.
            parent_run_id: Accepted but not used by this handler.
            **kwargs: Accepted but not used by this handler.
        """
        serialized = [
            {'content': doc.page_content, 'metadata': doc.metadata}
            for doc in documents
        ]
        event = {
            'type': 'event',
            'name': 'retriever_end',
            'data': {'documents': serialized},
        }
        self.queue.put(event)