Coverage for mindsdb / interfaces / chatbot / chatbot_task.py: 21%

88 statements  

« prev     ^ index     » next       coverage.py v7.13.1, created at 2026-01-21 00:36 +0000

1import traceback 

2import datetime as dt 

3 

4from mindsdb.integrations.libs.api_handler import APIChatHandler 

5 

6from mindsdb.api.executor.controllers.session_controller import SessionController 

7from mindsdb.interfaces.storage import db 

8from mindsdb.interfaces.tasks.task import BaseTask 

9 

10from mindsdb.utilities import log 

11 

12from .polling import MessageCountPolling, RealtimePolling, WebhookPolling 

13from .memory import BaseMemory, DBMemory, HandlerMemory 

14from .chatbot_executor import MultiModeBotExecutor, BotExecutor, AgentExecutor 

15 

16from .types import ChatBotMessage 

17 

18logger = log.getLogger(__name__) 

19 

20HOLDING_MESSAGE = "Bot is typing..." 

21 

22 

23class ChatBotTask(BaseTask): 

24 def __init__(self, *args, **kwargs): 

25 super().__init__(*args, **kwargs) 

26 self.bot_id = self.object_id 

27 

28 self.session = SessionController() 

29 

30 bot_record = db.ChatBots.query.get(self.bot_id) 

31 

32 self.base_model_name = bot_record.model_name 

33 self.project_name = db.Project.query.get(bot_record.project_id).name 

34 self.project_datanode = self.session.datahub.get(self.project_name) 

35 

36 # get chat handler info 

37 self.bot_params = bot_record.params or {} 

38 

39 self.agent_id = bot_record.agent_id 

40 if self.agent_id is not None: 

41 self.bot_executor_cls = AgentExecutor 

42 elif self.bot_params.get("modes") is None: 

43 self.bot_executor_cls = BotExecutor 

44 else: 

45 self.bot_executor_cls = MultiModeBotExecutor 

46 

47 database_name = db.Integration.query.get(bot_record.database_id).name 

48 

49 self.chat_handler = self.session.integration_controller.get_data_handler(database_name) 

50 if not isinstance(self.chat_handler, APIChatHandler): 

51 raise Exception(f"Can't use chat database: {database_name}") 

52 

53 chat_params = self.chat_handler.get_chat_config() 

54 polling = chat_params["polling"]["type"] 

55 

56 memory = chat_params["memory"]["type"] if "memory" in chat_params else None 

57 memory_cls = None 

58 if memory: 

59 memory_cls = DBMemory if memory == "db" else HandlerMemory 

60 

61 if polling == "message_count": 

62 chat_params = chat_params["tables"] if "tables" in chat_params else [chat_params] 

63 self.chat_pooling = MessageCountPolling(self, chat_params) 

64 # The default type for message count polling is HandlerMemory if not specified. 

65 self.memory = HandlerMemory(self, chat_params) if memory_cls is None else memory_cls(self, chat_params) 

66 

67 elif polling == "realtime": 

68 chat_params = chat_params["tables"] if "tables" in chat_params else [chat_params] 

69 self.chat_pooling = RealtimePolling(self, chat_params) 

70 # The default type for real-time polling is DBMemory if not specified. 

71 self.memory = DBMemory(self, chat_params) if memory_cls is None else memory_cls(self, chat_params) 

72 

73 elif polling == "webhook": 

74 self.chat_pooling = WebhookPolling(self, chat_params) 

75 self.memory = DBMemory(self, chat_params) 

76 

77 else: 

78 raise Exception(f"Not supported polling: {polling}") 

79 

80 self.bot_params["bot_username"] = self.chat_handler.get_my_user_name() 

81 

82 def run(self, stop_event): 

83 # TODO check deleted, raise errors 

84 # TODO checks on delete predictor / project/ integration 

85 

86 self.chat_pooling.run(stop_event) 

87 

88 def on_message(self, message: ChatBotMessage, chat_id=None, chat_memory=None, table_name=None): 

89 if not chat_id and not chat_memory: 

90 raise Exception("chat_id or chat_memory should be provided") 

91 

92 try: 

93 self._on_holding_message(chat_id, chat_memory, table_name) 

94 self._on_message(message, chat_id, chat_memory, table_name) 

95 except (SystemExit, KeyboardInterrupt): 

96 raise 

97 except Exception: 

98 logger.exception("Error during chatbot message processing:") 

99 self.set_error(traceback.format_exc()) 

100 

101 def _on_holding_message(self, chat_id: str = None, chat_memory: BaseMemory = None, table_name: str = None): 

102 """ 

103 Send a message to hold the user's attention while the bot is processing the request. 

104 This message will not be saved in the chat memory. 

105 

106 Args: 

107 chat_id (str): The ID of the chat. 

108 chat_memory (BaseMemory): The memory of the chat. 

109 table_name (str): The name of the table. 

110 """ 

111 chat_id = chat_id if chat_id else chat_memory.chat_id 

112 

113 response_message = ChatBotMessage( 

114 ChatBotMessage.Type.DIRECT, 

115 HOLDING_MESSAGE, 

116 # In Slack direct messages are treated as channels themselves. 

117 user=self.bot_params["bot_username"], 

118 destination=chat_id, 

119 sent_at=dt.datetime.now(), 

120 ) 

121 

122 # send to chat adapter 

123 self.chat_pooling.send_message(response_message, table_name=table_name) 

124 logger.debug(f">>chatbot {chat_id} out (holding message): {response_message.text}") 

125 

126 def _on_message(self, message: ChatBotMessage, chat_id, chat_memory, table_name=None): 

127 # add question to history 

128 # TODO move it to realtime pooling 

129 chat_memory = chat_memory if chat_memory else self.memory.get_chat(chat_id, table_name=table_name) 

130 chat_memory.add_to_history(message) 

131 

132 logger.debug(f">>chatbot {chat_memory.chat_id} in: {message.text}") 

133 

134 # process 

135 bot_executor = self.bot_executor_cls(self, chat_memory) 

136 response_text = bot_executor.process() 

137 

138 chat_id = chat_memory.chat_id 

139 bot_username = self.bot_params["bot_username"] 

140 response_message = ChatBotMessage( 

141 ChatBotMessage.Type.DIRECT, 

142 response_text, 

143 # In Slack direct messages are treated as channels themselves. 

144 user=bot_username, 

145 destination=chat_id, 

146 sent_at=dt.datetime.now(), 

147 ) 

148 

149 # send to chat adapter 

150 self.chat_pooling.send_message(response_message, table_name=table_name) 

151 logger.debug(f">>chatbot {chat_id} out: {response_message.text}") 

152 

153 # send to history 

154 chat_memory.add_to_history(response_message) 

155 

156 def on_webhook(self, request: dict) -> None: 

157 """ 

158 Handle incoming webhook requests. 

159 Passes the request to the chat handler along with the callback method. 

160 

161 Args: 

162 request (dict): The incoming webhook request. 

163 """ 

164 self.chat_handler.on_webhook(request, self.on_message) 

165 

166 def get_memory(self) -> BaseMemory: 

167 """ 

168 Get the memory of the chatbot task. 

169 

170 Returns: 

171 BaseMemory: The memory of the chatbot task. 

172 """ 

173 return self.memory 

174 

175 def set_memory(self, memory: BaseMemory) -> None: 

176 """ 

177 Set the memory of the chatbot task. 

178 

179 Args: 

180 memory (BaseMemory): The memory to set for the chatbot task. 

181 """ 

182 self.memory = memory