blob: e234eec1a66aa867157606d8b035493b9c8d1334 [file] [log] [blame]
# Copyright (c) 2015, Frappe Technologies Pvt. Ltd. and Contributors
# License: GNU General Public License v3. See license.txt
import json
from collections import OrderedDict, defaultdict
import frappe
from frappe import qb, scrub
from frappe.desk.reportview import get_filters_cond, get_match_cond
from frappe.query_builder import Criterion, CustomFunction
from frappe.query_builder.functions import Concat, Locate, Sum
from frappe.utils import nowdate, today, unique
from pypika import Order
import erpnext
from erpnext.stock.get_item_details import _get_item_tax_template
# searches for active employees
@frappe.whitelist()
@frappe.validate_and_sanitize_search_inputs
def employee_query(doctype, txt, searchfield, start, page_len, filters):
doctype = "Employee"
conditions = []
fields = get_fields(doctype, ["name", "employee_name"])
return frappe.db.sql(
"""select {fields} from `tabEmployee`
where status in ('Active', 'Suspended')
and docstatus < 2
and ({key} like %(txt)s
or employee_name like %(txt)s)
{fcond} {mcond}
order by
(case when locate(%(_txt)s, name) > 0 then locate(%(_txt)s, name) else 99999 end),
(case when locate(%(_txt)s, employee_name) > 0 then locate(%(_txt)s, employee_name) else 99999 end),
idx desc,
name, employee_name
limit %(page_len)s offset %(start)s""".format(
**{
"fields": ", ".join(fields),
"key": searchfield,
"fcond": get_filters_cond(doctype, filters, conditions),
"mcond": get_match_cond(doctype),
}
),
{"txt": "%%%s%%" % txt, "_txt": txt.replace("%", ""), "start": start, "page_len": page_len},
)
# searches for leads which are not converted
@frappe.whitelist()
@frappe.validate_and_sanitize_search_inputs
def lead_query(doctype, txt, searchfield, start, page_len, filters):
doctype = "Lead"
fields = get_fields(doctype, ["name", "lead_name", "company_name"])
searchfields = frappe.get_meta(doctype).get_search_fields()
searchfields = " or ".join(field + " like %(txt)s" for field in searchfields)
return frappe.db.sql(
"""select {fields} from `tabLead`
where docstatus < 2
and ifnull(status, '') != 'Converted'
and ({key} like %(txt)s
or lead_name like %(txt)s
or company_name like %(txt)s
or {scond})
{mcond}
order by
(case when locate(%(_txt)s, name) > 0 then locate(%(_txt)s, name) else 99999 end),
(case when locate(%(_txt)s, lead_name) > 0 then locate(%(_txt)s, lead_name) else 99999 end),
(case when locate(%(_txt)s, company_name) > 0 then locate(%(_txt)s, company_name) else 99999 end),
idx desc,
name, lead_name
limit %(page_len)s offset %(start)s""".format(
**{
"fields": ", ".join(fields),
"key": searchfield,
"scond": searchfields,
"mcond": get_match_cond(doctype),
}
),
{"txt": "%%%s%%" % txt, "_txt": txt.replace("%", ""), "start": start, "page_len": page_len},
)
# searches for customer
@frappe.whitelist()
@frappe.validate_and_sanitize_search_inputs
def customer_query(doctype, txt, searchfield, start, page_len, filters, as_dict=False):
doctype = "Customer"
conditions = []
cust_master_name = frappe.defaults.get_user_default("cust_master_name")
fields = ["name"]
if cust_master_name != "Customer Name":
fields.append("customer_name")
fields = get_fields(doctype, fields)
searchfields = frappe.get_meta(doctype).get_search_fields()
searchfields = " or ".join(field + " like %(txt)s" for field in searchfields)
return frappe.db.sql(
"""select {fields} from `tabCustomer`
where docstatus < 2
and ({scond}) and disabled=0
{fcond} {mcond}
order by
(case when locate(%(_txt)s, name) > 0 then locate(%(_txt)s, name) else 99999 end),
(case when locate(%(_txt)s, customer_name) > 0 then locate(%(_txt)s, customer_name) else 99999 end),
idx desc,
name, customer_name
limit %(page_len)s offset %(start)s""".format(
**{
"fields": ", ".join(fields),
"scond": searchfields,
"mcond": get_match_cond(doctype),
"fcond": get_filters_cond(doctype, filters, conditions).replace("%", "%%"),
}
),
{"txt": "%%%s%%" % txt, "_txt": txt.replace("%", ""), "start": start, "page_len": page_len},
as_dict=as_dict,
)
# searches for supplier
@frappe.whitelist()
@frappe.validate_and_sanitize_search_inputs
def supplier_query(doctype, txt, searchfield, start, page_len, filters, as_dict=False):
doctype = "Supplier"
supp_master_name = frappe.defaults.get_user_default("supp_master_name")
fields = ["name"]
if supp_master_name != "Supplier Name":
fields.append("supplier_name")
fields = get_fields(doctype, fields)
return frappe.db.sql(
"""select {field} from `tabSupplier`
where docstatus < 2
and ({key} like %(txt)s
or supplier_name like %(txt)s) and disabled=0
and (on_hold = 0 or (on_hold = 1 and CURRENT_DATE > release_date))
{mcond}
order by
(case when locate(%(_txt)s, name) > 0 then locate(%(_txt)s, name) else 99999 end),
(case when locate(%(_txt)s, supplier_name) > 0 then locate(%(_txt)s, supplier_name) else 99999 end),
idx desc,
name, supplier_name
limit %(page_len)s offset %(start)s""".format(
**{"field": ", ".join(fields), "key": searchfield, "mcond": get_match_cond(doctype)}
),
{"txt": "%%%s%%" % txt, "_txt": txt.replace("%", ""), "start": start, "page_len": page_len},
as_dict=as_dict,
)
@frappe.whitelist()
@frappe.validate_and_sanitize_search_inputs
def tax_account_query(doctype, txt, searchfield, start, page_len, filters):
doctype = "Account"
company_currency = erpnext.get_company_currency(filters.get("company"))
def get_accounts(with_account_type_filter):
account_type_condition = ""
if with_account_type_filter:
account_type_condition = "AND account_type in %(account_types)s"
accounts = frappe.db.sql(
"""
SELECT name, parent_account
FROM `tabAccount`
WHERE `tabAccount`.docstatus!=2
{account_type_condition}
AND is_group = 0
AND company = %(company)s
AND disabled = %(disabled)s
AND (account_currency = %(currency)s or ifnull(account_currency, '') = '')
AND `{searchfield}` LIKE %(txt)s
{mcond}
ORDER BY idx DESC, name
LIMIT %(limit)s offset %(offset)s
""".format(
account_type_condition=account_type_condition,
searchfield=searchfield,
mcond=get_match_cond(doctype),
),
dict(
account_types=filters.get("account_type"),
company=filters.get("company"),
disabled=filters.get("disabled", 0),
currency=company_currency,
txt="%{}%".format(txt),
offset=start,
limit=page_len,
),
)
return accounts
tax_accounts = get_accounts(True)
if not tax_accounts:
tax_accounts = get_accounts(False)
return tax_accounts
@frappe.whitelist()
@frappe.validate_and_sanitize_search_inputs
def item_query(doctype, txt, searchfield, start, page_len, filters, as_dict=False):
doctype = "Item"
conditions = []
if isinstance(filters, str):
filters = json.loads(filters)
# Get searchfields from meta and use in Item Link field query
meta = frappe.get_meta(doctype, cached=True)
searchfields = meta.get_search_fields()
columns = ""
extra_searchfields = [field for field in searchfields if field not in ["name", "description"]]
if extra_searchfields:
columns += ", " + ", ".join(extra_searchfields)
if "description" in searchfields:
columns += """, if(length(tabItem.description) > 40, \
concat(substr(tabItem.description, 1, 40), "..."), description) as description"""
searchfields = searchfields + [
field
for field in [
searchfield or "name",
"item_code",
"item_group",
"item_name",
]
if field not in searchfields
]
searchfields = " or ".join([field + " like %(txt)s" for field in searchfields])
if filters and isinstance(filters, dict):
if filters.get("customer") or filters.get("supplier"):
party = filters.get("customer") or filters.get("supplier")
item_rules_list = frappe.get_all(
"Party Specific Item", filters={"party": party}, fields=["restrict_based_on", "based_on_value"]
)
filters_dict = {}
for rule in item_rules_list:
if rule["restrict_based_on"] == "Item":
rule["restrict_based_on"] = "name"
filters_dict[rule.restrict_based_on] = []
for rule in item_rules_list:
filters_dict[rule.restrict_based_on].append(rule.based_on_value)
for filter in filters_dict:
filters[scrub(filter)] = ["in", filters_dict[filter]]
if filters.get("customer"):
del filters["customer"]
else:
del filters["supplier"]
else:
filters.pop("customer", None)
filters.pop("supplier", None)
description_cond = ""
if frappe.db.count(doctype, cache=True) < 50000:
# scan description only if items are less than 50000
description_cond = "or tabItem.description LIKE %(txt)s"
return frappe.db.sql(
"""select
tabItem.name {columns}
from tabItem
where tabItem.docstatus < 2
and tabItem.disabled=0
and tabItem.has_variants=0
and (tabItem.end_of_life > %(today)s or ifnull(tabItem.end_of_life, '0000-00-00')='0000-00-00')
and ({scond} or tabItem.item_code IN (select parent from `tabItem Barcode` where barcode LIKE %(txt)s)
{description_cond})
{fcond} {mcond}
order by
if(locate(%(_txt)s, name), locate(%(_txt)s, name), 99999),
if(locate(%(_txt)s, item_name), locate(%(_txt)s, item_name), 99999),
idx desc,
name, item_name
limit %(start)s, %(page_len)s """.format(
columns=columns,
scond=searchfields,
fcond=get_filters_cond(doctype, filters, conditions).replace("%", "%%"),
mcond=get_match_cond(doctype).replace("%", "%%"),
description_cond=description_cond,
),
{
"today": nowdate(),
"txt": "%%%s%%" % txt,
"_txt": txt.replace("%", ""),
"start": start,
"page_len": page_len,
},
as_dict=as_dict,
)
@frappe.whitelist()
@frappe.validate_and_sanitize_search_inputs
def bom(doctype, txt, searchfield, start, page_len, filters):
doctype = "BOM"
conditions = []
fields = get_fields(doctype, ["name", "item"])
return frappe.db.sql(
"""select {fields}
from `tabBOM`
where `tabBOM`.docstatus=1
and `tabBOM`.is_active=1
and `tabBOM`.`{key}` like %(txt)s
{fcond} {mcond}
order by
(case when locate(%(_txt)s, name) > 0 then locate(%(_txt)s, name) else 99999 end),
idx desc, name
limit %(page_len)s offset %(start)s""".format(
fields=", ".join(fields),
fcond=get_filters_cond(doctype, filters, conditions).replace("%", "%%"),
mcond=get_match_cond(doctype).replace("%", "%%"),
key=searchfield,
),
{
"txt": "%" + txt + "%",
"_txt": txt.replace("%", ""),
"start": start or 0,
"page_len": page_len or 20,
},
)
@frappe.whitelist()
@frappe.validate_and_sanitize_search_inputs
def get_project_name(doctype, txt, searchfield, start, page_len, filters):
proj = qb.DocType("Project")
qb_filter_and_conditions = []
qb_filter_or_conditions = []
ifelse = CustomFunction("IF", ["condition", "then", "else"])
if filters and filters.get("customer"):
qb_filter_and_conditions.append(proj.customer == filters.get("customer"))
qb_filter_and_conditions.append(proj.status.notin(["Completed", "Cancelled"]))
q = qb.from_(proj)
fields = get_fields(doctype, ["name", "project_name"])
for x in fields:
q = q.select(proj[x])
# don't consider 'customer' and 'status' fields for pattern search, as they must be exactly matched
searchfields = [
x for x in frappe.get_meta(doctype).get_search_fields() if x not in ["customer", "status"]
]
# pattern search
if txt:
for x in searchfields:
qb_filter_or_conditions.append(proj[x].like(f"%{txt}%"))
q = q.where(Criterion.all(qb_filter_and_conditions)).where(Criterion.any(qb_filter_or_conditions))
# ordering
if txt:
# project_name containing search string 'txt' will be given higher precedence
q = q.orderby(ifelse(Locate(txt, proj.project_name) > 0, Locate(txt, proj.project_name), 99999))
q = q.orderby(proj.idx, order=Order.desc).orderby(proj.name)
if page_len:
q = q.limit(page_len)
if start:
q = q.offset(start)
return q.run()
@frappe.whitelist()
@frappe.validate_and_sanitize_search_inputs
def get_delivery_notes_to_be_billed(doctype, txt, searchfield, start, page_len, filters, as_dict):
doctype = "Delivery Note"
fields = get_fields(doctype, ["name", "customer", "posting_date"])
return frappe.db.sql(
"""
select %(fields)s
from `tabDelivery Note`
where `tabDelivery Note`.`%(key)s` like %(txt)s and
`tabDelivery Note`.docstatus = 1
and status not in ('Stopped', 'Closed') %(fcond)s
and (
(`tabDelivery Note`.is_return = 0 and `tabDelivery Note`.per_billed < 100)
or (`tabDelivery Note`.grand_total = 0 and `tabDelivery Note`.per_billed < 100)
or (
`tabDelivery Note`.is_return = 1
and return_against in (select name from `tabDelivery Note` where per_billed < 100)
)
)
%(mcond)s order by `tabDelivery Note`.`%(key)s` asc limit %(page_len)s offset %(start)s
"""
% {
"fields": ", ".join(["`tabDelivery Note`.{0}".format(f) for f in fields]),
"key": searchfield,
"fcond": get_filters_cond(doctype, filters, []),
"mcond": get_match_cond(doctype),
"start": start,
"page_len": page_len,
"txt": "%(txt)s",
},
{"txt": ("%%%s%%" % txt)},
as_dict=as_dict,
)
@frappe.whitelist()
@frappe.validate_and_sanitize_search_inputs
def get_batch_no(doctype, txt, searchfield, start, page_len, filters):
doctype = "Batch"
meta = frappe.get_meta(doctype, cached=True)
searchfields = meta.get_search_fields()
batches = get_batches_from_stock_ledger_entries(searchfields, txt, filters, start, page_len)
batches.extend(
get_batches_from_serial_and_batch_bundle(searchfields, txt, filters, start, page_len)
)
filtered_batches = get_filterd_batches(batches)
return filtered_batches
def get_filterd_batches(data):
batches = OrderedDict()
for batch_data in data:
if batch_data[0] not in batches:
batches[batch_data[0]] = list(batch_data)
else:
batches[batch_data[0]][1] += batch_data[1]
filterd_batch = []
for batch, batch_data in batches.items():
if batch_data[1] > 0:
filterd_batch.append(tuple(batch_data))
return filterd_batch
def get_batches_from_stock_ledger_entries(searchfields, txt, filters, start=0, page_len=100):
stock_ledger_entry = frappe.qb.DocType("Stock Ledger Entry")
batch_table = frappe.qb.DocType("Batch")
expiry_date = filters.get("posting_date") or today()
query = (
frappe.qb.from_(stock_ledger_entry)
.inner_join(batch_table)
.on(batch_table.name == stock_ledger_entry.batch_no)
.select(
stock_ledger_entry.batch_no,
Sum(stock_ledger_entry.actual_qty).as_("qty"),
)
.where(((batch_table.expiry_date >= expiry_date) | (batch_table.expiry_date.isnull())))
.where(stock_ledger_entry.is_cancelled == 0)
.where(
(stock_ledger_entry.item_code == filters.get("item_code"))
& (batch_table.disabled == 0)
& (stock_ledger_entry.batch_no.isnotnull())
)
.groupby(stock_ledger_entry.batch_no, stock_ledger_entry.warehouse)
.offset(start)
.limit(page_len)
)
query = query.select(
Concat("MFG-", batch_table.manufacturing_date).as_("manufacturing_date"),
Concat("EXP-", batch_table.expiry_date).as_("expiry_date"),
)
if filters.get("warehouse"):
query = query.where(stock_ledger_entry.warehouse == filters.get("warehouse"))
for field in searchfields:
query = query.select(batch_table[field])
if txt:
txt_condition = batch_table.name.like("%{0}%".format(txt))
for field in searchfields + ["name"]:
txt_condition |= batch_table[field].like("%{0}%".format(txt))
query = query.where(txt_condition)
return query.run(as_list=1) or []
def get_batches_from_serial_and_batch_bundle(searchfields, txt, filters, start=0, page_len=100):
bundle = frappe.qb.DocType("Serial and Batch Entry")
stock_ledger_entry = frappe.qb.DocType("Stock Ledger Entry")
batch_table = frappe.qb.DocType("Batch")
expiry_date = filters.get("posting_date") or today()
bundle_query = (
frappe.qb.from_(bundle)
.inner_join(stock_ledger_entry)
.on(bundle.parent == stock_ledger_entry.serial_and_batch_bundle)
.inner_join(batch_table)
.on(batch_table.name == bundle.batch_no)
.select(
bundle.batch_no,
Sum(bundle.qty).as_("qty"),
)
.where(((batch_table.expiry_date >= expiry_date) | (batch_table.expiry_date.isnull())))
.where(stock_ledger_entry.is_cancelled == 0)
.where(
(stock_ledger_entry.item_code == filters.get("item_code"))
& (batch_table.disabled == 0)
& (stock_ledger_entry.serial_and_batch_bundle.isnotnull())
)
.groupby(bundle.batch_no, bundle.warehouse)
.offset(start)
.limit(page_len)
)
bundle_query = bundle_query.select(
Concat("MFG-", batch_table.manufacturing_date),
Concat("EXP-", batch_table.expiry_date),
)
if filters.get("warehouse"):
bundle_query = bundle_query.where(stock_ledger_entry.warehouse == filters.get("warehouse"))
for field in searchfields:
bundle_query = bundle_query.select(batch_table[field])
if txt:
txt_condition = batch_table.name.like("%{0}%".format(txt))
for field in searchfields + ["name"]:
txt_condition |= batch_table[field].like("%{0}%".format(txt))
bundle_query = bundle_query.where(txt_condition)
return bundle_query.run(as_list=1)
@frappe.whitelist()
@frappe.validate_and_sanitize_search_inputs
def get_account_list(doctype, txt, searchfield, start, page_len, filters):
doctype = "Account"
filter_list = []
if isinstance(filters, dict):
for key, val in filters.items():
if isinstance(val, (list, tuple)):
filter_list.append([doctype, key, val[0], val[1]])
else:
filter_list.append([doctype, key, "=", val])
elif isinstance(filters, list):
filter_list.extend(filters)
if "is_group" not in [d[1] for d in filter_list]:
filter_list.append(["Account", "is_group", "=", "0"])
if searchfield and txt:
filter_list.append([doctype, searchfield, "like", "%%%s%%" % txt])
return frappe.desk.reportview.execute(
doctype,
filters=filter_list,
fields=["name", "parent_account"],
limit_start=start,
limit_page_length=page_len,
as_list=True,
)
@frappe.whitelist()
@frappe.validate_and_sanitize_search_inputs
def get_blanket_orders(doctype, txt, searchfield, start, page_len, filters):
return frappe.db.sql(
"""select distinct bo.name, bo.blanket_order_type, bo.to_date
from `tabBlanket Order` bo, `tabBlanket Order Item` boi
where
boi.parent = bo.name
and boi.item_code = {item_code}
and bo.blanket_order_type = '{blanket_order_type}'
and bo.company = {company}
and bo.docstatus = 1""".format(
item_code=frappe.db.escape(filters.get("item")),
blanket_order_type=filters.get("blanket_order_type"),
company=frappe.db.escape(filters.get("company")),
)
)
@frappe.whitelist()
@frappe.validate_and_sanitize_search_inputs
def get_income_account(doctype, txt, searchfield, start, page_len, filters):
from erpnext.controllers.queries import get_match_cond
# income account can be any Credit account,
# but can also be a Asset account with account_type='Income Account' in special circumstances.
# Hence the first condition is an "OR"
if not filters:
filters = {}
doctype = "Account"
condition = ""
if filters.get("company"):
condition += "and tabAccount.company = %(company)s"
condition += f"and tabAccount.disabled = {filters.get('disabled', 0)}"
return frappe.db.sql(
"""select tabAccount.name from `tabAccount`
where (tabAccount.report_type = "Profit and Loss"
or tabAccount.account_type in ("Income Account", "Temporary"))
and tabAccount.is_group=0
and tabAccount.`{key}` LIKE %(txt)s
{condition} {match_condition}
order by idx desc, name""".format(
condition=condition, match_condition=get_match_cond(doctype), key=searchfield
),
{"txt": "%" + txt + "%", "company": filters.get("company", "")},
)
@frappe.whitelist()
@frappe.validate_and_sanitize_search_inputs
def get_filtered_dimensions(
doctype, txt, searchfield, start, page_len, filters, reference_doctype=None
):
from erpnext.accounts.doctype.accounting_dimension_filter.accounting_dimension_filter import (
get_dimension_filter_map,
)
dimension_filters = get_dimension_filter_map()
dimension_filters = dimension_filters.get((filters.get("dimension"), filters.get("account")))
query_filters = []
or_filters = []
fields = ["name"]
searchfields = frappe.get_meta(doctype).get_search_fields()
meta = frappe.get_meta(doctype)
if meta.is_tree:
query_filters.append(["is_group", "=", 0])
if meta.has_field("disabled"):
query_filters.append(["disabled", "!=", 1])
if meta.has_field("company"):
query_filters.append(["company", "=", filters.get("company")])
for field in searchfields:
or_filters.append([field, "LIKE", "%%%s%%" % txt])
fields.append(field)
if dimension_filters:
if dimension_filters["allow_or_restrict"] == "Allow":
query_selector = "in"
else:
query_selector = "not in"
if len(dimension_filters["allowed_dimensions"]) == 1:
dimensions = tuple(dimension_filters["allowed_dimensions"] * 2)
else:
dimensions = tuple(dimension_filters["allowed_dimensions"])
query_filters.append(["name", query_selector, dimensions])
output = frappe.get_list(
doctype,
fields=fields,
filters=query_filters,
or_filters=or_filters,
as_list=1,
reference_doctype=reference_doctype,
)
return [tuple(d) for d in set(output)]
@frappe.whitelist()
@frappe.validate_and_sanitize_search_inputs
def get_expense_account(doctype, txt, searchfield, start, page_len, filters):
from erpnext.controllers.queries import get_match_cond
if not filters:
filters = {}
doctype = "Account"
condition = ""
if filters.get("company"):
condition += "and tabAccount.company = %(company)s"
return frappe.db.sql(
"""select tabAccount.name from `tabAccount`
where (tabAccount.report_type = "Profit and Loss"
or tabAccount.account_type in ("Expense Account", "Fixed Asset", "Temporary", "Asset Received But Not Billed", "Capital Work in Progress"))
and tabAccount.is_group=0
and tabAccount.docstatus!=2
and tabAccount.{key} LIKE %(txt)s
{condition} {match_condition}""".format(
condition=condition, key=searchfield, match_condition=get_match_cond(doctype)
),
{"company": filters.get("company", ""), "txt": "%" + txt + "%"},
)
@frappe.whitelist()
@frappe.validate_and_sanitize_search_inputs
def warehouse_query(doctype, txt, searchfield, start, page_len, filters):
# Should be used when item code is passed in filters.
doctype = "Warehouse"
conditions, bin_conditions = [], []
filter_dict = get_doctype_wise_filters(filters)
query = """select `tabWarehouse`.name,
CONCAT_WS(' : ', 'Actual Qty', ifnull(round(`tabBin`.actual_qty, 2), 0 )) actual_qty
from `tabWarehouse` left join `tabBin`
on `tabBin`.warehouse = `tabWarehouse`.name {bin_conditions}
where
`tabWarehouse`.`{key}` like {txt}
{fcond} {mcond}
order by ifnull(`tabBin`.actual_qty, 0) desc
limit
{page_len} offset {start}
""".format(
bin_conditions=get_filters_cond(
doctype, filter_dict.get("Bin"), bin_conditions, ignore_permissions=True
),
key=searchfield,
fcond=get_filters_cond(doctype, filter_dict.get("Warehouse"), conditions),
mcond=get_match_cond(doctype),
start=start,
page_len=page_len,
txt=frappe.db.escape("%{0}%".format(txt)),
)
return frappe.db.sql(query)
def get_doctype_wise_filters(filters):
# Helper function to seperate filters doctype_wise
filter_dict = defaultdict(list)
for row in filters:
filter_dict[row[0]].append(row)
return filter_dict
@frappe.whitelist()
@frappe.validate_and_sanitize_search_inputs
def get_batch_numbers(doctype, txt, searchfield, start, page_len, filters):
query = """select batch_id from `tabBatch`
where disabled = 0
and (expiry_date >= CURRENT_DATE or expiry_date IS NULL)
and name like {txt}""".format(
txt=frappe.db.escape("%{0}%".format(txt))
)
if filters and filters.get("item"):
query += " and item = {item}".format(item=frappe.db.escape(filters.get("item")))
return frappe.db.sql(query, filters)
@frappe.whitelist()
@frappe.validate_and_sanitize_search_inputs
def item_manufacturer_query(doctype, txt, searchfield, start, page_len, filters):
item_filters = [
["manufacturer", "like", "%" + txt + "%"],
["item_code", "=", filters.get("item_code")],
]
item_manufacturers = frappe.get_all(
"Item Manufacturer",
fields=["manufacturer", "manufacturer_part_no"],
filters=item_filters,
limit_start=start,
limit_page_length=page_len,
as_list=1,
)
return item_manufacturers
@frappe.whitelist()
@frappe.validate_and_sanitize_search_inputs
def get_purchase_receipts(doctype, txt, searchfield, start, page_len, filters):
query = """
select pr.name
from `tabPurchase Receipt` pr, `tabPurchase Receipt Item` pritem
where pr.docstatus = 1 and pritem.parent = pr.name
and pr.name like {txt}""".format(
txt=frappe.db.escape("%{0}%".format(txt))
)
if filters and filters.get("item_code"):
query += " and pritem.item_code = {item_code}".format(
item_code=frappe.db.escape(filters.get("item_code"))
)
return frappe.db.sql(query, filters)
@frappe.whitelist()
@frappe.validate_and_sanitize_search_inputs
def get_purchase_invoices(doctype, txt, searchfield, start, page_len, filters):
query = """
select pi.name
from `tabPurchase Invoice` pi, `tabPurchase Invoice Item` piitem
where pi.docstatus = 1 and piitem.parent = pi.name
and pi.name like {txt}""".format(
txt=frappe.db.escape("%{0}%".format(txt))
)
if filters and filters.get("item_code"):
query += " and piitem.item_code = {item_code}".format(
item_code=frappe.db.escape(filters.get("item_code"))
)
return frappe.db.sql(query, filters)
@frappe.whitelist()
@frappe.validate_and_sanitize_search_inputs
def get_doctypes_for_closing(doctype, txt, searchfield, start, page_len, filters):
doctypes = frappe.get_hooks("period_closing_doctypes")
if txt:
doctypes = [d for d in doctypes if txt.lower() in d.lower()]
return [(d,) for d in set(doctypes)]
@frappe.whitelist()
@frappe.validate_and_sanitize_search_inputs
def get_tax_template(doctype, txt, searchfield, start, page_len, filters):
item_doc = frappe.get_cached_doc("Item", filters.get("item_code"))
item_group = filters.get("item_group")
company = filters.get("company")
taxes = item_doc.taxes or []
while item_group:
item_group_doc = frappe.get_cached_doc("Item Group", item_group)
taxes += item_group_doc.taxes or []
item_group = item_group_doc.parent_item_group
if not taxes:
return frappe.get_all(
"Item Tax Template", filters={"disabled": 0, "company": company}, as_list=True
)
else:
valid_from = filters.get("valid_from")
valid_from = valid_from[1] if isinstance(valid_from, list) else valid_from
args = {
"item_code": filters.get("item_code"),
"posting_date": valid_from,
"tax_category": filters.get("tax_category"),
"company": company,
}
taxes = _get_item_tax_template(args, taxes, for_validate=True)
return [(d,) for d in set(taxes)]
def get_fields(doctype, fields=None):
if fields is None:
fields = []
meta = frappe.get_meta(doctype)
fields.extend(meta.get_search_fields())
if meta.title_field and meta.title_field.strip() not in fields:
fields.insert(1, meta.title_field.strip())
return unique(fields)
@frappe.whitelist()
@frappe.validate_and_sanitize_search_inputs
def get_payment_terms_for_references(doctype, txt, searchfield, start, page_len, filters) -> list:
terms = []
if filters:
terms = frappe.db.get_all(
"Payment Schedule",
filters={"parent": filters.get("reference")},
fields=["payment_term"],
limit=page_len,
as_list=1,
)
return terms
@frappe.whitelist()
@frappe.validate_and_sanitize_search_inputs
def get_filtered_child_rows(doctype, txt, searchfield, start, page_len, filters) -> list:
table = frappe.qb.DocType(doctype)
query = (
frappe.qb.from_(table)
.select(
table.name,
Concat("#", table.idx, ", ", table.item_code),
)
.orderby(table.idx)
.offset(start)
.limit(page_len)
)
if filters:
for field, value in filters.items():
query = query.where(table[field] == value)
if txt:
txt += "%"
query = query.where(
((table.idx.like(txt.replace("#", ""))) | (table.item_code.like(txt))) | (table.name.like(txt))
)
return query.run(as_dict=False)