feat: Track progress in Log Batch/Job wise

- This was done due to stale reads while the background jobs tried updating status of the log
- Added a table where all bom jobs within log will be tracked with what level they are processing
- Cron job will check if table jobs are all processed every 5 mins
- If yes, it will prepare parents and call `process_boms_cost_level_wise` to start next level
- If pending jobs, do nothing
- Current BOM Level is being tracked that helps adding rows to the table
- Individual bom cost jobs (that are queued) will process and update boms > will update BOM Update Batch table row with list of updated BOMs
diff --git a/erpnext/manufacturing/doctype/bom_update_batch/__init__.py b/erpnext/manufacturing/doctype/bom_update_batch/__init__.py
new file mode 100644
index 0000000..e69de29
--- /dev/null
+++ b/erpnext/manufacturing/doctype/bom_update_batch/__init__.py
diff --git a/erpnext/manufacturing/doctype/bom_update_batch/bom_update_batch.json b/erpnext/manufacturing/doctype/bom_update_batch/bom_update_batch.json
new file mode 100644
index 0000000..9938454
--- /dev/null
+++ b/erpnext/manufacturing/doctype/bom_update_batch/bom_update_batch.json
@@ -0,0 +1,45 @@
+{
+ "actions": [],
+ "autoname": "autoincrement",
+ "creation": "2022-05-31 17:34:39.825537",
+ "doctype": "DocType",
+ "engine": "InnoDB",
+ "field_order": [
+  "level",
+  "batch_no",
+  "boms_updated"
+ ],
+ "fields": [
+  {
+   "fieldname": "level",
+   "fieldtype": "Int",
+   "in_list_view": 1,
+   "label": "Level"
+  },
+  {
+   "fieldname": "batch_no",
+   "fieldtype": "Int",
+   "in_list_view": 1,
+   "label": "Batch No."
+  },
+  {
+   "fieldname": "boms_updated",
+   "fieldtype": "Long Text",
+   "in_list_view": 1,
+   "label": "BOMs Updated"
+  }
+ ],
+ "index_web_pages_for_search": 1,
+ "istable": 1,
+ "links": [],
+ "modified": "2022-05-31 23:36:13.628391",
+ "modified_by": "Administrator",
+ "module": "Manufacturing",
+ "name": "BOM Update Batch",
+ "naming_rule": "Autoincrement",
+ "owner": "Administrator",
+ "permissions": [],
+ "sort_field": "modified",
+ "sort_order": "DESC",
+ "states": []
+}
\ No newline at end of file
diff --git a/erpnext/manufacturing/doctype/bom_update_batch/bom_update_batch.py b/erpnext/manufacturing/doctype/bom_update_batch/bom_update_batch.py
new file mode 100644
index 0000000..f952e43
--- /dev/null
+++ b/erpnext/manufacturing/doctype/bom_update_batch/bom_update_batch.py
@@ -0,0 +1,9 @@
+# Copyright (c) 2022, Frappe Technologies Pvt. Ltd. and contributors
+# For license information, please see license.txt
+
+# import frappe
+from frappe.model.document import Document
+
+
+class BOMUpdateBatch(Document):
+	pass
diff --git a/erpnext/manufacturing/doctype/bom_update_log/bom_update_log.json b/erpnext/manufacturing/doctype/bom_update_log/bom_update_log.json
index bea3cf0..b1c24ab 100644
--- a/erpnext/manufacturing/doctype/bom_update_log/bom_update_log.json
+++ b/erpnext/manufacturing/doctype/bom_update_log/bom_update_log.json
@@ -14,9 +14,10 @@
   "status",
   "error_log",
   "progress_section",
-  "current_boms",
+  "current_level",
   "parent_boms",
   "processed_boms",
+  "bom_batches",
   "amended_from"
  ],
  "fields": [
@@ -70,16 +71,12 @@
   },
   {
    "collapsible": 1,
+   "depends_on": "eval: doc.update_type == \"Update Cost\"",
    "fieldname": "progress_section",
    "fieldtype": "Section Break",
    "label": "Progress"
   },
   {
-   "fieldname": "current_boms",
-   "fieldtype": "Long Text",
-   "label": "Current BOMs"
-  },
-  {
    "description": "Immediate parent BOMs",
    "fieldname": "parent_boms",
    "fieldtype": "Long Text",
@@ -89,13 +86,23 @@
    "fieldname": "processed_boms",
    "fieldtype": "Long Text",
    "label": "Processed BOMs"
+  },
+  {
+   "fieldname": "bom_batches",
+   "fieldtype": "Table",
+   "options": "BOM Update Batch"
+  },
+  {
+   "fieldname": "current_level",
+   "fieldtype": "Int",
+   "label": "Current Level"
   }
  ],
  "in_create": 1,
  "index_web_pages_for_search": 1,
  "is_submittable": 1,
  "links": [],
- "modified": "2022-05-27 17:03:34.712010",
+ "modified": "2022-05-31 20:20:06.370786",
  "modified_by": "Administrator",
  "module": "Manufacturing",
  "name": "BOM Update Log",
diff --git a/erpnext/manufacturing/doctype/bom_update_log/bom_update_log.py b/erpnext/manufacturing/doctype/bom_update_log/bom_update_log.py
index f61f863..bfae76c 100644
--- a/erpnext/manufacturing/doctype/bom_update_log/bom_update_log.py
+++ b/erpnext/manufacturing/doctype/bom_update_log/bom_update_log.py
@@ -1,15 +1,16 @@
 # Copyright (c) 2022, Frappe Technologies Pvt. Ltd. and contributors
 # For license information, please see license.txt
 import json
-from typing import Dict, Optional
+from typing import Any, Dict, List, Optional, Tuple
 
 import frappe
 from frappe import _
 from frappe.model.document import Document
-from frappe.utils import cstr
+from frappe.utils import cint, cstr
 
 from erpnext.manufacturing.doctype.bom_update_log.bom_updation_utils import (
 	get_leaf_boms,
+	get_next_higher_level_boms,
 	handle_exception,
 	replace_bom,
 	set_values_in_log,
@@ -111,55 +112,110 @@
 
 	if update_doc.status == "Queued":
 		# First level yet to process. On Submit.
-		current_boms = {bom: False for bom in get_leaf_boms()}
+		current_level = 0
+		current_boms = get_leaf_boms()
 		values = {
-			"current_boms": json.dumps(current_boms),
 			"parent_boms": "[]",
 			"processed_boms": json.dumps({}),
 			"status": "In Progress",
+			"current_level": current_level,
 		}
 	else:
-		# status is Paused, resume. via Cron Job.
-		current_boms, parent_boms = json.loads(update_doc.current_boms), json.loads(
-			update_doc.parent_boms
-		)
-		if not current_boms:
-			# Process the next level BOMs. Stage parents as current BOMs.
-			current_boms = {bom: False for bom in parent_boms}
-			values = {
-				"current_boms": json.dumps(current_boms),
-				"parent_boms": "[]",
-				"status": "In Progress",
-			}
+		# Resume next level. via Cron Job.
+		current_level = cint(update_doc.current_level) + 1
+		parent_boms = json.loads(update_doc.parent_boms)
+
+		# Process the next level BOMs. Stage parents as current BOMs.
+		current_boms = parent_boms.copy()
+		values = {"parent_boms": "[]", "current_level": current_level}
 
 	set_values_in_log(update_doc.name, values, commit=True)
-	queue_bom_cost_jobs(current_boms, update_doc)
+	queue_bom_cost_jobs(current_boms, update_doc, current_level)
 
 
-def queue_bom_cost_jobs(current_boms: Dict, update_doc: "BOMUpdateLog") -> None:
+def queue_bom_cost_jobs(
+	current_boms_list: List, update_doc: "BOMUpdateLog", current_level: int
+) -> None:
 	"Queue batches of 20k BOMs of the same level to process parallelly"
-	current_boms_list = [bom for bom in current_boms]
+	batch_no = 0
 
 	while current_boms_list:
+		batch_no += 1
 		batch_size = 20_000
 		boms_to_process = current_boms_list[:batch_size]  # slice out batch of 20k BOMs
 
 		# update list to exclude 20K (queued) BOMs
 		current_boms_list = current_boms_list[batch_size:] if len(current_boms_list) > batch_size else []
+
+		batch_row = update_doc.append("bom_batches", {"level": current_level, "batch_no": batch_no})
+		batch_row.db_insert()
+
 		frappe.enqueue(
 			method="erpnext.manufacturing.doctype.bom_update_log.bom_updation_utils.update_cost_in_level",
 			doc=update_doc,
 			bom_list=boms_to_process,
+			batch_name=batch_row.name,
 			timeout=40000,
 		)
 
 
 def resume_bom_cost_update_jobs():
-	"Called every 10 minutes via Cron job."
-	paused_jobs = frappe.db.get_all("BOM Update Log", {"status": "Paused"})
-	if not paused_jobs:
+	"""
+	1. Checks for In Progress BOM Update Log.
+	2. Checks if this job has completed the _current level_.
+	3. If current level is complete, get parent BOMs and start next level.
+	4. If no parents, mark as Complete.
+	5. If current level is WIP, skip the Log.
+
+	Called every 5 minutes via Cron job.
+	"""
+
+	in_progress_logs = frappe.db.get_all(
+		"BOM Update Log",
+		{"update_type": "Update Cost", "status": "In Progress"},
+		["name", "processed_boms", "current_level"],
+	)
+	if not in_progress_logs:
 		return
 
-	for job in paused_jobs:
-		# resume from next level
-		process_boms_cost_level_wise(update_doc=frappe.get_doc("BOM Update Log", job.name))
+	for log in in_progress_logs:
+		# check if all log batches of current level are processed
+		bom_batches = frappe.db.get_all(
+			"BOM Update Batch", {"parent": log.name, "level": log.current_level}, ["name", "boms_updated"]
+		)
+		incomplete_level = any(not row.get("boms_updated") for row in bom_batches)
+		if not bom_batches or incomplete_level:
+			continue
+
+		# Prep parent BOMs & updated processed BOMs for next level
+		current_boms, processed_boms = get_processed_current_boms(log, bom_batches)
+		parent_boms = get_next_higher_level_boms(child_boms=current_boms, processed_boms=processed_boms)
+
+		set_values_in_log(
+			log.name,
+			values={
+				"processed_boms": json.dumps(processed_boms),
+				"parent_boms": json.dumps(parent_boms),
+				"status": "Completed" if not parent_boms else "In Progress",
+			},
+			commit=True,
+		)
+
+		if parent_boms:  # there is a next level to process
+			process_boms_cost_level_wise(update_doc=frappe.get_doc("BOM Update Log", log.name))
+
+
+def get_processed_current_boms(
+	log: Dict[str, Any], bom_batches: Dict[str, Any]
+) -> Tuple[List[str], Dict[str, Any]]:
+	"Aggregate all BOMs from BOM Update Batch rows into 'processed_boms' field and into current boms list."
+	processed_boms = json.loads(log.processed_boms) if log.processed_boms else {}
+	current_boms = []
+
+	for row in bom_batches:
+		boms_updated = json.loads(row.boms_updated)
+		current_boms.extend(boms_updated)
+		boms_updated_dict = {bom: True for bom in boms_updated}
+		processed_boms.update(boms_updated_dict)
+
+	return current_boms, processed_boms
diff --git a/erpnext/manufacturing/doctype/bom_update_log/bom_updation_utils.py b/erpnext/manufacturing/doctype/bom_update_log/bom_updation_utils.py
index 790a79b..2d6429b 100644
--- a/erpnext/manufacturing/doctype/bom_update_log/bom_updation_utils.py
+++ b/erpnext/manufacturing/doctype/bom_update_log/bom_updation_utils.py
@@ -38,7 +38,7 @@
 			bom_obj.save_version()
 
 
-def update_cost_in_level(doc: "BOMUpdateLog", bom_list: List[str]) -> None:
+def update_cost_in_level(doc: "BOMUpdateLog", bom_list: List[str], batch_name: int) -> None:
 	"Updates Cost for BOMs within a given level. Runs via background jobs."
 
 	try:
@@ -47,19 +47,9 @@
 			return
 
 		frappe.db.auto_commit_on_many_writes = 1
-		# main updation logic
-		job_data = update_cost_in_boms(bom_list=bom_list, docname=doc.name)
 
-		set_values_in_log(
-			doc.name,
-			values={
-				"current_boms": json.dumps(job_data.get("current_boms")),
-				"processed_boms": json.dumps(job_data.get("processed_boms")),
-			},
-			commit=True,
-		)
-
-		process_if_level_is_complete(doc.name, job_data["current_boms"], job_data["processed_boms"])
+		update_cost_in_boms(bom_list=bom_list)  # main updation logic
+		frappe.db.set_value("BOM Update Batch", batch_name, "boms_updated", json.dumps(bom_list))
 	except Exception:
 		handle_exception(doc)
 	finally:
@@ -112,48 +102,13 @@
 	return frappe.utils.flt(new_bom_unitcost[0][0])
 
 
-def update_cost_in_boms(bom_list: List[str], docname: str) -> Dict[str, Dict]:
+def update_cost_in_boms(bom_list: List[str]) -> None:
 	"Updates cost in given BOMs. Returns current and total updated BOMs."
 
-	updated_boms = {}  # current boms that have been updated
-
 	for bom in bom_list:
 		bom_doc = frappe.get_cached_doc("BOM", bom)
 		bom_doc.calculate_cost(save_updates=True, update_hour_rate=True)
 		bom_doc.db_update()
-		updated_boms[bom] = True
-
-	# Update processed BOMs in Log
-	log_data = frappe.db.get_values(
-		"BOM Update Log", docname, ["current_boms", "processed_boms"], as_dict=True
-	)[0]
-
-	for field in ("current_boms", "processed_boms"):
-		log_data[field] = json.loads(log_data.get(field))
-		log_data[field].update(updated_boms)
-
-	return log_data
-
-
-def process_if_level_is_complete(
-	docname: str, current_boms: Dict[str, bool], processed_boms: Dict[str, bool]
-) -> None:
-	"Prepare and set higher level BOMs/dependants in Log if current level is complete."
-
-	processing_complete = all(current_boms.get(bom) for bom in current_boms)
-	if not processing_complete:
-		return
-
-	parent_boms = get_next_higher_level_boms(child_boms=current_boms, processed_boms=processed_boms)
-	set_values_in_log(
-		docname,
-		values={
-			"current_boms": json.dumps({}),
-			"parent_boms": json.dumps(parent_boms),
-			"status": "Completed" if not parent_boms else "Paused",
-		},
-		commit=True,
-	)
 
 
 def get_next_higher_level_boms(
@@ -244,7 +199,7 @@
 	query.run()
 
 	if commit:
-		frappe.db.commit()
+		frappe.db.commit()  # nosemgrep
 
 
 def handle_exception(doc: "BOMUpdateLog") -> None: