blob: 1e28988817e32fa20c27ad1b40e907f5acb461f4 [file] [log] [blame]
from collections import defaultdict
from typing import List
import frappe
from frappe import _, bold
from frappe.model.naming import make_autoname
from frappe.query_builder.functions import Sum
from frappe.utils import cint, flt, now
from pypika import Case
from erpnext.stock.deprecated_serial_batch import (
DeprecatedBatchNoValuation,
DeprecatedSerialNoValuation,
)
from erpnext.stock.valuation import round_off_if_near_zero
class SerialBatchBundle:
def __init__(self, **kwargs):
for key, value in kwargs.items():
setattr(self, key, value)
self.set_item_details()
self.process_serial_and_batch_bundle()
if self.sle.is_cancelled:
self.delink_serial_and_batch_bundle()
self.post_process()
def process_serial_and_batch_bundle(self):
if self.item_details.has_serial_no:
self.process_serial_no()
elif self.item_details.has_batch_no:
self.process_batch_no()
def set_item_details(self):
fields = [
"has_batch_no",
"has_serial_no",
"item_name",
"item_group",
"serial_no_series",
"create_new_batch",
"batch_number_series",
]
self.item_details = frappe.get_cached_value("Item", self.sle.item_code, fields, as_dict=1)
def process_serial_no(self):
if (
not self.sle.is_cancelled
and not self.sle.serial_and_batch_bundle
and self.sle.actual_qty > 0
and self.item_details.has_serial_no == 1
and self.item_details.serial_no_series
and self.allow_to_make_auto_bundle()
):
self.make_serial_batch_no_bundle()
elif not self.sle.is_cancelled:
self.validate_item_and_warehouse()
def auto_create_serial_nos(self, batch_no=None):
sr_nos = []
serial_nos_details = []
for i in range(cint(self.sle.actual_qty)):
serial_no = make_autoname(self.item_details.serial_no_series, "Serial No")
sr_nos.append(serial_no)
serial_nos_details.append(
(
serial_no,
serial_no,
now(),
now(),
frappe.session.user,
frappe.session.user,
self.warehouse,
self.company,
self.item_code,
self.item_details.item_name,
self.item_details.description,
"Active",
batch_no,
)
)
if serial_nos_details:
fields = [
"name",
"serial_no",
"creation",
"modified",
"owner",
"modified_by",
"warehouse",
"company",
"item_code",
"item_name",
"description",
"status",
"batch_no",
]
frappe.db.bulk_insert("Serial No", fields=fields, values=set(serial_nos_details))
return sr_nos
def make_serial_batch_no_bundle(self):
sn_doc = frappe.new_doc("Serial and Batch Bundle")
sn_doc.item_code = self.item_code
sn_doc.warehouse = self.warehouse
sn_doc.item_name = self.item_details.item_name
sn_doc.item_group = self.item_details.item_group
sn_doc.has_serial_no = self.item_details.has_serial_no
sn_doc.has_batch_no = self.item_details.has_batch_no
sn_doc.voucher_type = self.sle.voucher_type
sn_doc.voucher_no = self.sle.voucher_no
sn_doc.voucher_detail_no = self.sle.voucher_detail_no
sn_doc.total_qty = self.sle.actual_qty
sn_doc.avg_rate = self.sle.incoming_rate
sn_doc.total_amount = flt(self.sle.actual_qty) * flt(self.sle.incoming_rate)
sn_doc.type_of_transaction = "Inward"
sn_doc.posting_date = self.sle.posting_date
sn_doc.posting_time = self.sle.posting_time
sn_doc.is_rejected = self.is_rejected_entry()
sn_doc.flags.ignore_mandatory = True
sn_doc.insert()
batch_no = ""
if self.item_details.has_batch_no:
batch_no = self.create_batch()
incoming_rate = self.sle.incoming_rate
if not incoming_rate:
incoming_rate = frappe.get_cached_value(
self.child_doctype, self.sle.voucher_detail_no, "valuation_rate"
)
if self.item_details.has_serial_no:
sr_nos = self.auto_create_serial_nos(batch_no)
self.add_serial_no_to_bundle(sn_doc, sr_nos, incoming_rate, batch_no)
elif self.item_details.has_batch_no:
self.add_batch_no_to_bundle(sn_doc, batch_no, incoming_rate)
sn_doc.save()
sn_doc.load_from_db()
sn_doc.flags.ignore_validate = True
sn_doc.flags.ignore_mandatory = True
sn_doc.submit()
self.set_serial_and_batch_bundle(sn_doc)
def set_serial_and_batch_bundle(self, sn_doc):
self.sle.db_set("serial_and_batch_bundle", sn_doc.name)
if sn_doc.is_rejected:
frappe.db.set_value(
self.child_doctype, self.sle.voucher_detail_no, "rejected_serial_and_batch_bundle", sn_doc.name
)
else:
frappe.db.set_value(
self.child_doctype, self.sle.voucher_detail_no, "serial_and_batch_bundle", sn_doc.name
)
@property
def child_doctype(self):
child_doctype = self.sle.voucher_type + " Item"
if self.sle.voucher_type == "Stock Entry":
child_doctype = "Stock Entry Detail"
return child_doctype
def is_rejected_entry(self):
return is_rejected(self.sle.voucher_type, self.sle.voucher_detail_no, self.sle.warehouse)
def add_serial_no_to_bundle(self, sn_doc, serial_nos, incoming_rate, batch_no=None):
ledgers = []
fields = [
"name",
"serial_no",
"batch_no",
"warehouse",
"item_code",
"qty",
"incoming_rate",
"parent",
"parenttype",
"parentfield",
]
for serial_no in serial_nos:
ledgers.append(
(
frappe.generate_hash("Serial and Batch Ledger", 10),
serial_no,
batch_no,
self.warehouse,
self.item_details.item_code,
1,
incoming_rate,
sn_doc.name,
sn_doc.doctype,
"ledgers",
)
)
frappe.db.bulk_insert("Serial and Batch Ledger", fields=fields, values=set(ledgers))
def add_batch_no_to_bundle(self, sn_doc, batch_no, incoming_rate):
sn_doc.append(
"ledgers",
{
"batch_no": batch_no,
"qty": self.sle.actual_qty,
"incoming_rate": incoming_rate,
"stock_value_difference": flt(self.sle.actual_qty) * flt(incoming_rate),
},
)
def create_batch(self):
from erpnext.stock.doctype.batch.batch import make_batch
return make_batch(
frappe._dict(
{
"item": self.item_code,
"reference_doctype": self.sle.voucher_type,
"reference_name": self.sle.voucher_no,
}
)
)
def process_batch_no(self):
if (
not self.sle.is_cancelled
and not self.sle.serial_and_batch_bundle
and self.sle.actual_qty > 0
and self.item_details.has_batch_no == 1
and self.item_details.create_new_batch
and self.item_details.batch_number_series
and self.allow_to_make_auto_bundle()
):
self.make_serial_batch_no_bundle()
elif not self.sle.is_cancelled:
self.validate_item_and_warehouse()
def validate_item_and_warehouse(self):
data = frappe.db.get_value(
"Serial and Batch Bundle",
self.sle.serial_and_batch_bundle,
["item_code", "warehouse", "voucher_no"],
as_dict=1,
)
if self.sle.serial_and_batch_bundle and not frappe.db.exists(
"Serial and Batch Bundle",
{
"name": self.sle.serial_and_batch_bundle,
"item_code": self.item_code,
"warehouse": self.warehouse,
"voucher_no": self.sle.voucher_no,
},
):
msg = f"""
The Serial and Batch Bundle
{bold(self.sle.serial_and_batch_bundle)}
does not belong to Item {bold(self.item_code)}
or Warehouse {bold(self.warehouse)}
or {self.sle.voucher_type} no {bold(self.sle.voucher_no)}
"""
frappe.throw(_(msg))
def delink_serial_and_batch_bundle(self):
update_values = {
"serial_and_batch_bundle": "",
}
if is_rejected(self.sle.voucher_type, self.sle.voucher_detail_no, self.sle.warehouse):
update_values["rejected_serial_and_batch_bundle"] = ""
frappe.db.set_value(self.child_doctype, self.sle.voucher_detail_no, update_values)
frappe.db.set_value(
"Serial and Batch Bundle",
self.sle.serial_and_batch_bundle,
{"is_cancelled": 1, "voucher_no": ""},
)
def allow_to_make_auto_bundle(self):
if self.sle.voucher_type in ["Stock Entry", "Purchase Receipt", "Purchase Invoice"]:
if self.sle.voucher_type == "Stock Entry":
stock_entry_type = frappe.get_cached_value("Stock Entry", self.sle.voucher_no, "purpose")
if stock_entry_type in ["Material Receipt", "Manufacture", "Repack"]:
return True
return True
return False
def post_process(self):
if not self.sle.is_cancelled:
if self.item_details.has_serial_no == 1:
self.set_warehouse_and_status_in_serial_nos()
if self.item_details.has_serial_no == 1 and self.item_details.has_batch_no == 1:
self.set_batch_no_in_serial_nos()
else:
pass
# self.set_data_based_on_last_sle()
def set_warehouse_and_status_in_serial_nos(self):
warehouse = self.warehouse if self.sle.actual_qty > 0 else None
sn_table = frappe.qb.DocType("Serial No")
serial_nos = get_serial_nos(self.sle.serial_and_batch_bundle, check_outward=False)
(
frappe.qb.update(sn_table)
.set(sn_table.warehouse, warehouse)
.set(sn_table.status, "Active" if warehouse else "Inactive")
.where(sn_table.name.isin(serial_nos))
).run()
def set_batch_no_in_serial_nos(self):
ledgers = frappe.get_all(
"Serial and Batch Ledger",
fields=["serial_no", "batch_no"],
filters={"parent": self.serial_and_batch_bundle},
)
batch_serial_nos = {}
for ledger in ledgers:
batch_serial_nos.setdefault(ledger.batch_no, []).append(ledger.serial_no)
for batch_no, serial_nos in batch_serial_nos.items():
sn_table = frappe.qb.DocType("Serial No")
(
frappe.qb.update(sn_table)
.set(sn_table.batch_no, batch_no)
.where(sn_table.name.isin(serial_nos))
).run()
def get_serial_nos(serial_and_batch_bundle, check_outward=True):
filters = {"parent": serial_and_batch_bundle}
if check_outward:
filters["is_outward"] = 1
ledgers = frappe.get_all("Serial and Batch Ledger", fields=["serial_no"], filters=filters)
return [d.serial_no for d in ledgers]
class SerialNoBundleValuation(DeprecatedSerialNoValuation):
def __init__(self, **kwargs):
for key, value in kwargs.items():
setattr(self, key, value)
self.calculate_stock_value_change()
self.calculate_valuation_rate()
def calculate_stock_value_change(self):
if self.sle.actual_qty > 0:
self.stock_value_change = frappe.get_cached_value(
"Serial and Batch Bundle", self.sle.serial_and_batch_bundle, "total_amount"
)
else:
ledgers = self.get_serial_no_ledgers()
self.serial_no_incoming_rate = defaultdict(float)
self.stock_value_change = 0.0
for ledger in ledgers:
self.stock_value_change += ledger.incoming_rate * -1
self.serial_no_incoming_rate[ledger.serial_no] = ledger.incoming_rate
self.calculate_stock_value_from_deprecarated_ledgers()
def get_serial_no_ledgers(self):
serial_nos = self.get_serial_nos()
subquery = f"""
SELECT
MAX(
TIMESTAMP(
parent.posting_date, parent.posting_time
)
), child.name
FROM
`tabSerial and Batch Bundle` as parent,
`tabSerial and Batch Ledger` as child
WHERE
parent.name = child.parent
AND child.serial_no IN ({', '.join([frappe.db.escape(s) for s in serial_nos])})
AND child.is_outward = 0
AND parent.docstatus < 2
AND parent.is_cancelled = 0
AND child.warehouse = {frappe.db.escape(self.sle.warehouse)}
AND parent.item_code = {frappe.db.escape(self.sle.item_code)}
AND (
parent.posting_date < '{self.sle.posting_date}'
OR (
parent.posting_date = '{self.sle.posting_date}'
AND parent.posting_time <= '{self.sle.posting_time}'
)
)
GROUP BY
child.serial_no
"""
return frappe.db.sql(
f"""
SELECT
serial_no, incoming_rate
FROM
`tabSerial and Batch Ledger` AS ledger,
({subquery}) AS SubQuery
WHERE
ledger.name = SubQuery.name
GROUP BY
ledger.serial_no
""",
as_dict=1,
)
def get_serial_nos(self):
if self.sle.get("serial_nos"):
return self.sle.serial_nos
return get_serial_nos(self.sle.serial_and_batch_bundle)
def calculate_valuation_rate(self):
if not hasattr(self, "wh_data"):
return
new_stock_qty = self.wh_data.qty_after_transaction + self.sle.actual_qty
if new_stock_qty > 0:
new_stock_value = (
self.wh_data.qty_after_transaction * self.wh_data.valuation_rate
) + self.stock_value_change
if new_stock_value >= 0:
# calculate new valuation rate only if stock value is positive
# else it remains the same as that of previous entry
self.wh_data.valuation_rate = new_stock_value / new_stock_qty
if (
not self.wh_data.valuation_rate and self.sle.voucher_detail_no and not self.is_rejected_entry()
):
allow_zero_rate = self.sle_self.check_if_allow_zero_valuation_rate(
self.sle.voucher_type, self.sle.voucher_detail_no
)
if not allow_zero_rate:
self.wh_data.valuation_rate = self.sle_self.get_fallback_rate(self.sle)
self.wh_data.qty_after_transaction += self.sle.actual_qty
self.wh_data.stock_value = flt(self.wh_data.qty_after_transaction) * flt(
self.wh_data.valuation_rate
)
def is_rejected_entry(self):
return is_rejected(self.sle.voucher_type, self.sle.voucher_detail_no, self.sle.warehouse)
def get_incoming_rate(self):
return flt(self.stock_value_change) / flt(self.sle.actual_qty)
def is_rejected(voucher_type, voucher_detail_no, warehouse):
if voucher_type in ["Purchase Receipt", "Purchase Invoice"]:
return warehouse == frappe.get_cached_value(
voucher_type + " Item", voucher_detail_no, "rejected_warehouse"
)
return False
class BatchNoBundleValuation(DeprecatedBatchNoValuation):
def __init__(self, **kwargs):
for key, value in kwargs.items():
setattr(self, key, value)
self.batch_nos = self.get_batch_nos()
self.calculate_avg_rate()
self.calculate_valuation_rate()
def calculate_avg_rate(self):
if self.sle.actual_qty > 0:
self.stock_value_change = frappe.get_cached_value(
"Serial and Batch Bundle", self.sle.serial_and_batch_bundle, "total_amount"
)
else:
ledgers = self.get_batch_no_ledgers()
self.batch_avg_rate = defaultdict(float)
for ledger in ledgers:
self.batch_avg_rate[ledger.batch_no] += flt(ledger.incoming_rate) / flt(ledger.qty)
self.calculate_avg_rate_from_deprecarated_ledgers()
self.set_stock_value_difference()
def get_batch_no_ledgers(self) -> List[dict]:
parent = frappe.qb.DocType("Serial and Batch Bundle")
child = frappe.qb.DocType("Serial and Batch Ledger")
batch_nos = list(self.batch_nos.keys())
return (
frappe.qb.from_(parent)
.inner_join(child)
.on(parent.name == child.parent)
.select(
child.batch_no,
Sum(child.stock_value_difference).as_("incoming_rate"),
Sum(Case().when(child.is_outward == 1, child.qty * -1).else_(child.qty)).as_("qty"),
)
.where(
(child.batch_no.isin(batch_nos))
& (child.parent != self.sle.serial_and_batch_bundle)
& (parent.warehouse == self.sle.warehouse)
& (parent.item_code == self.sle.item_code)
& (parent.is_cancelled == 0)
)
.groupby(child.batch_no)
).run(as_dict=True)
def get_batch_nos(self) -> list:
if self.sle.get("batch_nos"):
return self.sle.batch_nos
ledgers = frappe.get_all(
"Serial and Batch Ledger",
fields=["batch_no", "qty", "name"],
filters={"parent": self.sle.serial_and_batch_bundle, "is_outward": 1},
)
return {d.batch_no: d for d in ledgers}
def set_stock_value_difference(self):
self.stock_value_change = 0
for batch_no, ledger in self.batch_nos.items():
stock_value_change = self.batch_avg_rate[batch_no] * ledger.qty * -1
self.stock_value_change += stock_value_change
frappe.db.set_value(
"Serial and Batch Ledger", ledger.name, "stock_value_difference", stock_value_change
)
def calculate_valuation_rate(self):
if not hasattr(self, "wh_data"):
return
self.wh_data.stock_value = round_off_if_near_zero(
self.wh_data.stock_value + self.stock_value_change
)
if self.wh_data.qty_after_transaction:
self.wh_data.valuation_rate = self.wh_data.stock_value / self.wh_data.qty_after_transaction
self.wh_data.qty_after_transaction += self.sle.actual_qty
def get_incoming_rate(self):
return flt(self.stock_value_change) / flt(self.sle.actual_qty)
class GetAvailableSerialBatchBundle:
def __init__(self) -> None:
pass