refactor: mark absent for employees with no attendance
- break down into smaller functions
- make it work with multiple shifts
- this will mark employee as absent per shift, meaning employee can be present for one shift and absent for another on the same day
diff --git a/erpnext/hr/doctype/shift_type/shift_type.py b/erpnext/hr/doctype/shift_type/shift_type.py
index 17bca60..27d368c 100644
--- a/erpnext/hr/doctype/shift_type/shift_type.py
+++ b/erpnext/hr/doctype/shift_type/shift_type.py
@@ -3,21 +3,23 @@
import itertools
-from datetime import timedelta
+from datetime import datetime, timedelta
import frappe
from frappe.model.document import Document
-from frappe.utils import cint, get_datetime, getdate
+from frappe.utils import cint, get_datetime, get_time, getdate
+from erpnext.buying.doctype.supplier_scorecard.supplier_scorecard import daterange
from erpnext.hr.doctype.attendance.attendance import mark_attendance
from erpnext.hr.doctype.employee.employee import get_holiday_list_for_employee
from erpnext.hr.doctype.employee_checkin.employee_checkin import (
calculate_working_hours,
mark_attendance_and_link_log,
)
+from erpnext.hr.doctype.holiday_list.holiday_list import is_holiday
from erpnext.hr.doctype.shift_assignment.shift_assignment import (
- get_actual_start_end_datetime_of_shift,
get_employee_shift,
+ get_shift_details
)
@@ -90,46 +92,60 @@
"""Marks Absents for the given employee on working days in this shift which have no attendance marked.
The Absent is marked starting from 'process_attendance_after' or employee creation date.
"""
- date_of_joining, relieving_date, employee_creation = frappe.db.get_value(
- "Employee", employee, ["date_of_joining", "relieving_date", "creation"]
- )
- if not date_of_joining:
- date_of_joining = employee_creation.date()
- start_date = max(getdate(self.process_attendance_after), date_of_joining)
- actual_shift_datetime = get_actual_start_end_datetime_of_shift(employee, get_datetime(self.last_sync_of_checkin), True)
- last_shift_time = actual_shift_datetime.actual_start if actual_shift_datetime else get_datetime(self.last_sync_of_checkin)
- prev_shift = get_employee_shift(employee, last_shift_time - timedelta(days=1), True, 'reverse')
- if prev_shift:
- end_date = (
- min(prev_shift.start_datetime.date(), relieving_date)
- if relieving_date
- else prev_shift.start_datetime.date()
- )
- else:
+ start_date, end_date = self.get_start_and_end_dates(employee)
+
+ # no shift assignment found, no need to process absent attendance records
+ if end_date is None:
return
+
holiday_list_name = self.holiday_list
if not holiday_list_name:
holiday_list_name = get_holiday_list_for_employee(employee, False)
- dates = get_filtered_date_list(employee, start_date, end_date, holiday_list=holiday_list_name)
- for date in dates:
- shift_details = get_employee_shift(employee, get_datetime(date), True)
+ start_time = get_time(self.start_time)
+
+ for date in daterange(getdate(start_date), getdate(end_date)):
+ if is_holiday(holiday_list_name, date):
+ # skip marking absent on a holiday
+ continue
+
+ timestamp = datetime.combine(date, start_time)
+ shift_details = get_employee_shift(employee, timestamp, True)
+
if shift_details and shift_details.shift_type.name == self.name:
mark_attendance(employee, date, "Absent", self.name)
- def get_assigned_employee(self, from_date=None, consider_default_shift=False):
- filters = {"start_date": (">", from_date), "shift_type": self.name, "docstatus": "1"}
- if not from_date:
- del filters["start_date"]
+ def get_start_and_end_dates(self, employee):
+ date_of_joining, relieving_date, employee_creation = frappe.db.get_value("Employee", employee,
+ ["date_of_joining", "relieving_date", "creation"])
- assigned_employees = frappe.get_all("Shift Assignment", "employee", filters, as_list=True)
- assigned_employees = [x[0] for x in assigned_employees]
+ if not date_of_joining:
+ date_of_joining = employee_creation.date()
+
+ start_date = max(getdate(self.process_attendance_after), date_of_joining)
+ end_date = None
+
+ shift_details = get_shift_details(self.name, get_datetime(self.last_sync_of_checkin))
+ last_shift_time = shift_details.actual_start if shift_details else get_datetime(self.last_sync_of_checkin)
+
+ prev_shift = get_employee_shift(employee, last_shift_time - timedelta(days=1), True, 'reverse')
+ if prev_shift:
+ end_date = min(prev_shift.start_datetime.date(), relieving_date) if relieving_date else prev_shift.start_datetime.date()
+
+ return start_date, end_date
+
+ def get_assigned_employee(self, from_date=None, consider_default_shift=False):
+ filters = {'shift_type': self.name, 'docstatus': '1'}
+ if from_date:
+ filters['start_date'] = ('>', from_date)
+
+ assigned_employees = frappe.get_all('Shift Assignment', filters=filters, pluck='employee')
if consider_default_shift:
- filters = {"default_shift": self.name, "status": ["!=", "Inactive"]}
- default_shift_employees = frappe.get_all("Employee", "name", filters, as_list=True)
- default_shift_employees = [x[0] for x in default_shift_employees]
- return list(set(assigned_employees + default_shift_employees))
+ filters = {'default_shift': self.name, 'status': ['!=', 'Inactive']}
+ default_shift_employees = frappe.get_all('Employee', filters=filters, pluck='name')
+
+ return list(set(assigned_employees+default_shift_employees))
return assigned_employees
@@ -138,42 +154,3 @@
for shift in shift_list:
doc = frappe.get_doc("Shift Type", shift[0])
doc.process_auto_attendance()
-
-
-def get_filtered_date_list(
- employee, start_date, end_date, filter_attendance=True, holiday_list=None
-):
- """Returns a list of dates after removing the dates with attendance and holidays"""
- base_dates_query = """select adddate(%(start_date)s, t2.i*100 + t1.i*10 + t0.i) selected_date from
- (select 0 i union select 1 union select 2 union select 3 union select 4 union select 5 union select 6 union select 7 union select 8 union select 9) t0,
- (select 0 i union select 1 union select 2 union select 3 union select 4 union select 5 union select 6 union select 7 union select 8 union select 9) t1,
- (select 0 i union select 1 union select 2 union select 3 union select 4 union select 5 union select 6 union select 7 union select 8 union select 9) t2"""
- condition_query = ""
- if filter_attendance:
- condition_query += """ and a.selected_date not in (
- select attendance_date from `tabAttendance`
- where docstatus = 1 and employee = %(employee)s
- and attendance_date between %(start_date)s and %(end_date)s)"""
- if holiday_list:
- condition_query += """ and a.selected_date not in (
- select holiday_date from `tabHoliday` where parenttype = 'Holiday List' and
- parentfield = 'holidays' and parent = %(holiday_list)s
- and holiday_date between %(start_date)s and %(end_date)s)"""
-
- dates = frappe.db.sql(
- """select * from
- ({base_dates_query}) as a
- where a.selected_date <= %(end_date)s {condition_query}
- """.format(
- base_dates_query=base_dates_query, condition_query=condition_query
- ),
- {
- "employee": employee,
- "start_date": start_date,
- "end_date": end_date,
- "holiday_list": holiday_list,
- },
- as_list=True,
- )
-
- return [getdate(date[0]) for date in dates]