fix: order and time of operations for multilevel bom
- Order of operations was being sorted by idx of individual operations in
BOM table, which made the ordering useless.
- This adds ordering that's sorted from lowest level item to top level
item.
- chore: remove dead functionality. There's no `items` table. Required
item level operations get overwritten on fetching of items /
operations e.g. when clicking on multi-level BOM checkbox.
- test: add test for tree representation
- feat: BOMTree class to get complete representation of a tree
diff --git a/erpnext/manufacturing/doctype/bom/bom.py b/erpnext/manufacturing/doctype/bom/bom.py
index 3e85560..c58f017 100644
--- a/erpnext/manufacturing/doctype/bom/bom.py
+++ b/erpnext/manufacturing/doctype/bom/bom.py
@@ -1,7 +1,8 @@
# Copyright (c) 2015, Frappe Technologies Pvt. Ltd. and Contributors
# License: GNU General Public License v3. See license.txt
-from __future__ import unicode_literals
+from typing import List
+from collections import deque
import frappe, erpnext
from frappe.utils import cint, cstr, flt, today
from frappe import _
@@ -16,14 +17,85 @@
import functools
-from six import string_types
-
from operator import itemgetter
form_grid_templates = {
"items": "templates/form_grid/item_grid.html"
}
+
+class BOMTree:
+ """Full tree representation of a BOM"""
+
+ # specifying the attributes to save resources
+ # ref: https://docs.python.org/3/reference/datamodel.html#slots
+ __slots__ = ["name", "child_items", "is_bom", "item_code", "exploded_qty", "qty"]
+
+ def __init__(self, name: str, is_bom: bool = True, exploded_qty: float = 1.0, qty: float = 1) -> None:
+ self.name = name # name of node, BOM number if is_bom else item_code
+ self.child_items: List["BOMTree"] = [] # list of child items
+ self.is_bom = is_bom # true if the node is a BOM and not a leaf item
+ self.item_code: str = None # item_code associated with node
+ self.qty = qty # required unit quantity to make one unit of parent item.
+ self.exploded_qty = exploded_qty # total exploded qty required for making root of tree.
+ if not self.is_bom:
+ self.item_code = self.name
+ else:
+ self.__create_tree()
+
+ def __create_tree(self):
+ bom = frappe.get_cached_doc("BOM", self.name)
+ self.item_code = bom.item
+
+ for item in bom.get("items", []):
+ qty = item.qty / bom.quantity # quantity per unit
+ exploded_qty = self.exploded_qty * qty
+ if item.bom_no:
+ child = BOMTree(item.bom_no, exploded_qty=exploded_qty, qty=qty)
+ self.child_items.append(child)
+ else:
+ self.child_items.append(
+ BOMTree(item.item_code, is_bom=False, exploded_qty=exploded_qty, qty=qty)
+ )
+
+ def level_order_traversal(self) -> List["BOMTree"]:
+ """Get level order traversal of tree.
+ E.g. for following tree the traversal will return list of nodes in order from top to bottom.
+ BOM:
+ - SubAssy1
+ - item1
+ - item2
+ - SubAssy2
+ - item3
+ - item4
+
+ returns = [SubAssy1, item1, item2, SubAssy2, item3, item4]
+ """
+ traversal = []
+ q = deque()
+ q.append(self)
+
+ while q:
+ node = q.popleft()
+
+ for child in node.child_items:
+ traversal.append(child)
+ q.append(child)
+
+ return traversal
+
+ def __str__(self) -> str:
+ return (
+ f"{self.item_code}{' - ' + self.name if self.is_bom else ''} qty(per unit): {self.qty}"
+ f" exploded_qty: {self.exploded_qty}"
+ )
+
+ def __repr__(self, level: int = 0) -> str:
+ rep = "┃ " * (level - 1) + "┣━ " * (level > 0) + str(self) + "\n"
+ for child in self.child_items:
+ rep += child.__repr__(level=level + 1)
+ return rep
+
class BOM(WebsiteGenerator):
website = frappe._dict(
# page_title_field = "item_name",
@@ -152,7 +224,7 @@
if not args:
args = frappe.form_dict.get('args')
- if isinstance(args, string_types):
+ if isinstance(args, str):
import json
args = json.loads(args)
@@ -600,6 +672,11 @@
if not d.batch_size or d.batch_size <= 0:
d.batch_size = 1
+ def get_tree_representation(self) -> BOMTree:
+ """Get a complete tree representation preserving order of child items."""
+ return BOMTree(self.name)
+
+
def get_bom_item_rate(args, bom_doc):
if bom_doc.rm_cost_as_per == 'Valuation Rate':
rate = get_valuation_rate(args) * (args.get("conversion_factor") or 1)
diff --git a/erpnext/manufacturing/doctype/bom/test_bom.py b/erpnext/manufacturing/doctype/bom/test_bom.py
index 1f443fb..57a5458 100644
--- a/erpnext/manufacturing/doctype/bom/test_bom.py
+++ b/erpnext/manufacturing/doctype/bom/test_bom.py
@@ -2,14 +2,13 @@
# License: GNU General Public License v3. See license.txt
-from __future__ import unicode_literals
+from collections import deque
import unittest
import frappe
from frappe.utils import cstr, flt
from frappe.test_runner import make_test_records
from erpnext.stock.doctype.stock_reconciliation.test_stock_reconciliation import create_stock_reconciliation
from erpnext.manufacturing.doctype.bom_update_tool.bom_update_tool import update_cost
-from six import string_types
from erpnext.stock.doctype.item.test_item import make_item
from erpnext.buying.doctype.purchase_order.test_purchase_order import create_purchase_order
from erpnext.tests.test_subcontracting import set_backflush_based_on
@@ -227,11 +226,88 @@
supplied_items = sorted([d.rm_item_code for d in po.supplied_items])
self.assertEqual(bom_items, supplied_items)
+ def test_bom_tree_representation(self):
+ bom_tree = {
+ "Assembly": {
+ "SubAssembly1": {"ChildPart1": {}, "ChildPart2": {},},
+ "SubAssembly2": {"ChildPart3": {}},
+ "SubAssembly3": {"SubSubAssy1": {"ChildPart4": {}}},
+ "ChildPart5": {},
+ "ChildPart6": {},
+ "SubAssembly4": {"SubSubAssy2": {"ChildPart7": {}}},
+ }
+ }
+ parent_bom = create_nested_bom(bom_tree, prefix="")
+ created_tree = parent_bom.get_tree_representation()
+
+ reqd_order = level_order_traversal(bom_tree)[1:] # skip first item
+ created_order = created_tree.level_order_traversal()
+
+ self.assertEqual(len(reqd_order), len(created_order))
+
+ for reqd_item, created_item in zip(reqd_order, created_order):
+ self.assertEqual(reqd_item, created_item.item_code)
+
+
def get_default_bom(item_code="_Test FG Item 2"):
return frappe.db.get_value("BOM", {"item": item_code, "is_active": 1, "is_default": 1})
+
+
+
+def level_order_traversal(node):
+ traversal = []
+ q = deque()
+ q.append(node)
+
+ while q:
+ node = q.popleft()
+
+ for node_name, subtree in node.items():
+ traversal.append(node_name)
+ q.append(subtree)
+
+ return traversal
+
+def create_nested_bom(tree, prefix="_Test bom "):
+ """ Helper function to create a simple nested bom from tree describing item names. (along with required items)
+ """
+
+ def create_items(bom_tree):
+ for item_code, subtree in bom_tree.items():
+ bom_item_code = prefix + item_code
+ if not frappe.db.exists("Item", bom_item_code):
+ frappe.get_doc(doctype="Item", item_code=bom_item_code, item_group="_Test Item Group").insert()
+ create_items(subtree)
+ create_items(tree)
+
+ def dfs(tree, node):
+ """naive implementation for searching right subtree"""
+ for node_name, subtree in tree.items():
+ if node_name == node:
+ return subtree
+ else:
+ result = dfs(subtree, node)
+ if result is not None:
+ return result
+
+ order_of_creating_bom = reversed(level_order_traversal(tree))
+
+ for item in order_of_creating_bom:
+ child_items = dfs(tree, item)
+ if child_items:
+ bom_item_code = prefix + item
+ bom = frappe.get_doc(doctype="BOM", item=bom_item_code)
+ for child_item in child_items.keys():
+ bom.append("items", {"item_code": prefix + child_item})
+ bom.insert()
+ bom.submit()
+
+ return bom # parent bom is last bom
+
+
def reset_item_valuation_rate(item_code, warehouse_list=None, qty=None, rate=None):
- if warehouse_list and isinstance(warehouse_list, string_types):
+ if warehouse_list and isinstance(warehouse_list, str):
warehouse_list = [warehouse_list]
if not warehouse_list:
diff --git a/erpnext/manufacturing/doctype/work_order/work_order.py b/erpnext/manufacturing/doctype/work_order/work_order.py
index 3027532..180815d 100644
--- a/erpnext/manufacturing/doctype/work_order/work_order.py
+++ b/erpnext/manufacturing/doctype/work_order/work_order.py
@@ -468,46 +468,47 @@
def set_work_order_operations(self):
"""Fetch operations from BOM and set in 'Work Order'"""
- self.set('operations', [])
+ def _get_operations(bom_no, qty=1):
+ return frappe.db.sql(
+ f"""select
+ operation, description, workstation, idx,
+ base_hour_rate as hour_rate, time_in_mins * {qty} as time_in_mins,
+ "Pending" as status, parent as bom, batch_size, sequence_id
+ from
+ `tabBOM Operation`
+ where
+ parent = %s order by idx
+ """, bom_no, as_dict=1)
+
+
+ self.set('operations', [])
if not self.bom_no:
return
- if self.use_multi_level_bom:
- bom_list = frappe.get_doc("BOM", self.bom_no).traverse_tree()
+ operations = []
+ if not self.use_multi_level_bom:
+ bom_qty = frappe.db.get_value("BOM", self.bom_no, "quantity")
+ operations.extend(_get_operations(self.bom_no, qty=1.0/bom_qty))
else:
- bom_list = [self.bom_no]
+ bom_tree = frappe.get_doc("BOM", self.bom_no).get_tree_representation()
+ bom_traversal = list(reversed(bom_tree.level_order_traversal()))
+ bom_traversal.append(bom_tree) # add operation on top level item last
- operations = frappe.db.sql("""
- select
- operation, description, workstation, idx,
- base_hour_rate as hour_rate, time_in_mins,
- "Pending" as status, parent as bom, batch_size, sequence_id
- from
- `tabBOM Operation`
- where
- parent in (%s) order by idx
- """ % ", ".join(["%s"]*len(bom_list)), tuple(bom_list), as_dict=1)
+ for d in bom_traversal:
+ if d.is_bom:
+ operations.extend(_get_operations(d.name, qty=d.exploded_qty))
+
+ for correct_index, operation in enumerate(operations, start=1):
+ operation.idx = correct_index
+
self.set('operations', operations)
-
- if self.use_multi_level_bom and self.get('operations') and self.get('items'):
- raw_material_operations = [d.operation for d in self.get('items')]
- operations = [d.operation for d in self.get('operations')]
-
- for operation in raw_material_operations:
- if operation not in operations:
- self.append('operations', {
- 'operation': operation
- })
-
self.calculate_time()
def calculate_time(self):
- bom_qty = frappe.db.get_value("BOM", self.bom_no, "quantity")
-
for d in self.get("operations"):
- d.time_in_mins = flt(d.time_in_mins) / flt(bom_qty) * (flt(self.qty) / flt(d.batch_size))
+ d.time_in_mins = flt(d.time_in_mins) * (flt(self.qty) / flt(d.batch_size))
self.calculate_operating_cost()