test: more tests for batchwise valuation

Co-Authored-By: Ankush Menat <ankush@frappe.io>
diff --git a/erpnext/stock/doctype/purchase_receipt/test_purchase_receipt.py b/erpnext/stock/doctype/purchase_receipt/test_purchase_receipt.py
index 5ab7929..d481689 100644
--- a/erpnext/stock/doctype/purchase_receipt/test_purchase_receipt.py
+++ b/erpnext/stock/doctype/purchase_receipt/test_purchase_receipt.py
@@ -1540,6 +1540,7 @@
 		"conversion_factor": args.conversion_factor or 1.0,
 		"stock_qty": flt(qty) * (flt(args.conversion_factor) or 1.0),
 		"serial_no": args.serial_no,
+		"batch_no": args.batch_no,
 		"stock_uom": args.stock_uom or "_Test UOM",
 		"uom": uom,
 		"cost_center": args.cost_center or frappe.get_cached_value('Company',  pr.company,  'cost_center'),
diff --git a/erpnext/stock/doctype/stock_ledger_entry/test_stock_ledger_entry.py b/erpnext/stock/doctype/stock_ledger_entry/test_stock_ledger_entry.py
index a1030d5..60fea96 100644
--- a/erpnext/stock/doctype/stock_ledger_entry/test_stock_ledger_entry.py
+++ b/erpnext/stock/doctype/stock_ledger_entry/test_stock_ledger_entry.py
@@ -1,6 +1,10 @@
 # Copyright (c) 2015, Frappe Technologies Pvt. Ltd. and Contributors
 # See license.txt
 
+import json
+from operator import itemgetter
+from uuid import uuid4
+
 import frappe
 from frappe.core.page.permission_manager.permission_manager import reset
 from frappe.utils import add_days, today
@@ -349,6 +353,170 @@
 			frappe.set_user("Administrator")
 			user.remove_roles("Stock Manager")
 
+	def test_batchwise_item_valuation_moving_average(self):
+		suffix = get_unique_suffix()
+		item, warehouses, batches = setup_item_valuation_test(
+			valuation_method="Moving Average", suffix=suffix
+		)
+
+		# Incoming Entries for Stock Value check
+		pr_entry_list = [
+			(item, warehouses[0], batches[0], 1, 100),
+			(item, warehouses[0], batches[1], 1,  50),
+			(item, warehouses[0], batches[0], 1, 150),
+			(item, warehouses[0], batches[1], 1, 100),
+		]
+		prs = create_purchase_receipt_entries_for_batchwise_item_valuation_test(pr_entry_list)
+		sle_details = fetch_sle_details_for_doc_list(prs, ['stock_value'])
+		sv_list = [d['stock_value'] for d in sle_details]
+		expected_sv = [100, 150, 300, 400]
+		self.assertEqual(expected_sv, sv_list, "Incorrect 'Stock Value' values")
+
+		# Outgoing Entries for Stock Value Difference check
+		dn_entry_list = [
+			(item, warehouses[0], batches[1], 1, 200),
+			(item, warehouses[0], batches[0], 1, 200),
+			(item, warehouses[0], batches[1], 1, 200),
+			(item, warehouses[0], batches[0], 1, 200)
+		]
+		dns = create_delivery_note_entries_for_batchwise_item_valuation_test(dn_entry_list)
+		sle_details = fetch_sle_details_for_doc_list(dns, ['stock_value_difference'])
+		svd_list = [-1 * d['stock_value_difference'] for d in sle_details]
+		expected_incoming_rates = expected_abs_svd = [75, 125, 75, 125]
+
+		self.assertEqual(expected_abs_svd, svd_list, "Incorrect 'Stock Value Difference' values")
+		for dn, incoming_rate in zip(dns, expected_incoming_rates):
+			self.assertEqual(
+				dn.items[0].incoming_rate, incoming_rate,
+				"Incorrect 'Incoming Rate' values fetched for DN items"
+			)
+
+
+	def assertSLEs(self, doc, expected_sles):
+		""" Compare sorted SLEs, useful for vouchers that create multiple SLEs for same line"""
+		sles = frappe.get_all("Stock Ledger Entry", fields=["*"],
+				filters={"voucher_no": doc.name, "voucher_type": doc.doctype, "is_cancelled":0},
+			order_by="timestamp(posting_date, posting_time), creation")
+
+		for exp_sle, act_sle in zip(expected_sles, sles):
+			for k, v in exp_sle.items():
+				self.assertEqual(v, act_sle[k], msg=f"{k} doesn't match \n{exp_sle}\n{act_sle}")
+
+	def test_batchwise_item_valuation_stock_reco(self):
+		suffix = get_unique_suffix()
+		item, warehouses, batches = setup_item_valuation_test(
+			valuation_method="FIFO", suffix=suffix
+		)
+		state = {
+			"stock_value" : 0.0,
+			"qty": 0.0
+		}
+		def update_invariants(exp_sles):
+			for sle in exp_sles:
+				state["stock_value"] += sle["stock_value_difference"]
+				state["qty"] += sle["actual_qty"]
+				sle["stock_value"] = state["stock_value"]
+				sle["qty_after_transaction"] = state["qty"]
+
+		osr1 = create_stock_reconciliation(warehouse=warehouses[0], item_code=item, qty=10, rate=100, batch_no=batches[1])
+		expected_sles = [
+			{"actual_qty": 10, "stock_value_difference": 1000},
+		]
+		update_invariants(expected_sles)
+		self.assertSLEs(osr1, expected_sles)
+
+		osr2 = create_stock_reconciliation(warehouse=warehouses[0], item_code=item, qty=13, rate=200, batch_no=batches[0])
+		expected_sles = [
+			{"actual_qty": 13, "stock_value_difference": 200*13},
+		]
+		update_invariants(expected_sles)
+		self.assertSLEs(osr2, expected_sles)
+
+		sr1 = create_stock_reconciliation(warehouse=warehouses[0], item_code=item, qty=5, rate=50, batch_no=batches[1])
+
+		expected_sles = [
+			{"actual_qty": -10, "stock_value_difference": -10 * 100},
+			{"actual_qty": 5, "stock_value_difference": 250}
+		]
+		update_invariants(expected_sles)
+		self.assertSLEs(sr1, expected_sles)
+
+		sr2 = create_stock_reconciliation(warehouse=warehouses[0], item_code=item, qty=20, rate=75, batch_no=batches[0])
+		expected_sles = [
+			{"actual_qty": -13, "stock_value_difference": -13 * 200},
+			{"actual_qty": 20, "stock_value_difference": 20 * 75}
+		]
+		update_invariants(expected_sles)
+		self.assertSLEs(sr2, expected_sles)
+
+	def test_legacy_item_valuation_stock_entry(self):
+		suffix = get_unique_suffix()
+		columns = [
+				'stock_value_difference',
+				'stock_value',
+				'actual_qty',
+				'qty_after_transaction',
+				'stock_queue',
+		]
+		item, warehouses, batches = setup_item_valuation_test(
+			valuation_method="FIFO", suffix=suffix, use_batchwise_valuation=0
+		)
+
+		def check_sle_details_against_expected(sle_details, expected_sle_details, detail, columns):
+			for i, (sle_vals, ex_sle_vals) in enumerate(zip(sle_details, expected_sle_details)):
+				for col, sle_val, ex_sle_val in zip(columns, sle_vals, ex_sle_vals):
+					if col == 'stock_queue':
+						sle_val = get_stock_value_from_q(sle_val)
+						ex_sle_val = get_stock_value_from_q(ex_sle_val)
+					self.assertEqual(
+						sle_val, ex_sle_val,
+						f"Incorrect {col} value on transaction #: {i} in {detail}"
+					)
+
+		# List used to defer assertions to prevent commits cause of error skipped rollback
+		details_list = []
+
+
+		# Test Material Receipt Entries
+		se_entry_list_mr = [
+			(item, None, warehouses[0], batches[0], 1,  50, "2021-01-21"),
+			(item, None, warehouses[0], batches[1], 1, 100, "2021-01-23"),
+		]
+		ses = create_stock_entry_entries_for_batchwise_item_valuation_test(
+			se_entry_list_mr, "Material Receipt"
+		)
+		sle_details = fetch_sle_details_for_doc_list(ses, columns=columns, as_dict=0)
+		expected_sle_details = [
+			(50.0, 50.0, 1.0, 1.0, '[[1.0, 50.0]]'),
+			(100.0, 150.0, 1.0, 2.0, '[[1.0, 50.0], [1.0, 100.0]]'),
+		]
+		details_list.append((
+			sle_details, expected_sle_details,
+			"Material Receipt Entries", columns
+		))
+
+
+		# Test Material Issue Entries
+		se_entry_list_mi = [
+			(item, warehouses[0], None, batches[1], 1, None, "2021-01-29"),
+		]
+		ses = create_stock_entry_entries_for_batchwise_item_valuation_test(
+			se_entry_list_mi, "Material Issue"
+		)
+		sle_details = fetch_sle_details_for_doc_list(ses, columns=columns, as_dict=0)
+		expected_sle_details = [
+			(-50.0, 100.0, -1.0, 1.0, '[[1, 100.0]]')
+		]
+		details_list.append((
+			sle_details, expected_sle_details,
+			"Material Issue Entries", columns
+		))
+
+
+		# Run assertions
+		for details in details_list:
+			check_sle_details_against_expected(*details)
+
 
 def create_repack_entry(**args):
 	args = frappe._dict(args)
@@ -412,3 +580,113 @@
 		make_item(d, properties=properties)
 
 	return items
+
+def setup_item_valuation_test(valuation_method, suffix, use_batchwise_valuation=1, batches_list=['X', 'Y']):
+	from erpnext.stock.doctype.batch.batch import make_batch
+	from erpnext.stock.doctype.item.test_item import make_item
+	from erpnext.stock.doctype.warehouse.test_warehouse import create_warehouse
+
+	item = make_item(
+		f"IV - Test Item {valuation_method} {suffix}",
+		dict(valuation_method=valuation_method, has_batch_no=1)
+	)
+	warehouses = [create_warehouse(f"IV - Test Warehouse {i}") for i in ['J', 'K']]
+	batches = [f"IV - Test Batch {i} {valuation_method} {suffix}" for i in batches_list]
+
+	for i, batch_id in enumerate(batches):
+		if not frappe.db.exists("Batch", batch_id):
+			ubw = use_batchwise_valuation
+			if isinstance(use_batchwise_valuation, (list, tuple)):
+				ubw = use_batchwise_valuation[i]
+			make_batch(
+				frappe._dict(
+					batch_id=batch_id,
+					item=item.item_code,
+					use_batchwise_valuation=ubw
+				)
+			)
+
+	return item.item_code, warehouses, batches
+
+def create_purchase_receipt_entries_for_batchwise_item_valuation_test(pr_entry_list):
+	from erpnext.stock.doctype.purchase_receipt.test_purchase_receipt import make_purchase_receipt
+	prs = []
+
+	for item, warehouse, batch_no, qty, rate in pr_entry_list:
+		pr = make_purchase_receipt(item=item, warehouse=warehouse, qty=qty, rate=rate, batch_no=batch_no)
+		prs.append(pr)
+
+	return prs
+
+def create_delivery_note_entries_for_batchwise_item_valuation_test(dn_entry_list):
+	from erpnext.selling.doctype.sales_order.sales_order import make_delivery_note
+	from erpnext.selling.doctype.sales_order.test_sales_order import make_sales_order
+	dns = []
+	for item, warehouse, batch_no, qty, rate in dn_entry_list:
+		so = make_sales_order(
+			rate=rate,
+			qty=qty,
+			item=item,
+			warehouse=warehouse,
+			against_blanket_order=0
+		)
+
+		dn = make_delivery_note(so.name)
+		dn.items[0].batch_no = batch_no
+		dn.insert()
+		dn.submit()
+		dns.append(dn)
+	return dns
+
+def fetch_sle_details_for_doc_list(doc_list, columns, as_dict=1):
+	return frappe.db.sql(f"""
+		SELECT { ', '.join(columns)}
+		FROM `tabStock Ledger Entry`
+		WHERE
+			voucher_no IN %(voucher_nos)s
+			and docstatus = 1
+		ORDER BY timestamp(posting_date, posting_time) ASC, CREATION ASC
+	""", dict(
+		voucher_nos=[doc.name for doc in doc_list]
+	), as_dict=as_dict)
+
+def get_stock_value_from_q(q):
+	return sum(r*q for r,q in json.loads(q))
+
+def create_stock_entry_entries_for_batchwise_item_valuation_test(se_entry_list, purpose):
+	ses = []
+	for item, source, target, batch, qty, rate, posting_date in se_entry_list:
+		args = dict(
+			item_code=item,
+			qty=qty,
+			company="_Test Company",
+			batch_no=batch,
+			posting_date=posting_date,
+			purpose=purpose
+		)
+
+		if purpose == "Material Receipt":
+			args.update(
+				dict(to_warehouse=target, rate=rate)
+			)
+
+		elif purpose == "Material Issue":
+			args.update(
+				dict(from_warehouse=source)
+			)
+
+		elif purpose == "Material Transfer":
+			args.update(
+				dict(from_warehouse=source, to_warehouse=target)
+			)
+
+		else:
+			raise ValueError(f"Invalid purpose: {purpose}")
+		ses.append(make_stock_entry(**args))
+
+	return ses
+
+def get_unique_suffix():
+	# Used to isolate valuation sensitive
+	# tests to prevent future tests from failing.
+	return str(uuid4())[:8].upper()