feat: Level-wise BOM cost updation
- Process BOMs level wise and Pause after level is complete
- Cron job will resume Paused jobs, which will again process the new level and pause at the end
- This will go on until all BOMs are updated
- Added Progress section with fields to track updated BOMs in Log
- Cleanup: Add BOM Updation utils file to contain helper functions/sub-functions
- Cleanup: BOM Update Log file will only contain functions that are in direct context of the Log
Co-authored-by: Gavin D'souza <gavin18d@gmail.com>
diff --git a/erpnext/hooks.py b/erpnext/hooks.py
index 813ac17..05f06b3 100644
--- a/erpnext/hooks.py
+++ b/erpnext/hooks.py
@@ -392,9 +392,12 @@
scheduler_events = {
"cron": {
+ "0/5 * * * *": [
+ "erpnext.manufacturing.doctype.bom_update_log.bom_update_log.resume_bom_cost_update_jobs",
+ ],
"0/30 * * * *": [
"erpnext.utilities.doctype.video.video.update_youtube_data",
- ]
+ ],
},
"all": [
"erpnext.projects.doctype.project.project.project_status_update_reminder",
diff --git a/erpnext/manufacturing/doctype/bom_update_log/bom_update_log.json b/erpnext/manufacturing/doctype/bom_update_log/bom_update_log.json
index 98c1acb..3455b86 100644
--- a/erpnext/manufacturing/doctype/bom_update_log/bom_update_log.json
+++ b/erpnext/manufacturing/doctype/bom_update_log/bom_update_log.json
@@ -13,6 +13,10 @@
"update_type",
"status",
"error_log",
+ "progress_section",
+ "current_boms",
+ "parent_boms",
+ "processed_boms",
"amended_from"
],
"fields": [
@@ -47,7 +51,7 @@
"fieldname": "status",
"fieldtype": "Select",
"label": "Status",
- "options": "Queued\nIn Progress\nCompleted\nFailed"
+ "options": "Queued\nIn Progress\nPaused\nCompleted\nFailed"
},
{
"fieldname": "amended_from",
@@ -63,13 +67,34 @@
"fieldtype": "Link",
"label": "Error Log",
"options": "Error Log"
+ },
+ {
+ "fieldname": "progress_section",
+ "fieldtype": "Section Break",
+ "label": "Progress"
+ },
+ {
+ "fieldname": "current_boms",
+ "fieldtype": "Text",
+ "label": "Current BOMs"
+ },
+ {
+ "description": "Immediate parent BOMs",
+ "fieldname": "parent_boms",
+ "fieldtype": "Text",
+ "label": "Parent BOMs"
+ },
+ {
+ "fieldname": "processed_boms",
+ "fieldtype": "Text",
+ "label": "Processed BOMs"
}
],
"in_create": 1,
"index_web_pages_for_search": 1,
"is_submittable": 1,
"links": [],
- "modified": "2022-03-31 12:51:44.885102",
+ "modified": "2022-05-23 14:42:14.725914",
"modified_by": "Administrator",
"module": "Manufacturing",
"name": "BOM Update Log",
diff --git a/erpnext/manufacturing/doctype/bom_update_log/bom_update_log.py b/erpnext/manufacturing/doctype/bom_update_log/bom_update_log.py
index c0770fa..639628a 100644
--- a/erpnext/manufacturing/doctype/bom_update_log/bom_update_log.py
+++ b/erpnext/manufacturing/doctype/bom_update_log/bom_update_log.py
@@ -1,13 +1,19 @@
# Copyright (c) 2022, Frappe Technologies Pvt. Ltd. and contributors
# For license information, please see license.txt
-from typing import Dict, List, Literal, Optional
+import json
+from typing import Dict, Optional
import frappe
from frappe import _
from frappe.model.document import Document
-from frappe.utils import cstr, flt
+from frappe.utils import cstr
-from erpnext.manufacturing.doctype.bom_update_tool.bom_update_tool import update_cost
+from erpnext.manufacturing.doctype.bom_update_log.bom_updation_utils import (
+ get_leaf_boms,
+ handle_exception,
+ replace_bom,
+ set_values_in_log,
+)
class BOMMissingError(frappe.ValidationError):
@@ -49,116 +55,93 @@
if self.update_type == "Replace BOM":
boms = {"current_bom": self.current_bom, "new_bom": self.new_bom}
frappe.enqueue(
- method="erpnext.manufacturing.doctype.bom_update_log.bom_update_log.run_bom_job",
+ method="erpnext.manufacturing.doctype.bom_update_log.bom_update_log.run_replace_bom_job",
doc=self,
boms=boms,
timeout=40000,
)
else:
- frappe.enqueue(
- method="erpnext.manufacturing.doctype.bom_update_log.bom_update_log.run_bom_job",
- doc=self,
- update_type="Update Cost",
- timeout=40000,
- )
+ process_boms_cost_level_wise(self)
-def replace_bom(boms: Dict) -> None:
- """Replace current BOM with new BOM in parent BOMs."""
- current_bom = boms.get("current_bom")
- new_bom = boms.get("new_bom")
-
- unit_cost = get_new_bom_unit_cost(new_bom)
- update_new_bom_in_bom_items(unit_cost, current_bom, new_bom)
-
- frappe.cache().delete_key("bom_children")
- parent_boms = get_parent_boms(new_bom)
-
- for bom in parent_boms:
- bom_obj = frappe.get_doc("BOM", bom)
- # this is only used for versioning and we do not want
- # to make separate db calls by using load_doc_before_save
- # which proves to be expensive while doing bulk replace
- bom_obj._doc_before_save = bom_obj
- bom_obj.update_exploded_items()
- bom_obj.calculate_cost()
- bom_obj.update_parent_cost()
- bom_obj.db_update()
- if bom_obj.meta.get("track_changes") and not bom_obj.flags.ignore_version:
- bom_obj.save_version()
-
-
-def update_new_bom_in_bom_items(unit_cost: float, current_bom: str, new_bom: str) -> None:
- bom_item = frappe.qb.DocType("BOM Item")
- (
- frappe.qb.update(bom_item)
- .set(bom_item.bom_no, new_bom)
- .set(bom_item.rate, unit_cost)
- .set(bom_item.amount, (bom_item.stock_qty * unit_cost))
- .where(
- (bom_item.bom_no == current_bom) & (bom_item.docstatus < 2) & (bom_item.parenttype == "BOM")
- )
- ).run()
-
-
-def get_parent_boms(new_bom: str, bom_list: Optional[List] = None) -> List:
- bom_list = bom_list or []
- bom_item = frappe.qb.DocType("BOM Item")
-
- parents = (
- frappe.qb.from_(bom_item)
- .select(bom_item.parent)
- .where((bom_item.bom_no == new_bom) & (bom_item.docstatus < 2) & (bom_item.parenttype == "BOM"))
- .run(as_dict=True)
- )
-
- for d in parents:
- if new_bom == d.parent:
- frappe.throw(_("BOM recursion: {0} cannot be child of {1}").format(new_bom, d.parent))
-
- bom_list.append(d.parent)
- get_parent_boms(d.parent, bom_list)
-
- return list(set(bom_list))
-
-
-def get_new_bom_unit_cost(new_bom: str) -> float:
- bom = frappe.qb.DocType("BOM")
- new_bom_unitcost = (
- frappe.qb.from_(bom).select(bom.total_cost / bom.quantity).where(bom.name == new_bom).run()
- )
-
- return flt(new_bom_unitcost[0][0])
-
-
-def run_bom_job(
+def run_replace_bom_job(
doc: "BOMUpdateLog",
boms: Optional[Dict[str, str]] = None,
- update_type: Literal["Replace BOM", "Update Cost"] = "Replace BOM",
) -> None:
try:
doc.db_set("status", "In Progress")
+
if not frappe.flags.in_test:
frappe.db.commit()
frappe.db.auto_commit_on_many_writes = 1
-
boms = frappe._dict(boms or {})
-
- if update_type == "Replace BOM":
- replace_bom(boms)
- else:
- update_cost()
+ replace_bom(boms)
doc.db_set("status", "Completed")
-
except Exception:
- frappe.db.rollback()
- error_log = doc.log_error("BOM Update Tool Error")
-
- doc.db_set("status", "Failed")
- doc.db_set("error_log", error_log.name)
-
+ handle_exception(doc)
finally:
frappe.db.auto_commit_on_many_writes = 0
frappe.db.commit() # nosemgrep
+
+
+def process_boms_cost_level_wise(update_doc: "BOMUpdateLog") -> None:
+ "Queue jobs at the start of new BOM Level in 'Update Cost' Jobs."
+
+ current_boms, parent_boms = {}, []
+ values = {}
+
+ if update_doc.status == "Queued":
+ # First level yet to process. On Submit.
+ current_boms = {bom: False for bom in get_leaf_boms()}
+ values = {
+ "current_boms": json.dumps(current_boms),
+ "parent_boms": "[]",
+ "processed_boms": json.dumps({}),
+ "status": "In Progress",
+ }
+ else:
+ # status is Paused, resume. via Cron Job.
+ current_boms, parent_boms = json.loads(update_doc.current_boms), json.loads(
+ update_doc.parent_boms
+ )
+ if not current_boms:
+ # Process the next level BOMs. Stage parents as current BOMs.
+ current_boms = {bom: False for bom in parent_boms}
+ values = {
+ "current_boms": json.dumps(current_boms),
+ "parent_boms": "[]",
+ "status": "In Progress",
+ }
+
+ set_values_in_log(update_doc.name, values, commit=True)
+ queue_bom_cost_jobs(current_boms, update_doc)
+
+
+def queue_bom_cost_jobs(current_boms: Dict, update_doc: "BOMUpdateLog") -> None:
+ "Queue batches of 20k BOMs of the same level to process parallelly"
+ current_boms_list = [bom for bom in current_boms]
+
+ while current_boms_list:
+ boms_to_process = current_boms_list[:20000] # slice out batch of 20k BOMs
+
+ # update list to exclude 20K (queued) BOMs
+ current_boms_list = current_boms_list[20000:] if len(current_boms_list) > 20000 else []
+ frappe.enqueue(
+ method="erpnext.manufacturing.doctype.bom_update_log.bom_updation_utils.update_cost_in_level",
+ doc=update_doc,
+ bom_list=boms_to_process,
+ timeout=40000,
+ )
+
+
+def resume_bom_cost_update_jobs():
+ "Called every 10 minutes via Cron job."
+ paused_jobs = frappe.db.get_all("BOM Update Log", {"status": "Paused"})
+ if not paused_jobs:
+ return
+
+ for job in paused_jobs:
+ # resume from next level
+ process_boms_cost_level_wise(update_doc=frappe.get_doc("BOM Update Log", job.name))
diff --git a/erpnext/manufacturing/doctype/bom_update_log/bom_updation_utils.py b/erpnext/manufacturing/doctype/bom_update_log/bom_updation_utils.py
new file mode 100644
index 0000000..b5964ce
--- /dev/null
+++ b/erpnext/manufacturing/doctype/bom_update_log/bom_updation_utils.py
@@ -0,0 +1,223 @@
+# Copyright (c) 2022, Frappe Technologies Pvt. Ltd. and contributors
+# For license information, please see license.txt
+
+import json
+from collections import defaultdict
+from typing import TYPE_CHECKING, Dict, List, Optional
+
+if TYPE_CHECKING:
+ from erpnext.manufacturing.doctype.bom_update_log.bom_update_log import BOMUpdateLog
+
+import frappe
+from frappe import _
+
+
+def replace_bom(boms: Dict) -> None:
+ """Replace current BOM with new BOM in parent BOMs."""
+ current_bom = boms.get("current_bom")
+ new_bom = boms.get("new_bom")
+
+ unit_cost = get_bom_unit_cost(new_bom)
+ update_new_bom_in_bom_items(unit_cost, current_bom, new_bom)
+
+ frappe.cache().delete_key("bom_children")
+ parent_boms = get_ancestor_boms(new_bom)
+
+ for bom in parent_boms:
+ bom_obj = frappe.get_doc("BOM", bom)
+ # this is only used for versioning and we do not want
+ # to make separate db calls by using load_doc_before_save
+ # which proves to be expensive while doing bulk replace
+ bom_obj._doc_before_save = bom_obj
+ bom_obj.update_exploded_items()
+ bom_obj.calculate_cost()
+ bom_obj.update_parent_cost()
+ bom_obj.db_update()
+ if bom_obj.meta.get("track_changes") and not bom_obj.flags.ignore_version:
+ bom_obj.save_version()
+
+
+def update_cost_in_level(doc: "BOMUpdateLog", bom_list: List[str]) -> None:
+ "Updates Cost for BOMs within a given level. Runs via background jobs."
+ try:
+ status = frappe.db.get_value("BOM Update Log", doc.name, "status")
+ if status == "Failed":
+ return
+
+ frappe.db.auto_commit_on_many_writes = 1
+ # main updation logic
+ job_data = update_cost_in_boms(bom_list=bom_list, docname=doc.name)
+
+ set_values_in_log(
+ doc.name,
+ values={
+ "current_boms": json.dumps(job_data.get("current_boms")),
+ "processed_boms": json.dumps(job_data.get("processed_boms")),
+ },
+ commit=True,
+ )
+
+ process_if_level_is_complete(doc.name, job_data["current_boms"], job_data["processed_boms"])
+ except Exception:
+ handle_exception(doc)
+ finally:
+ frappe.db.auto_commit_on_many_writes = 0
+ frappe.db.commit() # nosemgrep
+
+
+def get_ancestor_boms(new_bom: str, bom_list: Optional[List] = None) -> List:
+ bom_list = bom_list or []
+ bom_item = frappe.qb.DocType("BOM Item")
+
+ parents = (
+ frappe.qb.from_(bom_item)
+ .select(bom_item.parent)
+ .where((bom_item.bom_no == new_bom) & (bom_item.docstatus < 2) & (bom_item.parenttype == "BOM"))
+ .run(as_dict=True)
+ )
+
+ for d in parents:
+ if new_bom == d.parent:
+ frappe.throw(_("BOM recursion: {0} cannot be child of {1}").format(new_bom, d.parent))
+
+ bom_list.append(d.parent)
+ get_ancestor_boms(d.parent, bom_list)
+
+ return list(set(bom_list))
+
+
+def update_new_bom_in_bom_items(unit_cost: float, current_bom: str, new_bom: str) -> None:
+ bom_item = frappe.qb.DocType("BOM Item")
+ (
+ frappe.qb.update(bom_item)
+ .set(bom_item.bom_no, new_bom)
+ .set(bom_item.rate, unit_cost)
+ .set(bom_item.amount, (bom_item.stock_qty * unit_cost))
+ .where(
+ (bom_item.bom_no == current_bom) & (bom_item.docstatus < 2) & (bom_item.parenttype == "BOM")
+ )
+ ).run()
+
+
+def get_bom_unit_cost(new_bom: str) -> float:
+ bom = frappe.qb.DocType("BOM")
+ new_bom_unitcost = (
+ frappe.qb.from_(bom).select(bom.total_cost / bom.quantity).where(bom.name == new_bom).run()
+ )
+
+ return frappe.utils.flt(new_bom_unitcost[0][0])
+
+
+def update_cost_in_boms(bom_list: List[str], docname: str) -> Dict:
+ "Updates cost in given BOMs. Returns current and total updated BOMs."
+ updated_boms = {} # current boms that have been updated
+
+ for bom in bom_list:
+ bom_doc = frappe.get_cached_doc("BOM", bom)
+ bom_doc.calculate_cost(save_updates=True, update_hour_rate=True)
+ # bom_doc.update_exploded_items(save=True) #TODO: edit exploded items rate
+ bom_doc.db_update()
+ updated_boms[bom] = True
+
+ # Update processed BOMs in Log
+ log_data = frappe.db.get_values(
+ "BOM Update Log", docname, ["current_boms", "processed_boms"], as_dict=True
+ )[0]
+
+ for field in ("current_boms", "processed_boms"):
+ log_data[field] = json.loads(log_data.get(field))
+ log_data[field].update(updated_boms)
+
+ return log_data
+
+
+def process_if_level_is_complete(docname: str, current_boms: Dict, processed_boms: Dict) -> None:
+ "Prepare and set higher level BOMs in Log if current level is complete."
+ processing_complete = all(current_boms.get(bom) for bom in current_boms)
+ if not processing_complete:
+ return
+
+ parent_boms = get_next_higher_level_boms(child_boms=current_boms, processed_boms=processed_boms)
+ set_values_in_log(
+ docname,
+ values={
+ "current_boms": json.dumps({}),
+ "parent_boms": json.dumps(parent_boms),
+ "status": "Completed" if not parent_boms else "Paused",
+ },
+ commit=True,
+ )
+
+
+def get_next_higher_level_boms(child_boms: Dict, processed_boms: Dict):
+ "Generate immediate higher level dependants with no unresolved dependencies."
+
+ def _all_children_are_processed(parent):
+ bom_doc = frappe.get_cached_doc("BOM", parent)
+ return all(processed_boms.get(row.bom_no) for row in bom_doc.items if row.bom_no)
+
+ dependants_map = _generate_dependants_map()
+ dependants = set()
+ for bom in child_boms:
+ parents = dependants_map.get(bom) or []
+ for parent in parents:
+ if _all_children_are_processed(parent):
+ dependants.add(parent)
+
+ return list(dependants)
+
+
+def get_leaf_boms():
+ return frappe.db.sql_list(
+ """select name from `tabBOM` bom
+ where docstatus=1 and is_active=1
+ and not exists(select bom_no from `tabBOM Item`
+ where parent=bom.name and ifnull(bom_no, '')!='')"""
+ )
+
+
+def _generate_dependants_map():
+ bom = frappe.qb.DocType("BOM")
+ bom_item = frappe.qb.DocType("BOM Item")
+
+ bom_parents = (
+ frappe.qb.from_(bom_item)
+ .join(bom)
+ .on(bom_item.parent == bom.name)
+ .select(bom_item.bom_no, bom_item.parent)
+ .where(
+ (bom_item.bom_no.isnotnull())
+ & (bom_item.bom_no != "")
+ & (bom.docstatus == 1)
+ & (bom.is_active == 1)
+ & (bom_item.parenttype == "BOM")
+ )
+ ).run(as_dict=True)
+
+ child_parent_map = defaultdict(list)
+ for bom in bom_parents:
+ child_parent_map[bom.bom_no].append(bom.parent)
+
+ return child_parent_map
+
+
+def set_values_in_log(log_name: str, values: Dict, commit: bool = False) -> None:
+ "Update BOM Update Log record."
+ if not values:
+ return
+
+ bom_update_log = frappe.qb.DocType("BOM Update Log")
+ query = frappe.qb.update(bom_update_log).where(bom_update_log.name == log_name)
+
+ for key, value in values.items():
+ query = query.set(key, value)
+ query.run()
+
+ if commit:
+ frappe.db.commit()
+
+
+def handle_exception(doc: "BOMUpdateLog"):
+ frappe.db.rollback()
+ error_log = doc.log_error("BOM Update Tool Error")
+ set_values_in_log(doc.name, {"status": "Failed", "error_log": error_log.name})
diff --git a/erpnext/manufacturing/doctype/bom_update_log/test_bom_update_log.py b/erpnext/manufacturing/doctype/bom_update_log/test_bom_update_log.py
index 47efea9..4f15133 100644
--- a/erpnext/manufacturing/doctype/bom_update_log/test_bom_update_log.py
+++ b/erpnext/manufacturing/doctype/bom_update_log/test_bom_update_log.py
@@ -6,7 +6,7 @@
from erpnext.manufacturing.doctype.bom_update_log.bom_update_log import (
BOMMissingError,
- run_bom_job,
+ run_replace_bom_job,
)
from erpnext.manufacturing.doctype.bom_update_tool.bom_update_tool import enqueue_replace_bom
@@ -71,7 +71,7 @@
# Explicitly commits log, new bom (setUp) and replacement impact.
# Is run via background jobs IRL
- run_bom_job(
+ run_replace_bom_job(
doc=log,
boms=self.boms,
update_type="Replace BOM",
@@ -88,7 +88,7 @@
log2 = enqueue_replace_bom(
boms=self.boms,
)
- run_bom_job( # Explicitly commits
+ run_replace_bom_job( # Explicitly commits
doc=log2,
boms=boms,
update_type="Replace BOM",
diff --git a/erpnext/manufacturing/doctype/bom_update_tool/bom_update_tool.py b/erpnext/manufacturing/doctype/bom_update_tool/bom_update_tool.py
index e765725..4a2e03f 100644
--- a/erpnext/manufacturing/doctype/bom_update_tool/bom_update_tool.py
+++ b/erpnext/manufacturing/doctype/bom_update_tool/bom_update_tool.py
@@ -2,8 +2,7 @@
# For license information, please see license.txt
import json
-from collections import defaultdict
-from typing import TYPE_CHECKING, Dict, List, Literal, Optional, Union
+from typing import TYPE_CHECKING, Dict, Literal, Optional, Union
if TYPE_CHECKING:
from erpnext.manufacturing.doctype.bom_update_log.bom_update_log import BOMUpdateLog
@@ -39,17 +38,7 @@
def auto_update_latest_price_in_all_boms() -> None:
"""Called via hooks.py."""
if frappe.db.get_single_value("Manufacturing Settings", "update_bom_costs_automatically"):
- update_cost()
-
-
-def update_cost() -> None:
- """Updates Cost for all BOMs from bottom to top."""
- bom_list = get_boms_in_bottom_up_order()
- for bom in bom_list:
- bom_doc = frappe.get_cached_doc("BOM", bom)
- bom_doc.calculate_cost(save_updates=True, update_hour_rate=True)
- # bom_doc.update_exploded_items(save=True) #TODO: edit exploded items rate
- bom_doc.db_update()
+ create_bom_update_log(update_type="Update Cost")
def create_bom_update_log(
@@ -69,90 +58,3 @@
"update_type": update_type,
}
).submit()
-
-
-def get_boms_in_bottom_up_order(bom_no: Optional[str] = None) -> List:
- """
- Eg: Main BOM
- |- Sub BOM 1
- |- Leaf BOM 1
- |- Sub BOM 2
- |- Leaf BOM 2
- Result: [Leaf BOM 1, Leaf BOM 2, Sub BOM 1, Sub BOM 2, Main BOM]
- """
- leaf_boms = []
- if bom_no:
- leaf_boms.append(bom_no)
- else:
- leaf_boms = _get_leaf_boms()
-
- child_parent_map = _generate_child_parent_map()
- bom_list = leaf_boms.copy()
-
- for leaf_bom in leaf_boms:
- parent_list = _get_flat_parent_map(leaf_bom, child_parent_map)
-
- if not parent_list:
- continue
-
- bom_list.extend(parent_list)
- bom_list = list(dict.fromkeys(bom_list).keys()) # remove duplicates
-
- return bom_list
-
-
-def _generate_child_parent_map():
- bom = frappe.qb.DocType("BOM")
- bom_item = frappe.qb.DocType("BOM Item")
-
- bom_parents = (
- frappe.qb.from_(bom_item)
- .join(bom)
- .on(bom_item.parent == bom.name)
- .select(bom_item.bom_no, bom_item.parent)
- .where(
- (bom_item.bom_no.isnotnull())
- & (bom_item.bom_no != "")
- & (bom.docstatus == 1)
- & (bom.is_active == 1)
- & (bom_item.parenttype == "BOM")
- )
- ).run(as_dict=True)
-
- child_parent_map = defaultdict(list)
- for bom in bom_parents:
- child_parent_map[bom.bom_no].append(bom.parent)
-
- return child_parent_map
-
-
-def _get_flat_parent_map(leaf, child_parent_map):
- "Get ancestors at all levels of a leaf BOM."
- parents_list = []
-
- def _get_parents(node, parents_list):
- "Returns recursively updated ancestors list."
- first_parents = child_parent_map.get(node) # immediate parents of node
- if not first_parents: # top most node
- return parents_list
-
- parents_list.extend(first_parents)
- parents_list = list(dict.fromkeys(parents_list).keys()) # remove duplicates
-
- for nth_node in first_parents:
- # recursively find parents
- parents_list = _get_parents(nth_node, parents_list)
-
- return parents_list
-
- parents_list = _get_parents(leaf, parents_list)
- return parents_list
-
-
-def _get_leaf_boms():
- return frappe.db.sql_list(
- """select name from `tabBOM` bom
- where docstatus=1 and is_active=1
- and not exists(select bom_no from `tabBOM Item`
- where parent=bom.name and ifnull(bom_no, '')!='')"""
- )