style: format code with black
diff --git a/erpnext/hr/doctype/attendance/attendance.py b/erpnext/hr/doctype/attendance/attendance.py
index ae0f2e7..a2487b3 100644
--- a/erpnext/hr/doctype/attendance/attendance.py
+++ b/erpnext/hr/doctype/attendance/attendance.py
@@ -14,6 +14,7 @@
class DuplicateAttendanceError(frappe.ValidationError):
pass
+
class Attendance(Document):
def validate(self):
from erpnext.controllers.status_updater import validate_status
@@ -39,12 +40,20 @@
frappe.throw(_("Attendance date can not be less than employee's joining date"))
def validate_duplicate_record(self):
- duplicate = get_duplicate_attendance_record(self.employee, self.attendance_date, self.shift, self.name)
+ duplicate = get_duplicate_attendance_record(
+ self.employee, self.attendance_date, self.shift, self.name
+ )
if duplicate:
- frappe.throw(_("Attendance for employee {0} is already marked for the date {1}: {2}").format(
- frappe.bold(self.employee), frappe.bold(self.attendance_date), get_link_to_form("Attendance", duplicate[0].name)),
- title=_("Duplicate Attendance"), exc=DuplicateAttendanceError)
+ frappe.throw(
+ _("Attendance for employee {0} is already marked for the date {1}: {2}").format(
+ frappe.bold(self.employee),
+ frappe.bold(self.attendance_date),
+ get_link_to_form("Attendance", duplicate[0].name),
+ ),
+ title=_("Duplicate Attendance"),
+ exc=DuplicateAttendanceError,
+ )
def validate_employee_status(self):
if frappe.db.get_value("Employee", self.employee, "status") == "Inactive":
@@ -101,26 +110,29 @@
attendance = frappe.qb.DocType("Attendance")
query = (
frappe.qb.from_(attendance)
- .select(attendance.name)
- .where(
- (attendance.employee == employee)
- & (attendance.docstatus < 2)
- )
+ .select(attendance.name)
+ .where((attendance.employee == employee) & (attendance.docstatus < 2))
)
if shift:
query = query.where(
- Criterion.any([
- Criterion.all([
- ((attendance.shift.isnull()) | (attendance.shift == "")),
- (attendance.attendance_date == attendance_date)
- ]),
- Criterion.all([
- ((attendance.shift.isnotnull()) | (attendance.shift != "")),
- (attendance.attendance_date == attendance_date),
- (attendance.shift == shift)
- ])
- ])
+ Criterion.any(
+ [
+ Criterion.all(
+ [
+ ((attendance.shift.isnull()) | (attendance.shift == "")),
+ (attendance.attendance_date == attendance_date),
+ ]
+ ),
+ Criterion.all(
+ [
+ ((attendance.shift.isnotnull()) | (attendance.shift != "")),
+ (attendance.attendance_date == attendance_date),
+ (attendance.shift == shift),
+ ]
+ ),
+ ]
+ )
)
else:
query = query.where((attendance.attendance_date == attendance_date))
@@ -167,18 +179,32 @@
if e not in events:
events.append(e)
-def mark_attendance(employee, attendance_date, status, shift=None, leave_type=None, ignore_validate=False):
+
+def mark_attendance(
+ employee,
+ attendance_date,
+ status,
+ shift=None,
+ leave_type=None,
+ ignore_validate=False,
+ late_entry=False,
+ early_exit=False,
+):
if not get_duplicate_attendance_record(employee, attendance_date, shift):
- company = frappe.db.get_value('Employee', employee, 'company')
- attendance = frappe.get_doc({
- 'doctype': 'Attendance',
- 'employee': employee,
- 'attendance_date': attendance_date,
- 'status': status,
- 'company': company,
- 'shift': shift,
- 'leave_type': leave_type
- })
+ company = frappe.db.get_value("Employee", employee, "company")
+ attendance = frappe.get_doc(
+ {
+ "doctype": "Attendance",
+ "employee": employee,
+ "attendance_date": attendance_date,
+ "status": status,
+ "company": company,
+ "shift": shift,
+ "leave_type": leave_type,
+ "late_entry": late_entry,
+ "early_exit": early_exit,
+ }
+ )
attendance.flags.ignore_validate = ignore_validate
attendance.insert()
attendance.submit()
diff --git a/erpnext/hr/doctype/employee_checkin/employee_checkin.py b/erpnext/hr/doctype/employee_checkin/employee_checkin.py
index a9ac60d..81c9a46 100644
--- a/erpnext/hr/doctype/employee_checkin/employee_checkin.py
+++ b/erpnext/hr/doctype/employee_checkin/employee_checkin.py
@@ -31,11 +31,21 @@
)
def fetch_shift(self):
- shift_actual_timings = get_actual_start_end_datetime_of_shift(self.employee, get_datetime(self.time), True)
+ shift_actual_timings = get_actual_start_end_datetime_of_shift(
+ self.employee, get_datetime(self.time), True
+ )
if shift_actual_timings:
- if shift_actual_timings.shift_type.determine_check_in_and_check_out == 'Strictly based on Log Type in Employee Checkin' \
- and not self.log_type and not self.skip_auto_attendance:
- frappe.throw(_('Log Type is required for check-ins falling in the shift: {0}.').format(shift_actual_timings.shift_type.name))
+ if (
+ shift_actual_timings.shift_type.determine_check_in_and_check_out
+ == "Strictly based on Log Type in Employee Checkin"
+ and not self.log_type
+ and not self.skip_auto_attendance
+ ):
+ frappe.throw(
+ _("Log Type is required for check-ins falling in the shift: {0}.").format(
+ shift_actual_timings.shift_type.name
+ )
+ )
if not self.attendance:
self.shift = shift_actual_timings.shift_type.name
self.shift_actual_start = shift_actual_timings.actual_start
@@ -125,8 +135,8 @@
("1", log_names),
)
return None
- elif attendance_status in ('Present', 'Absent', 'Half Day'):
- employee_doc = frappe.get_doc('Employee', employee)
+ elif attendance_status in ("Present", "Absent", "Half Day"):
+ employee_doc = frappe.get_doc("Employee", employee)
if not get_duplicate_attendance_record(employee, attendance_date, shift):
doc_dict = {
"doctype": "Attendance",
diff --git a/erpnext/hr/doctype/shift_assignment/shift_assignment.py b/erpnext/hr/doctype/shift_assignment/shift_assignment.py
index 768a862..fd0b4d5 100644
--- a/erpnext/hr/doctype/shift_assignment/shift_assignment.py
+++ b/erpnext/hr/doctype/shift_assignment/shift_assignment.py
@@ -19,6 +19,7 @@
class OverlappingShiftError(frappe.ValidationError):
pass
+
class ShiftAssignment(Document):
def validate(self):
validate_active_employee(self.employee)
@@ -42,27 +43,35 @@
shift = frappe.qb.DocType("Shift Assignment")
query = (
frappe.qb.from_(shift)
- .select(shift.name, shift.shift_type, shift.start_date, shift.end_date, shift.docstatus, shift.status)
- .where(
- (shift.employee == self.employee)
- & (shift.docstatus == 1)
- & (shift.name != self.name)
- & (shift.status == "Active")
- )
+ .select(
+ shift.name, shift.shift_type, shift.start_date, shift.end_date, shift.docstatus, shift.status
+ )
+ .where(
+ (shift.employee == self.employee)
+ & (shift.docstatus == 1)
+ & (shift.name != self.name)
+ & (shift.status == "Active")
+ )
)
if self.end_date:
query = query.where(
- Criterion.any([
- Criterion.any([
- shift.end_date.isnull(),
- ((self.start_date >= shift.start_date) & (self.start_date <= shift.end_date))
- ]),
- Criterion.any([
- ((self.end_date >= shift.start_date) & (self.end_date <= shift.end_date)),
- shift.start_date.between(self.start_date, self.end_date)
- ])
- ])
+ Criterion.any(
+ [
+ Criterion.any(
+ [
+ shift.end_date.isnull(),
+ ((self.start_date >= shift.start_date) & (self.start_date <= shift.end_date)),
+ ]
+ ),
+ Criterion.any(
+ [
+ ((self.end_date >= shift.start_date) & (self.end_date <= shift.end_date)),
+ shift.start_date.between(self.start_date, self.end_date),
+ ]
+ ),
+ ]
+ )
)
else:
query = query.where(
@@ -73,12 +82,27 @@
return query.run(as_dict=True)
def has_overlapping_timings(self, overlapping_shift):
- curr_shift = frappe.db.get_value("Shift Type", self.shift_type, ["start_time", "end_time"], as_dict=True)
- overlapping_shift = frappe.db.get_value("Shift Type", overlapping_shift, ["start_time", "end_time"], as_dict=True)
+ curr_shift = frappe.db.get_value(
+ "Shift Type", self.shift_type, ["start_time", "end_time"], as_dict=True
+ )
+ overlapping_shift = frappe.db.get_value(
+ "Shift Type", overlapping_shift, ["start_time", "end_time"], as_dict=True
+ )
- if ((curr_shift.start_time > overlapping_shift.start_time and curr_shift.start_time < overlapping_shift.end_time) or
- (curr_shift.end_time > overlapping_shift.start_time and curr_shift.end_time < overlapping_shift.end_time) or
- (curr_shift.start_time <= overlapping_shift.start_time and curr_shift.end_time >= overlapping_shift.end_time)):
+ if (
+ (
+ curr_shift.start_time > overlapping_shift.start_time
+ and curr_shift.start_time < overlapping_shift.end_time
+ )
+ or (
+ curr_shift.end_time > overlapping_shift.start_time
+ and curr_shift.end_time < overlapping_shift.end_time
+ )
+ or (
+ curr_shift.start_time <= overlapping_shift.start_time
+ and curr_shift.end_time >= overlapping_shift.end_time
+ )
+ ):
return True
return False
@@ -87,14 +111,20 @@
msg = None
if shift_details.docstatus == 1 and shift_details.status == "Active":
if shift_details.start_date and shift_details.end_date:
- msg = _("Employee {0} already has an active Shift {1}: {2} from {3} to {4}").format(frappe.bold(self.employee), frappe.bold(self.shift_type),
+ msg = _("Employee {0} already has an active Shift {1}: {2} from {3} to {4}").format(
+ frappe.bold(self.employee),
+ frappe.bold(self.shift_type),
get_link_to_form("Shift Assignment", shift_details.name),
getdate(self.start_date).strftime("%d-%m-%Y"),
- getdate(self.end_date).strftime("%d-%m-%Y"))
+ getdate(self.end_date).strftime("%d-%m-%Y"),
+ )
else:
- msg = _("Employee {0} already has an active Shift {1}: {2} from {3}").format(frappe.bold(self.employee), frappe.bold(self.shift_type),
+ msg = _("Employee {0} already has an active Shift {1}: {2} from {3}").format(
+ frappe.bold(self.employee),
+ frappe.bold(self.shift_type),
get_link_to_form("Shift Assignment", shift_details.name),
- getdate(self.start_date).strftime("%d-%m-%Y"))
+ getdate(self.start_date).strftime("%d-%m-%Y"),
+ )
if msg:
frappe.throw(msg, title=_("Overlapping Shifts"), exc=OverlappingShiftError)
@@ -180,10 +210,14 @@
for entry in shifts:
shift_details = get_shift_details(entry.shift_type, for_timestamp=for_timestamp)
- if get_datetime(shift_details.actual_start) <= get_datetime(for_timestamp) <= get_datetime(shift_details.actual_end):
+ if (
+ get_datetime(shift_details.actual_start)
+ <= get_datetime(for_timestamp)
+ <= get_datetime(shift_details.actual_end)
+ ):
valid_shifts.append(shift_details)
- valid_shifts.sort(key=lambda x: x['actual_start'])
+ valid_shifts.sort(key=lambda x: x["actual_start"])
if len(valid_shifts) > 1:
for i in range(len(valid_shifts) - 1):
@@ -193,8 +227,16 @@
next_shift = valid_shifts[i + 1]
if curr_shift and next_shift:
- next_shift.actual_start = curr_shift.end_datetime if next_shift.actual_start < curr_shift.end_datetime else next_shift.actual_start
- curr_shift.actual_end = next_shift.actual_start if curr_shift.actual_end > next_shift.actual_start else curr_shift.actual_end
+ next_shift.actual_start = (
+ curr_shift.end_datetime
+ if next_shift.actual_start < curr_shift.end_datetime
+ else next_shift.actual_start
+ )
+ curr_shift.actual_end = (
+ next_shift.actual_start
+ if curr_shift.actual_end > next_shift.actual_start
+ else curr_shift.actual_end
+ )
valid_shifts[i] = curr_shift
valid_shifts[i + 1] = next_shift
@@ -206,23 +248,25 @@
def get_shifts_for_date(employee: str, for_timestamp: datetime) -> List[Dict[str, str]]:
"""Returns list of shifts with details for given date"""
- assignment = frappe.qb.DocType('Shift Assignment')
+ assignment = frappe.qb.DocType("Shift Assignment")
return (
frappe.qb.from_(assignment)
- .select(assignment.name, assignment.shift_type)
- .where(
- (assignment.employee == employee)
- & (assignment.docstatus == 1)
- & (assignment.status == 'Active')
- & (assignment.start_date <= getdate(for_timestamp.date()))
- & (
- Criterion.any([
+ .select(assignment.name, assignment.shift_type)
+ .where(
+ (assignment.employee == employee)
+ & (assignment.docstatus == 1)
+ & (assignment.status == "Active")
+ & (assignment.start_date <= getdate(for_timestamp.date()))
+ & (
+ Criterion.any(
+ [
assignment.end_date.isnull(),
- (assignment.end_date.isnotnull() & (getdate(for_timestamp.date()) >= assignment.end_date))
- ])
+ (assignment.end_date.isnotnull() & (getdate(for_timestamp.date()) >= assignment.end_date)),
+ ]
)
)
+ )
).run(as_dict=True)
@@ -233,7 +277,12 @@
return {}
-def get_employee_shift(employee: str, for_timestamp: datetime = None, consider_default_shift: bool = False, next_shift_direction: str = None) -> Dict:
+def get_employee_shift(
+ employee: str,
+ for_timestamp: datetime = None,
+ consider_default_shift: bool = False,
+ next_shift_direction: str = None,
+) -> Dict:
"""Returns a Shift Type for the given employee on the given date. (excluding the holidays)
:param employee: Employee for which shift is required.
@@ -247,7 +296,7 @@
shift_details = get_shift_for_timestamp(employee, for_timestamp)
# if shift assignment is not found, consider default shift
- default_shift = frappe.db.get_value('Employee', employee, 'default_shift')
+ default_shift = frappe.db.get_value("Employee", employee, "default_shift")
if not shift_details and consider_default_shift:
shift_details = get_shift_details(default_shift, for_timestamp)
@@ -257,38 +306,55 @@
# if no shift is found, find next or prev shift assignment based on direction
if not shift_details and next_shift_direction:
- shift_details = get_prev_or_next_shift(employee, for_timestamp, consider_default_shift, default_shift, next_shift_direction)
+ shift_details = get_prev_or_next_shift(
+ employee, for_timestamp, consider_default_shift, default_shift, next_shift_direction
+ )
return shift_details or {}
-def get_prev_or_next_shift(employee: str, for_timestamp: datetime, consider_default_shift: bool,
- default_shift: str, next_shift_direction: str) -> Dict:
+def get_prev_or_next_shift(
+ employee: str,
+ for_timestamp: datetime,
+ consider_default_shift: bool,
+ default_shift: str,
+ next_shift_direction: str,
+) -> Dict:
"""Returns a dict of shift details for the next or prev shift based on the next_shift_direction"""
MAX_DAYS = 366
shift_details = {}
if consider_default_shift and default_shift:
- direction = -1 if next_shift_direction == 'reverse' else 1
+ direction = -1 if next_shift_direction == "reverse" else 1
for i in range(MAX_DAYS):
- date = for_timestamp + timedelta(days=direction*(i+1))
+ date = for_timestamp + timedelta(days=direction * (i + 1))
shift_details = get_employee_shift(employee, date, consider_default_shift, None)
if shift_details:
break
else:
- direction = '<' if next_shift_direction == 'reverse' else '>'
- sort_order = 'desc' if next_shift_direction == 'reverse' else 'asc'
- dates = frappe.db.get_all('Shift Assignment',
- ['start_date', 'end_date'],
- {'employee': employee, 'start_date': (direction, for_timestamp.date()), 'docstatus': 1, 'status': 'Active'},
+ direction = "<" if next_shift_direction == "reverse" else ">"
+ sort_order = "desc" if next_shift_direction == "reverse" else "asc"
+ dates = frappe.db.get_all(
+ "Shift Assignment",
+ ["start_date", "end_date"],
+ {
+ "employee": employee,
+ "start_date": (direction, for_timestamp.date()),
+ "docstatus": 1,
+ "status": "Active",
+ },
as_list=True,
- limit=MAX_DAYS, order_by='start_date ' + sort_order)
+ limit=MAX_DAYS,
+ order_by="start_date " + sort_order,
+ )
if dates:
for date in dates:
if date[1] and date[1] < for_timestamp.date():
continue
- shift_details = get_employee_shift(employee, datetime.combine(date[0], for_timestamp.time()), consider_default_shift, None)
+ shift_details = get_employee_shift(
+ employee, datetime.combine(date[0], for_timestamp.time()), consider_default_shift, None
+ )
if shift_details:
break
@@ -296,7 +362,9 @@
def is_holiday_date(employee: str, shift_details: Dict) -> bool:
- holiday_list_name = frappe.db.get_value('Shift Type', shift_details.shift_type.name, 'holiday_list')
+ holiday_list_name = frappe.db.get_value(
+ "Shift Type", shift_details.shift_type.name, "holiday_list"
+ )
if not holiday_list_name:
holiday_list_name = get_holiday_list_for_employee(employee, False)
@@ -304,17 +372,23 @@
return holiday_list_name and is_holiday(holiday_list_name, shift_details.start_datetime.date())
-def get_employee_shift_timings(employee: str, for_timestamp: datetime = None, consider_default_shift: bool = False) -> List[Dict]:
+def get_employee_shift_timings(
+ employee: str, for_timestamp: datetime = None, consider_default_shift: bool = False
+) -> List[Dict]:
"""Returns previous shift, current/upcoming shift, next_shift for the given timestamp and employee"""
if for_timestamp is None:
for_timestamp = now_datetime()
# write and verify a test case for midnight shift.
prev_shift = curr_shift = next_shift = None
- curr_shift = get_employee_shift(employee, for_timestamp, consider_default_shift, 'forward')
+ curr_shift = get_employee_shift(employee, for_timestamp, consider_default_shift, "forward")
if curr_shift:
- next_shift = get_employee_shift(employee, curr_shift.start_datetime + timedelta(days=1), consider_default_shift, 'forward')
- prev_shift = get_employee_shift(employee, for_timestamp + timedelta(days=-1), consider_default_shift, 'reverse')
+ next_shift = get_employee_shift(
+ employee, curr_shift.start_datetime + timedelta(days=1), consider_default_shift, "forward"
+ )
+ prev_shift = get_employee_shift(
+ employee, for_timestamp + timedelta(days=-1), consider_default_shift, "reverse"
+ )
if curr_shift:
# adjust actual start and end times if they are overlapping with grace period (before start and after end)
@@ -330,26 +404,35 @@
else prev_shift.actual_end
)
if next_shift:
- next_shift.actual_start = curr_shift.end_datetime if next_shift.actual_start < curr_shift.end_datetime else next_shift.actual_start
- curr_shift.actual_end = next_shift.actual_start if curr_shift.actual_end > next_shift.actual_start else curr_shift.actual_end
+ next_shift.actual_start = (
+ curr_shift.end_datetime
+ if next_shift.actual_start < curr_shift.end_datetime
+ else next_shift.actual_start
+ )
+ curr_shift.actual_end = (
+ next_shift.actual_start
+ if curr_shift.actual_end > next_shift.actual_start
+ else curr_shift.actual_end
+ )
return prev_shift, curr_shift, next_shift
-def get_actual_start_end_datetime_of_shift(employee: str, for_timestamp: datetime, consider_default_shift: bool = False) -> Dict:
- """
- Params:
- employee (str): Employee name
- for_timestamp (datetime, optional): Datetime value of checkin, if not provided considers current datetime
- consider_default_shift (bool, optional): Flag (defaults to False) to specify whether to consider
- default shift in employee master if no shift assignment is found
+def get_actual_start_end_datetime_of_shift(
+ employee: str, for_timestamp: datetime, consider_default_shift: bool = False
+) -> Dict:
+ """Returns a Dict containing shift details with actual_start and actual_end datetime values
+ Here 'actual' means taking into account the "begin_check_in_before_shift_start_time" and "allow_check_out_after_shift_end_time".
+ Empty Dict is returned if the timestamp is outside any actual shift timings.
- Returns:
- dict: Dict containing shift details with actual_start and actual_end datetime values
- Here 'actual' means taking into account the "begin_check_in_before_shift_start_time" and "allow_check_out_after_shift_end_time".
- Empty Dict is returned if the timestamp is outside any actual shift timings.
+ :param employee (str): Employee name
+ :param for_timestamp (datetime, optional): Datetime value of checkin, if not provided considers current datetime
+ :param consider_default_shift (bool, optional): Flag (defaults to False) to specify whether to consider
+ default shift in employee master if no shift assignment is found
"""
- shift_timings_as_per_timestamp = get_employee_shift_timings(employee, for_timestamp, consider_default_shift)
+ shift_timings_as_per_timestamp = get_employee_shift_timings(
+ employee, for_timestamp, consider_default_shift
+ )
return get_exact_shift(shift_timings_as_per_timestamp, for_timestamp)
@@ -381,25 +464,22 @@
if timestamp_index:
break
- if timestamp_index and timestamp_index%2 == 1:
- shift_details = shifts[int((timestamp_index-1)/2)]
+ if timestamp_index and timestamp_index % 2 == 1:
+ shift_details = shifts[int((timestamp_index - 1) / 2)]
return shift_details
def get_shift_details(shift_type_name: str, for_timestamp: datetime = None) -> Dict:
- """
- Params:
- shift_type_name (str): shift type name for which shift_details are required.
- for_timestamp (datetime, optional): Datetime value of checkin, if not provided considers current datetime
+ """Returns a Dict containing shift details with the following data:
+ 'shift_type' - Object of DocType Shift Type,
+ 'start_datetime' - datetime of shift start on given timestamp,
+ 'end_datetime' - datetime of shift end on given timestamp,
+ 'actual_start' - datetime of shift start after adding 'begin_check_in_before_shift_start_time',
+ 'actual_end' - datetime of shift end after adding 'allow_check_out_after_shift_end_time' (None is returned if this is zero)
- Returns:
- dict: Dict containing shift details with the following data:
- 'shift_type' - Object of DocType Shift Type,
- 'start_datetime' - datetime of shift start on given timestamp,
- 'end_datetime' - datetime of shift end on given timestamp,
- 'actual_start' - datetime of shift start after adding 'begin_check_in_before_shift_start_time',
- 'actual_end' - datetime of shift end after adding 'allow_check_out_after_shift_end_time' (None is returned if this is zero)
+ :param shift_type_name (str): shift type name for which shift_details are required.
+ :param for_timestamp (datetime, optional): Datetime value of checkin, if not provided considers current datetime
"""
if not shift_type_name:
return {}
@@ -407,8 +487,10 @@
if for_timestamp is None:
for_timestamp = now_datetime()
- shift_type = frappe.get_doc('Shift Type', shift_type_name)
- shift_actual_start = shift_type.start_time - timedelta(minutes=shift_type.begin_check_in_before_shift_start_time)
+ shift_type = frappe.get_doc("Shift Type", shift_type_name)
+ shift_actual_start = shift_type.start_time - timedelta(
+ minutes=shift_type.begin_check_in_before_shift_start_time
+ )
if shift_type.start_time > shift_type.end_time:
# shift spans accross 2 different days
@@ -428,13 +510,17 @@
start_datetime = datetime.combine(for_timestamp, datetime.min.time()) + shift_type.start_time
end_datetime = datetime.combine(for_timestamp, datetime.min.time()) + shift_type.end_time
- actual_start = start_datetime - timedelta(minutes=shift_type.begin_check_in_before_shift_start_time)
+ actual_start = start_datetime - timedelta(
+ minutes=shift_type.begin_check_in_before_shift_start_time
+ )
actual_end = end_datetime + timedelta(minutes=shift_type.allow_check_out_after_shift_end_time)
- return frappe._dict({
- 'shift_type': shift_type,
- 'start_datetime': start_datetime,
- 'end_datetime': end_datetime,
- 'actual_start': actual_start,
- 'actual_end': actual_end
- })
+ return frappe._dict(
+ {
+ "shift_type": shift_type,
+ "start_datetime": start_datetime,
+ "end_datetime": end_datetime,
+ "actual_start": actual_start,
+ "actual_end": actual_end,
+ }
+ )
diff --git a/erpnext/hr/doctype/shift_type/shift_type.py b/erpnext/hr/doctype/shift_type/shift_type.py
index dd1dff1..f5689d1 100644
--- a/erpnext/hr/doctype/shift_type/shift_type.py
+++ b/erpnext/hr/doctype/shift_type/shift_type.py
@@ -34,19 +34,40 @@
return
filters = {
- 'skip_auto_attendance': 0,
- 'attendance': ('is', 'not set'),
- 'time': ('>=', self.process_attendance_after),
- 'shift_actual_end': ('<', self.last_sync_of_checkin),
- 'shift': self.name
+ "skip_auto_attendance": 0,
+ "attendance": ("is", "not set"),
+ "time": (">=", self.process_attendance_after),
+ "shift_actual_end": ("<", self.last_sync_of_checkin),
+ "shift": self.name,
}
- logs = frappe.db.get_list('Employee Checkin', fields="*", filters=filters, order_by="employee,time")
+ logs = frappe.db.get_list(
+ "Employee Checkin", fields="*", filters=filters, order_by="employee,time"
+ )
- for key, group in itertools.groupby(logs, key=lambda x: (x['employee'], x['shift_actual_start'])):
+ for key, group in itertools.groupby(
+ logs, key=lambda x: (x["employee"], x["shift_actual_start"])
+ ):
single_shift_logs = list(group)
- attendance_status, working_hours, late_entry, early_exit, in_time, out_time = self.get_attendance(single_shift_logs)
- mark_attendance_and_link_log(single_shift_logs, attendance_status, key[1].date(),
- working_hours, late_entry, early_exit, in_time, out_time, self.name)
+ (
+ attendance_status,
+ working_hours,
+ late_entry,
+ early_exit,
+ in_time,
+ out_time,
+ ) = self.get_attendance(single_shift_logs)
+
+ mark_attendance_and_link_log(
+ single_shift_logs,
+ attendance_status,
+ key[1].date(),
+ working_hours,
+ late_entry,
+ early_exit,
+ in_time,
+ out_time,
+ self.name,
+ )
for employee in self.get_assigned_employee(self.process_attendance_after, True):
self.mark_absent_for_dates_with_no_attendance(employee)
@@ -54,9 +75,9 @@
def get_attendance(self, logs):
"""Return attendance_status, working_hours, late_entry, early_exit, in_time, out_time
for a set of logs belonging to a single shift.
- Assumption:
- 1. These logs belongs to a single shift, single employee and it's not in a holiday date.
- 2. Logs are in chronological order
+ Assumptions:
+ 1. These logs belongs to a single shift, single employee and it's not in a holiday date.
+ 2. Logs are in chronological order
"""
late_entry = early_exit = False
total_working_hours, in_time, out_time = calculate_working_hours(
@@ -116,8 +137,9 @@
mark_attendance(employee, date, "Absent", self.name)
def get_start_and_end_dates(self, employee):
- date_of_joining, relieving_date, employee_creation = frappe.db.get_value("Employee", employee,
- ["date_of_joining", "relieving_date", "creation"])
+ date_of_joining, relieving_date, employee_creation = frappe.db.get_value(
+ "Employee", employee, ["date_of_joining", "relieving_date", "creation"]
+ )
if not date_of_joining:
date_of_joining = employee_creation.date()
@@ -126,26 +148,32 @@
end_date = None
shift_details = get_shift_details(self.name, get_datetime(self.last_sync_of_checkin))
- last_shift_time = shift_details.actual_start if shift_details else get_datetime(self.last_sync_of_checkin)
+ last_shift_time = (
+ shift_details.actual_start if shift_details else get_datetime(self.last_sync_of_checkin)
+ )
- prev_shift = get_employee_shift(employee, last_shift_time - timedelta(days=1), True, 'reverse')
+ prev_shift = get_employee_shift(employee, last_shift_time - timedelta(days=1), True, "reverse")
if prev_shift:
- end_date = min(prev_shift.start_datetime.date(), relieving_date) if relieving_date else prev_shift.start_datetime.date()
+ end_date = (
+ min(prev_shift.start_datetime.date(), relieving_date)
+ if relieving_date
+ else prev_shift.start_datetime.date()
+ )
return start_date, end_date
def get_assigned_employee(self, from_date=None, consider_default_shift=False):
- filters = {'shift_type': self.name, 'docstatus': '1'}
+ filters = {"shift_type": self.name, "docstatus": "1"}
if from_date:
- filters['start_date'] = ('>', from_date)
+ filters["start_date"] = (">", from_date)
- assigned_employees = frappe.get_all('Shift Assignment', filters=filters, pluck='employee')
+ assigned_employees = frappe.get_all("Shift Assignment", filters=filters, pluck="employee")
if consider_default_shift:
- filters = {'default_shift': self.name, 'status': ['!=', 'Inactive']}
- default_shift_employees = frappe.get_all('Employee', filters=filters, pluck='name')
+ filters = {"default_shift": self.name, "status": ["!=", "Inactive"]}
+ default_shift_employees = frappe.get_all("Employee", filters=filters, pluck="name")
- return list(set(assigned_employees+default_shift_employees))
+ return list(set(assigned_employees + default_shift_employees))
return assigned_employees
diff --git a/erpnext/hr/report/monthly_attendance_sheet/monthly_attendance_sheet.py b/erpnext/hr/report/monthly_attendance_sheet/monthly_attendance_sheet.py
index a98afe4..efd2d38 100644
--- a/erpnext/hr/report/monthly_attendance_sheet/monthly_attendance_sheet.py
+++ b/erpnext/hr/report/monthly_attendance_sheet/monthly_attendance_sheet.py
@@ -14,44 +14,47 @@
Filters = frappe._dict
status_map = {
- 'Present': 'P',
- 'Absent': 'A',
- 'Half Day': 'HD',
- 'Work From Home': 'WFH',
- 'On Leave': 'L',
- 'Holiday': 'H',
- 'Weekly Off': 'WO'
+ "Present": "P",
+ "Absent": "A",
+ "Half Day": "HD",
+ "Work From Home": "WFH",
+ "On Leave": "L",
+ "Holiday": "H",
+ "Weekly Off": "WO",
}
-day_abbr = ['Mon', 'Tue', 'Wed', 'Thu', 'Fri', 'Sat', 'Sun']
+day_abbr = ["Mon", "Tue", "Wed", "Thu", "Fri", "Sat", "Sun"]
-def execute(filters: Optional[Filters] = None) -> Tuple:
+
+def execute(filters: Optional[Filters] = None) -> Tuple:
filters = frappe._dict(filters or {})
if not (filters.month and filters.year):
- frappe.throw(_('Please select month and year.'))
+ frappe.throw(_("Please select month and year."))
attendance_map = get_attendance_map(filters)
if not attendance_map:
- frappe.msgprint(_('No attendance records found.'), alert=True, indicator='orange')
+ frappe.msgprint(_("No attendance records found."), alert=True, indicator="orange")
return [], [], None, None
columns = get_columns(filters)
data = get_data(filters, attendance_map)
if not data:
- frappe.msgprint(_('No attendance records found for this criteria.'), alert=True, indicator='orange')
+ frappe.msgprint(
+ _("No attendance records found for this criteria."), alert=True, indicator="orange"
+ )
return columns, [], None, None
- message = get_message() if not filters.summarized_view else ''
+ message = get_message() if not filters.summarized_view else ""
chart = get_chart_data(attendance_map, filters)
return columns, data, message, chart
def get_message() -> str:
- message = ''
- colors = ['green', 'red', 'orange', 'green', '#318AD8', '', '']
+ message = ""
+ colors = ["green", "red", "orange", "green", "#318AD8", "", ""]
count = 0
for status, abbr in status_map.items():
@@ -70,39 +73,84 @@
if filters.group_by:
columns.append(
- {'label': _(filters.group_by), 'fieldname': frappe.scrub(filters.group_by), 'fieldtype': 'Link', 'options': 'Branch', 'width': 120}
+ {
+ "label": _(filters.group_by),
+ "fieldname": frappe.scrub(filters.group_by),
+ "fieldtype": "Link",
+ "options": "Branch",
+ "width": 120,
+ }
)
- columns.extend([
- {'label': _('Employee'), 'fieldname': 'employee', 'fieldtype': 'Link', 'options': 'Employee', 'width': 135},
- {'label': _('Employee Name'), 'fieldname': 'employee_name', 'fieldtype': 'Data', 'width': 120}
- ])
+ columns.extend(
+ [
+ {
+ "label": _("Employee"),
+ "fieldname": "employee",
+ "fieldtype": "Link",
+ "options": "Employee",
+ "width": 135,
+ },
+ {"label": _("Employee Name"), "fieldname": "employee_name", "fieldtype": "Data", "width": 120},
+ ]
+ )
if filters.summarized_view:
- columns.extend([
- {'label': _('Total Present'), 'fieldname': 'total_present', 'fieldtype': 'Float', 'width': 110},
- {'label': _('Total Leaves'), 'fieldname': 'total_leaves', 'fieldtype': 'Float', 'width': 110},
- {'label': _('Total Absent'), 'fieldname': 'total_absent', 'fieldtype': 'Float', 'width': 110},
- {'label': _('Total Holidays'), 'fieldname': 'total_holidays', 'fieldtype': 'Float', 'width': 120},
- {'label': _('Unmarked Days'), 'fieldname': 'unmarked_days', 'fieldtype': 'Float', 'width': 130}
- ])
+ columns.extend(
+ [
+ {
+ "label": _("Total Present"),
+ "fieldname": "total_present",
+ "fieldtype": "Float",
+ "width": 110,
+ },
+ {"label": _("Total Leaves"), "fieldname": "total_leaves", "fieldtype": "Float", "width": 110},
+ {"label": _("Total Absent"), "fieldname": "total_absent", "fieldtype": "Float", "width": 110},
+ {
+ "label": _("Total Holidays"),
+ "fieldname": "total_holidays",
+ "fieldtype": "Float",
+ "width": 120,
+ },
+ {
+ "label": _("Unmarked Days"),
+ "fieldname": "unmarked_days",
+ "fieldtype": "Float",
+ "width": 130,
+ },
+ ]
+ )
columns.extend(get_columns_for_leave_types())
- columns.extend([
- {'label': _('Total Late Entries'), 'fieldname': 'total_late_entries', 'fieldtype': 'Float', 'width': 140},
- {'label': _('Total Early Exits'), 'fieldname': 'total_early_exits', 'fieldtype': 'Float', 'width': 140}
- ])
+ columns.extend(
+ [
+ {
+ "label": _("Total Late Entries"),
+ "fieldname": "total_late_entries",
+ "fieldtype": "Float",
+ "width": 140,
+ },
+ {
+ "label": _("Total Early Exits"),
+ "fieldname": "total_early_exits",
+ "fieldtype": "Float",
+ "width": 140,
+ },
+ ]
+ )
else:
- columns.append({'label': _('Shift'), 'fieldname': 'shift', 'fieldtype': 'Data', 'width': 120})
+ columns.append({"label": _("Shift"), "fieldname": "shift", "fieldtype": "Data", "width": 120})
columns.extend(get_columns_for_days(filters))
return columns
def get_columns_for_leave_types() -> List[Dict]:
- leave_types = frappe.db.get_all('Leave Type', pluck='name')
+ leave_types = frappe.db.get_all("Leave Type", pluck="name")
types = []
for entry in leave_types:
- types.append({'label': entry, 'fieldname': frappe.scrub(entry), 'fieldtype': 'Float', 'width': 120})
+ types.append(
+ {"label": entry, "fieldname": frappe.scrub(entry), "fieldtype": "Float", "width": 120}
+ )
return types
@@ -111,23 +159,14 @@
total_days = get_total_days_in_month(filters)
days = []
- for day in range(1, total_days+1):
+ for day in range(1, total_days + 1):
# forms the dates from selected year and month from filters
- date = '{}-{}-{}'.format(
- cstr(filters.year),
- cstr(filters.month),
- cstr(day)
- )
+ date = "{}-{}-{}".format(cstr(filters.year), cstr(filters.month), cstr(day))
# gets abbr from weekday number
weekday = day_abbr[getdate(date).weekday()]
# sets days as 1 Mon, 2 Tue, 3 Wed
- label = '{} {}'.format(cstr(day), weekday)
- days.append({
- 'label': label,
- 'fieldtype': 'Data',
- 'fieldname': day,
- 'width': 65
- })
+ label = "{} {}".format(cstr(day), weekday)
+ days.append({"label": label, "fieldtype": "Data", "fieldname": day, "width": 65})
return days
@@ -137,7 +176,9 @@
def get_data(filters: Filters, attendance_map: Dict) -> List[Dict]:
- employee_details, group_by_param_values = get_employee_related_details(filters.group_by, filters.company)
+ employee_details, group_by_param_values = get_employee_related_details(
+ filters.group_by, filters.company
+ )
holiday_map = get_holiday_map(filters)
data = []
@@ -151,9 +192,7 @@
records = get_rows(employee_details[value], filters, holiday_map, attendance_map)
if records:
- data.append({
- group_by_column: frappe.bold(value)
- })
+ data.append({group_by_column: frappe.bold(value)})
data.extend(records)
else:
data = get_rows(employee_details, filters, holiday_map, attendance_map)
@@ -163,30 +202,31 @@
def get_attendance_map(filters: Filters) -> Dict:
"""Returns a dictionary of employee wise attendance map as per shifts for all the days of the month like
- {
- 'employee1': {
- 'Morning Shift': {1: 'Present', 2: 'Absent', ...}
- 'Evening Shift': {1: 'Absent', 2: 'Present', ...}
- },
- 'employee2': {
- 'Afternoon Shift': {1: 'Present', 2: 'Absent', ...}
- 'Night Shift': {1: 'Absent', 2: 'Absent', ...}
- }
- }
+ {
+ 'employee1': {
+ 'Morning Shift': {1: 'Present', 2: 'Absent', ...}
+ 'Evening Shift': {1: 'Absent', 2: 'Present', ...}
+ },
+ 'employee2': {
+ 'Afternoon Shift': {1: 'Present', 2: 'Absent', ...}
+ 'Night Shift': {1: 'Absent', 2: 'Absent', ...}
+ }
+ }
"""
- Attendance = frappe.qb.DocType('Attendance')
+ Attendance = frappe.qb.DocType("Attendance")
query = (
frappe.qb.from_(Attendance)
.select(
Attendance.employee,
- Extract('day', Attendance.attendance_date).as_('day_of_month'),
+ Extract("day", Attendance.attendance_date).as_("day_of_month"),
Attendance.status,
- Attendance.shift
- ).where(
+ Attendance.shift,
+ )
+ .where(
(Attendance.docstatus == 1)
& (Attendance.company == filters.company)
- & (Extract('month', Attendance.attendance_date) == filters.month)
- & (Extract('year', Attendance.attendance_date) == filters.year)
+ & (Extract("month", Attendance.attendance_date) == filters.month)
+ & (Extract("year", Attendance.attendance_date) == filters.year)
)
)
if filters.employee:
@@ -205,18 +245,23 @@
def get_employee_related_details(group_by: str, company: str) -> Tuple[Dict, List]:
"""Returns
- 1. nested dict for employee details
- 2. list of values for the group by filter
- eg: if group by filter is set to "Department" then returns a list like ['HR', 'Support', 'Engineering']
+ 1. nested dict for employee details
+ 2. list of values for the group by filter
"""
- Employee = frappe.qb.DocType('Employee')
+ Employee = frappe.qb.DocType("Employee")
query = (
frappe.qb.from_(Employee)
.select(
- Employee.name, Employee.employee_name, Employee.designation,
- Employee.grade, Employee.department, Employee.branch,
- Employee.company, Employee.holiday_list
- ).where(Employee.company == company)
+ Employee.name,
+ Employee.employee_name,
+ Employee.designation,
+ Employee.grade,
+ Employee.department,
+ Employee.branch,
+ Employee.company,
+ Employee.holiday_list,
+ )
+ .where(Employee.company == company)
)
if group_by:
@@ -247,23 +292,23 @@
Returns a dict of holidays falling in the filter month and year
with list name as key and list of holidays as values like
{
- 'Holiday List 1': [
- {'day_of_month': '0' , 'weekly_off': 1},
- {'day_of_month': '1', 'weekly_off': 0}
- ],
- 'Holiday List 2': [
- {'day_of_month': '0' , 'weekly_off': 1},
- {'day_of_month': '1', 'weekly_off': 0}
- ]
+ 'Holiday List 1': [
+ {'day_of_month': '0' , 'weekly_off': 1},
+ {'day_of_month': '1', 'weekly_off': 0}
+ ],
+ 'Holiday List 2': [
+ {'day_of_month': '0' , 'weekly_off': 1},
+ {'day_of_month': '1', 'weekly_off': 0}
+ ]
}
"""
# add default holiday list too
- holiday_lists = frappe.db.get_all('Holiday List', pluck='name')
- default_holiday_list = frappe.get_cached_value('Company', filters.company, 'default_holiday_list')
+ holiday_lists = frappe.db.get_all("Holiday List", pluck="name")
+ default_holiday_list = frappe.get_cached_value("Company", filters.company, "default_holiday_list")
holiday_lists.append(default_holiday_list)
holiday_map = frappe._dict()
- Holiday = frappe.qb.DocType('Holiday')
+ Holiday = frappe.qb.DocType("Holiday")
for d in holiday_lists:
if not d:
@@ -271,13 +316,11 @@
holidays = (
frappe.qb.from_(Holiday)
- .select(
- Extract('day', Holiday.holiday_date).as_('day_of_month'),
- Holiday.weekly_off
- ).where(
+ .select(Extract("day", Holiday.holiday_date).as_("day_of_month"), Holiday.weekly_off)
+ .where(
(Holiday.parent == d)
- & (Extract('month', Holiday.holiday_date) == filters.month)
- & (Extract('year', Holiday.holiday_date) == filters.year)
+ & (Extract("month", Holiday.holiday_date) == filters.month)
+ & (Extract("year", Holiday.holiday_date) == filters.year)
)
).run(as_dict=True)
@@ -286,13 +329,15 @@
return holiday_map
-def get_rows(employee_details: Dict, filters: Filters, holiday_map: Dict, attendance_map: Dict) -> List[Dict]:
+def get_rows(
+ employee_details: Dict, filters: Filters, holiday_map: Dict, attendance_map: Dict
+) -> List[Dict]:
records = []
- default_holiday_list = frappe.get_cached_value('Company', filters.company, 'default_holiday_list')
+ default_holiday_list = frappe.get_cached_value("Company", filters.company, "default_holiday_list")
for employee, details in employee_details.items():
emp_holiday_list = details.holiday_list or default_holiday_list
- holidays = holiday_map[emp_holiday_list]
+ holidays = holiday_map.get(emp_holiday_list)
if filters.summarized_view:
attendance = get_attendance_status_for_summarized_view(employee, filters, holidays)
@@ -302,7 +347,7 @@
leave_summary = get_leave_summary(employee, filters)
entry_exits_summary = get_entry_exits_summary(employee, filters)
- row = {'employee': employee, 'employee_name': details.employee_name}
+ row = {"employee": employee, "employee_name": details.employee_name}
set_defaults_for_summarized_view(filters, row)
row.update(attendance)
row.update(leave_summary)
@@ -314,12 +359,13 @@
if not employee_attendance:
continue
- attendance_for_employee = get_attendance_status_for_detailed_view(employee, filters, employee_attendance, holidays)
+ attendance_for_employee = get_attendance_status_for_detailed_view(
+ employee, filters, employee_attendance, holidays
+ )
# set employee details in the first row
- attendance_for_employee[0].update({
- 'employee': employee,
- 'employee_name': details.employee_name
- })
+ attendance_for_employee[0].update(
+ {"employee": employee, "employee_name": details.employee_name}
+ )
records.extend(attendance_for_employee)
@@ -328,13 +374,15 @@
def set_defaults_for_summarized_view(filters, row):
for entry in get_columns(filters):
- if entry.get('fieldtype') == 'Float':
- row[entry.get('fieldname')] = 0.0
+ if entry.get("fieldtype") == "Float":
+ row[entry.get("fieldname")] = 0.0
-def get_attendance_status_for_summarized_view(employee: str, filters: Filters, holidays: List) -> Dict:
+def get_attendance_status_for_summarized_view(
+ employee: str, filters: Filters, holidays: List
+) -> Dict:
"""Returns dict of attendance status for employee like
- {'total_present': 1.5, 'total_leaves': 0.5, 'total_absent': 13.5, 'total_holidays': 8, 'unmarked_days': 5}
+ {'total_present': 1.5, 'total_leaves': 0.5, 'total_absent': 13.5, 'total_holidays': 8, 'unmarked_days': 5}
"""
summary, attendance_days = get_attendance_summary_and_days(employee, filters)
if not any(summary.values()):
@@ -348,83 +396,93 @@
continue
status = get_holiday_status(day, holidays)
- if status in ['Weekly Off', 'Holiday']:
+ if status in ["Weekly Off", "Holiday"]:
total_holidays += 1
elif not status:
total_unmarked_days += 1
return {
- 'total_present': summary.total_present + summary.total_half_days,
- 'total_leaves': summary.total_leaves + summary.total_half_days,
- 'total_absent': summary.total_absent + summary.total_half_days,
- 'total_holidays': total_holidays,
- 'unmarked_days': total_unmarked_days
+ "total_present": summary.total_present + summary.total_half_days,
+ "total_leaves": summary.total_leaves + summary.total_half_days,
+ "total_absent": summary.total_absent + summary.total_half_days,
+ "total_holidays": total_holidays,
+ "unmarked_days": total_unmarked_days,
}
def get_attendance_summary_and_days(employee: str, filters: Filters) -> Tuple[Dict, List]:
- Attendance = frappe.qb.DocType('Attendance')
+ Attendance = frappe.qb.DocType("Attendance")
- present_case = frappe.qb.terms.Case().when(((Attendance.status == 'Present') | (Attendance.status == 'Work From Home')), 1).else_(0)
- sum_present = Sum(present_case).as_('total_present')
+ present_case = (
+ frappe.qb.terms.Case()
+ .when(((Attendance.status == "Present") | (Attendance.status == "Work From Home")), 1)
+ .else_(0)
+ )
+ sum_present = Sum(present_case).as_("total_present")
- absent_case = frappe.qb.terms.Case().when(Attendance.status == 'Absent', 1).else_(0)
- sum_absent = Sum(absent_case).as_('total_absent')
+ absent_case = frappe.qb.terms.Case().when(Attendance.status == "Absent", 1).else_(0)
+ sum_absent = Sum(absent_case).as_("total_absent")
- leave_case = frappe.qb.terms.Case().when(Attendance.status == 'On Leave', 1).else_(0)
- sum_leave = Sum(leave_case).as_('total_leaves')
+ leave_case = frappe.qb.terms.Case().when(Attendance.status == "On Leave", 1).else_(0)
+ sum_leave = Sum(leave_case).as_("total_leaves")
- half_day_case = frappe.qb.terms.Case().when(Attendance.status == 'Half Day', 0.5).else_(0)
- sum_half_day = Sum(half_day_case).as_('total_half_days')
+ half_day_case = frappe.qb.terms.Case().when(Attendance.status == "Half Day", 0.5).else_(0)
+ sum_half_day = Sum(half_day_case).as_("total_half_days")
summary = (
frappe.qb.from_(Attendance)
.select(
- sum_present, sum_absent, sum_leave, sum_half_day,
- ).where(
+ sum_present,
+ sum_absent,
+ sum_leave,
+ sum_half_day,
+ )
+ .where(
(Attendance.docstatus == 1)
& (Attendance.employee == employee)
& (Attendance.company == filters.company)
- & (Extract('month', Attendance.attendance_date) == filters.month)
- & (Extract('year', Attendance.attendance_date) == filters.year)
+ & (Extract("month", Attendance.attendance_date) == filters.month)
+ & (Extract("year", Attendance.attendance_date) == filters.year)
)
).run(as_dict=True)
days = (
frappe.qb.from_(Attendance)
- .select(Extract('day', Attendance.attendance_date).as_('day_of_month'))
+ .select(Extract("day", Attendance.attendance_date).as_("day_of_month"))
.distinct()
.where(
(Attendance.docstatus == 1)
& (Attendance.employee == employee)
& (Attendance.company == filters.company)
- & (Extract('month', Attendance.attendance_date) == filters.month)
- & (Extract('year', Attendance.attendance_date) == filters.year)
+ & (Extract("month", Attendance.attendance_date) == filters.month)
+ & (Extract("year", Attendance.attendance_date) == filters.year)
)
).run(pluck=True)
return summary[0], days
-def get_attendance_status_for_detailed_view(employee: str, filters: Filters, employee_attendance: Dict, holidays: List) -> List[Dict]:
+def get_attendance_status_for_detailed_view(
+ employee: str, filters: Filters, employee_attendance: Dict, holidays: List
+) -> List[Dict]:
"""Returns list of shift-wise attendance status for employee
- [
- {'shift': 'Morning Shift', 1: 'A', 2: 'P', 3: 'A'....},
- {'shift': 'Evening Shift', 1: 'P', 2: 'A', 3: 'P'....}
- ]
+ [
+ {'shift': 'Morning Shift', 1: 'A', 2: 'P', 3: 'A'....},
+ {'shift': 'Evening Shift', 1: 'P', 2: 'A', 3: 'P'....}
+ ]
"""
total_days = get_total_days_in_month(filters)
attendance_values = []
for shift, status_dict in employee_attendance.items():
- row = {'shift': shift}
+ row = {"shift": shift}
for day in range(1, total_days + 1):
status = status_dict.get(day)
if status is None and holidays:
status = get_holiday_status(day, holidays)
- abbr = status_map.get(status, '')
+ abbr = status_map.get(status, "")
row[day] = abbr
attendance_values.append(row)
@@ -435,22 +493,22 @@
def get_holiday_status(day: int, holidays: List) -> str:
status = None
for holiday in holidays:
- if day == holiday.get('day_of_month'):
- if holiday.get('weekly_off'):
- status = 'Weekly Off'
+ if day == holiday.get("day_of_month"):
+ if holiday.get("weekly_off"):
+ status = "Weekly Off"
else:
- status = 'Holiday'
+ status = "Holiday"
break
return status
def get_leave_summary(employee: str, filters: Filters) -> Dict[str, float]:
"""Returns a dict of leave type and corresponding leaves taken by employee like:
- {'leave_without_pay': 1.0, 'sick_leave': 2.0}
+ {'leave_without_pay': 1.0, 'sick_leave': 2.0}
"""
- Attendance = frappe.qb.DocType('Attendance')
- day_case = frappe.qb.terms.Case().when(Attendance.status == 'Half Day', 0.5).else_(1)
- sum_leave_days = Sum(day_case).as_('leave_days')
+ Attendance = frappe.qb.DocType("Attendance")
+ day_case = frappe.qb.terms.Case().when(Attendance.status == "Half Day", 0.5).else_(1)
+ sum_leave_days = Sum(day_case).as_("leave_days")
leave_details = (
frappe.qb.from_(Attendance)
@@ -459,10 +517,11 @@
(Attendance.employee == employee)
& (Attendance.docstatus == 1)
& (Attendance.company == filters.company)
- & ((Attendance.leave_type.isnotnull()) | (Attendance.leave_type != ''))
- & (Extract('month', Attendance.attendance_date) == filters.month)
- & (Extract('year', Attendance.attendance_date) == filters.year)
- ).groupby(Attendance.leave_type)
+ & ((Attendance.leave_type.isnotnull()) | (Attendance.leave_type != ""))
+ & (Extract("month", Attendance.attendance_date) == filters.month)
+ & (Extract("year", Attendance.attendance_date) == filters.year)
+ )
+ .groupby(Attendance.leave_type)
).run(as_dict=True)
leaves = {}
@@ -475,15 +534,15 @@
def get_entry_exits_summary(employee: str, filters: Filters) -> Dict[str, float]:
"""Returns total late entries and total early exits for employee like:
- {'total_late_entries': 5, 'total_early_exits': 2}
+ {'total_late_entries': 5, 'total_early_exits': 2}
"""
- Attendance = frappe.qb.DocType('Attendance')
+ Attendance = frappe.qb.DocType("Attendance")
- late_entry_case = frappe.qb.terms.Case().when(Attendance.late_entry == '1', '1')
- count_late_entries = Count(late_entry_case).as_('total_late_entries')
+ late_entry_case = frappe.qb.terms.Case().when(Attendance.late_entry == "1", "1")
+ count_late_entries = Count(late_entry_case).as_("total_late_entries")
- early_exit_case = frappe.qb.terms.Case().when(Attendance.early_exit == '1', '1')
- count_early_exits = Count(early_exit_case).as_('total_early_exits')
+ early_exit_case = frappe.qb.terms.Case().when(Attendance.early_exit == "1", "1")
+ count_early_exits = Count(early_exit_case).as_("total_early_exits")
entry_exits = (
frappe.qb.from_(Attendance)
@@ -492,8 +551,8 @@
(Attendance.docstatus == 1)
& (Attendance.employee == employee)
& (Attendance.company == filters.company)
- & (Extract('month', Attendance.attendance_date) == filters.month)
- & (Extract('year', Attendance.attendance_date) == filters.year)
+ & (Extract("month", Attendance.attendance_date) == filters.month)
+ & (Extract("year", Attendance.attendance_date) == filters.year)
)
).run(as_dict=True)
@@ -503,10 +562,10 @@
@frappe.whitelist()
def get_attendance_years() -> str:
"""Returns all the years for which attendance records exist"""
- Attendance = frappe.qb.DocType('Attendance')
+ Attendance = frappe.qb.DocType("Attendance")
year_list = (
frappe.qb.from_(Attendance)
- .select(Extract('year', Attendance.attendance_date).as_('year'))
+ .select(Extract("year", Attendance.attendance_date).as_("year"))
.distinct()
).run(as_dict=True)
@@ -526,21 +585,21 @@
leave = []
for day in days:
- labels.append(day['label'])
+ labels.append(day["label"])
total_absent_on_day = total_leaves_on_day = total_present_on_day = 0
for employee, attendance_dict in attendance_map.items():
for shift, attendance in attendance_dict.items():
- attendance_on_day = attendance.get(day['fieldname'])
+ attendance_on_day = attendance.get(day["fieldname"])
- if attendance_on_day == 'Absent':
+ if attendance_on_day == "Absent":
total_absent_on_day += 1
- elif attendance_on_day in ['Present', 'Work From Home']:
+ elif attendance_on_day in ["Present", "Work From Home"]:
total_present_on_day += 1
- elif attendance_on_day == 'Half Day':
+ elif attendance_on_day == "Half Day":
total_present_on_day += 0.5
total_leaves_on_day += 0.5
- elif attendance_on_day == 'On Leave':
+ elif attendance_on_day == "On Leave":
total_leaves_on_day += 1
absent.append(total_absent_on_day)
@@ -548,14 +607,14 @@
leave.append(total_leaves_on_day)
return {
- 'data': {
- 'labels': labels,
- 'datasets': [
- {'name': 'Absent', 'values': absent},
- {'name': 'Present', 'values': present},
- {'name': 'Leave', 'values': leave},
- ]
+ "data": {
+ "labels": labels,
+ "datasets": [
+ {"name": "Absent", "values": absent},
+ {"name": "Present", "values": present},
+ {"name": "Leave", "values": leave},
+ ],
},
- 'type': 'line',
- 'colors': ['red', 'green', 'blue'],
- }
\ No newline at end of file
+ "type": "line",
+ "colors": ["red", "green", "blue"],
+ }
diff --git a/erpnext/hr/report/monthly_attendance_sheet/test_monthly_attendance_sheet.py b/erpnext/hr/report/monthly_attendance_sheet/test_monthly_attendance_sheet.py
index cc899eb..2f3cb53 100644
--- a/erpnext/hr/report/monthly_attendance_sheet/test_monthly_attendance_sheet.py
+++ b/erpnext/hr/report/monthly_attendance_sheet/test_monthly_attendance_sheet.py
@@ -7,10 +7,7 @@
from erpnext.hr.doctype.employee.test_employee import make_employee
from erpnext.hr.doctype.holiday_list.test_holiday_list import set_holiday_list
from erpnext.hr.doctype.leave_application.test_leave_application import make_allocation_record
-from erpnext.hr.report.monthly_attendance_sheet.monthly_attendance_sheet import (
- execute,
- get_total_days_in_month,
-)
+from erpnext.hr.report.monthly_attendance_sheet.monthly_attendance_sheet import execute
from erpnext.payroll.doctype.salary_slip.test_salary_slip import make_leave_application
test_dependencies = ["Shift Type"]