Coverage for mindsdb / integrations / handlers / dspy_handler / dspy_handler.py: 0%

104 statements  

« prev     ^ index     » next       coverage.py v7.13.1, created at 2026-01-21 00:36 +0000

1from typing import Optional, Dict 

2import dill 

3import dspy 

4 

5 

6from dspy import ColBERTv2 

7from dspy.teleprompt import BootstrapFewShot 

8 

9from mindsdb.interfaces.llm.llm_controller import LLMDataController 

10import pandas as pd 

11from mindsdb.interfaces.agents.constants import ( 

12 DEFAULT_MODEL_NAME 

13) 

14from mindsdb.interfaces.agents.langchain_agent import ( 

15 get_llm_provider, get_embedding_model_provider 

16) 

17from mindsdb.integrations.utilities.rag.settings import DEFAULT_RAG_PROMPT_TEMPLATE 

18from mindsdb.integrations.libs.base import BaseMLEngine 

19from mindsdb.integrations.utilities.handler_utils import get_api_key 

20from mindsdb.interfaces.storage.model_fs import HandlerStorage, ModelStorage 

21from mindsdb.utilities import log 

22 

23 

# Prefix of the parsing-failure message emitted by agent executors; kept
# verbatim (including the "occured" misspelling) because it may be matched
# against upstream error text -- TODO confirm before correcting the typo.
_PARSING_ERROR_PREFIX = 'An output parsing error occured'

logger = log.getLogger(__name__)

# Public ColBERTv2 retrieval endpoint (Wikipedia 2017 abstracts) used as the
# default DSPy retrieval model in setup_dspy().
START_URL = 'http://20.102.90.50:2017/wiki17_abstracts'
# Cap on bootstrapped and labeled demonstrations for BootstrapFewShot.
DEMOS = 4
# Number of most-recent self-improvement examples kept as the train set.
DF_EXAMPLES = 25
# Multiplier applied in calculate_model_id()'s character-sum hash.
PRIME_MULTIPLIER = 31

32 

33 

class DSPyHandler(BaseMLEngine):
    """
    This is a MindsDB integration for the DSPy library, which provides a unified interface for interacting with
    various large language models (LLMs) and self improving prompts for these LLMs.

    Supported LLM providers:
    - OpenAI

    Supported standard tools:
    - python_repl
    - serper.dev search
    """
    # Engine name this handler is registered under inside MindsDB.
    name = 'dspy'

    def __init__(
            self,
            model_storage: ModelStorage,
            engine_storage: HandlerStorage,
            **kwargs):
        """Store the storage backends and initialize per-model state."""
        super().__init__(model_storage, engine_storage, **kwargs)
        # if True, the target column name does not have to be specified at creation time.
        self.generative = True
        # Persists question/answer traces used for prompt self-improvement.
        self.llm_data_controller = LLMDataController()
        # Numeric id derived from the model name; recomputed in predict().
        self.model_id = 0

59 

60 def calculate_model_id(self, model_name): 

61 ''' 

62 Based on the model name calculate a unique number to be used as model_id 

63 ''' 

64 sum_chars = sum((i + 1) * ord(char) * PRIME_MULTIPLIER for i, char in enumerate(model_name)) 

65 return sum_chars 

66 

67 def create(self, target: str, df: Optional[pd.DataFrame] = None, args: Dict = None, **kwargs): 

68 """ 

69 Create a model by initializing the parameters and setting up a DSPy chain with cold start data 

70 

71 Args: 

72 target (str): Type of engine 

73 df (DataFrame): cold start df for DSPy 

74 args (Dict): Parameters for the model 

75 

76 Returns: 

77 None 

78 """ 

79 

80 args = args['using'] 

81 args['target'] = target 

82 args['model_name'] = args.get('model_name', DEFAULT_MODEL_NAME) 

83 args['provider'] = args.get('provider', get_llm_provider(args)) 

84 args['embedding_model_provider'] = args.get('embedding_model', get_embedding_model_provider(args)) 

85 if args.get('mode') == 'retrieval': 

86 # use default prompt template for retrieval i.e. RAG if not provided 

87 if "prompt_template" not in args: 

88 args["prompt_template"] = DEFAULT_RAG_PROMPT_TEMPLATE 

89 

90 self.model_storage.json_set('args', args) 

91 if not isinstance(df, type(None)): 

92 self.model_storage.file_set('cold_start_df', dill.dumps(df.to_dict())) 

93 else: 

94 # no cold start df if data not provided 

95 self.model_storage.file_set('cold_start_df', dill.dumps({})) 

96 # TODO: temporal workaround: serialize df and args, instead. And recreate chain (with training) every inference call. 

97 # ideally, we serialize the chain itself to avoid duplicate training. 

98 

99 def predict(self, df: pd.DataFrame, args: Dict = None) -> pd.DataFrame: 

100 """ 

101 Predicts a response using DSPy 

102 

103 Args: 

104 df (DataFrame): input for the model 

105 args (Dict): Parameters for the model 

106 

107 Returns: 

108 df (DataFrame): response from the model 

109 """ 

110 pred_args = args['predict_params'] if args else {} 

111 args = self.model_storage.json_get('args') 

112 if 'prompt_template' not in args and 'prompt_template' not in pred_args: 

113 raise ValueError("This model expects a `prompt_template`, please provide one.") 

114 # Back compatibility for old models 

115 args['provider'] = args.get('provider', get_llm_provider(args)) 

116 args['embedding_model_provider'] = args.get('embedding_model', get_embedding_model_provider(args)) 

117 

118 # retrieves llm and pass it around as context 

119 model = args.get('model_name') 

120 self.model_id = self.calculate_model_id(model) 

121 api_key = get_api_key('openai', args, self.engine_storage, strict=False) 

122 llm = dspy.OpenAI(model=model, api_key=api_key) 

123 

124 df = df.reset_index(drop=True) 

125 

126 cold_start_df = pd.DataFrame(dill.loads(self.model_storage.file_get("cold_start_df"))) # fixed in "training" # noqa 

127 # gets larger as agent is used more 

128 self_improvement_df = pd.DataFrame(self.llm_data_controller.list_all_llm_data(self.model_id)) 

129 if self_improvement_df.empty: 

130 self_improvement_df = pd.DataFrame([{'input': 'dummy_input', 'output': 'dummy_output'}]) 

131 

132 self_improvement_df = self_improvement_df.rename(columns={ 

133 'output': args['target'], 

134 'input': args['user_column'] 

135 }) 

136 self_improvement_df = self_improvement_df.tail(DF_EXAMPLES) 

137 

138 # add cold start DF 

139 self_improvement_df = pd.concat([cold_start_df, self_improvement_df]).reset_index(drop=True) 

140 chain = self.setup_dspy(self_improvement_df, args) 

141 output = self.predict_dspy(df, args, chain, llm) # this stores new traces for self-improvement 

142 return output 

143 

144 def describe(self, attribute: Optional[str] = None) -> pd.DataFrame: 

145 tables = ['info'] 

146 return pd.DataFrame(tables, columns=['tables']) 

147 

148 def finetune(self, df: Optional[pd.DataFrame] = None, args: Optional[Dict] = None) -> None: 

149 raise NotImplementedError('Fine-tuning is not supported for LangChain models') 

150 

151 def setup_dspy(self, df, args): 

152 """ 

153 Use the default DSPy parameters to set up the chain 

154 

155 Args: 

156 df (DataFrame): input to the model 

157 args (Dict): Parameters for the model 

158 

159 Returns: 

160 DSPy chain 

161 """ 

162 # This is the default language model and retrieval model in DSPy 

163 colbertv2 = ColBERTv2(url=START_URL) 

164 dspy.configure(rm=colbertv2) 

165 dspy_chain = self.create_dspy_chain(df, args) 

166 return dspy_chain 

167 

168 def create_dspy_chain(self, df, args): 

169 """ 

170 Iniialize chain with the llm, add the cold start examples and bootstrap some examples 

171 

172 Args: 

173 df (DataFrame): input to the model 

174 args (Dict): Parameters for the model 

175 

176 Returns: 

177 Optimized DSPy chain 

178 """ 

179 

180 # Initialize the LLM with the API key 

181 model = args.get('model_name') 

182 api_key = get_api_key(model, args, self.engine_storage, strict=False) 

183 llm = dspy.OpenAI(model=model, api_key=api_key) 

184 

185 # Convert to DSPy Module 

186 with dspy.context(lm=llm): 

187 dspy_module = dspy.ReAct(f'{args["user_column"]} -> {args["target"]}') 

188 

189 # create a list of DSPy examples 

190 dspy_examples = [] 

191 

192 # TODO: maybe random choose a fixed set of rows 

193 for i, row in df.iterrows(): 

194 example = dspy.Example( 

195 question=row[args["user_column"]], 

196 answer=row[args["target"]] 

197 ).with_inputs(args["user_column"]) 

198 dspy_examples.append(example) 

199 

200 # TODO: add the optimizer, maybe the metric 

201 config = dict(max_bootstrapped_demos=DEMOS, max_labeled_demos=DEMOS) 

202 metric = dspy.evaluate.metrics.answer_exact_match # TODO: passage match requires context from prediction... we'll probably modify the signature of ReAct 

203 teleprompter = BootstrapFewShot(metric=metric, **config) # TODO: maybe it's better to have this persisted so that the internal state does a better job at optimizing RAG 

204 with dspy.context(lm=llm): 

205 optimized = teleprompter.compile(dspy_module, trainset=dspy_examples) # TODO: check columns have the right name 

206 return optimized 

207 

208 def generate_dspy_response(self, question, chain, llm): 

209 """ 

210 Generate response using DSPy 

211 

212 Args: 

213 question (str): question asked as input to the model 

214 chain: DSPy chain created 

215 llm: OpenAI model 

216 

217 Returns: 

218 Answer to the prompt 

219 """ 

220 input_dict = {"question": question} 

221 with dspy.context(lm=llm): 

222 response = chain(question=input_dict['question']) 

223 return response.answer 

224 

225 def predict_dspy(self, df: pd.DataFrame, args: Dict, chain, llm) -> pd.DataFrame: 

226 """ 

227 Generate response using DSPy 

228 

229 Args: 

230 df (DataFrame): contains the input to the model 

231 args (Dict): parameters of the model 

232 chain: DSPy chain created 

233 llm: OpenAI model 

234 

235 Returns: 

236 df (DataFrame): Dataframe of responses 

237 """ 

238 responses = [] 

239 for index, row in df.iterrows(): 

240 question = row[args['user_column']] 

241 answer = self.generate_dspy_response(question, chain, llm) 

242 responses.append({'answer': answer, 'question': question}) 

243 # TODO: check this only adds new incoming rows 

244 self.llm_data_controller.add_llm_data(question, answer, self.model_id) # stores new traces for use in new calls 

245 

246 # Set up the evaluator, which can be used multiple times. 

247 # TODO: use this in the EVALUATE command 

248 # evaluate = Evaluate(devset=gsm8k_devset, metric=gsm8k_metric, num_threads=4, display_progress=True, display_table=0) 

249 

250 return pd.DataFrame(responses)