Coverage for mindsdb / integrations / handlers / mediawiki_handler / mediawiki_tables.py: 0%

46 statements  

« prev     ^ index     » next       coverage.py v7.13.1, created at 2026-01-21 00:36 +0000

1import pandas as pd 

2 

3from typing import List 

4 

5from mindsdb.integrations.libs.api_handler import APITable 

6 

7from mindsdb_sql_parser import ast 

8 

9from mindsdb.integrations.utilities.handlers.query_utilities import SELECTQueryParser, SELECTQueryExecutor 

10 

11from mindsdb.utilities import log 

12 

# Module-level logger obtained from mindsdb's `log` utility, keyed by this module's name.
logger = log.getLogger(__name__)

14 

15 

class PagesTable(APITable):
    """The MediaWiki Pages Table implementation"""

    def select(self, query: ast.Select) -> pd.DataFrame:
        """Pulls MediaWiki pages data.

        Parameters
        ----------
        query : ast.Select
           Given SQL SELECT query

        Returns
        -------
        pd.DataFrame
            MediaWiki pages matching the query

        Raises
        ------
        ValueError
            If the query contains an unsupported condition
        """
        select_statement_parser = SELECTQueryParser(
            query,
            'pages',
            self.get_columns()
        )
        selected_columns, where_conditions, order_by_conditions, result_limit = select_statement_parser.parse_query()

        # Only equality filters on `title` and `pageid` are forwarded to the API;
        # anything else is rejected up front.
        title, page_id = None, None
        for condition in where_conditions:
            operator, column = condition[0], condition[1]
            if column not in ('title', 'pageid'):
                raise ValueError(f"Unsupported column '{column}' in WHERE clause.")
            if operator != '=':
                raise ValueError(f"Unsupported operator '{operator}' for column '{column}' in WHERE clause.")

            if column == 'title':
                title = condition[2]
            else:
                page_id = condition[2]

        pages_df = pd.json_normalize(self.get_pages(title=title, page_id=page_id, limit=result_limit))

        # The empty list is presumably the executor's WHERE-conditions slot: the
        # supported filters were already consumed by get_pages() above, so none
        # are re-applied to the DataFrame -- TODO confirm against SELECTQueryExecutor.
        select_statement_executor = SELECTQueryExecutor(
            pages_df,
            selected_columns,
            [],
            order_by_conditions
        )
        pages_df = select_statement_executor.execute_query()

        return pages_df

    def get_columns(self) -> List[str]:
        """Return the column names exposed by this table.

        The same names are used as attribute names when reading page objects
        in ``convert_page_to_dict``.
        """
        return ['pageid', 'title', 'original_title', 'content', 'summary', 'url', 'categories']

    def get_pages(self, title: str = None, page_id: int = None, limit: int = 20):
        """Fetch pages through the handler connection and return them as dicts.

        Parameters
        ----------
        title : str, optional
            If given, adds an ``intitle:<title>`` term to the search query
        page_id : int, optional
            If given, adds a ``pageid:<page_id>`` term to the search query
        limit : int
            Maximum number of results requested from the connection

        Returns
        -------
        list of dict
            One dict per page, as built by ``convert_page_to_dict``. When neither
            ``title`` nor ``page_id`` is given, random pages are requested instead
            of running a search.
        """
        query_parts = []
        if title is not None:
            query_parts.append(f'intitle:{title}')
        if page_id is not None:
            query_parts.append(f'pageid:{page_id}')

        # NOTE(review): confirm the backend search syntax actually understands
        # the `pageid:` keyword and the ` | ` joiner used here.
        search_query = ' | '.join(query_parts)

        connection = self.handler.connect()

        if search_query:
            page_titles = connection.search(search_query, results=limit)
        else:
            page_titles = connection.random(pages=limit)

        # pymediawiki's `random` returns a bare string (not a list) when a single
        # page is requested; iterating it directly would look up one page per
        # character of the title, so wrap it first.
        if isinstance(page_titles, str):
            page_titles = [page_titles]

        return [
            self.convert_page_to_dict(connection.page(page_title, auto_suggest=False))
            for page_title in page_titles
        ]

    def convert_page_to_dict(self, page):
        """Copy the table's columns from a page object into a plain dict.

        Attributes that cannot be read (``KeyError`` or ``AttributeError`` while
        accessing them) are logged at debug level and left out of the result.
        """
        result = {}

        for attribute in self.get_columns():
            try:
                result[attribute] = getattr(page, attribute)
            except (KeyError, AttributeError):
                # Best effort: a missing attribute should not fail the whole row.
                logger.debug(f"Error accessing '{attribute}' attribute. Skipping...")

        return result