Coverage for mindsdb / integrations / handlers / google_fit_handler / google_fit_handler.py: 0%
102 statements
« prev ^ index » next coverage.py v7.13.1, created at 2026-01-21 00:36 +0000
« prev ^ index » next coverage.py v7.13.1, created at 2026-01-21 00:36 +0000
1import os.path
2import json
3import pandas as pd
4import pytz
5from tzlocal import get_localzone
6from datetime import datetime
8from google.auth.transport.requests import Request
9from google.oauth2.credentials import Credentials
10from google_auth_oauthlib.flow import InstalledAppFlow
11from googleapiclient.discovery import Resource
12from googleapiclient.discovery import build
13from googleapiclient.errors import HttpError
14from mindsdb_sql_parser import parse_sql
16from mindsdb.utilities import log
17from mindsdb.integrations.handlers.google_fit_handler.google_fit_tables import GoogleFitTable
18from mindsdb.integrations.libs.api_handler import APIHandler
19from mindsdb.integrations.libs.response import (
20 HandlerStatusResponse as StatusResponse,
21 HandlerResponse as Response,
22)
24SCOPES = ["https://www.googleapis.com/auth/fitness.activity.read"]
25DATE_FORMAT = "%Y-%m-%d"
27logger = log.getLogger(__name__)
30class GoogleFitHandler(APIHandler):
31 def __init__(self, name: str = None, **kwargs):
32 super().__init__(name)
33 args = kwargs.get("connection_data", {})
34 self.connection_args = {}
35 self.credentials_path = None
36 if "service_account_file" in args:
37 if os.path.isfile(args["service_account_file"]) is False:
38 raise Exception("service_account_file must be a path to the credentials.json file")
39 self.credentials_path = args["service_account_file"]
40 elif "service_account_json" in args:
41 self.connection_args = args["service_account_json"]
42 if (
43 not isinstance(self.connection_args, dict)
44 or (("redirect_uris" not in self.connection_args.keys()) and len(self.connection_args) != 6)
45 or ("redirect_uris" in self.connection_args.keys())
46 and len(self.connection_args) != 7
47 ):
48 raise Exception("service_account_json has to be a dictionary with all 6 required fields")
49 self.connection_args["redirect_uris"] = ["http://localhost"]
50 self.credentials_path = "mindsdb/integrations/handlers/google_fit_handler/credentials.json"
51 else:
52 raise Exception("Connection args have to content ether service_account_file or service_account_json")
54 self.api = None
55 self.is_connected = False
57 aggregated_data = GoogleFitTable(self)
58 self._register_table("aggregated_data", aggregated_data)
60 def connect(self) -> Resource:
61 if self.is_connected is True and self.api:
62 return self.api
63 if self.connection_args:
64 credentialDict = {"installed": self.connection_args}
65 f = open(self.credentials_path, "w")
66 f.write(json.dumps(credentialDict).replace(" ", ""))
67 f.close()
69 creds = None
71 if os.path.isfile("mindsdb/integrations/handlers/google_fit_handler/token.json"):
72 creds = Credentials.from_authorized_user_file(
73 "mindsdb/integrations/handlers/google_fit_handler/token.json", SCOPES
74 )
75 if not creds or not creds.valid:
76 if creds and creds.expired and creds.refresh_token:
77 creds.refresh(Request())
78 else:
79 flow = InstalledAppFlow.from_client_secrets_file(self.credentials_path, SCOPES)
80 creds = flow.run_local_server(port=0)
81 with open("mindsdb/integrations/handlers/google_fit_handler/token.json", "w") as token:
82 token.write(creds.to_json())
83 self.api = build("fitness", "v1", credentials=creds)
85 self.is_connected = True
86 return self.api
88 def check_connection(self) -> StatusResponse:
89 response = StatusResponse(False)
91 try:
92 self.connect()
93 response.success = True
95 except Exception as e:
96 logger.error(f"Error connecting to Google Fit API: {e}!")
97 response.error_message = e
99 self.is_connected = response.success
100 return response
102 def retrieve_data(self, service, startTimeMillis, endTimeMillis, dataSourceId) -> dict:
103 try:
104 return (
105 service.users()
106 .dataset()
107 .aggregate(
108 userId="me",
109 body={
110 "aggregateBy": [{"dataTypeName": "com.google.step_count.delta", "dataSourceId": dataSourceId}],
111 "bucketByTime": {"durationMillis": 86400000},
112 "startTimeMillis": startTimeMillis,
113 "endTimeMillis": endTimeMillis,
114 },
115 )
116 .execute()
117 )
118 except HttpError:
119 raise HttpError
121 def native_query(self, query: str = None) -> Response:
122 """Receive raw query and act upon it somehow.
123 Args:
124 query (Any): query in native format (str for sql databases,
125 api's json etc)
126 Returns:
127 HandlerResponse
128 """
129 ast = parse_sql(query)
130 return self.query(ast)
132 def get_steps(self, start_time_millis, end_time_millis) -> pd.DataFrame:
133 steps = {}
134 steps_data = self.retrieve_data(
135 self.api,
136 start_time_millis,
137 end_time_millis,
138 "derived:com.google.step_count.delta:com.google.android.gms:estimated_steps",
139 )
140 for daily_step_data in steps_data["bucket"]:
141 local_date = datetime.fromtimestamp(
142 int(daily_step_data["startTimeMillis"]) / 1000, tz=pytz.timezone(str(get_localzone()))
143 )
144 local_date_str = local_date.strftime(DATE_FORMAT)
146 data_point = daily_step_data["dataset"][0]["point"]
147 if data_point:
148 count = data_point[0]["value"][0]["intVal"]
149 data_source_id = data_point[0]["originDataSourceId"]
150 steps[local_date_str] = {"steps": count, "originDataSourceId": data_source_id}
151 ret = pd.DataFrame.from_dict(steps)
152 ret = ret.T
153 ret = ret.drop("originDataSourceId", axis=1)
154 ret = ret.reset_index(drop=False)
155 return ret
157 def call_google_fit_api(self, method_name: str = None, params: dict = None) -> pd.DataFrame:
158 """Receive query as AST (abstract syntax tree) and act upon it somehow.
159 Args:
160 query (ASTNode): sql query represented as AST. May be any kind
161 of query: SELECT, INSERT, DELETE, etc
162 Returns:
163 DataFrame
164 """
165 self.connect()
166 if method_name == "get_steps":
167 val = self.get_steps(params["start_time"], params["end_time"])
168 return val
169 raise NotImplementedError("Method name {} not supported by Google Fit Handler".format(method_name))