# This file is part of Tryton.  The COPYRIGHT file at the top level of
# this repository contains the full copyright notices and license terms.
import datetime
from decimal import Decimal
from trytond.model import Workflow, ModelView, ModelSQL, fields
from trytond.pyson import Bool, Eval, If, Id
from trytond.pool import Pool, PoolMeta
from trytond.transaction import Transaction
from trytond.modules.company import CompanyReport
from trytond.wizard import Wizard, StateView, StateTransition, Button

__all__ = ['Payroll', 'PayrollLine', 'PayrollReport', 'Move',
    'PayrollGroupStart', 'PayrollGroup', 'PayrollPreliquidation',
    'PayrollRecompute']

# Most payroll fields can only be edited while the payroll is a draft.
STATES = {'readonly': (Eval('state') != 'draft')}

# Fallback number of hours per work day when the configuration has none.
_DEFAULT_WORK_DAY = 8
_ZERO = Decimal('0.0')


class Payroll(Workflow, ModelSQL, ModelView):
    "Staff Payroll"
    __name__ = "staff.payroll"
    _rec_name = 'number'
    number = fields.Char('Number', readonly=True, help="Secuence",
        select=True)
    period = fields.Many2One('staff.payroll.period', 'Period',
        required=True, states={
            'readonly': Eval('state') != 'draft',
            })
    employee = fields.Many2One('company.employee', 'Employee',
        states=STATES, required=True, depends=['state'], select=True)
    kind = fields.Selection([
            ('normal', 'Normal'),
            ('special', 'Special'),
            ], 'Kind', required=True, select=True, states=STATES,
        help="Special allow overlap dates with another payroll")
    contract = fields.Many2One('staff.contract', 'Contract',
        select=True, domain=[
            ('employee', '=', Eval('employee')),
            ])
    start = fields.Date('Start', states=STATES, required=True)
    end = fields.Date('End', states=STATES, required=True)
    date_effective = fields.Date('Date Effective', states=STATES,
        required=True)
    description = fields.Char('Description', states=STATES, select=True)
    lines = fields.One2Many('staff.payroll.line', 'payroll', 'Wage Line',
        states=STATES, depends=['employee', 'state'])
    # The computed totals below depend on 'state' (the original listed a
    # non-existent 'states' field).
    gross_payments = fields.Function(fields.Numeric('Gross Payments',
            digits=(16, 2), depends=['lines', 'state']),
        'on_change_with_amount')
    total_deductions = fields.Function(fields.Numeric(
            'Total Deductions', digits=(16, 2), depends=['lines', 'state']),
        'on_change_with_amount')
    net_payment = fields.Function(fields.Numeric('Net Payment',
            digits=(16, 2), depends=['lines', 'state']),
        'get_net_payment')
    total_cost = fields.Function(fields.Numeric('Total Cost',
            digits=(16, 2), depends=['lines', 'state']),
        'get_total_cost')
    currency = fields.Many2One('currency.currency', 'Currency',
        required=False,
        states={
            'readonly': ((Eval('state') != 'draft')
                | (Eval('lines', [0]) & Eval('currency'))),
            }, depends=['state'])
    worked_days = fields.Function(fields.Integer('Worked Days',
            depends=['start', 'end', 'state']),
        'on_change_with_worked_days')
    state = fields.Selection([
            ('draft', 'Draft'),
            ('processed', 'Processed'),
            ('cancel', 'Cancel'),
            ('posted', 'Posted'),
            ], 'State', readonly=True)
    journal = fields.Many2One('account.journal', 'Journal', required=True,
        states=STATES)
    company = fields.Many2One('company.company', 'Company', required=True,
        states={
            'readonly': (Eval('state') != 'draft') | Eval('lines', [0]),
            },
        domain=[
            ('id', If(Eval('context', {}).contains('company'), '=', '!='),
                Eval('context', {}).get('company', 0)),
            ],
        depends=['state'])
    move = fields.Many2One('account.move', 'Move', readonly=True)
    origin = fields.Reference('Origin', selection='get_origin',
        select=True, depends=['state'],
        states={
            'readonly': Eval('state') != 'draft',
            })
    notes = fields.Text("Notes", states=STATES)

    @classmethod
    def __setup__(cls):
        """Register ordering, error messages, transitions and buttons."""
        super(Payroll, cls).__setup__()
        cls._order = [
            ('period', 'DESC'),
            ('start', 'DESC'),
            ]
        cls._error_messages.update({
                'employee_without_salary': (
                    'The employee does not have salary!'),
                'wrong_start_end': (
                    'The date end can not smaller than date start, '
                    'for employee %s'),
                'sequence_missing': ('Sequence Payroll is missing!'),
                'period_closed': ('Payroll period is closed!'),
                'payroll_exist_period': (
                    'Already exist one payroll in this period with '
                    'this contract!'),
                # The original literal carried a backslash line
                # continuation (and its indentation) inside the message.
                'wrong_date_consistent': (
                    'The date start/end is repeated '
                    'or crossed with other date payroll'),
                'delete_cancel': ('Payroll "%s" must be cancelled before '
                    'deletion.'),
                'existing_move': ('Payroll "%s" has a move, must be '
                    'deleted before deletion.'),
                'bad_configuration_wage_type': (
                    'Bad configuration of the wage type "%s".'),
                })
        cls._transitions |= set((
                ('draft', 'cancel'),
                ('cancel', 'draft'),
                ('draft', 'processed'),
                ('processed', 'posted'),
                ('posted', 'draft'),
                ('processed', 'draft'),
                ))
        cls._buttons.update({
                'draft': {
                    'invisible': Eval('state') == 'draft',
                    },
                'post': {
                    'invisible': Eval('state') != 'processed',
                    },
                'cancel': {
                    'invisible': Eval('state') != 'draft',
                    },
                'process': {
                    'invisible': Eval('state') != 'draft',
                    },
                })

    @staticmethod
    def default_company():
        """Return the company id found in the transaction context."""
        return Transaction().context.get('company')

    @staticmethod
    def default_kind():
        return 'normal'

    @staticmethod
    def default_journal():
        """Return the id of the configured default journal, if any."""
        Configuration = Pool().get('staff.configuration')
        configuration = Configuration(1)
        if configuration.default_journal:
            return configuration.default_journal.id

    @staticmethod
    def default_currency():
        """Return the currency id of the context company, if any."""
        Company = Pool().get('company.company')
        company_id = Transaction().context.get('company')
        if company_id:
            return Company(company_id).currency.id

    @classmethod
    def delete(cls, records):
        """Cancel the payrolls, then delete them.

        Raises a user error for any payroll that is not in the 'cancel'
        state after the cancel attempt, or that still has a move.
        """
        # Cancel before delete
        cls.cancel(records)
        for payroll in records:
            if payroll.state != 'cancel':
                cls.raise_user_error('delete_cancel', (payroll.rec_name,))
            if payroll.move:
                cls.raise_user_error('existing_move', (payroll.rec_name,))
        super(Payroll, cls).delete(records)

    @classmethod
    def validate(cls, payrolls):
        super(Payroll, cls).validate(payrolls)
        for payroll in payrolls:
            payroll.check_start_end()

    @staticmethod
    def default_state():
        return 'draft'

    @staticmethod
    def _get_origin():
        'Return list of Model names for origin Reference'
        return []

    @classmethod
    def search_rec_name(cls, name, clause):
        """Search the record name on both the employee and the number."""
        operator = clause[1]
        if operator.startswith('!') or operator.startswith('not '):
            bool_op = 'AND'
        else:
            bool_op = 'OR'
        return [bool_op,
            ('employee',) + tuple(clause[1:]),
            ('number',) + tuple(clause[1:]),
            ]

    @classmethod
    @ModelView.button
    @Workflow.transition('draft')
    def draft(cls, records):
        pass

    @classmethod
    @ModelView.button
    @Workflow.transition('cancel')
    def cancel(cls, records):
        pass

    @classmethod
    @ModelView.button
    @Workflow.transition('processed')
    def process(cls, records):
        """Check period/contract uniqueness and number each payroll.

        Raises a user error when another payroll exists for the same
        period and contract, or when the period is closed.
        """
        Payroll = Pool().get('staff.payroll')
        for payroll in records:
            payrolls = Payroll.search([
                    ('period', '=', payroll.period.id),
                    ('contract', '=', payroll.contract.id),
                    ])
            # The search also finds the payroll itself, hence "> 1".
            if len(payrolls) > 1:
                cls.raise_user_error('payroll_exist_period',)
            if payroll.period.state == 'closed':
                cls.raise_user_error('period_closed',)
            payroll.set_number()

    @classmethod
    @ModelView.button
    @Workflow.transition('posted')
    def post(cls, records):
        for payroll in records:
            payroll.create_move()

    @fields.depends('start', 'end')
    def on_change_start(self):
        """Propose an end date from the configured liquidation period."""
        if not self.start:
            return
        Configuration = Pool().get('staff.configuration')
        configuration = Configuration(1)
        period_days = configuration.default_liquidation_period
        if period_days and not self.end:
            self.end = self.start + datetime.timedelta(period_days - 1)

    @classmethod
    def get_origin(cls):
        """Return the selection of models allowed as origin."""
        Model = Pool().get('ir.model')
        models = cls._get_origin()
        models = Model.search([
                ('model', 'in', models),
                ])
        return [(None, '')] + [(m.model, m.name) for m in models]

    def set_number(self):
        """Assign a number from the configured sequence, once.

        Raises a user error when no payroll sequence is configured.
        """
        if self.number:
            return
        pool = Pool()
        Sequence = pool.get('ir.sequence')
        Configuration = pool.get('staff.configuration')
        configuration = Configuration(1)
        if not configuration.staff_payroll_sequence:
            self.raise_user_error('sequence_missing',)
        seq = configuration.staff_payroll_sequence.id
        self.write([self], {'number': Sequence.get_id(seq)})

    # 'employee' is read below, so it must be declared as a dependency.
    @fields.depends('start', 'end', 'employee')
    def on_change_with_worked_days(self, name=None):
        """Return the number of days between start and end, inclusive."""
        if self.start and self.end and self.employee:
            return self.get_days(self.start, self.end)

    def get_salary_full(self, wage):
        """
        Return a dict with sum of total amount of all wages defined
        as salary on context of wage
        """
        salary_full = self.compute_salary_full(wage)
        return {'salary': salary_full}

    def compute_salary_full(self, wage):
        """Return the salary base used to evaluate the formulas of *wage*.

        Uses, in order: the employee's default amount for the wage type
        when the wage requires an amount, the sum of the payroll lines
        whose wage type is one of the wage's salary concepts, or the
        employee salary (0 when unset).
        """
        wages_ids = [s.id for s in wage.concepts_salary]
        if wage.amount_required:
            salary_full = self.employee.get_defect_amount_wage_type(wage.id)
        elif wages_ids:
            salary_full = sum([line.amount for line in self.lines
                    if line.wage_type.id in wages_ids])
        else:
            salary_full = self.employee.salary or 0
        return salary_full

    def create_move(self):
        """Create and post the account move of the payroll, once."""
        pool = Pool()
        Move = pool.get('account.move')
        Period = pool.get('account.period')
        if self.move:
            return
        period_id = Period.find(self.company.id, date=self.date_effective)
        move_lines = self.get_moves_lines()
        move, = Move.create([{
                    'journal': self.journal.id,
                    'origin': str(self),
                    'period': period_id,
                    'date': self.date_effective,
                    'state': 'draft',
                    'description': self.description,
                    'lines': [('create', move_lines)],
                    }])
        self.write([self], {'move': move.id})
        # Post the record just created instead of re-reading self.move.
        Move.post([move])

    def get_moves_lines(self):
        """Return the list of move line values for the payroll.

        Lines are accumulated per account and per party. Debits always go
        to the employee's party; credits go to the line's party, else the
        party of the matching mandatory wage, else the employee's party.

        Raises the 'bad_configuration_wage_type' user error when building
        the lines of a wage type fails.
        """
        # {account id: {party id: move line values}}
        lines_moves = {}
        mandatory_wages = dict([(m.wage_type.id, m.party)
                for m in self.employee.mandatory_wages])
        employee_party = self.employee.party

        for line in self.lines:
            if line.amount <= 0:
                continue
            if line.party:
                party = line.party
            elif mandatory_wages.get(line.wage_type.id):
                party = mandatory_wages[line.wage_type.id]
            else:
                party = employee_party

            expense = Decimal(0)
            if not line.wage_type:
                continue
            if line.wage_type.expense_formula:
                salary_args = self.get_salary_full(line.wage_type)
                expense = line.wage_type.compute_expense(salary_args)

            # Reset on every line so that no value leaks from the
            # previous iteration.
            amount_debit = _ZERO
            if line.wage_type.definition == 'payment':
                amount_debit = line.amount + expense
            elif expense:
                amount_debit = expense
            elif line.wage_type.debit_account:
                amount_debit = line.amount
            amount_credit = line.amount + expense
            debit_acc = line.wage_type.debit_account

            # NOTE(review): the nesting of the credit/deduction branches
            # was reconstructed from a whitespace-mangled source; confirm
            # against the upstream file.
            try:
                if debit_acc and amount_debit > _ZERO:
                    if line.wage_type.definition == 'discount':
                        amount_debit = amount_debit * (-1)
                    if debit_acc.id not in lines_moves:
                        lines_moves[debit_acc.id] = {
                            employee_party.id: line.get_move_line(
                                debit_acc, employee_party,
                                ('debit', amount_debit)
                                )}
                    else:
                        line.update_move_line(
                            lines_moves[debit_acc.id][employee_party.id],
                            {'debit': amount_debit, 'credit': _ZERO}
                            )

                credit_acc = line.wage_type.credit_account
                if amount_credit > _ZERO:
                    line_credit_ready = False
                    if credit_acc:
                        if credit_acc.id not in lines_moves:
                            lines_moves[credit_acc.id] = {
                                party.id: line.get_move_line(
                                    credit_acc, party,
                                    ('credit', amount_credit)
                                    )}
                            line_credit_ready = True
                        elif party.id not in lines_moves[credit_acc.id]:
                            lines_moves[credit_acc.id].update({
                                    party.id: line.get_move_line(
                                        credit_acc, party,
                                        ('credit', amount_credit)
                                        )})
                            line_credit_ready = True
                    if line.wage_type.definition != 'payment':
                        deduction_acc = line.wage_type.deduction_account
                        if deduction_acc:
                            if deduction_acc.id not in lines_moves:
                                lines_moves[deduction_acc.id] = {
                                    employee_party.id: line.get_move_line(
                                        deduction_acc, employee_party,
                                        ('credit', -line.amount),
                                        )}
                                line_credit_ready = True
                            else:
                                deduction = lines_moves[deduction_acc.id]
                                deduction[employee_party.id]['credit'] -= \
                                    line.amount
                    if credit_acc and not line_credit_ready:
                        lines_moves[credit_acc.id][party.id]['credit'] += \
                            amount_credit
            except Exception:
                # Any failure here (e.g. a missing account/party entry) is
                # reported as a wage type configuration problem.
                self.raise_user_error('bad_configuration_wage_type',
                    line.wage_type.name)

        result = []
        for by_party in lines_moves.values():
            party_lines = list(by_party.values())
            first = party_lines[0]
            # Only the first line of each account is netted, as before.
            if first['debit'] > 0 and first['credit'] > 0:
                balance = first['debit'] - first['credit']
                if balance >= 0:
                    first['debit'] = balance
                    first['credit'] = 0
                else:
                    # Keep the netted credit positive; the original
                    # stored the negative difference here.
                    first['credit'] = -balance
                    first['debit'] = 0
            result.extend(party_lines)
        return result

    def _create_payroll_lines(self, wages, extras, discounts=None):
        """Create one payroll line per (wage type, party) pair in *wages*.

        *extras* maps keys to extra quantities and *discounts* maps wage
        type ids to a quantity to subtract; both may be None.
        """
        PayrollLine = Pool().get('staff.payroll.line')
        config = Pool().get('staff.configuration')(1)
        # Callers pass None for either mapping; treat it as empty.
        if discounts is None:
            discounts = {}
        if extras is None:
            extras = {}
        values = []
        salary_args = {}
        for wage, party in wages:
            if wage.salary_constitute:
                if wage.amount_required:
                    salary_args = self.get_salary_full(wage)
                else:
                    salary_args['salary'] = self.employee.salary
            else:
                salary_args = self.get_salary_full(wage)

            # No transport value once the salary reaches twice the
            # configured minimum salary.
            if config and config.minimum_salary and \
                    wage.type_concept == 'transport' and \
                    self.employee.salary >= (config.minimum_salary * 2):
                unit_value = 0
            else:
                unit_value = wage.compute_unit_price(salary_args)

            discount = discounts.get(wage.id) or None
            qty = self.get_line_quantity_special(wage)
            if qty == 0:
                qty = self.get_line_quantity(wage, self.start, self.end,
                    extras, discount)
            values.append(self.get_line(wage, qty, unit_value, party))
        PayrollLine.create(values)

    def set_preliquidation(self, extras, discounts=None):
        """Create the lines of the employee's mandatory wages.

        Wage types that constitute salary are created first, because the
        other wage types may be computed from them.
        """
        wage_salary = []
        wage_no_salary = []
        for concept in self.employee.mandatory_wages:
            pair = (concept.wage_type, concept.party)
            if concept.wage_type.salary_constitute:
                wage_salary.append(pair)
            else:
                wage_no_salary.append(pair)
        self._create_payroll_lines(wage_salary, extras, discounts)
        self._create_payroll_lines(wage_no_salary, extras, discounts)

    def update_preliquidation(self, extras):
        """Recompute the unit value of lines that are not salary.

        *extras* is accepted for interface compatibility and not used.
        """
        for line in self.lines:
            if not line.wage_type.salary_constitute:
                salary_args = self.get_salary_full(line.wage_type)
                unit_value = line.wage_type.compute_unit_price(salary_args)
                line.write([line], {
                        'unit_value': unit_value,
                        })

    def get_line(self, wage, qty, unit_value, party=None):
        """Return the creation values of a payroll line for *wage*."""
        res = {
            'sequence': wage.sequence,
            'payroll': self.id,
            'wage_type': wage.id,
            'description': wage.name,
            'quantity': qty,
            'unit_value': unit_value,
            'uom': wage.uom,
            'receipt': wage.receipt,
            }
        if party:
            res['party'] = party.id
        return res

    def _get_line_quantity(self, quantity_days, wage, extras, discount):
        """Return the quantity of *wage* for *quantity_days* worked days.

        Day-based wages use the days; hour-based wages use days times the
        hours per work day, except 'extras' concepts which take their
        quantity from *extras*. Other units keep the default quantity.
        """
        Configuration = Pool().get('staff.configuration')
        configuration = Configuration(1)
        default_hour_workday = (configuration.default_hour_workday
            or _DEFAULT_WORK_DAY)
        # PayrollGroup passes None when there are no extras.
        if extras is None:
            extras = {}
        quantity = wage.default_quantity or 0
        if quantity_days < 0:
            quantity_days = 0

        if wage.uom.id == Id('product', 'uom_day').pyson():
            quantity = quantity_days
            if discount:
                quantity -= discount
        elif wage.uom.id == Id('product', 'uom_hour').pyson():
            if wage.type_concept != 'extras':
                quantity = quantity_days * default_hour_workday
                if discount:
                    quantity -= discount
            else:
                wage_name = wage.name.lower()
                # First extras key contained in the wage name, else the
                # full lower-cased wage name.
                key_ = [key for key in extras.keys()
                    if wage_name.count(key) > 0]
                if key_:
                    extras_ = extras.get(key_[0])
                else:
                    extras_ = extras.get(wage_name)
                if extras_ and self.employee.position and \
                        self.employee.position.extras:
                    quantity = extras_
        return quantity

    def get_line_quantity(self, wage, start=None, end=None,
            extras=None, discount=None):
        """Return the quantity of *wage* between *start* and *end*."""
        quantity_days = self.get_days(start, end)
        return self._get_line_quantity(quantity_days, wage, extras,
            discount)

    def get_line_quantity_special(self, wage):
        """Return the quantity in days for 'special' wage concepts.

        It is the number of days from the contract start to the effective
        date, capped at the wage's limit_days; 0 for any other concept.
        """
        quantity_days = 0
        if self.contract and self.date_effective and \
                wage.type_concept == 'special':
            quantity_days = (
                self.date_effective - self.contract.start_date).days
            # Only cap when a limit is actually set.
            if wage.limit_days is not None and \
                    quantity_days > wage.limit_days:
                quantity_days = wage.limit_days
        return quantity_days

    # 'currency' is read below, so it must be declared as a dependency.
    @fields.depends('lines', 'currency')
    def on_change_with_amount(self, name=None):
        """Getter shared by gross_payments and total_deductions.

        For 'gross_payments' it sums the payment lines flagged for the
        receipt; for any other name it sums the non-payment lines.
        """
        amounts = []
        for line in self.lines:
            if not line.amount:
                continue
            is_payment = line.wage_type.definition == 'payment'
            if name == 'gross_payments':
                if is_payment and line.receipt:
                    amounts.append(line.amount)
            elif not is_payment:
                amounts.append(line.amount)
        total = sum(amounts, _ZERO)
        # Currency is optional on the payroll: round only when it is set.
        currency = getattr(self, 'currency', None)
        if currency:
            return currency.round(total)
        return total

    def get_net_payment(self, name=None):
        return (self.gross_payments - self.total_deductions)

    def get_total_cost(self, name):
        """Return the sum of all the payment lines."""
        res = sum([line.amount for line in self.lines
                if line.wage_type.definition == 'payment'])
        return res

    def check_start_end(self):
        """Check the dates are ordered and, for normal payrolls, inside
        the period; raise the 'wrong_start_end' user error otherwise."""
        if self.start <= self.end:
            if self.kind != 'normal':
                return
            if self.start >= self.period.start and \
                    self.end <= self.period.end:
                return
            if self.start >= self.period.start and \
                    self.end is None:
                return
        self.raise_user_error('wrong_start_end', self.employee.party.name)

    @fields.depends('period', 'start', 'end', 'employee')
    def on_change_period(self):
        """Reset dates, contract and description from the period."""
        self.start = None
        self.end = None
        self.contract = None
        self.description = None
        if self.period:
            self.start = self.period.start
            self.end = self.period.end
            # The contract lookup needs an employee; it may not be
            # selected yet.
            if self.employee:
                self.contract = self.search_contract_on_period(
                    self.employee, self.period
                    )
            if not self.description:
                self.description = self.period.description

    @classmethod
    def search_contract_on_period(cls, employee, period):
        """Return the contract of *employee* overlapping *period*.

        When several contracts match, the one with the greatest end date
        is considered. Returns None when nothing suitable is found.
        """
        Contract = Pool().get('staff.contract')
        contracts = Contract.search([
                ('employee', '=', employee.id),
                ['AND', ['OR', [
                            ('start_date', '>=', period.start),
                            ('end_date', '<=', period.end),
                            ('end_date', '!=', None),
                            ], [
                            ('start_date', '<=', period.start),
                            ('end_date', '>=', period.start),
                            ('end_date', '!=', None),
                            ], [
                            ('start_date', '<=', period.end),
                            ('end_date', '>=', period.end),
                            ('end_date', '!=', None),
                            ], [
                            ('start_date', '<=', period.start),
                            ('end_date', '>=', period.end),
                            ('end_date', '!=', None),
                            ], [
                            ('start_date', '<=', period.start),
                            ('end_date', '=', None),
                            ], [
                            ('start_date', '>=', period.start),
                            ('start_date', '<=', period.end),
                            ('end_date', '=', None),
                            ],
                        ]]
                ])
        if not contracts:
            last_date_futhermore = employee.get_last_date_futhermore()
            if last_date_futhermore and \
                    last_date_futhermore > period.start:
                return employee.contract
            return

        values = dict([(c.end_date, c) for c in contracts])
        last_contract = values[max(values.keys())]
        if last_contract.end_date:
            if (last_contract.end_date >= period.start and
                    last_contract.end_date <= period.end) or \
                    (last_contract.end_date >= period.end and
                    last_contract.end_date >= period.start):
                return last_contract
        else:
            return last_contract

    def get_days(self, start, end):
        """Return the days from *start* to *end*, both included."""
        adjust = 1
        quantity_days = (end - start).days + adjust
        if quantity_days < 0:
            quantity_days = 0
        return quantity_days

    def recompute_lines(self):
        """Recompute the unit value of the lines whose wage type is
        based on other salary concepts."""
        for line in self.lines:
            if not line.wage_type.concepts_salary:
                continue
            salary_args = self.get_salary_full(line.wage_type)
            unit_value = line.wage_type.compute_unit_price(salary_args)
            line.write([line], {'unit_value': unit_value})


class PayrollLine(ModelSQL, ModelView):
    "Payroll Line"
    __name__ = "staff.payroll.line"
    sequence = fields.Integer('Sequence')
    payroll = fields.Many2One('staff.payroll', 'Payroll',
        ondelete='CASCADE', select=True, required=True)
    description = fields.Char('Description', required=True)
    wage_type = fields.Many2One('staff.wage_type', 'Wage Type',
        required=True, depends=['payroll'])
    uom = fields.Many2One('product.uom', 'Unit', depends=['wage_type'],
        states={'readonly': Bool(Eval('wage_type'))})
    quantity = fields.Numeric('Quantity', digits=(16, 2))
    unit_value = fields.Numeric('Unit Value', digits=(16, 2),
        depends=['wage_type'])
    amount = fields.Function(fields.Numeric('Amount',
            digits=(16, 2), depends=['unit_value', 'quantity'],
            states={
                'readonly': ~Eval('_parent_payroll'),
                }), 'get_amount')
    receipt = fields.Boolean('Print Receipt')
    reconciled = fields.Function(fields.Boolean('Reconciled'),
        'get_reconciled')
    party = fields.Many2One('party.party', 'Party', depends=['wage_type'])

    @classmethod
    def __setup__(cls):
        super(PayrollLine, cls).__setup__()
        cls._order.insert(0, ('sequence', 'ASC'))

    @staticmethod
    def default_quantity():
        return Decimal(str(1))

    @fields.depends('wage_type', 'uom', 'quantity', 'party',
        'description', 'unit_value', 'payroll', 'receipt', 'sequence',
        '_parent_payroll.employee')
    def on_change_wage_type(self):
        """Fill the line from the selected wage type.

        The party comes from the employee's mandatory wage of the same
        type, and the unit value from the wage type's unit price formula.
        """
        if not self.wage_type:
            return
        self.uom = self.wage_type.uom.id
        self.description = self.wage_type.name
        self.quantity = self.wage_type.default_quantity
        self.receipt = self.wage_type.receipt
        self.sequence = self.wage_type.sequence

        # The line may be edited before its payroll/employee is known.
        payroll = self.payroll
        employee = payroll.employee if payroll else None
        if employee:
            for wage in employee.mandatory_wages:
                if wage.wage_type.id == self.wage_type.id and wage.party:
                    self.party = wage.party.id
                    break
        if payroll and self.wage_type.unit_price_formula:
            salary_args = payroll.get_salary_full(self.wage_type)
            self.unit_value = self.wage_type.compute_unit_price(
                salary_args)

    def get_amount(self, name):
        return self.on_change_with_amount()

    @fields.depends('quantity', 'unit_value', '_parent_payroll.currency')
    def on_change_with_amount(self):
        """Return quantity * unit value rounded to 2 decimals."""
        res = _ZERO
        if self.quantity and self.unit_value:
            # NOTE(review): the product is computed in float before being
            # rounded; kept as is so existing amounts do not change.
            quantity = float(self.quantity)
            unit_value = float(self.unit_value)
            res = Decimal(str(round((quantity * unit_value), 2)))
        return res

    def get_reconciled(self, name=None):
        # TODO: Reconciled must be computed from move line, similar way
        # to account invoice
        pass

    def get_move_line(self, account, party, amount):
        """Return move line values for *account* and *party*.

        *amount* is a ('debit' | 'credit', value) pair.
        """
        debit = credit = _ZERO
        if amount[0] == 'debit':
            debit = amount[1]
        else:
            credit = amount[1]
        res = {
            'description': account.name,
            'debit': debit,
            'credit': credit,
            'account': account.id,
            'party': party.id,
            }
        return res

    def update_move_line(self, move_line, values):
        """Add the debit/credit of *values* to *move_line* in place."""
        if values['debit']:
            move_line['debit'] += values['debit']
        if values['credit']:
            move_line['credit'] += values['credit']
        return move_line


class PayrollReport(CompanyReport):
    __name__ = 'staff.payroll'

    @classmethod
    def get_context(cls, records, data):
        report_context = super(PayrollReport, cls).get_context(
            records, data)
        return report_context


class PayrollGroupStart(ModelView):
    'Payroll Group Start'
    __name__ = 'staff.payroll_group.start'
    period = fields.Many2One('staff.payroll.period', 'Period',
        required=True, domain=[('state', '=', 'open')])
    description = fields.Char('Description', required=True)
    company = fields.Many2One('company.company', 'Company',
        required=True)
    wage_types = fields.Many2Many('staff.wage_type', None, None,
        'Wage Types')

    @staticmethod
    def default_company():
        return Transaction().context.get('company')

    @fields.depends('period', 'description')
    def on_change_period(self):
        """Default the description to the period's description."""
        if not self.period:
            return
        if not self.description and self.period.description:
            self.description = self.period.description


class PayrollGroup(Wizard):
    'Payroll Group'
    __name__ = 'staff.payroll_group'
    start = StateView('staff.payroll_group.start',
        'staff_payroll.payroll_group_start_view_form', [
            Button('Cancel', 'end', 'tryton-cancel'),
            Button('Accept', 'open_', 'tryton-ok', default=True),
            ])
    open_ = StateTransition()

    def transition_open_(self):
        """Create a payroll for every active employee that has a contract
        on the period and no payroll in it yet."""
        pool = Pool()
        Employee = pool.get('company.employee')
        Payroll = pool.get('staff.payroll')
        period = self.start.period

        # Remove employees with payroll this period
        payrolls_period = Payroll.search([
                ('period', '=', period.id),
                ])
        employees_w_payroll = [p.employee.id for p in payrolls_period]
        dom_employees = self.get_employees_dom(employees_w_payroll)

        # Extra wage types selected in the wizard, without party.
        # NOTE(review): built once here; whether the original built it
        # inside the employee loop cannot be told from the mangled
        # source - the value is the same either way.
        wages = [(wage_type, None) for wage_type in self.start.wage_types]

        payroll_to_create = []
        for employee in Employee.search(dom_employees):
            if not employee.contract:
                continue
            contract = Payroll.search_contract_on_period(employee, period)
            if not contract:
                continue
            values = self.get_values(employee, period.start, period.end)
            payroll_to_create.append(values)

        if payroll_to_create:
            payrolls = Payroll.create(payroll_to_create)
            for payroll in payrolls:
                payroll.on_change_period()
                payroll.set_preliquidation({})
                if wages:
                    payroll._create_payroll_lines(wages, None, {})
        return 'end'

    def get_employees_dom(self, employees_w_payroll):
        """Return the domain of the employees to create payrolls for."""
        dom_employees = [
            ('active', '=', True),
            ('id', 'not in', employees_w_payroll),
            ]
        return dom_employees

    def get_values(self, employee, start_date, end_date):
        """Return the payroll creation values for *employee*.

        The dates are narrowed to the employee's contract when it starts
        or ends inside the [start_date, end_date] range.
        """
        Payroll = Pool().get('staff.payroll')
        contract = employee.contract
        # The upper bound was compared against start_date, so the start
        # was never narrowed; it must be the end of the range.
        if contract.start_date and \
                contract.start_date >= start_date and \
                contract.start_date <= end_date:
            start_date = contract.start_date
        if contract.end_date and \
                contract.end_date >= start_date and \
                contract.end_date <= end_date:
            end_date = contract.end_date
        values = {
            'employee': employee.id,
            'period': self.start.period.id,
            'start': start_date,
            'end': end_date,
            'description': self.start.description,
            'date_effective': end_date,
            'contract': Payroll.search_contract_on_period(
                employee, self.start.period
                )
            }
        return values


class PayrollPreliquidation(Wizard):
    'Payroll Preliquidation'
    __name__ = 'staff.payroll.preliquidation'
    start_state = 'create_preliquidation'
    create_preliquidation = StateTransition()

    def transition_create_preliquidation(self):
        """Create or refresh the lines of the selected draft payrolls."""
        Payroll = Pool().get('staff.payroll')
        ids = Transaction().context['active_ids']
        for payroll in Payroll.browse(ids):
            # Skip non-draft payrolls; a bare return here would hand
            # None to the wizard as the next state.
            if payroll.state != 'draft':
                continue
            if not payroll.lines:
                payroll.set_preliquidation({})
            else:
                payroll.update_preliquidation({})
        return 'end'


class PayrollRecompute(Wizard):
    'Payroll Recompute'
    __name__ = 'staff.payroll.recompute'
    start_state = 'do_recompute'
    do_recompute = StateTransition()

    def transition_do_recompute(self):
        """Recompute the lines of the selected draft payrolls."""
        Payroll = Pool().get('staff.payroll')
        ids = Transaction().context['active_ids']
        for payroll in Payroll.browse(ids):
            if payroll.state != 'draft' or not payroll.lines:
                continue
            payroll.recompute_lines()
        return 'end'


# Extends account.move so that a payroll can be the origin of a move.
class Move:
    __metaclass__ = PoolMeta
    __name__ = 'account.move'

    @classmethod
    def _get_origin(cls):
        return super(Move, cls)._get_origin() + ['staff.payroll']