vishdha | d3ec1c1 | 2020-03-24 11:31:41 +0530 | [diff] [blame] | 1 | import traceback |
| 2 | |
vishdha | d3ec1c1 | 2020-03-24 11:31:41 +0530 | [diff] [blame] | 3 | import frappe |
| 4 | from erpnext import get_default_company |
| 5 | from frappe import _ |
| 6 | from frappe.contacts.doctype.address.address import get_company_address |
| 7 | |
| 8 | TAX_ACCOUNT_HEAD = frappe.db.get_single_value("TaxJar Settings", "tax_account_head") |
| 9 | SHIP_ACCOUNT_HEAD = frappe.db.get_single_value("TaxJar Settings", "shipping_account_head") |
| 10 | TAXJAR_CREATE_TRANSACTIONS = frappe.db.get_single_value("TaxJar Settings", "taxjar_create_transactions") |
| 11 | TAXJAR_CALCULATE_TAX = frappe.db.get_single_value("TaxJar Settings", "taxjar_calculate_tax") |
| 12 | SUPPORTED_COUNTRY_CODES = ["AT", "AU", "BE", "BG", "CA", "CY", "CZ", "DE", "DK", "EE", "ES", "FI", |
| 13 | "FR", "GB", "GR", "HR", "HU", "IE", "IT", "LT", "LU", "LV", "MT", "NL", "PL", "PT", "RO", |
| 14 | "SE", "SI", "SK", "US"] |
| 15 | |
| 16 | |
| 17 | def get_client(): |
| 18 | taxjar_settings = frappe.get_single("TaxJar Settings") |
| 19 | |
| 20 | if not taxjar_settings.is_sandbox: |
| 21 | api_key = taxjar_settings.api_key and taxjar_settings.get_password("api_key") |
| 22 | api_url = taxjar.DEFAULT_API_URL |
| 23 | else: |
| 24 | api_key = taxjar_settings.sandbox_api_key and taxjar_settings.get_password("sandbox_api_key") |
| 25 | api_url = taxjar.SANDBOX_API_URL |
| 26 | |
| 27 | if api_key and api_url: |
| 28 | return taxjar.Client(api_key=api_key, api_url=api_url) |
| 29 | |
| 30 | |
| 31 | def create_transaction(doc, method): |
Deepesh Garg | dcb462f | 2020-08-01 13:47:09 +0530 | [diff] [blame] | 32 | import taxjar |
vishdha | d3ec1c1 | 2020-03-24 11:31:41 +0530 | [diff] [blame] | 33 | """Create an order transaction in TaxJar""" |
| 34 | |
| 35 | if not TAXJAR_CREATE_TRANSACTIONS: |
| 36 | return |
| 37 | |
| 38 | client = get_client() |
| 39 | |
| 40 | if not client: |
| 41 | return |
| 42 | |
| 43 | sales_tax = sum([tax.tax_amount for tax in doc.taxes if tax.account_head == TAX_ACCOUNT_HEAD]) |
| 44 | |
| 45 | if not sales_tax: |
| 46 | return |
| 47 | |
| 48 | tax_dict = get_tax_data(doc) |
| 49 | |
| 50 | if not tax_dict: |
| 51 | return |
| 52 | |
| 53 | tax_dict['transaction_id'] = doc.name |
| 54 | tax_dict['transaction_date'] = frappe.utils.today() |
| 55 | tax_dict['sales_tax'] = sales_tax |
| 56 | tax_dict['amount'] = doc.total + tax_dict['shipping'] |
| 57 | |
| 58 | try: |
| 59 | client.create_order(tax_dict) |
| 60 | except taxjar.exceptions.TaxJarResponseError as err: |
| 61 | frappe.throw(_(sanitize_error_response(err))) |
| 62 | except Exception as ex: |
| 63 | print(traceback.format_exc(ex)) |
| 64 | |
| 65 | |
| 66 | def delete_transaction(doc, method): |
| 67 | """Delete an existing TaxJar order transaction""" |
| 68 | |
| 69 | if not TAXJAR_CREATE_TRANSACTIONS: |
| 70 | return |
| 71 | |
| 72 | client = get_client() |
| 73 | |
| 74 | if not client: |
| 75 | return |
| 76 | |
| 77 | client.delete_order(doc.name) |
| 78 | |
| 79 | |
| 80 | def get_tax_data(doc): |
| 81 | from_address = get_company_address_details(doc) |
| 82 | from_shipping_state = from_address.get("state") |
| 83 | from_country_code = frappe.db.get_value("Country", from_address.country, "code") |
| 84 | from_country_code = from_country_code.upper() |
| 85 | |
| 86 | to_address = get_shipping_address_details(doc) |
| 87 | to_shipping_state = to_address.get("state") |
| 88 | to_country_code = frappe.db.get_value("Country", to_address.country, "code") |
| 89 | to_country_code = to_country_code.upper() |
| 90 | |
| 91 | if to_country_code not in SUPPORTED_COUNTRY_CODES: |
| 92 | return |
| 93 | |
| 94 | shipping = sum([tax.tax_amount for tax in doc.taxes if tax.account_head == SHIP_ACCOUNT_HEAD]) |
| 95 | |
| 96 | if to_shipping_state is not None: |
| 97 | to_shipping_state = get_iso_3166_2_state_code(to_address) |
| 98 | |
| 99 | tax_dict = { |
| 100 | 'from_country': from_country_code, |
| 101 | 'from_zip': from_address.pincode, |
| 102 | 'from_state': from_shipping_state, |
| 103 | 'from_city': from_address.city, |
| 104 | 'from_street': from_address.address_line1, |
| 105 | 'to_country': to_country_code, |
| 106 | 'to_zip': to_address.pincode, |
| 107 | 'to_city': to_address.city, |
| 108 | 'to_street': to_address.address_line1, |
| 109 | 'to_state': to_shipping_state, |
| 110 | 'shipping': shipping, |
| 111 | 'amount': doc.net_total |
| 112 | } |
| 113 | |
| 114 | return tax_dict |
| 115 | |
| 116 | |
| 117 | def set_sales_tax(doc, method): |
| 118 | if not TAXJAR_CALCULATE_TAX: |
| 119 | return |
| 120 | |
| 121 | if not doc.items: |
| 122 | return |
| 123 | |
| 124 | # if the party is exempt from sales tax, then set all tax account heads to zero |
| 125 | sales_tax_exempted = hasattr(doc, "exempt_from_sales_tax") and doc.exempt_from_sales_tax \ |
| 126 | or frappe.db.has_column("Customer", "exempt_from_sales_tax") and frappe.db.get_value("Customer", doc.customer, "exempt_from_sales_tax") |
| 127 | |
| 128 | if sales_tax_exempted: |
| 129 | for tax in doc.taxes: |
| 130 | if tax.account_head == TAX_ACCOUNT_HEAD: |
| 131 | tax.tax_amount = 0 |
| 132 | break |
| 133 | |
| 134 | doc.run_method("calculate_taxes_and_totals") |
| 135 | return |
| 136 | |
| 137 | tax_dict = get_tax_data(doc) |
| 138 | |
| 139 | if not tax_dict: |
| 140 | # Remove existing tax rows if address is changed from a taxable state/country |
| 141 | setattr(doc, "taxes", [tax for tax in doc.taxes if tax.account_head != TAX_ACCOUNT_HEAD]) |
| 142 | return |
| 143 | |
| 144 | tax_data = validate_tax_request(tax_dict) |
| 145 | |
| 146 | if tax_data is not None: |
| 147 | if not tax_data.amount_to_collect: |
| 148 | setattr(doc, "taxes", [tax for tax in doc.taxes if tax.account_head != TAX_ACCOUNT_HEAD]) |
| 149 | elif tax_data.amount_to_collect > 0: |
| 150 | # Loop through tax rows for existing Sales Tax entry |
| 151 | # If none are found, add a row with the tax amount |
| 152 | for tax in doc.taxes: |
| 153 | if tax.account_head == TAX_ACCOUNT_HEAD: |
| 154 | tax.tax_amount = tax_data.amount_to_collect |
| 155 | |
| 156 | doc.run_method("calculate_taxes_and_totals") |
| 157 | break |
| 158 | else: |
| 159 | doc.append("taxes", { |
| 160 | "charge_type": "Actual", |
| 161 | "description": "Sales Tax", |
| 162 | "account_head": TAX_ACCOUNT_HEAD, |
| 163 | "tax_amount": tax_data.amount_to_collect |
| 164 | }) |
| 165 | |
| 166 | doc.run_method("calculate_taxes_and_totals") |
| 167 | |
| 168 | |
| 169 | def validate_tax_request(tax_dict): |
| 170 | """Return the sales tax that should be collected for a given order.""" |
| 171 | |
| 172 | client = get_client() |
| 173 | |
| 174 | if not client: |
| 175 | return |
| 176 | |
| 177 | try: |
| 178 | tax_data = client.tax_for_order(tax_dict) |
| 179 | except taxjar.exceptions.TaxJarResponseError as err: |
| 180 | frappe.throw(_(sanitize_error_response(err))) |
| 181 | else: |
| 182 | return tax_data |
| 183 | |
| 184 | |
| 185 | def get_company_address_details(doc): |
| 186 | """Return default company address details""" |
| 187 | |
| 188 | company_address = get_company_address(get_default_company()).company_address |
| 189 | |
| 190 | if not company_address: |
| 191 | frappe.throw(_("Please set a default company address")) |
| 192 | |
| 193 | company_address = frappe.get_doc("Address", company_address) |
| 194 | return company_address |
| 195 | |
| 196 | |
| 197 | def get_shipping_address_details(doc): |
| 198 | """Return customer shipping address details""" |
| 199 | |
| 200 | if doc.shipping_address_name: |
| 201 | shipping_address = frappe.get_doc("Address", doc.shipping_address_name) |
| 202 | else: |
| 203 | shipping_address = get_company_address_details(doc) |
| 204 | |
| 205 | return shipping_address |
| 206 | |
| 207 | |
| 208 | def get_iso_3166_2_state_code(address): |
Deepesh Garg | dcb462f | 2020-08-01 13:47:09 +0530 | [diff] [blame] | 209 | import pycountry |
vishdha | d3ec1c1 | 2020-03-24 11:31:41 +0530 | [diff] [blame] | 210 | country_code = frappe.db.get_value("Country", address.get("country"), "code") |
| 211 | |
| 212 | error_message = _("""{0} is not a valid state! Check for typos or enter the ISO code for your state.""").format(address.get("state")) |
| 213 | state = address.get("state").upper().strip() |
| 214 | |
| 215 | # The max length for ISO state codes is 3, excluding the country code |
| 216 | if len(state) <= 3: |
| 217 | # PyCountry returns state code as {country_code}-{state-code} (e.g. US-FL) |
| 218 | address_state = (country_code + "-" + state).upper() |
| 219 | |
| 220 | states = pycountry.subdivisions.get(country_code=country_code.upper()) |
| 221 | states = [pystate.code for pystate in states] |
| 222 | |
| 223 | if address_state in states: |
| 224 | return state |
| 225 | |
| 226 | frappe.throw(_(error_message)) |
| 227 | else: |
| 228 | try: |
| 229 | lookup_state = pycountry.subdivisions.lookup(state) |
| 230 | except LookupError: |
| 231 | frappe.throw(_(error_message)) |
| 232 | else: |
| 233 | return lookup_state.code.split('-')[1] |
| 234 | |
| 235 | |
| 236 | def sanitize_error_response(response): |
| 237 | response = response.full_response.get("detail") |
| 238 | response = response.replace("_", " ") |
| 239 | |
| 240 | sanitized_responses = { |
| 241 | "to zip": "Zipcode", |
| 242 | "to city": "City", |
| 243 | "to state": "State", |
| 244 | "to country": "Country" |
| 245 | } |
| 246 | |
| 247 | for k, v in sanitized_responses.items(): |
| 248 | response = response.replace(k, v) |
| 249 | |
| 250 | return response |