fix: DB update child items, remove redundancy, fix perf
- Move `get_boms_in_bottom_up_order` in bom update tool’s file
- Remove repeated rm cost update from `update_cost`. `calculate_cost` handles RM cost update
- db_update children in `calculate_cost` optionally
- Don’t call `update_exploded_items` and regenerate exploded items in `update_cost`. They will stay the same (except cost)
diff --git a/erpnext/manufacturing/doctype/bom/bom.py b/erpnext/manufacturing/doctype/bom/bom.py
index a828869..047bcc5 100644
--- a/erpnext/manufacturing/doctype/bom/bom.py
+++ b/erpnext/manufacturing/doctype/bom/bom.py
@@ -3,9 +3,9 @@
import functools
import re
-from collections import defaultdict, deque
+from collections import deque
from operator import itemgetter
-from typing import List, Optional
+from typing import List
import frappe
from frappe import _
@@ -383,35 +383,9 @@
existing_bom_cost = self.total_cost
- for d in self.get("items"):
- if not d.item_code:
- continue
-
- rate = self.get_rm_rate(
- {
- "company": self.company,
- "item_code": d.item_code,
- "bom_no": d.bom_no,
- "qty": d.qty,
- "uom": d.uom,
- "stock_uom": d.stock_uom,
- "conversion_factor": d.conversion_factor,
- "sourced_by_supplier": d.sourced_by_supplier,
- }
- )
-
- if rate:
- d.rate = rate
- d.amount = flt(d.rate) * flt(d.qty)
- d.base_rate = flt(d.rate) * flt(self.conversion_rate)
- d.base_amount = flt(d.amount) * flt(self.conversion_rate)
-
- if save:
- d.db_update()
-
if self.docstatus == 1:
self.flags.ignore_validate_update_after_submit = True
- self.calculate_cost(update_hour_rate)
+ self.calculate_cost(save_updates=save, update_hour_rate=update_hour_rate)
if save:
self.db_update()
@@ -613,11 +587,11 @@
bom_list.reverse()
return bom_list
- def calculate_cost(self, update_hour_rate=False):
+ def calculate_cost(self, save_update=False, update_hour_rate=False):
"""Calculate bom totals"""
self.calculate_op_cost(update_hour_rate)
- self.calculate_rm_cost()
- self.calculate_sm_cost()
+ self.calculate_rm_cost(save=save_update)
+ self.calculate_sm_cost(save=save_update)
self.total_cost = self.operating_cost + self.raw_material_cost - self.scrap_material_cost
self.base_total_cost = (
self.base_operating_cost + self.base_raw_material_cost - self.base_scrap_material_cost
@@ -659,7 +633,7 @@
if update_hour_rate:
row.db_update()
- def calculate_rm_cost(self):
+ def calculate_rm_cost(self, save=False):
"""Fetch RM rate as per today's valuation rate and calculate totals"""
total_rm_cost = 0
base_total_rm_cost = 0
@@ -674,11 +648,13 @@
total_rm_cost += d.amount
base_total_rm_cost += d.base_amount
+ if save:
+ d.db_update()
self.raw_material_cost = total_rm_cost
self.base_raw_material_cost = base_total_rm_cost
- def calculate_sm_cost(self):
+ def calculate_sm_cost(self, save=False):
"""Fetch RM rate as per today's valuation rate and calculate totals"""
total_sm_cost = 0
base_total_sm_cost = 0
@@ -693,6 +669,8 @@
)
total_sm_cost += d.amount
base_total_sm_cost += d.base_amount
+ if save:
+ d.db_update()
self.scrap_material_cost = total_sm_cost
self.base_scrap_material_cost = base_total_sm_cost
@@ -1130,81 +1108,6 @@
return bom_items
-def get_boms_in_bottom_up_order(bom_no: Optional[str] = None) -> List:
- def _generate_child_parent_map():
- bom = frappe.qb.DocType("BOM")
- bom_item = frappe.qb.DocType("BOM Item")
-
- bom_parents = (
- frappe.qb.from_(bom_item)
- .join(bom)
- .on(bom_item.parent == bom.name)
- .select(bom_item.bom_no, bom_item.parent)
- .where(
- (bom_item.bom_no.isnotnull())
- & (bom_item.bom_no != "")
- & (bom.docstatus == 1)
- & (bom.is_active == 1)
- & (bom_item.parenttype == "BOM")
- )
- ).run(as_dict=True)
-
- child_parent_map = defaultdict(list)
- for bom in bom_parents:
- child_parent_map[bom.bom_no].append(bom.parent)
-
- return child_parent_map
-
- def _get_flat_parent_map(leaf, child_parent_map):
- parents_list = []
-
- def _get_parents(node, parents_list):
- "Returns updated ancestors list."
- first_parents = child_parent_map.get(node) # immediate parents of node
- if not first_parents: # top most node
- return parents_list
-
- parents_list.extend(first_parents)
- parents_list = list(dict.fromkeys(parents_list).keys()) # remove duplicates
-
- for nth_node in first_parents:
- # recursively find parents
- parents_list = _get_parents(nth_node, parents_list)
-
- return parents_list
-
- parents_list = _get_parents(leaf, parents_list)
- return parents_list
-
- def _get_leaf_boms():
- return frappe.db.sql_list(
- """select name from `tabBOM` bom
- where docstatus=1 and is_active=1
- and not exists(select bom_no from `tabBOM Item`
- where parent=bom.name and ifnull(bom_no, '')!='')"""
- )
-
- bom_list = []
- if bom_no:
- bom_list.append(bom_no)
- else:
- bom_list = _get_leaf_boms()
-
- child_parent_map = _generate_child_parent_map()
-
- for leaf_bom in bom_list:
- # generate list recursively bottom to top
- parent_list = _get_flat_parent_map(leaf_bom, child_parent_map)
-
- if not parent_list:
- continue
-
- bom_list.extend(parent_list)
- bom_list = list(dict.fromkeys(bom_list).keys()) # remove duplicates
-
- return bom_list
-
-
def add_additional_cost(stock_entry, work_order):
# Add non stock items cost in the additional cost
stock_entry.additional_costs = []
diff --git a/erpnext/manufacturing/doctype/bom_update_tool/bom_update_tool.py b/erpnext/manufacturing/doctype/bom_update_tool/bom_update_tool.py
index b0e7da1..5b073b7 100644
--- a/erpnext/manufacturing/doctype/bom_update_tool/bom_update_tool.py
+++ b/erpnext/manufacturing/doctype/bom_update_tool/bom_update_tool.py
@@ -2,7 +2,8 @@
# For license information, please see license.txt
import json
-from typing import TYPE_CHECKING, Dict, Literal, Optional, Union
+from collections import defaultdict
+from typing import TYPE_CHECKING, Dict, List, Literal, Optional, Union
if TYPE_CHECKING:
from erpnext.manufacturing.doctype.bom_update_log.bom_update_log import BOMUpdateLog
@@ -10,8 +11,6 @@
import frappe
from frappe.model.document import Document
-from erpnext.manufacturing.doctype.bom.bom import get_boms_in_bottom_up_order
-
class BOMUpdateTool(Document):
pass
@@ -47,7 +46,10 @@
"""Updates Cost for all BOMs from bottom to top."""
bom_list = get_boms_in_bottom_up_order()
for bom in bom_list:
- frappe.get_doc("BOM", bom).update_cost(update_parent=False, from_child_bom=True)
+ bom_doc = frappe.get_doc("BOM", bom)
+ bom_doc.calculate_cost(save_updates=True, update_hour_rate=True)
+ # bom_doc.update_exploded_items(save=True) #TODO: edit exploded items rate
+ bom_doc.db_update()
def create_bom_update_log(
@@ -67,3 +69,90 @@
"update_type": update_type,
}
).submit()
+
+
+def get_boms_in_bottom_up_order(bom_no: Optional[str] = None) -> List:
+ """
+ Eg: Main BOM
+ |- Sub BOM 1
+ |- Leaf BOM 1
+ |- Sub BOM 2
+ |- Leaf BOM 2
+ Result: [Leaf BOM 1, Leaf BOM 2, Sub BOM 1, Sub BOM 2, Main BOM]
+ """
+ leaf_boms = []
+ if bom_no:
+ leaf_boms.append(bom_no)
+ else:
+ leaf_boms = _get_leaf_boms()
+
+ child_parent_map = _generate_child_parent_map()
+ bom_list = leaf_boms.copy()
+
+ for leaf_bom in leaf_boms:
+ parent_list = _get_flat_parent_map(leaf_bom, child_parent_map)
+
+ if not parent_list:
+ continue
+
+ bom_list.extend(parent_list)
+ bom_list = list(dict.fromkeys(bom_list).keys()) # remove duplicates
+
+ return bom_list
+
+
+def _generate_child_parent_map():
+ bom = frappe.qb.DocType("BOM")
+ bom_item = frappe.qb.DocType("BOM Item")
+
+ bom_parents = (
+ frappe.qb.from_(bom_item)
+ .join(bom)
+ .on(bom_item.parent == bom.name)
+ .select(bom_item.bom_no, bom_item.parent)
+ .where(
+ (bom_item.bom_no.isnotnull())
+ & (bom_item.bom_no != "")
+ & (bom.docstatus == 1)
+ & (bom.is_active == 1)
+ & (bom_item.parenttype == "BOM")
+ )
+ ).run(as_dict=True)
+
+ child_parent_map = defaultdict(list)
+ for bom in bom_parents:
+ child_parent_map[bom.bom_no].append(bom.parent)
+
+ return child_parent_map
+
+
+def _get_flat_parent_map(leaf, child_parent_map):
+ "Get ancestors at all levels of a leaf BOM."
+ parents_list = []
+
+ def _get_parents(node, parents_list):
+ "Returns recursively updated ancestors list."
+ first_parents = child_parent_map.get(node) # immediate parents of node
+ if not first_parents: # top most node
+ return parents_list
+
+ parents_list.extend(first_parents)
+ parents_list = list(dict.fromkeys(parents_list).keys()) # remove duplicates
+
+ for nth_node in first_parents:
+ # recursively find parents
+ parents_list = _get_parents(nth_node, parents_list)
+
+ return parents_list
+
+ parents_list = _get_parents(leaf, parents_list)
+ return parents_list
+
+
+def _get_leaf_boms():
+ return frappe.db.sql_list(
+ """select name from `tabBOM` bom
+ where docstatus=1 and is_active=1
+ and not exists(select bom_no from `tabBOM Item`
+ where parent=bom.name and ifnull(bom_no, '')!='')"""
+ )