Coverage for mindsdb / api / http / namespaces / analysis.py: 29%
78 statements
« prev ^ index » next coverage.py v7.13.1, created at 2026-01-21 00:36 +0000
« prev ^ index » next coverage.py v7.13.1, created at 2026-01-21 00:36 +0000
1import time
3import pandas as pd
4from flask import request
5from flask_restx import Resource
6from mindsdb_sql_parser import parse_sql
7from mindsdb_sql_parser.ast import Constant
8from pandas.core.frame import DataFrame
10from mindsdb.api.http.namespaces.configs.analysis import ns_conf
11from mindsdb.api.executor.utilities.sql import get_query_tables
12from mindsdb.api.http.utils import http_error
13from mindsdb.api.mysql.mysql_proxy.classes.fake_mysql_proxy import FakeMysqlProxy
14from mindsdb.api.executor.data_types.response_type import (
15 RESPONSE_TYPE as SQL_RESPONSE_TYPE,
16)
17from mindsdb.metrics.metrics import api_endpoint_metrics
18from mindsdb.utilities import log
# Module-level logger named after this module.
logger = log.getLogger(__name__)
def _dedupe_column_names(columns) -> list:
    """Return the labels in ``columns`` with repeated labels made unique.

    The first occurrence of a label is kept unchanged; each later occurrence
    becomes ``"<label>.<n>"`` with ``n`` counting up from 1. A suffix that
    would collide with a label that is already in use is skipped.
    """
    labels = list(columns)
    taken = set(labels)
    occurrences = {}
    unique = []
    for label in labels:
        n = occurrences.get(label, 0)
        if n == 0:
            occurrences[label] = 1
            unique.append(label)
            continue
        # f-string formatting (instead of ``+``) also works for non-string labels.
        candidate = f"{label}.{n}"
        while candidate in taken:
            n += 1
            candidate = f"{label}.{n}"
        taken.add(candidate)
        occurrences[label] = n + 1
        unique.append(candidate)
    return unique


def analyze_df(df: DataFrame) -> dict:
    """Analyze a DataFrame with ``dataprep_ml`` and return the result as a dict.

    Args:
        df: the data to analyze. It is not modified; if it has repeated
            column names, the renaming is applied to a shallow copy.

    Returns:
        ``{}`` when ``df`` has no rows, otherwise ``analyze_dataset(df).to_dict()``.

    Raises:
        ImportError: if the optional ``dataprep_ml`` package is not installed.
    """
    if len(df) == 0:
        return {}

    # Imported lazily: the package is optional and a missing install must
    # surface as ImportError to the caller rather than break module import.
    from dataprep_ml.insights import analyze_dataset

    if not df.columns.is_unique:
        # Rename on a shallow copy so the caller's frame keeps its columns.
        df = df.copy(deep=False)
        df.columns = _dedupe_column_names(df.columns)

    analysis = analyze_dataset(df)
    return analysis.to_dict()
@ns_conf.route("/query")
class QueryAnalysis(Resource):
    """Resource that executes a SQL query and returns an analysis of its result."""

    @ns_conf.doc("post_query_to_analyze")
    @api_endpoint_metrics("POST", "/analysis/query")
    def post(self):
        """Run the posted query and analyze the table it returns.

        Reads from the JSON body:
            query: SQL text to execute (required, non-empty string).
            context: passed to ``FakeMysqlProxy.set_context`` (default ``{}``).
            limit: if given, replaces the LIMIT of the parsed query.

        Returns:
            A dict with ``analysis``, ``column_names``, ``row_count``,
            ``timestamp`` and ``tables``; a reduced dict with an ``error``
            entry when ``dataprep_ml`` is not installed; or an ``http_error``
            response when the request is invalid or the query fails.
        """
        data = request.json
        # A JSON body such as ``null`` or a list has no ``.get``; reject it
        # explicitly instead of failing with AttributeError.
        if not isinstance(data, dict):
            return http_error(400, "Wrong argument", "Request body must be a JSON object")

        query = data.get("query")
        context = data.get("context", {})
        limit = data.get("limit")
        if not isinstance(query, str) or len(query) == 0:
            return http_error(400, "Missed query", "Need provide query to analyze")

        try:
            ast = parse_sql(query)
        except Exception as e:
            return http_error(500, "Wrong query", str(e))

        if limit is not None:
            # Override the query's LIMIT and re-render the SQL text.
            ast.limit = Constant(limit)
            query = str(ast)

        mysql_proxy = FakeMysqlProxy()
        mysql_proxy.set_context(context)

        try:
            result = mysql_proxy.process_query(query)
        except Exception as e:
            logger.exception("Error during query analysis:")
            return http_error(500, "Error", f"Unexpected error during query analysis: {e}")

        if result.type == SQL_RESPONSE_TYPE.ERROR:
            return http_error(500, f"Error {result.error_code}", result.error_message)
        if result.type != SQL_RESPONSE_TYPE.TABLE:
            return http_error(500, "Error", "Query does not return data")

        column_names = [column.name for column in result.result_set.columns]
        df = result.result_set.to_df()
        try:
            analysis = analyze_df(df)
        except ImportError:
            return {
                "analysis": {},
                "timestamp": time.time(),
                "error": 'To use this feature, please install the "dataprep_ml" package.',
            }

        query_tables = [table.to_string() for table in get_query_tables(ast)]

        return {
            "analysis": analysis,
            "column_names": column_names,
            "row_count": len(result.result_set),
            "timestamp": time.time(),
            "tables": query_tables,
        }
@ns_conf.route("/data")
class DataAnalysis(Resource):
    """Resource that analyzes tabular data posted directly in the request."""

    @ns_conf.doc("post_data_to_analyze")
    @api_endpoint_metrics("POST", "/analysis/data")
    def post(self):
        """Build a DataFrame from the posted data and analyze it.

        Reads from the JSON body:
            column_names: passed as ``columns`` to the ``DataFrame`` constructor.
            data: passed as the data of the ``DataFrame`` constructor.

        Returns:
            ``{"analysis", "timestamp"}`` on success. When the analysis fails
            (including a missing ``dataprep_ml`` package) the dict has an
            empty ``analysis`` plus an ``error`` message. A non-object JSON
            body yields an ``http_error`` response.
        """
        payload = request.json
        # A JSON body such as ``null`` or a list has no ``.get``; reject it
        # explicitly instead of failing with AttributeError.
        if not isinstance(payload, dict):
            return http_error(400, "Wrong argument", "Request body must be a JSON object")

        column_names = payload.get("column_names")
        data = payload.get("data")

        timestamp = time.time()
        try:
            analysis = analyze_df(DataFrame(data, columns=column_names))
        except ImportError:
            return {
                "analysis": {},
                "timestamp": timestamp,
                "error": 'To use this feature, please install the "dataprep_ml" package.',
            }
        except Exception as e:
            # Don't want analysis exceptions to show up on UI, but keep a
            # server-side trace so the failure is not lost.
            # TODO: Fix analysis so it doesn't throw exceptions at all.
            logger.exception("Error during data analysis:")
            return {"analysis": {}, "timestamp": timestamp, "error": str(e)}

        return {"analysis": analysis, "timestamp": time.time()}