refactor: rewrite `Product Bundle Balance Report` queries in `QB`
diff --git a/erpnext/stock/report/product_bundle_balance/product_bundle_balance.py b/erpnext/stock/report/product_bundle_balance/product_bundle_balance.py
index 854875a..9e75201 100644
--- a/erpnext/stock/report/product_bundle_balance/product_bundle_balance.py
+++ b/erpnext/stock/report/product_bundle_balance/product_bundle_balance.py
@@ -4,7 +4,9 @@
import frappe
from frappe import _
+from frappe.query_builder.functions import IfNull
from frappe.utils import flt
+from pypika.terms import ExistsCriterion
from erpnext.stock.report.stock_ledger.stock_ledger import get_item_group_condition
@@ -123,43 +125,65 @@
pb_details = frappe._dict()
item_details = frappe._dict()
- conditions = get_parent_item_conditions(filters)
- parent_item_details = frappe.db.sql(
- """
- select item.name as item_code, item.item_name, pb.description, item.item_group, item.brand, item.stock_uom
- from `tabItem` item
- inner join `tabProduct Bundle` pb on pb.new_item_code = item.name
- where ifnull(item.disabled, 0) = 0 {0}
- """.format(
- conditions
- ),
- filters,
- as_dict=1,
- ) # nosec
+ item = frappe.qb.DocType("Item")
+ pb = frappe.qb.DocType("Product Bundle")
+
+ query = (
+ frappe.qb.from_(item)
+ .inner_join(pb)
+ .on(pb.new_item_code == item.name)
+ .select(
+ item.name.as_("item_code"),
+ item.item_name,
+ pb.description,
+ item.item_group,
+ item.brand,
+ item.stock_uom,
+ )
+ .where(IfNull(item.disabled, 0) == 0)
+ )
+
+ if item_code := filters.get("item_code"):
+ query = query.where(item.item_code == item_code)
+ else:
+ if brand := filters.get("brand"):
+ query = query.where(item.brand == brand)
+ if item_group := filters.get("item_group"):
+ if conditions := get_item_group_condition(item_group, item):
+ query = query.where(conditions)
+
+ parent_item_details = query.run(as_dict=True)
parent_items = []
for d in parent_item_details:
parent_items.append(d.item_code)
item_details[d.item_code] = d
+ child_item_details = []
if parent_items:
- child_item_details = frappe.db.sql(
- """
- select
- pb.new_item_code as parent_item, pbi.item_code, item.item_name, pbi.description, item.item_group, item.brand,
- item.stock_uom, pbi.uom, pbi.qty
- from `tabProduct Bundle Item` pbi
- inner join `tabProduct Bundle` pb on pb.name = pbi.parent
- inner join `tabItem` item on item.name = pbi.item_code
- where pb.new_item_code in ({0})
- """.format(
- ", ".join(["%s"] * len(parent_items))
- ),
- parent_items,
- as_dict=1,
- ) # nosec
- else:
- child_item_details = []
+ item = frappe.qb.DocType("Item")
+ pb = frappe.qb.DocType("Product Bundle")
+ pbi = frappe.qb.DocType("Product Bundle Item")
+
+ child_item_details = (
+ frappe.qb.from_(pbi)
+ .inner_join(pb)
+ .on(pb.name == pbi.parent)
+ .inner_join(item)
+ .on(item.name == pbi.item_code)
+ .select(
+ pb.new_item_code.as_("parent_item"),
+ pbi.item_code,
+ item.item_name,
+ pbi.description,
+ item.item_group,
+ item.brand,
+ item.stock_uom,
+ pbi.uom,
+ pbi.qty,
+ )
+ .where(pb.new_item_code.isin(parent_items))
+ ).run(as_dict=1)
child_items = set()
for d in child_item_details:
@@ -184,58 +208,42 @@
if not items:
return []
- item_conditions_sql = " and sle.item_code in ({})".format(
- ", ".join(frappe.db.escape(i) for i in items)
+ sle = frappe.qb.DocType("Stock Ledger Entry")
+ sle2 = frappe.qb.DocType("Stock Ledger Entry")
+
+ query = (
+ frappe.qb.from_(sle)
+ .force_index("posting_sort_index")
+ .left_join(sle2)
+ .on(
+ (sle.item_code == sle2.item_code)
+ & (sle.warehouse == sle2.warehouse)
+ & (sle.posting_date < sle2.posting_date)
+ & (sle.posting_time < sle2.posting_time)
+ & (sle.name < sle2.name)
+ )
+ .select(sle.item_code, sle.warehouse, sle.qty_after_transaction, sle.company)
+ .where((sle2.name.isnull()) & (sle.docstatus < 2) & (sle.item_code.isin(items)))
)
- conditions = get_sle_conditions(filters)
-
- return frappe.db.sql(
- """
- select
- sle.item_code, sle.warehouse, sle.qty_after_transaction, sle.company
- from
- `tabStock Ledger Entry` sle force index (posting_sort_index)
- left join `tabStock Ledger Entry` sle2 on
- sle.item_code = sle2.item_code and sle.warehouse = sle2.warehouse
- and (sle.posting_date, sle.posting_time, sle.name) < (sle2.posting_date, sle2.posting_time, sle2.name)
- where sle2.name is null and sle.docstatus < 2 %s %s"""
- % (item_conditions_sql, conditions),
- as_dict=1,
- ) # nosec
-
-
-def get_parent_item_conditions(filters):
- conditions = []
-
- if filters.get("item_code"):
- conditions.append("item.item_code = %(item_code)s")
+ if date := filters.get("date"):
+ query = query.where(sle.posting_date <= date)
else:
- if filters.get("brand"):
- conditions.append("item.brand=%(brand)s")
- if filters.get("item_group"):
- conditions.append(get_item_group_condition(filters.get("item_group")))
-
- conditions = " and ".join(conditions)
- return "and {0}".format(conditions) if conditions else ""
-
-
-def get_sle_conditions(filters):
- conditions = ""
- if not filters.get("date"):
frappe.throw(_("'Date' is required"))
- conditions += " and sle.posting_date <= %s" % frappe.db.escape(filters.get("date"))
-
if filters.get("warehouse"):
warehouse_details = frappe.db.get_value(
"Warehouse", filters.get("warehouse"), ["lft", "rgt"], as_dict=1
)
- if warehouse_details:
- conditions += (
- " and exists (select name from `tabWarehouse` wh \
- where wh.lft >= %s and wh.rgt <= %s and sle.warehouse = wh.name)"
- % (warehouse_details.lft, warehouse_details.rgt)
- ) # nosec
- return conditions
+ if warehouse_details:
+ wh = frappe.qb.DocType("Warehouse")
+ query = query.where(
+ ExistsCriterion(
+ frappe.qb.from_(wh)
+ .select(wh.name)
+ .where((wh.lft >= warehouse_details.lft) & (wh.rgt <= warehouse_details.rgt))
+ )
+ )
+
+ return query.run(as_dict=True)