# Listing metadata: 456 lines, 15 KiB, Python
from collections import defaultdict
|
|
from decimal import Decimal
|
|
from functools import wraps
|
|
from itertools import groupby, zip_longest
|
|
from operator import attrgetter
|
|
|
|
from dateutil.relativedelta import relativedelta
|
|
from sql.conditionals import Coalesce
|
|
|
|
from trytond.i18n import gettext
|
|
from trytond.model import (
|
|
ModelSQL, ModelView, fields, sequence_ordered, sum_tree, tree)
|
|
from trytond.modules.currency.fields import Monetary
|
|
from trytond.pool import Pool, PoolMeta
|
|
from trytond.pyson import Bool, Eval, If
|
|
from trytond.report import Report
|
|
from trytond.transaction import Transaction
|
|
|
|
from .exceptions import InvoiceConsolidationCompanyError
|
|
|
|
|
|
def with_currency_date(func):
    """Decorate *func* so that it runs with a ``date`` key in the context.

    The date is the context's ``date`` when present, otherwise its
    ``to_date``, otherwise today's date as returned by ``ir.date``.
    Presumably this pins the date used for currency conversion inside
    *func* -- confirm against the currency module.
    """
    @wraps(func)
    def wrapper(*args, **kwargs):
        transaction = Transaction()
        current = transaction.context
        # Today's date is always computed, even when the context already
        # provides a date.
        today = Pool().get('ir.date').today()
        currency_date = current.get('date', current.get('to_date', today))
        with transaction.set_context(date=currency_date):
            return func(*args, **kwargs)
    return wrapper
|
|
|
|
|
|
class Type(metaclass=PoolMeta):
    # Extends the 'account.account.type' model with a link to a
    # consolidation record.
    __name__ = 'account.account.type'
    # The domain only accepts a consolidation with the same statement as the
    # type and, when that statement is 'balance', the same assets flag.
    consolidation = fields.Many2One(
        'account.consolidation', "Consolidation",
        domain=[
            ('statement', '=', Eval('statement')),
            If(Eval('statement') == 'balance',
                ('assets', '=', Eval('assets', False)),
                ()),
            ])
|
|
|
|
|
|
class Move(metaclass=PoolMeta):
    # Extends the 'account.move' model with the company the move is
    # consolidated against.
    __name__ = 'account.move'

    # The domain forbids selecting the move's own company.
    consolidation_company = fields.Many2One(
        'company.company', "Consolidation Company",
        domain=[
            ('id', '!=', Eval('company', -1)),
            ])

    @classmethod
    def __setup__(cls):
        super().__setup__()
        # NOTE(review): presumably this lets the field be edited without
        # tripping the move modification check -- confirm in account.move.
        cls._check_modify_exclude.add('consolidation_company')
|
|
|
|
|
|
class MoveLine(metaclass=PoolMeta):
    # Extends the 'account.move.line' model: exposes the consolidation
    # company of the move and filters consolidated queries.
    __name__ = 'account.move.line'

    # Function field read, written and searched through the
    # 'get_move_field' / 'set_move_field' / 'search_move_field' methods,
    # which are not defined in this class body.
    consolidation_company = fields.Function(fields.Many2One(
            'company.company', "Consolidation Company"),
        'get_move_field',
        setter='set_move_field',
        searcher='search_move_field')

    @classmethod
    def __setup__(cls):
        super().__setup__()
        cls._check_modify_exclude.add('consolidation_company')

    @classmethod
    def query_get(cls, table):
        """Return the parent ``(query, fiscalyear_id)`` pair, narrowed for
        consolidation.

        When the context has truthy ``consolidated`` and ``companies``
        values, only lines whose move has a consolidation company outside
        ``companies`` are kept; moves without a consolidation company are
        treated as company ``-1`` and therefore kept as well.
        """
        move = Pool().get('account.move').__table__()
        context = Transaction().context
        query, fiscalyear_id = super().query_get(table)

        companies = context.get('companies')
        if context.get('consolidated') and companies:
            # A NULL consolidation company is mapped to -1 so that it never
            # matches one of the selected companies.
            counterpart = Coalesce(move.consolidation_company, -1)
            kept_moves = move.select(
                move.id, where=~counterpart.in_(companies))
            query &= table.move.in_(kept_moves)
        return query, fiscalyear_id
|
|
|
|
|
|
class Invoice(metaclass=PoolMeta):
    # Extends the 'account.invoice' model with the company the invoice is
    # consolidated against and propagates it to the generated move.
    __name__ = 'account.invoice'

    # Must be a company whose party is the invoice party and which is not
    # the invoice's own company; read-only once the invoice leaves 'draft'.
    consolidation_company = fields.Many2One(
        'company.company', "Consolidation Company",
        domain=[
            ('party', '=', Eval('party', -1)),
            ('id', '!=', Eval('company', -1)),
            ],
        states={
            'readonly': Eval('state') != 'draft',
            })

    @fields.depends('party', 'company', 'consolidation_company')
    def on_change_party(self):
        """Set the consolidation company when the party maps to exactly one
        company other than the invoice's own."""
        super().on_change_party()
        if not self.party:
            return
        Company = Pool().get('company.company')
        own_company_id = self.company.id if self.company else None
        candidates = Company.search([
                ('party', '=', self.party.id),
                ('id', '!=', own_company_id),
                ])
        # Zero or several candidates: leave the field as it is.
        if len(candidates) == 1:
            self.consolidation_company = candidates[0]

    @classmethod
    def set_number(cls, invoices):
        """Number the invoices, then fill any missing consolidation company.

        For each invoice without a consolidation company, the companies
        whose party is the invoice party are looked up: a single match is
        assigned, several matches raise
        ``InvoiceConsolidationCompanyError``, no match leaves the field
        empty.  All invoices are saved at the end.
        """
        Company = Pool().get('company.company')

        super().set_number(invoices)

        # The search is ordered by party so that groupby() sees the
        # companies of a party together.
        by_party = attrgetter('party')
        ordered = Company.search([], order=[('party', None)])
        party2company = {}
        for party, group in groupby(ordered, by_party):
            party2company[party] = list(group)

        for invoice in invoices:
            if invoice.consolidation_company:
                continue
            matches = party2company.get(invoice.party, [])
            if len(matches) == 1:
                invoice.consolidation_company = matches[0]
            elif matches:
                raise InvoiceConsolidationCompanyError(
                    gettext('account_consolidation.'
                        'msg_invoice_consolidation_company_ambiguous',
                        invoice=invoice.rec_name,
                        party=invoice.party.rec_name))
        cls.save(invoices)

    def get_move(self):
        """Return the invoice move, stamping the consolidation company on it
        when the parent returned a move different from the current one."""
        existing = self.move
        move = super().get_move()
        if move != existing:
            move.consolidation_company = self.consolidation_company
        return move
|
|
|
|
|
|
class Consolidation(
        sequence_ordered(), tree(separator='\\'), ModelSQL, ModelView):
    # Node of the consolidation tree: groups account types (possibly from
    # several companies) and reports their summed amount in the currency
    # given by the context.
    __name__ = 'account.consolidation'

    # A parent either has no statement, or a compatible one: 'off-balance'
    # only under 'off-balance', 'balance' only under 'balance', anything
    # else under any parent that is not 'off-balance'.
    parent = fields.Many2One(
        'account.consolidation', "Parent", ondelete="RESTRICT",
        domain=['OR',
            If(Eval('statement') == 'off-balance',
                ('statement', '=', 'off-balance'),
                If(Eval('statement') == 'balance',
                    ('statement', '=', 'balance'),
                    ('statement', '!=', 'off-balance')),
                ),
            ('statement', '=', None),
            ])
    name = fields.Char("Name", required=True)
    # Required as soon as the record has a parent.
    statement = fields.Selection([
            (None, ""),
            ('balance', "Balance"),
            ('income', "Income"),
            ('off-balance', "Off-Balance"),
            ], "Statement",
        states={
            'required': Bool(Eval('parent')),
            })
    # Only shown for 'balance' statements.
    assets = fields.Boolean(
        "Assets",
        states={
            'invisible': Eval('statement') != 'balance',
            })
    # Account types attached to this node; they must share its statement
    # (and assets flag for 'balance'), and only unattached types can be
    # added.
    types = fields.One2Many(
        'account.account.type', 'consolidation', "Types",
        domain=[
            ('statement', '=', Eval('statement')),
            If(Eval('statement') == 'balance',
                ('assets', '=', Eval('assets', False)),
                ()),
            ],
        add_remove=[
            ('consolidation', '=', None),
            ])
    children = fields.One2Many('account.consolidation', 'parent', "Children")
    amount = fields.Function(Monetary(
            "Amount", currency='currency', digits='currency'),
        'get_amount')
    currency = fields.Function(fields.Many2One(
            'currency.currency', 'Currency'), 'get_currency')
    # Same computation as ``amount`` but with the comparison context.
    amount_cmp = fields.Function(Monetary(
            "Amount", currency='currency', digits='currency'),
        'get_amount_cmp')

    @classmethod
    def default_assets(cls):
        return False

    @fields.depends('parent', '_parent_parent.statement')
    def on_change_parent(self):
        """Copy the statement of the newly selected parent, if any."""
        if self.parent:
            self.statement = self.parent.statement

    def get_currency(self, name):
        """Return the ``currency`` value of the transaction context (or
        ``None`` when the context has none)."""
        return Transaction().context.get('currency')

    @classmethod
    @with_currency_date
    def get_amount(cls, consolidations, name):
        """Return ``{consolidation id: amount}`` for *consolidations*.

        The amount of a node is the sum, over the node and its descendants,
        of the amounts of the attached account types that belong to the
        companies selected in the context (restricted to the companies of
        the current user), converted to the node currency.  Nodes without
        currency contribute nothing.
        """
        pool = Pool()
        AccountType = pool.get('account.account.type')
        Currency = pool.get('currency.currency')
        User = pool.get('res.user')
        transaction = Transaction()
        context = transaction.context
        user = User(transaction.user)

        children = cls.search([
                ('parent', 'child_of', [c.id for c in consolidations]),
                ])

        # Only the companies that are both requested and allowed to the user.
        companies = set(context.get('companies', [])).intersection(
            map(int, user.companies))

        # Flatten the types of the whole subtree in one pass; concatenating
        # tuples with sum() copies the accumulated tuple at every step.
        all_types = [t for c in children for t in c.types]
        by_company = attrgetter('company')

        # Browse the types again, company by company, with that company and
        # the ``consolidated`` flag in the context.
        # NOTE(review): this relies on records keeping the context they were
        # browsed with, so that ``amount`` is computed per company -- confirm.
        id2types = {}
        for company, company_types in groupby(
                sorted(all_types, key=by_company), by_company):
            company_id = company.id if company.id in companies else None
            with transaction.set_context(
                    company=company_id, consolidated=True):
                company_types = AccountType.browse(company_types)
            id2types.update((t.id, t) for t in company_types)

        values = defaultdict(Decimal)
        for consolidation in children:
            currency = consolidation.currency
            if not currency:
                continue
            for type_ in consolidation.types:
                type_ = id2types[type_.id]
                if type_.company.id not in companies:
                    continue
                value = type_.amount
                # The sign of asset balance types is inverted before summing;
                # the total is inverted again below for asset nodes.
                if type_.statement == 'balance' and type_.assets:
                    value *= -1
                if type_.company.currency != currency:
                    # Rounding is deferred until the totals are known.
                    value = Currency.compute(
                        type_.company.currency, value, currency, round=False)
                values[consolidation.id] += value

        result = sum_tree(children, values)
        for consolidation in consolidations:
            if consolidation.currency:
                result[consolidation.id] = consolidation.currency.round(
                    result[consolidation.id])
            if consolidation.statement == 'balance' and consolidation.assets:
                result[consolidation.id] *= -1
        return result

    @classmethod
    def get_amount_cmp(cls, consolidations, name):
        """Return the amounts for the comparison context.

        When the context has no truthy ``comparison`` value every id maps
        to ``None``.  Otherwise each context key ending with ``_cmp`` is
        applied under its name without that suffix and ``get_amount`` is
        called in the resulting context.
        """
        transaction = Transaction()
        current = transaction.context
        if not current.get('comparison'):
            return dict.fromkeys([c.id for c in consolidations], None)
        new = {
            key[:-4]: value
            for key, value in current.items()
            if key.endswith('_cmp')}
        with transaction.set_context(new):
            return cls.get_amount(consolidations, name)

    @classmethod
    def view_attributes(cls):
        # Hide the comparison column unless ``comparison`` evaluates to true.
        return super().view_attributes() + [
            ('/tree/field[@name="amount_cmp"]', 'tree_invisible',
                ~Eval('comparison', False)),
            ]

    @classmethod
    def copy(cls, consolidations, default=None):
        """Copy the records without their types unless *default* says
        otherwise; the caller's *default* dictionary is not modified."""
        default = {} if default is None else default.copy()
        default.setdefault('types', None)
        return super().copy(consolidations, default=default)
|
|
|
|
|
|
class ConsolidationBalanceSheetContext(ModelView):
    # Context form of the consolidated balance sheet: a date, the companies
    # and currency to report, and an optional comparison date.
    __name__ = 'account.consolidation.balance_sheet.context'
    date = fields.Date("Date", required=True)
    posted = fields.Boolean("Posted Moves", help="Only include posted moves.")
    companies = fields.Many2Many('company.company', None, None, "Companies")
    currency = fields.Many2One('currency.currency', "Currency", required=True)

    comparison = fields.Boolean("Comparison")
    # Required and visible only when ``comparison`` is checked.
    date_cmp = fields.Date(
        "Date",
        states={
            'required': Eval('comparison', False),
            'invisible': ~Eval('comparison', False),
            })

    @classmethod
    def default_date(cls):
        """Return the context ``date``, falling back to today."""
        today = Pool().get('ir.date').today()
        return Transaction().context.get('date', today)

    @classmethod
    def default_posted(cls):
        """Return the context ``posted`` flag, ``False`` when absent."""
        context = Transaction().context
        return context.get('posted', False)

    @classmethod
    def default_currency(cls):
        """Return the currency id of the context company, or ``None`` when
        the context has no company or a negative company id."""
        company_id = Transaction().context.get('company')
        if company_id is None or company_id < 0:
            return None
        Company = Pool().get('company.company')
        return Company(company_id).currency.id

    @classmethod
    def default_companies(cls):
        """Return the context ``companies``; when that key is absent, a list
        holding the context company, or ``None`` without a company."""
        context = Transaction().context
        if context.get('company'):
            fallback = [context['company']]
        else:
            fallback = None
        return context.get('companies', fallback)

    @classmethod
    def default_comparison(cls):
        return False

    @fields.depends('comparison', 'date', 'date_cmp')
    def on_change_comparison(self):
        """Set the comparison date to one year before ``date`` when the
        comparison is enabled and a date is set; clear it otherwise."""
        if self.comparison and self.date:
            previous_year = self.date - relativedelta(years=1)
        else:
            previous_year = None
        self.date_cmp = previous_year

    @classmethod
    def view_attributes(cls):
        # The comparison separator follows the visibility of the comparison
        # fields.
        comparison_separator = (
            '/form/separator[@id="comparison"]', 'states', {
                'invisible': ~Eval('comparison', False),
                })
        return super().view_attributes() + [comparison_separator]
|
|
|
|
|
|
class ConsolidationIncomeStatementContext(ModelView):
    # Context form of the consolidated income statement: a date range, the
    # companies and currency to report, and an optional comparison range.
    __name__ = 'account.consolidation.income_statement.context'
    # Each bound of a range is only checked against the other bound when
    # both are set.
    from_date = fields.Date(
        "From Date",
        domain=[
            If(Eval('to_date') & Eval('from_date'),
                ('from_date', '<=', Eval('to_date')),
                ()),
            ])
    to_date = fields.Date(
        "To Date",
        domain=[
            If(Eval('from_date') & Eval('to_date'),
                ('to_date', '>=', Eval('from_date')),
                ()),
            ])
    companies = fields.Many2Many('company.company', None, None, "Companies")
    currency = fields.Many2One('currency.currency', "Currency", required=True)
    posted = fields.Boolean('Posted Moves', help="Only include posted moves.")
    comparison = fields.Boolean('Comparison')
    # The comparison range is hidden unless ``comparison`` is checked.
    from_date_cmp = fields.Date(
        "From Date",
        domain=[
            If(Eval('to_date_cmp') & Eval('from_date_cmp'),
                ('from_date_cmp', '<=', Eval('to_date_cmp')),
                ()),
            ],
        states={
            'invisible': ~Eval('comparison', False),
            })
    to_date_cmp = fields.Date(
        "To Date",
        domain=[
            If(Eval('from_date_cmp') & Eval('to_date_cmp'),
                ('to_date_cmp', '>=', Eval('from_date_cmp')),
                ()),
            ],
        states={
            'invisible': ~Eval('comparison', False),
            })

    @classmethod
    def default_posted(cls):
        return False

    @classmethod
    def default_comparison(cls):
        return False

    @classmethod
    def default_currency(cls):
        """Return the currency id of the context company, or ``None`` when
        the context has no company or a negative company id."""
        company_id = Transaction().context.get('company')
        if company_id is None or company_id < 0:
            return None
        Company = Pool().get('company.company')
        return Company(company_id).currency.id

    @classmethod
    def default_companies(cls):
        """Return the context ``companies``; when that key is absent, a list
        holding the context company, or ``None`` without a company."""
        context = Transaction().context
        if context.get('company'):
            fallback = [context['company']]
        else:
            fallback = None
        return context.get('companies', fallback)

    @classmethod
    def view_attributes(cls):
        # The comparison separator follows the visibility of the comparison
        # fields.
        comparison_separator = (
            '/form/separator[@id="comparison"]', 'states', {
                'invisible': ~Eval('comparison', False),
                })
        return super().view_attributes() + [comparison_separator]
|
|
|
|
|
|
class ConsolidationStatement(Report):
    # Report printing consolidation records together with the companies and
    # the context form values they were computed for.
    __name__ = 'account.consolidation.statement'

    @classmethod
    def get_context(cls, records, header, data):
        """Extend the parent report context.

        Adds ``companies`` (the context companies the current user belongs
        to), ``ctx`` (an instance of the model named by
        ``data['model_context']`` filled from the transaction context, only
        when that name is given) and ``consolidations`` (the records zipped
        with ``data['paths']``, the shorter side padded with ``[]``).
        """
        pool = Pool()
        Company = pool.get('company.company')
        User = pool.get('res.user')
        transaction = Transaction()
        context = transaction.context
        user = User(transaction.user)

        report_context = super().get_context(records, header, data)

        requested = set(context.get('companies', []))
        allowed = requested.intersection(map(int, user.companies))
        report_context['companies'] = Company.browse(allowed)

        model_context = data.get('model_context')
        if model_context is not None:
            Context = pool.get(model_context)
            # Only the fields of the context model present in the
            # transaction context are passed on.
            values = {
                field: context[field]
                for field in Context._fields
                if field in context}
            report_context['ctx'] = Context(**values)

        paths = data.get('paths') or []
        report_context['consolidations'] = zip_longest(
            records, paths, fillvalue=[])
        return report_context
|