trytonpsk-staff_payroll_co/payroll.py
2021-06-09 11:07:42 -05:00

2244 lines
86 KiB
Python

# This file is part of Tryton. The COPYRIGHT file at the top level of
# this repository contains the full copyright notices and license terms.
import calendar
from decimal import Decimal
from datetime import date, timedelta
from dateutil.relativedelta import relativedelta
import math
from trytond.exceptions import UserError
from trytond.model import ModelView, fields, Workflow, ModelSQL
from trytond.pool import Pool, PoolMeta
from trytond.report import Report
from trytond.wizard import (
Wizard, StateView, Button, StateAction, StateReport, StateTransition
)
from trytond.transaction import Transaction
from trytond.pyson import Eval
from trytond.modules.staff_payroll.period import Period
from trytond.modules.staff_payroll.payroll import PayrollReport
from trytond.i18n import gettext
from .exceptions import (GeneratePayrollError, MissingTemplateEmailPayroll,
WageTypeConceptError)
STATES = {'readonly': (Eval('state') != 'draft')}
_ZERO = Decimal('0.0')
PAYMENTS = [
'salary', 'bonus', 'reco', 'recf', 'hedo', 'heno', 'dom', 'hedf', 'henf',
]
SOCIAL_SEGURITY = [
'risk', 'health', 'retirement', 'box_family', 'sena', 'icbf'
]
LIM_UVT_DEDUCTIBLE = {
'fvp_ind': 2500,
'afc_fvp': (3800/12),
'housing_interest': 100,
'health_prepaid': 16,
'dependents': 32,
'exempted_incom': 240
}
LIM_PERCENT_DEDUCTIBLE = {
'fvp_ind': 25,
'dependents': 10,
'afc_fvp': 30,
'exempted_income': 25,
'renta_deductions': 40
}
ENTITY_ACCOUNTS = {
'830113831': (23700501, 72056901),
'890102044': (23700514, 72056914),
'900298372': (23700515, 72056915),
'860045904': (23700512, 72056912),
'804002105': (23700513, 72056913),
'860066942': (23700503, 72056903),
'805000427': (23700504, 72056904),
'900226715': (23700505, 72056905),
'830009783': (23700516, 72056916),
'900935126': (23700517, 72056917),
'830003564': (23700506, 72056906),
'901097473': (23700509, 72056909),
'806008394': (23700502, 72056902),
'900156264': (23700510, 72056910),
'800130907': (23700511, 72056911),
'800251440': (23700507, 72056907),
'900604350': (23700518, 72056918),
'800088702': (23700508, 72056908),
'800227940': (23803002, 72057002),
'900336004': (23803001, 72057001),
'800253055': (23803003, 72057003),
'800224808': (23803004, 72057004),
'800229739': (23803005, 72057005),
'890102002': (2370100102, 72057202),
'860013570': (2370100101, 72057201),
'891780093': (2370100103, 72057203),
'892399989': (2370100104, 72057204),
'890480023': (2370100105, 72057205),
'890903790': (23700601, 72056801),
'890500675': (23700519, 72056919),
'818000140': (23700520, 72056920),
'901037916': (23700521, 72056921),
}
class PayrollLine(metaclass=PoolMeta):
__name__ = "staff.payroll.line"
move_lines = fields.Many2Many('staff.payroll.line-move.line',
'line', 'move_line', 'Payroll Line - Move Line')
def get_expense_amount(self):
expense = super(PayrollLine, self).get_expense_amount()
if self.wage_type.round_amounts:
if self.wage_type.round_amounts == 'above_amount':
# expense = math.ceil(float(expense) / 100.0) * 100
expense = math.floor(float(expense) / 100.0) * 100
elif self.wage_type.round_amounts == 'under_amount':
expense = math.floor(float(expense) / 100.0) * 100
elif self.wage_type.round_amounts == 'automatic':
expense = round(expense, -2)
return expense
class PayrollLineMoveLine(ModelSQL):
"Payroll Line - MoveLine"
__name__ = "staff.payroll.line-move.line"
_table = 'staff_payroll_line_move_line_rel'
line = fields.Many2One('staff.payroll.line', 'Line',
ondelete='CASCADE', select=True, required=True)
move_line = fields.Many2One('account.move.line', 'Move Line',
ondelete='RESTRICT', select=True, required=True)
class Payroll(metaclass=PoolMeta):
__name__ = "staff.payroll"
last_payroll = fields.Boolean('Last Payroll', states=STATES, select=True)
ibc = fields.Function(fields.Numeric('IBC'), 'on_change_with_ibc')
health_amount = fields.Function(fields.Numeric('EPS Amount'),
'get_non_fiscal_amount')
retirement_amount = fields.Function(fields.Numeric('AFP Amount'),
'get_non_fiscal_amount')
risk_amount = fields.Function(fields.Numeric('ARL Amount'),
'get_non_fiscal_amount')
box_family_amount = fields.Function(fields.Numeric('Box Amount'),
'get_non_fiscal_amount')
absenteeism_days = fields.Integer("Absenteeism Days", states=STATES)
department = fields.Many2One('company.department', 'Department',
required=False, depends=['employee'])
sended_mail = fields.Boolean('Sended Email')
@classmethod
def __setup__(cls):
super(Payroll, cls).__setup__()
@fields.depends('end', 'date_effective')
def on_change_with_date_effective(self):
if self.end:
return self.end
@fields.depends('employee', 'department')
def on_change_employee(self):
if self.employee and self.employee.department:
self.department = self.employee.department.id
def create_move(self):
super(Payroll, self).create_move()
pool = Pool()
MoveLine = pool.get('account.move.line')
grouped = {line.wage_type.debit_account.id: {'lines': [m for m in line.move_lines]} for line in self.lines if line.wage_type.provision_cancellation}
for p in self.move.lines:
if p.account.id not in grouped or (
p.account.type.statement not in ('balance')):
continue
to_reconcile = [p]
to_reconcile.extend(grouped[p.account.id]['lines'])
MoveLine.reconcile(set(to_reconcile))
@fields.depends('period', 'employee', 'start', 'end', 'contract',
'description', 'date_effective', 'last_payroll')
def on_change_period(self):
if not self.period:
return
self.date_effective = self.period.end
self.on_change_employee()
# Search last contract
contract = self.search_contract_on_period(self.employee, self.period)
start_date = None
end_date = None
self.contract = contract
contract_end_date = None
if contract:
if not contract.end_date:
end_date = self.period.end
if self.period.start >= contract.start_date:
start_date = self.period.start
elif contract.start_date >= self.period.start and \
contract.start_date <= self.period.end:
start_date = contract.start_date
else:
contract_end_date = contract.finished_date if contract.finished_date else contract.end_date
if contract.start_date <= self.period.start and \
contract_end_date >= self.period.end:
start_date = self.period.start
end_date = self.period.end
elif contract.start_date >= self.period.start and \
contract_end_date <= self.period.end:
start_date = contract.start_date
end_date = contract_end_date
elif contract.start_date >= self.period.start and \
contract.start_date <= self.period.end and \
contract_end_date >= self.period.end:
start_date = contract.start_date
end_date = self.period.end
elif contract.start_date <= self.period.start and \
contract_end_date >= self.period.start and \
contract_end_date <= self.period.end:
start_date = self.period.start
end_date = contract_end_date
if start_date and end_date:
self.start = start_date
self.end = end_date
if contract_end_date == self.end:
self.last_payroll = True
if not start_date or not end_date:
self.period = None
self.description = None
raise GeneratePayrollError(
gettext('staff_payroll_co.msg_period_without_contract', employee=self.employee.party.name)
)
if not self.description and self.period:
self.description = self.period.description
def adjust_partial_sunday(self, quantity):
# Factor = 8 hour sunday / 6 days (monday-saturday)
factor = 1.33
if self.is_first_payroll():
delta_days = (self.end - self.start).days
_sunday = False
for dd in range(delta_days):
next_day = self.start + timedelta(days=dd)
if next_day.weekday() == 6:
_sunday = True
break
if _sunday:
fix_qty = self.start.weekday() * factor
quantity = Decimal(str(round(quantity - fix_qty, 2)))
# FIXME
# if self.is_last_payroll:
# pass
return quantity
def _get_line_quantity(self, quantity_days, wage, extras, discount):
Configuration = Pool().get('staff.configuration')
config = Configuration(1)
quantity_days = self.get_days(self.start, self.end, wage)
quantity = super(Payroll, self)._get_line_quantity(
quantity_days, wage, extras, discount
)
if config.payment_partial_sunday and wage.type_concept == 'salary':
quantity = self.adjust_partial_sunday(quantity)
return quantity
@fields.depends('start', 'end', 'employee', 'period', 'contract')
def get_days(self, start, end, wage=None):
adjust = 1
adjust_days_worked = False
if self.contract and self.contract.position:
adjust_days_worked = self.contract.position.adjust_days_worked
else:
if self.employee.position:
adjust_days_worked = self.employee.position.adjust_days_worked
if (adjust_days_worked) or (wage and wage.adjust_days_worked):
if end.day == 31 or (end.month == 2 and \
(end.day == 28 or end.day == 29)):
adjust = 31 - end.day
quantity_days = (end - start).days + adjust
if quantity_days < 0:
quantity_days = 0
if start == end:
quantity_days = 1
if Transaction().context.get('absenteeism_days'):
quantity_days = quantity_days - Transaction().context.get('absenteeism_days')
return quantity_days
def get_salary_full(self, wage):
res = super(Payroll, self).get_salary_full(wage)
salary_pae = _ZERO
salary_full = res['salary']
if wage.month_application:
if self._is_last_payroll_month():
salary_full_month = self.search_salary_month(wage)
salary_full = salary_full_month + salary_pae
elif self.last_payroll:
salary_full = salary_full + salary_pae
else:
salary_full = _ZERO
else:
salary_full += salary_pae
salary_full = self.validate_minimal_amount(wage, salary_full)
return {'salary': salary_full}
def validate_minimal_amount(self, wage, amount):
if wage.minimal_amount:
if amount < wage.minimal_amount:
return 0
return amount
def is_first_payroll(self):
if self.start <= self.contract.start_date <= self.end:
return True
return False
def _is_last_payroll_month(self):
_, end_day = calendar.monthrange(self.start.year, self.start.month)
last_day = date(self.start.year, self.start.month, end_day)
if last_day == self.period.end:
return True
return False
def _compute_apply_special_salary(self):
MandatoryWages = Pool().get('staff.payroll.mandatory_wage')
mandatory_wages = MandatoryWages.search([
('employee', '=', self.employee.id),
('wage_type.apply_special_salary', '=', True),
])
salary_plus = _ZERO
wages = [w.wage_type for w in mandatory_wages]
for wage in wages:
dom_payroll = [('employee', '=', self.employee.id)]
if wage.type_concept == 'bonus_service':
if self.start < date(self.start.year, 7, 1):
dom_payroll.extend([
('start', '>=', date(self.start.year, 1, 1)),
('start', '<=', date(self.start.year, 6, 30)),
])
else:
dom_payroll.extend([
('start', '>=', date(self.start.year, 7, 1)),
('start', '<=', date(self.start.year, 12, 31)),
])
else:
dom_payroll.extend([
('start', '>=', date(self.start.year, 1, 1)),
('start', '<=', date(self.start.year, 12, 31)),
])
payrolls = self.search(dom_payroll)
worked_days = Decimal(sum([p.worked_days for p in payrolls]))
precomputed_salary = {'salary': worked_days * self.employee.salary_day}
salary_plus += wage.compute_formula(
'unit_price_formula',
precomputed_salary)
return salary_plus
def _get_payrolls_month(self):
_, end_day = calendar.monthrange(self.start.year, self.start.month)
start = date(self.start.year, self.start.month, 1)
end = date(self.start.year, self.start.month, end_day)
payrolls = self.search([
('employee', '=', self.employee.id),
('start', '>=', start),
('start', '<=', end),
])
return payrolls
def _get_payrolls_period(self, start_date, end_date):
_, end_day = calendar.monthrange(end_date.year, end_date.month)
start = date(start_date.year, start_date.month, 1)
end = date(end_date.year, end_date.month, end_day)
payrolls = self.search([
('employee', '=', self.employee.id),
('start', '>=', start),
('start', '<=', end),
('contract', '=', self.contract.id)
])
return payrolls
def _create_payroll_lines(self, wages, extras, discounts=None):
super(Payroll, self)._create_payroll_lines(wages, extras, discounts=None)
pool = Pool()
MoveLine = pool.get('account.move.line')
PayrollLine = pool.get('staff.payroll.line')
for line in self.lines:
if line.wage_type.provision_cancellation:
amount_line = [m.amount for m in self.lines if m.wage_type == line.wage_type.provision_cancellation]
move_lines = MoveLine.search([
('account', '=', line.wage_type.provision_cancellation.credit_account.id),
('reconciliation', '=', None),
('party', '=', self.employee.party.id)
])
lines_to_reconcile = []
values = []
if line.wage_type.type_concept == 'holidays':
unit_value = line.unit_value
amount = line.amount
for m in move_lines:
values.append(abs(m.debit - m.credit))
if sum(values) <= amount:
lines_to_reconcile.append(m.id)
else:
for m in move_lines:
values.append(abs(m.debit - m.credit))
lines_to_reconcile.append(m.id)
amount = sum(values) + sum(amount_line)
unit_value = amount
PayrollLine.write([line], {
'unit_value': unit_value,
'amount': amount,
'move_lines': [('add', lines_to_reconcile)]
})
def search_salary_month(self, wage):
res = _ZERO
payrolls = self._get_payrolls_month()
if payrolls:
for payroll in payrolls:
res += payroll.compute_salary_full(wage)
return res
def get_round_amount(self, wage, unit_value):
if wage.round_amounts == 'above_amount':
unit_value = math.ceil(float(unit_value) / 100.0) * 100
elif wage.round_amounts == 'under_amount':
unit_value = math.floor(float(unit_value) / 100.0) * 100
elif wage.round_amounts == 'automatic':
unit_value = round(unit_value, -2)
return unit_value
def _validate_amount_wage(self, wage, amount):
amount = super(Payroll, self)._validate_amount_wage(wage, amount)
if amount and wage and wage.round_amounts:
amount = self.get_round_amount(wage, amount)
return amount
def get_line(self, wage, qty, unit_value, party=None):
if unit_value and wage and wage.round_amounts:
unit_value = self.get_round_amount(wage, unit_value)
res = super(Payroll, self).get_line(wage, qty, unit_value, party)
return res
def set_preliquidation(self, extras, discounts=None):
discounts = self.set_events()
ctx = {
'absenteeism_days': self.absenteeism_days
}
with Transaction().set_context(ctx):
super(Payroll, self).set_preliquidation(extras, discounts)
self.save()
self.update_wage_no_salary()
def set_events(self):
pool = Pool()
Event = pool.get('staff.event')
PayrollLine = pool.get('staff.payroll.line')
events = Event.search([
('employee', '=', self.employee),
('start_date', '<=', self.end),
('end_date', '>=', self.start),
('state', '=', 'done'),
])
absenteeism_days = 0
discounts = {}
for event in events:
if not event.category.payroll_effect and event.line_payroll:
continue
if event.absenteeism:
absenteeism_days += event.days
if event.quantity_pay:
qty_pay = event.quantity_pay
wage = event.category.wage_type
if wage:
salary_args = self.get_salary_full(wage)
if event.amount_to_pay:
unit_value = Decimal(event.amount_to_pay)
else:
if event.is_vacations:
unit_value = self.get_salary_average(wage)
else:
unit_value = wage.compute_unit_price(salary_args)
res = self.get_line(wage, qty_pay, unit_value)
lines = PayrollLine.create([res])
event.line_payroll = lines[0]
if event.category.wage_type_discount and event.quantity_discount:
id_wt_event = event.category.wage_type_discount.id
if id_wt_event not in discounts.keys():
discounts[id_wt_event] = 0
discounts[id_wt_event] += event.quantity_discount
self.absenteeism_days = absenteeism_days
self.save()
return discounts
def get_salary_average(self, wage):
end_date = self.start
start_date_contract = self.contract.start_date
start_date = end_date + relativedelta(months=-4)
if start_date <= start_date_contract:
start_date = start_date_contract
next_date = date(start_date.year, start_date.month, 1)
average_days_monthly = []
while next_date < end_date:
_, end_day = calendar.monthrange(next_date.year, next_date.month)
payrolls = self._get_payrolls_period(next_date, next_date)
salary_base = 0
worked_days = 0
if payrolls:
for payroll in payrolls:
salary_base += payroll.compute_salary_full(wage)
worked_days += payroll.worked_days
if end_day == worked_days:
worked_days = 30
average_day = salary_base / worked_days
average_days_monthly.append(average_day)
next_date += relativedelta(months=1)
res = sum(average_days_monthly)/len(average_days_monthly)
# if len(average_days_monthly) != len(set(average_days_monthly)):
# res = sum(average_days_monthly)/len(average_days_monthly) * 30
# else:
# res = sum(average_days_monthly)/len(average_days_monthly) * 30
return res
def update_wage_no_salary(self):
PayrollLine = Pool().get('staff.payroll.line')
for line in self.lines:
if line.wage_type.month_application:
salary_args = self.get_salary_full(line.wage_type)
unit_value = line.wage_type.compute_unit_price(salary_args)
if line.wage_type.type_concept == 'interest':
unit_value = self._compute_interest(line.wage_type, self.start)
elif line.wage_type.type_concept == 'tax':
unit_value = self._compute_line_tax(line)
else:
continue
PayrollLine.write([line], {'unit_value': unit_value})
def _get_payrolls_contract(self):
dom_payroll = [('employee', '=', self.employee.id)]
if not self.contract:
contract = self.search_contract_on_period()
else:
contract = self.contract
dom_payroll.append(
('start', '>=', contract.start_date),
)
if contract.end_date:
dom_payroll.append(
('start', '<=', contract.end_date)
)
payrolls = self.search(dom_payroll)
return [p.id for p in payrolls]
def _compute_interest(self, wage_type, limit_date=False):
start = date(self.start.year, 1, 1)
end = date(self.start.year, 12, 31)
concepts_salary_ids = [wt.id for wt in wage_type.concepts_salary]
if self.contract.start_date > start:
start = self.contract.start_date
dom_payrolls = [
('start', '>=', start),
('contract', '=', self.contract.id),
('employee', '=', self.employee.id),
]
if limit_date:
dom_payrolls.append(('start', '<=', limit_date))
else:
dom_payrolls.append(('start', '<=', end))
payrolls = self.search(dom_payrolls)
payrolls_ids = [p.id for p in payrolls]
time_contracting = sum([p.worked_days for p in payrolls])
salary_hoard = float(self._compute_hoard(
payrolls_ids,
concepts_salary_ids
))
total_interest = salary_hoard * time_contracting * 0.01 / 360
if self.id in payrolls_ids:
payrolls_ids.remove(self.id)
interest_hoard = float(self._compute_hoard(
payrolls_ids,
[wage_type.id]
))
interest = total_interest - interest_hoard
if interest < _ZERO:
interest = _ZERO
return self.currency.round(Decimal(interest))
def _compute_hoard(self, payrolls_ids, wages_ids):
if not payrolls_ids:
return _ZERO
PayrollLine = Pool().get('staff.payroll.line')
lines = PayrollLine.search([
('wage_type', 'in', wages_ids),
('payroll', 'in', payrolls_ids),
])
return sum([l.amount for l in lines if not l.reconciled])
def recompute_lines(self):
super(Payroll, self).recompute_lines()
self.update_wage_no_salary()
line_tax = None
deductions = _ZERO
for line in self.lines:
if line.wage_type.provision_cancellation:
amount_line = [m.amount for m in self.lines if m.wage_type == line.wage_type.provision_cancellation]
values = []
for m in line.move_lines:
values.append(abs(m.debit - m.credit))
amount = sum(values) + sum(amount_line)
line.write([line], {
'unit_value': amount,
'amount': amount,
})
if line.wage_type.definition == 'deduction':
deductions += line.amount
if line.wage_type.type_concept == 'tax':
line_tax = line
if line_tax:
unit_value = self._compute_line_tax(line_tax)
line_tax.write([line_tax], {
'unit_value': unit_value, 'quantity': 1
})
def check_limit(self, base, field, value_field):
Configuration = Pool().get('staff.configuration')
configuration = Configuration(1)
if not configuration.uvt_value:
return 0
uvt_config = configuration.uvt_value
lim_percent = LIM_PERCENT_DEDUCTIBLE[field] if field in LIM_PERCENT_DEDUCTIBLE.keys() else None
lim_uvt = LIM_UVT_DEDUCTIBLE[field] if field in LIM_UVT_DEDUCTIBLE.keys() else None
res = value_field
if lim_uvt:
value_limit_uvt = uvt_config * lim_uvt
if value_field > value_limit_uvt:
res = value_limit_uvt
if lim_percent:
value_limit_percent = base * lim_percent / 100
if value_field > value_limit_percent:
res = value_limit_percent
return res
def _compute_line_tax(self, line_tax):
UvtWithholding = Pool().get('staff.payroll.uvt_withholding')
salary_full = self.get_salary_full(line_tax.wage_type)
amount_tax = line_tax.amount if line_tax.amount else 0
deductions_month = self.get_deductions_month() - amount_tax
salary_full = salary_full['salary']
if self.last_payroll:
payrolls_ids = self._get_payrolls_contract()
# This apply in liquidation document
# WageType = Pool().get('staff.wage_type')
# wage_types = WageType.search([
# ('type_concept', '=', 'holidays')
# ])
# holidays_ids = [w.id for w in wage_types]
# hoard_holidays = self._compute_hoard(
# payrolls_ids,
# holidays_ids
# )
# salary_full += hoard_holidays
deductions_fields = ['fvp_ind', 'health_prepaid', 'fvp', 'afc', 'other_income', 'housing_interest', 'dependents']
deductions_values = {
'fvp_ind': 0,
'health_prepaid': 0,
'fvp_afc': 0,
'other_income': 0,
'housing_interest': 0,
'dependents': 0
}
for field in deductions_fields:
value = getattr(self.employee, field) if getattr(self.employee, field) else 0
if field not in ('fvp', 'afc'):
deductions_values[field] = value
elif field in ('fvp', 'afc'):
deductions_values['fvp_afc'] += value
value = 0
for k, v in deductions_values.items():
res = self.check_limit(salary_full, k, v)
deductions_values[k] = res
base_salary_withholding = salary_full - deductions_month - deductions_values['fvp_ind']
deductions_values.pop('fvp_ind')
deductions_renta = sum(v for v in deductions_values.values())
lim_rete_deductions = base_salary_withholding * LIM_PERCENT_DEDUCTIBLE['renta_deductions'] /100
if deductions_renta >= lim_rete_deductions:
deductions_renta = lim_rete_deductions
base_c = base_salary_withholding - deductions_renta
renta25c = self.check_limit(None, 'exempte_income', base_c * LIM_PERCENT_DEDUCTIBLE['exempted_income']/100)
deductions_renta_renta25c = deductions_renta + renta25c
if deductions_renta_renta25c > lim_rete_deductions:
deductions_renta_renta25c = lim_rete_deductions
base_salary_withholding -= deductions_renta
unit_value = UvtWithholding.compute_withholding(
base_salary_withholding)
unit_value = self.currency.round(Decimal(unit_value))
return unit_value
def get_non_fiscal_amount(self, name=None):
res = _ZERO
concept = name[:-7]
for line in self.lines:
if line.wage_type.type_concept == concept:
res += line.amount
return res
def get_deductions_month(self):
payrolls = self._get_payrolls_month()
sum_deductions = _ZERO
for payroll in payrolls:
for line in payroll.lines:
if line.wage_type.definition == 'deduction':
sum_deductions += line.amount
return sum_deductions
@fields.depends('lines')
def on_change_with_ibc(self, name=None):
pool = Pool()
WageType = pool.get('staff.wage_type')
wage_types = WageType.search([('salary_constitute', '=', True)])
concepts = list(set([wage.type_concept for wage in wage_types]))
# concepts = [
# 'salary', 'extras', 'bonus', 'commission',
# 'bonus_service', 'advance'
# ]
res = _ZERO
for l in self.lines:
if l.wage_type and l.wage_type.type_concept in concepts and \
l.wage_type.definition == 'payment' and \
l.wage_type.salary_constitute is True:
res += l.amount
return res
@classmethod
@Workflow.transition('draft')
def draft(cls, records):
Move = Pool().get('account.move')
for payroll in records:
if payroll.move:
Move.draft([payroll.move.id])
#Move.delete([payroll.move])
def send_payroll_email(self):
pool = Pool()
config = pool.get('staff.configuration')(1)
Template = pool.get('email.template')
template = config.template_email_confirm
if template:
response = Template.send(template, self, self.employee.party.email)
# Template.send(template, self, self.employee.party.email)
if response and response.status_code == 202:
self.write([self], {'sended_mail': True})
else:
raise UserError('No mail sent, check employee email', employee=self.employee.party.name)
else:
raise MissingTemplateEmailPayroll(
gettext('staff_payroll_co.msg_template_not_exist'))
class PayrollSupportDispersionStart(ModelView):
'Payroll Support Dispersion'
__name__ = 'staff.payroll_support_dispersion.start'
company = fields.Many2One('company.company', 'Company', required=True)
department = fields.Many2One('company.department', 'Department')
period = fields.Many2One('staff.payroll.period', 'Start Period', required=True)
@staticmethod
def default_company():
return Transaction().context.get('company')
class PayrollSupportDispersion(Wizard):
'Payroll Support Dispersion'
__name__ = 'staff.payroll.support_dispersion'
start = StateView('staff.payroll_support_dispersion.start',
'staff_payroll_co.payroll_support_dispersion_start_view_form', [
Button('Cancel', 'end', 'tryton-cancel'),
Button('Send', 'open_', 'tryton-ok', default=True),
])
open_ = StateTransition()
def transition_open_(self):
pool = Pool()
Payroll = pool.get('staff.payroll')
dom = [
('company', '=', self.start.company.id),
('period', '=', self.start.period.id),
('state', 'in', ['processed', 'posted']),
('sended_mail', '=', False)
]
if self.start.department:
dom.append(('department', '=', self.start.department.id))
payrolls_period = Payroll.search(dom)
no_email = []
y_email = []
for payroll in payrolls_period:
email = payroll.employee.party.email
if email:
payroll.send_payroll_email()
y_email.append(payroll.employee.party.full_name,)
else:
no_email.append(payroll.employee.party.full_name,)
return 'end'
class PayrollGlobalStart(ModelView):
'Payroll Global Start'
__name__ = 'staff.payroll_global.start'
start_period = fields.Many2One('staff.payroll.period',
'Start Period', required=True)
end_period = fields.Many2One('staff.payroll.period', 'End Period',
depends=['start_period'])
company = fields.Many2One('company.company', 'Company', required=True)
department = fields.Many2One('company.department', 'Department')
include_finished = fields.Boolean('Include Finished Contract')
@staticmethod
def default_company():
return Transaction().context.get('company')
@fields.depends('start_period')
def on_change_with_end_period(self, name=None):
if self.start_period:
return self.start_period.id
class PayrollGlobal(Wizard):
'Payroll Global'
__name__ = 'staff.payroll.global'
start = StateView('staff.payroll_global.start',
'staff_payroll_co.payroll_global_start_view_form', [
Button('Cancel', 'end', 'tryton-cancel'),
Button('Print', 'print_', 'tryton-ok', default=True),
])
print_ = StateReport('staff.payroll.global_report')
def do_print_(self, action):
end_period_id = None
department_id = None
if self.start.end_period:
end_period_id = self.start.end_period.id
if self.start.department:
department_id = self.start.department.id
data = {
'ids': [],
'company': self.start.company.id,
'start_period': self.start.start_period.id,
'end_period': end_period_id,
'department': department_id,
'include_finished': self.start.include_finished,
}
return action, data
def transition_print_(self):
return 'end'
class PayrollGlobalReport(Report):
__name__ = 'staff.payroll.global_report'
@classmethod
def get_domain_payroll(cls, data):
dom_payroll = []
return dom_payroll
@classmethod
def get_context(cls, records, header, data):
report_context = super().get_context(records, header, data)
pool = Pool()
user = pool.get('res.user')(Transaction().user)
Payroll = pool.get('staff.payroll')
Period = pool.get('staff.payroll.period')
Department = pool.get('company.department')
start_period = Period(data['start_period'])
dom_periods = []
if data['end_period']:
end_period = Period(data['end_period'])
dom_periods.extend([
('start', '>=', start_period.start),
('end', '<=', end_period.end),
])
else:
dom_periods.append(
('id', '=', start_period.id)
)
periods = Period.search(dom_periods)
dom_pay = cls.get_domain_payroll(data)
dom_pay.append(
('period', 'in', [p.id for p in periods]),
)
dom_pay.append(
('state', 'in', ['processed', 'posted', 'draft']),
)
if data['department']:
dom_pay.append(
['AND', ['OR', [
('employee.department', '=', data['department']),
('department', '=', None),
], [
('department', '=', data['department']),
],
]]
)
department = Department(data['department']).name
else:
department = None
if not data['include_finished']:
dom_pay.append(
('contract.state', '!=', 'finished')
)
payrolls = Payroll.search(dom_pay)
periods_number = len(periods)
default_vals = cls.default_values()
sum_gross_payments = []
sum_total_deductions = []
sum_net_payment = []
parties = {}
payments = ['salary', 'transport', 'extras', 'food', 'bonus']
deductions = [
'health', 'retirement', 'tax', 'syndicate',
'fsp', 'acquired_product'
]
for payroll in payrolls:
employee_id = payroll.employee.id
party_health = ''
party_retirement = ''
if employee_id not in parties.keys():
position_employee = payroll.employee.position.name if payroll.employee.position else ''
position_contract = payroll.contract.position.name if payroll.contract and payroll.contract.position else ''
parties[employee_id] = default_vals.copy()
parties[employee_id]['employee_code'] = payroll.employee.code
parties[employee_id]['employee'] = payroll.employee.party.name
parties[employee_id]['employee_id_number'] = payroll.employee.party.id_number
if payroll.employee.party_health:
party_health = payroll.employee.party_health.name
if payroll.employee.party_retirement:
party_retirement = payroll.employee.party_retirement.name
parties[employee_id]['party_health'] = party_health
parties[employee_id]['party_retirement'] = party_retirement
parties[employee_id]['basic_salary'] = payroll.contract.get_salary_in_date(payroll.end)
parties[employee_id]['employee_position'] = position_contract or position_employee or ''
for line in payroll.lines:
if line.wage_type.type_concept in (payments + deductions):
concept = line.wage_type.type_concept
else:
if line.wage_type.definition == 'payment' and line.wage_type.receipt:
concept = 'others_payments'
elif line.wage_type.definition == 'deduction' or \
line.wage_type.definition == 'discount' and \
line.wage_type.receipt:
concept = 'others_deductions'
else:
concept = line.wage_type.type_concept
if not concept:
raise WageTypeConceptError(
gettext('staff_payroll_co.msg_type_concept_not_exists', s=line.wage_type.name))
parties[employee_id][concept] += line.amount
parties[employee_id]['worked_days'] += payroll.worked_days
parties[employee_id]['gross_payments'] += payroll.gross_payments
parties[employee_id]['total_deductions'] += payroll.total_deductions
parties[employee_id]['net_payment'] += payroll.net_payment
sum_gross_payments.append(payroll.gross_payments)
sum_total_deductions.append(payroll.total_deductions)
sum_net_payment.append(payroll.net_payment)
employee_dict = {e['employee']: e for e in parties.values()}
report_context['records'] = sorted(employee_dict.items(), key=lambda t: t[0])
report_context['department'] = department
report_context['periods_number'] = periods_number
report_context['start_period'] = start_period
report_context['end_period'] = end_period
report_context['company'] = user.company
report_context['user'] = user
report_context['sum_gross_payments'] = sum(sum_gross_payments)
report_context['sum_net_payment'] = sum(sum_net_payment)
report_context['sum_total_deductions'] = sum(sum_total_deductions)
return report_context
@classmethod
def default_values(cls):
WageType = Pool().get('staff.wage_type')
fields_string = [
'employee_code', 'employee', 'employee_salary',
'employee_id_number', 'party_health', 'party_retirement',
'basic_salary',
]
fields_numeric = [
'net_payment', 'worked_days', 'gross_payments', 'total_deductions',
'others_payments', 'others_deductions'
]
lines_fields = dict(WageType.type_concept.selection).keys()
if '' in lines_fields:
lines_fields.remove('')
default_values = {}
for field in fields_string:
default_values.setdefault(field, None)
for field in fields_numeric:
default_values.setdefault(field, Decimal(0))
for field in lines_fields:
default_values.setdefault(field, Decimal(0))
return default_values
class PayrollPaymentStart(ModelView):
'Payroll Payment Start'
__name__ = 'staff.payroll_payment.start'
period = fields.Many2One('staff.payroll.period', 'Period',
required=True)
company = fields.Many2One('company.company', 'Company',
required=True)
department = fields.Many2One('company.department', 'Department')
@staticmethod
def default_company():
return Transaction().context.get('company')
class PayrollPayment(Wizard):
'Payroll Payment'
__name__ = 'staff.payroll.payment'
start = StateView('staff.payroll_payment.start',
'staff_payroll_co.payroll_payment_start_view_form', [
Button('Cancel', 'end', 'tryton-cancel'),
Button('Print', 'print_', 'tryton-ok', default=True),
])
print_ = StateReport('staff.payroll.payment_report')
def do_print_(self, action):
period = None
department_id = None
if self.start.department:
department_id = self.start.department.id
if self.start.period:
period = self.start.period.id
data = {
'ids': [],
'company': self.start.company.id,
'period': period,
'department': department_id,
}
return action, data
def transition_print_(self):
return 'end'
class PayrollPaymentReport(Report):
__name__ = 'staff.payroll.payment_report'
@classmethod
def get_context(cls, records, header, data):
report_context = super().get_context(records, header, data)
user = Pool().get('res.user')(Transaction().user)
Payroll = Pool().get('staff.payroll')
Period = Pool().get('staff.payroll.period')
Department = Pool().get('company.department')
clause = []
start_date = None
end_date = None
period = None
period_name = None
department = None
if data['period']:
period = Period(data['period'])
start_date = period.start
end_date = period.end
clause = [('period', '=', period)]
period_name = period.name
if data['department']:
clause.append(('employee.department', '=', data['department']))
department = Department(data['department']).name
payrolls = Payroll.search(clause)
new_objects = []
sum_net_payment = []
values = {}
for payroll in payrolls:
values = values.copy()
values['employee'] = payroll.employee.party.name
values['employee_id_number'] = payroll.employee.party.id_number
values['bank_name'] = payroll.employee.party.bank_name
values['bank_account'] = payroll.employee.party.bank_account
values['type_account'] = payroll.employee.party.bank_account_type
values['net_payment'] = payroll.net_payment
sum_net_payment.append(payroll.net_payment)
new_objects.append(values)
report_context['records'] = new_objects
report_context['department'] = department
report_context['start_date'] = start_date
report_context['end_date'] = end_date
report_context['period'] = period_name
report_context['company'] = user.company
report_context['user'] = user
report_context['sum_net_payment'] = sum(sum_net_payment)
return report_context
class PayrollPaycheckStart(ModelView):
'Payroll Paycheck Start'
__name__ = 'staff.payroll_paycheck.start'
periods = fields.One2Many('staff.payroll.period', None,
'Periods', add_remove=[], required=True)
company = fields.Many2One('company.company', 'Company', required=True)
department = fields.Many2One('company.department', 'Department')
values_without_move = fields.Boolean('Values Without Move')
@staticmethod
def default_company():
return Transaction().context.get('company')
class PayrollPaycheck(Wizard):
'Payroll Paycheck'
__name__ = 'staff.payroll.paycheck'
start = StateView('staff.payroll_paycheck.start',
'staff_payroll_co.payroll_paycheck_start_view_form', [
Button('Cancel', 'end', 'tryton-cancel'),
Button('Print', 'print_', 'tryton-ok', default=True),
])
print_ = StateReport('staff.payroll.paycheck_report')
def do_print_(self, action):
department_id = None
if self.start.department:
department_id = self.start.department.id
periods = [p.id for p in self.start.periods]
data = {
'ids': [],
'company': self.start.company.id,
'periods': periods,
'department': department_id,
'values_without_move': self.start.values_without_move,
}
return action, data
def transition_print_(self):
return 'end'
class PayrollPaycheckReport(Report):
__name__ = 'staff.payroll.paycheck_report'
@classmethod
def get_domain_payroll(cls, data):
dom_payroll = [
('period', 'in', data['periods']),
('state', '!=', 'draft'),
]
if data['department']:
dom_payroll.append(
('employee.department', '=', data['department'])
)
return dom_payroll
@classmethod
def get_context(cls, records, header, data):
report_context = super().get_context(records, header, data)
pool = Pool()
Payroll = pool.get('staff.payroll')
Period = pool.get('staff.payroll.period')
Company = pool.get('company.company')
dom_payroll = cls.get_domain_payroll(data)
payrolls = Payroll.search(dom_payroll)
res = {}
today = date.today()
periods = [p.name for p in Period.browse(data['periods'])]
total = []
for payroll in payrolls:
employee_id = payroll.employee.id
health_amount = cls.get_amount_move(payroll, 'health')
retirement_amount = cls.get_amount_move(payroll, 'retirement')
risk_amount = cls.get_amount_move(payroll, 'risk')
box_family_amount = cls.get_amount_move(payroll, 'box_family')
# fsp_amount = cls.get_amount_move(payroll, 'fsp')
salary = payroll.contract.salary
worked_days = payroll.worked_days
if data['values_without_move']:
health_amount = cls._values_without_move(payroll, 'health')
retirement_amount = cls._values_without_move(payroll, 'retirement')
risk_amount = cls._values_without_move(payroll, 'risk')
box_family_amount = cls._values_without_move(payroll, 'box_family')
salary_payroll_amount = cls._values_without_move(payroll, 'salary')
fsp_amount = cls._values_without_move(payroll, 'fsp')
# retirement_amount = cls._values_without_move(payroll, 'retirement')
subtotal = sum([health_amount, retirement_amount, risk_amount, box_family_amount, fsp_amount])
if employee_id not in res.keys():
eps_code = afp_code = arl_code = box_code = ''
eps_name = afp_name = arl_name = box_name = ''
if payroll.employee.party_health:
eps_code = payroll.employee.party_health.legal_code
eps_name = payroll.employee.party_health.name
if payroll.employee.party_retirement:
afp_code = payroll.employee.party_retirement.legal_code
afp_name = payroll.employee.party_retirement.name
if payroll.employee.party_risk:
arl_code = payroll.employee.party_risk.legal_code
arl_name = payroll.employee.party_risk.name
if payroll.employee.party_box_family:
box_code = payroll.employee.party_box_family.legal_code
box_name = payroll.employee.party_box_family.name
res[employee_id] = {
'type_contributor': '01',
'type_id': 'CC',
'id_number': payroll.employee.party.id_number,
'employee': payroll.employee.party.name,
'start_date': payroll.contract.start_date,
'end_date': payroll.contract.end_date,
'salary_base': salary,
'days': worked_days,
'type_affiliation': 'D',
'year': payroll.date_effective.year,
'date': today,
'eps_code': eps_code,
'eps_name': eps_name,
'afp_code': afp_code,
'afp_name': afp_name,
'arl_code': arl_code,
'arl_name': arl_name,
'box_code': box_code,
'box_name': box_name,
'ibc': payroll.ibc,
'eps_amount': health_amount,
'afp_amount': retirement_amount,
'fsp_amount': fsp_amount,
'arl_amount': risk_amount,
'box_amount': box_family_amount,
'salary_payroll_amount': salary_payroll_amount,
'subtotal': subtotal
}
else:
res[employee_id]['days'] += worked_days
res[employee_id]['eps_amount'] += health_amount
res[employee_id]['afp_amount'] += retirement_amount
res[employee_id]['fsp_amount'] += fsp_amount
res[employee_id]['arl_amount'] += risk_amount
res[employee_id]['box_amount'] += box_family_amount
res[employee_id]['salary_payroll_amount'] += salary_payroll_amount
res[employee_id]['ibc'] += payroll.ibc
res[employee_id]['subtotal'] += subtotal
total.append(subtotal)
report_context['total'] = sum(total)
report_context['records'] = res.values()
report_context['periods'] = periods
report_context['company'] = Company(data['company'])
return report_context
@classmethod
def _values_without_move(cls, payroll, kind):
res = 0
for line in payroll.lines:
if line.wage_type.type_concept == kind:
res += line.amount
return res
@classmethod
def get_amount_move(cls, payroll, kind):
res = _ZERO
party = None
if kind == 'health':
party = payroll.employee.party_health
elif kind == 'retirement':
party = payroll.employee.party_retirement
elif kind == 'risk':
party = payroll.employee.party_risk
else:
party = payroll.employee.party_box_family
if party and payroll.move:
for line in payroll.move.lines:
if line.party.id == party.id:
res = line.credit
break
return res
class PayrollSheetStart(ModelView):
'Payroll Sheet Start'
__name__ = 'staff.payroll.sheet.start'
periods = fields.One2Many('staff.payroll.period', None,
'Periods', add_remove=[], required=True)
company = fields.Many2One('company.company', 'Company', required=True)
@staticmethod
def default_company():
return Transaction().context.get('company')
class PayrollSheet(Wizard):
'Payroll Sheet'
__name__ = 'staff.payroll.sheet'
start = StateView('staff.payroll.sheet.start',
'staff_payroll_co.payroll_sheet_start_view_form', [
Button('Cancel', 'end', 'tryton-cancel'),
Button('Print', 'print_', 'tryton-ok', default=True),
])
print_ = StateReport('staff.payroll.sheet_report')
def do_print_(self, action):
periods = [p.id for p in self.start.periods]
data = {
'ids': [],
'company': self.start.company.id,
'periods': periods,
}
return action, data
def transition_print_(self):
return 'end'
class PayrollSheetReport(Report):
__name__ = 'staff.payroll.sheet_report'
@classmethod
def get_domain_payroll(cls, data):
dom_payroll = []
return dom_payroll
@classmethod
def get_context(cls, records, header, data):
report_context = super().get_context(records, header, data)
pool = Pool()
user = pool.get('res.user')(Transaction().user)
Payroll = pool.get('staff.payroll')
Period = pool.get('staff.payroll.period')
periods_names = ''
dom_payroll = cls.get_domain_payroll(data)
if data['periods']:
periods = Period.browse(data['periods'])
periods_names = [p.name + ' / ' for p in periods]
dom_payroll.append([('period', 'in', data['periods'])])
payrolls = Payroll.search(dom_payroll,
order=[('employee.party.name', 'ASC'), ('period.name', 'ASC')]
)
new_objects = []
default_vals = cls.default_values()
sum_gross_payments = []
sum_total_deductions = []
sum_net_payment = []
item = 0
for payroll in payrolls:
item += 1
values = default_vals.copy()
values['item'] = item
values['employee'] = payroll.employee.party.name
values['id_number'] = payroll.employee.party.id_number
position_name, position_contract = '', ''
if payroll.employee.position:
position_name = payroll.employee.position.name
if payroll.contract and payroll.contract.position:
position_contract = payroll.contract.position.name
values['position'] = position_contract or position_name
values['department'] = payroll.employee.department.name \
if payroll.employee.department else ''
values['company'] = user.company.party.name
values['legal_salary'] = payroll.contract.get_salary_in_date(
payroll.end)
values['period'] = payroll.period.name
salary_day_in_date = payroll.contract.get_salary_in_date(
payroll.end)/30
values['salary_day'] = salary_day_in_date
values['salary_hour'] = (salary_day_in_date / 8) if salary_day_in_date else 0
values['worked_days'] = payroll.worked_days
values['gross_payment'] = payroll.gross_payments
# Add compatibility with staff contracting
project = ""
if hasattr(payroll, 'project'):
if payroll.project:
project = payroll.project.name
if hasattr(payroll.employee, 'project_contract'):
if payroll.employee.project_contract and \
payroll.employee.project_contract.reference:
project = payroll.employee.project_contract.reference
values['project'] = project
values.update(cls._prepare_lines(payroll, values))
sum_gross_payments.append(payroll.gross_payments)
sum_total_deductions.append(payroll.total_deductions)
sum_net_payment.append(payroll.net_payment)
new_objects.append(values)
report_context['records'] = new_objects
report_context['periods'] = periods_names
report_context['company'] = user.company
report_context['user'] = user
report_context['sum_gross_payments'] = sum(sum_gross_payments)
report_context['sum_net_payment'] = sum(sum_net_payment)
report_context['sum_total_deductions'] = sum(sum_total_deductions)
return report_context
@classmethod
def default_values(cls):
fields_no_amount = [
'item',
'employee',
'id_number',
'position',
'legal_salary',
'salary_day',
'salary_hour',
'worked_days',
'period',
'department',
]
fields_amount = [
'salary',
'reco',
'recf',
'hedo',
'heno',
'dom',
'hedf',
'henf',
'cost_reco',
'cost_recf',
'cost_hedo',
'cost_heno',
'cost_dom',
'cost_hedf',
'cost_henf',
'bonus',
'total_extras',
'gross_payment',
'health',
'retirement',
'food',
'transport',
'fsp',
'retefuente',
'other_deduction',
'total_deduction',
'ibc',
'net_payment',
'box_family',
'box_family',
'unemployment',
'interest',
'holidays',
'bonus_service',
'discount',
'other',
'total_benefit',
'risk',
'health_provision',
'retirement_provision',
'total_ssi',
'total_cost',
'sena',
'icbf',
'acquired_product',
]
if '' in fields_no_amount:
fields_no_amount.remove('')
default_values = {}
for field in fields_no_amount:
default_values.setdefault(field, None)
for field in fields_amount:
default_values.setdefault(field, Decimal(0))
return default_values
@classmethod
def _prepare_lines(cls, payroll, vals):
extras = [
'reco',
'recf',
'hedo',
'heno',
'dom',
'hedf',
'henf',
]
for line in payroll.lines:
if line.wage_type.definition == 'payment':
if line.wage_type.type_concept == 'salary':
vals['salary'] += line.amount
elif line.wage_type.type_concept == 'extras':
vals['total_extras'] += line.amount
for e in extras:
if e.upper() in line.wage_type.name:
vals[e] += line.quantity
vals['cost_' + e] += line.amount
break
elif line.wage_type.type_concept == 'risk':
vals['risk'] += line.amount
elif line.wage_type.type_concept == 'box_family':
vals['box_family'] += line.amount
elif line.wage_type.type_concept == 'unemployment':
vals['unemployment'] += line.amount
elif line.wage_type.type_concept == 'interest':
vals['interest'] += line.amount
elif line.wage_type.type_concept == 'holidays':
vals['holidays'] += line.amount
elif line.wage_type.type_concept == 'bonus':
vals['bonus'] += line.amount
elif line.wage_type.type_concept == 'bonus_service':
vals['bonus_service'] += line.amount
elif line.wage_type.type_concept == 'transport':
vals['transport'] += line.amount
elif line.wage_type.type_concept == 'food':
vals['food'] += line.amount
elif line.wage_type.type_concept == 'sena':
vals['sena'] += line.amount
elif line.wage_type.type_concept == 'icbf':
vals['icbf'] += line.amount
elif line.wage_type.type_concept == 'acquired_product':
vals['acquired_product'] += line.amount
else:
print('Warning: Line no processed... ', line.wage_type.name)
vals['other'] += line.amount
elif line.wage_type.definition == 'deduction':
vals['total_deduction'] += line.amount
if line.wage_type.type_concept == 'health':
vals['health'] += line.amount
vals['health_provision'] += line.get_expense_amount()
elif line.wage_type.type_concept == 'retirement':
vals['retirement'] += line.amount
vals['retirement_provision'] += line.get_expense_amount()
else:
if line.wage_type.type_concept == 'fsp':
vals['fsp'] += line.amount
elif line.wage_type.type_concept == 'tax':
vals['retefuente'] += line.amount
else:
vals['other_deduction'] += line.amount
else:
vals['discount'] += line.amount
print('Warning: Line no processed... ', line.wage_type.name)
vals['gross_payment'] = sum([
vals['salary'], vals['total_extras'], vals['transport'],
vals['food'], vals['bonus']
])
vals['net_payment'] = vals['gross_payment'] - vals['total_deduction']
vals['ibc'] = vals['gross_payment']
vals['total_benefit'] = sum([
vals['unemployment'], vals['interest'], vals['holidays'],
vals['bonus_service']
])
vals['total_ssi'] = vals['retirement_provision'] + vals['risk']
vals['total_cost'] = sum([
vals['total_ssi'], vals['box_family'], vals['net_payment'],
vals['total_benefit']
])
return vals
class PayrollGroupStart(metaclass=PoolMeta):
__name__ = 'staff.payroll_group.start'
department = fields.Many2One('company.department', 'Department')
employees = fields.Many2Many('company.employee', None, None,
'Employees', domain=[
('active', '=', True),
])
class PayrollGroup(metaclass=PoolMeta):
__name__ = 'staff.payroll_group'
def get_employees_dom(self, employees_w_payroll):
dom = super(PayrollGroup, self).get_employees_dom(employees_w_payroll)
if self.start.department:
dom.append(('department', '=', self.start.department.id))
if self.start.employees:
employees_ids = [e.id for e in self.start.employees]
dom.append(('id', 'in', employees_ids))
return dom
class OpenPayrollByPeriodStart(ModelView):
'Open Payroll By Period Start'
__name__ = 'staff_payroll_co.open_payroll_by_period.start'
company = fields.Many2One('company.company', 'Company', required=True)
fiscalyear = fields.Many2One('account.fiscalyear', 'Fiscal Year',
required=True)
@staticmethod
def default_company():
return Transaction().context.get('company')
@staticmethod
def default_fiscalyear():
FiscalYear = Pool().get('account.fiscalyear')
return FiscalYear.find(
Transaction().context.get('company'), exception=False)
class OpenPayrollByPeriod(Wizard):
'Open Payroll By Period'
__name__ = 'staff_payroll_co.open_payroll_by_period'
start = StateView('staff_payroll_co.open_payroll_by_period.start',
'staff_payroll_co.open_payroll_by_period_start_view_form', [
Button('Cancel', 'end', 'tryton-cancel'),
Button('Print', 'open_', 'tryton-ok', default=True),
])
open_ = StateAction('staff_payroll_co.act_payroll_by_period_board')
def do_open_(self, action):
data = {
'company': self.start.company.id,
'fiscalyear': self.start.fiscalyear.id,
}
action['name'] += ' - %s' % self.start.fiscalyear.name
return action, data
def transition_open_(self):
return 'end'
class PayrollByPeriodDynamic(Period):
'Payroll By Period Dynamic'
__name__ = 'staff_payroll_co.payroll_by_period_dynamic'
payrolls = fields.Function(fields.One2Many('staff.payroll', None,
'Payrolls'), 'get_payrolls')
amount_net_payment = fields.Function(fields.Numeric('Amount Net Payment',
digits=(16, 2)), 'get_amount')
amount_total_cost = fields.Function(fields.Numeric('Amount Total Cost',
digits=(16, 2)), 'get_amount')
def get_amount(self, name=None):
res = []
method = name[7:]
for payroll in self.payrolls:
value = getattr(payroll, method)
res.append(value)
return sum(res)
def get_payrolls(self, name=None):
pool = Pool()
Payroll = pool.get('staff.payroll')
payrolls = Payroll.search([
('period', '=', self.id),
('state', 'in', ['processed', 'posted']),
])
return [i.id for i in payrolls]
class Exo2276Start(ModelView):
'Payroll Exo 2276 Start'
__name__ = 'staff.payroll_exo2276.start'
start_period = fields.Many2One('staff.payroll.period', 'Start Period',
required=True)
end_period = fields.Many2One('staff.payroll.period', 'End Period',
required=True)
company = fields.Many2One('company.company', 'Company', required=True)
@staticmethod
def default_company():
return Transaction().context.get('company')
class Exo2276(Wizard):
'Payroll Exo 2276'
__name__ = 'staff.payroll_exo2276'
start = StateView('staff.payroll_exo2276.start',
'staff_payroll_co.payroll_exo2276_start_view_form', [
Button('Cancel', 'end', 'tryton-cancel'),
Button('Print', 'print_', 'tryton-ok', default=True),
])
print_ = StateReport('staff.payroll_exo2276.report')
def do_print_(self, action):
data = {
'ids': [],
'company': self.start.company.id,
'start_period': self.start.start_period.start,
'end_period': self.start.end_period.end,
}
return action, data
def transition_print_(self):
return 'end'
class Exo2276Report(Report):
__name__ = 'staff.payroll_exo2276.report'
@classmethod
def get_domain_payroll(cls, data=None):
Period = Pool().get('staff.payroll.period')
periods = Period.search([
('start', '>=', data['start_period']),
('end', '<=', data['end_period']),
])
periods_ids = [p.id for p in periods]
domain = [
('period', 'in', periods_ids),
('state', '=', 'posted'),
]
return domain
@classmethod
def get_context(cls, records, data):
report_context = super(Exo2276Report, cls).get_context(records, data)
pool = Pool()
user = pool.get('res.user')(Transaction().user)
Payroll = pool.get('staff.payroll')
# LiquidationLine = pool.get('staff.liquidation.line')
MoveLine = pool.get('account.move.line')
WageType = pool.get('staff.wage_type')
domain_ = cls.get_domain_payroll(data)
payrolls = Payroll.search([domain_])
new_objects = {}
start_period = data['start_period']
end_period = data['end_period']
pag_wage = WageType.search_read([
('type_concept', 'in', ['unemployment', 'interest', 'holidays']),
], fields_names=['credit_account', 'type_concept'])
accounts_id = {'cesanpag': [], 'holidays': []}
for p in pag_wage:
if p['type_concept'] in ('unemployment', 'interest'):
accounts_id['cesanpag'].append(p['credit_account'])
else:
accounts_id['holidays'].append(p['credit_account'])
# accounts_id_c = ['cesanpagp':['credit_account'] for p in pag_wage]
# cesanpag_wage = WageType.search_read([
# ('type_concept', 'in', ['unemployment', 'interest']),
# ], fields_names=['credit_account'])
# accounts_id = [p['credit_account'] for p in cesanpag_wage]
index = 0
for payroll in payrolls:
index += 1
party = payroll.employee.party
if party.id in new_objects.keys():
continue
new_objects[party.id] = {
'index': index,
'type_document': party.type_document,
'id_number': party.id_number,
'first_family_name': party.first_family_name,
'second_family_name': party.second_family_name,
'first_name': party.first_name,
'second_name': party.second_name,
'addresses': party.addresses[0].street,
'department_code': party.department_code,
'city_code': party.city_code,
'country_code': party.country_code,
'email': party.email,
'payments': 0,
'cesanpag': 0,
'interest': 0,
'holidays': 0,
'bonus': 0,
'bonus_service': 0,
'transport': 0,
'food': 0,
'total_deduction': 0,
'health': 0,
'retirement': 0,
'fsp': 0,
'retefuente': 0,
'other_deduction': 0,
'discount': 0,
'other': 0,
'various': 0,
'salary': 0,
'extras': 0,
'box_family': 0,
'risk': 0,
'unemployment': 0,
'allowance': 0,
'syndicate': 0,
'commission': 0,
'total_benefit': 0,
'gross_payment': 0,
'_cesanpag': 0,
'others_payments': 0,
'total_retirement': 0,
'total_salary': 0,
}
new_objects[party.id] = cls._prepare_lines(payrolls, new_objects[party.id], party.id)
_cesanpag = 0
_holidays = 0
lines_liquid = MoveLine.search_read([
('party', '=', party.id),
('move.state', '=', 'posted'),
('move.date', '>=', data['start_period']),
('move.date', '<=', data['end_period']),
('account', 'in', accounts_id['cesanpag']+accounts_id['holidays']),
('debit', '>', 0),
], fields_names=['debit', 'move', 'account'])
if lines_liquid:
_cesanpag = sum([l['debit'] for l in lines_liquid if l['account'] in accounts_id['cesanpag']])
_holidays = sum([l['debit'] for l in lines_liquid if l['account'] in accounts_id['holidays']])
new_objects[party.id]['cesanpag'] = _cesanpag
new_objects[party.id]['holidays'] = _holidays
report_context['records'] = new_objects.values()
report_context['end_period'] = end_period
report_context['start_period'] = start_period
report_context['today'] = date.today()
report_context['company'] = user.company
return report_context
@classmethod
def _prepare_lines(cls, payrolls, vals, party_id):
payroll_ids = [payroll.id for payroll in payrolls]
Lines = Pool().get('staff.payroll.line')
lines = Lines.search([
('payroll', 'in', payroll_ids),
('payroll.employee.party', '=', party_id),
])
for line in lines:
if line.wage_type.definition == 'payment':
vals['payments'] += line.amount
if line.wage_type.type_concept == 'unemployment':
continue
# vals['unemployment'] += line.amount
elif line.wage_type.type_concept == 'interest':
continue
# vals['interest'] += line.amount
elif line.wage_type.type_concept == 'salary':
vals['salary'] += line.amount
elif line.wage_type.type_concept == 'commission':
vals['payments'] += line.amount
vals['commission'] += line.amount
elif line.wage_type.type_concept == 'allowance':
vals['payments'] += line.amount
vals['allowance'] += line.amount
elif line.wage_type.type_concept == 'extras':
vals['payments'] += line.amount
vals['extras'] += line.amount
elif line.wage_type.type_concept == 'holidays':
continue
# vals['holidays'] += line.amount
elif line.wage_type.type_concept == 'bonus':
vals['payments'] += line.amount
vals['bonus'] += line.amount
elif line.wage_type.type_concept == 'bonus_service':
vals['payments'] += line.amount
vals['bonus_service'] += line.amount
elif line.wage_type.type_concept == 'transport':
vals['payments'] += line.amount
vals['transport'] += line.amount
elif line.wage_type.type_concept == 'food':
vals['payments'] += line.amount
vals['food'] += line.amount
elif line.wage_type.type_concept == 'box_family':
vals['box_family'] += line.amount
elif line.wage_type.type_concept == 'risk':
vals['risk'] += line.amount
elif line.wage_type.type_concept == 'other':
vals['other'] += line.amount
else:
print('Warning: Line no processed... ', line.wage_type.type_concept, line.wage_type.name)
vals['various'] += line.amount
elif line.wage_type.definition == 'deduction':
vals['total_deduction'] += line.amount
if line.wage_type.type_concept == 'health':
vals['health'] += line.amount
elif line.wage_type.type_concept == 'retirement':
vals['retirement'] += line.amount
else:
if line.wage_type.type_concept == 'fsp':
vals['fsp'] += line.amount
elif line.wage_type.type_concept == 'tax':
vals['retefuente'] += line.amount
else:
vals['other_deduction'] += line.amount
else:
vals['discount'] += line.amount
print('Warning: Line no processed... ', line.wage_type.name)
# vals['cesanpag'] = vals['unemployment'] + vals['interest']
vals['others_payments'] = (vals['other'] + vals['commission'] +
vals['bonus'] + vals['allowance'] +
vals['various'] + vals['food'])
vals['total_benefit'] = vals['holidays'] + vals['bonus_service']
vals['total_retirement'] = vals['fsp'] + vals['retirement']
vals['total_salary'] = sum([
vals['salary'], vals['extras'], vals['transport']
])
return vals
# vals['unemployment'], vals['interest'], vals['holidays'],
# vals['bonus_service']
class IncomeWithholdingsStart(ModelView):
'Income Withholding Start'
__name__ = 'staff.payroll.income_withholdings.start'
fiscalyear = fields.Many2One('account.fiscalyear', 'Fiscal Year',
required=True)
company = fields.Many2One('company.company', 'Company', required=True)
employees = fields.Many2Many('company.employee', None, None, 'Employees')
@staticmethod
def default_company():
return Transaction().context.get('company')
@staticmethod
def default_fiscalyear():
FiscalYear = Pool().get('account.fiscalyear')
return FiscalYear.find(
Transaction().context.get('company'), exception=False)
class IncomeWithholdings(Wizard):
'Income Withholding'
__name__ = 'staff.payroll.income_withholdings'
start = StateView('staff.payroll.income_withholdings.start',
'staff_payroll_co.income_withholdings_start_form', [
Button('Cancel', 'end', 'tryton-cancel'),
Button('Print', 'print_', 'tryton-ok', default=True),
])
print_ = StateReport('staff.payroll.income_withholdings_report')
def do_print_(self, action):
employees = []
if self.start.employees:
employees = [e.id for e in self.start.employees]
data = {
'ids': [],
'company': self.start.company.id,
'start_period': self.start.fiscalyear.start_date,
'end_period': self.start.fiscalyear.end_date,
'employees': employees,
}
return action, data
def transition_print_(self):
return 'end'
class IncomeWithholdingsReport(Exo2276Report):
'Income Withholding Report'
__name__ = 'staff.payroll.income_withholdings_report'
@classmethod
def get_domain_payroll(cls, data=None):
domain_ = super(IncomeWithholdingsReport, cls).get_domain_payroll(data)
if data['employees']:
domain_.append(('employee', 'in', data['employees']))
return domain_
class ExportMovesReport(PayrollReport):
__name__ = 'staff_payroll.export_moves.report'
@classmethod
def get_context(cls, records, header, data):
report_context = super().get_context(records, header, data)
config = Pool().get('staff.configuration')(1)
prefix = ''
if config:
prefix = config.staff_payroll_sequence.prefix
# ids = Transaction().context['active_ids']
# moves = []
# for payroll in Payroll.browse(ids):
# if not payroll.move:
# continue
# moves.append(payroll.move)
# print(payroll.number)
report_context['prefix'] = prefix
return report_context
class PayrollExportStart(ModelView):
'Export Payroll Start'
__name__ = 'staff.payroll.export.start'
company = fields.Many2One('company.company', 'Company', required=True)
start_period = fields.Many2One('staff.payroll.period', 'Start Period',
required=True)
end_period = fields.Many2One('staff.payroll.period', 'End Period',
required=True)
department = fields.Many2One('company.department', 'Department',
required=False, depends=['employee'])
@staticmethod
def default_company():
return Transaction().context.get('company')
class PayrollExport(Wizard):
'Payroll Export'
__name__ = 'staff.payroll.export'
start = StateView('staff.payroll.export.start',
'staff_payroll_co.payroll_export_start_view_form', [
Button('Cancel', 'end', 'tryton-cancel'),
Button('Print', 'print_', 'tryton-ok', default=True),
])
print_ = StateReport('staff.payroll.export_report')
def do_print_(self, action):
department_id = self.start.department.id \
if self.start.department else None
data = {
'ids': [],
'company': self.start.company.id,
'start_period': self.start.start_period.id,
'end_period': self.start.end_period.id,
'department_id': department_id,
}
return action, data
def transition_print_(self):
return 'end'
class PayrollExportReport(Report):
__name__ = 'staff.payroll.export_report'
@classmethod
def get_domain_payroll(cls, data=None):
dom_payroll = []
return dom_payroll
@classmethod
def get_context(cls, records, header, data):
report_context = super().get_context(records, header, data)
pool = Pool()
company = pool.get('company.company')(data['company'])
Payroll = pool.get('staff.payroll')
Period = pool.get('staff.payroll.period')
dom_payroll = cls.get_domain_payroll()
start_period, = Period.search([('id', '=', data['start_period'])])
end_period, = Period.search([('id', '=', data['end_period'])])
dom_payroll.append([
('period.start', '>=', start_period.start),
('period.end', '<=', end_period.end),
('move', '!=', None)
])
if data['department_id'] not in (None, ''):
dom_payroll.append([
('employee.department', '=', data['department_id'])
])
payrolls = Payroll.search(dom_payroll, order=[('period.name', 'ASC')])
records = {}
for payroll in payrolls:
employee = payroll.employee
""" extract debit account and party mandatory_wages"""
accdb_party = {mw.wage_type.debit_account.id: mw.party
for mw in employee.mandatory_wages
if mw.wage_type.debit_account and mw.party}
move = payroll.move
accountdb_ids = accdb_party.keys()
for line in move.lines:
"""Check account code in dict account debit and party"""
if not line.party:
continue
line_ = {
'date': line.move.date,
'code': '---',
'party': employee.party.id_number,
'description': line.description,
'department': employee.department.name if employee.department else '---',
'amount': line.debit or line.credit,
'type': 'D',
}
if line.debit > 0:
if line.account.id in accountdb_ids:
id_number = accdb_party[line.account.id].id_number
else:
id_number = None
if id_number in ENTITY_ACCOUNTS.keys():
line_['code'] = ENTITY_ACCOUNTS[id_number][1]
else:
line_['code'] = line.account.code
else:
line_['type'] = 'C'
id_number = line.party.id_number
if id_number in ENTITY_ACCOUNTS.keys():
line_['code'] = ENTITY_ACCOUNTS[id_number][0]
else:
line_['code'] = line.account.code
if line.account.code not in records.keys():
records[line.account.code] = {
'name': line.account.name,
'lines': []
}
records[line.account.code]['lines'].append(line_)
report_context['records'] = records
report_context['start_date'] = start_period.name
report_context['end_date'] = end_period.name
report_context['company'] = company.party.name
return report_context
class PayrollFix(Wizard):
'Payroll Fix'
__name__ = 'staff.payroll.fix'
start_state = 'fix'
fix = StateTransition()
def transition_fix(self):
context_ids = Transaction().context.get('active_ids')
pool = Pool()
Payroll = pool.get('staff.payroll')
MoveLine = pool.get('account.move.line')
payrolls = Payroll.search([
('state', '=', 'posted'),
('id', 'in', context_ids),
('move.state', '=', 'draft'),
])
for p in payrolls:
credit = 0
# p.move.state = 'draft'
# p.move.save()
fixed = True
for line in p.move.lines:
if line.account.id in (578, 16955):
credit += line.credit
if line.account.id == 578:
# MoveLine.delete([line])
fixed = False
else:
to_change = line
if not fixed:
to_change.credit = credit
to_change.save()
# p.move.state = 'posted'
# p.move.save()
return 'end'
class PayrollsMultiPaymentStart(ModelView):
'Payroll Multi Payment Start'
__name__ = 'staff.payroll_multi_payment.start'
period = fields.Many2One('staff.payroll.period', 'Period',
required=True, domain=[('state', '=', 'open')])
payment_mode = fields.Many2One('account.voucher.paymode', 'Payment Mode',
required=True)
company = fields.Many2One('company.company', 'Company',
required=True)
department = fields.Many2One('company.department', 'Department')
wage_type = fields.Many2One('staff.wage_type', 'Wage Type', domain=[
('definition', '=', 'payment'),
('type_concept', '=', 'salary'),
], required=True)
description = fields.Char('Description')
@staticmethod
def default_company():
return Transaction().context.get('company')
class PayrollsMultiPayment(Wizard):
'Payrolls MultiPayment'
__name__ = 'staff.payroll_multi_payment'
start = StateView('staff.payroll_multi_payment.start',
'staff_payroll_co.payroll_multi_payment_start_view_form', [
Button('Cancel', 'end', 'tryton-cancel'),
Button('Accept', 'open_', 'tryton-ok', default=True),
])
open_ = StateTransition()
# create_ = StateTransition()
def transition_open_(self):
pool = Pool()
Payroll = pool.get('staff.payroll')
Voucher = pool.get('account.voucher')
MoveLine = pool.get('account.move.line')
dom_pay = [
('period', '=', self.start.period.id),
('state', '=', 'posted'),
('move', '!=', None),
]
if self.start.department:
dom_pay.append(
('department', '=', self.start.period.id)
)
payrolls = Payroll.search(dom_pay)
moves_payroll_ids = [p.move.id for p in payrolls]
lines = MoveLine.search([
('account', '=', self.start.wage_type.credit_account.id),
('move', 'in', moves_payroll_ids),
('reconciliation', '=', None),
('party', '!=', None),
('credit', '>', 0),
])
line_to_create = []
for line in lines:
line_to_create.append({
'amount': line.credit,
'account': line.account.id,
'party': line.party.id,
'move_line': line.id,
'detail': line.move.origin.number,
'amount_original': line.credit,
})
account_id = Voucher.get_account('multipayment', self.start.payment_mode)
journal_id = self.start.payment_mode.journal.id
voucher, = Voucher.create([{
'payment_mode': self.start.payment_mode.id,
'date': self.start.period.end,
'voucher_type': 'multipayment',
'lines': [('create', line_to_create)],
'journal': journal_id,
'state': 'draft',
'account': account_id,
'description': self.start.description,
}])
amount_to_pay = voucher._get_amount_to_pay()
voucher.amount_to_pay = amount_to_pay
print(voucher)
voucher.save()
return 'end'
# def transition_create_(self):
# return 'open_'