# Python source: 516 lines, 18 KiB
# This file is part of Tryton.  The COPYRIGHT file at the top level of
# this repository contains the full copyright notices and license terms.
|
|
|
|
import json
|
|
import re
|
|
from decimal import Decimal
|
|
|
|
from simpleeval import simple_eval
|
|
from stdnum import iso11649
|
|
|
|
from trytond.model import ModelSQL, ModelView, fields, sequence_ordered
|
|
from trytond.modules.currency.fields import Monetary
|
|
from trytond.pool import Pool, PoolMeta
|
|
from trytond.pyson import Eval, If
|
|
from trytond.tools import decistmt
|
|
from trytond.transaction import Transaction
|
|
|
|
|
|
class Statement(metaclass=PoolMeta):
    # Extension of the 'account.statement' model adding an "apply_rules"
    # button that turns origins into statement lines using the rules.
    __name__ = 'account.statement'

    @classmethod
    def __setup__(cls):
        """Declare the ``apply_rules`` button, hidden outside draft state."""
        super().__setup__()
        cls._buttons.update({
                'apply_rules': {
                    'invisible': Eval('state') != 'draft',
                    'depends': ['state'],
                    },
                })

    @classmethod
    @ModelView.button
    def apply_rules(cls, statements):
        """Run every rule against the given statements and save the lines.

        All rules are searched once and shared between the statements.
        """
        pool = Pool()
        Line = pool.get('account.statement.line')
        Rule = pool.get('account.statement.rule')

        rules = Rule.search([])
        # Each statement's generator is fully consumed before the next one
        # starts, so its trailing save() runs in statement order.
        new_lines = [
            line
            for statement in statements
            for line in statement._apply_rules(rules)]
        Line.save(new_lines)

    def _apply_rules(self, rules):
        """Yield the lines produced by the first matching rule per origin.

        Origins which already have lines are left untouched.  For the
        others, the keywords returned by the matching rule are stored on
        the origin.  The statement itself is saved once the generator has
        been exhausted.
        """
        for origin in self.origins:
            if not origin.lines:
                for rule in rules:
                    matched = rule.match(origin)
                    if not matched:
                        continue
                    origin.keywords = matched
                    yield from rule.apply(origin, matched)
                    # Only the first matching rule is applied.
                    break
        # NOTE(review): presumably re-assigning the origins flags them as
        # modified so that save() persists the new keywords -- confirm.
        self.origins = self.origins
        self.save()
|
|
|
|
|
|
class StatementOrigin(metaclass=PoolMeta):
    # Extension of the 'account.statement.origin' model.
    __name__ = 'account.statement.origin'

    # Keywords of the rule that matched the origin; assigned by
    # Statement._apply_rules and searched by StatementRuleLine._get_party.
    keywords = fields.Dict(None, "Keywords")
|
|
|
|
|
|
class StatementRule(sequence_ordered(), ModelSQL, ModelView):
    # A rule groups matching criteria (company, journal, amount range,
    # description pattern, information rules) with the rule lines used to
    # build statement lines for the origins that match.
    __name__ = 'account.statement.rule'

    name = fields.Char("Name")

    # --- Matching criteria -------------------------------------------------
    company = fields.Many2One('company.company', "Company")
    journal = fields.Many2One(
        'account.statement.journal', "Journal",
        domain=[
            If(Eval('company'),
                ('company', '=', Eval('company', -1)),
                ()),
            ])
    # The two bounds constrain each other: low <= high when both are set.
    amount_low = Monetary(
        "Amount Low", currency='currency', digits='currency',
        domain=[
            If(Eval('amount_high'),
                ['OR',
                    ('amount_low', '=', None),
                    ('amount_low', '<=', Eval('amount_high')),
                    ],
                []),
            ])
    amount_high = Monetary(
        "Amount High", currency='currency', digits='currency',
        domain=[
            If(Eval('amount_low'),
                ['OR',
                    ('amount_high', '=', None),
                    ('amount_high', '>=', Eval('amount_low')),
                    ],
                []),
            ])
    description = fields.Char(
        "Description",
        help="The regular expression the description is searched with.\n"
        "It may define the groups named:\n"
        "'party'\n"
        "'bank_account'\n"
        "'invoice'\n"
        "'payment_reference'")
    information_rules = fields.One2Many(
        'account.statement.rule.information', 'rule', "Information Rules")

    # --- Result ------------------------------------------------------------
    lines = fields.One2Many(
        'account.statement.rule.line', 'rule', "Lines")

    currency = fields.Function(
        fields.Many2One('currency.currency', "Currency"),
        'on_change_with_currency')

    @fields.depends('journal')
    def on_change_with_currency(self, name=None):
        """Return the currency of the journal, or None without journal."""
        if not self.journal:
            return None
        return self.journal.currency

    def match(self, origin):
        """Test the origin against every criterion of the rule.

        Return False as soon as one criterion fails.  Otherwise return a
        dictionary with the groups captured by the information rules and
        by the description pattern, completed with the keys 'amount' and
        'pending' both set to the amount of the origin.
        """
        amount = origin.amount

        if self.company and self.company != origin.company:
            return False
        if self.journal and self.journal != origin.statement.journal:
            return False
        if self.amount_low is not None and self.amount_low > amount:
            return False
        if self.amount_high is not None and self.amount_high < amount:
            return False

        keywords = {}
        for information_rule in self.information_rules or ():
            outcome = information_rule.match(origin)
            if isinstance(outcome, dict):
                # A dictionary is a successful match carrying keywords.
                keywords.update(outcome)
                continue
            if not outcome:
                return False

        if self.description:
            found = re.search(self.description, origin.description or '')
            if found is None:
                return False
            keywords.update(found.groupdict())

        keywords['amount'] = keywords['pending'] = amount
        return keywords

    def apply(self, origin, keywords):
        """Yield the statement lines built by the rule lines for origin.

        The rule lines work on a copy of keywords whose 'pending' value
        is reduced by the amount of each line yielded.  The generation
        stops at the first rule line which does not produce a line.
        """
        running = keywords.copy()
        for rule_line in self.lines:
            statement_line = rule_line.get_line(origin, running)
            if not statement_line:
                return
            running['pending'] -= statement_line.amount
            yield statement_line
|
|
|
|
|
|
class StatementRuleInformation(sequence_ordered(), ModelSQL, ModelView):
    # A criterion of a statement rule tested against one key of the
    # information dictionary of a statement origin.  Which value field is
    # relevant depends on the type of the key (see ``key_type``).
    __name__ = 'account.statement.rule.information'

    rule = fields.Many2One(
        'account.statement.rule', "Rule", required=True, ondelete='CASCADE')
    key = fields.Many2One(
        'account.statement.origin.information', "Key", required=True,
        domain=[
            ('type_', 'in', [
                    'boolean', 'integer', 'float', 'number', 'char',
                    'selection']),
            ])

    # One value field per key type; each is hidden for the other types.
    # The '<type>_low' / '<type>_high' fields of the numeric types are
    # attached after the class definition by _add_range().
    boolean = fields.Boolean(
        "Boolean",
        states={
            'invisible': Eval('key_type') != 'boolean',
            })
    char = fields.Char(
        "Char",
        states={
            'invisible': Eval('key_type') != 'char',
            },
        help="The regular expression the key information is searched with.\n"
        "It may define the groups named:\n"
        "party, bank_account, invoice.")
    selection = fields.Selection(
        'get_selections', "Selection",
        states={
            'invisible': Eval('key_type') != 'selection',
            })

    key_type = fields.Function(
        fields.Selection('get_key_types', "Key Type"),
        'on_change_with_key_type')

    @classmethod
    def __setup__(cls):
        """Add 'rule' to the ``__access__`` set of the model."""
        super().__setup__()
        cls.__access__.add('rule')

    @classmethod
    def get_key_types(cls):
        """Return the selection of the 'type_' field of the key model."""
        pool = Pool()
        OriginInformation = pool.get('account.statement.origin.information')
        return OriginInformation.fields_get(['type_'])['type_']['selection']

    @fields.depends('key')
    def on_change_with_key_type(self, name=None):
        """Return the type of the key, or None when no key is set."""
        if self.key:
            return self.key.type_

    @fields.depends('key')
    def get_selections(self):
        """Return the selection of the key decoded from its JSON value.

        A single empty choice is returned when the key is not a selection.
        """
        if self.key and self.key.type_ == 'selection':
            return json.loads(self.key.selection_json)
        return [(None, '')]

    @classmethod
    def view_attributes(cls):
        """Hide the view group of each numeric type for the other types."""
        return super().view_attributes() + [
            ('//group[@id="%s"]' % type_, 'states', {
                    'invisible': Eval('key_type') != type_,
                    }) for type_ in ['integer', 'float', 'number']]

    def match(self, origin):
        """Dispatch to the ``_match_<key_type>`` method.

        The result is falsy when the criterion is not met; otherwise it is
        either True or, for 'char' keys, the dictionary of named groups.
        """
        return getattr(self, '_match_%s' % self.key_type)(
            origin, origin.information or {})

    def _match_boolean(self, origin, information):
        """Compare the value with ``boolean``; a missing key is False."""
        return self.boolean == information.get(self.key.name, False)

    def _match_range(self, origin, information):
        """Test that the value lies within the optional low/high bounds.

        A missing value never matches; an unset bound is not checked.
        """
        low = getattr(self, '%s_low' % self.key_type)
        high = getattr(self, '%s_high' % self.key_type)
        amount = information.get(self.key.name)
        if amount is None:
            return False
        if low is not None and low > amount:
            return False
        if high is not None and high < amount:
            return False
        # Explicit success: falling off the end would return None, which
        # StatementRule.match treats as a failed criterion.
        return True
    _match_integer = _match_range
    _match_float = _match_range
    _match_number = _match_range

    def _match_char(self, origin, information):
        """Search the value with the ``char`` regular expression.

        Return the dictionary of named groups on success, False otherwise.
        An unset pattern or value is handled as the empty string instead
        of raising a TypeError in ``re.search``.
        """
        result = re.search(
            self.char or '', information.get(self.key.name) or '')
        if not result:
            return False
        return result.groupdict()

    def _match_selection(self, origin, information):
        """Compare the value with the configured ``selection``."""
        return self.selection == information.get(self.key.name)
|
|
|
|
|
|
def _add_range(cls, name, type_, string):
    """Attach the ``<name>_low`` and ``<name>_high`` fields to ``cls``.

    :param cls: the class receiving the two fields.
    :param name: the key type; used as field name prefix and compared with
        'key_type' to hide the fields for the other types.
    :param type_: the field class instantiated for both bounds.
    :param string: the label prefix, completed with " Low" / " High".

    When both bounds are filled, the domains enforce low <= high.
    """
    low_name = '%s_low' % name
    high_name = '%s_high' % name

    def bound(field_name, other_name, operator, suffix):
        # The constraint only applies when the opposite bound is set, and
        # an empty value is always accepted.
        return type_(
            "%s %s" % (string, suffix),
            domain=[
                If(Eval(other_name),
                    ['OR',
                        (field_name, '=', None),
                        (field_name, operator, Eval(other_name)),
                        ],
                    []),
                ],
            states={
                'invisible': Eval('key_type') != name,
                })

    setattr(cls, low_name, bound(low_name, high_name, '<=', "Low"))
    # The high bound must be greater than or equal to the low bound, like
    # 'amount_high' on the rule; '<=' here would contradict the low domain.
    setattr(cls, high_name, bound(high_name, low_name, '>=', "High"))
|
|
|
|
|
|
# Attach the low/high bound fields of every numeric key type.
for _name, _type, _label in (
        ('integer', fields.Integer, "Integer"),
        ('float', fields.Float, "Float"),
        ('number', fields.Numeric, "Numeric"),
        ):
    _add_range(StatementRuleInformation, _name, _type, _label)
# Do not leave the loop variables in the module namespace.
del _name, _type, _label
|
|
|
|
|
|
class StatementRuleLine(sequence_ordered(), ModelSQL, ModelView):
    # A line of a statement rule: it describes how to build one statement
    # line (amount, party, account, related document) for a matched origin.
    __name__ = 'account.statement.rule.line'

    rule = fields.Many2One(
        'account.statement.rule', "Rule", required=True, ondelete='CASCADE')
    amount = fields.Char(
        "Amount", required=True,
        help="A Python expression evaluated with 'amount' and 'pending'.")
    party = fields.Many2One(
        'party.party', "Party",
        context={
            'company': Eval('company', -1),
            },
        depends={'company'},
        help="Leave empty to use the group named 'party' "
        "from the regular expressions.")
    account = fields.Many2One(
        'account.account', "Account",
        domain=[
            ('company', '=', Eval('company', -1)),
            ('type', '!=', None),
            ('closed', '!=', True),
            ],
        states={
            'readonly': ~Eval('company'),
            },
        help="Leave empty to use the party's receivable or payable account.\n"
        "The rule must have a company to use this field.")

    company = fields.Function(
        fields.Many2One('company.company', "Company"),
        'on_change_with_company')

    @classmethod
    def __setup__(cls):
        """Add 'rule' to the ``__access__`` set of the model."""
        super().__setup__()
        cls.__access__.add('rule')

    @fields.depends('rule', '_parent_rule.company')
    def on_change_with_company(self, name=None):
        """Return the company of the rule, or None without rule."""
        return self.rule.company if self.rule else None

    def get_line(self, origin, keywords, **context):
        """Build the unsaved statement line for the origin.

        :param origin: the statement origin that matched the rule.
        :param keywords: the keywords of the match; they are exposed as
            names to the ``amount`` expression.
        :param context: extra keyword arguments passed to ``simple_eval``.
        :return: a new statement line, or None when no account can be
            determined, when the party of the related document conflicts
            with the party found, or when the account requires a party and
            none is available.
        """
        pool = Pool()
        Line = pool.get('account.statement.line')

        # Work on copies so the dictionaries supplied by the caller are
        # not modified.
        functions = dict(context.get('functions') or {})
        functions['Decimal'] = Decimal
        context['functions'] = functions
        names = dict(context.get('names') or {})
        names.update(keywords)
        context['names'] = names

        currency = origin.statement.journal.currency
        amount = currency.round(simple_eval(decistmt(self.amount), **context))
        party = self._get_party(origin, keywords, amount=amount)

        # A related document is only used when it is unambiguous.
        candidates = list(
            filter(None, self._get_related_to(
                    origin, keywords, party=party, amount=amount)))
        if len(candidates) == 1:
            related_to, = candidates
        else:
            related_to = None
        related_to_party = self._get_party_from(related_to)

        if related_to_party and party and related_to_party != party:
            return
        if related_to_party and not party:
            party = related_to_party

        # Account priority: rule line, related document, then the
        # receivable/payable account of the party at the origin date.
        account = self.account
        if not account:
            related_to_account = self._get_account_from(related_to)
            if related_to_account:
                account = related_to_account
            elif party:
                with Transaction().set_context(date=origin.date):
                    if amount > Decimal(0):
                        account = party.account_receivable_used
                    else:
                        account = party.account_payable_used

        if not account:
            return
        if not party:
            party = origin.party
        if account.party_required and not party:
            return
        if not account.party_required:
            party = None

        line = Line()
        line.statement = origin.statement
        line.number = origin.number
        line.description = origin.description
        line.origin = origin
        line.amount = amount
        line.date = origin.date
        line.party = party
        line.account = account
        line.related_to = related_to
        return line

    def _get_party(self, origin, keywords, amount=0):
        """Return the party for the line, or None when none is found.

        The party of the rule line wins.  Otherwise the party is searched
        from the 'bank_account' keyword (when the bank account number
        model is available), else from the 'party' and 'payment_reference'
        keywords.  When the lookup is not conclusive, the party of the
        most recent validated or posted statement line whose origin
        carries the same keyword is used.
        """
        pool = Pool()
        Party = pool.get('party.party')
        Line = pool.get('account.statement.line')
        Configuration = pool.get('account.configuration')
        try:
            AccountNumber = pool.get('bank.account.number')
        except KeyError:
            # The model is not registered in the pool.
            AccountNumber = None

        configuration = Configuration(1)
        customer_payment_reference_number = configuration.get_multivalue(
            'customer_payment_reference_number',
            company=origin.company.id)

        party = self.party
        if party:
            return party

        bank_account = keywords.get('bank_account')
        party_rec_name = keywords.get('party')
        payment_reference = keywords.get('payment_reference')
        # The payment reference identifies a party only for incoming
        # amounts and when the configuration says so.
        use_payment_reference = bool(
            payment_reference
            and amount > 0
            and customer_payment_reference_number == 'party')

        if bank_account and AccountNumber:
            numbers = AccountNumber.search(['OR',
                    ('number', '=', bank_account),
                    ('number_compact', '=', bank_account),
                    ])
            if len(numbers) == 1:
                number, = numbers
                if number.account.owners:
                    party = number.account.owners[0]
            else:
                lines = Line.search([
                        ('statement.state', 'in', ['validated', 'posted']),
                        ('origin.keywords.bank_account', '=', bank_account),
                        ('party', '!=', None),
                        ],
                    order=[('date', 'DESC')], limit=1)
                if lines:
                    line, = lines
                    party = line.party
        elif party_rec_name or use_payment_reference:
            domain = []
            if party_rec_name:
                domain.append(('rec_name', 'ilike', party_rec_name))
            if use_payment_reference:
                domain.extend(
                    self._party_payment_reference(payment_reference))
            if domain:
                domain.insert(0, 'OR')
                parties = Party.search(domain)
            else:
                parties = []
            if len(parties) == 1:
                party, = parties
            elif party_rec_name:
                lines = Line.search([
                        ('statement.state', 'in', ['validated', 'posted']),
                        ('origin.keywords.party', '=', party_rec_name),
                        ('party', '!=', None),
                        ],
                    order=[('date', 'DESC')], limit=1)
                if lines:
                    line, = lines
                    party = line.party
        return party

    @classmethod
    def _party_payment_reference(cls, value):
        """Yield the party domain clauses for a payment reference.

        A valid ISO 11649 reference yields a clause on 'code_alnum' with
        the reference stripped of its first four characters.
        """
        if iso11649.is_valid(value):
            yield ('code_alnum', '=', value[4:])

    def _get_related_to(self, origin, keywords, party=None, amount=0):
        """Return the set of candidate related documents (may hold None)."""
        return {
            self._get_invoice(origin, keywords, party=party, amount=amount)}

    def _get_party_from(self, related_to):
        """Return the party of the related document when it is an invoice."""
        pool = Pool()
        Invoice = pool.get('account.invoice')
        if isinstance(related_to, Invoice):
            return related_to.party

    def _get_account_from(self, related_to):
        """Return the account of the related document when it is an invoice."""
        pool = Pool()
        Invoice = pool.get('account.invoice')
        if isinstance(related_to, Invoice):
            return related_to.account

    def _get_invoice(self, origin, keywords, party=None, amount=0):
        """Return the single posted invoice designated by the keywords.

        The invoice is searched by the 'invoice' keyword and/or by the
        'payment_reference' keyword, within the company and currency of
        the origin, with a type and sign compatible with the amount and,
        when given, for the party.  None is returned when no keyword is
        usable or when the search does not find exactly one invoice.
        """
        pool = Pool()
        Invoice = pool.get('account.invoice')
        Configuration = pool.get('account.configuration')

        configuration = Configuration(1)
        customer_payment_reference_number = configuration.get_multivalue(
            'customer_payment_reference_number',
            company=origin.company.id)

        clauses = []
        if invoice_rec_name := keywords.get('invoice'):
            clauses.append(('rec_name', '=', invoice_rec_name))
        if payment_reference := keywords.get('payment_reference'):
            if (amount > 0
                    and customer_payment_reference_number == 'invoice'):
                clauses.append(
                    ('customer_payment_reference', '=', payment_reference))
            elif amount < 0:
                clauses.append(
                    ('supplier_payment_reference', '=', payment_reference))
        if not clauses:
            # Without any keyword clause the search would no longer be
            # keyed on the origin and could return an unrelated invoice.
            return None

        domain = [
            ['OR', *clauses],
            ('company', '=', origin.company.id),
            ('currency', '=', origin.currency.id),
            ('state', '=', 'posted'),
            ]
        # Restrict the invoice type and sign to the direction of the
        # amount; a zero amount adds no restriction.
        if amount > 0:
            domain.append(['OR',
                    [('type', '=', 'out'),
                        ('total_amount', '>=', 0)],
                    [('type', '=', 'in'),
                        ('total_amount', '<=', 0)],
                    ])
        elif amount < 0:
            domain.append(['OR',
                    [('type', '=', 'out'),
                        ('total_amount', '<=', 0)],
                    [('type', '=', 'in'),
                        ('total_amount', '>=', 0)],
                    ])
        if party:
            domain.append(['OR',
                    ('party', '=', party.id),
                    ('alternative_payees', '=', party.id),
                    ])
        invoices = Invoice.search(domain)
        if len(invoices) == 1:
            invoice, = invoices
            return invoice
|