Coverage for mindsdb / integrations / handlers / hubspot_handler / hubspot_handler.py: 0%
321 statements
« prev ^ index » next — coverage.py v7.13.1, created at 2026-01-21 00:36 +0000
1from typing import Optional, List, Dict, Any
2import pandas as pd
3from pandas.api import types as pd_types
4from hubspot import HubSpot
6from mindsdb.integrations.handlers.hubspot_handler.hubspot_tables import (
7 ContactsTable,
8 CompaniesTable,
9 DealsTable,
10 TicketsTable,
11 TasksTable,
12 CallsTable,
13 EmailsTable,
14 MeetingsTable,
15 NotesTable,
16 to_hubspot_property,
17 to_internal_property,
18 HUBSPOT_TABLE_COLUMN_DEFINITIONS,
19)
20from mindsdb.integrations.libs.api_handler import MetaAPIHandler
22from mindsdb.integrations.libs.response import (
23 HandlerStatusResponse as StatusResponse,
24 HandlerResponse as Response,
25 RESPONSE_TYPE,
26)
27from mindsdb.api.mysql.mysql_proxy.libs.constants.mysql import MYSQL_DATA_TYPE
28from mindsdb.utilities import log
29from mindsdb_sql_parser import parse_sql
31logger = log.getLogger(__name__)
34def _extract_hubspot_error_message(error: Exception) -> str:
35 """Extract a user-friendly error message from HubSpot API exceptions."""
36 error_str = str(error)
38 if "403" in error_str and "MISSING_SCOPES" in error_str:
39 if "requiredGranularScopes" in error_str:
40 import json
42 try:
43 start = error_str.find('{"status":')
44 if start != -1:
45 json_str = error_str[start : error_str.find("}", start) + 1]
46 error_data = json.loads(json_str)
47 if "errors" in error_data and len(error_data["errors"]) > 0:
48 context = error_data["errors"][0].get("context", {})
49 scopes = context.get("requiredGranularScopes", [])
50 if scopes:
51 scopes_list = ", ".join(scopes)
52 return (
53 f"Missing required HubSpot scopes. Your access token needs one or more of these permissions: {scopes_list}. "
54 f"Please update your HubSpot app scopes at https://developers.hubspot.com/ and regenerate your access token."
55 )
56 except (json.JSONDecodeError, KeyError, IndexError):
57 pass
58 return (
59 "Missing required HubSpot API permissions (scopes). "
60 "Please verify your access token has the necessary scopes. "
61 "Update scopes at https://developers.hubspot.com/"
62 )
64 if "401" in error_str or "Unauthorized" in error_str:
65 return "Invalid or expired HubSpot access token. Please regenerate your access token at https://developers.hubspot.com/"
67 if "429" in error_str or "rate limit" in error_str.lower():
68 return "HubSpot API rate limit exceeded. Please wait a moment and try again."
70 if "ApiException" in error_str or "hubspot" in error_str.lower():
71 return f"HubSpot API error: {error_str[:200]}"
73 return str(error)
def _map_type(data_type: str) -> MYSQL_DATA_TYPE:
    """Translate a HubSpot column type name into the matching MySQL type.

    Unknown or missing type names fall back to VARCHAR.
    """
    if data_type is None:
        return MYSQL_DATA_TYPE.VARCHAR

    mapping = {
        "VARCHAR": MYSQL_DATA_TYPE.VARCHAR,
        "TEXT": MYSQL_DATA_TYPE.TEXT,
        "INTEGER": MYSQL_DATA_TYPE.INT,
        "INT": MYSQL_DATA_TYPE.INT,
        "BIGINT": MYSQL_DATA_TYPE.BIGINT,
        "DECIMAL": MYSQL_DATA_TYPE.DECIMAL,
        "FLOAT": MYSQL_DATA_TYPE.FLOAT,
        "DOUBLE": MYSQL_DATA_TYPE.DOUBLE,
        "BOOLEAN": MYSQL_DATA_TYPE.BOOL,
        "BOOL": MYSQL_DATA_TYPE.BOOL,
        "DATE": MYSQL_DATA_TYPE.DATE,
        "DATETIME": MYSQL_DATA_TYPE.DATETIME,
        "TIMESTAMP": MYSQL_DATA_TYPE.DATETIME,
        "TIME": MYSQL_DATA_TYPE.TIME,
    }
    return mapping.get(data_type.upper(), MYSQL_DATA_TYPE.VARCHAR)
class HubspotHandler(MetaAPIHandler):
    """Hubspot API handler implementation"""

    name = "hubspot"

    def __init__(self, name: str, **kwargs: Any) -> None:
        """Initialize the handler.

        Args:
            name: Name of this handler instance.
            **kwargs: Keyword arguments; ``connection_data`` (dict) carries
                the HubSpot credentials read later by ``connect()``.
        """
        super().__init__(name)

        # Credentials: either 'access_token', or 'client_id' + 'client_secret'
        # (validated in connect(), not here).
        connection_data = kwargs.get("connection_data", {})
        self.connection_data = connection_data
        self.kwargs = kwargs

        # Lazily created HubSpot client; populated by connect().
        self.connection: Optional[HubSpot] = None
        self.is_connected: bool = False

        # Register core CRM tables
        self._register_table("companies", CompaniesTable(self))
        self._register_table("contacts", ContactsTable(self))
        self._register_table("deals", DealsTable(self))
        self._register_table("tickets", TicketsTable(self))

        # Register engagement/activity tables
        self._register_table("tasks", TasksTable(self))
        self._register_table("calls", CallsTable(self))
        self._register_table("emails", EmailsTable(self))
        self._register_table("meetings", MeetingsTable(self))
        self._register_table("notes", NotesTable(self))
132 def connect(self) -> HubSpot:
133 """Creates a new Hubspot API client if needed."""
134 if self.is_connected and self.connection is not None:
135 return self.connection
137 try:
138 if "access_token" in self.connection_data:
139 access_token = self.connection_data["access_token"]
140 if not access_token or not isinstance(access_token, str):
141 raise ValueError("Invalid access_token provided")
143 logger.info("Connecting to HubSpot using access token")
144 self.connection = HubSpot(access_token=access_token)
146 elif "client_id" in self.connection_data and "client_secret" in self.connection_data:
147 client_id = self.connection_data["client_id"]
148 client_secret = self.connection_data["client_secret"]
150 if not client_id or not client_secret:
151 raise ValueError("Invalid OAuth credentials provided")
153 logger.info("Connecting to HubSpot using OAuth credentials")
154 self.connection = HubSpot(client_id=client_id, client_secret=client_secret)
155 else:
156 raise ValueError(
157 "Authentication credentials missing. Provide either 'access_token' "
158 "or both 'client_id' and 'client_secret' for OAuth authentication."
159 )
161 self.is_connected = True
162 logger.info("Successfully connected to HubSpot API")
163 return self.connection
165 except ValueError:
166 logger.error("Failed to connect to HubSpot API")
167 raise
168 except Exception as e:
169 logger.error("Failed to connect to HubSpot API")
170 raise ValueError(f"Connection to HubSpot failed: {str(e)}")
172 def disconnect(self) -> None:
173 """Close connection and cleanup resources."""
174 self.connection = None
175 self.is_connected = False
176 logger.info("Disconnected from HubSpot API")
178 def check_connection(self) -> StatusResponse:
179 """Checks whether the API client is connected to Hubspot."""
180 response = StatusResponse(False)
182 try:
183 self.connect()
185 if self.connection:
186 # Try to access contacts first (most common scope)
187 try:
188 list(self.connection.crm.contacts.get_all(limit=1))
189 response.success = True
190 logger.info("HubSpot connection check successful (contacts accessible)")
191 except Exception as contacts_error:
192 try:
193 list(self.connection.crm.companies.get_all(limit=1))
194 response.success = True
195 logger.info("HubSpot connection check successful (companies accessible)")
196 except Exception as companies_error:
197 contacts_msg = _extract_hubspot_error_message(contacts_error)
198 companies_msg = _extract_hubspot_error_message(companies_error)
199 error_msg = f"Cannot access HubSpot data. Contacts error: {contacts_msg}. Companies error: {companies_msg}"
200 logger.error(f"HubSpot connection check failed: {error_msg}")
201 response.error_message = error_msg
202 response.success = False
204 except Exception as e:
205 error_msg = _extract_hubspot_error_message(e)
206 logger.error(f"HubSpot connection check failed: {error_msg}")
207 response.error_message = error_msg
208 response.success = False
210 self.is_connected = response.success
211 return response
213 def native_query(self, query: Optional[str] = None) -> Response:
214 """Receive and process a raw query."""
215 if not query:
216 return Response(RESPONSE_TYPE.ERROR, error_message="Query cannot be None or empty")
218 try:
219 ast = parse_sql(query)
220 return self.query(ast)
221 except Exception as e:
222 logger.error(f"Failed to execute native query: {str(e)}")
223 return Response(RESPONSE_TYPE.ERROR, error_message=f"Query execution failed: {str(e)}")
225 def get_tables(self) -> Response:
226 """Return list of tables available in the HubSpot integration."""
227 try:
228 self.connect()
230 tables_data = []
231 all_tables = ["companies", "contacts", "deals", "tickets", "tasks", "calls", "emails", "meetings", "notes"]
232 for table_name in all_tables:
233 try:
234 # Try to access each table with a minimal request
235 default_properties = self._tables[table_name].get_columns()
236 hubspot_properties = [
237 to_hubspot_property(col)
238 for col in default_properties
239 if to_hubspot_property(col) != "hs_object_id"
240 ]
242 # Different API paths for different object types
243 if table_name in ["companies", "contacts", "deals", "tickets"]:
244 getattr(self.connection.crm, table_name).get_all(limit=1, properties=hubspot_properties)
245 else:
246 # Engagement objects use crm.objects; fetch a single page to validate access.
247 self.connection.crm.objects.basic_api.get_page(
248 table_name, limit=1, properties=hubspot_properties
249 )
251 table_info = {
252 "TABLE_SCHEMA": "hubspot",
253 "TABLE_NAME": table_name,
254 "TABLE_TYPE": "BASE TABLE",
255 }
256 tables_data.append(table_info)
257 logger.info(f"Table '{table_name}' is accessible")
258 except Exception as access_error:
259 if "403" in str(access_error) or "MISSING_SCOPES" in str(access_error):
260 error_msg = _extract_hubspot_error_message(access_error)
261 logger.warning(f"Table '{table_name}' is not accessible: {error_msg}")
262 else:
263 logger.warning(f"Could not access table {table_name}: {str(access_error)}")
265 if not tables_data:
266 error_msg = (
267 "No HubSpot tables are accessible with your current access token. "
268 "Please ensure your token has the necessary scopes. "
269 "Update scopes at https://developers.hubspot.com/"
270 )
271 logger.error(error_msg)
272 return Response(RESPONSE_TYPE.ERROR, error_message=error_msg)
274 df = pd.DataFrame(tables_data)
275 logger.info(f"Retrieved metadata for {len(tables_data)} accessible table(s)")
276 return Response(RESPONSE_TYPE.TABLE, data_frame=df)
278 except Exception as e:
279 error_msg = _extract_hubspot_error_message(e)
280 logger.error(f"Failed to get tables: {error_msg}")
281 return Response(RESPONSE_TYPE.ERROR, error_message=f"Failed to retrieve table list: {error_msg}")
283 def get_columns(self, table_name: str) -> Response:
284 """Return column information for a specific table."""
285 valid_tables = [
286 "companies",
287 "contacts",
288 "deals",
289 "tickets",
290 "tasks",
291 "calls",
292 "emails",
293 "meetings",
294 "notes",
295 ]
297 if table_name not in valid_tables:
298 return Response(
299 RESPONSE_TYPE.ERROR,
300 error_message=f"Table '{table_name}' not found. Available tables: {', '.join(valid_tables)}",
301 )
303 try:
304 self.connect()
306 discovered_columns = self._get_default_discovered_columns(table_name)
308 columns_data = []
309 for col in discovered_columns:
310 columns_data.append(
311 {
312 "COLUMN_NAME": col["column_name"],
313 "DATA_TYPE": col["data_type"],
314 "ORDINAL_POSITION": col["ordinal_position"],
315 "COLUMN_DEFAULT": None,
316 "IS_NULLABLE": "YES"
317 if col["is_nullable"] is True
318 else ("NO" if col["is_nullable"] is False else None),
319 "CHARACTER_MAXIMUM_LENGTH": None,
320 "CHARACTER_OCTET_LENGTH": None,
321 "NUMERIC_PRECISION": None,
322 "NUMERIC_SCALE": None,
323 "DATETIME_PRECISION": None,
324 "CHARACTER_SET_NAME": None,
325 "COLLATION_NAME": None,
326 }
327 )
329 df = pd.DataFrame(columns_data)
330 logger.info(f"Retrieved {len(columns_data)} columns for table {table_name}")
332 result = Response(RESPONSE_TYPE.TABLE, data_frame=df)
333 result.to_columns_table_response(map_type_fn=_map_type)
334 return result
336 except Exception as e:
337 error_msg = _extract_hubspot_error_message(e)
338 logger.error(f"Failed to get columns for table {table_name}: {error_msg}")
339 return Response(
340 RESPONSE_TYPE.ERROR, error_message=f"Failed to retrieve columns for table '{table_name}': {error_msg}"
341 )
343 def meta_get_column_statistics(self, table_names: Optional[List[str]] = None) -> Response:
344 """Return column statistics for data catalog."""
345 try:
346 self.connect()
348 all_tables = ["companies", "contacts", "deals", "tickets", "tasks", "calls", "emails", "meetings", "notes"]
349 if table_names:
350 tables_to_process = [t for t in table_names if t in all_tables]
351 else:
352 tables_to_process = all_tables
354 all_statistics = []
356 for table_name in tables_to_process:
357 try:
358 table_statistics = []
359 default_properties = self._tables[table_name].get_columns()
360 hubspot_properties = [
361 to_hubspot_property(col)
362 for col in default_properties
363 if to_hubspot_property(col) != "hs_object_id"
364 ]
366 # Get sample data based on object type
367 if table_name in ["companies", "contacts", "deals", "tickets"]:
368 sample_data = list(
369 getattr(self.connection.crm, table_name).get_all(limit=1000, properties=hubspot_properties)
370 )
371 else:
372 sample_data = list(self._get_objects_all(table_name, limit=1000, properties=hubspot_properties))
374 if len(sample_data) > 0:
375 sample_size = len(sample_data)
376 logger.info(f"Calculating statistics from {sample_size} records for {table_name}")
378 all_properties = set()
379 for item in sample_data:
380 if hasattr(item, "properties") and item.properties:
381 all_properties.update(item.properties.keys())
383 # Statistics for 'id' column
384 id_values = [item.id for item in sample_data]
385 id_stats = self._calculate_column_statistics("id", id_values)
386 table_statistics.append(
387 {
388 "TABLE_NAME": table_name,
389 "COLUMN_NAME": "id",
390 "NULL_PERCENTAGE": (id_stats["null_count"] / sample_size) * 100
391 if sample_size > 0
392 else 0,
393 "DISTINCT_VALUES_COUNT": id_stats["distinct_count"],
394 "MINIMUM_VALUE": None,
395 "MAXIMUM_VALUE": None,
396 "MOST_COMMON_VALUES": None,
397 "MOST_COMMON_FREQUENCIES": None,
398 }
399 )
401 for prop_name in sorted(all_properties):
402 column_name = to_internal_property(prop_name)
404 column_values = []
405 for item in sample_data:
406 if hasattr(item, "properties") and item.properties:
407 column_values.append(item.properties.get(prop_name))
408 else:
409 column_values.append(None)
411 stats = self._calculate_column_statistics(column_name, column_values)
413 most_common_values = None
414 most_common_frequencies = None
415 non_null_values = [v for v in column_values if v is not None]
416 if non_null_values:
417 from collections import Counter
419 value_counts = Counter(non_null_values)
420 top_5 = value_counts.most_common(5)
421 if top_5:
422 most_common_values = [str(v) for v, _ in top_5]
423 most_common_frequencies = [str(c) for _, c in top_5]
425 table_statistics.append(
426 {
427 "TABLE_NAME": table_name,
428 "COLUMN_NAME": column_name,
429 "NULL_PERCENTAGE": (stats["null_count"] / sample_size) * 100
430 if sample_size > 0
431 else 0,
432 "DISTINCT_VALUES_COUNT": stats["distinct_count"],
433 "MINIMUM_VALUE": None,
434 "MAXIMUM_VALUE": None,
435 "MOST_COMMON_VALUES": most_common_values,
436 "MOST_COMMON_FREQUENCIES": most_common_frequencies,
437 }
438 )
440 # Filter to only include default properties
441 table_statistics = [
442 col
443 for col in table_statistics
444 if col["COLUMN_NAME"] in default_properties or col["COLUMN_NAME"] == "id"
445 ]
446 all_statistics.extend(table_statistics)
448 except Exception as e:
449 logger.warning(f"Could not get statistics for table {table_name}: {str(e)}")
451 df = pd.DataFrame(all_statistics)
452 logger.info(
453 f"Retrieved statistics for {len(all_statistics)} columns across {len(tables_to_process)} tables"
454 )
455 return Response(RESPONSE_TYPE.TABLE, data_frame=df)
457 except Exception as e:
458 logger.error(f"Failed to get column statistics: {str(e)}")
459 return Response(RESPONSE_TYPE.ERROR, error_message=f"Failed to retrieve column statistics: {str(e)}")
461 def _get_default_discovered_columns(self, table_name: str) -> List[Dict[str, Any]]:
462 """Get default discovered columns when API data is unavailable."""
463 ordinal_position = 1
464 base_columns = [
465 {
466 "column_name": "id",
467 "data_type": "VARCHAR",
468 "is_nullable": False,
469 "ordinal_position": ordinal_position,
470 "description": "Unique identifier (Primary Key)",
471 "original_name": "id",
472 }
473 ]
474 ordinal_position += 1
476 if table_name in HUBSPOT_TABLE_COLUMN_DEFINITIONS:
477 for col_name, data_type, description in HUBSPOT_TABLE_COLUMN_DEFINITIONS[table_name]:
478 base_columns.append(
479 {
480 "column_name": col_name,
481 "data_type": data_type,
482 "is_nullable": True,
483 "ordinal_position": ordinal_position,
484 "description": description,
485 "original_name": col_name,
486 }
487 )
488 ordinal_position += 1
490 return base_columns
492 def _get_default_meta_columns(self, table_name: str) -> List[Dict[str, Any]]:
493 """Get default column metadata for data catalog when data is unavailable."""
494 base_columns = [
495 {
496 "TABLE_NAME": table_name,
497 "COLUMN_NAME": "id",
498 "DATA_TYPE": "VARCHAR",
499 "COLUMN_DESCRIPTION": "Unique identifier (Primary Key)",
500 "IS_NULLABLE": False,
501 "COLUMN_DEFAULT": None,
502 }
503 ]
505 if table_name in HUBSPOT_TABLE_COLUMN_DEFINITIONS:
506 for col_name, data_type, description in HUBSPOT_TABLE_COLUMN_DEFINITIONS[table_name]:
507 base_columns.append(
508 {
509 "TABLE_NAME": table_name,
510 "COLUMN_NAME": col_name,
511 "DATA_TYPE": data_type,
512 "COLUMN_DESCRIPTION": description,
513 "IS_NULLABLE": True,
514 "COLUMN_DEFAULT": None,
515 }
516 )
518 return base_columns
520 def _get_table_description(self, table_name: str) -> str:
521 """Get description for a table."""
522 descriptions = {
523 "companies": "HubSpot companies data including name, industry, location and other company properties",
524 "contacts": "HubSpot contacts data including email, name, phone and other contact properties",
525 "deals": "HubSpot deals data including deal name, amount, stage and other deal properties",
526 "tickets": "HubSpot tickets data including subject, status, priority and pipeline information",
527 "tasks": "HubSpot tasks data including subject, status, priority and due dates",
528 "calls": "HubSpot call logs including direction, duration, outcome and notes",
529 "emails": "HubSpot email logs including subject, direction, status and content",
530 "meetings": "HubSpot meeting logs including title, location, outcome and timing",
531 "notes": "HubSpot notes for timeline entries on records",
532 }
533 return descriptions.get(table_name, f"HubSpot {table_name} data")
535 def _estimate_table_rows(self, table_name: str) -> Optional[int]:
536 """Get actual count of rows in a table using HubSpot Search API."""
537 try:
538 if table_name in ["companies", "contacts", "deals", "tickets"]:
539 result = getattr(self.connection.crm, table_name).search_api.do_search(
540 public_object_search_request={"limit": 1}
541 )
542 else:
543 result = self.connection.crm.objects.search_api.do_search(
544 table_name, public_object_search_request={"limit": 1}
545 )
546 return result.total if hasattr(result, "total") else None
547 except Exception as e:
548 logger.warning(f"Could not get row count for {table_name} using search API: {str(e)}")
549 return None
551 def _get_objects_all(
552 self,
553 object_type: str,
554 limit: Optional[int] = None,
555 properties: Optional[List[str]] = None,
556 **kwargs: Any,
557 ) -> List[Any]:
558 """Fetch objects with paging to honor custom limits for crm.objects."""
559 results: List[Any] = []
560 after = None
561 page_max_size = 100
563 if limit is None and "limit" in kwargs:
564 limit = kwargs.pop("limit")
565 if properties is None and "properties" in kwargs:
566 properties = kwargs.pop("properties")
568 while True:
569 if limit is not None:
570 remaining = limit - len(results)
571 if remaining <= 0:
572 break
573 page_size = min(page_max_size, remaining)
574 else:
575 page_size = page_max_size
577 page = self.connection.crm.objects.basic_api.get_page(
578 object_type, after=after, limit=page_size, properties=properties, **kwargs
579 )
580 results.extend(page.results)
582 if page.paging is None:
583 break
584 after = page.paging.next.after
586 return results
588 def _calculate_column_statistics(self, column_name: str, values: List[Any]) -> Dict[str, Any]:
589 """Calculate comprehensive statistics for a column."""
590 total_count = len(values)
591 non_null_values = [v for v in values if v is not None]
592 null_count = total_count - len(non_null_values)
594 stats = {
595 "null_count": null_count,
596 "distinct_count": len(set(str(v) for v in non_null_values)) if non_null_values else 0,
597 "min_value": None,
598 "max_value": None,
599 "average_value": None,
600 }
602 if non_null_values:
603 try:
604 s = pd.Series(non_null_values)
605 if pd_types.is_numeric_dtype(s):
606 avg = s.mean()
607 stats["average_value"] = round(avg, 2)
608 except (ValueError, TypeError):
609 pass
611 return stats
613 def _infer_data_type_from_samples(self, values: List[Any]) -> str:
614 """Infer data type from multiple sample values for better accuracy."""
615 non_null_values = [v for v in values if v is not None]
617 if not non_null_values:
618 return "VARCHAR"
620 type_counts = {}
621 for value in non_null_values[:100]:
622 inferred_type = self._infer_data_type(value)
623 type_counts[inferred_type] = type_counts.get(inferred_type, 0) + 1
625 if type_counts:
626 return max(type_counts.items(), key=lambda x: x[1])[0]
628 return "VARCHAR"
630 def _infer_data_type(self, value: Any) -> str:
631 """Infer SQL data type from Python value."""
632 if value is None:
633 return "VARCHAR"
634 elif isinstance(value, bool):
635 return "BOOLEAN"
636 elif isinstance(value, int):
637 return "INTEGER"
638 elif isinstance(value, float):
639 return "DECIMAL"
640 elif isinstance(value, str):
641 if "T" in value and ("Z" in value or "+" in value):
642 return "TIMESTAMP"
643 return "VARCHAR"
644 else:
645 return "VARCHAR"