Files
tradon/modules/purchase/product.py
2026-03-14 09:42:12 +00:00

555 lines
20 KiB
Python

# This file is part of Tryton. The COPYRIGHT file at the top level of
# this repository contains the full copyright notices and license terms.
import datetime
from collections import defaultdict
from sql import Literal
from sql.aggregate import Count, Max
from trytond.i18n import gettext
from trytond.model import (
Index, MatchMixin, ModelSQL, ModelView, fields, sequence_ordered)
from trytond.modules.currency.fields import Monetary
from trytond.modules.product import (
ProductDeactivatableMixin, price_digits, round_price)
from trytond.pool import Pool, PoolMeta
from trytond.pyson import Bool, Eval, If, TimeDelta
from trytond.tools import (
grouped_slice, is_full_text, lstrip_wildcard, reduce_ids)
from trytond.transaction import Transaction
from .exceptions import PurchaseUOMWarning
class Template(metaclass=PoolMeta):
    # Extension of the product template with purchasing data: a
    # "purchasable" flag, the purchase unit of measure and the suppliers.
    __name__ = "product.template"

    purchasable = fields.Boolean("Purchasable")
    # Hidden unless the template is purchasable and a company is in context.
    product_suppliers = fields.One2Many(
        'purchase.product_supplier', 'template', "Suppliers",
        states={
            'invisible': (
                ~Eval('purchasable', False)
                | ~Eval('context', {}).get('company')),
            })
    # Must belong to the same category as the default unit of measure.
    purchase_uom = fields.Many2One(
        'product.uom', "Purchase UoM",
        domain=[('category', '=', Eval('default_uom_category', -1))],
        states={
            'invisible': ~Eval('purchasable'),
            'required': Eval('purchasable', False),
            },
        help="The default Unit of Measure for purchases.")

    @fields.depends('default_uom', 'purchase_uom', 'purchasable')
    def on_change_default_uom(self):
        '''
        Align the purchase UoM with the default UoM.

        The parent implementation is called when it exists (an
        AttributeError raised by that call is ignored). Afterwards the
        purchase UoM is replaced by the default UoM when it is empty or when
        both units belong to different categories.
        '''
        try:
            super().on_change_default_uom()
        except AttributeError:
            pass
        default_uom = self.default_uom
        if not default_uom:
            return
        purchase_uom = self.purchase_uom
        if not purchase_uom or default_uom.category != purchase_uom.category:
            self.purchase_uom = default_uom

    @classmethod
    def view_attributes(cls):
        '''
        Extend the view attributes so that the "suppliers" page is invisible
        when the template is not purchasable.
        '''
        suppliers_page = (
            '//page[@id="suppliers"]', 'states', {
                'invisible': ~Eval('purchasable'),
                })
        return super().view_attributes() + [suppliers_page]

    def product_suppliers_used(self, **pattern):
        '''
        Yield, in order, the product suppliers of the template that match
        the pattern given as keyword arguments.
        '''
        yield from (
            supplier for supplier in self.product_suppliers
            if supplier.match(pattern))

    @classmethod
    def check_modification(cls, mode, templates, values=None, external=False):
        '''
        Raise a PurchaseUOMWarning when a write changes the purchase UoM of
        a template that already has one and that has at least one variant
        with product suppliers, unless the warning check says it can be
        skipped.
        '''
        pool = Pool()
        UserWarning_ = pool.get('res.user.warning')
        super().check_modification(
            mode, templates, values=values, external=external)
        if mode != 'write' or not values.get("purchase_uom"):
            return
        for template in templates:
            current_uom = template.purchase_uom
            if not current_uom:
                continue
            if current_uom.id == values["purchase_uom"]:
                # The unit is not really modified.
                continue
            for product in template.products:
                if not product.product_suppliers:
                    continue
                key = '%s@product_template' % template.id
                if UserWarning_.check(key):
                    raise PurchaseUOMWarning(
                        key, gettext('purchase.msg_change_purchase_uom'))

    @classmethod
    def copy(cls, templates, default=None):
        '''
        Copy the templates.

        Unless the caller provides 'product_suppliers' in default, the
        product suppliers which are not linked to a variant are copied
        separately and attached to the corresponding new template.
        '''
        pool = Pool()
        ProductSupplier = pool.get('purchase.product_supplier')
        default = {} if default is None else default.copy()
        copy_suppliers = 'product_suppliers' not in default
        # Prevent the parent copy from duplicating the suppliers itself.
        default.setdefault('product_suppliers', None)
        new_templates = super().copy(templates, default)
        if not copy_suppliers:
            return new_templates

        old2new = {}
        to_copy = []
        for template, new_template in zip(templates, new_templates):
            to_copy.extend(
                supplier for supplier in template.product_suppliers
                if not supplier.product)
            old2new[template.id] = new_template.id
        if to_copy:
            ProductSupplier.copy(to_copy, {
                    'template': lambda d: old2new[d['template']],
                    })
        return new_templates
class Product(metaclass=PoolMeta):
    # Extension of the product variant with its own suppliers and with
    # computed purchase prices.
    __name__ = 'product.product'

    # Suppliers specific to the variant; they must point to its template.
    product_suppliers = fields.One2Many(
        'purchase.product_supplier', 'product', "Suppliers",
        domain=[
            ('template', '=', Eval('template', -1)),
            ],
        states={
            'invisible': (
                ~Eval('purchasable', False)
                | ~Eval('context', {}).get('company')),
            })
    purchase_price_uom = fields.Function(
        fields.Numeric("Purchase Price", digits=price_digits),
        'get_purchase_price_uom')
    last_purchase_price_uom = fields.Function(
        fields.Numeric("Last Purchase Price", digits=price_digits),
        'get_last_purchase_price_uom')

    @classmethod
    def get_purchase_price_uom(cls, products, name):
        '''
        Getter of purchase_price_uom: the purchase price for the quantity
        found in the context (0 when missing or empty).
        '''
        context = Transaction().context
        return cls.get_purchase_price(
            products, quantity=context.get('quantity') or 0)

    @classmethod
    def get_last_purchase_price_uom(cls, products, name=None):
        '''
        Return a dictionary mapping product ids to the unit price of the
        purchase line with the highest id among the confirmed, processing
        and done purchases of the product.

        The context can have as keys:
            supplier: restrict the purchases to this party id
            uom: the unit of measure of the returned price (only used when
                it shares the category of the product default unit)
            currency: the currency id for the returned price
            company: its currency is used when no currency is given
            purchase_date: the date of the currency conversion (today by
                default)

        The returned dictionary yields None for products without such line.
        '''
        pool = Pool()
        Company = pool.get('company.company')
        Currency = pool.get('currency.currency')
        Date = pool.get('ir.date')
        Line = pool.get('purchase.line')
        Purchase = pool.get('purchase.purchase')
        UoM = pool.get('product.uom')
        transaction = Transaction()
        context = transaction.context
        cursor = transaction.connection.cursor()
        purchase_table = Purchase.__table__()
        line_table = Line.__table__()

        where = purchase_table.state.in_(['confirmed', 'processing', 'done'])
        supplier = context.get('supplier')
        if supplier:
            where &= purchase_table.party == supplier

        # Collect, per product, the id of the most recently created line.
        line_ids = []
        for sub_products in grouped_slice(products):
            tables = line_table.join(
                purchase_table,
                condition=line_table.purchase == purchase_table.id)
            query = tables.select(
                Max(line_table.id),
                where=where & reduce_ids(
                    line_table.product, map(int, sub_products)),
                group_by=[line_table.product])
            cursor.execute(*query)
            line_ids.extend(row[0] for row in cursor)
        lines = Line.browse(line_ids)

        uom = UoM(context['uom']) if context.get('uom') else None
        if context.get('currency'):
            currency = Currency(context['currency'])
        elif context.get('company'):
            currency = Company(context['company']).currency
        else:
            currency = None
        date = context.get('purchase_date') or Date.today()

        prices = defaultdict(lambda: None)
        for purchase_line in lines:
            default_uom = purchase_line.product.default_uom
            # The requested unit is ignored when it can not be converted.
            if not uom or default_uom.category != uom.category:
                product_uom = default_uom
            else:
                product_uom = uom
            unit_price = UoM.compute_price(
                purchase_line.unit, purchase_line.unit_price, product_uom)
            purchase_currency = purchase_line.purchase.currency
            if currency and purchase_currency != currency:
                with transaction.set_context(date=date):
                    unit_price = Currency.compute(
                        purchase_currency, unit_price, currency,
                        round=False)
            prices[purchase_line.product.id] = round_price(unit_price)
        return prices

    def product_suppliers_used(self, **pattern):
        '''
        Yield the product suppliers of the variant matching the pattern,
        followed by those of its template matching the same pattern with
        'product' forced to None.
        '''
        yield from (
            supplier for supplier in self.product_suppliers
            if supplier.match(pattern))
        template_pattern = {**pattern, 'product': None}
        yield from self.template.product_suppliers_used(**template_pattern)

    def _get_purchase_unit_price(self, quantity=0):
        '''
        Return the initial purchase unit price for the quantity; this
        implementation returns None.
        '''
        return None

    @classmethod
    def get_purchase_price(cls, products, quantity=0):
        '''
        Return purchase price for product ids.
        The context that can have as keys:
            uom: the unit of measure
            supplier: the supplier party id
            currency: the currency id for the returned price
            company: its currency is used when no currency is given
            purchase_date: the date of the currency conversion (today by
                default)

        The price comes from the matching prices of the first product
        supplier used (the last matching price wins); the last purchase
        price is the fallback when no price is found.
        '''
        pool = Pool()
        Uom = pool.get('product.uom')
        Company = pool.get('company.company')
        Currency = pool.get('currency.currency')
        Date = pool.get('ir.date')
        ProductSupplier = pool.get('purchase.product_supplier')
        ProductSupplierPrice = pool.get('purchase.product_supplier.price')
        context = Transaction().context

        assert len(products) == len(set(products)), "Duplicate products"

        uom = Uom(context['uom']) if context.get('uom') else None
        if context.get('currency'):
            currency = Currency(context['currency'])
        elif context.get('company'):
            currency = Company(context['company']).currency
        else:
            currency = None
        date = context.get('purchase_date') or Date.today()
        last_purchase_prices = cls.get_last_purchase_price_uom(products)

        prices = {}
        for product in products:
            unit_price = product._get_purchase_unit_price(quantity=quantity)
            # Unit and currency in which unit_price is expressed so far.
            price_uom = product.default_uom
            price_currency = currency
            if not uom or price_uom.category != uom.category:
                product_uom = price_uom
            else:
                product_uom = uom

            supplier_pattern = ProductSupplier.get_pattern()
            product_supplier = next(
                product.product_suppliers_used(**supplier_pattern), None)
            if product_supplier is not None:
                price_pattern = ProductSupplierPrice.get_pattern()
                for price in product_supplier.prices:
                    if price.match(quantity, product_uom, price_pattern):
                        unit_price = price.unit_price
                        price_uom = product_supplier.unit
                        price_currency = product_supplier.currency

            if unit_price is not None:
                unit_price = Uom.compute_price(
                    price_uom, unit_price, product_uom)
                if currency and price_currency:
                    with Transaction().set_context(date=date):
                        unit_price = Currency.compute(
                            price_currency, unit_price, currency,
                            round=False)

            if unit_price is None:
                prices[product.id] = last_purchase_prices[product.id]
            else:
                prices[product.id] = round_price(unit_price)
        return prices

    @classmethod
    def copy(cls, products, default=None):
        '''
        Copy the products.

        When a 'template' is given in default and 'product_suppliers' is
        not, the product suppliers of the variants are copied separately
        and linked to the new product and to its template.
        '''
        pool = Pool()
        ProductSupplier = pool.get('purchase.product_supplier')
        default = {} if default is None else default.copy()
        copy_suppliers = 'product_suppliers' not in default
        if 'template' in default:
            # Prevent the parent copy from duplicating the suppliers itself.
            default.setdefault('product_suppliers', None)
        new_products = super().copy(products, default)
        if 'template' not in default or not copy_suppliers:
            return new_products

        template2new = {}
        product2new = {}
        to_copy = []
        for product, new_product in zip(products, new_products):
            if not product.product_suppliers:
                continue
            to_copy.extend(product.product_suppliers)
            template2new[product.template.id] = new_product.template.id
            product2new[product.id] = new_product.id
        if to_copy:
            ProductSupplier.copy(to_copy, {
                    'product': lambda d: product2new[d['product']],
                    'template': lambda d: template2new[d['template']],
                    })
        return new_products
class ProductSupplier(
        sequence_ordered(), ProductDeactivatableMixin, MatchMixin,
        ModelSQL, ModelView):
    # Link between a product (template and optionally a variant) and a
    # supplier, holding the supplier naming, prices, lead time and currency.
    __name__ = 'purchase.product_supplier'

    # template, product and party become read-only once the record has a
    # non-negative id.
    template = fields.Many2One(
        'product.template', "Product",
        required=True, ondelete='CASCADE',
        domain=[
            If(Bool(Eval('product')),
                ('products', '=', Eval('product')),
                ()),
            ],
        states={
            'readonly': Eval('id', -1) >= 0,
            },
        context={
            'company': Eval('company', -1),
            },
        depends={'company'})
    product = fields.Many2One(
        'product.product', "Variant",
        domain=[
            If(Bool(Eval('template')),
                ('template', '=', Eval('template')),
                ()),
            ],
        states={
            'readonly': Eval('id', -1) >= 0,
            },
        context={
            'company': Eval('company', -1),
            },
        depends={'company'})
    party = fields.Many2One(
        'party.party', 'Supplier', required=True, ondelete='CASCADE',
        states={
            'readonly': Eval('id', -1) >= 0,
            },
        context={
            'company': Eval('company', -1),
            },
        depends={'company'})
    name = fields.Char("Name", translate=True)
    code = fields.Char("Code")
    prices = fields.One2Many(
        'purchase.product_supplier.price', 'product_supplier', "Prices",
        help="Add price for different criteria.\n"
        "The last matching line is used.")
    company = fields.Many2One(
        'company.company', "Company",
        required=True, ondelete='CASCADE')
    # Either empty or a non-negative duration.
    lead_time = fields.TimeDelta(
        "Lead Time",
        domain=['OR',
            ('lead_time', '=', None),
            ('lead_time', '>=', TimeDelta()),
            ],
        help="The time from confirming the purchase order to receiving the "
        "products.\n"
        "If empty the lead time of the supplier is used.")
    currency = fields.Many2One(
        'currency.currency', 'Currency',
        required=True, ondelete='RESTRICT')
    # Purchase unit of the variant or, failing that, of the template.
    unit = fields.Function(
        fields.Many2One('product.uom', "Unit"), 'on_change_with_unit')

    @classmethod
    def __setup__(cls):
        '''
        Disable the unaccented search on code and register a similarity
        index on that column.
        '''
        cls.code.search_unaccented = False
        super().__setup__()
        table = cls.__table__()
        cls._sql_indexes.add(
            Index(table, (table.code, Index.Similarity())))

    @staticmethod
    def default_company():
        '''Return the company found in the context, if any.'''
        context = Transaction().context
        return context.get('company')

    @staticmethod
    def default_currency():
        '''
        Return the id of the currency of the company found in the context,
        or None when the context has no company.
        '''
        Company = Pool().get('company.company')
        company_id = Transaction().context.get('company')
        if company_id:
            return Company(company_id).currency.id
        return None

    @fields.depends(
        'product', '_parent_product.template')
    def on_change_product(self):
        '''Set the template from the selected variant.'''
        if not self.product:
            return
        self.template = self.product.template

    @fields.depends('party')
    def on_change_party(self):
        '''
        Reset the currency to the default one and, when a party is set,
        replace it by the currency most often stored on the existing
        product suppliers of that party.
        '''
        cursor = Transaction().connection.cursor()
        self.currency = self.default_currency()
        if not self.party:
            return
        table = self.__table__()
        # Currencies of the party ordered by decreasing number of records.
        query = table.select(
            table.currency,
            where=table.party == self.party.id,
            group_by=table.currency,
            order_by=Count(Literal(1)).desc)
        cursor.execute(*query)
        row = cursor.fetchone()
        if row:
            self.currency = row[0]

    def get_rec_name(self, name):
        '''
        Return the record name.

        Without own name nor code, it is the record name of the variant or
        of the template. Otherwise it is the own name (or the name of the
        variant or of the template) prefixed by the code between square
        brackets when there is one.
        '''
        if self.name or self.code:
            if self.name:
                label = self.name
            else:
                label = (self.product or self.template).name
            if self.code:
                label = '[' + self.code + ']' + label
        else:
            label = (self.product or self.template).rec_name
        return label

    @classmethod
    def search_rec_name(cls, name, clause):
        '''
        Return a domain searching the clause on the record name of the
        template, variant and party, and on the own code and name.

        The clauses are combined with 'AND' for negative operators and with
        'OR' otherwise. For full text "like" operands, the value searched on
        code has its leading wildcard stripped.
        '''
        _, operator, operand, *extra = clause
        negative = operator.startswith(('!', 'not '))
        bool_op = 'AND' if negative else 'OR'
        code_value = operand
        if operator.endswith('like') and is_full_text(operand):
            code_value = lstrip_wildcard(operand)
        domain = [bool_op]
        for target in (
                'template.rec_name', 'product.rec_name', 'party.rec_name'):
            domain.append((target, operator, operand, *extra))
        domain.append(('code', operator, code_value, *extra))
        domain.append(('name', operator, operand, *extra))
        return domain

    @fields.depends(
        'product', '_parent_product.purchase_uom',
        'template', '_parent_template.purchase_uom')
    def on_change_with_unit(self, name=None):
        '''
        Return the purchase UoM of the variant when there is one, otherwise
        of the template; None when neither is set.
        '''
        source = self.product or self.template
        if source:
            return source.purchase_uom
        return None

    @property
    def lead_time_used(self):
        '''
        The own lead time or, when it is empty and a party is set, the
        'supplier_lead_time' of the party for the company.
        '''
        # Use getattr because it can be called with an unsaved instance
        lead_time = getattr(self, 'lead_time', None)
        party = getattr(self, 'party', None)
        if lead_time is not None or not party:
            return lead_time
        company = getattr(self, 'company', None)
        company_id = company.id if company else None
        return party.get_multivalue(
            'supplier_lead_time', company=company_id)

    def compute_supply_date(self, date=None):
        '''
        Compute the supply date for the Product Supplier at the given date

        Today's date for the company is used when no date is given. The
        maximal date is returned when there is no lead time.
        '''
        Date = Pool().get('ir.date')
        if not date:
            with Transaction().set_context(company=self.company.id):
                date = Date.today()
        lead_time = self.lead_time_used
        if lead_time is None:
            return datetime.date.max
        return date + lead_time

    def compute_purchase_date(self, date):
        '''
        Compute the purchase date for the Product Supplier at the given date

        Today's date for the company is returned when there is no lead time
        or no date.
        '''
        Date = Pool().get('ir.date')
        lead_time = self.lead_time_used
        if lead_time is None or date is None:
            with Transaction().set_context(company=self.company.id):
                return Date.today()
        return date - lead_time

    @staticmethod
    def get_pattern():
        '''
        Return the pattern built from the context: 'party' is the context
        supplier and 'id' is added when the context has a
        'product_supplier' key.
        '''
        context = Transaction().context
        pattern = {'party': context.get('supplier')}
        if 'product_supplier' in context:
            pattern['id'] = context['product_supplier']
        return pattern
class ProductSupplierPrice(
        sequence_ordered(), ModelSQL, ModelView, MatchMixin):
    # Unit price granted by a product supplier from a minimal quantity.
    __name__ = 'purchase.product_supplier.price'

    product_supplier = fields.Many2One(
        'purchase.product_supplier', 'Supplier',
        required=True, ondelete='CASCADE')
    quantity = fields.Float(
        "Quantity",
        required=True,
        domain=[('quantity', '>=', 0)],
        help='Minimal quantity.')
    unit_price = Monetary(
        "Unit Price", currency='currency', required=True, digits=price_digits)
    # unit and currency are both taken from the product supplier.
    unit = fields.Function(
        fields.Many2One(
            'product.uom', "Unit",
            help="The unit in which the quantity is specified."),
        'on_change_with_unit')
    currency = fields.Function(
        fields.Many2One('currency.currency', 'Currency'),
        'on_change_with_currency')

    @classmethod
    def __setup__(cls):
        '''Add 'product_supplier' to the __access__ set of the model.'''
        super().__setup__()
        cls.__access__.add('product_supplier')

    @staticmethod
    def default_quantity():
        '''Return 0.0 as default minimal quantity.'''
        return 0.0

    @fields.depends('product_supplier', '_parent_product_supplier.unit')
    def on_change_with_unit(self, name=None):
        '''Return the unit of the product supplier, None without one.'''
        if self.product_supplier:
            return self.product_supplier.unit
        return None

    @fields.depends('product_supplier', '_parent_product_supplier.currency')
    def on_change_with_currency(self, name=None):
        '''Return the currency of the product supplier, None without one.'''
        supplier = self.product_supplier
        return supplier.currency if supplier else None

    @staticmethod
    def get_pattern():
        '''Return the pattern used to match the prices: an empty one.'''
        return {}

    def match(self, quantity, unit, pattern):
        '''
        Return whether the price applies to the quantity expressed in unit
        and matches the pattern.

        The price does not apply when its minimal quantity, converted from
        the unit of the product supplier into unit, is greater than the
        absolute value of quantity.
        '''
        pool = Pool()
        Uom = pool.get('product.uom')
        minimal_quantity = Uom.compute_qty(
            self.product_supplier.unit, self.quantity, unit)
        if minimal_quantity > abs(quantity):
            return False
        return super().match(pattern)