1354 lines
51 KiB
Python
1354 lines
51 KiB
Python
# This file is part of Tryton.  The COPYRIGHT file at the top level of
# this repository contains the full copyright notices and license terms.
|
|
import datetime
|
|
from collections import defaultdict
|
|
from copy import deepcopy
|
|
from decimal import Decimal
|
|
|
|
from simpleeval import InvalidExpression, simple_eval
|
|
from sql import Literal, Null, Select, Window, With
|
|
from sql.aggregate import Max, Sum
|
|
from sql.conditionals import Case, Coalesce
|
|
from sql.operators import Concat
|
|
|
|
from trytond.i18n import gettext
|
|
from trytond.model import Index, ModelSQL, ModelView, fields
|
|
from trytond.model.exceptions import AccessError
|
|
from trytond.modules.product import price_digits, round_price
|
|
from trytond.pool import Pool, PoolMeta
|
|
from trytond.pyson import Bool, Eval, If, PYSONEncoder
|
|
from trytond.tools import decistmt, grouped_slice, reduce_ids
|
|
from trytond.tools import timezone as tz
|
|
from trytond.transaction import Transaction, without_check_access
|
|
from trytond.wizard import (
|
|
Button, StateAction, StateTransition, StateView, Wizard)
|
|
|
|
from .exceptions import ProductCostPriceError, ProductStockWarning
|
|
from .move import StockMixin
|
|
from .shipment import ShipmentAssignMixin
|
|
|
|
|
|
def _find_moves(cls, records):
    """Search for one stock move that references any of *records*.

    The move field searched is ``product.template`` when *cls* is the
    template model and ``product`` otherwise.  Records are processed in
    slices; the search stops at the first slice yielding a move.

    Return the (single element) search result, or False when no move
    references any of the records.
    """
    Move = Pool().get('stock.move')
    is_template = cls.__name__ == 'product.template'
    field = 'product.template' if is_template else 'product'
    for chunk in grouped_slice(records):
        ids = [int(r) for r in chunk]
        found = Move.search([(field, 'in', ids)], limit=1, order=[])
        if found:
            return found
    return False
|
|
|
|
|
|
def _check_no_stock(cls, records):
    """Raise a user warning for records that still have stock.

    For every company, the quantities of *records* in all locations of
    type ``storage`` (children included) are computed; the first record
    found with a non-zero quantity triggers a ProductStockWarning unless
    the user already acknowledged that warning.
    """
    pool = Pool()
    Company = pool.get('company.company')
    Location = pool.get('stock.location')
    Product = pool.get('product.product')
    Warning = pool.get('res.user.warning')

    def get_record_locations(company, location_ids, records, grouping):
        # Map each record id to the location ids holding a non-zero quantity
        record2locations = defaultdict(list)
        record_ids = [int(r) for r in records]
        with Transaction().set_context(company=company.id):
            quantities = Product.products_by_location(
                location_ids, with_childs=True,
                grouping=grouping, grouping_filter=(record_ids,))
        for key, quantity in quantities.items():
            location_id, product_id, = key
            if not quantity:
                continue
            record2locations[product_id].append(location_id)
        return record2locations

    def raise_warning(cls, company, record2locations):
        for record_id, location_ids in record2locations.items():
            record = cls(record_id)
            # Only the first 5 locations are named in the message
            names = [l.rec_name for l in Location.browse(location_ids[:5])]
            locations = ','.join(names)
            if len(location_ids) > 5:
                locations += '...'
            warning_name = Warning.format(
                'deactivate_product_with_stock', [record])
            if Warning.check(warning_name):
                message = gettext(
                    'stock.msg_product_location_quantity',
                    product=record.rec_name,
                    company=company.rec_name,
                    locations=locations)
                description = gettext(
                    'stock.msg_product_location_quantity_description')
                raise ProductStockWarning(
                    warning_name, message, description)

    if cls.__name__ == 'product.template':
        grouping = ('product.template',)
    else:
        grouping = ('product',)

    storages = Location.search([('type', '=', 'storage')])
    location_ids = [int(l) for l in storages]
    for company in Company.search([]):
        for sub_records in grouped_slice(records):
            raise_warning(
                cls, company,
                get_record_locations(
                    company, location_ids, sub_records, grouping))
|
|
|
|
|
|
class Template(metaclass=PoolMeta):
    # Stock extension of the product template: stock figures are the sum
    # of the figures of its variants, and some fields may not be modified
    # once stock moves exist.
    __name__ = "product.template"
    quantity = fields.Function(
        fields.Float(
            "Quantity", digits='default_uom',
            help="The amount of stock in the location."),
        'sum_product')
    forecast_quantity = fields.Function(
        fields.Float(
            "Forecast Quantity", digits='default_uom',
            help="The amount of stock expected to be in the location."),
        'sum_product')
    cost_value = fields.Function(
        fields.Numeric(
            "Cost Value", digits=price_digits,
            help="The value of the stock in the location."),
        'sum_product')

    def sum_product(self, name):
        """Return the sum of field *name* over the variants.

        *name* must be 'quantity', 'forecast_quantity' or 'cost_value';
        any other value raises an Exception.  Empty values count as 0.
        The result is a Decimal for 'cost_value' and a float otherwise.
        """
        if name not in ('quantity', 'forecast_quantity', 'cost_value'):
            raise Exception('Bad argument')
        total = Decimal(0) if name == 'cost_value' else 0.
        for variant in self.products:
            total += getattr(variant, name) or 0
        return total

    @classmethod
    def __setup__(cls):
        super().__setup__()
        # (field name, error message id) pairs of the fields that can not
        # be modified when the record is used by a stock move
        cls._modify_no_move = [
            ('default_uom', 'stock.msg_product_change_default_uom'),
            ('type', 'stock.msg_product_change_type'),
            ('cost_price', 'stock.msg_product_change_cost_price'),
            ]

    @classmethod
    def check_modification(cls, mode, templates, values=None, external=False):
        """Forbid external writes on protected fields of used templates.

        Raise AccessError when one of the ``_modify_no_move`` fields is
        written and a stock move exists for the templates.  Deactivating
        templates triggers the remaining stock warning check.
        """
        super().check_modification(
            mode, templates, values=values, external=external)
        if mode != 'write' or not external:
            return
        for field_name, msg in cls._modify_no_move:
            if field_name not in values:
                continue
            if _find_moves(cls, templates):
                raise AccessError(gettext(msg))
            # No moves for those records
            break

        if not values.get('active', True):
            _check_no_stock(cls, templates)

    @classmethod
    def recompute_cost_price(cls, templates, start=None):
        "Recompute the cost price of all the variants of the templates"
        Product = Pool().get('product.product')
        variants = [p for t in templates for p in t.products]
        Product.recompute_cost_price(variants, start=start)
|
|
|
|
|
|
class Product(StockMixin, object, metaclass=PoolMeta):
    # Stock extension of the product variant: stock quantities, stock
    # value and cost price re-computation from the stock moves.
    __name__ = "product.product"
    quantity = fields.Function(fields.Float(
            "Quantity", digits='default_uom',
            help="The amount of stock in the location."),
        'get_quantity', searcher='search_quantity')
    forecast_quantity = fields.Function(fields.Float(
            "Forecast Quantity", digits='default_uom',
            help="The amount of stock expected to be in the location."),
        'get_quantity', searcher='search_quantity')
    cost_value = fields.Function(fields.Numeric(
            "Cost Value", digits=price_digits,
            help="The value of the stock in the location."),
        'get_cost_value')

    @classmethod
    def get_quantity(cls, products, name):
        "Getter of the quantities for the locations set in the context"
        locations = Transaction().context.get('locations')
        ids = [int(p) for p in products]
        return cls._get_quantity(
            products, name, locations, grouping_filter=(ids,))

    @classmethod
    def search_quantity(cls, name, domain=None):
        "Searcher of the quantities for the locations set in the context"
        locations = Transaction().context.get('locations')
        return cls._search_quantity(name, locations, domain)

    @classmethod
    def get_cost_value(cls, products, name):
        """Return a dictionary of the stock value per product id.

        The value is the rounded product of the quantity by the cost
        price.  When ``stock_date_end`` is set in the context, the cost
        price is read at the end of that day (converted from the company
        timezone when one is defined) and the products created after that
        moment keep a value of None, like the products without cost price.
        """
        pool = Pool()
        Company = pool.get('company.company')
        cost_values = {p.id: None for p in products}
        context = {}
        trans_context = Transaction().context
        if trans_context.get('stock_date_end'):
            # Use the last cost_price of the day
            context['_datetime'] = datetime.datetime.combine(
                trans_context['stock_date_end'], datetime.time.max)
            company = trans_context.get('company')
            if company is not None and company >= 0:
                company = Company(company)
                if company.timezone:
                    timezone = tz.ZoneInfo(company.timezone)
                    try:
                        context['_datetime'] = (
                            context['_datetime']
                            .replace(tzinfo=timezone)
                            .astimezone(tz.UTC)
                            .replace(tzinfo=None))
                    except OverflowError:
                        # Keep the datetime without timezone conversion
                        pass
            # The date could be before the product creation
            products = [
                p for p in products
                if p.create_date <= context['_datetime']]
        with Transaction().set_context(context):
            for product, h_product in zip(products, cls.browse(products)):
                # The product may not have a cost price
                if h_product.cost_price is None:
                    continue
                cost_values[product.id] = round_price(
                    Decimal(str(product.quantity)) * h_product.cost_price)
        return cost_values

    @classmethod
    def check_modification(cls, mode, products, values=None, external=False):
        """Forbid external writes on protected fields of used products.

        Raise AccessError when a ``_modify_no_move`` field of the template
        model is written, or would change through a change of template,
        while a stock move exists.  Deactivating products triggers the
        remaining stock warning check.
        """
        pool = Pool()
        Template = pool.get('product.template')
        super().check_modification(
            mode, products, values=values, external=external)
        if mode != 'write' or not external:
            return
        for field_name, msg in Template._modify_no_move:
            if field_name not in values:
                continue
            if _find_moves(cls, products):
                raise AccessError(gettext(msg))
            # No moves for those records
            break

        if values.get('template'):
            template = Template(values['template'])
            for product in products:
                for field_name, msg in Template._modify_no_move:
                    if isinstance(
                            getattr(Template, field_name),
                            fields.Function):
                        continue
                    current = getattr(product, field_name)
                    if current != getattr(template, field_name):
                        if _find_moves(cls, [product]):
                            raise AccessError(gettext(msg))
                        # No moves for this record
                        break

        if not values.get('active', True):
            _check_no_stock(cls, products)

    def can_be_deactivated(self):
        "Return False if the product has stock in storage for a company"
        pool = Pool()
        Location = pool.get('stock.location')
        Company = pool.get('company.company')
        result = super().can_be_deactivated()
        storages = Location.search([('type', '=', 'storage')])
        location_ids = [int(l) for l in storages]
        for company in Company.search([]):
            with Transaction().set_context(
                    company=company.id,
                    locations=location_ids):
                # New instance to read the quantity with this context
                product = self.__class__(self.id)
                if product.quantity:
                    result = False
        return result

    @classmethod
    def deactivate_replaced(cls, products=None):
        "Delete the staging and draft moves of the deactivated products"
        pool = Pool()
        Move = pool.get('stock.move')
        deactivated = super().deactivate_replaced(products=products)
        for chunk in grouped_slice(deactivated):
            pending = Move.search([
                    ('product', 'in', [int(p) for p in chunk]),
                    ('state', 'in', ['staging', 'draft']),
                    ])
            Move.delete(pending)
        return deactivated

    @classmethod
    def products_by_location(cls, location_ids,
            with_childs=False, grouping=('product',), grouping_filter=None):
        """
        Compute for each location and product the stock quantity in the
        default uom of the product.

        The context with keys:
            stock_skip_warehouse: if set, quantities on a warehouse are no
                more quantities of all child locations but quantities of
                the storage zone.

        Return a dictionary with location id and grouping as key
        and quantity as value.
        """
        pool = Pool()
        Location = pool.get('stock.location')
        Move = pool.get('stock.move')

        # Skip warehouse location in favor of their storage location
        # to compute quantities. Keep track of which ids to remove
        # and to add after the query.
        storage_to_remove = set()
        wh_to_add = {}
        if Transaction().context.get('stock_skip_warehouse'):
            location_ids = set(location_ids)
            for location in Location.browse(list(location_ids)):
                if location.type != 'warehouse':
                    continue
                storage_id = location.storage_location.id
                location_ids.remove(location.id)
                if storage_id not in location_ids:
                    storage_to_remove.add(storage_id)
                location_ids.add(storage_id)
                wh_to_add[location.id] = storage_id
            location_ids = list(location_ids)

        query = Move.compute_quantities_query(
            location_ids, with_childs,
            grouping=grouping, grouping_filter=grouping_filter)
        if query is None:
            return {}
        quantities = Move.compute_quantities(
            query, location_ids, with_childs,
            grouping=grouping, grouping_filter=grouping_filter)

        if wh_to_add:
            for wh, storage in wh_to_add.items():
                for key in list(quantities):
                    if key[0] != storage:
                        continue
                    # Report the storage quantity on the warehouse
                    quantities[(wh,) + key[1:]] = quantities[key]
                    if storage in storage_to_remove:
                        del quantities[key]
        return quantities

    @classmethod
    def recompute_cost_price_from_moves(cls):
        """Queue the cost price re-computation of the products having
        moves with an updated unit price, starting from the effective
        date of their earliest such move."""
        pool = Pool()
        Move = pool.get('stock.move')
        queued = set()
        moves = Move.search([
                ('unit_price_updated', '=', True),
                cls._domain_moves_cost(),
                ],
            order=[('effective_date', 'ASC')])
        for move in moves:
            if move.product in queued:
                continue
            cls.__queue__.recompute_cost_price(
                [move.product], start=move.effective_date)
            queued.add(move.product)

    @classmethod
    def recompute_cost_price(cls, products, start=None):
        """Recompute and store the cost price of the products.

        The computation is delegated to the method
        ``recompute_cost_price_<cost_price_method>`` of each product
        (services are skipped) and the ``unit_price_updated`` flag of the
        matching moves is cleared.
        """
        pool = Pool()
        Move = pool.get('stock.move')
        costs = defaultdict(list)
        for product in products:
            if product.type == 'service':
                continue
            compute = getattr(
                product, 'recompute_cost_price_%s' %
                product.cost_price_method)
            costs[round_price(compute(start))].append(product)

        updated = []
        for sub_products in grouped_slice(products):
            domain = [
                ('unit_price_updated', '=', True),
                cls._domain_moves_cost(),
                ('product', 'in', [p.id for p in sub_products]),
                ]
            if start:
                domain.append(('effective_date', '>=', start))
            updated += Move.search(domain, order=[])
        if updated:
            Move.write(updated, {'unit_price_updated': False})

        if costs:
            cls.update_cost_price(costs)

    @classmethod
    @without_check_access
    def update_cost_price(cls, costs):
        "Update cost price of products from costs re-computation dictionary"
        to_write = []
        for cost, products in costs.items():
            to_write.extend((products, {'cost_price': cost}))
        cls.write(*to_write)

    def recompute_cost_price_fixed(self, start=None):
        "Return the current cost price (or 0) as it is fixed"
        return self.cost_price or Decimal(0)

    @classmethod
    def _domain_moves_cost(cls):
        "Returns the domain for moves to use in cost computation"
        company = Transaction().context.get('company')
        return [
            ('company', '=', company),
            ('state', '=', 'done'),
            ]

    @classmethod
    def _domain_in_moves_cost(cls):
        "Return the domain for incoming moves in cost computation"
        return [
            ('to_location.type', '=', 'storage'),
            ('from_location.type', '!=', 'storage'),
            ]

    @classmethod
    def _domain_out_moves_cost(cls):
        "Return the domain for outgoing moves in cost computation"
        return [
            ('from_location.type', '=', 'storage'),
            ('to_location.type', '!=', 'storage'),
            ]

    @classmethod
    def _domain_storage_quantity(cls):
        "Returns the domain for locations to use in cost computation"
        return [('type', '=', 'storage')]

    def _get_storage_quantity(self, date=None):
        """Return the quantity of the product in the locations matching
        ``_domain_storage_quantity`` at *date* (today by default)."""
        pool = Pool()
        Location = pool.get('stock.location')

        locations = Location.search(self._domain_storage_quantity())
        if not date:
            date = datetime.date.today()
        with Transaction().set_context(
                locations=[l.id for l in locations],
                with_childs=False,
                stock_date_end=date):
            # New instance to read the quantity with this context
            return self.__class__(self.id).quantity

    def recompute_cost_price_average(self, start=None):
        """Return the average cost price computed from the done moves.

        The incoming and outgoing moves are replayed in effective date
        order (from *start* when set, resuming from the cost price of the
        last previous incoming move and the storage quantity of the day
        before), applying the cost price revisions up to each move.  The
        cost price of the moves is updated with the rounded price in
        effect at the end of their effective date.
        """
        pool = Pool()
        Move = pool.get('stock.move')
        Uom = pool.get('product.uom')
        Revision = pool.get('product.cost_price.revision')

        domain = [
            ('product', '=', self.id),
            self._domain_moves_cost(),
            ['OR',
                self._domain_in_moves_cost(),
                self._domain_out_moves_cost(),
                ]
            ]
        if start:
            domain.append(('effective_date', '>=', start))
        moves = Move.search(
            domain, order=[('effective_date', 'ASC'), ('id', 'ASC')])

        # Ids of all the incoming moves, used to classify the moves
        in_move_ids = {
            m.id for m in Move.search([
                    ('product', '=', self.id),
                    self._domain_moves_cost(),
                    self._domain_in_moves_cost(),
                    ], order=[])}

        revisions = Revision.get_for_product(self)

        cost_price = Decimal(0)
        quantity = 0
        if start:
            # Reuse the domain to find the last incoming move before start
            domain.remove(('effective_date', '>=', start))
            domain.append(('effective_date', '<', start))
            domain.append(self._domain_in_moves_cost())
            prev_moves = Move.search(
                domain,
                order=[('effective_date', 'DESC'), ('id', 'DESC')],
                limit=1)
            if prev_moves:
                move, = prev_moves
                cost_price = move.cost_price
                quantity = self._get_storage_quantity(
                    date=start - datetime.timedelta(days=1))
                quantity = Decimal(str(quantity))

        def in_move(move):
            return move.id in in_move_ids

        def out_move(move):
            return not in_move(move)

        def production_move(move):
            return (
                move.from_location.type == 'production'
                or move.to_location.type == 'production')

        def write_cost_price(moves, price):
            # Store the price only on the moves for which it differs
            Move.write(
                [m for m in moves if m.cost_price != price],
                dict(cost_price=price))

        current_moves = []
        current_cost_price = cost_price
        qty_production = 0
        for move in moves:
            if (current_moves
                    and current_moves[-1].effective_date
                    != move.effective_date):
                # The date changes: flush the moves of the previous date
                write_cost_price(current_moves, current_cost_price)
                current_moves.clear()
                qty_production = 0
            current_moves.append(move)

            cost_price = Revision.apply_up_to(
                revisions, cost_price, move.effective_date)
            qty = Uom.compute_qty(move.unit, move.quantity, self.default_uom)
            qty = Decimal(str(qty))
            if out_move(move):
                qty *= -1
            if in_move(move):
                in_qty = qty
                if production_move(move) and qty_production < 0:
                    # Exclude quantity coming back from production
                    in_qty -= min(abs(qty_production), in_qty)
                unit_price = move.get_cost_price(product_cost_price=cost_price)
                if quantity + in_qty > 0 and quantity >= 0:
                    cost_price = (
                        (cost_price * quantity) + (unit_price * in_qty)
                        ) / (quantity + in_qty)
                elif in_qty > 0:
                    cost_price = unit_price
                current_cost_price = round_price(cost_price)
            quantity += qty
            if production_move(move):
                qty_production += qty

        write_cost_price(current_moves, current_cost_price)

        # Apply the revisions remaining after the last move
        for revision in revisions:
            cost_price = revision.get_cost_price(cost_price)
        return cost_price

    @classmethod
    def view_attributes(cls):
        # Highlight the negative quantities in the list views
        negative_quantity = If(Eval('quantity', 0) < 0, 'danger', '')
        negative_forecast = If(
            Eval('forecast_quantity', 0) < 0, 'warning', '')
        return super().view_attributes() + [
            ('/tree/field[@name="quantity"]',
                'visual', negative_quantity),
            ('/tree/field[@name="forecast_quantity"]',
                'visual', negative_forecast),
            ]
|
|
|
|
|
|
class ProductByLocationContext(ModelView):
    # Context form used to select the company and the date at which the
    # product quantities are computed.
    __name__ = 'product.by_location.context'

    company = fields.Many2One('company.company', "Company", required=True)
    forecast_date = fields.Date(
        'At Date',
        help="The date for which the stock quantity is calculated.\n"
        "* An empty value calculates as far ahead as possible.\n"
        "* A date in the past will provide historical values.")
    stock_date_end = fields.Function(fields.Date('At Date'),
        'on_change_with_stock_date_end')

    @classmethod
    def default_company(cls):
        "Default to the company of the context"
        return Transaction().context.get('company')

    @staticmethod
    def default_forecast_date():
        "Default to today"
        return Pool().get('ir.date').today()

    @fields.depends('forecast_date')
    def on_change_with_stock_date_end(self, name=None):
        "Return the forecast date, or the maximal date when it is empty"
        if self.forecast_date is not None:
            return self.forecast_date
        return datetime.date.max
|
|
|
|
|
|
class OpenProductQuantitiesByWarehouse(Wizard):
    # Wizard opening the product quantities by warehouse for the record
    # (product or shipment) it is launched from.
    __name__ = 'stock.product_quantities_warehouse.open'
    start_state = 'open_'
    _readonly = True
    open_ = StateAction('stock.act_product_quantities_warehouse')

    def do_open_(self, action):
        """Fill the context, search value, domain and name of the action.

        Return the action and an empty data dictionary.
        """
        encoder = PYSONEncoder()
        action['pyson_context'] = encoder.encode(self.get_context())
        action['pyson_search_value'] = encoder.encode(self.get_search_value())
        action['pyson_domain'] = encoder.encode(self.get_domain())
        # Separate the record name from the action name with a space
        action['name'] += ' (' + self.record.rec_name + ')'
        return action, {}

    def get_context(self):
        """Return the context of the action.

        For a shipment, it contains the products of its moves to assign
        and the warehouse of the shipment when it has one.  For an
        internal shipment the warehouse of the origin location is used.
        """
        context = {}
        if issubclass(self.model, ShipmentAssignMixin):
            context['product_template'] = None
            context['product'] = [
                m.product.id for m in self.record.assign_moves]
            warehouse = getattr(self.record, 'warehouse', None)
            # self.model is a class so it must be compared by its name
            if self.model.__name__ == 'stock.shipment.internal':
                warehouse = self.record.from_location.warehouse
            if warehouse:
                context['warehouse'] = warehouse.id
        return context

    def get_search_value(self):
        """Return the search domain on the date: from today up to the
        planned date of the record when it has one that is not passed."""
        pool = Pool()
        Date = pool.get('ir.date')
        today = Date.today()
        value = [('date', '>=', today)]
        planned_date = getattr(self.record, 'planned_date', None)
        if planned_date and planned_date >= today:
            value.append(('date', '<=', planned_date))
        return value

    def get_domain(self):
        "Return the domain limiting to the products of a shipment"
        if issubclass(self.model, ShipmentAssignMixin):
            return [('product', 'in', [
                        str(m.product) for m in self.record.assign_moves])]
        return []
|
|
|
|
|
|
class ProductQuantitiesByWarehouse(ModelSQL, ModelView):
    # Cumulated quantity per product and date for a warehouse, computed
    # from the stock moves crossing the boundary of the warehouse.
    __name__ = 'stock.product_quantities_warehouse'

    class _Date(fields.Date):
        def get(self, ids, model, name, values=None):
            "Return the dates per id, converting the ISO strings"
            if values is None:
                values = {}
            result = {}
            for v in values:
                date = v[name]
                # SQLite does not convert to date
                if isinstance(date, str):
                    date = datetime.date.fromisoformat(date)
                result[v['id']] = date
            return result

    product = fields.Reference("Product", [
            ('product.product', "Variant"),
            ('product.template', "Product"),
            ])
    date = _Date('Date')
    quantity = fields.Function(fields.Float('Quantity'), 'get_quantity')
    company = fields.Many2One('company.company', "Company")

    del _Date

    @classmethod
    def __setup__(cls):
        super().__setup__()
        cls._order.insert(0, ('date', 'ASC'))

    @classmethod
    def table_query(cls):
        """Return the query with one row per product, date and company.

        The products come from the context key ``product_template`` or
        ``product`` and the location from ``warehouse`` (its storage
        location with ``stock_skip_warehouse``).  Past dates only count
        the done moves and a row dated today is added for each product.
        """
        pool = Pool()
        Move = pool.get('stock.move')
        Location = pool.get('stock.location')
        Product = pool.get('product.product')
        Date = pool.get('ir.date')
        move = from_ = Move.__table__()
        transaction = Transaction()
        context = transaction.context
        today = Date.today()

        if context.get('product_template') is not None:
            template_ids = context['product_template']
            if isinstance(template_ids, int):
                template_ids = [template_ids]
            product_table = Product.__table__()
            from_ = move.join(
                product_table, condition=move.product == product_table.id)
            if template_ids:
                if len(template_ids) > transaction.database.IN_MAX:
                    raise AccessError(gettext(
                            'stock.msg_product_quantities_max',
                            max=transaction.database.IN_MAX))
                product_clause = reduce_ids(
                    product_table.template, template_ids)
            else:
                product_clause = product_table.template == Null
            product_column = Concat(
                'product.template,', product_table.template)
            products = [('product.template', i) for i in template_ids]
        else:
            product_ids = context.get('product')
            if product_ids is None:
                product_ids = []
            if isinstance(product_ids, int):
                product_ids = [product_ids]
            if product_ids:
                if len(product_ids) > transaction.database.IN_MAX:
                    raise AccessError(gettext(
                            'stock.msg_product_quantities_max',
                            max=transaction.database.IN_MAX))
                product_clause = reduce_ids(move.product, product_ids)
            else:
                product_clause = move.product == Null
            product_column = Concat('product.product,', move.product)
            products = [('product.product', i) for i in product_ids]

        if 'warehouse' in context:
            location = Location(context.get('warehouse'))
            if context.get('stock_skip_warehouse'):
                location_id = location.storage_location.id
            else:
                location_id = location.id
        else:
            location_id = -1
        # Common table expression of the location and all its children
        warehouse = With('id', query=Location.search([
                    ('parent', 'child_of', [location_id]),
                    ], query=True, order=[]))
        date_column = Coalesce(move.effective_date, move.planned_date)
        # Moves leaving or entering the warehouse but not internal ones
        boundary_clause = (
            (move.from_location.in_(
                    warehouse.select(warehouse.id))
                & ~move.to_location.in_(
                    warehouse.select(warehouse.id)))
            | (~move.from_location.in_(
                    warehouse.select(warehouse.id))
                & move.to_location.in_(
                    warehouse.select(warehouse.id))))
        date_clause = (
            (date_column < today) & (move.state == 'done')
            | (date_column > today))
        query = (from_.select(
                Max(move.id * 3).as_('id'),
                product_column.as_('product'),
                date_column.as_('date'),
                move.company.as_('company'),
                where=product_clause & boundary_clause & date_clause,
                group_by=(date_column, product_column, move.company),
                with_=warehouse))
        for model, id_ in products:
            # Ids of moves are multiple of 3, the gap avoids collisions
            gap = ['product.template', 'product.product'].index(model) + 1
            query |= Select([
                    Literal(id_ * 3 + gap).as_('id'),
                    Literal('%s,%s' % (model, id_)).as_('product'),
                    Literal(today).as_('date'),
                    Literal(context.get('company', -1)).as_('company'),
                    ])
        return query

    @classmethod
    def parse_view(cls, tree, type, *args, **kwargs):
        """Duplicate the quantity serie of the graph per product.

        When more than one product is set in the context, the quantity
        field of the graph is replaced by one copy per product filtered
        by domain, and the legend is activated.
        """
        pool = Pool()
        Product = pool.get('product.product')
        Template = pool.get('product.template')
        context = Transaction().context
        if kwargs.get('view_depends') is None:
            view_depends = []
        else:
            view_depends = kwargs['view_depends'].copy()
        kwargs['view_depends'] = view_depends
        if type == 'graph':
            encoder = PYSONEncoder()
            if context.get('product_template') is not None:
                template_ids = context['product_template']
                if isinstance(template_ids, int):
                    template_ids = [template_ids]
                records = Template.browse(template_ids)
            elif context.get('product'):
                product_ids = context['product']
                if isinstance(product_ids, int):
                    product_ids = [product_ids]
                records = Product.browse(product_ids)
            else:
                records = []
            if len(records) > 1:
                quantity_node, = tree.xpath('//y/field[@name="quantity"]')
                parent = quantity_node.getparent()
                parent.remove(quantity_node)
                for record in records:
                    node = deepcopy(quantity_node)
                    node.set('key', str(record.id))
                    node.set('string', record.rec_name)
                    node.set('domain', encoder.encode(
                            Eval('product') == str(record)))
                    node.set('fill', '0')
                    parent.append(node)
                graph, = tree.xpath('/graph')
                graph.set('legend', '1')
                view_depends.append('product')
        return super().parse_view(tree, type, *args, **kwargs)

    @classmethod
    def get_quantity(cls, lines, name):
        """Return the cumulated forecast quantity per line id.

        The quantities are computed per period between the sorted dates
        of the lines for the warehouse of the context, then cumulated.
        """
        Product = Pool().get('product.product')
        trans_context = Transaction().context

        if trans_context.get('product_template') is not None:
            grouping = ('product.template',)
            template_ids = trans_context['product_template']
            if isinstance(template_ids, int):
                template_ids = [template_ids]
            grouping_filter = (template_ids,)
        else:
            grouping = ('product',)
            product_ids = trans_context.get('product', -1)
            if product_ids is None:
                product_ids = -1
            if isinstance(product_ids, int):
                product_ids = [product_ids]
            grouping_filter = (product_ids,)
        warehouse_id = trans_context.get('warehouse')

        def cast_date(date):
            # The date may be an ISO string instead of a date
            if isinstance(date, str):
                date = datetime.date.fromisoformat(date)
            return date

        dates = sorted({cast_date(l.date) for l in lines})
        quantities = {}
        keys = set()
        date_start = None
        for date in dates:
            with Transaction().set_context(
                    stock_date_start=date_start,
                    stock_date_end=date,
                    forecast=True):
                quantities[date] = Product.products_by_location(
                    [warehouse_id],
                    grouping=grouping,
                    grouping_filter=grouping_filter,
                    with_childs=True)
            keys.update(quantities[date])
            try:
                date_start = date + datetime.timedelta(1)
            except OverflowError:
                pass
        # Replace the quantity of each period by the running total
        cumulate = defaultdict(lambda: 0)
        for date in dates:
            period = quantities[date]
            for key in keys:
                cumulate[key] += period[key]
                period[key] = cumulate[key]

        return {
            l.id: quantities[cast_date(l.date)].get(
                (warehouse_id, int(l.product)), 0)
            for l in lines}

    def get_rec_name(self, name):
        "Return the record name of the product or an empty string"
        if self.product:
            return self.product.rec_name
        return ''
|
|
|
|
|
|
class ProductQuantitiesByWarehouseContext(ModelView):
    # Context form used to select the company and the warehouse for
    # which the product quantities are computed.
    __name__ = 'stock.product_quantities_warehouse.context'

    company = fields.Many2One('company.company', "Company", required=True)
    warehouse = fields.Many2One(
        'stock.location', 'Warehouse', required=True,
        domain=[
            ('type', '=', 'warehouse'),
            ],
        help="The warehouse for which the quantities will be calculated.")
    stock_skip_warehouse = fields.Boolean(
        "Only storage zone",
        help="Check to use only the quantity of the storage zone.")

    @classmethod
    def default_company(cls):
        "Default to the company of the context"
        context = Transaction().context
        return context.get('company')

    @classmethod
    def default_warehouse(cls):
        "Default to the default warehouse of the locations"
        return Pool().get('stock.location').get_default_warehouse()

    @classmethod
    def default_stock_skip_warehouse(cls):
        "Default to the value of the context"
        context = Transaction().context
        return context.get('stock_skip_warehouse')
|
|
|
|
|
|
class OpenProductQuantitiesByWarehouseMove(Wizard):
    # Wizard opening the moves behind a line of the product quantities
    # by warehouse.
    __name__ = 'stock.product_quantities_warehouse.move.open'
    start_state = 'open_'
    _readonly = True
    open_ = StateAction('stock.act_product_quantities_warehouse_move')

    def do_open_(self, action):
        """Limit the action to the product of the record, searching from
        its date, and append the record name to the action name.

        Return the action and an empty data dictionary.
        """
        record = self.record
        encoder = PYSONEncoder()
        search_value = [('date', '>=', record.date)]
        domain = [('product', '=', str(record.product))]
        action['pyson_context'] = '{}'
        action['pyson_search_value'] = encoder.encode(search_value)
        action['pyson_domain'] = encoder.encode(domain)
        action['name'] += ' (' + record.rec_name + ')'
        return action, {}
|
|
|
|
|
|
class ProductQuantitiesByWarehouseMove(ModelSQL, ModelView):
|
|
__name__ = 'stock.product_quantities_warehouse.move'
|
|
|
|
product = fields.Reference("Product", [
|
|
('product.product', "Variant"),
|
|
('product.template', "Product"),
|
|
])
|
|
date = fields.Date("Date")
|
|
move = fields.Many2One('stock.move', "Move")
|
|
origin = fields.Reference("Origin", selection='get_origin')
|
|
document = fields.Function(
|
|
fields.Reference("Document", selection='get_documents'),
|
|
'get_document')
|
|
quantity = fields.Float("Quantity")
|
|
cumulative_quantity_start = fields.Function(
|
|
fields.Float("Cumulative Quantity Start"), 'get_cumulative_quantity')
|
|
cumulative_quantity_delta = fields.Float("Cumulative Quantity Delta")
|
|
cumulative_quantity_end = fields.Function(
|
|
fields.Float("Cumulative Quantity End"), 'get_cumulative_quantity')
|
|
company = fields.Many2One('company.company', "Company")
|
|
|
|
@classmethod
|
|
def __setup__(cls):
|
|
super().__setup__()
|
|
cls._order.insert(0, ('date', 'ASC'))
|
|
|
|
@classmethod
|
|
def table_query(cls):
|
|
pool = Pool()
|
|
Date = pool.get('ir.date')
|
|
Location = pool.get('stock.location')
|
|
Move = pool.get('stock.move')
|
|
Product = pool.get('product.product')
|
|
move = from_ = Move.__table__()
|
|
transaction = Transaction()
|
|
context = transaction.context
|
|
today = Date.today()
|
|
|
|
if context.get('product_template') is not None:
|
|
product_template = context['product_template']
|
|
if isinstance(product_template, int):
|
|
product_template = [product_template]
|
|
product = Product.__table__()
|
|
from_ = move.join(product, condition=move.product == product.id)
|
|
if product_template:
|
|
if len(product_template) > transaction.database.IN_MAX:
|
|
raise AccessError(gettext(
|
|
'stock.msg_product_quantities_max',
|
|
max=transaction.database.IN_MAX))
|
|
product_clause = reduce_ids(product.template, product_template)
|
|
else:
|
|
product_clause = product.template == Null
|
|
product_column = Concat('product.template,', product.template)
|
|
else:
|
|
product = context.get('product', -1)
|
|
if product is None:
|
|
product = -1
|
|
if isinstance(product, int):
|
|
product = [product]
|
|
if product:
|
|
if len(product) > transaction.database.IN_MAX:
|
|
raise AccessError(gettext(
|
|
'stock.msg_product_quantities_max',
|
|
max=transaction.database.IN_MAX))
|
|
product_clause = reduce_ids(move.product, product)
|
|
else:
|
|
product_clause = move.product == Null
|
|
product_column = Concat('product.product,', move.product)
|
|
|
|
if 'warehouse' in context:
|
|
warehouse = Location(context.get('warehouse'))
|
|
if context.get('stock_skip_warehouse'):
|
|
location_id = warehouse.storage_location.id
|
|
else:
|
|
location_id = warehouse.id
|
|
else:
|
|
location_id = -1
|
|
warehouse = With('id', query=Location.search([
|
|
('parent', 'child_of', [location_id]),
|
|
], query=True, order=[]))
|
|
date_column = Coalesce(
|
|
move.effective_date,
|
|
Case((move.state == 'assigned', today), else_=Null),
|
|
move.planned_date)
|
|
quantity = Case(
|
|
(move.to_location.in_(warehouse.select(warehouse.id)),
|
|
move.internal_quantity),
|
|
else_=-move.internal_quantity)
|
|
cumulative_quantity_delta = Sum(
|
|
quantity,
|
|
window=Window(
|
|
[product_column, date_column], order_by=[move.id.asc]))
|
|
return (from_.select(
|
|
move.id.as_('id'),
|
|
product_column.as_('product'),
|
|
date_column.as_('date'),
|
|
move.id.as_('move'),
|
|
move.origin.as_('origin'),
|
|
quantity.as_('quantity'),
|
|
cumulative_quantity_delta.as_('cumulative_quantity_delta'),
|
|
move.company.as_('company'),
|
|
where=product_clause
|
|
& (
|
|
(move.from_location.in_(
|
|
warehouse.select(warehouse.id))
|
|
& ~move.to_location.in_(
|
|
warehouse.select(warehouse.id)))
|
|
| (~move.from_location.in_(
|
|
warehouse.select(warehouse.id))
|
|
& move.to_location.in_(
|
|
warehouse.select(warehouse.id))))
|
|
& ((date_column < today) & (move.state == 'done')
|
|
| (date_column >= today) & (move.state != 'cancelled')),
|
|
with_=warehouse))
|
|
|
|
@classmethod
def get_origin(cls):
    """Return whatever ``get_origin`` of the ``stock.move`` model returns."""
    return Pool().get('stock.move').get_origin()
|
|
|
|
@classmethod
def _get_document_models(cls):
    """Return the model names usable as document.

    The names are the first item of each pair returned by ``get_shipment``
    of ``stock.move``; falsy names are left out.
    """
    StockMove = Pool().get('stock.move')
    model_names = []
    for model_name, _ in StockMove.get_shipment():
        if model_name:
            model_names.append(model_name)
    return model_names
|
|
|
|
@classmethod
def get_documents(cls):
    """Return the ``(model name, label)`` pairs of the document models.

    The list starts with the empty choice ``(None, '')``; labels come from
    ``get_name`` of ``ir.model``.
    """
    label_of = Pool().get('ir.model').get_name
    choices = [(None, '')]
    for model_name in cls._get_document_models():
        choices.append((model_name, label_of(model_name)))
    return choices
|
|
|
|
def get_document(self, name):
    """Return ``str()`` of the shipment of the record's move.

    ``None`` is returned when the record has no move or the move has no
    shipment.
    """
    if not (self.move and self.move.shipment):
        return None
    return str(self.move.shipment)
|
|
|
|
@classmethod
def get_cumulative_quantity(cls, records, names):
    """Compute the cumulative quantity fields of ``records``.

    The product (or product template) filter and the warehouse are read
    from the transaction context.  For each distinct record date, the
    quantities between the previous date and the day before that date are
    fetched with ``Product.products_by_location`` (with ``forecast``
    set), then accumulated date after date.  The record's own
    ``cumulative_quantity_delta`` is added on top of the accumulated value.

    ``names`` lists the requested fields; only
    ``cumulative_quantity_start`` and ``cumulative_quantity_end`` are
    computed.  The result maps each computed field name to a dictionary
    ``{record id: quantity}``.
    """
    pool = Pool()
    Product = pool.get('product.product')
    transaction = Transaction()
    trans_context = transaction.context

    if trans_context.get('product_template') is not None:
        grouping = ('product.template',)
        product_template = trans_context['product_template']
        if isinstance(product_template, int):
            product_template = [product_template]
        grouping_filter = (product_template,)
    else:
        grouping = ('product',)
        product = trans_context.get('product', -1)
        if product is None:
            product = -1
        if isinstance(product, int):
            product = [product]
        grouping_filter = (product,)
    warehouse_id = trans_context.get('warehouse')

    def cast_date(date):
        # The date may come back as an ISO formatted string
        if isinstance(date, str):
            date = datetime.date.fromisoformat(date)
        return date

    dates = sorted({cast_date(r.date) for r in records})
    quantities = {}
    keys = set()
    date_start = None
    for date in dates:
        try:
            date_end = date - datetime.timedelta(days=1)
        except OverflowError:
            # date is the smallest representable date: there is no day
            # before it, so the opening quantities are empty.  Previously
            # this path fell through with an unbound context and crashed.
            quantities[date] = defaultdict(int)
        else:
            with transaction.set_context(
                    stock_date_start=date_start,
                    stock_date_end=date_end,
                    forecast=True):
                quantities[date] = Product.products_by_location(
                    [warehouse_id],
                    grouping=grouping,
                    grouping_filter=grouping_filter,
                    with_childs=True)
        keys.update(quantities[date])
        date_start = date

    # Turn the per-period quantities into running totals
    cumulate = defaultdict(lambda: 0)
    for date in dates:
        for key in keys:
            cumulate[key] += quantities[date][key]
            quantities[date][key] = cumulate[key]

    result = {}
    if 'cumulative_quantity_start' in names:
        # Quantity before the record itself is applied
        result['cumulative_quantity_start'] = {
            r.id: (
                quantities[cast_date(r.date)].get(
                    (warehouse_id, int(r.product)), 0)
                + r.cumulative_quantity_delta
                - r.quantity)
            for r in records}
    if 'cumulative_quantity_end' in names:
        result['cumulative_quantity_end'] = {
            r.id: (
                quantities[cast_date(r.date)].get(
                    (warehouse_id, int(r.product)), 0)
                + r.cumulative_quantity_delta)
            for r in records}
    return result
|
|
|
|
def get_rec_name(self, name):
    """Return the ``rec_name`` of the record's move."""
    move = self.move
    return move.rec_name
|
|
|
|
|
|
class RecomputeCostPrice(Wizard):
    # Wizard showing a start form and then calling ``recompute_cost_price``
    # of the wizard's model on the selected records.
    __name__ = 'product.recompute_cost_price'
    start = StateView(
        'product.recompute_cost_price.start',
        'stock.recompute_cost_price_start_view_form', [
            Button("Cancel", 'end'),
            Button("Recompute", 'recompute', default=True)])
    recompute = StateTransition()

    def default_start(self, fields):
        """Return the default values of the start form.

        ``from_`` defaults to the earliest effective date among the cost
        moves of the selected products flagged ``unit_price_updated``, or
        ``None`` when there is no such move.
        """
        pool = Pool()
        Move = pool.get('stock.move')
        Product = pool.get('product.product')

        model_name = self.model.__name__
        if model_name == 'product.product':
            products = self.records
        elif model_name == 'product.template':
            products = sum((t.products for t in self.records), ())
        else:
            products = []

        earliest = None
        for chunk in grouped_slice(products):
            # Only the oldest matching move of each chunk is needed
            found = Move.search([
                    ('unit_price_updated', '=', True),
                    Product._domain_moves_cost(),
                    ('product', 'in', [p.id for p in chunk]),
                    ],
                order=[('effective_date', 'ASC')],
                limit=1)
            if not found:
                continue
            move, = found
            earliest = min(
                earliest or datetime.date.max, move.effective_date)
        return {'from_': earliest}

    def transition_recompute(self):
        """Recompute the cost price from the chosen date and end."""
        start_date = self.start.from_
        self.model.recompute_cost_price(self.records, start=start_date)
        return 'end'
|
|
|
|
|
|
class RecomputeCostPriceStart(ModelView):
    # Start form of the 'product.recompute_cost_price' wizard.
    __name__ = 'product.recompute_cost_price.start'
    # Date passed as ``start`` to ``recompute_cost_price`` by the wizard
    from_ = fields.Date("From")
|
|
|
|
|
|
class ModifyCostPrice(Wizard):
    # Wizard creating a cost price revision for each selected product or
    # template and applying it.
    __name__ = 'product.modify_cost_price'
    start = StateView(
        'product.modify_cost_price.start',
        'stock.product_modify_cost_price_start_form', [
            Button("Cancel", 'end', 'tryton-cancel'),
            Button("OK", 'modify', default=True),
            ])
    modify = StateTransition()

    def transition_modify(self):
        """Save one revision per selected record and apply it.

        Products with the 'fixed' cost price method and a revision dated
        today, and products of type 'service', get their cost price updated
        directly through ``Product.update_cost_price``.  The cost price of
        the other records is recomputed from the earliest revision date.
        """
        pool = Pool()
        Product = pool.get('product.product')
        Revision = pool.get('product.cost_price.revision')
        Date = pool.get('ir.date')
        today = Date.today()
        revisions = []
        costs = defaultdict(list)
        # Records still requiring a recomputation.  It starts empty so that
        # an unexpected wizard model ends the wizard without work instead
        # of failing on an unbound name.
        records = []
        model_name = self.model.__name__
        if model_name == 'product.product':
            for product in self.records:
                revision = self.get_revision(Revision)
                revision.product = product
                revision.template = product.template
                revisions.append(revision)
                if ((
                            product.cost_price_method == 'fixed'
                            and revision.date == today)
                        or product.type == 'service'):
                    cost = round_price(
                        revision.get_cost_price(product.cost_price))
                    costs[cost].append(product)
                else:
                    records.append(product)
        elif model_name == 'product.template':
            for template in self.records:
                revision = self.get_revision(Revision)
                revision.template = template
                revisions.append(revision)
                if ((
                            template.cost_price_method == 'fixed'
                            and revision.date == today)
                        or template.type == 'service'):
                    for product in template.products:
                        cost = round_price(
                            revision.get_cost_price(product.cost_price))
                        costs[cost].append(product)
                else:
                    records.append(template)
        Revision.save(revisions)
        if costs:
            Product.update_cost_price(costs)
        if records:
            start = min((r.date for r in revisions), default=None)
            self.model.recompute_cost_price(records, start=start)
        return 'end'

    def get_revision(self, Revision):
        """Return a new unsaved revision filled from the start form.

        The company is read from the transaction context; template and
        product are left empty for the caller to set.
        """
        return Revision(
            template=None,
            product=None,
            company=Transaction().context.get('company'),
            date=self.start.date,
            cost_price=self.start.cost_price,
            )
|
|
|
|
|
|
class ModifyCostPriceStart(ModelView):
    # Start form of the 'product.modify_cost_price' wizard: a date and the
    # expression giving the new cost price.
    __name__ = 'product.modify_cost_price.start'
    date = fields.Date("Date", required=True)
    cost_price = fields.Char(
        "New Cost Price", required=True,
        help="Python expression that will be evaluated with:\n"
        "- cost_price: the current cost price of the product")

    @classmethod
    def default_date(cls):
        """Default to the date returned by ``today`` of ``ir.date``."""
        return Pool().get('ir.date').today()

    @classmethod
    def default_cost_price(cls):
        """Default to the expression that keeps the current cost price."""
        return 'cost_price'
|
|
|
|
|
|
class CostPriceRevision(ModelSQL, ModifyCostPriceStart):
    # Stored cost price revision of a template, optionally restricted to
    # one of its variants, for a company.  The date and cost price
    # expression fields come from ModifyCostPriceStart.
    __name__ = 'product.cost_price.revision'
    template = fields.Many2One(
        'product.template', "Product",
        ondelete='CASCADE', required=True,
        domain=[
            If(Bool(Eval('product')),
                ('products', '=', Eval('product')),
                ()),
            ],
        context={
            'company': Eval('company', -1),
            },
        depends={'company'})
    product = fields.Many2One(
        'product.product', "Variant",
        ondelete='CASCADE',
        domain=[
            If(Bool(Eval('template')),
                ('template', '=', Eval('template')),
                ()),
            ],
        context={
            'company': Eval('company', -1),
            },
        depends={'company'})
    company = fields.Many2One(
        'company.company', "Company", ondelete='CASCADE', required=True)

    @classmethod
    def __setup__(cls):
        """Add the product/template/company index and order by date."""
        super().__setup__()
        t = cls.__table__()
        cls._sql_indexes.add(
            Index(
                t,
                (t.product, Index.Range()),
                (t.template, Index.Range()),
                (t.company, Index.Range())))
        # Most recent revisions first by default
        cls._order.insert(0, ('date', 'DESC'))

    @classmethod
    def default_company(cls):
        """Default to the company of the transaction context."""
        return Transaction().context.get('company')

    @fields.depends('product', '_parent_product.template')
    def on_change_product(self):
        """Set the template from the selected variant."""
        if self.product:
            self.template = self.product.template

    @classmethod
    def validate_fields(cls, revisions, field_names):
        """Run the standard validation then check the cost price."""
        super().validate_fields(revisions, field_names)
        cls.check_cost_price(revisions, field_names)

    @classmethod
    def check_cost_price(cls, revisions, field_names):
        """Check that the cost price expression of each revision evaluates.

        Nothing is checked when ``field_names`` is set and does not contain
        ``cost_price``.  The expression is evaluated with a cost price of
        zero; ``get_cost_price`` raises on an invalid expression.
        """
        if field_names and 'cost_price' not in field_names:
            return
        for revision in revisions:
            revision.get_cost_price(Decimal(0))

    def get_cost_price(self, cost_price, **context):
        """Evaluate the revision expression and return the new cost price.

        ``cost_price`` is exposed to the expression under the name
        ``cost_price`` and ``Decimal`` as a function; the remaining
        keyword arguments are passed to ``simple_eval``.

        Raises ProductCostPriceError when the expression is invalid or
        when its result is not a ``Decimal``.
        """
        context.setdefault('names', {})['cost_price'] = cost_price
        context.setdefault('functions', {})['Decimal'] = Decimal
        try:
            amount = simple_eval(decistmt(self.cost_price), **context)
        except (InvalidExpression, SyntaxError) as exception:
            product = self.product or self.template
            raise ProductCostPriceError(
                gettext('stock.msg_invalid_cost_price',
                    cost_price=self.cost_price,
                    product=product.rec_name if product else '',
                    exception=exception)) from exception
        if not isinstance(amount, Decimal):
            product = self.product or self.template
            raise ProductCostPriceError(
                gettext('stock.msg_invalid_cost_price_not_number',
                    value=amount,
                    cost_price=self.cost_price,
                    product=product.rec_name if product else ''))
        return amount

    @classmethod
    def _get_for_product_domain(cls):
        """Return the domain limiting revisions to the context company."""
        context = Transaction().context
        return [
            ('company', '=', context.get('company')),
            ]

    @classmethod
    def get_for_product(cls, product):
        """Return the revisions applying to ``product``, oldest first.

        These are the revisions of the product itself and the revisions of
        its template that are not restricted to a variant.
        """
        revisions = cls.search([
                cls._get_for_product_domain(),
                ['OR',
                    ('product', '=', product.id),
                    [
                        ('template', '=', product.template.id),
                        ('product', '=', None),
                        ],
                    ],
                ],
            order=[('date', 'ASC'), ('id', 'ASC')])
        return revisions

    @classmethod
    def apply_up_to(cls, revisions, cost_price, date):
        """Apply revision to cost price up to date
        revisions list is modified"""
        # Consume the leading revisions dated on or before date.  The
        # emptiness test is explicit so that an IndexError raised while
        # evaluating a revision is not mistaken for an exhausted list.
        while revisions and revisions[0].date <= date:
            revision = revisions.pop(0)
            cost_price = revision.get_cost_price(cost_price)
        return cost_price
|