trytonpsk-staff_payroll_co/liquidation.py

1371 lines
53 KiB
Python

# This file is part of Tryton. The COPYRIGHT file at the top level of
# this repository contains the full copyright notices and license terms.
from decimal import Decimal
from trytond.model import Workflow, ModelSQL, ModelView, fields
from trytond.pool import Pool
from trytond.report import Report
from trytond.pyson import Eval, If, Bool
from trytond.wizard import (Wizard, StateView, Button,
StateTransition, StateReport)
from trytond.transaction import Transaction
from trytond.i18n import gettext
from .exceptions import (
LiquidationEmployeeError, MissingSecuenceLiquidation,
LiquidationMoveError, WageTypeConceptError, LiquidationDeleteError,
RecordDuplicateError
)
STATES = {'readonly': (Eval('state') != 'draft')}
_ZERO = Decimal('0.0')
BONUS_SERVICE = ['bonus_service']
CONTRACT = [
'bonus_service', 'health', 'retirement', 'unemployment', 'interest',
'holidays', 'convencional_bonus'
]
concept_social_security = ['health', 'risk', 'box_family', 'retirement', 'fsp']
class Liquidation(Workflow, ModelSQL, ModelView):
'Staff Liquidation'
__name__ = 'staff.liquidation'
number = fields.Char('Number', readonly=True, help="Secuence",
select=True)
employee = fields.Many2One('company.employee', 'Employee',
states=STATES, required=True, depends=['state'])
start_period = fields.Many2One('staff.payroll.period', 'Start Period',
required=True, states=STATES)
end_period = fields.Many2One('staff.payroll.period', 'End Period',
required=True, states=STATES, depends=['start_period'])
kind = fields.Selection([
('contract', 'Contract'),
('bonus_service', 'Bonus Service'),
('interest', 'Interest'),
('unemployment', 'Unemployment'),
('holidays', 'Vacation'),
('convencional_bonus', 'Convencional Bonus'),
], 'Kind', required=True, states=STATES)
liquidation_date = fields.Date('Liquidation Date', states=STATES,
required=True)
lines = fields.One2Many('staff.liquidation.line', 'liquidation',
'Lines', states=STATES, depends=['employee', 'state'])
gross_payments = fields.Function(fields.Numeric(
'Gross Payments', states=STATES, digits=(16, 2)),
'get_sum_operation')
total_deductions = fields.Function(fields.Numeric(
'Total Deductions', states=STATES, digits=(16, 2)),
'get_sum_operation')
net_payment = fields.Function(fields.Numeric(
'Net Payment', states=STATES, digits=(16, 2)),
'get_net_payment')
time_contracting = fields.Integer('Time Contracting', states=STATES,
depends=['start_period', 'end_period', 'employee'])
state = fields.Selection([
('draft', 'Draft'),
('confirmed', 'Confirmed'),
('posted', 'Posted'),
('cancel', 'Cancel'),
], 'State', readonly=True)
company = fields.Many2One('company.company', 'Company', required=True,
states={
'readonly': (Eval('state') != 'draft') | Eval('lines', [0]),
},
domain=[
('id', If(Eval('context', {}).contains('company'), '=', '!='),
Eval('context', {}).get('company', 0)),
],
depends=['state'], select=True)
description = fields.Char('Description', states=STATES, select=True)
cause = fields.Char('Cause', states=STATES)
permissons = fields.Char('Permissons', states=STATES)
journal = fields.Many2One('account.journal', 'Journal', required=True,
states=STATES)
currency = fields.Many2One('currency.currency', 'Currency',
required=True, states={
'readonly': ((Eval('state') != 'draft')
| (Eval('lines', [0]) & Eval('currency'))),
},
depends=['state'])
move = fields.Many2One('account.move', 'Move', readonly=True)
contract = fields.Many2One('staff.contract', 'Contract',
states=STATES, domain=[('employee', '=', Eval('employee'))])
account = fields.Many2One('account.account', 'Account',
required=True, domain=[
('type', '!=', None),
])
payrolls = fields.Function(fields.Many2Many('staff.payroll',
None, None, 'Payroll', depends=['start_period', 'end_period'],
domain=[
('employee', '=', Eval('employee')),
('kind', '=', 'normal'),
],), 'get_payrolls')
start = fields.Function(fields.Date('Start Date'), 'get_dates')
end = fields.Function(fields.Date('End Date'), 'get_dates')
party_to_pay = fields.Many2One('party.party', 'Party to Pay', states=STATES)
last_salary = fields.Numeric('Last Salary', states=STATES,
digits=(16, 2))
@classmethod
def __setup__(cls):
super(Liquidation, cls).__setup__()
cls._transitions |= set((
('draft', 'cancel'),
('cancel', 'draft'),
('confirmed', 'draft'),
('confirmed', 'posted'),
('draft', 'confirmed'),
('posted', 'draft'),
))
cls._buttons.update({
'draft': {
'invisible': Eval('state') == 'draft',
},
'confirm': {
'invisible': Eval('state') != 'draft',
},
'cancel': {
'invisible': Eval('state') != 'draft',
},
'post': {
'invisible': Eval('state') != 'confirmed',
},
'compute_liquidation': {
'invisible': Bool(Eval('lines')),
},
})
@classmethod
def __register__(cls, module_name):
super(Liquidation, cls).__register__(module_name)
cursor = Transaction().connection.cursor()
sql_table = cls.__table__()
cursor.execute(*sql_table.update(
[sql_table.kind], ['holidays'],
where=sql_table.kind == 'vacation'))
@staticmethod
def default_company():
return Transaction().context.get('company')
@staticmethod
def default_kind():
return 'contract'
@staticmethod
def default_state():
return 'draft'
@staticmethod
def default_currency():
Company = Pool().get('company.company')
company = Transaction().context.get('company')
if company:
company = Company(company)
return company.currency.id
@staticmethod
def default_journal():
Configuration = Pool().get('staff.configuration')
configuration = Configuration(1)
if configuration.default_journal:
return configuration.default_journal.id
@classmethod
def search_rec_name(cls, name, clause):
if clause[1].startswith('!') or clause[1].startswith('not '):
bool_op = 'AND'
else:
bool_op = 'OR'
return [
bool_op,
('employee',) + tuple(clause[1:]),
('number',) + tuple(clause[1:]),
]
@classmethod
@ModelView.button
@Workflow.transition('confirmed')
def confirm(cls, records):
for rec in records:
if not rec.contract:
raise LiquidationEmployeeError(
gettext('staff_payroll_co.msg_dont_contract'))
rec.set_number()
@classmethod
@ModelView.button
@Workflow.transition('draft')
def draft(cls, records):
pass
@classmethod
@ModelView.button
@Workflow.transition('cancel')
def cancel(cls, records):
pass
@classmethod
@ModelView.button
@Workflow.transition('posted')
def post(cls, records):
for rec in records:
rec.create_move()
@classmethod
def create_withholding(cls, liquidation):
rec = liquidation
pool = Pool()
UvtWithholding = pool.get('staff.payroll.uvt_withholding')
WageType = pool.get('staff.wage_type')
fields_names = [
'unit_price_formula', 'concepts_salary', 'salary_constitute',
'name', 'sequence', 'definition', 'unit_price_formula',
'expense_formula', 'uom', 'default_quantity', 'type_concept',
'salary_constitute', 'receipt', 'concepts_salary',
'contract_finish', 'limit_days', 'month_application',
'minimal_amount', 'adjust_days_worked', 'round_amounts',
'debit_account.name', 'credit_account.name',
'deduction_account.name', 'account_60_40.name'
]
wage_tax = WageType.search_read([('type_concept', '=', 'tax')],
fields_names=fields_names)
if not wage_tax:
return
wage_tax = wage_tax[0]
deductions_month = sum([
ln.amount for ln in rec.lines if ln.wage.definition != 'payment'
])
salary_full = rec.net_payment
payrolls = {p.end: p for p in rec.payrolls}
if not payrolls:
return
max_date = max(payrolls.keys())
if rec.liquidation_date.month == max_date.month:
payroll = payrolls[max_date]
line_tax = None
for line in payroll.lines:
if line.wage_type.type_concept == 'tax' and line.amount:
line_tax = line
if not line_tax:
deductions_month += payroll.get_deductions_month()
salary_args = payroll.get_salary_full(wage_tax)
salary_full += salary_args['salary']
base_salary_withholding = salary_full - deductions_month
amount_tax = UvtWithholding.compute_withholding(
base_salary_withholding)
amount_tax = rec.currency.round(Decimal(amount_tax))
if amount_tax:
create_tax = {
'sequence': wage_tax['sequence'],
'wage': wage_tax['id'],
'description': wage_tax['name'],
'amount': amount_tax * -1,
'days': rec.time_contracting,
'account': wage_tax['credit_account.']['id'],
}
cls.write([rec], {
'lines': [('create', [create_tax])]
})
@classmethod
@ModelView.button
def compute_liquidation(cls, records):
for rec in records:
rec.set_liquidation_lines()
cls.create_withholding(rec)
@classmethod
def copy(cls, records, default=None):
raise RecordDuplicateError(
gettext('staff_payroll_co.msg_cannot_duplicate_record'))
def get_dates(self, name):
res = None
if self.contract:
if name == 'start':
values = [self.start_period.start]
if self.contract.start_date:
values.append(self.contract.start_date)
res = max(values)
elif name == 'end':
values = [self.end_period.end]
if self.contract.end_date:
values.append(self.contract.end_date)
res = min(values)
return res
def get_payrolls(self, name):
if not self.employee or not self.contract:
return
Payroll = Pool().get('staff.payroll')
date_start, date_end = self._get_dates()
payrolls = Payroll.search([
('employee', '=', self.employee.id),
('start', '>=', date_start),
('end', '<=', date_end),
('contract', '=', self.contract.id),
])
payrolls_ids = [payroll.id for payroll in payrolls]
return payrolls_ids
def get_salary(self, name):
res = 0
if self.contract:
if name == 'last_salary':
res = self.contract.salary
elif name == 'salary_average':
Payroll = Pool().get('staff.payroll')
Wage = Pool().get('staff.wage_type')
wage, = Wage.search([('type_concept', '=', 'unemployment')])
res = Payroll.get_salary_average(
self.end,
self.employee,
self.contract, wage)
values = [self.end_period.end]
if self.contract.end_date:
values.append(self.contract.end_date)
res = min(values)
return res
def create_move(self):
pool = Pool()
Move = pool.get('account.move')
MoveLine = pool.get('account.move.line')
Period = pool.get('account.period')
if self.move:
return
move_lines, grouped = self.get_moves_lines()
if move_lines:
period_id = Period.find(self.company.id, date=self.liquidation_date)
move, = Move.create([{
'journal': self.journal.id,
'origin': str(self),
'period': period_id,
'date': self.liquidation_date,
'description': self.description,
'lines': [('create', move_lines)],
}])
self.write([self], {'move': move.id})
for ml in move.lines:
statement = ml.account.type.statement
account_id = ml.account.id
if account_id not in grouped.keys() or (statement not in ('balance')):
continue
to_reconcile = [ml]
to_reconcile.extend(grouped[account_id]['lines'])
if len(to_reconcile) > 1:
MoveLine.reconcile(set(to_reconcile))
Move.post([move])
def get_moves_lines(self):
Configuration = Pool().get('staff.configuration')
configuration = Configuration(1)
tax = getattr(configuration, 'tax_withholding')
lines_moves = []
to_reconcile = []
grouped = {}
amount = []
party_id = self.employee.party.id
for line in self.lines:
definition = line.wage.definition
concept = line.wage.type_concept
if line.move_lines:
for moveline in line.move_lines:
to_reconcile.append(moveline)
account_id = moveline.account.id
amount_line = moveline.debit - moveline.credit * -1
if account_id not in grouped.keys():
grouped[account_id] = {
'amount': [],
'description': line.description,
'lines': [],
}
grouped[account_id]['amount'].append(amount_line)
grouped[account_id]['lines'].append(moveline)
amount.append(amount_line)
elif concept in concept_social_security:
if definition == 'payment':
account_credit = line.account.id
if account_credit not in grouped.keys():
grouped[account_credit] = {
'amount': [],
'description': line.description,
'lines': [],
'party_to_pay': line.party_to_pay,
}
grouped[account_credit]['amount'].append(line.amount * -1)
account_debit = line.wage.debit_account.id
if account_debit not in grouped.keys():
grouped[account_debit] = {
'amount': [],
'description': line.description,
'lines': [],
'party_to_pay': line.party_to_pay,
}
grouped[account_debit]['amount'].append(line.amount)
else:
account_id = line.account.id
if account_id not in grouped.keys():
grouped[account_id] = {
'amount': [],
'description': line.description,
'lines': [],
'party_to_pay': line.party_to_pay,
}
grouped[account_id]['amount'].append(line.amount)
amount.append(line.amount)
if line.wage.expense_formula:
expense = line.get_expense_amount()
debit_account = line.wage.debit_account
grouped[account_id]['amount'].append(expense * -1)
if debit_account not in grouped.keys():
grouped[debit_account] = {
'amount': [],
'description': line.description,
'lines': [],
'party_to_pay': line.party_to_pay,
}
grouped[debit_account]['amount'].append(expense)
for adjust in line.adjustments:
key = adjust.account.id
if key not in grouped.keys():
grouped[key] = {
'amount': [],
'description': adjust.description,
'lines': [],
'party_to_pay': line.party_to_pay
}
if hasattr(adjust, 'analytic_account') and adjust.analytic_account:
grouped[key]['analytic'] = adjust.analytic_account
if tax and line.tax_base:
tax_line = {
'amount': line.tax_base,
'tax': tax.id,
'type': 'base',
}
grouped[key]['tax_lines'] = [('create', [tax_line])]
grouped[adjust.account.id]['amount'].append(adjust.amount)
amount.append(adjust.amount)
for account_id, values in grouped.items():
_amount = sum(values['amount'])
party = values.get('party_to_pay', party_id)
if not party:
party = party_id
line = {
'description': values['description'],
'debit': _amount,
'credit': _ZERO,
'account': account_id,
'party': party,
'tax_lines': values.get('tax_lines', None),
}
# debit = _amount
# credit = _ZERO
lines_moves.append(self._prepare_line(line, values.get('analytic')))
if lines_moves:
line = {
'description': self.description,
'debit': _ZERO,
'credit': sum(amount),
'account': self.account.id,
'party': self.party_to_pay if self.party_to_pay else party_id,
}
lines_moves.append(self._prepare_line(line))
return lines_moves, grouped
def _prepare_line(self, line, analytic=None):
if line['debit'] < _ZERO:
line['credit'] = abs(line['debit'])
line['debit'] = _ZERO
elif line['credit'] < _ZERO:
line['debit'] = abs(line['credit'])
line['credit'] = _ZERO
# credit = abs(credit)
# debit = abs(debit)
# party_id = self.employee.party.id
# if party_to_pay:
# party_id = party_to_pay.id
# res = {
# 'description': description,
# 'debit': debit,
# 'credit': credit,
# 'account': account_id,
# 'party': party_id,
# }
if analytic:
line['analytic_lines'] = [
('create', [{
'debit': line['debit'],
'credit': line['credit'],
'account': analytic.id,
'date': self.liquidation_date
}])]
return line
def _get_dates(self):
date_end_contract = None
date_start = self.start_period.start
if self.contract.start_date > self.start_period.start:
date_start = self.contract.start_date
if self.contract.futhermores:
date_end_contract = self.contract.finished_date
date_end = self.end_period.end
if date_end_contract and date_end_contract <= self.end_period.end:
date_end = date_end_contract
elif self.contract.end_date and self.contract.end_date < self.end_period.end:
date_end = self.contract.end_date
return date_start, date_end
def _get_dates_liquidation(self):
# date_end_contract = None
date_start = self.start_period.start
if self.contract.start_date > self.start_period.start:
date_start = self.contract.start_date
date_end = self.end_period.end
if self.kind == 'contract':
date_end = self.contract.finished_date
return date_start, date_end
@classmethod
def get_moves_lines_pending(cls, employee, wage_type, effective_date, moves=[]):
MoveLine = Pool().get('account.move.line')
lines = []
if not wage_type.credit_account:
return
account_id = wage_type.credit_account.id
domain = [
('move.date', '<=', effective_date),
('credit', '>', 0),
('account', '=', account_id),
('party', '=', employee.party.id),
('reconciliation', '=', None),
[
'OR',
('move.origin', 'not ilike', 'staff.payroll'),
('move.origin', '=', None),
]
]
lines1 = MoveLine.search(domain)
lines2 = []
if moves:
domain = [
('credit', '>', 0),
('account', '=', account_id),
('party', '=', employee.party.id),
('reconciliation', '=', None),
('move', 'in', moves),
]
lines2 = MoveLine.search(domain)
lines = set(lines1 + lines2)
return lines
@classmethod
def delete(cls, records):
# Cancel before delete
cls.cancel(records)
for liquidation in records:
if liquidation.state != 'cancel':
raise LiquidationDeleteError(
gettext('staff_payroll_co.msg_delete_cancel', liquidation=liquidation.rec_name))
if liquidation.move:
raise LiquidationDeleteError(
gettext('staff_payroll_co.msg_existing_move', liquidation=liquidation.rec_name))
super(Liquidation, cls).delete(records)
def set_liquidation_lines(self):
pool = Pool()
Payroll = pool.get('staff.payroll')
LiquidationMove = pool.get('staff.liquidation.line-move.line')
date_start, date_end = self._get_dates_liquidation()
payrolls = Payroll.search([
('employee', '=', self.employee.id),
('start', '>=', date_start),
('end', '<=', date_end),
('contract', '=', self.contract.id),
('state', '=', 'posted')
])
wages = {}
wages_target = {}
moves = [p.move.id for p in payrolls]
for payroll in payrolls:
for l in payroll.lines:
wage = l.wage_type
print(wage, wage.contract_finish, wage.provision_cancellation, self.kind, 'asasdasd')
if wage.contract_finish or wage.provision_cancellation:
if wage.provision_cancellation and not wage.contract_finish:
wage = wage.provision_cancellation
if self.kind == 'contract':
print(wage.type_concept, 'validate2323')
if wage.type_concept not in CONTRACT:
continue
elif self.kind != wage.type_concept:
continue
print('va a buscar')
if wage.id not in wages_target.keys():
mlines = self.get_moves_lines_pending(
payroll.employee, wage, date_end, moves
)
if not mlines:
continue
wages_target[wage.id] = [
wage.credit_account.id,
mlines,
wage,
]
for (account_id, lines, wage_type) in wages_target.values():
values = []
lines_to_reconcile = []
for line in lines:
values.append(abs(line.debit - line.credit))
lines_to_reconcile.append(line.id)
value = self.get_line_(wage_type, sum(values), self.time_contracting, account_id, party=self.party_to_pay)
lines = LiquidationMove.search([
('move_line', 'in', lines_to_reconcile)
])
if lines:
liquidation = lines[0].line.liquidation
raise RecordDuplicateError(
gettext('staff_payroll_co.msg_duplicate_liquidation',
liquidation=liquidation.id, state=liquidation.state))
value.update({
'move_lines': [('add', lines_to_reconcile)],
})
wages[wage_type.id] = value
self.write([self], {'lines': [('create', wages.values())]})
if self.kind == 'contract':
self.process_loans_to_pay()
# self.calculate_discounts()
def get_line_(self, wage, amount, days, account_id, party=None):
value = {
'sequence': wage.sequence,
'wage': wage.id,
'account': account_id,
'description': wage.name,
'amount': amount,
'days': days,
'party_to_pay': party,
}
return value
def calculate_social_security(self):
LiquidationLine = Pool().get('staff.liquidation.line')
wages = [mw for mw in self.employee.mandatory_wages \
if mw.wage_type.type_concept in concept_social_security]
lines_to_create = []
for mw in wages:
wage = mw.wage_type
concepts_salary = [c.id for c in wage.concepts_salary]
if concepts_salary:
salary_full = sum(
line.amount for line in self.lines if line.wage.id in concepts_salary)
amount = wage.compute_unit_price(
wage.unit_price_formula,
{'salary': salary_full})
if amount > 0:
if wage.definition != 'payment':
amount = amount * -1
value = self.get_line_(
wage, amount,
1,
wage.credit_account.id,
party=mw.party)
value['liquidation'] = self
lines_to_create.append(value)
LiquidationLine.create(lines_to_create)
def process_loans_to_pay(self):
pool = Pool()
MoveLine = pool.get('account.move.line')
LoanLine = pool.get('staff.loan.line')
LiquidationLine = pool.get('staff.liquidation.line')
dom = [
('loan.wage_type', '!=', None),
('loan.party', '=', self.employee.party.id),
('state', 'in', ['pending', 'partial']),
]
lines_loan = LoanLine.search(dom)
for m in lines_loan:
move_lines = MoveLine.search([
('origin', 'in', ['staff.loan.line,' + str(m)]),
])
party = m.loan.party_to_pay.id if m.loan.party_to_pay else None
res = self.get_line_(m.loan.wage_type, m.amount * -1, 1, m.loan.account_debit.id, party=party)
res['move_lines'] = [('add', move_lines)]
res['liquidation'] = self.id
line_, = LiquidationLine.create([res])
LoanLine.write([m], {'state': 'paid', 'origin': line_})
@fields.depends('start_period', 'end_period', 'contract')
def on_change_with_time_contracting(self):
delta = None
if self.start_period and self.end_period and self.contract:
try:
date_start, date_end = self._get_dates()
delta = self.contract.get_time_days(date_start, date_end)
except Exception:
raise LiquidationEmployeeError(
gettext('staff_payroll_co.msg_error_dates', s=self.employee.party.name))
delta = 0
return delta
def on_change_contract(self):
if self.contract:
self.last_salary = self.contract.salary
def set_number(self):
if self.number:
return
pool = Pool()
Configuration = pool.get('staff.configuration')
configuration = Configuration(1)
if not configuration.staff_liquidation_sequence:
raise MissingSecuenceLiquidation(
gettext('staff_payroll_co.msg_sequence_missing'))
seq = configuration.staff_liquidation_sequence.get()
self.write([self], {'number': seq})
def get_sum_operation(self, name):
res = []
for line in self.lines:
if not line.amount:
continue
if name == 'gross_payments' and line.wage.definition == 'payment' and line.wage.type_concept not in concept_social_security:
res.append(line.amount)
elif name == 'total_deductions' and line.wage.definition != 'payment':
res.append(line.amount)
return sum(res)
def get_net_payment(self, name):
res = (self.gross_payments or 0) - abs((self.total_deductions or 0))
if res:
return self.currency.round(res)
return 0
def get_currency(self, name):
return self.company.currency.id
class LiquidationLine(ModelSQL, ModelView):
'Staff Liquidation Line'
__name__ = 'staff.liquidation.line'
sequence = fields.Integer('Sequence', required=True)
liquidation = fields.Many2One('staff.liquidation', 'Liquidation',
required=True)
wage = fields.Many2One('staff.wage_type', 'Wage Type', required=True)
description = fields.Char('Description', required=True)
amount = fields.Numeric('Amount', digits=(16, 2), required=True,
depends=['adjustments', 'move_lines'])
days = fields.Integer('Days')
notes = fields.Char('Notes')
account = fields.Many2One('account.account', 'Account')
move_lines = fields.Many2Many('staff.liquidation.line-move.line',
'line', 'move_line', 'Liquidation Line - Move Line',
domain=[
('party', '=', Eval('party')),
('account', '=', Eval('account')),
], depends=['party', 'account'])
party = fields.Function(fields.Many2One('party.party', 'Party'),
'get_party')
adjustments = fields.One2Many('staff.liquidation.line_adjustment',
'staff_liquidation_line', 'Adjustments')
party_to_pay = fields.Many2One('party.party', 'Party to Pay')
salary_average = fields.Function(fields.Numeric('Salary Average',
digits=(16, 2)), 'get_average_payroll')
tax_base = fields.Numeric('Tax Base', digits=(16, 2))
is_wage_tax = fields.Function(fields.Boolean('Is wage tax'),
'on_change_with_is_wage_tax')
@classmethod
def __setup__(cls):
super(LiquidationLine, cls).__setup__()
cls._order.insert(0, ('sequence', 'ASC'))
@classmethod
def __register__(cls, module_name):
table_h = cls.__table_handler__(module_name)
# Migration from 4.0: remove hoard_amount
if table_h.column_exist('hoard_amount'):
table_h.drop_column('hoard_amount')
super(LiquidationLine, cls).__register__(module_name)
@fields.depends('wage')
def on_change_with_is_wage_tax(self, name=None):
if self.wage:
return self.wage.type_concept == 'tax'
return False
def get_party(self, name=None):
if self.liquidation.employee:
return self.liquidation.employee.party.id
@fields.depends('wage', 'description', 'amount', 'liquidation',
'_parent_liquidation.employee', '_parent_liquidation.time_contracting',
'_parent_liquidation.start_period', '_parent_liquidation.end_period',
'_parent_liquidation.currency', 'sequence')
def on_change_wage(self):
if not self.wage:
return
self.sequence = self.wage.sequence
self.description = self.wage.name
self.days = self.liquidation.time_contracting
@fields.depends('amount', 'adjustments', 'move_lines')
def on_change_with_amount(self, name=None):
if not self.adjustments and not self.move_lines:
amount_ = self.amount
else:
amount_ = 0
if self.adjustments:
amount_ += sum([ad.amount or 0 for ad in self.adjustments])
if self.move_lines:
amount_ += sum([(ml.credit - ml.debit) for ml in self.move_lines])
return amount_
def get_average_payroll(self, name):
contract = self.liquidation.contract
employee = self.liquidation.employee
end = self.liquidation.end
Payroll = Pool().get('staff.payroll')
try:
res = Payroll.get_salary_average(end, employee, contract, self.wage) * 30
except:
res = 0
return res
def get_expense_amount(self):
expense = 0
if self.wage.expense_formula:
concepts_salary = [c.id for c in self.wage.concepts_salary]
salary = sum(
line.amount for line in self.liquidation.lines if line.wage.id in concepts_salary)
expense = self.wage.compute_expense(self.wage.expense_formula, {'salary': salary})
return expense
@classmethod
def delete(cls, lines):
LoanLine = Pool().get('staff.loan.line')
loan_lines = LoanLine.search([
('origin', 'in', ['staff.liquidation.line,' + str(l.id) for l in lines])
])
LoanLine.write([m for m in loan_lines], {'state': 'pending', 'origin': None})
super(LiquidationLine, cls).delete(lines)
class LiquidationLineAdjustment(ModelSQL, ModelView):
'Liquidation Adjustment'
__name__ = 'staff.liquidation.line_adjustment'
staff_liquidation_line = fields.Many2One('staff.liquidation.line', 'Line',
required=True, select=True)
account = fields.Many2One('account.account', 'Acount',
required=True, domain=[
('company', '=', Eval('context', {}).get('company', -1)),
('type', '!=', None),
])
description = fields.Char('Description', required=True)
amount = fields.Numeric('Amount', digits=(10,2), required=True)
class LiquidationReport(Report):
__name__ = 'staff.liquidation.report'
class LiquidationLineMoveLine(ModelSQL):
"Liquidation Line - MoveLine"
__name__ = "staff.liquidation.line-move.line"
_table = 'staff_liquidation_line_move_line_rel'
line = fields.Many2One('staff.liquidation.line', 'Line',
ondelete='CASCADE', select=True, required=True)
move_line = fields.Many2One('account.move.line', 'Move Line',
ondelete='RESTRICT', select=True, required=True)
class LiquidationAdjustmentStart(ModelView):
'Create Liquidation Adjustment Start'
__name__ = 'staff.liquidation_adjustment.start'
wage_type = fields.Many2One('staff.wage_type', 'Wage Type', required=True, domain=[
('contract_finish', '=', True)
])
amount = fields.Numeric('Amount', required=True)
account = fields.Many2One('account.account', 'Acount',
required=True, domain=[
('company', '=', Eval('context', {}).get('company', -1)),
('type', '!=', None),
])
description = fields.Char('Description', required=True)
@fields.depends('wage_type', 'account')
def on_change_wage_type(self):
if self.wage_type and self.wage_type.debit_account:
self.account = self.wage_type.debit_account.id
class LiquidationAdjustment(Wizard):
'Create Liquidation Adjustment'
__name__ = 'staff.liquidation_adjustment'
start = StateView('staff.liquidation_adjustment.start',
'staff_payroll_co.liquidation_adjustment_start_view_form', [
Button('Cancel', 'end', 'tryton-cancel'),
Button('Create', 'accept', 'tryton-ok', default=True),
])
accept = StateTransition()
@classmethod
def __setup__(cls):
super(LiquidationAdjustment, cls).__setup__()
def create_adjustment(self, line):
LineAdjustment = Pool().get('staff.liquidation.line_adjustment')
adjust, = LineAdjustment.create([{
'staff_liquidation_line': line.id,
'account': self.start.account.id,
'description': self.start.description,
'amount': self.start.amount,
}])
return adjust
def transition_accept(self):
pool = Pool()
Liquidation = pool.get('staff.liquidation')
LiquidationLine = pool.get('staff.liquidation.line')
id_ = Transaction().context['active_id']
liquidation, = Liquidation.search([('id', '=', id_)])
line_created = None
if liquidation:
if liquidation.move:
raise LiquidationMoveError(
gettext('staff_payroll_co.msg_liquidation_with_move', s=liquidation.number))
for line in liquidation.lines:
if line.wage.id == self.start.wage_type.id:
if line.amount:
line.amount += self.start.amount
line.save()
line_created = self.create_adjustment(line)
if not line_created:
line, = LiquidationLine.create([{
'sequence': len(liquidation.lines) + 1,
'liquidation': liquidation.id,
'wage': self.start.wage_type.id,
'description': self.start.wage_type.name,
'amount': self.start.amount,
}])
self.create_adjustment(line)
return 'end'
class LiquidationGroupStart(ModelView):
'Liquidation Group Start'
__name__ = 'staff.liquidation_group.start'
start_period = fields.Many2One('staff.payroll.period', 'Start Period',
required=True)
end_period = fields.Many2One('staff.payroll.period', 'End Period',
required=True, depends=['start_period'])
kind = fields.Selection([
('contract', 'Contract'),
('bonus_service', 'Bonus Service'),
('interest', 'Interest'),
('holidays', 'Vacation'),
('unemployment', 'Unemployment'),
], 'Kind', required=True)
liquidation_date = fields.Date('Liquidation Date', required=True)
company = fields.Many2One('company.company', 'Company', required=True)
description = fields.Char('Description', required=True)
account = fields.Many2One('account.account', 'Account', required=True,
domain=[
('type.payable', '=', True),
('company', '=', Eval('company'))
])
employees = fields.Many2Many('company.employee', None, None, 'Employee')
party_to_pay = fields.Many2One('party.party', 'Party to Pay', states={
'invisible': Eval('kind') != 'unemployment'
})
@staticmethod
def default_company():
return Transaction().context.get('company')
class LiquidationGroup(Wizard):
'Liquidation Group'
__name__ = 'staff.liquidation_group'
start = StateView('staff.liquidation_group.start',
'staff_payroll_co.liquidation_group_start_view_form', [
Button('Cancel', 'end', 'tryton-cancel'),
Button('Accept', 'open_', 'tryton-ok', default=True),
])
open_ = StateTransition()
def transition_open_(self):
pool = Pool()
Liquidation = pool.get('staff.liquidation')
Contract = pool.get('staff.contract')
Employee = pool.get('company.employee')
Period = pool.get('staff.payroll.period')
to_liquidation = []
start_period = self.start.start_period.id
end_period = self.start.end_period.id
kind = self.start.kind
liquidation_date = self.start.liquidation_date
company_id = self.start.company.id
description = self.start.description
currency_id = self.start.company.currency.id
account_id = self.start.account.id
start_date = self.start.start_period.start
end_date = self.start.end_period.end
periods = Period.search([
('start', '>=', start_date),
('start', '<=', end_date)
])
dom_contracts = [('kind', '=', kind)]
if self.start.employees:
employees = self.start.employees
employee_ids = [e.id for e in employees]
dom_contracts.append(('employee', 'in', employee_ids))
else:
employees = Employee.search([('contract', '!=', None)])
if periods:
periods_ids = [p.id for p in periods]
dom_contracts.append(('start_period', 'in', periods_ids))
dom_contracts.append(('end_period', 'in', periods_ids))
contracts = Liquidation.search_read(
dom_contracts, fields_names=['contract']
)
contract_ids = [i['contract'] for i in contracts]
for employee in employees:
contracts = Contract.search([
('employee', '=', employee.id),
('id', 'not in', contract_ids),
])
if not contracts:
continue
contract = contracts[0]
if kind == 'unemployment':
wages = [
mw for mw in employee.mandatory_wages
if mw.wage_type.type_concept == 'unemployment'
]
if not wages:
continue
wage = wages[0]
lines = Liquidation.get_moves_lines_pending(
employee, wage.wage_type, self.start.end_period.end
)
if not lines:
continue
lqt_create = {
'start_period': start_period,
'end_period': end_period,
'employee': employee.id,
'contract': contract.id,
'kind': kind,
'liquidation_date': liquidation_date,
'time_contracting': None,
'state': 'draft',
'company': company_id,
'description': description,
'currency': currency_id,
'account': account_id,
'party_to_pay': self.start.party_to_pay.id if self.start.party_to_pay else None
}
liq, = Liquidation.create([lqt_create])
liq.time_contracting = liq.on_change_with_time_contracting()
liq.save()
to_liquidation.append(liq)
Liquidation.compute_liquidation(to_liquidation)
return 'end'
class MoveProvisionBonusServiceStart(ModelView):
'Move Privision Bonus Service Start'
__name__ = 'staff.move_provision_bonus_service.start'
period = fields.Many2One('staff.payroll.period', 'Period',
required=True, domain=[('state', '=', 'open')])
description = fields.Char('Description', required=True)
company = fields.Many2One('company.company', 'Company',
required=True)
wage_type = fields.Many2One('staff.wage_type', 'Wage Types', domain=[
('type_concept', '=', 'bonus_service')
], required=True)
category = fields.Many2One('staff.employee_category', 'Category')
@staticmethod
def default_company():
return Transaction().context.get('company')
class MoveProvisionBonusService(Wizard):
'Move Provision Bonus Service'
__name__ = 'staff.move_provision_bonus_service'
start = StateView('staff.move_provision_bonus_service.start',
'staff_payroll_co.move_provision_bonus_service_view_form', [
Button('Cancel', 'end', 'tryton-cancel'),
Button('Accept', 'open_', 'tryton-ok', default=True),
])
open_ = StateTransition()
def transition_open_(self):
pool = Pool()
Contract = pool.get('staff.contract')
Move = pool.get('account.move')
AccountPeriod = pool.get('account.period')
configuration = Pool().get('staff.configuration')(1)
journal_id = None
if configuration and configuration.default_journal:
journal_id = configuration.default_journal.id
_end_date = self.start.period.end
_company = self.start.company
provision_wage = self.start.wage_type
period_days = (self.start.period.end - self.start.period.start).days + 1
dom_contract = [
['AND', ['OR', [
('end_date', '>', self.start.period.start),
], [
('end_date', '=', None),
],
]],
]
if self.start.category:
dom_contract.append(
('employee.category', '=', self.start.category.id)
)
for contract in Contract.search(dom_contract):
period_in_month = 1 if period_days > 15 else 2
salary_amount = contract.get_salary_in_date(_end_date)
base_ = salary_amount
move_lines = []
employee = contract.employee
for concept in contract.employee.mandatory_wages:
if concept.wage_type and concept.wage_type.salary_constitute:
if concept.wage_type.type_concept == 'transport':
base_ += concept.wage_type.compute_unit_price({'salary': 0}) * concept.wage_type.default_quantity
if concept.fix_amount:
base_ += concept.fix_amount
period_id = AccountPeriod.find(_company.id, date=_end_date)
provision_amount = provision_wage.compute_unit_price(
{'salary': (round((base_ / period_in_month), 2))}
)
move_lines.extend([
{
'debit': provision_amount,
'credit': 0,
'party': employee.party.id,
'account': provision_wage.debit_account.id
},
{
'debit': 0,
'credit': provision_amount,
'party': employee.party.id,
'account': provision_wage.credit_account.id
}
])
move, = Move.create([{
'journal': journal_id,
# 'origin': str(contract),
'period': period_id,
'company': _company.id,
'date': _end_date,
'state': 'draft',
'description': self.start.description,
'lines': [('create', move_lines)],
}])
return 'end'
class LiquidationExportStart(ModelView):
'Liquidation Export Start'
__name__ = 'staff.liquidation_export.start'
start_date = fields.Date('Start Date', required=True)
end_date = fields.Date('End Date', required=True)
company = fields.Many2One('company.company', 'Company', required=True)
department = fields.Many2One('company.department', 'Department')
kind = fields.Selection([
('contract', 'Contract'),
('bonus_service', 'Bonus Service'),
('interest', 'Interest'),
('unemployment', 'Unemployment'),
('vacation', 'Vacation'),
], 'Kind')
@staticmethod
def default_company():
return Transaction().context.get('company')
@fields.depends('start_date')
def on_change_with_end_date(self, name=None):
if self.start_date:
return self.start_date
class LiquidationExport(Wizard):
'Liquidation Export'
__name__ = 'staff.liquidation_export'
start = StateView('staff.liquidation_export.start',
'staff_payroll_co.liquidation_start_view_form', [
Button('Cancel', 'end', 'tryton-cancel'),
Button('Print', 'print_', 'tryton-ok', default=True),
])
print_ = StateReport('staff.liquidation_export.report')
def do_print_(self, action):
department_id = None
if self.start.department:
department_id = self.start.department.id
data = {
'ids': [],
'company': self.start.company.id,
'start_date': self.start.start_date,
'end_date': self.start.end_date,
'department': department_id,
'kind': self.start.kind,
}
return action, data
def transition_print_(self):
return 'end'
class LiquidationExportReport(Report):
__name__ = 'staff.liquidation_export.report'
@classmethod
def get_domain_liquidation(cls, data):
dom_liquidation = []
return dom_liquidation
@classmethod
def get_context(cls, records, header, data):
report_context = super().get_context(records, header, data)
pool = Pool()
user = pool.get('res.user')(Transaction().user)
Liquidation = pool.get('staff.liquidation')
Department = pool.get('company.department')
dom_liq = cls.get_domain_liquidation(data)
dom_liq.append(('state', 'in', ['processed', 'posted', 'draft']),)
dom_liq.append(('liquidation_date', '>=', data['start_date']),)
dom_liq.append(('liquidation_date', '<=', data['end_date']),)
if data['department']:
dom_liq.append(('employee.department', '=', data['department']),)
department = Department(data['department']).name
else:
department = None
if data['kind']:
dom_liq.append(('kind', '=', data['kind']))
liquidations = Liquidation.search(dom_liq)
# default_vals = cls.default_values()
sum_gross_payments = []
sum_total_deductions = []
sum_net_payment = []
parties = {}
payments = ['interest', 'holidays', 'bonus_service', 'unemployment']
for liquidation in liquidations:
employee_id = liquidation.employee.id
if employee_id not in parties.keys():
position_employee = liquidation.employee.position.name if liquidation.employee.position else ''
position_contract = liquidation.contract.position.name if liquidation.contract and liquidation.contract.position else ''
parties[employee_id] = {}
parties[employee_id]['employee_code'] = liquidation.employee.code
parties[employee_id]['employee_contract'] = liquidation.contract.id
parties[employee_id]['employee'] = liquidation.employee.party.name
parties[employee_id]['employee_id_number'] = liquidation.employee.party.id_number
parties[employee_id]['employee_position'] = position_contract or position_employee or ''
for line in liquidation.lines:
concept = None
if line.wage.type_concept in (payments):
concept = line.wage.type_concept
else:
if line.wage.definition == 'payment':
concept = 'others_payments'
elif line.wage.definition == 'deduction' or \
line.wage.definition == 'discount' and \
line.wage.receipt:
concept = 'others_deductions'
if not concept:
raise WageTypeConceptError(
gettext('staff_payroll_co.msg_type_concept_not_exists', s=line.wage.name))
if concept not in parties[employee_id].keys():
parties[employee_id][concept] = 0
parties[employee_id][concept] += line.amount
parties[employee_id]['time_contracting'] = liquidation.time_contracting
parties[employee_id]['net_payment'] = liquidation.net_payment
parties[employee_id]['gross_payments'] = liquidation.gross_payments
parties[employee_id]['total_deductions'] = liquidation.total_deductions
sum_gross_payments.append(liquidation.gross_payments)
sum_total_deductions.append(liquidation.total_deductions)
sum_net_payment.append(liquidation.net_payment)
employee_dict = {e['employee']: e for e in parties.values()}
report_context['records'] = sorted(employee_dict.items(), key=lambda t: t[0])
report_context['department'] = department
report_context['start_date'] = data['start_date']
report_context['end_date'] = data['end_date']
report_context['company'] = user.company
report_context['user'] = user
report_context['sum_gross_payments'] = sum(sum_gross_payments)
report_context['sum_net_payment'] = sum(sum_net_payment)
report_context['sum_total_deductions'] = sum(sum_total_deductions)
return report_context
class LiquidationDeduction(Wizard):
'Liquidation Deduction'
__name__ = 'staff.liquidation_calculate_social_security'
start_state = 'calculate_social_security'
calculate_social_security = StateTransition()
def transition_calculate_social_security(self):
Liquidation = Pool().get('staff.liquidation')
liquidations_ids = Transaction().context['active_ids']
if liquidations_ids:
liquidations = Liquidation.browse(liquidations_ids)
for liq in liquidations:
liq.calculate_social_security()
return 'end'