Coverage for mindsdb / integrations / handlers / frappe_handler / frappe_handler.py: 0%
120 statements
« prev ^ index » next coverage.py v7.13.1, created at 2026-01-21 00:36 +0000
« prev ^ index » next coverage.py v7.13.1, created at 2026-01-21 00:36 +0000
1import json
2import pandas as pd
3import datetime as dt
4from typing import Dict
6from mindsdb.integrations.handlers.frappe_handler.frappe_tables import FrappeDocumentsTable
7from mindsdb.integrations.handlers.frappe_handler.frappe_client import FrappeClient
8from mindsdb.integrations.libs.api_handler import APIHandler
9from mindsdb.integrations.libs.response import (
10 HandlerStatusResponse as StatusResponse,
11 HandlerResponse as Response,
12)
13from mindsdb.utilities import log
14from mindsdb_sql_parser import parse_sql
16logger = log.getLogger(__name__)
class FrappeHandler(APIHandler):
    """A class for handling connections and interactions with the Frappe API.

    Attributes:
        client (FrappeClient): The `FrappeClient` object for interacting with the Frappe API.
        is_connected (bool): Whether or not the API client is connected to Frappe.
        domain (str): Frappe domain to send API requests to.
        access_token (str): OAuth token to use for authentication.
        connection_data (dict): The raw connection arguments the handler was created with.
    """

    def __init__(self, name: str = None, **kwargs):
        """Registers all API tables and prepares the handler for an API connection.

        Args:
            name: (str): The handler name to use
            **kwargs: Must contain `connection_data`, a dict holding at least
                the `access_token` and `domain` keys.

        Raises:
            ValueError: If `access_token` or `domain` is missing from the
                connection data.
        """
        super().__init__(name)
        self.client = None
        self.is_connected = False

        # `or {}` also covers an explicit `connection_data=None`, so the caller
        # gets the descriptive ValueError below instead of a TypeError.
        args = kwargs.get('connection_data') or {}
        if 'access_token' not in args:
            raise ValueError('"access_token" parameter required for authentication')
        if 'domain' not in args:
            raise ValueError('"domain" parameter required to connect to your Frappe instance')
        self.access_token = args['access_token']
        self.domain = args['domain']

        document_data = FrappeDocumentsTable(self)
        self._register_table('documents', document_data)
        self.connection_data = args

    def back_office_config(self):
        """Describes the helper methods of this handler that are exposed as tools.

        Returns:
            A dict with a single `tools` key mapping each tool (method) name
            to a human-readable description of when and how to use it.
        """
        tools = {
            'register_sales_invoice': 'have to be used by assistant to register a sales invoice. Input is JSON object serialized as a string. Due date have to be passed in format: "yyyy-mm-dd".',
            'check_company_exists': 'useful to check the company is exist. Input is company',
            'check_expense_type': 'useful to check the expense_type is exist. Input is expense_type',
            'check_customer': 'useful to check the customer is exist. Input is customer',
            'check_item_code': 'have to be used to check the item code. Input is item_code',
        }
        return {
            'tools': tools,
        }

    def register_sales_invoice(self, data):
        """Creates a 'Sales Invoice' document from a JSON-serialized invoice.

        Expected input (a JSON string):
            {
                "due_date": "2023-05-31",
                "customer": "ksim",
                "items": [
                    {
                        "name": "T-shirt--",
                        "description": "T-shirt",
                        "quantity": 1
                    }
                ]
            }

        Each item's `quantity` key is renamed to `qty`, and the `uom`,
        `conversion_factor` and `income_account` fields are filled in before
        the invoice is posted.

        Args:
            data: The invoice as a JSON string.

        Returns:
            "Success" when the invoice was posted, otherwise a string starting
            with "Error:" that describes the problem. Failures are reported
            through the return value rather than by raising.
        """
        try:
            invoice = json.loads(data)
            due_date = dt.datetime.strptime(invoice['due_date'], '%Y-%m-%d')
            if due_date <= dt.datetime.today():
                return 'Error: due_date have to be in the future'

            # The account is the same for every item, so look it up once.
            income_account = self.connection_data.get('income_account', "Sales Income - C8")

            for item in invoice['items']:
                # rename column
                item['qty'] = item.pop('quantity')

                # add required fields
                item['uom'] = "Nos"
                item['conversion_factor'] = 1
                item['income_account'] = income_account
        except (KeyError, TypeError, ValueError) as e:
            # Malformed JSON, a wrong date format or a missing key is reported
            # as an error string, the same way a failed post is reported below.
            return f"Error: invalid invoice data: {e!r}"

        try:
            self.connect()
            self.client.post_document('Sales Invoice', invoice)
        except Exception as e:
            return f"Error: {e}"
        return "Success"

    def _has_single_match(self, doctype, field, value):
        """Returns True when exactly one `doctype` document has `field` equal to `value`."""
        self.connect()
        matches = self.client.get_documents(doctype, filters=[[field, '=', value]])
        return len(matches) == 1

    def check_item_code(self, item_code):
        """Checks that exactly one 'Item' document has the given item code.

        Returns:
            True on a single match, otherwise an explanatory message string.
        """
        if self._has_single_match('Item', 'item_code', item_code):
            return True
        return "Item doesn't exist: please use different name"

    def check_company_exists(self, name):
        """Checks that exactly one 'Company' document has the given name.

        Returns:
            True on a single match, otherwise an explanatory message string.
        """
        if self._has_single_match('Company', 'name', name):
            return True
        return "Company doesn't exist: please use different name"

    def check_expense_type(self, name):
        """Checks that exactly one 'Expense Claim Type' document has the given name.

        Returns:
            True on a single match, otherwise an explanatory message string.
        """
        if self._has_single_match('Expense Claim Type', 'name', name):
            return True
        return "Expense Claim Type doesn't exist: please use different name"

    def check_customer(self, name):
        """Checks that exactly one 'Customer' document has the given name.

        Returns:
            True on a single match, otherwise an explanatory message string.
        """
        if self._has_single_match('Customer', 'name', name):
            return True
        return "Customer doesn't exist"

    def connect(self) -> FrappeClient:
        """Creates a new API client if needed and sets it as the client to use for requests.

        Returns newly created Frappe API client, or current client if already set.
        The result is None when no client exists and the domain or access
        token is empty.
        """
        if self.is_connected is True and self.client:
            return self.client

        if self.domain and self.access_token:
            self.client = FrappeClient(self.domain, self.access_token)

        # Only report a connection when a client object actually exists.
        self.is_connected = self.client is not None
        return self.client

    def check_connection(self) -> StatusResponse:
        """Checks connection to Frappe API by sending a ping request.

        Returns StatusResponse indicating whether or not the handler is connected.
        On failure the error is logged and its text is stored in the
        response's `error_message`.
        """
        response = StatusResponse(False)

        try:
            client = self.connect()
            client.ping()
            response.success = True
        except Exception as e:
            logger.error(f'Error connecting to Frappe API: {e}!')
            # Store the message text, not the exception object itself.
            response.error_message = str(e)

        self.is_connected = response.success
        return response

    def native_query(self, query: str = None) -> Response:
        """Parses a raw SQL string and runs it through the handler's `query` method.

        Args:
            query (str): SQL text to parse with `parse_sql`.

        Returns:
            Whatever `self.query` returns for the parsed statement.
        """
        ast = parse_sql(query)
        return self.query(ast)

    def _document_to_dataframe_row(self, doctype, document: Dict) -> Dict:
        """Builds one result row: the doctype plus the document serialized as JSON."""
        return {
            'doctype': doctype,
            'data': json.dumps(document)
        }

    def _get_document(self, params: Dict = None) -> pd.DataFrame:
        """Fetches a single document.

        Args:
            params (Dict): Must contain `doctype` and `name`.

        Returns:
            A one-row DataFrame with `doctype` and `data` columns.
        """
        client = self.connect()
        doctype = params['doctype']
        document = client.get_document(doctype, params['name'])
        return pd.DataFrame.from_records([self._document_to_dataframe_row(doctype, document)])

    def _get_documents(self, params: Dict = None) -> pd.DataFrame:
        """Fetches the documents of one doctype.

        Args:
            params (Dict): Must contain `doctype`; may contain `limit`,
                `filters` and `fields`, which are passed to the client
                (None when absent).

        Returns:
            A DataFrame with one row per returned document.
        """
        client = self.connect()
        doctype = params['doctype']
        limit = params.get('limit', None)
        filters = params.get('filters', None)
        fields = params.get('fields', None)
        documents = client.get_documents(doctype, limit=limit, fields=fields, filters=filters)
        return pd.DataFrame.from_records([self._document_to_dataframe_row(doctype, d) for d in documents])

    def _create_document(self, params: Dict = None) -> pd.DataFrame:
        """Creates a document.

        Args:
            params (Dict): Must contain `doctype` and `data`, where `data` is
                the new document as a JSON string.

        Returns:
            A one-row DataFrame built from what the client's `post_document` returns.
        """
        client = self.connect()
        doctype = params['doctype']
        new_document = client.post_document(doctype, json.loads(params['data']))
        return pd.DataFrame.from_records([self._document_to_dataframe_row(doctype, new_document)])

    def call_frappe_api(self, method_name: str = None, params: Dict = None) -> pd.DataFrame:
        """Calls the Frappe API method with the given params.

        Returns results as a pandas DataFrame.

        Args:
            method_name (str): Method name to call (e.g. get_document)
            params (Dict): Params to pass to the API call

        Raises:
            NotImplementedError: If `method_name` is not one of
                `get_documents`, `get_document` or `create_document`.
        """
        if method_name == 'get_documents':
            return self._get_documents(params)
        if method_name == 'get_document':
            return self._get_document(params)
        if method_name == 'create_document':
            return self._create_document(params)
        raise NotImplementedError(f'Method name {method_name} not supported by Frappe API Handler')