# This file is part of Tryton. The COPYRIGHT file at the top level of
# this repository contains the full copyright notices and license terms.
from collections import defaultdict
from decimal import Decimal
from itertools import chain, groupby

from requests import Session
from zeep import Client
from zeep.exceptions import Error as ZeepError
from zeep.transports import Transport

import trytond.config as config
from trytond.i18n import gettext
from trytond.model import Index, ModelSQL, ModelView, Unique, Workflow, fields
from trytond.model.exceptions import AccessError
from trytond.modules.company.model import CompanyValueMixin
from trytond.pool import Pool, PoolMeta
from trytond.pyson import Bool, Eval
from trytond.tools import grouped_slice
from trytond.transaction import Transaction

from .exceptions import ESSIIPostedInvoicesError

# Maximum number of records sent in a single web service request
SEND_SIZE = 10000
SII_URL = [
    (None, ""),
    ('aeat', "AEAT"),
    ('guipuzkoa', "Guipuzkoa"),
    # XXX: URLs for basque country and navarra should be added
    ]
# Base URL of the WSDL files for each selectable SII_URL key
WS_URL = {
    'aeat': ('https://www2.agenciatributaria.gob.es/static_files/common/'
        'internet/dep/aplicaciones/es/aeat/ssii_1_1_bis/fact/ws/'),
    'guipuzkoa': (
        'https://egoitza.gipuzkoa.eus/ogasuna/sii/ficheros/v1.1/'),
    }


# Adds the SII URL and environment settings to the account configuration;
# both values are stored in 'account.credential.sii'.
class Configuration(metaclass=PoolMeta):
    __name__ = 'account.configuration'

    es_sii_url = fields.MultiValue(
        fields.Selection(
            SII_URL, "SII URL", translate=False,
            help="The URL where the invoices should be sent."))
    es_sii_environment = fields.MultiValue(fields.Selection([
                (None, ""),
                ('staging', "Staging"),
                ('production', "Production"),
                ], "SII Environment",
            states={
                'required': Bool(Eval('es_sii_url')),
                }))

    @classmethod
    def multivalue_model(cls, field):
        """Return the model storing the values of the multivalue *field*."""
        pool = Pool()
        if field in {'es_sii_url', 'es_sii_environment'}:
            return pool.get('account.credential.sii')
        return super().multivalue_model(field)


# Storage of the SII settings and factory of the web service client.
class CredentialSII(ModelSQL, CompanyValueMixin):
    __name__ = 'account.credential.sii'

    es_sii_url = fields.Selection(SII_URL, "SII URL", translate=False)
    es_sii_environment = fields.Selection([
            (None, ""),
            ('staging', "Staging"),
            ('production', "Production"),
            ], "SII Environment")

    @classmethod
    def get_client(cls, endpoint, **pattern):
        """Return the zeep service proxy for *endpoint*.

        *pattern* is forwarded to ``get_multivalue`` to select the
        configured URL and environment.  The client certificate and the
        private key paths are read from the ``account_es_sii`` section of
        the trytond configuration.

        Raise AccessError if no SII URL is configured.
        """
        pool = Pool()
        Configuration = pool.get('account.configuration')
        configuration = Configuration(1)

        url = WS_URL.get(
            configuration.get_multivalue('es_sii_url', **pattern), '')
        if not url:
            raise AccessError(
                gettext('account_es_sii.msg_missing_sii_url'))
        environment = configuration.get_multivalue(
            'es_sii_environment', **pattern)

        session = Session()
        session.cert = (
            config.get('account_es_sii', 'certificate'),
            config.get('account_es_sii', 'privatekey'))
        transport = Transport(session=session)
        client = Client(url + endpoint + '.wsdl', transport=transport)

        service = endpoint
        if environment == 'staging':
            # Set guipuzkoa testing service
            if 'egoitza.gipuzkoa.eus' in url:
                # NOTE(review): the proxy returned by create_service is
                # discarded; confirm the call is still needed for its
                # side effects.
                client.create_service(
                    next(iter(client.wsdl.bindings.keys())),
                    'https://sii-prep.egoitza.gipuzkoa.eus/JBS/HACI/'
                    'SSII-FACT/')
            else:
                # For the other URLs the staging port uses the endpoint
                # name with the 'Pruebas' suffix
                service += 'Pruebas'
        return client.bind('siiService', service)


# Adds the SII keys to the tax template so they are propagated to taxes.
class TaxTemplate(metaclass=PoolMeta):
    __name__ = 'account.tax.template'

    es_sii_tax_key = fields.Selection([
            (None, ''),
            ('S1', "S1"),
            ('S2', "S2"),
            ('S3', "S3"),
            ('E1', "E1"),
            ('E2', "E2"),
            ('E3', "E3"),
            ('E4', "E4"),
            ('E5', "E5"),
            ('E6', "E6"),
            ], "SII Tax Key", translate=False, sort=False)
    es_sii_operation_key = fields.Selection(
        [(None, '')]
        + [(x, x) for x in ('{:02}'.format(i) for i in range(1, 18))],
        "SII Operation Key", translate=False, sort=False)
    es_exclude_from_sii = fields.Boolean("Exclude from SII")

    def _get_tax_value(self, tax=None):
        """Extend the tax values with the SII fields.

        A field is included when there is no *tax* or when its value on
        *tax* differs from the value on the template.
        """
        values = super()._get_tax_value(tax)
        for name in [
                'es_sii_tax_key', 'es_sii_operation_key',
                'es_exclude_from_sii']:
            if not tax or getattr(tax, name) != getattr(self, name):
                values[name] = getattr(self, name)
        return values


# Adds the SII keys to the tax; they are read-only when the tax comes from
# a template that is not overridden.
class Tax(metaclass=PoolMeta):
    __name__ = 'account.tax'

    _states = {
        'readonly': (Bool(Eval('template', -1))
            & ~Eval('template_override', False)),
        }
    es_sii_tax_key = fields.Selection([
            (None, ''),
            ('S1', "S1"),
            ('S2', "S2"),
            ('S3', "S3"),
            ('E1', "E1"),
            ('E2', "E2"),
            ('E3', "E3"),
            ('E4', "E4"),
            ('E5', "E5"),
            ('E6', "E6"),
            ], "SII Tax Key", translate=False, sort=False, states=_states,
        help_selection={
            'S1': "Not exempt - No passive subject investment",
            'S2': "Not exempt - With passive subject investment",
            'S3': ("Not exempt - Without investment by the taxpayer "
                "and with investment by the taxpayer"),
            'E1': "Exempt by Art. 20",
            'E2': "Exempt by Art. 21",
            'E3': "Exempt by Art. 22",
            'E4': "Exempt by Art. 24",
            'E5': "Exempt by Art. 25",
            'E6': "Exempt others",
            })
    es_sii_operation_key = fields.Selection(
        [(None, '')]
        + [(x, x) for x in ('{:02}'.format(i) for i in range(1, 18))],
        "SII Operation Key", translate=False, sort=False, states=_states)
    es_exclude_from_sii = fields.Boolean("Exclude from SII", states=_states)
    # Do not leave the helper as a class attribute
    del _states


# Exposes on the fiscal year the SII flag of its standard periods.
class FiscalYear(metaclass=PoolMeta):
    __name__ = 'account.fiscalyear'

    es_sii_send_invoices = fields.Function(
        fields.Boolean("Send invoices to SII"),
        'get_es_sii_send_invoices', setter='set_es_sii_send_invoices')

    def get_es_sii_send_invoices(self, name):
        """Return the flag shared by the standard periods.

        The result is None when no standard period has a value or when
        the standard periods do not all have the same value.
        """
        result = None
        for period in self.periods:
            if period.type != 'standard':
                continue
            value = period.es_sii_send_invoices
            if value is None:
                continue
            if result is None:
                result = value
            elif result != value:
                # Mixed values can not be summarized
                result = None
                break
        return result

    @classmethod
    def set_es_sii_send_invoices(cls, fiscalyears, name, value):
        """Write *value* on the standard periods of *fiscalyears*."""
        pool = Pool()
        Period = pool.get('account.period')
        periods = [
            p for fiscalyear in fiscalyears for p in fiscalyear.periods
            if p.type == 'standard']
        Period.write(periods, {name: value})


# Copies the SII flag when a fiscal year is renewed.
class RenewFiscalYear(metaclass=PoolMeta):
    __name__ = 'account.fiscalyear.renew'

    def create_fiscalyear(self):
        """Create the fiscal year with the SII flag of the last standard
        period of the previous fiscal year."""
        fiscalyear = super().create_fiscalyear()
        previous_fiscalyear = self.start.previous_fiscalyear
        periods = [
            p for p in previous_fiscalyear.periods if p.type == 'standard']
        if periods:
            last_period = periods[-1]
            fiscalyear.es_sii_send_invoices = (
                last_period.es_sii_send_invoices)
        return fiscalyear


# Adds the flag that enables the creation of SII records for a period.
class Period(metaclass=PoolMeta):
    __name__ = 'account.period'

    es_sii_send_invoices = fields.Boolean(
        "Send invoices to SII",
        states={
            'invisible': Eval('type') != 'standard',
            },
        help="Check to create SII records for the invoices in the period.")

    @classmethod
    def check_modification(cls, mode, periods, values=None, external=False):
        """Forbid changing the SII flag of periods that have invoices."""
        super().check_modification(
            mode, periods, values=values, external=external)
        if mode == 'write' and 'es_sii_send_invoices' in values:
            new_value = values['es_sii_send_invoices']
            # Only the periods whose flag actually changes are checked
            to_check = [
                p for p in periods if p.es_sii_send_invoices != new_value]
            cls.check_es_sii_posted_invoices(to_check)

    @classmethod
    def check_es_sii_posted_invoices(cls, periods):
        """Raise ESSIIPostedInvoicesError if any invoice has its move in
        one of *periods*."""
        pool = Pool()
        Invoice = pool.get('account.invoice')
        for sub_ids in grouped_slice(list(map(int, periods))):
            invoices = Invoice.search([
                    ('move.period', 'in', sub_ids),
                    ], limit=1, order=[])
            if invoices:
                invoice, = invoices
                raise ESSIIPostedInvoicesError(gettext(
                        'account_es_sii.msg_es_sii_posted_invoices',
                        period=invoice.move.period.rec_name))


# Creates the SII record when an invoice is posted and provides the helper
# properties used to build the payload.
class Invoice(metaclass=PoolMeta):
    __name__ = 'account.invoice'

    @classmethod
    @ModelView.button
    @Workflow.transition('posted')
    def _post(cls, invoices):
        """Post *invoices* and create an SII record for each invoice that
        was draft or validated before the call and must be sent."""
        pool = Pool()
        InvoiceSII = pool.get('account.invoice.sii')
        # The state must be tested before posting changes it
        posted_invoices = {
            i for i in invoices if i.state in {'draft', 'validated'}}
        super()._post(invoices)
        InvoiceSII.save([
                InvoiceSII(invoice=i) for i in posted_invoices
                if i.es_send_to_sii])

    @property
    def es_send_to_sii(self):
        """Whether an SII record must be created for the invoice.

        It requires the flag on the period of the move, and an invoice
        with taxes is skipped only when all its taxes are excluded.
        """
        if not self.move.period.es_sii_send_invoices:
            return False
        if not self.taxes:
            return True
        return not all(t.tax.es_exclude_from_sii for t in self.taxes)

    @property
    def es_sii_party_tax_identifier(self):
        """The tax identifier of the invoice party, falling back on the
        tax identifier of the party."""
        return self.party_tax_identifier or self.party.tax_identifier

    @property
    def es_sii_product_type_detail(self):
        """Whether the invoice is a customer invoice for a party whose
        tax identifier country is not 'ES' (or is unknown)."""
        country = None
        if self.es_sii_party_tax_identifier:
            country = self.es_sii_party_tax_identifier.es_country()
        return self.type == 'out' and country != 'ES'


# Record of the delivery of one invoice to the SII web service.
class InvoiceSII(ModelSQL, ModelView):
    __name__ = 'account.invoice.sii'

    invoice = fields.Many2One(
        'account.invoice', "Invoice", required=True, ondelete='RESTRICT',
        states={
            'readonly': Eval('state') != 'pending',
            },
        domain=[
            ('state', 'in', ['posted', 'paid', 'cancelled']),
            ('move.state', '=', 'posted'),
            ])
    csv = fields.Char("CSV", readonly=True,
        help="A secure validation code that confirms the delivery of the "
        "related invoice.")
    error_code = fields.Char("Error Code", readonly=True,
        states={
            'invisible': ~Bool(Eval('error_code')),
            })
    error_description = fields.Char("Error Description", readonly=True,
        states={
            'invisible': ~Bool(Eval('error_description')),
            })
    state = fields.Selection([
            ('pending', "Pending"),
            ('sent', "Sent"),
            ('wrong', "Wrong"),
            ('rejected', "Rejected"),
            ], "State", readonly=True)

    @classmethod
    def __setup__(cls):
        """Register the access delegation, the unique constraint on the
        invoice and the partial index on the states left to send."""
        super().__setup__()
        cls.__access__.add('invoice')

        t = cls.__table__()
        cls._sql_constraints = [
            ('invoice_unique', Unique(t, t.invoice),
                'account_es_sii.msg_es_sii_invoice_unique'),
            ]
        cls._sql_indexes.add(
            Index(
                t,
                (t.state, Index.Equality(cardinality='low')),
                where=t.state.in_(['pending', 'wrong', 'rejected'])))

    @classmethod
    def default_state(cls):
        """New records start as pending."""
        return 'pending'

    def get_rec_name(self, name):
        """Use the record name of the invoice."""
        return self.invoice.rec_name

    @classmethod
    def search_rec_name(cls, name, clause):
        """Search on the record name of the invoice."""
        return [('invoice.rec_name',) + tuple(clause[1:])]

    @classmethod
    def check_modification(cls, mode, invoices, values=None, external=False):
        """Forbid deleting the records that have a CSV."""
        super().check_modification(
            mode, invoices, values=values, external=external)
        if mode == 'delete':
            for invoice in invoices:
                if invoice.csv:
                    raise AccessError(gettext(
                            'account_es_sii.msg_es_sii_invoice_delete_sent',
                            invoice=invoice.rec_name))

    @property
    def endpoint(self):
        """Name of the web service endpoint for the invoice type."""
        if self.invoice.type == 'out':
            suffix = 'Emitidas'
        else:
            suffix = 'Recibidas'
        return 'SuministroFact%s' % suffix

    @property
    def invoice_type(self):
        """The 'TipoFactura' code.

        Credit notes are 'R1' with a party tax identifier and 'R5'
        without; other invoices are 'F1' and 'F2' respectively.
        """
        has_tax_identifier = bool(self.invoice.es_sii_party_tax_identifier)
        if self.invoice.sequence_type == 'credit_note':
            return 'R1' if has_tax_identifier else 'R5'
        return 'F1' if has_tax_identifier else 'F2'

    @property
    def operation_description(self):
        """The invoice description, or '.' when it is empty."""
        return self.invoice.description or '.'

    @classmethod
    def endpoint2method(cls, endpoint):
        """Return the operation name for *endpoint* (None if unknown)."""
        return {
            'SuministroFactEmitidas': 'SuministroLRFacturasEmitidas',
            'SuministroFactRecibidas': 'SuministroLRFacturasRecibidas',
            }.get(endpoint)

    @classmethod
    def _grouping_key(cls, record):
        """Return the key of the request in which *record* is sent.

        The key is a tuple of (name, value) pairs so that it can be
        converted to a dictionary.
        """
        communication_type = 'A0'
        # Error 3000 means duplicated
        if (record.state == 'wrong'
                or (record.state == 'rejected'
                    and record.error_code == '3000')):
            communication_type = 'A1'
        return (
            ('endpoint', record.endpoint),
            ('company', record.invoice.company),
            ('tax_identifier', record.invoice.tax_identifier),
            ('communication_type', communication_type),
            # Split wrong/rejected to avoid rejection of correct new invoices
            ('new', record.state == 'pending'),
            )

    @classmethod
    def _credential_pattern(cls, key):
        """Return the pattern used to select the credential of *key*."""
        return {
            'company': key['company'].id,
            }

    def set_state(self, response):
        """Set the state from 'EstadoRegistro' of *response*; an unknown
        value leaves the record pending."""
        self.state = {
            'Correcto': 'sent',
            'Anulada': 'sent',
            'Incorrecto': 'rejected',
            'AceptadoConErrores': 'wrong',
            }.get(response.EstadoRegistro, 'pending')

    @classmethod
    def set_error(cls, records, message, code):
        """Mark *records* as rejected with *message* and *code* (the
        records are not saved)."""
        for record in records:
            record.error_description = message
            record.error_code = code
            record.state = 'rejected'

    @classmethod
    def send(cls, records=None):
        """
        Send invoices to SII

        Without records, all the records of the company in the context
        which are not sent are used.  Records already sent are ignored.

        The transaction is committed after each request (up to 10000
        invoices).
        """
        pool = Pool()
        Credential = pool.get('account.credential.sii')
        transaction = Transaction()

        if not records:
            records = cls.search([
                    ('invoice.company', '=',
                        transaction.context.get('company')),
                    ('state', '!=', 'sent'),
                    ])
        else:
            records = [r for r in records if r.state != 'sent']

        cls.lock(records)
        # groupby requires the records to be sorted by the same key
        records = sorted(records, key=cls._grouping_key)
        for key, grouped_records in groupby(records, key=cls._grouping_key):
            key = dict(key)
            for sub_records in grouped_slice(
                    list(grouped_records), SEND_SIZE):
                # Use clear cache after a commit
                sub_records = cls.browse(sub_records)
                cls.lock(sub_records)
                client = Credential.get_client(
                    key['endpoint'], **cls._credential_pattern(key))
                method = getattr(
                    client, cls.endpoint2method(key['endpoint']))
                try:
                    resp = method(
                        cls.get_headers(key),
                        [r.get_payload() for r in sub_records])
                except ZeepError as e:
                    # A failed request rejects all its records
                    cls.set_error(sub_records, e.message, None)
                else:
                    for record, response in zip(
                            sub_records, resp.RespuestaLinea):
                        record.set_state(response)
                        if response.CodigoErrorRegistro:
                            record.error_code = response.CodigoErrorRegistro
                            record.error_description = (
                                response.DescripcionErrorRegistro)
                        else:
                            record.error_code = None
                            record.error_description = None
                        # The response has a CSV that's for all records
                        record.csv = response.CSV or resp.CSV
                cls.save(sub_records)
                transaction.commit()

    @classmethod
    def get_headers(cls, key):
        """Return the header of the request for the grouping *key*."""
        owner = {}
        tax_identifier = key['tax_identifier']
        if tax_identifier:
            owner = tax_identifier.es_sii_values()
            owner['NombreRazon'] = key['company'].rec_name[:120]
        return {
            'IDVersionSii': '1.1',
            'Titular': owner,
            'TipoComunicacion': key['communication_type'],
            }

    @classmethod
    def tax_grouping_key(cls, tax_line):
        """Return the key used to group *tax_line* in the breakdown.

        The key is a tuple of (name, value) pairs.  It is empty when the
        line has no tax or when the tax (or the tax it is reported with)
        has no SII operation key.
        """
        pool = Pool()
        ModelData = pool.get('ir.model.data')
        if not tax_line.tax:
            return tuple()
        tax = tax_line.tax
        if tax.es_reported_with:
            tax = tax.es_reported_with
        if not tax.es_sii_operation_key:
            return tuple()
        invoice = tax_line.move_line.move.origin
        product_type = ''
        if invoice.es_sii_product_type_detail and tax.group:
            if tax.group.id == ModelData.get_id(
                    'account_es', 'tax_group_sale'):
                product_type = 'Entrega'
            elif tax.group.id == ModelData.get_id(
                    'account_es', 'tax_group_sale_service'):
                product_type = 'PrestacionServicios'
        return (
            ('cuota_suffix', (
                    'Repercutida' if invoice.type == 'out'
                    else 'Soportada')),
            ('sii_key', tax.es_sii_tax_key or ''),
            ('operation_key', tax.es_sii_operation_key or ''),
            ('excluded', bool(tax.es_exclude_from_sii)),
            ('product_key', product_type),
            )

    @classmethod
    def tax_detail_grouping_key(cls, tax_line):
        """Return the key used to split a tax group by rate.

        It is empty under the same conditions as ``tax_grouping_key``.
        """
        if not tax_line.tax:
            return tuple()
        tax = tax_line.tax
        if tax.es_reported_with:
            tax = tax.es_reported_with
        if not tax.es_sii_operation_key:
            return tuple()
        return (
            ('rate', str((tax.rate * 100).quantize(Decimal('0.01')))),
            )

    @classmethod
    def get_tax_values(cls, key, tax_lines):
        """Return the tax detail values of *tax_lines* for *key*.

        *key* is the dictionary made of the grouping and the detail
        grouping keys.  Return None when the key is empty or excluded.
        The lines of taxes reported with another tax are summed apart as
        equivalence surcharge.
        """
        if not key or key.get('excluded'):
            return
        base_amount = sum(
            t.amount for t in tax_lines
            if t.type == 'base' and not t.tax.es_reported_with)
        tax_amount = sum(
            t.amount for t in tax_lines
            if t.type == 'tax' and not t.tax.es_reported_with)
        values = {
            'BaseImponible': base_amount,
            'TipoImpositivo': key['rate'],
            'Cuota%s' % key['cuota_suffix']: tax_amount,
            }
        surcharge_taxes = [
            t for t in tax_lines
            if t.type == 'tax' and t.tax.es_reported_with]
        if surcharge_taxes:
            values['CuotaRecargoEquivalencia'] = (
                sum(t.amount for t in surcharge_taxes))
            values['TipoRecargoEquivalencia'] = str(
                (surcharge_taxes[0].tax.rate * 100).normalize())
        return values

    def get_out_invoice_details(self, key, values):
        """Return the breakdown entry of a customer invoice.

        *key* is a tax grouping key and *values* the list of tax values
        of the group.  The result is a tuple with the detail key
        (subject key, product key) and the detail values.
        """
        key = dict(key)
        sii_key = key['sii_key']
        subject_key = 'Sujeta'
        # startswith is used because the SII key is an empty string when
        # the tax has no SII tax key; such taxes are not subject
        if sii_key.startswith('E'):
            details = {
                'Exenta': {
                    'DetalleExenta': {
                        'CausaExencion': sii_key,
                        'BaseImponible': sum(
                            v['BaseImponible'] for v in values),
                        },
                    },
                }
        elif sii_key.startswith('S'):
            details = {
                'NoExenta': {
                    'TipoNoExenta': sii_key,
                    'DesgloseIVA': {
                        'DetalleIVA': values,
                        },
                    },
                }
        else:
            subject_key = 'NoSujeta'
            non_subject_key = ('ImporteTAIReglasLocalizacion'
                if sii_key == 'NSTAI' else 'ImportePorArticulos7_14_Otros')
            details = {
                non_subject_key: sum(v['BaseImponible'] for v in values),
                }
        detail_key = (subject_key, key['product_key'])
        return detail_key, details

    def get_invoice_detail(self, tax_values, operation_keys, total_amount,
            tax_amount):
        """Return the invoice part of the payload.

        *tax_values* maps the tax grouping keys to their list of tax
        values, *operation_keys* is the collection of SII operation keys
        (at most 3) and the amounts are those computed by
        ``get_invoice_payload``.
        """
        counterpart = {}
        invoice_type = self.invoice_type
        tax_identifier = self.invoice.es_sii_party_tax_identifier
        if tax_identifier:
            counterpart = tax_identifier.es_sii_values()
            counterpart['NombreRazon'] = self.invoice.party.rec_name[:120]
        detail = {
            'TipoFactura': invoice_type,
            'DescripcionOperacion': self.operation_description[:500],
            'RefExterna': self.invoice.rec_name[:60],
            'Contraparte': counterpart,
            'ImporteTotal': str(total_amount),
            # XXX: Set FechaOperacion from stock moves
            }
        if invoice_type.startswith('R'):
            detail['TipoRectificativa'] = 'I'
        # Sort the keys to always send them in the same order whatever the
        # iteration order of the collection
        for idx, value in enumerate(sorted(operation_keys)):
            assert idx <= 2
            key = 'ClaveRegimenEspecialOTrascendencia'
            if idx:
                key = '%sAdicional%d' % (key, idx)
            detail[key] = value
        if self.invoice.type == 'out':
            invoice_details = defaultdict(list)
            for key, values in tax_values.items():
                detail_key, detail_values = self.get_out_invoice_details(
                    key, values)
                invoice_details[detail_key].append(detail_values)
            detail['TipoDesglose'] = {}
            if self.invoice.es_sii_product_type_detail:
                # Merge the subject keys of the same product key instead
                # of keeping only the last one
                by_product = {}
                for key, invoice_detail in invoice_details.items():
                    subject_key, product_key = key
                    by_product.setdefault(product_key, {})[
                        subject_key] = invoice_detail
                detail['TipoDesglose'] = {
                    'DesgloseTipoOperacion': by_product,
                    }
            else:
                # Merge all the subject keys instead of keeping only the
                # last one
                by_subject = {}
                for key, invoice_detail in invoice_details.items():
                    subject_key, _ = key
                    by_subject.setdefault(subject_key, []).extend(
                        invoice_detail)
                if by_subject:
                    detail['TipoDesglose'] = {
                        'DesgloseFactura': by_subject,
                        }
        else:
            detail['DesgloseFactura'] = {
                # XXX: InversionSujetoPasivo
                'DesgloseIVA': {
                    'DetalleIVA': list(chain(*tax_values.values())),
                    },
                }
            detail['FechaRegContable'] = (
                self.invoice.move.post_date.strftime('%d-%m-%Y'))
            detail['CuotaDeducible'] = str(tax_amount)
        return detail

    def get_invoice_payload(self):
        """Compute the tax values and the amounts from the tax lines of
        the invoice move and return the invoice part of the payload."""
        # Use taxes from move lines to have amount in company currency
        all_tax_lines = sorted(
            chain.from_iterable(
                l.tax_lines for l in self.invoice.move.lines),
            key=self.tax_grouping_key)
        tax_values = defaultdict(list)
        operation_keys = set()
        total_amount = Decimal(0)
        tax_amount = Decimal(0)
        for tax_key, group in groupby(
                all_tax_lines, key=self.tax_grouping_key):
            group = list(group)
            if tax_key and not dict(tax_key).get('excluded'):
                base_lines = set()
                for tax_line in group:
                    # Do not duplicate base for lines with multiple taxes
                    if (tax_line.type == 'base'
                            and tax_line.move_line.id not in base_lines):
                        total_amount += tax_line.amount
                        base_lines.add(tax_line.move_line.id)
                    if tax_line.type == 'tax':
                        total_amount += tax_line.amount
            # Split each group by rate
            group = sorted(group, key=self.tax_detail_grouping_key)
            for detail_key, detail_lines in groupby(
                    group, key=self.tax_detail_grouping_key):
                key = dict(tax_key + detail_key)
                values = self.get_tax_values(key, list(detail_lines))
                if not values:
                    continue
                operation_keys.add(key['operation_key'])
                tax_amount += values["Cuota%s" % key['cuota_suffix']]
                tax_values[tax_key].append(values)
        return self.get_invoice_detail(
            tax_values, operation_keys, total_amount, tax_amount)

    def get_payload(self):
        """Return the full payload of the record for the web service.

        For supplier invoices the issuer is the party and the number is
        the reference if any; for customer invoices the issuer is the
        tax identifier of the invoice.
        """
        if self.invoice.type == 'in':
            tax_identifier = self.invoice.es_sii_party_tax_identifier
            number = self.invoice.reference or self.invoice.number or ''
        else:
            tax_identifier = self.invoice.tax_identifier
            number = self.invoice.number or ''
        date = self.invoice.invoice_date
        payload = {
            'PeriodoLiquidacion': {
                'Ejercicio': "{:04}".format(self.invoice.move.date.year),
                'Periodo': "{:02}".format(self.invoice.move.date.month),
                },
            'IDFactura': {
                'IDEmisorFactura': (tax_identifier.es_sii_values()
                    if tax_identifier else {}),
                # Only the last 60 characters of the number are sent
                'NumSerieFacturaEmisor': number[-60:],
                'FechaExpedicionFacturaEmisor': date.strftime('%d-%m-%Y'),
                },
            }
        invoice_payload = self.get_invoice_payload()
        if self.invoice.type == 'in':
            payload['FacturaRecibida'] = invoice_payload
        else:
            payload['FacturaExpedida'] = invoice_payload
        return payload