trytonpsk-staff_payroll_co/payroll.py

2581 lines
101 KiB
Python

# This file is part of Tryton. The COPYRIGHT file at the top level of
# this repository contains the full copyright notices and license terms.
import calendar
import copy
import time
import traceback
from decimal import Decimal
from datetime import date, timedelta
from dateutil.relativedelta import relativedelta
import math
from operator import attrgetter
from functools import lru_cache
from trytond.exceptions import UserError
from trytond.model import ModelView, fields, Workflow, ModelSQL
from trytond.pool import Pool, PoolMeta
from trytond.report import Report
from trytond.wizard import (
Wizard, StateView, Button, StateAction, StateReport, StateTransition
)
from trytond.transaction import Transaction
from trytond.pyson import Eval, Id
from trytond.modules.staff_payroll.period import Period
from trytond.modules.staff_payroll.payroll import PayrollReport, get_dom_contract_period
from trytond.i18n import gettext
from .exceptions import (
GeneratePayrollError, MissingTemplateEmailPayroll,
WageTypeConceptError, RecordDuplicateError
)
from .constants import (
SHEET_FIELDS_NOT_AMOUNT, LIM_UVT_DEDUCTIBLE, ENTITY_ACCOUNTS,
FIELDS_AMOUNT, EXTRAS, SHEET_SUMABLES, LIM_PERCENT_DEDUCTIBLE,
)
# Nimporter is required to import Nim modules
# try:
# import nimporter
# import trytond.modules.staff_payroll_co.fpc as fpc # Nim imports!
# except:
# print("Nim or nimporter is not installed")
# Shared field states: the field is read-only unless the record state is 'draft'.
STATES = {'readonly': (Eval('state') != 'draft')}
# Decimal zero used as the starting/neutral value for monetary amounts.
_ZERO = Decimal('0.0')
class PayrollLine(metaclass=PoolMeta):
    # Extends the payroll line with event dates, an origin reference, the
    # related accounting move lines and the 60/40 and tax-base amounts.
    __name__ = "staff.payroll.line"
    is_event = fields.Boolean('Is Event')
    # Event dates are only shown when the line is flagged as an event.
    start_date = fields.Date('Start Date', depends=['is_event'],
        states={'invisible': ~Eval('is_event')})
    end_date = fields.Date('End Date', depends=['is_event'],
        states={'invisible': ~Eval('is_event')})
    move_lines = fields.Many2Many('staff.payroll.line-move.line',
        'line', 'move_line', 'Payroll Line - Move Line')
    origin = fields.Reference("Origin", selection='get_origin')
    amount_60_40 = fields.Numeric('Amount 60/40', digits=(16, 2),
        depends=['wage_type'])
    # The tax base is hidden unless the wage type is a tax concept.
    tax_base = fields.Numeric('Tax Base', digits=(16, 2), states={
        'invisible': ~Eval('is_wage_tax', False)},
        depends=['wage_type', 'is_wage_tax'])
    is_wage_tax = fields.Function(fields.Boolean('Is wage tax'),
        'on_change_with_is_wage_tax')

    @fields.depends('wage_type')
    def on_change_with_is_wage_tax(self, name=None):
        """Return True when the line's wage type has the 'tax' concept."""
        if self.wage_type:
            return self.wage_type.type_concept == 'tax'
        return False

    def get_expense_amount(self, wage_type):
        """Return the parent expense amount rounded per ``round_amounts``.

        ``wage_type`` is a mapping; its ``round_amounts`` entry selects the
        rounding to the nearest hundred: 'above_amount' rounds up,
        'under_amount' rounds down and 'automatic' uses ``round``. A falsy or
        unknown value leaves the amount untouched.
        """
        expense = super(PayrollLine, self).get_expense_amount(wage_type)
        round_amounts = wage_type['round_amounts']
        if not round_amounts:
            return expense
        if round_amounts == 'above_amount':
            # Round up, matching Payroll.get_round_amount for this mode.
            expense = math.ceil(float(expense) / 100.0) * 100
        elif round_amounts == 'under_amount':
            expense = math.floor(float(expense) / 100.0) * 100
        elif round_amounts == 'automatic':
            expense = round(expense, -2)
        return expense

    @classmethod
    def delete(cls, lines):
        """Delete payroll lines, first releasing the loan lines they paid.

        Loan lines whose origin is one of ``lines`` go back to 'pending'
        with no origin before the payroll lines are removed.
        """
        LoanLine = Pool().get('staff.loan.line')
        loan_lines = LoanLine.search([
            ('origin', 'in', [str(line) for line in lines])
        ])
        LoanLine.write(list(loan_lines), {'state': 'pending', 'origin': None})
        super(PayrollLine, cls).delete(lines)

    @classmethod
    def _get_origin(cls):
        'Return list of Model names for origin Reference'
        return ['staff.loan.line', 'staff.event']

    @classmethod
    def get_origin(cls):
        """Return the selection for ``origin``: an empty entry plus one
        ``(model name, label)`` pair per model from ``_get_origin``."""
        Model = Pool().get('ir.model')
        get_name = Model.get_name
        models = cls._get_origin()
        return [(None, '')] + [(m, get_name(m)) for m in models]
class PayrollLineMoveLine(ModelSQL):
    "Payroll Line - MoveLine"
    __name__ = "staff.payroll.line-move.line"
    _table = 'staff_payroll_line_move_line_rel'
    # Payroll line side of the relation; rows follow the line on delete.
    line = fields.Many2One(
        'staff.payroll.line', 'Line',
        ondelete='CASCADE', select=True, required=True,
    )
    # Accounting move line side of the relation; deletion is restricted.
    move_line = fields.Many2One(
        'account.move.line', 'Move Line',
        ondelete='RESTRICT', select=True, required=True,
    )
# Extension of the "staff.payroll" model: extra fields declared below, logic in the methods that follow.
class Payroll(metaclass=PoolMeta):
__name__ = "staff.payroll"
# Set to True by on_change_period when the payroll end equals the contract end date.
last_payroll = fields.Boolean('Last Payroll', states=STATES, select=True)
# Computed by on_change_with_ibc from the payroll lines.
ibc = fields.Function(fields.Numeric('IBC'), 'on_change_with_ibc')
# The four "*_amount" fields below share one getter, get_non_fiscal_amount,
# which derives the wage concept from the field name.
health_amount = fields.Function(fields.Numeric('EPS Amount'),
'get_non_fiscal_amount')
retirement_amount = fields.Function(fields.Numeric('AFP Amount'),
'get_non_fiscal_amount')
risk_amount = fields.Function(fields.Numeric('ARL Amount'),
'get_non_fiscal_amount')
box_family_amount = fields.Function(fields.Numeric('Box Amount'),
'get_non_fiscal_amount')
# Written by set_events and subtracted from the day count in get_days.
absenteeism_days = fields.Integer("Absenteeism Days", states=STATES)
department = fields.Many2One('company.department', 'Department',
required=False, depends=['employee'])
# Set to True by send_payroll_email after a successful send.
sended_mail = fields.Boolean('Sended Email')
worked_days_effective = fields.Function(
fields.Numeric('Worked Days Effective'), 'get_worked_days_effective')
# Events referenced as origin by the payroll lines (see get_events).
events = fields.Function(fields.One2Many('staff.event', None,
'Assistance'), 'get_events')
# Raised by on_change_lines and cleared by process once lines are recomputed.
recompute = fields.Boolean('Recompute')
@classmethod
def __setup__(cls):
    """Register the ``force_draft`` button, hidden unless state is 'posted'."""
    super(Payroll, cls).__setup__()
    force_draft_button = {
        'invisible': Eval('state') != 'posted',
        'depends': ['state'],
    }
    cls._buttons.update({'force_draft': force_draft_button})
@fields.depends('end', 'date_effective')
def on_change_with_date_effective(self):
    """Return the payroll end date as effective date, or None when unset."""
    end = self.end
    return end if end else None
@fields.depends('employee', 'department')
def on_change_employee(self):
    """Copy the employee's department onto the payroll when both exist."""
    if not (self.employee and self.employee.department):
        return
    self.department = self.employee.department.id
@classmethod
def copy(cls, records, default=None):
    """Always refuse to duplicate payrolls.

    Raises:
        RecordDuplicateError: unconditionally.
    """
    message = gettext('staff_payroll_co.msg_cannot_duplicate_record')
    raise RecordDuplicateError(message)
@classmethod
def process(cls, records):
    """Recompute the lines of payrolls flagged ``recompute``, clear the
    flag on them, then run the parent ``process``."""
    cache_wage_dict = cls.create_cache_wage_types()
    recomputed = []
    for payroll in records:
        if not payroll.recompute:
            continue
        payroll.recompute_lines(cache_wage_dict)
        recomputed.append(payroll)
    if recomputed:
        cls.write(recomputed, {'recompute': False})
    super(Payroll, cls).process(records)
@fields.depends('recompute', 'lines')
def on_change_lines(self):
    """Flag the payroll for recomputation whenever it has lines."""
    if not self.lines:
        return
    self.recompute = True
def create_move(self, wage_dict):
    """Create the accounting move, then reconcile it with linked move lines.

    After the parent creates the move, the move lines attached to the
    payroll lines are grouped by account (credit account for non-payment
    wage types, debit account for payments). Every unreconciled move line
    on a 'balance' account that has linked lines is reconciled with them
    when the combined debit - credit is zero for the company currency.
    """
    super(Payroll, self).create_move(wage_dict)
    MoveLine = Pool().get('account.move.line')

    # Accumulate per account: several payroll lines may share one account,
    # and all of their move lines must take part in the reconciliation.
    grouped = {}
    for line in self.lines:
        wage_type = line.wage_type
        if wage_type.definition != 'payment':
            account_id = wage_type.credit_account.id
        else:
            account_id = wage_type.debit_account.id
        grouped.setdefault(account_id, []).extend(line.move_lines)

    for move_line in self.move.lines:
        linked = grouped.get(move_line.account.id)
        if linked is None or move_line.reconciliation:
            continue
        # Plain comparison: also safe when the statement is None.
        if move_line.account.type.statement != 'balance':
            continue
        # De-duplicate so the checked amount matches the set reconciled.
        to_reconcile = list(dict.fromkeys([move_line] + linked))
        amount = sum((r.debit - r.credit) for r in to_reconcile)
        if self.company.currency.is_zero(amount):
            MoveLine.reconcile(set(to_reconcile))
@fields.depends('period', 'employee', 'start', 'end', 'contract',
'description', 'date_effective', 'last_payroll')
def on_change_period(self):
# Derive contract, start/end dates, effective date, last_payroll flag and
# description from the selected period. Raises GeneratePayrollError when no
# start/end date could be determined for the employee in that period.
if not self.period:
return
# The effective date defaults to the period end.
self.date_effective = self.period.end
self.on_change_employee()
period_start = self.period.start
period_end = self.period.end
# Search last contract
# NOTE(review): self.employee.id is read unconditionally; presumably an
# employee is always set before the period is chosen - confirm.
contract = self.search_contract_on_period(self.employee.id, period_start, period_end)
start_date = None
end_date = None
self.contract = contract
contract_end_date = None
if contract:
contract_start_date = contract.start_date
# No finished date: the payroll runs to the period end and starts at the
# period start, or at the contract start when that falls inside the period.
if not contract.finished_date:
end_date = period_end
if period_start >= contract_start_date:
start_date = period_start
elif contract_start_date >= period_start and \
contract_start_date <= period_end:
start_date = contract_start_date
else:
# Finished contract: intersect the contract range with the period range.
contract_end_date = contract.finished_date if contract.finished_date else contract.end_date
# Contract covers the whole period.
if contract_start_date <= period_start and \
contract_end_date >= period_end:
start_date = period_start
end_date = period_end
# Contract lies entirely inside the period.
elif contract_start_date >= period_start and \
contract_end_date <= period_end:
start_date = contract_start_date
end_date = contract_end_date
# Contract starts inside the period and ends after it.
elif contract_start_date >= period_start and \
contract_start_date <= period_end and \
contract_end_date >= period_end:
start_date = contract_start_date
end_date = period_end
# Contract starts before the period and ends inside it.
elif contract_start_date <= period_start and \
contract_end_date >= period_start and \
contract_end_date <= period_end:
start_date = period_start
end_date = contract_end_date
if start_date and end_date:
self.start = start_date
self.end = end_date
# The payroll that ends on the contract end date is the last one.
if contract_end_date == self.end:
self.last_payroll = True
# Without a usable date range the period is reset and the user is told.
if not start_date or not end_date:
self.period = None
self.description = None
raise GeneratePayrollError(
gettext('staff_payroll_co.msg_period_without_contract',
employee=self.employee.party.name)
)
if not self.description and self.period:
self.description = self.period.description
def get_worked_days_effective(self, name=None):
# Getter for the 'worked_days_effective' function field: sums the quantity
# of the lines whose wage type concept is 'salary' and divides by `unit`.
# Returns 0 when the payroll has no lines.
res = 0
if self.lines:
days = []
unit = 1
for l in self.lines:
if l.wage_type.type_concept == 'salary':
days.append(l.quantity)
# When the wage type unit is the product 'uom_hour' unit the total is
# divided by 8 (presumably hours per working day - confirm).
if l.wage_type.uom.id == Id('product', 'uom_hour').pyson():
unit = 8
res = sum(days) / unit
return res
def adjust_partial_sunday(self, quantity):
    """Reduce ``quantity`` for a first payroll whose range contains a Sunday.

    Only applies when ``is_first_payroll()`` is true and one of the days
    from ``start`` up to, but excluding, ``end`` is a Sunday. The reduction
    is ``start.weekday() * 1.33`` and the result is rounded to 2 decimals;
    otherwise ``quantity`` is returned unchanged.
    """
    if not self.is_first_payroll():
        return quantity

    span = (self.end - self.start).days
    has_sunday = any(
        (self.start + timedelta(days=offset)).weekday() == 6
        for offset in range(span)
    )
    if not has_sunday:
        return quantity

    # Factor = 8 hour sunday / 6 days (monday-saturday)
    sunday_factor = 1.33
    deduction = Decimal(self.start.weekday() * sunday_factor)
    return Decimal(str(round(quantity - deduction, 2)))
def _get_line_quantity(self, config, quantity_days, wage, extras, discount):
    """Return the line quantity, applying the partial-Sunday adjustment.

    The ``quantity_days`` argument is ignored and recomputed with
    ``get_days`` for this payroll's range. The parent result is adjusted by
    ``adjust_partial_sunday`` only for 'salary' concepts when the
    configuration enables ``payment_partial_sunday``.
    """
    days = self.get_days(self.start, self.end, wage['adjust_days_worked'])
    quantity = super(Payroll, self)._get_line_quantity(
        config, days, wage, extras, discount)
    is_salary = wage['type_concept'] == 'salary'
    if config.payment_partial_sunday and is_salary:
        return self.adjust_partial_sunday(quantity)
    return quantity
@fields.depends('start', 'end', 'employee', 'period', 'contract')
def get_days(self, start, end, wage_adjust_days_worked=None):
    """Return the number of payable days between ``start`` and ``end``.

    The base count is ``(end - start).days + 1``. When day adjustment is
    active (the contract position's ``adjust_days_worked``, True when there
    is no contract, or the ``wage_adjust_days_worked`` argument) and the
    range ends on the 31st or on 28/29 February, the extra day is replaced
    by ``31 - end.day``. The payroll's ``absenteeism_days`` are subtracted
    last.

    The result is not cached: it depends on ``self.contract`` and
    ``self.absenteeism_days``, which change between calls.
    """
    if self.contract:
        adjust_days_worked = self.contract.position.adjust_days_worked
    else:
        adjust_days_worked = True

    adjust = 1
    if adjust_days_worked or wage_adjust_days_worked:
        ends_on_31 = end.day == 31
        ends_february = end.month == 2 and end.day in (28, 29)
        if ends_on_31 or ends_february:
            adjust = 31 - end.day

    quantity_days = max((end - start).days + adjust, 0)
    if start == end:
        # A single-day range always counts as one day.
        quantity_days = 1
    absenteeism_days = self.absenteeism_days
    if absenteeism_days:
        quantity_days = quantity_days - absenteeism_days
    return quantity_days
def get_salary_full(self, wage):
# Return {'salary': amount}: the parent salary base for `wage` (a mapping),
# adjusted by the rules below.
res = super(Payroll, self).get_salary_full(wage)
salary_full = res['salary']
# For these concepts the lines' amount_60_40 values are added to the base.
concepts = ('health', 'retirement', 'risk', 'box_family')
if wage['type_concept'] in concepts:
salary_full += sum(line.amount_60_40 for line in self.lines if line.amount_60_40)
# Wages with 'month_application' use the whole month's salary, and only on
# the payroll where _is_last_payroll_month is true; otherwise the base is zero.
if wage['month_application']:
if self._is_last_payroll_month(self.start, self.period.end):
salary_full_month = self.search_salary_month(wage)
salary_full = salary_full_month
else:
salary_full = _ZERO
# A base below the wage's minimal amount is treated as zero.
if wage['minimal_amount'] and salary_full < wage['minimal_amount']:
salary_full = _ZERO
return {'salary': salary_full}
def is_first_payroll(self):
    """Return True when the contract start date falls within this payroll.

    Evaluated on every call: the answer depends on ``start``, ``end`` and
    ``contract``, so caching it per instance could return a stale value.
    """
    return self.start <= self.contract.start_date <= self.end
@lru_cache(maxsize=20)
def _is_last_payroll_month(self, start_date_payroll, end_period_date):
year, month = attrgetter('year', 'month')(start_date_payroll)
_, end_day = calendar.monthrange(year, month)
last_day = date(year, month, end_day)
if last_day == end_period_date:
return True
return False
def _compute_apply_special_salary(self):
    """Sum the formula results of the employee's special-salary wage types.

    For each mandatory wage whose wage type has ``apply_special_salary``,
    the employee's payrolls are searched in a date range of the payroll's
    year: the current semester for 'bonus_service', the full year for any
    other concept. The wage formula is evaluated with a salary equal to the
    worked days of those payrolls times the employee's ``salary_day``.
    """
    MandatoryWages = Pool().get('staff.payroll.mandatory_wage')
    mandatory_wages = MandatoryWages.search([
        ('employee', '=', self.employee.id),
        ('wage_type.apply_special_salary', '=', True),
    ])
    # Bound once up front: every branch below needs the year.
    year = self.start.year
    first_semester = self.start < date(year, 7, 1)

    salary_plus = _ZERO
    for mandatory in mandatory_wages:
        wage = mandatory.wage_type
        if wage.type_concept != 'bonus_service':
            first, last = date(year, 1, 1), date(year, 12, 31)
        elif first_semester:
            first, last = date(year, 1, 1), date(year, 6, 30)
        else:
            first, last = date(year, 7, 1), date(year, 12, 31)
        payrolls = self.search([
            ('employee', '=', self.employee.id),
            ('start', '>=', first),
            ('start', '<=', last),
        ])
        worked_days = Decimal(sum(p.worked_days for p in payrolls))
        precomputed_salary = {
            'salary': worked_days * self.employee.salary_day}
        salary_plus += wage.compute_formula(
            wage.unit_price_formula,
            precomputed_salary
        )
    return salary_plus
def _get_payrolls_month(self):
_, end_day = calendar.monthrange(self.start.year, self.start.month)
start = date(self.start.year, self.start.month, 1)
end = date(self.start.year, self.start.month, end_day)
payrolls = self.search([
('employee', '=', self.employee.id),
('start', '>=', start),
('start', '<=', end),
])
return payrolls
@classmethod
def _get_payrolls_period(cls, employee, contract, start_date, end_date):
_, end_day = calendar.monthrange(end_date.year, end_date.month)
start = date(start_date.year, start_date.month, 1)
end = date(end_date.year, end_date.month, end_day)
payrolls = cls.search([
('employee', '=', employee.id),
('start', '>=', start),
('start', '<=', end),
('contract', '=', contract.id)
])
return payrolls
def _create_payroll_lines(self, config, wages, extras, discounts=None, cache_wage_dict=None):
# Let the parent create the lines, then, for lines whose wage type cancels a
# provision on contract finish, attach the unreconciled provision move lines
# of the employee and (except for holiday events) set the unit value.
super(Payroll, self)._create_payroll_lines(
config, wages, extras, discounts, cache_wage_dict)
pool = Pool()
MoveLine = pool.get('account.move.line')
PayrollLine = pool.get('staff.payroll.line')
for line in self.lines:
to_write = {}
if line.wage_type.provision_cancellation and line.wage_type.contract_finish:
# Amounts of this payroll's lines for the provision wage type being cancelled.
amount_line = [m.amount for m in self.lines if m.wage_type
== line.wage_type.provision_cancellation]
# Unreconciled move lines of the employee's party on the provision credit account.
move_lines = MoveLine.search([
('account', '=',
line.wage_type.provision_cancellation.credit_account.id),
('reconciliation', '=', None),
('party', '=', self.employee.party.id)
])
lines_to_reconcile = []
values = []
# Holiday lines that come from an origin: only link move lines while the
# running total stays within the line amount; the unit value is left as is.
if line.origin and line.wage_type.type_concept == 'holidays':
amount = line.amount
for m in move_lines:
values.append(abs(m.debit - m.credit))
if sum(values) <= amount:
lines_to_reconcile.append(m.id)
to_write = {
'move_lines': [('add', lines_to_reconcile)]
}
else:
# Otherwise link every move line and set the unit value to the accumulated
# provision plus this payroll's provision lines.
for m in move_lines:
values.append(abs(m.debit - m.credit))
lines_to_reconcile.append(m.id)
amount = sum(values) + sum(amount_line)
unit_value = amount
to_write = {
'unit_value': unit_value,
'move_lines': [('add', lines_to_reconcile)]
}
PayrollLine.write([line], to_write)
def process_loans_to_pay(self, LoanLine, PayrollLine, MoveLine):
    """Create a payroll line for each loan line due on this payroll.

    Loan lines of the employee's party that have a wage type, mature on or
    before the payroll end and are 'pending' or 'partial' each produce one
    payroll line (quantity 1, unit value = loan line amount) linked to the
    move lines originating from that loan line. The loan line is then
    marked 'paid' with the new payroll line as origin.

    Args:
        LoanLine: the 'staff.loan.line' model class.
        PayrollLine: the 'staff.payroll.line' model class.
        MoveLine: the 'account.move.line' model class.
    """
    due_lines = LoanLine.search([
        ('loan.party', '=', self.employee.party.id),
        ('loan.wage_type', '!=', None),
        ('maturity_date', '<=', self.end),
        ('state', 'in', ['pending', 'partial']),
    ])
    for loan_line in due_lines:
        loan = loan_line.loan
        wage_type = loan.wage_type
        # str(record) is used directly as the origin value, as in
        # PayrollLine.delete; prefixing the model name again would never match.
        move_lines = MoveLine.search([
            ('origin', 'in', [str(loan_line)]),
        ])
        line, = PayrollLine.create([{
            'origin': loan_line,
            'party': loan.party_to_pay if loan.party_to_pay else None,
            'quantity': 1,
            'uom': wage_type.uom,
            'unit_value': loan_line.amount,
            'move_lines': [('add', move_lines)],
            'wage_type': wage_type,
            'description': wage_type.name,
            'payroll': self,
        }])
        LoanLine.write([loan_line], {'state': 'paid', 'origin': line})
def search_salary_month(self, wage):
    """Return the sum of ``compute_salary_full(wage)`` over the payrolls of
    this payroll's month (zero when there are none)."""
    total = _ZERO
    for payroll in self._get_payrolls_month() or ():
        total += payroll.compute_salary_full(wage)
    return total
def get_round_amount(self, round_amounts, unit_value):
    """Round ``unit_value`` to hundreds according to ``round_amounts``.

    'above_amount' rounds up, 'under_amount' rounds down, 'automatic' uses
    ``round(unit_value, -2)``; any other mode returns the value unchanged.
    """
    if round_amounts == 'automatic':
        return round(unit_value, -2)
    if round_amounts in ('above_amount', 'under_amount'):
        to_hundreds = math.ceil if round_amounts == 'above_amount' else math.floor
        return to_hundreds(float(unit_value) / 100.0) * 100
    return unit_value
# def round_amount(self, round_amount, amount):
# amount = super(Payroll, self).round_amount(wage, amount)
# if amount and round_amount:
# amount = self.get_round_amount(wage, amount)
# return amount
# def get_line(self, wage, qty, unit_value, party=None):
# if unit_value and wage.get('round_amounts'):
# unit_value = self.get_round_amount(wage['round_amounts'], unit_value)
# res = super(Payroll, self).get_line(wage, qty, unit_value, party)
# return res
def set_preliquidation(self, config, extras, discounts=None, cache_wage_dict=None):
# Build the preliquidation: create loan lines, create event lines, run the
# parent preliquidation, add fixed-shift lines, recompute every line and
# optionally drop zero-quantity lines.
pool = Pool()
PayrollLine = pool.get('staff.payroll.line')
MoveLine = pool.get('account.move.line')
LoanLine = pool.get('staff.loan.line')
self.process_loans_to_pay(LoanLine, PayrollLine, MoveLine)
# The incoming `discounts` argument is replaced by the per-wage-type
# discounts computed from the employee's events.
discounts = self.set_events(cache_wage_dict)
# The parent runs with the absenteeism days exposed in the transaction context.
ctx = {
'absenteeism_days': self.absenteeism_days
}
with Transaction().set_context(ctx):
super(Payroll, self).set_preliquidation(config, extras, discounts, cache_wage_dict)
self.save()
if extras.get('wage_shift_fixed'):
# Shifts fixed must be a list of wage_shift_fixed
shifts_fixed = extras.get('wage_shift_fixed')
to_create = []
for wg in shifts_fixed:
to_create.append(
self.get_line(wg['wage'], wg['qty'], wg['unit_value']))
PayrollLine.create(to_create)
# self.update_wage_no_salary()
# NOTE(review): time_r / time_r2 are only used by the commented-out timing print below.
time_r = time.time()
self.recompute_lines(cache_wage_dict)
time_r2 = time.time()
# print(time_r2-time_r, 'time recompute lines')
# Unless the configuration allows them, lines with zero quantity are deleted.
if not config.allow_zero_quantities:
lines_delete = [ln for ln in self.lines if ln.quantity == 0]
PayrollLine.delete(lines_delete)
def set_events(self, cache_wage_dict):
# Create payroll lines for the employee's events that affect this payroll,
# store the accumulated absenteeism days on the payroll and return a dict
# {discount wage type id: accumulated event quantity}.
pool = Pool()
Event = pool.get('staff.event')
PayrollLine = pool.get('staff.payroll.line')
Wage = pool.get('staff.wage_type')
start = self.start
end = self.end
fields_names = [
'category.payroll_effect', 'end_date', 'start_date',
'uom.symbol', 'quantity', 'absenteeism', 'category.wage_type',
'amount', 'total_amount', 'unit_price_formula',
'category.wage_type_discount', 'is_vacations'
]
# Two families of events are read:
#  - 'done' events that cover the payroll start, cover the payroll end, or
#    lie entirely inside the payroll range;
#  - 'in_progress' events started on or before the payroll end that carry a
#    unit price formula.
events = Event.search_read(['OR',
[
('employee', '=', self.employee),
('state', '=', 'done'),
['OR',
[
('end_date', '>=', start),
('start_date', '<=', start),
],
[
('end_date', '>=', end),
('start_date', '<=', end),
],
[
('start_date', '>=', start),
('end_date', '<=', end),
],
]
],
[
('employee', '=', self.employee),
('state', '=', 'in_progress'),
('start_date', '<=', end),
('unit_price_formula', 'not in', [None, '']),
]
], fields_names=fields_names)
absenteeism_days = 0
discounts = {}
compute_unit_price = Wage.compute_unit_price
compute_formula = Event.compute_formula
for event in events:
# Record instance, needed for saving and as origin of the created line.
event_ = Event(event['id'])
# Events whose category has no payroll effect are ignored.
if not event['category.']['payroll_effect']:
continue
start_date = start
end_date = None
# Day-based events with an end date are clamped to the payroll range and
# paid for the days returned by the contract's get_time_days.
if event['end_date'] and event['uom.']['symbol'] == 'd':
if event['start_date'] > start_date:
start_date = event['start_date']
end_date = end
if event['end_date'] < end_date:
end_date = event['end_date']
qty_pay = self.contract.get_time_days(start_date, end_date)
else:
# Otherwise the event quantity is paid as is.
qty_pay = event['quantity']
if event['absenteeism']:
absenteeism_days += qty_pay
if event['quantity']:
wage = event['category.']['wage_type']
if wage:
wage_ = cache_wage_dict[wage]
salary_args = self.get_salary_full(wage_)
# Event with a total amount and a formula: pay the formula result, capped
# at the remaining amount; when capped the event is closed as 'done'.
if event['amount'] and event['unit_price_formula']:
unit_value = compute_formula(salary_args)
if event['amount'] - event['total_amount'] < unit_value:
unit_value = event['amount'] - event['total_amount']
event_.state = 'done'
event_.end_date = end
event_.save()
# Event with a fixed amount only.
elif event['amount']:
unit_value = Decimal(event['amount'])
else:
# No amount: vacations use the salary average, anything else uses the
# wage type's unit price formula.
if event['is_vacations']:
unit_value = self.get_salary_average(
start, self.employee, self.contract, wage_)
else:
unit_value = compute_unit_price(wage_['unit_price_formula'], salary_args)
res = self.get_line(wage_, qty_pay, unit_value)
res.update({
'is_event': True,
'start_date': start_date,
'end_date': end_date,
'origin': event_
})
PayrollLine.create([res])
# Accumulate the event quantity per discount wage type.
if event['category.']['wage_type_discount'] and event['quantity']:
id_wt_event = event['category.']['wage_type_discount']
if id_wt_event not in discounts.keys():
discounts[id_wt_event] = 0
discounts[id_wt_event] += event['quantity']
self.absenteeism_days = absenteeism_days
self.save()
return discounts
@classmethod
def get_salary_average(cls, end_date, employee, contract, wage):
# Return a daily salary value (Decimal rounded to 2 places). Contracts
# without variable salary use last_salary / 30; otherwise the value is
# derived from the salary base of previous payrolls, as detailed below.
# end_date = self.start
start_date_contract = contract.start_date
if not contract.variable_salary:
return Decimal(str(round(contract.last_salary / 30 if contract.last_salary else 0, 2)))
# Contract started in a different month number than end_date: look back up
# to three months, but not before the contract start.
if start_date_contract.month != end_date.month:
start_date = end_date + relativedelta(months=-3)
if start_date <= start_date_contract:
start_date = start_date_contract
next_date = date(start_date.year, start_date.month, 1)
average_days_monthly = []
# Walk month by month while next_date is before one month prior to end_date.
while next_date < end_date + relativedelta(months=-1):
# NOTE(review): end_day is computed here but not used afterwards.
_, end_day = calendar.monthrange(
next_date.year, next_date.month)
payrolls = cls._get_payrolls_period(
employee, contract, next_date, next_date)
salary_base = 0
# Only the first payroll found for the month contributes to the base.
if payrolls:
salary_base += payrolls[0].compute_salary_full(wage)
average_days_monthly.append(salary_base)
next_date += relativedelta(months=1)
# Monthly average divided by 30; any failure (presumably the division by
# zero of an empty list - the bare except hides other errors too) falls
# back to contract.salary / 30.
try:
res = sum(average_days_monthly)/len(average_days_monthly)/30
except:
res = contract.salary / 30 if contract.salary else 0
else:
# Same month number as the contract start: use the payroll of end_date's month.
next_date = end_date
payrolls = cls._get_payrolls_period(
employee, contract, next_date, next_date)
# NOTE(review): `res` appears to stay unbound when no payroll is found here,
# which would fail at the return below - confirm against the original indentation.
if payrolls:
res = payrolls[0].compute_salary_full(wage)/30
return Decimal(str(round(res, 2)))
def update_wage_no_salary(self, cache_wage_dict):
# Recompute and write the unit value (and tax base) of the lines that are
# not plain salary: month-application wages, 'interest' and 'tax' concepts.
# Lines matching none of these are skipped.
PayrollLine = Pool().get('staff.payroll.line')
att_getter = attrgetter('wage_type.type_concept', 'wage_type.month_application')
tax_base = None
for line in self.lines:
wage = cache_wage_dict[line.wage_type.id]
type_concept, month_application = att_getter(line)
# Month-application wages are re-priced from the full salary base.
if month_application:
salary_args = self.get_salary_full(wage)
unit_value = line.wage_type.compute_unit_price(wage['unit_price_formula'], salary_args)
# Interest lines use the accumulated interest up to this payroll's start.
if type_concept == 'interest':
unit_value = self._compute_interest(wage, self.start)
# Tax lines get the withholding and keep the rounded base only when tax is due.
elif type_concept == 'tax':
unit_value, tax_base = self._compute_line_tax(line, wage)
if unit_value > 0:
tax_base = Decimal(str(round(tax_base, 2)))
else:
tax_base = None
else:
continue
# NOTE(review): tax_base is initialised once before the loop, so a value set
# by a tax line is also written on later non-tax lines - confirm this is intended.
PayrollLine.write([line], {'unit_value': unit_value, 'tax_base': tax_base})
def _get_payrolls_contract(self):
dom_payroll = [('employee', '=', self.employee.id)]
if not self.contract:
contract = self.search_contract_on_period()
else:
contract = self.contract
dom_payroll.append(
('start', '>=', contract.start_date),
)
if contract.end_date:
dom_payroll.append(
('start', '<=', contract.end_date)
)
payrolls = self.search(dom_payroll)
return [p.id for p in payrolls]
def _compute_interest(self, wage, limit_date=False, name=None):
    """Return the interest still owed for this payroll, rounded to 2 places.

    The payrolls of the contract in the payroll's year (from the later of
    1 January and the contract start, up to ``limit_date`` or 31 December)
    give the worked days and the accumulated amount of the wage's
    ``concepts_salary``. Total interest is
    ``accumulated * worked_days * 0.01 / 360``; the interest already
    present on the other payrolls for this wage is subtracted and a
    negative result is replaced by zero.
    """
    year = self.start.year
    concepts_salary_ids = wage['concepts_salary']
    contract_start = self.contract.start_date
    contract_id = self.contract.id
    employee_id = self.employee.id

    lower = date(year, 1, 1)
    if contract_start > lower:
        lower = contract_start
    upper = limit_date if limit_date else date(year, 12, 31)

    payrolls = self.search([
        ('start', '>=', lower),
        ('contract', '=', contract_id),
        ('employee', '=', employee_id),
        ('start', '<=', upper),
    ])
    payrolls_ids = [payroll.id for payroll in payrolls]
    time_contracting = sum(payroll.worked_days for payroll in payrolls)

    salary_hoard = float(
        self._compute_hoard(payrolls_ids, concepts_salary_ids))
    total_interest = salary_hoard * time_contracting * 0.01 / 360

    # Interest already paid is looked up on every payroll except this one.
    if self.id in payrolls_ids:
        payrolls_ids.remove(self.id)
    interest_hoard = float(self._compute_hoard(payrolls_ids, [wage['id']]))

    interest = total_interest - interest_hoard
    if interest < _ZERO:
        interest = _ZERO
    return Decimal(str(round(interest, 2)))
def _compute_hoard(self, payrolls_ids, wages_ids):
    """Return the summed amount of the non-reconciled payroll lines of the
    given wage types within the given payrolls (zero without payrolls)."""
    if not payrolls_ids:
        return _ZERO
    PayrollLine = Pool().get('staff.payroll.line')
    rows = PayrollLine.search_read(
        [
            ('wage_type', 'in', wages_ids),
            ('payroll', 'in', payrolls_ids),
        ],
        fields_names=['amount', 'reconciled'],
    )
    total = 0
    for row in rows:
        if row['reconciled']:
            continue
        total += row['amount']
    return total
def recompute_lines(self, cache_wage_dict):
# Recompute the payroll lines in four steps: the 60/40 excess, the parent
# recomputation, provision cancellations, then non-salary wages and rounding.
PayrollLine = Pool().get('staff.payroll.line')
amounts_60_40 = []
amounts_60_40_app = amounts_60_40.append
wages_60_40 = []
wages_60_40_app = wages_60_40.append
att_getter = attrgetter(
'amount', 'wage_type.definition',
'wage_type.account_60_40',
'wage_type.salary_constitute')
# Step 1: among payment lines, collect the amounts of salary-constituting
# lines and of lines flagged account_60_40; the latter lines are also kept.
for line in self.lines:
amount, definition, account_60_40, salary_constitute = att_getter(line)
if definition == 'payment':
if salary_constitute:
amounts_60_40_app(amount)
elif account_60_40:
wages_60_40_app(line)
amounts_60_40_app(amount)
amount_wages_60_40 = sum(pl.amount for pl in wages_60_40)
# NOTE(review): Decimal(0.4) is built from a float, so it is not exactly 0.4.
fourty_percentage_amount = sum(amounts_60_40) * Decimal(0.4)
amount_to_validate = fourty_percentage_amount < amount_wages_60_40
# When the 60/40 lines exceed 40% of the collected total, the excess is
# spread over those lines in proportion to their amount (stored in amount_60_40).
if amounts_60_40 and wages_60_40 and amount_to_validate:
amount_dif = amount_wages_60_40 - fourty_percentage_amount
for wl in wages_60_40:
new_unit_value = Decimal(str(round(
(amount_dif * (wl.amount / amount_wages_60_40)), 2)))
PayrollLine.write([wl], {
'amount_60_40': new_unit_value,
})
# Step 2: parent recomputation.
super(Payroll, self).recompute_lines(cache_wage_dict)
line_tax = None
# deductions = _ZERO
# Step 3: lines that cancel a provision on contract finish take as unit
# value their linked move lines plus this payroll's provision lines.
for line in self.lines:
w_provision_cancellation = line.wage_type.provision_cancellation
if w_provision_cancellation and line.wage_type.contract_finish:
amount_line = [m.amount for m in self.lines if m.wage_type
== w_provision_cancellation]
values = []
for m in line.move_lines:
values.append(abs(m.debit - m.credit))
amount = sum(values) + sum(amount_line)
line.write([line], {
'unit_value': amount,
})
# Step 4: re-price non-salary wages, then round the unit value of the wage
# types that define a rounding mode.
self.update_wage_no_salary(cache_wage_dict)
for line in self.lines:
wage = cache_wage_dict[line.wage_type.id]
if not wage['round_amounts']:
continue
unit_value = self.get_round_amount(wage['round_amounts'], line.unit_value)
line.write([line], {'unit_value': unit_value})
def check_limit(self, base, field, value_field):
# Cap a deduction `value_field` of kind `field` using the UVT and percentage
# limits from LIM_UVT_DEDUCTIBLE / LIM_PERCENT_DEDUCTIBLE, relative to `base`.
# Returns 0 when the configuration has no UVT value.
Configuration = Pool().get('staff.configuration')
# The configuration is read from record id 1.
configuration = Configuration(1)
if not configuration.uvt_value:
return 0
uvt_config = configuration.uvt_value
# Either limit is None when the field has no entry in the matching table.
lim_percent = LIM_PERCENT_DEDUCTIBLE[field] if field in LIM_PERCENT_DEDUCTIBLE.keys(
) else None
lim_uvt = LIM_UVT_DEDUCTIBLE[field] if field in LIM_UVT_DEDUCTIBLE.keys(
) else None
res = 0
# Dependents: the lower of a percentage of the base (scaled by value_field)
# and the UVT limit; stays 0 when value_field is falsy.
if field == 'dependents' and value_field:
value_percent = lim_percent * value_field / 100 * base
value_uvt = lim_uvt * uvt_config
res = min(value_percent, value_uvt)
# Exempted income: the lower of the UVT limit and a percentage of the base.
elif field == 'exempted_income':
value_limit_uvt = uvt_config * lim_uvt
value_limit_percent = base * lim_percent / 100
res = min(value_limit_uvt, value_limit_percent)
else:
# Any other field: the value itself, replaced by a limit it exceeds.
# The percentage check runs last, so it wins when both limits are exceeded.
res = value_field
if lim_uvt:
value_limit_uvt = uvt_config * lim_uvt
if value_field > value_limit_uvt:
res = value_limit_uvt
if lim_percent:
value_limit_percent = base * lim_percent / 100
if value_field > value_limit_percent:
res = value_limit_percent
return res
def _compute_line_tax(self, line_tax, wage):
    """Compute the withholding tax for a tax payroll line.

    The taxable base starts from the full salary for ``wage`` minus the
    month's deductions (excluding the current amount of ``line_tax``) and
    the limited 'fvp_ind' deduction. The remaining employee deductions,
    each capped by ``check_limit``, plus the 'exempted_income' share are
    then subtracted, capped overall by the 'renta_deductions' limit.

    Args:
        line_tax: the payroll line holding the tax.
        wage: mapping describing the tax wage type.

    Returns:
        tuple ``(unit_value, base_salary_withholding)``: the withholding
        rounded with the payroll currency and the base it was computed on.
    """
    UvtWithholding = Pool().get('staff.payroll.uvt_withholding')
    salary_full = self.get_salary_full(wage)['salary']
    amount_tax = line_tax.amount if line_tax.amount else 0
    deductions_month = self.get_deductions_month() - amount_tax

    employee = self.employee
    raw_deductions = {
        'fvp_ind': employee.fvp_ind or _ZERO,
        'health_prepaid': employee.health_prepaid or _ZERO,
        'fvp_afc': (employee.fvp or _ZERO) + (employee.afc or _ZERO),
        'housing_interest': employee.housing_interest or _ZERO,
        'dependents': employee.dependents or _ZERO,
        'other_income': employee.other_income or _ZERO,
    }
    deductions_values = {
        field: self.check_limit(salary_full, field, value)
        for field, value in raw_deductions.items()
    }

    # 'fvp_ind' reduces the base directly and is not part of the capped
    # deductions summed below.
    base_salary_withholding = (
        salary_full - deductions_month - deductions_values.pop('fvp_ind'))
    deductions_renta = sum(deductions_values.values())

    base_c = base_salary_withholding - deductions_renta
    renta25c = self.check_limit(
        base_c, 'exempted_income',
        base_c * LIM_PERCENT_DEDUCTIBLE['exempted_income'] / 100)
    deductions_renta_renta25c = deductions_renta + renta25c
    ret_general = self.check_limit(
        base_salary_withholding, 'renta_deductions',
        deductions_renta_renta25c)
    if deductions_renta_renta25c > ret_general:
        deductions_renta_renta25c = ret_general

    base_salary_withholding -= deductions_renta_renta25c
    unit_value = UvtWithholding.compute_withholding(
        base_salary_withholding)
    unit_value = self.currency.round(Decimal(unit_value))
    return unit_value, base_salary_withholding
def get_non_fiscal_amount(self, name=None):
    """Getter shared by the ``*_amount`` function fields.

    The wage concept is the field name without its last 7 characters; the
    result is the summed amount of the lines with that concept.
    """
    concept = name[:-7]
    total = 0
    for line in self.lines:
        if line.wage_type.type_concept == concept:
            total += line.amount
    return total
def get_deductions_month(self):
    """Return the summed amount of every 'deduction' line across the
    payrolls of this payroll's month."""
    total = 0
    for payroll in self._get_payrolls_month():
        for line in payroll.lines:
            if line.wage_type.definition == 'deduction':
                total += line.amount
    return total
@fields.depends('lines')
def on_change_with_ibc(self, name=None):
    """Return the IBC: the summed amount of the lines whose wage type
    constitutes salary, excluding the 'transport' concept."""
    WageType = Pool().get('staff.wage_type')
    excluded_concepts = ['transport']
    salary_wage_ids = {
        wage.id for wage in WageType.search([
            ('salary_constitute', '=', True),
            ('type_concept', 'not in', excluded_concepts),
        ])
    }
    return sum(
        line.amount for line in self.lines
        if line.wage_type and line.wage_type.id in salary_wage_ids
    )
@classmethod
@ModelView.button
@Workflow.transition('draft')
def force_draft(cls, records):
    """Send the payrolls back to 'draft', drafting their account move
    (when they have one) without deleting it."""
    Move = Pool().get('account.move')
    for payroll in records:
        if not payroll.move:
            continue
        Move.draft([payroll.move.id])
def send_payroll_email(self):
    """Send the payroll to the employee's e-mail with the configured template.

    On a response with status code 202 the payroll is flagged
    ``sended_mail``.

    Raises:
        MissingTemplateEmailPayroll: no template is configured.
        UserError: the send gave no response or a status other than 202.
    """
    pool = Pool()
    config = pool.get('staff.configuration')(1)
    Template = pool.get('email.template')
    template = config.template_email_confirm
    if not template:
        raise MissingTemplateEmailPayroll(
            gettext('staff_payroll_co.msg_template_not_exist'))

    response = Template.send(template, self, self.employee.party.email)
    if not (response and response.status_code == 202):
        raise UserError(gettext('staff_payroll_co.msg_dont_send_email',
            employee=self.employee.party.name))
    self.write([self], {'sended_mail': True})
def get_events(self, name=None):
    """Return the ids of the 'staff.event' records used as origin by the
    payroll lines, in line order."""
    return [
        line.origin.id
        for line in self.lines
        if line.origin and line.origin.__name__ == 'staff.event'
    ]
class PayrollSupportDispersionStart(ModelView):
    'Payroll Support Dispersion'
    __name__ = 'staff.payroll_support_dispersion.start'
    # Company and period are mandatory; the department filter is optional.
    company = fields.Many2One(
        'company.company', 'Company', required=True)
    department = fields.Many2One(
        'company.department', 'Department')
    period = fields.Many2One(
        'staff.payroll.period', 'Start Period', required=True)

    @staticmethod
    def default_company():
        # Default to the company found in the transaction context.
        context = Transaction().context
        return context.get('company')
class PayrollSupportDispersion(Wizard):
    'Payroll Support Dispersion'
    __name__ = 'staff.payroll.support_dispersion'
    start = StateView('staff.payroll_support_dispersion.start',
        'staff_payroll_co.payroll_support_dispersion_start_view_form', [
            Button('Cancel', 'end', 'tryton-cancel'),
            Button('Send', 'open_', 'tryton-ok', default=True),
        ])
    open_ = StateTransition()

    def transition_open_(self):
        """E-mail the payrolls of the selected company and period.

        Only 'processed' or 'posted' payrolls not yet flagged
        ``sended_mail`` are considered, optionally restricted to the
        selected department. Employees without an e-mail address are
        skipped. Always ends the wizard.
        """
        Payroll = Pool().get('staff.payroll')
        dom = [
            ('company', '=', self.start.company.id),
            ('period', '=', self.start.period.id),
            ('state', 'in', ['processed', 'posted']),
            ('sended_mail', '=', False)
        ]
        if self.start.department:
            dom.append(('department', '=', self.start.department.id))
        for payroll in Payroll.search(dom):
            if payroll.employee.party.email:
                payroll.send_payroll_email()
        return 'end'
class PayrollGlobalStart(ModelView):
    'Payroll Global Start'
    __name__ = 'staff.payroll_global.start'

    # Period range to report on; only the start period is mandatory.
    start_period = fields.Many2One(
        'staff.payroll.period', 'Start Period', required=True)
    end_period = fields.Many2One(
        'staff.payroll.period', 'End Period', depends=['start_period'])
    company = fields.Many2One(
        'company.company', 'Company', required=True)
    department = fields.Many2One(
        'company.department', 'Department')
    include_finished = fields.Boolean('Include Finished Contract')

    @staticmethod
    def default_company():
        """Default the company to the one stored in the transaction context."""
        context = Transaction().context
        return context.get('company')

    @fields.depends('start_period')
    def on_change_with_end_period(self, name=None):
        """Return the start period's id when one is set, otherwise None."""
        chosen = self.start_period
        return chosen.id if chosen else None
class PayrollGlobal(Wizard):
    'Payroll Global'
    __name__ = 'staff.payroll.global'
    start = StateView(
        'staff.payroll_global.start',
        'staff_payroll_co.payroll_global_start_view_form', [
            Button('Cancel', 'end', 'tryton-cancel'),
            Button('Print', 'print_', 'tryton-ok', default=True),
        ])
    print_ = StateReport('staff.payroll.global_report')

    def do_print_(self, action):
        """Pack the form values into the data dict returned with the action.

        Optional relations (end period, department) are passed as None when
        they are not set on the form.
        """
        form = self.start
        end_period = form.end_period
        department = form.department
        data = {
            'ids': [],
            'company': form.company.id,
            'start_period': form.start_period.id,
            'end_period': end_period.id if end_period else None,
            'department': department.id if department else None,
            'include_finished': form.include_finished,
        }
        return action, data

    def transition_print_(self):
        """Return 'end' as the state that follows the print state."""
        return 'end'
class PayrollGlobalReport(Report):
    __name__ = 'staff.payroll.global_report'

    @classmethod
    def get_domain_payroll(cls, data):
        """Return the initial payroll search domain (an empty list here).

        get_context appends its period, state, department and contract
        clauses to the returned list.
        """
        dom_payroll = []
        return dom_payroll

    @classmethod
    def get_context(cls, records, header, data):
        """Build the report context with one aggregated row per employee.

        The keys read from ``data`` are ``start_period``, ``end_period``,
        ``department`` and ``include_finished``. Line amounts are summed per
        concept and payroll totals are accumulated for each employee.

        Raises:
            WageTypeConceptError: when a payroll line resolves to an empty
                concept.
        """
        report_context = super().get_context(records, header, data)
        pool = Pool()
        user = pool.get('res.user')(Transaction().user)
        Payroll = pool.get('staff.payroll')
        Period = pool.get('staff.payroll.period')
        Department = pool.get('company.department')

        start_period = Period(data['start_period'])
        # Fix: end_period used to be bound only when data['end_period'] was
        # set, so storing it in report_context raised UnboundLocalError for
        # single-period reports. Without an end period the report covers
        # only the start period, which therefore also closes the range.
        end_period = start_period
        if data['end_period']:
            end_period = Period(data['end_period'])
            dom_periods = [
                ('start', '>=', start_period.start),
                ('end', '<=', end_period.end),
            ]
        else:
            dom_periods = [('id', '=', start_period.id)]
        periods = Period.search(dom_periods)

        dom_pay = cls.get_domain_payroll(data)
        dom_pay.append(
            ('period', 'in', [p.id for p in periods]),
        )
        dom_pay.append(
            ('state', 'in', ['processed', 'posted', 'draft']),
        )
        department = None
        if data['department']:
            dom_pay.append(
                ['AND', [
                    'OR', [
                        ('employee.department', '=', data['department']),
                        ('department', '=', None),
                    ], [
                        ('department', '=', data['department']),
                    ],
                ]]
            )
            department = Department(data['department']).name
        if not data['include_finished']:
            dom_pay.append(
                ('contract.state', '!=', 'finished')
            )
        payrolls = Payroll.search(dom_pay)

        periods_number = len(periods)
        default_vals = cls.default_values()
        sum_gross_payments = []
        sum_total_deductions = []
        sum_net_payment = []
        parties = {}
        payments = ['salary', 'transport', 'extras', 'food', 'bonus', 'commission']
        deductions = [
            'health', 'retirement', 'tax', 'syndicate',
            'fsp', 'acquired_product', 'advance', 'loan'
        ]
        # Concepts that get their own column; built once instead of
        # concatenating the two lists for every payroll line.
        own_column_concepts = set(payments + deductions)

        for payroll in payrolls:
            employee = payroll.employee
            employee_id = employee.id
            if employee_id not in parties:
                # First payroll seen for this employee: fill the identity
                # columns of a fresh row.
                position_employee = employee.position.name if employee.position else ''
                position_contract = ''
                if payroll.contract and payroll.contract.position:
                    position_contract = payroll.contract.position.name
                row = default_vals.copy()
                row['employee_code'] = employee.code
                row['employee'] = employee.party.name
                row['employee_id_number'] = employee.party.id_number
                row['party_health'] = (
                    employee.party_health.name if employee.party_health else '')
                row['party_retirement'] = (
                    employee.party_retirement.name
                    if employee.party_retirement else '')
                row['basic_salary'] = payroll.contract.get_salary_in_date(
                    payroll.end)
                row['employee_position'] = position_contract or position_employee or ''
                parties[employee_id] = row

            row = parties[employee_id]
            for line in payroll.lines:
                wage_type = line.wage_type
                if wage_type.type_concept in own_column_concepts:
                    concept = wage_type.type_concept
                elif wage_type.definition == 'payment' and wage_type.receipt:
                    concept = 'others_payments'
                # NOTE(review): the parentheses only spell out the precedence
                # the original expression already had ('and' binds tighter
                # than 'or'): every deduction lands here, discounts only when
                # flagged as receipt. Confirm that this is the intent.
                elif wage_type.definition == 'deduction' or (
                        wage_type.definition == 'discount'
                        and wage_type.receipt):
                    concept = 'others_deductions'
                else:
                    concept = wage_type.type_concept
                if not concept:
                    raise WageTypeConceptError(
                        gettext('staff_payroll_co.msg_type_concept_not_exists', s=line.wage_type.name))
                row[concept] += line.amount

            row['worked_days'] += Decimal(math.ceil(payroll.worked_days_effective))
            row['gross_payments'] += payroll.gross_payments
            row['total_deductions'] += payroll.total_deductions
            row['net_payment'] += payroll.net_payment
            sum_gross_payments.append(payroll.gross_payments)
            sum_total_deductions.append(payroll.total_deductions)
            sum_net_payment.append(payroll.net_payment)

        report_context['records'] = sorted(parties.values(), key=lambda x: x['employee'])
        report_context['department'] = department
        report_context['periods_number'] = periods_number
        report_context['start_period'] = start_period
        report_context['end_period'] = end_period
        report_context['company'] = user.company
        report_context['user'] = user
        report_context['sum_gross_payments'] = sum(sum_gross_payments)
        report_context['sum_net_payment'] = sum(sum_net_payment)
        report_context['sum_total_deductions'] = sum(sum_total_deductions)
        return report_context

    @classmethod
    def default_values(cls):
        """Return the template row used for every employee.

        Identity columns start as None; the numeric columns and one column
        per non-empty wage type concept start as ``Decimal(0)``.
        """
        WageType = Pool().get('staff.wage_type')
        fields_string = [
            'employee_code', 'employee', 'employee_salary',
            'employee_id_number', 'party_health', 'party_retirement',
            'basic_salary',
        ]
        fields_numeric = [
            'net_payment', 'worked_days', 'gross_payments', 'total_deductions',
            'others_payments', 'others_deductions'
        ]
        lines_fields = dict(WageType.type_concept.selection)
        # The empty selection entry is not a real concept.
        lines_fields.pop('', None)

        default_values = {}
        for field in fields_string:
            default_values.setdefault(field, None)
        for field in fields_numeric:
            default_values.setdefault(field, Decimal(0))
        for field in lines_fields:
            default_values.setdefault(field, Decimal(0))
        return default_values
class PayrollPaymentStart(ModelView):
    'Payroll Payment Start'
    __name__ = 'staff.payroll_payment.start'

    # Criteria entered on the wizard form; period and company are mandatory.
    period = fields.Many2One(
        'staff.payroll.period', 'Period', required=True)
    company = fields.Many2One(
        'company.company', 'Company', required=True)
    department = fields.Many2One(
        'company.department', 'Department')

    @staticmethod
    def default_company():
        """Default the company to the one stored in the transaction context."""
        context = Transaction().context
        return context.get('company')
class PayrollPayment(Wizard):
    'Payroll Payment'
    __name__ = 'staff.payroll.payment'
    start = StateView(
        'staff.payroll_payment.start',
        'staff_payroll_co.payroll_payment_start_view_form', [
            Button('Cancel', 'end', 'tryton-cancel'),
            Button('Print', 'print_', 'tryton-ok', default=True),
        ])
    print_ = StateReport('staff.payroll.payment_report')

    def do_print_(self, action):
        """Pack the form values into the data dict returned with the action.

        Period and department are passed as ids, or None when unset.
        """
        form = self.start
        department = form.department
        period = form.period
        data = {
            'ids': [],
            'company': form.company.id,
            'period': period.id if period else None,
            'department': department.id if department else None,
        }
        return action, data

    def transition_print_(self):
        """Return 'end' as the state that follows the print state."""
        return 'end'
class PayrollPaymentReport(Report):
    __name__ = 'staff.payroll.payment_report'

    @classmethod
    def get_context(cls, records, header, data):
        """Build the report context with one payment row per payroll.

        Payrolls are filtered by ``data['period']`` and, when given, by the
        employee's department (``data['department']``). Each row carries the
        employee's identification, bank details and the net payment.
        """
        report_context = super().get_context(records, header, data)
        user = Pool().get('res.user')(Transaction().user)
        Payroll = Pool().get('staff.payroll')
        Period = Pool().get('staff.payroll.period')
        Department = Pool().get('company.department')

        clause = []
        period = start_date = end_date = period_name = None
        department = None
        if data['period']:
            period = Period(data['period'])
            clause = [('period', '=', period)]
            start_date, end_date = period.start, period.end
            period_name = period.name
        if data['department']:
            clause.append(('employee.department', '=', data['department']))
            department = Department(data['department']).name

        rows = []
        net_payments = []
        for payroll in Payroll.search(clause):
            employee = payroll.employee
            party = employee.party
            rows.append({
                'employee': party.name,
                'employee_department': (
                    employee.department.name if employee.department else ''),
                'employee_id_number': party.id_number,
                'bank_name': party.bank_name,
                'bank_account': party.bank_account,
                'type_account': party.bank_account_type_string,
                'net_payment': payroll.net_payment,
            })
            net_payments.append(payroll.net_payment)

        report_context.update({
            'records': rows,
            'department': department,
            'start_date': start_date,
            'end_date': end_date,
            'period': period_name,
            'company': user.company,
            'user': user,
            'sum_net_payment': sum(net_payments),
        })
        return report_context
class PayrollPaycheckStart(ModelView):
    'Payroll Paycheck Start'
    __name__ = 'staff.payroll_paycheck.start'

    # Criteria entered on the wizard form; periods and company are mandatory.
    periods = fields.One2Many(
        'staff.payroll.period', None, 'Periods',
        add_remove=[], required=True)
    company = fields.Many2One(
        'company.company', 'Company', required=True)
    department = fields.Many2One(
        'company.department', 'Department')

    @staticmethod
    def default_company():
        """Default the company to the one stored in the transaction context."""
        context = Transaction().context
        return context.get('company')
class PayrollPaycheck(Wizard):
    'Payroll Paycheck'
    __name__ = 'staff.payroll.paycheck'
    start = StateView(
        'staff.payroll_paycheck.start',
        'staff_payroll_co.payroll_paycheck_start_view_form', [
            Button('Cancel', 'end', 'tryton-cancel'),
            Button('Print', 'print_', 'tryton-ok', default=True),
        ])
    print_ = StateReport('staff.payroll.paycheck_report')

    def do_print_(self, action):
        """Pack the form values into the data dict returned with the action.

        The selected periods are passed both as ids and as names; the
        department is passed as an id, or None when unset.
        """
        form = self.start
        department = form.department
        data = {
            'ids': [],
            'company': form.company.id,
            'periods': [period.id for period in form.periods],
            'periods_name': [period.name for period in form.periods],
            'department': department.id if department else None,
        }
        return action, data

    def transition_print_(self):
        """Return 'end' as the state that follows the print state."""
        return 'end'
class PayrollPaycheckReport(Report):
    __name__ = 'staff.payroll.paycheck_report'

    @classmethod
    def get_domain_payroll(cls, data):
        """Return the payroll domain: selected periods, non-draft state and,
        when given, the employee's department."""
        dom_payroll = [
            ('period', 'in', data['periods']),
            ('state', '!=', 'draft'),
        ]
        if data['department']:
            dom_payroll.append(
                ('employee.department', '=', data['department'])
            )
        return dom_payroll

    @classmethod
    def get_context(cls, records, header, data):
        """Build the report context with one row per employee/contract pair.

        Payroll headers are read first and merged per ``employee_contract``
        key (ibc, variation and worked days are accumulated); the matching
        payroll lines are then folded into each row by
        ``_values_without_move``. ``total`` is the sum of the values that
        helper returns.
        """
        report_context = super().get_context(records, header, data)
        pool = Pool()
        Payroll = pool.get('staff.payroll')
        PayrollLine = pool.get('staff.payroll.line')
        Company = pool.get('company.company')
        dom_payroll = cls.get_domain_payroll(data)
        fields_payroll = [
            'id', 'employee.party.name', 'employee.party.id_number',
            'contract.start_date', 'contract.end_date', 'date_effective',
            'ibc', 'contract.last_salary', 'worked_days', 'employee', 'contract'
        ]
        payrolls = Payroll.search_read(
            dom_payroll, fields_names=fields_payroll)
        today = date.today()
        res = {}
        wage_type_default = [
            'health', 'retirement', 'risk', 'box_family',
            'salary', 'fsp', 'icbf', 'sena'
        ]
        for p in payrolls:
            key = str(p['employee']) + '_' + str(p['contract'])
            # Fix: an explicit membership test replaces the bare ``except:``
            # that was used to detect a missing key and that also swallowed
            # any unrelated error raised while accumulating.
            if key in res:
                row = res[key]
                row['ibc'] += p['ibc']
                row['variation'] += p['ibc']
                row['worked_days'] += p['worked_days']
                continue

            # First payroll for this pair: the read dict itself becomes the
            # row and is extended with the report columns.
            row = res[key] = p
            row['today'] = today
            row['type_contributor'] = '01'
            row['type_id'] = 'CC'
            row['type_affiliation'] = 'D'
            for w in wage_type_default:
                row[w + '_amount'] = 0
                # Fix: ``w not in ('salary')`` was a substring test against
                # the string 'salary', not a tuple membership test.
                if w != 'salary':
                    row[w + '_code'] = ''
                    row[w + '_name'] = ''
                    row[w + '_rate'] = 0
            row['license_amount'] = 0
            row['incapacity_amount'] = 0
            row['holidays_amount'] = 0
            row['variation'] = p['ibc']
            row['subtotal'] = 0

        payroll_ids = [p['id'] for p in payrolls]
        fields_lines = [
            'amount', 'quantity', 'party.name', 'wage_type.type_concept',
            'wage_type.unit_price_formula', 'wage_type.expense_formula',
            'payroll', 'start_date', 'end_date', 'payroll.employee',
            'payroll.contract', 'wage_type.salary_constitute',
            'wage_type.concepts_salary', 'wage_type.month_application',
            'party.code', 'wage_type.round_amounts', 'wage_type.minimal_amount'
        ]
        dom_line = [
            ('payroll', 'in', payroll_ids),
            ['OR',
                ('wage_type.type_concept', 'in', wage_type_default),
                ('wage_type.type_concept', 'ilike', 'incapacity%'),
                ('wage_type.type_concept', 'ilike', 'license%'),
                ['AND',
                    ('wage_type.type_concept', '=', 'holidays'),
                    ('wage_type.provision_cancellation', '!=', None),
                ]]
        ]
        order = [('payroll.employee', 'DESC'), ('payroll', 'ASC')]
        payroll_lines = PayrollLine.search_read(
            dom_line, fields_names=fields_lines, order=order)

        total = []
        for line in payroll_lines:
            key = str(line['payroll.']['employee']) + '_' + \
                str(line['payroll.']['contract'])
            total.append(cls._values_without_move(
                line, wage_type_default, res, key))

        report_context['records'] = res.values()
        report_context['company'] = Company(data['company'])
        report_context['total'] = sum(total)
        return report_context

    @staticmethod
    def _formula_factor(formula):
        """Return the number written after the first '*' of *formula*.

        Returns 0 when the formula is empty, has no '*' or the text after
        it is not a valid decimal number.
        """
        if not formula:
            return 0
        try:
            return Decimal(formula[formula.index('*') + 1:].strip())
        except (ValueError, ArithmeticError):
            # ValueError: no '*' in the formula; ArithmeticError covers
            # decimal.InvalidOperation for a non-numeric factor.
            return 0

    @classmethod
    def _values_without_move(cls, line, wage_type_default, res, key):
        """Fold one payroll line (as returned by search_read) into ``res[key]``.

        Returns the amount this line adds to the report total; it is
        non-zero only for the default concepts other than salary (line
        amount plus, when an expense formula exists, the expense amount).
        """
        PayrollLine = Pool().get('staff.payroll.line')
        total = 0
        row = res[key]
        amount = line['amount']
        concept = line['wage_type.']['type_concept']
        if concept in wage_type_default and concept != 'salary':
            unit_formula = cls._formula_factor(
                line['wage_type.']['unit_price_formula'])
            raw_expense_formula = line['wage_type.']['expense_formula']
            # Fix: the expense formula used to be parsed without any guard,
            # unlike the unit price formula, so a formula without '*' raised.
            expense_formula = cls._formula_factor(raw_expense_formula)
            if raw_expense_formula:
                line_ = PayrollLine(line['id'])
                expense_amount = line_.get_expense_amount(line['wage_type.'])
                row[concept + '_amount'] += expense_amount
                row['subtotal'] += expense_amount
                total += expense_amount

            party = line['party.']
            row[concept + '_name'] = party['name'] if party else ''
            # Only factors below 1 are added to the rate.
            row[concept + '_rate'] = unit_formula + \
                (expense_formula if expense_formula < 1 else 0)
            row[concept + '_code'] = party['code'] if party else ''
            row['subtotal'] += amount
            total += amount
            row[concept + '_amount'] += amount
        elif concept.startswith('license'):
            row['license_amount'] += amount
            row['variation'] -= amount
        elif concept.startswith('incapacity'):
            row['incapacity_amount'] += amount
            row['variation'] -= amount
        elif concept == 'salary':
            row['salary_amount'] += amount
            row['variation'] -= amount
        elif concept == 'holidays':
            if line['wage_type.']['salary_constitute']:
                row['variation'] -= amount
            row['holidays_amount'] += amount
        return total
class PayrollSheetStart(ModelView):
    'Payroll Sheet Start'
    __name__ = 'staff.payroll.sheet.start'

    # Criteria entered on the wizard form; both are mandatory.
    periods = fields.One2Many(
        'staff.payroll.period', None, 'Periods',
        add_remove=[], required=True)
    company = fields.Many2One(
        'company.company', 'Company', required=True)

    @staticmethod
    def default_company():
        """Default the company to the one stored in the transaction context."""
        context = Transaction().context
        return context.get('company')
class PayrollSheet(Wizard):
    'Payroll Sheet'
    __name__ = 'staff.payroll.sheet'
    start = StateView(
        'staff.payroll.sheet.start',
        'staff_payroll_co.payroll_sheet_start_view_form', [
            Button('Cancel', 'end', 'tryton-cancel'),
            Button('Print', 'print_', 'tryton-ok', default=True),
        ])
    print_ = StateReport('staff.payroll.sheet_report')

    def do_print_(self, action):
        """Pack the company id and the selected period ids into the data
        dict returned with the action."""
        form = self.start
        period_ids = [period.id for period in form.periods]
        data = {
            'ids': [],
            'company': form.company.id,
            'periods': period_ids,
        }
        return action, data

    def transition_print_(self):
        """Return 'end' as the state that follows the print state."""
        return 'end'
class PayrollSheetReport(Report):
    __name__ = 'staff.payroll.sheet_report'

    @classmethod
    def get_domain_payroll(cls, data):
        """Return the starting payroll domain (empty); get_context appends
        the period clause to it."""
        return []

    @classmethod
    def get_context(cls, records, header, data):
        """Build the report context with one sheet row per payroll.

        Payrolls of ``data['periods']`` are read ordered by employee name
        and period name. Each row starts from ``default_values`` and is
        completed by ``_prepare_lines``.
        """
        report_context = super().get_context(records, header, data)
        pool = Pool()
        user = pool.get('res.user')(Transaction().user)
        Payroll = pool.get('staff.payroll')
        Period = pool.get('staff.payroll.period')

        periods_names = ''
        dom_payroll = cls.get_domain_payroll(data)
        if data['periods']:
            periods = Period.browse(data['periods'])
            periods_names = [period.name + ' / ' for period in periods]
            dom_payroll.append([('period', 'in', data['periods'])])
        sort_order = [
            ('employee.party.name', 'ASC'),
            ('period.name', 'ASC')
        ]
        payrolls = Payroll.search(dom_payroll, order=sort_order)
        wages_dict = Payroll.create_cache_wage_types()
        default_vals = cls.default_values()

        rows = []
        gross_amounts = []
        deduction_amounts = []
        net_amounts = []
        for item, payroll in enumerate(payrolls, start=1):
            employee = payroll.employee
            # Deep copy: the amount fields of the template are lists.
            values = copy.deepcopy(default_vals)
            values['item'] = item
            values['employee'] = employee.party.name
            values['id_number'] = employee.party.id_number

            position_name = employee.position.name if employee.position else ''
            position_contract = ''
            if payroll.contract and payroll.contract.position:
                position_contract = payroll.contract.position.name
            # The contract position wins over the employee position.
            values['position'] = position_contract or position_name
            values['department'] = (
                employee.department.name if employee.department else '')
            values['company'] = user.company.party.name
            values['legal_salary'] = payroll.contract.get_salary_in_date(
                payroll.end)
            values['period'] = payroll.period.name

            salary_day_in_date = payroll.contract.get_salary_in_date(
                payroll.end) / 30
            values['salary_day'] = salary_day_in_date
            if salary_day_in_date:
                values['salary_hour'] = salary_day_in_date / 8
            else:
                values['salary_hour'] = 0
            values['worked_days'] = payroll.worked_days
            values['gross_payment'] = payroll.gross_payments

            # Add compatibility with staff contracting
            project = ""
            if hasattr(payroll, 'project') and payroll.project:
                project = payroll.project.name
            if hasattr(employee, 'project_contract'):
                if employee.project_contract and \
                        employee.project_contract.reference:
                    project = employee.project_contract.reference
            values['project'] = project

            values.update(cls._prepare_lines(payroll, values, wages_dict))
            gross_amounts.append(payroll.gross_payments)
            deduction_amounts.append(payroll.total_deductions)
            net_amounts.append(payroll.net_payment)
            rows.append(values)

        report_context['records'] = rows
        report_context['periods'] = periods_names
        report_context['company'] = user.company.rec_name
        report_context['user'] = user
        report_context['sum_gross_payments'] = sum(gross_amounts)
        report_context['sum_net_payment'] = sum(net_amounts)
        report_context['sum_total_deductions'] = sum(deduction_amounts)
        return report_context

    @classmethod
    def default_values(cls):
        """Return the template row: SHEET_FIELDS_NOT_AMOUNT keys start as
        None and FIELDS_AMOUNT keys start as empty lists."""
        template = {}
        for name in SHEET_FIELDS_NOT_AMOUNT:
            template.setdefault(name, None)
        for name in FIELDS_AMOUNT:
            template.setdefault(name, [])
        return template

    @classmethod
    def _prepare_lines(cls, payroll, vals, wages_dict):
        """Distribute the payroll's line amounts over ``vals`` and total them.

        Amounts are first appended to the list fields of ``vals``; the
        SHEET_SUMABLES fields are then collapsed with ``sum`` and the derived
        totals are computed. ``vals`` is modified in place and returned.
        """
        for line in payroll.lines:
            wage_dict = wages_dict[line.wage_type.id]
            concept = wage_dict['type_concept']
            definition = wage_dict['definition']
            amount = line.amount

            if definition == 'payment':
                if concept == 'extras':
                    vals['total_extras'].append(amount)
                    # The first EXTRAS key found (upper-cased) in the wage
                    # type name receives the quantity and the cost.
                    for extra in EXTRAS:
                        if extra.upper() in wage_dict['name']:
                            vals[extra].append(line.quantity or 0)
                            vals['cost_' + extra].append(amount)
                            break
                elif concept in FIELDS_AMOUNT:
                    vals[concept].append(amount)
                else:
                    vals['other'].append(amount)
            elif definition == 'deduction':
                vals['total_deduction'].append(amount)
                if concept == 'health':
                    vals['health'].append(amount)
                    vals['health_provision'].append(
                        line.get_expense_amount(wage_dict))
                elif concept == 'retirement':
                    vals['retirement'].append(amount)
                    vals['retirement_provision'].append(
                        line.get_expense_amount(wage_dict))
                elif concept == 'fsp':
                    vals['fsp'].append(amount)
                elif concept == 'tax':
                    vals['retefuente'].append(amount)
                else:
                    vals['other_deduction'].append(amount)
            else:
                vals['discount'].append(amount)
                print('Warning: Line no processed... ', line.wage_type.name)

        for key in SHEET_SUMABLES:
            vals[key] = sum(vals[key])

        gross_parts = [
            vals['salary'],
            vals['total_extras'],
            vals['transport'],
            vals['food'],
            vals['bonus']
        ]
        vals['gross_payment'] = sum(gross_parts)
        vals['net_payment'] = vals['gross_payment'] - vals['total_deduction']
        vals['ibc'] = vals['gross_payment']

        benefit_parts = [
            vals['unemployment'],
            vals['interest'],
            vals['holidays'],
            vals['bonus_service'],
        ]
        vals['total_benefit'] = sum(benefit_parts)

        parafiscal_parts = [
            vals['box_family'],
            vals['sena'],
            vals['icbf']
        ]
        vals['total_parafiscales'] = sum(parafiscal_parts)

        vals['total_ssi'] = vals['retirement_provision'] + vals['risk']
        cost_parts = [
            vals['total_ssi'],
            vals['box_family'],
            vals['gross_payment'],
            vals['total_benefit']
        ]
        vals['total_cost'] = sum(cost_parts)
        return vals
class PayrollGroupStart(metaclass=PoolMeta):
    __name__ = 'staff.payroll_group.start'

    # Extra criteria added to the payroll group start form.
    department = fields.Many2One(
        'company.department', 'Department')
    employees = fields.Many2Many(
        'company.employee', None, None, 'Employees',
        domain=[('active', '=', True)])
    # Checked by PayrollGroup.transition_open_: when set, a 'payroll.task'
    # record is created instead of calling the parent transition.
    task = fields.Boolean('Programm Task')
class PayrollGroup(metaclass=PoolMeta):
    __name__ = 'staff.payroll_group'

    def transition_open_nim(self):
        """Experimental transition that hands a sample wage dict to ``fpc``.

        NOTE(review): ``fpc`` is not in scope in the visible part of this
        file (its import is commented out at the top), so calling this
        method raises NameError as written — confirm whether it is still
        needed.
        """
        Payroll = Pool().get('staff.payroll')
        Wage = Pool().get('staff.wage_type')
        fields_names = [
            'name', 'sequence', 'definition', 'unit_price_formula',
            'unit_price_formula', 'salary_constitute', 'expense_formula',
            'type_concept'
        ]
        wages = Wage.search_read([], fields_names=fields_names)
        wage = {
            "id" : 3,
            "name" : "User"
        }
        # Fast Payroll Compute
        res = fpc.computePayroll(wage)
        print("Running in Nim....", res)
        return 'end'

    def transition_open_(self):
        """Generate the payrolls now or record them as a task.

        When the ``task`` flag of the start form is not set, the parent
        transition runs. Otherwise a ``payroll.task`` record is created
        holding the form values, the target employee ids, the user and the
        transaction context, and ``'end'`` is returned.

        Raises:
            UserError: when no employee matches the selection.
        """
        if not self.start.task:
            return super(PayrollGroup, self).transition_open_()

        pool = Pool()
        Employee = pool.get('company.employee')
        Payroll = pool.get('staff.payroll')
        Task = pool.get('payroll.task')
        # Local name chosen so the module-level ``fields`` import is not
        # shadowed inside this method.
        field_defs = self.start.fields_get()
        transaction = Transaction()
        context = transaction.context

        payrolls_period = Payroll.search([
            ('period', '=', self.start.period.id),
        ])
        employee_w_payroll = [p.employee.id for p in payrolls_period]
        dom_employees = self.get_employees_dom(employee_w_payroll)
        employees = Employee.search(dom_employees)
        if not employees:
            raise UserError(gettext('staff_payroll_co.msg_dont_get_employees'))

        args = {}
        for k, v in field_defs.items():
            if k == 'task':
                continue
            if v['type'] == 'many2one':
                # Fix: explicit check instead of a bare ``except: pass``
                # around ``.id``; an unset relation is simply left out.
                record = getattr(self.start, k, None)
                if record is not None:
                    args[k] = record.id
            elif v['type'] == 'many2many':
                args[k] = [f.id for f in getattr(self.start, k)]
            else:
                args[k] = getattr(self.start, k)
        # The resolved employee list replaces whatever the form held.
        args['employees'] = [e.id for e in employees]

        department = self.start.department.name if self.start.department else ''
        data = {
            'model': self.__name__,
            'method': 'transition_open_',
            'user': transaction.user,
            'context': context,
            'args': args,
            'kwargs': None,
        }
        value = {
            'name': self.start.description + department,
            'data': data,
            'payroll': len(employees),
            'state': 'active'
        }
        Task.create([value])
        return 'end'

    def get_employees_dom(self, employees_w_payroll):
        """Extend the parent employee domain with period and form filters.

        Adds: employees having a contract matched by
        ``get_dom_contract_period`` for the selected period (only when at
        least one is found), the selected department, and the explicitly
        selected employees.
        """
        dom = super(PayrollGroup, self).get_employees_dom(employees_w_payroll)
        Contract = Pool().get('staff.contract')
        period = self.start.period
        dom_contract = get_dom_contract_period(period.start, period.end)
        contracts = Contract.search(dom_contract)
        employee_ids = list({contract.employee.id for contract in contracts})
        if employee_ids:
            dom.append(('id', 'in', employee_ids))
        if self.start.department:
            dom.append(('department', '=', self.start.department.id))
        if self.start.employees:
            employees_ids = [e.id for e in self.start.employees]
            dom.append(('id', 'in', employees_ids))
        return dom
class OpenPayrollByPeriodStart(ModelView):
    'Open Payroll By Period Start'
    __name__ = 'staff_payroll_co.open_payroll_by_period.start'

    # Criteria entered on the wizard form; both are mandatory.
    company = fields.Many2One(
        'company.company', 'Company', required=True)
    fiscalyear = fields.Many2One(
        'account.fiscalyear', 'Fiscal Year', required=True)

    @staticmethod
    def default_company():
        """Default the company to the one stored in the transaction context."""
        context = Transaction().context
        return context.get('company')

    @staticmethod
    def default_fiscalyear():
        """Default to what FiscalYear.find returns for the context company
        (called with exception=False)."""
        FiscalYear = Pool().get('account.fiscalyear')
        company_id = Transaction().context.get('company')
        return FiscalYear.find(company_id, exception=False)
class OpenPayrollByPeriod(Wizard):
    'Open Payroll By Period'
    __name__ = 'staff_payroll_co.open_payroll_by_period'
    start = StateView(
        'staff_payroll_co.open_payroll_by_period.start',
        'staff_payroll_co.open_payroll_by_period_start_view_form', [
            Button('Cancel', 'end', 'tryton-cancel'),
            Button('Print', 'open_', 'tryton-ok', default=True),
        ])
    open_ = StateAction('staff_payroll_co.act_payroll_by_period_board')

    def do_open_(self, action):
        """Append the fiscal year name to the action name and return the
        action with the company and fiscal year ids as data."""
        form = self.start
        fiscalyear = form.fiscalyear
        data = {
            'company': form.company.id,
            'fiscalyear': fiscalyear.id,
        }
        action['name'] += ' - %s' % fiscalyear.name
        return action, data

    def transition_open_(self):
        """Return 'end' as the state that follows the open state."""
        return 'end'
class PayrollByPeriodDynamic(Period):
    'Payroll By Period Dynamic'
    __name__ = 'staff_payroll_co.payroll_by_period_dynamic'

    # Computed fields: the period's payrolls and two totals over them.
    payrolls = fields.Function(
        fields.One2Many('staff.payroll', None, 'Payrolls'),
        'get_payrolls')
    amount_net_payment = fields.Function(
        fields.Numeric('Amount Net Payment', digits=(16, 2)),
        'get_amount')
    amount_total_cost = fields.Function(
        fields.Numeric('Amount Total Cost', digits=(16, 2)),
        'get_amount')

    def get_amount(self, name=None):
        """Sum, over this period's payrolls, the attribute named by ``name``
        without its first seven characters (the ``amount_`` prefix)."""
        attribute = name[7:]
        return sum([
            getattr(payroll, attribute) for payroll in self.payrolls])

    def get_payrolls(self, name=None):
        """Return the ids of this period's processed or posted payrolls."""
        Payroll = Pool().get('staff.payroll')
        found = Payroll.search([
            ('period', '=', self.id),
            ('state', 'in', ['processed', 'posted']),
        ])
        return [payroll.id for payroll in found]
class Exo2276Start(ModelView):
    'Payroll Exo 2276 Start'
    __name__ = 'staff.payroll_exo2276.start'

    # Criteria entered on the wizard form; all three are mandatory.
    start_period = fields.Many2One(
        'staff.payroll.period', 'Start Period', required=True)
    end_period = fields.Many2One(
        'staff.payroll.period', 'End Period', required=True)
    company = fields.Many2One(
        'company.company', 'Company', required=True)

    @staticmethod
    def default_company():
        """Default the company to the one stored in the transaction context."""
        context = Transaction().context
        return context.get('company')
class Exo2276(Wizard):
    'Payroll Exo 2276'
    __name__ = 'staff.payroll_exo2276'
    start = StateView(
        'staff.payroll_exo2276.start',
        'staff_payroll_co.payroll_exo2276_start_view_form', [
            Button('Cancel', 'end', 'tryton-cancel'),
            Button('Print', 'print_', 'tryton-ok', default=True),
        ])
    print_ = StateReport('staff.payroll_exo2276.report')

    def do_print_(self, action):
        """Pack the form values into the data dict returned with the action.

        The range is passed as the ``start`` of the first period and the
        ``end`` of the last period rather than as period ids.
        """
        form = self.start
        range_start = form.start_period.start
        range_end = form.end_period.end
        data = {
            'ids': [],
            'company': form.company.id,
            'start_period': range_start,
            'end_period': range_end,
        }
        return action, data

    def transition_print_(self):
        """Return 'end' as the state that follows the print state."""
        return 'end'
class Exo2276Report(Report):
__name__ = 'staff.payroll_exo2276.report'
@classmethod
def get_domain_payroll(cls, data=None):
    """Return the domain selecting posted payrolls of the requested range.

    The periods are those whose ``start`` is on or after
    ``data['start_period']`` and whose ``end`` is on or before
    ``data['end_period']``.
    """
    Period = Pool().get('staff.payroll.period')
    period_clause = [
        ('start', '>=', data['start_period']),
        ('end', '<=', data['end_period']),
    ]
    periods_ids = [period.id for period in Period.search(period_clause)]
    return [
        ('period', 'in', periods_ids),
        ('state', '=', 'posted'),
    ]
@classmethod
def get_context(cls, records, header, data):
    """Build the report context with one row per employee party.

    For the first payroll found of each party a row is created with the
    party's identification data and zeroed amount columns, filled from the
    payroll lines by ``_prepare_lines`` and then adjusted with the party's
    liquidation lines dated inside the requested range.
    """
    report_context = super().get_context(records, header, data)
    pool = Pool()
    user = pool.get('res.user')(Transaction().user)
    Payroll = pool.get('staff.payroll')
    LiquidationLine = pool.get('staff.liquidation.line')

    domain_ = cls.get_domain_payroll(data)
    payrolls = Payroll.search([domain_])
    start_period = data['start_period']
    end_period = data['end_period']

    # Amount columns, all starting at zero.
    amount_keys = (
        'payments', 'cesanpag', 'interest', 'holidays', 'bonus',
        'bonus_service', 'transport', 'food', 'total_deduction', 'health',
        'retirement', 'fsp', 'retefuente', 'other_deduction', 'discount',
        'other', 'various', 'salary', 'extras', 'box_family', 'risk',
        'unemployment', 'allowance', 'syndicate', 'commission',
        'total_benefit', 'gross_payment', '_cesanpag', 'others_payments',
        'total_retirement', 'total_salary',
    )
    liquidation_concepts = [
        'unemployment', 'interest', 'holidays', 'bonus_service', 'tax']

    new_objects = {}
    for payroll in payrolls:
        party = payroll.employee.party
        if party.id in new_objects:
            continue

        row = {
            # Fix: the counter used to advance for skipped payrolls too,
            # leaving gaps; rows are now numbered consecutively.
            'index': len(new_objects) + 1,
            'type_document': party.type_document,
            'id_number': party.id_number,
            'first_family_name': party.first_family_name,
            'second_family_name': party.second_family_name,
            'first_name': party.first_name,
            'second_name': party.second_name,
            # Fix: a party without addresses raised IndexError.
            'addresses': party.addresses[0].street if party.addresses else '',
            'department_code': party.department_code,
            'city_code': party.city_code,
            'country_code': party.country_code,
            'email': party.email,
        }
        row.update(dict.fromkeys(amount_keys, 0))
        row = cls._prepare_lines(payrolls, row, party.id)
        new_objects[party.id] = row

        _cesanpag = 0
        _total_benefit = 0
        _other = 0
        _tax = 0
        lines_liquid = LiquidationLine.search_read([
            ('liquidation.employee.party', '=', party.id),
            ('liquidation.liquidation_date', '>=', data['start_period']),
            ('liquidation.liquidation_date', '<=', data['end_period']),
            ('wage.type_concept', 'in', liquidation_concepts)
        ], fields_names=['amount', 'wage.type_concept'])
        for liquid_line in lines_liquid:
            concept = liquid_line['wage.']['type_concept']
            if concept in ['unemployment', 'interest']:
                _cesanpag += liquid_line['amount']
            # NOTE(review): 'convencional_bonus' is not among the concepts
            # requested by the search above, so this branch cannot match —
            # confirm whether the domain should include it.
            elif concept == 'convencional_bonus':
                _other += liquid_line['amount']
            elif concept == 'tax':
                _tax += abs(liquid_line['amount'])
            else:
                _total_benefit += liquid_line['amount']
        row['cesanpag'] = _cesanpag
        row['total_benefit'] += _total_benefit
        row['others_payments'] += _other
        row['retefuente'] += _tax

    report_context['records'] = new_objects.values()
    report_context['end_period'] = end_period
    report_context['start_period'] = start_period
    report_context['today'] = date.today()
    report_context['company'] = user.company
    return report_context
@classmethod
def _prepare_lines(cls, payrolls, vals, party_id):
    """Accumulate the payroll line amounts of one party into ``vals``.

    Searches the 'staff.payroll.line' records that belong to ``payrolls``
    and whose payroll employee party is ``party_id``, adds every amount to
    the matching accumulator of ``vals`` and finally derives the
    'others_payments', 'total_retirement' and 'total_salary' totals.

    ``vals`` is updated in place (it must already hold every key written
    here) and the same dict is returned.
    """
    PayrollLine = Pool().get('staff.payroll.line')
    payroll_lines = PayrollLine.search([
        ('payroll', 'in', [p.id for p in payrolls]),
        ('payroll.employee.party', '=', party_id),
    ])

    # Payment concepts that only count towards 'payments' in this method.
    payments_only = ('unemployment', 'interest', 'holidays')
    # Payment concepts added to their own accumulator AND to 'payments'.
    # NOTE(review): 'payments' already received the amount once for every
    # payment line, so these concepts are counted twice there - confirm
    # whether that is intended.
    payments_again = (
        'commission', 'allowance', 'extras', 'bonus', 'bonus_service',
        'transport', 'food',
    )
    # Payment concepts added to a single accumulator (concept -> key).
    payment_keys = {
        'salary': 'salary',
        'box_family': 'box_family',
        'risk': 'risk',
        'other': 'other',
        'convencional_bonus': 'other',
    }
    # Deduction concepts (concept -> key); anything else is
    # 'other_deduction'.
    deduction_keys = {
        'health': 'health',
        'retirement': 'retirement',
        'fsp': 'fsp',
        'tax': 'retefuente',
    }

    for payroll_line in payroll_lines:
        wage_type = payroll_line.wage_type
        concept = wage_type.type_concept
        definition = wage_type.definition
        amount = payroll_line.amount

        if definition == 'payment':
            vals['payments'] += amount
            if concept in payments_only:
                continue
            if concept in payments_again:
                vals['payments'] += amount
                vals[concept] += amount
            elif concept in payment_keys:
                vals[payment_keys[concept]] += amount
            else:
                print('Warning: Line no processed... ',
                    concept, wage_type.name)
                vals['various'] += amount
        elif definition == 'deduction':
            vals['total_deduction'] += amount
            vals[deduction_keys.get(concept, 'other_deduction')] += amount
        else:
            # Neither a payment nor a deduction.
            vals['discount'] += amount
            print('Warning: Line no processed... ', wage_type.name)

    vals['others_payments'] = sum(vals[key] for key in (
        'other', 'commission', 'bonus', 'allowance', 'various', 'food',
        'transport'))
    vals['total_retirement'] = vals['fsp'] + vals['retirement']
    vals['total_salary'] = vals['salary'] + vals['extras']
    return vals
class IncomeWithholdingsStart(ModelView):
    'Income Withholding Start'
    __name__ = 'staff.payroll.income_withholdings.start'

    fiscalyear = fields.Many2One(
        'account.fiscalyear', 'Fiscal Year', required=True)
    company = fields.Many2One(
        'company.company', 'Company', required=True)
    # Optional selection of employees.
    employees = fields.Many2Many(
        'company.employee', None, None, 'Employees')

    @staticmethod
    def default_company():
        """Return the company stored in the transaction context."""
        context = Transaction().context
        return context.get('company')

    @staticmethod
    def default_fiscalyear():
        """Return what ``FiscalYear.find`` yields for the context company.

        ``exception=False`` is passed to ``find``.
        """
        company_id = Transaction().context.get('company')
        FiscalYear = Pool().get('account.fiscalyear')
        return FiscalYear.find(company_id, exception=False)
class IncomeWithholdings(Wizard):
    'Income Withholding'
    __name__ = 'staff.payroll.income_withholdings'

    start = StateView(
        'staff.payroll.income_withholdings.start',
        'staff_payroll_co.income_withholdings_start_form',
        [
            Button('Cancel', 'end', 'tryton-cancel'),
            Button('Print', 'print_', 'tryton-ok', default=True),
        ])
    print_ = StateReport('staff.payroll.income_withholdings_report')

    def do_print_(self, action):
        """Build the report data from the start form.

        The report period is the start/end date of the chosen fiscal year;
        'employees' is the list of selected employee ids (empty when none
        was selected).
        """
        form = self.start
        fiscalyear = form.fiscalyear
        selected = form.employees or []
        return action, {
            'ids': [],
            'company': form.company.id,
            'start_period': fiscalyear.start_date,
            'end_period': fiscalyear.end_date,
            'employees': [employee.id for employee in selected],
        }

    def transition_print_(self):
        """Close the wizard once the report has been requested."""
        return 'end'
class IncomeWithholdingsReport(Exo2276Report):
    'Income Withholding Report'
    __name__ = 'staff.payroll.income_withholdings_report'

    @classmethod
    def get_domain_payroll(cls, data=None):
        """Return the parent payroll domain, optionally filtered by employee.

        When ``data`` carries a non-empty 'employees' list, a clause
        restricting the payrolls to those employees is appended. ``data``
        may be None or lack the 'employees' key; the parent domain is then
        returned as is.
        """
        dom = super().get_domain_payroll(data)
        # The signature allows data=None, so never subscript it blindly.
        employees = data.get('employees') if data else None
        if employees:
            dom.append(('employee', 'in', employees))
        return dom
class ExportMovesReport(PayrollReport):
    __name__ = 'staff_payroll.export_moves.report'

    @classmethod
    def get_context(cls, records, header, data):
        """Extend the parent report context with the payroll sequence prefix.

        'prefix' is the prefix of the sequence referenced by the
        'staff.configuration' record with id 1. It falls back to an empty
        string when the configuration, its payroll sequence or the
        sequence prefix is not set.
        """
        report_context = super().get_context(records, header, data)
        config = Pool().get('staff.configuration')(1)
        # The sequence may be left unset on the configuration; do not fail
        # on it, keep the empty default instead.
        sequence = config.staff_payroll_sequence if config else None
        prefix = ''
        if sequence:
            prefix = sequence.prefix or ''
        report_context['prefix'] = prefix
        return report_context
class PayrollExportStart(ModelView):
    'Export Payroll Start'
    __name__ = 'staff.payroll.export.start'

    company = fields.Many2One('company.company', 'Company', required=True)
    start_period = fields.Many2One('staff.payroll.period', 'Start Period',
        required=True)
    end_period = fields.Many2One('staff.payroll.period', 'End Period',
        required=True)
    # Optional filter. The former depends=['employee'] was dropped: this
    # model defines no 'employee' field, so the dependency pointed nowhere.
    department = fields.Many2One('company.department', 'Department')

    @staticmethod
    def default_company():
        """Return the company stored in the transaction context."""
        return Transaction().context.get('company')
class PayrollExport(Wizard):
    'Payroll Export'
    __name__ = 'staff.payroll.export'

    start = StateView(
        'staff.payroll.export.start',
        'staff_payroll_co.payroll_export_start_view_form',
        [
            Button('Cancel', 'end', 'tryton-cancel'),
            Button('Print', 'print_', 'tryton-ok', default=True),
        ])
    print_ = StateReport('staff.payroll.export_report')

    def do_print_(self, action):
        """Build the report data (ids only) from the start form.

        'department_id' is None when no department was chosen.
        """
        form = self.start
        department = form.department
        return action, {
            'ids': [],
            'company': form.company.id,
            'start_period': form.start_period.id,
            'end_period': form.end_period.id,
            'department_id': department.id if department else None,
        }

    def transition_print_(self):
        """Close the wizard once the report has been requested."""
        return 'end'
class PayrollExportReport(Report):
    __name__ = 'staff.payroll.export_report'

    @classmethod
    def get_domain_payroll(cls, data=None):
        """Return the base payroll search domain (an empty list here)."""
        return []

    @classmethod
    def get_context(cls, records, header, data):
        """Build the context of the payroll export report.

        Payrolls that have an account move and whose period lies between
        the selected start and end periods (optionally restricted to one
        department) are searched; every move line that has a party becomes
        one entry. 'records' maps the account code of the move line to a
        dict with the account 'name' and the list of its 'lines'.
        """
        report_context = super().get_context(records, header, data)
        pool = Pool()
        company = pool.get('company.company')(data['company'])
        Payroll = pool.get('staff.payroll')
        PayrollPeriod = pool.get('staff.payroll.period')

        # NOTE(review): the domain hook is called without ``data``.
        domain = cls.get_domain_payroll()
        first_period, = PayrollPeriod.search(
            [('id', '=', data['start_period'])])
        last_period, = PayrollPeriod.search(
            [('id', '=', data['end_period'])])
        domain.append([
            ('period.start', '>=', first_period.start),
            ('period.end', '<=', last_period.end),
            ('move', '!=', None)
        ])
        if data['department_id'] not in (None, ''):
            domain.append([
                ('employee.department', '=', data['department_id'])
            ])

        grouped = {}
        for payroll in Payroll.search(domain, order=[('period.name', 'ASC')]):
            employee = payroll.employee
            department = employee.department
            department_name = department.name if department else '---'

            # Debit account id -> party, from the mandatory wages that
            # define both a debit account and a party.
            party_by_debit_account = {}
            for mandatory in employee.mandatory_wages:
                if mandatory.wage_type.debit_account and mandatory.party:
                    account_id = mandatory.wage_type.debit_account.id
                    party_by_debit_account[account_id] = mandatory.party

            for line in payroll.move.lines:
                if not line.party:
                    continue
                account = line.account
                if line.debit > 0:
                    # Debit: the entity is the party of the mandatory wage
                    # that uses this debit account, if any.
                    kind, column = 'D', 1
                    if account.id in party_by_debit_account:
                        id_number = party_by_debit_account[
                            account.id].id_number
                    else:
                        id_number = None
                else:
                    # Credit: the entity is the party of the move line.
                    kind, column = 'C', 0
                    id_number = line.party.id_number

                # Known entities get their own code; otherwise the account
                # code is used.
                if id_number in ENTITY_ACCOUNTS:
                    code = ENTITY_ACCOUNTS[id_number][column]
                else:
                    code = account.code

                entry = {
                    'date': line.move.date,
                    'code': code,
                    'party': employee.party.id_number,
                    'description': line.description,
                    'department': department_name,
                    'amount': line.debit or line.credit,
                    'type': kind,
                }
                bucket = grouped.setdefault(account.code, {
                    'name': account.name,
                    'lines': []
                })
                bucket['lines'].append(entry)

        report_context['records'] = grouped
        report_context['start_date'] = first_period.name
        report_context['end_date'] = last_period.name
        report_context['company'] = company.party.name
        return report_context
class PayrollsMultiPaymentStart(ModelView):
    'Payroll Multi Payment Start'
    __name__ = 'staff.payroll_multi_payment.start'

    # Only periods in state 'open' can be selected.
    period = fields.Many2One(
        'staff.payroll.period', 'Period', required=True,
        domain=[('state', '=', 'open')])
    payment_mode = fields.Many2One(
        'account.voucher.paymode', 'Payment Mode', required=True)
    company = fields.Many2One(
        'company.company', 'Company', required=True)
    department = fields.Many2One('company.department', 'Department')
    # Restricted to payment wage types whose concept is 'salary'.
    wage_type = fields.Many2One(
        'staff.wage_type', 'Wage Type', required=True,
        domain=[
            ('definition', '=', 'payment'),
            ('type_concept', '=', 'salary'),
        ])
    description = fields.Char('Description')

    @staticmethod
    def default_company():
        """Return the company stored in the transaction context."""
        context = Transaction().context
        return context.get('company')
class PayrollsMultiPayment(Wizard):
    'Payrolls MultiPayment'
    __name__ = 'staff.payroll_multi_payment'

    start = StateView(
        'staff.payroll_multi_payment.start',
        'staff_payroll_co.payroll_multi_payment_start_view_form',
        [
            Button('Cancel', 'end', 'tryton-cancel'),
            Button('Accept', 'open_', 'tryton-ok', default=True),
        ])
    open_ = StateTransition()

    def transition_open_(self):
        """Create a draft 'multipayment' voucher for the selected period.

        The voucher gets one line per unreconciled credit move line (with
        a party) booked on the credit account of the chosen wage type by
        the moves of the posted payrolls of the period, optionally limited
        to one department. Its amount to pay is then computed and saved.
        """
        pool = Pool()
        Payroll = pool.get('staff.payroll')
        Voucher = pool.get('account.voucher')
        MoveLine = pool.get('account.move.line')
        form = self.start

        payroll_domain = [
            ('period', '=', form.period.id),
            ('state', '=', 'posted'),
            ('move', '!=', None),
        ]
        if form.department:
            payroll_domain.append(('department', '=', form.department.id))
        move_ids = [
            payroll.move.id for payroll in Payroll.search(payroll_domain)]

        pending_lines = MoveLine.search([
            ('account', '=', form.wage_type.credit_account.id),
            ('move', 'in', move_ids),
            ('reconciliation', '=', None),
            ('party', '!=', None),
            ('credit', '>', 0),
        ])
        voucher_lines = [{
            'amount': move_line.credit,
            'account': move_line.account.id,
            'party': move_line.party.id,
            'move_line': move_line.id,
            # Number of the record the move originates from.
            'detail': move_line.move.origin.number,
            'amount_original': move_line.credit,
        } for move_line in pending_lines]

        payment_mode = form.payment_mode
        account_id = Voucher.get_account('multipayment', payment_mode)
        journal_id = payment_mode.journal.id
        voucher, = Voucher.create([{
            'payment_mode': payment_mode.id,
            'date': form.period.end,
            'voucher_type': 'multipayment',
            'lines': [('create', voucher_lines)],
            'journal': journal_id,
            'state': 'draft',
            'account': account_id,
            'description': form.description,
        }])
        voucher.amount_to_pay = voucher._get_amount_to_pay()
        voucher.save()
        return 'end'
class PayrollTask(ModelSQL, ModelView):
    "Payroll Task"
    __name__ = 'payroll.task'

    name = fields.Char('Name')
    # Task payload. The methods below read data['user'], data['context']
    # and data['args'] ('period', 'employees', 'wage_types',
    # 'description').
    data = fields.Dict(None, "Data", readonly=True)
    payroll = fields.Integer("Number Payrolls", readonly=True)
    payroll_processed = fields.Integer("Payrolls processed", readonly=True)
    percentage = fields.Function(
        fields.Float("Percentage", digits=(16, 2)), 'get_percentage')
    state = fields.Selection([
        ('active', 'Active'),
        ('done', 'Done')
    ], "State")
    data_text = fields.Function(fields.Text('Data Text'), 'get_data')

    def get_percentage(self, name):
        """Return payroll_processed / payroll (a ratio, not multiplied by
        100), or None when either value is empty or zero."""
        if self.payroll and self.payroll_processed:
            return (self.payroll_processed/self.payroll)

    def get_data(self, name):
        """Return ``str(self.data)``, or None when there is no data."""
        if self.data:
            return str(self.data)

    @classmethod
    def create_payroll_async(cls):
        """Process the next batch of employees of one active task.

        Takes up to five pending employee ids from the task, builds payroll
        values for those that have no payroll in the period yet and have a
        contract on it, stores the remaining employees and the progress on
        the task (marking it 'done' when none is left) and then creates
        and pre-liquidates the payrolls. A failure while pre-liquidating a
        payroll is printed with its traceback and does not stop the
        others. Does nothing when there is no active task.
        """
        tasks = cls.search([('state', '=', 'active')], limit=1)
        if not tasks:
            return
        task = tasks[0]
        pool = Pool()
        Payroll = pool.get('staff.payroll')
        Period = pool.get('staff.payroll.period')
        Configuration = pool.get('staff.configuration')
        config = Configuration(1)
        transaction = Transaction()
        # Maximum number of employees handled per call.
        batch_size = 5

        with transaction.set_user(task.data['user']), \
                transaction.set_context(
                    task.data['context'], _skip_warnings=True):
            period_id = task.data['args']['period']
            period = Period(period_id)
            start, end = attrgetter('start', 'end')(period)

            # Employees that already have a payroll in this period.
            payrolls_period = Payroll.search([
                ('period', '=', period_id),
            ])
            employees_with_payroll = {p.employee.id for p in payrolls_period}
            cache_wage_dict = Payroll.create_cache_wage_types()

            # Split off the batch by slicing. The previous code removed
            # items from the list while iterating over it, which makes the
            # iterator skip every other employee and yields irregular
            # batches.
            pending = task.data['args']['employees']
            batch = pending[:batch_size]
            remaining = pending[batch_size:]

            search_contract_on_period = Payroll.search_contract_on_period
            payroll_to_create = []
            for employee in batch:
                if employee in employees_with_payroll:
                    continue
                contract = search_contract_on_period(employee, start, end)
                if not contract:
                    continue
                payroll_to_create.append(
                    task.get_values(contract, start, end))

            # Persist the progress before creating the payrolls. The dict
            # is re-assigned to the field so the change is saved.
            data = task.data
            data['args']['employees'] = remaining
            task.payroll_processed = task.payroll - len(remaining)
            if not remaining:
                task.state = 'done'
            task.data = data
            task.save()

            wages = None
            wage_type_ids = task.data['args']['wage_types']
            if wage_type_ids:
                WageType = pool.get('staff.wage_type')
                wages = [
                    (cache_wage_dict[wage_type.id], None, None)
                    for wage_type in WageType.browse(wage_type_ids)
                ]

            if payroll_to_create:
                payrolls = Payroll.create(payroll_to_create)
                for payroll in payrolls:
                    try:
                        payroll.set_preliquidation(
                            config, {}, None, cache_wage_dict)
                        if wages:
                            payroll._create_payroll_lines(
                                config, wages, None, {}, cache_wage_dict)
                    except Exception as e:
                        # Best effort: report the failure and go on with
                        # the next payroll.
                        print('Fallo al crear nomina : ', e)
                        traceback.print_exc()

    def get_values(self, contract, start_date, end_date):
        """Return the creation values of a payroll for ``contract``.

        The payroll range is ``start_date``..``end_date``, narrowed to the
        contract start and/or finished date when those fall inside the
        range. 'date_effective' is the resulting end date.
        """
        employee = contract.employee
        ct_start_date = contract.start_date
        ct_end_date = contract.finished_date
        if ct_start_date and start_date <= ct_start_date <= end_date:
            start_date = ct_start_date
        # Compared against the start date possibly adjusted just above.
        if ct_end_date and start_date <= ct_end_date <= end_date:
            end_date = ct_end_date

        args = self.data['args']
        values = {
            'employee': employee.id,
            'period': args['period'],
            'start': start_date,
            'end': end_date,
            'description': args['description'],
            'date_effective': end_date,
            'contract': contract.id,
            'department': (
                employee.department.id if employee.department else None),
        }
        # NOTE(review): the extras range is read from the top level of
        # ``data`` while every other value comes from data['args'] -
        # confirm against the code that fills ``data``.
        start_extras = self.data.get('start_extras')
        end_extras = self.data.get('end_extras')
        if start_extras and end_extras:
            values['start_extras'] = start_extras
            values['end_extras'] = end_extras
        # 'project' is not available on every contract model.
        project = getattr(contract, 'project', None)
        if project:
            values['project'] = project.id
        return values