fix: consumed qty logic for subcontracted raw materials (#23314)

* fix: consumed qty logic for subcontracted raw materials

* added test case

* fix: sales order workflow test case
diff --git a/erpnext/buying/doctype/purchase_order/test_purchase_order.py b/erpnext/buying/doctype/purchase_order/test_purchase_order.py
index 0cf3d24..c85d015 100644
--- a/erpnext/buying/doctype/purchase_order/test_purchase_order.py
+++ b/erpnext/buying/doctype/purchase_order/test_purchase_order.py
@@ -19,6 +19,8 @@
 from erpnext.controllers.status_updater import OverAllowanceError
 from erpnext.manufacturing.doctype.blanket_order.test_blanket_order import make_blanket_order
 
+from erpnext.stock.doctype.batch.test_batch import make_new_batch
+from erpnext.controllers.buying_controller import get_backflushed_subcontracted_raw_materials
 
 class TestPurchaseOrder(unittest.TestCase):
 	def test_make_purchase_receipt(self):
@@ -686,7 +688,7 @@
 
 	def test_exploded_items_in_subcontracted(self):
 		item_code = "_Test Subcontracted FG Item 1"
-		make_subcontracted_item(item_code)
+		make_subcontracted_item(item_code=item_code)
 
 		po = create_purchase_order(item_code=item_code, qty=1,
 			is_subcontracted="Yes", supplier_warehouse="_Test Warehouse 1 - _TC", include_exploded_items=1)
@@ -708,7 +710,7 @@
 
 	def test_backflush_based_on_stock_entry(self):
 		item_code = "_Test Subcontracted FG Item 1"
-		make_subcontracted_item(item_code)
+		make_subcontracted_item(item_code=item_code)
 		make_item('Sub Contracted Raw Material 1', {
 			'is_stock_item': 1,
 			'is_sub_contracted_item': 1
@@ -767,6 +769,76 @@
 
 		update_backflush_based_on("BOM")
 
+	def test_backflushed_based_on_for_multiple_batches(self):
+		item_code = "_Test Subcontracted FG Item 2"
+		make_item('Sub Contracted Raw Material 2', {
+			'is_stock_item': 1,
+			'is_sub_contracted_item': 1
+		})
+
+		make_subcontracted_item(item_code=item_code, has_batch_no=1, create_new_batch=1,
+			raw_materials=["Sub Contracted Raw Material 2"])
+
+		update_backflush_based_on("Material Transferred for Subcontract")
+
+		order_qty = 500
+		po = create_purchase_order(item_code=item_code, qty=order_qty,
+			is_subcontracted="Yes", supplier_warehouse="_Test Warehouse 1 - _TC")
+
+		make_stock_entry(target="_Test Warehouse - _TC",
+			item_code = "Sub Contracted Raw Material 2", qty=552, basic_rate=100)
+
+		rm_items = [
+			{"item_code":item_code,"rm_item_code":"Sub Contracted Raw Material 2","item_name":"_Test Item",
+				"qty":552,"warehouse":"_Test Warehouse - _TC", "stock_uom":"Nos"}]
+
+		rm_item_string = json.dumps(rm_items)
+		se = frappe.get_doc(make_subcontract_transfer_entry(po.name, rm_item_string))
+		se.submit()
+
+		for batch in ["ABCD1", "ABCD2", "ABCD3", "ABCD4"]:
+			make_new_batch(batch_id=batch, item_code=item_code)
+
+		pr = make_purchase_receipt(po.name)
+
+		# partial receipt
+		pr.get('items')[0].qty = 30
+		pr.get('items')[0].batch_no = "ABCD1"
+
+		purchase_order = po.name
+		purchase_order_item = po.items[0].name
+
+		for batch_no, qty in {"ABCD2": 60, "ABCD3": 70, "ABCD4":40}.items():
+			pr.append("items", {
+				"item_code": pr.get('items')[0].item_code,
+				"item_name": pr.get('items')[0].item_name,
+				"uom": pr.get('items')[0].uom,
+				"stock_uom": pr.get('items')[0].stock_uom,
+				"warehouse": pr.get('items')[0].warehouse,
+				"conversion_factor": pr.get('items')[0].conversion_factor,
+				"cost_center": pr.get('items')[0].cost_center,
+				"rate": pr.get('items')[0].rate,
+				"qty": qty,
+				"batch_no": batch_no,
+				"purchase_order": purchase_order,
+				"purchase_order_item": purchase_order_item
+			})
+
+		pr.submit()
+
+		pr1 = make_purchase_receipt(po.name)
+		pr1.get('items')[0].qty = 300
+		pr1.get('items')[0].batch_no = "ABCD1"
+		pr1.save()
+
+		pr_key = ("Sub Contracted Raw Material 2", po.name)
+		consumed_qty = get_backflushed_subcontracted_raw_materials([po.name]).get(pr_key)
+
+		self.assertTrue(pr1.supplied_items[0].consumed_qty > 0)
+		self.assertTrue(pr1.supplied_items[0].consumed_qty,  flt(552.0) - flt(consumed_qty))
+
+		update_backflush_based_on("BOM")
+
 	def test_advance_payment_entry_unlink_against_purchase_order(self):
 		from erpnext.accounts.doctype.payment_entry.test_payment_entry import get_payment_entry
 		frappe.db.set_value("Accounts Settings", "Accounts Settings",
@@ -839,27 +911,33 @@
 	pr.submit()
 	return pr
 
-def make_subcontracted_item(item_code):
+def make_subcontracted_item(**args):
 	from erpnext.manufacturing.doctype.production_plan.test_production_plan import make_bom
 
-	if not frappe.db.exists('Item', item_code):
-		make_item(item_code, {
+	args = frappe._dict(args)
+
+	if not frappe.db.exists('Item', args.item_code):
+		make_item(args.item_code, {
 			'is_stock_item': 1,
-			'is_sub_contracted_item': 1
+			'is_sub_contracted_item': 1,
+			'has_batch_no': args.get("has_batch_no") or 0
 		})
 
-	if not frappe.db.exists('Item', "Test Extra Item 1"):
-		make_item("Test Extra Item 1", {
-			'is_stock_item': 1,
-		})
+	if not args.raw_materials:
+		if not frappe.db.exists('Item', "Test Extra Item 1"):
+			make_item("Test Extra Item 1", {
+				'is_stock_item': 1,
+			})
 
-	if not frappe.db.exists('Item', "Test Extra Item 2"):
-		make_item("Test Extra Item 2", {
-			'is_stock_item': 1,
-		})
+		if not frappe.db.exists('Item', "Test Extra Item 2"):
+			make_item("Test Extra Item 2", {
+				'is_stock_item': 1,
+			})
 
-	if not frappe.db.get_value('BOM', {'item': item_code}, 'name'):
-		make_bom(item = item_code, raw_materials = ['_Test FG Item', 'Test Extra Item 1'])
+		args.raw_materials = ['_Test FG Item', 'Test Extra Item 1']
+
+	if not frappe.db.get_value('BOM', {'item': args.item_code}, 'name'):
+		make_bom(item = args.item_code, raw_materials = args.get("raw_materials"))
 
 def update_backflush_based_on(based_on):
 	doc = frappe.get_doc('Buying Settings')
diff --git a/erpnext/controllers/buying_controller.py b/erpnext/controllers/buying_controller.py
index 7fab538..c3a2560 100644
--- a/erpnext/controllers/buying_controller.py
+++ b/erpnext/controllers/buying_controller.py
@@ -5,7 +5,7 @@
 import frappe
 from frappe import _, msgprint
 from frappe.utils import flt,cint, cstr, getdate
-
+from six import iteritems
 from erpnext.accounts.party import get_party_details
 from erpnext.stock.get_item_details import get_conversion_factor
 from erpnext.buying.utils import validate_for_items, update_last_purchase_rate
@@ -301,7 +301,7 @@
 			backflushed_batch_qty_map = get_backflushed_batch_qty_map(item.purchase_order, item.item_code)
 
 			for raw_material in transferred_raw_materials + non_stock_items:
-				rm_item_key = '{}{}'.format(raw_material.rm_item_code, item.purchase_order)
+				rm_item_key = (raw_material.rm_item_code, item.purchase_order)
 				raw_material_data = backflushed_raw_materials_map.get(rm_item_key, {})
 
 				consumed_qty = raw_material_data.get('qty', 0)
@@ -346,6 +346,7 @@
 		if not rm.main_item_code:
 			rm.main_item_code = fg_item_doc.item_code
 
+		rm.reference_name = fg_item_doc.name
 		rm.required_qty = qty
 		rm.consumed_qty = qty
 
@@ -893,39 +894,42 @@
 	return raw_materials
 
 def get_backflushed_subcontracted_raw_materials(purchase_orders):
-	common_query = """
-		SELECT
-			CONCAT(prsi.rm_item_code, pri.purchase_order) AS item_key,
-			SUM(prsi.consumed_qty) AS qty,
-			{serial_no_concat_syntax} AS serial_nos,
-			{batch_no_concat_syntax} AS batch_nos
-		FROM `tabPurchase Receipt` pr, `tabPurchase Receipt Item` pri, `tabPurchase Receipt Item Supplied` prsi
-		WHERE
-			pr.name = pri.parent
-			AND pr.name = prsi.parent
-			AND pri.purchase_order IN %s
-			AND pri.item_code = prsi.main_item_code
-			AND pr.docstatus = 1
-		GROUP BY prsi.rm_item_code, pri.purchase_order
-	"""
+	purchase_receipts = frappe.get_all("Purchase Receipt Item",
+		fields = ["purchase_order", "item_code", "name", "parent"],
+		filters={"docstatus": 1, "purchase_order": ("in", list(purchase_orders))})
 
-	backflushed_raw_materials = frappe.db.multisql({
-		'mariadb': common_query.format(
-			serial_no_concat_syntax="GROUP_CONCAT(prsi.serial_no)",
-			batch_no_concat_syntax="GROUP_CONCAT(prsi.batch_no)"
-		),
-		'postgres': common_query.format(
-			serial_no_concat_syntax="STRING_AGG(prsi.serial_no, ',')",
-			batch_no_concat_syntax="STRING_AGG(prsi.batch_no, ',')"
-		)
-	}, (purchase_orders, ), as_dict=1)
+	distinct_purchase_receipts = {}
+	for pr in purchase_receipts:
+		key = (pr.purchase_order, pr.item_code, pr.parent)
+		distinct_purchase_receipts.setdefault(key, []).append(pr.name)
 
 	backflushed_raw_materials_map = frappe._dict()
-	for item in backflushed_raw_materials:
-		backflushed_raw_materials_map.setdefault(item.item_key, item)
+	for args, references in iteritems(distinct_purchase_receipts):
+		purchase_receipt_supplied_items = get_supplied_items(args[1], args[2], references)
+
+		for data in purchase_receipt_supplied_items:
+			pr_key = (data.rm_item_code, args[0])
+			if pr_key not in backflushed_raw_materials_map:
+				backflushed_raw_materials_map.setdefault(pr_key, frappe._dict({
+					"qty": 0.0,
+					"serial_no": [],
+					"batch_no": []
+				}))
+
+			row = backflushed_raw_materials_map.get(pr_key)
+			row.qty += data.consumed_qty
+
+			for field in ["serial_no", "batch_no"]:
+				if data.get(field):
+					row[field].append(data.get(field))
 
 	return backflushed_raw_materials_map
 
+def get_supplied_items(item_code, purchase_receipt, references):
+	return frappe.get_all("Purchase Receipt Item Supplied",
+		fields=["rm_item_code", "consumed_qty", "serial_no", "batch_no"],
+		filters={"main_item_code": item_code, "parent": purchase_receipt, "reference_name": ("in", references)})
+
 def get_asset_item_details(asset_items):
 	asset_items_data = {}
 	for d in frappe.get_all('Item', fields = ["name", "auto_create_assets", "asset_naming_series"],
diff --git a/erpnext/selling/doctype/sales_order/test_sales_order.py b/erpnext/selling/doctype/sales_order/test_sales_order.py
index 9e25ed0..a33d401 100644
--- a/erpnext/selling/doctype/sales_order/test_sales_order.py
+++ b/erpnext/selling/doctype/sales_order/test_sales_order.py
@@ -441,6 +441,7 @@
 	def test_update_child_qty_rate_with_workflow(self):
 		from frappe.model.workflow import apply_workflow
 
+		frappe.set_user("Administrator")
 		workflow = make_sales_order_workflow()
 		so = make_sales_order(item_code= "_Test Item", qty=1, rate=150, do_not_submit=1)
 		apply_workflow(so, 'Approve')
diff --git a/erpnext/stock/doctype/batch/test_batch.py b/erpnext/stock/doctype/batch/test_batch.py
index 1fce504..c2a3d3c 100644
--- a/erpnext/stock/doctype/batch/test_batch.py
+++ b/erpnext/stock/doctype/batch/test_batch.py
@@ -256,3 +256,18 @@
 			batch.insert()
 
 		return batch
+
+def make_new_batch(**args):
+	args = frappe._dict(args)
+
+	try:
+		batch = frappe.get_doc({
+			"doctype": "Batch",
+			"batch_id": args.batch_id,
+			"item": args.item_code,
+		}).insert()
+
+	except frappe.DuplicateEntryError:
+		batch = frappe.get_doc("Batch", args.batch_id)
+
+	return batch
\ No newline at end of file
diff --git a/erpnext/stock/doctype/purchase_receipt/test_purchase_receipt.py b/erpnext/stock/doctype/purchase_receipt/test_purchase_receipt.py
index c5f3034..7161f4b 100644
--- a/erpnext/stock/doctype/purchase_receipt/test_purchase_receipt.py
+++ b/erpnext/stock/doctype/purchase_receipt/test_purchase_receipt.py
@@ -174,7 +174,7 @@
 
 		update_backflush_based_on("Material Transferred for Subcontract")
 		item_code = "_Test Subcontracted FG Item 1"
-		make_subcontracted_item(item_code)
+		make_subcontracted_item(item_code=item_code)
 
 		po = create_purchase_order(item_code=item_code, qty=1,
 			is_subcontracted="Yes", supplier_warehouse="_Test Warehouse 1 - _TC")
diff --git a/erpnext/stock/doctype/serial_no/serial_no.py b/erpnext/stock/doctype/serial_no/serial_no.py
index dec4fe2..6b33502 100644
--- a/erpnext/stock/doctype/serial_no/serial_no.py
+++ b/erpnext/stock/doctype/serial_no/serial_no.py
@@ -449,6 +449,9 @@
 		from tabItem where name=%s""", item_code, as_dict=True)[0]
 
 def get_serial_nos(serial_no):
+	if isinstance(serial_no, list):
+		return serial_no
+
 	return [s.strip() for s in cstr(serial_no).strip().upper().replace(',', '\n').split('\n')
 		if s.strip()]