refactor: Monthly Attendance Sheet
- split into smaller functions
- add type hints
- get rid of unnecessary db calls and loops
- add docstrings for functions
diff --git a/erpnext/hr/report/monthly_attendance_sheet/monthly_attendance_sheet.js b/erpnext/hr/report/monthly_attendance_sheet/monthly_attendance_sheet.js
index 42f7cdb..26c8684 100644
--- a/erpnext/hr/report/monthly_attendance_sheet/monthly_attendance_sheet.js
+++ b/erpnext/hr/report/monthly_attendance_sheet/monthly_attendance_sheet.js
@@ -66,8 +66,7 @@
"Default": 0,
}
],
-
- "onload": function() {
+ onload: function() {
return frappe.call({
method: "erpnext.hr.report.monthly_attendance_sheet.monthly_attendance_sheet.get_attendance_years",
callback: function(r) {
diff --git a/erpnext/hr/report/monthly_attendance_sheet/monthly_attendance_sheet.py b/erpnext/hr/report/monthly_attendance_sheet/monthly_attendance_sheet.py
index 8ea4989..c9d9aae 100644
--- a/erpnext/hr/report/monthly_attendance_sheet/monthly_attendance_sheet.py
+++ b/erpnext/hr/report/monthly_attendance_sheet/monthly_attendance_sheet.py
@@ -3,365 +3,496 @@
from calendar import monthrange
+from itertools import groupby
+from typing import Dict, List, Optional, Tuple
import frappe
from frappe import _, msgprint
from frappe.utils import cint, cstr, getdate
+from frappe.query_builder.functions import Count, Extract, Sum
+
+Filters = frappe._dict
+
status_map = {
- "Absent": "A",
- "Half Day": "HD",
- "Holiday": "<b>H</b>",
- "Weekly Off": "<b>WO</b>",
- "On Leave": "L",
- "Present": "P",
- "Work From Home": "WFH",
+ 'Absent': 'A',
+ 'Half Day': 'HD',
+ 'Holiday': 'H',
+ 'Weekly Off': 'WO',
+ 'On Leave': 'L',
+ 'Present': 'P',
+ 'Work From Home': 'WFH'
}
-day_abbr = ["Mon", "Tue", "Wed", "Thu", "Fri", "Sat", "Sun"]
+day_abbr = ['Mon', 'Tue', 'Wed', 'Thu', 'Fri', 'Sat', 'Sun']
+def execute(filters: Optional[Filters] = None) -> Tuple:
+ filters = frappe._dict(filters or {})
-def execute(filters=None):
- if not filters:
- filters = {}
+ if not (filters.month and filters.year):
+ frappe.throw(_('Please select month and year.'))
- if filters.hide_year_field == 1:
- filters.year = 2020
+ columns = get_columns(filters)
+ data, attendance_map = get_data(filters)
- conditions, filters = get_conditions(filters)
- columns, days = get_columns(filters)
- att_map = get_attendance_list(conditions, filters)
- if not att_map:
+ if not data:
return columns, [], None, None
- if filters.group_by:
- emp_map, group_by_parameters = get_employee_details(filters.group_by, filters.company)
- holiday_list = []
- for parameter in group_by_parameters:
- h_list = [
- emp_map[parameter][d]["holiday_list"]
- for d in emp_map[parameter]
- if emp_map[parameter][d]["holiday_list"]
- ]
- holiday_list += h_list
- else:
- emp_map = get_employee_details(filters.group_by, filters.company)
- holiday_list = [emp_map[d]["holiday_list"] for d in emp_map if emp_map[d]["holiday_list"]]
+ chart = get_chart_data(attendance_map, filters)
- default_holiday_list = frappe.get_cached_value(
- "Company", filters.get("company"), "default_holiday_list"
- )
- holiday_list.append(default_holiday_list)
- holiday_list = list(set(holiday_list))
- holiday_map = get_holiday(holiday_list, filters["month"])
-
- data = []
-
- leave_types = frappe.db.get_list("Leave Type")
- leave_list = None
- if filters.summarized_view:
- leave_list = [d.name + ":Float:120" for d in leave_types]
- columns.extend(leave_list)
- columns.extend([_("Total Late Entries") + ":Float:120", _("Total Early Exits") + ":Float:120"])
-
- if filters.group_by:
- emp_att_map = {}
- for parameter in group_by_parameters:
- emp_map_set = set([key for key in emp_map[parameter].keys()])
- att_map_set = set([key for key in att_map.keys()])
- if att_map_set & emp_map_set:
- parameter_row = ["<b>" + parameter + "</b>"] + [
- "" for day in range(filters["total_days_in_month"] + 2)
- ]
- data.append(parameter_row)
- record, emp_att_data = add_data(
- emp_map[parameter],
- att_map,
- filters,
- holiday_map,
- conditions,
- default_holiday_list,
- leave_types=leave_types,
- )
- emp_att_map.update(emp_att_data)
- data += record
- else:
- record, emp_att_map = add_data(
- emp_map,
- att_map,
- filters,
- holiday_map,
- conditions,
- default_holiday_list,
- leave_types=leave_types,
- )
- data += record
-
- chart_data = get_chart_data(emp_att_map, days)
-
- return columns, data, None, chart_data
+ return columns, data, None, chart
-def get_chart_data(emp_att_map, days):
- labels = []
- datasets = [
- {"name": "Absent", "values": []},
- {"name": "Present", "values": []},
- {"name": "Leave", "values": []},
- ]
- for idx, day in enumerate(days, start=0):
- p = day.replace("::65", "")
- labels.append(day.replace("::65", ""))
- total_absent_on_day = 0
- total_leave_on_day = 0
- total_present_on_day = 0
- total_holiday = 0
- for emp in emp_att_map.keys():
- if emp_att_map[emp][idx]:
- if emp_att_map[emp][idx] == "A":
- total_absent_on_day += 1
- if emp_att_map[emp][idx] in ["P", "WFH"]:
- total_present_on_day += 1
- if emp_att_map[emp][idx] == "HD":
- total_present_on_day += 0.5
- total_leave_on_day += 0.5
- if emp_att_map[emp][idx] == "L":
- total_leave_on_day += 1
-
- datasets[0]["values"].append(total_absent_on_day)
- datasets[1]["values"].append(total_present_on_day)
- datasets[2]["values"].append(total_leave_on_day)
-
- chart = {"data": {"labels": labels, "datasets": datasets}}
-
- chart["type"] = "line"
-
- return chart
-
-
-def add_data(
- employee_map, att_map, filters, holiday_map, conditions, default_holiday_list, leave_types=None
-):
-
- record = []
- emp_att_map = {}
- for emp in employee_map:
- emp_det = employee_map.get(emp)
- if not emp_det or emp not in att_map:
- continue
-
- row = []
- if filters.group_by:
- row += [" "]
- row += [emp, emp_det.employee_name]
-
- total_p = total_a = total_l = total_h = total_um = 0.0
- emp_status_map = []
- for day in range(filters["total_days_in_month"]):
- status = None
- status = att_map.get(emp).get(day + 1)
-
- if status is None and holiday_map:
- emp_holiday_list = emp_det.holiday_list if emp_det.holiday_list else default_holiday_list
-
- if emp_holiday_list in holiday_map:
- for idx, ele in enumerate(holiday_map[emp_holiday_list]):
- if day + 1 == holiday_map[emp_holiday_list][idx][0]:
- if holiday_map[emp_holiday_list][idx][1]:
- status = "Weekly Off"
- else:
- status = "Holiday"
- total_h += 1
-
- abbr = status_map.get(status, "")
- emp_status_map.append(abbr)
-
- if filters.summarized_view:
- if status == "Present" or status == "Work From Home":
- total_p += 1
- elif status == "Absent":
- total_a += 1
- elif status == "On Leave":
- total_l += 1
- elif status == "Half Day":
- total_p += 0.5
- total_a += 0.5
- total_l += 0.5
- elif not status:
- total_um += 1
-
- if not filters.summarized_view:
- row += emp_status_map
-
- if filters.summarized_view:
- row += [total_p, total_l, total_a, total_h, total_um]
-
- if not filters.get("employee"):
- filters.update({"employee": emp})
- conditions += " and employee = %(employee)s"
- elif not filters.get("employee") == emp:
- filters.update({"employee": emp})
-
- if filters.summarized_view:
- leave_details = frappe.db.sql(
- """select leave_type, status, count(*) as count from `tabAttendance`\
- where leave_type is not NULL %s group by leave_type, status"""
- % conditions,
- filters,
- as_dict=1,
- )
-
- time_default_counts = frappe.db.sql(
- """select (select count(*) from `tabAttendance` where \
- late_entry = 1 %s) as late_entry_count, (select count(*) from tabAttendance where \
- early_exit = 1 %s) as early_exit_count"""
- % (conditions, conditions),
- filters,
- )
-
- leaves = {}
- for d in leave_details:
- if d.status == "Half Day":
- d.count = d.count * 0.5
- if d.leave_type in leaves:
- leaves[d.leave_type] += d.count
- else:
- leaves[d.leave_type] = d.count
-
- for d in leave_types:
- if d.name in leaves:
- row.append(leaves[d.name])
- else:
- row.append("0.0")
-
- row.extend([time_default_counts[0][0], time_default_counts[0][1]])
- emp_att_map[emp] = emp_status_map
- record.append(row)
-
- return record, emp_att_map
-
-
-def get_columns(filters):
-
+def get_columns(filters: Filters) -> List[Dict]:
columns = []
if filters.group_by:
- columns = [_(filters.group_by) + ":Link/Branch:120"]
+ columns.append(
+ {'label': _(filters.group_by), 'fieldname': frappe.scrub(filters.group_by), 'fieldtype': 'Link', 'options': 'Branch', 'width': 120}
+ )
- columns += [_("Employee") + ":Link/Employee:120", _("Employee Name") + ":Data/:120"]
- days = []
- for day in range(filters["total_days_in_month"]):
- date = str(filters.year) + "-" + str(filters.month) + "-" + str(day + 1)
- day_name = day_abbr[getdate(date).weekday()]
- days.append(cstr(day + 1) + " " + day_name + "::65")
- if not filters.summarized_view:
- columns += days
+ columns.extend([
+ {'label': _('Employee'), 'fieldname': 'employee', 'fieldtype': 'Link', 'options': 'Employee', 'width': 120},
+ {'label': _('Employee Name'), 'fieldname': 'employee_name', 'fieldtype': 'Data', 'width': 120}
+ ])
if filters.summarized_view:
- columns += [
- _("Total Present") + ":Float:120",
- _("Total Leaves") + ":Float:120",
- _("Total Absent") + ":Float:120",
- _("Total Holidays") + ":Float:120",
- _("Unmarked Days") + ":Float:120",
- ]
- return columns, days
+ columns.extend([
+ {'label': _('Total Present'), 'fieldname': 'total_present', 'fieldtype': 'Float', 'width': 120},
+ {'label': _('Total Leaves'), 'fieldname': 'total_leaves', 'fieldtype': 'Float', 'width': 120},
+ {'label': _('Total Absent'), 'fieldname': 'total_absent', 'fieldtype': 'Float', 'width': 120},
+ {'label': _('Total Holidays'), 'fieldname': 'total_holidays', 'fieldtype': 'Float', 'width': 120},
+ {'label': _('Unmarked Days'), 'fieldname': 'unmarked_days', 'fieldtype': 'Float', 'width': 120}
+ ])
+ columns.extend(get_columns_for_leave_types())
+ columns.extend([
+ {'label': _('Total Late Entries'), 'fieldname': 'total_late_entries', 'fieldtype': 'Float', 'width': 120},
+ {'label': _('Total Early Exits'), 'fieldname': 'total_early_exits', 'fieldtype': 'Float', 'width': 120}
+ ])
+ else:
+ columns.extend(get_columns_for_days(filters))
+
+ return columns
-def get_attendance_list(conditions, filters):
- attendance_list = frappe.db.sql(
- """select employee, day(attendance_date) as day_of_month,
- status from tabAttendance where docstatus = 1 %s order by employee, attendance_date"""
- % conditions,
- filters,
- as_dict=1,
+def get_columns_for_leave_types() -> List[Dict]:
+ leave_types = frappe.db.get_all('Leave Type', pluck='name')
+ types = []
+ for entry in leave_types:
+ types.append({'label': entry, 'fieldname': frappe.scrub(entry), 'fieldtype': 'Float', 'width': 120})
+
+ return types
+
+
+def get_columns_for_days(filters: Filters) -> List[Dict]:
+ total_days = get_total_days_in_month(filters)
+ days = []
+
+ for day in range(1, total_days+1):
+ # forms the dates from selected year and month from filters
+ date = '{}-{}-{}'.format(
+ cstr(filters.year),
+ cstr(filters.month),
+ cstr(day)
+ )
+ # gets abbr from weekday number
+ weekday = day_abbr[getdate(date).weekday()]
+ # sets days as 1 Mon, 2 Tue, 3 Wed
+ label = '{} {}'.format(cstr(day), weekday)
+ days.append({
+ 'label': label,
+ 'fieldtype': 'Data',
+ 'fieldname': day,
+ 'width': 65
+ })
+
+ return days
+
+
+def get_total_days_in_month(filters: Filters) -> int:
+ return monthrange(cint(filters.year), cint(filters.month))[1]
+
+
+def get_data(filters: Filters) -> Tuple[List, Dict]:
+ attendance_map = get_attendance_map(filters)
+
+ if not attendance_map:
+ frappe.msgprint(_('No attendance records found.'), alert=True, indicator='orange')
+ return [], {}
+
+ employee_details, group_by_param_values = get_employee_related_details(filters.group_by, filters.company)
+ holiday_map = get_holiday_map(filters)
+
+ data = []
+
+ if filters.group_by:
+ group_by_column = frappe.scrub(filters.group_by)
+
+ for value in group_by_param_values:
+ if not value:
+ continue
+
+ records = get_rows(
+ employee_details[value],
+ attendance_map,
+ filters,
+ holiday_map
+ )
+
+ if records:
+ data.append({
+ group_by_column: frappe.bold(value)
+ })
+ data.extend(records)
+ else:
+ data = get_rows(
+ employee_details,
+ attendance_map,
+ filters,
+ holiday_map
+ )
+
+ if not data:
+ frappe.msgprint(_('No attendance records found for this criteria.'), alert=True, indicator='orange')
+ return [], {}
+
+ return data, attendance_map
+
+
+def get_attendance_map(filters: Filters) -> Dict[str, Dict[int, str]]:
+ """Returns a dictionary of employee wise attendance map for all the days of the month like
+ {
+ 'employee1': {
+ 1: 'Present',
+ 2: 'Absent'
+ },
+ 'employee2': {
+ 1: 'Absent',
+ 2: 'Present'
+ }
+ }
+ """
+ Attendance = frappe.qb.DocType('Attendance')
+ query = (
+ frappe.qb.from_(Attendance)
+ .select(
+ Attendance.employee,
+ Extract('day', Attendance.attendance_date).as_('day_of_month'),
+ Attendance.status
+ ).where(
+ (Attendance.docstatus == 1)
+ & (Attendance.company == filters.company)
+ & (Extract('month', Attendance.attendance_date) == filters.month)
+ & (Extract('year', Attendance.attendance_date) == filters.year)
+ )
)
+ if filters.employee:
+ query = query.where(Attendance.employee == filters.employee)
+
+ query = query.orderby(Attendance.employee, Attendance.attendance_date)
+
+ attendance_list = query.run(as_dict=1)
if not attendance_list:
- msgprint(_("No attendance record found"), alert=True, indicator="orange")
+ frappe.msgprint(_('No attendance records found'), alert=True, indicator='orange')
- att_map = {}
+ attendance_map = {}
for d in attendance_list:
- att_map.setdefault(d.employee, frappe._dict()).setdefault(d.day_of_month, "")
- att_map[d.employee][d.day_of_month] = d.status
+ attendance_map.setdefault(d.employee, frappe._dict()).setdefault(d.day_of_month, '')
+ attendance_map[d.employee][d.day_of_month] = d.status
- return att_map
+ return attendance_map
-def get_conditions(filters):
- if not (filters.get("month") and filters.get("year")):
- msgprint(_("Please select month and year"), raise_exception=1)
-
- filters["total_days_in_month"] = monthrange(cint(filters.year), cint(filters.month))[1]
-
- conditions = " and month(attendance_date) = %(month)s and year(attendance_date) = %(year)s"
-
- if filters.get("company"):
- conditions += " and company = %(company)s"
- if filters.get("employee"):
- conditions += " and employee = %(employee)s"
-
- return conditions, filters
-
-
-def get_employee_details(group_by, company):
- emp_map = {}
- query = """select name, employee_name, designation, department, branch, company,
- holiday_list from `tabEmployee` where company = %s """ % frappe.db.escape(
- company
+def get_employee_related_details(group_by: str, company: str) -> Tuple[Dict, List]:
+ """Returns
+ 1. nested dict for employee details
+ 2. list of values for the group by filter
+ eg: if group by filter is set to "Department" then returns a list like ['HR', 'Support', 'Engineering']
+ """
+ Employee = frappe.qb.DocType('Employee')
+ query = (
+ frappe.qb.from_(Employee)
+ .select(
+ Employee.name, Employee.employee_name, Employee.designation,
+ Employee.grade, Employee.department, Employee.branch,
+ Employee.company, Employee.holiday_list
+ ).where(Employee.company == company)
)
if group_by:
group_by = group_by.lower()
- query += " order by " + group_by + " ASC"
+ query = query.orderby(group_by)
- employee_details = frappe.db.sql(query, as_dict=1)
+ employee_details = query.run(as_dict=True)
- group_by_parameters = []
+ group_by_param_values = []
+ emp_map = {}
+
if group_by:
+ for parameter, employees in groupby(employee_details, key=lambda d: d[group_by]):
+ group_by_param_values.append(parameter)
+ emp_map.setdefault(parameter, frappe._dict())
- group_by_parameters = list(
- set(detail.get(group_by, "") for detail in employee_details if detail.get(group_by, ""))
- )
- for parameter in group_by_parameters:
- emp_map[parameter] = {}
-
- for d in employee_details:
- if group_by and len(group_by_parameters):
- if d.get(group_by, None):
-
- emp_map[d.get(group_by)][d.name] = d
- else:
- emp_map[d.name] = d
-
- if not group_by:
- return emp_map
+ for emp in employees:
+ emp_map[parameter][emp.name] = emp
else:
- return emp_map, group_by_parameters
+ for emp in employee_details:
+ emp_map[emp.name] = emp
+
+ return emp_map, group_by_param_values
-def get_holiday(holiday_list, month):
+def get_holiday_map(filters: Filters) -> Dict[str, List[Dict]]:
+ """
+ Returns a dict of holidays falling in the filter month and year
+ with list name as key and list of holidays as values like
+ {
+ 'Holiday List 1': [
+ {'day_of_month': '0' , 'weekly_off': 1},
+ {'day_of_month': '1', 'weekly_off': 0}
+ ],
+ 'Holiday List 2': [
+ {'day_of_month': '0' , 'weekly_off': 1},
+ {'day_of_month': '1', 'weekly_off': 0}
+ ]
+ }
+ """
+ # add default holiday list too
+ holiday_lists = frappe.db.get_all('Holiday List', pluck='name')
+ default_holiday_list = frappe.get_cached_value('Company', filters.company, 'default_holiday_list')
+ holiday_lists.append(default_holiday_list)
+
holiday_map = frappe._dict()
- for d in holiday_list:
- if d:
- holiday_map.setdefault(
- d,
- frappe.db.sql(
- """select day(holiday_date), weekly_off from `tabHoliday`
- where parent=%s and month(holiday_date)=%s""",
- (d, month),
- ),
+ Holiday = frappe.qb.DocType('Holiday')
+
+ for d in holiday_lists:
+ if not d:
+ continue
+
+ holidays = (
+ frappe.qb.from_(Holiday)
+ .select(
+ Extract('day', Holiday.holiday_date).as_('day_of_month'),
+ Holiday.weekly_off
+ ).where(
+ (Holiday.parent == d)
+ & (Extract('month', Holiday.holiday_date) == filters.month)
+ & (Extract('year', Holiday.holiday_date) == filters.year)
)
+ ).run(as_dict=True)
+
+ holiday_map.setdefault(d, holidays)
return holiday_map
+def get_rows(employee_details: Dict, attendance_map: Dict, filters: Filters, holiday_map: Dict) -> List[Dict]:
+ records = []
+ default_holiday_list = frappe.get_cached_value('Company', filters.company, 'default_holiday_list')
+ employees_with_attendance = list(attendance_map.keys())
+
+ for employee, details in employee_details.items():
+ if employee not in employees_with_attendance:
+ continue
+
+ row = {
+ 'employee': employee,
+ 'employee_name': details.employee_name
+ }
+
+ if filters.summarized_view:
+ # set defaults for summarized view
+ for entry in get_columns(filters):
+ if entry.get('fieldtype') == 'Float':
+ row[entry.get('fieldname')] = 0.0
+
+ emp_holiday_list = details.holiday_list or default_holiday_list
+ holidays = holiday_map[emp_holiday_list]
+
+ attendance_for_employee = get_attendance_status_for_employee(employee, filters, attendance_map, holidays)
+ row.update(attendance_for_employee)
+
+ if filters.summarized_view:
+ leave_summary = get_leave_summary(employee, filters)
+ entry_exits_summary = get_entry_exits_summary(employee, filters)
+
+ row.update(leave_summary)
+ row.update(entry_exits_summary)
+
+ records.append(row)
+
+ return records
+
+
+def get_attendance_status_for_employee(employee: str, filters: Filters, attendance_map: Dict, holidays: List) -> Dict:
+ """Returns dict of attendance status for employee
+ - for summarized view: {'total_present': 1.5, 'total_leaves': 0.5, 'total_absent': 13.5, 'total_holidays': 8, 'unmarked_days': 5}
+ - for detailed view (day wise): {1: 'A', 2: 'P', 3: 'A'....}
+ """
+ emp_attendance = {}
+
+ total_days = get_total_days_in_month(filters)
+ totals = {'total_present': 0, 'total_leaves': 0, 'total_absent': 0, 'total_holidays': 0, 'unmarked_days': 0}
+
+ for day in range(1, total_days + 1):
+ status = None
+ employee_attendance = attendance_map.get(employee)
+ if employee_attendance:
+ status = employee_attendance.get(day)
+
+ if status is None and holidays:
+ status = get_holiday_status(day, holidays)
+
+ if filters.summarized_view:
+ if status in ['Present', 'Work From Home']:
+ totals['total_present'] += 1
+ elif status in ['Weekly Off', 'Holiday']:
+ totals['total_holidays'] += 1
+ elif status == 'Absent':
+ totals['total_absent'] += 1
+ elif status == 'On Leave':
+ totals['total_leaves'] += 1
+ elif status == 'Half Day':
+ totals['total_present'] += 0.5
+ totals['total_absent'] += 0.5
+ totals['total_leaves'] += 0.5
+ elif not status:
+ totals['unmarked_days'] += 1
+ else:
+ abbr = status_map.get(status, '')
+ emp_attendance[day] = abbr
+
+ if filters.summarized_view:
+ emp_attendance.update(totals)
+
+ return emp_attendance
+
+
+def get_holiday_status(day: int, holidays: List) -> str:
+ status = None
+ for holiday in holidays:
+ if day == holiday.get('day_of_month'):
+ if holiday.get('weekly_off'):
+ status = 'Weekly Off'
+ else:
+ status = 'Holiday'
+ break
+ return status
+
+
+def get_leave_summary(employee: str, filters: Filters) -> Dict[str, float]:
+ """Returns a dict of leave type and corresponding leaves taken by employee like:
+ {'leave_without_pay': 1.0, 'sick_leave': 2.0}
+ """
+ Attendance = frappe.qb.DocType('Attendance')
+ day_case = frappe.qb.terms.Case().when(Attendance.status == 'Half Day', 0.5).else_(1)
+ sum_leave_days = Sum(day_case).as_('leave_days')
+
+ leave_details = (
+ frappe.qb.from_(Attendance)
+ .select(Attendance.leave_type, sum_leave_days)
+ .where(
+ (Attendance.employee == employee)
+ & (Attendance.company == filters.company)
+ & ((Attendance.leave_type.isnotnull()) | (Attendance.leave_type != ''))
+ & (Extract('month', Attendance.attendance_date) == filters.month)
+ & (Extract('year', Attendance.attendance_date) == filters.year)
+ ).groupby(Attendance.leave_type)
+ ).run(as_dict=True)
+
+ leaves = {}
+ for d in leave_details:
+ leave_type = frappe.scrub(d.leave_type)
+ leaves[leave_type] = d.leave_days
+
+ return leaves
+
+
+def get_entry_exits_summary(employee: str, filters: Filters) -> Dict[str, float]:
+ """Returns total late entries and total early exits for employee like:
+ {'total_late_entries': 5, 'total_early_exits': 2}
+ """
+ Attendance = frappe.qb.DocType('Attendance')
+
+ late_entry_case = frappe.qb.terms.Case().when(Attendance.late_entry == '1', '1')
+ count_late_entries = Count(late_entry_case).as_('total_late_entries')
+
+ early_exit_case = frappe.qb.terms.Case().when(Attendance.early_exit == '1', '1')
+ count_early_exits = Count(early_exit_case).as_('total_early_exits')
+
+ entry_exits = (
+ frappe.qb.from_(Attendance)
+ .select(count_late_entries, count_early_exits)
+ .where(
+ (Attendance.employee == employee)
+ & (Attendance.company == filters.company)
+ & (Extract('month', Attendance.attendance_date) == filters.month)
+ & (Extract('year', Attendance.attendance_date) == filters.year)
+ )
+ ).run(as_dict=True)
+
+ return entry_exits[0]
+
+
@frappe.whitelist()
-def get_attendance_years():
- year_list = frappe.db.sql_list(
- """select distinct YEAR(attendance_date) from tabAttendance ORDER BY YEAR(attendance_date) DESC"""
- )
- if not year_list:
+def get_attendance_years() -> str:
+ """Returns all the years for which attendance records exist"""
+ Attendance = frappe.qb.DocType('Attendance')
+ year_list = (
+ frappe.qb.from_(Attendance)
+ .select(Extract('year', Attendance.attendance_date).as_('year'))
+ .distinct()
+ ).run(as_dict=True)
+
+ if year_list:
+ year_list.sort(key=lambda d: d.year, reverse=True)
+ else:
year_list = [getdate().year]
- return "\n".join(str(year) for year in year_list)
+ return "\n".join(cstr(entry.year) for entry in year_list)
+
+
+def get_chart_data(attendance_map: Dict, filters: Filters) -> Dict:
+ days = get_columns_for_days(filters)
+ labels = []
+ absent = []
+ present = []
+ leave = []
+
+ for day in days:
+ labels.append(day['label'])
+ total_absent_on_day = total_leaves_on_day = total_present_on_day = 0
+
+ for employee, attendance in attendance_map.items():
+ attendance_on_day = attendance.get(day['fieldname'])
+ if not attendance_on_day:
+ continue
+
+ if attendance_on_day == 'Absent':
+ total_absent_on_day += 1
+ elif attendance_on_day in ['Present', 'Work From Home']:
+ total_present_on_day += 1
+ elif attendance_on_day == 'Half Day':
+ total_present_on_day += 0.5
+ total_leaves_on_day += 0.5
+ elif attendance_on_day == 'On Leave':
+ total_leaves_on_day += 1
+
+ absent.append(total_absent_on_day)
+ present.append(total_present_on_day)
+ leave.append(total_leaves_on_day)
+
+ return {
+ 'data': {
+ 'labels': labels,
+ 'datasets': [
+ {'name': 'Absent', 'values': absent},
+ {'name': 'Present', 'values': present},
+ {'name': 'Leave', 'values': leave},
+ ]
+ },
+ 'type': 'line'
+ }
+
+ return chart
\ No newline at end of file
diff --git a/erpnext/hr/report/monthly_attendance_sheet/test_monthly_attendance_sheet.py b/erpnext/hr/report/monthly_attendance_sheet/test_monthly_attendance_sheet.py
index 91da08e..9f2babb 100644
--- a/erpnext/hr/report/monthly_attendance_sheet/test_monthly_attendance_sheet.py
+++ b/erpnext/hr/report/monthly_attendance_sheet/test_monthly_attendance_sheet.py
@@ -33,14 +33,15 @@
}
)
report = execute(filters=filters)
- employees = report[1][0]
- datasets = report[3]["data"]["datasets"]
- absent = datasets[0]["values"]
- present = datasets[1]["values"]
- leaves = datasets[2]["values"]
+
+ record = report[1][0]
+ datasets = report[3]['data']['datasets']
+ absent = datasets[0]['values']
+ present = datasets[1]['values']
+ leaves = datasets[2]['values']
# ensure correct attendance is reflect on the report
- self.assertIn(self.employee, employees)
+ self.assertEqual(self.employee, record.get('employee'))
self.assertEqual(absent[0], 1)
self.assertEqual(present[1], 1)
self.assertEqual(leaves[2], 1)