Files
tradon/modules/purchase/purchase_reporting.py
2026-03-14 09:42:12 +00:00

391 lines
12 KiB
Python

# This file is part of Tryton. The COPYRIGHT file at the top level of
# this repository contains the full copyright notices and license terms.
from dateutil.relativedelta import relativedelta
from sql import Null
from sql.aggregate import Count, Min, Sum
from sql.conditionals import Coalesce
from sql.functions import DateTrunc
from trytond.i18n import lazy_gettext
from trytond.model import ModelSQL, ModelView, fields
from trytond.modules.currency.fields import Monetary
from trytond.pool import Pool
from trytond.pyson import Eval, If
from trytond.tools import pairwise_longest, sql_pairing, unpair
from trytond.tools.chart import sparkline
from trytond.transaction import Transaction
class Abstract(ModelSQL):
company = fields.Many2One(
'company.company',
lazy_gettext("purchase.msg_purchase_reporting_company"))
number = fields.Integer(
lazy_gettext("purchase.msg_purchase_reporting_number"),
help=lazy_gettext("purchase.msg_purchase_reporting_number_help"))
expense = Monetary(
lazy_gettext("purchase.msg_purchase_reporting_expense"),
digits='currency', currency='currency')
expense_trend = fields.Function(
fields.Char(
lazy_gettext("purchase.msg_purchase_reporting_expense_trend")),
'get_trend')
time_series = None
currency = fields.Many2One(
'currency.currency',
lazy_gettext("purchase.msg_purchase_reporting_currency"))
@classmethod
def table_query(cls):
from_item, tables = cls._joins()
return from_item.select(*cls._columns(tables),
where=cls._where(tables),
group_by=cls._group_by(tables))
@classmethod
def _joins(cls):
pool = Pool()
Line = pool.get('purchase.line')
Purchase = pool.get('purchase.purchase')
tables = {}
tables['line'] = line = Line.__table__()
tables['line.purchase'] = purchase = Purchase.__table__()
from_item = (line
.join(purchase, condition=line.purchase == purchase.id))
return from_item, tables
@classmethod
def _columns(cls, tables):
line = tables['line']
purchase = tables['line.purchase']
quantity = Coalesce(line.actual_quantity, line.quantity)
expense = cls.expense.sql_cast(
Sum(quantity * line.unit_price))
return [
cls._column_id(tables).as_('id'),
purchase.company.as_('company'),
purchase.currency.as_('currency'),
expense.as_('expense'),
Count(purchase.id, distinct=True).as_('number'),
]
@classmethod
def _column_id(cls, tables):
line = tables['line']
return Min(line.id)
@classmethod
def _group_by(cls, tables):
purchase = tables['line.purchase']
return [purchase.company, purchase.currency]
@classmethod
def _where(cls, tables):
context = Transaction().context
purchase = tables['line.purchase']
where = purchase.company == context.get('company')
where &= purchase.state.in_(cls._purchase_states())
from_date = context.get('from_date')
if from_date:
where &= purchase.purchase_date >= from_date
to_date = context.get('to_date')
if to_date:
where &= purchase.purchase_date <= to_date
warehouse = context.get('warehouse')
if warehouse:
where &= purchase.warehouse == warehouse
return where
@classmethod
def _purchase_states(cls):
return ['confirmed', 'processing', 'done']
@property
def time_series_all(self):
delta = self._period_delta()
for ts, next_ts in pairwise_longest(self.time_series or []):
yield ts
if delta and next_ts:
date = ts.date + delta
while date < next_ts.date:
yield None
date += delta
@classmethod
def _period_delta(cls):
context = Transaction().context
return {
'year': relativedelta(years=1),
'month': relativedelta(months=1),
'day': relativedelta(days=1),
}.get(context.get('period'))
def get_trend(self, name):
name = name[:-len('_trend')]
return sparkline([
getattr(ts, name) if ts else 0 for ts in self.time_series_all])
class AbstractTimeseries(Abstract):
date = fields.Date(lazy_gettext('purchase.msg_purchase_reporting_date'))
@classmethod
def __setup__(cls):
super().__setup__()
cls._order.insert(0, ('date', 'ASC'))
@classmethod
def _columns(cls, tables):
return super()._columns(tables) + [
cls._column_date(tables).as_('date')]
@classmethod
def _column_date(cls, tables):
context = Transaction().context
purchase = tables['line.purchase']
date = DateTrunc(context.get('period'), purchase.purchase_date)
date = cls.date.sql_cast(date)
return date
@classmethod
def _group_by(cls, tables):
return super()._group_by(tables) + [cls._column_date(tables)]
class Context(ModelView):
__name__ = 'purchase.reporting.context'
company = fields.Many2One('company.company', "Company", required=True)
from_date = fields.Date("From Date",
domain=[
If(Eval('to_date') & Eval('from_date'),
('from_date', '<=', Eval('to_date')),
()),
],
depends=['to_date'])
to_date = fields.Date("To Date",
domain=[
If(Eval('from_date') & Eval('to_date'),
('to_date', '>=', Eval('from_date')),
()),
],
depends=['from_date'])
period = fields.Selection([
('year', "Year"),
('month', "Month"),
('day', "Day"),
], "Period", required=True)
warehouse = fields.Many2One(
'stock.location', "Warehouse",
domain=[
('type', '=', 'warehouse'),
])
@classmethod
def default_company(cls):
return Transaction().context.get('company')
@classmethod
def default_from_date(cls):
pool = Pool()
Date = pool.get('ir.date')
context = Transaction().context
if 'from_date' in context:
return context['from_date']
return Date.today() - relativedelta(years=1)
@classmethod
def default_to_date(cls):
pool = Pool()
Date = pool.get('ir.date')
context = Transaction().context
if 'to_date' in context:
return context['to_date']
return Date.today()
@classmethod
def default_period(cls):
return Transaction().context.get('period', 'month')
@classmethod
def default_warehouse(cls):
return Transaction().context.get('warehouse')
class Main(Abstract, ModelView):
__name__ = 'purchase.reporting.main'
time_series = fields.Function(fields.Many2Many(
'purchase.reporting.main.time_series', None, None,
lazy_gettext('purchase.msg_purchase_reporting_time_series')),
'get_time_series')
def get_rec_name(self, name):
return ''
def get_time_series(self, name):
pool = Pool()
Timeseries = pool.get('purchase.reporting.main.time_series')
return [t.id for t in Timeseries.search([])]
class MainTimeseries(AbstractTimeseries, ModelView):
__name__ = 'purchase.reporting.main.time_series'
class SupplierMixin(object):
__slots__ = ()
supplier = fields.Many2One(
'party.party', "Supplier",
context={
'company': Eval('company', -1),
},
depends=['company'])
@classmethod
def _columns(cls, tables):
purchase = tables['line.purchase']
return super()._columns(tables) + [purchase.party.as_('supplier')]
@classmethod
def _group_by(cls, tables):
purchase = tables['line.purchase']
return super()._group_by(tables) + [purchase.party]
def get_rec_name(self, name):
return self.supplier.rec_name
@classmethod
def search_rec_name(cls, name, clause):
return [('supplier.rec_name', *clause[1:])]
class Supplier(SupplierMixin, Abstract, ModelView):
__name__ = 'purchase.reporting.supplier'
time_series = fields.One2Many(
'purchase.reporting.supplier.time_series', 'supplier',
lazy_gettext('purchase.msg_purchase_reporting_time_series'))
@classmethod
def __setup__(cls):
super().__setup__()
cls._order.insert(0, ('supplier', 'ASC'))
@classmethod
def _column_id(cls, tables):
purchase = tables['line.purchase']
return sql_pairing(purchase.party, purchase.currency)
class SupplierTimeseries(SupplierMixin, AbstractTimeseries, ModelView):
__name__ = 'purchase.reporting.supplier.time_series'
supplier_currency = fields.Integer("Supplier - Currency")
@classmethod
def _columns(cls, tables):
purchase = tables['line.purchase']
return super()._columns(tables) + [
sql_pairing(
purchase.party, purchase.currency).as_('supplier_currency'),
]
class ProductMixin(object):
__slots__ = ()
product = fields.Many2One(
'product.product', "Product",
context={
'company': Eval('company', -1),
},
depends=['company'])
product_supplier = fields.Many2One(
'purchase.product_supplier', "Supplier's Product")
@classmethod
def _columns(cls, tables):
line = tables['line']
return super()._columns(tables) + [
line.product.as_('product'),
line.product_supplier.as_('product_supplier'),
]
@classmethod
def _group_by(cls, tables):
line = tables['line']
return super()._group_by(tables) + [
line.product, line.product_supplier]
@classmethod
def _where(cls, tables):
context = Transaction().context
line = tables['line']
purchase = tables['line.purchase']
where = super()._where(tables)
where &= line.product != Null
if context.get('supplier_currency') is not None:
supplier, currency = unpair(context['supplier_currency'])
else:
supplier = context.get('supplier')
currency = context.get('currency')
where &= purchase.party == supplier
where &= purchase.currency == currency
return where
def get_rec_name(self, name):
pool = Pool()
Party = pool.get('party.party')
context = Transaction().context
name = self.product.rec_name if self.product else None
if context.get('supplier'):
supplier = Party(context['supplier'])
name += '@%s' % supplier.rec_name
return name
@classmethod
def search_rec_name(cls, name, clause):
return [('product.rec_name', *clause[1:])]
class Product(ProductMixin, Abstract, ModelView):
__name__ = 'purchase.reporting.product'
time_series = fields.One2Many(
'purchase.reporting.product.time_series', 'product',
lazy_gettext('purchase.msg_purchase_reporting_time_series'))
@classmethod
def __setup__(cls):
super().__setup__()
cls._order.insert(0, ('product', 'ASC'))
@classmethod
def _column_id(cls, tables):
line = tables['line']
purchase = tables['line.purchase']
return sql_pairing(line.product, purchase.currency)
class ProductTimeseries(ProductMixin, AbstractTimeseries, ModelView):
__name__ = 'purchase.reporting.product.time_series'
product_currency = fields.Integer("Product - Currency")
@classmethod
def _columns(cls, tables):
line = tables['line']
purchase = tables['line.purchase']
return super()._columns(tables) + [
sql_pairing(
line.product, purchase.currency).as_('product_currency'),
]