Revert "refactor!: remove e_invoice utils (#27167)"
This reverts commit 8ed63383366728ae7650f6858feaba7ee1f832cd.
diff --git a/erpnext/regional/india/e_invoice/utils.py b/erpnext/regional/india/e_invoice/utils.py
new file mode 100644
index 0000000..765b51f
--- /dev/null
+++ b/erpnext/regional/india/e_invoice/utils.py
@@ -0,0 +1,1131 @@
+# -*- coding: utf-8 -*-
+# Copyright (c) 2020, Frappe Technologies Pvt. Ltd. and contributors
+# For license information, please see license.txt
+
+from __future__ import unicode_literals
+import os
+import re
+import jwt
+import sys
+import json
+import base64
+import frappe
+import six
+import traceback
+import io
+from frappe import _, bold
+from pyqrcode import create as qrcreate
+from frappe.utils.background_jobs import enqueue
+from frappe.utils.scheduler import is_scheduler_inactive
+from frappe.core.page.background_jobs.background_jobs import get_info
+from frappe.integrations.utils import make_post_request, make_get_request
+from erpnext.regional.india.utils import get_gst_accounts, get_place_of_supply
+from frappe.utils.data import cstr, cint, format_date, flt, time_diff_in_seconds, now_datetime, add_to_date, get_link_to_form, getdate, time_diff_in_hours
+
+@frappe.whitelist()
+def validate_eligibility(doc):
+ if isinstance(doc, six.string_types):
+ doc = json.loads(doc)
+
+ invalid_doctype = doc.get('doctype') != 'Sales Invoice'
+ if invalid_doctype:
+ return False
+
+ einvoicing_enabled = cint(frappe.db.get_single_value('E Invoice Settings', 'enable'))
+ if not einvoicing_enabled:
+ return False
+
+ einvoicing_eligible_from = frappe.db.get_single_value('E Invoice Settings', 'applicable_from') or '2021-04-01'
+ if getdate(doc.get('posting_date')) < getdate(einvoicing_eligible_from):
+ return False
+
+ invalid_company = not frappe.db.get_value('E Invoice User', { 'company': doc.get('company') })
+ invalid_supply_type = doc.get('gst_category') not in ['Registered Regular', 'SEZ', 'Overseas', 'Deemed Export']
+ company_transaction = doc.get('billing_address_gstin') == doc.get('company_gstin')
+
+ # if export invoice, then taxes can be empty
+ # invoice can only be ineligible if no taxes applied and is not an export invoice
+ no_taxes_applied = not doc.get('taxes') and not doc.get('gst_category') == 'Overseas'
+ has_non_gst_item = any(d for d in doc.get('items', []) if d.get('is_non_gst'))
+
+ if invalid_company or invalid_supply_type or company_transaction or no_taxes_applied or has_non_gst_item:
+ return False
+
+ return True
+
+def validate_einvoice_fields(doc):
+ invoice_eligible = validate_eligibility(doc)
+
+ if not invoice_eligible:
+ return
+
+ if doc.docstatus == 0 and doc._action == 'save':
+ if doc.irn:
+ frappe.throw(_('You cannot edit the invoice after generating IRN'), title=_('Edit Not Allowed'))
+ if len(doc.name) > 16:
+ raise_document_name_too_long_error()
+
+ doc.einvoice_status = 'Pending'
+
+ elif doc.docstatus == 1 and doc._action == 'submit' and not doc.irn:
+ frappe.throw(_('You must generate IRN before submitting the document.'), title=_('Missing IRN'))
+
+ elif doc.irn and doc.docstatus == 2 and doc._action == 'cancel' and not doc.irn_cancelled:
+ frappe.throw(_('You must cancel IRN before cancelling the document.'), title=_('Cancel Not Allowed'))
+
+def raise_document_name_too_long_error():
+ title = _('Document ID Too Long')
+ msg = _('As you have E-Invoicing enabled, to be able to generate IRN for this invoice')
+ msg += ', '
+ msg += _('document id {} exceed 16 letters.').format(bold(_('should not')))
+ msg += '<br><br>'
+ msg += _('You must {} your {} in order to have document id of {} length 16.').format(
+ bold(_('modify')), bold(_('naming series')), bold(_('maximum'))
+ )
+ msg += _('Please account for ammended documents too.')
+ frappe.throw(msg, title=title)
+
+def read_json(name):
+ file_path = os.path.join(os.path.dirname(__file__), '{name}.json'.format(name=name))
+ with open(file_path, 'r') as f:
+ return cstr(f.read())
+
+def get_transaction_details(invoice):
+ supply_type = ''
+ if invoice.gst_category == 'Registered Regular': supply_type = 'B2B'
+ elif invoice.gst_category == 'SEZ': supply_type = 'SEZWOP'
+ elif invoice.gst_category == 'Overseas': supply_type = 'EXPWOP'
+ elif invoice.gst_category == 'Deemed Export': supply_type = 'DEXP'
+
+ if not supply_type:
+ rr, sez, overseas, export = bold('Registered Regular'), bold('SEZ'), bold('Overseas'), bold('Deemed Export')
+ frappe.throw(_('GST category should be one of {}, {}, {}, {}').format(rr, sez, overseas, export),
+ title=_('Invalid Supply Type'))
+
+ return frappe._dict(dict(
+ tax_scheme='GST',
+ supply_type=supply_type,
+ reverse_charge=invoice.reverse_charge
+ ))
+
+def get_doc_details(invoice):
+ if getdate(invoice.posting_date) < getdate('2021-01-01'):
+ frappe.throw(_('IRN generation is not allowed for invoices dated before 1st Jan 2021'), title=_('Not Allowed'))
+
+ invoice_type = 'CRN' if invoice.is_return else 'INV'
+
+ invoice_name = invoice.name
+ invoice_date = format_date(invoice.posting_date, 'dd/mm/yyyy')
+
+ return frappe._dict(dict(
+ invoice_type=invoice_type,
+ invoice_name=invoice_name,
+ invoice_date=invoice_date
+ ))
+
+def validate_address_fields(address, is_shipping_address):
+ if ((not address.gstin and not is_shipping_address)
+ or not address.city
+ or not address.pincode
+ or not address.address_title
+ or not address.address_line1
+ or not address.gst_state_number):
+
+ frappe.throw(
+ msg=_('Address Lines, City, Pincode, GSTIN are mandatory for address {}. Please set them and try again.').format(address.name),
+ title=_('Missing Address Fields')
+ )
+
+def get_party_details(address_name, is_shipping_address=False):
+ addr = frappe.get_doc('Address', address_name)
+
+ validate_address_fields(addr, is_shipping_address)
+
+ if addr.gst_state_number == 97:
+ # according to einvoice standard
+ addr.pincode = 999999
+
+ party_address_details = frappe._dict(dict(
+ legal_name=sanitize_for_json(addr.address_title),
+ location=sanitize_for_json(addr.city),
+ pincode=addr.pincode, gstin=addr.gstin,
+ state_code=addr.gst_state_number,
+ address_line1=sanitize_for_json(addr.address_line1),
+ address_line2=sanitize_for_json(addr.address_line2)
+ ))
+
+ return party_address_details
+
+def get_overseas_address_details(address_name):
+ address_title, address_line1, address_line2, city = frappe.db.get_value(
+ 'Address', address_name, ['address_title', 'address_line1', 'address_line2', 'city']
+ )
+
+ if not address_title or not address_line1 or not city:
+ frappe.throw(
+ msg=_('Address lines and city is mandatory for address {}. Please set them and try again.').format(
+ get_link_to_form('Address', address_name)
+ ),
+ title=_('Missing Address Fields')
+ )
+
+ return frappe._dict(dict(
+ gstin='URP',
+ legal_name=sanitize_for_json(address_title),
+ location=city,
+ address_line1=sanitize_for_json(address_line1),
+ address_line2=sanitize_for_json(address_line2),
+ pincode=999999, state_code=96, place_of_supply=96
+ ))
+
+def get_item_list(invoice):
+ item_list = []
+
+ for d in invoice.items:
+ einvoice_item_schema = read_json('einv_item_template')
+ item = frappe._dict({})
+ item.update(d.as_dict())
+
+ item.sr_no = d.idx
+ item.description = sanitize_for_json(d.item_name)
+
+ item.qty = abs(item.qty)
+ if flt(item.qty) != 0.0:
+ item.unit_rate = abs(item.taxable_value / item.qty)
+ else:
+ item.unit_rate = abs(item.taxable_value)
+ item.gross_amount = abs(item.taxable_value)
+ item.taxable_value = abs(item.taxable_value)
+ item.discount_amount = 0
+
+ item.batch_expiry_date = frappe.db.get_value('Batch', d.batch_no, 'expiry_date') if d.batch_no else None
+ item.batch_expiry_date = format_date(item.batch_expiry_date, 'dd/mm/yyyy') if item.batch_expiry_date else None
+ item.is_service_item = 'Y' if item.gst_hsn_code and item.gst_hsn_code[:2] == "99" else 'N'
+ item.serial_no = ""
+
+ item = update_item_taxes(invoice, item)
+
+ item.total_value = abs(
+ item.taxable_value + item.igst_amount + item.sgst_amount +
+ item.cgst_amount + item.cess_amount + item.cess_nadv_amount + item.other_charges
+ )
+ einv_item = einvoice_item_schema.format(item=item)
+ item_list.append(einv_item)
+
+ return ', '.join(item_list)
+
+def update_item_taxes(invoice, item):
+ gst_accounts = get_gst_accounts(invoice.company)
+ gst_accounts_list = [d for accounts in gst_accounts.values() for d in accounts if d]
+
+ for attr in [
+ 'tax_rate', 'cess_rate', 'cess_nadv_amount',
+ 'cgst_amount', 'sgst_amount', 'igst_amount',
+ 'cess_amount', 'cess_nadv_amount', 'other_charges'
+ ]:
+ item[attr] = 0
+
+ for t in invoice.taxes:
+ is_applicable = t.tax_amount and t.account_head in gst_accounts_list
+ if is_applicable:
+ # this contains item wise tax rate & tax amount (incl. discount)
+ item_tax_detail = json.loads(t.item_wise_tax_detail).get(item.item_code or item.item_name)
+
+ item_tax_rate = item_tax_detail[0]
+ # item tax amount excluding discount amount
+ item_tax_amount = (item_tax_rate / 100) * item.taxable_value
+
+ if t.account_head in gst_accounts.cess_account:
+ item_tax_amount_after_discount = item_tax_detail[1]
+ if t.charge_type == 'On Item Quantity':
+ item.cess_nadv_amount += abs(item_tax_amount_after_discount)
+ else:
+ item.cess_rate += item_tax_rate
+ item.cess_amount += abs(item_tax_amount_after_discount)
+
+ for tax_type in ['igst', 'cgst', 'sgst']:
+ if t.account_head in gst_accounts[f'{tax_type}_account']:
+ item.tax_rate += item_tax_rate
+ item[f'{tax_type}_amount'] += abs(item_tax_amount)
+ else:
+ # TODO: other charges per item
+ pass
+
+ return item
+
+def get_invoice_value_details(invoice):
+ invoice_value_details = frappe._dict(dict())
+ invoice_value_details.base_total = abs(sum([i.taxable_value for i in invoice.get('items')]))
+ invoice_value_details.invoice_discount_amt = 0
+
+ invoice_value_details.round_off = invoice.base_rounding_adjustment
+ invoice_value_details.base_grand_total = abs(invoice.base_rounded_total) or abs(invoice.base_grand_total)
+ invoice_value_details.grand_total = abs(invoice.rounded_total) or abs(invoice.grand_total)
+
+ invoice_value_details = update_invoice_taxes(invoice, invoice_value_details)
+
+ return invoice_value_details
+
+def update_invoice_taxes(invoice, invoice_value_details):
+ gst_accounts = get_gst_accounts(invoice.company)
+ gst_accounts_list = [d for accounts in gst_accounts.values() for d in accounts if d]
+
+ invoice_value_details.total_cgst_amt = 0
+ invoice_value_details.total_sgst_amt = 0
+ invoice_value_details.total_igst_amt = 0
+ invoice_value_details.total_cess_amt = 0
+ invoice_value_details.total_other_charges = 0
+ considered_rows = []
+
+ for t in invoice.taxes:
+ tax_amount = t.base_tax_amount_after_discount_amount
+ if t.account_head in gst_accounts_list:
+ if t.account_head in gst_accounts.cess_account:
+ # using after discount amt since item also uses after discount amt for cess calc
+ invoice_value_details.total_cess_amt += abs(t.base_tax_amount_after_discount_amount)
+
+ for tax_type in ['igst', 'cgst', 'sgst']:
+ if t.account_head in gst_accounts[f'{tax_type}_account']:
+
+ invoice_value_details[f'total_{tax_type}_amt'] += abs(tax_amount)
+ update_other_charges(t, invoice_value_details, gst_accounts_list, invoice, considered_rows)
+ else:
+ invoice_value_details.total_other_charges += abs(tax_amount)
+
+ return invoice_value_details
+
+def update_other_charges(tax_row, invoice_value_details, gst_accounts_list, invoice, considered_rows):
+ prev_row_id = cint(tax_row.row_id) - 1
+ if tax_row.account_head in gst_accounts_list and prev_row_id not in considered_rows:
+ if tax_row.charge_type == 'On Previous Row Amount':
+ amount = invoice.get('taxes')[prev_row_id].tax_amount_after_discount_amount
+ invoice_value_details.total_other_charges -= abs(amount)
+ considered_rows.append(prev_row_id)
+ if tax_row.charge_type == 'On Previous Row Total':
+ amount = invoice.get('taxes')[prev_row_id].base_total - invoice.base_net_total
+ invoice_value_details.total_other_charges -= abs(amount)
+ considered_rows.append(prev_row_id)
+
+def get_payment_details(invoice):
+ payee_name = invoice.company
+ mode_of_payment = ', '.join([d.mode_of_payment for d in invoice.payments])
+ paid_amount = invoice.base_paid_amount
+ outstanding_amount = invoice.outstanding_amount
+
+ return frappe._dict(dict(
+ payee_name=payee_name, mode_of_payment=mode_of_payment,
+ paid_amount=paid_amount, outstanding_amount=outstanding_amount
+ ))
+
+def get_return_doc_reference(invoice):
+ invoice_date = frappe.db.get_value('Sales Invoice', invoice.return_against, 'posting_date')
+ return frappe._dict(dict(
+ invoice_name=invoice.return_against, invoice_date=format_date(invoice_date, 'dd/mm/yyyy')
+ ))
+
+def get_eway_bill_details(invoice):
+ if invoice.is_return:
+ frappe.throw(_('E-Way Bill cannot be generated for Credit Notes & Debit Notes. Please clear fields in the Transporter Section of the invoice.'),
+ title=_('Invalid Fields'))
+
+
+ mode_of_transport = { '': '', 'Road': '1', 'Air': '2', 'Rail': '3', 'Ship': '4' }
+ vehicle_type = { 'Regular': 'R', 'Over Dimensional Cargo (ODC)': 'O' }
+
+ return frappe._dict(dict(
+ gstin=invoice.gst_transporter_id,
+ name=invoice.transporter_name,
+ mode_of_transport=mode_of_transport[invoice.mode_of_transport],
+ distance=invoice.distance or 0,
+ document_name=invoice.lr_no,
+ document_date=format_date(invoice.lr_date, 'dd/mm/yyyy'),
+ vehicle_no=invoice.vehicle_no,
+ vehicle_type=vehicle_type[invoice.gst_vehicle_type]
+ ))
+
+def validate_mandatory_fields(invoice):
+ if not invoice.company_address:
+ frappe.throw(
+ _('Company Address is mandatory to fetch company GSTIN details. Please set Company Address and try again.'),
+ title=_('Missing Fields')
+ )
+ if not invoice.customer_address:
+ frappe.throw(
+ _('Customer Address is mandatory to fetch customer GSTIN details. Please set Company Address and try again.'),
+ title=_('Missing Fields')
+ )
+ if not frappe.db.get_value('Address', invoice.company_address, 'gstin'):
+ frappe.throw(
+ _('GSTIN is mandatory to fetch company GSTIN details. Please enter GSTIN in selected company address.'),
+ title=_('Missing Fields')
+ )
+ if invoice.gst_category != 'Overseas' and not frappe.db.get_value('Address', invoice.customer_address, 'gstin'):
+ frappe.throw(
+ _('GSTIN is mandatory to fetch customer GSTIN details. Please enter GSTIN in selected customer address.'),
+ title=_('Missing Fields')
+ )
+
+def validate_totals(einvoice):
+ item_list = einvoice['ItemList']
+ value_details = einvoice['ValDtls']
+
+ total_item_ass_value = 0
+ total_item_cgst_value = 0
+ total_item_sgst_value = 0
+ total_item_igst_value = 0
+ total_item_value = 0
+ for item in item_list:
+ total_item_ass_value += flt(item['AssAmt'])
+ total_item_cgst_value += flt(item['CgstAmt'])
+ total_item_sgst_value += flt(item['SgstAmt'])
+ total_item_igst_value += flt(item['IgstAmt'])
+ total_item_value += flt(item['TotItemVal'])
+
+ if abs(flt(item['AssAmt']) * flt(item['GstRt']) / 100) - (flt(item['CgstAmt']) + flt(item['SgstAmt']) + flt(item['IgstAmt'])) > 1:
+ frappe.throw(_('Row #{}: GST rate is invalid. Please remove tax rows with zero tax amount from taxes table.').format(item.idx))
+
+ if abs(flt(value_details['AssVal']) - total_item_ass_value) > 1:
+ frappe.throw(_('Total Taxable Value of the items is not equal to the Invoice Net Total. Please check item taxes / discounts for any correction.'))
+
+ if abs(
+ flt(value_details['TotInvVal']) + flt(value_details['Discount']) -
+ flt(value_details['OthChrg']) - flt(value_details['RndOffAmt']) -
+ total_item_value) > 1:
+ frappe.throw(_('Total Value of the items is not equal to the Invoice Grand Total. Please check item taxes / discounts for any correction.'))
+
+ calculated_invoice_value = \
+ flt(value_details['AssVal']) + flt(value_details['CgstVal']) \
+ + flt(value_details['SgstVal']) + flt(value_details['IgstVal']) \
+ + flt(value_details['OthChrg']) + flt(value_details['RndOffAmt']) - flt(value_details['Discount'])
+
+ if abs(flt(value_details['TotInvVal']) - calculated_invoice_value) > 1:
+ frappe.throw(_('Total Item Value + Taxes - Discount is not equal to the Invoice Grand Total. Please check taxes / discounts for any correction.'))
+
+def make_einvoice(invoice):
+ validate_mandatory_fields(invoice)
+
+ schema = read_json('einv_template')
+
+ transaction_details = get_transaction_details(invoice)
+ item_list = get_item_list(invoice)
+ doc_details = get_doc_details(invoice)
+ invoice_value_details = get_invoice_value_details(invoice)
+ seller_details = get_party_details(invoice.company_address)
+
+ if invoice.gst_category == 'Overseas':
+ buyer_details = get_overseas_address_details(invoice.customer_address)
+ else:
+ buyer_details = get_party_details(invoice.customer_address)
+ place_of_supply = get_place_of_supply(invoice, invoice.doctype)
+ if place_of_supply:
+ place_of_supply = place_of_supply.split('-')[0]
+ else:
+ place_of_supply = sanitize_for_json(invoice.billing_address_gstin)[:2]
+ buyer_details.update(dict(place_of_supply=place_of_supply))
+
+ seller_details.update(dict(legal_name=invoice.company))
+ buyer_details.update(dict(legal_name=invoice.customer_name or invoice.customer))
+
+ shipping_details = payment_details = prev_doc_details = eway_bill_details = frappe._dict({})
+ if invoice.shipping_address_name and invoice.customer_address != invoice.shipping_address_name:
+ if invoice.gst_category == 'Overseas':
+ shipping_details = get_overseas_address_details(invoice.shipping_address_name)
+ else:
+ shipping_details = get_party_details(invoice.shipping_address_name, is_shipping_address=True)
+
+ if invoice.is_pos and invoice.base_paid_amount:
+ payment_details = get_payment_details(invoice)
+
+ if invoice.is_return and invoice.return_against:
+ prev_doc_details = get_return_doc_reference(invoice)
+
+ if invoice.transporter and not invoice.is_return:
+ eway_bill_details = get_eway_bill_details(invoice)
+
+ # not yet implemented
+ dispatch_details = period_details = export_details = frappe._dict({})
+
+ einvoice = schema.format(
+ transaction_details=transaction_details, doc_details=doc_details, dispatch_details=dispatch_details,
+ seller_details=seller_details, buyer_details=buyer_details, shipping_details=shipping_details,
+ item_list=item_list, invoice_value_details=invoice_value_details, payment_details=payment_details,
+ period_details=period_details, prev_doc_details=prev_doc_details,
+ export_details=export_details, eway_bill_details=eway_bill_details
+ )
+
+ try:
+ einvoice = safe_json_load(einvoice)
+ einvoice = santize_einvoice_fields(einvoice)
+ except Exception:
+ show_link_to_error_log(invoice, einvoice)
+
+ validate_totals(einvoice)
+
+ return einvoice
+
+def show_link_to_error_log(invoice, einvoice):
+ err_log = log_error(einvoice)
+ link_to_error_log = get_link_to_form('Error Log', err_log.name, 'Error Log')
+ frappe.throw(
+ _('An error occurred while creating e-invoice for {}. Please check {} for more information.').format(
+ invoice.name, link_to_error_log),
+ title=_('E Invoice Creation Failed')
+ )
+
+def log_error(data=None):
+ if isinstance(data, six.string_types):
+ data = json.loads(data)
+
+ seperator = "--" * 50
+ err_tb = traceback.format_exc()
+ err_msg = str(sys.exc_info()[1])
+ data = json.dumps(data, indent=4)
+
+ message = "\n".join([
+ "Error", err_msg, seperator,
+ "Data:", data, seperator,
+ "Exception:", err_tb
+ ])
+ frappe.log_error(title=_('E Invoice Request Failed'), message=message)
+
+def santize_einvoice_fields(einvoice):
+ int_fields = ["Pin","Distance","CrDay"]
+ float_fields = ["Qty","FreeQty","UnitPrice","TotAmt","Discount","PreTaxVal","AssAmt","GstRt","IgstAmt","CgstAmt","SgstAmt","CesRt","CesAmt","CesNonAdvlAmt","StateCesRt","StateCesAmt","StateCesNonAdvlAmt","OthChrg","TotItemVal","AssVal","CgstVal","SgstVal","IgstVal","CesVal","StCesVal","Discount","OthChrg","RndOffAmt","TotInvVal","TotInvValFc","PaidAmt","PaymtDue","ExpDuty",]
+ copy = einvoice.copy()
+ for key, value in copy.items():
+ if isinstance(value, list):
+ for idx, d in enumerate(value):
+ santized_dict = santize_einvoice_fields(d)
+ if santized_dict:
+ einvoice[key][idx] = santized_dict
+ else:
+ einvoice[key].pop(idx)
+
+ if not einvoice[key]:
+ einvoice.pop(key, None)
+
+ elif isinstance(value, dict):
+ santized_dict = santize_einvoice_fields(value)
+ if santized_dict:
+ einvoice[key] = santized_dict
+ else:
+ einvoice.pop(key, None)
+
+ elif not value or value == "None":
+ einvoice.pop(key, None)
+
+ elif key in float_fields:
+ einvoice[key] = flt(value, 2)
+
+ elif key in int_fields:
+ einvoice[key] = cint(value)
+
+ return einvoice
+
+def safe_json_load(json_string):
+ try:
+ return json.loads(json_string)
+ except json.JSONDecodeError as e:
+ # print a snippet of 40 characters around the location where error occured
+ pos = e.pos
+ start, end = max(0, pos-20), min(len(json_string)-1, pos+20)
+ snippet = json_string[start:end]
+ frappe.throw(_("Error in input data. Please check for any special characters near following input: <br> {}").format(snippet))
+
+class RequestFailed(Exception):
+ pass
+class CancellationNotAllowed(Exception):
+ pass
+
+class GSPConnector():
+ def __init__(self, doctype=None, docname=None):
+ self.doctype = doctype
+ self.docname = docname
+
+ self.set_invoice()
+ self.set_credentials()
+
+ # authenticate url is same for sandbox & live
+ self.authenticate_url = 'https://gsp.adaequare.com/gsp/authenticate?grant_type=token'
+ self.base_url = 'https://gsp.adaequare.com' if not self.e_invoice_settings.sandbox_mode else 'https://gsp.adaequare.com/test'
+
+ self.cancel_irn_url = self.base_url + '/enriched/ei/api/invoice/cancel'
+ self.irn_details_url = self.base_url + '/enriched/ei/api/invoice/irn'
+ self.generate_irn_url = self.base_url + '/enriched/ei/api/invoice'
+ self.gstin_details_url = self.base_url + '/enriched/ei/api/master/gstin'
+ self.cancel_ewaybill_url = self.base_url + '/enriched/ewb/ewayapi?action=CANEWB'
+ self.generate_ewaybill_url = self.base_url + '/enriched/ei/api/ewaybill'
+
+ def set_invoice(self):
+ self.invoice = None
+ if self.doctype and self.docname:
+ self.invoice = frappe.get_cached_doc(self.doctype, self.docname)
+
+ def set_credentials(self):
+ self.e_invoice_settings = frappe.get_cached_doc('E Invoice Settings')
+
+ if not self.e_invoice_settings.enable:
+ frappe.throw(_("E-Invoicing is disabled. Please enable it from {} to generate e-invoices.").format(get_link_to_form("E Invoice Settings", "E Invoice Settings")))
+
+ if self.invoice:
+ gstin = self.get_seller_gstin()
+ credentials_for_gstin = [d for d in self.e_invoice_settings.credentials if d.gstin == gstin]
+ if credentials_for_gstin:
+ self.credentials = credentials_for_gstin[0]
+ else:
+ frappe.throw(_('Cannot find e-invoicing credentials for selected Company GSTIN. Please check E-Invoice Settings'))
+ else:
+ self.credentials = self.e_invoice_settings.credentials[0] if self.e_invoice_settings.credentials else None
+
+ def get_seller_gstin(self):
+ gstin = frappe.db.get_value('Address', self.invoice.company_address, 'gstin')
+ if not gstin:
+ frappe.throw(_('Cannot retrieve Company GSTIN. Please select company address with valid GSTIN.'))
+ return gstin
+
+ def get_auth_token(self):
+ if time_diff_in_seconds(self.e_invoice_settings.token_expiry, now_datetime()) < 150.0:
+ self.fetch_auth_token()
+
+ return self.e_invoice_settings.auth_token
+
+ def make_request(self, request_type, url, headers=None, data=None):
+ if request_type == 'post':
+ res = make_post_request(url, headers=headers, data=data)
+ else:
+ res = make_get_request(url, headers=headers, data=data)
+
+ self.log_request(url, headers, data, res)
+ return res
+
+ def log_request(self, url, headers, data, res):
+ headers.update({ 'password': self.credentials.password })
+ request_log = frappe.get_doc({
+ "doctype": "E Invoice Request Log",
+ "user": frappe.session.user,
+ "reference_invoice": self.invoice.name if self.invoice else None,
+ "url": url,
+ "headers": json.dumps(headers, indent=4) if headers else None,
+ "data": json.dumps(data, indent=4) if isinstance(data, dict) else data,
+ "response": json.dumps(res, indent=4) if res else None
+ })
+ request_log.save(ignore_permissions=True)
+ frappe.db.commit()
+
+ def fetch_auth_token(self):
+ headers = {
+ 'gspappid': frappe.conf.einvoice_client_id,
+ 'gspappsecret': frappe.conf.einvoice_client_secret
+ }
+ res = {}
+ try:
+ res = self.make_request('post', self.authenticate_url, headers)
+ self.e_invoice_settings.auth_token = "{} {}".format(res.get('token_type'), res.get('access_token'))
+ self.e_invoice_settings.token_expiry = add_to_date(None, seconds=res.get('expires_in'))
+ self.e_invoice_settings.save(ignore_permissions=True)
+ self.e_invoice_settings.reload()
+
+ except Exception:
+ log_error(res)
+ self.raise_error(True)
+
+ def get_headers(self):
+ return {
+ 'content-type': 'application/json',
+ 'user_name': self.credentials.username,
+ 'password': self.credentials.get_password(),
+ 'gstin': self.credentials.gstin,
+ 'authorization': self.get_auth_token(),
+ 'requestid': str(base64.b64encode(os.urandom(18))),
+ }
+
+ def fetch_gstin_details(self, gstin):
+ headers = self.get_headers()
+
+ try:
+ params = '?gstin={gstin}'.format(gstin=gstin)
+ res = self.make_request('get', self.gstin_details_url + params, headers)
+ if res.get('success'):
+ return res.get('result')
+ else:
+ log_error(res)
+ raise RequestFailed
+
+ except RequestFailed:
+ self.raise_error()
+
+ except Exception:
+ log_error()
+ self.raise_error(True)
+ @staticmethod
+ def get_gstin_details(gstin):
+ '''fetch and cache GSTIN details'''
+ if not hasattr(frappe.local, 'gstin_cache'):
+ frappe.local.gstin_cache = {}
+
+ key = gstin
+ gsp_connector = GSPConnector()
+ details = gsp_connector.fetch_gstin_details(gstin)
+
+ frappe.local.gstin_cache[key] = details
+ frappe.cache().hset('gstin_cache', key, details)
+ return details
+
+ def generate_irn(self):
+ data = {}
+ try:
+ headers = self.get_headers()
+ einvoice = make_einvoice(self.invoice)
+ data = json.dumps(einvoice, indent=4)
+ res = self.make_request('post', self.generate_irn_url, headers, data)
+
+ if res.get('success'):
+ self.set_einvoice_data(res.get('result'))
+
+ elif '2150' in res.get('message'):
+ # IRN already generated but not updated in invoice
+ # Extract the IRN from the response description and fetch irn details
+ irn = res.get('result')[0].get('Desc').get('Irn')
+ irn_details = self.get_irn_details(irn)
+ if irn_details:
+ self.set_einvoice_data(irn_details)
+ else:
+ raise RequestFailed('IRN has already been generated for the invoice but cannot fetch details for the it. \
+ Contact ERPNext support to resolve the issue.')
+
+ else:
+ raise RequestFailed
+
+ except RequestFailed:
+ errors = self.sanitize_error_message(res.get('message'))
+ self.set_failed_status(errors=errors)
+ self.raise_error(errors=errors)
+
+ except Exception as e:
+ self.set_failed_status(errors=str(e))
+ log_error(data)
+ self.raise_error(True)
+
+ @staticmethod
+ def bulk_generate_irn(invoices):
+ gsp_connector = GSPConnector()
+ gsp_connector.doctype = 'Sales Invoice'
+
+ failed = []
+
+ for invoice in invoices:
+ try:
+ gsp_connector.docname = invoice
+ gsp_connector.set_invoice()
+ gsp_connector.set_credentials()
+ gsp_connector.generate_irn()
+
+ except Exception as e:
+ failed.append({
+ 'docname': invoice,
+ 'message': str(e)
+ })
+
+ return failed
+
+ def get_irn_details(self, irn):
+ headers = self.get_headers()
+
+ try:
+ params = '?irn={irn}'.format(irn=irn)
+ res = self.make_request('get', self.irn_details_url + params, headers)
+ if res.get('success'):
+ return res.get('result')
+ else:
+ raise RequestFailed
+
+ except RequestFailed:
+ errors = self.sanitize_error_message(res.get('message'))
+ self.raise_error(errors=errors)
+
+ except Exception:
+ log_error()
+ self.raise_error(True)
+
+ def cancel_irn(self, irn, reason, remark):
+ data, res = {}, {}
+ try:
+ # validate cancellation
+ if time_diff_in_hours(now_datetime(), self.invoice.ack_date) > 24:
+ frappe.throw(_('E-Invoice cannot be cancelled after 24 hours of IRN generation.'), title=_('Not Allowed'), exc=CancellationNotAllowed)
+ if not irn:
+ frappe.throw(_('IRN not found. You must generate IRN before cancelling.'), title=_('Not Allowed'), exc=CancellationNotAllowed)
+
+ headers = self.get_headers()
+ data = json.dumps({
+ 'Irn': irn,
+ 'Cnlrsn': reason,
+ 'Cnlrem': remark
+ }, indent=4)
+
+ res = self.make_request('post', self.cancel_irn_url, headers, data)
+ if res.get('success') or '9999' in res.get('message'):
+ self.invoice.irn_cancelled = 1
+ self.invoice.irn_cancel_date = res.get('result')['CancelDate'] if res.get('result') else ""
+ self.invoice.einvoice_status = 'Cancelled'
+ self.invoice.flags.updater_reference = {
+ 'doctype': self.invoice.doctype,
+ 'docname': self.invoice.name,
+ 'label': _('IRN Cancelled - {}').format(remark)
+ }
+ self.update_invoice()
+
+ else:
+ raise RequestFailed
+
+ except RequestFailed:
+ errors = self.sanitize_error_message(res.get('message'))
+ self.set_failed_status(errors=errors)
+ self.raise_error(errors=errors)
+
+ except CancellationNotAllowed as e:
+ self.set_failed_status(errors=str(e))
+ self.raise_error(errors=str(e))
+
+ except Exception as e:
+ self.set_failed_status(errors=str(e))
+ log_error(data)
+ self.raise_error(True)
+
+ @staticmethod
+ def bulk_cancel_irn(invoices, reason, remark):
+ gsp_connector = GSPConnector()
+ gsp_connector.doctype = 'Sales Invoice'
+
+ failed = []
+
+ for invoice in invoices:
+ try:
+ gsp_connector.docname = invoice
+ gsp_connector.set_invoice()
+ gsp_connector.set_credentials()
+ irn = gsp_connector.invoice.irn
+ gsp_connector.cancel_irn(irn, reason, remark)
+
+ except Exception as e:
+ failed.append({
+ 'docname': invoice,
+ 'message': str(e)
+ })
+
+ return failed
+
+ def generate_eway_bill(self, **kwargs):
+ args = frappe._dict(kwargs)
+
+ headers = self.get_headers()
+ eway_bill_details = get_eway_bill_details(args)
+ data = json.dumps({
+ 'Irn': args.irn,
+ 'Distance': cint(eway_bill_details.distance),
+ 'TransMode': eway_bill_details.mode_of_transport,
+ 'TransId': eway_bill_details.gstin,
+ 'TransName': eway_bill_details.transporter,
+ 'TrnDocDt': eway_bill_details.document_date,
+ 'TrnDocNo': eway_bill_details.document_name,
+ 'VehNo': eway_bill_details.vehicle_no,
+ 'VehType': eway_bill_details.vehicle_type
+ }, indent=4)
+
+ try:
+ res = self.make_request('post', self.generate_ewaybill_url, headers, data)
+ if res.get('success'):
+ self.invoice.ewaybill = res.get('result').get('EwbNo')
+ self.invoice.eway_bill_validity = res.get('result').get('EwbValidTill')
+ self.invoice.eway_bill_cancelled = 0
+ self.invoice.update(args)
+ self.invoice.flags.updater_reference = {
+ 'doctype': self.invoice.doctype,
+ 'docname': self.invoice.name,
+ 'label': _('E-Way Bill Generated')
+ }
+ self.update_invoice()
+
+ else:
+ raise RequestFailed
+
+ except RequestFailed:
+ errors = self.sanitize_error_message(res.get('message'))
+ self.raise_error(errors=errors)
+
+ except Exception:
+ log_error(data)
+ self.raise_error(True)
+
+ def cancel_eway_bill(self, eway_bill, reason, remark):
+ headers = self.get_headers()
+ data = json.dumps({
+ 'ewbNo': eway_bill,
+ 'cancelRsnCode': reason,
+ 'cancelRmrk': remark
+ }, indent=4)
+ headers["username"] = headers["user_name"]
+ del headers["user_name"]
+ try:
+ res = self.make_request('post', self.cancel_ewaybill_url, headers, data)
+ if res.get('success'):
+ self.invoice.ewaybill = ''
+ self.invoice.eway_bill_cancelled = 1
+ self.invoice.flags.updater_reference = {
+ 'doctype': self.invoice.doctype,
+ 'docname': self.invoice.name,
+ 'label': _('E-Way Bill Cancelled - {}').format(remark)
+ }
+ self.update_invoice()
+
+ else:
+ raise RequestFailed
+
+ except RequestFailed:
+ errors = self.sanitize_error_message(res.get('message'))
+ self.raise_error(errors=errors)
+
+ except Exception:
+ log_error(data)
+ self.raise_error(True)
+
+ def sanitize_error_message(self, message):
+ '''
+ On validation errors, response message looks something like this:
+ message = '2174 : For inter-state transaction, CGST and SGST amounts are not applicable; only IGST amount is applicable,
+ 3095 : Supplier GSTIN is inactive'
+ we search for string between ':' to extract the error messages
+ errors = [
+ ': For inter-state transaction, CGST and SGST amounts are not applicable; only IGST amount is applicable, 3095 ',
+ ': Test'
+ ]
+ then we trim down the message by looping over errors
+ '''
+ if not message:
+ return []
+
+ errors = re.findall(': [^:]+', message)
+ for idx, e in enumerate(errors):
+ # remove colons
+ errors[idx] = errors[idx].replace(':', '').strip()
+ # if not last
+ if idx != len(errors) - 1:
+ # remove last 7 chars eg: ', 3095 '
+ errors[idx] = errors[idx][:-6]
+
+ return errors
+
+ def raise_error(self, raise_exception=False, errors=[]):
+ title = _('E Invoice Request Failed')
+ if errors:
+ frappe.throw(errors, title=title, as_list=1)
+ else:
+ link_to_error_list = '<a href="desk#List/Error Log/List?method=E Invoice Request Failed">Error Log</a>'
+ frappe.msgprint(
+ _('An error occurred while making e-invoicing request. Please check {} for more information.').format(link_to_error_list),
+ title=title,
+ raise_exception=raise_exception,
+ indicator='red'
+ )
+
+ def set_einvoice_data(self, res):
+ enc_signed_invoice = res.get('SignedInvoice')
+ dec_signed_invoice = jwt.decode(enc_signed_invoice, options={"verify_signature": False})['data']
+
+ self.invoice.irn = res.get('Irn')
+ self.invoice.ewaybill = res.get('EwbNo')
+ self.invoice.eway_bill_validity = res.get('EwbValidTill')
+ self.invoice.ack_no = res.get('AckNo')
+ self.invoice.ack_date = res.get('AckDt')
+ self.invoice.signed_einvoice = dec_signed_invoice
+ self.invoice.ack_no = res.get('AckNo')
+ self.invoice.ack_date = res.get('AckDt')
+ self.invoice.signed_qr_code = res.get('SignedQRCode')
+ self.invoice.einvoice_status = 'Generated'
+
+ self.attach_qrcode_image()
+
+ self.invoice.flags.updater_reference = {
+ 'doctype': self.invoice.doctype,
+ 'docname': self.invoice.name,
+ 'label': _('IRN Generated')
+ }
+ self.update_invoice()
+
+ def attach_qrcode_image(self):
+ qrcode = self.invoice.signed_qr_code
+ doctype = self.invoice.doctype
+ docname = self.invoice.name
+ filename = 'QRCode_{}.png'.format(docname).replace(os.path.sep, "__")
+
+ qr_image = io.BytesIO()
+ url = qrcreate(qrcode, error='L')
+ url.png(qr_image, scale=2, quiet_zone=1)
+ _file = frappe.get_doc({
+ "doctype": "File",
+ "file_name": filename,
+ "attached_to_doctype": doctype,
+ "attached_to_name": docname,
+ "attached_to_field": "qrcode_image",
+ "is_private": 0,
+ "content": qr_image.getvalue()})
+ _file.save()
+ frappe.db.commit()
+ self.invoice.qrcode_image = _file.file_url
+
+ def update_invoice(self):
+ self.invoice.flags.ignore_validate_update_after_submit = True
+ self.invoice.flags.ignore_validate = True
+ self.invoice.save()
+
+ def set_failed_status(self, errors=None):
+ frappe.db.rollback()
+ self.invoice.einvoice_status = 'Failed'
+ self.invoice.failure_description = self.get_failure_message(errors) if errors else ""
+ self.update_invoice()
+ frappe.db.commit()
+
+ def get_failure_message(self, errors):
+ if isinstance(errors, list):
+ errors = ', '.join(errors)
+ return errors
+
+def sanitize_for_json(string):
+ """Escape JSON specific characters from a string."""
+
+ # json.dumps adds double-quotes to the string. Indexing to remove them.
+ return json.dumps(string)[1:-1]
+
+@frappe.whitelist()
+def get_einvoice(doctype, docname):
+ invoice = frappe.get_doc(doctype, docname)
+ return make_einvoice(invoice)
+
+@frappe.whitelist()
+def generate_irn(doctype, docname):
+ gsp_connector = GSPConnector(doctype, docname)
+ gsp_connector.generate_irn()
+
+@frappe.whitelist()
+def cancel_irn(doctype, docname, irn, reason, remark):
+ gsp_connector = GSPConnector(doctype, docname)
+ gsp_connector.cancel_irn(irn, reason, remark)
+
+@frappe.whitelist()
+def generate_eway_bill(doctype, docname, **kwargs):
+ gsp_connector = GSPConnector(doctype, docname)
+ gsp_connector.generate_eway_bill(**kwargs)
+
+@frappe.whitelist()
+def cancel_eway_bill(doctype, docname):
+ # TODO: uncomment when eway_bill api from Adequare is enabled
+ # gsp_connector = GSPConnector(doctype, docname)
+ # gsp_connector.cancel_eway_bill(eway_bill, reason, remark)
+
+ frappe.db.set_value(doctype, docname, 'ewaybill', '')
+ frappe.db.set_value(doctype, docname, 'eway_bill_cancelled', 1)
+
+@frappe.whitelist()
+def generate_einvoices(docnames):
+ docnames = json.loads(docnames) or []
+
+ if len(docnames) < 10:
+ failures = GSPConnector.bulk_generate_irn(docnames)
+ frappe.local.message_log = []
+
+ if failures:
+ show_bulk_action_failure_message(failures)
+
+ success = len(docnames) - len(failures)
+ frappe.msgprint(
+ _('{} e-invoices generated successfully').format(success),
+ title=_('Bulk E-Invoice Generation Complete')
+ )
+
+ else:
+ enqueue_bulk_action(schedule_bulk_generate_irn, docnames=docnames)
+
+def schedule_bulk_generate_irn(docnames):
+ failures = GSPConnector.bulk_generate_irn(docnames)
+ frappe.local.message_log = []
+
+ frappe.publish_realtime("bulk_einvoice_generation_complete", {
+ "user": frappe.session.user,
+ "failures": failures,
+ "invoices": docnames
+ })
+
+def show_bulk_action_failure_message(failures):
+ for doc in failures:
+ docname = '<a href="sales-invoice/{0}">{0}</a>'.format(doc.get('docname'))
+ message = doc.get('message').replace("'", '"')
+ if message[0] == '[':
+ errors = json.loads(message)
+ error_list = ''.join(['<li>{}</li>'.format(err) for err in errors])
+ message = '''{} has following errors:<br>
+ <ul style="padding-left: 20px; padding-top: 5px">{}</ul>'''.format(docname, error_list)
+ else:
+ message = '{} - {}'.format(docname, message)
+
+ frappe.msgprint(
+ message,
+ title=_('Bulk E-Invoice Generation Complete'),
+ indicator='red'
+ )
+
+@frappe.whitelist()
+def cancel_irns(docnames, reason, remark):
+ docnames = json.loads(docnames) or []
+
+ if len(docnames) < 10:
+ failures = GSPConnector.bulk_cancel_irn(docnames, reason, remark)
+ frappe.local.message_log = []
+
+ if failures:
+ show_bulk_action_failure_message(failures)
+
+ success = len(docnames) - len(failures)
+ frappe.msgprint(
+ _('{} e-invoices cancelled successfully').format(success),
+ title=_('Bulk E-Invoice Cancellation Complete')
+ )
+ else:
+ enqueue_bulk_action(schedule_bulk_cancel_irn, docnames=docnames, reason=reason, remark=remark)
+
+def schedule_bulk_cancel_irn(docnames, reason, remark):
+ failures = GSPConnector.bulk_cancel_irn(docnames, reason, remark)
+ frappe.local.message_log = []
+
+ frappe.publish_realtime("bulk_einvoice_cancellation_complete", {
+ "user": frappe.session.user,
+ "failures": failures,
+ "invoices": docnames
+ })
+
+def enqueue_bulk_action(job, **kwargs):
+ check_scheduler_status()
+
+ enqueue(
+ job,
+ **kwargs,
+ queue="long",
+ timeout=10000,
+ event="processing_bulk_einvoice_action",
+ now=frappe.conf.developer_mode or frappe.flags.in_test,
+ )
+
+ if job == schedule_bulk_generate_irn:
+ msg = _('E-Invoices will be generated in a background process.')
+ else:
+ msg = _('E-Invoices will be cancelled in a background process.')
+
+ frappe.msgprint(msg, alert=1)
+
+def check_scheduler_status():
+ if is_scheduler_inactive() and not frappe.flags.in_test:
+ frappe.throw(_("Scheduler is inactive. Cannot enqueue job."), title=_("Scheduler Inactive"))
+
+def job_already_enqueued(job_name):
+ enqueued_jobs = [d.get("job_name") for d in get_info()]
+ if job_name in enqueued_jobs:
+ return True