980 lines
34 KiB
Python
980 lines
34 KiB
Python
# This file is part of Tryton. The COPYRIGHT file at the top level of
|
|
# this repository contains the full copyright notices and license terms.
|
|
import datetime
|
|
import os
|
|
import unicodedata
|
|
import uuid
|
|
from io import BytesIO
|
|
from itertools import groupby
|
|
from operator import attrgetter
|
|
|
|
import genshi
|
|
import genshi.template
|
|
# XXX fix: https://genshi.edgewall.org/ticket/582
|
|
from genshi.template.astutil import ASTCodeGenerator, ASTTransformer
|
|
from lxml import etree
|
|
from sql import Literal, Null
|
|
|
|
import trytond.config as config
|
|
from trytond.i18n import gettext
|
|
from trytond.model import (
|
|
Index, ModelSQL, ModelView, Unique, Workflow, dualmethod, fields)
|
|
from trytond.model.exceptions import AccessError
|
|
from trytond.modules.account_payment.exceptions import ProcessError
|
|
from trytond.modules.company import CompanyReport
|
|
from trytond.pool import Pool, PoolMeta
|
|
from trytond.pyson import Eval, If
|
|
from trytond.report import Report
|
|
from trytond.tools import (
|
|
cached_property, grouped_slice, is_full_text, lstrip_wildcard, reduce_ids,
|
|
sortable_values)
|
|
from trytond.transaction import Transaction
|
|
|
|
from .sepa_handler import CAMT054
|
|
|
|
if not hasattr(ASTCodeGenerator, 'visit_NameConstant'):
|
|
def visit_NameConstant(self, node):
|
|
if node.value is None:
|
|
self._write('None')
|
|
elif node.value is True:
|
|
self._write('True')
|
|
elif node.value is False:
|
|
self._write('False')
|
|
else:
|
|
raise Exception("Unknown NameConstant %r" % (node.value,))
|
|
ASTCodeGenerator.visit_NameConstant = visit_NameConstant
|
|
if not hasattr(ASTTransformer, 'visit_NameConstant'):
|
|
# Re-use visit_Name because _clone is deleted
|
|
ASTTransformer.visit_NameConstant = ASTTransformer.visit_Name
|
|
|
|
if config.getboolean('account_payment_sepa', 'filestore', default=False):
|
|
file_id = 'message_file_id'
|
|
store_prefix = config.get(
|
|
'account_payment_sepa', 'store_prefix', default=None)
|
|
else:
|
|
file_id = None
|
|
store_prefix = None
|
|
|
|
INITIATOR_IDS = [
|
|
(None, ''),
|
|
('eu_at_02', "SEPA Creditor Identifier"),
|
|
('be_vat', "Belgian Enterprise Number"),
|
|
('es_vat', "Spanish VAT Number"),
|
|
]
|
|
|
|
|
|
class Journal(metaclass=PoolMeta):
|
|
__name__ = 'account.payment.journal'
|
|
company_party = fields.Function(fields.Many2One(
|
|
'party.party', "Company Party",
|
|
context={
|
|
'company': Eval('company', -1),
|
|
},
|
|
depends={'company'}),
|
|
'on_change_with_company_party')
|
|
sepa_bank_account_number = fields.Many2One('bank.account.number',
|
|
'Bank Account Number', states={
|
|
'required': Eval('process_method') == 'sepa',
|
|
'invisible': Eval('process_method') != 'sepa',
|
|
},
|
|
domain=[
|
|
('type', '=', 'iban'),
|
|
('account.owners', '=', Eval('company_party', -1)),
|
|
])
|
|
sepa_payable_flavor = fields.Selection([
|
|
(None, ''),
|
|
('pain.001.001.03', 'pain.001.001.03'),
|
|
('pain.001.001.05', 'pain.001.001.05'),
|
|
('pain.001.003.03', 'pain.001.003.03'),
|
|
], 'Payable Flavor', states={
|
|
'required': Eval('process_method') == 'sepa',
|
|
'invisible': Eval('process_method') != 'sepa',
|
|
},
|
|
translate=False)
|
|
sepa_receivable_flavor = fields.Selection([
|
|
(None, ''),
|
|
('pain.008.001.02', 'pain.008.001.02'),
|
|
('pain.008.001.04', 'pain.008.001.04'),
|
|
('pain.008.003.02', 'pain.008.003.02'),
|
|
], 'Receivable Flavor', states={
|
|
'required': Eval('process_method') == 'sepa',
|
|
'invisible': Eval('process_method') != 'sepa',
|
|
},
|
|
translate=False)
|
|
sepa_payable_initiator_id = fields.Selection(
|
|
INITIATOR_IDS, "SEPA Payable Initiator Identifier",
|
|
states={
|
|
'invisible': Eval('process_method') != 'sepa',
|
|
},
|
|
help="The identifier used for the initiating party.")
|
|
sepa_receivable_initiator_id = fields.Selection(
|
|
INITIATOR_IDS, "SEPA Receivable Initiator Identifier",
|
|
states={
|
|
'invisible': Eval('process_method') != 'sepa',
|
|
},
|
|
help="The identifier used for the initiating party.")
|
|
sepa_batch_booking = fields.Boolean('Batch Booking', states={
|
|
'invisible': Eval('process_method') != 'sepa',
|
|
})
|
|
sepa_charge_bearer = fields.Selection([
|
|
('DEBT', 'Debtor'),
|
|
('CRED', 'Creditor'),
|
|
('SHAR', 'Shared'),
|
|
('SLEV', 'Service Level'),
|
|
], 'Charge Bearer', states={
|
|
'required': Eval('process_method') == 'sepa',
|
|
'invisible': Eval('process_method') != 'sepa',
|
|
})
|
|
|
|
@classmethod
|
|
def __setup__(cls):
|
|
super().__setup__()
|
|
sepa_method = ('sepa', 'SEPA')
|
|
if sepa_method not in cls.process_method.selection:
|
|
cls.process_method.selection.append(sepa_method)
|
|
|
|
@classmethod
|
|
def __register__(cls, module_name):
|
|
cursor = Transaction().connection.cursor()
|
|
sql_table = cls.__table__()
|
|
super().__register__(module_name)
|
|
|
|
# Migration from 6.8: es_nif renamed into es_vat
|
|
cursor.execute(*sql_table.update(
|
|
[sql_table.sepa_payable_initiator_id],
|
|
['es_vat'],
|
|
where=sql_table.sepa_payable_initiator_id == 'es_nif'))
|
|
cursor.execute(*sql_table.update(
|
|
[sql_table.sepa_receivable_initiator_id],
|
|
['es_vat'],
|
|
where=sql_table.sepa_receivable_initiator_id == 'es_nif'))
|
|
|
|
@classmethod
|
|
def default_company_party(cls):
|
|
pool = Pool()
|
|
Company = pool.get('company.company')
|
|
company_id = cls.default_company()
|
|
if company_id is not None and company_id >= 0:
|
|
return Company(company_id).party.id
|
|
|
|
@fields.depends('company')
|
|
def on_change_with_company_party(self, name=None):
|
|
return self.company.party if self.company else None
|
|
|
|
@staticmethod
|
|
def default_sepa_charge_bearer():
|
|
return 'SLEV'
|
|
|
|
|
|
def remove_comment(stream):
|
|
for kind, data, pos in stream:
|
|
if kind is genshi.core.COMMENT:
|
|
continue
|
|
yield kind, data, pos
|
|
|
|
|
|
loader = genshi.template.TemplateLoader(
|
|
os.path.join(os.path.dirname(__file__), 'template'),
|
|
auto_reload=True)
|
|
|
|
|
|
class Group(metaclass=PoolMeta):
|
|
__name__ = 'account.payment.group'
|
|
sepa_messages = fields.One2Many('account.payment.sepa.message', 'origin',
|
|
'SEPA Messages', readonly=True,
|
|
domain=[('company', '=', Eval('company', -1))],
|
|
states={
|
|
'invisible': ~Eval('sepa_messages'),
|
|
})
|
|
sepa_id = fields.Char("SEPA ID", readonly=True, size=35,
|
|
states={
|
|
'invisible': ~Eval('sepa_id'),
|
|
})
|
|
|
|
@classmethod
|
|
def __setup__(cls):
|
|
super().__setup__()
|
|
cls._buttons.update({
|
|
'sepa_generate_message': {
|
|
'invisible': Eval('process_method') != 'sepa',
|
|
'depends': ['process_method'],
|
|
},
|
|
})
|
|
|
|
def get_sepa_template(self):
|
|
if self.kind == 'payable':
|
|
return loader.load('%s.xml' % self.journal.sepa_payable_flavor)
|
|
elif self.kind == 'receivable':
|
|
return loader.load('%s.xml' % self.journal.sepa_receivable_flavor)
|
|
|
|
def process_sepa(self):
|
|
pool = Pool()
|
|
Payment = pool.get('account.payment')
|
|
Mandate = pool.get('account.payment.sepa.mandate')
|
|
assert self.process_method == 'sepa'
|
|
if self.kind == 'receivable':
|
|
payments = sorted(self.payments, key=attrgetter('date', 'id'))
|
|
mandates = Payment.get_sepa_mandates(payments)
|
|
Mandate.lock(
|
|
[m for m in mandates if m and m.sequence_type == 'FRST'])
|
|
sequence_types = {}
|
|
for payment, mandate in zip(payments, mandates):
|
|
if not mandate:
|
|
raise ProcessError(
|
|
gettext('account_payment_sepa'
|
|
'.msg_payment_process_no_mandate',
|
|
payment=payment.rec_name))
|
|
sequence_type = sequence_types.get(mandate)
|
|
if not sequence_type:
|
|
sequence_type = mandate.sequence_type
|
|
if sequence_type == 'FRST':
|
|
sequence_types[mandate] = 'RCUR'
|
|
else:
|
|
sequence_types[mandate] = sequence_type
|
|
payment.sepa_mandate = mandate
|
|
payment.sepa_mandate_sequence_type = sequence_type
|
|
Payment.save(payments)
|
|
elif self.kind == 'payable':
|
|
for payment in self.payments:
|
|
if not payment.sepa_payable_bank_account_number:
|
|
payment.sepa_payable_bank_account_number = (
|
|
payment.sepa_bank_account_number)
|
|
Payment.save(self.payments)
|
|
for payment in self.payments:
|
|
if not payment.sepa_bank_account_number:
|
|
raise ProcessError(
|
|
gettext('account_payment_sepa'
|
|
'.msg_payment_process_no_iban',
|
|
payment=payment.rec_name))
|
|
if not payment.sepa_bank_account_number.account.bank:
|
|
bank_account = payment.sepa_bank_account_number.account
|
|
raise ProcessError(
|
|
gettext('account_payment_sepa'
|
|
'.msg_payment_process_no_bank',
|
|
payment=payment.rec_name,
|
|
bank_account=bank_account.rec_name))
|
|
to_write = []
|
|
for key, payments in self.sepa_payments:
|
|
to_write.append(payments)
|
|
to_write.append({
|
|
'sepa_info_id': self.sepa_group_payment_id(key),
|
|
})
|
|
if to_write:
|
|
Payment.write(*to_write)
|
|
self.sepa_id = uuid.uuid4().hex
|
|
self.sepa_generate_message()
|
|
|
|
@dualmethod
|
|
@ModelView.button
|
|
def sepa_generate_message(cls, groups):
|
|
pool = Pool()
|
|
Message = pool.get('account.payment.sepa.message')
|
|
for group in groups:
|
|
if group.journal.process_method != 'sepa':
|
|
continue
|
|
tmpl = group.get_sepa_template()
|
|
if not tmpl:
|
|
raise NotImplementedError
|
|
if not group.sepa_messages:
|
|
group.sepa_messages = ()
|
|
message = tmpl.generate(group=group,
|
|
datetime=datetime, normalize=unicodedata.normalize,
|
|
).filter(remove_comment).render().encode('utf8')
|
|
message = Message(message=message, type='out', state='waiting',
|
|
company=group.company)
|
|
group.sepa_messages += (message,)
|
|
cls.save(groups)
|
|
|
|
@property
|
|
def sepa_initiating_party(self):
|
|
return self.company.party
|
|
|
|
def sepa_group_payment_key(self, payment):
|
|
key = (
|
|
('payment_info', payment.sepa_info_id),
|
|
('date', payment.date),
|
|
)
|
|
if self.kind == 'receivable':
|
|
key += (('sequence_type', payment.sepa_mandate_sequence_type),)
|
|
key += (('scheme', payment.sepa_mandate.scheme),)
|
|
return key
|
|
|
|
def sepa_group_payment_id(self, key):
|
|
return key['payment_info'] or uuid.uuid4().hex
|
|
|
|
@property
|
|
def sepa_payments(self):
|
|
pool = Pool()
|
|
Payment = pool.get('account.payment')
|
|
keyfunc = self.sepa_group_payment_key
|
|
# re-browse to align cache
|
|
payments = Payment.browse(sorted(
|
|
self.payments, key=sortable_values(keyfunc)))
|
|
for key, grouped_payments in groupby(payments, key=keyfunc):
|
|
yield dict(key), list(grouped_payments)
|
|
|
|
@property
|
|
def sepa_message_id(self):
|
|
return self.sepa_id or self.number
|
|
|
|
@classmethod
|
|
def search_rec_name(cls, name, clause):
|
|
_, operator, operand, *extra = clause
|
|
if operator.startswith('!') or operator.startswith('not '):
|
|
bool_op = 'AND'
|
|
else:
|
|
bool_op = 'OR'
|
|
code_value = operand
|
|
if operator.endswith('like') and is_full_text(operand):
|
|
code_value = lstrip_wildcard(operand)
|
|
domain = super().search_rec_name(name, clause)
|
|
return [bool_op,
|
|
domain,
|
|
('sepa_id', operator, code_value, *extra),
|
|
]
|
|
|
|
|
|
class Payment(metaclass=PoolMeta):
|
|
__name__ = 'account.payment'
|
|
|
|
sepa_payable_bank_account_number = fields.Many2One(
|
|
'bank.account.number', "Bank Account Number",
|
|
states={
|
|
'invisible': (
|
|
(Eval('process_method') != 'sepa')
|
|
| (Eval('kind') != 'payable')),
|
|
},
|
|
domain=[
|
|
('type', '=', 'iban'),
|
|
('account.owners', '=', Eval('party', -1)),
|
|
])
|
|
sepa_mandate = fields.Many2One('account.payment.sepa.mandate', 'Mandate',
|
|
ondelete='RESTRICT',
|
|
states={
|
|
'readonly': Eval('state') != 'draft',
|
|
'invisible': ((Eval('process_method') != 'sepa')
|
|
| (Eval('kind') != 'receivable')),
|
|
},
|
|
domain=[
|
|
('party', '=', Eval('party', -1)),
|
|
('company', '=', Eval('company', -1)),
|
|
If(Eval('state') == 'draft',
|
|
('state', '=', 'validated'),
|
|
(),
|
|
)
|
|
])
|
|
sepa_mandate_sequence_type = fields.Char('Mandate Sequence Type',
|
|
readonly=True)
|
|
sepa_return_reason_code = fields.Char('Return Reason Code', readonly=True,
|
|
states={
|
|
'invisible': ((Eval('process_method') != 'sepa')
|
|
| (~Eval('sepa_return_reason_code')
|
|
& (Eval('state') != 'failed'))),
|
|
})
|
|
sepa_return_reason_information = fields.Text('Return Reason Information',
|
|
readonly=True,
|
|
states={
|
|
'invisible': ((Eval('process_method') != 'sepa')
|
|
| (~Eval('sepa_return_reason_information')
|
|
& (Eval('state') != 'failed'))),
|
|
})
|
|
sepa_end_to_end_id = fields.Function(fields.Char('SEPA End To End ID'),
|
|
'get_sepa_end_to_end_id', searcher='search_end_to_end_id')
|
|
sepa_instruction_id = fields.Function(fields.Char('SEPA Instruction ID'),
|
|
'get_sepa_instruction_id', searcher='search_sepa_instruction_id')
|
|
sepa_info_id = fields.Char("SEPA Info ID", readonly=True, size=35,
|
|
states={
|
|
'invisible': ~Eval('sepa_info_id'),
|
|
})
|
|
|
|
@classmethod
|
|
def copy(cls, payments, default=None):
|
|
if default is None:
|
|
default = {}
|
|
else:
|
|
default = default.copy()
|
|
default.setdefault('sepa_mandate_sequence_type', None)
|
|
return super().copy(payments, default=default)
|
|
|
|
@classmethod
|
|
def process_method_with_group(cls):
|
|
return super().process_method_with_group() + ['sepa']
|
|
|
|
@classmethod
|
|
def get_sepa_mandates(cls, payments):
|
|
mandates = []
|
|
for payment in payments:
|
|
if payment.sepa_mandate:
|
|
if payment.sepa_mandate.is_valid:
|
|
mandate = payment.sepa_mandate
|
|
else:
|
|
mandate = None
|
|
else:
|
|
for mandate in payment.party.sepa_mandates_for(payment):
|
|
if mandate.is_valid:
|
|
break
|
|
else:
|
|
mandate = None
|
|
mandates.append(mandate)
|
|
return mandates
|
|
|
|
def get_sepa_end_to_end_id(self, name):
|
|
return str(self.id)
|
|
|
|
@classmethod
|
|
def search_end_to_end_id(cls, name, domain):
|
|
table = cls.__table__()
|
|
_, operator, value = domain
|
|
cast = cls.sepa_end_to_end_id._field.sql_type().base
|
|
Operator = fields.SQL_OPERATORS[operator]
|
|
query = table.select(table.id,
|
|
where=Operator(table.id.cast(cast), value))
|
|
return [('id', 'in', query)]
|
|
|
|
get_sepa_instruction_id = get_sepa_end_to_end_id
|
|
search_sepa_instruction_id = search_end_to_end_id
|
|
|
|
@property
|
|
def sepa_remittance_information(self):
|
|
return self.reference_used
|
|
|
|
@property
|
|
def sepa_bank_account_number(self):
|
|
if self.kind == 'receivable':
|
|
if self.sepa_mandate:
|
|
return self.sepa_mandate.account_number
|
|
elif self.sepa_payable_bank_account_number:
|
|
return self.sepa_payable_bank_account_number
|
|
else:
|
|
for account in self.party.bank_accounts_used:
|
|
for number in account.numbers:
|
|
if number.type == 'iban':
|
|
return number
|
|
|
|
@property
|
|
def rejected(self):
|
|
return (self.state == 'failed'
|
|
and self.sepa_return_reason_code
|
|
and self.sepa_return_reason_information == '/RTYP/RJCT')
|
|
|
|
def _get_clearing_move(self, date=None):
|
|
if not date:
|
|
date = Transaction().context.get('date_value')
|
|
return super()._get_clearing_move(date=date)
|
|
|
|
@classmethod
|
|
def view_attributes(cls):
|
|
return super().view_attributes() + [
|
|
('//page[@id="sepa"]', 'states', {
|
|
'invisible': Eval('process_method') != 'sepa',
|
|
}),
|
|
('//separator[@id="sepa_return_reason"]', 'states', {
|
|
'invisible': Eval('state') != 'failed',
|
|
}),
|
|
]
|
|
|
|
@classmethod
|
|
def search_rec_name(cls, name, clause):
|
|
_, operator, operand, *extra = clause
|
|
if operator.startswith('!') or operator.startswith('not '):
|
|
bool_op = 'AND'
|
|
else:
|
|
bool_op = 'OR'
|
|
code_value = operand
|
|
if operator.endswith('like') and is_full_text(operand):
|
|
code_value = lstrip_wildcard(operand)
|
|
domain = super().search_rec_name(name, clause)
|
|
return [bool_op,
|
|
domain,
|
|
('sepa_info_id', operator, code_value, *extra),
|
|
]
|
|
|
|
|
|
class Mandate(Workflow, ModelSQL, ModelView):
|
|
__name__ = 'account.payment.sepa.mandate'
|
|
party = fields.Many2One(
|
|
'party.party', "Party", required=True,
|
|
states={
|
|
'readonly': Eval('state').in_(
|
|
['requested', 'validated', 'cancelled']),
|
|
},
|
|
context={
|
|
'company': Eval('company', -1),
|
|
},
|
|
depends={'company'})
|
|
address = fields.Many2One(
|
|
'party.address', "Address",
|
|
domain=[
|
|
('party', '=', Eval('party', -1)),
|
|
],
|
|
states={
|
|
'readonly': Eval('state').in_(['validated', 'cancelled']),
|
|
'required': Eval('state') == 'validated',
|
|
})
|
|
account_number = fields.Many2One('bank.account.number', 'Account Number',
|
|
ondelete='RESTRICT',
|
|
states={
|
|
'readonly': Eval('state').in_(['validated', 'cancelled']),
|
|
'required': Eval('state') == 'validated',
|
|
},
|
|
domain=[
|
|
('type', '=', 'iban'),
|
|
('account.owners', '=', Eval('party', -1)),
|
|
])
|
|
identification = fields.Char('Identification', size=35,
|
|
states={
|
|
'readonly': Eval('identification_readonly', True),
|
|
'required': Eval('state') == 'validated',
|
|
})
|
|
identification_readonly = fields.Function(fields.Boolean(
|
|
'Identification Readonly'), 'get_identification_readonly')
|
|
company = fields.Many2One(
|
|
'company.company', "Company", required=True,
|
|
states={
|
|
'readonly': Eval('state') != 'draft',
|
|
})
|
|
type = fields.Selection([
|
|
('recurrent', 'Recurrent'),
|
|
('one-off', 'One-off'),
|
|
], 'Type',
|
|
states={
|
|
'readonly': Eval('state').in_(['validated', 'cancelled']),
|
|
})
|
|
sequence_type_rcur = fields.Boolean(
|
|
"Always use RCUR",
|
|
states={
|
|
'invisible': Eval('type') == 'one-off',
|
|
})
|
|
scheme = fields.Selection([
|
|
('CORE', 'Core'),
|
|
('B2B', 'Business to Business'),
|
|
], 'Scheme', required=True,
|
|
states={
|
|
'readonly': Eval('state').in_(['validated', 'cancelled']),
|
|
})
|
|
scheme_string = scheme.translated('scheme')
|
|
signature_date = fields.Date('Signature Date',
|
|
states={
|
|
'readonly': Eval('state').in_(['validated', 'cancelled']),
|
|
'required': Eval('state') == 'validated',
|
|
})
|
|
state = fields.Selection([
|
|
('draft', 'Draft'),
|
|
('requested', 'Requested'),
|
|
('validated', 'Validated'),
|
|
('cancelled', 'Cancelled'),
|
|
], "State", readonly=True, sort=False)
|
|
payments = fields.One2Many('account.payment', 'sepa_mandate', 'Payments')
|
|
has_payments = fields.Function(fields.Boolean('Has Payments'),
|
|
'get_has_payments')
|
|
is_first_payment = fields.Function(
|
|
fields.Boolean("Is First Payment"), 'get_is_first_payment')
|
|
|
|
@classmethod
|
|
def __setup__(cls):
|
|
super().__setup__()
|
|
cls._transitions |= set((
|
|
('draft', 'requested'),
|
|
('requested', 'validated'),
|
|
('validated', 'cancelled'),
|
|
('requested', 'cancelled'),
|
|
('requested', 'draft'),
|
|
))
|
|
cls._buttons.update({
|
|
'cancel': {
|
|
'invisible': ~Eval('state').in_(
|
|
['requested', 'validated']),
|
|
'depends': ['state'],
|
|
},
|
|
'draft': {
|
|
'invisible': Eval('state') != 'requested',
|
|
'depends': ['state'],
|
|
},
|
|
'request': {
|
|
'invisible': Eval('state') != 'draft',
|
|
'depends': ['state'],
|
|
},
|
|
'validate_mandate': {
|
|
'invisible': Eval('state') != 'requested',
|
|
'depends': ['state'],
|
|
},
|
|
})
|
|
t = cls.__table__()
|
|
cls._sql_constraints = [
|
|
('identification_unique', Unique(t, t.company, t.identification),
|
|
'account_payment_sepa.msg_mandate_unique_id'),
|
|
]
|
|
cls._sql_indexes.add(
|
|
Index(
|
|
t, (t.state, Index.Equality(cardinality='low')),
|
|
where=t.state.in_(['draft', 'requested'])))
|
|
|
|
@fields.depends('party', '_parent_party.id')
|
|
def on_change_party(self):
|
|
if self.party and self.party.id >= 0:
|
|
self.address = self.party.address_get()
|
|
else:
|
|
self.address = None
|
|
self.account_number = None
|
|
|
|
@staticmethod
|
|
def default_company():
|
|
return Transaction().context.get('company')
|
|
|
|
@fields.depends('company')
|
|
def on_change_company(self):
|
|
self.identification_readonly = self.default_identification_readonly(
|
|
company=self.company.id if self.company else None)
|
|
|
|
@cached_property
|
|
def company_address(self):
|
|
return self.company.party.address_get()
|
|
|
|
@staticmethod
|
|
def default_type():
|
|
return 'recurrent'
|
|
|
|
@classmethod
|
|
def default_sequence_type_rcur(cls):
|
|
return False
|
|
|
|
@staticmethod
|
|
def default_scheme():
|
|
return 'CORE'
|
|
|
|
@staticmethod
|
|
def default_state():
|
|
return 'draft'
|
|
|
|
@classmethod
|
|
def default_identification_readonly(cls, **pattern):
|
|
pool = Pool()
|
|
Configuration = pool.get('account.configuration')
|
|
config = Configuration(1)
|
|
return bool(config.get_multivalue('sepa_mandate_sequence', **pattern))
|
|
|
|
def get_identification_readonly(self, name):
|
|
return bool(self.identification)
|
|
|
|
def get_rec_name(self, name):
|
|
name = '(%s)' % self.id
|
|
if self.identification:
|
|
name = self.identification
|
|
if self.account_number:
|
|
name += ' @ %s' % self.account_number.rec_name
|
|
return name
|
|
|
|
@classmethod
|
|
def search_rec_name(cls, name, clause):
|
|
if clause[1].startswith('!') or clause[1].startswith('not '):
|
|
bool_op = 'AND'
|
|
else:
|
|
bool_op = 'OR'
|
|
return [bool_op,
|
|
('identification',) + tuple(clause[1:]),
|
|
('account_number',) + tuple(clause[1:]),
|
|
]
|
|
|
|
@classmethod
|
|
def preprocess_values(cls, mode, values):
|
|
pool = Pool()
|
|
Configuration = pool.get('account.configuration')
|
|
values = super().preprocess_values(mode, values)
|
|
if mode == 'create' and not values.get('identification'):
|
|
configuration = Configuration(1)
|
|
company_id = values.get('company', cls.default_company())
|
|
if company_id is not None:
|
|
if sequence := configuration.get_multivalue(
|
|
'sepa_mandate_sequence', company=company_id):
|
|
values['identification'] = sequence.get()
|
|
if values.get('identification') == '':
|
|
values['identification'] = None
|
|
return values
|
|
|
|
@classmethod
|
|
def check_modification(cls, mode, mandates, values=None, external=False):
|
|
super().check_modification(
|
|
mode, mandates, values=values, external=external)
|
|
if mode == 'delete':
|
|
for mandate in mandates:
|
|
if mandate.state not in {'draft', 'cancelled'}:
|
|
raise AccessError(gettext(
|
|
'account_payment_sepa'
|
|
'.msg_mandate_delete_draft_cancelled',
|
|
mandate=mandate.rec_name))
|
|
|
|
@classmethod
|
|
def copy(cls, mandates, default=None):
|
|
if default is None:
|
|
default = {}
|
|
else:
|
|
default = default.copy()
|
|
default.setdefault('payments', [])
|
|
default.setdefault('signature_date', None)
|
|
default.setdefault('identification', None)
|
|
return super().copy(mandates, default=default)
|
|
|
|
@property
|
|
def is_valid(self):
|
|
if self.state == 'validated' and self.account_number.active:
|
|
if self.type == 'one-off':
|
|
if not self.has_payments:
|
|
return True
|
|
else:
|
|
return True
|
|
return False
|
|
|
|
@property
|
|
def sequence_type(self):
|
|
if self.type == 'one-off':
|
|
return 'OOFF'
|
|
elif not self.sequence_type_rcur and self.is_first_payment:
|
|
return 'FRST'
|
|
# TODO manage FNAL
|
|
else:
|
|
return 'RCUR'
|
|
|
|
@classmethod
|
|
def get_has_payments(cls, mandates, name):
|
|
pool = Pool()
|
|
Payment = pool.get('account.payment')
|
|
payment = Payment.__table__()
|
|
cursor = Transaction().connection.cursor()
|
|
|
|
has_payments = dict.fromkeys([m.id for m in mandates], False)
|
|
for sub_ids in grouped_slice(mandates):
|
|
red_sql = reduce_ids(payment.sepa_mandate, sub_ids)
|
|
cursor.execute(*payment.select(payment.sepa_mandate, Literal(True),
|
|
where=red_sql,
|
|
group_by=payment.sepa_mandate))
|
|
has_payments.update(cursor)
|
|
|
|
return has_payments
|
|
|
|
@classmethod
|
|
def get_is_first_payment(cls, mandates, name):
|
|
pool = Pool()
|
|
Payment = pool.get('account.payment')
|
|
payment = Payment.__table__()
|
|
cursor = Transaction().connection.cursor()
|
|
|
|
is_first = dict.fromkeys([m.id for m in mandates], True)
|
|
for sub_ids in grouped_slice(mandates):
|
|
red_sql = reduce_ids(payment.sepa_mandate, sub_ids)
|
|
cursor.execute(*payment.select(
|
|
payment.sepa_mandate, Literal(False),
|
|
where=red_sql
|
|
& (payment.sepa_mandate_sequence_type != Null)
|
|
& ~( # Same as property rejected
|
|
(payment.state == 'failed')
|
|
& ((payment.sepa_return_reason_code != Null)
|
|
| (payment.sepa_return_reason_code != ''))
|
|
& (payment.sepa_return_reason_information
|
|
== '/RTYP/RJCT')),
|
|
group_by=payment.sepa_mandate))
|
|
is_first.update(cursor)
|
|
return is_first
|
|
|
|
@classmethod
|
|
@ModelView.button
|
|
@Workflow.transition('draft')
|
|
def draft(cls, mandates):
|
|
pass
|
|
|
|
@classmethod
|
|
@ModelView.button
|
|
@Workflow.transition('requested')
|
|
def request(cls, mandates):
|
|
pass
|
|
|
|
@classmethod
|
|
@ModelView.button
|
|
@Workflow.transition('validated')
|
|
def validate_mandate(cls, mandates):
|
|
pass
|
|
|
|
@classmethod
|
|
@ModelView.button
|
|
@Workflow.transition('cancelled')
|
|
def cancel(cls, mandates):
|
|
# TODO must be automaticaly cancelled 13 months after last collection
|
|
pass
|
|
|
|
|
|
class MandateReport(CompanyReport):
|
|
__name__ = 'account.payment.sepa.mandate'
|
|
|
|
|
|
class Message(Workflow, ModelSQL, ModelView):
|
|
__name__ = 'account.payment.sepa.message'
|
|
_states = {
|
|
'readonly': Eval('state') != 'draft',
|
|
}
|
|
message = fields.Binary('Message', filename='filename',
|
|
file_id=file_id, store_prefix=store_prefix,
|
|
states=_states)
|
|
message_file_id = fields.Char("Message File ID", readonly=True)
|
|
filename = fields.Function(fields.Char('Filename'), 'get_filename')
|
|
type = fields.Selection([
|
|
('in', 'IN'),
|
|
('out', 'OUT'),
|
|
], 'Type', required=True, states=_states)
|
|
company = fields.Many2One(
|
|
'company.company', "Company", required=True,
|
|
states={
|
|
'readonly': Eval('state') != 'draft',
|
|
})
|
|
origin = fields.Reference(
|
|
"Origin", selection='get_origin',
|
|
states={
|
|
'readonly': _states['readonly'],
|
|
'invisible': Eval('type') == 'in',
|
|
})
|
|
state = fields.Selection([
|
|
('draft', 'Draft'),
|
|
('waiting', 'Waiting'),
|
|
('done', 'Done'),
|
|
('cancelled', 'Cancelled'),
|
|
], "State", readonly=True, sort=False)
|
|
|
|
@classmethod
|
|
def __setup__(cls):
|
|
super().__setup__()
|
|
t = cls.__table__()
|
|
cls._sql_indexes.add(
|
|
Index(
|
|
t, (t.state, Index.Equality(cardinality='low')),
|
|
where=t.state.in_(['draft', 'waiting'])))
|
|
cls._transitions |= {
|
|
('draft', 'waiting'),
|
|
('waiting', 'done'),
|
|
('waiting', 'draft'),
|
|
('draft', 'cancelled'),
|
|
('waiting', 'cancelled'),
|
|
}
|
|
cls._buttons.update({
|
|
'cancel': {
|
|
'invisible': ~Eval('state').in_(['draft', 'waiting']),
|
|
'depends': ['state'],
|
|
},
|
|
'draft': {
|
|
'invisible': Eval('state') != 'waiting',
|
|
'depends': ['state'],
|
|
},
|
|
'wait': {
|
|
'invisible': Eval('state') != 'draft',
|
|
'depends': ['state'],
|
|
},
|
|
'do': {
|
|
'invisible': Eval('state') != 'waiting',
|
|
'depends': ['state'],
|
|
},
|
|
})
|
|
|
|
@staticmethod
|
|
def default_type():
|
|
return 'in'
|
|
|
|
@staticmethod
|
|
def default_company():
|
|
return Transaction().context.get('company')
|
|
|
|
@staticmethod
|
|
def default_state():
|
|
return 'draft'
|
|
|
|
def get_filename(self, name):
|
|
pool = Pool()
|
|
Group = pool.get('account.payment.group')
|
|
if isinstance(self.origin, Group):
|
|
return self.origin.rec_name + '.xml'
|
|
|
|
@staticmethod
|
|
def _get_origin():
|
|
'Return list of Model names for origin Reference'
|
|
return ['account.payment.group']
|
|
|
|
@classmethod
|
|
def get_origin(cls):
|
|
IrModel = Pool().get('ir.model')
|
|
get_name = IrModel.get_name
|
|
models = cls._get_origin()
|
|
return [(None, '')] + [(m, get_name(m)) for m in models]
|
|
|
|
def get_rec_name(self, name):
|
|
if self.origin:
|
|
return self.origin.rec_name
|
|
else:
|
|
return f'({self.id})'
|
|
|
|
@classmethod
|
|
@ModelView.button
|
|
@Workflow.transition('draft')
|
|
def draft(cls, messages):
|
|
pass
|
|
|
|
@classmethod
|
|
@ModelView.button
|
|
@Workflow.transition('waiting')
|
|
def wait(cls, messages):
|
|
pass
|
|
|
|
@classmethod
|
|
@ModelView.button
|
|
@Workflow.transition('done')
|
|
def do(cls, messages):
|
|
for message in messages:
|
|
if message.type == 'in':
|
|
message.parse()
|
|
else:
|
|
message.send()
|
|
|
|
@classmethod
|
|
@ModelView.button
|
|
@Workflow.transition('cancelled')
|
|
def cancel(cls, messages):
|
|
pass
|
|
|
|
@staticmethod
|
|
def _get_handlers():
|
|
pool = Pool()
|
|
Payment = pool.get('account.payment')
|
|
return {
|
|
'urn:iso:std:iso:20022:tech:xsd:camt.054.001.01':
|
|
lambda f: CAMT054(f, Payment),
|
|
'urn:iso:std:iso:20022:tech:xsd:camt.054.001.02':
|
|
lambda f: CAMT054(f, Payment),
|
|
'urn:iso:std:iso:20022:tech:xsd:camt.054.001.03':
|
|
lambda f: CAMT054(f, Payment),
|
|
'urn:iso:std:iso:20022:tech:xsd:camt.054.001.04':
|
|
lambda f: CAMT054(f, Payment),
|
|
}
|
|
|
|
@staticmethod
|
|
def get_namespace(message):
|
|
f = BytesIO(message)
|
|
for _, element in etree.iterparse(f, events=('start',)):
|
|
tag = etree.QName(element)
|
|
if tag.localname == 'Document':
|
|
return tag.namespace
|
|
|
|
def parse(self):
|
|
f = BytesIO(self.message)
|
|
namespace = self.get_namespace(self.message)
|
|
handlers = self._get_handlers()
|
|
if namespace not in handlers:
|
|
raise # TODO UserError
|
|
handlers[namespace](f)
|
|
|
|
def send(self):
|
|
pass
|
|
|
|
|
|
class MessageReport(Report):
|
|
__name__ = 'account.payment.sepa.message'
|
|
|
|
@classmethod
|
|
def render(cls, action, report_context):
|
|
record = report_context['record']
|
|
return record.message
|