Coverage for mindsdb/interfaces/database/log.py: 77%
109 statements
« prev ^ index » next — coverage.py v7.13.1, created at 2026-01-21 00:36 +0000
1from typing import List
2from copy import deepcopy
3from abc import ABC, abstractmethod
4from collections import OrderedDict
6import pandas as pd
7from mindsdb_sql_parser import parse_sql
8from mindsdb_sql_parser.ast import Select, Identifier, Star, BinaryOperation, Constant, Join, Function
9from mindsdb_sql_parser.utils import JoinType
11from mindsdb.utilities.render.sqlalchemy_render import SqlalchemyRender
12from mindsdb.integrations.utilities.query_traversal import query_traversal
13from mindsdb.utilities.functions import resolve_table_identifier
14from mindsdb.api.executor.utilities.sql import get_query_tables
15from mindsdb.utilities.exception import EntityNotExistsError
16import mindsdb.interfaces.storage.db as db
17from mindsdb.utilities.context import context as ctx
18from mindsdb.api.executor.datahub.classes.response import DataHubResponse
19from mindsdb.api.executor.datahub.classes.tables_row import (
20 TABLES_ROW_TYPE,
21 TablesRow,
22)
class LogTable(ABC):
    """Base class for a 'table' entity in the internal 'log' database.

    Attributes:
        name (str): name of the table
        deletable (bool): is it possible to delete the table
        visible (bool): should the table be visible in the GUI sidebar
        kind (str): type of the table/view
    """

    name: str
    deletable: bool = False
    visible: bool = True
    kind: str = "table"

    @staticmethod
    @abstractmethod
    def _get_base_subquery() -> Select:
        """Get a query that returns the table from the internal db.

        Returns:
            Select: 'select' query that returns the table
        """
        pass

    @staticmethod
    def company_id_comparison(table_a: str, table_b: str) -> BinaryOperation:
        """Make a statement for a 'safe' comparison of the company_id of two tables.

        company_id may be NULL, so each side is wrapped in coalesce(..., "0")
        to make NULL compare equal to NULL.

        Args:
            table_a (str): name of the first table
            table_b (str): name of the second table

        Returns:
            BinaryOperation: statement that can be used for the 'safe' comparison
        """
        # NOTE(review): "0" is passed as a bare string where the other arg is an
        # AST node — presumably the renderer accepts raw values here; confirm.
        lhs = Function(op="coalesce", args=(Identifier(f"{table_a}.company_id"), "0"))
        rhs = Function(op="coalesce", args=(Identifier(f"{table_b}.company_id"), "0"))
        return BinaryOperation(op="=", args=(lhs, rhs))
class LLMLogTable(LogTable):
    """Log table exposing LLM call records, joined with model names."""

    name = "llm_log"

    columns = [
        "API_KEY",
        "MODEL_NAME",
        "INPUT",
        "OUTPUT",
        "START_TIME",
        "END_TIME",
        "PROMPT_TOKENS",
        "COMPLETION_TOKENS",
        "TOTAL_TOKENS",
        "SUCCESS",
    ]

    types_map = {"SUCCESS": "boolean", "START_TIME": "datetime64[ns]", "END_TIME": "datetime64[ns]"}

    @staticmethod
    def _get_base_subquery() -> Select:
        """Build the 'select' over llm_log left-joined to predictor.

        Returns:
            Select: query returning the llm_log rows of the current company
        """
        # (source column, output alias) pairs, in output order
        column_map = [
            ("llm_log.api_key", "api_key"),
            ("predictor.name", "model_name"),
            ("llm_log.input", "input"),
            ("llm_log.output", "output"),
            ("llm_log.start_time", "start_time"),
            ("llm_log.end_time", "end_time"),
            ("llm_log.prompt_tokens", "prompt_tokens"),
            ("llm_log.completion_tokens", "completion_tokens"),
            ("llm_log.total_tokens", "total_tokens"),
            ("llm_log.success", "success"),
        ]
        targets = [Identifier(src, alias=Identifier(dst)) for src, dst in column_map]

        # join predictor to resolve model names, matching on company and model id
        join_condition = BinaryOperation(
            op="and",
            args=(
                LLMLogTable.company_id_comparison("llm_log", "predictor"),
                BinaryOperation(op="=", args=(Identifier("llm_log.model_id"), Identifier("predictor.id"))),
            ),
        )

        # 'IS NULL' when no company is set, plain equality otherwise
        company_filter = BinaryOperation(
            op="is" if ctx.company_id is None else "=",
            args=(Identifier("llm_log.company_id"), Constant(ctx.company_id)),
        )

        return Select(
            targets=targets,
            from_table=Join(
                left=Identifier("llm_log"),
                right=Identifier("predictor"),
                join_type=JoinType.LEFT_JOIN,
                condition=join_condition,
            ),
            where=company_filter,
            alias=Identifier("llm_log"),
        )
class JobsHistoryTable(LogTable):
    """Log table exposing the history of job runs, joined to job and project names."""

    name = "jobs_history"

    columns = ["NAME", "PROJECT", "RUN_START", "RUN_END", "ERROR", "QUERY"]
    types_map = {"RUN_START": "datetime64[ns]", "RUN_END": "datetime64[ns]"}

    @staticmethod
    def _get_base_subquery() -> Select:
        """Build the 'select' over jobs_history joined to jobs and project.

        Returns:
            Select: query returning the jobs_history rows of the current company
        """
        query = Select(
            targets=[
                Identifier("jobs.name", alias=Identifier("name")),
                Identifier("project.name", alias=Identifier("project")),
                Identifier("jobs_history.start_at", alias=Identifier("run_start")),
                Identifier("jobs_history.end_at", alias=Identifier("run_end")),
                Identifier("jobs_history.error", alias=Identifier("error")),
                Identifier("jobs_history.query_str", alias=Identifier("query")),
            ],
            from_table=Join(
                left=Join(
                    left=Identifier("jobs_history"),
                    right=Identifier("jobs"),
                    join_type=JoinType.LEFT_JOIN,
                    condition=BinaryOperation(
                        op="and",
                        args=(
                            # FIX: call the inherited helper through this class, not
                            # the unrelated sibling LLMLogTable (same function object,
                            # but the previous spelling was misleading)
                            JobsHistoryTable.company_id_comparison("jobs_history", "jobs"),
                            BinaryOperation(op="=", args=(Identifier("jobs_history.job_id"), Identifier("jobs.id"))),
                        ),
                    ),
                ),
                right=Identifier("project"),
                join_type=JoinType.LEFT_JOIN,
                condition=BinaryOperation(
                    op="and",
                    args=(
                        JobsHistoryTable.company_id_comparison("project", "jobs"),
                        BinaryOperation(op="=", args=(Identifier("project.id"), Identifier("jobs.project_id"))),
                    ),
                ),
            ),
            # 'IS NULL' when no company is set, plain equality otherwise
            where=BinaryOperation(
                op="is" if ctx.company_id is None else "=",
                args=(Identifier("jobs_history.company_id"), Constant(ctx.company_id)),
            ),
            alias=Identifier("jobs_history"),
        )
        return query
173class LogDBController:
174 def __init__(self):
175 self._tables = OrderedDict()
176 self._tables["llm_log"] = LLMLogTable
177 self._tables["jobs_history"] = JobsHistoryTable
179 def get_list(self) -> List[LogTable]:
180 return list(self._tables.values())
182 def get(self, name: str = None) -> LogTable:
183 try:
184 return self._tables[name]
185 except KeyError:
186 raise EntityNotExistsError(f"Table log.{name} does not exists")
188 def get_tables(self) -> OrderedDict:
189 return self._tables
191 def get_tree_tables(self) -> OrderedDict:
192 return self._tables
194 def get_tables_rows(self) -> List[TablesRow]:
195 return [
196 TablesRow(TABLE_TYPE=TABLES_ROW_TYPE.SYSTEM_VIEW, TABLE_NAME=table_name)
197 for table_name in self._tables.keys()
198 ]
200 def query(self, query: Select = None, native_query: str = None, session=None) -> DataHubResponse:
201 if native_query is not None: 201 ↛ 202line 201 didn't jump to line 202 because the condition on line 201 was never true
202 if query is not None:
203 raise Exception("'query' and 'native_query' arguments can not be used together")
204 query = parse_sql(native_query)
205 else:
206 query = deepcopy(query)
208 if type(query) is not Select: 208 ↛ 209line 208 didn't jump to line 209 because the condition on line 208 was never true
209 raise Exception("Only 'SELECT' is allowed for tables in log database")
210 tables = get_query_tables(query)
211 if len(tables) != 1: 211 ↛ 212line 211 didn't jump to line 212 because the condition on line 211 was never true
212 raise Exception("Only one table may be in query to log database")
213 table = tables[0]
214 if table[0] is not None and table[0].lower() != "log": 214 ↛ 215line 214 didn't jump to line 215 because the condition on line 214 was never true
215 raise Exception("This is not a query to the log database")
216 if table[1].lower() not in self._tables.keys(): 216 ↛ 217line 216 didn't jump to line 217 because the condition on line 216 was never true
217 raise Exception(f"There is no table '{table[1]}' in the log database")
219 log_table = self._tables[table[1].lower()]
221 # region check that only allowed identifiers are used in the query
222 available_columns_names = [column.lower() for column in log_table.columns]
224 def check_columns(node, is_table, **kwargs):
225 # region replace * to available columns
226 if type(node) is Select:
227 new_targets = []
228 for target in node.targets:
229 if type(target) is Star: 229 ↛ 232line 229 didn't jump to line 232 because the condition on line 229 was always true
230 new_targets += [Identifier(name) for name in available_columns_names]
231 else:
232 new_targets.append(target)
233 node.targets = new_targets
234 # endregion
236 if type(node) is Identifier and is_table is False:
237 parts = resolve_table_identifier(node)
238 if parts[0] is not None and parts[0].lower() not in self._tables: 238 ↛ 239line 238 didn't jump to line 239 because the condition on line 238 was never true
239 raise Exception(f"Table '{parts[0]}' can not be used in query")
240 if parts[1].lower() not in available_columns_names: 240 ↛ 241line 240 didn't jump to line 241 because the condition on line 240 was never true
241 raise Exception(f"Column '{parts[1]}' can not be used in query")
243 query_traversal(query, check_columns)
244 # endregion
246 query.from_table = log_table._get_base_subquery()
248 render_engine = db.engine.name
249 if render_engine == "postgresql": 249 ↛ 250line 249 didn't jump to line 250 because the condition on line 249 was never true
250 "postgres"
251 render = SqlalchemyRender(render_engine)
252 query_str = render.get_string(query, with_failback=False)
253 df = pd.read_sql_query(query_str, db.engine)
255 # region cast columns values to proper types
256 for column_name, column_type in log_table.types_map.items():
257 for df_column_name in df.columns:
258 if df_column_name.lower() == column_name.lower() and df[df_column_name].dtype != column_type:
259 df[df_column_name] = df[df_column_name].astype(column_type)
260 # endregion
262 columns_info = [{"name": k, "type": v} for k, v in df.dtypes.items()]
264 return DataHubResponse(data_frame=df, columns=columns_info)