Coverage for mindsdb / integrations / handlers / mediawiki_handler / mediawiki_tables.py: 0%
46 statements
« prev ^ index » next coverage.py v7.13.1, created at 2026-01-21 00:36 +0000
« prev ^ index » next coverage.py v7.13.1, created at 2026-01-21 00:36 +0000
1import pandas as pd
3from typing import List
5from mindsdb.integrations.libs.api_handler import APITable
7from mindsdb_sql_parser import ast
9from mindsdb.integrations.utilities.handlers.query_utilities import SELECTQueryParser, SELECTQueryExecutor
11from mindsdb.utilities import log
13logger = log.getLogger(__name__)
class PagesTable(APITable):
    """The MediaWiki Pages Table implementation"""

    def select(self, query: ast.Select) -> pd.DataFrame:
        """Pulls MediaWiki pages data.

        Only ``title = ...`` and ``pageid = ...`` conditions are accepted in the
        WHERE clause; they are handed to ``get_pages`` rather than being applied
        to the resulting DataFrame.

        Parameters
        ----------
        query : ast.Select
            Given SQL SELECT query

        Returns
        -------
        pd.DataFrame
            MediaWiki pages matching the query

        Raises
        ------
        ValueError
            If the query contains an unsupported condition
        """
        select_statement_parser = SELECTQueryParser(
            query,
            'pages',
            self.get_columns()
        )
        selected_columns, where_conditions, order_by_conditions, result_limit = select_statement_parser.parse_query()

        # Columns that may appear in WHERE, mapped to the value requested for each.
        filters = {'title': None, 'pageid': None}
        for condition in where_conditions:
            operator, column = condition[0], condition[1]
            if column not in filters:
                raise ValueError(f"Unsupported column '{column}' in WHERE clause.")
            if operator != '=':
                raise ValueError(f"Unsupported operator '{operator}' for column '{column}' in WHERE clause.")
            filters[column] = condition[2]

        pages_df = pd.json_normalize(
            self.get_pages(title=filters['title'], page_id=filters['pageid'], limit=result_limit)
        )

        # The WHERE conditions were already consumed by get_pages, so an empty
        # condition list is passed to the executor; it only receives the
        # selected columns and the ORDER BY conditions.
        select_statement_executor = SELECTQueryExecutor(
            pages_df,
            selected_columns,
            [],
            order_by_conditions
        )
        pages_df = select_statement_executor.execute_query()

        return pages_df

    def get_columns(self) -> List[str]:
        """Return the names of the columns exposed by this table.

        Returns
        -------
        List[str]
            Column names; the same names are read as attributes of each page
            object in ``convert_page_to_dict``.
        """
        return ['pageid', 'title', 'original_title', 'content', 'summary', 'url', 'categories']

    def get_pages(self, title: str = None, page_id: int = None, limit: int = 20):
        """Fetch pages through the handler's connection.

        When ``title`` and/or ``page_id`` is given, a search query is built from
        them and passed to ``connection.search``; otherwise ``connection.random``
        is used. Every returned title is then loaded with ``connection.page``
        and converted with ``convert_page_to_dict``.

        Parameters
        ----------
        title : str, optional
            Value for the ``intitle:`` search term.
        page_id : int, optional
            Value for the ``pageid:`` search term.
        limit : int
            Passed as ``results`` to ``search`` or as ``pages`` to ``random``.

        Returns
        -------
        list of dict
            One dictionary per page, as produced by ``convert_page_to_dict``.
        """
        query_parts = []
        if title is not None:
            query_parts.append(f'intitle:{title}')
        if page_id is not None:
            query_parts.append(f'pageid:{page_id}')

        # NOTE(review): ' | ' is presumably an OR in the wiki search syntax — confirm.
        search_query = ' | '.join(query_parts)

        connection = self.handler.connect()

        if search_query:
            titles = connection.search(search_query, results=limit)
        else:
            titles = connection.random(pages=limit)

        # The client may hand back a single title as a bare string instead of a
        # list (pymediawiki's random() documents this for a one-title result —
        # confirm against the client the handler actually uses). Iterating a
        # string would load one page per character, so wrap it.
        if isinstance(titles, str):
            titles = [titles]

        return [
            self.convert_page_to_dict(connection.page(page_title, auto_suggest=False))
            for page_title in titles
        ]

    def convert_page_to_dict(self, page):
        """Copy the table's column attributes from a page object into a dict.

        Parameters
        ----------
        page : object
            Page object returned by ``connection.page``.

        Returns
        -------
        dict
            Maps each column name to the page's attribute of the same name.
            Attributes whose access raises ``KeyError`` are left out and
            reported at debug level.
        """
        result = {}

        for attribute in self.get_columns():
            try:
                result[attribute] = getattr(page, attribute)
            except KeyError:
                # Best effort: a page lacking one attribute still yields a row.
                logger.debug("Error accessing '%s' attribute. Skipping...", attribute)

        return result