trytonpsk-purchase_suggested/purchase.py

597 lines
24 KiB
Python

# This file is part of purchase_suggested module for Tryton.
# The COPYRIGHT file at the top level of this repository contains
# the full copyright notices and license terms.
from decimal import Decimal
from datetime import date, timedelta
from shutil import move
from trytond.model import ModelView, fields
from trytond.wizard import (Wizard, StateView, StateTransition,
Button, StateAction, StateReport)
from trytond.pool import Pool
from trytond.transaction import Transaction
from trytond.report import Report
from trytond.exceptions import UserError
from trytond.i18n import gettext
from .exceptions import SupplierMovesWarning
class CreatePurchaseSuggestedStart(ModelView):
    'Create Purchase Suggested Start'
    __name__ = 'purchase_suggested.create_order.start'

    # Input form of the wizard 'purchase_suggested.create_order'.
    supplier = fields.Many2One('party.party', 'Supplier', required=True)
    # Number of days of consumption the suggested quantity is scaled to.
    time_stock = fields.Integer(
        'Time Stock', required=True, help='In days')
    # Optional warehouse; the wizard reads its storage location.
    location = fields.Many2One(
        'stock.location', 'Location',
        domain=[('type', '=', 'warehouse')])
    # Length of the look-back window over done outgoing moves.
    historical_time = fields.Integer(
        'Historical Time', required=True, help='In days')

    @staticmethod
    def default_historical_time():
        """Default look-back window, in days."""
        return 90

    @staticmethod
    def default_time_stock():
        """Default stock coverage, in days."""
        return 30
class CreatePurchaseSuggested(Wizard):
    'Create Purchase Suggested'
    __name__ = 'purchase_suggested.create_order'

    # Creates one draft purchase for the chosen supplier and adds a line
    # for every supplier product whose past consumption, scaled to the
    # requested stock time, exceeds the current stock.
    start = StateView(
        'purchase_suggested.create_order.start',
        'purchase_suggested.create_purchase_suggested_view_form', [
            Button('Cancel', 'end', 'tryton-cancel'),
            Button('Ok', 'accept', 'tryton-ok'),
        ])
    accept = StateTransition()

    def get_pending_purchase(self, product_id):
        """Return the summed quantity of draft stock moves of the product
        whose source location is of type 'supplier'."""
        StockMove = Pool().get('stock.move')
        qty_records = StockMove.search_read([
            ('product', '=', product_id),
            ('state', '=', 'draft'),
            ('from_location.type', 'in', ['supplier']),
        ], fields_names=['quantity'])
        return sum(rec['quantity'] for rec in qty_records)

    def get_product_stock(self, product_id, locations):
        """Return the product quantity as of today in ``locations``.

        ``locations`` is a list of stock location ids placed in the
        transaction context under the 'locations' key.
        """
        Product = Pool().get('product.product')
        stock_context = {
            'stock_date_end': date.today(),
            'locations': locations,
        }
        with Transaction().set_context(stock_context):
            product, = Product.search([('id', '=', product_id)])
            return product.quantity

    def transition_accept(self):
        """Create the draft purchase and its suggested lines.

        Returns the name of the next wizard state ('end').
        """
        pool = Pool()
        Purchase = pool.get('purchase.purchase')
        StockMove = pool.get('stock.move')
        Location = pool.get('stock.location')
        Party = pool.get('party.party')
        Currency = pool.get('currency.currency')

        supplier = self.start.supplier
        warehouse = self.start.location
        historical_time = self.start.historical_time
        time_stock = self.start.time_stock
        today = date.today()
        start_date = today - timedelta(historical_time)
        products_ids = self.get_supplier_products(supplier)

        # Done moves of the supplier products leaving towards customer or
        # production locations within the historical window.
        dom_stock = [
            ('effective_date', '>=', start_date),
            ('effective_date', '<=', today),
            ('state', '=', 'done'),
            ('product', 'in', products_ids),
            ('to_location.type', 'in', ['customer', 'production']),
        ]
        if warehouse:
            location_id = warehouse.storage_location.id
            location_ids = [location_id]
            dom_stock.append(('from_location', '=', location_id))
        else:
            dom_stock.append(('from_location.type', '=', 'storage'))
            warehouses = Location.search([
                ('type', '=', 'warehouse')
            ])
            location_ids = [w.storage_location.id for w in warehouses]
        moves = StockMove.search(
            dom_stock, order=[('effective_date', 'ASC')])

        purchase, = Purchase.create([{
            'party': supplier.id,
            'invoice_address': supplier.address_get(type='invoice'),
            'purchase_date': today,
            'state': 'draft',
            'warehouse': warehouse.id if warehouse else None,
        }])

        # Moves are sorted by ascending effective date, so the first move
        # seen for a product carries its earliest date in the window.
        target_products = {}
        first_move_dates = {}
        for move in moves:
            if move.product not in target_products:
                target_products[move.product] = []
                first_move_dates[move.product.id] = move.effective_date
            target_products[move.product].append(move.quantity)

        party = Party(supplier.id)
        currency = Currency(Transaction().context.get('currency'))
        for product, quantities in target_products.items():
            # Shorten the window for products whose first move is later
            # than the start of the window.
            real_historical_time = historical_time
            first_move_date = first_move_dates[product.id]
            if first_move_date > start_date:
                real_historical_time = historical_time - (
                    first_move_date - start_date).days
            if real_historical_time == 0:
                # No elapsed days to average over: nothing to suggest.
                continue

            # Stock needed for ``time_stock`` days minus current stock.
            current_stock = self.get_product_stock(product.id, location_ids)
            suggest_qty = int(
                time_stock * sum(quantities) / real_historical_time
                - current_stock)
            if suggest_qty <= 0:
                continue

            order = {
                'product': product,
                'quantity': suggest_qty,
                'party': party,
                'uom': product.purchase_uom,
                'currency': currency,
            }
            line = self.compute_purchase_line(order, purchase)
            line.purchase = purchase
            line.save()
        return 'end'

    def get_supplier_products(self, supplier):
        """Return the ids of the products of every template that has a
        'purchase.product_supplier' record for ``supplier``."""
        ProductSupplier = Pool().get('purchase.product_supplier')
        products_supplier = ProductSupplier.search([
            ('party', '=', supplier.id),
        ])
        products_ids = []
        for ps in products_supplier:
            products_ids.extend(p.id for p in ps.template.products)
        return products_ids

    @classmethod
    def compute_purchase_line(cls, request, purchase):
        """Build an unsaved 'purchase.line' for ``request``.

        ``request`` is a dict with the keys 'product', 'quantity',
        'party', 'uom' and 'currency' (records, except the quantity).
        ``purchase`` is accepted for interface compatibility and is not
        read here; the caller assigns it to the returned line.

        Raises UserError when no purchase price is returned for the
        product.
        """
        pool = Pool()
        Product = pool.get('product.product')
        Line = pool.get('purchase.line')

        product = request['product']
        party = request['party']
        line = Line(
            product=product,
            unit=request['uom'],
            quantity=request['quantity'],
            description=product.name,
        )

        # XXX purchase with several lines of the same product
        with Transaction().set_context(uom=request['uom'].id,
                supplier=party.id,
                currency=request['currency'].id):
            product_price = Product.get_purchase_price(
                [product], request['quantity'])[product.id]
        # Check for a missing price before rounding: calling quantize()
        # on None would raise AttributeError instead of the user error.
        if product_price is None:
            raise UserError(gettext('purchase_suggested.msg_late_supplier_moves', s=product.rec_name))
        line.unit_price = product_price.quantize(
            Decimal(1) / 10 ** Line.unit_price.digits[1])

        taxes = []
        for tax in product.supplier_taxes_used:
            if party and party.supplier_tax_rule:
                pattern = cls._get_tax_rule_pattern(request)
                tax_ids = party.supplier_tax_rule.apply(tax, pattern)
                if tax_ids:
                    taxes.extend(tax_ids)
                continue
            taxes.append(tax.id)
        line.taxes = taxes
        return line

    @staticmethod
    def _get_tax_rule_pattern(request):
        """Return the pattern passed to the supplier tax rule; empty
        here."""
        return {}
class CreatePurchaseRequestBySupplierStart(ModelView):
    'Create Purchase Request By Supplier'
    __name__ = 'purchase_suggested.create_purchase_request_by_supplier.start'

    # Party whose 'purchase.product_supplier' records select the products
    # handled by the wizard.
    party = fields.Many2One('party.party', 'Party', required=True)
class CreatePurchaseRequestBySupplier(Wizard):
    'Create Purchase Requests By Supplier'
    __name__ = 'purchase_suggested.create_purchase_request_by_supplier'

    # Generates purchase requests restricted to the products of one
    # supplier, then opens the purchase request list action.
    start = StateView(
        'purchase_suggested.create_purchase_request_by_supplier.start',
        'purchase_suggested.purchase_request_create_start_view_form', [
            Button('Cancel', 'end', 'tryton-cancel'),
            Button('Create', 'create_', 'tryton-ok', default=True),
        ])
    create_ = StateAction('purchase_request.act_purchase_request_form')

    @classmethod
    def __setup__(cls):
        super(CreatePurchaseRequestBySupplier, cls).__setup__()

    @property
    def _requests_parameters(self):
        """Extra parameters for request generation; empty here."""
        return {}

    def do_create_(self, action):
        """Generate purchase requests for the supplier's products.

        Before generating, warns when draft moves from a supplier location
        to a storage location exist for those products with a planned date
        earlier than today.

        Returns the action and an empty data dict.
        """
        pool = Pool()
        PurchaseRequest = pool.get('purchase.request')
        Move = pool.get('stock.move')
        ProductSupplier = pool.get('purchase.product_supplier')
        Date = pool.get('ir.date')
        WarningModel = pool.get('res.user.warning')

        today = Date.today()
        products_supplier = ProductSupplier.search([
            ('party', '=', self.start.party.id),
        ])
        products = [
            product
            for ps in products_supplier
            for product in ps.template.products]

        # Only existence matters, so one matching move is enough.
        late_moves = Move.search([
            ('from_location.type', '=', 'supplier'),
            ('to_location.type', '=', 'storage'),
            ('product', 'in', products),
            ('state', '=', 'draft'),
            ('planned_date', '<', today),
        ], order=[], limit=1)
        if late_moves:
            # A user warning needs a key so the user can acknowledge it
            # and continue; NOTE(review): assumes SupplierMovesWarning
            # subclasses trytond's UserWarning - confirm in .exceptions.
            key = '%s@%s' % (self.__name__, today)
            if WarningModel.check(key):
                raise SupplierMovesWarning(key, gettext('purchase_suggested.late_supplier_moves', module=self.__name__, date=today))

        PurchaseRequest.generate_requests(products=products)
        return action, {}

    def transition_create_(self):
        """Finish the wizard once the action has been prepared."""
        return 'end'
class CreateRequestBySupplierStart(ModelView):
    'Create Request By Supplier Start'
    __name__ = 'purchase_suggested.create_request_by_supplier.start'

    # Input form of the wizard 'purchase_suggested.create_request_by_supplier'.
    supplier = fields.Many2One('party.party', 'Supplier', required=True)
    # Warehouse whose purchase order points are evaluated.
    warehouse_location = fields.Many2One(
        'stock.location', 'Warehouse Location',
        select=True,
        domain=[('type', '=', 'warehouse')],
        required=True)
class CreateRequestBySupplier(Wizard):
    'Create Request By Supplier'
    __name__ = 'purchase_suggested.create_request_by_supplier'

    # Creates draft purchase requests from the purchase order points of
    # the chosen warehouse, limited to the products of one supplier.
    start = StateView(
        'purchase_suggested.create_request_by_supplier.start',
        'purchase_suggested.create_request_by_supplier_start_view_form', [
            Button('Cancel', 'end', 'tryton-cancel'),
            Button('Create', 'create_', 'tryton-ok', default=True),
        ])
    create_ = StateTransition()

    def transition_create_(self):
        """Create one purchase request per order point whose product
        quantity is not above the order point minimum.

        The requested quantity is the order point target quantity minus
        the current quantity. Returns the next wizard state ('end').
        """
        pool = Pool()
        ProductSupplier = pool.get('purchase.product_supplier')
        PurchaseRequest = pool.get('purchase.request')
        Product = pool.get('product.product')
        OrderPoint = pool.get('stock.order_point')
        User = pool.get('res.user')

        company = User(Transaction().user).company
        supplier_id = self.start.supplier.id
        warehouse_id = self.start.warehouse_location.id

        product_suppliers = ProductSupplier.search([
            ('party', '=', supplier_id)
        ])
        products_ids = []
        for product_supplier in product_suppliers:
            if product_supplier.template:
                products_ids.extend(
                    p.id for p in product_supplier.template.products)

        order_points = OrderPoint.search([
            ('warehouse_location', '=', warehouse_id),
            ('company', '=', company.id),
            ('product', 'in', products_ids),
            ('type', '=', 'purchase'),
        ])
        # product id -> (minimum quantity, target quantity)
        products2op = {}
        for point in order_points:
            products2op[point.product.id] = (
                point.min_quantity, point.target_quantity)

        # Current quantities of those products in the warehouse.
        stock_context = {
            'stock_date_end': date.today(),
            'locations': [warehouse_id],
        }
        with Transaction().set_context(stock_context):
            products = Product.search([
                ('id', 'in', products2op.keys())
            ])
            products_dict = {p.id: p.quantity for p in products}

        request_to_create = []
        for point in order_points:
            product = point.product
            pto_min, pto_max = products2op[product.id]
            product_qty = products_dict[product.id]
            if product_qty > pto_min:
                continue
            request_to_create.append({
                'product': product,
                'description': product.template.name,
                'party': supplier_id,
                'quantity': pto_max - product_qty,
                'uom': product.template.default_uom,
                'warehouse': warehouse_id,
                'company': company.id,
                'state': 'draft',
                'origin': str(point),
            })
        PurchaseRequest.create(request_to_create)
        return 'end'
class PurchaseSuggestedReportStart(ModelView):
    'Purchase Suggested Report Start'
    __name__ = 'purchase_suggested.purchase_suggested_report.start'

    # Parameters of the purchase suggested report.
    company = fields.Many2One('company.company', 'Company', required=True)
    time_stock = fields.Integer(
        'Time Stock', required=True, help='In days')
    supplier = fields.Many2One('party.party', 'Supplier')
    location = fields.Many2One(
        'stock.location', 'Location',
        domain=[('type', '=', 'warehouse')])
    historical_time = fields.Integer(
        'Historical Time', required=True, help='In days')
    categories = fields.Many2Many(
        'product.category', None, None, 'Categories')

    @staticmethod
    def default_company():
        """Default to the company found in the transaction context."""
        return Transaction().context.get('company')

    @staticmethod
    def default_historical_time():
        """Default look-back window, in days."""
        return 90

    @staticmethod
    def default_time_stock():
        """Default stock coverage, in days."""
        return 30
class PrintPurchaseSuggestedReport(Wizard):
    'Purchase Suggested Report Wizard'
    __name__ = 'purchase_suggested.print_purchase_suggested_report'

    # Collects the report parameters and hands them to the report
    # 'purchase_suggested.purchase_suggested_report'.
    start = StateView(
        'purchase_suggested.purchase_suggested_report.start',
        'purchase_suggested.print_purchase_suggested_report_start_view_form', [
            Button('Cancel', 'end', 'tryton-cancel'),
            Button('Print', 'print_', 'tryton-print', default=True),
        ])
    print_ = StateReport('purchase_suggested.purchase_suggested_report')

    def do_print_(self, action):
        """Return the report action with the data dict built from the
        start form.

        When a location is selected, the id and name of its storage
        location are passed; otherwise both are None.
        """
        form = self.start

        location_id = location_name = None
        if form.location:
            storage = form.location.storage_location
            location_id = storage.id
            location_name = storage.name

        supplier = form.supplier.id if form.supplier else None

        data = {
            'company': form.company.id,
            'supplier': supplier,
            'location': location_id,
            'location_name': location_name,
            'time_stock': form.time_stock,
            'historical_time': form.historical_time,
            'categories': [category.id for category in form.categories],
        }
        return action, data

    def transition_print_(self):
        """Finish the wizard after the report has been requested."""
        return 'end'
class PurchaseSuggestedReport(Report):
    'Purchase Suggested Report'
    __name__ = 'purchase_suggested.purchase_suggested_report'

    @classmethod
    def get_supplier_products(cls, supplier):
        """Group product ids by supplier.

        ``supplier`` is a party id or a falsy value; when falsy, all
        product suppliers of active, purchasable templates are used.

        Returns a list of single-entry dicts
        ``{party_id: {'name': ..., 'products': [product ids]}}``.
        """
        ProductSupplier = Pool().get('purchase.product_supplier')
        domain = [
            ('template.active', '=', True),
            ('template.purchasable', '=', True),
        ]
        if supplier:
            domain.append(('party', '=', supplier))

        suppliers_dict = {}
        for ps in ProductSupplier.search(domain):
            products_ids = [p.id for p in ps.template.products]
            entry = suppliers_dict.setdefault(ps.party.id, {
                'name': ps.party.name,
                'products': [],
            })
            entry['products'].extend(products_ids)
        return [{key: value} for key, value in suppliers_dict.items()]

    @classmethod
    def get_last_purchase(cls, product_id):
        """Return ``(purchase_date, quantity, unit_price)`` of the most
        recently created purchase line of the product whose purchase is
        in state 'processing' or 'done'; ``(None, None, 0)`` when there
        is none."""
        PurchaseLine = Pool().get('purchase.line')
        lines = PurchaseLine.search([
            ('product', '=', product_id),
            ('purchase.state', 'in', ('processing', 'done')),
        ], order=[('create_date', 'DESC')], limit=1)
        if not lines:
            return None, None, 0
        line = lines[0]
        return line.purchase.purchase_date, line.quantity, line.unit_price

    @classmethod
    def get_pending_purchase(cls, product_id):
        """Return the summed quantity of draft stock moves of the product
        whose source location is of type 'supplier'."""
        StockMove = Pool().get('stock.move')
        qty_records = StockMove.search_read([
            ('product', '=', product_id),
            ('state', '=', 'draft'),
            ('from_location.type', 'in', ['supplier']),
        ], fields_names=['quantity'])
        return sum(rec['quantity'] for rec in qty_records)

    @classmethod
    def get_product_stock(cls, product_id, locations):
        """Return the product quantity as of today in ``locations`` (a
        list of stock location ids)."""
        Product = Pool().get('product.product')
        stock_context = {
            'stock_date_end': date.today(),
            'locations': locations,
        }
        with Transaction().set_context(stock_context):
            product, = Product.search([('id', '=', product_id)])
            return product.quantity

    @classmethod
    def get_last_out_moves(cls, product_id, location_id, end_date, days):
        """Return the summed quantity of done moves of the product towards
        customer or production locations during the ``days`` days ending
        at ``end_date``.

        Moves are restricted to ``location_id`` as source when given,
        otherwise to any source location of type 'storage'.
        """
        StockMove = Pool().get('stock.move')
        start_date = end_date - timedelta(days=days)
        dom_stock = [
            ('product', '=', product_id),
            ('effective_date', '>=', start_date),
            ('effective_date', '<=', end_date),
            ('state', '=', 'done'),
            ('to_location.type', 'in', ['customer', 'production']),
        ]
        if location_id:
            dom_stock.append(('from_location', '=', location_id))
        else:
            dom_stock.append(('from_location.type', '=', 'storage'))
        return sum(move.quantity for move in StockMove.search(dom_stock))

    @classmethod
    def check_parameters(cls, supplier, categories):
        """Raise UserError when neither a supplier nor categories are
        given."""
        if not supplier and not categories:
            raise UserError('You must set a supplier or category.')

    @classmethod
    def get_context(cls, records, header, data):
        """Build the report context.

        ``data`` is the dict produced by the print wizard: 'company',
        'supplier', 'location', 'location_name', 'time_stock',
        'historical_time' and 'categories'.

        The 'records' entry of the returned context holds one dict per
        supplier that has at least one product with outgoing moves in the
        historical window.
        """
        report_context = super().get_context(records, header, data)
        pool = Pool()
        Company = pool.get('company.company')
        StockMove = pool.get('stock.move')
        Location = pool.get('stock.location')

        company = Company(data['company'])
        today = date.today()
        historical_time = data['historical_time']
        time_stock = data['time_stock']
        start_date = today - timedelta(historical_time)

        cls.check_parameters(data['supplier'], data['categories'])
        suppliers_list = cls.get_supplier_products(data['supplier'])

        # Clauses shared by every supplier; the product clause is added
        # per supplier on a copy so this list is never mutated in the loop.
        base_domain = [
            ('effective_date', '>=', start_date),
            ('effective_date', '<=', today),
            ('state', '=', 'done'),
            ('to_location.type', 'in', ['customer', 'production']),
        ]
        if data['categories']:
            base_domain.append(
                ('product.categories', 'in', data['categories']))
        if data['location']:
            location_ids = [data['location']]
            base_domain.append(('from_location', '=', data['location']))
        else:
            base_domain.append(('from_location.type', '=', 'storage'))
            warehouses = Location.search([
                ('type', '=', 'warehouse')
            ])
            location_ids = [w.storage_location.id for w in warehouses]

        records = []
        for sl in suppliers_list:
            for supplier in sl.values():
                domain = base_domain + [
                    ('product', 'in', supplier['products'])]
                # The id list is replaced by the computed product rows.
                supplier['products'] = []
                moves = StockMove.search(
                    domain, order=[('effective_date', 'ASC')])

                # Moves come sorted by ascending date, so the first move
                # seen for a product carries its earliest date.
                target_products = {}
                historical_products = {}
                for move in moves:
                    if move.product not in target_products:
                        target_products[move.product] = []
                        historical_products[move.product.id] = (
                            move.effective_date)
                    target_products[move.product].append(move.quantity)

                supplier_cost = []
                supplier_daily_sale = []
                for product, quantities in target_products.items():
                    current_stock = cls.get_product_stock(
                        product.id, location_ids)

                    # Use the real historical time of the product, not
                    # the requested window, when its first move is later
                    # than the start of the window.
                    real_historical_time = historical_time
                    pdt_historical_time = historical_products[product.id]
                    if pdt_historical_time > start_date:
                        real_historical_time = historical_time - (
                            pdt_historical_time - start_date).days

                    if real_historical_time == 0:
                        suggest_qty = 0
                        daily_rotation = 0
                        stock_duration = 0
                    else:
                        # Stock needed for ``time_stock`` days minus the
                        # current stock.
                        suggest_qty = int(
                            time_stock * sum(quantities)
                            / real_historical_time - current_stock)
                        daily_rotation = Decimal(
                            sum(quantities) / real_historical_time)
                        # A zero rotation would make the division fail.
                        if daily_rotation:
                            stock_duration = int(
                                Decimal(current_stock) / daily_rotation)
                        else:
                            stock_duration = 0

                    sales_30 = cls.get_last_out_moves(
                        product.id, data['location'], today, 30)
                    sales_60 = cls.get_last_out_moves(
                        product.id, data['location'],
                        today - timedelta(30), 30)

                    (last_purchase_date, last_quantity,
                        last_cost_price) = cls.get_last_purchase(product.id)
                    extra_tax = product.extra_tax or 0
                    if last_cost_price > 0:
                        last_cost_price += extra_tax
                    else:
                        # Fall back to the cost price when there are no
                        # purchases (production).
                        last_cost_price = product.cost_price + extra_tax

                    list_price = Decimal(product.list_price + extra_tax)
                    cost_price = int(
                        Decimal(current_stock) * last_cost_price)
                    daily_sale = int(daily_rotation * list_price)
                    supplier_cost.append(cost_price)
                    supplier_daily_sale.append(daily_sale)

                    supplier['products'].append({
                        'product_id': product.id,
                        'code': product.code,
                        'name': product.template.name,
                        'current_stock': current_stock,
                        'uom': product.template.default_uom.name,
                        'stock_duration': stock_duration,
                        'pending_purchase': cls.get_pending_purchase(
                            product.id),
                        '30': sales_30,
                        '60': sales_60,
                        'daily_rotation': daily_rotation,
                        'daily_sale': daily_sale,
                        'suggest_qty': suggest_qty,
                        'last_purchase': last_purchase_date,
                        'last_quantity': last_quantity,
                        'last_cost_price': last_cost_price,
                        'amount_stock': cost_price,
                    })

                supplier['total_cost_price'] = sum(supplier_cost)
                supplier['total_daily_sale'] = sum(supplier_daily_sale)
                if supplier['products']:
                    records.append(supplier)

        report_context['records'] = records
        report_context['today'] = today
        report_context['company'] = company.party.name
        report_context['nit'] = company.party.id_number
        report_context['location'] = data['location_name']
        report_context['time_stock'] = time_stock
        report_context['historical_time'] = historical_time
        return report_context
class PurchaseRequisitionReport(Report):
    # Report registered under the name 'purchase.requisition.report';
    # no behavior beyond the base Report class is defined here.
    __name__ = 'purchase.requisition.report'