Coverage for mindsdb / integrations / handlers / langchain_handler / langchain_handler.py: 0%

159 statements  

« prev     ^ index     » next       coverage.py v7.13.1, created at 2026-01-21 00:36 +0000

1from concurrent.futures import as_completed, TimeoutError 

2from typing import Optional, Dict 

3import re 

4 

5from langchain.agents import AgentExecutor 

6from langchain.agents.initialize import initialize_agent 

7from langchain.chains.conversation.memory import ConversationSummaryBufferMemory 

8from langchain_core.prompts import PromptTemplate 

9from langchain_core.messages import SystemMessage, HumanMessage 

10 

11import numpy as np 

12import pandas as pd 

13 

14from mindsdb.interfaces.agents.safe_output_parser import SafeOutputParser 

15from mindsdb.interfaces.agents.langchain_agent import ( 

16 get_llm_provider, 

17 get_embedding_model_provider, 

18 create_chat_model, 

19 get_chat_model_params, 

20) 

21 

22from mindsdb.interfaces.agents.constants import ( 

23 DEFAULT_AGENT_TIMEOUT_SECONDS, 

24 DEFAULT_AGENT_TOOLS, 

25 get_default_agent_type, 

26 DEFAULT_MAX_ITERATIONS, 

27 DEFAULT_MAX_TOKENS, 

28 DEFAULT_MODEL_NAME, 

29 USER_COLUMN, 

30 ASSISTANT_COLUMN, 

31) 

32from mindsdb.integrations.utilities.rag.settings import DEFAULT_RAG_PROMPT_TEMPLATE 

33from mindsdb.integrations.handlers.langchain_handler.tools import setup_tools 

34from mindsdb.integrations.libs.base import BaseMLEngine 

35from mindsdb.interfaces.storage.model_fs import HandlerStorage, ModelStorage 

36from mindsdb.integrations.handlers.langchain_embedding_handler.langchain_embedding_handler import ( 

37 construct_model_from_args, 

38) 

39from mindsdb.integrations.handlers.openai_handler.constants import CHAT_MODELS_PREFIXES # noqa: F401 - for dependency checker 

40 

41from mindsdb.utilities import log 

42from mindsdb.utilities.context_executor import ContextThreadPoolExecutor 

43 

# Prefixes of error messages (as produced by str(error) in _handle_parsing_errors)
# that mark an LLM answer in the wrong format; such responses are salvaged rather
# than surfaced as hard failures. NOTE(review): "occured" appears to mirror the
# upstream error text spelling — do not "correct" it, or prefix matching breaks.
_PARSING_ERROR_PREFIXES = ["An output parsing error occured", "Could not parse LLM output"]

logger = log.getLogger(__name__)

47 

48 

class LangChainHandler(BaseMLEngine):
    """
    This is a MindsDB integration for the LangChain library, which provides a unified interface for interacting with
    various large language models (LLMs).

    Supported LLM providers:
    - OpenAI
    - Anthropic
    - Google
    - LiteLLM
    - Ollama

    Supported standard tools:
    - python_repl
    - serper.dev search
    """

    # Engine identifier under which this handler is registered.
    name = "langchain"

    def __init__(self, model_storage: ModelStorage, engine_storage: HandlerStorage, **kwargs):
        """Initialize the handler with its model and engine storage backends."""
        super().__init__(model_storage, engine_storage, **kwargs)
        # if True, the target column name does not have to be specified at creation time.
        self.generative = True
        # Tools given to the agent unless overridden via `tools` in the CREATE statement args.
        self.default_agent_tools = DEFAULT_AGENT_TOOLS

73 

    def _create_embeddings_model(self, args: Dict):
        """Build an embeddings model from `args` (delegates to construct_model_from_args)."""
        return construct_model_from_args(args)

76 

77 def _handle_parsing_errors(self, error: Exception) -> str: 

78 response = str(error) 

79 for p in _PARSING_ERROR_PREFIXES: 

80 if response.startswith(p): 

81 # As a somewhat dirty workaround, we accept the output formatted incorrectly and use it as a response. 

82 # 

83 # Ideally, in the future, we would write a parser that is more robust and flexible than the one Langchain uses. 

84 # Response is wrapped in `` 

85 logger.info("Handling parsing error, salvaging response...") 

86 response_output = response.split("`") 

87 if len(response_output) >= 2: 

88 response = response_output[-2] 

89 

90 # Wrap response in Langchain conversational react format. 

91 langchain_react_formatted_response = f"""Thought: Do I need to use a tool? No 

92AI: {response}""" 

93 return langchain_react_formatted_response 

94 return f"Agent failed with error:\n{str(error)}..." 

95 

96 def create(self, target: str, args: Dict = None, **kwargs): 

97 self.default_agent_tools = args.get("tools", self.default_agent_tools) 

98 

99 args = args["using"] 

100 args["target"] = target 

101 args["model_name"] = args.get("model_name", DEFAULT_MODEL_NAME) 

102 args["provider"] = args.get("provider", get_llm_provider(args)) 

103 args["embedding_model_provider"] = args.get("embedding_model", get_embedding_model_provider(args)) 

104 if args.get("mode") == "retrieval": 

105 # use default prompt template for retrieval i.e. RAG if not provided 

106 if "prompt_template" not in args: 

107 args["prompt_template"] = DEFAULT_RAG_PROMPT_TEMPLATE 

108 

109 self.model_storage.json_set("args", args) 

110 

111 @staticmethod 

112 def create_validation(_, args: Dict = None, **kwargs): 

113 if "using" not in args: 

114 raise Exception("LangChain engine requires a USING clause! Refer to its documentation for more details.") 

115 else: 

116 args = args["using"] 

117 if "prompt_template" not in args: 

118 if not args.get("mode") == "retrieval": 

119 raise ValueError("Please provide a `prompt_template` for this engine.") 

120 

121 def predict(self, df: pd.DataFrame, args: Dict = None) -> pd.DataFrame: 

122 """ 

123 Dispatch is performed depending on the underlying model type. Currently, only the default text completion 

124 is supported. 

125 """ 

126 pred_args = args["predict_params"] if args else {} 

127 args = self.model_storage.json_get("args") 

128 if "prompt_template" not in args and "prompt_template" not in pred_args: 

129 raise ValueError("This model expects a `prompt_template`, please provide one.") 

130 # Back compatibility for old models 

131 args["provider"] = args.get("provider", get_llm_provider(args)) 

132 args["embedding_model_provider"] = args.get("embedding_model", get_embedding_model_provider(args)) 

133 

134 df = df.reset_index(drop=True) 

135 

136 if pred_args.get("mode") == "chat_model": 

137 return self.call_llm(df, args, pred_args) 

138 

139 agent = self.create_agent(df, args, pred_args) 

140 # Use last message as prompt, remove other questions. 

141 user_column = args.get("user_column", USER_COLUMN) 

142 if user_column not in df.columns: 

143 raise Exception( 

144 f"Expected user input in column `{user_column}`, which is not found in the input data. Either provide the column, or redefine the expected column at model creation (`USING user_column = 'value'`)" 

145 ) # noqa 

146 df.iloc[:-1, df.columns.get_loc(user_column)] = None 

147 return self.run_agent(df, agent, args, pred_args) 

148 

149 def call_llm(self, df, args=None, pred_args=None): 

150 llm = create_chat_model({**args, **pred_args}) 

151 

152 user_column = args.get("user_column", USER_COLUMN) 

153 assistant_column = args.get("assistant_column", ASSISTANT_COLUMN) 

154 

155 question = df[user_column].iloc[-1] 

156 resp = llm([HumanMessage(question)], stop=["\nObservation:", "\n\tObservation:"]) 

157 

158 return pd.DataFrame([resp.content], columns=[assistant_column]) 

159 

    def create_agent(self, df: pd.DataFrame, args: Dict = None, pred_args: Dict = None) -> AgentExecutor:
        """Assemble a LangChain conversational agent for one prediction call.

        Builds the chat LLM and its tools, then seeds a summary-buffer memory
        with the prompt template (as the leading system message) and the prior
        user/assistant turns found in `df` (all rows except the last).

        Args:
            df: Conversation history; the last row is the pending question.
            args: Stored model-creation args.
            pred_args: Per-prediction overrides; take precedence over `args`.

        Returns:
            A configured AgentExecutor.
        """
        pred_args = pred_args if pred_args else {}

        # Set up tools.
        model_kwargs = get_chat_model_params({**args, **pred_args})
        llm = create_chat_model({**args, **pred_args})

        tools = setup_tools(llm, model_kwargs, pred_args, self.default_agent_tools)

        # Prefer prediction prompt template over original if provided.
        prompt_template = pred_args.get("prompt_template", args["prompt_template"])
        if "context" in pred_args:
            # Extra retrieval/user context is appended verbatim to the template.
            prompt_template += "\n\n" + "Useful information:\n" + pred_args["context"] + "\n"

        # Set up memory. The buffer is bounded by the model's max_tokens budget.
        memory = ConversationSummaryBufferMemory(
            llm=llm, max_token_limit=model_kwargs.get("max_tokens", DEFAULT_MAX_TOKENS), memory_key="chat_history"
        )
        # The prompt template acts as the system message opening the conversation.
        memory.chat_memory.messages.insert(0, SystemMessage(content=prompt_template))
        # User - Assistant conversation. All except the last message.
        user_column = args.get("user_column", USER_COLUMN)
        assistant_column = args.get("assistant_column", ASSISTANT_COLUMN)
        for row in df[:-1].to_dict("records"):
            question = row[user_column]
            answer = row[assistant_column]
            # Empty/None turns are skipped rather than stored as blank messages.
            if question:
                memory.chat_memory.add_user_message(question)
            if answer:
                memory.chat_memory.add_ai_message(answer)
        default_agent = get_default_agent_type()
        agent_type = args.get("agent_type", default_agent)
        agent_executor = initialize_agent(
            tools,
            llm,
            agent=agent_type,
            # Use custom output parser to handle flaky LLMs that don't ALWAYS conform to output format.
            agent_kwargs={"output_parser": SafeOutputParser()},
            # Calls the agent’s LLM Chain one final time to generate a final answer based on the previous steps
            early_stopping_method="generate",
            handle_parsing_errors=self._handle_parsing_errors,
            # Timeout per agent invocation.
            max_execution_time=pred_args.get(
                "timeout_seconds", args.get("timeout_seconds", DEFAULT_AGENT_TIMEOUT_SECONDS)
            ),
            max_iterations=pred_args.get("max_iterations", args.get("max_iterations", DEFAULT_MAX_ITERATIONS)),
            memory=memory,
            verbose=pred_args.get("verbose", args.get("verbose", True)),
        )
        return agent_executor

209 

210 def run_agent(self, df: pd.DataFrame, agent: AgentExecutor, args: Dict, pred_args: Dict) -> pd.DataFrame: 

211 # Prefer prediction time prompt template, if available. 

212 base_template = pred_args.get("prompt_template", args["prompt_template"]) 

213 

214 input_variables = [] 

215 matches = list(re.finditer("{{(.*?)}}", base_template)) 

216 

217 for m in matches: 

218 input_variables.append(m[0].replace("{", "").replace("}", "")) 

219 empty_prompt_ids = np.where(df[input_variables].isna().all(axis=1).values)[0] 

220 

221 base_template = base_template.replace("{{", "{").replace("}}", "}") 

222 prompts = [] 

223 

224 user_column = args.get("user_column", USER_COLUMN) 

225 for i, row in df.iterrows(): 

226 if i not in empty_prompt_ids: 

227 prompt = PromptTemplate(input_variables=input_variables, template=base_template) 

228 kwargs = {} 

229 for col in input_variables: 

230 kwargs[col] = row[col] if row[col] is not None else "" # add empty quote if data is missing 

231 prompts.append(prompt.format(**kwargs)) 

232 elif row.get(user_column): 

233 # Just add prompt 

234 prompts.append(row[user_column]) 

235 

236 def _invoke_agent_executor_with_prompt(agent_executor, prompt): 

237 if not prompt: 

238 return "" 

239 try: 

240 answer = agent_executor.invoke(prompt) 

241 except Exception as e: 

242 answer = str(e) 

243 if not answer.startswith("Could not parse LLM output: `"): 

244 raise e 

245 answer = {"output": answer.removeprefix("Could not parse LLM output: `").removesuffix("`")} 

246 

247 if "output" not in answer: 

248 # This should never happen unless Langchain changes invoke output format, but just in case. 

249 return agent_executor.run(prompt) 

250 return answer["output"] 

251 

252 completions = [] 

253 # max_workers defaults to number of processors on the machine multiplied by 5. 

254 # https://docs.python.org/3/library/concurrent.futures.html#concurrent.futures.ThreadPoolExecutor 

255 max_workers = args.get("max_workers", None) 

256 agent_timeout_seconds = args.get("timeout", DEFAULT_AGENT_TIMEOUT_SECONDS) 

257 executor = ContextThreadPoolExecutor(max_workers=max_workers) 

258 futures = [executor.submit(_invoke_agent_executor_with_prompt, agent, prompt) for prompt in prompts] 

259 try: 

260 for future in as_completed(futures, timeout=agent_timeout_seconds): 

261 completions.append(future.result()) 

262 except TimeoutError: 

263 completions.append( 

264 f"I'm sorry! I couldn't generate a response within the allotted time ({agent_timeout_seconds} seconds). " 

265 "If you need more time for processing, you can adjust the timeout settings. " 

266 "Please refer to the documentation for instructions on how to change the timeout value. " 

267 "Feel free to try your request again." 

268 ) 

269 # Can't use ThreadPoolExecutor as context manager since we need wait=False. 

270 executor.shutdown(wait=False) 

271 

272 # Add null completion for empty prompts 

273 for i in sorted(empty_prompt_ids)[:-1]: 

274 completions.insert(i, None) 

275 

276 pred_df = pd.DataFrame(completions, columns=[args["target"]]) 

277 

278 return pred_df 

279 

280 def describe(self, attribute: Optional[str] = None) -> pd.DataFrame: 

281 tables = ["info"] 

282 return pd.DataFrame(tables, columns=["tables"]) 

283 

    def finetune(self, df: Optional[pd.DataFrame] = None, args: Optional[Dict] = None) -> None:
        """Not supported for this engine.

        Raises:
            NotImplementedError: always.
        """
        raise NotImplementedError("Fine-tuning is not supported for LangChain models")