Coverage for mindsdb / integrations / handlers / ms_one_drive_handler / ms_graph_api_one_drive_client.py: 0%

40 statements  

« prev     ^ index     » next       coverage.py v7.13.1, created at 2026-01-21 00:36 +0000

1from typing import Text, List, Dict 

2 

3from requests.exceptions import RequestException 

4 

5from mindsdb.integrations.utilities.handlers.api_utilities.microsoft.ms_graph_api_utilities import MSGraphAPIBaseClient 

6from mindsdb.utilities import log 

7 

8logger = log.getLogger(__name__) 

9 

10 

class MSGraphAPIOneDriveClient(MSGraphAPIBaseClient):
    """
    The Microsoft Graph API client for the Microsoft OneDrive handler.
    This client is used for accessing the Microsoft OneDrive specific endpoints of the Microsoft Graph API.
    Several common methods for submitting requests, fetching data, etc. are inherited from the base class.
    """

    def __init__(self, access_token: Text) -> None:
        """
        Initializes the client by forwarding the access token to the base class.

        Args:
            access_token (Text): The access token passed on to the base client.
        """
        super().__init__(access_token)

    def check_connection(self) -> bool:
        """
        Checks the connection to the Microsoft Graph API by requesting the user's default drive ("me/drive").

        Returns:
            bool: True if no RequestException was raised, False otherwise.
        """
        try:
            # NOTE(review): every other method in this class iterates over the result of
            # fetch_paginated_data. If it is a lazy generator, this un-iterated call sends
            # no request and the check always succeeds -- confirm against the base class.
            self.fetch_paginated_data("me/drive")
        except RequestException as request_error:
            logger.error(f"Error checking connection: {request_error}")
            return False
        return True

    def get_all_items(self) -> List[Dict]:
        """
        Retrieves all non-folder items of the user's OneDrive.

        Folders are traversed recursively but are not part of the result themselves.
        Every returned item gets an additional "path" key holding its "/"-separated
        path relative to the drive root.

        Returns:
            List[Dict]: All non-folder items of the user's OneDrive.
        """
        all_items = []
        for root_item in self.get_root_items():
            if "folder" in root_item:
                # The folder name is the first path segment for everything below it.
                all_items.extend(self.get_child_items(root_item["id"], root_item["name"]))
            else:
                # Items directly in the root: the path is just the item name.
                root_item["path"] = root_item["name"]
                all_items.append(root_item)

        return all_items

    def get_root_items(self) -> List[Dict]:
        """
        Retrieves the root items (files and folders) of the user's OneDrive.

        Returns:
            List[Dict]: The root items of the user's OneDrive.
        """
        # Each iteration of fetch_paginated_data yields one page (an iterable of items); flatten them.
        return [
            item
            for page in self.fetch_paginated_data("me/drive/root/children")
            for item in page
        ]

    def get_child_items(self, item_id: Text, path: Text) -> List[Dict]:
        """
        Recursively retrieves the non-folder child items of the specified item.

        Args:
            item_id (Text): The ID of the item whose child items are to be retrieved.
            path (Text): The path of the item identified by `item_id`; it is used as the
                prefix for the "path" key added to every returned child item.

        Returns:
            List[Dict]: The non-folder child items of the specified item, each with a "path" key.
        """
        child_items = []
        for items in self.fetch_paginated_data(f"me/drive/items/{item_id}/children"):
            for item in items:
                child_path = f"{path}/{item['name']}"
                if "folder" in item:
                    # Descend into the sub-folder; the folder itself is not added to the result.
                    child_items.extend(self.get_child_items(item["id"], child_path))
                else:
                    # Add the path to the item.
                    item["path"] = child_path
                    child_items.append(item)

        return child_items

    def get_item_content(self, path: Text) -> bytes:
        """
        Retrieves the content of the specified item.

        Args:
            path (Text): The path of the item (relative to the drive root) whose content is to be retrieved.

        Returns:
            bytes: The content of the specified item.
        """
        # Local import so this block does not depend on a change to the file-level imports.
        from urllib.parse import quote

        # Percent-encode the path: characters such as "#" or "%" in a file name would otherwise
        # be interpreted as URL syntax instead of being part of the item path.
        # "/" stays unescaped because it separates the path segments.
        encoded_path = quote(path, safe="/")
        return self.fetch_data_content(f"me/drive/root:/{encoded_path}:/content")