
597 lines
25 KiB
Raw Normal View History

# Copyright (c) 2012 NaN Projectes de Programari Lliure, S.L.
# All Rights Reserved.
# http://www.NaN-tic.com
# WARNING: This program as such is intended to be used by professional
# programmers who take the whole responsability of assessing all potential
# consequences resulting from its eventual inadequacies and bugs
# End users who are looking for a ready-to-use solution with commercial
# garantees and support are strongly adviced to contract a Free Software
# Service Company
# This program is Free Software; you can redistribute it and/or
# modify it under the terms of the GNU Affero General Public License
# as published by the Free Software Foundation; either version 2
# of the License, or (at your option) any later version.
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# GNU Affero General Public License for more details.
# You should have received a copy of the GNU Affero General Public License
# along with this program; if not, write to the Free Software
# Foundation, Inc., 59 Temple Place - Suite 330, Boston, MA 02111-1307, USA.
from osv import osv, fields
from tools.translate import _
import netsvc
class stock_production_lot_qc_trigger_test(osv.osv):
Model that defines the Quality Control Tests which a Production Lot must
to pass in certain situations defined by the Trigger Tag.
_name = 'stock.production.lot.qc.trigger.test'
_description = 'Quality Control Test Triggers by Lot'
_order = 'prodlot_id, sequence'
def name_get(self, cr, uid, ids, context=None):
res = []
for trigger in self.read(cr, uid, ids, ['trigger_id', 'template_type'],
name = trigger['trigger_id'][1] + ", " + trigger['template_type']
res.append((trigger['id'], name))
return res
# stock.production.lot.qc.trigger.test
def _calc_test_state_data(self, cr, uid, ids, fieldnames, args,
Calcs the value of fields 'test_approved' and 'test_success'.
The 'test_approved' field will be true if its test is in 'success' or
'failed' final states, and the fields 'test_success' will be True only
if its test is in 'success' state.
res = {}
for lot_trigger in self.browse(cr, uid, ids, context):
res[lot_trigger.id] = {}.fromkeys(fieldnames, False)
if (not lot_trigger.test_id or
lot_trigger.test_id.state not in ('success', 'failed')):
if 'test_approved' in fieldnames:
res[lot_trigger.id]['test_approved'] = True
if ('test_success' in fieldnames and
lot_trigger.test_id.state == 'success'):
res[lot_trigger.id]['test_success'] = True
return res
_columns = {
'prodlot_id': fields.many2one('stock.production.lot', 'Lot',
required=True, ondelete="cascade"),
'sequence': fields.integer('Sequence', required=True),
'trigger_id': fields.many2one('qc.trigger', 'Trigger', required=True,
help="The Quality Control Trigger Tag which defines when must "
"to be created a Test (using the specified template) for a "
"Production Lot of this Product."),
'template_type': fields.selection([
('generic', 'Generic'),
('related', 'Specific'),
], "Template's Type", required=True),
'test_id': fields.many2one('qc.test', 'Test', required=True,
'test_approved': fields.function(_calc_test_state_data, method=True,
type='boolean', string="Test approved?", multi='test_state'),
'test_success': fields.function(_calc_test_state_data, method=True,
type='boolean', string="Test success?", multi='test_state'),
_defaults = {
'sequence': 0,
_sql_constraints = [
'unique (prodlot_id, trigger_id, template_type)',
"The tuple QC Trigger Tag and Template Type must be unique for "
"each Production Lot!"),
class stock_production_lot(osv.osv):
Adds to the Lot an 'state' field and a workflow to manage its status related
to the Quality test that has passed.
It also adds one2many field to the model which defines the Quality Control
Tests that it must to pass.
_inherit = 'stock.production.lot'
# stock.production.lot
def get_available_states(self, cr, uid, context=None):
Returns the list of available states for Lot objects. It is defined as
a function to make easier to extend the list of states.
return [
('draft', 'Draft'),
('pending_test', 'Waiting QC Test'),
('valid', 'Valid'),
('test_failed', 'QC Test Failed'),
('cancel', 'Cancelled'),
# stock.production.lot
def __get_available_states(self, cr, uid, context=None):
Returns the list of available states for Lot objects.
It call's the function '_get_available_states' which could be
reimplemented to extend or modify the list of available states.
@see: _get_available_states
return self.get_available_states(cr, uid, context)
# stock.production.lot
def _calc_test_trigger_data(self, cr, uid, ids, fieldnames, args,
trigger_proxy = self.pool.get('stock.production.lot.qc.trigger.test')
res = {}
for lot_id in ids:
res[lot_id] = {}.fromkeys(fieldnames, [])
lot_trigger_ids = trigger_proxy.search(cr, uid,
[('prodlot_id', 'in', ids)], context=context)
for lot_trigger in trigger_proxy.browse(cr, uid, lot_trigger_ids,
lot_id = lot_trigger.prodlot_id.id
if 'qc_test_trigger_ro_ids' in fieldnames:
if 'qc_trigger_ids' in fieldnames:
return res
_columns = {
# adds the 'readonly' and 'states' attributes
'product_id': fields.many2one('product.product', 'Product',
required=True, readonly=True,
states={'draft': [('readonly', False)]}),
'state': fields.selection(__get_available_states, 'State',
required=True, readonly=True),
# 'state': fields.selection(
# lambda self, cr, uid, context=None: self.get_available_states,
# 'State', required=True, readonly=True),
'active': fields.boolean('Active'),
'qc_test_trigger_ids': fields.one2many(
'stock.production.lot.qc.trigger.test', 'prodlot_id',
'QC Tests', help="Defines the Quality Control Tests that this "
"Production Lot must to pass in certain situations defined by "
"the Trigger Tag."),
# Read Only version of previous field. Better solutions are wellcome
'qc_test_trigger_ro_ids': fields.function(_calc_test_trigger_data,
method=True, type='one2many', string='QC Tests',
'qc_trigger_ids': fields.function(_calc_test_trigger_data, method=True,
type='many2many', string='QC Triggers', relation='qc.trigger',
'current_qc_test_trigger_id': fields.many2one(
'Current QC Test Trigger',
domain="[('id','in',[x.id for x in qc_test_trigger_ids])]", readonly=True,
'current_qc_test_id': fields.related('current_qc_test_trigger_id',
'test_id', type='many2one', relation='qc.test',
string="Current QC Test", readonly=True),
_defaults = {
'state': 'draft',
'active': True,
# stock.production.lot
def _calc_qc_test_vals(self, cr, uid, prodlot, context):
Prepare data to create a qc.test instance related to a Production Lot
@param prodlot: Browse object of stock.production.lot
@return: Dictionary of stock.production.lot values
reference = 'stock.production.lot,%d' % prodlot.id
return {
'object_id': reference,
# stock.production.lot
def _calc_qc_test_trigger_ids_vals(self, cr, uid, prodlot, trigger_id,
Prepare data to create and write in the field 'qc_test_trigger_ids' of
Production Lot the QC Test Triggers corresponding to the supplied
@param prodlot: Browse object of stock.production.lot instance
@param trigger_id: ID of qc.trigger instance
@return: Dictionary of stock.production.lot values
qc_test_proxy = self.pool.get('qc.test')
if trigger_id in [x.id for x in prodlot.qc_trigger_ids]:
netsvc.Logger().notifyChannel(self._name, netsvc.LOG_DEBUG,
"Lot id:%d already has QC Test Trigger for Trigger %d" % (
prodlot.id, trigger_id))
return False
if trigger_id not in [x.id for x in prodlot.product_id.qc_trigger_ids]:
netsvc.Logger().notifyChannel(self._name, netsvc.LOG_DEBUG,
"Product id:%d of Lot id:%d doesn't have QC Template "
"Trigger for Trigger %d" % (prodlot.product_id.id,
prodlot.id, trigger_id))
return False
test_trigger_vals = []
for template_trigger in prodlot.product_id.qc_template_trigger_ids:
if template_trigger.trigger_id.id != trigger_id:
test_vals = self._calc_qc_test_vals(cr, uid, prodlot, context)
test_id = qc_test_proxy.create(cr, uid, test_vals, context)
qc_test_proxy.set_test_template(cr, uid, [test_id],
template_trigger.template_id.id, context)
test_trigger_vals.append((0, 0, {
'sequence': template_trigger.sequence,
'trigger_id': template_trigger.trigger_id.id,
'template_type': template_trigger.template_type,
'test_id': test_id,
if not test_trigger_vals:
return False
return {
'qc_test_trigger_ids': test_trigger_vals,
# stock.production.lot
def create_qc_test_triggers(self, cr, uid, prodlot, trigger_id,
set_next_test, context):
Create and write into Production Lot the Trigger Test corresponding to
supplied Trigger, getting templates configuration and QC templates from
the Product of Lot.
@param prodlot: Browse object of stock.production.lot instance
@param trigger_id: ID of qc.trigger instance
@param set_next_test: Sets the first Test of Trigger as current_qc_test_id and move Lot workflow to Pending Test
@return: Boolean value depending of triggers has been created
@raise except_osv: If set_next_test is True and Lot is not in 'Draft' or 'Valid' state
trigger_proxy = self.pool.get('stock.production.lot.qc.trigger.test')
wf_service = netsvc.LocalService("workflow")
if set_next_test and prodlot.state not in ('draft', 'valid'):
raise osv.except_osv(
_("Error setting next test!"),
_("It is trying to set a new next test for Production Lot "
"'%(lot_name)s' (id: %(lot_id)s) but it is in state "
"'%(lot_state)s' when it is expected 'Draft' or 'Valid' "
"states to change the current test.") % {
'lot_name': prodlot.name,
'lot_id': prodlot.id,
'lot_state': prodlot.state,
test_trigger_vals = self._calc_qc_test_trigger_ids_vals(cr, uid,
prodlot, trigger_id, context)
if test_trigger_vals:
self.write(cr, uid, [prodlot.id], test_trigger_vals, context)
res = True
res = False
if not set_next_test:
return res
trigger_test_ids = trigger_proxy.search(cr, uid, [
('prodlot_id', '=', prodlot.id),
('trigger_id', '=', trigger_id),
], order='sequence asc', limit=1, context=context)
if trigger_test_ids:
self.write(cr, uid, [prodlot.id],
{'current_qc_test_trigger_id': trigger_test_ids[0]},
# If 'set_next_test' is true it move the workflow independly if the Lot
# has any test (in this case it will be moved to 'Valid' state
if prodlot.state == 'draft':
wf_service.trg_validate(uid, 'stock.production.lot', prodlot.id,
'confirm', cr)
wf_service.trg_validate(uid, 'stock.production.lot', prodlot.id,
'next_test', cr)
return res
# stock.production.lot
def action_wofkflow_draft(self, cr, uid, ids, context=None):
Sets the State of Lot to 'Draft' and deactivate it
self.write(cr, uid, ids, {'state': 'draft', 'active': False}, context)
return True
# stock.production.lot
def _get_next_test_trigger_id(self, cr, uid, prodlot, context):
Calculates the Lot Trigger Test Id for the next Test to pass.
If there isn't a current trigger test specified, it returns False.
If the current Trigger Test is not 'Success', it returns this.
In other case, search the next trigger test (by Sequence) with the same
Trigger and return its Id.
@param prodlot: Browse object of stock.production.lot
@return: ID of stock.production.lot.qc.trigger.test instance or False
trigger_proxy = self.pool.get('stock.production.lot.qc.trigger.test')
if not prodlot.current_qc_test_trigger_id:
return False
# if current test is not 'Success' => it remains as Current test
if not prodlot.current_qc_test_trigger_id.test_success:
return prodlot.current_qc_test_trigger_id.id
# if current test is 'Success' => search next test for the same trigger
current_trigger = prodlot.current_qc_test_trigger_id
next_trigger_ids = trigger_proxy.search(cr, uid, [
('prodlot_id', '=', prodlot.id),
('id', '!=', current_trigger.id),
('sequence', '>=', current_trigger.sequence),
('trigger_id', '=', current_trigger.trigger_id.id),
], limit=1, context=context)
return next_trigger_ids and next_trigger_ids[0] or False
# stock.production.lot
def action_wofkflow_next_test(self, cr, uid, ids, context=None):
Find the Next Test to pass and write it to 'current_qc_test_trigger_id'
It will be empty if there isn't any current test or there aren't any
next test for current Trigger.
It will maintain the current test trigger if its test is not 'Success'
@see: _get_next_test_trigger_id
@param ids: List of IDs of stock.production.lot
for prodlot in self.browse(cr, uid, ids, context):
next_trigger_id = self._get_next_test_trigger_id(cr, uid, prodlot,
self.write(cr, uid, [prodlot.id], {
'current_qc_test_trigger_id': next_trigger_id,
}, context)
return True
# stock.production.lot
def test_valid(self, cr, uid, ids, context=None):
Checks that Production Lot doesn't has Current Test or it is in
'Success' state.
for prodlot in self.browse(cr, uid, ids, context):
if (prodlot.current_qc_test_id and
return False
return True
# stock.production.lot
def action_wofkflow_valid(self, cr, uid, ids, context=None):
Sets the State of Lot to 'Valid' and activate the Lot.
self.write(cr, uid, ids, {
'current_qc_test_trigger_id': False,
'state': 'valid',
'active': True,
}, context)
return True
# stock.production.lot
def test_pending_test(self, cr, uid, ids, context=None):
Checks that Production Lot has a Current Test and it is not Success
for prodlot in self.browse(cr, uid, ids, context):
if (not prodlot.current_qc_test_id or
return False
return True
# stock.production.lot
def action_wofkflow_pending_test(self, cr, uid, ids, context=None):
Sets the State of Lot to 'Pending Test' and deactivate it, and returns
the ID of current QC Test.
assert len(ids) == 1, "Unexpected number of Lot IDs in function of " \
"'Pending Test' workflow step."
lot = self.browse(cr, uid, ids[0], context)
assert lot.current_qc_test_id, "The 'Current Test' field is required " \
"for the Lots which reaches the 'Pending Test' workflow step, "\
"but the Lot %d doesn't have." % lot.id
self.write(cr, uid, ids, {
'state': 'pending_test',
'active': False,
}, context)
return lot.current_qc_test_id.id
# stock.production.lot
def action_wofkflow_test_failed(self, cr, uid, ids, context=None):
Sets the State of Lot to 'Test Failed', deactivate it and returns the
ID of current Test
assert len(ids) == 1, "Unexpected number of Lot IDs in function of " \
"'Pending Test' workflow step."
lot = self.browse(cr, uid, ids[0], context)
assert lot.current_qc_test_id, "The 'Current Test' field is required " \
"for the Lots which reaches the 'Test Failed' workflow step, "\
"but the Lot %d doesn't have." % lot.id
self.write(cr, uid, ids, {
'state': 'test_failed',
'active': False,
}, context)
return lot.current_qc_test_id.id
def test_not_qc_test_subflow(self, cr, uid, ids, context=None):
To be able to go out of 'subflow' activity without subflow signal,
the workitem must to be disassociated of any subflow.
It could be done with 'force_cancel' function which after raise
'cancel' signal
workitem_proxy = self.pool.get('workflow.workitem')
workflow_ids = self.pool.get('workflow').search(cr, uid, [
('osv', '=', 'stock.production.lot'),
('name', '=', 'stock.production.lot.basic'),
], context=context)
assert len(workflow_ids) == 1, "Unexpected number of workflows for " \
"'stock.production.lot' with name " \
"'stock.production.lot.basic'. Expected 1 and found %d" \
% len(workflow_ids)
instance_ids = self.pool.get('workflow.instance').search(cr, uid, [
('wkf_id', '=', workflow_ids[0]),
('res_type', '=', 'stock.production.lot'),
('res_id', 'in', ids),
('state', '=', 'active'),
], context)
if not instance_ids:
netsvc.Logger().notifyChannel(self._name, netsvc.LOG_WARNING,
"Unexpected not found any active Workflow Instance for "
"Production Lots with ids '%s' in test_not_qc_test_subflow "
"function." % str(ids))
return True
workitem_ids = workitem_proxy.search(cr, uid, [
('inst_id', 'in', instance_ids),
('state', '=', 'running'),
('subflow_id', '!=', False),
], context=context)
if not workitem_ids:
return True
lot_w_subflow = [str(wi.inst_id.res_id)
for wi in workitem_proxy.browse(cr, uid, workitem_ids, context)]
raise osv.osv_exception(
_("Error Cancelling Lot with subflow!"),
_("You are trying to cancel the Production Lots with IDs "
"'%(lot_ids)s' which are associated to QC Test workflow, and "
"it is not allowed.\n"
"Please, use the 'Force Cancel' button or function to cancel "
"a Lot in 'Pending Test' or 'Test Failed' state.") % {
'lot_ids': ", ".join(lot_w_subflow),
return False
# stock.production.lot
def action_wofkflow_cancel(self, cr, uid, ids, context=None):
Sets the State of Lot to 'Cancel', deactivate it and leave empty the
'Current QC Test Trigger' field.
self.write(cr, uid, ids, {
'current_qc_test_trigger_id': False,
'state': 'cancel',
'active': False,
}, context)
return True
# stock.production.lot
def action_force_cancel(self, cr, uid, ids, context=None):
Find Workitems (workflow) for these Lots with subflow and disassociate
them and sets their state to 'complete', and raise a 'cancel' signal
for all supplied Lot's ID's.
workitem_proxy = self.pool.get('workflow.workitem')
wf_service = netsvc.LocalService("workflow")
workflow_ids = self.pool.get('workflow').search(cr, uid, [
('osv', '=', 'stock.production.lot'),
('name', '=', 'stock.production.lot.basic'),
], context=context)
assert len(workflow_ids) == 1, "Unexpected number of workflows for " \
"'stock.production.lot' with name " \
"'stock.production.lot.basic'. Expected 1 and found %d" \
% len(workflow_ids)
instance_ids = self.pool.get('workflow.instance').search(cr, uid, [
('wkf_id', '=', workflow_ids[0]),
('res_type', '=', 'stock.production.lot'),
('res_id', 'in', ids),
('state', '=', 'active'),
], context)
if instance_ids:
workitem_ids = workitem_proxy.search(cr, uid, [
('inst_id', 'in', instance_ids),
('state', '=', 'running'),
('subflow_id', '!=', False),
], context=context)
if workitem_ids:
workitem_proxy.write(cr, uid, workitem_ids, {
'subflow_id': False,
'state': 'complete',
}, context)
for lot_id in ids:
wf_service.trg_validate(uid, 'stock.production.lot', lot_id,
'cancel', cr)
return True
# stock.production.lot
def copy(self, cr, uid, orig_id, default=None, context=None):
It only duplicate the trigger tests if the context has a True value for
'duplicate_trigger_test' key
if not context or not context.get('duplicate_trigger_test'):
default['qc_test_trigger_ids'] = False
return super(stock_production_lot, self).copy(cr, uid, orig_id, default,
# vim:expandtab:smartindent:tabstop=4:softtabstop=4:shiftwidth=4: