refactor: consider timeslots in `get_employee_shift`
diff --git a/erpnext/hr/doctype/shift_assignment/shift_assignment.py b/erpnext/hr/doctype/shift_assignment/shift_assignment.py
index f51a860..86564e0 100644
--- a/erpnext/hr/doctype/shift_assignment/shift_assignment.py
+++ b/erpnext/hr/doctype/shift_assignment/shift_assignment.py
@@ -7,7 +7,7 @@
import frappe
from frappe import _
from frappe.model.document import Document
-from frappe.query_builder import Criterion
+from frappe.query_builder import Criterion, Column
from frappe.utils import cstr, get_link_to_form, getdate, now_datetime, nowdate
from erpnext.hr.doctype.employee.employee import get_holiday_list_for_employee
@@ -171,102 +171,124 @@
return shift_timing_map
-def get_employee_shift(
- employee, for_date=None, consider_default_shift=False, next_shift_direction=None
-):
+def get_shift_for_time(shifts, for_timestamp):
+ for entry in shifts:
+ shift_details = get_shift_details(entry.shift_type, for_date=for_timestamp.date())
+ if shift_details.actual_start <= for_timestamp <= shift_details.actual_end:
+ return shift_details
+
+
+def get_shifts_for_date(employee, for_timestamp):
+ assignment = frappe.qb.DocType('Shift Assignment')
+
+ return (
+ frappe.qb.from_(assignment)
+ .select(assignment.name, assignment.shift_type)
+ .where(
+ (assignment.employee == employee)
+ & (assignment.docstatus == 1)
+ & (assignment.status == 'Active')
+ & (assignment.start_date <= getdate(for_timestamp.date()))
+ & (
+ Criterion.any([
+ assignment.end_date.isnull(),
+ (assignment.end_date.isnotnull() & (getdate(for_timestamp.date()) >= assignment.end_date))
+ ])
+ )
+ )
+ ).run(as_dict=True)
+
+
+def get_shift_for_timestamp(employee, for_timestamp):
+ shifts = get_shifts_for_date(employee, for_timestamp)
+ if shifts:
+ return get_shift_for_time(shifts, for_timestamp)
+ return None
+
+
+def get_employee_shift(employee, for_timestamp=None, consider_default_shift=False, next_shift_direction=None):
"""Returns a Shift Type for the given employee on the given date. (excluding the holidays)
:param employee: Employee for which shift is required.
- :param for_date: Date on which shift are required
+ :param for_timestamp: DateTime on which shift is required
:param consider_default_shift: If set to true, default shift is taken when no shift assignment is found.
:param next_shift_direction: One of: None, 'forward', 'reverse'. Direction to look for next shift if shift not found on given date.
"""
- if for_date is None:
- for_date = nowdate()
- default_shift = frappe.db.get_value("Employee", employee, "default_shift")
- shift_type_name = None
- shift_assignment_details = frappe.db.get_value(
- "Shift Assignment",
- {"employee": employee, "start_date": ("<=", for_date), "docstatus": "1", "status": "Active"},
- ["shift_type", "end_date"],
- )
+ if for_timestamp is None:
+ for_timestamp = now_datetime()
- if shift_assignment_details:
- shift_type_name = shift_assignment_details[0]
+ shift_details = get_shift_for_timestamp(employee, for_timestamp)
- # if end_date present means that shift is over after end_date else it is a ongoing shift.
- if shift_assignment_details[1] and for_date >= shift_assignment_details[1]:
- shift_type_name = None
+ # if shift assignment is not found, consider default shift
+ default_shift = frappe.db.get_value('Employee', employee, 'default_shift')
+ if not shift_details and consider_default_shift:
+ shift_details = get_shift_details(default_shift, for_timestamp.date())
- if not shift_type_name and consider_default_shift:
- shift_type_name = default_shift
- if shift_type_name:
- holiday_list_name = frappe.db.get_value("Shift Type", shift_type_name, "holiday_list")
- if not holiday_list_name:
- holiday_list_name = get_holiday_list_for_employee(employee, False)
- if holiday_list_name and is_holiday(holiday_list_name, for_date):
- shift_type_name = None
+ # if its a holiday, reset
+ if shift_details and is_holiday_date(employee, shift_details):
+ shift_details = None
- if not shift_type_name and next_shift_direction:
- MAX_DAYS = 366
- if consider_default_shift and default_shift:
- direction = -1 if next_shift_direction == "reverse" else +1
- for i in range(MAX_DAYS):
- date = for_date + timedelta(days=direction * (i + 1))
- shift_details = get_employee_shift(employee, date, consider_default_shift, None)
+ # if no shift is found, find next or prev shift based on direction
+ if not shift_details and next_shift_direction:
+ shift_details = get_prev_or_next_shift(employee, for_timestamp, consider_default_shift, default_shift, next_shift_direction)
+
+ return shift_details
+
+
+def get_prev_or_next_shift(employee, for_timestamp, consider_default_shift, default_shift, next_shift_direction):
+ MAX_DAYS = 366
+ shift_details = None
+
+ if consider_default_shift and default_shift:
+ direction = -1 if next_shift_direction == 'reverse' else 1
+ for i in range(MAX_DAYS):
+ date = for_timestamp + timedelta(days=direction*(i+1))
+ shift_details = get_employee_shift(employee, date, consider_default_shift, None)
+ if shift_details:
+ break
+ else:
+ direction = '<' if next_shift_direction == 'reverse' else '>'
+ sort_order = 'desc' if next_shift_direction == 'reverse' else 'asc'
+ dates = frappe.db.get_all('Shift Assignment',
+ ['start_date', 'end_date'],
+ {'employee':employee, 'start_date': (direction, for_timestamp.date()), 'docstatus': '1', "status": "Active"},
+ as_list=True,
+ limit=MAX_DAYS, order_by='start_date ' + sort_order)
+
+ if dates:
+ for date in dates:
+ if date[1] and date[1] < for_timestamp.date():
+ continue
+ shift_details = get_employee_shift(employee, datetime.combine(date, for_timestamp.time()), consider_default_shift, None)
if shift_details:
- shift_type_name = shift_details.shift_type.name
- for_date = date
break
- else:
- direction = "<" if next_shift_direction == "reverse" else ">"
- sort_order = "desc" if next_shift_direction == "reverse" else "asc"
- dates = frappe.db.get_all(
- "Shift Assignment",
- ["start_date", "end_date"],
- {
- "employee": employee,
- "start_date": (direction, for_date),
- "docstatus": "1",
- "status": "Active",
- },
- as_list=True,
- limit=MAX_DAYS,
- order_by="start_date " + sort_order,
- )
- if dates:
- for date in dates:
- if date[1] and date[1] < for_date:
- continue
- shift_details = get_employee_shift(employee, date[0], consider_default_shift, None)
- if shift_details:
- shift_type_name = shift_details.shift_type.name
- for_date = date[0]
- break
+ return shift_details
- return get_shift_details(shift_type_name, for_date)
+
+def is_holiday_date(employee, shift_details):
+ holiday_list_name = frappe.db.get_value('Shift Type', shift_details.shift_type.name, 'holiday_list')
+
+ if not holiday_list_name:
+ holiday_list_name = get_holiday_list_for_employee(employee, False)
+
+ return holiday_list_name and is_holiday(holiday_list_name, shift_details.start_datetime.date())
def get_employee_shift_timings(employee, for_timestamp=None, consider_default_shift=False):
"""Returns previous shift, current/upcoming shift, next_shift for the given timestamp and employee"""
if for_timestamp is None:
for_timestamp = now_datetime()
+
# write and verify a test case for midnight shift.
prev_shift = curr_shift = next_shift = None
- curr_shift = get_employee_shift(employee, for_timestamp.date(), consider_default_shift, "forward")
+ curr_shift = get_employee_shift(employee, for_timestamp, consider_default_shift, 'forward')
if curr_shift:
- next_shift = get_employee_shift(
- employee,
- curr_shift.start_datetime.date() + timedelta(days=1),
- consider_default_shift,
- "forward",
- )
- prev_shift = get_employee_shift(
- employee, for_timestamp.date() + timedelta(days=-1), consider_default_shift, "reverse"
- )
+ next_shift = get_employee_shift(employee, curr_shift.start_datetime + timedelta(days=1), consider_default_shift, 'forward')
+ prev_shift = get_employee_shift(employee, for_timestamp + timedelta(days=-1), consider_default_shift, 'reverse')
if curr_shift:
+ # adjust actual start and end times if they are overlapping with grace period (before start and after end)
if prev_shift:
curr_shift.actual_start = (
prev_shift.end_datetime
@@ -292,6 +314,38 @@
return prev_shift, curr_shift, next_shift
+def get_actual_start_end_datetime_of_shift(employee, for_datetime, consider_default_shift=False):
+ """Takes a datetime and returns the 'actual' start datetime and end datetime of the shift in which the timestamp belongs.
+ Here 'actual' means - taking in to account the "begin_check_in_before_shift_start_time" and "allow_check_out_after_shift_end_time".
+ None is returned if the timestamp is outside any actual shift timings.
+ Shift Details is also returned(current/upcoming i.e. if timestamp not in any actual shift then details of next shift returned)
+ """
+ actual_shift_start = actual_shift_end = shift_details = None
+ shift_timings_as_per_timestamp = get_employee_shift_timings(employee, for_datetime, consider_default_shift)
+ timestamp_list = []
+
+ for shift in shift_timings_as_per_timestamp:
+ if shift:
+ timestamp_list.extend([shift.actual_start, shift.actual_end])
+ else:
+ timestamp_list.extend([None, None])
+
+ timestamp_index = None
+ for index, timestamp in enumerate(timestamp_list):
+ if timestamp and for_datetime <= timestamp:
+ timestamp_index = index
+ break
+
+ if timestamp_index and timestamp_index%2 == 1:
+ shift_details = shift_timings_as_per_timestamp[int((timestamp_index-1)/2)]
+ actual_shift_start = shift_details.actual_start
+ actual_shift_end = shift_details.actual_end
+ elif timestamp_index:
+ shift_details = shift_timings_as_per_timestamp[int(timestamp_index/2)]
+
+ return actual_shift_start, actual_shift_end, shift_details
+
+
def get_shift_details(shift_type_name, for_date=None):
"""Returns Shift Details which contain some additional information as described below.
'shift_details' contains the following keys:
@@ -319,43 +373,10 @@
)
actual_end = end_datetime + timedelta(minutes=shift_type.allow_check_out_after_shift_end_time)
- return frappe._dict(
- {
- "shift_type": shift_type,
- "start_datetime": start_datetime,
- "end_datetime": end_datetime,
- "actual_start": actual_start,
- "actual_end": actual_end,
- }
- )
-
-
-def get_actual_start_end_datetime_of_shift(employee, for_datetime, consider_default_shift=False):
- """Takes a datetime and returns the 'actual' start datetime and end datetime of the shift in which the timestamp belongs.
- Here 'actual' means - taking in to account the "begin_check_in_before_shift_start_time" and "allow_check_out_after_shift_end_time".
- None is returned if the timestamp is outside any actual shift timings.
- Shift Details is also returned(current/upcoming i.e. if timestamp not in any actual shift then details of next shift returned)
- """
- actual_shift_start = actual_shift_end = shift_details = None
- shift_timings_as_per_timestamp = get_employee_shift_timings(
- employee, for_datetime, consider_default_shift
- )
- timestamp_list = []
- for shift in shift_timings_as_per_timestamp:
- if shift:
- timestamp_list.extend([shift.actual_start, shift.actual_end])
- else:
- timestamp_list.extend([None, None])
- timestamp_index = None
- for index, timestamp in enumerate(timestamp_list):
- if timestamp and for_datetime <= timestamp:
- timestamp_index = index
- break
- if timestamp_index and timestamp_index % 2 == 1:
- shift_details = shift_timings_as_per_timestamp[int((timestamp_index - 1) / 2)]
- actual_shift_start = shift_details.actual_start
- actual_shift_end = shift_details.actual_end
- elif timestamp_index:
- shift_details = shift_timings_as_per_timestamp[int(timestamp_index / 2)]
-
- return actual_shift_start, actual_shift_end, shift_details
+ return frappe._dict({
+ 'shift_type': shift_type,
+ 'start_datetime': start_datetime,
+ 'end_datetime': end_datetime,
+ 'actual_start': actual_start,
+ 'actual_end': actual_end
+ })