Merge branch 'develop' into perf-bom-update-tool
diff --git a/erpnext/hooks.py b/erpnext/hooks.py
index 1c4bbbc..7d7f65d 100644
--- a/erpnext/hooks.py
+++ b/erpnext/hooks.py
@@ -392,9 +392,12 @@
scheduler_events = {
"cron": {
+ "0/5 * * * *": [
+ "erpnext.manufacturing.doctype.bom_update_log.bom_update_log.resume_bom_cost_update_jobs",
+ ],
"0/30 * * * *": [
"erpnext.utilities.doctype.video.video.update_youtube_data",
- ]
+ ],
},
"all": [
"erpnext.projects.doctype.project.project.project_status_update_reminder",
diff --git a/erpnext/manufacturing/doctype/bom/bom.py b/erpnext/manufacturing/doctype/bom/bom.py
index 6376359..3e2a2d1 100644
--- a/erpnext/manufacturing/doctype/bom/bom.py
+++ b/erpnext/manufacturing/doctype/bom/bom.py
@@ -1,11 +1,11 @@
-# Copyright (c) 2015, Frappe Technologies Pvt. Ltd. and Contributors
+# Copyright (c) 2022, Frappe Technologies Pvt. Ltd. and Contributors
# License: GNU General Public License v3. See license.txt
import functools
import re
from collections import deque
from operator import itemgetter
-from typing import List
+from typing import Dict, List
import frappe
from frappe import _
@@ -189,6 +189,7 @@
self.validate_transfer_against()
self.set_routing_operations()
self.validate_operations()
+ self.update_exploded_items(save=False)
self.calculate_cost()
self.update_stock_qty()
self.update_cost(update_parent=False, from_child_bom=True, update_hour_rate=False, save=False)
@@ -386,40 +387,14 @@
existing_bom_cost = self.total_cost
- for d in self.get("items"):
- if not d.item_code:
- continue
-
- rate = self.get_rm_rate(
- {
- "company": self.company,
- "item_code": d.item_code,
- "bom_no": d.bom_no,
- "qty": d.qty,
- "uom": d.uom,
- "stock_uom": d.stock_uom,
- "conversion_factor": d.conversion_factor,
- "sourced_by_supplier": d.sourced_by_supplier,
- }
- )
-
- if rate:
- d.rate = rate
- d.amount = flt(d.rate) * flt(d.qty)
- d.base_rate = flt(d.rate) * flt(self.conversion_rate)
- d.base_amount = flt(d.amount) * flt(self.conversion_rate)
-
- if save:
- d.db_update()
-
if self.docstatus == 1:
self.flags.ignore_validate_update_after_submit = True
- self.calculate_cost(update_hour_rate)
+
+ self.calculate_cost(save_updates=save, update_hour_rate=update_hour_rate)
+
if save:
self.db_update()
- self.update_exploded_items(save=save)
-
# update parent BOMs
if self.total_cost != existing_bom_cost and update_parent:
parent_boms = frappe.db.sql_list(
@@ -608,11 +583,15 @@
bom_list.reverse()
return bom_list
- def calculate_cost(self, update_hour_rate=False):
+ def calculate_cost(self, save_updates=False, update_hour_rate=False):
"""Calculate bom totals"""
self.calculate_op_cost(update_hour_rate)
- self.calculate_rm_cost()
- self.calculate_sm_cost()
+ self.calculate_rm_cost(save=save_updates)
+ self.calculate_sm_cost(save=save_updates)
+ if save_updates:
+ # not via doc event, table is not regenerated and needs updation
+ self.calculate_exploded_cost()
+
self.total_cost = self.operating_cost + self.raw_material_cost - self.scrap_material_cost
self.base_total_cost = (
self.base_operating_cost + self.base_raw_material_cost - self.base_scrap_material_cost
@@ -654,12 +633,26 @@
if update_hour_rate:
row.db_update()
- def calculate_rm_cost(self):
+ def calculate_rm_cost(self, save=False):
"""Fetch RM rate as per today's valuation rate and calculate totals"""
total_rm_cost = 0
base_total_rm_cost = 0
for d in self.get("items"):
+ old_rate = d.rate
+ d.rate = self.get_rm_rate(
+ {
+ "company": self.company,
+ "item_code": d.item_code,
+ "bom_no": d.bom_no,
+ "qty": d.qty,
+ "uom": d.uom,
+ "stock_uom": d.stock_uom,
+ "conversion_factor": d.conversion_factor,
+ "sourced_by_supplier": d.sourced_by_supplier,
+ }
+ )
+
d.base_rate = flt(d.rate) * flt(self.conversion_rate)
d.amount = flt(d.rate, d.precision("rate")) * flt(d.qty, d.precision("qty"))
d.base_amount = d.amount * flt(self.conversion_rate)
@@ -669,11 +662,13 @@
total_rm_cost += d.amount
base_total_rm_cost += d.base_amount
+ if save and (old_rate != d.rate):
+ d.db_update()
self.raw_material_cost = total_rm_cost
self.base_raw_material_cost = base_total_rm_cost
- def calculate_sm_cost(self):
+ def calculate_sm_cost(self, save=False):
"""Fetch RM rate as per today's valuation rate and calculate totals"""
total_sm_cost = 0
base_total_sm_cost = 0
@@ -688,10 +683,45 @@
)
total_sm_cost += d.amount
base_total_sm_cost += d.base_amount
+ if save:
+ d.db_update()
self.scrap_material_cost = total_sm_cost
self.base_scrap_material_cost = base_total_sm_cost
+ def calculate_exploded_cost(self):
+ "Set exploded row cost from it's parent BOM."
+ rm_rate_map = self.get_rm_rate_map()
+
+ for row in self.get("exploded_items"):
+ old_rate = flt(row.rate)
+ row.rate = rm_rate_map.get(row.item_code)
+ row.amount = flt(row.stock_qty) * flt(row.rate)
+
+ if old_rate != row.rate:
+ # Only db_update if changed
+ row.db_update()
+
+ def get_rm_rate_map(self) -> Dict[str, float]:
+ "Create Raw Material-Rate map for Exploded Items. Fetch rate from Items table or Subassembly BOM."
+ rm_rate_map = {}
+
+ for item in self.get("items"):
+ if item.bom_no:
+ # Get Item-Rate from Subassembly BOM
+ explosion_items = frappe.get_all(
+ "BOM Explosion Item",
+ filters={"parent": item.bom_no},
+ fields=["item_code", "rate"],
+ order_by=None, # to avoid sort index creation at db level (granular change)
+ )
+ explosion_item_rate = {item.item_code: flt(item.rate) for item in explosion_items}
+ rm_rate_map.update(explosion_item_rate)
+ else:
+ rm_rate_map[item.item_code] = flt(item.base_rate) / flt(item.conversion_factor or 1.0)
+
+ return rm_rate_map
+
def update_exploded_items(self, save=True):
"""Update Flat BOM, following will be correct data"""
self.get_exploded_items()
@@ -903,13 +933,17 @@
def get_valuation_rate(args):
- """Get weighted average of valuation rate from all warehouses"""
+ """
+ 1) Get average valuation rate from all warehouses
+ 2) If no value, get last valuation rate from SLE
+ 3) If no value, get valuation rate from Item
+ """
- total_qty, total_value, valuation_rate = 0.0, 0.0, 0.0
- item_bins = frappe.db.sql(
+ valuation_rate = 0.0
+ item_valuation = frappe.db.sql(
"""
select
- bin.actual_qty, bin.stock_value
+ (sum(bin.stock_value) / sum(bin.actual_qty)) as valuation_rate
from
`tabBin` bin, `tabWarehouse` warehouse
where
@@ -918,16 +952,12 @@
and warehouse.company=%(company)s""",
{"item": args["item_code"], "company": args["company"]},
as_dict=1,
- )
+ )[0]
- for d in item_bins:
- total_qty += flt(d.actual_qty)
- total_value += flt(d.stock_value)
+ valuation_rate = item_valuation.get("valuation_rate")
- if total_qty:
- valuation_rate = total_value / total_qty
-
- if valuation_rate <= 0:
+ if (valuation_rate is not None) and valuation_rate <= 0:
+ # Explicit null value check. If None, Bins don't exist, neither does SLE
last_valuation_rate = frappe.db.sql(
"""select valuation_rate
from `tabStock Ledger Entry`
@@ -1125,39 +1155,6 @@
return bom_items
-def get_boms_in_bottom_up_order(bom_no=None):
- def _get_parent(bom_no):
- return frappe.db.sql_list(
- """
- select distinct bom_item.parent from `tabBOM Item` bom_item
- where bom_item.bom_no = %s and bom_item.docstatus=1 and bom_item.parenttype='BOM'
- and exists(select bom.name from `tabBOM` bom where bom.name=bom_item.parent and bom.is_active=1)
- """,
- bom_no,
- )
-
- count = 0
- bom_list = []
- if bom_no:
- bom_list.append(bom_no)
- else:
- # get all leaf BOMs
- bom_list = frappe.db.sql_list(
- """select name from `tabBOM` bom
- where docstatus=1 and is_active=1
- and not exists(select bom_no from `tabBOM Item`
- where parent=bom.name and ifnull(bom_no, '')!='')"""
- )
-
- while count < len(bom_list):
- for child_bom in _get_parent(bom_list[count]):
- if child_bom not in bom_list:
- bom_list.append(child_bom)
- count += 1
-
- return bom_list
-
-
def add_additional_cost(stock_entry, work_order):
# Add non stock items cost in the additional cost
stock_entry.additional_costs = []
diff --git a/erpnext/manufacturing/doctype/bom/test_bom.py b/erpnext/manufacturing/doctype/bom/test_bom.py
index f235e44..f2731ec 100644
--- a/erpnext/manufacturing/doctype/bom/test_bom.py
+++ b/erpnext/manufacturing/doctype/bom/test_bom.py
@@ -11,7 +11,9 @@
from erpnext.buying.doctype.purchase_order.test_purchase_order import create_purchase_order
from erpnext.manufacturing.doctype.bom.bom import BOMRecursionError, item_query, make_variant_bom
-from erpnext.manufacturing.doctype.bom_update_tool.bom_update_tool import update_cost
+from erpnext.manufacturing.doctype.bom_update_log.test_bom_update_log import (
+ update_cost_in_all_boms_in_test,
+)
from erpnext.stock.doctype.item.test_item import make_item
from erpnext.stock.doctype.stock_reconciliation.test_stock_reconciliation import (
create_stock_reconciliation,
@@ -80,7 +82,7 @@
reset_item_valuation_rate(item_code="_Test Item 2", qty=200, rate=rm_rate + 10)
# update cost of all BOMs based on latest valuation rate
- update_cost()
+ update_cost_in_all_boms_in_test()
# check if new valuation rate updated in all BOMs
for d in frappe.db.sql(
diff --git a/erpnext/manufacturing/doctype/bom/test_records.json b/erpnext/manufacturing/doctype/bom/test_records.json
index 25730f9..507d319 100644
--- a/erpnext/manufacturing/doctype/bom/test_records.json
+++ b/erpnext/manufacturing/doctype/bom/test_records.json
@@ -32,6 +32,7 @@
"is_active": 1,
"is_default": 1,
"item": "_Test Item Home Desktop Manufactured",
+ "company": "_Test Company",
"quantity": 1.0
},
{
diff --git a/erpnext/manufacturing/doctype/bom_explosion_item/bom_explosion_item.json b/erpnext/manufacturing/doctype/bom_explosion_item/bom_explosion_item.json
index f01d856..9b1db63 100644
--- a/erpnext/manufacturing/doctype/bom_explosion_item/bom_explosion_item.json
+++ b/erpnext/manufacturing/doctype/bom_explosion_item/bom_explosion_item.json
@@ -169,13 +169,15 @@
"index_web_pages_for_search": 1,
"istable": 1,
"links": [],
- "modified": "2020-10-08 16:21:29.386212",
+ "modified": "2022-05-27 13:42:23.305455",
"modified_by": "Administrator",
"module": "Manufacturing",
"name": "BOM Explosion Item",
+ "naming_rule": "Random",
"owner": "Administrator",
"permissions": [],
"sort_field": "modified",
"sort_order": "DESC",
+ "states": [],
"track_changes": 1
}
\ No newline at end of file
diff --git a/erpnext/manufacturing/doctype/bom_update_batch/__init__.py b/erpnext/manufacturing/doctype/bom_update_batch/__init__.py
new file mode 100644
index 0000000..e69de29
--- /dev/null
+++ b/erpnext/manufacturing/doctype/bom_update_batch/__init__.py
diff --git a/erpnext/manufacturing/doctype/bom_update_batch/bom_update_batch.json b/erpnext/manufacturing/doctype/bom_update_batch/bom_update_batch.json
new file mode 100644
index 0000000..83b54d3
--- /dev/null
+++ b/erpnext/manufacturing/doctype/bom_update_batch/bom_update_batch.json
@@ -0,0 +1,55 @@
+{
+ "actions": [],
+ "autoname": "autoincrement",
+ "creation": "2022-05-31 17:34:39.825537",
+ "doctype": "DocType",
+ "engine": "InnoDB",
+ "field_order": [
+ "level",
+ "batch_no",
+ "boms_updated",
+ "status"
+ ],
+ "fields": [
+ {
+ "fieldname": "level",
+ "fieldtype": "Int",
+ "in_list_view": 1,
+ "label": "Level"
+ },
+ {
+ "fieldname": "batch_no",
+ "fieldtype": "Int",
+ "in_list_view": 1,
+ "label": "Batch No."
+ },
+ {
+ "fieldname": "boms_updated",
+ "fieldtype": "Long Text",
+ "hidden": 1,
+ "in_list_view": 1,
+ "label": "BOMs Updated"
+ },
+ {
+ "fieldname": "status",
+ "fieldtype": "Select",
+ "in_list_view": 1,
+ "label": "Status",
+ "options": "Pending\nCompleted",
+ "read_only": 1
+ }
+ ],
+ "index_web_pages_for_search": 1,
+ "istable": 1,
+ "links": [],
+ "modified": "2022-06-06 14:50:35.161062",
+ "modified_by": "Administrator",
+ "module": "Manufacturing",
+ "name": "BOM Update Batch",
+ "naming_rule": "Autoincrement",
+ "owner": "Administrator",
+ "permissions": [],
+ "sort_field": "modified",
+ "sort_order": "DESC",
+ "states": []
+}
\ No newline at end of file
diff --git a/erpnext/manufacturing/doctype/bom_update_batch/bom_update_batch.py b/erpnext/manufacturing/doctype/bom_update_batch/bom_update_batch.py
new file mode 100644
index 0000000..f952e43
--- /dev/null
+++ b/erpnext/manufacturing/doctype/bom_update_batch/bom_update_batch.py
@@ -0,0 +1,9 @@
+# Copyright (c) 2022, Frappe Technologies Pvt. Ltd. and contributors
+# For license information, please see license.txt
+
+# import frappe
+from frappe.model.document import Document
+
+
+class BOMUpdateBatch(Document):
+ pass
diff --git a/erpnext/manufacturing/doctype/bom_update_log/bom_update_log.json b/erpnext/manufacturing/doctype/bom_update_log/bom_update_log.json
index 98c1acb..c32e383 100644
--- a/erpnext/manufacturing/doctype/bom_update_log/bom_update_log.json
+++ b/erpnext/manufacturing/doctype/bom_update_log/bom_update_log.json
@@ -13,6 +13,10 @@
"update_type",
"status",
"error_log",
+ "progress_section",
+ "current_level",
+ "processed_boms",
+ "bom_batches",
"amended_from"
],
"fields": [
@@ -63,13 +67,36 @@
"fieldtype": "Link",
"label": "Error Log",
"options": "Error Log"
+ },
+ {
+ "collapsible": 1,
+ "depends_on": "eval: doc.update_type == \"Update Cost\"",
+ "fieldname": "progress_section",
+ "fieldtype": "Section Break",
+ "label": "Progress"
+ },
+ {
+ "fieldname": "processed_boms",
+ "fieldtype": "Long Text",
+ "hidden": 1,
+ "label": "Processed BOMs"
+ },
+ {
+ "fieldname": "bom_batches",
+ "fieldtype": "Table",
+ "options": "BOM Update Batch"
+ },
+ {
+ "fieldname": "current_level",
+ "fieldtype": "Int",
+ "label": "Current Level"
}
],
"in_create": 1,
"index_web_pages_for_search": 1,
"is_submittable": 1,
"links": [],
- "modified": "2022-03-31 12:51:44.885102",
+ "modified": "2022-06-06 15:15:23.883251",
"modified_by": "Administrator",
"module": "Manufacturing",
"name": "BOM Update Log",
diff --git a/erpnext/manufacturing/doctype/bom_update_log/bom_update_log.py b/erpnext/manufacturing/doctype/bom_update_log/bom_update_log.py
index c0770fa..71430bd 100644
--- a/erpnext/manufacturing/doctype/bom_update_log/bom_update_log.py
+++ b/erpnext/manufacturing/doctype/bom_update_log/bom_update_log.py
@@ -1,13 +1,20 @@
# Copyright (c) 2022, Frappe Technologies Pvt. Ltd. and contributors
# For license information, please see license.txt
-from typing import Dict, List, Literal, Optional
+import json
+from typing import Any, Dict, List, Optional, Tuple, Union
import frappe
from frappe import _
from frappe.model.document import Document
-from frappe.utils import cstr, flt
+from frappe.utils import cint, cstr
-from erpnext.manufacturing.doctype.bom_update_tool.bom_update_tool import update_cost
+from erpnext.manufacturing.doctype.bom_update_log.bom_updation_utils import (
+ get_leaf_boms,
+ get_next_higher_level_boms,
+ handle_exception,
+ replace_bom,
+ set_values_in_log,
+)
class BOMMissingError(frappe.ValidationError):
@@ -20,6 +27,8 @@
self.validate_boms_are_specified()
self.validate_same_bom()
self.validate_bom_items()
+ else:
+ self.validate_bom_cost_update_in_progress()
self.status = "Queued"
@@ -42,6 +51,21 @@
if current_bom_item != new_bom_item:
frappe.throw(_("The selected BOMs are not for the same item"))
+ def validate_bom_cost_update_in_progress(self):
+ "If another Cost Updation Log is still in progress, dont make new ones."
+
+ wip_log = frappe.get_all(
+ "BOM Update Log",
+ {"update_type": "Update Cost", "status": ["in", ["Queued", "In Progress"]]},
+ limit_page_length=1,
+ )
+ if wip_log:
+ log_link = frappe.utils.get_link_to_form("BOM Update Log", wip_log[0].name)
+ frappe.throw(
+ _("BOM Updation already in progress. Please wait until {0} is complete.").format(log_link),
+ title=_("Note"),
+ )
+
def on_submit(self):
if frappe.flags.in_test:
return
@@ -49,116 +73,170 @@
if self.update_type == "Replace BOM":
boms = {"current_bom": self.current_bom, "new_bom": self.new_bom}
frappe.enqueue(
- method="erpnext.manufacturing.doctype.bom_update_log.bom_update_log.run_bom_job",
+ method="erpnext.manufacturing.doctype.bom_update_log.bom_update_log.run_replace_bom_job",
doc=self,
boms=boms,
timeout=40000,
)
else:
- frappe.enqueue(
- method="erpnext.manufacturing.doctype.bom_update_log.bom_update_log.run_bom_job",
- doc=self,
- update_type="Update Cost",
- timeout=40000,
- )
+ process_boms_cost_level_wise(self)
-def replace_bom(boms: Dict) -> None:
- """Replace current BOM with new BOM in parent BOMs."""
- current_bom = boms.get("current_bom")
- new_bom = boms.get("new_bom")
-
- unit_cost = get_new_bom_unit_cost(new_bom)
- update_new_bom_in_bom_items(unit_cost, current_bom, new_bom)
-
- frappe.cache().delete_key("bom_children")
- parent_boms = get_parent_boms(new_bom)
-
- for bom in parent_boms:
- bom_obj = frappe.get_doc("BOM", bom)
- # this is only used for versioning and we do not want
- # to make separate db calls by using load_doc_before_save
- # which proves to be expensive while doing bulk replace
- bom_obj._doc_before_save = bom_obj
- bom_obj.update_exploded_items()
- bom_obj.calculate_cost()
- bom_obj.update_parent_cost()
- bom_obj.db_update()
- if bom_obj.meta.get("track_changes") and not bom_obj.flags.ignore_version:
- bom_obj.save_version()
-
-
-def update_new_bom_in_bom_items(unit_cost: float, current_bom: str, new_bom: str) -> None:
- bom_item = frappe.qb.DocType("BOM Item")
- (
- frappe.qb.update(bom_item)
- .set(bom_item.bom_no, new_bom)
- .set(bom_item.rate, unit_cost)
- .set(bom_item.amount, (bom_item.stock_qty * unit_cost))
- .where(
- (bom_item.bom_no == current_bom) & (bom_item.docstatus < 2) & (bom_item.parenttype == "BOM")
- )
- ).run()
-
-
-def get_parent_boms(new_bom: str, bom_list: Optional[List] = None) -> List:
- bom_list = bom_list or []
- bom_item = frappe.qb.DocType("BOM Item")
-
- parents = (
- frappe.qb.from_(bom_item)
- .select(bom_item.parent)
- .where((bom_item.bom_no == new_bom) & (bom_item.docstatus < 2) & (bom_item.parenttype == "BOM"))
- .run(as_dict=True)
- )
-
- for d in parents:
- if new_bom == d.parent:
- frappe.throw(_("BOM recursion: {0} cannot be child of {1}").format(new_bom, d.parent))
-
- bom_list.append(d.parent)
- get_parent_boms(d.parent, bom_list)
-
- return list(set(bom_list))
-
-
-def get_new_bom_unit_cost(new_bom: str) -> float:
- bom = frappe.qb.DocType("BOM")
- new_bom_unitcost = (
- frappe.qb.from_(bom).select(bom.total_cost / bom.quantity).where(bom.name == new_bom).run()
- )
-
- return flt(new_bom_unitcost[0][0])
-
-
-def run_bom_job(
+def run_replace_bom_job(
doc: "BOMUpdateLog",
boms: Optional[Dict[str, str]] = None,
- update_type: Literal["Replace BOM", "Update Cost"] = "Replace BOM",
) -> None:
try:
doc.db_set("status", "In Progress")
+
if not frappe.flags.in_test:
frappe.db.commit()
frappe.db.auto_commit_on_many_writes = 1
-
boms = frappe._dict(boms or {})
-
- if update_type == "Replace BOM":
- replace_bom(boms)
- else:
- update_cost()
+ replace_bom(boms)
doc.db_set("status", "Completed")
-
except Exception:
- frappe.db.rollback()
- error_log = doc.log_error("BOM Update Tool Error")
-
- doc.db_set("status", "Failed")
- doc.db_set("error_log", error_log.name)
-
+ handle_exception(doc)
finally:
frappe.db.auto_commit_on_many_writes = 0
- frappe.db.commit() # nosemgrep
+
+ if not frappe.flags.in_test:
+ frappe.db.commit() # nosemgrep
+
+
+def process_boms_cost_level_wise(
+ update_doc: "BOMUpdateLog", parent_boms: List[str] = None
+) -> Union[None, Tuple]:
+ "Queue jobs at the start of new BOM Level in 'Update Cost' Jobs."
+
+ current_boms = {}
+ values = {}
+
+ if update_doc.status == "Queued":
+ # First level yet to process. On Submit.
+ current_level = 0
+ current_boms = get_leaf_boms()
+ values = {
+ "processed_boms": json.dumps({}),
+ "status": "In Progress",
+ "current_level": current_level,
+ }
+ else:
+ # Resume next level. via Cron Job.
+ if not parent_boms:
+ return
+
+ current_level = cint(update_doc.current_level) + 1
+
+ # Process the next level BOMs. Stage parents as current BOMs.
+ current_boms = parent_boms.copy()
+ values = {"current_level": current_level}
+
+ set_values_in_log(update_doc.name, values, commit=True)
+
+ if frappe.flags.in_test:
+ return current_boms, current_level
+
+ queue_bom_cost_jobs(current_boms, update_doc, current_level)
+
+
+def queue_bom_cost_jobs(
+ current_boms_list: List[str], update_doc: "BOMUpdateLog", current_level: int
+) -> None:
+ "Queue batches of 20k BOMs of the same level to process parallelly"
+ batch_no = 0
+
+ while current_boms_list:
+ batch_no += 1
+ batch_size = 20_000
+ boms_to_process = current_boms_list[:batch_size] # slice out batch of 20k BOMs
+
+ # update list to exclude 20K (queued) BOMs
+ current_boms_list = current_boms_list[batch_size:] if len(current_boms_list) > batch_size else []
+
+ batch_row = update_doc.append(
+ "bom_batches", {"level": current_level, "batch_no": batch_no, "status": "Pending"}
+ )
+ batch_row.db_insert()
+
+ if frappe.flags.in_test:
+ # skip background jobs in test
+ return boms_to_process, batch_row.name
+
+ frappe.enqueue(
+ method="erpnext.manufacturing.doctype.bom_update_log.bom_updation_utils.update_cost_in_level",
+ doc=update_doc,
+ bom_list=boms_to_process,
+ batch_name=batch_row.name,
+ queue="long",
+ )
+
+
+def resume_bom_cost_update_jobs():
+ """
+ 1. Checks for In Progress BOM Update Log.
+ 2. Checks if this job has completed the _current level_.
+ 3. If current level is complete, get parent BOMs and start next level.
+ 4. If no parents, mark as Complete.
+ 5. If current level is WIP, skip the Log.
+
+ Called every 5 minutes via Cron job.
+ """
+
+ in_progress_logs = frappe.db.get_all(
+ "BOM Update Log",
+ {"update_type": "Update Cost", "status": "In Progress"},
+ ["name", "processed_boms", "current_level"],
+ )
+ if not in_progress_logs:
+ return
+
+ for log in in_progress_logs:
+ # check if all log batches of current level are processed
+ bom_batches = frappe.db.get_all(
+ "BOM Update Batch",
+ {"parent": log.name, "level": log.current_level},
+ ["name", "boms_updated", "status"],
+ )
+ incomplete_level = any(row.get("status") == "Pending" for row in bom_batches)
+ if not bom_batches or incomplete_level:
+ continue
+
+ # Prep parent BOMs & updated processed BOMs for next level
+ current_boms, processed_boms = get_processed_current_boms(log, bom_batches)
+ parent_boms = get_next_higher_level_boms(child_boms=current_boms, processed_boms=processed_boms)
+
+ set_values_in_log(
+ log.name,
+ values={
+ "processed_boms": json.dumps(processed_boms),
+ "status": "Completed" if not parent_boms else "In Progress",
+ },
+ commit=True,
+ )
+
+ if parent_boms: # there is a next level to process
+ process_boms_cost_level_wise(
+ update_doc=frappe.get_doc("BOM Update Log", log.name), parent_boms=parent_boms
+ )
+
+
+def get_processed_current_boms(
+ log: Dict[str, Any], bom_batches: Dict[str, Any]
+) -> Tuple[List[str], Dict[str, Any]]:
+ """
+ Aggregate all BOMs from BOM Update Batch rows into 'processed_boms' field
+ and into current boms list.
+ """
+ processed_boms = json.loads(log.processed_boms) if log.processed_boms else {}
+ current_boms = []
+
+ for row in bom_batches:
+ boms_updated = json.loads(row.boms_updated)
+ current_boms.extend(boms_updated)
+ boms_updated_dict = {bom: True for bom in boms_updated}
+ processed_boms.update(boms_updated_dict)
+
+ return current_boms, processed_boms
diff --git a/erpnext/manufacturing/doctype/bom_update_log/bom_updation_utils.py b/erpnext/manufacturing/doctype/bom_update_log/bom_updation_utils.py
new file mode 100644
index 0000000..dde1e4e
--- /dev/null
+++ b/erpnext/manufacturing/doctype/bom_update_log/bom_updation_utils.py
@@ -0,0 +1,224 @@
+# Copyright (c) 2022, Frappe Technologies Pvt. Ltd. and contributors
+# For license information, please see license.txt
+
+import json
+from collections import defaultdict
+from typing import TYPE_CHECKING, Any, Dict, List, Optional, Union
+
+if TYPE_CHECKING:
+ from erpnext.manufacturing.doctype.bom_update_log.bom_update_log import BOMUpdateLog
+
+import frappe
+from frappe import _
+
+
+def replace_bom(boms: Dict) -> None:
+ "Replace current BOM with new BOM in parent BOMs."
+
+ current_bom = boms.get("current_bom")
+ new_bom = boms.get("new_bom")
+
+ unit_cost = get_bom_unit_cost(new_bom)
+ update_new_bom_in_bom_items(unit_cost, current_bom, new_bom)
+
+ frappe.cache().delete_key("bom_children")
+ parent_boms = get_ancestor_boms(new_bom)
+
+ for bom in parent_boms:
+ bom_obj = frappe.get_doc("BOM", bom)
+ # this is only used for versioning and we do not want
+ # to make separate db calls by using load_doc_before_save
+ # which proves to be expensive while doing bulk replace
+ bom_obj._doc_before_save = bom_obj
+ bom_obj.update_exploded_items()
+ bom_obj.calculate_cost()
+ bom_obj.update_parent_cost()
+ bom_obj.db_update()
+ if bom_obj.meta.get("track_changes") and not bom_obj.flags.ignore_version:
+ bom_obj.save_version()
+
+
+def update_cost_in_level(
+ doc: "BOMUpdateLog", bom_list: List[str], batch_name: Union[int, str]
+) -> None:
+ "Updates Cost for BOMs within a given level. Runs via background jobs."
+
+ try:
+ status = frappe.db.get_value("BOM Update Log", doc.name, "status")
+ if status == "Failed":
+ return
+
+ frappe.db.auto_commit_on_many_writes = 1
+
+ update_cost_in_boms(bom_list=bom_list) # main updation logic
+
+ bom_batch = frappe.qb.DocType("BOM Update Batch")
+ (
+ frappe.qb.update(bom_batch)
+ .set(bom_batch.boms_updated, json.dumps(bom_list))
+ .set(bom_batch.status, "Completed")
+ .where(bom_batch.name == batch_name)
+ ).run()
+ except Exception:
+ handle_exception(doc)
+ finally:
+ frappe.db.auto_commit_on_many_writes = 0
+
+ if not frappe.flags.in_test:
+ frappe.db.commit() # nosemgrep
+
+
+def get_ancestor_boms(new_bom: str, bom_list: Optional[List] = None) -> List:
+ "Recursively get all ancestors of BOM."
+
+ bom_list = bom_list or []
+ bom_item = frappe.qb.DocType("BOM Item")
+
+ parents = (
+ frappe.qb.from_(bom_item)
+ .select(bom_item.parent)
+ .where((bom_item.bom_no == new_bom) & (bom_item.docstatus < 2) & (bom_item.parenttype == "BOM"))
+ .run(as_dict=True)
+ )
+
+ for d in parents:
+ if new_bom == d.parent:
+ frappe.throw(_("BOM recursion: {0} cannot be child of {1}").format(new_bom, d.parent))
+
+ bom_list.append(d.parent)
+ get_ancestor_boms(d.parent, bom_list)
+
+ return list(set(bom_list))
+
+
+def update_new_bom_in_bom_items(unit_cost: float, current_bom: str, new_bom: str) -> None:
+ bom_item = frappe.qb.DocType("BOM Item")
+ (
+ frappe.qb.update(bom_item)
+ .set(bom_item.bom_no, new_bom)
+ .set(bom_item.rate, unit_cost)
+ .set(bom_item.amount, (bom_item.stock_qty * unit_cost))
+ .where(
+ (bom_item.bom_no == current_bom) & (bom_item.docstatus < 2) & (bom_item.parenttype == "BOM")
+ )
+ ).run()
+
+
+def get_bom_unit_cost(bom_name: str) -> float:
+ bom = frappe.qb.DocType("BOM")
+ new_bom_unitcost = (
+ frappe.qb.from_(bom).select(bom.total_cost / bom.quantity).where(bom.name == bom_name).run()
+ )
+
+ return frappe.utils.flt(new_bom_unitcost[0][0])
+
+
+def update_cost_in_boms(bom_list: List[str]) -> None:
+ "Updates cost in given BOMs. Returns current and total updated BOMs."
+
+ for index, bom in enumerate(bom_list):
+ bom_doc = frappe.get_doc("BOM", bom, for_update=True)
+ bom_doc.calculate_cost(save_updates=True, update_hour_rate=True)
+ bom_doc.db_update()
+
+ if (index % 100 == 0) and not frappe.flags.in_test:
+ frappe.db.commit() # nosemgrep
+
+
+def get_next_higher_level_boms(
+ child_boms: List[str], processed_boms: Dict[str, bool]
+) -> List[str]:
+ "Generate immediate higher level dependants with no unresolved dependencies (children)."
+
+ def _all_children_are_processed(parent_bom):
+ child_boms = dependency_map.get(parent_bom)
+ return all(processed_boms.get(bom) for bom in child_boms)
+
+ dependants_map, dependency_map = _generate_dependence_map()
+
+ dependants = []
+ for bom in child_boms:
+ # generate list of immediate dependants
+ parents = dependants_map.get(bom) or []
+ dependants.extend(parents)
+
+ dependants = set(dependants) # remove duplicates
+ resolved_dependants = set()
+
+ # consider only if children are all resolved
+ for parent_bom in dependants:
+ if _all_children_are_processed(parent_bom):
+ resolved_dependants.add(parent_bom)
+
+ return list(resolved_dependants)
+
+
+def get_leaf_boms() -> List[str]:
+ "Get BOMs that have no dependencies."
+
+ return frappe.db.sql_list(
+ """select name from `tabBOM` bom
+ where docstatus=1 and is_active=1
+ and not exists(select bom_no from `tabBOM Item`
+ where parent=bom.name and ifnull(bom_no, '')!='')"""
+ )
+
+
+def _generate_dependence_map() -> defaultdict:
+ """
+ Generate maps such as: { BOM-1: [Dependant-BOM-1, Dependant-BOM-2, ..] }.
+ Here BOM-1 is the leaf/lower level node/dependency.
+ The list contains one level higher nodes/dependants that depend on BOM-1.
+
+ Generate and return the reverse as well.
+ """
+
+ bom = frappe.qb.DocType("BOM")
+ bom_item = frappe.qb.DocType("BOM Item")
+
+ bom_items = (
+ frappe.qb.from_(bom_item)
+ .join(bom)
+ .on(bom_item.parent == bom.name)
+ .select(bom_item.bom_no, bom_item.parent)
+ .where(
+ (bom_item.bom_no.isnotnull())
+ & (bom_item.bom_no != "")
+ & (bom.docstatus == 1)
+ & (bom.is_active == 1)
+ & (bom_item.parenttype == "BOM")
+ )
+ ).run(as_dict=True)
+
+ child_parent_map = defaultdict(list)
+ parent_child_map = defaultdict(list)
+ for row in bom_items:
+ child_parent_map[row.bom_no].append(row.parent)
+ parent_child_map[row.parent].append(row.bom_no)
+
+ return child_parent_map, parent_child_map
+
+
+def set_values_in_log(log_name: str, values: Dict[str, Any], commit: bool = False) -> None:
+ "Update BOM Update Log record."
+
+ if not values:
+ return
+
+ bom_update_log = frappe.qb.DocType("BOM Update Log")
+ query = frappe.qb.update(bom_update_log).where(bom_update_log.name == log_name)
+
+ for key, value in values.items():
+ query = query.set(key, value)
+ query.run()
+
+ if commit and not frappe.flags.in_test:
+ frappe.db.commit() # nosemgrep
+
+
+def handle_exception(doc: "BOMUpdateLog") -> None:
+ "Rolls back and fails BOM Update Log."
+
+ frappe.db.rollback()
+ error_log = doc.log_error("BOM Update Tool Error")
+ set_values_in_log(doc.name, {"status": "Failed", "error_log": error_log.name})
diff --git a/erpnext/manufacturing/doctype/bom_update_log/test_bom_update_log.py b/erpnext/manufacturing/doctype/bom_update_log/test_bom_update_log.py
index 47efea9..d770f6c 100644
--- a/erpnext/manufacturing/doctype/bom_update_log/test_bom_update_log.py
+++ b/erpnext/manufacturing/doctype/bom_update_log/test_bom_update_log.py
@@ -1,14 +1,27 @@
# Copyright (c) 2022, Frappe Technologies Pvt. Ltd. and Contributors
# See license.txt
+import json
+
import frappe
from frappe.tests.utils import FrappeTestCase
from erpnext.manufacturing.doctype.bom_update_log.bom_update_log import (
BOMMissingError,
- run_bom_job,
+ get_processed_current_boms,
+ process_boms_cost_level_wise,
+ queue_bom_cost_jobs,
+ run_replace_bom_job,
)
-from erpnext.manufacturing.doctype.bom_update_tool.bom_update_tool import enqueue_replace_bom
+from erpnext.manufacturing.doctype.bom_update_log.bom_updation_utils import (
+ get_next_higher_level_boms,
+ set_values_in_log,
+ update_cost_in_level,
+)
+from erpnext.manufacturing.doctype.bom_update_tool.bom_update_tool import (
+ enqueue_replace_bom,
+ enqueue_update_cost,
+)
test_records = frappe.get_test_records("BOM")
@@ -31,17 +44,12 @@
def tearDown(self):
frappe.db.rollback()
- if self._testMethodName == "test_bom_update_log_completion":
- # clear logs and delete BOM created via setUp
- frappe.db.delete("BOM Update Log")
- self.new_bom_doc.cancel()
- self.new_bom_doc.delete()
-
- # explicitly commit and restore to original state
- frappe.db.commit() # nosemgrep
-
def test_bom_update_log_validate(self):
- "Test if BOM presence is validated."
+ """
+ 1) Test if BOM presence is validated.
+ 2) Test if same BOMs are validated.
+ 3) Test of non-existent BOM is validated.
+ """
with self.assertRaises(BOMMissingError):
enqueue_replace_bom(boms={})
@@ -55,9 +63,7 @@
def test_bom_update_log_queueing(self):
"Test if BOM Update Log is created and queued."
- log = enqueue_replace_bom(
- boms=self.boms,
- )
+ log = enqueue_replace_bom(boms=self.boms)
self.assertEqual(log.docstatus, 1)
self.assertEqual(log.status, "Queued")
@@ -65,32 +71,51 @@
def test_bom_update_log_completion(self):
"Test if BOM Update Log handles job completion correctly."
- log = enqueue_replace_bom(
- boms=self.boms,
- )
+ log = enqueue_replace_bom(boms=self.boms)
- # Explicitly commits log, new bom (setUp) and replacement impact.
- # Is run via background jobs IRL
- run_bom_job(
- doc=log,
- boms=self.boms,
- update_type="Replace BOM",
- )
+ # Is run via background job IRL
+ run_replace_bom_job(doc=log, boms=self.boms)
log.reload()
self.assertEqual(log.status, "Completed")
- # teardown (undo replace impact) due to commit
- boms = frappe._dict(
- current_bom=self.boms.new_bom,
- new_bom=self.boms.current_bom,
+
+def update_cost_in_all_boms_in_test():
+ """
+ Utility to run 'Update Cost' job in tests immediately without Cron job.
+ Run job for all levels (manually) until fully complete.
+ """
+ parent_boms = []
+ log = enqueue_update_cost() # create BOM Update Log
+
+ while log.status != "Completed":
+ level_boms, current_level = process_boms_cost_level_wise(log, parent_boms)
+ log.reload()
+
+ boms, batch = queue_bom_cost_jobs(
+ level_boms, log, current_level
+ ) # adds rows in log for tracking
+ log.reload()
+
+ update_cost_in_level(log, boms, batch) # business logic
+ log.reload()
+
+ # current level done, get next level boms
+ bom_batches = frappe.db.get_all(
+ "BOM Update Batch",
+ {"parent": log.name, "level": log.current_level},
+ ["name", "boms_updated", "status"],
)
- log2 = enqueue_replace_bom(
- boms=self.boms,
+ current_boms, processed_boms = get_processed_current_boms(log, bom_batches)
+ parent_boms = get_next_higher_level_boms(child_boms=current_boms, processed_boms=processed_boms)
+
+ set_values_in_log(
+ log.name,
+ values={
+ "processed_boms": json.dumps(processed_boms),
+ "status": "Completed" if not parent_boms else "In Progress",
+ },
)
- run_bom_job( # Explicitly commits
- doc=log2,
- boms=boms,
- update_type="Replace BOM",
- )
- self.assertEqual(log2.status, "Completed")
+ log.reload()
+
+ return log
diff --git a/erpnext/manufacturing/doctype/bom_update_tool/bom_update_tool.py b/erpnext/manufacturing/doctype/bom_update_tool/bom_update_tool.py
index b0e7da1..d16fcd0 100644
--- a/erpnext/manufacturing/doctype/bom_update_tool/bom_update_tool.py
+++ b/erpnext/manufacturing/doctype/bom_update_tool/bom_update_tool.py
@@ -10,8 +10,6 @@
import frappe
from frappe.model.document import Document
-from erpnext.manufacturing.doctype.bom.bom import get_boms_in_bottom_up_order
-
class BOMUpdateTool(Document):
pass
@@ -40,14 +38,13 @@
def auto_update_latest_price_in_all_boms() -> None:
"""Called via hooks.py."""
if frappe.db.get_single_value("Manufacturing Settings", "update_bom_costs_automatically"):
- update_cost()
-
-
-def update_cost() -> None:
- """Updates Cost for all BOMs from bottom to top."""
- bom_list = get_boms_in_bottom_up_order()
- for bom in bom_list:
- frappe.get_doc("BOM", bom).update_cost(update_parent=False, from_child_bom=True)
+ wip_log = frappe.get_all(
+ "BOM Update Log",
+ {"update_type": "Update Cost", "status": ["in", ["Queued", "In Progress"]]},
+ limit_page_length=1,
+ )
+ if not wip_log:
+ create_bom_update_log(update_type="Update Cost")
def create_bom_update_log(
diff --git a/erpnext/manufacturing/doctype/bom_update_tool/test_bom_update_tool.py b/erpnext/manufacturing/doctype/bom_update_tool/test_bom_update_tool.py
index fae72a0..d1882e5 100644
--- a/erpnext/manufacturing/doctype/bom_update_tool/test_bom_update_tool.py
+++ b/erpnext/manufacturing/doctype/bom_update_tool/test_bom_update_tool.py
@@ -1,11 +1,13 @@
-# Copyright (c) 2015, Frappe Technologies Pvt. Ltd. and Contributors
+# Copyright (c) 2022, Frappe Technologies Pvt. Ltd. and Contributors
# License: GNU General Public License v3. See license.txt
import frappe
from frappe.tests.utils import FrappeTestCase
from erpnext.manufacturing.doctype.bom_update_log.bom_update_log import replace_bom
-from erpnext.manufacturing.doctype.bom_update_tool.bom_update_tool import update_cost
+from erpnext.manufacturing.doctype.bom_update_log.test_bom_update_log import (
+ update_cost_in_all_boms_in_test,
+)
from erpnext.manufacturing.doctype.production_plan.test_production_plan import make_bom
from erpnext.stock.doctype.item.test_item import create_item
@@ -25,8 +27,8 @@
boms = frappe._dict(current_bom=current_bom, new_bom=bom_doc.name)
replace_bom(boms)
- self.assertFalse(frappe.db.sql("select name from `tabBOM Item` where bom_no=%s", current_bom))
- self.assertTrue(frappe.db.sql("select name from `tabBOM Item` where bom_no=%s", bom_doc.name))
+ self.assertFalse(frappe.db.exists("BOM Item", {"bom_no": current_bom, "docstatus": 1}))
+ self.assertTrue(frappe.db.exists("BOM Item", {"bom_no": bom_doc.name, "docstatus": 1}))
# reverse, as it affects other testcases
boms.current_bom = bom_doc.name
@@ -52,13 +54,13 @@
self.assertEqual(doc.total_cost, 200)
frappe.db.set_value("Item", "BOM Cost Test Item 2", "valuation_rate", 200)
- update_cost()
+ update_cost_in_all_boms_in_test()
doc.load_from_db()
self.assertEqual(doc.total_cost, 300)
frappe.db.set_value("Item", "BOM Cost Test Item 2", "valuation_rate", 100)
- update_cost()
+ update_cost_in_all_boms_in_test()
doc.load_from_db()
self.assertEqual(doc.total_cost, 200)
diff --git a/erpnext/manufacturing/doctype/production_plan/test_production_plan.py b/erpnext/manufacturing/doctype/production_plan/test_production_plan.py
index 891a497..e88049d 100644
--- a/erpnext/manufacturing/doctype/production_plan/test_production_plan.py
+++ b/erpnext/manufacturing/doctype/production_plan/test_production_plan.py
@@ -798,7 +798,6 @@
for item in args.raw_materials:
item_doc = frappe.get_doc("Item", item)
-
bom.append(
"items",
{
diff --git a/erpnext/manufacturing/doctype/work_order/test_work_order.py b/erpnext/manufacturing/doctype/work_order/test_work_order.py
index 2aba482..27e7e24 100644
--- a/erpnext/manufacturing/doctype/work_order/test_work_order.py
+++ b/erpnext/manufacturing/doctype/work_order/test_work_order.py
@@ -417,7 +417,7 @@
"doctype": "Item Price",
"item_code": "_Test FG Non Stock Item",
"price_list_rate": 1000,
- "price_list": "Standard Buying",
+ "price_list": "_Test Price List India",
}
).insert(ignore_permissions=True)
@@ -426,8 +426,17 @@
item_code="_Test FG Item", target="_Test Warehouse - _TC", qty=1, basic_rate=100
)
- if not frappe.db.get_value("BOM", {"item": fg_item}):
- make_bom(item=fg_item, rate=1000, raw_materials=["_Test FG Item", "_Test FG Non Stock Item"])
+ if not frappe.db.get_value("BOM", {"item": fg_item, "docstatus": 1}):
+ bom = make_bom(
+ item=fg_item,
+ rate=1000,
+ raw_materials=["_Test FG Item", "_Test FG Non Stock Item"],
+ do_not_save=True,
+ )
+ bom.rm_cost_as_per = "Price List" # non stock item won't have valuation rate
+ bom.buying_price_list = "_Test Price List India"
+ bom.currency = "INR"
+ bom.save()
wo = make_wo_order_test_record(production_item=fg_item)