Coverage for mindsdb / integrations / handlers / redshift_handler / redshift_handler.py: 100%

31 statements  

« prev     ^ index     » next       coverage.py v7.13.1, created at 2026-01-21 00:36 +0000

1import os 

2import numpy as np 

3import pandas as pd 

4 

5from mindsdb.utilities import log 

6from mindsdb.integrations.libs.response import ( 

7 HandlerResponse as Response, 

8 RESPONSE_TYPE 

9) 

10from mindsdb.integrations.handlers.postgres_handler.postgres_handler import PostgresHandler 

11 

12logger = log.getLogger(__name__) 

13os.environ["PGCLIENTENCODING"] = "utf-8" 

14 

15 

16class RedshiftHandler(PostgresHandler): 

17 """ 

18 This handler handles connection and execution of the Redshift statements. 

19 """ 

20 

21 name = 'redshift' 

22 

23 def __init__(self, name: str, **kwargs): 

24 """ 

25 Initialize the handler. 

26 Args: 

27 name (str): name of particular handler instance. 

28 **kwargs: arbitrary keyword arguments. 

29 """ 

30 super().__init__(name, **kwargs) 

31 

32 def insert(self, table_name: str, df: pd.DataFrame): 

33 """ 

34 Handles the execution of INSERT statements. 

35 

36 Args: 

37 table_name (str): name of the table to insert the data into. 

38 df (pd.DataFrame): data to be inserted into the table. 

39 """ 

40 need_to_close = not self.is_connected 

41 

42 connection = self.connect() 

43 

44 # Replace NaN values with None 

45 df = df.replace({np.nan: None}) 

46 

47 # Build the query to insert the data 

48 columns = ', '.join([f'"{col}"' if ' ' in col else col for col in df.columns]) 

49 values = ', '.join(['%s' for _ in range(len(df.columns))]) 

50 query = f'INSERT INTO {table_name} ({columns}) VALUES ({values})' 

51 

52 with connection.cursor() as cur: 

53 try: 

54 cur.executemany(query, df.values.tolist()) 

55 response = Response(RESPONSE_TYPE.OK, affected_rows=cur.rowcount) 

56 

57 connection.commit() 

58 except Exception as e: 

59 logger.error(f"Error inserting data into {table_name}, {e}!") 

60 connection.rollback() 

61 response = Response( 

62 RESPONSE_TYPE.ERROR, 

63 error_code=0, 

64 error_message=str(e) 

65 ) 

66 

67 if need_to_close: 

68 self.disconnect() 

69 

70 return response