Coverage for mindsdb / interfaces / chatbot / chatbot_task.py: 21%
88 statements
« prev ^ index » next coverage.py v7.13.1, created at 2026-01-21 00:36 +0000
« prev ^ index » next coverage.py v7.13.1, created at 2026-01-21 00:36 +0000
1import traceback
2import datetime as dt
4from mindsdb.integrations.libs.api_handler import APIChatHandler
6from mindsdb.api.executor.controllers.session_controller import SessionController
7from mindsdb.interfaces.storage import db
8from mindsdb.interfaces.tasks.task import BaseTask
10from mindsdb.utilities import log
12from .polling import MessageCountPolling, RealtimePolling, WebhookPolling
13from .memory import BaseMemory, DBMemory, HandlerMemory
14from .chatbot_executor import MultiModeBotExecutor, BotExecutor, AgentExecutor
16from .types import ChatBotMessage
logger = log.getLogger(__name__)

# Placeholder text sent to the chat while the bot is still preparing its
# real answer (see ChatBotTask._on_holding_message).
HOLDING_MESSAGE = "Bot is typing..."
class ChatBotTask(BaseTask):
    """
    Task that wires one chatbot record to its chat handler, polling strategy,
    chat memory and executor, and answers incoming messages.
    """

    def __init__(self, *args, **kwargs):
        """
        Load the chatbot record identified by ``self.object_id`` and build the
        executor class, chat handler, polling object and memory for it.

        Raises:
            Exception: If the chatbot, its project or its chat integration
                record cannot be found, if the integration's handler is not an
                ``APIChatHandler``, or if the polling type is not supported.
        """
        super().__init__(*args, **kwargs)
        self.bot_id = self.object_id

        self.session = SessionController()

        bot_record = db.ChatBots.query.get(self.bot_id)
        if bot_record is None:
            # Fail with a clear message instead of an AttributeError on None.
            raise Exception(f"Chatbot not found: {self.bot_id}")

        self.base_model_name = bot_record.model_name

        project_record = db.Project.query.get(bot_record.project_id)
        if project_record is None:
            raise Exception(f"Project not found for chatbot: {self.bot_id}")
        self.project_name = project_record.name
        self.project_datanode = self.session.datahub.get(self.project_name)

        # get chat handler info
        self.bot_params = bot_record.params or {}

        # Executor selection: an attached agent wins; otherwise the presence
        # of "modes" in the bot params selects the multi-mode executor.
        self.agent_id = bot_record.agent_id
        if self.agent_id is not None:
            self.bot_executor_cls = AgentExecutor
        elif self.bot_params.get("modes") is None:
            self.bot_executor_cls = BotExecutor
        else:
            self.bot_executor_cls = MultiModeBotExecutor

        integration_record = db.Integration.query.get(bot_record.database_id)
        if integration_record is None:
            raise Exception(f"Chat database not found for chatbot: {self.bot_id}")
        database_name = integration_record.name

        self.chat_handler = self.session.integration_controller.get_data_handler(database_name)
        if not isinstance(self.chat_handler, APIChatHandler):
            raise Exception(f"Can't use chat database: {database_name}")

        chat_params = self.chat_handler.get_chat_config()
        polling = chat_params["polling"]["type"]

        # Optional explicit memory type: "db" selects DBMemory, any other
        # non-empty value selects HandlerMemory.
        memory = chat_params["memory"]["type"] if "memory" in chat_params else None
        memory_cls = None
        if memory:
            memory_cls = DBMemory if memory == "db" else HandlerMemory

        if polling == "message_count":
            # A config without "tables" is treated as a single-table config.
            chat_params = chat_params["tables"] if "tables" in chat_params else [chat_params]
            self.chat_pooling = MessageCountPolling(self, chat_params)
            # The default type for message count polling is HandlerMemory if not specified.
            self.memory = HandlerMemory(self, chat_params) if memory_cls is None else memory_cls(self, chat_params)

        elif polling == "realtime":
            chat_params = chat_params["tables"] if "tables" in chat_params else [chat_params]
            self.chat_pooling = RealtimePolling(self, chat_params)
            # The default type for real-time polling is DBMemory if not specified.
            self.memory = DBMemory(self, chat_params) if memory_cls is None else memory_cls(self, chat_params)

        elif polling == "webhook":
            # Webhook polling receives the raw config and always uses DBMemory.
            self.chat_pooling = WebhookPolling(self, chat_params)
            self.memory = DBMemory(self, chat_params)

        else:
            raise Exception(f"Not supported polling: {polling}")

        self.bot_params["bot_username"] = self.chat_handler.get_my_user_name()

    def run(self, stop_event):
        """
        Run the task by delegating to the configured polling object.

        Args:
            stop_event: Passed through unchanged to ``self.chat_pooling.run``.
        """
        # TODO check deleted, raise errors
        # TODO checks on delete predictor / project/ integration

        self.chat_pooling.run(stop_event)

    def on_message(self, message: ChatBotMessage, chat_id=None, chat_memory=None, table_name=None):
        """
        Handle an incoming message: send the holding message, then process the
        message and send the bot's answer.

        Errors raised while handling are logged and recorded with
        ``self.set_error``; they are not re-raised. ``SystemExit`` and
        ``KeyboardInterrupt`` are propagated.

        Args:
            message (ChatBotMessage): The incoming message.
            chat_id: The ID of the chat. Required if ``chat_memory`` is not given.
            chat_memory: The memory of the chat. Required if ``chat_id`` is not given.
            table_name: Forwarded to the polling object and the memory lookup.

        Raises:
            Exception: If neither ``chat_id`` nor ``chat_memory`` is provided.
        """
        if not chat_id and not chat_memory:
            raise Exception("chat_id or chat_memory should be provided")

        try:
            self._on_holding_message(chat_id, chat_memory, table_name)
            self._on_message(message, chat_id, chat_memory, table_name)
        except (SystemExit, KeyboardInterrupt):
            raise
        except Exception:
            logger.exception("Error during chatbot message processing:")
            self.set_error(traceback.format_exc())

    def _on_holding_message(self, chat_id: str = None, chat_memory: BaseMemory = None, table_name: str = None):
        """
        Send a message to hold the user's attention while the bot is processing the request.
        This message will not be saved in the chat memory.

        Args:
            chat_id (str): The ID of the chat.
            chat_memory (BaseMemory): The memory of the chat.
            table_name (str): The name of the table.
        """
        # Fall back to the chat ID stored on the memory when none was passed.
        chat_id = chat_id if chat_id else chat_memory.chat_id

        response_message = ChatBotMessage(
            ChatBotMessage.Type.DIRECT,
            HOLDING_MESSAGE,
            # In Slack direct messages are treated as channels themselves.
            user=self.bot_params["bot_username"],
            destination=chat_id,
            sent_at=dt.datetime.now(),
        )

        # send to chat adapter
        self.chat_pooling.send_message(response_message, table_name=table_name)
        logger.debug(f">>chatbot {chat_id} out (holding message): {response_message.text}")

    def _on_message(self, message: ChatBotMessage, chat_id, chat_memory, table_name=None):
        """
        Record the incoming message, run the executor and send its answer.

        Both the incoming message and the answer are added to the chat history.

        Args:
            message (ChatBotMessage): The incoming message.
            chat_id: The ID of the chat; used to look up the memory when
                ``chat_memory`` is not given.
            chat_memory: The memory of the chat, if already resolved.
            table_name: Forwarded to the memory lookup and the polling object.
        """
        # add question to history
        # TODO move it to realtime pooling
        chat_memory = chat_memory if chat_memory else self.memory.get_chat(chat_id, table_name=table_name)
        chat_memory.add_to_history(message)

        logger.debug(f">>chatbot {chat_memory.chat_id} in: {message.text}")

        # process
        bot_executor = self.bot_executor_cls(self, chat_memory)
        response_text = bot_executor.process()

        # The memory's chat ID is used as the destination of the answer.
        chat_id = chat_memory.chat_id
        bot_username = self.bot_params["bot_username"]
        response_message = ChatBotMessage(
            ChatBotMessage.Type.DIRECT,
            response_text,
            # In Slack direct messages are treated as channels themselves.
            user=bot_username,
            destination=chat_id,
            sent_at=dt.datetime.now(),
        )

        # send to chat adapter
        self.chat_pooling.send_message(response_message, table_name=table_name)
        logger.debug(f">>chatbot {chat_id} out: {response_message.text}")

        # send to history
        chat_memory.add_to_history(response_message)

    def on_webhook(self, request: dict) -> None:
        """
        Handle incoming webhook requests.
        Passes the request to the chat handler along with the callback method.

        Args:
            request (dict): The incoming webhook request.
        """
        self.chat_handler.on_webhook(request, self.on_message)

    def get_memory(self) -> BaseMemory:
        """
        Get the memory of the chatbot task.

        Returns:
            BaseMemory: The memory of the chatbot task.
        """
        return self.memory

    def set_memory(self, memory: BaseMemory) -> None:
        """
        Set the memory of the chatbot task.

        Args:
            memory (BaseMemory): The memory to set for the chatbot task.
        """
        self.memory = memory