blob: 617e1ca15a811e35effda2e514afb3415fc64063 [file] [log] [blame]
Ankush Menat4b29fb62021-12-18 18:40:22 +05301from typing import Callable, List, NewType, Optional, Tuple
2
3from frappe.utils import flt
4
# A single queue entry: a two-element list holding [qty, rate].
FifoBin = NewType("FifoBin", List[float])

# Positions of the quantity and of the rate inside a FIFO bin.
QTY, RATE = 0, 1
10
11
class FifoValuation:
    """Valuation method where a queue of all the incoming stock is maintained.

    New stock is added at end of the queue.
    Qty consumption happens on First In First Out basis.

    Queue is implemented using "bins" of [qty, rate].

    ref: https://en.wikipedia.org/wiki/FIFO_and_LIFO_accounting
    """

    def __init__(self, state: Optional[List[FifoBin]]):
        """Initialise the queue from ``state``; ``None`` starts an empty queue.

        The list passed in is stored as-is (not copied), so in-place changes
        made by this object are visible through the caller's reference.
        """
        self.queue: List[FifoBin] = state if state is not None else []

    def __repr__(self):
        """Return the string form of the underlying list of bins."""
        return str(self.queue)

    def __iter__(self):
        """Iterate over the bins in queue order."""
        return iter(self.queue)

    def __eq__(self, other):
        """Compare against a plain list of bins or another FifoValuation.

        Any other operand yields ``NotImplemented`` so that Python can try the
        reflected comparison instead of failing with ``AttributeError``.
        """
        if isinstance(other, list):
            return self.queue == other
        if isinstance(other, FifoValuation):
            return self.queue == other.queue
        return NotImplemented

    def get_state(self) -> List[FifoBin]:
        """Get current state of queue."""
        return self.queue

    def get_total_stock_and_value(self) -> Tuple[float, float]:
        """Return (total qty, total value) summed over every bin in the queue.

        Both totals are passed through ``_round_off_if_near_zero`` before
        being returned.
        """
        total_qty = 0.0
        total_value = 0.0

        for qty, rate in self.queue:
            total_qty += flt(qty)
            total_value += flt(qty) * flt(rate)

        return _round_off_if_near_zero(total_qty), _round_off_if_near_zero(total_value)

    def add_stock(self, qty: float, rate: float) -> List[FifoBin]:
        """Update fifo queue with new stock and return queue.

        args:
            qty: new quantity to add
            rate: incoming rate of new quantity
        """

        if not self.queue:
            # Seed a zero bin so the code below always has a last bin to inspect.
            self.queue.append([0, 0])

        # last row has the same rate, merge new bin.
        if self.queue[-1][RATE] == rate:
            self.queue[-1][QTY] += qty
        else:
            # Item has a positive balance qty, add new entry
            if self.queue[-1][QTY] > 0:
                self.queue.append([qty, rate])
            else:  # negative (or zero) balance qty
                qty = self.queue[-1][QTY] + qty
                if qty > 0:  # new balance qty is positive
                    self.queue[-1] = [qty, rate]
                else:  # new balance qty is still negative, maintain same rate
                    self.queue[-1][QTY] = qty
        return self.get_state()

    def remove_stock(
        self, qty: float, rate: float, rate_generator: Callable[[], float]
    ) -> List[FifoBin]:
        """Remove stock from the queue and return queue.

        args:
            qty: quantity to remove
            rate: outgoing rate; when positive, the bin with exactly this rate
                is consumed instead of the first bin
            rate_generator: called without arguments when the queue is empty,
                to supply the rate of the placeholder bin that is created.
        """

        while qty:
            if not self.queue:
                # rely on rate generator.
                self.queue.append([0, rate_generator()])

            index = None
            if rate > 0:
                # Find the entry where rate matched with outgoing rate
                for idx, fifo_bin in enumerate(self.queue):
                    if fifo_bin[RATE] == rate:
                        index = idx
                        break

                # If no entry found with outgoing rate, collapse stack
                if index is None:  # nosemgrep
                    new_stock_value = sum(d[QTY] * d[RATE] for d in self.queue) - qty * rate
                    new_stock_qty = sum(d[QTY] for d in self.queue) - qty
                    # Fall back to the outgoing rate when no positive qty is
                    # left to derive an average rate from.
                    new_rate = new_stock_value / new_stock_qty if new_stock_qty > 0 else rate
                    # Note: this rebinds self.queue to a brand new list.
                    self.queue = [[new_stock_qty, new_rate]]
                    break
            else:
                index = 0

            # select first bin or the bin with same rate
            fifo_bin = self.queue[index]
            if qty >= fifo_bin[QTY]:
                # consume current bin
                qty = _round_off_if_near_zero(qty - fifo_bin[QTY])
                self.queue.pop(index)
                if not self.queue and qty:
                    # stock finished, qty still remains to be withdrawn
                    # negative stock, keep in as a negative bin
                    self.queue.append([-qty, rate or fifo_bin[RATE]])
                    break

            else:
                # qty found in current bin consume it and exit
                fifo_bin[QTY] = fifo_bin[QTY] - qty
                qty = 0

        return self.get_state()
127
128
def _round_off_if_near_zero(number: float, precision: int = 6) -> float:
    """Round ``number`` off to zero only if its magnitude is close to zero.

    "Close" means the absolute value is smaller than ``10 ** -precision``.
    Precision defaults to 6. Any other value, positive or negative, is
    returned unchanged after conversion with ``flt``.
    """
    value = flt(number)
    # Compare the magnitude: a plain ``value < threshold`` check would also
    # match every negative number and wrongly zero out negative balances.
    if abs(value) < (1.0 / (10 ** precision)):
        return 0

    return value
136 return flt(number)