feat: LIFOValuation class for handling LIFO
diff --git a/erpnext/stock/tests/test_valuation.py b/erpnext/stock/tests/test_valuation.py
index 85788ba..623040e 100644
--- a/erpnext/stock/tests/test_valuation.py
+++ b/erpnext/stock/tests/test_valuation.py
@@ -3,14 +3,14 @@
from hypothesis import given
from hypothesis import strategies as st
-from erpnext.stock.valuation import FIFOValuation, _round_off_if_near_zero
+from erpnext.stock.valuation import FIFOValuation, LIFOValuation, _round_off_if_near_zero
qty_gen = st.floats(min_value=-1e6, max_value=1e6)
value_gen = st.floats(min_value=1, max_value=1e6)
stock_queue_generator = st.lists(st.tuples(qty_gen, value_gen), min_size=10)
-class TestFifoValuation(unittest.TestCase):
+class TestFIFOValuation(unittest.TestCase):
def setUp(self):
self.queue = FIFOValuation([])
@@ -164,3 +164,123 @@
total_value -= sum(q * r for q, r in consumed)
self.assertTotalQty(total_qty)
self.assertTotalValue(total_value)
+
+
+class TestLIFOValuation(unittest.TestCase):
+
+ def setUp(self):
+ self.stack = LIFOValuation([])
+
+ def tearDown(self):
+ qty, value = self.stack.get_total_stock_and_value()
+ self.assertTotalQty(qty)
+ self.assertTotalValue(value)
+
+ def assertTotalQty(self, qty):
+ self.assertAlmostEqual(sum(q for q, _ in self.stack), qty, msg=f"stack: {self.stack}", places=4)
+
+ def assertTotalValue(self, value):
+ self.assertAlmostEqual(sum(q * r for q, r in self.stack), value, msg=f"stack: {self.stack}", places=2)
+
+ def test_simple_addition(self):
+ self.stack.add_stock(1, 10)
+ self.assertTotalQty(1)
+
+ def test_merge_new_stock(self):
+ self.stack.add_stock(1, 10)
+ self.stack.add_stock(1, 10)
+ self.assertEqual(self.stack, [[2, 10]])
+
+ def test_simple_removal(self):
+ self.stack.add_stock(1, 10)
+ self.stack.remove_stock(1)
+ self.assertTotalQty(0)
+
+ def test_adding_negative_stock_keeps_rate(self):
+ self.stack = LIFOValuation([[-5.0, 100]])
+ self.stack.add_stock(1, 10)
+ self.assertEqual(self.stack, [[-4, 100]])
+
+ def test_adding_negative_stock_updates_rate(self):
+ self.stack = LIFOValuation([[-5.0, 100]])
+ self.stack.add_stock(6, 10)
+ self.assertEqual(self.stack, [[1, 10]])
+
+ def test_rounding_off(self):
+ self.stack.add_stock(1.0, 1.0)
+ self.stack.remove_stock(1.0 - 1e-9)
+ self.assertTotalQty(0)
+
+ def test_lifo_consumption(self):
+ self.stack.add_stock(10, 10)
+ self.stack.add_stock(10, 20)
+ consumed = self.stack.remove_stock(15)
+ self.assertEqual(consumed, [[10, 20], [5, 10]])
+ self.assertTotalQty(5)
+
+ def test_lifo_consumption_going_negative(self):
+ self.stack.add_stock(10, 10)
+ self.stack.add_stock(10, 20)
+ consumed = self.stack.remove_stock(25)
+ self.assertEqual(consumed, [[10, 20], [10, 10], [5, 10]])
+ self.assertTotalQty(-5)
+
+ def test_lifo_consumption_multiple(self):
+ self.stack.add_stock(1, 1)
+ self.stack.add_stock(2, 2)
+ consumed = self.stack.remove_stock(1)
+ self.assertEqual(consumed, [[1, 2]])
+
+ self.stack.add_stock(3, 3)
+ consumed = self.stack.remove_stock(4)
+ self.assertEqual(consumed, [[3, 3], [1, 2]])
+
+ self.stack.add_stock(4, 4)
+ consumed = self.stack.remove_stock(5)
+ self.assertEqual(consumed, [[4, 4], [1, 1]])
+
+ self.stack.add_stock(5, 5)
+ consumed = self.stack.remove_stock(5)
+ self.assertEqual(consumed, [[5, 5]])
+
+
+ @given(stock_queue_generator)
+ def test_lifo_qty_hypothesis(self, stock_stack):
+ self.stack = LIFOValuation([])
+ total_qty = 0
+
+ for qty, rate in stock_stack:
+ if qty == 0:
+ continue
+ if qty > 0:
+ self.stack.add_stock(qty, rate)
+ total_qty += qty
+ else:
+ qty = abs(qty)
+ consumed = self.stack.remove_stock(qty)
+ self.assertAlmostEqual(qty, sum(q for q, _ in consumed), msg=f"incorrect consumption {consumed}")
+ total_qty -= qty
+ self.assertTotalQty(total_qty)
+
+ @given(stock_queue_generator)
+ def test_lifo_qty_value_nonneg_hypothesis(self, stock_stack):
+ self.stack = LIFOValuation([])
+ total_qty = 0.0
+ total_value = 0.0
+
+ for qty, rate in stock_stack:
+ # don't allow negative stock
+ if qty == 0 or total_qty + qty < 0 or abs(qty) < 0.1:
+ continue
+ if qty > 0:
+ self.stack.add_stock(qty, rate)
+ total_qty += qty
+ total_value += qty * rate
+ else:
+ qty = abs(qty)
+ consumed = self.stack.remove_stock(qty)
+ self.assertAlmostEqual(qty, sum(q for q, _ in consumed), msg=f"incorrect consumption {consumed}")
+ total_qty -= qty
+ total_value -= sum(q * r for q, r in consumed)
+ self.assertTotalQty(total_qty)
+ self.assertTotalValue(total_value)
diff --git a/erpnext/stock/valuation.py b/erpnext/stock/valuation.py
index f056439..ee9477e 100644
--- a/erpnext/stock/valuation.py
+++ b/erpnext/stock/valuation.py
@@ -3,7 +3,7 @@
from frappe.utils import flt
-StockBin = NewType("FifoBin", List[float])
+StockBin = NewType("StockBin", List[float]) # [[qty, rate], ...]
# Indexes of values inside FIFO bin 2-tuple
QTY = 0
@@ -164,11 +164,12 @@
Stack is implemented using "bins" of [qty, rate].
ref: https://en.wikipedia.org/wiki/FIFO_and_LIFO_accounting
+ Implementation detail: appends and pops both at end of list.
"""
# specifying the attributes to save resources
# ref: https://docs.python.org/3/reference/datamodel.html#slots
- __slots__ = ["queue",]
+ __slots__ = ["stack",]
def __init__(self, state: Optional[List[StockBin]]):
self.stack: List[StockBin] = state if state is not None else []
@@ -183,8 +184,26 @@
args:
qty: new quantity to add
- rate: incoming rate of new quantity"""
- pass
+ rate: incoming rate of new quantity.
+
+ Behaviour of this is same as FIFO valuation.
+ """
+ if not len(self.stack):
+ self.stack.append([0, 0])
+
+ # last row has the same rate, merge new bin.
+ if self.stack[-1][RATE] == rate:
+ self.stack[-1][QTY] += qty
+ else:
+ # Item has a positive balance qty, add new entry
+ if self.stack[-1][QTY] > 0:
+ self.stack.append([qty, rate])
+ else: # negative balance qty
+ qty = self.stack[-1][QTY] + qty
+ if qty > 0: # new balance qty is positive
+ self.stack[-1] = [qty, rate]
+ else: # new balance qty is still negative, maintain same rate
+ self.stack[-1][QTY] = qty
def remove_stock(
@@ -194,10 +213,41 @@
args:
qty: quantity to remove
- rate: outgoing rate
+ rate: outgoing rate - ignored. Kept for backwards compatibility.
rate_generator: function to be called if stack is not found and rate is required.
"""
- pass
+ if not rate_generator:
+ rate_generator = lambda : 0.0 # noqa
+
+ consumed_bins = []
+ while qty:
+ if not len(self.stack):
+ # rely on rate generator.
+ self.stack.append([0, rate_generator()])
+
+ # start at the end.
+ index = -1
+
+ stock_bin = self.stack[index]
+ if qty >= stock_bin[QTY]:
+ # consume current bin
+ qty = _round_off_if_near_zero(qty - stock_bin[QTY])
+ to_consume = self.stack.pop(index)
+ consumed_bins.append(list(to_consume))
+
+ if not self.stack and qty:
+ # stock finished, qty still remains to be withdrawn
+ # negative stock, keep in as a negative bin
+ self.stack.append([-qty, outgoing_rate or stock_bin[RATE]])
+ consumed_bins.append([qty, outgoing_rate or stock_bin[RATE]])
+ break
+ else:
+ # qty found in current bin consume it and exit
+ stock_bin[QTY] = _round_off_if_near_zero(stock_bin[QTY] - qty)
+ consumed_bins.append([qty, stock_bin[RATE]])
+ qty = 0
+
+ return consumed_bins
def _round_off_if_near_zero(number: float, precision: int = 7) -> float: