Coverage for mindsdb / interfaces / database / log.py: 77%

109 statements  

« prev     ^ index     » next       coverage.py v7.13.1, created at 2026-01-21 00:36 +0000

1from typing import List 

2from copy import deepcopy 

3from abc import ABC, abstractmethod 

4from collections import OrderedDict 

5 

6import pandas as pd 

7from mindsdb_sql_parser import parse_sql 

8from mindsdb_sql_parser.ast import Select, Identifier, Star, BinaryOperation, Constant, Join, Function 

9from mindsdb_sql_parser.utils import JoinType 

10 

11from mindsdb.utilities.render.sqlalchemy_render import SqlalchemyRender 

12from mindsdb.integrations.utilities.query_traversal import query_traversal 

13from mindsdb.utilities.functions import resolve_table_identifier 

14from mindsdb.api.executor.utilities.sql import get_query_tables 

15from mindsdb.utilities.exception import EntityNotExistsError 

16import mindsdb.interfaces.storage.db as db 

17from mindsdb.utilities.context import context as ctx 

18from mindsdb.api.executor.datahub.classes.response import DataHubResponse 

19from mindsdb.api.executor.datahub.classes.tables_row import ( 

20 TABLES_ROW_TYPE, 

21 TablesRow, 

22) 

23 

24 

class LogTable(ABC):
    """Base class for a 'table' entity in the internal 'log' database.

    Attributes:
        name (str): name of the table
        deletable (bool): is it possible to delete the table
        visible (bool): should the table be visible in the GUI sidebar
        kind (str): type of the table/view
    """

    name: str
    deletable: bool = False
    visible: bool = True
    kind: str = "table"

    @staticmethod
    @abstractmethod
    def _get_base_subquery() -> Select:
        """Get a query that returns the table from the internal db.

        Returns:
            Select: 'select' query that returns the table
        """
        pass

    @staticmethod
    def company_id_comparison(table_a: str, table_b: str) -> BinaryOperation:
        """Make statement for 'safe' comparison of company_id of two tables.

        Both sides are wrapped in COALESCE so rows with a NULL company_id
        still match each other (plain "NULL = NULL" is not true in SQL).

        Args:
            table_a (str): name of first table
            table_b (str): name of second table

        Returns:
            BinaryOperation: statement that can be used for 'safe' comparison
        """
        # NOTE(review): the COALESCE fallback is the bare string "0" rather
        # than Constant(0); the rendered query path is exercised at runtime,
        # so the renderer apparently accepts it — confirm before changing.
        return BinaryOperation(
            op="=",
            args=(
                Function(op="coalesce", args=(Identifier(f"{table_a}.company_id"), "0")),
                Function(op="coalesce", args=(Identifier(f"{table_b}.company_id"), "0")),
            ),
        )

68 

69 

class LLMLogTable(LogTable):
    """Virtual 'llm_log' table: LLM call log rows joined with model names."""

    name = "llm_log"

    columns = [
        "API_KEY",
        "MODEL_NAME",
        "INPUT",
        "OUTPUT",
        "START_TIME",
        "END_TIME",
        "PROMPT_TOKENS",
        "COMPLETION_TOKENS",
        "TOTAL_TOKENS",
        "SUCCESS",
    ]

    types_map = {"SUCCESS": "boolean", "START_TIME": "datetime64[ns]", "END_TIME": "datetime64[ns]"}

    @staticmethod
    def _get_base_subquery() -> Select:
        """Build the SELECT that backs the virtual 'llm_log' table.

        'llm_log' is left-joined to 'predictor' to resolve the model name,
        and rows are filtered to the current company.

        Returns:
            Select: aliased subquery over the internal llm_log table
        """
        # Source column -> exposed alias. 'model_name' comes from the joined
        # 'predictor' table; everything else is taken straight from 'llm_log'.
        column_map = [
            ("llm_log.api_key", "api_key"),
            ("predictor.name", "model_name"),
            ("llm_log.input", "input"),
            ("llm_log.output", "output"),
            ("llm_log.start_time", "start_time"),
            ("llm_log.end_time", "end_time"),
            ("llm_log.prompt_tokens", "prompt_tokens"),
            ("llm_log.completion_tokens", "completion_tokens"),
            ("llm_log.total_tokens", "total_tokens"),
            ("llm_log.success", "success"),
        ]

        join_condition = BinaryOperation(
            op="and",
            args=(
                LLMLogTable.company_id_comparison("llm_log", "predictor"),
                BinaryOperation(op="=", args=(Identifier("llm_log.model_id"), Identifier("predictor.id"))),
            ),
        )

        # When no company is set in context the filter must read "IS NULL",
        # because "= NULL" would never match.
        company_filter = BinaryOperation(
            op="is" if ctx.company_id is None else "=",
            args=(Identifier("llm_log.company_id"), Constant(ctx.company_id)),
        )

        return Select(
            targets=[Identifier(source, alias=Identifier(alias)) for source, alias in column_map],
            from_table=Join(
                left=Identifier("llm_log"),
                right=Identifier("predictor"),
                join_type=JoinType.LEFT_JOIN,
                condition=join_condition,
            ),
            where=company_filter,
            alias=Identifier("llm_log"),
        )

122 

123 

class JobsHistoryTable(LogTable):
    """Virtual 'jobs_history' table: job run records joined with job and project names."""

    name = "jobs_history"

    columns = ["NAME", "PROJECT", "RUN_START", "RUN_END", "ERROR", "QUERY"]
    types_map = {"RUN_START": "datetime64[ns]", "RUN_END": "datetime64[ns]"}

    @staticmethod
    def _get_base_subquery() -> Select:
        """Build the SELECT that backs the virtual 'jobs_history' table.

        'jobs_history' is left-joined to 'jobs' (for the job name) and then
        to 'project' (for the project name); rows are filtered to the
        current company.

        Returns:
            Select: aliased subquery over the internal jobs_history table
        """
        query = Select(
            targets=[
                Identifier("jobs.name", alias=Identifier("name")),
                Identifier("project.name", alias=Identifier("project")),
                Identifier("jobs_history.start_at", alias=Identifier("run_start")),
                Identifier("jobs_history.end_at", alias=Identifier("run_end")),
                Identifier("jobs_history.error", alias=Identifier("error")),
                Identifier("jobs_history.query_str", alias=Identifier("query")),
            ],
            from_table=Join(
                left=Join(
                    left=Identifier("jobs_history"),
                    right=Identifier("jobs"),
                    join_type=JoinType.LEFT_JOIN,
                    condition=BinaryOperation(
                        op="and",
                        args=(
                            # call the base-class helper directly (was
                            # LLMLogTable.company_id_comparison, which only
                            # worked because the staticmethod is inherited)
                            LogTable.company_id_comparison("jobs_history", "jobs"),
                            BinaryOperation(op="=", args=(Identifier("jobs_history.job_id"), Identifier("jobs.id"))),
                        ),
                    ),
                ),
                right=Identifier("project"),
                join_type=JoinType.LEFT_JOIN,
                condition=BinaryOperation(
                    op="and",
                    args=(
                        LogTable.company_id_comparison("project", "jobs"),
                        BinaryOperation(op="=", args=(Identifier("project.id"), Identifier("jobs.project_id"))),
                    ),
                ),
            ),
            # "IS NULL" when no company is set in context; "= <id>" otherwise
            where=BinaryOperation(
                op="is" if ctx.company_id is None else "=",
                args=(Identifier("jobs_history.company_id"), Constant(ctx.company_id)),
            ),
            alias=Identifier("jobs_history"),
        )
        return query

171 

172 

class LogDBController:
    """Read-only access point for the virtual 'log' database.

    Registers the known log tables and translates SELECT queries over them
    into SQL executed against the internal storage database.
    """

    def __init__(self):
        self._tables = OrderedDict()
        self._tables["llm_log"] = LLMLogTable
        self._tables["jobs_history"] = JobsHistoryTable

    def get_list(self) -> List[LogTable]:
        """Return all registered log table classes."""
        return list(self._tables.values())

    def get(self, name: str = None) -> LogTable:
        """Return the log table class registered under `name`.

        Args:
            name (str): table name

        Raises:
            EntityNotExistsError: if the table is not registered
        """
        try:
            return self._tables[name]
        except KeyError:
            raise EntityNotExistsError(f"Table log.{name} does not exist")

    def get_tables(self) -> OrderedDict:
        """Return the mapping of table name -> table class."""
        return self._tables

    def get_tree_tables(self) -> OrderedDict:
        """Return tables shown in the GUI tree (same mapping as get_tables)."""
        return self._tables

    def get_tables_rows(self) -> List[TablesRow]:
        """Return the tables as information_schema-style rows."""
        return [
            TablesRow(TABLE_TYPE=TABLES_ROW_TYPE.SYSTEM_VIEW, TABLE_NAME=table_name)
            for table_name in self._tables.keys()
        ]

    def query(self, query: Select = None, native_query: str = None, session=None) -> DataHubResponse:
        """Execute a SELECT over one log table and return the result.

        Exactly one of `query` (AST) or `native_query` (SQL string) must be
        provided.

        Args:
            query (Select): parsed query to execute
            native_query (str): raw SQL to parse and execute
            session: unused, kept for interface compatibility

        Returns:
            DataHubResponse: result dataframe plus column metadata

        Raises:
            Exception: if both/neither query forms are given, the query is not
                a single-table SELECT over a known log table, or it references
                columns outside the table's allowed set
        """
        if native_query is not None:
            if query is not None:
                raise Exception("'query' and 'native_query' arguments can not be used together")
            query = parse_sql(native_query)
        else:
            # copy so the target rewriting below does not mutate the caller's AST
            query = deepcopy(query)

        if type(query) is not Select:
            raise Exception("Only 'SELECT' is allowed for tables in log database")
        tables = get_query_tables(query)
        if len(tables) != 1:
            raise Exception("Only one table may be in query to log database")
        table = tables[0]
        if table[0] is not None and table[0].lower() != "log":
            raise Exception("This is not a query to the log database")
        if table[1].lower() not in self._tables.keys():
            raise Exception(f"There is no table '{table[1]}' in the log database")

        log_table = self._tables[table[1].lower()]

        # region check that only allowed identifiers are used in the query
        available_columns_names = [column.lower() for column in log_table.columns]

        def check_columns(node, is_table, **kwargs):
            # expand '*' into the explicit list of allowed columns
            if type(node) is Select:
                new_targets = []
                for target in node.targets:
                    if type(target) is Star:
                        new_targets += [Identifier(name) for name in available_columns_names]
                    else:
                        new_targets.append(target)
                node.targets = new_targets

            # every column identifier must belong to a known log table and
            # be one of the table's declared columns
            if type(node) is Identifier and is_table is False:
                parts = resolve_table_identifier(node)
                if parts[0] is not None and parts[0].lower() not in self._tables:
                    raise Exception(f"Table '{parts[0]}' can not be used in query")
                if parts[1].lower() not in available_columns_names:
                    raise Exception(f"Column '{parts[1]}' can not be used in query")

        query_traversal(query, check_columns)
        # endregion

        # swap the virtual table for the real subquery over internal storage
        query.from_table = log_table._get_base_subquery()

        render_engine = db.engine.name
        # SQLAlchemy's Engine.name reports 'postgresql', but SqlalchemyRender
        # expects the dialect key 'postgres'.  BUG FIX: the original line was
        # a bare string expression (a no-op), so the mapping never happened.
        if render_engine == "postgresql":
            render_engine = "postgres"
        render = SqlalchemyRender(render_engine)
        query_str = render.get_string(query, with_failback=False)
        df = pd.read_sql_query(query_str, db.engine)

        # region cast columns values to proper types (case-insensitive match)
        for column_name, column_type in log_table.types_map.items():
            for df_column_name in df.columns:
                if df_column_name.lower() == column_name.lower() and df[df_column_name].dtype != column_type:
                    df[df_column_name] = df[df_column_name].astype(column_type)
        # endregion

        columns_info = [{"name": k, "type": v} for k, v in df.dtypes.items()]

        return DataHubResponse(data_frame=df, columns=columns_info)