620 lines
21 KiB
Python
620 lines
21 KiB
Python
# This file is part of Tryton. The COPYRIGHT file at the top level of
|
|
# this repository contains the full copyright notices and license terms.
|
|
from collections import defaultdict
|
|
|
|
from sql import Null
|
|
from sql.functions import CharLength
|
|
|
|
from trytond.i18n import gettext
|
|
from trytond.model import (
|
|
ChatMixin, Check, Index, Model, ModelSQL, ModelView, Workflow, fields)
|
|
from trytond.model.exceptions import AccessError
|
|
from trytond.pool import Pool
|
|
from trytond.pyson import Bool, Eval, If
|
|
from trytond.tools import grouped_slice, is_full_text, lstrip_wildcard
|
|
from trytond.transaction import Transaction
|
|
from trytond.wizard import Button, StateTransition, StateView, Wizard
|
|
|
|
from .exceptions import InventoryCountWarning, InventoryValidationError
|
|
|
|
|
|
class Inventory(Workflow, ModelSQL, ModelView, ChatMixin):
|
|
__name__ = 'stock.inventory'
|
|
_rec_name = 'number'
|
|
|
|
_states = {
|
|
'readonly': Eval('state') != 'draft',
|
|
}
|
|
|
|
number = fields.Char('Number', readonly=True,
|
|
help="The main identifier for the inventory.")
|
|
location = fields.Many2One(
|
|
'stock.location', 'Location', required=True,
|
|
domain=[('type', '=', 'storage')], states={
|
|
'readonly': (Eval('state') != 'draft') | Eval('lines', [0]),
|
|
},
|
|
help="The location inventoried.")
|
|
date = fields.Date('Date', required=True, states={
|
|
'readonly': (Eval('state') != 'draft') | Eval('lines', [0]),
|
|
},
|
|
help="The date of the stock count.")
|
|
lines = fields.One2Many(
|
|
'stock.inventory.line', 'inventory', 'Lines',
|
|
states={
|
|
'readonly': (_states['readonly'] | ~Eval('location')
|
|
| ~Eval('date')),
|
|
})
|
|
empty_quantity = fields.Selection([
|
|
(None, ""),
|
|
('keep', "Keep"),
|
|
('empty', "Empty"),
|
|
], "Empty Quantity", states=_states,
|
|
help="How lines without a quantity are handled.")
|
|
company = fields.Many2One('company.company', 'Company', required=True,
|
|
states={
|
|
'readonly': (Eval('state') != 'draft') | Eval('lines', [0]),
|
|
},
|
|
help="The company the inventory is associated with.")
|
|
state = fields.Selection([
|
|
('draft', "Draft"),
|
|
('done', "Done"),
|
|
('cancelled', "Cancelled"),
|
|
], "State", readonly=True, sort=False,
|
|
help="The current state of the inventory.")
|
|
|
|
del _states
|
|
|
|
@classmethod
|
|
def __setup__(cls):
|
|
cls.number.search_unaccented = False
|
|
super().__setup__()
|
|
t = cls.__table__()
|
|
cls._sql_indexes.add(
|
|
Index(
|
|
t,
|
|
(t.state, Index.Equality(cardinality='low')),
|
|
where=t.state == 'draft'))
|
|
cls._order.insert(0, ('date', 'DESC'))
|
|
cls._transitions |= set((
|
|
('draft', 'done'),
|
|
('draft', 'cancelled'),
|
|
))
|
|
cls._buttons.update({
|
|
'confirm': {
|
|
'invisible': Eval('state').in_(['done', 'cancelled']),
|
|
'depends': ['state'],
|
|
},
|
|
'cancel': {
|
|
'invisible': Eval('state').in_(['cancelled', 'done']),
|
|
'depends': ['state'],
|
|
},
|
|
'complete_lines': {
|
|
'readonly': Eval('state') != 'draft',
|
|
'depends': ['state'],
|
|
},
|
|
'do_count': {
|
|
'readonly': Eval('state') != 'draft',
|
|
'depends': ['state'],
|
|
},
|
|
})
|
|
|
|
@classmethod
|
|
def order_number(cls, tables):
|
|
table, _ = tables[None]
|
|
return [
|
|
~((table.state == 'cancelled') & (table.number == Null)),
|
|
CharLength(table.number), table.number]
|
|
|
|
@staticmethod
|
|
def default_state():
|
|
return 'draft'
|
|
|
|
@staticmethod
|
|
def default_date():
|
|
Date = Pool().get('ir.date')
|
|
return Date.today()
|
|
|
|
@staticmethod
|
|
def default_company():
|
|
return Transaction().context.get('company')
|
|
|
|
def get_rec_name(self, name):
|
|
pool = Pool()
|
|
Lang = pool.get('ir.lang')
|
|
lang = Lang.get()
|
|
date = lang.strftime(self.date)
|
|
return f"[{self.number}] {self.location.rec_name} @ {date}"
|
|
|
|
@classmethod
|
|
def search_rec_name(cls, name, clause):
|
|
_, operator, operand, *extra = clause
|
|
if operator.startswith('!') or operator.startswith('not '):
|
|
bool_op = 'AND'
|
|
else:
|
|
bool_op = 'OR'
|
|
number_value = operand
|
|
if operator.endswith('like') and is_full_text(operand):
|
|
number_value = lstrip_wildcard(operand)
|
|
return [bool_op,
|
|
('number', operator, number_value, *extra),
|
|
('location.rec_name', operator, operand, *extra),
|
|
]
|
|
|
|
@classmethod
|
|
def view_attributes(cls):
|
|
return super().view_attributes() + [
|
|
('/tree', 'visual', If(Eval('state') == 'cancelled', 'muted', '')),
|
|
]
|
|
|
|
@classmethod
|
|
@ModelView.button
|
|
@Workflow.transition('done')
|
|
def confirm(cls, inventories):
|
|
pool = Pool()
|
|
Move = pool.get('stock.move')
|
|
transaction = Transaction()
|
|
moves = []
|
|
for inventory in inventories:
|
|
keys = set()
|
|
for line in inventory.lines:
|
|
key = line.unique_key
|
|
if key in keys:
|
|
raise InventoryValidationError(
|
|
gettext('stock.msg_inventory_line_unique',
|
|
line=line.rec_name,
|
|
inventory=inventory.rec_name))
|
|
keys.add(key)
|
|
move = line.get_move()
|
|
if move:
|
|
moves.append(move)
|
|
if moves:
|
|
with transaction.set_context(_product_replacement=False):
|
|
Move.save(moves)
|
|
Move.do(moves)
|
|
|
|
@classmethod
|
|
@ModelView.button
|
|
@Workflow.transition('cancelled')
|
|
def cancel(cls, inventories):
|
|
Line = Pool().get("stock.inventory.line")
|
|
Line.cancel_move([l for i in inventories for l in i.lines])
|
|
|
|
@classmethod
|
|
def preprocess_values(cls, mode, values):
|
|
pool = Pool()
|
|
Configuration = pool.get('stock.configuration')
|
|
values = super().preprocess_values(mode, values)
|
|
if mode == 'create' and not values.get('number'):
|
|
company_id = values.get('company', cls.default_company())
|
|
if company_id is not None:
|
|
configuration = Configuration(1)
|
|
if sequence := configuration.get_multivalue(
|
|
'inventory_sequence', company=company_id):
|
|
values['number'] = sequence.get()
|
|
return values
|
|
|
|
@classmethod
|
|
def on_modification(cls, mode, inventories, field_names=None):
|
|
super().on_modification(mode, inventories, field_names=field_names)
|
|
if mode in {'create', 'write'}:
|
|
cls.complete_lines(inventories, fill=False)
|
|
|
|
@classmethod
|
|
def check_modification(
|
|
cls, mode, inventories, values=None, external=False):
|
|
super().check_modification(
|
|
mode, inventories, values=values, external=external)
|
|
if mode == 'delete':
|
|
for inventory in inventories:
|
|
if inventory.state not in {'cancelled', 'draft'}:
|
|
raise AccessError(gettext(
|
|
'stock.msg_inventory_delete_cancel',
|
|
inventory=inventory.rec_name))
|
|
|
|
@classmethod
|
|
def copy(cls, inventories, default=None):
|
|
pool = Pool()
|
|
Date = pool.get('ir.date')
|
|
|
|
if default is None:
|
|
default = {}
|
|
else:
|
|
default = default.copy()
|
|
default.setdefault('date', Date.today())
|
|
default.setdefault('lines.moves', None)
|
|
default.setdefault('number', None)
|
|
|
|
return super().copy(inventories, default=default)
|
|
|
|
@staticmethod
|
|
def grouping():
|
|
return ('product',)
|
|
|
|
@classmethod
|
|
@ModelView.button
|
|
def complete_lines(cls, inventories, fill=True):
|
|
'''
|
|
Complete or update the inventories
|
|
'''
|
|
pool = Pool()
|
|
Line = pool.get('stock.inventory.line')
|
|
Product = pool.get('product.product')
|
|
|
|
grouping = cls.grouping()
|
|
to_save, to_delete = [], []
|
|
for inventory in inventories:
|
|
# Once done computation is wrong because include created moves
|
|
if inventory.state == 'done':
|
|
continue
|
|
# Compute product quantities
|
|
with Transaction().set_context(
|
|
company=inventory.company.id,
|
|
stock_date_end=inventory.date):
|
|
if fill:
|
|
pbl = Product.products_by_location(
|
|
[inventory.location.id],
|
|
grouping=grouping)
|
|
else:
|
|
product_ids = [l.product.id for l in inventory.lines]
|
|
pbl = defaultdict(int)
|
|
for product_ids in grouped_slice(product_ids):
|
|
pbl.update(Product.products_by_location(
|
|
[inventory.location.id],
|
|
grouping=grouping,
|
|
grouping_filter=(list(product_ids),)))
|
|
|
|
# Update existing lines
|
|
for line in inventory.lines:
|
|
if line.product.type != 'goods':
|
|
to_delete.append(line)
|
|
continue
|
|
|
|
key = (inventory.location.id,) + line.unique_key
|
|
if key in pbl:
|
|
quantity = pbl.pop(key)
|
|
else:
|
|
quantity = 0.0
|
|
line.update_for_complete(quantity)
|
|
to_save.append(line)
|
|
|
|
if not fill:
|
|
continue
|
|
|
|
product_idx = grouping.index('product') + 1
|
|
# Index some data
|
|
product2type = {}
|
|
product2consumable = {}
|
|
for product in Product.browse({line[product_idx] for line in pbl}):
|
|
product2type[product.id] = product.type
|
|
product2consumable[product.id] = product.consumable
|
|
|
|
# Create lines if needed
|
|
for key, quantity in pbl.items():
|
|
product_id = key[product_idx]
|
|
if (product2type[product_id] != 'goods'
|
|
or product2consumable[product_id]):
|
|
continue
|
|
if not quantity:
|
|
continue
|
|
|
|
line = Line(
|
|
inventory=inventory,
|
|
**{fname: key[i] for i, fname in enumerate(grouping, 1)})
|
|
line.update_for_complete(quantity)
|
|
to_save.append(line)
|
|
if to_delete:
|
|
Line.delete(to_delete)
|
|
if to_save:
|
|
Line.save(to_save)
|
|
|
|
@classmethod
|
|
@ModelView.button_action('stock.wizard_inventory_count')
|
|
def do_count(cls, inventories):
|
|
cls.complete_lines(inventories)
|
|
|
|
|
|
class InventoryLine(ModelSQL, ModelView):
|
|
__name__ = 'stock.inventory.line'
|
|
_states = {
|
|
'readonly': Eval('inventory_state') != 'draft',
|
|
}
|
|
|
|
product = fields.Many2One('product.product', 'Product', required=True,
|
|
domain=[
|
|
('type', '=', 'goods'),
|
|
], states=_states)
|
|
unit = fields.Function(fields.Many2One(
|
|
'product.uom', "Unit",
|
|
help="The unit in which the quantity is specified."),
|
|
'get_unit')
|
|
expected_quantity = fields.Float(
|
|
"Expected Quantity", digits='unit', required=True, readonly=True,
|
|
states={
|
|
'invisible': Eval('id', -1) < 0,
|
|
},
|
|
help="The quantity the system calculated should be in the location.")
|
|
quantity = fields.Float(
|
|
"Actual Quantity", digits='unit', states=_states,
|
|
domain=[
|
|
If(Eval('quantity', None),
|
|
('quantity', '>=', 0),
|
|
()),
|
|
],
|
|
help="The actual quantity found in the location.")
|
|
moves = fields.One2Many('stock.move', 'origin', 'Moves', readonly=True)
|
|
inventory = fields.Many2One('stock.inventory', 'Inventory', required=True,
|
|
ondelete='CASCADE',
|
|
states={
|
|
'readonly': _states['readonly'] & Bool(Eval('inventory')),
|
|
},
|
|
help="The inventory the line belongs to.")
|
|
inventory_location = fields.Function(
|
|
fields.Many2One('stock.location', "Location"),
|
|
'on_change_with_inventory_location',
|
|
searcher='search_inventory_location')
|
|
inventory_date = fields.Function(
|
|
fields.Date("Date"),
|
|
'on_change_with_inventory_date',
|
|
searcher='search_inventory_date')
|
|
inventory_state = fields.Function(
|
|
fields.Selection('get_inventory_states', "Inventory State",
|
|
depends={'inventory'}),
|
|
'on_change_with_inventory_state')
|
|
|
|
@classmethod
|
|
def __setup__(cls):
|
|
super().__setup__()
|
|
cls.__access__.add('inventory')
|
|
t = cls.__table__()
|
|
cls._sql_constraints += [
|
|
('check_line_qty_pos', Check(t, t.quantity >= 0),
|
|
'stock.msg_inventory_line_quantity_positive'),
|
|
]
|
|
cls._order.insert(0, ('product', 'ASC'))
|
|
|
|
@staticmethod
|
|
def default_expected_quantity():
|
|
return 0.
|
|
|
|
@fields.depends('product')
|
|
def on_change_product(self):
|
|
if self.product:
|
|
self.unit = self.product.default_uom
|
|
|
|
@fields.depends('inventory', '_parent_inventory.location')
|
|
def on_change_with_inventory_location(self, name=None):
|
|
return self.inventory.location if self.inventory else None
|
|
|
|
@classmethod
|
|
def search_inventory_location(cls, name, clause):
|
|
nested = clause[0][len(name):]
|
|
return [('inventory.location' + nested, *clause[1:])]
|
|
|
|
@fields.depends('inventory', '_parent_inventory.date')
|
|
def on_change_with_inventory_date(self, name=None):
|
|
if self.inventory:
|
|
return self.inventory.date
|
|
|
|
@classmethod
|
|
def search_inventory_date(cls, name, clause):
|
|
return [('inventory.date',) + tuple(clause[1:])]
|
|
|
|
@classmethod
|
|
def get_inventory_states(cls):
|
|
pool = Pool()
|
|
Inventory = pool.get('stock.inventory')
|
|
return Inventory.fields_get(['state'])['state']['selection']
|
|
|
|
@fields.depends('inventory', '_parent_inventory.state')
|
|
def on_change_with_inventory_state(self, name=None):
|
|
if self.inventory:
|
|
return self.inventory.state
|
|
return 'draft'
|
|
|
|
def get_rec_name(self, name):
|
|
return self.product.rec_name
|
|
|
|
@classmethod
|
|
def search_rec_name(cls, name, clause):
|
|
return [('product.rec_name',) + tuple(clause[1:])]
|
|
|
|
def get_unit(self, name):
|
|
return self.product.default_uom
|
|
|
|
@property
|
|
def unique_key(self):
|
|
key = []
|
|
for fname in self.inventory.grouping():
|
|
value = getattr(self, fname)
|
|
if isinstance(value, Model):
|
|
value = value.id
|
|
key.append(value)
|
|
return tuple(key)
|
|
|
|
@classmethod
|
|
def cancel_move(cls, lines):
|
|
Move = Pool().get('stock.move')
|
|
moves = [m for l in lines for m in l.moves if l.moves]
|
|
Move.cancel(moves)
|
|
Move.delete(moves)
|
|
|
|
def get_move(self):
|
|
'''
|
|
Return Move instance for the inventory line
|
|
'''
|
|
pool = Pool()
|
|
Move = pool.get('stock.move')
|
|
|
|
qty = self.quantity
|
|
if qty is None:
|
|
if self.inventory.empty_quantity is None:
|
|
raise InventoryValidationError(
|
|
gettext('stock.msg_inventory_missing_empty_quantity',
|
|
inventory=self.inventory.rec_name))
|
|
if self.inventory.empty_quantity == 'keep':
|
|
return
|
|
else:
|
|
qty = 0.0
|
|
|
|
delta_qty = self.unit.round(self.expected_quantity - qty)
|
|
if delta_qty == 0.0:
|
|
return
|
|
from_location = self.inventory.location
|
|
to_location = self.inventory.location.lost_found_used
|
|
if not to_location:
|
|
raise InventoryValidationError(
|
|
gettext('stock.msg_inventory_location_missing_lost_found',
|
|
inventory=self.inventory.rec_name,
|
|
location=self.inventory.location.rec_name))
|
|
if delta_qty < 0:
|
|
(from_location, to_location, delta_qty) = \
|
|
(to_location, from_location, -delta_qty)
|
|
|
|
return Move(
|
|
from_location=from_location,
|
|
to_location=to_location,
|
|
quantity=delta_qty,
|
|
product=self.product,
|
|
unit=self.unit,
|
|
company=self.inventory.company,
|
|
effective_date=self.inventory.date,
|
|
origin=self,
|
|
)
|
|
|
|
@fields.depends('expected_quantity')
|
|
def update_for_complete(self, quantity):
|
|
if self.expected_quantity != quantity:
|
|
self.expected_quantity = quantity
|
|
|
|
@classmethod
|
|
def check_modification(cls, mode, lines, values=None, external=False):
|
|
super().check_modification(
|
|
mode, lines, values=values, external=external)
|
|
if mode == 'delete':
|
|
for line in lines:
|
|
if line.inventory_state not in {'cancelled', 'draft'}:
|
|
raise AccessError(gettext(
|
|
'stock.msg_inventory_line_delete_cancel',
|
|
line=line.rec_name,
|
|
inventory=line.inventory.rec_name))
|
|
|
|
|
|
class Count(Wizard):
|
|
__name__ = 'stock.inventory.count'
|
|
start_state = 'search'
|
|
|
|
search = StateView(
|
|
'stock.inventory.count.search',
|
|
'stock.inventory_count_search_view_form', [
|
|
Button("End", 'end', 'tryton-cancel'),
|
|
Button("Select", 'quantity', 'tryton-forward', default=True),
|
|
])
|
|
quantity = StateView(
|
|
'stock.inventory.count.quantity',
|
|
'stock.inventory_count_quantity_view_form', [
|
|
Button("Cancel", 'search', 'tryton-cancel'),
|
|
Button("Add", 'add', 'tryton-ok', default=True),
|
|
])
|
|
add = StateTransition()
|
|
|
|
def default_quantity(self, fields):
|
|
pool = Pool()
|
|
InventoryLine = pool.get('stock.inventory.line')
|
|
Warning = pool.get('res.user.warning')
|
|
values = {}
|
|
lines = InventoryLine.search(
|
|
self.get_line_domain(self.record), limit=1)
|
|
if not lines:
|
|
warning_name = '%s.%s.count_create' % (
|
|
self.record, self.search.search)
|
|
if Warning.check(warning_name):
|
|
raise InventoryCountWarning(warning_name,
|
|
gettext('stock.msg_inventory_count_create_line',
|
|
search=self.search.search.rec_name))
|
|
line = self.get_line()
|
|
line.update_for_complete(0)
|
|
line.save()
|
|
else:
|
|
line, = lines
|
|
values['line'] = line.id
|
|
values['product'] = line.product.id
|
|
values['unit'] = line.unit.id
|
|
if line.unit.rounding == 1:
|
|
values['quantity'] = 1.
|
|
return values
|
|
|
|
def get_line_domain(self, inventory):
|
|
pool = Pool()
|
|
Product = pool.get('product.product')
|
|
domain = [
|
|
('inventory', '=', inventory.id),
|
|
]
|
|
if isinstance(self.search.search, Product):
|
|
domain.append(('product', '=', self.search.search.id))
|
|
return domain
|
|
|
|
def get_line(self):
|
|
pool = Pool()
|
|
Product = pool.get('product.product')
|
|
InventoryLine = pool.get('stock.inventory.line')
|
|
|
|
line = InventoryLine(inventory=self.record)
|
|
if isinstance(self.search.search, Product):
|
|
line.product = self.search.search
|
|
return line
|
|
|
|
def transition_add(self):
|
|
if self.quantity.line and self.quantity.quantity:
|
|
line = self.quantity.line
|
|
if line.quantity:
|
|
line.quantity += self.quantity.quantity
|
|
else:
|
|
line.quantity = self.quantity.quantity
|
|
line.save()
|
|
return 'search'
|
|
|
|
|
|
class CountSearch(ModelView):
|
|
__name__ = 'stock.inventory.count.search'
|
|
|
|
search = fields.Reference(
|
|
"Search", [
|
|
('product.product', "Product"),
|
|
],
|
|
required=True,
|
|
domain={
|
|
'product.product': [
|
|
('type', '=', 'goods'),
|
|
('consumable', '=', False),
|
|
],
|
|
},
|
|
help="The item that's counted.")
|
|
|
|
@classmethod
|
|
def default_search(cls):
|
|
return 'product.product,-1'
|
|
|
|
|
|
class CountQuantity(ModelView):
|
|
__name__ = 'stock.inventory.count.quantity'
|
|
|
|
line = fields.Many2One(
|
|
'stock.inventory.line', "Line", readonly=True, required=True)
|
|
product = fields.Many2One('product.product', "Product", readonly=True)
|
|
unit = fields.Many2One(
|
|
'product.uom', "Unit", readonly=True,
|
|
help="The unit in which the quantities are specified.")
|
|
total_quantity = fields.Float(
|
|
"Total Quantity", digits='unit', readonly=True,
|
|
help="The total amount of the line counted so far.")
|
|
|
|
quantity = fields.Float(
|
|
"Quantity", digits='unit', required=True,
|
|
help="The quantity to add to the existing count.")
|
|
|
|
@fields.depends('quantity', 'line')
|
|
def on_change_quantity(self):
|
|
if self.line:
|
|
self.total_quantity = (
|
|
(self.line.quantity or 0) + (self.quantity or 0))
|