# This file is part of Tryton. The COPYRIGHT file at the top level of
# this repository contains the full copyright notices and license terms.
from collections import defaultdict
from decimal import ROUND_DOWN, Decimal
from itertools import groupby

from sql.aggregate import Sum
from sql.conditionals import Coalesce

from trytond import backend
from trytond.i18n import gettext
from trytond.model import (
    ChatMixin, Index, ModelSQL, ModelView, Unique, fields, sequence_ordered,
    tree)
from trytond.modules.account.exceptions import FiscalYearNotFoundError
from trytond.modules.currency.fields import Monetary
from trytond.pool import Pool
from trytond.pyson import Bool, Eval, If
from trytond.rpc import RPC
from trytond.tools import grouped_slice, reduce_ids, sqlite_apply_types
from trytond.transaction import Transaction
from trytond.wizard import (
    Button, StateAction, StateTransition, StateView, Wizard)

from .exceptions import BudgetValidationError


# Mixin adding the computed ``actual_amount`` and ``percentage`` fields.
# Users of the mixin must provide ``currency`` and ``total_amount``
# attributes on their records and implement ``_get_amount_group_key`` and
# ``_get_amount_query``.
class AmountMixin:
    __slots__ = ()

    actual_amount = fields.Function(
        Monetary(
            "Actual Amount", currency='currency', digits='currency',
            help="The total amount booked against the budget line."),
        'get_amount')
    percentage = fields.Function(
        fields.Numeric(
            "Percentage", digits=(None, 4),
            help="The percentage of booked amount of the budget line."),
        'get_amount')

    @classmethod
    def view_attributes(cls):
        """Return the visual hints applied to tree views."""
        # NOTE(review): this mixin defines no ``ratio`` field itself (only
        # BudgetLinePeriod does); confirm the view field name is intended.
        return [
            ('/tree/field[@name="ratio"]', 'visual',
                If(Eval('ratio') & (Eval('ratio', 0) > 1),
                    If(Eval('actual_amount', 0) > 0, 'danger', ''),
                    If(Eval('actual_amount', 0) < 0, 'success', ''))),
            ]

    @classmethod
    def get_amount(cls, records, names):
        """Compute ``actual_amount`` and/or ``percentage`` for records.

        Returns a dictionary mapping each requested field name to a
        dictionary of values keyed by record id.
        """
        transaction = Transaction()
        cursor = transaction.connection.cursor()

        # groupby only merges adjacent items, so sort by the same key first.
        records = cls.browse(sorted(records, key=cls._get_amount_group_key))
        balances = defaultdict(Decimal)
        for keys, grouped_records in groupby(
                records, key=cls._get_amount_group_key):
            for sub_records in grouped_slice(list(grouped_records)):
                query = cls._get_amount_query(sub_records, dict(keys))
                if backend.name == 'sqlite':
                    sqlite_apply_types(query, [None, 'NUMERIC'])
                cursor.execute(*query)
                # Rows are (record id, amount) pairs.
                balances.update(cursor)

        total = {n: {} for n in names}
        for record in records:
            balance = balances[record.id]
            if 'actual_amount' in names:
                total['actual_amount'][record.id] = record.currency.round(
                    balance)
            if 'percentage' in names:
                if not record.total_amount:
                    # No (or zero) allocation: a percentage is undefined.
                    percentage = None
                elif not balance:
                    percentage = Decimal('0.0000')
                else:
                    percentage = balance / record.total_amount
                    percentage = percentage.quantize(Decimal('.0001'))
                total['percentage'][record.id] = percentage
        return total

    @classmethod
    def _get_amount_group_key(cls, record):
        """Return the key used to group records into one amount query.

        ``get_amount`` converts the key with ``dict()`` and passes it as the
        ``context`` argument of ``_get_amount_query``.
        """
        raise NotImplementedError

    @classmethod
    def _get_amount_query(cls, records, context):
        """Return the query selecting (record id, amount) for records."""
        raise NotImplementedError


# Common definition of a budget: a name, a company and a tree of lines.
# Concrete models must replace ``lines`` and ``root_lines`` with real fields.
class BudgetMixin(ChatMixin):
    __slots__ = ()

    name = fields.Char("Name", required=True)
    company = fields.Many2One(
        'company.company', "Company", required=True,
        states={
            'readonly': Eval('company') & Eval('lines', [-1]),
            },
        help="The company that the budget is associated with.")
    lines = None
    root_lines = None

    @classmethod
    def default_company(cls):
        """Return the company found in the transaction context, if any."""
        return Transaction().context.get('company')

    @classmethod
    def copy(cls, budgets, default=None):
        """Copy budgets, taking the copied lines from ``root_lines``."""
        if default is None:
            default = {}
        else:
            default = default.copy()
        default.setdefault('lines', lambda data: data['root_lines'])
        return super().copy(budgets, default=default)


# Common definition of a budget line organised as a nested-set tree
# (``left``/``right``).  Concrete models must replace ``budget`` and
# ``account`` with real fields.
class BudgetLineMixin(
        tree(name='current_name', separator='\\'), sequence_ordered(),
        AmountMixin):
    __slots__ = ()

    budget = None
    name = fields.Char(
        "Name",
        states={
            'required': ~Eval('account'),
            'invisible': Bool(Eval('account')),
            })
    account = None
    current_name = fields.Function(
        fields.Char("Current Name"),
        'on_change_with_current_name', searcher='search_current_name')
    company = fields.Function(
        fields.Many2One('company.company', "Company"),
        'on_change_with_company')
    currency = fields.Function(
        fields.Many2One('currency.currency', "Currency"),
        'on_change_with_currency')
    amount = Monetary(
        "Amount", currency='currency', digits='currency',
        help="The amount allocated to the budget line.")
    total_amount = fields.Function(
        Monetary(
            "Total Amount", currency='currency', digits='currency',
            help="The total amount allocated to "
            "the budget line and its children."),
        'get_total_amount')

    left = fields.Integer("Left", required=True)
    right = fields.Integer("Right", required=True)

    @classmethod
    def __setup__(cls):
        # parent/children reference the concrete model, so they can only be
        # declared once ``cls.__name__`` is known.
        cls.parent = fields.Many2One(
            cls.__name__, "Parent",
            left='left', right='right', ondelete='CASCADE',
            domain=[
                ('budget', '=', Eval('budget', -1)),
                ],
            help="Used to add structure above the budget.")
        cls.children = fields.One2Many(
            cls.__name__, 'parent', "Children",
            domain=[
                ('budget', '=', Eval('budget', -1)),
                ],
            help="Used to add structure below the budget.")
        super().__setup__()
        t = cls.__table__()
        cls._sql_indexes.add(
            Index(
                t,
                (t.left, Index.Range(cardinality='high')),
                (t.right, Index.Range(cardinality='high'))))
        cls.__access__.add('budget')

    @classmethod
    def default_left(cls):
        return 0

    @classmethod
    def default_right(cls):
        return 0

    @fields.depends('budget', '_parent_budget.company')
    def on_change_with_company(self, name=None):
        """Return the company of the budget, if a budget is set."""
        return self.budget.company if self.budget else None

    @fields.depends('budget', '_parent_budget.company')
    def on_change_with_currency(self, name=None):
        """Return the currency of the budget's company, if available."""
        if self.budget and self.budget.company:
            return self.budget.company.currency

    @classmethod
    def get_total_amount(cls, records, name):
        """Return the summed amount of each record and its descendants.

        Descendants are the rows whose ``left``/``right`` interval lies
        within the interval of the record (the record itself included).
        """
        transaction = Transaction()
        cursor = transaction.connection.cursor()
        table = cls.__table__()
        children = cls.__table__()

        amounts = defaultdict(Decimal)
        ids = [p.id for p in records]
        query = (table
            .join(children,
                condition=(children.left >= table.left)
                & (children.right <= table.right))
            .select(
                table.id,
                Sum(Coalesce(children.amount, 0)).as_(name),
                group_by=table.id))
        if backend.name == 'sqlite':
            sqlite_apply_types(query, [None, 'NUMERIC'])
        for sub_ids in grouped_slice(ids):
            query.where = reduce_ids(table.id, sub_ids)
            cursor.execute(*query)
            amounts.update(cursor)

        for record in records:
            amount = amounts[record.id]
            if amount is not None:
                amounts[record.id] = record.currency.round(amount)
        return amounts

    @fields.depends('account', 'name')
    def on_change_with_current_name(self, name=None):
        """Return the account's record name, or the line name without one."""
        if self.account:
            return self.account.rec_name
        else:
            return self.name

    @classmethod
    def search_current_name(cls, name, clause):
        """Search on both the account record name and the line name."""
        # Negative operators must hold for every alternative (AND), positive
        # ones for at least one of them (OR).
        if clause[1].startswith(('!', 'not ')):
            bool_op = 'AND'
        else:
            bool_op = 'OR'
        return [bool_op,
            ('account.rec_name',) + tuple(clause[1:]),
            ('name',) + tuple(clause[1:]),
            ]

    @classmethod
    def copy(cls, records, default=None):
        """Copy lines, propagating budget/amount defaults to children."""
        if default is None:
            default = {}
        else:
            default = default.copy()
        if 'budget' in default and 'children.budget' not in default:
            default['children.budget'] = default['budget']
        if 'amount' in default and 'children.amount' not in default:
            default['children.amount'] = default['amount']
        return super().copy(records, default=default)


# Context form used to select the budget, posted flag and periods.
class BudgetContext(ModelView):
    __name__ = 'account.budget.context'

    budget = fields.Many2One('account.budget', "Budget", required=True)
    posted = fields.Boolean(
        "Posted", help="Only include posted moves.")
    periods = fields.Many2Many(
        'account.period', None, None, "Periods",
        domain=[
            ('fiscalyear', '=', Eval('fiscalyear', -1)),
            ('type', '=', 'standard'),
            ])
    fiscalyear = fields.Function(
        fields.Many2One('account.fiscalyear', "Fiscal Year"),
        'on_change_with_fiscalyear')

    @classmethod
    def default_budget(cls):
        """Return the context budget, else one of the found fiscal year."""
        pool = Pool()
        Budget = pool.get('account.budget')
        FiscalYear = pool.get('account.fiscalyear')
        context = Transaction().context
        if 'budget' in context:
            return context.get('budget')
        try:
            fiscalyear = FiscalYear.find(
                context.get('company'), test_state=False)
        except FiscalYearNotFoundError:
            return None
        budgets = Budget.search([
                ('fiscalyear', '=', fiscalyear.id),
                ], limit=1)
        if budgets:
            budget, = budgets
            return budget.id

    @fields.depends('budget')
    def on_change_with_fiscalyear(self, name=None):
        """Return the fiscal year of the selected budget, if any."""
        return self.budget.fiscalyear if self.budget else None


# Budget of a company for one fiscal year.
class Budget(BudgetMixin, ModelSQL, ModelView):
    __name__ = 'account.budget'

    fiscalyear = fields.Many2One(
        'account.fiscalyear', "Fiscal Year", required=True,
        domain=[('company', '=', Eval('company', -1))],
        help="The fiscal year the budget applies to.")
    lines = fields.One2Many(
        'account.budget.line', 'budget', "Lines",
        states={
            'readonly': Eval('id', -1) < 0,
            },
        order=[('left', 'ASC'), ('id', 'ASC')])
    root_lines = fields.One2Many(
        'account.budget.line', 'budget', "Lines",
        states={
            'readonly': Eval('id', -1) < 0,
            },
        filter=[
            ('parent', '=', None),
            ])

    @classmethod
    def __setup__(cls):
        super().__setup__()
        cls._order.insert(0, ('fiscalyear', 'DESC'))
        cls._buttons.update({
                'update_lines': {},
                'copy_button': {},
                })

    @classmethod
    def default_fiscalyear(cls):
        """Return the id of the fiscal year found for the default company."""
        pool = Pool()
        FiscalYear = pool.get('account.fiscalyear')
        try:
            fiscalyear = FiscalYear.find(
                cls.default_company(), test_state=False)
        except FiscalYearNotFoundError:
            return None
        return fiscalyear.id

    @fields.depends('company', 'fiscalyear')
    def on_change_company(self):
        """Replace a fiscal year that does not belong to the company."""
        pool = Pool()
        FiscalYear = pool.get('account.fiscalyear')
        if not self.fiscalyear or self.fiscalyear.company != self.company:
            try:
                self.fiscalyear = FiscalYear.find(
                    self.company, test_state=False)
            except FiscalYearNotFoundError:
                # Deliberately keep the current value when none is found.
                pass

    def get_rec_name(self, name):
        return '%s - %s' % (self.name, self.fiscalyear.rec_name)

    @classmethod
    def search_rec_name(cls, name, clause):
        """Search on both the budget name and the fiscal year name."""
        if clause[1].startswith(('!', 'not ')):
            bool_op = 'AND'
        else:
            bool_op = 'OR'
        return [bool_op,
            ('name',) + tuple(clause[1:]),
            ('fiscalyear.rec_name',) + tuple(clause[1:]),
            ]

    def _account_type_domain(self):
        """Return the domain of account types that get a budget line."""
        return [
            ('company', '=', self.company.id),
            ('statement', '=', 'income'),
            ]

    def _account_domain(self):
        """Return the domain of accounts that get a budget line."""
        return [
            ('company', '=', self.company.id),
            ('type', 'where', self._account_type_domain()),
            ('closed', '!=', True),
            ]

    @classmethod
    @ModelView.button
    def update_lines(cls, budgets):
        """Create the missing lines of each budget.

        A line is created for every account type and account matching the
        budget domains that has no line yet.  Account lines are placed under
        the line of their account type, and the account type lines are
        re-parented to mirror the account type hierarchy.
        """
        pool = Pool()
        Account = pool.get('account.account')
        AccountType = pool.get('account.account.type')
        Line = pool.get('account.budget.line')
        # Search results are cached per company as several budgets may
        # share the same one.
        company_account_types = {}
        company_accounts = {}
        for budget in budgets:
            company = budget.company
            if company not in company_account_types:
                company_account_types[company] = set(
                    AccountType.search(budget._account_type_domain()))
            type_lines = Line.search([
                    ('budget', '=', budget.id),
                    ('account_type', '!=', None),
                    ])
            types2lines = {
                type_line.account_type: type_line
                for type_line in type_lines}
            lines = []
            # The difference is computed once, before types2lines is
            # extended inside the loop.
            for account_type in (
                    company_account_types[company] - set(types2lines)):
                line = Line(
                    budget=budget,
                    account_type=account_type,
                    sequence=account_type.sequence)
                types2lines[account_type] = line
                lines.append(line)
            Line.save(lines)

            if company not in company_accounts:
                company_accounts[company] = set(
                    Account.search(budget._account_domain()))
            account_lines = Line.search([
                    ('budget', '=', budget.id),
                    ('account', '!=', None),
                    ])
            accounts = {
                account_line.account for account_line in account_lines}
            lines = [
                Line(
                    budget=budget,
                    account=account,
                    parent=types2lines.get(account.type))
                for account in sorted(
                    company_accounts[company] - accounts,
                    key=lambda a: a.code or '')]
            Line.save(lines)

            for account_type, line in types2lines.items():
                parent = types2lines.get(account_type.parent)
                if line.parent != parent:
                    line.parent = parent
            Line.save(types2lines.values())

    @classmethod
    @ModelView.button_action('account_budget.wizard_budget_copy')
    def copy_button(cls, budgets):
        pass


# Line of a budget, defined by a free name, an account type or an account.
class BudgetLine(BudgetLineMixin, ModelSQL, ModelView):
    __name__ = 'account.budget.line'

    budget = fields.Many2One(
        'account.budget', "Budget", required=True, ondelete='CASCADE')
    account_type = fields.Many2One(
        'account.account.type', "Account Type",
        domain=[
            ('company', '=', Eval('company', -1)),
            ('statement', '=', 'income'),
            ],
        states={
            'required': ~Eval('name') & ~Eval('account'),
            'invisible': Eval('name') | Eval('account'),
            },
        help="The account type the budget applies to.")
    account = fields.Many2One(
        'account.account', "Account",
        domain=[
            ('company', '=', Eval('company', -1)),
            ('type.statement', '=', 'income'),
            ('closed', '!=', True),
            If(Eval('parent_account_type'),
                ('type', '=', Eval('parent_account_type', -1)),
                ()),
            ],
        states={
            'required': ~Eval('name') & ~Eval('account_type'),
            'invisible': Eval('name') | Eval('account_type'),
            },
        help="The account the budget applies to.")
    periods = fields.One2Many(
        'account.budget.line.period', 'budget_line', "Periods",
        order=[('period', 'ASC')],
        help="The periods that contain details of the budget.")
    parent_account_type = fields.Function(
        fields.Many2One('account.account.type', "Parent Account Type"),
        'on_change_with_parent_account_type')

    @classmethod
    def __setup__(cls):
        super().__setup__()
        # The name is neither required nor shown once an account type is set.
        cls.name.states['required'] &= ~Eval('account_type')
        cls.name.states['invisible'] |= Eval('account_type')
        cls.name.depends.add('account_type')

        t = cls.__table__()
        cls._sql_constraints.extend([
                ('budget_account_unique', Unique(t, t.budget, t.account),
                    'account_budget.msg_budget_line_budget_account_unique'),
                ('budget_account_type_unique',
                    Unique(t, t.budget, t.account_type),
                    'account_budget.'
                    'msg_budget_line_budget_account_type_unique'),
                ])
        cls._buttons.update({
                'create_periods': {
                    'invisible': Bool(Eval('periods', [1])),
                    },
                })
        cls.__rpc__.update({
                'create_period': RPC(readonly=False, instantiate=0),
                })

    @fields.depends('parent', '_parent_parent.account_type')
    def on_change_with_parent_account_type(self, name=None):
        """Return the account type of the parent line, if any."""
        return self.parent.account_type if self.parent else None

    @fields.depends('account_type')
    def on_change_with_current_name(self, name=None):
        """Return the account type name when set, else the inherited name."""
        name = super().on_change_with_current_name(name)
        if self.account_type:
            name = self.account_type.name
        return name

    @classmethod
    def search_current_name(cls, name, clause):
        """Extend the inherited search with the account type name."""
        if clause[1].startswith(('!', 'not ')):
            bool_op = 'AND'
        else:
            bool_op = 'OR'
        return [bool_op,
            super().search_current_name(name, clause),
            ('account_type.name',) + tuple(clause[1:]),
            ]

    @classmethod
    def get_total_amount(cls, lines, name):
        """Return the total amounts, scaled by the context periods' ratios.

        When the transaction context contains ``periods``, each total is
        multiplied by the sum of the ratios of the line's periods that are
        part of this selection.
        """
        amounts = super().get_total_amount(lines, name)
        periods = Transaction().context.get('periods')
        if periods:
            for line in lines:
                # ratio is not a required field: count an empty one as 0
                # instead of failing on None.
                ratio = sum(
                    p.ratio or 0 for p in line.periods
                    if p.period.id in periods)
                amounts[line.id] = line.currency.round(
                    amounts[line.id] * ratio)
        return amounts

    @classmethod
    def _get_amount_group_key(cls, record):
        return (('fiscalyear', record.budget.fiscalyear.id),)

    @classmethod
    def _get_amount_query(cls, records, context):
        """Return the query of booked amounts (credit - debit) per line.

        Move lines are matched through the account of the line or of any of
        its descendants, and filtered by ``account.move.line`` ``query_get``
        evaluated with ``context`` and the selected periods.
        """
        pool = Pool()
        Line = pool.get('account.move.line')
        Period = pool.get('account.period')

        table = cls.__table__()
        children = cls.__table__()
        line = Line.__table__()

        amount = Sum(Coalesce(line.credit, 0) - Coalesce(line.debit, 0))
        red_sql = reduce_ids(table.id, [r.id for r in records])
        periods = Transaction().context.get('periods')
        if not periods:
            # Default to all standard periods of the fiscal year.
            periods = [p.id for p in Period.search([
                        ('fiscalyear', '=', context.get('fiscalyear')),
                        ('type', '=', 'standard'),
                        ])]
        with Transaction().set_context(context, periods=periods):
            query_where, _ = Line.query_get(line)
        return (table
            .join(
                children,
                condition=(children.left >= table.left)
                & (children.right <= table.right))
            .join(
                line,
                condition=line.account == children.account)
            .select(
                table.id, amount.as_('amount'),
                where=red_sql & query_where,
                group_by=table.id))

    @classmethod
    def copy(cls, lines, default=None):
        """Copy lines without their periods unless a default is given."""
        if default is None:
            default = {}
        else:
            default = default.copy()
        default.setdefault('periods')
        return super().copy(lines, default=default)

    @classmethod
    @ModelView.button_action(
        'account_budget.wizard_budget_line_create_periods')
    def create_periods(cls, lines):
        pass

    def create_period(self, method_name, account_periods):
        """Return unsaved budget periods for this line and its descendants.

        ``method_name`` selects the ``distribute_<method_name>`` method used
        to compute each ratio.  Lines that already have periods are skipped
        but their children are still visited.
        """
        pool = Pool()
        Period = pool.get('account.budget.line.period')
        method = getattr(self, 'distribute_%s' % method_name)
        periods = []
        if not self.periods:
            for account_period in account_periods:
                period = Period()
                period.budget_line = self
                period.period = account_period
                # Round down so that the ratios of a line never sum above 1.
                period.ratio = method(period, account_periods).quantize(
                    Decimal('0.0001'), ROUND_DOWN)
                periods.append(period)
        for child in self.children:
            periods.extend(child.create_period(method_name, account_periods))
        return periods

    def distribute_evenly(self, period, periods):
        """Return an equal share of the budget for each period."""
        return 1 / Decimal(len(periods))


# Share (ratio) of a budget line allocated to one accounting period.
class BudgetLinePeriod(AmountMixin, ModelSQL, ModelView):
    __name__ = 'account.budget.line.period'

    budget_line = fields.Many2One(
        'account.budget.line', "Budget Line",
        required=True, ondelete="CASCADE",
        help="The line that the budget period is part of.")
    fiscalyear = fields.Function(
        fields.Many2One('account.fiscalyear', "Fiscal Year"),
        'on_change_with_fiscalyear')
    period = fields.Many2One(
        'account.period', "Period", required=True,
        domain=[
            ('fiscalyear', '=', Eval('fiscalyear', -1)),
            ('type', '=', 'standard'),
            ])
    currency = fields.Function(
        fields.Many2One('currency.currency', "Currency"),
        'on_change_with_currency')
    ratio = fields.Numeric(
        "Ratio", digits=(1, 4),
        help="The percentage allocated to the budget.")
    total_amount = fields.Function(
        Monetary(
            "Total Amount", currency='currency', digits='currency',
            help="The total amount allocated to the budget and its children."),
        'get_total_amount')

    @classmethod
    def __setup__(cls):
        super().__setup__()
        t = cls.__table__()
        cls._sql_constraints.append(
            ('budget_line_period_unique', Unique(t, t.budget_line, t.period),
                'account_budget'
                '.msg_budget_line_period_budget_line_period_unique'))
        cls.__access__.add('budget_line')
        cls._order.insert(0, ('period', 'DESC'))

    @fields.depends('budget_line', '_parent_budget_line.company')
    def on_change_with_currency(self, name=None):
        """Return the currency of the budget line's company, if available."""
        if self.budget_line and self.budget_line.company:
            return self.budget_line.company.currency

    @fields.depends(
        'budget_line', '_parent_budget_line.budget',
        '_parent_budget_line._parent_budget.fiscalyear')
    def on_change_with_fiscalyear(self, name=None):
        """Return the fiscal year of the budget line's budget, if available."""
        if self.budget_line and self.budget_line.budget:
            return self.budget_line.budget.fiscalyear

    @classmethod
    def get_total_amount(cls, periods, name):
        """Return the budget line total amount multiplied by the ratio.

        The value is ``None`` when either the budget line total or the
        ratio is empty.
        """
        amounts = {}
        # Clear ``periods`` from the context so that the budget line total
        # is not itself scaled by a period selection.
        with Transaction().set_context(periods=None):
            periods = cls.browse(periods)
            for period in periods:
                line_total = period.budget_line.total_amount
                # ratio is not a required field: guard against None instead
                # of failing on the multiplication.
                if line_total is not None and period.ratio is not None:
                    amount = period.currency.round(line_total * period.ratio)
                else:
                    amount = None
                amounts[period.id] = amount
        return amounts

    @classmethod
    def _get_amount_group_key(cls, record):
        return (('fiscalyear', record.budget_line.budget.fiscalyear.id),)

    @classmethod
    def _get_amount_query(cls, records, context):
        """Return the query of booked amounts (credit - debit) per period.

        Same matching as for budget lines, further restricted to the move
        lines whose move belongs to the accounting period of the record.
        """
        pool = Pool()
        BudgetLine = pool.get('account.budget.line')
        Move = pool.get('account.move')
        MoveLine = pool.get('account.move.line')
        Period = pool.get('account.period')

        table = cls.__table__()
        budget_line = BudgetLine.__table__()
        children = BudgetLine.__table__()
        move = Move.__table__()
        line = MoveLine.__table__()

        amount = Sum(Coalesce(line.credit, 0) - Coalesce(line.debit, 0))
        red_sql = reduce_ids(table.id, [r.id for r in records])
        periods = Transaction().context.get('periods')
        if not periods:
            # Default to all standard periods of the fiscal year.
            periods = [p.id for p in Period.search([
                        ('fiscalyear', '=', context.get('fiscalyear')),
                        ('type', '=', 'standard'),
                        ])]
        with Transaction().set_context(context, periods=periods):
            query_where, _ = MoveLine.query_get(line)
        return (table
            .join(budget_line, condition=budget_line.id == table.budget_line)
            .join(
                children,
                condition=(children.left >= budget_line.left)
                & (children.right <= budget_line.right))
            .join(
                line,
                condition=line.account == children.account)
            .join(move,
                condition=(line.move == move.id)
                & (move.period == table.period))
            .select(
                table.id, amount.as_('amount'),
                where=red_sql & query_where,
                group_by=table.id))

    @classmethod
    def validate_fields(cls, periods, field_names):
        super().validate_fields(periods, field_names)
        cls.check_ratio(periods, field_names)

    @classmethod
    def check_ratio(cls, periods, field_names=None):
        """Ensure the ratios of each budget line do not sum above 1.

        Raises BudgetValidationError for the first offending budget line
        found.  Nothing is checked when ``field_names`` is given and
        contains neither ``ratio`` nor ``budget_line``.
        """
        pool = Pool()
        Line = pool.get('account.budget.line')
        if field_names and not (field_names & {'ratio', 'budget_line'}):
            return
        transaction = Transaction()
        cursor = transaction.connection.cursor()
        table = cls.__table__()
        for sub_ids in grouped_slice({p.budget_line.id for p in periods}):
            cursor.execute(*table.select(
                    table.budget_line,
                    where=reduce_ids(table.budget_line, sub_ids),
                    group_by=table.budget_line,
                    having=Sum(table.ratio) > 1,
                    limit=1))
            row = cursor.fetchone()
            if row is None:
                # No budget line of this slice exceeds the limit.
                continue
            line_id, = row
            line = Line(line_id)
            raise BudgetValidationError(
                gettext('account_budget.msg_budget_line_period_ratio',
                    budget_line=line.rec_name))


# Shared behaviour of the wizards copying a budget.  The wizard using it
# must provide ``record``, ``model`` and a ``start`` state with ``name``
# and ``factor``.
class CopyBudgetMixin:
    __slots__ = ()

    def default_start(self, field_names):
        """Return the start form defaults taken from the copied record."""
        return {
            'name': self.record.name,
            'company': self.record.company.id,
            }

    def _copy_default(self):
        """Return the ``default`` values passed to ``copy``.

        Line amounts are multiplied by the factor and rounded with the
        company currency when a factor other than 1 is set.
        """
        default = {'name': self.start.name}
        factor = self.start.factor
        # factor is not a required field: an empty value means no scaling
        # rather than a failure on ``amount * None``.
        if factor is not None and factor != 1:
            currency = self.record.company.currency
            default['lines.amount'] = (
                lambda data: currency.round(data['amount'] * factor)
                if data['amount'] else data['amount'])
        return default

    def do_copy(self, action):
        """Copy the record and return the action targeting the copy."""
        record, = self.model.copy([self.record], default=self._copy_default())
        data = {'res_id': [record.id]}
        # NOTE(review): presumably reverses the views so the form of the
        # copy is shown first; confirm against the action definition.
        action['views'].reverse()
        return action, data


# Shared fields of the start form of the wizards copying a budget.
class CopyBudgetStartMixin:
    __slots__ = ()

    name = fields.Char("Name", required=True)
    factor = fields.Numeric(
        "Factor",
        help="The percentage to apply to the budget line amounts.")
    company = fields.Many2One('company.company', "Company", readonly=True)

    @classmethod
    def default_factor(cls):
        return Decimal(1)


# Wizard copying a budget, optionally to another fiscal year.
class CopyBudget(CopyBudgetMixin, Wizard):
    __name__ = 'account.budget.copy'

    start = StateView('account.budget.copy.start',
        'account_budget.budget_copy_start_view_form', [
            Button("Cancel", 'end', 'tryton-cancel'),
            Button("Copy", 'copy', 'tryton-ok', default=True),
            ])
    copy = StateAction('account_budget.act_budget_form')

    def default_start(self, field_names):
        """Add the fiscal year of the copied budget to the defaults."""
        values = super().default_start(field_names)
        values['fiscalyear'] = self.record.fiscalyear.id
        return values

    def _copy_default(self):
        """Add the selected fiscal year to the copy defaults."""
        default = super()._copy_default()
        default['fiscalyear'] = self.start.fiscalyear.id
        return default


# Start form of the budget copy wizard.
class CopyBudgetStart(CopyBudgetStartMixin, ModelView):
    __name__ = 'account.budget.copy.start'

    fiscalyear = fields.Many2One(
        'account.fiscalyear', "Fiscal Year", required=True,
        domain=[
            ('company', '=', Eval('company', -1)),
            ],
        help="The fiscal year during which the new budget will apply.")


# Wizard creating the periods of a budget line and of its descendants.
class CreatePeriods(Wizard):
    __name__ = 'account.budget.line.create_periods'

    start = StateView('account.budget.line.create_periods.start',
        'account_budget.budget_create_periods_start_view_form', [
            Button("Cancel", 'end', 'tryton-cancel'),
            Button("Create", 'create_periods', 'tryton-ok', default=True),
            ])
    create_periods = StateTransition()

    def transition_create_periods(self):
        """Create and save the periods using the selected method."""
        pool = Pool()
        AccountPeriod = pool.get('account.period')
        Period = pool.get('account.budget.line.period')
        account_periods = AccountPeriod.search([
                ('fiscalyear', '=', self.record.budget.fiscalyear.id),
                ('type', '=', 'standard'),
                ])
        periods = self.record.create_period(self.start.method, account_periods)
        Period.save(periods)
        return 'end'


# Start form of the wizard creating budget line periods.
class CreatePeriodsStart(ModelView):
    __name__ = 'account.budget.line.create_periods.start'

    method = fields.Selection([
            ('evenly', "Evenly"),
            ], "Method", required=True)

    @classmethod
    def default_method(cls):
        return 'evenly'