feat: Level-wise BOM cost updation

- Process BOMs level wise and Pause after level is complete
- Cron job will resume Paused jobs, which will again process the new level and pause at the end
- This will go on until all BOMs are updated
- Added Progress section with fields to track updated BOMs in Log
- Cleanup: Add BOM Updation utils file to contain helper functions/sub-functions
- Cleanup: BOM Update Log file will only contain functions that are in direct context of the Log

Co-authored-by: Gavin D'souza <gavin18d@gmail.com>
diff --git a/erpnext/hooks.py b/erpnext/hooks.py
index 813ac17..05f06b3 100644
--- a/erpnext/hooks.py
+++ b/erpnext/hooks.py
@@ -392,9 +392,12 @@
 scheduler_events = {
 	"cron": {
+		"0/5 * * * *": [
+			"erpnext.manufacturing.doctype.bom_update_log.bom_update_log.resume_bom_cost_update_jobs",
+		],
 		"0/30 * * * *": [
-		]
+		],
 	"all": [
diff --git a/erpnext/manufacturing/doctype/bom_update_log/bom_update_log.json b/erpnext/manufacturing/doctype/bom_update_log/bom_update_log.json
index 98c1acb..3455b86 100644
--- a/erpnext/manufacturing/doctype/bom_update_log/bom_update_log.json
+++ b/erpnext/manufacturing/doctype/bom_update_log/bom_update_log.json
@@ -13,6 +13,10 @@
+  "progress_section",
+  "current_boms",
+  "parent_boms",
+  "processed_boms",
  "fields": [
@@ -47,7 +51,7 @@
    "fieldname": "status",
    "fieldtype": "Select",
    "label": "Status",
-   "options": "Queued\nIn Progress\nCompleted\nFailed"
+   "options": "Queued\nIn Progress\nPaused\nCompleted\nFailed"
    "fieldname": "amended_from",
@@ -63,13 +67,34 @@
    "fieldtype": "Link",
    "label": "Error Log",
    "options": "Error Log"
+  },
+  {
+   "fieldname": "progress_section",
+   "fieldtype": "Section Break",
+   "label": "Progress"
+  },
+  {
+   "fieldname": "current_boms",
+   "fieldtype": "Text",
+   "label": "Current BOMs"
+  },
+  {
+   "description": "Immediate parent BOMs",
+   "fieldname": "parent_boms",
+   "fieldtype": "Text",
+   "label": "Parent BOMs"
+  },
+  {
+   "fieldname": "processed_boms",
+   "fieldtype": "Text",
+   "label": "Processed BOMs"
  "in_create": 1,
  "index_web_pages_for_search": 1,
  "is_submittable": 1,
  "links": [],
- "modified": "2022-03-31 12:51:44.885102",
+ "modified": "2022-05-23 14:42:14.725914",
  "modified_by": "Administrator",
  "module": "Manufacturing",
  "name": "BOM Update Log",
diff --git a/erpnext/manufacturing/doctype/bom_update_log/bom_update_log.py b/erpnext/manufacturing/doctype/bom_update_log/bom_update_log.py
index c0770fa..639628a 100644
--- a/erpnext/manufacturing/doctype/bom_update_log/bom_update_log.py
+++ b/erpnext/manufacturing/doctype/bom_update_log/bom_update_log.py
@@ -1,13 +1,19 @@
 # Copyright (c) 2022, Frappe Technologies Pvt. Ltd. and contributors
 # For license information, please see license.txt
-from typing import Dict, List, Literal, Optional
+import json
+from typing import Dict, Optional
 import frappe
 from frappe import _
 from frappe.model.document import Document
-from frappe.utils import cstr, flt
+from frappe.utils import cstr
-from erpnext.manufacturing.doctype.bom_update_tool.bom_update_tool import update_cost
+from erpnext.manufacturing.doctype.bom_update_log.bom_updation_utils import (
+	get_leaf_boms,
+	handle_exception,
+	replace_bom,
+	set_values_in_log,
 class BOMMissingError(frappe.ValidationError):
@@ -49,116 +55,93 @@
 		if self.update_type == "Replace BOM":
 			boms = {"current_bom": self.current_bom, "new_bom": self.new_bom}
-				method="erpnext.manufacturing.doctype.bom_update_log.bom_update_log.run_bom_job",
+				method="erpnext.manufacturing.doctype.bom_update_log.bom_update_log.run_replace_bom_job",
-			frappe.enqueue(
-				method="erpnext.manufacturing.doctype.bom_update_log.bom_update_log.run_bom_job",
-				doc=self,
-				update_type="Update Cost",
-				timeout=40000,
-			)
+			process_boms_cost_level_wise(self)
-def replace_bom(boms: Dict) -> None:
-	"""Replace current BOM with new BOM in parent BOMs."""
-	current_bom = boms.get("current_bom")
-	new_bom = boms.get("new_bom")
-	unit_cost = get_new_bom_unit_cost(new_bom)
-	update_new_bom_in_bom_items(unit_cost, current_bom, new_bom)
-	frappe.cache().delete_key("bom_children")
-	parent_boms = get_parent_boms(new_bom)
-	for bom in parent_boms:
-		bom_obj = frappe.get_doc("BOM", bom)
-		# this is only used for versioning and we do not want
-		# to make separate db calls by using load_doc_before_save
-		# which proves to be expensive while doing bulk replace
-		bom_obj._doc_before_save = bom_obj
-		bom_obj.update_exploded_items()
-		bom_obj.calculate_cost()
-		bom_obj.update_parent_cost()
-		bom_obj.db_update()
-		if bom_obj.meta.get("track_changes") and not bom_obj.flags.ignore_version:
-			bom_obj.save_version()
-def update_new_bom_in_bom_items(unit_cost: float, current_bom: str, new_bom: str) -> None:
-	bom_item = frappe.qb.DocType("BOM Item")
-	(
-		frappe.qb.update(bom_item)
-		.set(bom_item.bom_no, new_bom)
-		.set(bom_item.rate, unit_cost)
-		.set(bom_item.amount, (bom_item.stock_qty * unit_cost))
-		.where(
-			(bom_item.bom_no == current_bom) & (bom_item.docstatus < 2) & (bom_item.parenttype == "BOM")
-		)
-	).run()
-def get_parent_boms(new_bom: str, bom_list: Optional[List] = None) -> List:
-	bom_list = bom_list or []
-	bom_item = frappe.qb.DocType("BOM Item")
-	parents = (
-		frappe.qb.from_(bom_item)
-		.select(bom_item.parent)
-		.where((bom_item.bom_no == new_bom) & (bom_item.docstatus < 2) & (bom_item.parenttype == "BOM"))
-		.run(as_dict=True)
-	)
-	for d in parents:
-		if new_bom == d.parent:
-			frappe.throw(_("BOM recursion: {0} cannot be child of {1}").format(new_bom, d.parent))
-		bom_list.append(d.parent)
-		get_parent_boms(d.parent, bom_list)
-	return list(set(bom_list))
-def get_new_bom_unit_cost(new_bom: str) -> float:
-	bom = frappe.qb.DocType("BOM")
-	new_bom_unitcost = (
-		frappe.qb.from_(bom).select(bom.total_cost / bom.quantity).where(bom.name == new_bom).run()
-	)
-	return flt(new_bom_unitcost[0][0])
-def run_bom_job(
+def run_replace_bom_job(
 	doc: "BOMUpdateLog",
 	boms: Optional[Dict[str, str]] = None,
-	update_type: Literal["Replace BOM", "Update Cost"] = "Replace BOM",
 ) -> None:
 		doc.db_set("status", "In Progress")
 		if not frappe.flags.in_test:
 		frappe.db.auto_commit_on_many_writes = 1
 		boms = frappe._dict(boms or {})
-		if update_type == "Replace BOM":
-			replace_bom(boms)
-		else:
-			update_cost()
+		replace_bom(boms)
 		doc.db_set("status", "Completed")
 	except Exception:
-		frappe.db.rollback()
-		error_log = doc.log_error("BOM Update Tool Error")
-		doc.db_set("status", "Failed")
-		doc.db_set("error_log", error_log.name)
+		handle_exception(doc)
 		frappe.db.auto_commit_on_many_writes = 0
 		frappe.db.commit()  # nosemgrep
+def process_boms_cost_level_wise(update_doc: "BOMUpdateLog") -> None:
+	"Queue jobs at the start of new BOM Level in 'Update Cost' Jobs."
+	current_boms, parent_boms = {}, []
+	values = {}
+	if update_doc.status == "Queued":
+		# First level yet to process. On Submit.
+		current_boms = {bom: False for bom in get_leaf_boms()}
+		values = {
+			"current_boms": json.dumps(current_boms),
+			"parent_boms": "[]",
+			"processed_boms": json.dumps({}),
+			"status": "In Progress",
+		}
+	else:
+		# status is Paused, resume. via Cron Job.
+		current_boms, parent_boms = json.loads(update_doc.current_boms), json.loads(
+			update_doc.parent_boms
+		)
+		if not current_boms:
+			# Process the next level BOMs. Stage parents as current BOMs.
+			current_boms = {bom: False for bom in parent_boms}
+			values = {
+				"current_boms": json.dumps(current_boms),
+				"parent_boms": "[]",
+				"status": "In Progress",
+			}
+	set_values_in_log(update_doc.name, values, commit=True)
+	queue_bom_cost_jobs(current_boms, update_doc)
+def queue_bom_cost_jobs(current_boms: Dict, update_doc: "BOMUpdateLog") -> None:
+	"Queue batches of 20k BOMs of the same level to process parallelly"
+	current_boms_list = [bom for bom in current_boms]
+	while current_boms_list:
+		boms_to_process = current_boms_list[:20000]  # slice out batch of 20k BOMs
+		# update list to exclude 20K (queued) BOMs
+		current_boms_list = current_boms_list[20000:] if len(current_boms_list) > 20000 else []
+		frappe.enqueue(
+			method="erpnext.manufacturing.doctype.bom_update_log.bom_updation_utils.update_cost_in_level",
+			doc=update_doc,
+			bom_list=boms_to_process,
+			timeout=40000,
+		)
+def resume_bom_cost_update_jobs():
+	"Called every 10 minutes via Cron job."
+	paused_jobs = frappe.db.get_all("BOM Update Log", {"status": "Paused"})
+	if not paused_jobs:
+		return
+	for job in paused_jobs:
+		# resume from next level
+		process_boms_cost_level_wise(update_doc=frappe.get_doc("BOM Update Log", job.name))
diff --git a/erpnext/manufacturing/doctype/bom_update_log/bom_updation_utils.py b/erpnext/manufacturing/doctype/bom_update_log/bom_updation_utils.py
new file mode 100644
index 0000000..b5964ce
--- /dev/null
+++ b/erpnext/manufacturing/doctype/bom_update_log/bom_updation_utils.py
@@ -0,0 +1,223 @@
+# Copyright (c) 2022, Frappe Technologies Pvt. Ltd. and contributors
+# For license information, please see license.txt
+import json
+from collections import defaultdict
+from typing import TYPE_CHECKING, Dict, List, Optional
+	from erpnext.manufacturing.doctype.bom_update_log.bom_update_log import BOMUpdateLog
+import frappe
+from frappe import _
+def replace_bom(boms: Dict) -> None:
+	"""Replace current BOM with new BOM in parent BOMs."""
+	current_bom = boms.get("current_bom")
+	new_bom = boms.get("new_bom")
+	unit_cost = get_bom_unit_cost(new_bom)
+	update_new_bom_in_bom_items(unit_cost, current_bom, new_bom)
+	frappe.cache().delete_key("bom_children")
+	parent_boms = get_ancestor_boms(new_bom)
+	for bom in parent_boms:
+		bom_obj = frappe.get_doc("BOM", bom)
+		# this is only used for versioning and we do not want
+		# to make separate db calls by using load_doc_before_save
+		# which proves to be expensive while doing bulk replace
+		bom_obj._doc_before_save = bom_obj
+		bom_obj.update_exploded_items()
+		bom_obj.calculate_cost()
+		bom_obj.update_parent_cost()
+		bom_obj.db_update()
+		if bom_obj.meta.get("track_changes") and not bom_obj.flags.ignore_version:
+			bom_obj.save_version()
+def update_cost_in_level(doc: "BOMUpdateLog", bom_list: List[str]) -> None:
+	"Updates Cost for BOMs within a given level. Runs via background jobs."
+	try:
+		status = frappe.db.get_value("BOM Update Log", doc.name, "status")
+		if status == "Failed":
+			return
+		frappe.db.auto_commit_on_many_writes = 1
+		# main updation logic
+		job_data = update_cost_in_boms(bom_list=bom_list, docname=doc.name)
+		set_values_in_log(
+			doc.name,
+			values={
+				"current_boms": json.dumps(job_data.get("current_boms")),
+				"processed_boms": json.dumps(job_data.get("processed_boms")),
+			},
+			commit=True,
+		)
+		process_if_level_is_complete(doc.name, job_data["current_boms"], job_data["processed_boms"])
+	except Exception:
+		handle_exception(doc)
+	finally:
+		frappe.db.auto_commit_on_many_writes = 0
+		frappe.db.commit()  # nosemgrep
+def get_ancestor_boms(new_bom: str, bom_list: Optional[List] = None) -> List:
+	bom_list = bom_list or []
+	bom_item = frappe.qb.DocType("BOM Item")
+	parents = (
+		frappe.qb.from_(bom_item)
+		.select(bom_item.parent)
+		.where((bom_item.bom_no == new_bom) & (bom_item.docstatus < 2) & (bom_item.parenttype == "BOM"))
+		.run(as_dict=True)
+	)
+	for d in parents:
+		if new_bom == d.parent:
+			frappe.throw(_("BOM recursion: {0} cannot be child of {1}").format(new_bom, d.parent))
+		bom_list.append(d.parent)
+		get_ancestor_boms(d.parent, bom_list)
+	return list(set(bom_list))
+def update_new_bom_in_bom_items(unit_cost: float, current_bom: str, new_bom: str) -> None:
+	bom_item = frappe.qb.DocType("BOM Item")
+	(
+		frappe.qb.update(bom_item)
+		.set(bom_item.bom_no, new_bom)
+		.set(bom_item.rate, unit_cost)
+		.set(bom_item.amount, (bom_item.stock_qty * unit_cost))
+		.where(
+			(bom_item.bom_no == current_bom) & (bom_item.docstatus < 2) & (bom_item.parenttype == "BOM")
+		)
+	).run()
+def get_bom_unit_cost(new_bom: str) -> float:
+	bom = frappe.qb.DocType("BOM")
+	new_bom_unitcost = (
+		frappe.qb.from_(bom).select(bom.total_cost / bom.quantity).where(bom.name == new_bom).run()
+	)
+	return frappe.utils.flt(new_bom_unitcost[0][0])
+def update_cost_in_boms(bom_list: List[str], docname: str) -> Dict:
+	"Updates cost in given BOMs. Returns current and total updated BOMs."
+	updated_boms = {}  # current boms that have been updated
+	for bom in bom_list:
+		bom_doc = frappe.get_cached_doc("BOM", bom)
+		bom_doc.calculate_cost(save_updates=True, update_hour_rate=True)
+		# bom_doc.update_exploded_items(save=True) #TODO: edit exploded items rate
+		bom_doc.db_update()
+		updated_boms[bom] = True
+	# Update processed BOMs in Log
+	log_data = frappe.db.get_values(
+		"BOM Update Log", docname, ["current_boms", "processed_boms"], as_dict=True
+	)[0]
+	for field in ("current_boms", "processed_boms"):
+		log_data[field] = json.loads(log_data.get(field))
+		log_data[field].update(updated_boms)
+	return log_data
+def process_if_level_is_complete(docname: str, current_boms: Dict, processed_boms: Dict) -> None:
+	"Prepare and set higher level BOMs in Log if current level is complete."
+	processing_complete = all(current_boms.get(bom) for bom in current_boms)
+	if not processing_complete:
+		return
+	parent_boms = get_next_higher_level_boms(child_boms=current_boms, processed_boms=processed_boms)
+	set_values_in_log(
+		docname,
+		values={
+			"current_boms": json.dumps({}),
+			"parent_boms": json.dumps(parent_boms),
+			"status": "Completed" if not parent_boms else "Paused",
+		},
+		commit=True,
+	)
+def get_next_higher_level_boms(child_boms: Dict, processed_boms: Dict):
+	"Generate immediate higher level dependants with no unresolved dependencies."
+	def _all_children_are_processed(parent):
+		bom_doc = frappe.get_cached_doc("BOM", parent)
+		return all(processed_boms.get(row.bom_no) for row in bom_doc.items if row.bom_no)
+	dependants_map = _generate_dependants_map()
+	dependants = set()
+	for bom in child_boms:
+		parents = dependants_map.get(bom) or []
+		for parent in parents:
+			if _all_children_are_processed(parent):
+				dependants.add(parent)
+	return list(dependants)
+def get_leaf_boms():
+	return frappe.db.sql_list(
+		"""select name from `tabBOM` bom
+		where docstatus=1 and is_active=1
+			and not exists(select bom_no from `tabBOM Item`
+				where parent=bom.name and ifnull(bom_no, '')!='')"""
+	)
+def _generate_dependants_map():
+	bom = frappe.qb.DocType("BOM")
+	bom_item = frappe.qb.DocType("BOM Item")
+	bom_parents = (
+		frappe.qb.from_(bom_item)
+		.join(bom)
+		.on(bom_item.parent == bom.name)
+		.select(bom_item.bom_no, bom_item.parent)
+		.where(
+			(bom_item.bom_no.isnotnull())
+			& (bom_item.bom_no != "")
+			& (bom.docstatus == 1)
+			& (bom.is_active == 1)
+			& (bom_item.parenttype == "BOM")
+		)
+	).run(as_dict=True)
+	child_parent_map = defaultdict(list)
+	for bom in bom_parents:
+		child_parent_map[bom.bom_no].append(bom.parent)
+	return child_parent_map
+def set_values_in_log(log_name: str, values: Dict, commit: bool = False) -> None:
+	"Update BOM Update Log record."
+	if not values:
+		return
+	bom_update_log = frappe.qb.DocType("BOM Update Log")
+	query = frappe.qb.update(bom_update_log).where(bom_update_log.name == log_name)
+	for key, value in values.items():
+		query = query.set(key, value)
+	query.run()
+	if commit:
+		frappe.db.commit()
+def handle_exception(doc: "BOMUpdateLog"):
+	frappe.db.rollback()
+	error_log = doc.log_error("BOM Update Tool Error")
+	set_values_in_log(doc.name, {"status": "Failed", "error_log": error_log.name})
diff --git a/erpnext/manufacturing/doctype/bom_update_log/test_bom_update_log.py b/erpnext/manufacturing/doctype/bom_update_log/test_bom_update_log.py
index 47efea9..4f15133 100644
--- a/erpnext/manufacturing/doctype/bom_update_log/test_bom_update_log.py
+++ b/erpnext/manufacturing/doctype/bom_update_log/test_bom_update_log.py
@@ -6,7 +6,7 @@
 from erpnext.manufacturing.doctype.bom_update_log.bom_update_log import (
-	run_bom_job,
+	run_replace_bom_job,
 from erpnext.manufacturing.doctype.bom_update_tool.bom_update_tool import enqueue_replace_bom
@@ -71,7 +71,7 @@
 		# Explicitly commits log, new bom (setUp) and replacement impact.
 		# Is run via background jobs IRL
-		run_bom_job(
+		run_replace_bom_job(
 			update_type="Replace BOM",
@@ -88,7 +88,7 @@
 		log2 = enqueue_replace_bom(
-		run_bom_job(  # Explicitly commits
+		run_replace_bom_job(  # Explicitly commits
 			update_type="Replace BOM",
diff --git a/erpnext/manufacturing/doctype/bom_update_tool/bom_update_tool.py b/erpnext/manufacturing/doctype/bom_update_tool/bom_update_tool.py
index e765725..4a2e03f 100644
--- a/erpnext/manufacturing/doctype/bom_update_tool/bom_update_tool.py
+++ b/erpnext/manufacturing/doctype/bom_update_tool/bom_update_tool.py
@@ -2,8 +2,7 @@
 # For license information, please see license.txt
 import json
-from collections import defaultdict
-from typing import TYPE_CHECKING, Dict, List, Literal, Optional, Union
+from typing import TYPE_CHECKING, Dict, Literal, Optional, Union
 	from erpnext.manufacturing.doctype.bom_update_log.bom_update_log import BOMUpdateLog
@@ -39,17 +38,7 @@
 def auto_update_latest_price_in_all_boms() -> None:
 	"""Called via hooks.py."""
 	if frappe.db.get_single_value("Manufacturing Settings", "update_bom_costs_automatically"):
-		update_cost()
-def update_cost() -> None:
-	"""Updates Cost for all BOMs from bottom to top."""
-	bom_list = get_boms_in_bottom_up_order()
-	for bom in bom_list:
-		bom_doc = frappe.get_cached_doc("BOM", bom)
-		bom_doc.calculate_cost(save_updates=True, update_hour_rate=True)
-		# bom_doc.update_exploded_items(save=True) #TODO: edit exploded items rate
-		bom_doc.db_update()
+		create_bom_update_log(update_type="Update Cost")
 def create_bom_update_log(
@@ -69,90 +58,3 @@
 			"update_type": update_type,
-def get_boms_in_bottom_up_order(bom_no: Optional[str] = None) -> List:
-	"""
-	Eg: Main BOM
-	        |- Sub BOM 1
-	                |- Leaf BOM 1
-	        |- Sub BOM 2
-	                |- Leaf BOM 2
-	Result: [Leaf BOM 1, Leaf BOM 2, Sub BOM 1, Sub BOM 2, Main BOM]
-	"""
-	leaf_boms = []
-	if bom_no:
-		leaf_boms.append(bom_no)
-	else:
-		leaf_boms = _get_leaf_boms()
-	child_parent_map = _generate_child_parent_map()
-	bom_list = leaf_boms.copy()
-	for leaf_bom in leaf_boms:
-		parent_list = _get_flat_parent_map(leaf_bom, child_parent_map)
-		if not parent_list:
-			continue
-		bom_list.extend(parent_list)
-		bom_list = list(dict.fromkeys(bom_list).keys())  # remove duplicates
-	return bom_list
-def _generate_child_parent_map():
-	bom = frappe.qb.DocType("BOM")
-	bom_item = frappe.qb.DocType("BOM Item")
-	bom_parents = (
-		frappe.qb.from_(bom_item)
-		.join(bom)
-		.on(bom_item.parent == bom.name)
-		.select(bom_item.bom_no, bom_item.parent)
-		.where(
-			(bom_item.bom_no.isnotnull())
-			& (bom_item.bom_no != "")
-			& (bom.docstatus == 1)
-			& (bom.is_active == 1)
-			& (bom_item.parenttype == "BOM")
-		)
-	).run(as_dict=True)
-	child_parent_map = defaultdict(list)
-	for bom in bom_parents:
-		child_parent_map[bom.bom_no].append(bom.parent)
-	return child_parent_map
-def _get_flat_parent_map(leaf, child_parent_map):
-	"Get ancestors at all levels of a leaf BOM."
-	parents_list = []
-	def _get_parents(node, parents_list):
-		"Returns recursively updated ancestors list."
-		first_parents = child_parent_map.get(node)  # immediate parents of node
-		if not first_parents:  # top most node
-			return parents_list
-		parents_list.extend(first_parents)
-		parents_list = list(dict.fromkeys(parents_list).keys())  # remove duplicates
-		for nth_node in first_parents:
-			# recursively find parents
-			parents_list = _get_parents(nth_node, parents_list)
-		return parents_list
-	parents_list = _get_parents(leaf, parents_list)
-	return parents_list
-def _get_leaf_boms():
-	return frappe.db.sql_list(
-		"""select name from `tabBOM` bom
-		where docstatus=1 and is_active=1
-			and not exists(select bom_no from `tabBOM Item`
-				where parent=bom.name and ifnull(bom_no, '')!='')"""
-	)