# This file is part of Tryton. The COPYRIGHT file at the top level of # this repository contains the full copyright notices and license terms. from collections import defaultdict from decimal import Decimal from dateutil.relativedelta import relativedelta from sql import Literal, Null, With from sql.aggregate import Max, Min, Sum from sql.conditionals import Case, Coalesce from sql.functions import Ceil, DateTrunc, Log, Power, Round from trytond.i18n import lazy_gettext from trytond.model import ModelSQL, ModelView, fields, sum_tree from trytond.modules.currency.fields import Monetary from trytond.pool import Pool from trytond.pyson import Eval, If from trytond.tools import grouped_slice, pairwise_longest, reduce_ids from trytond.tools.chart import sparkline from trytond.transaction import Transaction class Abstract(ModelSQL, ModelView): company = fields.Many2One( 'company.company', lazy_gettext('stock.msg_stock_reporting_company')) cost = Monetary( lazy_gettext('stock.msg_stock_reporting_cost'), currency='currency', digits='currency') revenue = Monetary( lazy_gettext('stock.msg_stock_reporting_revenue'), currency='currency', digits='currency') profit = Monetary( lazy_gettext('stock.msg_stock_reporting_profit'), currency='currency', digits='currency') margin = fields.Numeric( lazy_gettext('stock.msg_stock_reporting_margin'), digits=(None, 4), states={ 'invisible': ~Eval('margin'), }) margin_trend = fields.Function(fields.Char( lazy_gettext('stock.msg_stock_reporting_margin_trend')), 'get_trend') time_series = None currency = fields.Many2One( 'currency.currency', lazy_gettext('stock.msg_stock_reporting_currency')) @classmethod def table_query(cls): from_item, tables, withs = cls._joins() return from_item.select(*cls._columns(tables, withs), where=cls._where(tables, withs), group_by=cls._group_by(tables, withs), with_=withs.values()) @classmethod def _joins(cls): pool = Pool() Company = pool.get('company.company') Currency = pool.get('currency.currency') Move = pool.get('stock.move') Location = pool.get('stock.location') tables = {} tables['move'] = move = Move.__table__() tables['move.company'] = company = Company.__table__() tables['move.company.currency'] = currency = Currency.__table__() tables['move.from_location'] = from_location = Location.__table__() tables['move.to_location'] = to_location = Location.__table__() withs = {} withs['currency_rate'] = currency_rate = With( query=Currency.currency_rate_sql()) withs['currency_rate_company'] = currency_rate_company = With( query=Currency.currency_rate_sql()) from_item = (move .join(currency_rate, type_='LEFT', condition=(move.currency == currency_rate.currency) & (currency_rate.start_date <= move.effective_date) & ((currency_rate.end_date == Null) | (currency_rate.end_date > move.effective_date)) ) .join(company, condition=move.company == company.id) .join(currency, condition=company.currency == currency.id) .join(currency_rate_company, condition=(company.currency == currency_rate_company.currency) & (currency_rate_company.start_date <= move.effective_date) & ((currency_rate_company.end_date == Null) | (currency_rate_company.end_date > move.effective_date)) ) .join(from_location, condition=(move.from_location == from_location.id)) .join(to_location, condition=(move.to_location == to_location.id))) return from_item, tables, withs @classmethod def _columns(cls, tables, withs): move = tables['move'] from_location = tables['move.from_location'] to_location = tables['move.to_location'] currency = tables['move.company.currency'] sign = Case( (from_location.type.in_(cls._to_location_types()) & to_location.type.in_(cls._from_location_types()), -1), else_=1) cost = cls._column_cost(tables, withs, sign) revenue = cls._column_revenue(tables, withs, sign) profit = revenue - cost margin = Case( (revenue != 0, profit / revenue), else_=Null) return [ cls._column_id(tables, withs).as_('id'), move.company.as_('company'), cls.cost.sql_cast( Round(cost, currency.digits)).as_('cost'), cls.revenue.sql_cast( Round(revenue, currency.digits)).as_('revenue'), cls.profit.sql_cast( Round(profit, currency.digits)).as_('profit'), cls.margin.sql_cast( Round(margin, cls.margin.digits[1])).as_('margin'), currency.id.as_('currency'), ] @classmethod def _column_id(cls, tables, withs): move = tables['move'] return Min(move.id) @classmethod def _column_cost(cls, tables, withs, sign): move = tables['move'] return Sum( sign * cls.cost.sql_cast(move.internal_quantity) * Coalesce(move.cost_price, 0)) @classmethod def _column_revenue(cls, tables, withs, sign): move = tables['move'] currency = withs['currency_rate'] currency_company = withs['currency_rate_company'] return Sum( sign * cls.revenue.sql_cast(move.quantity) * Coalesce(move.unit_price, 0) * Coalesce(currency_company.rate / currency.rate, 0)) @classmethod def _group_by(cls, tables, withs): move = tables['move'] currency = tables['move.company.currency'] return [move.company, currency.id, currency.digits] @classmethod def _where(cls, tables, withs): context = Transaction().context move = tables['move'] from_location = tables['move.from_location'] to_location = tables['move.to_location'] where = move.company == context.get('company') where &= (( from_location.type.in_(cls._from_location_types()) & to_location.type.in_(cls._to_location_types())) | ( from_location.type.in_(cls._to_location_types()) & to_location.type.in_(cls._from_location_types()))) where &= move.state == 'done' from_date = context.get('from_date') if from_date: where &= move.effective_date >= from_date to_date = context.get('to_date') if to_date: where &= move.effective_date <= to_date return where @classmethod def _from_location_types(cls): return ['storage', 'drop'] @classmethod def _to_location_types(cls): types = ['customer'] if Transaction().context.get('include_lost'): types += ['lost_found'] return types @property def time_series_all(self): delta = self._period_delta() for ts, next_ts in pairwise_longest(self.time_series or []): yield ts if delta and next_ts: date = ts.date + delta while date < next_ts.date: yield None date += delta @classmethod def _period_delta(cls): context = Transaction().context return { 'year': relativedelta(years=1), 'month': relativedelta(months=1), 'day': relativedelta(days=1), }.get(context.get('period')) def get_trend(self, name): name = name[:-len('_trend')] return sparkline( [getattr(ts, name) or 0 if ts else 0 for ts in self.time_series_all]) @classmethod def view_attributes(cls): return super().view_attributes() + [ ('/tree/field[@name="profit"]', 'visual', If(Eval('profit', 0) < 0, 'danger', '')), ('/tree/field[@name="margin"]', 'visual', If(Eval('margin', 0) < 0, 'danger', '')), ] class AbstractTimeseries(Abstract): date = fields.Date("Date") @classmethod def __setup__(cls): super().__setup__() cls._order.insert(0, ('date', 'ASC')) @classmethod def _columns(cls, tables, withs): return super()._columns(tables, withs) + [ cls._column_date(tables, withs).as_('date')] @classmethod def _column_date(cls, tables, withs): context = Transaction().context move = tables['move'] date = DateTrunc(context.get('period'), move.effective_date) date = cls.date.sql_cast(date) return date @classmethod def _group_by(cls, tables, withs): return super()._group_by(tables, withs) + [ cls._column_date(tables, withs)] class Context(ModelView): __name__ = 'stock.reporting.margin.context' company = fields.Many2One('company.company', "Company", required=True) from_date = fields.Date("From Date", domain=[ If(Eval('to_date') & Eval('from_date'), ('from_date', '<=', Eval('to_date')), ()), ]) to_date = fields.Date("To Date", domain=[ If(Eval('from_date') & Eval('to_date'), ('to_date', '>=', Eval('from_date')), ()), ]) period = fields.Selection([ ('year', "Year"), ('month', "Month"), ('day', "Day"), ], "Period", required=True) include_lost = fields.Boolean( "Include Lost", help="If checked, the cost of product moved " "to a lost and found location is included.") @classmethod def default_company(cls): return Transaction().context.get('company') @classmethod def default_from_date(cls): pool = Pool() Date = pool.get('ir.date') context = Transaction().context if 'from_date' in context: return context['from_date'] return Date.today() - relativedelta(years=1) @classmethod def default_to_date(cls): pool = Pool() Date = pool.get('ir.date') context = Transaction().context if 'to_date' in context: return context['to_date'] return Date.today() @classmethod def default_period(cls): return Transaction().context.get('period', 'month') @classmethod def default_include_lost(cls): return Transaction().context.get('include_lost', False) class Main(Abstract, ModelView): __name__ = 'stock.reporting.margin.main' time_series = fields.Function(fields.Many2Many( 'stock.reporting.margin.main.time_series', None, None, "Time Series"), 'get_time_series') def get_rec_name(self, name): return '' def get_time_series(self, name): pool = Pool() Timeseries = pool.get('stock.reporting.margin.main.time_series') return [t.id for t in Timeseries.search([])] class MainTimeseries(AbstractTimeseries, ModelView): __name__ = 'stock.reporting.margin.main.time_series' class ProductMixin: __slots__ = () product = fields.Many2One( 'product.product', "Product", context={ 'company': Eval('company', -1), }, depends={'company'}) internal_quantity = fields.Float("Internal Quantity") quantity = fields.Function(fields.Float( "Quantity", digits='unit'), 'get_quantity') unit = fields.Many2One('product.uom', "Unit") @classmethod def _joins(cls): pool = Pool() Product = pool.get('product.product') Template = pool.get('product.template') from_item, tables, withs = super()._joins() if 'move.product' not in tables: product = Product.__table__() tables['move.product'] = product move = tables['move'] from_item = (from_item .join(product, condition=move.product == product.id)) if 'move.product.template' not in tables: template = Template.__table__() tables['move.product.template'] = template product = tables['move.product'] from_item = (from_item .join(template, condition=product.template == template.id)) return from_item, tables, withs @classmethod def _columns(cls, tables, withs): move = tables['move'] from_location = tables['move.from_location'] to_location = tables['move.to_location'] template = tables['move.product.template'] sign = Case( (from_location.type.in_(cls._to_location_types()) & to_location.type.in_(cls._from_location_types()), -1), else_=1) return super()._columns(tables, withs) + [ move.product.as_('product'), Sum(sign * move.internal_quantity).as_('internal_quantity'), template.default_uom.as_('unit'), ] @classmethod def _group_by(cls, tables, withs): move = tables['move'] template = tables['move.product.template'] return super()._group_by(tables, withs) + [ move.product, template.default_uom] def get_rec_name(self, name): return self.product.rec_name @classmethod def search_rec_name(cls, name, clause): return [('product.rec_name', *clause[1:])] def get_quantity(self, name): return self.unit.round(self.internal_quantity) class Product(ProductMixin, Abstract, ModelView): __name__ = 'stock.reporting.margin.product' time_series = fields.One2Many( 'stock.reporting.margin.product.time_series', 'product', "Time Series") @classmethod def __setup__(cls): super().__setup__() cls._order.insert(0, ('product', 'ASC')) @classmethod def _column_id(cls, tables, withs): move = tables['move'] return move.product class ProductTimeseries(ProductMixin, AbstractTimeseries, ModelView): __name__ = 'stock.reporting.margin.product.time_series' class CategoryMixin: __slots__ = () category = fields.Many2One('product.category', "Category") @classmethod def _joins(cls): pool = Pool() Product = pool.get('product.product') TemplateCategory = pool.get('product.template-product.category.all') from_item, tables, withs = super()._joins() if 'move.product' not in tables: product = Product.__table__() tables['move.product'] = product move = tables['move'] from_item = (from_item .join(product, condition=move.product == product.id)) if 'move.product.template_category' not in tables: template_category = TemplateCategory.__table__() tables['move.product.template_category'] = template_category product = tables['move.product'] from_item = (from_item .join(template_category, condition=product.template == template_category.template)) return from_item, tables, withs @classmethod def _columns(cls, tables, withs): template_category = tables['move.product.template_category'] return super()._columns(tables, withs) + [ template_category.category.as_('category')] @classmethod def _column_id(cls, tables, withs): pool = Pool() Category = pool.get('product.category') category = Category.__table__() move = tables['move'] template_category = tables['move.product.template_category'] # Get a stable number of category over time # by using number one order bigger. nb_category = category.select( Power(10, (Ceil(Log(Max(category.id))) + Literal(1)))) return Min(move.id * nb_category + template_category.id) @classmethod def _group_by(cls, tables, withs): template_category = tables['move.product.template_category'] return super()._group_by(tables, withs) + [template_category.category] @classmethod def _where(cls, tables, withs): template_category = tables['move.product.template_category'] where = super()._where(tables, withs) where &= template_category.category != Null return where def get_rec_name(self, name): return self.category.rec_name if self.category else None @classmethod def search_rec_name(cls, name, clause): return [('category.rec_name', *clause[1:])] class Category(CategoryMixin, Abstract, ModelView): __name__ = 'stock.reporting.margin.category' time_series = fields.One2Many( 'stock.reporting.margin.category.time_series', 'category', "Time Series") @classmethod def __setup__(cls): super().__setup__() cls._order.insert(0, ('category', 'ASC')) @classmethod def _column_id(cls, tables, withs): template_category = tables['move.product.template_category'] return template_category.category class CategoryTimeseries(CategoryMixin, AbstractTimeseries, ModelView): __name__ = 'stock.reporting.margin.category.time_series' class CategoryTree(ModelSQL, ModelView): __name__ = 'stock.reporting.margin.category.tree' name = fields.Function( fields.Char("Name"), 'get_name', searcher='search_name') parent = fields.Many2One('stock.reporting.margin.category.tree', "Parent") children = fields.One2Many( 'stock.reporting.margin.category.tree', 'parent', "Children") cost = fields.Function(Monetary( lazy_gettext('stock.msg_stock_reporting_cost'), currency='currency', digits='currency'), 'get_total') revenue = fields.Function(Monetary( lazy_gettext('stock.msg_stock_reporting_revenue'), currency='currency', digits='currency'), 'get_total') profit = fields.Function(Monetary( lazy_gettext('stock.msg_stock_reporting_profit'), currency='currency', digits='currency'), 'get_total') margin = fields.Function(Monetary( lazy_gettext('stock.msg_stock_reporting_margin'), digits=(None, 4)), 'get_margin') currency = fields.Function(fields.Many2One( 'currency.currency', lazy_gettext('stock.msg_stock_reporting_currency')), 'get_currency') @classmethod def __setup__(cls): super().__setup__() cls._order.insert(0, ('name', 'ASC')) @classmethod def table_query(cls): pool = Pool() Category = pool.get('product.category') return Category.__table__() @classmethod def get_name(cls, categories, name): pool = Pool() Category = pool.get('product.category') categories = Category.browse(categories) return {c.id: c.name for c in categories} @classmethod def search_name(cls, name, clause): pool = Pool() Category = pool.get('product.category') return [('id', 'in', Category.search([clause], query=True))] @classmethod def order_name(cls, tables): pool = Pool() Category = pool.get('product.category') table, _ = tables[None] if 'category' not in tables: category = Category.__table__() tables['category'] = { None: (category, table.id == category.id), } return Category.name.convert_order( 'name', tables['category'], Category) def time_series_all(self): return [] @classmethod def get_total(cls, categories, names): pool = Pool() ReportingCategory = pool.get('stock.reporting.margin.category') reporting_category = ReportingCategory.__table__() cursor = Transaction().connection.cursor() categories = cls.search([ ('parent', 'child_of', [c.id for c in categories]), ]) ids = [c.id for c in categories] reporting_categories = [] for sub_ids in grouped_slice(ids): sub_ids = list(sub_ids) where = reduce_ids(reporting_category.id, sub_ids) cursor.execute( *reporting_category.select(reporting_category.id, where=where)) reporting_categories.extend(r for r, in cursor) result = {} reporting_categories = ReportingCategory.browse(reporting_categories) for name in names: values = defaultdict( Decimal, {c.id: getattr(c, name) for c in reporting_categories}) result[name] = sum_tree(categories, values) return result def get_margin(self, name): digits = self.__class__.margin.digits if self.profit is not None and self.revenue: return (self.profit / self.revenue).quantize( Decimal(1) / 10 ** digits[1]) def get_currency(self, name): pool = Pool() Company = pool.get('company.company') company = Transaction().context.get('company') if company is not None and company >= 0: return Company(company).currency.id @classmethod def view_attributes(cls): return super().view_attributes() + [ ('/tree/field[@name="profit"]', 'visual', If(Eval('profit', 0) < 0, 'danger', '')), ('/tree/field[@name="margin"]', 'visual', If(Eval('margin', 0) < 0, 'danger', '')), ]