diff --git a/modules/stock/move.py b/modules/stock/move.py --- a/modules/stock/move.py +++ b/modules/stock/move.py @@ -4,12 +4,15 @@ import operator from decimal import Decimal from functools import partial -from sql import Column +from sql import Literal, Union, Column +from sql.aggregate import Sum +from sql.conditionals import Coalesce from sql.operators import Concat from trytond.model import Workflow, Model, ModelView, ModelSQL, fields from trytond import backend -from trytond.pyson import In, Eval, Not, Equal, If, Get, Bool +from trytond.pyson import In, Eval, Not, Equal, If, Bool +from trytond.tools import reduce_ids from trytond.transaction import Transaction from trytond.pool import Pool @@ -88,6 +91,10 @@ Compute the domain to filter records which validates the domain over quantity field. + The context with keys: + stock_skip_warehouse: if set, quantities on a warehouse are no more + quantities of all child locations but quantities of the storage + zone. location_ids is the list of IDs of locations to take account to compute the stock. It can't be empty. grouping defines how stock moves are grouped. @@ -95,43 +102,48 @@ whose quantity is computed. """ pool = Pool() - Product = pool.get('product.product') + Location = pool.get('stock.location') + Move = pool.get('stock.move') if not location_ids or not domain: return [] - def _search_quantity_eval_domain(line, domain): - operator_funcs = { - '=': operator.eq, - '>=': operator.ge, - '>': operator.gt, - '<=': operator.le, - '<': operator.lt, - '!=': operator.ne, - 'in': lambda v, l: v in l, - 'not in': lambda v, l: v not in l, - } - - field, op, operand = domain - value = line.get(field) - return operator_funcs[op](value, operand) + # Skip warehouse location in favor of their storage location + # to compute quantities. Keep track of which ids to remove + # and to add after the query. 
+ if Transaction().context.get('stock_skip_warehouse'): + location_ids = set(location_ids) + for location in Location.browse(list(location_ids)): + if location.type == 'warehouse': + location_ids.remove(location.id) + location_ids.add(location.storage_location.id) + location_ids = list(location_ids) with Transaction().set_context(cls._quantity_context(name)): - pbl = Product.products_by_location( - location_ids=location_ids, - with_childs=True, grouping=grouping) + query = Move.compute_quantities_query(location_ids, + with_childs=True, grouping=grouping, + grouping_filter=None) + having_domain = getattr(cls, name)._field.convert_domain(domain, { + None: (query, {}), + }, cls) + # The last column of 'query' is always the quantity for the 'key'. + # It is computed with a SUM() aggregator so in having we have to + # use the SUM() expression and not the name of column + having_domain.left = query.columns[-1].expression + if query.having: + query.having &= having_domain + else: + query.having = having_domain + quantities = Move.compute_quantities(query, location_ids, + with_childs=True, grouping=grouping, + grouping_filter=None) - processed_lines = [] - for key, quantity in pbl.iteritems(): + record_ids = [] + for key, quantity in quantities.iteritems(): # pbl could return None in some keys if key[position] is not None: - processed_lines.append({ - 'record_id': key[position], - name: quantity, - }) + record_ids.append(key[position]) - record_ids = [line['record_id'] for line in processed_lines - if _search_quantity_eval_domain(line, domain)] return [('id', 'in', record_ids)] @@ -789,3 +801,367 @@ pbl[from_key] = pbl.get(from_key, 0.0) - qty_default_uom pbl[to_key] = pbl.get(to_key, 0.0) + qty_default_uom return success + + @classmethod + def compute_quantities_query(cls, location_ids, with_childs=False, + grouping=('product',), grouping_filter=None): + """ + Prepare a query object to compute for each location and product the + stock quantity in the default uom of the 
product. + + The context with keys: + stock_date_end: if set the date of the stock computation. + stock_date_start: if set return the delta of the stock between the + two dates (ignored if stock_date_end is missing). + stock_assign: if set compute also the assigned moves as done. + forecast: if set compute the forecast quantity. + stock_destinations: A list of location ids. If set, restrict the + computation to moves from and to those locations. + stock_skip_warehouse: if set, quantities on a warehouse are no longer + quantities of all child locations but quantities of the storage + zone. + If with_childs is set, it also computes for child locations. + grouping is a tuple of Move field names and defines how stock moves are + grouped. + grouping_filter is a tuple of lists of values, for the Move's field at + the same position in grouping tuple, used to filter which moves are + used to compute quantities. It must be None or have the same number of + elements as grouping. If no grouping_filter is provided it + returns quantities for all active products. + + The query returns the location as first column, followed by the fields + in grouping, and the last column is the quantity. 
+ """ + pool = Pool() + Rule = pool.get('ir.rule') + Location = pool.get('stock.location') + Date = pool.get('ir.date') + Period = pool.get('stock.period') + Move = pool.get('stock.move') + Product = pool.get('product.product') + Template = pool.get('product.template') + + move = Move.__table__() + product = Product.__table__() + template = Template.__table__() + + today = Date.today() + + if not location_ids: + return None + context = Transaction().context.copy() + + for field in grouping: + if field not in Move._fields: + raise ValueError('"%s" has no field "%s"' % (Move, field)) + assert grouping_filter is None or len(grouping_filter) == len(grouping) + + move_rule_query = Rule.domain_get('stock.move') + if move_rule_query is None: + move_rule_query = Literal(True) + + PeriodCache = Period.get_cache(grouping) + period = None + if PeriodCache: + period_cache = PeriodCache.__table__() + + if not context.get('stock_date_end'): + context['stock_date_end'] = datetime.date.max + + # date end in the past or today: filter on state done + if (context['stock_date_end'] < today + or (context['stock_date_end'] == today + and not context.get('forecast'))): + state_date_clause = ( + move.state.in_(['done', + context.get('stock_assign') and 'assigned' or 'done']) + & ( + ( + (move.effective_date == None) + & (move.planned_date <= context['stock_date_end']) + ) + | (move.effective_date <= context['stock_date_end']) + ) + ) + # future date end: filter move on state done and date + # before today, or on all state and date between today and + # date_end. 
+ else: + state_date_clause = ( + (move.state.in_(['done', + context.get('stock_assign') and 'assigned' + or 'done']) + & ( + ( + (move.effective_date == None) + & (move.planned_date <= today) + ) + | (move.effective_date <= today) + ) + ) + | (move.state.in_(['done', 'assigned', 'draft']) + & ( + ( + (move.effective_date == None) + & (Coalesce(move.planned_date, datetime.date.max) + <= context['stock_date_end']) + & (Coalesce(move.planned_date, datetime.date.max) + >= today) + ) + | ( + (move.effective_date <= context['stock_date_end']) + & (move.effective_date >= today) + ) + ) + ) + ) + + if context.get('stock_date_start'): + if context['stock_date_start'] > today: + state_date_clause &= ( + move.state.in_(['done', 'assigned', 'draft']) + & ( + ( + (move.effective_date == None) + & ( + (move.planned_date >= + context['stock_date_start']) + | (move.planned_date == None) + ) + ) + | (move.effective_date >= context['stock_date_start']) + ) + ) + else: + state_date_clause &= ( + ( + move.state.in_(['done', 'assigned', 'draft']) + & ( + ( + (move.effective_date == None) + & ( + (move.planned_date >= today) + | (move.planned_date == None) + ) + ) + | (move.effective_date >= today) + ) + ) + | ( + move.state.in_(['done', + context.get('stock_assign') and 'assigned' + or 'done']) + & ( + ( + (move.effective_date == None) + & ( + ( + (move.planned_date >= + context['stock_date_start']) + & (move.planned_date < today) + ) + | (move.planned_date == None) + ) + ) + | ( + (move.effective_date >= + context['stock_date_start']) + & (move.effective_date < today) + ) + ) + ) + ) + elif PeriodCache: + with Transaction().set_user(0, set_context=True): + periods = Period.search([ + ('date', '<', context['stock_date_end']), + ('state', '=', 'closed'), + ], order=[('date', 'DESC')], limit=1) + if periods: + period, = periods + state_date_clause &= ( + Coalesce(move.effective_date, move.planned_date, + datetime.date.max) > period.date) + + if with_childs: + location_query = 
Location.search([ + ('parent', 'child_of', location_ids), + ], query=True, order=[]) + else: + location_query = location_ids[:] + + from_ = move + if PeriodCache: + from_period = period_cache + if grouping_filter and any(grouping_filter): + where = where_period = Literal(True) + for fieldname, grouping_ids in zip(grouping, grouping_filter): + if not grouping_ids: + continue + column = Column(move, fieldname) + if PeriodCache: + cache_column = Column(period_cache, fieldname) + if isinstance(grouping_ids[0], (int, long, float, Decimal)): + where &= reduce_ids(column, grouping_ids) + if PeriodCache: + where_period &= reduce_ids(cache_column, grouping_ids) + else: + where &= column.in_(grouping_ids) + if PeriodCache: + where_period &= cache_column.in_(grouping_ids) + else: + where = where_period = template.active == True + from_ = from_.join(product, condition=move.product == product.id) + from_ = from_.join(template, + condition=product.template == template.id) + if PeriodCache: + from_period = from_period.join(product, + condition=period_cache.product == product.id) + from_period = from_period.join(template, + condition=product.template == template.id) + + if context.get('stock_destinations'): + destinations = context['stock_destinations'] + dest_clause_from = move.from_location.in_(destinations) + dest_clause_to = move.to_location.in_(destinations) + + if PeriodCache: + dest_clause_period = period_cache.location.in_(destinations) + + else: + dest_clause_from = dest_clause_to = dest_clause_period = \ + Literal(True) + + # The main select clause is a union between three similar subqueries. + # One that sums incoming moves towards locations, one that sums + # outgoing moves and one for the period cache. UNION ALL is used + # because we already know that there will be no duplicates. 
+ move_keys = [Column(move, key).as_(key) for key in grouping] + query = from_.select(move.to_location.as_('location'), + Sum(move.internal_quantity).as_('quantity'), + *move_keys, + where=state_date_clause + & where + & move.to_location.in_(location_query) + & move.id.in_(move_rule_query) + & dest_clause_from, + group_by=[move.to_location] + move_keys) + query = Union(query, from_.select(move.from_location.as_('location'), + (-Sum(move.internal_quantity)).as_('quantity'), + *move_keys, + where=state_date_clause + & where + & move.from_location.in_(location_query) + & move.id.in_(move_rule_query) + & dest_clause_to, + group_by=[move.from_location] + move_keys), + all_=True) + if PeriodCache: + period_keys = [Column(period_cache, key).as_(key) + for key in grouping] + query = Union(query, from_period.select( + period_cache.location.as_('location'), + period_cache.internal_quantity.as_('quantity'), + *period_keys, + where=(period_cache.period + == (period.id if period else None)) + & where_period + & period_cache.location.in_(location_query) + & dest_clause_period), + all_=True) + query_keys = [Column(query, key).as_(key) for key in grouping] + columns = ([query.location.as_('location')] + + query_keys + + [Sum(query.quantity).as_('quantity')]) + query = query.select(*columns, + group_by=[query.location] + query_keys) + return query + + @classmethod + def compute_quantities(cls, query, location_ids, with_childs=False, + grouping=('product',), grouping_filter=None): + """ + Executes the supplied query to compute for each location and product + the stock quantity in the default uom of the product and rounded to + Uom's rounding digits. + + See compute_quantites_query for params explanation. + + Return a dictionary with location id and grouping as key + and quantity as value. 
+ """ + pool = Pool() + Location = pool.get('stock.location') + Product = pool.get('product.product') + Uom = pool.get('product.uom') + + assert query is not None, ( + "Query in Move.compute_quantities() can't be None") + assert 'product' in grouping + + cursor = Transaction().cursor + cursor.execute(*query) + raw_lines = cursor.fetchall() + + product_getter = operator.itemgetter(grouping.index('product') + 1) + res_product_ids = set() + quantities = {} + keys = set() + for line in raw_lines: + location = line[0] + key = tuple(line[1:-1]) + quantity = line[-1] + quantities[(location,) + key] = quantity + res_product_ids.add(product_getter(line)) + keys.add(key) + + # Propagate quantities on from child locations to their parents + if with_childs: + # Fetch all child locations + locations = Location.search([ + ('parent', 'child_of', location_ids), + ]) + # Generate a set of locations without childs and a dict + # giving the parent of each location. + leafs = set([l.id for l in locations]) + parent = {} + for location in locations: + if not location.parent: + continue + if location.parent.id in leafs: + leafs.remove(location.parent.id) + parent[location.id] = location.parent.id + locations = set((l.id for l in locations)) + while leafs: + for l in leafs: + locations.remove(l) + if l not in parent: + continue + for key in keys: + parent_key = (parent[l],) + key + quantities.setdefault(parent_key, 0) + quantities[parent_key] += quantities.get((l,) + key, 0) + next_leafs = set(locations) + for l in locations: + if l not in parent: + continue + if parent[l] in next_leafs and parent[l] in locations: + next_leafs.remove(parent[l]) + leafs = next_leafs + + # clean result + for key in quantities.keys(): + location = key[0] + if location not in location_ids: + del quantities[key] + + # Round quantities + default_uom = dict((p.id, p.default_uom) for p in + Product.browse(list(res_product_ids))) + for key, quantity in quantities.iteritems(): + location = key[0] + product = 
product_getter(key) + uom = default_uom[product] + quantities[key] = Uom.round(quantity, uom.rounding) + + return quantities diff --git a/modules/stock/product.py b/modules/stock/product.py --- a/modules/stock/product.py +++ b/modules/stock/product.py @@ -2,9 +2,8 @@ #this repository contains the full copyright notices and license terms. import datetime from decimal import Decimal -from operator import itemgetter -from sql import Literal, Union, Column -from sql.aggregate import Max, Sum +from sql import Literal +from sql.aggregate import Max from sql.functions import Now from sql.conditionals import Coalesce @@ -12,7 +11,6 @@ from trytond.wizard import Wizard, StateView, StateAction, Button from trytond.pyson import PYSONEncoder, Eval, Or from trytond.transaction import Transaction -from trytond.tools import reduce_ids from trytond.pool import Pool, PoolMeta from .move import StockMixin @@ -136,322 +134,40 @@ If product_ids is None all products are used. If with_childs, it computes also for child locations. grouping defines how stock moves are grouped. + stock_skip_warehouse: if set, quantities on a warehouse are no more + quantities of all child locations but quantities of the storage + zone. Return a dictionary with location id and grouping as key and quantity as value. 
""" pool = Pool() - Uom = pool.get('product.uom') - Rule = pool.get('ir.rule') Location = pool.get('stock.location') - Date = pool.get('ir.date') - Period = pool.get('stock.period') Move = pool.get('stock.move') - Product = pool.get('product.product') - Template = pool.get('product.template') - - move = Move.__table__() - product = Product.__table__() - template = Template.__table__() - - today = Date.today() - - if not location_ids: - return {} - cursor = Transaction().cursor - context = Transaction().context.copy() - - for field in grouping: - if field not in Move._fields: - raise ValueError('"%s" has no field "%s"' % (Move, field)) - assert 'product' in grouping # Skip warehouse location in favor of their storage location # to compute quantities. Keep track of which ids to remove # and to add after the query. - location_ids = set(location_ids) storage_to_remove = set() wh_to_add = {} - for location in Location.browse(list(location_ids)): - if (location.type == 'warehouse' - and Transaction().context.get('stock_skip_warehouse')): - location_ids.remove(location.id) - if location.storage_location.id not in location_ids: - storage_to_remove.add(location.storage_location.id) - location_ids.add(location.storage_location.id) - wh_to_add[location.id] = location.storage_location.id - location_ids = list(location_ids) + if Transaction().context.get('stock_skip_warehouse'): + location_ids = set(location_ids) + for location in Location.browse(list(location_ids)): + if location.type == 'warehouse': + location_ids.remove(location.id) + if location.storage_location.id not in location_ids: + storage_to_remove.add(location.storage_location.id) + location_ids.add(location.storage_location.id) + wh_to_add[location.id] = location.storage_location.id + location_ids = list(location_ids) - move_rule_query = Rule.domain_get('stock.move') - if move_rule_query is None: - move_rule_query = Literal(True) - - PeriodCache = Period.get_cache(grouping) - period = None - if PeriodCache: - 
period_cache = PeriodCache.__table__() - - if not context.get('stock_date_end'): - context['stock_date_end'] = datetime.date.max - - # date end in the past or today: filter on state done - if (context['stock_date_end'] < today - or (context['stock_date_end'] == today - and not context.get('forecast'))): - state_date_clause = ( - move.state.in_(['done', - context.get('stock_assign') and 'assigned' or 'done']) - & ( - ( - (move.effective_date == None) - & (move.planned_date <= context['stock_date_end']) - ) - | (move.effective_date <= context['stock_date_end']) - ) - ) - # future date end: filter move on state done and date - # before today, or on all state and date between today and - # date_end. - else: - state_date_clause = ( - (move.state.in_(['done', - context.get('stock_assign') and 'assigned' - or 'done']) - & ( - ( - (move.effective_date == None) - & (move.planned_date <= today) - ) - | (move.effective_date <= today) - ) - ) - | (move.state.in_(['done', 'assigned', 'draft']) - & ( - ( - (move.effective_date == None) - & (Coalesce(move.planned_date, datetime.date.max) - <= context['stock_date_end']) - & (Coalesce(move.planned_date, datetime.date.max) - >= today) - ) - | ( - (move.effective_date <= context['stock_date_end']) - & (move.effective_date >= today) - ) - ) - ) - ) - - if context.get('stock_date_start'): - if context['stock_date_start'] > today: - state_date_clause &= ( - move.state.in_(['done', 'assigned', 'draft']) - & ( - ( - (move.effective_date == None) - & ( - (move.planned_date >= - context['stock_date_start']) - | (move.planned_date == None) - ) - ) - | (move.effective_date >= context['stock_date_start']) - ) - ) - else: - state_date_clause &= ( - ( - move.state.in_(['done', 'assigned', 'draft']) - & ( - ( - (move.effective_date == None) - & ( - (move.planned_date >= today) - | (move.planned_date == None) - ) - ) - | (move.effective_date >= today) - ) - ) - | ( - move.state.in_(['done', - context.get('stock_assign') and 'assigned' - or 
'done']) - & ( - ( - (move.effective_date == None) - & ( - ( - (move.planned_date >= - context['stock_date_start']) - & (move.planned_date < today) - ) - | (move.planned_date == None) - ) - ) - | ( - (move.effective_date >= - context['stock_date_start']) - & (move.effective_date < today) - ) - ) - ) - ) - elif PeriodCache: - with Transaction().set_user(0, set_context=True): - periods = Period.search([ - ('date', '<', context['stock_date_end']), - ('state', '=', 'closed'), - ], order=[('date', 'DESC')], limit=1) - if periods: - period, = periods - state_date_clause &= ( - Coalesce(move.effective_date, move.planned_date, - datetime.date.max) > period.date) - - if with_childs: - location_query = Location.search([ - ('parent', 'child_of', location_ids), - ], query=True, order=[]) - else: - location_query = location_ids[:] - - from_ = move - if PeriodCache: - from_period = period_cache - if product_ids: - where = reduce_ids(move.product, product_ids) - if PeriodCache: - where_period = reduce_ids(period_cache.product, product_ids) - else: - where = where_period = template.active == True - from_ = from_.join(product, condition=move.product == product.id) - from_ = from_.join(template, - condition=product.template == template.id) - if PeriodCache: - from_period = from_period.join(product, - condition=period_cache.product == product.id) - from_period = from_period.join(template, - condition=product.template == template.id) - - if context.get('stock_destinations'): - destinations = context.get('stock_destinations') - dest_clause_from = move.from_location.in_(destinations) - dest_clause_to = move.to_location.in_(destinations) - - if PeriodCache: - dest_clause_period = period_cache.location.in_(destinations) - - else: - dest_clause_from = dest_clause_to = dest_clause_period = \ - Literal(True) - - # The main select clause is a union between three similar subqueries. 
- # One that sums incoming moves towards locations, one that sums - # outgoing moves and one for the period cache. UNION ALL is used - # because we already know that there will be no duplicates. - move_keys = [Column(move, key).as_(key) for key in grouping] - query = from_.select(move.to_location.as_('location'), - Sum(move.internal_quantity).as_('quantity'), - *move_keys, - where=state_date_clause - & where - & move.to_location.in_(location_query) - & move.id.in_(move_rule_query) - & dest_clause_from, - group_by=[move.to_location] + move_keys) - query = Union(query, from_.select(move.from_location.as_('location'), - (-Sum(move.internal_quantity)).as_('quantity'), - *move_keys, - where=state_date_clause - & where - & move.from_location.in_(location_query) - & move.id.in_(move_rule_query) - & dest_clause_to, - group_by=[move.from_location] + move_keys), - all_=True) - if PeriodCache: - period_keys = [Column(period_cache, key).as_(key) - for key in grouping] - query = Union(query, from_period.select( - period_cache.location.as_('location'), - period_cache.internal_quantity.as_('quantity'), - *period_keys, - where=(period_cache.period - == (period.id if period else None)) - & where_period - & period_cache.location.in_(location_query) - & dest_clause_period), - all_=True) - query_keys = [Column(query, key).as_(key) for key in grouping] - columns = ([query.location.as_('location')] - + query_keys - + [Sum(query.quantity).as_('quantity')]) - query = query.select(*columns, - group_by=[query.location] + query_keys) - cursor.execute(*query) - raw_lines = cursor.fetchall() - - product_getter = itemgetter(grouping.index('product') + 1) - res_product_ids = set() - res = {} - keys = set() - for line in raw_lines: - location = line[0] - key = tuple(line[1:-1]) - quantity = line[-1] - res[(location,) + key] = quantity - res_product_ids.add(product_getter(line)) - keys.add(key) - - # Propagate quantities on from child locations to their parents - if with_childs: - # Fetch all 
child locations - locations = Location.search([ - ('parent', 'child_of', location_ids), - ]) - # Generate a set of locations without childs and a dict - # giving the parent of each location. - leafs = set([l.id for l in locations]) - parent = {} - for location in locations: - if not location.parent: - continue - if location.parent.id in leafs: - leafs.remove(location.parent.id) - parent[location.id] = location.parent.id - locations = set((l.id for l in locations)) - while leafs: - for l in leafs: - locations.remove(l) - if l not in parent: - continue - for key in keys: - parent_key = (parent[l],) + key - res.setdefault(parent_key, 0) - res[parent_key] += res.get((l,) + key, 0) - next_leafs = set(locations) - for l in locations: - if l not in parent: - continue - if parent[l] in next_leafs and parent[l] in locations: - next_leafs.remove(parent[l]) - leafs = next_leafs - - # clean result - for key in res.keys(): - location = key[0] - if location not in location_ids: - del res[key] - - # Round quantities - default_uom = dict((p.id, p.default_uom) for p in - cls.browse(list(res_product_ids))) - for key, quantity in res.iteritems(): - location = key[0] - product = product_getter(key) - uom = default_uom[product] - res[key] = Uom.round(quantity, uom.rounding) + grouping_filter = (product_ids,) + tuple(None for k in grouping[1:]) + query = Move.compute_quantities_query(location_ids, with_childs, + grouping=grouping, grouping_filter=grouping_filter) + if query is None: + return {} + res = Move.compute_quantities(query, location_ids, with_childs, + grouping=grouping, grouping_filter=grouping_filter) if wh_to_add: for wh, storage in wh_to_add.iteritems(): diff --git a/modules/stock/tests/test_stock.py b/modules/stock/tests/test_stock.py --- a/modules/stock/tests/test_stock.py +++ b/modules/stock/tests/test_stock.py @@ -280,6 +280,68 @@ else: self.assertEqual(product_reloaded.quantity, quantity) + # Python 2.6 compatibility (assertIn and assertNotIn added in 2.7) + assertIn = 
getattr(self, 'assertIn', + lambda a, b: self.assertTrue(a in b)) + assertNotIn = getattr(self, 'assertNotIn', + lambda a, b: self.assertTrue(a not in b)) + + def tests_product_search_quantity(context, quantity): + with transaction.set_context(locations=[storage.id]): + if (not context.get('stock_date_end') + or context['stock_date_end'] > today + or context.get('forecast')): + fname = 'forecast_quantity' + else: + fname = 'quantity' + found_products = self.product.search([ + (fname, '=', quantity), + ]) + assertIn(product, found_products) + + found_products = self.product.search([ + (fname, '!=', quantity), + ]) + assertNotIn(product, found_products) + + found_products = self.product.search([ + (fname, 'in', (quantity, quantity + 1)), + ]) + assertIn(product, found_products) + + found_products = self.product.search([ + (fname, 'not in', (quantity, quantity + 1)), + ]) + assertNotIn(product, found_products) + + found_products = self.product.search([ + (fname, '<', quantity), + ]) + assertNotIn(product, found_products) + found_products = self.product.search([ + (fname, '<', quantity + 1), + ]) + assertIn(product, found_products) + + found_products = self.product.search([ + (fname, '>', quantity), + ]) + assertNotIn(product, found_products) + found_products = self.product.search([ + (fname, '>', quantity - 1), + ]) + assertIn(product, found_products) + + found_products = self.product.search([ + (fname, '>=', quantity), + ]) + assertIn(product, found_products) + + found_products = self.product.search([ + (fname, '<=', quantity), + ]) + assertIn(product, found_products) + def test_products_by_location(): for context, quantity in tests: with transaction.set_context(context): @@ -289,6 +351,7 @@ self.assertEqual(products_by_location(), {(storage.id, product.id): quantity}) tests_product_quantity(context, quantity) + tests_product_search_quantity(context, quantity) test_products_by_location()