Coverage for mindsdb / utilities / langfuse.py: 17%
126 statements
« prev ^ index » next coverage.py v7.13.1, created at 2026-01-21 00:36 +0000
« prev ^ index » next coverage.py v7.13.1, created at 2026-01-21 00:36 +0000
1import os
2import typing
3from typing import TYPE_CHECKING
5from mindsdb.utilities import log
7if TYPE_CHECKING:
8 from langfuse.callback import CallbackHandler
9 from langfuse.client import StatefulSpanClient
11logger = log.getLogger(__name__)
13# Define Langfuse public key.
14LANGFUSE_PUBLIC_KEY = os.getenv("LANGFUSE_PUBLIC_KEY", "langfuse_public_key")
16# Define Langfuse secret key.
17LANGFUSE_SECRET_KEY = os.getenv("LANGFUSE_SECRET_KEY", "langfuse_secret_key")
19# Define Langfuse host.
20LANGFUSE_HOST = os.getenv("LANGFUSE_HOST", "http://localhost:3000")
22# Define Langfuse environment.
23LANGFUSE_ENVIRONMENT = os.getenv("LANGFUSE_ENVIRONMENT", "local")
25# Define Langfuse release.
26LANGFUSE_RELEASE = os.getenv("LANGFUSE_RELEASE", "local")
28# Define Langfuse debug mode.
29LANGFUSE_DEBUG = os.getenv("LANGFUSE_DEBUG", "false").lower() == "true"
31# Define Langfuse timeout.
32LANGFUSE_TIMEOUT = int(os.getenv("LANGFUSE_TIMEOUT", 10))
34# Define Langfuse sample rate.
35LANGFUSE_SAMPLE_RATE = float(os.getenv("LANGFUSE_SAMPLE_RATE", 1.0))
37# Define if Langfuse is disabled.
38LANGFUSE_DISABLED = os.getenv("LANGFUSE_DISABLED", "false").lower() == "true" or LANGFUSE_ENVIRONMENT == "local"
39LANGFUSE_FORCE_RUN = os.getenv("LANGFUSE_FORCE_RUN", "false").lower() == "true"
42class LangfuseClientWrapper:
43 """
44 Langfuse client wrapper. Defines Langfuse client configuration and initializes Langfuse client.
45 """
47 def __init__(
48 self,
49 public_key: str = LANGFUSE_PUBLIC_KEY,
50 secret_key: str = LANGFUSE_SECRET_KEY,
51 host: str = LANGFUSE_HOST,
52 environment: str = LANGFUSE_ENVIRONMENT,
53 release: str = LANGFUSE_RELEASE,
54 debug: bool = LANGFUSE_DEBUG,
55 timeout: int = LANGFUSE_TIMEOUT,
56 sample_rate: float = LANGFUSE_SAMPLE_RATE,
57 disable: bool = LANGFUSE_DISABLED,
58 force_run: bool = LANGFUSE_FORCE_RUN,
59 ) -> None:
60 """
61 Initialize Langfuse client.
63 Args:
64 public_key (str): Langfuse public key.
65 secret_key (str): Langfuse secret key.
66 host (str): Langfuse host.
67 release (str): Langfuse release.
68 timeout (int): Langfuse timeout.
69 sample_rate (float): Langfuse sample rate.
70 """
72 self.metadata = None
73 self.public_key = public_key
74 self.secret_key = secret_key
75 self.host = host
76 self.environment = environment
77 self.release = release
78 self.debug = debug
79 self.timeout = timeout
80 self.sample_rate = sample_rate
81 self.disable = disable
82 self.force_run = force_run
84 self.client = None
85 self.trace = None
86 self.metadata = None
87 self.tags = None
89 # Check if Langfuse is disabled.
90 if LANGFUSE_DISABLED and not LANGFUSE_FORCE_RUN:
91 logger.info("Langfuse is disabled.")
92 return
94 logger.info("Langfuse enabled")
95 logger.debug(f"LANGFUSE_PUBLIC_KEY: {LANGFUSE_PUBLIC_KEY}")
96 logger.debug(f"LANGFUSE_SECRET_KEY: {'*' * len(LANGFUSE_SECRET_KEY)}")
97 logger.debug(f"LANGFUSE_HOST: {LANGFUSE_HOST}")
98 logger.debug(f"LANGFUSE_ENVIRONMENT: {LANGFUSE_ENVIRONMENT}")
99 logger.debug(f"LANGFUSE_RELEASE: {LANGFUSE_RELEASE}")
100 logger.debug(f"LANGFUSE_DEBUG: {LANGFUSE_DEBUG}")
101 logger.debug(f"LANGFUSE_TIMEOUT: {LANGFUSE_TIMEOUT}")
102 logger.debug(f"LANGFUSE_SAMPLE_RATE: {LANGFUSE_SAMPLE_RATE * 100}%")
104 try:
105 from langfuse import Langfuse
106 except ImportError:
107 logger.error("Langfuse is not installed. Please install it with `pip install langfuse`.")
108 return
110 self.client = Langfuse(
111 public_key=public_key,
112 secret_key=secret_key,
113 host=host,
114 release=release,
115 debug=debug,
116 timeout=timeout,
117 sample_rate=sample_rate,
118 )
120 def setup_trace(
121 self,
122 name: str,
123 input: typing.Optional[typing.Any] = None,
124 tags: typing.Optional[typing.List] = None,
125 metadata: typing.Optional[typing.Dict] = None,
126 user_id: str = None,
127 session_id: str = None,
128 ) -> None:
129 """
130 Setup trace. If Langfuse is disabled, nothing will be done.
131 Args:
132 name (str): Trace name.
133 input (dict): Trace input.
134 tags (dict): Trace tags.
135 metadata (dict): Trace metadata.
136 user_id (str): User ID.
137 session_id (str): Session ID.
138 """
140 if self.client is None:
141 logger.debug("Langfuse is disabled.")
142 return
144 self.set_metadata(metadata)
145 self.set_tags(tags)
147 try:
148 self.trace = self.client.trace(
149 name=name, input=input, metadata=self.metadata, tags=self.tags, user_id=user_id, session_id=session_id
150 )
151 except Exception:
152 logger.exception(f"Something went wrong while processing Langfuse trace {self.trace.id}:")
154 logger.info(f"Langfuse trace configured with ID: {self.trace.id}")
156 def get_trace_id(self) -> typing.Optional[str]:
157 """
158 Get trace ID. If Langfuse is disabled, returns None.
159 """
161 if self.client is None:
162 logger.debug("Langfuse is disabled.")
163 return ""
165 if self.trace is None:
166 logger.debug("Langfuse trace is not setup.")
167 return ""
169 return self.trace.id
171 def start_span(self, name: str, input: typing.Optional[typing.Any] = None) -> typing.Optional["StatefulSpanClient"]:
172 """
173 Create span. If Langfuse is disabled, nothing will be done.
175 Args:
176 name (str): Span name.
177 input (dict): Span input.
178 """
180 if self.client is None:
181 logger.debug("Langfuse is disabled.")
182 return None
184 return self.trace.span(name=name, input=input)
186 def end_span_stream(self, span: typing.Optional["StatefulSpanClient"] = None) -> None:
187 """
188 End span. If Langfuse is disabled, nothing will happen.
189 Args:
190 span (Any): Span object.
191 """
193 if self.client is None:
194 logger.debug("Langfuse is disabled.")
195 return
197 span.end()
198 self.trace.update()
200 def end_span(
201 self, span: typing.Optional["StatefulSpanClient"] = None, output: typing.Optional[typing.Any] = None
202 ) -> None:
203 """
204 End trace. If Langfuse is disabled, nothing will be done.
206 Args:
207 span (Any): Span object.
208 output (Any): Span output.
209 """
211 if self.client is None:
212 logger.debug("Langfuse is disabled.")
213 return
215 if span is None:
216 logger.debug("Langfuse span is not created.")
217 return
219 span.end(output=output)
220 self.trace.update(output=output)
222 metadata = self.metadata or {}
224 try:
225 # Ensure all batched traces are sent before fetching.
226 self.client.flush()
227 metadata["tool_usage"] = self._get_tool_usage()
228 self.trace.update(metadata=metadata)
229 except Exception:
230 logger.exception(f"Something went wrong while processing Langfuse trace {self.trace.id}:")
232 def get_langchain_handler(self) -> typing.Optional["CallbackHandler"]:
233 """
234 Get Langchain handler. If Langfuse is disabled, returns None.
235 """
237 if self.client is None:
238 logger.debug("Langfuse is disabled.")
239 return None
241 return self.trace.get_langchain_handler()
243 def set_metadata(self, custom_metadata: dict = None) -> None:
244 """
245 Get default metadata.
246 """
247 self.metadata = custom_metadata or {}
249 self.metadata["environment"] = self.environment
250 self.metadata["release"] = self.release
252 def set_tags(self, custom_tags: typing.Optional[typing.List] = None) -> None:
253 """
254 Get default tags.
255 """
256 self.tags = custom_tags or []
258 self.tags.append(self.environment)
259 self.tags.append(self.release)
261 def _get_tool_usage(self) -> typing.Dict:
262 """Retrieves tool usage information from a langfuse trace.
263 Note: assumes trace marks an action with string `AgentAction`
264 """
265 from langfuse.api.resources.commons.errors.not_found_error import NotFoundError as TraceNotFoundError
267 tool_usage = {}
269 try:
270 fetched_trace = self.client.get_trace(self.trace.id)
271 steps = [s.name for s in fetched_trace.observations]
272 for step in steps:
273 if "AgentAction" in step:
274 tool_name = step.split("-")[1]
275 if tool_name not in tool_usage:
276 tool_usage[tool_name] = 0
277 tool_usage[tool_name] += 1
278 except TraceNotFoundError:
279 logger.warning(f"Langfuse trace {self.trace.id} not found")
280 except Exception:
281 logger.exception(f"Something went wrong while processing Langfuse trace {self.trace.id}:")
283 return tool_usage