Ankush Menat | 4b29fb6 | 2021-12-18 18:40:22 +0530 | [diff] [blame] | 1 | from typing import Callable, List, NewType, Optional, Tuple |
| 2 | |
| 3 | from frappe.utils import flt |
| 4 | |
| 5 | FifoBin = NewType("FifoBin", List[float]) |
| 6 | |
| 7 | # Indexes of values inside FIFO bin 2-tuple |
| 8 | QTY = 0 |
| 9 | RATE = 1 |
| 10 | |
| 11 | |
| 12 | class FifoValuation: |
| 13 | """Valuation method where a queue of all the incoming stock is maintained. |
| 14 | |
| 15 | New stock is added at end of the queue. |
| 16 | Qty consumption happens on First In First Out basis. |
| 17 | |
| 18 | Queue is implemented using "bins" of [qty, rate]. |
| 19 | |
| 20 | ref: https://en.wikipedia.org/wiki/FIFO_and_LIFO_accounting |
| 21 | """ |
| 22 | |
| 23 | def __init__(self, state: Optional[List[FifoBin]]): |
| 24 | self.queue: List[FifoBin] = state if state is not None else [] |
| 25 | |
Ankush Menat | a71b476 | 2021-12-18 19:33:58 +0530 | [diff] [blame] | 26 | def __repr__(self): |
| 27 | return str(self.queue) |
| 28 | |
| 29 | def __iter__(self): |
| 30 | return iter(self.queue) |
| 31 | |
| 32 | def __eq__(self, other): |
| 33 | if isinstance(other, list): |
| 34 | return self.queue == other |
| 35 | return self.queue == other.queue |
| 36 | |
Ankush Menat | 4b29fb6 | 2021-12-18 18:40:22 +0530 | [diff] [blame] | 37 | def get_state(self) -> List[FifoBin]: |
| 38 | """Get current state of queue.""" |
| 39 | return self.queue |
| 40 | |
| 41 | def get_total_stock_and_value(self) -> Tuple[float, float]: |
| 42 | total_qty = 0.0 |
| 43 | total_value = 0.0 |
| 44 | |
| 45 | for qty, rate in self.queue: |
| 46 | total_qty += flt(qty) |
| 47 | total_value += flt(qty) * flt(rate) |
| 48 | |
| 49 | return _round_off_if_near_zero(total_qty), _round_off_if_near_zero(total_value) |
| 50 | |
Ankush Menat | db1c088 | 2021-12-19 18:37:12 +0530 | [diff] [blame] | 51 | def add_stock(self, qty: float, rate: float) -> None: |
| 52 | """Update fifo queue with new stock. |
Ankush Menat | 4b29fb6 | 2021-12-18 18:40:22 +0530 | [diff] [blame] | 53 | |
| 54 | args: |
| 55 | qty: new quantity to add |
| 56 | rate: incoming rate of new quantity""" |
| 57 | |
| 58 | if not len(self.queue): |
| 59 | self.queue.append([0, 0]) |
| 60 | |
| 61 | # last row has the same rate, merge new bin. |
| 62 | if self.queue[-1][RATE] == rate: |
| 63 | self.queue[-1][QTY] += qty |
| 64 | else: |
| 65 | # Item has a positive balance qty, add new entry |
| 66 | if self.queue[-1][QTY] > 0: |
| 67 | self.queue.append([qty, rate]) |
| 68 | else: # negative balance qty |
| 69 | qty = self.queue[-1][QTY] + qty |
| 70 | if qty > 0: # new balance qty is positive |
| 71 | self.queue[-1] = [qty, rate] |
| 72 | else: # new balance qty is still negative, maintain same rate |
| 73 | self.queue[-1][QTY] = qty |
Ankush Menat | 4b29fb6 | 2021-12-18 18:40:22 +0530 | [diff] [blame] | 74 | |
| 75 | def remove_stock( |
Ankush Menat | a00d8d0 | 2021-12-19 18:45:04 +0530 | [diff] [blame] | 76 | self, qty: float, outgoing_rate: float = 0.0, rate_generator: Callable[[], float] = None |
Ankush Menat | 4b29fb6 | 2021-12-18 18:40:22 +0530 | [diff] [blame] | 77 | ) -> List[FifoBin]: |
Ankush Menat | db1c088 | 2021-12-19 18:37:12 +0530 | [diff] [blame] | 78 | """Remove stock from the queue and return popped bins. |
Ankush Menat | 4b29fb6 | 2021-12-18 18:40:22 +0530 | [diff] [blame] | 79 | |
| 80 | args: |
| 81 | qty: quantity to remove |
| 82 | rate: outgoing rate |
| 83 | rate_generator: function to be called if queue is not found and rate is required. |
| 84 | """ |
Ankush Menat | a00d8d0 | 2021-12-19 18:45:04 +0530 | [diff] [blame] | 85 | if not rate_generator: |
| 86 | rate_generator = lambda : 0.0 # noqa |
Ankush Menat | 4b29fb6 | 2021-12-18 18:40:22 +0530 | [diff] [blame] | 87 | |
Ankush Menat | db1c088 | 2021-12-19 18:37:12 +0530 | [diff] [blame] | 88 | consumed_bins = [] |
Ankush Menat | 4b29fb6 | 2021-12-18 18:40:22 +0530 | [diff] [blame] | 89 | while qty: |
| 90 | if not len(self.queue): |
| 91 | # rely on rate generator. |
| 92 | self.queue.append([0, rate_generator()]) |
| 93 | |
| 94 | index = None |
Ankush Menat | a00d8d0 | 2021-12-19 18:45:04 +0530 | [diff] [blame] | 95 | if outgoing_rate > 0: |
Ankush Menat | 4b29fb6 | 2021-12-18 18:40:22 +0530 | [diff] [blame] | 96 | # Find the entry where rate matched with outgoing rate |
| 97 | for idx, fifo_bin in enumerate(self.queue): |
Ankush Menat | a00d8d0 | 2021-12-19 18:45:04 +0530 | [diff] [blame] | 98 | if fifo_bin[RATE] == outgoing_rate: |
Ankush Menat | 4b29fb6 | 2021-12-18 18:40:22 +0530 | [diff] [blame] | 99 | index = idx |
| 100 | break |
| 101 | |
Ankush Menat | 9d17743 | 2021-12-19 18:51:55 +0530 | [diff] [blame] | 102 | # If no entry found with outgoing rate, collapse queue |
Ankush Menat | 4b29fb6 | 2021-12-18 18:40:22 +0530 | [diff] [blame] | 103 | if index is None: # nosemgrep |
Ankush Menat | a00d8d0 | 2021-12-19 18:45:04 +0530 | [diff] [blame] | 104 | new_stock_value = sum(d[QTY] * d[RATE] for d in self.queue) - qty * outgoing_rate |
Ankush Menat | 4b29fb6 | 2021-12-18 18:40:22 +0530 | [diff] [blame] | 105 | new_stock_qty = sum(d[QTY] for d in self.queue) - qty |
Ankush Menat | a00d8d0 | 2021-12-19 18:45:04 +0530 | [diff] [blame] | 106 | self.queue = [[new_stock_qty, new_stock_value / new_stock_qty if new_stock_qty > 0 else outgoing_rate]] |
Ankush Menat | 9d17743 | 2021-12-19 18:51:55 +0530 | [diff] [blame] | 107 | consumed_bins.append([qty, outgoing_rate]) |
Ankush Menat | 4b29fb6 | 2021-12-18 18:40:22 +0530 | [diff] [blame] | 108 | break |
| 109 | else: |
| 110 | index = 0 |
| 111 | |
| 112 | # select first bin or the bin with same rate |
| 113 | fifo_bin = self.queue[index] |
| 114 | if qty >= fifo_bin[QTY]: |
| 115 | # consume current bin |
| 116 | qty = _round_off_if_near_zero(qty - fifo_bin[QTY]) |
Ankush Menat | db1c088 | 2021-12-19 18:37:12 +0530 | [diff] [blame] | 117 | to_consume = self.queue.pop(index) |
| 118 | consumed_bins.append(list(to_consume)) |
| 119 | |
Ankush Menat | 4b29fb6 | 2021-12-18 18:40:22 +0530 | [diff] [blame] | 120 | if not self.queue and qty: |
| 121 | # stock finished, qty still remains to be withdrawn |
| 122 | # negative stock, keep in as a negative bin |
Ankush Menat | a00d8d0 | 2021-12-19 18:45:04 +0530 | [diff] [blame] | 123 | self.queue.append([-qty, outgoing_rate or fifo_bin[RATE]]) |
| 124 | consumed_bins.append([qty, outgoing_rate or fifo_bin[RATE]]) |
Ankush Menat | 4b29fb6 | 2021-12-18 18:40:22 +0530 | [diff] [blame] | 125 | break |
Ankush Menat | 4b29fb6 | 2021-12-18 18:40:22 +0530 | [diff] [blame] | 126 | else: |
| 127 | # qty found in current bin consume it and exit |
Ankush Menat | e6e679c | 2021-12-18 20:36:47 +0530 | [diff] [blame] | 128 | fifo_bin[QTY] = _round_off_if_near_zero(fifo_bin[QTY] - qty) |
Ankush Menat | db1c088 | 2021-12-19 18:37:12 +0530 | [diff] [blame] | 129 | consumed_bins.append([qty, fifo_bin[RATE]]) |
Ankush Menat | 4b29fb6 | 2021-12-18 18:40:22 +0530 | [diff] [blame] | 130 | qty = 0 |
| 131 | |
Ankush Menat | db1c088 | 2021-12-19 18:37:12 +0530 | [diff] [blame] | 132 | return consumed_bins |
Ankush Menat | 4b29fb6 | 2021-12-18 18:40:22 +0530 | [diff] [blame] | 133 | |
| 134 | |
Ankush Menat | e6e679c | 2021-12-18 20:36:47 +0530 | [diff] [blame] | 135 | def _round_off_if_near_zero(number: float, precision: int = 7) -> float: |
Ankush Menat | 4b29fb6 | 2021-12-18 18:40:22 +0530 | [diff] [blame] | 136 | """Rounds off the number to zero only if number is close to zero for decimal |
Ankush Menat | 1833f7a | 2021-12-18 19:37:41 +0530 | [diff] [blame] | 137 | specified in precision. Precision defaults to 7. |
Ankush Menat | 4b29fb6 | 2021-12-18 18:40:22 +0530 | [diff] [blame] | 138 | """ |
Ankush Menat | e6e679c | 2021-12-18 20:36:47 +0530 | [diff] [blame] | 139 | if abs(0.0 - flt(number)) < (1.0 / (10 ** precision)): |
| 140 | return 0.0 |
Ankush Menat | 4b29fb6 | 2021-12-18 18:40:22 +0530 | [diff] [blame] | 141 | |
| 142 | return flt(number) |