597 lines
24 KiB
Python
597 lines
24 KiB
Python
# This file is part of purchase_editable_line module for Tryton.
|
|
# The COPYRIGHT file at the top level of this repository contains
|
|
# the full copyright notices and license terms.
|
|
from decimal import Decimal
|
|
from datetime import date, timedelta
|
|
from shutil import move
|
|
from trytond.model import ModelView, fields
|
|
from trytond.wizard import (Wizard, StateView, StateTransition,
|
|
Button, StateAction, StateReport)
|
|
from trytond.pool import Pool
|
|
from trytond.transaction import Transaction
|
|
from trytond.report import Report
|
|
from trytond.exceptions import UserError
|
|
from trytond.i18n import gettext
|
|
from .exceptions import SupplierMovesWarning
|
|
|
|
|
|
class CreatePurchaseSuggestedStart(ModelView):
    'Create Purchase Suggested Start'
    __name__ = 'purchase_suggested.create_order.start'

    # Supplier the suggested purchase is created for (mandatory).
    supplier = fields.Many2One('party.party', 'Supplier', required=True)
    # Number of days expressed by the user; see the field help.
    time_stock = fields.Integer(
        'Time Stock', required=True, help='In days')
    # Optional location, restricted to locations of type "warehouse".
    location = fields.Many2One(
        'stock.location', 'Location',
        domain=[('type', '=', 'warehouse')])
    # Length, in days, of the history window; see the field help.
    historical_time = fields.Integer(
        'Historical Time', required=True, help='In days')

    @staticmethod
    def default_time_stock():
        """Return the default value of ``time_stock`` (30 days)."""
        return 30

    @staticmethod
    def default_historical_time():
        """Return the default value of ``historical_time`` (90 days)."""
        return 90
|
|
|
|
|
|
class CreatePurchaseSuggested(Wizard):
    'Create Purchase Suggested'
    __name__ = 'purchase_suggested.create_order'

    start = StateView(
        'purchase_suggested.create_order.start',
        'purchase_suggested.create_purchase_suggested_view_form', [
            Button('Cancel', 'end', 'tryton-cancel'),
            Button('Ok', 'accept', 'tryton-ok'),
        ])
    accept = StateTransition()

    def get_pending_purchase(self, product_id):
        """Return the summed quantity of draft moves leaving a supplier
        location for the product with id ``product_id``."""
        StockMove = Pool().get('stock.move')
        qty_records = StockMove.search_read([
                ('product', '=', product_id),
                ('state', '=', 'draft'),
                ('from_location.type', 'in', ['supplier']),
                ], fields_names=['quantity'])
        return sum(rec['quantity'] for rec in qty_records)

    def get_product_stock(self, product_id, locations):
        """Return ``product.quantity`` for ``product_id`` evaluated with
        today's date as ``stock_date_end`` and the location ids in
        ``locations``."""
        Product = Pool().get('product.product')
        stock_context = {
            'stock_date_end': date.today(),
            'locations': locations,
            }
        with Transaction().set_context(stock_context):
            product, = Product.search([('id', '=', product_id)])
            return product.quantity

    def transition_accept(self):
        """Create a draft purchase for the selected supplier and add one
        line per product whose suggested quantity is positive.

        The suggested quantity is
        ``time_stock * outgoing_quantity / history_days - current_stock``
        where the outgoing quantity is taken from done moves towards
        customer or production locations inside the history window.
        Returns the name of the next wizard state (``'end'``).
        """
        pool = Pool()
        Purchase = pool.get('purchase.purchase')
        StockMove = pool.get('stock.move')
        Location = pool.get('stock.location')
        Party = pool.get('party.party')
        Currency = pool.get('currency.currency')

        supplier = self.start.supplier
        warehouse = self.start.location
        historical_time = self.start.historical_time
        time_stock = self.start.time_stock
        today = date.today()
        start_date = today - timedelta(historical_time)

        dom_stock = [
            ('effective_date', '>=', start_date),
            ('effective_date', '<=', today),
            ('state', '=', 'done'),
            ('product', 'in', self.get_supplier_products(supplier)),
            ('to_location.type', 'in', ['customer', 'production']),
            ]
        if warehouse:
            location_id = warehouse.storage_location.id
            location_ids = [location_id]
            dom_stock.append(('from_location', '=', location_id))
        else:
            dom_stock.append(('from_location.type', '=', 'storage'))
            warehouses = Location.search([('type', '=', 'warehouse')])
            # Skip warehouses without a storage location instead of
            # failing on ``None.id``.
            location_ids = [
                w.storage_location.id for w in warehouses
                if w.storage_location]
        # Ascending order matters: the first move seen for a product is
        # its oldest one inside the window.
        moves = StockMove.search(
            dom_stock, order=[('effective_date', 'ASC')])

        invoice_address = supplier.address_get(type='invoice')
        purchase, = Purchase.create([{
                    'party': supplier.id,
                    # Pass an id (or None) rather than a record.
                    'invoice_address': (
                        invoice_address.id if invoice_address else None),
                    'purchase_date': today,
                    'state': 'draft',
                    'warehouse': warehouse.id if warehouse else None,
                    }])

        # product -> list of moved quantities, and
        # product id -> date of its oldest move in the window.
        target_products = {}
        historical_products = {}
        for move in moves:
            product = move.product
            if product not in target_products:
                target_products[product] = []
                historical_products[product.id] = move.effective_date
            target_products[product].append(move.quantity)

        # Loop invariants: the same party and currency are used for
        # every computed line.
        party = Party(supplier.id)
        currency = Currency(Transaction().context.get('currency'))

        for product, quantities in target_products.items():
            # Stock needed for n days minus the stock on hand.
            # Shorten the window when the product's first move is more
            # recent than the start of the requested history.
            real_historical_time = historical_time
            first_move_date = historical_products[product.id]
            if first_move_date > start_date:
                real_historical_time -= (first_move_date - start_date).days
            if real_historical_time == 0:
                # No elapsed day to average over: nothing to suggest.
                continue

            current_stock = self.get_product_stock(product.id, location_ids)
            # NOTE: pending purchases (get_pending_purchase) are not
            # subtracted from the suggestion.
            suggest_qty = int(
                time_stock * sum(quantities) / real_historical_time
                - current_stock)
            if suggest_qty <= 0:
                continue

            order = {
                'product': product,
                'quantity': suggest_qty,
                'party': party,
                'uom': product.purchase_uom,
                'currency': currency,
                }
            line = self.compute_purchase_line(order, purchase)
            line.purchase = purchase
            line.save()

        return 'end'

    def get_supplier_products(self, supplier):
        """Return the ids of every product variant whose template has a
        ``purchase.product_supplier`` record for ``supplier``."""
        ProductSupplier = Pool().get('purchase.product_supplier')
        products_supplier = ProductSupplier.search([
                ('party', '=', supplier.id),
                ])
        return [
            product.id
            for ps in products_supplier
            for product in ps.template.products]

    @classmethod
    def compute_purchase_line(cls, request, purchase):
        """Build (without saving) a purchase line from ``request``.

        ``request`` is a dict with the keys ``product``, ``quantity``,
        ``party``, ``uom`` and ``currency``. ``purchase`` is accepted
        for interface compatibility and is not read here.

        Raises ``UserError`` when no purchase price is returned for the
        product.
        """
        pool = Pool()
        Product = pool.get('product.product')
        Line = pool.get('purchase.line')

        product = request['product']
        party = request['party']
        line = Line(
            product=product,
            unit=request['uom'],
            quantity=request['quantity'],
            description=product.name,
            )

        # XXX purchase with several lines of the same product
        with Transaction().set_context(
                uom=request['uom'].id,
                supplier=party.id,
                currency=request['currency'].id):
            product_price = Product.get_purchase_price(
                [product], request['quantity'])[product.id]

        # Check for a missing price BEFORE rounding it: calling
        # ``quantize`` on None would raise AttributeError and hide the
        # user-facing error.
        if product_price is None:
            # NOTE(review): the message id mentions "late supplier
            # moves" although the failure is a missing price - confirm
            # the intended message.
            raise UserError(gettext(
                    'purchase_suggested.msg_late_supplier_moves',
                    s=product.rec_name))
        line.unit_price = product_price.quantize(
            Decimal(1) / 10 ** Line.unit_price.digits[1])

        taxes = []
        for tax in product.supplier_taxes_used:
            if party and party.supplier_tax_rule:
                pattern = cls._get_tax_rule_pattern(request)
                tax_ids = party.supplier_tax_rule.apply(tax, pattern)
                if tax_ids:
                    taxes.extend(tax_ids)
                # The rule decides alone: the original tax is not kept.
                continue
            taxes.append(tax.id)
        line.taxes = taxes
        return line

    @staticmethod
    def _get_tax_rule_pattern(request):
        '''
        Get tax rule pattern
        '''
        return {}
|
|
|
|
|
|
class CreatePurchaseRequestBySupplierStart(ModelView):
    'Create Purchase Request By Supplier'
    __name__ = 'purchase_suggested.create_purchase_request_by_supplier.start'

    # Party (mandatory) selected on the wizard's start form.
    party = fields.Many2One('party.party', 'Party', required=True)
|
|
|
|
|
|
class CreatePurchaseRequestBySupplier(Wizard):
    'Create Purchase Requests By Supplier'
    __name__ = 'purchase_suggested.create_purchase_request_by_supplier'

    start = StateView(
        'purchase_suggested.create_purchase_request_by_supplier.start',
        'purchase_suggested.purchase_request_create_start_view_form', [
            Button('Cancel', 'end', 'tryton-cancel'),
            Button('Create', 'create_', 'tryton-ok', default=True),
        ])
    create_ = StateAction('purchase_request.act_purchase_request_form')

    @classmethod
    def __setup__(cls):
        # Nothing is customised here; the parent setup is simply chained.
        super().__setup__()

    @property
    def _requests_parameters(self):
        """Return an empty dict of extra request parameters."""
        return {}

    def do_create_(self, action):
        """Generate purchase requests for every product of the selected
        party and return ``action`` with an empty data dict.

        Raises ``SupplierMovesWarning`` when a draft move from a supplier
        location to a storage location, planned before today, exists for
        one of those products.
        """
        pool = Pool()
        PurchaseRequest = pool.get('purchase.request')
        Move = pool.get('stock.move')
        ProductSupplier = pool.get('purchase.product_supplier')
        Date = pool.get('ir.date')

        supplier_links = ProductSupplier.search([
                ('party', '=', self.start.party.id),
                ])
        products = []
        for link in supplier_links:
            products.extend(link.template.products)

        today = Date.today()
        late_moves = Move.search([
                ('from_location.type', '=', 'supplier'),
                ('to_location.type', '=', 'storage'),
                ('product', 'in', products),
                ('state', '=', 'draft'),
                ('planned_date', '<', today),
                ], order=[])
        if late_moves:
            # NOTE(review): the warning is raised with a single message
            # argument - confirm SupplierMovesWarning accepts that form.
            raise SupplierMovesWarning(gettext('purchase_suggested.late_supplier_moves', module=self.__name__, date=today))

        PurchaseRequest.generate_requests(products=products)
        return action, {}

    def transition_create_(self):
        """Close the wizard once the action has been launched."""
        return 'end'
|
|
|
|
|
|
class CreateRequestBySupplierStart(ModelView):
    'Create Request By Supplier Start'
    __name__ = 'purchase_suggested.create_request_by_supplier.start'

    # Supplier (mandatory) selected on the wizard's start form.
    supplier = fields.Many2One('party.party', 'Supplier', required=True)
    # Mandatory location, restricted to locations of type "warehouse".
    warehouse_location = fields.Many2One(
        'stock.location', 'Warehouse Location',
        select=True,
        domain=[('type', '=', 'warehouse')],
        required=True)
|
|
|
|
|
|
class CreateRequestBySupplier(Wizard):
    'Create Request By Supplier'
    __name__ = 'purchase_suggested.create_request_by_supplier'

    start = StateView(
        'purchase_suggested.create_request_by_supplier.start',
        'purchase_suggested.create_request_by_supplier_start_view_form', [
            Button('Cancel', 'end', 'tryton-cancel'),
            Button('Create', 'create_', 'tryton-ok', default=True),
        ])
    create_ = StateTransition()

    def transition_create_(self):
        """Create draft purchase requests from the purchase order points
        of the selected supplier's products in the selected warehouse.

        A request is created for every order point whose product quantity
        is not above the order point minimum; the requested quantity is
        the order point target quantity minus the current quantity.
        Returns the name of the next wizard state (``'end'``).
        """
        pool = Pool()
        ProductSupplier = pool.get('purchase.product_supplier')
        PurchaseRequest = pool.get('purchase.request')
        Product = pool.get('product.product')
        OrderPoint = pool.get('stock.order_point')
        User = pool.get('res.user')

        company = User(Transaction().user).company
        supplier_links = ProductSupplier.search([
                ('party', '=', self.start.supplier.id)
                ])

        # Ids of all variants of the templates linked to the supplier.
        products_ids = []
        for link in supplier_links:
            if link.template:
                products_ids.extend(p.id for p in link.template.products)

        order_points = OrderPoint.search([
                ('warehouse_location', '=', self.start.warehouse_location.id),
                ('company', '=', company.id),
                ('product', 'in', products_ids),
                ('type', '=', 'purchase'),
                ])
        # product id -> (minimum quantity, target quantity)
        products2op = {}
        for point in order_points:
            products2op[point.product.id] = (
                point.min_quantity, point.target_quantity)

        stock_context = {
            'stock_date_end': date.today(),
            'locations': [self.start.warehouse_location.id],
            }
        with Transaction().set_context(stock_context):
            products = Product.search([
                    ('id', 'in', products2op.keys())
                    ])
            # product id -> quantity computed with the stock context
            products_dict = {}
            for product in products:
                products_dict[product.id] = product.quantity

        request_to_create = []
        for point in order_points:
            product = point.product
            min_qty, target_qty = products2op[product.id]
            on_hand = products_dict[product.id]
            if on_hand <= min_qty:
                request_to_create.append({
                        'product': product,
                        'description': product.template.name,
                        'party': self.start.supplier.id,
                        'quantity': target_qty - on_hand,
                        'uom': product.template.default_uom,
                        'warehouse': self.start.warehouse_location.id,
                        'company': company.id,
                        'state': 'draft',
                        'origin': str(point),
                        })
        PurchaseRequest.create(request_to_create)
        return 'end'
|
|
|
|
|
|
class PurchaseSuggestedReportStart(ModelView):
    'Purchase Suggested Report Start'
    __name__ = 'purchase_suggested.purchase_suggested_report.start'

    # Company the report is printed for (mandatory).
    company = fields.Many2One('company.company', 'Company', required=True)
    # Number of days expressed by the user; see the field help.
    time_stock = fields.Integer(
        'Time Stock', required=True, help='In days')
    # Optional supplier filter.
    supplier = fields.Many2One('party.party', 'Supplier')
    # Optional location, restricted to locations of type "warehouse".
    location = fields.Many2One(
        'stock.location', 'Location',
        domain=[('type', '=', 'warehouse')])
    # Length, in days, of the history window; see the field help.
    historical_time = fields.Integer(
        'Historical Time', required=True, help='In days')
    # Optional product category filter.
    categories = fields.Many2Many(
        'product.category', None, None, 'Categories')

    @staticmethod
    def default_time_stock():
        """Return the default value of ``time_stock`` (30 days)."""
        return 30

    @staticmethod
    def default_historical_time():
        """Return the default value of ``historical_time`` (90 days)."""
        return 90

    @staticmethod
    def default_company():
        """Return the ``company`` entry of the transaction context."""
        return Transaction().context.get('company')
|
|
|
|
|
|
class PrintPurchaseSuggestedReport(Wizard):
    'Purchase Suggested Report Wizard'
    __name__ = 'purchase_suggested.print_purchase_suggested_report'

    start = StateView(
        'purchase_suggested.purchase_suggested_report.start',
        'purchase_suggested.print_purchase_suggested_report_start_view_form', [
            Button('Cancel', 'end', 'tryton-cancel'),
            Button('Print', 'print_', 'tryton-print', default=True),
        ])
    print_ = StateReport('purchase_suggested.purchase_suggested_report')

    def do_print_(self, action):
        """Return ``action`` together with the report data built from the
        start form.

        ``location`` and ``location_name`` come from the storage location
        of the selected warehouse; they and ``supplier`` are ``None`` when
        the corresponding field is left empty.
        """
        form = self.start

        location_id = location_name = None
        if form.location:
            storage = form.location.storage_location
            location_id = storage.id
            location_name = storage.name

        supplier = form.supplier.id if form.supplier else None

        category_ids = []
        for category in form.categories:
            category_ids.append(category.id)

        return action, {
            'company': form.company.id,
            'supplier': supplier,
            'location': location_id,
            'location_name': location_name,
            'time_stock': form.time_stock,
            'historical_time': form.historical_time,
            'categories': category_ids,
            }

    def transition_print_(self):
        """Close the wizard once the report has been launched."""
        return 'end'
|
|
|
|
|
|
class PurchaseSuggestedReport(Report):
    'Purchase Suggested Report'
    __name__ = 'purchase_suggested.purchase_suggested_report'

    @classmethod
    def get_supplier_products(cls, supplier):
        """Group the purchasable, active supplier products by supplier.

        ``supplier`` is a party id or a falsy value (no supplier filter).
        Returns a list of single-entry dicts
        ``{party_id: {'name': party_name, 'products': [product_id, ...]}}``.
        """
        ProductSupplier = Pool().get('purchase.product_supplier')
        domain = [
            ('template.active', '=', True),
            ('template.purchasable', '=', True),
            ]
        if supplier:
            domain.append(('party', '=', supplier))

        suppliers_dict = {}
        for ps in ProductSupplier.search(domain):
            # setdefault replaces the former bare ``except:`` that was
            # used to detect the first occurrence of a party.
            entry = suppliers_dict.setdefault(
                ps.party.id, {'name': ps.party.name, 'products': []})
            entry['products'].extend(p.id for p in ps.template.products)
        return [{key: value} for key, value in suppliers_dict.items()]

    @classmethod
    def get_last_purchase(cls, product_id):
        """Return ``(purchase_date, quantity, unit_price)`` of the most
        recently created purchase line of the product on a processing or
        done purchase, or ``(None, None, 0)`` when there is none."""
        PurchaseLine = Pool().get('purchase.line')
        lines = PurchaseLine.search([
                ('product', '=', product_id),
                ('purchase.state', 'in', ('processing', 'done')),
                ], order=[('create_date', 'DESC')], limit=1)
        if not lines:
            return None, None, 0
        line, = lines
        return line.purchase.purchase_date, line.quantity, line.unit_price

    @classmethod
    def get_pending_purchase(cls, product_id):
        """Return the summed quantity of draft moves leaving a supplier
        location for the product with id ``product_id``."""
        StockMove = Pool().get('stock.move')
        qty_records = StockMove.search_read([
                ('product', '=', product_id),
                ('state', '=', 'draft'),
                ('from_location.type', 'in', ['supplier']),
                ], fields_names=['quantity'])
        return sum(rec['quantity'] for rec in qty_records)

    @classmethod
    def get_product_stock(cls, product_id, locations):
        """Return ``product.quantity`` for ``product_id`` evaluated with
        today's date as ``stock_date_end`` and the location ids in
        ``locations``."""
        Product = Pool().get('product.product')
        stock_context = {
            'stock_date_end': date.today(),
            'locations': locations,
            }
        with Transaction().set_context(stock_context):
            product, = Product.search([('id', '=', product_id)])
            return product.quantity

    @classmethod
    def get_last_out_moves(cls, product_id, location_id, end_date, days):
        """Return the summed quantity of done moves of the product towards
        customer or production locations during the ``days`` days ending
        on ``end_date``.

        Moves are restricted to ``location_id`` as origin when it is set,
        otherwise to any origin of type storage.
        """
        StockMove = Pool().get('stock.move')
        start_date = end_date - timedelta(days=days)
        dom_stock = [
            ('product', '=', product_id),
            ('effective_date', '>=', start_date),
            ('effective_date', '<=', end_date),
            ('state', '=', 'done'),
            ('to_location.type', 'in', ['customer', 'production']),
            ]
        if location_id:
            dom_stock.append(('from_location', '=', location_id))
        else:
            dom_stock.append(('from_location.type', '=', 'storage'))
        return sum(move.quantity for move in StockMove.search(dom_stock))

    @classmethod
    def check_parameters(cls, supplier, categories):
        """Raise ``UserError`` unless a supplier or a category is given."""
        if not supplier and not categories:
            raise UserError('You must set a supplier or category.')

    @staticmethod
    def _compute_rotation(quantities, days, time_stock, current_stock):
        """Return ``(suggest_qty, daily_rotation, stock_duration)``.

        All three are 0 when ``days`` is 0. ``stock_duration`` is 0 when
        the daily rotation is zero, which avoids a division by zero.
        """
        if days == 0:
            return 0, 0, 0
        total_out = sum(quantities)
        suggest_qty = int(time_stock * total_out / days - current_stock)
        daily_rotation = Decimal(total_out / days)
        if daily_rotation:
            stock_duration = int(Decimal(current_stock) / daily_rotation)
        else:
            stock_duration = 0
        return suggest_qty, daily_rotation, stock_duration

    @classmethod
    def get_context(cls, records, header, data):
        """Build the report context.

        ``data`` is expected to hold the keys ``company``, ``supplier``,
        ``location``, ``location_name``, ``time_stock``,
        ``historical_time`` and ``categories``. For every supplier the
        done outgoing moves of its products inside the history window are
        grouped by product and turned into one report row per product.
        Suppliers without any row are left out of ``records``.

        Raises ``UserError`` when neither a supplier nor a category is
        given.
        """
        report_context = super().get_context(records, header, data)
        pool = Pool()
        Company = pool.get('company.company')
        StockMove = pool.get('stock.move')
        Location = pool.get('stock.location')

        cls.check_parameters(data['supplier'], data['categories'])

        company = Company(data['company'])
        historical_time = data['historical_time']
        time_stock = data['time_stock']
        today = date.today()
        start_date = today - timedelta(historical_time)
        suppliers_list = cls.get_supplier_products(data['supplier'])

        dom_stock = [
            ('effective_date', '>=', start_date),
            ('effective_date', '<=', today),
            ('state', '=', 'done'),
            ('to_location.type', 'in', ['customer', 'production']),
            ]
        if data['categories']:
            dom_stock.append(
                ('product.categories', 'in', data['categories']))
        if data['location']:
            location_ids = [data['location']]
            dom_stock.append(('from_location', '=', data['location']))
        else:
            dom_stock.append(('from_location.type', '=', 'storage'))
            warehouses = Location.search([('type', '=', 'warehouse')])
            # Skip warehouses without a storage location instead of
            # failing on ``None.id``.
            location_ids = [
                w.storage_location.id for w in warehouses
                if w.storage_location]

        report_records = []
        for sl in suppliers_list:
            for supplier in sl.values():
                # Per-supplier copy of the domain: the shared list is
                # never mutated.
                supplier_domain = dom_stock + [
                    ('product', 'in', supplier['products'])]
                # From here on 'products' holds the report rows.
                supplier['products'] = []
                # Ascending order matters: the first move seen for a
                # product is its oldest one inside the window.
                moves = StockMove.search(
                    supplier_domain, order=[('effective_date', 'ASC')])

                target_products = {}
                historical_products = {}
                for move in moves:
                    product = move.product
                    if product not in target_products:
                        target_products[product] = []
                        historical_products[product.id] = (
                            move.effective_date)
                    target_products[product].append(move.quantity)

                supplier_cost = []
                supplier_daily_sale = []
                for product, quantities in target_products.items():
                    # Stock needed for n days minus the stock on hand.
                    current_stock = cls.get_product_stock(
                        product.id, location_ids)

                    # Use the real history length of the product, not
                    # the requested one, when its first move is more
                    # recent than the start of the window.
                    real_historical_time = historical_time
                    first_move_date = historical_products[product.id]
                    if first_move_date > start_date:
                        real_historical_time -= (
                            first_move_date - start_date).days

                    suggest_qty, daily_rotation, stock_duration = (
                        cls._compute_rotation(
                            quantities, real_historical_time,
                            time_stock, current_stock))

                    # Outgoing quantity of the last 30 days and of the
                    # 30 days before those.
                    sales_30 = cls.get_last_out_moves(
                        product.id, data['location'], today, 30)
                    sales_60 = cls.get_last_out_moves(
                        product.id, data['location'],
                        today - timedelta(30), 30)

                    last_purchase_date, last_quantity, last_cost_price = (
                        cls.get_last_purchase(product.id))
                    extra_tax = product.extra_tax or 0
                    if (last_cost_price or 0) > 0:
                        last_cost_price += extra_tax
                    else:
                        # Assign the cost price when there are no
                        # purchases (production).
                        last_cost_price = product.cost_price + extra_tax
                    list_price = Decimal(product.list_price + extra_tax)
                    cost_price = int(Decimal(current_stock) * last_cost_price)
                    daily_sale = int(daily_rotation * list_price)
                    supplier_cost.append(cost_price)
                    supplier_daily_sale.append(daily_sale)

                    supplier['products'].append({
                            'product_id': product.id,
                            'code': product.code,
                            'name': product.template.name,
                            'current_stock': current_stock,
                            'uom': product.template.default_uom.name,
                            'stock_duration': stock_duration,
                            'pending_purchase': cls.get_pending_purchase(
                                product.id),
                            '30': sales_30,
                            '60': sales_60,
                            'daily_rotation': daily_rotation,
                            'daily_sale': daily_sale,
                            'suggest_qty': suggest_qty,
                            'last_purchase': last_purchase_date,
                            'last_quantity': last_quantity,
                            'last_cost_price': last_cost_price,
                            'amount_stock': cost_price,
                            })

                supplier['total_cost_price'] = sum(supplier_cost)
                supplier['total_daily_sale'] = sum(supplier_daily_sale)
                if supplier['products']:
                    report_records.append(supplier)

        report_context['records'] = report_records
        report_context['today'] = today
        report_context['company'] = company.party.name
        report_context['nit'] = company.party.id_number
        report_context['location'] = data['location_name']
        report_context['time_stock'] = time_stock
        report_context['historical_time'] = historical_time
        return report_context
|
|
|
|
|
|
class PurchaseRequisitionReport(Report):
    # Report registered under the name below; the visible part of the
    # class defines no custom behaviour.
    __name__ = 'purchase.requisition.report'
|