Files
tradon/modules/stock/location.py
2026-03-14 09:42:12 +00:00

762 lines
28 KiB
Python

# This file is part of Tryton. The COPYRIGHT file at the top level of
# this repository contains the full copyright notices and license terms.
import datetime
import operator
from decimal import Decimal
from sql import Column
from trytond.cache import Cache
from trytond.i18n import gettext
from trytond.model import (
DeactivableMixin, Index, MatchMixin, Model, ModelSQL, ModelView, fields,
sequence_ordered, tree)
from trytond.modules.product import price_digits, round_price
from trytond.pool import Pool
from trytond.pyson import Eval, If, TimeDelta
from trytond.tools import grouped_slice
from trytond.transaction import (
Transaction, inactive_records, without_check_access)
from .exceptions import LocationValidationError
class WarehouseWasteLocation(ModelSQL):
__name__ = 'stock.location.waste'
warehouse = fields.Many2One(
'stock.location', "Warehouse", required=True, ondelete='CASCADE',
domain=[('type', '=', 'warehouse')])
location = fields.Many2One(
'stock.location', "Waste Location", required=True, ondelete='CASCADE',
domain=[('type', '=', 'lost_found')])
class Location(DeactivableMixin, tree(), ModelSQL, ModelView):
__name__ = 'stock.location'
_default_warehouse_cache = Cache('stock.location.default_warehouse',
context=False)
name = fields.Char("Name", size=None, required=True, translate=True)
code = fields.Char(
"Code",
help="The internal identifier used for the location.")
address = fields.Many2One(
'party.address', "Address",
states={
'invisible': Eval('type') != 'warehouse',
})
type = fields.Selection([
('supplier', 'Supplier'),
('customer', 'Customer'),
('lost_found', 'Lost and Found'),
('warehouse', 'Warehouse'),
('storage', 'Storage'),
('production', 'Production'),
('drop', 'Drop'),
('rental', "Rental"),
('view', 'View'),
], "Type",
help_selection={
'supplier': "Used as the source of stock received from suppliers.",
'customer': "Used as the destination for stock sent to customers.",
'lost_found': "Used for damages, discrepancies and wastage.",
'warehouse': (
"Regroup storage locations under a logistics warehouse."),
'storage': "Used to physically store goods.",
'production': (
"Used as the destination of components and the source of "
"finished products."),
'drop': "Used during the drop shipping process.",
'view': "Group locations logically.",
})
type_string = type.translated('type')
parent = fields.Many2One(
"stock.location", "Parent", ondelete='CASCADE',
left="left", right="right",
help="Used to add structure above the location.")
left = fields.Integer('Left', required=True)
right = fields.Integer('Right', required=True)
childs = fields.One2Many("stock.location", "parent", "Children",
help="Used to add structure below the location.")
flat_childs = fields.Boolean(
"Flat Children",
help="Check to enforce a single level of children with no "
"grandchildren.")
warehouse = fields.Function(fields.Many2One('stock.location', 'Warehouse'),
'get_warehouse')
input_location = fields.Many2One(
"stock.location", "Input", states={
'invisible': Eval('type') != 'warehouse',
'required': Eval('type') == 'warehouse',
},
domain=[
['OR',
('type', '=', 'storage'),
('id', '=', Eval('storage_location', -1)),
],
['OR',
('parent', 'child_of', [Eval('id', -1)]),
('parent', '=', None),
],
],
help="Where incoming stock is received.")
output_location = fields.Many2One(
"stock.location", "Output", states={
'invisible': Eval('type') != 'warehouse',
'required': Eval('type') == 'warehouse',
},
domain=[
['OR',
('type', '=', 'storage'),
('id', '=', Eval('storage_location', -1)),
],
['OR',
('parent', 'child_of', [Eval('id', -1)]),
('parent', '=', None)]],
help="Where outgoing stock is sent from.")
storage_location = fields.Many2One(
"stock.location", "Storage", states={
'invisible': Eval('type') != 'warehouse',
'required': Eval('type') == 'warehouse',
},
domain=[
('type', 'in', ['storage', 'view']),
['OR',
('parent', 'child_of', [Eval('id', -1)]),
('parent', '=', None)]],
help="The top level location where stock is stored.")
picking_location = fields.Many2One(
'stock.location', 'Picking', states={
'invisible': Eval('type') != 'warehouse',
},
domain=[
('type', '=', 'storage'),
('parent', 'child_of', [Eval('storage_location', -1)]),
],
help="Where stock is picked from.\n"
"Leave empty to use the storage location.")
lost_found_location = fields.Many2One(
'stock.location', "Lost and Found",
states={
'invisible': Eval('type') != 'warehouse',
},
domain=[
('type', '=', 'lost_found'),
],
help="Used, by inventories, when correcting stock levels "
"in the warehouse.")
waste_locations = fields.Many2Many(
'stock.location.waste', 'warehouse', 'location', "Waste Locations",
states={
'invisible': Eval('type') != 'warehouse',
},
domain=[
('type', '=', 'lost_found'),
],
help="The locations used for waste products from the warehouse.")
waste_warehouses = fields.Many2Many(
'stock.location.waste', 'location', 'warehouse', "Waste Warehouses",
states={
'invisible': Eval('type') != 'lost_found',
},
domain=[
('type', '=', 'warehouse'),
],
help="The warehouses that use the location for waste products.")
allow_pickup = fields.Boolean(
"Allow Pickup",
states={
'invisible': (
(Eval('type') != 'warehouse')
& ~Eval('address')),
})
quantity = fields.Function(
fields.Float(
"Quantity", digits='quantity_uom',
help="The amount of stock in the location."),
'get_quantity', searcher='search_quantity')
forecast_quantity = fields.Function(
fields.Float(
"Forecast Quantity", digits='quantity_uom',
help="The amount of stock expected to be in the location."),
'get_quantity', searcher='search_quantity')
quantity_uom = fields.Function(fields.Many2One(
'product.uom', "Quantity UoM",
help="The Unit of Measure for the quantities."),
'get_quantity_uom')
cost_value = fields.Function(fields.Numeric(
"Cost Value", digits=price_digits,
help="The value of the stock in the location."),
'get_cost_value')
@classmethod
def __setup__(cls):
cls.code.search_unaccented = False
super().__setup__()
t = cls.__table__()
cls._sql_indexes.update({
Index(t, (t.code, Index.Similarity())),
Index(
t,
(t.left, Index.Range(cardinality='high')),
(t.right, Index.Range(cardinality='high'))),
})
cls._order.insert(0, ('name', 'ASC'))
parent_domain = [
['OR',
('parent.flat_childs', '=', False),
('parent', '=', None),
]
]
childs_domain = [
If(Eval('flat_childs', False),
('childs', '=', None),
()),
]
childs_mapping = cls._childs_domain()
for type_, allowed_parents in cls._parent_domain().items():
parent_domain.append(If(Eval('type') == type_,
('type', 'in', allowed_parents), ()))
childs_domain.append(If(Eval('type') == type_,
('type', 'in', childs_mapping[type_]), ()))
cls.parent.domain = parent_domain
cls.childs.domain = childs_domain
@classmethod
def _parent_domain(cls):
'''Returns a dict with location types as keys and a list of allowed
parent location types as values'''
return {
'customer': ['customer'],
'supplier': ['supplier'],
'production': ['production'],
'lost_found': ['lost_found'],
'view': ['warehouse', 'view', 'storage'],
'storage': ['warehouse', 'view', 'storage'],
'warehouse': ['view'],
}
@classmethod
def _childs_domain(cls):
childs_domain = {}
for type_, allowed_parents in cls._parent_domain().items():
for parent in allowed_parents:
childs_domain.setdefault(parent, [])
childs_domain[parent].append(type_)
return childs_domain
@classmethod
def validate_fields(cls, locations, field_names):
super().validate_fields(locations, field_names)
inactives = []
for location in locations:
location.check_type_for_moves(field_names)
if 'active' in field_names and not location.active:
inactives.append(location)
cls.check_inactive(inactives)
def check_type_for_moves(self, field_names=None):
""" Check locations with moves have types compatible with moves. """
pool = Pool()
Move = pool.get('stock.move')
if field_names and 'type' not in field_names:
return
invalid_move_types = ['warehouse', 'view']
if self.type in invalid_move_types:
moves = Move.search([
['OR',
('to_location', '=', self.id),
('from_location', '=', self.id),
],
('state', 'not in', ['staging', 'draft']),
],
order=[], limit=1)
if moves:
raise LocationValidationError(
gettext('stock.msg_location_invalid_type_for_moves',
location=self.rec_name,
type=self.type_string))
@classmethod
def check_inactive(cls, locations):
"Check inactive location are empty"
assert all(not l.active for l in locations)
empty = cls.get_empty_locations(locations)
non_empty = set(locations) - set(empty)
if non_empty:
raise LocationValidationError(
gettext('stock.msg_location_inactive_not_empty',
location=next(iter(non_empty)).rec_name))
@classmethod
def get_empty_locations(cls, locations=None):
pool = Pool()
Move = pool.get('stock.move')
Product = pool.get('product.product')
if locations is None:
locations = cls.search([])
if not locations:
return []
location_ids = list(map(int, locations))
with without_check_access(), inactive_records():
query = Move.compute_quantities_query(
location_ids, with_childs=True)
quantities = Move.compute_quantities(
query, location_ids, with_childs=True)
empty = set(location_ids)
product_ids = [q[1] for q in quantities.keys()]
consumables = {
p.id for p in Product.browse(product_ids) if p.consumable}
for (location_id, product), quantity in quantities.items():
if quantity and product not in consumables:
empty.discard(location_id)
for sub_ids in grouped_slice(list(empty)):
sub_ids = list(sub_ids)
moves = Move.search([
('state', 'not in', ['done', 'cancelled']),
['OR',
('from_location', 'in', sub_ids),
('to_location', 'in', sub_ids),
],
])
for move in moves:
for location in [move.from_location, move.to_location]:
empty.discard(location.id)
return cls.browse(empty)
@staticmethod
def default_left():
return 0
@staticmethod
def default_right():
return 0
@classmethod
def default_flat_childs(cls):
return False
@staticmethod
def default_type():
return 'storage'
def get_warehouse(self, name):
# Order by descending left to get the first one in the tree
with inactive_records():
locations = self.search([
('parent', 'parent_of', [self.id]),
('type', '=', 'warehouse'),
], order=[('left', 'DESC')])
if locations:
return locations[0].id
@classmethod
def get_default_warehouse(cls):
warehouse = Transaction().context.get('warehouse')
if warehouse:
return warehouse
warehouse = cls._default_warehouse_cache.get(None, -1)
if warehouse == -1:
warehouses = cls.search([
('type', '=', 'warehouse'),
], limit=2)
if len(warehouses) == 1:
warehouse = warehouses[0].id
else:
warehouse = None
cls._default_warehouse_cache.set(None, warehouse)
return warehouse
@property
def lost_found_used(self):
if self.warehouse:
return self.warehouse.lost_found_location
def get_rec_name(self, name):
if self.code:
return f'[{self.code}] {self.name}'
else:
return self.name
@classmethod
def search_rec_name(cls, name, clause):
if clause[1].startswith('!') or clause[1].startswith('not '):
bool_op = 'AND'
else:
bool_op = 'OR'
return [bool_op,
(cls._rec_name,) + tuple(clause[1:]),
('code',) + tuple(clause[1:]),
]
@classmethod
def _get_quantity_grouping(cls):
context = Transaction().context
grouping, grouping_filter, key = (), (), []
if context.get('product') is not None:
grouping = ('product',)
grouping_filter = ([context['product']],)
key = (context['product'],)
elif context.get('product_template') is not None:
grouping = ('product.template',)
grouping_filter = ([context['product_template']],)
key = (context['product_template'],)
return grouping, grouping_filter, key
@classmethod
def get_quantity(cls, locations, name):
pool = Pool()
Product = pool.get('product.product')
Date_ = pool.get('ir.date')
trans_context = Transaction().context
def valid_context(name):
return (trans_context.get(name) is not None
and isinstance(trans_context[name], int))
context = {}
if (name == 'quantity'
and ((trans_context.get('stock_date_end') or datetime.date.max)
> Date_.today())):
context['stock_date_end'] = Date_.today()
if name == 'forecast_quantity':
context['forecast'] = True
if not trans_context.get('stock_date_end'):
context['stock_date_end'] = datetime.date.max
grouping, grouping_filter, key = cls._get_quantity_grouping()
if not grouping:
return {loc.id: None for loc in locations}
pbl = {}
for sub_locations in grouped_slice(locations):
location_ids = [l.id for l in sub_locations]
with Transaction().set_context(context):
pbl.update(Product.products_by_location(
location_ids,
grouping=grouping,
grouping_filter=grouping_filter,
with_childs=trans_context.get('with_childs', True)))
return dict((loc.id, pbl.get((loc.id,) + key, 0)) for loc in locations)
@classmethod
def search_quantity(cls, name, domain):
_, operator_, operand = domain
operator_ = {
'=': operator.eq,
'>=': operator.ge,
'>': operator.gt,
'<=': operator.le,
'<': operator.lt,
'!=': operator.ne,
'in': lambda v, l: v in l,
'not in': lambda v, l: v not in l,
}.get(operator_, lambda v, l: False)
ids = []
for location in cls.search([], order=[('left', 'ASC')]):
if operator_(getattr(location, name), operand):
ids.append(location.id)
return [('id', 'in', ids)]
@classmethod
def get_quantity_uom(cls, locations, name):
pool = Pool()
Product = pool.get('product.product')
Template = pool.get('product.template')
context = Transaction().context
value = None
uom = None
if context.get('product') is not None:
product = Product(context['product'])
uom = product.default_uom
elif context.get('product_template') is not None:
template = Template(context['product_template'])
uom = template.default_uom
if uom:
value = uom.id
return {l.id: value for l in locations}
@classmethod
def get_cost_value(cls, locations, name):
pool = Pool()
Product = pool.get('product.product')
Template = pool.get('product.template')
trans_context = Transaction().context
cost_values = {l.id: None for l in locations}
def valid_context(name):
return (trans_context.get(name) is not None
and isinstance(trans_context[name], int))
if not any(map(valid_context, ['product', 'product_template'])):
return cost_values
def get_record():
if trans_context.get('product') is not None:
return Product(trans_context['product'])
else:
return Template(trans_context['product_template'])
context = {}
if trans_context.get('stock_date_end') is not None:
# Use the last cost_price of the day
context['_datetime'] = datetime.datetime.combine(
trans_context['stock_date_end'], datetime.time.max)
# The date could be before the product creation
record = get_record()
if record.create_date > context['_datetime']:
return cost_values
with Transaction().set_context(context):
cost_price = get_record().cost_price
# The template may have more than one product
if cost_price is not None:
for location in locations:
cost_values[location.id] = round_price(
Decimal(str(location.quantity)) * cost_price)
return cost_values
@classmethod
def _set_warehouse_parent(cls, locations):
'''
Set the parent of child location of warehouse if not set
'''
to_update = set()
to_save = []
for location in locations:
if location.type == 'warehouse':
if not location.input_location.parent:
to_update.add(location.input_location)
if not location.output_location.parent:
to_update.add(location.output_location)
if not location.storage_location.parent:
to_update.add(location.storage_location)
if to_update:
for child_location in to_update:
child_location.parent = location
to_save.append(child_location)
to_update.clear()
cls.save(to_save)
@classmethod
def on_modification(cls, mode, locations, field_names=None):
super().on_modification(mode, locations, field_names=field_names)
if mode in {'create', 'write'}:
cls._set_warehouse_parent(locations)
cls._default_warehouse_cache.clear()
@classmethod
def check_modification(cls, mode, locations, values=None, external=False):
super().check_modification(
mode, locations, values=values, external=external)
if mode == 'write' and 'parent' in values:
warehouses = cls.search([
('type', '=', 'warehouse'),
])
cls.validate_fields(
warehouses,
{'storage_location', 'input_location', 'output_location'})
@classmethod
def delete(cls, locations):
# Delete also required children as CASCADING is done separately
extra_locations = []
for location in locations:
extra_locations.extend(filter(None, [
location.input_location,
location.output_location,
location.storage_location,
]))
super().delete(locations + extra_locations)
@classmethod
def copy(cls, locations, default=None):
if default is None:
default = {}
else:
default = default.copy()
res = []
for location in locations:
if location.type == 'warehouse':
wh_default = default.copy()
wh_default['type'] = 'view'
wh_default['input_location'] = None
wh_default['output_location'] = None
wh_default['storage_location'] = None
wh_default['childs'] = None
new_location, = super().copy([location],
default=wh_default)
with Transaction().set_context(
cp_warehouse_locations={
'input_location': location.input_location.id,
'output_location': location.output_location.id,
'storage_location': location.storage_location.id,
},
cp_warehouse_id=new_location.id):
cls.copy(location.childs,
default={'parent': new_location.id})
cls.write([new_location], {
'type': 'warehouse',
})
else:
new_location, = super().copy([location],
default=default)
warehouse_locations = Transaction().context.get(
'cp_warehouse_locations') or {}
if location.id in warehouse_locations.values():
cp_warehouse = cls(
Transaction().context['cp_warehouse_id'])
for field, loc_id in warehouse_locations.items():
if loc_id == location.id:
cls.write([cp_warehouse], {
field: new_location.id,
})
res.append(new_location)
return res
@classmethod
def view_attributes(cls):
storage_types = Eval('type').in_(['storage', 'warehouse', 'view'])
return super().view_attributes() + [
('/tree/field[@name="quantity"]',
'visual', If(
storage_types & (Eval('quantity', 0) < 0), 'danger', ''),
['type']),
('/tree/field[@name="forecast_quantity"]',
'visual', If(
storage_types & (Eval('forecast_quantity', 0) < 0),
'warning', ''),
['type']),
]
class ProductsByLocationsContext(ModelView):
__name__ = 'stock.products_by_locations.context'
company = fields.Many2One('company.company', "Company", required=True)
forecast_date = fields.Date(
'At Date',
help="The date for which the stock quantity is calculated.\n"
"* An empty value calculates as far ahead as possible.\n"
"* A date in the past will provide historical values.")
stock_date_end = fields.Function(fields.Date('At Date'),
'on_change_with_stock_date_end')
@classmethod
def default_company(cls):
return Transaction().context.get('company')
@staticmethod
def default_forecast_date():
Date_ = Pool().get('ir.date')
return Date_.today()
@fields.depends('forecast_date')
def on_change_with_stock_date_end(self, name=None):
if self.forecast_date is None:
return datetime.date.max
return self.forecast_date
class ProductsByLocations(DeactivableMixin, ModelSQL, ModelView):
__name__ = 'stock.products_by_locations'
product = fields.Many2One('product.product', "Product")
quantity = fields.Function(
fields.Float("Quantity", digits='default_uom'),
'get_product', searcher='search_product')
forecast_quantity = fields.Function(
fields.Float("Forecast Quantity", digits='default_uom'),
'get_product', searcher='search_product')
default_uom = fields.Function(
fields.Many2One(
'product.uom', "Default UoM",
help="The default Unit of Measure."),
'get_product', searcher='search_product')
cost_value = fields.Function(
fields.Numeric("Cost Value"), 'get_product')
consumable = fields.Function(
fields.Boolean("Consumable"), 'get_product',
searcher='search_product')
@classmethod
def __setup__(cls):
super().__setup__()
cls._order.insert(0, ('product', 'ASC'))
@classmethod
def table_query(cls):
pool = Pool()
Product = pool.get('product.product')
product = Product.__table__()
columns = []
for fname, field in cls._fields.items():
if not hasattr(field, 'set'):
if (isinstance(field, fields.Many2One)
and field.get_target() == Product):
column = Column(product, 'id')
else:
column = Column(product, fname)
columns.append(column.as_(fname))
return product.select(*columns)
def get_rec_name(self, name):
return self.product.rec_name
@classmethod
def search_rec_name(cls, name, clause):
return [('product.rec_name',) + tuple(clause[1:])]
def get_product(self, name):
value = getattr(self.product, name)
if isinstance(value, Model):
value = value.id
return value
@classmethod
def search_product(cls, name, clause):
nested = clause[0][len(name):]
return [('product.' + name + nested, *clause[1:])]
class LocationLeadTime(sequence_ordered(), ModelSQL, ModelView, MatchMixin):
__name__ = 'stock.location.lead_time'
warehouse_from = fields.Many2One('stock.location', 'Warehouse From',
ondelete='CASCADE',
domain=[
('type', '=', 'warehouse'),
('id', '!=', Eval('warehouse_to', -1)),
])
warehouse_to = fields.Many2One('stock.location', 'Warehouse To',
ondelete='CASCADE',
domain=[
('type', '=', 'warehouse'),
('id', '!=', Eval('warehouse_from', -1)),
])
lead_time = fields.TimeDelta(
"Lead Time",
domain=['OR',
('lead_time', '=', None),
('lead_time', '>=', TimeDelta()),
],
help="The time it takes to move stock between the warehouses.")
@classmethod
def get_lead_time(cls, pattern):
for record in cls.search([]):
if record.match(pattern):
return record.lead_time