refactor: convert heatmap queries to QB (#33581)
Uses new `UnixTimestamp` function, don't backport.
diff --git a/erpnext/projects/doctype/project/project.py b/erpnext/projects/doctype/project/project.py
index 4735f24..7d80ac1 100644
--- a/erpnext/projects/doctype/project/project.py
+++ b/erpnext/projects/doctype/project/project.py
@@ -7,6 +7,8 @@
from frappe import _
from frappe.desk.reportview import get_match_cond
from frappe.model.document import Document
+from frappe.query_builder import Interval
+from frappe.query_builder.functions import Count, CurDate, Date, UnixTimestamp
from frappe.utils import add_days, flt, get_datetime, get_time, get_url, nowtime, today
from erpnext import get_default_company
@@ -297,17 +299,19 @@
user.welcome_email_sent = 1
-def get_timeline_data(doctype, name):
+def get_timeline_data(doctype: str, name: str) -> dict[int, int]:
"""Return timeline for attendance"""
+
+ timesheet_detail = frappe.qb.DocType("Timesheet Detail")
+
return dict(
- frappe.db.sql(
- """select unix_timestamp(from_time), count(*)
- from `tabTimesheet Detail` where project=%s
- and from_time > date_sub(curdate(), interval 1 year)
- and docstatus < 2
- group by date(from_time)""",
- name,
- )
+ frappe.qb.from_(timesheet_detail)
+ .select(UnixTimestamp(timesheet_detail.from_time), Count("*"))
+ .where(timesheet_detail.project == name)
+ .where(timesheet_detail.from_time > CurDate() - Interval(years=1))
+ .where(timesheet_detail.docstatus < 2)
+ .groupby(Date(timesheet_detail.from_time))
+ .run()
)
diff --git a/erpnext/setup/doctype/sales_person/sales_person.py b/erpnext/setup/doctype/sales_person/sales_person.py
index 0082c70..beff7f5 100644
--- a/erpnext/setup/doctype/sales_person/sales_person.py
+++ b/erpnext/setup/doctype/sales_person/sales_person.py
@@ -2,8 +2,13 @@
# License: GNU General Public License v3. See license.txt
+from collections import defaultdict
+from itertools import chain
+
import frappe
from frappe import _
+from frappe.query_builder import Interval
+from frappe.query_builder.functions import Count, CurDate, UnixTimestamp
from frappe.utils import flt
from frappe.utils.nestedset import NestedSet, get_root_of
@@ -77,61 +82,31 @@
frappe.db.add_index("Sales Person", ["lft", "rgt"])
-def get_timeline_data(doctype, name):
+def get_timeline_data(doctype: str, name: str) -> dict[int, int]:
+ def _fetch_activity(doctype: str, date_field: str):
+ sales_team = frappe.qb.DocType("Sales Team")
+ transaction = frappe.qb.DocType(doctype)
- out = {}
-
- out.update(
- dict(
- frappe.db.sql(
- """select
- unix_timestamp(dt.transaction_date), count(st.parenttype)
- from
- `tabSales Order` dt, `tabSales Team` st
- where
- st.sales_person = %s and st.parent = dt.name and dt.transaction_date > date_sub(curdate(), interval 1 year)
- group by dt.transaction_date """,
- name,
- )
+ return dict(
+ frappe.qb.from_(transaction)
+ .join(sales_team)
+ .on(transaction.name == sales_team.parent)
+ .select(UnixTimestamp(transaction[date_field]), Count("*"))
+ .where(sales_team.sales_person == name)
+ .where(transaction[date_field] > CurDate() - Interval(years=1))
+ .groupby(transaction[date_field])
+ .run()
)
- )
- sales_invoice = dict(
- frappe.db.sql(
- """select
- unix_timestamp(dt.posting_date), count(st.parenttype)
- from
- `tabSales Invoice` dt, `tabSales Team` st
- where
- st.sales_person = %s and st.parent = dt.name and dt.posting_date > date_sub(curdate(), interval 1 year)
- group by dt.posting_date """,
- name,
- )
- )
+ sales_order_activity = _fetch_activity("Sales Order", "transaction_date")
+ sales_invoice_activity = _fetch_activity("Sales Invoice", "posting_date")
+ delivery_note_activity = _fetch_activity("Delivery Note", "posting_date")
- for key in sales_invoice:
- if out.get(key):
- out[key] += sales_invoice[key]
- else:
- out[key] = sales_invoice[key]
+ merged_activities = defaultdict(int)
- delivery_note = dict(
- frappe.db.sql(
- """select
- unix_timestamp(dt.posting_date), count(st.parenttype)
- from
- `tabDelivery Note` dt, `tabSales Team` st
- where
- st.sales_person = %s and st.parent = dt.name and dt.posting_date > date_sub(curdate(), interval 1 year)
- group by dt.posting_date """,
- name,
- )
- )
+ for ts, count in chain(
+ sales_order_activity.items(), sales_invoice_activity.items(), delivery_note_activity.items()
+ ):
+ merged_activities[ts] += count
- for key in delivery_note:
- if out.get(key):
- out[key] += delivery_note[key]
- else:
- out[key] = delivery_note[key]
-
- return out
+ return merged_activities
diff --git a/erpnext/stock/doctype/item/item.py b/erpnext/stock/doctype/item/item.py
index 20bc9d9..cf12380 100644
--- a/erpnext/stock/doctype/item/item.py
+++ b/erpnext/stock/doctype/item/item.py
@@ -8,6 +8,8 @@
import frappe
from frappe import _
from frappe.model.document import Document
+from frappe.query_builder import Interval
+from frappe.query_builder.functions import Count, CurDate, UnixTimestamp
from frappe.utils import (
cint,
cstr,
@@ -997,18 +999,19 @@
).insert()
-def get_timeline_data(doctype, name):
+def get_timeline_data(doctype: str, name: str) -> dict[int, int]:
"""get timeline data based on Stock Ledger Entry. This is displayed as heatmap on the item page."""
- items = frappe.db.sql(
- """select unix_timestamp(posting_date), count(*)
- from `tabStock Ledger Entry`
- where item_code=%s and posting_date > date_sub(curdate(), interval 1 year)
- group by posting_date""",
- name,
- )
+ sle = frappe.qb.DocType("Stock Ledger Entry")
- return dict(items)
+ return dict(
+ frappe.qb.from_(sle)
+ .select(UnixTimestamp(sle.posting_date), Count("*"))
+ .where(sle.item_code == name)
+ .where(sle.posting_date > CurDate() - Interval(years=1))
+ .groupby(sle.posting_date)
+ .run()
+ )
def validate_end_of_life(item_code, end_of_life=None, disabled=None):