Coverage for mindsdb / integrations / handlers / dspy_handler / dspy_handler.py: 0%
104 statements
« prev ^ index » next — generated by coverage.py v7.13.1 at 2026-01-21 00:36 +0000
1from typing import Optional, Dict
2import dill
3import dspy
6from dspy import ColBERTv2
7from dspy.teleprompt import BootstrapFewShot
9from mindsdb.interfaces.llm.llm_controller import LLMDataController
10import pandas as pd
11from mindsdb.interfaces.agents.constants import (
12 DEFAULT_MODEL_NAME
13)
14from mindsdb.interfaces.agents.langchain_agent import (
15 get_llm_provider, get_embedding_model_provider
16)
17from mindsdb.integrations.utilities.rag.settings import DEFAULT_RAG_PROMPT_TEMPLATE
18from mindsdb.integrations.libs.base import BaseMLEngine
19from mindsdb.integrations.utilities.handler_utils import get_api_key
20from mindsdb.interfaces.storage.model_fs import HandlerStorage, ModelStorage
21from mindsdb.utilities import log
# Prefix used to recognize output-parsing failures in LLM responses.
# NOTE(review): the misspelling "occured" likely mirrors the exact upstream
# error text this prefix is matched against — confirm before "fixing" it.
_PARSING_ERROR_PREFIX = 'An output parsing error occured'

logger = log.getLogger(__name__)

# Public ColBERTv2 wiki17 abstracts endpoint used as DSPy's default retrieval model.
START_URL = 'http://20.102.90.50:2017/wiki17_abstracts'
# Max bootstrapped and labeled demos handed to BootstrapFewShot.
DEMOS = 4
# Only the most recent N self-improvement traces are kept as training examples.
DF_EXAMPLES = 25
# Constant factor used by calculate_model_id's character-weighted sum.
PRIME_MULTIPLIER = 31
class DSPyHandler(BaseMLEngine):
    """
    This is a MindsDB integration for the DSPy library, which provides a unified interface for interacting with
    various large language models (LLMs) and self improving prompts for these LLMs.

    Supported LLM providers:
    - OpenAI

    Supported standard tools:
    - python_repl
    - serper.dev search
    """
    name = 'dspy'

    def __init__(
            self,
            model_storage: ModelStorage,
            engine_storage: HandlerStorage,
            **kwargs):
        super().__init__(model_storage, engine_storage, **kwargs)
        # if True, the target column name does not have to be specified at creation time.
        self.generative = True
        # Persists question/answer traces so the chain can self-improve across calls.
        self.llm_data_controller = LLMDataController()
        self.model_id = 0

    def calculate_model_id(self, model_name: str) -> int:
        '''
        Based on the model name calculate a unique number to be used as model_id.

        The sum is position-weighted so permutations of the same characters map to
        different ids. NOTE: multiplying every term by PRIME_MULTIPLIER is a constant
        scale factor (it adds no uniqueness), but it is kept as-is so ids remain
        stable for data already stored under them.
        '''
        return sum((i + 1) * ord(char) * PRIME_MULTIPLIER for i, char in enumerate(model_name))

    def create(self, target: str, df: Optional[pd.DataFrame] = None, args: Dict = None, **kwargs):
        """
        Create a model by initializing the parameters and setting up a DSPy chain with cold start data.

        Args:
            target (str): Type of engine
            df (DataFrame): cold start df for DSPy
            args (Dict): Parameters for the model

        Returns:
            None
        """
        args = args['using']
        args['target'] = target
        args['model_name'] = args.get('model_name', DEFAULT_MODEL_NAME)
        args['provider'] = args.get('provider', get_llm_provider(args))
        # NOTE(review): reads the 'embedding_model' key to populate
        # 'embedding_model_provider' — looks asymmetric, but is kept as-is for
        # compatibility with existing model args; verify against the langchain handler.
        args['embedding_model_provider'] = args.get('embedding_model', get_embedding_model_provider(args))
        if args.get('mode') == 'retrieval':
            # use default prompt template for retrieval i.e. RAG if not provided
            if "prompt_template" not in args:
                args["prompt_template"] = DEFAULT_RAG_PROMPT_TEMPLATE

        self.model_storage.json_set('args', args)
        if df is not None:
            self.model_storage.file_set('cold_start_df', dill.dumps(df.to_dict()))
        else:
            # no cold start df if data not provided
            self.model_storage.file_set('cold_start_df', dill.dumps({}))
        # TODO: temporal workaround: serialize df and args, instead. And recreate chain (with training) every inference call.
        # ideally, we serialize the chain itself to avoid duplicate training.

    def predict(self, df: pd.DataFrame, args: Dict = None) -> pd.DataFrame:
        """
        Predicts a response using DSPy.

        Args:
            df (DataFrame): input for the model
            args (Dict): Parameters for the model

        Returns:
            df (DataFrame): response from the model
        """
        pred_args = args['predict_params'] if args else {}
        args = self.model_storage.json_get('args')
        if 'prompt_template' not in args and 'prompt_template' not in pred_args:
            raise ValueError("This model expects a `prompt_template`, please provide one.")
        # Back compatibility for old models
        args['provider'] = args.get('provider', get_llm_provider(args))
        args['embedding_model_provider'] = args.get('embedding_model', get_embedding_model_provider(args))

        # retrieves llm and pass it around as context
        model = args.get('model_name')
        self.model_id = self.calculate_model_id(model)
        api_key = get_api_key('openai', args, self.engine_storage, strict=False)
        llm = dspy.OpenAI(model=model, api_key=api_key)

        df = df.reset_index(drop=True)

        cold_start_df = pd.DataFrame(dill.loads(self.model_storage.file_get("cold_start_df")))  # fixed in "training" # noqa
        # gets larger as agent is used more
        self_improvement_df = pd.DataFrame(self.llm_data_controller.list_all_llm_data(self.model_id))
        if self_improvement_df.empty:
            # Seed with a placeholder row so the rename/tail pipeline below has the expected columns.
            self_improvement_df = pd.DataFrame([{'input': 'dummy_input', 'output': 'dummy_output'}])

        self_improvement_df = self_improvement_df.rename(columns={
            'output': args['target'],
            'input': args['user_column']
        })
        # Cap the number of stored traces used as few-shot examples.
        self_improvement_df = self_improvement_df.tail(DF_EXAMPLES)

        # add cold start DF
        self_improvement_df = pd.concat([cold_start_df, self_improvement_df]).reset_index(drop=True)
        chain = self.setup_dspy(self_improvement_df, args)
        output = self.predict_dspy(df, args, chain, llm)  # this stores new traces for self-improvement
        return output

    def describe(self, attribute: Optional[str] = None) -> pd.DataFrame:
        """Describe available meta-tables for this model."""
        tables = ['info']
        return pd.DataFrame(tables, columns=['tables'])

    def finetune(self, df: Optional[pd.DataFrame] = None, args: Optional[Dict] = None) -> None:
        # Fixed copy-paste error: message previously referred to "LangChain models".
        raise NotImplementedError('Fine-tuning is not supported for DSPy models')

    def setup_dspy(self, df, args):
        """
        Use the default DSPy parameters to set up the chain.

        Args:
            df (DataFrame): input to the model
            args (Dict): Parameters for the model

        Returns:
            DSPy chain
        """
        # This is the default language model and retrieval model in DSPy
        colbertv2 = ColBERTv2(url=START_URL)
        dspy.configure(rm=colbertv2)
        dspy_chain = self.create_dspy_chain(df, args)
        return dspy_chain

    def create_dspy_chain(self, df, args):
        """
        Initialize chain with the llm, add the cold start examples and bootstrap some examples.

        Args:
            df (DataFrame): input to the model
            args (Dict): Parameters for the model

        Returns:
            Optimized DSPy chain
        """
        # Initialize the LLM with the API key.
        # Fixed: previously the model name was passed as the provider, which is
        # inconsistent with predict() — dspy.OpenAI needs the 'openai' key.
        model = args.get('model_name')
        api_key = get_api_key('openai', args, self.engine_storage, strict=False)
        llm = dspy.OpenAI(model=model, api_key=api_key)

        # Convert to DSPy Module
        with dspy.context(lm=llm):
            dspy_module = dspy.ReAct(f'{args["user_column"]} -> {args["target"]}')

        # create a list of DSPy examples
        # TODO: maybe random choose a fixed set of rows
        dspy_examples = []
        for _, row in df.iterrows():
            example = dspy.Example(
                question=row[args["user_column"]],
                answer=row[args["target"]]
            ).with_inputs(args["user_column"])
            dspy_examples.append(example)

        # TODO: add the optimizer, maybe the metric
        config = dict(max_bootstrapped_demos=DEMOS, max_labeled_demos=DEMOS)
        metric = dspy.evaluate.metrics.answer_exact_match  # TODO: passage match requires context from prediction... we'll probably modify the signature of ReAct
        teleprompter = BootstrapFewShot(metric=metric, **config)  # TODO: maybe it's better to have this persisted so that the internal state does a better job at optimizing RAG
        with dspy.context(lm=llm):
            optimized = teleprompter.compile(dspy_module, trainset=dspy_examples)  # TODO: check columns have the right name
        return optimized

    def generate_dspy_response(self, question, chain, llm):
        """
        Generate response using DSPy.

        Args:
            question (str): question asked as input to the model
            chain: DSPy chain created
            llm: OpenAI model

        Returns:
            Answer to the prompt
        """
        input_dict = {"question": question}
        with dspy.context(lm=llm):
            response = chain(question=input_dict['question'])
        return response.answer

    def predict_dspy(self, df: pd.DataFrame, args: Dict, chain, llm) -> pd.DataFrame:
        """
        Generate response using DSPy for every row of the input.

        Args:
            df (DataFrame): contains the input to the model
            args (Dict): parameters of the model
            chain: DSPy chain created
            llm: OpenAI model

        Returns:
            df (DataFrame): Dataframe of responses
        """
        responses = []
        for _, row in df.iterrows():
            question = row[args['user_column']]
            answer = self.generate_dspy_response(question, chain, llm)
            responses.append({'answer': answer, 'question': question})
            # TODO: check this only adds new incoming rows
            self.llm_data_controller.add_llm_data(question, answer, self.model_id)  # stores new traces for use in new calls

        # Set up the evaluator, which can be used multiple times.
        # TODO: use this in the EVALUATE command
        # evaluate = Evaluate(devset=gsm8k_devset, metric=gsm8k_metric, num_threads=4, display_progress=True, display_table=0)

        return pd.DataFrame(responses)