Coverage for mindsdb / integrations / handlers / google_fit_handler / google_fit_handler.py: 0%

102 statements  

« prev     ^ index     » next       coverage.py v7.13.1, created at 2026-01-21 00:36 +0000

1import os.path 

2import json 

3import pandas as pd 

4import pytz 

5from tzlocal import get_localzone 

6from datetime import datetime 

7 

8from google.auth.transport.requests import Request 

9from google.oauth2.credentials import Credentials 

10from google_auth_oauthlib.flow import InstalledAppFlow 

11from googleapiclient.discovery import Resource 

12from googleapiclient.discovery import build 

13from googleapiclient.errors import HttpError 

14from mindsdb_sql_parser import parse_sql 

15 

16from mindsdb.utilities import log 

17from mindsdb.integrations.handlers.google_fit_handler.google_fit_tables import GoogleFitTable 

18from mindsdb.integrations.libs.api_handler import APIHandler 

19from mindsdb.integrations.libs.response import ( 

20 HandlerStatusResponse as StatusResponse, 

21 HandlerResponse as Response, 

22) 

23 

# OAuth scopes requested during authorization: read-only access to Fitness activity data.
SCOPES = [
    "https://www.googleapis.com/auth/fitness.activity.read",
]

# strftime pattern used to label each daily bucket (e.g. "2024-01-31").
DATE_FORMAT = "%Y-%m-%d"

# Module-level logger obtained through MindsDB's logging utility.
logger = log.getLogger(__name__)

28 

29 

class GoogleFitHandler(APIHandler):
    """API handler that reads aggregated daily step counts from the Google Fit REST API.

    The handler is configured through ``connection_data`` with exactly one of:

    * ``service_account_file`` - path to an existing OAuth client-secrets JSON file.
    * ``service_account_json`` - the client configuration as a dictionary (6 fields,
      or 7 when ``redirect_uris`` is included); it is written to disk on ``connect``.

    A single table, ``aggregated_data``, is registered and backed by ``GoogleFitTable``.
    """

    # NOTE: these paths are relative to the process working directory, exactly as in
    # the original implementation; they are only named here to avoid repetition.
    _HANDLER_DIR = "mindsdb/integrations/handlers/google_fit_handler"
    _TOKEN_PATH = _HANDLER_DIR + "/token.json"
    _GENERATED_CREDENTIALS_PATH = _HANDLER_DIR + "/credentials.json"

    # Data source queried by ``get_steps``.
    _STEPS_DATA_SOURCE_ID = "derived:com.google.step_count.delta:com.google.android.gms:estimated_steps"
    # Bucket width for aggregation: 24 hours in milliseconds.
    _DAY_MILLIS = 86400000

    def __init__(self, name: str = None, **kwargs):
        """Validate the connection arguments and register the handler's table.

        Args:
            name: handler instance name, forwarded to ``APIHandler``.
            **kwargs: must contain ``connection_data`` (dict) holding either
                ``service_account_file`` or ``service_account_json``.

        Raises:
            Exception: if neither credential source is supplied, the file does
                not exist, or the JSON dictionary has an unexpected shape.
        """
        super().__init__(name)
        # ``or {}`` also covers an explicit ``connection_data=None``.
        args = kwargs.get("connection_data") or {}
        self.connection_args = {}
        self.credentials_path = None
        if "service_account_file" in args:
            if not os.path.isfile(args["service_account_file"]):
                raise Exception("service_account_file must be a path to the credentials.json file")
            self.credentials_path = args["service_account_file"]
        elif "service_account_json" in args:
            self.connection_args = self._validated_client_config(args["service_account_json"])
            self.credentials_path = self._GENERATED_CREDENTIALS_PATH
        else:
            raise Exception("Connection args have to content ether service_account_file or service_account_json")

        self.api = None
        self.is_connected = False

        aggregated_data = GoogleFitTable(self)
        self._register_table("aggregated_data", aggregated_data)

    @staticmethod
    def _validated_client_config(service_account_json) -> dict:
        """Return a validated copy of the client config with ``redirect_uris`` forced to localhost.

        The caller's dictionary is copied rather than mutated.

        Raises:
            Exception: if the value is not a dict or does not have the expected
                number of fields (6 without ``redirect_uris``, 7 with it).
        """
        error = "service_account_json has to be a dictionary with all 6 required fields"
        if not isinstance(service_account_json, dict):
            raise Exception(error)
        expected_fields = 7 if "redirect_uris" in service_account_json else 6
        if len(service_account_json) != expected_fields:
            raise Exception(error)
        client_config = dict(service_account_json)
        client_config["redirect_uris"] = ["http://localhost"]
        return client_config

    def connect(self) -> Resource:
        """Authorize against Google and build the Fitness v1 API client.

        A cached client is returned when the handler is already connected.
        Otherwise stored user credentials are loaded from the token file,
        refreshed if expired, or obtained through the local-server OAuth flow;
        the resulting token is written back to the token file.

        Returns:
            The ``googleapiclient`` resource for the ``fitness`` v1 API.
        """
        if self.is_connected is True and self.api:
            return self.api

        if self.connection_args:
            # Compact separators instead of stripping spaces from the dumped text:
            # a blanket ``replace(" ", "")`` would also alter string values.
            with open(self.credentials_path, "w") as credentials_file:
                json.dump({"installed": self.connection_args}, credentials_file, separators=(",", ":"))

        creds = None
        if os.path.isfile(self._TOKEN_PATH):
            creds = Credentials.from_authorized_user_file(self._TOKEN_PATH, SCOPES)

        if not creds or not creds.valid:
            if creds and creds.expired and creds.refresh_token:
                creds.refresh(Request())
            else:
                flow = InstalledAppFlow.from_client_secrets_file(self.credentials_path, SCOPES)
                creds = flow.run_local_server(port=0)
            # Persist the new or refreshed token for subsequent connections.
            with open(self._TOKEN_PATH, "w") as token:
                token.write(creds.to_json())

        self.api = build("fitness", "v1", credentials=creds)
        self.is_connected = True
        return self.api

    def check_connection(self) -> StatusResponse:
        """Try to connect and report the outcome.

        Returns:
            A ``StatusResponse`` whose ``success`` flag reflects whether
            ``connect`` completed; on failure ``error_message`` holds the
            error text. ``self.is_connected`` is updated to match.
        """
        response = StatusResponse(False)

        try:
            self.connect()
            response.success = True
        except Exception as e:
            logger.error(f"Error connecting to Google Fit API: {e}!")
            # Store the message text rather than the exception object.
            response.error_message = str(e)

        self.is_connected = response.success
        return response

    def retrieve_data(self, service, startTimeMillis, endTimeMillis, dataSourceId) -> dict:
        """Request step-count deltas aggregated into 24-hour buckets.

        Args:
            service: Fitness API client as returned by ``connect``.
            startTimeMillis: start of the requested window, passed through as ``startTimeMillis``.
            endTimeMillis: end of the requested window, passed through as ``endTimeMillis``.
            dataSourceId: data source to aggregate.

        Returns:
            The response of ``users().dataset().aggregate(...).execute()``.

        Raises:
            HttpError: the original error raised by the API call.
        """
        request_body = {
            "aggregateBy": [{"dataTypeName": "com.google.step_count.delta", "dataSourceId": dataSourceId}],
            "bucketByTime": {"durationMillis": self._DAY_MILLIS},
            "startTimeMillis": startTimeMillis,
            "endTimeMillis": endTimeMillis,
        }
        try:
            return service.users().dataset().aggregate(userId="me", body=request_body).execute()
        except HttpError as e:
            # Re-raise the caught error itself: ``raise HttpError`` would try to
            # instantiate the class without its required arguments and hide it.
            logger.error(f"Google Fit aggregate request failed: {e}")
            raise

    def native_query(self, query: str = None) -> Response:
        """Parse a raw SQL string and execute it through ``self.query``.

        Args:
            query: SQL text to parse with ``parse_sql``.

        Returns:
            The result of ``self.query`` for the parsed AST.
        """
        ast = parse_sql(query)
        return self.query(ast)

    def get_steps(self, start_time_millis, end_time_millis) -> pd.DataFrame:
        """Fetch daily step counts for the given time window.

        Args:
            start_time_millis: window start, forwarded to ``retrieve_data``.
            end_time_millis: window end, forwarded to ``retrieve_data``.

        Returns:
            A DataFrame with an ``index`` column (bucket start date in the local
            timezone, formatted with ``DATE_FORMAT``) and a ``steps`` column.
            Buckets without data points are omitted; when no bucket has data the
            frame is empty but keeps both columns.
        """
        steps_data = self.retrieve_data(
            self.api,
            start_time_millis,
            end_time_millis,
            self._STEPS_DATA_SOURCE_ID,
        )
        # The local timezone does not change between buckets; resolve it once.
        local_tz = pytz.timezone(str(get_localzone()))

        steps = {}
        for daily_step_data in steps_data.get("bucket", []):
            local_date = datetime.fromtimestamp(int(daily_step_data["startTimeMillis"]) / 1000, tz=local_tz)
            local_date_str = local_date.strftime(DATE_FORMAT)

            data_point = daily_step_data["dataset"][0]["point"]
            if data_point:
                count = data_point[0]["value"][0]["intVal"]
                data_source_id = data_point[0]["originDataSourceId"]
                steps[local_date_str] = {"steps": count, "originDataSourceId": data_source_id}

        if not steps:
            # Dropping "originDataSourceId" from an empty frame would raise KeyError.
            return pd.DataFrame(columns=["index", "steps"])

        ret = pd.DataFrame.from_dict(steps).T
        ret = ret.drop("originDataSourceId", axis=1)
        return ret.reset_index(drop=False)

    def call_google_fit_api(self, method_name: str = None, params: dict = None) -> pd.DataFrame:
        """Connect if necessary and dispatch to the named Google Fit operation.

        Args:
            method_name: operation to run; only ``"get_steps"`` is supported.
            params: arguments for the operation. ``get_steps`` reads
                ``params["start_time"]`` and ``params["end_time"]``.

        Returns:
            The DataFrame produced by the selected operation.

        Raises:
            NotImplementedError: if ``method_name`` is not supported.
        """
        self.connect()
        if method_name == "get_steps":
            return self.get_steps(params["start_time"], params["end_time"])
        raise NotImplementedError("Method name {} not supported by Google Fit Handler".format(method_name))

167 val = self.get_steps(params["start_time"], params["end_time"]) 

168 return val 

169 raise NotImplementedError("Method name {} not supported by Google Fit Handler".format(method_name))