trytonpsk-surveillance/schedule.py

1198 lines
41 KiB
Python

# This file is part of Tryton. The COPYRIGHT file at the top level of
# this repository contains the full copyright notices and license terms.
import copy
from decimal import Decimal
from datetime import date, timedelta, datetime
from collections import OrderedDict
from trytond.model import Workflow, ModelView, ModelSQL, fields
from trytond.pyson import Eval
from trytond.transaction import Transaction
from trytond.wizard import (
Wizard, StateView, Button, StateTransition, StateReport
)
from trytond.pool import Pool
from trytond.report import Report
localeWeekday = {
7: 'D',
1: 'L',
2: 'M',
3: 'M',
4: 'J',
5: 'V',
6: 'S',
}
STATES = {
'readonly': (Eval('state') != 'draft'),
}
_ZERO = Decimal('0.0')
_HEADERS = [
('PUESTO', '7%'),
('NOMBRE', '12%'),
# ('LUGAR', '12%')
]
PAYMENT_METHOD = [
(None, ''),
('extratime', 'Extratime'),
('fixed_amount', 'Fixed Amount'),
]
DAYS_WEEK = [
'monday',
'tuesday',
'wednesday',
'thursday',
'friday',
'saturday',
'sunday',
# 'holiday',
]
class SurveillanceSchedule(Workflow, ModelSQL, ModelView):
'Surveillance Schedule'
__name__ = 'surveillance.schedule'
_rec_name = 'guard'
location = fields.Many2One('surveillance.location', 'Location',
required=True, states=STATES)
company = fields.Many2One('company.company', 'Company', required=True)
period = fields.Many2One('account.period', 'Period', states=STATES)
contract = fields.Many2One('sale.contract', 'Contract', states=STATES)
plan_line = fields.Many2One('surveillance.plan.line', 'Plan',
states={
'readonly': True, 'required': False
})
shifts = fields.One2Many('surveillance.schedule.shift', 'schedule',
'Shifts', states=STATES)
operation_center = fields.Many2One('company.operation_center', 'OC',
required=False)
comment = fields.Text('Comment', states=STATES)
state = fields.Selection([
('draft', 'Draft'),
('planned', 'Planned'),
('done', 'Done'),
('cancelled', 'Cancelled'),
], 'State', readonly=True, required=True)
state_string = state.translated('state')
next_pattern_shift = fields.Char('Next Pattern Shift')
@classmethod
def __setup__(cls):
super(SurveillanceSchedule, cls).__setup__()
cls._order = [
('period', 'DESC'),
]
cls._transitions |= set((
('draft', 'planned'),
('draft', 'cancel'),
('planned', 'draft'),
('planned', 'done'),
('done', 'planned'),
))
cls._buttons.update({
'draft': {
'invisible': ~Eval('state').in_(['planned', 'cancelled']),
},
'planned': {
'invisible': ~Eval('state').in_(['draft', 'done']),
},
'cancel': {
'invisible': Eval('state') != 'draft',
},
'done': {
'invisible': Eval('state') != 'planned',
}
})
def get_rec_name(self, name):
return f'{self.location.name} | {self.period.name}'
@classmethod
def get_calendar(cls, args):
res = []
# FIXME
Period = Pool().get('surveillance.schedule')
if args.get('period_id'):
period_id = args.get('period_id')
else:
today = date.today()
periods = Period.search([
('start_date', '<=', today),
('end_date', '>=', today),
('type', '=', 'standard')
])
if periods:
period_id = periods[0].id
else:
return []
dom = [
('period', '=', period_id),
]
if args.get('customer_id'):
dom.append(('customer', '=', args['customer_id']))
schedule = cls.search(dom)
for s in schedule:
header = [{
'id': s.id,
'name': s.guard.rec_name,
'location': s.location.rec_name
}]
records = []
for ws in s.shifts:
records.append({
'id': ws.id,
'column': str(ws.shift_date),
'value': ws.kind.code,
})
res.append({
'header': header,
'redords': records
})
return res
@staticmethod
def default_company():
return Transaction().context.get('company')
@staticmethod
def default_state():
return 'draft'
@classmethod
@ModelView.button
@Workflow.transition('draft')
def draft(cls, records):
pass
@classmethod
@ModelView.button
@Workflow.transition('cancelled')
def cancel(cls, records):
pass
@classmethod
@ModelView.button
@Workflow.transition('planned')
def planned(cls, records):
pass
# cls.set_number(records)
@classmethod
@ModelView.button
@Workflow.transition('done')
def done(cls, records):
for record in records:
pass
@classmethod
def check_schedule(cls, args):
Shift = Pool().get('surveillance.schedule.shift')
shifts = Shift.search([
('guard', '=', args['employee']),
('shift_date', '=', args['shift_date'])
])
if len(shifts) > 1:
return 'invalid'
return True
@classmethod
def getValueDate(cls, d):
res = {
'formattedDate': d.strftime('%Y-%m-%d'),
'labelDate': d.strftime('%d'),
'style': 'sheet-day',
}
weekday = d.isoweekday()
res['localeWeekday'] = localeWeekday[weekday]
if weekday == 7:
res['style'] = 'sheet-holiday'
return res
@classmethod
def create_sheet_row(cls, guard, position):
tags_guard = [{tag.id: tag.name} for tag in guard.tags]
row = [
{
'id': position.id,
'value': position.name,
'readOnly': True,
'style': 'sheet-position',
}, {
'id': guard.id,
'value': guard.rec_name,
'readOnly': True,
'tags': tags_guard,
'style': 'sheet-guard',
'info': {
'id_number': guard.party.id_number,
},
},
]
return row
@classmethod
def get_sheet(cls, args):
period_id = args.get('period')
pool = Pool()
Period = pool.get('account.period')
ShiftKind = pool.get('staff.shift.kind')
Employee = pool.get('company.employee')
_kinds = ShiftKind.search([])
kinds = {k.code: k.id for k in _kinds}
kinds['0'] = None
period = Period(period_id)
headers = []
guards_ids = []
for h in _HEADERS:
headers.append({
'label': h[0],
'readOnly': True,
'style': 'sheet-head-location'
})
start_date = period.start_date
delta = (period.end_date - start_date).days + 1
dates_of_month = [start_date + timedelta(days=i) for i in range(delta)]
for d in dates_of_month:
res = cls.getValueDate(d)
headers.append({
'label': res['labelDate'],
'value': res['localeWeekday'] + res['labelDate'],
'localeWeekday': res['localeWeekday'],
'readOnly': True,
'style': 'sheet-head-day'
})
days_headers = headers[2:]
dom = [
('period', '=', period_id),
]
if args.get('locations'):
dom.append(('location', 'in', args['locations']))
if args.get('searchText'):
target_text = args.get('searchText')
string = f'%{target_text}%'
guards_target = Employee.search_read(['OR', [
('party.name', 'ilike', string),
], [
('party.id_number', 'ilike', string),
]
], fields_names=['id'])
guards_ids = [gt['id'] for gt in guards_target]
sub_dom = ['OR', [('location.name', 'ilike', string)]]
if guards_ids:
sub_dom.append([('shifts.guard', 'in', guards_ids)])
dom.append(sub_dom)
schedules = cls.search(dom, order=[('location.name', 'ASC')])
count_id = 0
days_month = {}
for d in dates_of_month:
count_id -= 1
days_month[str(d)] = {
'id': count_id,
}
def get_days_month(sch, loc_id, guard_id, position_id):
days = copy.deepcopy(days_month)
payment_method = sch.plan_line.shift_payment_method
sch_id = sch.id
for k, v in days.items():
v.update({
'shift_date': k,
'location': loc_id,
'guard': guard_id,
'position': position_id,
'schedule': sch_id,
'payment_method': payment_method,
})
return days
days_lines = {}
locations = {}
def _get_row_data(guard, position, loc_id, sch):
row = cls.create_sheet_row(guard, position)
days = get_days_month(
sch, loc_id, guard.id, position.id
)
return row, days
for sch in schedules:
loc = sch.location
loc_id = sch.location.id
loc_name = sch.location.name
plan_shifts = []
# This part orders the row shifts by position sequence
plan = sch.plan_line
if not plan:
continue
for patt in plan.shifts_pattern + plan.shifts_week:
if guards_ids and patt.guard.id not in guards_ids:
continue
plan_shifts.append((
int(patt.sequence),
patt.guard,
patt.position,
plan.shift_payment_method
))
plan_shifts.sort(key=lambda x: x[0])
ord_shifts = OrderedDict()
for [seq, guard, pos, pay] in plan_shifts:
row, days = _get_row_data(guard, pos, loc_id, sch)
ord_shifts[(guard.id, pos.id)] = row
days_lines[(loc_id, guard.id, pos.id)] = days
# End order
locations[loc_id] = {
'id': loc_id,
'value': loc_name,
'readOnly': True,
'colSpan': delta,
'tags': [{'id': tg.id, 'name': tg.name} for tg in loc.tags],
'shifts': ord_shifts,
'style': 'sheet-head-location'
}
for shift in sch.shifts:
guard = shift.guard
guard_id = guard.id
guard_name = guard.rec_name
kind = shift.kind
event = shift.event
event_category = shift.event_category
position = shift.position
pos_id = shift.position.id
_kind = {}
_position = {}
_event = None
_event_category = None
if guards_ids and guard_id not in guards_ids:
continue
if (loc_id, guard_id, pos_id) not in days_lines.keys():
row, days = _get_row_data(
guard,
position,
loc_id,
sch)
locations[loc_id]['shifts'][(guard_id, pos_id)] = row
days_lines[(loc_id, guard_id, pos_id)] = days
res = cls.getValueDate(shift.shift_date)
if event:
_event = {
'id': event.id,
'name': event.category.name,
}
if event_category:
_event_category = {
'id': event_category.id,
'name': event_category.name,
}
if position:
_position = {
'id': position.id,
'name': position.name,
}
if kind:
_kind = {
'id': kind.id,
'name': kind.name,
'start': kind.start,
'end': kind.end,
'mode': kind.mode,
}
enter_dtime = None
exit_dtime = None
if shift.enter_dtime:
enter_dtime = shift.enter_dtime
exit_dtime = shift.exit_dtime
val = {
'id': shift.id,
'schedule': sch.id,
'enter_dtime': enter_dtime,
'exit_dtime': exit_dtime,
'guard': {
'id': guard.id,
'name': guard_name,
'photo': guard.photo_link
},
'guard_photo': guard.photo_link,
'location': {'id': loc.id, 'name': loc_name},
'location_id': loc.id,
'period_id': period_id,
'shift_date': res['formattedDate'],
'value': kind.code if kind else None,
'extra_shift': shift.extra_shift,
'kind': _kind,
'event': _event,
'position': _position,
'event_category': _event_category,
'event_start_date': shift.event_start_date,
'event_end_date': shift.event_end_date,
'notes': shift.notes,
'state': shift.state,
'style': res['style']
}
days_lines[(loc_id, guard_id, pos_id)][str(shift.shift_date)] = val
data = []
_append = data.append
for loc_id, row_loc in locations.items():
shifts = row_loc.pop('shifts')
row_loc_header = [row_loc] + days_headers
_append(row_loc_header)
for (guard_id, pos_id), row in shifts.items():
row.extend(days_lines[(loc_id, guard_id, pos_id)].values())
_append(row)
return (headers, data, kinds)
class ScheduleShift(Workflow, ModelSQL, ModelView):
'Schedule Shift'
__name__ = 'surveillance.schedule.shift'
_rec_name = 'shift_date'
schedule = fields.Many2One('surveillance.schedule', 'Schedule',
required=True, ondelete='CASCADE')
kind = fields.Many2One('staff.shift.kind', 'Kind')
shift_date = fields.Date('Shift Date', required=True)
enter_dtime = fields.DateTime('Enter Date')
exit_dtime = fields.DateTime('Exit Date')
guard = fields.Many2One('company.employee', 'Guard', required=True)
contract_service = fields.Many2One('sale.contract.product.position',
'Contract Service')
position = fields.Many2One('surveillance.location.position', 'Position',
required=False, states=STATES)
notes = fields.Text('Notes')
absenteeism = fields.Boolean('Absenteeism')
multiple_days = fields.Boolean('Multiple Days')
extra_shift = fields.Boolean('Extra Shift')
type = fields.Selection([
('work', 'Work'),
('day_off', 'Day Off'),
('not_assigned', 'Not Assigned'),
('absenteeism', 'Absenteeism'),
], 'Type', required=True)
payment_method = fields.Selection(PAYMENT_METHOD, 'Payment Method',
required=True)
access = fields.Many2One('staff.access', 'Access')
event = fields.Many2One('staff.event', 'Event', states={
'readonly': True,
# 'invisible': ~Eval('is_event')
})
event_category = fields.Many2One('staff.event_category', 'Event Category',
states={
'readonly': False,
# 'invisible': ~Eval('is_event')
})
event_start_date = fields.Date('Event Date Start', states={
'invisible': ~Eval('is_event')
})
event_end_date = fields.Date('Event Date End', states={
'invisible': ~Eval('is_event')
})
event_description = fields.Text("Event Description", states={
'invisible': ~Eval('is_event')
})
shift_amount = fields.Numeric('Shift Amount', digits=(16, 2),
states={
'readonly': True,
'invisible': Eval('payment_mode') == 'extratime',
})
state = fields.Selection([
('draft', 'Draft'),
('performed', 'Performed'),
], 'State', required=True)
extra_payments = fields.One2Many(
'surveillance.schedule.shift.extra_payment',
'shift', 'Extra Payments')
location = fields.Function(fields.Many2One('surveillance.location',
'Location'), 'get_location')
# duration = fields.Function(fields.Many2One('surveillance.location',
# 'Location'), 'get_location')
@classmethod
def __setup__(cls):
super(ScheduleShift, cls).__setup__()
cls._order = [('shift_date', 'ASC')]
@staticmethod
def default_payment_method():
return 'fixed_amount'
@staticmethod
def default_state():
return 'draft'
@fields.depends('kind', 'shift_date', 'enter_dtime', 'exit_dtime')
def on_change_kind(self):
Company = Pool().get('company.company')
if self.kind and self.shift_date and self.kind.mode == 'work':
_enter = datetime.combine(self.shift_date, self.kind.start)
enter_dtime = Company.convert_timezone(_enter, to_utc=True)
exit_dtime = enter_dtime + timedelta(hours=self.kind.total_time)
self.enter_dtime = enter_dtime
self.exit_dtime = exit_dtime
self.event_category = None
else:
self.enter_dtime = None
self.exit_dtime = None
@fields.depends('shift_amount', 'extra_shift', 'position')
def on_change_extra_shift(self):
if self.extra_shift and self.position:
self.shift_amount = self.position.extra_shift_amount
else:
self.shift_amount = self.position.shift_amount
def get_location(self, name=None):
if self.schedule:
return self.schedule.location.id
@classmethod
def create(cls, vlist):
pool = Pool()
Position = pool.get('surveillance.location.position')
for val in vlist:
if val.get('kind'):
data = cls.get_enter_exit_time(
val.get('kind'), val.get('shift_date')
)
val.update(data)
if val['position']:
position, = Position.browse([val['position']])
amount = position.shift_amount
if val.get('extra_shift'):
amount = position.extra_shift_amount
if amount:
val['shift_amount'] = amount
extras = []
for payment in position.extra_payments:
extras.append({
'wage_type': payment.wage_type.id,
'amount': payment.amount,
})
if not val.get('extra_payments') and extras:
val['extra_payments'] = [('create', extras)]
records_ids = super(ScheduleShift, cls).create(vlist)
return records_ids
@classmethod
def write(cls, records, *args):
super(ScheduleShift, cls).write(records, *args)
for rec in records:
for values in args:
if not isinstance(values, dict):
continue
if values.get('kind') and (not values.get('enter_dtime')
and not values.get('exit_dtime')):
rec.on_change_kind()
if values.get('extra_shift') in (True, False):
rec.on_change_extra_shift()
rec.save()
@classmethod
def combine_dates(cls, shift_date, kind):
Company = Pool().get('company.company')
_enter = datetime.combine(shift_date, kind.start)
enter_dtime = Company.convert_timezone(_enter, to_utc=True)
exit_dtime = enter_dtime + timedelta(hours=kind.total_time)
return enter_dtime, exit_dtime
@classmethod
def validate_shift(cls, args):
pool = Pool()
Kind = pool.get('staff.shift.kind')
guard = args['guard']
if type(guard) == dict:
employee_id = guard['id']
else:
employee_id = guard
kind = Kind(args['kind'])
if kind.mode != 'work':
return
shift_date = datetime.strptime(args['shift_date'], '%Y-%m-%d')
enter_dtime, exit_dtime = cls.combine_dates(shift_date, kind)
prev_enter_dtime = enter_dtime - timedelta(hours=8)
prev_exit_dtime = exit_dtime + timedelta(hours=8)
shifts = cls.search(['OR', [
('enter_dtime', '<=', enter_dtime),
('exit_dtime', '>=', enter_dtime),
('guard', '=', employee_id),
], [
('enter_dtime', '>=', exit_dtime),
('exit_dtime', '<=', exit_dtime),
('guard', '=', employee_id),
], [
('enter_dtime', '>=', prev_enter_dtime),
('exit_dtime', '<=', prev_enter_dtime),
('guard', '=', employee_id),
], [
('enter_dtime', '>=', prev_exit_dtime),
('exit_dtime', '<=', prev_exit_dtime),
('guard', '=', employee_id),
],
], order=[('id', 'ASC')])
same_shift = False
for shift in shifts:
if str(shift.shift_date) == args['shift_date'] and not same_shift:
same_shift = True
continue
loc = shifts[0].schedule.location.name
res = f'El turno registrado se cruza con {loc} para la misma fecha'
return res
@classmethod
def get_enter_exit_time(cls, kind_id, date_):
pool = Pool()
Kind = pool.get('staff.shift.kind')
Company = pool.get('company.company')
kind = Kind(kind_id)
if isinstance(date_, str):
shift_date = datetime.strptime(date_, '%Y-%m-%d')
else:
shift_date = date_
enter_dtime = None
exit_dtime = None
if kind and kind.start:
_enter = datetime.combine(shift_date, kind.start)
enter_dtime = Company.convert_timezone(_enter, to_utc=True)
exit_dtime = enter_dtime + timedelta(hours=kind.total_time)
res = {
'enter_dtime': enter_dtime,
'exit_dtime': exit_dtime,
}
return res
@classmethod
def better_guards(cls, args):
shift_id = args.get('shift')
shift = cls(shift_id)
pool = Pool()
Contract = pool.get('staff.contract')
oc_id = shift.schedule.location.operation_center.id
shifts = cls.search([
('shift_date', '=', shift.shift_date),
('kind.mode', '=', 'work'),
('schedule.location.operation_center', '=', oc_id),
])
not_guards_ids = [sh.guard.id for sh in shifts]
contracts = Contract.search([
('start_date', '<=', shift.shift_date),
('end_date', '>=', shift.shift_date),
('state', '=', 'active'),
('employee.operation_center', '=', oc_id),
('employee.guard', '=', True),
('employee', 'not in', not_guards_ids),
])
tags_target = [tag.id for tag in shift.schedule.location.tags]
employees = (contract.employee for contract in contracts)
target_guards = []
for employee in employees:
weight_guard = []
for tag in employee.tags:
if tag.id in tags_target:
weight_guard.append(tag.weight)
target_guards.append((
sum(weight_guard), {
'id': employee.id,
'name': employee.party.name,
'weight': sum(weight_guard),
}
))
targets = sorted(target_guards, reverse=True)
return targets
@classmethod
def replicate_shift(cls, args):
event_category = args.get('event_category')
start_date = args.get('event_start_date')
end_date = args.get('event_end_date')
kind = args.get('kind')
shift, = cls.browse([args.get('id')])
target_shifts = cls.search([
('schedule', '=', shift.schedule.id),
('guard', '=', shift.guard.id),
('shift_date', '>=', start_date),
('shift_date', '<=', end_date),
('state', '=', 'draft'),
], order=[('shift_date', 'ASC')])
for ts in target_shifts:
if str(ts.shift_date) == start_date:
ts.event_category = event_category['id']
ts.kind = kind['id']
ts.enter_dtime = None
ts.exit_dtime = None
ts.position = ts.position.id if ts.position else None
ts.type = 'absenteeism'
ts.shift_amount = 0
ts.save()
@classmethod
def insert_shift(cls, args, ctx=None):
pool = Pool()
Schedule = pool.get('surveillance.schedule')
Period = pool.get('account.period')
company_id = Transaction().context.get('company')
period_id = Period.find(company_id, args['date'])
if not period_id:
return {}
schedules = Schedule.search([
('period', '=', period_id),
('location', '=', args['location']['id']),
])
if not schedules:
schedule, = Schedule.create([{
'period': period_id,
'location': args['location']['id'],
'company': company_id,
'state': 'draft',
}])
else:
schedule = schedules[0]
enter_dtime = args.get('enter_dtime', None)
exit_dtime = args.get('exit_dtime', None)
to_create = {
'schedule': schedule.id,
'kind': args['kind']['id'],
'guard': args['guard']['id'],
'shift_date': args['date'],
'enter_dtime': enter_dtime,
'exit_dtime': exit_dtime,
'state': 'draft',
'type': 'work',
'position': args['position']['id'],
}
event_start_date = args.get('event_start_date')
event_end_date = args.get('event_end_date')
event_category = args.get('event_category')
multiple_days = args.get('replicate')
cloned_shift = None
if multiple_days:
to_create['multiple_days'] = args.get('replicate')
cloned_shift = copy.deepcopy(to_create)
cloned_shift.update({
'enter_dtime': None,
'exit_dtime': None,
})
if event_category:
to_create['event_category'] = event_category['id']
if event_start_date:
to_create['event_start_date'] = event_start_date
if event_end_date:
to_create['event_end_date'] = event_end_date
new_shift, = cls.create([to_create])
if event_start_date and event_end_date and multiple_days:
end_date = datetime.fromisoformat(event_end_date).date()
start_date = datetime.fromisoformat(event_start_date).date()
days = (end_date - start_date).days
for rg in range(1, days):
cloned_shift['shift_date'] = start_date + timedelta(days=rg)
cls.create([cloned_shift])
return {
'id': new_shift.id,
}
class ShiftExtraPayment(ModelSQL, ModelView):
'Shift Extra Payment'
__name__ = 'surveillance.schedule.shift.extra_payment'
shift = fields.Many2One('surveillance.schedule.shift', 'Position',
required=True, ondelete='CASCADE')
wage_type = fields.Many2One('staff.wage_type', 'Wage Type', required=True)
amount = fields.Numeric('Amount', digits=(16, 2), required=True)
class CreateScheduleShiftStart(ModelView):
'Create Schedule Shift Start'
__name__ = 'surveillance.create_schedule_shift.start'
company = fields.Many2One('company.company', 'Company', required=True)
period = fields.Many2One('account.period', 'Period', required=True,
domain=[
('start_date', '>=', date.today() - timedelta(days=90)),
])
customer = fields.Many2One('party.party', 'Customer')
@staticmethod
def default_company():
return Transaction().context.get('company')
class CreateScheduleShift(Wizard):
'Create Schedule Shift'
__name__ = 'surveillance.create_schedule_shift'
start = StateView(
'surveillance.create_schedule_shift.start',
'surveillance.create_schedule_shift_start_view_form', [
Button('Cancel', 'end', 'tryton-cancel'),
Button('Create', 'do_', 'tryton-ok', default=True)
])
do_ = StateTransition()
def get_cicle_fields_pattern(self, pattern):
# return ['day01', 'day02', 'day03', ...]
days = []
pad = '0'
for day in range(16):
day_ = 'day' + str(day + 1).rjust(2, pad)
valid_day = getattr(pattern, day_)
if valid_day:
days.append(day_)
return days
def get_cicle_fields_week(self, lweek):
days = []
for day in DAYS_WEEK:
days.append(day)
return days
def get_shifts_pattern(self, pattern, fields_day, month_days, next_day):
# Create redundant data repliying the pattern
name_fields = fields_day * 13
length_month = len(month_days)
_next_day = 0
if next_day:
_next_day = int(next_day.replace('day', '')) - 1
target_fields = name_fields[_next_day: length_month + _next_day]
next_pattern = name_fields[_next_day + length_month]
matrix = []
for day_ in target_fields:
matrix.append(getattr(pattern, day_))
pattern_guard = zip(month_days, matrix)
return pattern_guard, next_pattern
def get_shifts_week(self, lweek, fields_day, month_days, first_day):
name_fields = fields_day * 6 # Create redundant data
# We need split name_fieds according to first day of the month
name_fields = name_fields[first_day:]
length_month = len(month_days)
target_fields = name_fields[:length_month]
matrix = []
_append = matrix.append
for day_ in target_fields:
_append(getattr(lweek, day_))
pattern_guard = zip(month_days, matrix)
return pattern_guard
def get_range_dates(self, line):
range_dates = []
start_date = max([self.start.period.start_date, line.start_date])
end_date = min([self.start.period.end_date, line.end_date])
delta_dates = end_date - start_date + timedelta(days=1)
for i in range(delta_dates.days):
range_dates.append(start_date + timedelta(days=i))
return range_dates
def get_schedule_week(self, line, range_dates):
pool = Pool()
Company = pool.get('company.company')
Holidays = pool.get('staff.holidays')
shifts_ = []
fields_day = None
sh_append = shifts_.append
convert = Company.convert_timezone
combine = datetime.combine
first_day = self.start.period.start_date.weekday()
_holidays = Holidays.search([
('holiday', 'in', range_dates)
])
if _holidays:
_holidays = [ho.holiday for ho in _holidays]
contracted_positions = {}
if line.hired_service:
for pos in line.hired_service.positions_plan:
contracted_positions[pos.position.id] = pos.id
for lweek in line.shifts_week:
guard_id = lweek.guard.id if lweek.guard else None
if not fields_day:
fields_day = self.get_cicle_fields_week(lweek)
list_shifts = self.get_shifts_week(
lweek, fields_day, range_dates, first_day
)
for day_, shift_kind in list_shifts:
contract_service = None
if contracted_positions and lweek.position:
contract_service = contracted_positions[lweek.position.id]
shift_kind_id = None
_type = 'day_off'
_enter = None
_exit = None
if shift_kind:
shift_kind_id = shift_kind.id
_type = 'work'
enter_date = combine(day_, shift_kind.start)
_enter = convert(enter_date, to_utc=True)
_exit = _enter + timedelta(hours=shift_kind.total_time)
if day_ in _holidays:
shift_kind_id = lweek.holiday.id if lweek.holiday else None
if not shift_kind_id:
continue
sh_append({
'kind': shift_kind_id,
'guard': guard_id,
'position': lweek.position.id,
'shift_date': day_,
'enter_dtime': _enter,
'exit_dtime': _exit,
'type': _type,
'contract_service': contract_service,
'state': 'draft',
'payment_method': line.shift_payment_method,
'shift_amount': lweek.position.shift_amount,
})
return shifts_
def get_schedule_pattern(self, line, range_dates, next_day):
pool = Pool()
Company = pool.get('company.company')
shifts_ = []
sh_append = shifts_.append
# fields_day = None
contracted_positions = {}
if line.hired_service:
for pos in line.hired_service.positions_plan:
contracted_positions[pos.position.id] = pos.id
convert_tz = Company.convert_timezone
for lpattern in line.shifts_pattern:
guard_id = lpattern.guard.id if lpattern.guard else None
# if not fields_day:
fields_day = self.get_cicle_fields_pattern(lpattern)
list_shifts, next_pattern = self.get_shifts_pattern(
lpattern, fields_day, range_dates, next_day)
for day_, shift_kind in list_shifts:
position = lpattern.position
contract_service = None
if shift_kind.mode == 'none':
continue
if contracted_positions and position:
contract_service = contracted_positions[position.id]
shift_kind_id = None
_type = 'day_off'
if shift_kind:
shift_kind_id = shift_kind.id
_type = 'work'
if shift_kind:
enter_date = datetime.combine(day_, shift_kind.start)
_enter = convert_tz(enter_date, to_utc=True)
_exit = _enter + timedelta(hours=shift_kind.total_time)
else:
_enter = None
_exit = None
extras = []
for payment in position.extra_payments:
extras.append({
'wage_type': payment.wage_type.id,
'amount': payment.amount,
})
shift = {
'kind': shift_kind_id,
'guard': guard_id,
'position': position.id,
'shift_date': day_,
'enter_dtime': _enter,
'exit_dtime': _exit,
'type': _type,
'contract_service': contract_service,
'state': 'draft',
'payment_method': line.shift_payment_method,
'shift_amount': position.shift_amount,
}
if extras:
shift['extra_payments'] = [('create', extras)]
sh_append(shift)
return shifts_, next_pattern
def transition_do_(self):
pool = Pool()
Schedule = pool.get('surveillance.schedule')
PlanLine = pool.get('surveillance.plan.line')
dom = [
('plan.state', '=', 'running'),
('plan.company', '=', self.start.company.id)
]
if self.start.customer:
dom.append(
('plan.customer', '=', self.start.customer.id)
)
period_id = self.start.period.id
current_schs = Schedule.search_read([
('period', '=', period_id)
], fields_names=['location'])
if current_schs:
ignore_locs_ids = [cs['location'] for cs in current_schs]
dom.append(
('location', 'not in', ignore_locs_ids)
)
plans_lines = PlanLine.search(dom)
to_create = []
for line in plans_lines:
next_pattern = None
if not line.shifts_pattern and not line.shifts_week:
continue
next_pattern_shift = None
last_schedules = Schedule.search_read([
('location', '=', line.location.id)
], fields_names=['location', 'next_pattern_shift'],
order=[('period.start_date', 'DESC')],
limit=1)
if last_schedules:
next_pattern_shift = last_schedules[0]['next_pattern_shift']
range_dates = self.get_range_dates(line)
location_id = line.location.id
shifts_ = []
if line.shifts_pattern:
shifts_, next_pattern = self.get_schedule_pattern(
line, range_dates, next_pattern_shift)
if line.shifts_week:
shifts_.extend(self.get_schedule_week(line, range_dates))
if not shifts_:
continue
to_create.append({
'location': location_id,
'period': period_id,
'state': 'draft',
'plan_line': line.id,
'next_pattern_shift': next_pattern,
'shifts': [('create', shifts_)],
'operation_center': line.location.operation_center.id,
})
Schedule.create(to_create)
return 'end'
class ScheduleMonthStart(ModelView):
'Schedule Month Start'
__name__ = 'surveillance.print_schedule_month.start'
period = fields.Many2One('account.period', 'Period', required=True)
company = fields.Many2One('company.company', 'Company', required=True)
customer = fields.Many2One('party.party', 'Customer')
locations = fields.Many2Many('surveillance.location', None, None,
'Location', required=True)
@staticmethod
def default_company():
return Transaction().context.get('company')
class ScheduleMonth(Wizard):
'Schedule Month'
__name__ = 'surveillance.print_schedule_month'
start = StateView(
'surveillance.print_schedule_month.start',
'surveillance.print_schedule_month_start_view_form', [
Button('Cancel', 'end', 'tryton-cancel'),
Button('Print', 'print_', 'tryton-print', default=True),
])
print_ = StateReport('surveillance.schedule_month.report')
def do_print_(self, action):
location_ids = [loc.id for loc in self.start.locations]
data = {
'company': self.start.company.id,
'period': self.start.period.id,
'location': location_ids,
}
return action, data
def transition_print_(self):
return 'end'
class ScheduleMonthReport(Report):
__name__ = 'surveillance.schedule_month.report'
@classmethod
def get_context(cls, records, header, data):
report_context = super().get_context(records, header, data)
pool = Pool()
Company = pool.get('company.company')
Period = pool.get('account.period')
Schedule = pool.get('surveillance.schedule')
period = Period(data['period'])
args = {
'period': data['period']
}
_, rows, _ = Schedule.get_sheet(args)
locations = []
for rec in rows:
if rec[0]['style'] == 'sheet-head-location':
row = {
'name': rec[0]['value'],
'shifts': [],
}
locations.append(row)
continue
else:
shift = {
'position': rec[0]['value'],
'name': rec[1]['value'],
}
row['shifts'].append(shift)
# day = 1
length_days = len(rec) - 2
if length_days <= 30:
length_days = 31
for x in range(length_days):
try:
val_ = rec[x + 2]['value']
except:
val_ = ''
shift['day' + str(x + 1)] = val_
report_context['records'] = locations
report_context['period'] = period.name
report_context['company'] = Company(data['company']).party.name
return report_context