Coverage for mindsdb / interfaces / agents / event_dispatch_callback_handler.py: 53%
15 statements
« prev ^ index » next coverage.py v7.13.1, created at 2026-01-21 00:36 +0000
« prev ^ index » next coverage.py v7.13.1, created at 2026-01-21 00:36 +0000
1import queue
2from typing import Any, Dict, List, Optional, Sequence
3from uuid import UUID
5from langchain_core.callbacks import BaseCallbackHandler
6from langchain_core.documents import Document
9class EventDispatchCallbackHandler(BaseCallbackHandler):
10 '''Puts dispatched events onto an event queue to be processed as a streaming chunk'''
11 def __init__(self, queue: queue.Queue):
12 self.queue = queue
14 def on_custom_event(
15 self,
16 name: str,
17 data: Any,
18 *,
19 run_id: UUID,
20 tags: Optional[List[str]] = None,
21 metadata: Optional[Dict[str, Any]] = None,
22 **kwargs
23 ):
24 self.queue.put({
25 'type': 'event',
26 'name': name,
27 'data': data
28 })
30 def on_retriever_end(
31 self,
32 documents: Sequence[Document],
33 *,
34 run_id: UUID,
35 parent_run_id: Optional[UUID] = None,
36 **kwargs: Any,
37 ) -> Any:
38 document_objects = []
39 for d in documents:
40 document_objects.append({
41 'content': d.page_content,
42 'metadata': d.metadata
43 })
44 self.queue.put({
45 'type': 'event',
46 'name': 'retriever_end',
47 'data': {
48 'documents': document_objects
49 }
50 })