# This file is part of Tryton. The COPYRIGHT file at the top level of # this repository contains the full copyright notices and license terms. from decimal import Decimal from itertools import chain from simpleeval import simple_eval from trytond import backend, config from trytond.i18n import gettext from trytond.model import ( DeactivableMixin, ModelSQL, ModelView, Workflow, fields) from trytond.modules.company.model import ( CompanyMultiValueMixin, CompanyValueMixin) from trytond.modules.currency.fields import Monetary from trytond.pool import Pool, PoolMeta from trytond.pyson import Bool, Eval from trytond.tools import decistmt from trytond.transaction import Transaction from .exceptions import FormulaError class AdvancePaymentTerm( DeactivableMixin, ModelSQL, ModelView): __name__ = 'sale.advance_payment_term' name = fields.Char("Name", required=True, translate=True) lines = fields.One2Many( 'sale.advance_payment_term.line', 'advance_payment_term', "Lines") def get_advance_payment_context(self, sale): return { 'total_amount': sale.total_amount, 'untaxed_amount': sale.untaxed_amount, } def get_lines(self, sale): lines = [] term_context = self.get_advance_payment_context(sale) for sale_line in self.lines: line = sale_line.get_line(sale.currency, **term_context) if line.amount > 0: lines.append(line) return lines class AdvancePaymentTermLine(ModelView, ModelSQL, CompanyMultiValueMixin): __name__ = 'sale.advance_payment_term.line' _rec_name = 'description' advance_payment_term = fields.Many2One( 'sale.advance_payment_term', "Advance Payment Term", required=True, ondelete='CASCADE') description = fields.Char( "Description", required=True, translate=True, help="Used as description for the invoice line.") account = fields.MultiValue( fields.Many2One('account.account', "Account", required=True, domain=[ ('type.unearned_revenue', '=', True), ], help="Used for the line of advance payment invoice.")) accounts = fields.One2Many( 'sale.advance_payment_term.line.account', 'line', "Accounts") block_supply = fields.Boolean( "Block Supply", help="Check to prevent any supply request before advance payment.") block_shipping = fields.Boolean( "Block Shipping", help="Check to prevent the packing of the shipment " "before advance payment.") invoice_delay = fields.TimeDelta( "Invoice Delay", help="Delta to apply on the sale date for the date of " "the advance payment invoice.") formula = fields.Char('Formula', required=True, help="A python expression used to compute the advance payment amount " "that will be evaluated with:\n" "- total_amount: The total amount of the sale.\n" "- untaxed_amount: The total untaxed amount of the sale.") @classmethod def __setup__(cls): super().__setup__() cls.__access__.add('advance_payment_term') @fields.depends('formula', 'description') def pre_validate(self, **names): super().pre_validate() names['total_amount'] = names['untaxed_amount'] = 0 try: if not isinstance(self.compute_amount(**names), Decimal): raise Exception('The formula does not return a Decimal') except Exception as exception: raise FormulaError( gettext('sale_advance_payment.msg_term_line_invalid_formula', formula=self.formula, term_line=self.description or '', exception=exception)) from exception def get_compute_amount_context(self, **names): return { 'names': names, 'functions': { 'Decimal': Decimal, }, } def compute_amount(self, **names): context = self.get_compute_amount_context(**names) return simple_eval(decistmt(self.formula), **context) def get_line(self, currency, **context): pool = Pool() Line = pool.get('sale.advance_payment.line') return Line( block_supply=self.block_supply, block_shipping=self.block_shipping, amount=currency.round(self.compute_amount(**context)), account=self.account, invoice_delay=self.invoice_delay, description=self.description) class AdvancePaymentTermLineAccount(ModelSQL, CompanyValueMixin): __name__ = 'sale.advance_payment_term.line.account' line = fields.Many2One( 'sale.advance_payment_term.line', "Line", required=True, ondelete='CASCADE', context={ 'company': Eval('company', -1), }) account = fields.Many2One( 'account.account', "Account", required=True, domain=[ ('type.unearned_revenue', '=', True), ('company', '=', Eval('company', -1)), ]) class AdvancePaymentLine(ModelSQL, ModelView): __name__ = 'sale.advance_payment.line' _rec_name = 'description' _states = { 'readonly': Eval('sale_state') != 'draft', } sale = fields.Many2One( 'sale.sale', "Sale", required=True, ondelete='CASCADE', states={ 'readonly': ((Eval('sale_state') != 'draft') & Bool(Eval('sale'))), }) description = fields.Char("Description", required=True, states=_states) amount = Monetary( "Amount", currency='currency', digits='currency', states=_states) account = fields.Many2One( 'account.account', "Account", required=True, domain=[ ('type.unearned_revenue', '=', True), ('company', '=', Eval('sale_company', -1)), ], states=_states) block_supply = fields.Boolean("Block Supply", states=_states) block_shipping = fields.Boolean("Block Shipping", states=_states) invoice_delay = fields.TimeDelta("Invoice Delay", states=_states) invoice_lines = fields.One2Many( 'account.invoice.line', 'origin', "Invoice Lines", readonly=True) completed = fields.Function(fields.Boolean("Completed"), 'get_completed') sale_state = fields.Function(fields.Selection( 'get_sale_states', "Sale State"), 'on_change_with_sale_state') sale_company = fields.Function(fields.Many2One( 'company.company', "Company"), 'on_change_with_sale_company') currency = fields.Function(fields.Many2One( 'currency.currency', "Currency"), 'on_change_with_currency') del _states @classmethod def __setup__(cls): super().__setup__() cls._order.insert(0, ('amount', 'ASC')) cls.__access__.add('sale') @classmethod def __register__(cls, module): # Migration from 7.0: rename condition into line backend.TableHandler.table_rename( config.get( 'table', 'sale.advance_payment.condition', default='sale_advance_payment_condition'), cls._table) super().__register__(module) @classmethod def get_sale_states(cls): pool = Pool() Sale = pool.get('sale.sale') return Sale.fields_get(['state'])['state']['selection'] @fields.depends('sale', '_parent_sale.state') def on_change_with_sale_state(self, name=None): if self.sale: return self.sale.state @fields.depends('sale', '_parent_sale.company') def on_change_with_sale_company(self, name=None): return self.sale.company if self.sale else None @fields.depends('sale', '_parent_sale.currency') def on_change_with_currency(self, name=None): return self.sale.currency if self.sale else None @classmethod def copy(cls, lines, default=None): if default is None: default = {} else: default = default.copy() default.setdefault('invoice_lines', []) return super().copy(lines, default) def create_invoice(self): invoice = self.sale._get_invoice() if self.invoice_delay is not None: invoice.invoice_date = self.sale.sale_date + self.invoice_delay invoice.payment_term = None invoice_lines = self.get_invoice_advance_payment_lines(invoice) if not invoice_lines: return None invoice.lines = invoice_lines return invoice def get_invoice_advance_payment_lines(self, invoice): pool = Pool() InvoiceLine = pool.get('account.invoice.line') advance_amount = self._get_advance_amount() advance_amount += self._get_ignored_amount() if advance_amount >= self.amount: return [] invoice_line = InvoiceLine() invoice_line.invoice = invoice invoice_line.type = 'line' invoice_line.quantity = 1 invoice_line.account = self.account invoice_line.unit_price = self.amount - advance_amount invoice_line.description = self.description invoice_line.origin = self invoice_line.company = self.sale.company invoice_line.currency = self.sale.currency # Set taxes invoice_line.on_change_account() return [invoice_line] def _get_advance_amount(self): return sum(l.amount for l in self.invoice_lines if l.invoice.state != 'cancelled') def _get_ignored_amount(self): skips = {l for i in self.sale.invoices_recreated for l in i.lines} return sum(l.amount for l in self.invoice_lines if l.invoice.state == 'cancelled' and l not in skips) def get_completed(self, name): advance_amount = 0 lines_ignored = set(l for i in self.sale.invoices_ignored for l in i.lines) for l in self.invoice_lines: if l.invoice.state == 'paid' or l in lines_ignored: advance_amount += l.amount return advance_amount >= self.amount class Sale(metaclass=PoolMeta): __name__ = 'sale.sale' advance_payment_term = fields.Many2One('sale.advance_payment_term', 'Advance Payment Term', ondelete='RESTRICT', states={ 'readonly': Eval('state') != 'draft', }) advance_payment_lines = fields.One2Many( 'sale.advance_payment.line', 'sale', "Advance Payment Lines", states={ 'readonly': Eval('state') != 'draft', }) advance_payment_invoices = fields.Function(fields.Many2Many( 'account.invoice', None, None, "Advance Payment Invoices"), 'get_advance_payment_invoices', searcher='search_advance_payment_invoices') @classmethod def __setup__(cls): super().__setup__() cls.invoices_ignored.domain = [ 'OR', cls.invoices_ignored.domain, [ ('id', 'in', Eval('advance_payment_invoices', [])), ('state', '=', 'cancelled'), ], ] @classmethod @ModelView.button @Workflow.transition('quotation') def quote(cls, sales): pool = Pool() AdvancePaymentLine = pool.get('sale.advance_payment.line') super().quote(sales) AdvancePaymentLine.delete( list(chain(*(s.advance_payment_lines for s in sales)))) for sale in sales: sale.set_advance_payment_term() cls.save(sales) @classmethod def copy(cls, sales, default=None): if default is None: default = {} else: default = default.copy() default.setdefault('advance_payment_lines', None) return super().copy(sales, default=default) def set_advance_payment_term(self): pool = Pool() AdvancePaymentTerm = pool.get('sale.advance_payment_term') if self.advance_payment_term: if self.party and self.party.lang: with Transaction().set_context(language=self.party.lang.code): advance_payment_term = AdvancePaymentTerm( self.advance_payment_term.id) else: advance_payment_term = self.advance_payment_term self.advance_payment_lines = advance_payment_term.get_lines(self) def get_advance_payment_invoices(self, name): invoices = set() for line in self.advance_payment_lines: for invoice_line in line.invoice_lines: if invoice_line.invoice: invoices.add(invoice_line.invoice.id) return list(invoices) @classmethod def search_advance_payment_invoices(cls, name, clause): return [('advance_payment_lines.invoice_lines.invoice' + clause[0][len(name):], *clause[1:])] @property def _invoices_for_state(self): return super()._invoices_for_state + self.advance_payment_invoices def get_recall_lines(self, invoice): pool = Pool() InvoiceLine = pool.get('account.invoice.line') recall_lines = [] advance_lines = [ l for c in self.advance_payment_lines for l in c.invoice_lines if l.type == 'line' and l.invoice.state == 'paid'] for advance_line in advance_lines: amount = advance_line.amount for recalled_line in advance_line.advance_payment_recalled_lines: amount += recalled_line.amount if amount: line = InvoiceLine( invoice=invoice, company=invoice.company, type='line', quantity=-1, account=advance_line.account, unit_price=amount, description=advance_line.description, origin=advance_line, taxes=advance_line.taxes, taxes_date=advance_line.tax_date, ) recall_lines.append(line) return recall_lines @classmethod def _process_invoice(cls, sales): pool = Pool() Invoice = pool.get('account.invoice') invoices = [] for sale in sales: if (sale.advance_payment_eligible() and not sale.advance_payment_completed): for line in sale.advance_payment_lines: invoice = line.create_invoice() if invoice: invoices.append(invoice) Invoice.save(invoices) super()._process_invoice(sales) def create_invoice(self): invoice = super().create_invoice() if (invoice is not None and self.advance_payment_eligible() and self.advance_payment_completed): invoice.lines = ( list(getattr(invoice, 'lines', ())) + self.get_recall_lines(invoice)) return invoice def advance_payment_eligible(self, shipment_type=None): """ Returns True when the shipment_type is eligible to further processing of the sale's advance payment. """ return bool((shipment_type == 'out' or shipment_type is None) and self.advance_payment_lines) @property def advance_payment_completed(self): """ Returns True when the advance payment process is completed """ return (bool(self.advance_payment_lines) and all(c.completed for c in self.advance_payment_lines)) @property def supply_blocked(self): for line in self.advance_payment_lines: if not line.block_supply: continue if not line.completed: return True return False @property def shipping_blocked(self): for line in self.advance_payment_lines: if not line.block_shipping: continue if not line.completed: return True return False class SaleLine(metaclass=PoolMeta): __name__ = 'sale.line' def get_move(self, shipment_type): move = super().get_move(shipment_type) if (self.sale.advance_payment_eligible(shipment_type) and self.sale.supply_blocked): return None return move def get_purchase_request(self, product_quantities): request = super().get_purchase_request(product_quantities) if (self.sale.advance_payment_eligible() and self.sale.supply_blocked): return None return request def get_invoice_line(self): lines = super().get_invoice_line() if (self.sale.advance_payment_eligible() and not self.sale.advance_payment_completed): return [] return lines class HandleInvoiceException(metaclass=PoolMeta): __name__ = 'sale.handle.invoice.exception' def default_ask(self, fields): default = super().default_ask(fields) invoices = default['domain_invoices'] sale = self.record skips = set(sale.invoices_ignored) skips.update(sale.invoices_recreated) for invoice in sale.advance_payment_invoices: if invoice.state == 'cancelled' and invoice not in skips: invoices.append(invoice.id) return default