Coverage for mindsdb / integrations / handlers / langchain_handler / langchain_handler.py: 0%
159 statements
« prev ^ index » next coverage.py v7.13.1, created at 2026-01-21 00:36 +0000
1from concurrent.futures import as_completed, TimeoutError
2from typing import Optional, Dict
3import re
5from langchain.agents import AgentExecutor
6from langchain.agents.initialize import initialize_agent
7from langchain.chains.conversation.memory import ConversationSummaryBufferMemory
8from langchain_core.prompts import PromptTemplate
9from langchain_core.messages import SystemMessage, HumanMessage
11import numpy as np
12import pandas as pd
14from mindsdb.interfaces.agents.safe_output_parser import SafeOutputParser
15from mindsdb.interfaces.agents.langchain_agent import (
16 get_llm_provider,
17 get_embedding_model_provider,
18 create_chat_model,
19 get_chat_model_params,
20)
22from mindsdb.interfaces.agents.constants import (
23 DEFAULT_AGENT_TIMEOUT_SECONDS,
24 DEFAULT_AGENT_TOOLS,
25 get_default_agent_type,
26 DEFAULT_MAX_ITERATIONS,
27 DEFAULT_MAX_TOKENS,
28 DEFAULT_MODEL_NAME,
29 USER_COLUMN,
30 ASSISTANT_COLUMN,
31)
32from mindsdb.integrations.utilities.rag.settings import DEFAULT_RAG_PROMPT_TEMPLATE
33from mindsdb.integrations.handlers.langchain_handler.tools import setup_tools
34from mindsdb.integrations.libs.base import BaseMLEngine
35from mindsdb.interfaces.storage.model_fs import HandlerStorage, ModelStorage
36from mindsdb.integrations.handlers.langchain_embedding_handler.langchain_embedding_handler import (
37 construct_model_from_args,
38)
39from mindsdb.integrations.handlers.openai_handler.constants import CHAT_MODELS_PREFIXES # noqa: F401 - for dependency checker
41from mindsdb.utilities import log
42from mindsdb.utilities.context_executor import ContextThreadPoolExecutor
# Prefixes of error messages raised by LangChain agents when the LLM output
# cannot be parsed. Matched verbatim in `_handle_parsing_errors`, so the
# spelling (including "occured") must mirror what LangChain actually emits —
# do not "fix" it without checking the LangChain version in use.
_PARSING_ERROR_PREFIXES = ["An output parsing error occured", "Could not parse LLM output"]

# Module-level logger following the project convention (see mindsdb.utilities.log).
logger = log.getLogger(__name__)
class LangChainHandler(BaseMLEngine):
    """
    This is a MindsDB integration for the LangChain library, which provides a unified interface for interacting with
    various large language models (LLMs).

    Supported LLM providers:
    - OpenAI
    - Anthropic
    - Google
    - LiteLLM
    - Ollama

    Supported standard tools:
    - python_repl
    - serper.dev search
    """

    name = "langchain"

    def __init__(self, model_storage: ModelStorage, engine_storage: HandlerStorage, **kwargs):
        super().__init__(model_storage, engine_storage, **kwargs)
        # if True, the target column name does not have to be specified at creation time.
        self.generative = True
        self.default_agent_tools = DEFAULT_AGENT_TOOLS

    def _create_embeddings_model(self, args: Dict):
        """Build an embeddings model from the stored model args."""
        return construct_model_from_args(args)

    def _handle_parsing_errors(self, error: Exception) -> str:
        """Salvage a usable answer from a LangChain output-parsing error.

        Returns the recovered text re-wrapped in the conversational-react
        format so the agent loop can continue, or a generic failure message
        when the error is not a known parsing error.
        """
        response = str(error)
        for p in _PARSING_ERROR_PREFIXES:
            if response.startswith(p):
                # As a somewhat dirty workaround, we accept the output formatted incorrectly and use it as a response.
                #
                # Ideally, in the future, we would write a parser that is more robust and flexible than the one Langchain uses.
                # Response is wrapped in ``
                logger.info("Handling parsing error, salvaging response...")
                response_output = response.split("`")
                if len(response_output) >= 2:
                    response = response_output[-2]

                # Wrap response in Langchain conversational react format.
                langchain_react_formatted_response = f"""Thought: Do I need to use a tool? No
AI: {response}"""
                return langchain_react_formatted_response
        return f"Agent failed with error:\n{str(error)}..."

    def create(self, target: str, args: Dict = None, **kwargs):
        """Persist model args (with provider/model defaults filled in) to model storage."""
        self.default_agent_tools = args.get("tools", self.default_agent_tools)

        args = args["using"]
        args["target"] = target
        args["model_name"] = args.get("model_name", DEFAULT_MODEL_NAME)
        args["provider"] = args.get("provider", get_llm_provider(args))
        args["embedding_model_provider"] = args.get("embedding_model", get_embedding_model_provider(args))
        if args.get("mode") == "retrieval":
            # use default prompt template for retrieval i.e. RAG if not provided
            if "prompt_template" not in args:
                args["prompt_template"] = DEFAULT_RAG_PROMPT_TEMPLATE

        self.model_storage.json_set("args", args)

    @staticmethod
    def create_validation(_, args: Dict = None, **kwargs):
        """Validate CREATE MODEL args: a USING clause is required, and a
        `prompt_template` is required for every mode except retrieval."""
        if "using" not in args:
            raise Exception("LangChain engine requires a USING clause! Refer to its documentation for more details.")
        else:
            args = args["using"]
        if "prompt_template" not in args:
            if not args.get("mode") == "retrieval":
                raise ValueError("Please provide a `prompt_template` for this engine.")

    def predict(self, df: pd.DataFrame, args: Dict = None) -> pd.DataFrame:
        """
        Dispatch is performed depending on the underlying model type. Currently, only the default text completion
        is supported.
        """
        pred_args = args["predict_params"] if args else {}
        args = self.model_storage.json_get("args")
        if "prompt_template" not in args and "prompt_template" not in pred_args:
            raise ValueError("This model expects a `prompt_template`, please provide one.")
        # Back compatibility for old models
        args["provider"] = args.get("provider", get_llm_provider(args))
        args["embedding_model_provider"] = args.get("embedding_model", get_embedding_model_provider(args))

        df = df.reset_index(drop=True)

        if pred_args.get("mode") == "chat_model":
            # Direct single-turn LLM call, no agent loop.
            return self.call_llm(df, args, pred_args)

        agent = self.create_agent(df, args, pred_args)
        # Use last message as prompt, remove other questions.
        user_column = args.get("user_column", USER_COLUMN)
        if user_column not in df.columns:
            raise Exception(
                f"Expected user input in column `{user_column}`, which is not found in the input data. Either provide the column, or redefine the expected column at model creation (`USING user_column = 'value'`)"
            )  # noqa
        df.iloc[:-1, df.columns.get_loc(user_column)] = None
        return self.run_agent(df, agent, args, pred_args)

    def call_llm(self, df, args=None, pred_args=None):
        """Answer the last user message with a plain chat-model call (no tools/agent)."""
        llm = create_chat_model({**args, **pred_args})

        user_column = args.get("user_column", USER_COLUMN)
        assistant_column = args.get("assistant_column", ASSISTANT_COLUMN)

        question = df[user_column].iloc[-1]
        resp = llm([HumanMessage(question)], stop=["\nObservation:", "\n\tObservation:"])

        return pd.DataFrame([resp.content], columns=[assistant_column])

    def create_agent(self, df: pd.DataFrame, args: Dict = None, pred_args: Dict = None) -> AgentExecutor:
        """Build a LangChain AgentExecutor with tools, summary-buffer memory
        preloaded from the prior user/assistant turns in `df`, and a safe
        output parser."""
        pred_args = pred_args if pred_args else {}

        # Set up tools.
        model_kwargs = get_chat_model_params({**args, **pred_args})
        llm = create_chat_model({**args, **pred_args})

        tools = setup_tools(llm, model_kwargs, pred_args, self.default_agent_tools)

        # Prefer prediction prompt template over original if provided.
        prompt_template = pred_args.get("prompt_template", args["prompt_template"])
        if "context" in pred_args:
            prompt_template += "\n\n" + "Useful information:\n" + pred_args["context"] + "\n"

        # Set up memory.
        memory = ConversationSummaryBufferMemory(
            llm=llm, max_token_limit=model_kwargs.get("max_tokens", DEFAULT_MAX_TOKENS), memory_key="chat_history"
        )
        # The prompt template is injected as the leading system message.
        memory.chat_memory.messages.insert(0, SystemMessage(content=prompt_template))
        # User - Assistant conversation. All except the last message.
        user_column = args.get("user_column", USER_COLUMN)
        assistant_column = args.get("assistant_column", ASSISTANT_COLUMN)
        for row in df[:-1].to_dict("records"):
            question = row[user_column]
            answer = row[assistant_column]
            if question:
                memory.chat_memory.add_user_message(question)
            if answer:
                memory.chat_memory.add_ai_message(answer)
        default_agent = get_default_agent_type()
        agent_type = args.get("agent_type", default_agent)
        agent_executor = initialize_agent(
            tools,
            llm,
            agent=agent_type,
            # Use custom output parser to handle flaky LLMs that don't ALWAYS conform to output format.
            agent_kwargs={"output_parser": SafeOutputParser()},
            # Calls the agent's LLM Chain one final time to generate a final answer based on the previous steps
            early_stopping_method="generate",
            handle_parsing_errors=self._handle_parsing_errors,
            # Timeout per agent invocation.
            max_execution_time=pred_args.get(
                "timeout_seconds", args.get("timeout_seconds", DEFAULT_AGENT_TIMEOUT_SECONDS)
            ),
            max_iterations=pred_args.get("max_iterations", args.get("max_iterations", DEFAULT_MAX_ITERATIONS)),
            memory=memory,
            verbose=pred_args.get("verbose", args.get("verbose", True)),
        )
        return agent_executor

    def run_agent(self, df: pd.DataFrame, agent: AgentExecutor, args: Dict, pred_args: Dict) -> pd.DataFrame:
        """Render one prompt per non-empty row from the `{{var}}`-style
        template, invoke the agent concurrently over all prompts, and return
        a single-column DataFrame of completions named after the target."""
        # Prefer prediction time prompt template, if available.
        base_template = pred_args.get("prompt_template", args["prompt_template"])

        # Template variables are written as {{name}}; collect their names.
        input_variables = []
        matches = list(re.finditer("{{(.*?)}}", base_template))

        for m in matches:
            input_variables.append(m[0].replace("{", "").replace("}", ""))
        # Rows where every template variable is NaN get no rendered prompt.
        empty_prompt_ids = np.where(df[input_variables].isna().all(axis=1).values)[0]

        # Convert {{var}} to the single-brace form PromptTemplate expects.
        base_template = base_template.replace("{{", "{").replace("}}", "}")
        prompts = []

        user_column = args.get("user_column", USER_COLUMN)
        for i, row in df.iterrows():
            if i not in empty_prompt_ids:
                prompt = PromptTemplate(input_variables=input_variables, template=base_template)
                kwargs = {}
                for col in input_variables:
                    kwargs[col] = row[col] if row[col] is not None else ""  # add empty quote if data is missing
                prompts.append(prompt.format(**kwargs))
            elif row.get(user_column):
                # Just add prompt
                prompts.append(row[user_column])

        def _invoke_agent_executor_with_prompt(agent_executor, prompt):
            # Runs in a worker thread; salvages "Could not parse LLM output"
            # failures by returning the unparsed text as the answer.
            if not prompt:
                return ""
            try:
                answer = agent_executor.invoke(prompt)
            except Exception as e:
                answer = str(e)
                if not answer.startswith("Could not parse LLM output: `"):
                    raise e
                answer = {"output": answer.removeprefix("Could not parse LLM output: `").removesuffix("`")}
            if "output" not in answer:
                # This should never happen unless Langchain changes invoke output format, but just in case.
                return agent_executor.run(prompt)
            return answer["output"]

        completions = []
        # max_workers defaults to number of processors on the machine multiplied by 5.
        # https://docs.python.org/3/library/concurrent.futures.html#concurrent.futures.ThreadPoolExecutor
        max_workers = args.get("max_workers", None)
        agent_timeout_seconds = args.get("timeout", DEFAULT_AGENT_TIMEOUT_SECONDS)
        executor = ContextThreadPoolExecutor(max_workers=max_workers)
        futures = [executor.submit(_invoke_agent_executor_with_prompt, agent, prompt) for prompt in prompts]
        try:
            for future in as_completed(futures, timeout=agent_timeout_seconds):
                completions.append(future.result())
        except TimeoutError:
            completions.append(
                f"I'm sorry! I couldn't generate a response within the allotted time ({agent_timeout_seconds} seconds). "
                "If you need more time for processing, you can adjust the timeout settings. "
                "Please refer to the documentation for instructions on how to change the timeout value. "
                "Feel free to try your request again."
            )
        # Can't use ThreadPoolExecutor as context manager since we need wait=False.
        executor.shutdown(wait=False)

        # Add null completion for empty prompts
        # NOTE(review): the [:-1] skips the last empty-prompt row — presumably
        # because the final row is the live question handled above; confirm
        # this is intentional before changing it.
        for i in sorted(empty_prompt_ids)[:-1]:
            completions.insert(i, None)

        pred_df = pd.DataFrame(completions, columns=[args["target"]])

        return pred_df

    def describe(self, attribute: Optional[str] = None) -> pd.DataFrame:
        """List the describe-able tables for this model (only `info`)."""
        tables = ["info"]
        return pd.DataFrame(tables, columns=["tables"])

    def finetune(self, df: Optional[pd.DataFrame] = None, args: Optional[Dict] = None) -> None:
        """Fine-tuning is not implemented for this engine."""
        raise NotImplementedError("Fine-tuning is not supported for LangChain models")