Files
tradon/modules/stock_supply/purchase_request.py
2026-03-14 09:42:12 +00:00

366 lines
14 KiB
Python

# This file is part of Tryton. The COPYRIGHT file at the top level of
# this repository contains the full copyright notices and license terms.
import datetime
import operator
from collections import defaultdict
from trytond.model import ModelSQL, ValueMixin, fields
from trytond.pool import Pool, PoolMeta
from trytond.pyson import TimeDelta
from trytond.tools import grouped_slice
from trytond.transaction import Transaction
# Field definition shared by the purchase configuration model and its
# value model below.  The domain accepts an unset period, or one that is
# at least the PYSON ``TimeDelta()`` value.
supply_period = fields.TimeDelta(
    "Supply Period",
    domain=[
        'OR',
        ('supply_period', '=', None),
        ('supply_period', '>=', TimeDelta()),
        ],
    )
class PurchaseConfiguration(metaclass=PoolMeta):
    # Extends 'purchase.configuration' with the supply period, exposed
    # through a MultiValue wrapper around the shared field definition.
    __name__ = 'purchase.configuration'

    supply_period = fields.MultiValue(supply_period)
class PurchaseConfigurationSupplyPeriod(ModelSQL, ValueMixin):
    # Model carrying the shared supply_period field definition.
    # NOTE(review): presumably the storage model backing the MultiValue
    # field on 'purchase.configuration' -- confirm against ValueMixin.
    __name__ = 'purchase.configuration.supply_period'

    supply_period = supply_period
class PurchaseRequest(metaclass=PoolMeta):
    # Extends 'purchase.request' with the generation of draft requests
    # from forecast stock shortages and purchase order points.
    __name__ = 'purchase.request'

    @classmethod
    def _get_origin(cls):
        """Return the parent origins extended with 'stock.order_point'."""
        origins = super()._get_origin()
        return origins | {'stock.order_point'}

    @classmethod
    def generate_requests(cls, products=None, warehouses=None):
        """
        For each product compute the purchase request that must be
        created today to meet product outputs.

        If products is specified it will compute the purchase requests
        for the selected products.

        If warehouses is specified it will compute the purchase request
        necessary for the selected warehouses.

        Nothing is done when the current user has no company.  Existing
        draft requests originating from an order point, not yet linked to
        a purchase line and matching the products and warehouses are
        deleted before the new ones are created.
        """
        pool = Pool()
        Product = pool.get('product.product')
        Location = pool.get('stock.location')
        User = pool.get('res.user')
        company = User(Transaction().user).company
        if not company:
            return

        if not warehouses:
            # fetch warehouses:
            warehouses = Location.search([
                    ('type', '=', 'warehouse'),
                    ])
        warehouse_ids = [w.id for w in warehouses]

        if products is None:
            # fetch goods and assets
            # ordered by ids to speedup reduce_ids in products_by_location
            products = Product.search([
                    ('type', 'in', ['goods', 'assets']),
                    ('consumable', '=', False),
                    ('purchasable', '=', True),
                    ], order=[('id', 'ASC')])

        # aggregate product by minimum supply date
        date2products = defaultdict(list)
        for product in products:
            min_date, max_date = cls.get_supply_dates(
                product, company=company.id)
            date2products[min_date, max_date].append(product)

        # compute requests
        new_requests = []
        for (min_date, max_date), dates_products in date2products.items():
            for sub_products in grouped_slice(dates_products):
                sub_products = Product.browse(sub_products)

                # Order points of the company, keyed by
                # (warehouse id, product id): purchase ones drive the
                # requests, the others exclude the product from them.
                product2ops = {}
                product2ops_other = {}
                for product in sub_products:
                    for order_point in product.order_points:
                        if (order_point.company != company
                                or not order_point.warehouse_location):
                            continue
                        if order_point.type == 'purchase':
                            dict_ = product2ops
                        else:
                            dict_ = product2ops_other
                        dict_[
                            (order_point.warehouse_location.id,
                                order_point.product.id)
                            ] = order_point

                product_ids = [p.id for p in sub_products]
                with Transaction().set_context(
                        forecast=True,
                        stock_date_end=min_date):
                    pbl = Product.products_by_location(warehouse_ids,
                        with_childs=True, grouping_filter=(product_ids,))

                for warehouse_id in warehouse_ids:
                    # Always built from the full batch: the list filtered
                    # for one warehouse must not leak into the quantities
                    # of the next warehouse.
                    min_date_qties = defaultdict(int,
                        ((x, pbl.pop((warehouse_id, x), 0))
                            for x in product_ids))
                    # Do not compute shortage for product
                    # with different order point
                    shortage_product_ids = [
                        p.id for p in sub_products
                        if (warehouse_id, p.id) not in product2ops_other]
                    # Search for shortage between min-max
                    shortages = cls.get_shortage(
                        warehouse_id, shortage_product_ids,
                        min_date, max_date,
                        min_date_qties=min_date_qties,
                        order_points=product2ops)

                    for product in sub_products:
                        if product.id not in shortages:
                            continue
                        shortage_date, product_quantity = (
                            shortages[product.id])
                        if shortage_date is None or product_quantity is None:
                            continue
                        order_point = product2ops.get(
                            (warehouse_id, product.id))
                        # generate request values
                        request = cls.compute_request(product,
                            warehouse_id, shortage_date, product_quantity,
                            company, order_point)
                        new_requests.append(request)

        # delete purchase requests without a purchase line
        products = set(products)
        reqs = cls.search([
                ('state', '=', 'draft'),
                ('purchase_line', '=', None),
                ('company', '=', company.id),
                ('origin', 'like', 'stock.order_point,%'),
                ])
        reqs = [r for r in reqs
            if r.product in products and r.warehouse in warehouses]
        cls.delete(reqs)
        new_requests = cls.compare_requests(new_requests, company)

        cls.create_requests(new_requests)

    @classmethod
    def create_requests(cls, new_requests):
        """
        Save the requests whose computed quantity is strictly positive.

        A supply date equal to datetime.date.max is replaced by None on
        every request, saved or not.
        """
        to_save = []
        for new_req in new_requests:
            if new_req.supply_date == datetime.date.max:
                new_req.supply_date = None
            if new_req.computed_quantity > 0:
                to_save.append(new_req)
        cls.save(to_save)

    @classmethod
    def compare_requests(cls, new_requests, company):
        """
        Compare new_requests with already existing request to avoid
        to re-create existing requests.

        The quantities of new_requests are reduced by the quantities of
        existing requests of the same product and warehouse which have a
        purchase line without moves and a supply date not later than the
        new request.  new_requests is sorted in place by supply date and
        returned.
        """
        pool = Pool()
        Uom = pool.get('product.uom')
        Request = pool.get('purchase.request')

        requests = Request.search([
                ('product', '!=', None),
                ('purchase_line', '!=', None),
                ('purchase_line.moves', '=', None),
                ('purchase_line.purchase.state', '!=', 'cancelled'),
                ('company', '=', company.id),
                ('origin', 'like', 'stock.order_point,%'),
                ])
        # Fetch data from existing requests
        existing_req = {}
        for request in requests:
            pline = request.purchase_line
            # Skip incoherent request
            if (request.product != pline.product
                    or request.warehouse != pline.purchase.warehouse):
                continue
            # Take smallest amount between request and purchase line
            pline_qty = Uom.compute_qty(pline.unit, pline.quantity,
                pline.product.default_uom, round=False)
            quantity = min(request.computed_quantity, pline_qty)

            existing_req.setdefault(
                (request.product.id, request.warehouse.id),
                []).append({
                        'supply_date': (
                            request.supply_date or datetime.date.max),
                        'quantity': quantity,
                        })

        for old_reqs in existing_req.values():
            old_reqs.sort(key=lambda r: r['supply_date'])

        # Update new requests to take existing requests into account
        new_requests.sort(key=operator.attrgetter('supply_date'))
        for new_req in new_requests:
            for old_req in existing_req.get(
                    (new_req.product.id, new_req.warehouse.id), []):
                if old_req['supply_date'] <= new_req.supply_date:
                    # Capture both quantities before updating either one,
                    # so that the existing request is decreased by what
                    # was needed and not by what remains to be requested.
                    needed = new_req.computed_quantity
                    available = old_req['quantity']
                    new_req.computed_quantity = max(
                        0.0, needed - available)
                    new_req.quantity = Uom.compute_qty(
                        new_req.product.default_uom,
                        new_req.computed_quantity,
                        new_req.unit, round=False)
                    new_req.quantity = new_req.unit.ceil(new_req.quantity)
                    old_req['quantity'] = max(0.0, available - needed)
                else:
                    # existing requests are sorted by supply date
                    break

        return new_requests

    @classmethod
    def get_supply_dates(cls, product, **pattern):
        """
        Return the interval of earliest supply dates for a product.

        pattern is forwarded to product.product_suppliers_used.  The
        result is a tuple (min_date, max_date); both are
        datetime.date.max when no supplier provides a supply date.
        """
        Date = Pool().get('ir.date')

        min_date = None
        max_date = None
        today = Date.today()

        for product_supplier in product.product_suppliers_used(**pattern):
            supply_date = product_supplier.compute_supply_date(date=today)
            if supply_date == datetime.date.max:
                # this supplier can not provide a date
                continue
            next_day = today + product_supplier.get_supply_period()
            next_supply_date = product_supplier.compute_supply_date(
                date=next_day)
            if min_date is None or supply_date < min_date:
                min_date = supply_date
            if max_date is None or next_supply_date > max_date:
                max_date = next_supply_date

        if min_date is None:
            min_date = datetime.date.max
            max_date = datetime.date.max

        return (min_date, max_date)

    @classmethod
    def compute_request(cls, product, location_id, shortage_date,
            product_quantity, company, order_point=None,
            supplier_pattern=None):
        """
        Return the value of the purchase request which will answer to
        the needed quantity at the given date. I.e: the latest
        purchase date, the expected supply date and the preferred
        supplier.

        The computed quantity is the target quantity of the order point
        (0.0 without order point) minus product_quantity.  The request
        is returned unsaved; supplier_pattern is copied, not modified.
        """
        pool = Pool()
        Uom = pool.get('product.uom')
        Request = pool.get('purchase.request')

        if supplier_pattern is None:
            supplier_pattern = {}
        else:
            supplier_pattern = supplier_pattern.copy()
        supplier_pattern['company'] = company.id

        supplier, purchase_date = cls.find_best_supplier(product,
            shortage_date, **supplier_pattern)

        unit = product.purchase_uom or product.default_uom
        target_quantity = order_point.target_quantity if order_point else 0.0
        computed_quantity = target_quantity - product_quantity
        product_quantity = unit.ceil(product_quantity)
        quantity = Uom.compute_qty(
            product.default_uom, computed_quantity, unit, round=False)
        quantity = unit.ceil(quantity)

        if order_point:
            origin = 'stock.order_point,%s' % order_point.id
        else:
            # no order point: use a placeholder id in the origin
            origin = 'stock.order_point,-1'
        return Request(product=product,
            party=supplier or None,
            quantity=quantity,
            unit=unit,
            computed_quantity=computed_quantity,
            computed_unit=product.default_uom,
            purchase_date=purchase_date,
            supply_date=shortage_date,
            stock_level=product_quantity,
            company=company,
            warehouse=location_id,
            origin=origin,
            )

    @classmethod
    def get_shortage(cls, location_id, product_ids, min_date, max_date,
            min_date_qties, order_points):
        """
        Return for each product the first date between min_date and max_date
        where the stock quantity is less than the minimal quantity and the
        smallest stock quantity in the interval or None if there is no date
        where stock quantity is less than the minimal quantity.

        The minimal quantity comes from the order point or is zero.

        min_date_qties is the quantities for each products at the min_date.
        order_points is a dictionary that links products to order point.

        The result is a dictionary mapping each id of product_ids to a
        tuple (date, quantity).
        """
        Product = Pool().get('product.product')

        res_dates = {}
        res_qties = {}

        min_quantities = defaultdict(float)
        for product_id in product_ids:
            order_point = order_points.get((location_id, product_id))
            if order_point:
                min_quantities[product_id] = order_point.min_quantity

        with Transaction().set_context(
                forecast=True,
                stock_date_start=min_date,
                stock_date_end=max_date):
            pbl = Product.products_by_location(
                [location_id], with_childs=True,
                grouping=('date', 'product'),
                grouping_filter=(None, product_ids))
        # Regroup the quantities per date; the first element of the key
        # is dropped.
        pbl_dates = defaultdict(dict)
        for key, qty in pbl.items():
            date, product_id = key[1:]
            pbl_dates[date][product_id] = qty

        current_date = min_date
        current_qties = min_date_qties.copy()
        products_to_check = product_ids.copy()
        # The second condition ensures min_date itself is always checked
        while (current_date < max_date) or (current_date == min_date):
            for product_id in products_to_check:
                current_qty = current_qties[product_id]
                min_quantity = min_quantities[product_id]
                res_qty = res_qties.get(product_id)
                res_date = res_dates.get(product_id)
                if min_quantity is not None and current_qty < min_quantity:
                    if res_date is None:
                        res_dates[product_id] = current_date
                    # Test against None: a recorded quantity of 0 is a
                    # valid minimum which must not be overwritten by a
                    # larger quantity.
                    if res_qty is None or current_qty < res_qty:
                        res_qties[product_id] = current_qty

            if current_date == datetime.date.max:
                # adding one day would overflow
                break
            current_date += datetime.timedelta(1)

            # Update current quantities with the quantities of the day;
            # only the products which changed need to be checked again.
            day_qties = pbl_dates.get(current_date, {})
            products_to_check.clear()
            for product_id, qty in day_qties.items():
                current_qties[product_id] += qty
                products_to_check.append(product_id)

        return {x: (res_dates.get(x), res_qties.get(x)) for x in product_ids}