refactor!: remove `Mpesa Settings`
diff --git a/erpnext/erpnext_integrations/doctype/mpesa_settings/__init__.py b/erpnext/erpnext_integrations/doctype/mpesa_settings/__init__.py
deleted file mode 100644
index e69de29..0000000
--- a/erpnext/erpnext_integrations/doctype/mpesa_settings/__init__.py
+++ /dev/null
diff --git a/erpnext/erpnext_integrations/doctype/mpesa_settings/account_balance.html b/erpnext/erpnext_integrations/doctype/mpesa_settings/account_balance.html
deleted file mode 100644
index b74a718..0000000
--- a/erpnext/erpnext_integrations/doctype/mpesa_settings/account_balance.html
+++ /dev/null
@@ -1,28 +0,0 @@
-
-{% if not jQuery.isEmptyObject(data) %}
-<h5 style="margin-top: 20px;"> {{ __("Balance Details") }} </h5>
-<table class="table table-bordered small">
- <thead>
- <tr>
- <th style="width: 20%">{{ __("Account Type") }}</th>
- <th style="width: 20%" class="text-right">{{ __("Current Balance") }}</th>
- <th style="width: 20%" class="text-right">{{ __("Available Balance") }}</th>
- <th style="width: 20%" class="text-right">{{ __("Reserved Balance") }}</th>
- <th style="width: 20%" class="text-right">{{ __("Uncleared Balance") }}</th>
- </tr>
- </thead>
- <tbody>
- {% for(const [key, value] of Object.entries(data)) { %}
- <tr>
- <td> {%= key %} </td>
- <td class="text-right"> {%= value["current_balance"] %} </td>
- <td class="text-right"> {%= value["available_balance"] %} </td>
- <td class="text-right"> {%= value["reserved_balance"] %} </td>
- <td class="text-right"> {%= value["uncleared_balance"] %} </td>
- </tr>
- {% } %}
- </tbody>
-</table>
-{% else %}
-<p style="margin-top: 30px;"> Account Balance Information Not Available. </p>
-{% endif %}
diff --git a/erpnext/erpnext_integrations/doctype/mpesa_settings/mpesa_connector.py b/erpnext/erpnext_integrations/doctype/mpesa_settings/mpesa_connector.py
deleted file mode 100644
index a577e7f..0000000
--- a/erpnext/erpnext_integrations/doctype/mpesa_settings/mpesa_connector.py
+++ /dev/null
@@ -1,149 +0,0 @@
-import base64
-import datetime
-
-import requests
-from requests.auth import HTTPBasicAuth
-
-
-class MpesaConnector:
- def __init__(
- self,
- env="sandbox",
- app_key=None,
- app_secret=None,
- sandbox_url="https://sandbox.safaricom.co.ke",
- live_url="https://api.safaricom.co.ke",
- ):
- """Setup configuration for Mpesa connector and generate new access token."""
- self.env = env
- self.app_key = app_key
- self.app_secret = app_secret
- if env == "sandbox":
- self.base_url = sandbox_url
- else:
- self.base_url = live_url
- self.authenticate()
-
- def authenticate(self):
- """
- This method is used to fetch the access token required by Mpesa.
-
- Returns:
- access_token (str): This token is to be used with the Bearer header for further API calls to Mpesa.
- """
- authenticate_uri = "/oauth/v1/generate?grant_type=client_credentials"
- authenticate_url = "{0}{1}".format(self.base_url, authenticate_uri)
- r = requests.get(authenticate_url, auth=HTTPBasicAuth(self.app_key, self.app_secret))
- self.authentication_token = r.json()["access_token"]
- return r.json()["access_token"]
-
- def get_balance(
- self,
- initiator=None,
- security_credential=None,
- party_a=None,
- identifier_type=None,
- remarks=None,
- queue_timeout_url=None,
- result_url=None,
- ):
- """
- This method uses Mpesa's Account Balance API to to enquire the balance on a M-Pesa BuyGoods (Till Number).
-
- Args:
- initiator (str): Username used to authenticate the transaction.
- security_credential (str): Generate from developer portal.
- command_id (str): AccountBalance.
- party_a (int): Till number being queried.
- identifier_type (int): Type of organization receiving the transaction. (MSISDN/Till Number/Organization short code)
- remarks (str): Comments that are sent along with the transaction(maximum 100 characters).
- queue_timeout_url (str): The url that handles information of timed out transactions.
- result_url (str): The url that receives results from M-Pesa api call.
-
- Returns:
- OriginatorConverstionID (str): The unique request ID for tracking a transaction.
- ConversationID (str): The unique request ID returned by mpesa for each request made
- ResponseDescription (str): Response Description message
- """
-
- payload = {
- "Initiator": initiator,
- "SecurityCredential": security_credential,
- "CommandID": "AccountBalance",
- "PartyA": party_a,
- "IdentifierType": identifier_type,
- "Remarks": remarks,
- "QueueTimeOutURL": queue_timeout_url,
- "ResultURL": result_url,
- }
- headers = {
- "Authorization": "Bearer {0}".format(self.authentication_token),
- "Content-Type": "application/json",
- }
- saf_url = "{0}{1}".format(self.base_url, "/mpesa/accountbalance/v1/query")
- r = requests.post(saf_url, headers=headers, json=payload)
- return r.json()
-
- def stk_push(
- self,
- business_shortcode=None,
- passcode=None,
- amount=None,
- callback_url=None,
- reference_code=None,
- phone_number=None,
- description=None,
- ):
- """
- This method uses Mpesa's Express API to initiate online payment on behalf of a customer.
-
- Args:
- business_shortcode (int): The short code of the organization.
- passcode (str): Get from developer portal
- amount (int): The amount being transacted
- callback_url (str): A CallBack URL is a valid secure URL that is used to receive notifications from M-Pesa API.
- reference_code(str): Account Reference: This is an Alpha-Numeric parameter that is defined by your system as an Identifier of the transaction for CustomerPayBillOnline transaction type.
- phone_number(int): The Mobile Number to receive the STK Pin Prompt.
- description(str): This is any additional information/comment that can be sent along with the request from your system. MAX 13 characters
-
- Success Response:
- CustomerMessage(str): Messages that customers can understand.
- CheckoutRequestID(str): This is a global unique identifier of the processed checkout transaction request.
- ResponseDescription(str): Describes Success or failure
- MerchantRequestID(str): This is a global unique Identifier for any submitted payment request.
- ResponseCode(int): 0 means success all others are error codes. e.g.404.001.03
-
- Error Reponse:
- requestId(str): This is a unique requestID for the payment request
- errorCode(str): This is a predefined code that indicates the reason for request failure.
- errorMessage(str): This is a predefined code that indicates the reason for request failure.
- """
-
- time = (
- str(datetime.datetime.now()).split(".")[0].replace("-", "").replace(" ", "").replace(":", "")
- )
- password = "{0}{1}{2}".format(str(business_shortcode), str(passcode), time)
- encoded = base64.b64encode(bytes(password, encoding="utf8"))
- payload = {
- "BusinessShortCode": business_shortcode,
- "Password": encoded.decode("utf-8"),
- "Timestamp": time,
- "Amount": amount,
- "PartyA": int(phone_number),
- "PartyB": reference_code,
- "PhoneNumber": int(phone_number),
- "CallBackURL": callback_url,
- "AccountReference": reference_code,
- "TransactionDesc": description,
- "TransactionType": "CustomerPayBillOnline"
- if self.env == "sandbox"
- else "CustomerBuyGoodsOnline",
- }
- headers = {
- "Authorization": "Bearer {0}".format(self.authentication_token),
- "Content-Type": "application/json",
- }
-
- saf_url = "{0}{1}".format(self.base_url, "/mpesa/stkpush/v1/processrequest")
- r = requests.post(saf_url, headers=headers, json=payload)
- return r.json()
diff --git a/erpnext/erpnext_integrations/doctype/mpesa_settings/mpesa_custom_fields.py b/erpnext/erpnext_integrations/doctype/mpesa_settings/mpesa_custom_fields.py
deleted file mode 100644
index c92edc5..0000000
--- a/erpnext/erpnext_integrations/doctype/mpesa_settings/mpesa_custom_fields.py
+++ /dev/null
@@ -1,56 +0,0 @@
-import frappe
-from frappe.custom.doctype.custom_field.custom_field import create_custom_fields
-
-
-def create_custom_pos_fields():
- """Create custom fields corresponding to POS Settings and POS Invoice."""
- pos_field = {
- "POS Invoice": [
- {
- "fieldname": "request_for_payment",
- "label": "Request for Payment",
- "fieldtype": "Button",
- "hidden": 1,
- "insert_after": "contact_email",
- },
- {
- "fieldname": "mpesa_receipt_number",
- "label": "Mpesa Receipt Number",
- "fieldtype": "Data",
- "read_only": 1,
- "insert_after": "company",
- },
- ]
- }
- if not frappe.get_meta("POS Invoice").has_field("request_for_payment"):
- create_custom_fields(pos_field)
-
- record_dict = [
- {
- "doctype": "POS Field",
- "fieldname": "contact_mobile",
- "label": "Mobile No",
- "fieldtype": "Data",
- "options": "Phone",
- "parenttype": "POS Settings",
- "parent": "POS Settings",
- "parentfield": "invoice_fields",
- },
- {
- "doctype": "POS Field",
- "fieldname": "request_for_payment",
- "label": "Request for Payment",
- "fieldtype": "Button",
- "parenttype": "POS Settings",
- "parent": "POS Settings",
- "parentfield": "invoice_fields",
- },
- ]
- create_pos_settings(record_dict)
-
-
-def create_pos_settings(record_dict):
- for record in record_dict:
- if frappe.db.exists("POS Field", {"fieldname": record.get("fieldname")}):
- continue
- frappe.get_doc(record).insert()
diff --git a/erpnext/erpnext_integrations/doctype/mpesa_settings/mpesa_settings.js b/erpnext/erpnext_integrations/doctype/mpesa_settings/mpesa_settings.js
deleted file mode 100644
index 447d720..0000000
--- a/erpnext/erpnext_integrations/doctype/mpesa_settings/mpesa_settings.js
+++ /dev/null
@@ -1,39 +0,0 @@
-// Copyright (c) 2020, Frappe Technologies Pvt. Ltd. and contributors
-// For license information, please see license.txt
-
-frappe.ui.form.on('Mpesa Settings', {
- onload_post_render: function(frm) {
- frm.events.setup_account_balance_html(frm);
- },
-
- refresh: function(frm) {
- erpnext.utils.check_payments_app();
-
- frappe.realtime.on("refresh_mpesa_dashboard", function(){
- frm.reload_doc();
- frm.events.setup_account_balance_html(frm);
- });
- },
-
- get_account_balance: function(frm) {
- if (!frm.doc.initiator_name && !frm.doc.security_credential) {
- frappe.throw(__("Please set the initiator name and the security credential"));
- }
- frappe.call({
- method: "get_account_balance_info",
- doc: frm.doc
- });
- },
-
- setup_account_balance_html: function(frm) {
- if (!frm.doc.account_balance) return;
- $("div").remove(".form-dashboard-section.custom");
- frm.dashboard.add_section(
- frappe.render_template('account_balance', {
- data: JSON.parse(frm.doc.account_balance)
- })
- );
- frm.dashboard.show();
- }
-
-});
diff --git a/erpnext/erpnext_integrations/doctype/mpesa_settings/mpesa_settings.json b/erpnext/erpnext_integrations/doctype/mpesa_settings/mpesa_settings.json
deleted file mode 100644
index 8f3b427..0000000
--- a/erpnext/erpnext_integrations/doctype/mpesa_settings/mpesa_settings.json
+++ /dev/null
@@ -1,152 +0,0 @@
-{
- "actions": [],
- "autoname": "field:payment_gateway_name",
- "creation": "2020-09-10 13:21:27.398088",
- "doctype": "DocType",
- "editable_grid": 1,
- "engine": "InnoDB",
- "field_order": [
- "payment_gateway_name",
- "consumer_key",
- "consumer_secret",
- "initiator_name",
- "till_number",
- "transaction_limit",
- "sandbox",
- "column_break_4",
- "business_shortcode",
- "online_passkey",
- "security_credential",
- "get_account_balance",
- "account_balance"
- ],
- "fields": [
- {
- "fieldname": "payment_gateway_name",
- "fieldtype": "Data",
- "label": "Payment Gateway Name",
- "reqd": 1,
- "unique": 1
- },
- {
- "fieldname": "consumer_key",
- "fieldtype": "Data",
- "in_list_view": 1,
- "label": "Consumer Key",
- "reqd": 1
- },
- {
- "fieldname": "consumer_secret",
- "fieldtype": "Password",
- "in_list_view": 1,
- "label": "Consumer Secret",
- "reqd": 1
- },
- {
- "fieldname": "till_number",
- "fieldtype": "Data",
- "in_list_view": 1,
- "label": "Till Number",
- "reqd": 1
- },
- {
- "default": "0",
- "fieldname": "sandbox",
- "fieldtype": "Check",
- "label": "Sandbox"
- },
- {
- "fieldname": "column_break_4",
- "fieldtype": "Column Break"
- },
- {
- "fieldname": "online_passkey",
- "fieldtype": "Password",
- "label": " Online PassKey",
- "reqd": 1
- },
- {
- "fieldname": "initiator_name",
- "fieldtype": "Data",
- "label": "Initiator Name"
- },
- {
- "fieldname": "security_credential",
- "fieldtype": "Small Text",
- "label": "Security Credential"
- },
- {
- "fieldname": "account_balance",
- "fieldtype": "Long Text",
- "hidden": 1,
- "label": "Account Balance",
- "read_only": 1
- },
- {
- "fieldname": "get_account_balance",
- "fieldtype": "Button",
- "label": "Get Account Balance"
- },
- {
- "depends_on": "eval:(doc.sandbox==0)",
- "fieldname": "business_shortcode",
- "fieldtype": "Data",
- "label": "Business Shortcode",
- "mandatory_depends_on": "eval:(doc.sandbox==0)"
- },
- {
- "default": "150000",
- "fieldname": "transaction_limit",
- "fieldtype": "Float",
- "label": "Transaction Limit",
- "non_negative": 1
- }
- ],
- "links": [],
- "modified": "2021-03-02 17:35:14.084342",
- "modified_by": "Administrator",
- "module": "ERPNext Integrations",
- "name": "Mpesa Settings",
- "owner": "Administrator",
- "permissions": [
- {
- "create": 1,
- "delete": 1,
- "email": 1,
- "export": 1,
- "print": 1,
- "read": 1,
- "report": 1,
- "role": "System Manager",
- "share": 1,
- "write": 1
- },
- {
- "create": 1,
- "delete": 1,
- "email": 1,
- "export": 1,
- "print": 1,
- "read": 1,
- "report": 1,
- "role": "Accounts Manager",
- "share": 1,
- "write": 1
- },
- {
- "create": 1,
- "delete": 1,
- "email": 1,
- "export": 1,
- "print": 1,
- "read": 1,
- "report": 1,
- "role": "Accounts User",
- "share": 1,
- "write": 1
- }
- ],
- "sort_field": "modified",
- "sort_order": "DESC",
- "track_changes": 1
-}
\ No newline at end of file
diff --git a/erpnext/erpnext_integrations/doctype/mpesa_settings/mpesa_settings.py b/erpnext/erpnext_integrations/doctype/mpesa_settings/mpesa_settings.py
deleted file mode 100644
index a298e11..0000000
--- a/erpnext/erpnext_integrations/doctype/mpesa_settings/mpesa_settings.py
+++ /dev/null
@@ -1,354 +0,0 @@
-# Copyright (c) 2020, Frappe Technologies and contributors
-# For license information, please see license.txt
-
-
-from json import dumps, loads
-
-import frappe
-from frappe import _
-from frappe.integrations.utils import create_request_log
-from frappe.model.document import Document
-from frappe.utils import call_hook_method, fmt_money, get_request_site_address
-
-from erpnext.erpnext_integrations.doctype.mpesa_settings.mpesa_connector import MpesaConnector
-from erpnext.erpnext_integrations.doctype.mpesa_settings.mpesa_custom_fields import (
- create_custom_pos_fields,
-)
-from erpnext.erpnext_integrations.utils import create_mode_of_payment
-from erpnext.utilities import payment_app_import_guard
-
-
-class MpesaSettings(Document):
- supported_currencies = ["KES"]
-
- def validate_transaction_currency(self, currency):
- if currency not in self.supported_currencies:
- frappe.throw(
- _(
- "Please select another payment method. Mpesa does not support transactions in currency '{0}'"
- ).format(currency)
- )
-
- def on_update(self):
- with payment_app_import_guard():
- from payments.utils import create_payment_gateway
-
- create_custom_pos_fields()
- create_payment_gateway(
- "Mpesa-" + self.payment_gateway_name,
- settings="Mpesa Settings",
- controller=self.payment_gateway_name,
- )
- call_hook_method(
- "payment_gateway_enabled", gateway="Mpesa-" + self.payment_gateway_name, payment_channel="Phone"
- )
-
- # required to fetch the bank account details from the payment gateway account
- frappe.db.commit()
- create_mode_of_payment("Mpesa-" + self.payment_gateway_name, payment_type="Phone")
-
- def request_for_payment(self, **kwargs):
- args = frappe._dict(kwargs)
- request_amounts = self.split_request_amount_according_to_transaction_limit(args)
-
- for i, amount in enumerate(request_amounts):
- args.request_amount = amount
- if frappe.flags.in_test:
- from erpnext.erpnext_integrations.doctype.mpesa_settings.test_mpesa_settings import (
- get_payment_request_response_payload,
- )
-
- response = frappe._dict(get_payment_request_response_payload(amount))
- else:
- response = frappe._dict(generate_stk_push(**args))
-
- self.handle_api_response("CheckoutRequestID", args, response)
-
- def split_request_amount_according_to_transaction_limit(self, args):
- request_amount = args.request_amount
- if request_amount > self.transaction_limit:
- # make multiple requests
- request_amounts = []
- requests_to_be_made = frappe.utils.ceil(
- request_amount / self.transaction_limit
- ) # 480/150 = ceil(3.2) = 4
- for i in range(requests_to_be_made):
- amount = self.transaction_limit
- if i == requests_to_be_made - 1:
- amount = request_amount - (
- self.transaction_limit * i
- ) # for 4th request, 480 - (150 * 3) = 30
- request_amounts.append(amount)
- else:
- request_amounts = [request_amount]
-
- return request_amounts
-
- @frappe.whitelist()
- def get_account_balance_info(self):
- payload = dict(
- reference_doctype="Mpesa Settings", reference_docname=self.name, doc_details=vars(self)
- )
-
- if frappe.flags.in_test:
- from erpnext.erpnext_integrations.doctype.mpesa_settings.test_mpesa_settings import (
- get_test_account_balance_response,
- )
-
- response = frappe._dict(get_test_account_balance_response())
- else:
- response = frappe._dict(get_account_balance(payload))
-
- self.handle_api_response("ConversationID", payload, response)
-
- def handle_api_response(self, global_id, request_dict, response):
- """Response received from API calls returns a global identifier for each transaction, this code is returned during the callback."""
- # check error response
- if getattr(response, "requestId"):
- req_name = getattr(response, "requestId")
- error = response
- else:
- # global checkout id used as request name
- req_name = getattr(response, global_id)
- error = None
-
- if not frappe.db.exists("Integration Request", req_name):
- create_request_log(request_dict, "Host", "Mpesa", req_name, error)
-
- if error:
- frappe.throw(_(getattr(response, "errorMessage")), title=_("Transaction Error"))
-
-
-def generate_stk_push(**kwargs):
- """Generate stk push by making a API call to the stk push API."""
- args = frappe._dict(kwargs)
- try:
- callback_url = (
- get_request_site_address(True)
- + "/api/method/erpnext.erpnext_integrations.doctype.mpesa_settings.mpesa_settings.verify_transaction"
- )
-
- mpesa_settings = frappe.get_doc("Mpesa Settings", args.payment_gateway[6:])
- env = "production" if not mpesa_settings.sandbox else "sandbox"
- # for sandbox, business shortcode is same as till number
- business_shortcode = (
- mpesa_settings.business_shortcode if env == "production" else mpesa_settings.till_number
- )
-
- connector = MpesaConnector(
- env=env,
- app_key=mpesa_settings.consumer_key,
- app_secret=mpesa_settings.get_password("consumer_secret"),
- )
-
- mobile_number = sanitize_mobile_number(args.sender)
-
- response = connector.stk_push(
- business_shortcode=business_shortcode,
- amount=args.request_amount,
- passcode=mpesa_settings.get_password("online_passkey"),
- callback_url=callback_url,
- reference_code=mpesa_settings.till_number,
- phone_number=mobile_number,
- description="POS Payment",
- )
-
- return response
-
- except Exception:
- frappe.log_error("Mpesa Express Transaction Error")
- frappe.throw(
- _("Issue detected with Mpesa configuration, check the error logs for more details"),
- title=_("Mpesa Express Error"),
- )
-
-
-def sanitize_mobile_number(number):
- """Add country code and strip leading zeroes from the phone number."""
- return "254" + str(number).lstrip("0")
-
-
-@frappe.whitelist(allow_guest=True)
-def verify_transaction(**kwargs):
- """Verify the transaction result received via callback from stk."""
- transaction_response = frappe._dict(kwargs["Body"]["stkCallback"])
-
- checkout_id = getattr(transaction_response, "CheckoutRequestID", "")
- if not isinstance(checkout_id, str):
- frappe.throw(_("Invalid Checkout Request ID"))
-
- integration_request = frappe.get_doc("Integration Request", checkout_id)
- transaction_data = frappe._dict(loads(integration_request.data))
- total_paid = 0 # for multiple integration request made against a pos invoice
- success = False # for reporting successfull callback to point of sale ui
-
- if transaction_response["ResultCode"] == 0:
- if integration_request.reference_doctype and integration_request.reference_docname:
- try:
- item_response = transaction_response["CallbackMetadata"]["Item"]
- amount = fetch_param_value(item_response, "Amount", "Name")
- mpesa_receipt = fetch_param_value(item_response, "MpesaReceiptNumber", "Name")
- pr = frappe.get_doc(
- integration_request.reference_doctype, integration_request.reference_docname
- )
-
- mpesa_receipts, completed_payments = get_completed_integration_requests_info(
- integration_request.reference_doctype, integration_request.reference_docname, checkout_id
- )
-
- total_paid = amount + sum(completed_payments)
- mpesa_receipts = ", ".join(mpesa_receipts + [mpesa_receipt])
-
- if total_paid >= pr.grand_total:
- pr.run_method("on_payment_authorized", "Completed")
- success = True
-
- frappe.db.set_value("POS Invoice", pr.reference_name, "mpesa_receipt_number", mpesa_receipts)
- integration_request.handle_success(transaction_response)
- except Exception:
- integration_request.handle_failure(transaction_response)
- frappe.log_error("Mpesa: Failed to verify transaction")
-
- else:
- integration_request.handle_failure(transaction_response)
-
- frappe.publish_realtime(
- event="process_phone_payment",
- doctype="POS Invoice",
- docname=transaction_data.payment_reference,
- user=integration_request.owner,
- message={
- "amount": total_paid,
- "success": success,
- "failure_message": transaction_response["ResultDesc"]
- if transaction_response["ResultCode"] != 0
- else "",
- },
- )
-
-
-def get_completed_integration_requests_info(reference_doctype, reference_docname, checkout_id):
- output_of_other_completed_requests = frappe.get_all(
- "Integration Request",
- filters={
- "name": ["!=", checkout_id],
- "reference_doctype": reference_doctype,
- "reference_docname": reference_docname,
- "status": "Completed",
- },
- pluck="output",
- )
-
- mpesa_receipts, completed_payments = [], []
-
- for out in output_of_other_completed_requests:
- out = frappe._dict(loads(out))
- item_response = out["CallbackMetadata"]["Item"]
- completed_amount = fetch_param_value(item_response, "Amount", "Name")
- completed_mpesa_receipt = fetch_param_value(item_response, "MpesaReceiptNumber", "Name")
- completed_payments.append(completed_amount)
- mpesa_receipts.append(completed_mpesa_receipt)
-
- return mpesa_receipts, completed_payments
-
-
-def get_account_balance(request_payload):
- """Call account balance API to send the request to the Mpesa Servers."""
- try:
- mpesa_settings = frappe.get_doc("Mpesa Settings", request_payload.get("reference_docname"))
- env = "production" if not mpesa_settings.sandbox else "sandbox"
- connector = MpesaConnector(
- env=env,
- app_key=mpesa_settings.consumer_key,
- app_secret=mpesa_settings.get_password("consumer_secret"),
- )
-
- callback_url = (
- get_request_site_address(True)
- + "/api/method/erpnext.erpnext_integrations.doctype.mpesa_settings.mpesa_settings.process_balance_info"
- )
-
- response = connector.get_balance(
- mpesa_settings.initiator_name,
- mpesa_settings.security_credential,
- mpesa_settings.till_number,
- 4,
- mpesa_settings.name,
- callback_url,
- callback_url,
- )
- return response
- except Exception:
- frappe.log_error("Mpesa: Failed to get account balance")
- frappe.throw(_("Please check your configuration and try again"), title=_("Error"))
-
-
-@frappe.whitelist(allow_guest=True)
-def process_balance_info(**kwargs):
- """Process and store account balance information received via callback from the account balance API call."""
- account_balance_response = frappe._dict(kwargs["Result"])
-
- conversation_id = getattr(account_balance_response, "ConversationID", "")
- if not isinstance(conversation_id, str):
- frappe.throw(_("Invalid Conversation ID"))
-
- request = frappe.get_doc("Integration Request", conversation_id)
-
- if request.status == "Completed":
- return
-
- transaction_data = frappe._dict(loads(request.data))
-
- if account_balance_response["ResultCode"] == 0:
- try:
- result_params = account_balance_response["ResultParameters"]["ResultParameter"]
-
- balance_info = fetch_param_value(result_params, "AccountBalance", "Key")
- balance_info = format_string_to_json(balance_info)
-
- ref_doc = frappe.get_doc(transaction_data.reference_doctype, transaction_data.reference_docname)
- ref_doc.db_set("account_balance", balance_info)
-
- request.handle_success(account_balance_response)
- frappe.publish_realtime(
- "refresh_mpesa_dashboard",
- doctype="Mpesa Settings",
- docname=transaction_data.reference_docname,
- user=transaction_data.owner,
- )
- except Exception:
- request.handle_failure(account_balance_response)
- frappe.log_error(
- title="Mpesa Account Balance Processing Error", message=account_balance_response
- )
- else:
- request.handle_failure(account_balance_response)
-
-
-def format_string_to_json(balance_info):
- """
- Format string to json.
-
- e.g: '''Working Account|KES|481000.00|481000.00|0.00|0.00'''
- => {'Working Account': {'current_balance': '481000.00',
- 'available_balance': '481000.00',
- 'reserved_balance': '0.00',
- 'uncleared_balance': '0.00'}}
- """
- balance_dict = frappe._dict()
- for account_info in balance_info.split("&"):
- account_info = account_info.split("|")
- balance_dict[account_info[0]] = dict(
- current_balance=fmt_money(account_info[2], currency="KES"),
- available_balance=fmt_money(account_info[3], currency="KES"),
- reserved_balance=fmt_money(account_info[4], currency="KES"),
- uncleared_balance=fmt_money(account_info[5], currency="KES"),
- )
- return dumps(balance_dict)
-
-
-def fetch_param_value(response, key, key_field):
- """Fetch the specified key from list of dictionary. Key is identified via the key field."""
- for param in response:
- if param[key_field] == key:
- return param["Value"]
diff --git a/erpnext/erpnext_integrations/doctype/mpesa_settings/test_mpesa_settings.py b/erpnext/erpnext_integrations/doctype/mpesa_settings/test_mpesa_settings.py
deleted file mode 100644
index b526624..0000000
--- a/erpnext/erpnext_integrations/doctype/mpesa_settings/test_mpesa_settings.py
+++ /dev/null
@@ -1,361 +0,0 @@
-# Copyright (c) 2020, Frappe Technologies Pvt. Ltd. and Contributors
-# See license.txt
-
-import unittest
-from json import dumps
-
-import frappe
-
-from erpnext.accounts.doctype.pos_invoice.test_pos_invoice import create_pos_invoice
-from erpnext.erpnext_integrations.doctype.mpesa_settings.mpesa_settings import (
- process_balance_info,
- verify_transaction,
-)
-from erpnext.erpnext_integrations.utils import create_mode_of_payment
-
-
-class TestMpesaSettings(unittest.TestCase):
- def setUp(self):
- # create payment gateway in setup
- create_mpesa_settings(payment_gateway_name="_Test")
- create_mpesa_settings(payment_gateway_name="_Account Balance")
- create_mpesa_settings(payment_gateway_name="Payment")
-
- def tearDown(self):
- frappe.db.sql("delete from `tabMpesa Settings`")
- frappe.db.sql("delete from `tabIntegration Request` where integration_request_service = 'Mpesa'")
-
- def test_creation_of_payment_gateway(self):
- mode_of_payment = create_mode_of_payment("Mpesa-_Test", payment_type="Phone")
- self.assertTrue(frappe.db.exists("Payment Gateway Account", {"payment_gateway": "Mpesa-_Test"}))
- self.assertTrue(mode_of_payment.name)
- self.assertEqual(mode_of_payment.type, "Phone")
-
- def test_processing_of_account_balance(self):
- mpesa_doc = create_mpesa_settings(payment_gateway_name="_Account Balance")
- mpesa_doc.get_account_balance_info()
-
- callback_response = get_account_balance_callback_payload()
- process_balance_info(**callback_response)
- integration_request = frappe.get_doc("Integration Request", "AG_20200927_00007cdb1f9fb6494315")
-
- # test integration request creation and successful update of the status on receiving callback response
- self.assertTrue(integration_request)
- self.assertEqual(integration_request.status, "Completed")
-
- # test formatting of account balance received as string to json with appropriate currency symbol
- mpesa_doc.reload()
- self.assertEqual(
- mpesa_doc.account_balance,
- dumps(
- {
- "Working Account": {
- "current_balance": "Sh 481,000.00",
- "available_balance": "Sh 481,000.00",
- "reserved_balance": "Sh 0.00",
- "uncleared_balance": "Sh 0.00",
- }
- }
- ),
- )
-
- integration_request.delete()
-
- def test_processing_of_callback_payload(self):
- mpesa_account = frappe.db.get_value(
- "Payment Gateway Account", {"payment_gateway": "Mpesa-Payment"}, "payment_account"
- )
- frappe.db.set_value("Account", mpesa_account, "account_currency", "KES")
- frappe.db.set_value("Customer", "_Test Customer", "default_currency", "KES")
-
- pos_invoice = create_pos_invoice(do_not_submit=1)
- pos_invoice.append(
- "payments", {"mode_of_payment": "Mpesa-Payment", "account": mpesa_account, "amount": 500}
- )
- pos_invoice.contact_mobile = "093456543894"
- pos_invoice.currency = "KES"
- pos_invoice.save()
-
- pr = pos_invoice.create_payment_request()
- # test payment request creation
- self.assertEqual(pr.payment_gateway, "Mpesa-Payment")
-
- # submitting payment request creates integration requests with random id
- integration_req_ids = frappe.get_all(
- "Integration Request",
- filters={
- "reference_doctype": pr.doctype,
- "reference_docname": pr.name,
- },
- pluck="name",
- )
-
- callback_response = get_payment_callback_payload(
- Amount=500, CheckoutRequestID=integration_req_ids[0]
- )
- verify_transaction(**callback_response)
- # test creation of integration request
- integration_request = frappe.get_doc("Integration Request", integration_req_ids[0])
-
- # test integration request creation and successful update of the status on receiving callback response
- self.assertTrue(integration_request)
- self.assertEqual(integration_request.status, "Completed")
-
- pos_invoice.reload()
- integration_request.reload()
- self.assertEqual(pos_invoice.mpesa_receipt_number, "LGR7OWQX0R")
- self.assertEqual(integration_request.status, "Completed")
-
- frappe.db.set_value("Customer", "_Test Customer", "default_currency", "")
- integration_request.delete()
- pr.reload()
- pr.cancel()
- pr.delete()
- pos_invoice.delete()
-
- def test_processing_of_multiple_callback_payload(self):
- mpesa_account = frappe.db.get_value(
- "Payment Gateway Account", {"payment_gateway": "Mpesa-Payment"}, "payment_account"
- )
- frappe.db.set_value("Account", mpesa_account, "account_currency", "KES")
- frappe.db.set_value("Mpesa Settings", "Payment", "transaction_limit", "500")
- frappe.db.set_value("Customer", "_Test Customer", "default_currency", "KES")
-
- pos_invoice = create_pos_invoice(do_not_submit=1)
- pos_invoice.append(
- "payments", {"mode_of_payment": "Mpesa-Payment", "account": mpesa_account, "amount": 1000}
- )
- pos_invoice.contact_mobile = "093456543894"
- pos_invoice.currency = "KES"
- pos_invoice.save()
-
- pr = pos_invoice.create_payment_request()
- # test payment request creation
- self.assertEqual(pr.payment_gateway, "Mpesa-Payment")
-
- # submitting payment request creates integration requests with random id
- integration_req_ids = frappe.get_all(
- "Integration Request",
- filters={
- "reference_doctype": pr.doctype,
- "reference_docname": pr.name,
- },
- pluck="name",
- )
-
- # create random receipt nos and send it as response to callback handler
- mpesa_receipt_numbers = [frappe.utils.random_string(5) for d in integration_req_ids]
-
- integration_requests = []
- for i in range(len(integration_req_ids)):
- callback_response = get_payment_callback_payload(
- Amount=500,
- CheckoutRequestID=integration_req_ids[i],
- MpesaReceiptNumber=mpesa_receipt_numbers[i],
- )
- # handle response manually
- verify_transaction(**callback_response)
- # test completion of integration request
- integration_request = frappe.get_doc("Integration Request", integration_req_ids[i])
- self.assertEqual(integration_request.status, "Completed")
- integration_requests.append(integration_request)
-
- # check receipt number once all the integration requests are completed
- pos_invoice.reload()
- self.assertEqual(pos_invoice.mpesa_receipt_number, ", ".join(mpesa_receipt_numbers))
-
- frappe.db.set_value("Customer", "_Test Customer", "default_currency", "")
- [d.delete() for d in integration_requests]
- pr.reload()
- pr.cancel()
- pr.delete()
- pos_invoice.delete()
-
- def test_processing_of_only_one_succes_callback_payload(self):
- mpesa_account = frappe.db.get_value(
- "Payment Gateway Account", {"payment_gateway": "Mpesa-Payment"}, "payment_account"
- )
- frappe.db.set_value("Account", mpesa_account, "account_currency", "KES")
- frappe.db.set_value("Mpesa Settings", "Payment", "transaction_limit", "500")
- frappe.db.set_value("Customer", "_Test Customer", "default_currency", "KES")
-
- pos_invoice = create_pos_invoice(do_not_submit=1)
- pos_invoice.append(
- "payments", {"mode_of_payment": "Mpesa-Payment", "account": mpesa_account, "amount": 1000}
- )
- pos_invoice.contact_mobile = "093456543894"
- pos_invoice.currency = "KES"
- pos_invoice.save()
-
- pr = pos_invoice.create_payment_request()
- # test payment request creation
- self.assertEqual(pr.payment_gateway, "Mpesa-Payment")
-
- # submitting payment request creates integration requests with random id
- integration_req_ids = frappe.get_all(
- "Integration Request",
- filters={
- "reference_doctype": pr.doctype,
- "reference_docname": pr.name,
- },
- pluck="name",
- )
-
- # create random receipt nos and send it as response to callback handler
- mpesa_receipt_numbers = [frappe.utils.random_string(5) for d in integration_req_ids]
-
- callback_response = get_payment_callback_payload(
- Amount=500,
- CheckoutRequestID=integration_req_ids[0],
- MpesaReceiptNumber=mpesa_receipt_numbers[0],
- )
- # handle response manually
- verify_transaction(**callback_response)
- # test completion of integration request
- integration_request = frappe.get_doc("Integration Request", integration_req_ids[0])
- self.assertEqual(integration_request.status, "Completed")
-
- # now one request is completed
- # second integration request fails
- # now retrying payment request should make only one integration request again
- pr = pos_invoice.create_payment_request()
- new_integration_req_ids = frappe.get_all(
- "Integration Request",
- filters={
- "reference_doctype": pr.doctype,
- "reference_docname": pr.name,
- "name": ["not in", integration_req_ids],
- },
- pluck="name",
- )
-
- self.assertEqual(len(new_integration_req_ids), 1)
-
- frappe.db.set_value("Customer", "_Test Customer", "default_currency", "")
- frappe.db.sql("delete from `tabIntegration Request` where integration_request_service = 'Mpesa'")
- pr.reload()
- pr.cancel()
- pr.delete()
- pos_invoice.delete()
-
-
-def create_mpesa_settings(payment_gateway_name="Express"):
- if frappe.db.exists("Mpesa Settings", payment_gateway_name):
- return frappe.get_doc("Mpesa Settings", payment_gateway_name)
-
- doc = frappe.get_doc(
- dict( # nosec
- doctype="Mpesa Settings",
- sandbox=1,
- payment_gateway_name=payment_gateway_name,
- consumer_key="5sMu9LVI1oS3oBGPJfh3JyvLHwZOdTKn",
- consumer_secret="VI1oS3oBGPJfh3JyvLHw",
- online_passkey="LVI1oS3oBGPJfh3JyvLHwZOd",
- till_number="174379",
- )
- )
-
- doc.insert(ignore_permissions=True)
- return doc
-
-
-def get_test_account_balance_response():
- """Response received after calling the account balance API."""
- return {
- "ResultType": 0,
- "ResultCode": 0,
- "ResultDesc": "The service request has been accepted successfully.",
- "OriginatorConversationID": "10816-694520-2",
- "ConversationID": "AG_20200927_00007cdb1f9fb6494315",
- "TransactionID": "LGR0000000",
- "ResultParameters": {
- "ResultParameter": [
- {"Key": "ReceiptNo", "Value": "LGR919G2AV"},
- {"Key": "Conversation ID", "Value": "AG_20170727_00004492b1b6d0078fbe"},
- {"Key": "FinalisedTime", "Value": 20170727101415},
- {"Key": "Amount", "Value": 10},
- {"Key": "TransactionStatus", "Value": "Completed"},
- {"Key": "ReasonType", "Value": "Salary Payment via API"},
- {"Key": "TransactionReason"},
- {"Key": "DebitPartyCharges", "Value": "Fee For B2C Payment|KES|33.00"},
- {"Key": "DebitAccountType", "Value": "Utility Account"},
- {"Key": "InitiatedTime", "Value": 20170727101415},
- {"Key": "Originator Conversation ID", "Value": "19455-773836-1"},
- {"Key": "CreditPartyName", "Value": "254708374149 - John Doe"},
- {"Key": "DebitPartyName", "Value": "600134 - Safaricom157"},
- ]
- },
- "ReferenceData": {"ReferenceItem": {"Key": "Occasion", "Value": "aaaa"}},
- }
-
-
-def get_payment_request_response_payload(Amount=500):
- """Response received after successfully calling the stk push process request API."""
-
- CheckoutRequestID = frappe.utils.random_string(10)
-
- return {
- "MerchantRequestID": "8071-27184008-1",
- "CheckoutRequestID": CheckoutRequestID,
- "ResultCode": 0,
- "ResultDesc": "The service request is processed successfully.",
- "CallbackMetadata": {
- "Item": [
- {"Name": "Amount", "Value": Amount},
- {"Name": "MpesaReceiptNumber", "Value": "LGR7OWQX0R"},
- {"Name": "TransactionDate", "Value": 20201006113336},
- {"Name": "PhoneNumber", "Value": 254723575670},
- ]
- },
- }
-
-
-def get_payment_callback_payload(
- Amount=500, CheckoutRequestID="ws_CO_061020201133231972", MpesaReceiptNumber="LGR7OWQX0R"
-):
- """Response received from the server as callback after calling the stkpush process request API."""
- return {
- "Body": {
- "stkCallback": {
- "MerchantRequestID": "19465-780693-1",
- "CheckoutRequestID": CheckoutRequestID,
- "ResultCode": 0,
- "ResultDesc": "The service request is processed successfully.",
- "CallbackMetadata": {
- "Item": [
- {"Name": "Amount", "Value": Amount},
- {"Name": "MpesaReceiptNumber", "Value": MpesaReceiptNumber},
- {"Name": "Balance"},
- {"Name": "TransactionDate", "Value": 20170727154800},
- {"Name": "PhoneNumber", "Value": 254721566839},
- ]
- },
- }
- }
- }
-
-
-def get_account_balance_callback_payload():
- """Response received from the server as callback after calling the account balance API."""
- return {
- "Result": {
- "ResultType": 0,
- "ResultCode": 0,
- "ResultDesc": "The service request is processed successfully.",
- "OriginatorConversationID": "16470-170099139-1",
- "ConversationID": "AG_20200927_00007cdb1f9fb6494315",
- "TransactionID": "OIR0000000",
- "ResultParameters": {
- "ResultParameter": [
- {"Key": "AccountBalance", "Value": "Working Account|KES|481000.00|481000.00|0.00|0.00"},
- {"Key": "BOCompletedTime", "Value": 20200927234123},
- ]
- },
- "ReferenceData": {
- "ReferenceItem": {
- "Key": "QueueTimeoutURL",
- "Value": "https://internalsandbox.safaricom.co.ke/mpesa/abresults/v1/submit",
- }
- },
- }
- }
diff --git a/erpnext/erpnext_integrations/utils.py b/erpnext/erpnext_integrations/utils.py
index 981486e..8984f1b 100644
--- a/erpnext/erpnext_integrations/utils.py
+++ b/erpnext/erpnext_integrations/utils.py
@@ -6,8 +6,6 @@
import frappe
from frappe import _
-from erpnext import get_default_company
-
def validate_webhooks_request(doctype, hmac_key, secret_key="secret"):
def innerfn(fn):
@@ -47,35 +45,6 @@
return server_url
-def create_mode_of_payment(gateway, payment_type="General"):
- payment_gateway_account = frappe.db.get_value(
- "Payment Gateway Account", {"payment_gateway": gateway}, ["payment_account"]
- )
-
- mode_of_payment = frappe.db.exists("Mode of Payment", gateway)
- if not mode_of_payment and payment_gateway_account:
- mode_of_payment = frappe.get_doc(
- {
- "doctype": "Mode of Payment",
- "mode_of_payment": gateway,
- "enabled": 1,
- "type": payment_type,
- "accounts": [
- {
- "doctype": "Mode of Payment Account",
- "company": get_default_company(),
- "default_account": payment_gateway_account,
- }
- ],
- }
- )
- mode_of_payment.insert(ignore_permissions=True)
-
- return mode_of_payment
- elif mode_of_payment:
- return frappe.get_doc("Mode of Payment", mode_of_payment)
-
-
def get_tracking_url(carrier, tracking_number):
# Return the formatted Tracking URL.
tracking_url = ""