Coverage for mindsdb / utilities / langfuse.py: 17%

126 statements  

« prev     ^ index     » next       coverage.py v7.13.1, created at 2026-01-21 00:36 +0000

1import os 

2import typing 

3from typing import TYPE_CHECKING 

4 

5from mindsdb.utilities import log 

6 

7if TYPE_CHECKING: 

8 from langfuse.callback import CallbackHandler 

9 from langfuse.client import StatefulSpanClient 

10 

logger = log.getLogger(__name__)


def _env_flag(name: str) -> bool:
    """Return True only when environment variable *name* equals "true" (case-insensitive)."""
    return os.getenv(name, "false").lower() == "true"


# --- Langfuse configuration, resolved once from the environment at import time ---

# Credentials used to authenticate against the Langfuse API.
LANGFUSE_PUBLIC_KEY = os.getenv("LANGFUSE_PUBLIC_KEY", "langfuse_public_key")
LANGFUSE_SECRET_KEY = os.getenv("LANGFUSE_SECRET_KEY", "langfuse_secret_key")

# Base URL of the Langfuse server.
LANGFUSE_HOST = os.getenv("LANGFUSE_HOST", "http://localhost:3000")

# Deployment environment and release labels attached to traces.
LANGFUSE_ENVIRONMENT = os.getenv("LANGFUSE_ENVIRONMENT", "local")
LANGFUSE_RELEASE = os.getenv("LANGFUSE_RELEASE", "local")

# Debug mode flag handed to the Langfuse client.
LANGFUSE_DEBUG = _env_flag("LANGFUSE_DEBUG")

# Timeout handed to the Langfuse client.
LANGFUSE_TIMEOUT = int(os.getenv("LANGFUSE_TIMEOUT", 10))

# Sample rate handed to the Langfuse client (1.0 is logged as 100%).
LANGFUSE_SAMPLE_RATE = float(os.getenv("LANGFUSE_SAMPLE_RATE", 1.0))

# Langfuse is off when explicitly disabled or when running in the "local"
# environment; LANGFUSE_FORCE_RUN overrides that and turns it back on.
LANGFUSE_DISABLED = _env_flag("LANGFUSE_DISABLED") or LANGFUSE_ENVIRONMENT == "local"
LANGFUSE_FORCE_RUN = _env_flag("LANGFUSE_FORCE_RUN")

40 

41 

class LangfuseClientWrapper:
    """
    Langfuse client wrapper. Defines Langfuse client configuration and initializes Langfuse client.

    When Langfuse is disabled, or the `langfuse` package cannot be imported,
    `self.client` stays `None` and every public method becomes a no-op, so
    callers never need to check whether tracing is available.
    """

    def __init__(
        self,
        public_key: str = LANGFUSE_PUBLIC_KEY,
        secret_key: str = LANGFUSE_SECRET_KEY,
        host: str = LANGFUSE_HOST,
        environment: str = LANGFUSE_ENVIRONMENT,
        release: str = LANGFUSE_RELEASE,
        debug: bool = LANGFUSE_DEBUG,
        timeout: int = LANGFUSE_TIMEOUT,
        sample_rate: float = LANGFUSE_SAMPLE_RATE,
        disable: bool = LANGFUSE_DISABLED,
        force_run: bool = LANGFUSE_FORCE_RUN,
    ) -> None:
        """
        Initialize Langfuse client.

        Args:
            public_key (str): Langfuse public key.
            secret_key (str): Langfuse secret key.
            host (str): Langfuse host.
            environment (str): Environment label added to trace metadata and tags.
            release (str): Langfuse release.
            debug (bool): Langfuse debug mode.
            timeout (int): Langfuse timeout.
            sample_rate (float): Langfuse sample rate.
            disable (bool): Do not create a client (unless `force_run` is set).
            force_run (bool): Create a client even when `disable` is set.
        """

        self.public_key = public_key
        self.secret_key = secret_key
        self.host = host
        self.environment = environment
        self.release = release
        self.debug = debug
        self.timeout = timeout
        self.sample_rate = sample_rate
        self.disable = disable
        self.force_run = force_run

        self.client = None
        self.trace = None
        self.metadata = None
        self.tags = None

        # Decide from the constructor arguments, not the module constants they
        # default to; otherwise an explicit `disable=`/`force_run=` is ignored.
        if self.disable and not self.force_run:
            logger.info("Langfuse is disabled.")
            return

        # Log the values this instance actually uses; never log the secret itself.
        logger.info("Langfuse enabled")
        logger.debug(f"LANGFUSE_PUBLIC_KEY: {self.public_key}")
        logger.debug(f"LANGFUSE_SECRET_KEY: {'*' * len(self.secret_key or '')}")
        logger.debug(f"LANGFUSE_HOST: {self.host}")
        logger.debug(f"LANGFUSE_ENVIRONMENT: {self.environment}")
        logger.debug(f"LANGFUSE_RELEASE: {self.release}")
        logger.debug(f"LANGFUSE_DEBUG: {self.debug}")
        logger.debug(f"LANGFUSE_TIMEOUT: {self.timeout}")
        logger.debug(f"LANGFUSE_SAMPLE_RATE: {self.sample_rate * 100}%")

        # Imported lazily so the package is only required when tracing is enabled.
        try:
            from langfuse import Langfuse
        except ImportError:
            logger.error("Langfuse is not installed. Please install it with `pip install langfuse`.")
            return

        self.client = Langfuse(
            public_key=public_key,
            secret_key=secret_key,
            host=host,
            release=release,
            debug=debug,
            timeout=timeout,
            sample_rate=sample_rate,
        )

    def setup_trace(
        self,
        name: str,
        input: typing.Optional[typing.Any] = None,
        tags: typing.Optional[typing.List] = None,
        metadata: typing.Optional[typing.Dict] = None,
        user_id: typing.Optional[str] = None,
        session_id: typing.Optional[str] = None,
    ) -> None:
        """
        Setup trace. If Langfuse is disabled, nothing will be done.

        Args:
            name (str): Trace name.
            input (Any): Trace input.
            tags (list): Trace tags; environment and release are appended.
            metadata (dict): Trace metadata; environment and release are added.
            user_id (str): User ID.
            session_id (str): Session ID.
        """

        if self.client is None:
            logger.debug("Langfuse is disabled.")
            return

        self.set_metadata(metadata)
        self.set_tags(tags)

        try:
            self.trace = self.client.trace(
                name=name, input=input, metadata=self.metadata, tags=self.tags, user_id=user_id, session_id=session_id
            )
        except Exception:
            # `self.trace` may still be None here, so report the trace name
            # instead of dereferencing `self.trace.id`, and skip the success log.
            logger.exception(f"Something went wrong while setting up Langfuse trace '{name}':")
            return

        logger.info(f"Langfuse trace configured with ID: {self.trace.id}")

    def get_trace_id(self) -> typing.Optional[str]:
        """
        Get trace ID. Returns an empty string if Langfuse is disabled or no trace has been set up.
        """

        if self.client is None:
            logger.debug("Langfuse is disabled.")
            return ""

        if self.trace is None:
            logger.debug("Langfuse trace is not setup.")
            return ""

        return self.trace.id

    def start_span(self, name: str, input: typing.Optional[typing.Any] = None) -> typing.Optional["StatefulSpanClient"]:
        """
        Create span. If Langfuse is disabled or no trace has been set up, nothing will be done.

        Args:
            name (str): Span name.
            input (Any): Span input.

        Returns:
            The span created on the current trace, or None when no span was created.
        """

        if self.client is None:
            logger.debug("Langfuse is disabled.")
            return None

        if self.trace is None:
            logger.debug("Langfuse trace is not setup.")
            return None

        return self.trace.span(name=name, input=input)

    def end_span_stream(self, span: typing.Optional["StatefulSpanClient"] = None) -> None:
        """
        End span without recording an output. If Langfuse is disabled, nothing will happen.

        Args:
            span (Any): Span object.
        """

        if self.client is None:
            logger.debug("Langfuse is disabled.")
            return

        # `span` defaults to None, so guard it the same way `end_span` does.
        if span is None:
            logger.debug("Langfuse span is not created.")
            return

        span.end()

        if self.trace is None:
            logger.debug("Langfuse trace is not setup.")
            return

        self.trace.update()

    def end_span(
        self, span: typing.Optional["StatefulSpanClient"] = None, output: typing.Optional[typing.Any] = None
    ) -> None:
        """
        End span and record its output on the trace. If Langfuse is disabled, nothing will be done.

        After ending the span, the client is flushed and the trace metadata is
        updated with a `tool_usage` entry computed from the trace observations.

        Args:
            span (Any): Span object.
            output (Any): Span output.
        """

        if self.client is None:
            logger.debug("Langfuse is disabled.")
            return

        if span is None:
            logger.debug("Langfuse span is not created.")
            return

        span.end(output=output)

        if self.trace is None:
            logger.debug("Langfuse trace is not setup.")
            return

        self.trace.update(output=output)

        metadata = self.metadata or {}

        try:
            # Ensure all batched traces are sent before fetching.
            self.client.flush()
            metadata["tool_usage"] = self._get_tool_usage()
            self.trace.update(metadata=metadata)
        except Exception:
            logger.exception(f"Something went wrong while processing Langfuse trace {self.trace.id}:")

    def get_langchain_handler(self) -> typing.Optional["CallbackHandler"]:
        """
        Get Langchain handler. If Langfuse is disabled or no trace has been set up, returns None.
        """

        if self.client is None:
            logger.debug("Langfuse is disabled.")
            return None

        if self.trace is None:
            logger.debug("Langfuse trace is not setup.")
            return None

        return self.trace.get_langchain_handler()

    def set_metadata(self, custom_metadata: typing.Optional[dict] = None) -> None:
        """
        Set trace metadata: the custom metadata plus the default `environment` and `release` entries.

        Args:
            custom_metadata (dict): Extra metadata. It is copied, not modified.
        """
        # Copy so the caller's dict is not mutated.
        self.metadata = dict(custom_metadata or {})

        self.metadata["environment"] = self.environment
        self.metadata["release"] = self.release

    def set_tags(self, custom_tags: typing.Optional[typing.List] = None) -> None:
        """
        Set trace tags: the custom tags followed by the environment and release.

        Args:
            custom_tags (list): Extra tags. The list is copied, not modified.
        """
        # Copy so a caller-owned list reused across calls does not accumulate
        # duplicate environment/release entries.
        self.tags = list(custom_tags or [])

        self.tags.append(self.environment)
        self.tags.append(self.release)

    def _get_tool_usage(self) -> typing.Dict:
        """Retrieves tool usage information from a langfuse trace.

        Note: assumes trace marks an action with string `AgentAction` and that
        the tool name is the second `-`-separated part of the observation name.

        Returns:
            Mapping of tool name to the number of times it was used. Empty when
            the trace cannot be fetched.
        """
        from langfuse.api.resources.commons.errors.not_found_error import NotFoundError as TraceNotFoundError

        tool_usage = {}

        try:
            fetched_trace = self.client.get_trace(self.trace.id)
            for observation in fetched_trace.observations:
                step = observation.name
                # Skip unnamed observations and anything that is not an agent action.
                if not step or "AgentAction" not in step:
                    continue
                parts = step.split("-")
                # A name without a "-" carries no tool name; skip it instead of
                # aborting the whole count with an IndexError.
                if len(parts) < 2:
                    continue
                tool_name = parts[1]
                tool_usage[tool_name] = tool_usage.get(tool_name, 0) + 1
        except TraceNotFoundError:
            logger.warning(f"Langfuse trace {self.trace.id} not found")
        except Exception:
            logger.exception(f"Something went wrong while processing Langfuse trace {self.trace.id}:")

        return tool_usage