| from __future__ import unicode_literals |
| |
| import frappe, json, os |
| from frappe.utils import flt, cstr |
| from erpnext.controllers.taxes_and_totals import get_itemised_tax |
| from frappe import _ |
| from frappe.core.doctype.file.file import remove_file |
| from six import string_types |
| from frappe.desk.form.load import get_attachments |
| from erpnext.regional.italy import state_codes |
| |
| |
def update_itemised_tax_data(doc):
    """Set ``tax_rate``, ``tax_amount`` and ``total_amount`` on every item row.

    Nothing happens when the document has no tax rows. Otherwise the
    item-wise tax map is fetched once and, for each row of ``doc.items``,
    the rates of all taxes mapped to the row's item code are added up.
    """
    if not doc.taxes:
        return

    tax_map = get_itemised_tax(doc.taxes)

    for item_row in doc.items:
        item_taxes = tax_map.get(item_row.item_code)

        # Items with no entry in the map keep a combined rate of zero.
        combined_rate = 0.0
        if item_taxes:
            combined_rate = sum(detail.get('tax_rate', 0) for detail in item_taxes.values())

        item_row.tax_rate = flt(combined_rate, item_row.precision("tax_rate"))
        # The amount is derived from the unrounded rate and rounded with the
        # precision of the "net_amount" field.
        item_row.tax_amount = flt(
            (item_row.net_amount * combined_rate) / 100,
            item_row.precision("net_amount")
        )
        item_row.total_amount = flt(
            (item_row.net_amount + item_row.tax_amount),
            item_row.precision("total_amount")
        )
| |
@frappe.whitelist()
def export_invoices(filters=None):
    """Zip the e-invoice XML attachments of the matching Sales Invoices.

    ``filters`` is forwarded to ``get_conditions``; the resulting archive is
    handed to ``download_zip``, which sets it up as the download response.
    """
    conditions = get_conditions(filters)
    matching_invoices = frappe.get_all("Sales Invoice", filters=conditions, fields=["*"])

    xml_file_names = []
    for sales_invoice in matching_invoices:
        xml_file_names.extend(
            attachment.file_name for attachment in get_e_invoice_attachments(sales_invoice)
        )

    # Timestamped archive name, e.g. "20190131_120000-einvoices.zip".
    timestamp = frappe.utils.get_datetime().strftime("%Y%m%d_%H%M%S")
    download_zip(xml_file_names, "{0}-einvoices.zip".format(timestamp))
| |
| |
@frappe.whitelist()
def prepare_invoice(invoice, progressive_number):
    """Attach the extra attributes needed to render the e-invoice XML.

    The invoice document is modified in place and also returned. Attributes
    set here: progressive number, unamended name, company / customer /
    address documents, document type code (TD01 or TD04), transmission
    format code, the e-invoice item list, the per-rate tax summary, the
    stamp duty amount (when present) and the customer purchase order map.

    :param invoice: Sales Invoice document to decorate.
    :param progressive_number: progressive number stored on the invoice.
    :return: the same ``invoice`` object.
    """
    # Company information.
    company = frappe.get_doc("Company", invoice.company)

    invoice.progressive_number = progressive_number
    invoice.unamended_name = get_unamended_name(invoice)
    invoice.company_data = company
    invoice.company_address_data = frappe.get_doc("Address", invoice.company_address)

    # Invoice type.
    if invoice.is_return and invoice.return_against:
        invoice.type_of_document = "TD04"  # Credit Note (Nota di Credito)
        invoice.return_against_unamended = get_unamended_name(
            frappe.get_doc("Sales Invoice", invoice.return_against)
        )
    else:
        invoice.type_of_document = "TD01"  # Sales Invoice (Fattura)

    # Customer information.
    invoice.customer_data = frappe.get_doc("Customer", invoice.customer)
    invoice.customer_address_data = frappe.get_doc("Address", invoice.customer_address)

    if invoice.shipping_address_name:
        invoice.shipping_address_data = frappe.get_doc("Address", invoice.shipping_address_name)

    if invoice.customer_data.is_public_administration:
        invoice.transmission_format_code = "FPA12"
    else:
        invoice.transmission_format_code = "FPR12"

    # Work on a copy of the item list: get_invoice_summary appends charge
    # rows to the list it receives, and those must not end up in invoice.items.
    invoice.e_invoice_items = list(invoice.items)
    tax_data = get_invoice_summary(invoice.e_invoice_items, invoice.taxes)
    invoice.tax_data = tax_data

    # Check if stamp duty (Bollo) of 2 EUR exists.
    stamp_duty_charge_row = next(
        (tax for tax in invoice.taxes if tax.charge_type == "Actual" and tax.tax_amount == 2.0),
        None
    )
    if stamp_duty_charge_row:
        invoice.stamp_duty = stamp_duty_charge_row.tax_amount

    # The summary only has a "0.0" bucket when a zero-rate tax row produced
    # one; look it up safely so zero-rated items do not raise KeyError.
    zero_rate_summary = tax_data.get("0.0")
    if zero_rate_summary:
        for item in invoice.e_invoice_items:
            if item.tax_rate == 0.0 and item.tax_amount == 0.0:
                item.tax_exemption_reason = zero_rate_summary["tax_exemption_reason"]

    # First PO date seen for each customer PO number.
    customer_po_data = {}
    for item in invoice.e_invoice_items:
        if (item.customer_po_no and item.customer_po_date
                and item.customer_po_no not in customer_po_data):
            customer_po_data[item.customer_po_no] = item.customer_po_date

    invoice.customer_po_data = customer_po_data

    return invoice
| |
def get_conditions(filters):
    """Translate report filters into ``frappe.get_all`` filter conditions.

    :param filters: JSON string, an already decoded dict, or ``None`` / empty
        (treated as "no filters"). Recognised keys: ``company``,
        ``customer``, ``from_date`` and ``to_date``.
    :return: dict of conditions; always restricted to ``docstatus`` 1.
    """
    if not filters:
        filters = {}
    elif not isinstance(filters, dict):
        # A JSON "null" decodes to None; fall back to an empty mapping.
        filters = json.loads(filters) or {}

    conditions = {"docstatus": 1}

    for fieldname in ("company", "customer"):
        if filters.get(fieldname):
            conditions[fieldname] = filters[fieldname]

    from_date = filters.get("from_date")
    to_date = filters.get("to_date")

    if from_date and to_date:
        conditions["posting_date"] = ("between", [from_date, to_date])
    elif from_date:
        conditions["posting_date"] = (">=", from_date)
    elif to_date:
        conditions["posting_date"] = ("<=", to_date)

    return conditions
| |
| #TODO: Use function from frappe once PR #6853 is merged. |
def download_zip(files, output_filename):
    """Bundle private site files into a zip and set it as the download response.

    :param files: file names located under the site's ``private/files`` folder.
    :param output_filename: name of the archive, written to the same folder.
    """
    from zipfile import ZipFile

    source_paths = [frappe.get_site_path('private', 'files', name) for name in files]
    archive_path = frappe.get_site_path('private', 'files', output_filename)

    with ZipFile(archive_path, 'w') as archive:
        for source_path in source_paths:
            # Store each entry flat, by base name only.
            archive.write(source_path, arcname=os.path.basename(source_path))

    with open(archive_path, 'rb') as archive_file:
        archive_bytes = archive_file.read()

    response = frappe.local.response
    response.filename = output_filename
    response.filecontent = archive_bytes
    response.type = "download"
| |
def get_invoice_summary(items, taxes):
    """Build the per-rate tax summary for an e-invoice.

    NOTE: ``items`` is extended in place with one synthetic row for every
    "On Previous Row ..." tax whose referenced row can be found.

    :param items: list of invoice item rows (mutated, see note above).
    :param taxes: tax rows of the invoice; rows with charge type "Actual"
        are skipped.
    :return: ``frappe._dict`` keyed by the tax rate as a string; every bucket
        holds ``tax_amount`` and ``taxable_amount``, and buckets created by a
        zero-rate tax row also hold ``tax_exemption_reason`` and
        ``tax_exemption_law``.
    """
    summary_data = frappe._dict()
    for tax in taxes:
        # Include only VAT charges.
        if tax.charge_type == "Actual":
            continue

        # Charges to appear as items in the e-invoice.
        if tax.charge_type in ["On Previous Row Total", "On Previous Row Amount"]:
            reference_row = next((row for row in taxes if row.idx == int(tax.row_id or 0)), None)
            if reference_row:
                items.append(
                    frappe._dict(
                        idx=len(items)+1,
                        item_code=reference_row.description,
                        item_name=reference_row.description,
                        description=reference_row.description,
                        rate=reference_row.tax_amount,
                        qty=1.0,
                        amount=reference_row.tax_amount,
                        stock_uom=frappe.db.get_single_value("Stock Settings", "stock_uom") or _("Nos"),
                        tax_rate=tax.rate,
                        tax_amount=(reference_row.tax_amount * tax.rate) / 100,
                        net_amount=reference_row.tax_amount,
                        taxable_amount=reference_row.tax_amount,
                        item_tax_rate={tax.account_head: tax.rate},
                        charges=True
                    )
                )

        # Check item tax rates if tax rate is zero.
        if tax.rate == 0:
            for item in items:
                item_tax_rate = item.item_tax_rate
                if isinstance(item_tax_rate, string_types):
                    item_tax_rate = json.loads(item_tax_rate)

                if item_tax_rate and tax.account_head in item_tax_rate:
                    key = cstr(item_tax_rate[tax.account_head])
                    # setdefault alone covers the "create if missing" case.
                    bucket = summary_data.setdefault(key, {"tax_amount": 0.0, "taxable_amount": 0.0,
                        "tax_exemption_reason": "", "tax_exemption_law": ""})

                    bucket["tax_amount"] += item.tax_amount
                    bucket["taxable_amount"] += item.net_amount
                    if key == "0.0":
                        bucket["tax_exemption_reason"] = tax.tax_exemption_reason
                        bucket["tax_exemption_law"] = tax.tax_exemption_law

            if summary_data == {}:  # Implies that Zero VAT has not been set on any item.
                summary_data.setdefault("0.0", {"tax_amount": 0.0, "taxable_amount": tax.total,
                    "tax_exemption_reason": tax.tax_exemption_reason, "tax_exemption_law": tax.tax_exemption_law})

        else:
            key = cstr(tax.rate)
            item_wise_tax_detail = json.loads(tax.item_wise_tax_detail)

            # Each detail entry is indexed as [rate, amount]; only entries
            # taxed at this row's rate contribute.
            for item_code, detail in item_wise_tax_detail.items():
                if detail[0] != tax.rate:
                    continue

                bucket = summary_data.setdefault(key, {"tax_amount": 0.0, "taxable_amount": 0.0})
                bucket["tax_amount"] += detail[1]
                bucket["taxable_amount"] += sum(
                    item.net_amount for item in items if item.item_code == item_code
                )

            # NOTE(review): the source indentation was lost; this loop is
            # assumed to belong to the non-zero-rate branch -- confirm.
            for item in items:
                if item.get("charges"):
                    # Create the bucket with both keys so consumers can rely
                    # on "tax_amount" being present.
                    bucket = summary_data.setdefault(key, {"tax_amount": 0.0, "taxable_amount": 0.0})
                    bucket["taxable_amount"] += item.taxable_amount

    return summary_data
| |
| #Preflight for successful e-invoice export. |
def sales_invoice_validate(doc):
    """Check that a Sales Invoice carries the data needed for the e-invoice.

    Documents of any other doctype are ignored. Along the way the company
    fiscal regime, tax id and fiscal code, the customer fiscal code or tax
    id, and the payment schedule's mode-of-payment codes are copied onto
    the document. Missing information is reported through ``frappe.throw``.

    Messages are translated first and formatted afterwards, so the
    translation lookup sees the constant template string.
    """
    if doc.doctype != 'Sales Invoice':
        return

    missing_info_title = _("E-Invoicing Information Missing")

    # Validate company.
    if not doc.company_address:
        frappe.throw(_("Please set an Address on the Company '{0}'").format(doc.company),
            title=missing_info_title)
    else:
        validate_address(doc.company_address)

    company_fiscal_regime = frappe.get_cached_value("Company", doc.company, 'fiscal_regime')
    if not company_fiscal_regime:
        frappe.throw(_("Fiscal Regime is mandatory, kindly set the fiscal regime in the company {0}")
            .format(doc.company))
    else:
        doc.company_fiscal_regime = company_fiscal_regime

    doc.company_tax_id = frappe.get_cached_value("Company", doc.company, 'tax_id')
    doc.company_fiscal_code = frappe.get_cached_value("Company", doc.company, 'fiscal_code')
    if not doc.company_tax_id and not doc.company_fiscal_code:
        frappe.throw(_("Please set either the Tax ID or Fiscal Code on Company '{0}'").format(doc.company),
            title=missing_info_title)

    # Validate customer details.
    customer = frappe.get_doc("Customer", doc.customer)

    if customer.customer_type == "Individual":
        doc.customer_fiscal_code = customer.fiscal_code
        if not doc.customer_fiscal_code:
            frappe.throw(_("Please set Fiscal Code for the customer '{0}'").format(doc.customer),
                title=missing_info_title)
    else:
        if customer.is_public_administration:
            doc.customer_fiscal_code = customer.fiscal_code
            if not doc.customer_fiscal_code:
                frappe.throw(_("Please set Fiscal Code for the public administration '{0}'").format(doc.customer),
                    title=missing_info_title)
        else:
            doc.tax_id = customer.tax_id
            if not doc.tax_id:
                frappe.throw(_("Please set Tax ID for the customer '{0}'").format(doc.customer),
                    title=missing_info_title)

    if not doc.customer_address:
        frappe.throw(_("Please set the Customer Address"), title=missing_info_title)
    else:
        validate_address(doc.customer_address)

    if not doc.taxes:
        frappe.throw(_("Please set at least one row in the Taxes and Charges Table"),
            title=missing_info_title)
    else:
        for row in doc.taxes:
            # A zero-rate, zero-amount row must state why no tax applies.
            if row.rate == 0 and row.tax_amount == 0 and not row.tax_exemption_reason:
                frappe.throw(_("Row {0}: Please set at Tax Exemption Reason in Sales Taxes and Charges").format(row.idx),
                    title=missing_info_title)

    # Fill in missing mode-of-payment codes from the Mode of Payment master.
    for schedule in doc.payment_schedule:
        if schedule.mode_of_payment and not schedule.mode_of_payment_code:
            schedule.mode_of_payment_code = frappe.get_cached_value('Mode of Payment',
                schedule.mode_of_payment, 'mode_of_payment_code')
| |
| #Ensure payment details are valid for e-invoice. |
def sales_invoice_on_submit(doc, method):
    """Validate the payment schedule, then generate and attach the e-invoice.

    Does nothing unless the company's country is one of the accepted names
    for Italy. Every payment schedule row must have a mode of payment whose
    ``mode_of_payment_code`` is set; otherwise ``frappe.throw`` is called.

    :param doc: the submitted Sales Invoice document.
    :param method: not used by this function.
    """
    if get_company_country(doc.company) not in ['Italy',
            'Italia', 'Italian Republic', 'Repubblica Italiana']:
        return

    missing_info_title = _("E-Invoicing Information Missing")

    # Validate payment details.
    if not doc.payment_schedule:
        frappe.throw(_("Please set the Payment Schedule"), title=missing_info_title)
    else:
        for schedule in doc.payment_schedule:
            # Translate the constant template first, then fill in the values.
            if not schedule.mode_of_payment:
                frappe.throw(_("Row {0}: Please set the Mode of Payment in Payment Schedule").format(schedule.idx),
                    title=missing_info_title)
            elif not frappe.db.get_value("Mode of Payment", schedule.mode_of_payment, "mode_of_payment_code"):
                frappe.throw(_("Row {0}: Please set the correct code on Mode of Payment {1}").format(
                    schedule.idx, schedule.mode_of_payment), title=missing_info_title)

    prepare_and_attach_invoice(doc)
| |
def prepare_and_attach_invoice(doc, replace=False):
    """Render the e-invoice XML for ``doc`` and attach it as a private File.

    :param doc: Sales Invoice document.
    :param replace: forwarded to ``get_progressive_name_and_number``; when
        true an existing e-invoice attachment is removed and its name reused.
    :return: the saved File document.
    """
    progressive_name, progressive_number = get_progressive_name_and_number(doc, replace)

    invoice = prepare_invoice(doc, progressive_number)
    invoice_xml = frappe.render_template('erpnext/regional/italy/e-invoice.xml',
        context={"doc": invoice}, is_path=True)
    # Escape ampersands so the output is well-formed XML (the previous
    # replacement of "&" by "&" changed nothing).
    # NOTE(review): this also rewrites ampersands that are already part of an
    # entity -- confirm the template only emits raw ampersands.
    invoice_xml = invoice_xml.replace("&", "&amp;")

    xml_filename = progressive_name + ".xml"

    _file = frappe.get_doc({
        "doctype": "File",
        "file_name": xml_filename,
        "attached_to_doctype": doc.doctype,
        "attached_to_name": doc.name,
        "is_private": True,
        "content": invoice_xml
    })
    _file.save()
    return _file
| |
@frappe.whitelist()
def generate_single_invoice(docname):
    """Regenerate the e-invoice of one Sales Invoice and return its file name.

    Calls ``prepare_and_attach_invoice`` with ``replace`` set to true.
    """
    sales_invoice = frappe.get_doc("Sales Invoice", docname)
    attached_file = prepare_and_attach_invoice(sales_invoice, True)
    return attached_file.file_name
| |
@frappe.whitelist()
def download_e_invoice_file(file_name):
    """Send a file from the site's ``private/files`` folder as a download.

    ``file_name`` comes from the request, so it must be a plain file name:
    anything containing a directory component (for example ``../x``) is
    rejected instead of being joined into the path.

    :param file_name: name of a file directly inside ``private/files``.
    """
    safe_name = os.path.basename(file_name or "")
    if not safe_name or safe_name != file_name or safe_name in (".", ".."):
        frappe.throw(_("Invalid file name"))

    with open(frappe.get_site_path('private', 'files', safe_name), "r") as f:
        content = f.read()

    frappe.local.response.filename = safe_name
    frappe.local.response.filecontent = content
    frappe.local.response.type = "download"
| |
| #Delete e-invoice attachment on cancel. |
def sales_invoice_on_cancel(doc, method):
    """Remove the e-invoice attachments of a cancelled Sales Invoice.

    Only acts when the company's country is one of the accepted names for
    Italy. ``method`` is not used.
    """
    italian_country_names = ['Italy', 'Italia', 'Italian Republic', 'Repubblica Italiana']

    if get_company_country(doc.company) in italian_country_names:
        for xml_attachment in get_e_invoice_attachments(doc):
            remove_file(
                xml_attachment.name,
                attached_to_doctype=doc.doctype,
                attached_to_name=doc.name
            )
| |
def get_company_country(company):
    """Return the ``country`` field of the given Company (cached lookup)."""
    country = frappe.get_cached_value('Company', company, 'country')
    return country
| |
def get_e_invoice_attachments(invoice):
    """Return the invoice's attachments that look like e-invoice XML files.

    An attachment qualifies when its file name starts with the company tax
    id (prefixed with "IT" if it does not already start with it) and ends
    with ".xml".

    :param invoice: object exposing ``doctype``, ``name`` and
        ``company_tax_id``.
    :return: list of matching attachments; empty when the invoice has no
        company tax id, since no file name prefix can be derived then.
    """
    tax_id = invoice.company_tax_id
    if not tax_id:
        return []

    file_prefix = tax_id if tax_id.startswith("IT") else "IT" + tax_id

    return [
        attachment for attachment in get_attachments(invoice.doctype, invoice.name)
        if attachment.file_name
        and attachment.file_name.startswith(file_prefix)
        and attachment.file_name.endswith(".xml")
    ]
| |
def validate_address(address_name):
    """Ensure an Address has pincode, city and country code set.

    Calls ``frappe.throw`` for the first of these fields that is empty.

    :param address_name: name of the Address document to check.
    """
    fields = ["pincode", "city", "country_code"]
    data = frappe.get_cached_value("Address", address_name, fields, as_dict=1) or {}

    for field in fields:
        if not data.get(field):
            # Translate the constant template first, then fill in the values.
            # None of the checked field names contains "-", so the field
            # name is shown as is.
            frappe.throw(_("Please set {0} for address {1}").format(field, address_name),
                title=_("E-Invoicing Information Missing"))
| |
def get_unamended_name(doc):
    """Return the document name with the last "-"-separated segment removed.

    The segment is only removed when the document has both a
    ``naming_series`` and an ``amended_from`` attribute and ``amended_from``
    is set; in every other case ``doc.name`` is returned unchanged.
    """
    required_attributes = ("naming_series", "amended_from")
    has_required = all(hasattr(doc, attribute) for attribute in required_attributes)

    if has_required and doc.amended_from:
        # Everything before the last "-"; an empty string if there is none.
        return doc.name.rpartition("-")[0]

    return doc.name
| |
def get_progressive_name_and_number(doc, replace=False):
    """Return ``(progressive_name, progressive_number)`` for the e-invoice file.

    With ``replace`` set and an existing e-invoice attachment, the first
    attachment is removed and its name (without ".xml") is reused. Otherwise
    a new name is generated from the "IT"-prefixed company tax id and a
    five-digit autoname counter. The number is the part after the first "_".
    """
    if replace:
        existing_attachments = get_e_invoice_attachments(doc)
        if existing_attachments:
            previous = existing_attachments[0]
            remove_file(previous.name, attached_to_doctype=doc.doctype, attached_to_name=doc.name)
            reused_name = previous.file_name.split(".xml")[0]
            return reused_name, reused_name.split("_")[1]

    tax_id = doc.company_tax_id
    if not tax_id.startswith("IT"):
        tax_id = "IT" + tax_id

    new_name = frappe.model.naming.make_autoname(tax_id + "_.#####")
    return new_name, new_name.split("_")[1]
| |
def set_state_code(doc, method):
    """Upper-case the country code and fill ``state_code`` for Italian states.

    The state code is looked up case-insensitively in ``state_codes`` and is
    only set when the document has a ``state``, has a ``state_code``
    attribute, its country is one of the accepted names for Italy, and the
    lookup finds a value. ``method`` is not used.
    """
    if doc.get('country_code'):
        doc.country_code = doc.country_code.upper()

    if not doc.get('state'):
        return

    italian_country_names = ["Italy", "Italia", "Italian Republic", "Repubblica Italiana"]
    if not hasattr(doc, "state_code") or doc.country not in italian_country_names:
        return

    # Case-insensitive lookup table: lower-cased state name -> code.
    codes_by_lowercase_name = {}
    for state_name, code in state_codes.items():
        codes_by_lowercase_name[state_name.lower()] = code

    matched_code = codes_by_lowercase_name.get(doc.get('state','').lower())
    if matched_code:
        doc.state_code = matched_code