Merge branch 'develop' into perf-bom-update-tool
diff --git a/erpnext/hooks.py b/erpnext/hooks.py
index 1c4bbbc..7d7f65d 100644
--- a/erpnext/hooks.py
+++ b/erpnext/hooks.py
@@ -392,9 +392,12 @@
 
 scheduler_events = {
 	"cron": {
+		"0/5 * * * *": [
+			"erpnext.manufacturing.doctype.bom_update_log.bom_update_log.resume_bom_cost_update_jobs",
+		],
 		"0/30 * * * *": [
 			"erpnext.utilities.doctype.video.video.update_youtube_data",
-		]
+		],
 	},
 	"all": [
 		"erpnext.projects.doctype.project.project.project_status_update_reminder",
diff --git a/erpnext/manufacturing/doctype/bom/bom.py b/erpnext/manufacturing/doctype/bom/bom.py
index 6376359..3e2a2d1 100644
--- a/erpnext/manufacturing/doctype/bom/bom.py
+++ b/erpnext/manufacturing/doctype/bom/bom.py
@@ -1,11 +1,11 @@
-# Copyright (c) 2015, Frappe Technologies Pvt. Ltd. and Contributors
+# Copyright (c) 2022, Frappe Technologies Pvt. Ltd. and Contributors
 # License: GNU General Public License v3. See license.txt
 
 import functools
 import re
 from collections import deque
 from operator import itemgetter
-from typing import List
+from typing import Dict, List
 
 import frappe
 from frappe import _
@@ -189,6 +189,7 @@
 		self.validate_transfer_against()
 		self.set_routing_operations()
 		self.validate_operations()
+		self.update_exploded_items(save=False)
 		self.calculate_cost()
 		self.update_stock_qty()
 		self.update_cost(update_parent=False, from_child_bom=True, update_hour_rate=False, save=False)
@@ -386,40 +387,14 @@
 
 		existing_bom_cost = self.total_cost
 
-		for d in self.get("items"):
-			if not d.item_code:
-				continue
-
-			rate = self.get_rm_rate(
-				{
-					"company": self.company,
-					"item_code": d.item_code,
-					"bom_no": d.bom_no,
-					"qty": d.qty,
-					"uom": d.uom,
-					"stock_uom": d.stock_uom,
-					"conversion_factor": d.conversion_factor,
-					"sourced_by_supplier": d.sourced_by_supplier,
-				}
-			)
-
-			if rate:
-				d.rate = rate
-			d.amount = flt(d.rate) * flt(d.qty)
-			d.base_rate = flt(d.rate) * flt(self.conversion_rate)
-			d.base_amount = flt(d.amount) * flt(self.conversion_rate)
-
-			if save:
-				d.db_update()
-
 		if self.docstatus == 1:
 			self.flags.ignore_validate_update_after_submit = True
-			self.calculate_cost(update_hour_rate)
+
+		self.calculate_cost(save_updates=save, update_hour_rate=update_hour_rate)
+
 		if save:
 			self.db_update()
 
-		self.update_exploded_items(save=save)
-
 		# update parent BOMs
 		if self.total_cost != existing_bom_cost and update_parent:
 			parent_boms = frappe.db.sql_list(
@@ -608,11 +583,15 @@
 		bom_list.reverse()
 		return bom_list
 
-	def calculate_cost(self, update_hour_rate=False):
+	def calculate_cost(self, save_updates=False, update_hour_rate=False):
 		"""Calculate bom totals"""
 		self.calculate_op_cost(update_hour_rate)
-		self.calculate_rm_cost()
-		self.calculate_sm_cost()
+		self.calculate_rm_cost(save=save_updates)
+		self.calculate_sm_cost(save=save_updates)
+		if save_updates:
+			# not via doc event, table is not regenerated and needs updation
+			self.calculate_exploded_cost()
+
 		self.total_cost = self.operating_cost + self.raw_material_cost - self.scrap_material_cost
 		self.base_total_cost = (
 			self.base_operating_cost + self.base_raw_material_cost - self.base_scrap_material_cost
@@ -654,12 +633,26 @@
 		if update_hour_rate:
 			row.db_update()
 
-	def calculate_rm_cost(self):
+	def calculate_rm_cost(self, save=False):
 		"""Fetch RM rate as per today's valuation rate and calculate totals"""
 		total_rm_cost = 0
 		base_total_rm_cost = 0
 
 		for d in self.get("items"):
+			old_rate = d.rate
+			d.rate = self.get_rm_rate(
+				{
+					"company": self.company,
+					"item_code": d.item_code,
+					"bom_no": d.bom_no,
+					"qty": d.qty,
+					"uom": d.uom,
+					"stock_uom": d.stock_uom,
+					"conversion_factor": d.conversion_factor,
+					"sourced_by_supplier": d.sourced_by_supplier,
+				}
+			)
+
 			d.base_rate = flt(d.rate) * flt(self.conversion_rate)
 			d.amount = flt(d.rate, d.precision("rate")) * flt(d.qty, d.precision("qty"))
 			d.base_amount = d.amount * flt(self.conversion_rate)
@@ -669,11 +662,13 @@
 
 			total_rm_cost += d.amount
 			base_total_rm_cost += d.base_amount
+			if save and (old_rate != d.rate):
+				d.db_update()
 
 		self.raw_material_cost = total_rm_cost
 		self.base_raw_material_cost = base_total_rm_cost
 
-	def calculate_sm_cost(self):
+	def calculate_sm_cost(self, save=False):
 		"""Fetch RM rate as per today's valuation rate and calculate totals"""
 		total_sm_cost = 0
 		base_total_sm_cost = 0
@@ -688,10 +683,45 @@
 			)
 			total_sm_cost += d.amount
 			base_total_sm_cost += d.base_amount
+			if save:
+				d.db_update()
 
 		self.scrap_material_cost = total_sm_cost
 		self.base_scrap_material_cost = base_total_sm_cost
 
+	def calculate_exploded_cost(self):
+		"Set exploded row cost from it's parent BOM."
+		rm_rate_map = self.get_rm_rate_map()
+
+		for row in self.get("exploded_items"):
+			old_rate = flt(row.rate)
+			row.rate = rm_rate_map.get(row.item_code)
+			row.amount = flt(row.stock_qty) * flt(row.rate)
+
+			if old_rate != row.rate:
+				# Only db_update if changed
+				row.db_update()
+
+	def get_rm_rate_map(self) -> Dict[str, float]:
+		"Create Raw Material-Rate map for Exploded Items. Fetch rate from Items table or Subassembly BOM."
+		rm_rate_map = {}
+
+		for item in self.get("items"):
+			if item.bom_no:
+				# Get Item-Rate from Subassembly BOM
+				explosion_items = frappe.get_all(
+					"BOM Explosion Item",
+					filters={"parent": item.bom_no},
+					fields=["item_code", "rate"],
+					order_by=None,  # to avoid sort index creation at db level (granular change)
+				)
+				explosion_item_rate = {item.item_code: flt(item.rate) for item in explosion_items}
+				rm_rate_map.update(explosion_item_rate)
+			else:
+				rm_rate_map[item.item_code] = flt(item.base_rate) / flt(item.conversion_factor or 1.0)
+
+		return rm_rate_map
+
 	def update_exploded_items(self, save=True):
 		"""Update Flat BOM, following will be correct data"""
 		self.get_exploded_items()
@@ -903,13 +933,17 @@
 
 
 def get_valuation_rate(args):
-	"""Get weighted average of valuation rate from all warehouses"""
+	"""
+	1) Get average valuation rate from all warehouses
+	2) If no value, get last valuation rate from SLE
+	3) If no value, get valuation rate from Item
+	"""
 
-	total_qty, total_value, valuation_rate = 0.0, 0.0, 0.0
-	item_bins = frappe.db.sql(
+	valuation_rate = 0.0
+	item_valuation = frappe.db.sql(
 		"""
 		select
-			bin.actual_qty, bin.stock_value
+			(sum(bin.stock_value) / sum(bin.actual_qty)) as valuation_rate
 		from
 			`tabBin` bin, `tabWarehouse` warehouse
 		where
@@ -918,16 +952,12 @@
 			and warehouse.company=%(company)s""",
 		{"item": args["item_code"], "company": args["company"]},
 		as_dict=1,
-	)
+	)[0]
 
-	for d in item_bins:
-		total_qty += flt(d.actual_qty)
-		total_value += flt(d.stock_value)
+	valuation_rate = item_valuation.get("valuation_rate")
 
-	if total_qty:
-		valuation_rate = total_value / total_qty
-
-	if valuation_rate <= 0:
+	if (valuation_rate is not None) and valuation_rate <= 0:
+		# Explicit null value check. If None, Bins don't exist, neither does SLE
 		last_valuation_rate = frappe.db.sql(
 			"""select valuation_rate
 			from `tabStock Ledger Entry`
@@ -1125,39 +1155,6 @@
 		return bom_items
 
 
-def get_boms_in_bottom_up_order(bom_no=None):
-	def _get_parent(bom_no):
-		return frappe.db.sql_list(
-			"""
-			select distinct bom_item.parent from `tabBOM Item` bom_item
-			where bom_item.bom_no = %s and bom_item.docstatus=1 and bom_item.parenttype='BOM'
-				and exists(select bom.name from `tabBOM` bom where bom.name=bom_item.parent and bom.is_active=1)
-		""",
-			bom_no,
-		)
-
-	count = 0
-	bom_list = []
-	if bom_no:
-		bom_list.append(bom_no)
-	else:
-		# get all leaf BOMs
-		bom_list = frappe.db.sql_list(
-			"""select name from `tabBOM` bom
-			where docstatus=1 and is_active=1
-				and not exists(select bom_no from `tabBOM Item`
-					where parent=bom.name and ifnull(bom_no, '')!='')"""
-		)
-
-	while count < len(bom_list):
-		for child_bom in _get_parent(bom_list[count]):
-			if child_bom not in bom_list:
-				bom_list.append(child_bom)
-		count += 1
-
-	return bom_list
-
-
 def add_additional_cost(stock_entry, work_order):
 	# Add non stock items cost in the additional cost
 	stock_entry.additional_costs = []
diff --git a/erpnext/manufacturing/doctype/bom/test_bom.py b/erpnext/manufacturing/doctype/bom/test_bom.py
index f235e44..f2731ec 100644
--- a/erpnext/manufacturing/doctype/bom/test_bom.py
+++ b/erpnext/manufacturing/doctype/bom/test_bom.py
@@ -11,7 +11,9 @@
 
 from erpnext.buying.doctype.purchase_order.test_purchase_order import create_purchase_order
 from erpnext.manufacturing.doctype.bom.bom import BOMRecursionError, item_query, make_variant_bom
-from erpnext.manufacturing.doctype.bom_update_tool.bom_update_tool import update_cost
+from erpnext.manufacturing.doctype.bom_update_log.test_bom_update_log import (
+	update_cost_in_all_boms_in_test,
+)
 from erpnext.stock.doctype.item.test_item import make_item
 from erpnext.stock.doctype.stock_reconciliation.test_stock_reconciliation import (
 	create_stock_reconciliation,
@@ -80,7 +82,7 @@
 		reset_item_valuation_rate(item_code="_Test Item 2", qty=200, rate=rm_rate + 10)
 
 		# update cost of all BOMs based on latest valuation rate
-		update_cost()
+		update_cost_in_all_boms_in_test()
 
 		# check if new valuation rate updated in all BOMs
 		for d in frappe.db.sql(
diff --git a/erpnext/manufacturing/doctype/bom/test_records.json b/erpnext/manufacturing/doctype/bom/test_records.json
index 25730f9..507d319 100644
--- a/erpnext/manufacturing/doctype/bom/test_records.json
+++ b/erpnext/manufacturing/doctype/bom/test_records.json
@@ -32,6 +32,7 @@
   "is_active": 1,
   "is_default": 1,
   "item": "_Test Item Home Desktop Manufactured",
+  "company": "_Test Company",
   "quantity": 1.0
  },
  {
diff --git a/erpnext/manufacturing/doctype/bom_explosion_item/bom_explosion_item.json b/erpnext/manufacturing/doctype/bom_explosion_item/bom_explosion_item.json
index f01d856..9b1db63 100644
--- a/erpnext/manufacturing/doctype/bom_explosion_item/bom_explosion_item.json
+++ b/erpnext/manufacturing/doctype/bom_explosion_item/bom_explosion_item.json
@@ -169,13 +169,15 @@
  "index_web_pages_for_search": 1,
  "istable": 1,
  "links": [],
- "modified": "2020-10-08 16:21:29.386212",
+ "modified": "2022-05-27 13:42:23.305455",
  "modified_by": "Administrator",
  "module": "Manufacturing",
  "name": "BOM Explosion Item",
+ "naming_rule": "Random",
  "owner": "Administrator",
  "permissions": [],
  "sort_field": "modified",
  "sort_order": "DESC",
+ "states": [],
  "track_changes": 1
 }
\ No newline at end of file
diff --git a/erpnext/manufacturing/doctype/bom_update_batch/__init__.py b/erpnext/manufacturing/doctype/bom_update_batch/__init__.py
new file mode 100644
index 0000000..e69de29
--- /dev/null
+++ b/erpnext/manufacturing/doctype/bom_update_batch/__init__.py
diff --git a/erpnext/manufacturing/doctype/bom_update_batch/bom_update_batch.json b/erpnext/manufacturing/doctype/bom_update_batch/bom_update_batch.json
new file mode 100644
index 0000000..83b54d3
--- /dev/null
+++ b/erpnext/manufacturing/doctype/bom_update_batch/bom_update_batch.json
@@ -0,0 +1,55 @@
+{
+ "actions": [],
+ "autoname": "autoincrement",
+ "creation": "2022-05-31 17:34:39.825537",
+ "doctype": "DocType",
+ "engine": "InnoDB",
+ "field_order": [
+  "level",
+  "batch_no",
+  "boms_updated",
+  "status"
+ ],
+ "fields": [
+  {
+   "fieldname": "level",
+   "fieldtype": "Int",
+   "in_list_view": 1,
+   "label": "Level"
+  },
+  {
+   "fieldname": "batch_no",
+   "fieldtype": "Int",
+   "in_list_view": 1,
+   "label": "Batch No."
+  },
+  {
+   "fieldname": "boms_updated",
+   "fieldtype": "Long Text",
+   "hidden": 1,
+   "in_list_view": 1,
+   "label": "BOMs Updated"
+  },
+  {
+   "fieldname": "status",
+   "fieldtype": "Select",
+   "in_list_view": 1,
+   "label": "Status",
+   "options": "Pending\nCompleted",
+   "read_only": 1
+  }
+ ],
+ "index_web_pages_for_search": 1,
+ "istable": 1,
+ "links": [],
+ "modified": "2022-06-06 14:50:35.161062",
+ "modified_by": "Administrator",
+ "module": "Manufacturing",
+ "name": "BOM Update Batch",
+ "naming_rule": "Autoincrement",
+ "owner": "Administrator",
+ "permissions": [],
+ "sort_field": "modified",
+ "sort_order": "DESC",
+ "states": []
+}
\ No newline at end of file
diff --git a/erpnext/manufacturing/doctype/bom_update_batch/bom_update_batch.py b/erpnext/manufacturing/doctype/bom_update_batch/bom_update_batch.py
new file mode 100644
index 0000000..f952e43
--- /dev/null
+++ b/erpnext/manufacturing/doctype/bom_update_batch/bom_update_batch.py
@@ -0,0 +1,9 @@
+# Copyright (c) 2022, Frappe Technologies Pvt. Ltd. and contributors
+# For license information, please see license.txt
+
+# import frappe
+from frappe.model.document import Document
+
+
+class BOMUpdateBatch(Document):
+	pass
diff --git a/erpnext/manufacturing/doctype/bom_update_log/bom_update_log.json b/erpnext/manufacturing/doctype/bom_update_log/bom_update_log.json
index 98c1acb..c32e383 100644
--- a/erpnext/manufacturing/doctype/bom_update_log/bom_update_log.json
+++ b/erpnext/manufacturing/doctype/bom_update_log/bom_update_log.json
@@ -13,6 +13,10 @@
   "update_type",
   "status",
   "error_log",
+  "progress_section",
+  "current_level",
+  "processed_boms",
+  "bom_batches",
   "amended_from"
  ],
  "fields": [
@@ -63,13 +67,36 @@
    "fieldtype": "Link",
    "label": "Error Log",
    "options": "Error Log"
+  },
+  {
+   "collapsible": 1,
+   "depends_on": "eval: doc.update_type == \"Update Cost\"",
+   "fieldname": "progress_section",
+   "fieldtype": "Section Break",
+   "label": "Progress"
+  },
+  {
+   "fieldname": "processed_boms",
+   "fieldtype": "Long Text",
+   "hidden": 1,
+   "label": "Processed BOMs"
+  },
+  {
+   "fieldname": "bom_batches",
+   "fieldtype": "Table",
+   "options": "BOM Update Batch"
+  },
+  {
+   "fieldname": "current_level",
+   "fieldtype": "Int",
+   "label": "Current Level"
   }
  ],
  "in_create": 1,
  "index_web_pages_for_search": 1,
  "is_submittable": 1,
  "links": [],
- "modified": "2022-03-31 12:51:44.885102",
+ "modified": "2022-06-06 15:15:23.883251",
  "modified_by": "Administrator",
  "module": "Manufacturing",
  "name": "BOM Update Log",
diff --git a/erpnext/manufacturing/doctype/bom_update_log/bom_update_log.py b/erpnext/manufacturing/doctype/bom_update_log/bom_update_log.py
index c0770fa..71430bd 100644
--- a/erpnext/manufacturing/doctype/bom_update_log/bom_update_log.py
+++ b/erpnext/manufacturing/doctype/bom_update_log/bom_update_log.py
@@ -1,13 +1,20 @@
 # Copyright (c) 2022, Frappe Technologies Pvt. Ltd. and contributors
 # For license information, please see license.txt
-from typing import Dict, List, Literal, Optional
+import json
+from typing import Any, Dict, List, Optional, Tuple, Union
 
 import frappe
 from frappe import _
 from frappe.model.document import Document
-from frappe.utils import cstr, flt
+from frappe.utils import cint, cstr
 
-from erpnext.manufacturing.doctype.bom_update_tool.bom_update_tool import update_cost
+from erpnext.manufacturing.doctype.bom_update_log.bom_updation_utils import (
+	get_leaf_boms,
+	get_next_higher_level_boms,
+	handle_exception,
+	replace_bom,
+	set_values_in_log,
+)
 
 
 class BOMMissingError(frappe.ValidationError):
@@ -20,6 +27,8 @@
 			self.validate_boms_are_specified()
 			self.validate_same_bom()
 			self.validate_bom_items()
+		else:
+			self.validate_bom_cost_update_in_progress()
 
 		self.status = "Queued"
 
@@ -42,6 +51,21 @@
 		if current_bom_item != new_bom_item:
 			frappe.throw(_("The selected BOMs are not for the same item"))
 
+	def validate_bom_cost_update_in_progress(self):
+		"If another Cost Updation Log is still in progress, dont make new ones."
+
+		wip_log = frappe.get_all(
+			"BOM Update Log",
+			{"update_type": "Update Cost", "status": ["in", ["Queued", "In Progress"]]},
+			limit_page_length=1,
+		)
+		if wip_log:
+			log_link = frappe.utils.get_link_to_form("BOM Update Log", wip_log[0].name)
+			frappe.throw(
+				_("BOM Updation already in progress. Please wait until {0} is complete.").format(log_link),
+				title=_("Note"),
+			)
+
 	def on_submit(self):
 		if frappe.flags.in_test:
 			return
@@ -49,116 +73,170 @@
 		if self.update_type == "Replace BOM":
 			boms = {"current_bom": self.current_bom, "new_bom": self.new_bom}
 			frappe.enqueue(
-				method="erpnext.manufacturing.doctype.bom_update_log.bom_update_log.run_bom_job",
+				method="erpnext.manufacturing.doctype.bom_update_log.bom_update_log.run_replace_bom_job",
 				doc=self,
 				boms=boms,
 				timeout=40000,
 			)
 		else:
-			frappe.enqueue(
-				method="erpnext.manufacturing.doctype.bom_update_log.bom_update_log.run_bom_job",
-				doc=self,
-				update_type="Update Cost",
-				timeout=40000,
-			)
+			process_boms_cost_level_wise(self)
 
 
-def replace_bom(boms: Dict) -> None:
-	"""Replace current BOM with new BOM in parent BOMs."""
-	current_bom = boms.get("current_bom")
-	new_bom = boms.get("new_bom")
-
-	unit_cost = get_new_bom_unit_cost(new_bom)
-	update_new_bom_in_bom_items(unit_cost, current_bom, new_bom)
-
-	frappe.cache().delete_key("bom_children")
-	parent_boms = get_parent_boms(new_bom)
-
-	for bom in parent_boms:
-		bom_obj = frappe.get_doc("BOM", bom)
-		# this is only used for versioning and we do not want
-		# to make separate db calls by using load_doc_before_save
-		# which proves to be expensive while doing bulk replace
-		bom_obj._doc_before_save = bom_obj
-		bom_obj.update_exploded_items()
-		bom_obj.calculate_cost()
-		bom_obj.update_parent_cost()
-		bom_obj.db_update()
-		if bom_obj.meta.get("track_changes") and not bom_obj.flags.ignore_version:
-			bom_obj.save_version()
-
-
-def update_new_bom_in_bom_items(unit_cost: float, current_bom: str, new_bom: str) -> None:
-	bom_item = frappe.qb.DocType("BOM Item")
-	(
-		frappe.qb.update(bom_item)
-		.set(bom_item.bom_no, new_bom)
-		.set(bom_item.rate, unit_cost)
-		.set(bom_item.amount, (bom_item.stock_qty * unit_cost))
-		.where(
-			(bom_item.bom_no == current_bom) & (bom_item.docstatus < 2) & (bom_item.parenttype == "BOM")
-		)
-	).run()
-
-
-def get_parent_boms(new_bom: str, bom_list: Optional[List] = None) -> List:
-	bom_list = bom_list or []
-	bom_item = frappe.qb.DocType("BOM Item")
-
-	parents = (
-		frappe.qb.from_(bom_item)
-		.select(bom_item.parent)
-		.where((bom_item.bom_no == new_bom) & (bom_item.docstatus < 2) & (bom_item.parenttype == "BOM"))
-		.run(as_dict=True)
-	)
-
-	for d in parents:
-		if new_bom == d.parent:
-			frappe.throw(_("BOM recursion: {0} cannot be child of {1}").format(new_bom, d.parent))
-
-		bom_list.append(d.parent)
-		get_parent_boms(d.parent, bom_list)
-
-	return list(set(bom_list))
-
-
-def get_new_bom_unit_cost(new_bom: str) -> float:
-	bom = frappe.qb.DocType("BOM")
-	new_bom_unitcost = (
-		frappe.qb.from_(bom).select(bom.total_cost / bom.quantity).where(bom.name == new_bom).run()
-	)
-
-	return flt(new_bom_unitcost[0][0])
-
-
-def run_bom_job(
+def run_replace_bom_job(
 	doc: "BOMUpdateLog",
 	boms: Optional[Dict[str, str]] = None,
-	update_type: Literal["Replace BOM", "Update Cost"] = "Replace BOM",
 ) -> None:
 	try:
 		doc.db_set("status", "In Progress")
+
 		if not frappe.flags.in_test:
 			frappe.db.commit()
 
 		frappe.db.auto_commit_on_many_writes = 1
-
 		boms = frappe._dict(boms or {})
-
-		if update_type == "Replace BOM":
-			replace_bom(boms)
-		else:
-			update_cost()
+		replace_bom(boms)
 
 		doc.db_set("status", "Completed")
-
 	except Exception:
-		frappe.db.rollback()
-		error_log = doc.log_error("BOM Update Tool Error")
-
-		doc.db_set("status", "Failed")
-		doc.db_set("error_log", error_log.name)
-
+		handle_exception(doc)
 	finally:
 		frappe.db.auto_commit_on_many_writes = 0
-		frappe.db.commit()  # nosemgrep
+
+		if not frappe.flags.in_test:
+			frappe.db.commit()  # nosemgrep
+
+
+def process_boms_cost_level_wise(
+	update_doc: "BOMUpdateLog", parent_boms: List[str] = None
+) -> Union[None, Tuple]:
+	"Queue jobs at the start of new BOM Level in 'Update Cost' Jobs."
+
+	current_boms = {}
+	values = {}
+
+	if update_doc.status == "Queued":
+		# First level yet to process. On Submit.
+		current_level = 0
+		current_boms = get_leaf_boms()
+		values = {
+			"processed_boms": json.dumps({}),
+			"status": "In Progress",
+			"current_level": current_level,
+		}
+	else:
+		# Resume next level. via Cron Job.
+		if not parent_boms:
+			return
+
+		current_level = cint(update_doc.current_level) + 1
+
+		# Process the next level BOMs. Stage parents as current BOMs.
+		current_boms = parent_boms.copy()
+		values = {"current_level": current_level}
+
+	set_values_in_log(update_doc.name, values, commit=True)
+
+	if frappe.flags.in_test:
+		return current_boms, current_level
+
+	queue_bom_cost_jobs(current_boms, update_doc, current_level)
+
+
+def queue_bom_cost_jobs(
+	current_boms_list: List[str], update_doc: "BOMUpdateLog", current_level: int
+) -> None:
+	"Queue batches of 20k BOMs of the same level to process parallelly"
+	batch_no = 0
+
+	while current_boms_list:
+		batch_no += 1
+		batch_size = 20_000
+		boms_to_process = current_boms_list[:batch_size]  # slice out batch of 20k BOMs
+
+		# update list to exclude 20K (queued) BOMs
+		current_boms_list = current_boms_list[batch_size:] if len(current_boms_list) > batch_size else []
+
+		batch_row = update_doc.append(
+			"bom_batches", {"level": current_level, "batch_no": batch_no, "status": "Pending"}
+		)
+		batch_row.db_insert()
+
+		if frappe.flags.in_test:
+			# skip background jobs in test
+			return boms_to_process, batch_row.name
+
+		frappe.enqueue(
+			method="erpnext.manufacturing.doctype.bom_update_log.bom_updation_utils.update_cost_in_level",
+			doc=update_doc,
+			bom_list=boms_to_process,
+			batch_name=batch_row.name,
+			queue="long",
+		)
+
+
+def resume_bom_cost_update_jobs():
+	"""
+	1. Checks for In Progress BOM Update Log.
+	2. Checks if this job has completed the _current level_.
+	3. If current level is complete, get parent BOMs and start next level.
+	4. If no parents, mark as Complete.
+	5. If current level is WIP, skip the Log.
+
+	Called every 5 minutes via Cron job.
+	"""
+
+	in_progress_logs = frappe.db.get_all(
+		"BOM Update Log",
+		{"update_type": "Update Cost", "status": "In Progress"},
+		["name", "processed_boms", "current_level"],
+	)
+	if not in_progress_logs:
+		return
+
+	for log in in_progress_logs:
+		# check if all log batches of current level are processed
+		bom_batches = frappe.db.get_all(
+			"BOM Update Batch",
+			{"parent": log.name, "level": log.current_level},
+			["name", "boms_updated", "status"],
+		)
+		incomplete_level = any(row.get("status") == "Pending" for row in bom_batches)
+		if not bom_batches or incomplete_level:
+			continue
+
+		# Prep parent BOMs & updated processed BOMs for next level
+		current_boms, processed_boms = get_processed_current_boms(log, bom_batches)
+		parent_boms = get_next_higher_level_boms(child_boms=current_boms, processed_boms=processed_boms)
+
+		set_values_in_log(
+			log.name,
+			values={
+				"processed_boms": json.dumps(processed_boms),
+				"status": "Completed" if not parent_boms else "In Progress",
+			},
+			commit=True,
+		)
+
+		if parent_boms:  # there is a next level to process
+			process_boms_cost_level_wise(
+				update_doc=frappe.get_doc("BOM Update Log", log.name), parent_boms=parent_boms
+			)
+
+
+def get_processed_current_boms(
+	log: Dict[str, Any], bom_batches: Dict[str, Any]
+) -> Tuple[List[str], Dict[str, Any]]:
+	"""
+	Aggregate all BOMs from BOM Update Batch rows into 'processed_boms' field
+	and into current boms list.
+	"""
+	processed_boms = json.loads(log.processed_boms) if log.processed_boms else {}
+	current_boms = []
+
+	for row in bom_batches:
+		boms_updated = json.loads(row.boms_updated)
+		current_boms.extend(boms_updated)
+		boms_updated_dict = {bom: True for bom in boms_updated}
+		processed_boms.update(boms_updated_dict)
+
+	return current_boms, processed_boms
diff --git a/erpnext/manufacturing/doctype/bom_update_log/bom_updation_utils.py b/erpnext/manufacturing/doctype/bom_update_log/bom_updation_utils.py
new file mode 100644
index 0000000..dde1e4e
--- /dev/null
+++ b/erpnext/manufacturing/doctype/bom_update_log/bom_updation_utils.py
@@ -0,0 +1,224 @@
+# Copyright (c) 2022, Frappe Technologies Pvt. Ltd. and contributors
+# For license information, please see license.txt
+
+import json
+from collections import defaultdict
+from typing import TYPE_CHECKING, Any, Dict, List, Optional, Union
+
+if TYPE_CHECKING:
+	from erpnext.manufacturing.doctype.bom_update_log.bom_update_log import BOMUpdateLog
+
+import frappe
+from frappe import _
+
+
+def replace_bom(boms: Dict) -> None:
+	"Replace current BOM with new BOM in parent BOMs."
+
+	current_bom = boms.get("current_bom")
+	new_bom = boms.get("new_bom")
+
+	unit_cost = get_bom_unit_cost(new_bom)
+	update_new_bom_in_bom_items(unit_cost, current_bom, new_bom)
+
+	frappe.cache().delete_key("bom_children")
+	parent_boms = get_ancestor_boms(new_bom)
+
+	for bom in parent_boms:
+		bom_obj = frappe.get_doc("BOM", bom)
+		# this is only used for versioning and we do not want
+		# to make separate db calls by using load_doc_before_save
+		# which proves to be expensive while doing bulk replace
+		bom_obj._doc_before_save = bom_obj
+		bom_obj.update_exploded_items()
+		bom_obj.calculate_cost()
+		bom_obj.update_parent_cost()
+		bom_obj.db_update()
+		if bom_obj.meta.get("track_changes") and not bom_obj.flags.ignore_version:
+			bom_obj.save_version()
+
+
+def update_cost_in_level(
+	doc: "BOMUpdateLog", bom_list: List[str], batch_name: Union[int, str]
+) -> None:
+	"Updates Cost for BOMs within a given level. Runs via background jobs."
+
+	try:
+		status = frappe.db.get_value("BOM Update Log", doc.name, "status")
+		if status == "Failed":
+			return
+
+		frappe.db.auto_commit_on_many_writes = 1
+
+		update_cost_in_boms(bom_list=bom_list)  # main updation logic
+
+		bom_batch = frappe.qb.DocType("BOM Update Batch")
+		(
+			frappe.qb.update(bom_batch)
+			.set(bom_batch.boms_updated, json.dumps(bom_list))
+			.set(bom_batch.status, "Completed")
+			.where(bom_batch.name == batch_name)
+		).run()
+	except Exception:
+		handle_exception(doc)
+	finally:
+		frappe.db.auto_commit_on_many_writes = 0
+
+		if not frappe.flags.in_test:
+			frappe.db.commit()  # nosemgrep
+
+
+def get_ancestor_boms(new_bom: str, bom_list: Optional[List] = None) -> List:
+	"Recursively get all ancestors of BOM."
+
+	bom_list = bom_list or []
+	bom_item = frappe.qb.DocType("BOM Item")
+
+	parents = (
+		frappe.qb.from_(bom_item)
+		.select(bom_item.parent)
+		.where((bom_item.bom_no == new_bom) & (bom_item.docstatus < 2) & (bom_item.parenttype == "BOM"))
+		.run(as_dict=True)
+	)
+
+	for d in parents:
+		if new_bom == d.parent:
+			frappe.throw(_("BOM recursion: {0} cannot be child of {1}").format(new_bom, d.parent))
+
+		bom_list.append(d.parent)
+		get_ancestor_boms(d.parent, bom_list)
+
+	return list(set(bom_list))
+
+
+def update_new_bom_in_bom_items(unit_cost: float, current_bom: str, new_bom: str) -> None:
+	bom_item = frappe.qb.DocType("BOM Item")
+	(
+		frappe.qb.update(bom_item)
+		.set(bom_item.bom_no, new_bom)
+		.set(bom_item.rate, unit_cost)
+		.set(bom_item.amount, (bom_item.stock_qty * unit_cost))
+		.where(
+			(bom_item.bom_no == current_bom) & (bom_item.docstatus < 2) & (bom_item.parenttype == "BOM")
+		)
+	).run()
+
+
+def get_bom_unit_cost(bom_name: str) -> float:
+	bom = frappe.qb.DocType("BOM")
+	new_bom_unitcost = (
+		frappe.qb.from_(bom).select(bom.total_cost / bom.quantity).where(bom.name == bom_name).run()
+	)
+
+	return frappe.utils.flt(new_bom_unitcost[0][0])
+
+
+def update_cost_in_boms(bom_list: List[str]) -> None:
+	"Updates cost in given BOMs. Returns current and total updated BOMs."
+
+	for index, bom in enumerate(bom_list):
+		bom_doc = frappe.get_doc("BOM", bom, for_update=True)
+		bom_doc.calculate_cost(save_updates=True, update_hour_rate=True)
+		bom_doc.db_update()
+
+		if (index % 100 == 0) and not frappe.flags.in_test:
+			frappe.db.commit()  # nosemgrep
+
+
+def get_next_higher_level_boms(
+	child_boms: List[str], processed_boms: Dict[str, bool]
+) -> List[str]:
+	"Generate immediate higher level dependants with no unresolved dependencies (children)."
+
+	def _all_children_are_processed(parent_bom):
+		child_boms = dependency_map.get(parent_bom)
+		return all(processed_boms.get(bom) for bom in child_boms)
+
+	dependants_map, dependency_map = _generate_dependence_map()
+
+	dependants = []
+	for bom in child_boms:
+		# generate list of immediate dependants
+		parents = dependants_map.get(bom) or []
+		dependants.extend(parents)
+
+	dependants = set(dependants)  # remove duplicates
+	resolved_dependants = set()
+
+	# consider only if children are all resolved
+	for parent_bom in dependants:
+		if _all_children_are_processed(parent_bom):
+			resolved_dependants.add(parent_bom)
+
+	return list(resolved_dependants)
+
+
+def get_leaf_boms() -> List[str]:
+	"Get BOMs that have no dependencies."
+
+	return frappe.db.sql_list(
+		"""select name from `tabBOM` bom
+		where docstatus=1 and is_active=1
+			and not exists(select bom_no from `tabBOM Item`
+				where parent=bom.name and ifnull(bom_no, '')!='')"""
+	)
+
+
+def _generate_dependence_map() -> defaultdict:
+	"""
+	Generate maps such as: { BOM-1: [Dependant-BOM-1, Dependant-BOM-2, ..] }.
+	Here BOM-1 is the leaf/lower level node/dependency.
+	The list contains one level higher nodes/dependants that depend on BOM-1.
+
+	Generate and return the reverse as well.
+	"""
+
+	bom = frappe.qb.DocType("BOM")
+	bom_item = frappe.qb.DocType("BOM Item")
+
+	bom_items = (
+		frappe.qb.from_(bom_item)
+		.join(bom)
+		.on(bom_item.parent == bom.name)
+		.select(bom_item.bom_no, bom_item.parent)
+		.where(
+			(bom_item.bom_no.isnotnull())
+			& (bom_item.bom_no != "")
+			& (bom.docstatus == 1)
+			& (bom.is_active == 1)
+			& (bom_item.parenttype == "BOM")
+		)
+	).run(as_dict=True)
+
+	child_parent_map = defaultdict(list)
+	parent_child_map = defaultdict(list)
+	for row in bom_items:
+		child_parent_map[row.bom_no].append(row.parent)
+		parent_child_map[row.parent].append(row.bom_no)
+
+	return child_parent_map, parent_child_map
+
+
+def set_values_in_log(log_name: str, values: Dict[str, Any], commit: bool = False) -> None:
+	"Update BOM Update Log record."
+
+	if not values:
+		return
+
+	bom_update_log = frappe.qb.DocType("BOM Update Log")
+	query = frappe.qb.update(bom_update_log).where(bom_update_log.name == log_name)
+
+	for key, value in values.items():
+		query = query.set(key, value)
+	query.run()
+
+	if commit and not frappe.flags.in_test:
+		frappe.db.commit()  # nosemgrep
+
+
+def handle_exception(doc: "BOMUpdateLog") -> None:
+	"Rolls back and fails BOM Update Log."
+
+	frappe.db.rollback()
+	error_log = doc.log_error("BOM Update Tool Error")
+	set_values_in_log(doc.name, {"status": "Failed", "error_log": error_log.name})
diff --git a/erpnext/manufacturing/doctype/bom_update_log/test_bom_update_log.py b/erpnext/manufacturing/doctype/bom_update_log/test_bom_update_log.py
index 47efea9..d770f6c 100644
--- a/erpnext/manufacturing/doctype/bom_update_log/test_bom_update_log.py
+++ b/erpnext/manufacturing/doctype/bom_update_log/test_bom_update_log.py
@@ -1,14 +1,27 @@
 # Copyright (c) 2022, Frappe Technologies Pvt. Ltd. and Contributors
 # See license.txt
 
+import json
+
 import frappe
 from frappe.tests.utils import FrappeTestCase
 
 from erpnext.manufacturing.doctype.bom_update_log.bom_update_log import (
 	BOMMissingError,
-	run_bom_job,
+	get_processed_current_boms,
+	process_boms_cost_level_wise,
+	queue_bom_cost_jobs,
+	run_replace_bom_job,
 )
-from erpnext.manufacturing.doctype.bom_update_tool.bom_update_tool import enqueue_replace_bom
+from erpnext.manufacturing.doctype.bom_update_log.bom_updation_utils import (
+	get_next_higher_level_boms,
+	set_values_in_log,
+	update_cost_in_level,
+)
+from erpnext.manufacturing.doctype.bom_update_tool.bom_update_tool import (
+	enqueue_replace_bom,
+	enqueue_update_cost,
+)
 
 test_records = frappe.get_test_records("BOM")
 
@@ -31,17 +44,12 @@
 	def tearDown(self):
 		frappe.db.rollback()
 
-		if self._testMethodName == "test_bom_update_log_completion":
-			# clear logs and delete BOM created via setUp
-			frappe.db.delete("BOM Update Log")
-			self.new_bom_doc.cancel()
-			self.new_bom_doc.delete()
-
-			# explicitly commit and restore to original state
-			frappe.db.commit()  # nosemgrep
-
 	def test_bom_update_log_validate(self):
-		"Test if BOM presence is validated."
+		"""
+		1) Test if BOM presence is validated.
+		2) Test if same BOMs are validated.
+		3) Test of non-existent BOM is validated.
+		"""
 
 		with self.assertRaises(BOMMissingError):
 			enqueue_replace_bom(boms={})
@@ -55,9 +63,7 @@
 	def test_bom_update_log_queueing(self):
 		"Test if BOM Update Log is created and queued."
 
-		log = enqueue_replace_bom(
-			boms=self.boms,
-		)
+		log = enqueue_replace_bom(boms=self.boms)
 
 		self.assertEqual(log.docstatus, 1)
 		self.assertEqual(log.status, "Queued")
@@ -65,32 +71,51 @@
 	def test_bom_update_log_completion(self):
 		"Test if BOM Update Log handles job completion correctly."
 
-		log = enqueue_replace_bom(
-			boms=self.boms,
-		)
+		log = enqueue_replace_bom(boms=self.boms)
 
-		# Explicitly commits log, new bom (setUp) and replacement impact.
-		# Is run via background jobs IRL
-		run_bom_job(
-			doc=log,
-			boms=self.boms,
-			update_type="Replace BOM",
-		)
+		# Is run via background job IRL
+		run_replace_bom_job(doc=log, boms=self.boms)
 		log.reload()
 
 		self.assertEqual(log.status, "Completed")
 
-		# teardown (undo replace impact) due to commit
-		boms = frappe._dict(
-			current_bom=self.boms.new_bom,
-			new_bom=self.boms.current_bom,
+
+def update_cost_in_all_boms_in_test():
+	"""
+	Utility to run 'Update Cost' job in tests immediately without Cron job.
+	Run job for all levels (manually) until fully complete.
+	"""
+	parent_boms = []
+	log = enqueue_update_cost()  # create BOM Update Log
+
+	while log.status != "Completed":
+		level_boms, current_level = process_boms_cost_level_wise(log, parent_boms)
+		log.reload()
+
+		boms, batch = queue_bom_cost_jobs(
+			level_boms, log, current_level
+		)  # adds rows in log for tracking
+		log.reload()
+
+		update_cost_in_level(log, boms, batch)  # business logic
+		log.reload()
+
+		# current level done, get next level boms
+		bom_batches = frappe.db.get_all(
+			"BOM Update Batch",
+			{"parent": log.name, "level": log.current_level},
+			["name", "boms_updated", "status"],
 		)
-		log2 = enqueue_replace_bom(
-			boms=self.boms,
+		current_boms, processed_boms = get_processed_current_boms(log, bom_batches)
+		parent_boms = get_next_higher_level_boms(child_boms=current_boms, processed_boms=processed_boms)
+
+		set_values_in_log(
+			log.name,
+			values={
+				"processed_boms": json.dumps(processed_boms),
+				"status": "Completed" if not parent_boms else "In Progress",
+			},
 		)
-		run_bom_job(  # Explicitly commits
-			doc=log2,
-			boms=boms,
-			update_type="Replace BOM",
-		)
-		self.assertEqual(log2.status, "Completed")
+		log.reload()
+
+	return log
diff --git a/erpnext/manufacturing/doctype/bom_update_tool/bom_update_tool.py b/erpnext/manufacturing/doctype/bom_update_tool/bom_update_tool.py
index b0e7da1..d16fcd0 100644
--- a/erpnext/manufacturing/doctype/bom_update_tool/bom_update_tool.py
+++ b/erpnext/manufacturing/doctype/bom_update_tool/bom_update_tool.py
@@ -10,8 +10,6 @@
 import frappe
 from frappe.model.document import Document
 
-from erpnext.manufacturing.doctype.bom.bom import get_boms_in_bottom_up_order
-
 
 class BOMUpdateTool(Document):
 	pass
@@ -40,14 +38,13 @@
 def auto_update_latest_price_in_all_boms() -> None:
 	"""Called via hooks.py."""
 	if frappe.db.get_single_value("Manufacturing Settings", "update_bom_costs_automatically"):
-		update_cost()
-
-
-def update_cost() -> None:
-	"""Updates Cost for all BOMs from bottom to top."""
-	bom_list = get_boms_in_bottom_up_order()
-	for bom in bom_list:
-		frappe.get_doc("BOM", bom).update_cost(update_parent=False, from_child_bom=True)
+		wip_log = frappe.get_all(
+			"BOM Update Log",
+			{"update_type": "Update Cost", "status": ["in", ["Queued", "In Progress"]]},
+			limit_page_length=1,
+		)
+		if not wip_log:
+			create_bom_update_log(update_type="Update Cost")
 
 
 def create_bom_update_log(
diff --git a/erpnext/manufacturing/doctype/bom_update_tool/test_bom_update_tool.py b/erpnext/manufacturing/doctype/bom_update_tool/test_bom_update_tool.py
index fae72a0..d1882e5 100644
--- a/erpnext/manufacturing/doctype/bom_update_tool/test_bom_update_tool.py
+++ b/erpnext/manufacturing/doctype/bom_update_tool/test_bom_update_tool.py
@@ -1,11 +1,13 @@
-# Copyright (c) 2015, Frappe Technologies Pvt. Ltd. and Contributors
+# Copyright (c) 2022, Frappe Technologies Pvt. Ltd. and Contributors
 # License: GNU General Public License v3. See license.txt
 
 import frappe
 from frappe.tests.utils import FrappeTestCase
 
 from erpnext.manufacturing.doctype.bom_update_log.bom_update_log import replace_bom
-from erpnext.manufacturing.doctype.bom_update_tool.bom_update_tool import update_cost
+from erpnext.manufacturing.doctype.bom_update_log.test_bom_update_log import (
+	update_cost_in_all_boms_in_test,
+)
 from erpnext.manufacturing.doctype.production_plan.test_production_plan import make_bom
 from erpnext.stock.doctype.item.test_item import create_item
 
@@ -25,8 +27,8 @@
 		boms = frappe._dict(current_bom=current_bom, new_bom=bom_doc.name)
 		replace_bom(boms)
 
-		self.assertFalse(frappe.db.sql("select name from `tabBOM Item` where bom_no=%s", current_bom))
-		self.assertTrue(frappe.db.sql("select name from `tabBOM Item` where bom_no=%s", bom_doc.name))
+		self.assertFalse(frappe.db.exists("BOM Item", {"bom_no": current_bom, "docstatus": 1}))
+		self.assertTrue(frappe.db.exists("BOM Item", {"bom_no": bom_doc.name, "docstatus": 1}))
 
 		# reverse, as it affects other testcases
 		boms.current_bom = bom_doc.name
@@ -52,13 +54,13 @@
 		self.assertEqual(doc.total_cost, 200)
 
 		frappe.db.set_value("Item", "BOM Cost Test Item 2", "valuation_rate", 200)
-		update_cost()
+		update_cost_in_all_boms_in_test()
 
 		doc.load_from_db()
 		self.assertEqual(doc.total_cost, 300)
 
 		frappe.db.set_value("Item", "BOM Cost Test Item 2", "valuation_rate", 100)
-		update_cost()
+		update_cost_in_all_boms_in_test()
 
 		doc.load_from_db()
 		self.assertEqual(doc.total_cost, 200)
diff --git a/erpnext/manufacturing/doctype/production_plan/test_production_plan.py b/erpnext/manufacturing/doctype/production_plan/test_production_plan.py
index 891a497..e88049d 100644
--- a/erpnext/manufacturing/doctype/production_plan/test_production_plan.py
+++ b/erpnext/manufacturing/doctype/production_plan/test_production_plan.py
@@ -798,7 +798,6 @@
 
 	for item in args.raw_materials:
 		item_doc = frappe.get_doc("Item", item)
-
 		bom.append(
 			"items",
 			{
diff --git a/erpnext/manufacturing/doctype/work_order/test_work_order.py b/erpnext/manufacturing/doctype/work_order/test_work_order.py
index 2aba482..27e7e24 100644
--- a/erpnext/manufacturing/doctype/work_order/test_work_order.py
+++ b/erpnext/manufacturing/doctype/work_order/test_work_order.py
@@ -417,7 +417,7 @@
 					"doctype": "Item Price",
 					"item_code": "_Test FG Non Stock Item",
 					"price_list_rate": 1000,
-					"price_list": "Standard Buying",
+					"price_list": "_Test Price List India",
 				}
 			).insert(ignore_permissions=True)
 
@@ -426,8 +426,17 @@
 			item_code="_Test FG Item", target="_Test Warehouse - _TC", qty=1, basic_rate=100
 		)
 
-		if not frappe.db.get_value("BOM", {"item": fg_item}):
-			make_bom(item=fg_item, rate=1000, raw_materials=["_Test FG Item", "_Test FG Non Stock Item"])
+		if not frappe.db.get_value("BOM", {"item": fg_item, "docstatus": 1}):
+			bom = make_bom(
+				item=fg_item,
+				rate=1000,
+				raw_materials=["_Test FG Item", "_Test FG Non Stock Item"],
+				do_not_save=True,
+			)
+			bom.rm_cost_as_per = "Price List"  # non stock item won't have valuation rate
+			bom.buying_price_list = "_Test Price List India"
+			bom.currency = "INR"
+			bom.save()
 
 		wo = make_wo_order_test_record(production_item=fg_item)