Files
2026-03-14 09:42:12 +00:00

430 lines
14 KiB
Python

# This file is part of Tryton. The COPYRIGHT file at the top level of
# this repository contains the full copyright notices and license terms.
from collections import defaultdict
from itertools import groupby
from trytond.i18n import gettext
from trytond.model import (
Index, Model, ModelSQL, ModelView, Workflow, fields, sequence_ordered)
from trytond.modules.company.model import (
employee_field, reset_employee, set_employee)
from trytond.pool import Pool, PoolMeta
from trytond.pyson import Eval
from trytond.transaction import Transaction
from trytond.wizard import Button, StateAction, StateView, Wizard
from .exceptions import QuantityIssueError
class QuantityIssue(
sequence_ordered('priority', "Priority", "ASC NULLS LAST"),
Workflow, ModelSQL, ModelView):
__name__ = 'stock.quantity.issue'
company = fields.Many2One(
'company.company', "Company", required=True,
states={
'readonly': Eval('state') != 'open',
})
origin = fields.Reference(
"Origin", 'get_origins', required=True,
states={
'readonly': Eval('state') != 'open',
},
domain={
'stock.shipment.out': [
('company', '=', Eval('company', -1)),
],
'stock.shipment.in.return': [
('company', '=', Eval('company', -1)),
],
'stock.shipment.internal': [
('company', '=', Eval('company', -1)),
],
})
planned_date = fields.Function(
fields.Date("Planned Date"),
'on_change_with_planned_date')
best_planned_date = fields.Function(
fields.Date("Best Planned Date"),
'get_best_planned_date')
warehouse = fields.Function(
fields.Many2One('stock.location', "Warehouse"),
'on_change_with_warehouse')
issue_products = fields.One2Many(
'stock.quantity.issue.product', 'issue', "Issue Products",
readonly=True)
products = fields.Many2Many(
'stock.quantity.issue.product', 'issue', 'product', "Products",
readonly=True,
context={
'company': Eval('company'),
'locations': [Eval('warehouse')],
'stock_skip_warehouse': False,
'stock_date_end': Eval('planned_date'),
},
depends={'company', 'warehouse', 'planned_date'})
processed_by = employee_field("Processed by", states=['processing'])
solved_by = employee_field("Solved by", states=['solved'])
state = fields.Selection([
('open', "Open"),
('processing', "Processing"),
('solved', "Solved"),
], "State", required=True, readonly=True, sort=False)
@classmethod
def __setup__(cls):
super().__setup__()
t = cls.__table__()
cls._sql_indexes.add(
Index(
t, (t.state, Index.Equality(cardinality='low')),
where=t.state.in_(['open', 'processing'])))
cls.priority.readonly = True
cls._transitions |= {
('open', 'processing'),
('processing', 'solved'),
('processing', 'open'),
}
cls._buttons.update({
'open': {
'invisible': Eval('state') != 'processing',
'depends': ['state'],
},
'process': {
'invisible': Eval('state') != 'open',
'depends': ['state'],
},
'apply_best_planned_date': {
'invisible': Eval('state') != 'processing',
'readonly': ~Eval('best_planned_date'),
'depends': ['state', 'best_planned_date'],
},
'solve': {
'invisible': Eval('state') != 'processing',
'depends': ['state'],
},
})
@classmethod
def default_company(cls):
return Transaction().context.get('company')
@classmethod
def get_origins(cls):
pool = Pool()
Model = pool.get('ir.model')
get_name = Model.get_name
models = cls._get_origins()
return [(m, get_name(m)) for m in models]
@classmethod
def _get_origins(cls):
"Return a list of Model names for origin Reference"
return [
'stock.shipment.out',
'stock.shipment.in.return',
'stock.shipment.internal',
]
@fields.depends('origin')
def on_change_with_planned_date(self, name=None):
if (isinstance(self.origin, Model)
and self.origin.id >= 0
and hasattr(self.origin, 'planned_date')):
return self.origin.planned_date
def get_best_planned_date(self, name):
return max(
(p.best_planned_date for p in self.issue_products
if p.best_planned_date), default=None)
@classmethod
def default_warehouse(cls):
pool = Pool()
Location = pool.get('stock.location')
return Location.get_default_warehouse()
@fields.depends('origin')
def on_change_with_warehouse(self, name=None):
if (isinstance(self.origin, Model) and self.origin.id >= 0
and hasattr(self.origin, 'warehouse')):
return self.origin.warehouse
@classmethod
def default_state(cls):
return 'open'
def get_rec_name(self, name):
return (self.origin.rec_name if isinstance(self.origin, Model)
else '(%s)' % self.id)
@classmethod
@ModelView.button
@Workflow.transition('open')
@reset_employee('processed_by')
def open(cls, issues):
pass
@classmethod
@ModelView.button
@Workflow.transition('processing')
@set_employee('processed_by')
def process(cls, issues):
pass
@classmethod
@ModelView.button
def apply_best_planned_date(cls, issues):
origins = defaultdict(list)
for issue in issues:
origin = issue.origin
if hasattr(origin, 'planned_date'):
origins[origin.__class__].append(origin)
origin.planned_date = issue.best_planned_date
for klass, records in origins.items():
klass.save(records)
@classmethod
@ModelView.button
@Workflow.transition('solved')
@set_employee('solved_by')
def solve(cls, issues):
for issue in issues:
for product in issue.products:
if product.forecast_quantity < 0:
raise QuantityIssueError(
gettext('stock_quantity_issue.'
'msg_issue_solve_product_forecast_quantity',
issue=issue.rec_name,
product=product.rec_name))
@classmethod
def generate_issues(cls, warehouses=None, company=None):
"""
For each product check each stock forecast quantity and for each
negative day, it creates an issue for each origin on that day.
If warehouses is specified it checks the stock only for them.
"""
pool = Pool()
Date = pool.get('ir.date')
Location = pool.get('stock.location')
Move = pool.get('stock.move')
Product = pool.get('product.product')
ProductQuantitiesByWarehouse = pool.get(
'stock.product_quantities_warehouse')
User = pool.get('res.user')
transaction = Transaction()
today = Date.today()
if warehouses is None:
warehouses = Location.search([
('type', '=', 'warehouse'),
])
if company is None:
company = User(Transaction().user).company
# Do not keep former open issues as they may no more be valid
opens = cls.search([
('company', '=', company.id),
('state', '=', 'open'),
])
opens = [
i for i in opens if i.warehouse in warehouses or not i.warehouse]
cls.delete(opens)
issues = {}
for issue in cls.search([
('company', '=', company.id),
('state', '=', 'processing'),
]):
if issue.warehouse not in warehouses:
continue
# Remove products from processing issues
# as they may no more be valid
issue.issue_products = []
issues[issue.origin] = issue
# Order by id to speedup reduce_ids
products = Product.search([
('type', 'in', ['goods', 'assets']),
('consumable', '=', False),
],
order=[('id', 'ASC')])
for product in products:
for warehouse in warehouses:
with transaction.set_context(
product=product.id,
warehouse=warehouse.id,
stock_skip_warehouse=False,
):
product_quantities = ProductQuantitiesByWarehouse.search([
('date', '>=', today),
('company', '=', company.id),
])
issue_products = []
for product_quantity in product_quantities:
if product_quantity.quantity < 0:
moves = Move.search([
('product', '=', product.id),
('planned_date', '=', product_quantity.date),
('from_location', 'child_of',
[warehouse.id], 'parent'),
('to_location', 'not child_of',
[warehouse.id], 'parent'),
('company', '=', company.id),
])
for move in moves:
issue_products.append(cls._add(move, issues))
else:
for issue_product in issue_products:
issue_product.best_planned_date = (
product_quantity.date)
issue_products.clear()
cls.save(issues.values())
@classmethod
def _get_origin(cls, move):
if move.shipment and move.shipment.__name__ in cls._get_origins():
return move.shipment
@classmethod
def _add(cls, move, issues):
pool = Pool()
Date = pool.get('ir.date')
IssueProduct = pool.get('stock.quantity.issue.product')
origin = cls._get_origin(move)
if origin in issues:
issue = issues[origin]
else:
issue = issues[origin] = cls(
company=origin.company,
origin=origin)
issue_products = list(getattr(issue, 'issue_products', []))
issue_product = IssueProduct.get_from_move(move)
issue_products.append(issue_product)
issue.issue_products = issue_products
planned_date = issue.on_change_with_planned_date()
if planned_date:
issue.priority = (planned_date - Date.today()).days
else:
issue.priority = None
return issue_product
@classmethod
def copy(cls, issues, default=None):
if default is None:
default = {}
else:
default = default.copy()
default.setdefault('processed_by')
default.setdefault('solved_by')
return super().copy(issues, default=default)
class QuantityIssueProduction(metaclass=PoolMeta):
__name__ = 'stock.quantity.issue'
@classmethod
def __setup__(cls):
super().__setup__()
cls.origin.domain['production'] = [
('company', '=', Eval('company', -1)),
]
@classmethod
def _get_origins(cls):
return super()._get_origins() + ['production']
@classmethod
def _get_origin(cls, move):
origin = super()._get_origin(move)
if not origin:
origin = move.production_input
return origin
class QuantityIssueProduct(ModelSQL):
__name__ = 'stock.quantity.issue.product'
issue = fields.Many2One(
'stock.quantity.issue', "Issue",
required=True, ondelete='CASCADE')
product = fields.Many2One(
'product.product', "Product",
required=True, ondelete='CASCADE',
domain=[
('type', '!=', 'service'),
])
best_planned_date = fields.Date("Best Planned Date")
forecast_quantity = fields.Function(
fields.Float(
"Forecast Quantity",
help="The amount of stock expected to be in the warehouse."),
'get_forecast_quantity')
@classmethod
def get_forecast_quantity(cls, records):
quantities = {}
def key(record):
return record.warehouse, record.issue.planned_date
items = cls.browse(sorted(records, key=key))
for (warehouse, date), items in groupby(items, key=key):
with Transaction().set_context(
location=[warehouse.id],
stock_skip_warehouse=True,
stock_start_end=date):
for item in cls.browse(items):
quantities[item.id] = item.product.forecast_quantity
return quantities
def get_rec_name(self, name):
return self.product.rec_name
@classmethod
def get_from_move(cls, move):
return cls(product=move.product)
class QuantityIssueGenerate(Wizard):
__name__ = 'stock.quantity.issue.generate'
start = StateView(
'stock.quantity.issue.generate.start',
'stock_quantity_issue.quantity_issue_generate_start_view_form', [
Button("Cancel", 'end', 'tryton-cancel'),
Button("Generate", 'generate', 'tryton-ok', default=True),
])
generate = StateAction('stock_quantity_issue.act_quantity_issue_form')
def transition_generate(self):
pool = Pool()
QuantityIssue = pool.get('stock.quantity.issue')
QuantityIssue.generate_issues(warehouses=self.start.warehouses or None)
return 'end'
class QuantityIssueGenerateStart(ModelView):
__name__ = 'stock.quantity.issue.generate.start'
warehouses = fields.Many2Many(
'stock.location', None, None, "Warehouses",
domain=[
('type', '=', 'warehouse'),
],
help="If empty all warehouses are used.")
@classmethod
def default_warehouses(cls):
pool = Pool()
Location = pool.get('stock.location')
warehouse = Location.get_default_warehouse()
if warehouse:
return [warehouse]