refactor: move Fifo Valuation to a new class
diff --git a/erpnext/stock/stock_ledger.py b/erpnext/stock/stock_ledger.py
index e95c0fc..056e4a7 100644
--- a/erpnext/stock/stock_ledger.py
+++ b/erpnext/stock/stock_ledger.py
@@ -16,6 +16,7 @@
get_or_make_bin,
get_valuation_method,
)
+from erpnext.stock.valuation import FifoValuation
class NegativeStockError(frappe.ValidationError): pass
@@ -456,9 +457,8 @@
self.wh_data.qty_after_transaction += flt(sle.actual_qty)
self.wh_data.stock_value = flt(self.wh_data.qty_after_transaction) * flt(self.wh_data.valuation_rate)
else:
- self.get_fifo_values(sle)
+ self.update_fifo_values(sle)
self.wh_data.qty_after_transaction += flt(sle.actual_qty)
- self.wh_data.stock_value = sum(flt(batch[0]) * flt(batch[1]) for batch in self.wh_data.stock_queue)
# rounding as per precision
self.wh_data.stock_value = flt(self.wh_data.stock_value, self.precision)
@@ -696,87 +696,39 @@
sle.voucher_type, sle.voucher_no, self.allow_zero_rate,
currency=erpnext.get_company_currency(sle.company), company=sle.company)
- def get_fifo_values(self, sle):
+ def update_fifo_values(self, sle):
incoming_rate = flt(sle.incoming_rate)
actual_qty = flt(sle.actual_qty)
outgoing_rate = flt(sle.outgoing_rate)
+ fifo_queue = FifoValuation(self.wh_data.stock_queue)
if actual_qty > 0:
- if not self.wh_data.stock_queue:
- self.wh_data.stock_queue.append([0, 0])
-
- # last row has the same rate, just updated the qty
- if self.wh_data.stock_queue[-1][1]==incoming_rate:
- self.wh_data.stock_queue[-1][0] += actual_qty
- else:
- # Item has a positive balance qty, add new entry
- if self.wh_data.stock_queue[-1][0] > 0:
- self.wh_data.stock_queue.append([actual_qty, incoming_rate])
- else: # negative balance qty
- qty = self.wh_data.stock_queue[-1][0] + actual_qty
- if qty > 0: # new balance qty is positive
- self.wh_data.stock_queue[-1] = [qty, incoming_rate]
- else: # new balance qty is still negative, maintain same rate
- self.wh_data.stock_queue[-1][0] = qty
+ fifo_queue.add_stock(qty=actual_qty, rate=incoming_rate)
else:
- qty_to_pop = abs(actual_qty)
- while qty_to_pop:
- if not self.wh_data.stock_queue:
- # Get valuation rate from last sle if exists or from valuation rate field in item master
- allow_zero_valuation_rate = self.check_if_allow_zero_valuation_rate(sle.voucher_type, sle.voucher_detail_no)
- if not allow_zero_valuation_rate:
- _rate = get_valuation_rate(sle.item_code, sle.warehouse,
- sle.voucher_type, sle.voucher_no, self.allow_zero_rate,
- currency=erpnext.get_company_currency(sle.company), company=sle.company)
- else:
- _rate = 0
-
- self.wh_data.stock_queue.append([0, _rate])
-
- index = None
- if outgoing_rate > 0:
- # Find the entry where rate matched with outgoing rate
- for i, v in enumerate(self.wh_data.stock_queue):
- if v[1] == outgoing_rate:
- index = i
- break
-
- # If no entry found with outgoing rate, collapse stack
- if index is None: # nosemgrep
- new_stock_value = sum(d[0]*d[1] for d in self.wh_data.stock_queue) - qty_to_pop*outgoing_rate
- new_stock_qty = sum(d[0] for d in self.wh_data.stock_queue) - qty_to_pop
- self.wh_data.stock_queue = [[new_stock_qty, new_stock_value/new_stock_qty if new_stock_qty > 0 else outgoing_rate]]
- break
+ def rate_generator() -> float:
+ allow_zero_valuation_rate = self.check_if_allow_zero_valuation_rate(sle.voucher_type, sle.voucher_detail_no)
+ if not allow_zero_valuation_rate:
+ return get_valuation_rate(sle.item_code, sle.warehouse,
+ sle.voucher_type, sle.voucher_no, self.allow_zero_rate,
+ currency=erpnext.get_company_currency(sle.company), company=sle.company)
else:
- index = 0
+ return 0.0
- # select first batch or the batch with same rate
- batch = self.wh_data.stock_queue[index]
- if qty_to_pop >= batch[0]:
- # consume current batch
- qty_to_pop = _round_off_if_near_zero(qty_to_pop - batch[0])
- self.wh_data.stock_queue.pop(index)
- if not self.wh_data.stock_queue and qty_to_pop:
- # stock finished, qty still remains to be withdrawn
- # negative stock, keep in as a negative batch
- self.wh_data.stock_queue.append([-qty_to_pop, outgoing_rate or batch[1]])
- break
+ fifo_queue.remove_stock(qty=abs(actual_qty), rate=outgoing_rate, rate_generator=rate_generator)
- else:
- # qty found in current batch
- # consume it and exit
- batch[0] = batch[0] - qty_to_pop
- qty_to_pop = 0
+ stock_qty, stock_value = fifo_queue.get_total_stock_and_value()
- stock_value = _round_off_if_near_zero(sum(flt(batch[0]) * flt(batch[1]) for batch in self.wh_data.stock_queue))
- stock_qty = _round_off_if_near_zero(sum(flt(batch[0]) for batch in self.wh_data.stock_queue))
-
+ self.wh_data.stock_queue = fifo_queue.get_state()
+ self.wh_data.stock_value = stock_value
if stock_qty:
- self.wh_data.valuation_rate = stock_value / flt(stock_qty)
+ self.wh_data.valuation_rate = stock_value / stock_qty
+
if not self.wh_data.stock_queue:
self.wh_data.stock_queue.append([0, sle.incoming_rate or sle.outgoing_rate or self.wh_data.valuation_rate])
+
+
def check_if_allow_zero_valuation_rate(self, voucher_type, voucher_detail_no):
ref_item_dt = ""
@@ -1158,13 +1110,3 @@
and timestamp(posting_date, posting_time) >= timestamp(%(posting_date)s, %(posting_time)s)
limit 1
""", args, as_dict=1)
-
-
-def _round_off_if_near_zero(number: float, precision: int = 6) -> float:
- """ Rounds off the number to zero only if number is close to zero for decimal
- specified in precision. Precision defaults to 6.
- """
- if flt(number) < (1.0 / (10**precision)):
- return 0
-
- return flt(number)
diff --git a/erpnext/stock/valuation.py b/erpnext/stock/valuation.py
new file mode 100644
index 0000000..33191d8
--- /dev/null
+++ b/erpnext/stock/valuation.py
@@ -0,0 +1,125 @@
+from typing import Callable, List, NewType, Optional, Tuple
+
+from frappe.utils import flt
+
+FifoBin = NewType("FifoBin", List[float])
+
+# Indexes of values inside FIFO bin 2-tuple
+QTY = 0
+RATE = 1
+
+
+class FifoValuation:
+ """Valuation method where a queue of all the incoming stock is maintained.
+
+ New stock is added at end of the queue.
+ Qty consumption happens on First In First Out basis.
+
+ Queue is implemented using "bins" of [qty, rate].
+
+ ref: https://en.wikipedia.org/wiki/FIFO_and_LIFO_accounting
+ """
+
+ def __init__(self, state: Optional[List[FifoBin]]):
+ self.queue: List[FifoBin] = state if state is not None else []
+
+ def get_state(self) -> List[FifoBin]:
+ """Get current state of queue."""
+ return self.queue
+
+ def get_total_stock_and_value(self) -> Tuple[float, float]:
+ total_qty = 0.0
+ total_value = 0.0
+
+ for qty, rate in self.queue:
+ total_qty += flt(qty)
+ total_value += flt(qty) * flt(rate)
+
+ return _round_off_if_near_zero(total_qty), _round_off_if_near_zero(total_value)
+
+ def add_stock(self, qty: float, rate: float) -> List[FifoBin]:
+ """Update fifo queue with new stock and return queue.
+
+ args:
+ qty: new quantity to add
+ rate: incoming rate of new quantity"""
+
+ if not len(self.queue):
+ self.queue.append([0, 0])
+
+ # last row has the same rate, merge new bin.
+ if self.queue[-1][RATE] == rate:
+ self.queue[-1][QTY] += qty
+ else:
+ # Item has a positive balance qty, add new entry
+ if self.queue[-1][QTY] > 0:
+ self.queue.append([qty, rate])
+ else: # negative balance qty
+ qty = self.queue[-1][QTY] + qty
+ if qty > 0: # new balance qty is positive
+ self.queue[-1] = [qty, rate]
+ else: # new balance qty is still negative, maintain same rate
+ self.queue[-1][QTY] = qty
+ return self.get_state()
+
+ def remove_stock(
+ self, qty: float, rate: float, rate_generator: Callable[[], float]
+ ) -> List[FifoBin]:
+ """Remove stock from the queue and return queue.
+
+ args:
+ qty: quantity to remove
+ rate: outgoing rate
+ rate_generator: function to be called if queue is not found and rate is required.
+ """
+
+ while qty:
+ if not len(self.queue):
+ # rely on rate generator.
+ self.queue.append([0, rate_generator()])
+
+ index = None
+ if rate > 0:
+ # Find the entry where rate matched with outgoing rate
+ for idx, fifo_bin in enumerate(self.queue):
+ if fifo_bin[RATE] == rate:
+ index = idx
+ break
+
+ # If no entry found with outgoing rate, collapse stack
+ if index is None: # nosemgrep
+ new_stock_value = sum(d[QTY] * d[RATE] for d in self.queue) - qty * rate
+ new_stock_qty = sum(d[QTY] for d in self.queue) - qty
+ self.queue = [[new_stock_qty, new_stock_value / new_stock_qty if new_stock_qty > 0 else rate]]
+ break
+ else:
+ index = 0
+
+ # select first bin or the bin with same rate
+ fifo_bin = self.queue[index]
+ if qty >= fifo_bin[QTY]:
+ # consume current bin
+ qty = _round_off_if_near_zero(qty - fifo_bin[QTY])
+ self.queue.pop(index)
+ if not self.queue and qty:
+ # stock finished, qty still remains to be withdrawn
+ # negative stock, keep in as a negative bin
+ self.queue.append([-qty, rate or fifo_bin[RATE]])
+ break
+
+ else:
+ # qty found in current bin consume it and exit
+ fifo_bin[QTY] = fifo_bin[QTY] - qty
+ qty = 0
+
+ return self.get_state()
+
+
+def _round_off_if_near_zero(number: float, precision: int = 6) -> float:
+ """Rounds off the number to zero only if number is close to zero for decimal
+ specified in precision. Precision defaults to 6.
+ """
+ if flt(number) < (1.0 / (10 ** precision)):
+ return 0
+
+ return flt(number)