# This file is part of Tryton. The COPYRIGHT file at the top level of # this repository contains the full copyright notices and license terms. import bisect import datetime as dt import mimetypes import os from base64 import b64decode, b64encode from collections import namedtuple from decimal import Decimal from io import BytesIO from itertools import chain, groupby from operator import itemgetter import genshi import genshi.template # XXX fix: https://genshi.edgewall.org/ticket/582 from genshi.template.astutil import ASTCodeGenerator, ASTTransformer from lxml import etree from trytond.i18n import gettext from trytond.model import Model from trytond.modules.product import round_price from trytond.pool import Pool, PoolMeta from trytond.rpc import RPC from trytond.tools import cached_property, slugify, sortable_values from trytond.transaction import Transaction from .exceptions import InvoiceError from .party import ISO6523_TYPES ISO6523 = {v: k for k, v in ISO6523_TYPES.items()} if not hasattr(ASTCodeGenerator, 'visit_NameConstant'): def visit_NameConstant(self, node): if node.value is None: self._write('None') elif node.value is True: self._write('True') elif node.value is False: self._write('False') else: raise Exception("Unknown NameConstant %r" % (node.value,)) ASTCodeGenerator.visit_NameConstant = visit_NameConstant if not hasattr(ASTTransformer, 'visit_NameConstant'): # Re-use visit_Name because _clone is deleted ASTTransformer.visit_NameConstant = ASTTransformer.visit_Name loader = genshi.template.TemplateLoader( os.path.join(os.path.dirname(__file__), 'template'), auto_reload=True) def remove_comment(stream): for kind, data, pos in stream: if kind is genshi.core.COMMENT: continue yield kind, data, pos class Invoice(Model): __name__ = 'edocument.ubl.invoice' __slots__ = ('invoice',) @classmethod def __setup__(cls): super().__setup__() cls.__rpc__.update({ 'render': RPC(instantiate=0), 'parse': RPC(readonly=False, result=int), }) def __init__(self, invoice): pool = Pool() Invoice = pool.get('account.invoice') super().__init__() if int(invoice) >= 0: invoice = Invoice(int(invoice)) with Transaction().set_context(language=invoice.party_lang): self.invoice = invoice.__class__(int(invoice)) else: self.invoice = invoice def render(self, template, specification=None): if self.invoice.state not in {'posted', 'paid'}: raise ValueError("Invoice must be posted") tmpl = self._get_template(template) if not tmpl: raise NotImplementedError return (tmpl.generate( this=self, specification=specification, Decimal=Decimal) .filter(remove_comment) .render() .encode('utf-8')) def _get_template(self, version): if self.invoice.sequence_type == 'credit_note': return loader.load(os.path.join(version, 'CreditNote.xml')) else: return loader.load(os.path.join(version, 'Invoice.xml')) @property def filename(self): return f'{slugify(self.invoice.rec_name)}.xml' @cached_property def type_code(self): if self.invoice.type == 'out': if self.invoice.sequence_type == 'credit_note': return '381' else: return '380' else: return '389' @property def additional_documents(self): pool = Pool() InvoiceReport = pool.get('account.invoice', type='report') oext, content, _, filename = InvoiceReport.execute( [self.invoice.id], {}) filename = f'{filename}.{oext}' mimetype = mimetypes.guess_type(filename)[0] yield { 'id': self.invoice.number, 'type': 'binary', 'binary': b64encode(content).decode(), 'mimetype': mimetype, 'filename': filename, } @cached_property def accounting_supplier_party(self): if self.invoice.type == 'out': return self.invoice.company.party else: return self.invoice.party @cached_property def accounting_supplier_address(self): if self.invoice.type == 'out': return self.invoice.company.party.address_get('invoice') else: return self.invoice.invoice_address @cached_property def accounting_supplier_tax_identifier(self): if self.invoice.type == 'out': return self.invoice.tax_identifier else: return self.invoice.party_tax_identifier @cached_property def accounting_customer_party(self): if self.invoice.type == 'out': return self.invoice.party else: return self.invoice.company.party @cached_property def accounting_customer_address(self): if self.invoice.type == 'out': return self.invoice.invoice_address else: return self.invoice.company.party.address_get('invoice') @cached_property def accounting_customer_tax_identifier(self): if self.invoice.type == 'out': return self.invoice.party_tax_identifier else: return self.invoice.tax_identifier @cached_property def sale_reference(self): if hasattr(self.invoice, 'sales'): return ','.join(s.reference for s in self.invoice.sales) @cached_property def sale_number(self): if hasattr(self.invoice, 'sales'): return ','.join(s.number for s in self.invoice.sales) @property def taxes(self): class Tax(namedtuple( 'Tax', ('type', 'rate', 'unece_category_code', 'unece_code', 'legal_notice'))): @classmethod def from_line(cls, line): return Tax( type=line.tax.type, rate=line.tax.rate, unece_category_code=line.tax.unece_category_code, unece_code=line.tax.unece_code, legal_notice=line.legal_notice or '') TaxLine = namedtuple('TaxLine', ('base', 'amount', 'tax')) for group, lines in groupby(sorted( self.invoice.taxes, key=sortable_values(self._taxes_key)), key=self._taxes_key): tax_lines, tax_amount = [], 0 for tax, lines in groupby(sorted( lines, key=sortable_values(Tax.from_line)), key=Tax.from_line): amount = base = 0 for line in lines: tax_amount += line.amount base += line.base amount += line.amount tax_lines.append(TaxLine(base=base, amount=amount, tax=tax)) yield group, tax_lines, amount def _taxes_key(self, line): return () @cached_property def lines(self): return [l for l in self.invoice.lines if l.type == 'line'] @classmethod def parse(cls, document): pool = Pool() Attachment = pool.get('ir.attachment') tree = etree.parse(BytesIO(document)) root = tree.getroot() namespace = root.nsmap.get(root.prefix) invoice, attachments = cls.parser(namespace)(root) invoice.save() invoice.update_taxes() cls.checker(namespace)(root, invoice) attachments = list(attachments) for attachment in attachments: attachment.resource = invoice Attachment.save(attachments) return invoice @classmethod def parser(cls, namespace): return { 'urn:oasis:names:specification:ubl:schema:xsd:Invoice-2': ( cls._parse_invoice_2), 'urn:oasis:names:specification:ubl:schema:xsd:CreditNote-2': ( cls._parse_credit_note_2), }.get(namespace) @classmethod def checker(cls, namespace): return { 'urn:oasis:names:specification:ubl:schema:xsd:Invoice-2': ( cls._check_invoice_2), 'urn:oasis:names:specification:ubl:schema:xsd:CreditNote-2': ( cls._check_credit_note_2), }.get(namespace) @classmethod def _parse_invoice_2(cls, root): pool = Pool() Invoice = pool.get('account.invoice') Currency = pool.get('currency.currency') type_code = root.findtext('./{*}InvoiceTypeCode') if type_code and type_code != '380': raise InvoiceError(gettext( 'edocument_ubl.msg_invoice_type_code_unsupported', type_code=type_code)) invoice = Invoice(type='in') invoice.reference = root.findtext('./{*}ID') invoice.invoice_date = dt.date.fromisoformat( root.findtext('./{*}IssueDate')) invoice.party = cls._parse_2_supplier( root.find('./{*}AccountingSupplierParty'), create=True) invoice.set_journal() invoice.on_change_party() invoice.invoice_address = cls._parse_2_address( root.find( './{*}AccountingSupplierParty/{*}Party/{*}PostalAddress'), party=invoice.party) invoice.party_tax_identifier = cls._parse_2_tax_identifier( root.findall( './{*}AccountingSupplierParty/{*}Party/{*}PartyTaxScheme'), party=invoice.party, create=True) if (seller := root.find('./{*}SellerSupplierParty')) is not None: supplier = cls._parse_2_supplier(seller) else: supplier = invoice.party if (customer_party := root.find('./{*}AccountingCustomerParty') ) is not None: invoice.company = cls._parse_2_company(customer_party) else: invoice.company = Invoice.default_company() if not invoice.company: raise InvoiceError(gettext( 'edocument_ubl.msg_company_not_found', company=etree.tostring( customer_party, pretty_print=True).decode() if customer_party else '')) if (payee_party := root.find('./{*}PayeeParty')) is not None: party = cls._parse_2_party(payee_party) if not party: party = cls._create_2_party(payee_party) party.save() invoice.alternative_payees = [party] currency_code = root.findtext('./{*}DocumentCurrencyCode') if not currency_code: payable_amount = root.find( './{*}LegalMonetaryTotal/{*}PayableAmount') currency_code = payable_amount.get('currencyID') if currency_code is not None: try: invoice.currency, = Currency.search([ ('code', '=', currency_code), ], limit=1) except ValueError: raise InvoiceError(gettext( 'edocument_ubl.msg_currency_not_found', code=currency_code)) invoice.payment_term_date = cls._parse_2_payment_term_date( root.findall('./{*}PaymentTerms')) lines = [ cls._parse_invoice_2_line( line, company=invoice.company, currency=invoice.currency, supplier=supplier) for line in root.iterfind('./{*}InvoiceLine')] lines += [ cls._parse_invoice_2_allowance_charge( allowance_charge, company=invoice.company, currency=invoice.currency, supplier=supplier) for allowance_charge in root.iterfind('./{*}AllowanceCharge')] invoice.lines = lines invoice.taxes = [ cls._parse_2_tax( tax, company=invoice.company) for tax in root.iterfind('./{*}TaxTotal/{*}TaxSubtotal')] if (hasattr(Invoice, 'cash_rounding') and (root.find( './{*}LegalMonetaryTotal/{*}PayableRoundingAmount') is not None)): invoice.cash_rounding = True return invoice, cls._parse_2_attachments(root) @classmethod def _parse_invoice_2_line( cls, invoice_line, company, currency, supplier=None): pool = Pool() Line = pool.get('account.invoice.line') UoM = pool.get('product.uom') Tax = pool.get('account.tax') AccountConfiguration = pool.get('account.configuration') account_configuration = AccountConfiguration(1) line = Line( type='line', company=company, currency=currency, invoice_type='in') if (invoiced_quantity := invoice_line.find('./{*}InvoicedQuantity') ) is not None: line.quantity = float(invoiced_quantity.text) if (unit_code := invoiced_quantity.get('unitCode')) not in { None, 'ZZ', 'XZZ'}: try: line.unit, = UoM.search([ ('unece_code', '=', unit_code), ], limit=1) except ValueError: raise InvoiceError(gettext( 'edocument_ubl.msg_unit_not_found', code=unit_code)) else: line.unit = None else: line.quantity = 1 line.unit = None line.product = cls._parse_2_item( invoice_line.find('./{*}Item'), supplier=supplier) if line.product: line.on_change_product() line.description = '\n'.join(e.text for e in chain( invoice_line.iterfind('./{*}Item/{*}Name'), invoice_line.iterfind('./{*}Item/{*}BrandName'), invoice_line.iterfind('./{*}Item/{*}ModelName'), invoice_line.iterfind('./{*}Item/{*}Description'), invoice_line.iterfind( './{*}Item/{*}AdditionalInformation'), invoice_line.iterfind('./{*}Item/{*}WarrantyInformation'), ) if e is not None and e.text) if not line.product: if line.description: similar_domain = [ ('description', 'ilike', line.description), ('invoice.company', '=', company), ('invoice.type', '=', 'in'), ('invoice.state', 'in', ['validated', 'posted', 'paid']), ] if line.unit: similar_domain.append( ('unit.category', '=', line.unit.category)) similar_lines = Line.search( similar_domain, order=[('invoice.invoice_date', 'DESC')], limit=1) else: similar_lines = [] if similar_lines: similar_line, = similar_lines line.account = similar_line.account line.product = similar_line.product if not line.unit: line.unit = similar_line.unit else: line.account = account_configuration.get_multivalue( 'default_category_account_expense', company=company.id) for line_reference in invoice_line.iterfind('./{*}OrderLineReference'): line.origin = cls._parse_2_line_reference( line_reference, line, company, supplier=supplier) if line.origin: break line.unit_price = round_price( Decimal(invoice_line.findtext('./{*}LineExtensionAmount')) / Decimal(str(line.quantity))) if invoice_line.find('./{*}Item/{*}ClassifiedTaxCategory') is not None: tax_categories = invoice_line.iterfind( './{*}Item/{*}ClassifiedTaxCategory') else: tax_categories = invoice_line.iterfind( './{*}TaxTotal/{*}TaxSubtotal/{*}TaxCategory') taxes = [] for tax_category in tax_categories: domain = cls._parse_2_tax_category(tax_category) domain.extend([ ['OR', ('group', '=', None), ('group.kind', 'in', ['purchase', 'both']), ], ('company', '=', company.id), ]) try: tax, = Tax.search(domain, limit=1) except ValueError: raise InvoiceError(gettext( 'edocument_ubl.msg_tax_not_found', tax_category=etree.tostring( tax_category, pretty_print=True).decode())) taxes.append(tax) line.taxes = taxes return line @classmethod def _parse_invoice_2_allowance_charge( cls, allowance_charge, company, currency, supplier=None): pool = Pool() AccountConfiguration = pool.get('account.configuration') Line = pool.get('account.invoice.line') Tax = pool.get('account.tax') account_configuration = AccountConfiguration(1) line = Line( type='line', company=company, currency=currency, invoice_type='in') indicator = allowance_charge.findtext('./{*}ChargeIndicator') line.quantity = {'true': 1, 'false': -1}[indicator] line.account = account_configuration.get_multivalue( 'default_category_account_expense', company=company.id) line.description = allowance_charge.findtext( './{*}AllowanceChargeReason') line.unit_price = round_price(Decimal(allowance_charge.findtext( './{*}Amount'))) taxes = [] tax_categories = allowance_charge.iterfind('./{*}TaxCategory') for tax_category in tax_categories: domain = cls._parse_2_tax_category(tax_category) domain.extend([ ['OR', ('group', '=', None), ('group.kind', 'in', ['purchase', 'both']), ], ('company', '=', company.id), ]) try: tax, = Tax.search(domain, limit=1) except ValueError: raise InvoiceError(gettext( 'edocument_ubl.msg_tax_not_found', tax_category=etree.tostring( tax_category, pretty_print=True).decode())) taxes.append(tax) line.taxes = taxes return line @classmethod def _parse_credit_note_2(cls, root): pool = Pool() Invoice = pool.get('account.invoice') Currency = pool.get('currency.currency') type_code = root.findtext('./{*}CreditNoteTypeCode') if type_code and type_code != '381': raise InvoiceError(gettext( 'edocument_ubl.msg_credit_note_type_code_unsupported', type_code=type_code)) invoice = Invoice(type='in') invoice.reference = root.findtext('./{*}ID') invoice.invoice_date = dt.date.fromisoformat( root.findtext('./{*}IssueDate')) invoice.party = cls._parse_2_supplier( root.find('./{*}AccountingSupplierParty'), create=True) invoice.set_journal() invoice.on_change_party() if (seller := root.find('./{*}SellerSupplierParty')) is not None: supplier = cls._parse_2_supplier(seller) else: supplier = invoice.party if (customer_party := root.find('./{*}AccountingCustomerParty') ) is not None: invoice.company = cls._parse_2_company(customer_party) else: invoice.company = Invoice.default_company() if (payee_party := root.find('./{*}PayeeParty')) is not None: party = cls._parse_2_party(payee_party) if not party: party = cls._create_2_party(payee_party) party.save() invoice.alternative_payees = [party] if (currency_code := root.findtext('./{*}DocumentCurrencyCode') ) is not None: try: invoice.currency, = Currency.search([ ('code', '=', currency_code), ], limit=1) except ValueError: raise InvoiceError(gettext( 'edocument_ubl.msg_currency_not_found', code=currency_code)) invoice.payment_term_date = cls._parse_2_payment_term_date( root.findall('./{*}PaymentTerms')) lines = [ cls._parse_credit_note_2_line( line, company=invoice.company, currency=invoice.currency, supplier=supplier) for line in root.iterfind('./{*}CreditNoteLine')] lines += [ cls._parse_credit_note_2_allowance_charge( allowance_charge, company=invoice.company, currency=invoice.currency, supplier=supplier) for allowance_charge in root.iterfind('./{*}AllowanceCharge')] invoice.lines = lines invoice.taxes = [ cls._parse_2_tax( tax, company=invoice.company) for tax in root.iterfind('./{*}TaxTotal/{*}TaxSubtotal')] if (hasattr(Invoice, 'cash_rounding') and root.find( './{*}LegalMonetaryTotal/{*}PayableRoundingAmount') is not None): invoice.cash_rounding = True return invoice, cls._parse_2_attachments(root) @classmethod def _parse_credit_note_2_line( cls, credit_note_line, company, currency, supplier=None): pool = Pool() Line = pool.get('account.invoice.line') UoM = pool.get('product.uom') Tax = pool.get('account.tax') AccountConfiguration = pool.get('account.configuration') account_configuration = AccountConfiguration(1) line = Line( type='line', company=company, currency=currency, invoice_type='in') if (credited_quantity := credit_note_line.find('./{*}CreditedQuantity') ) is not None: line.quantity = -float(credited_quantity.text) if (unit_code := credited_quantity.get('unitCode')) not in { None, 'ZZ', 'XZZ'}: try: line.unit, = UoM.search([ ('unece_code', '=', unit_code), ], limit=1) except ValueError: raise InvoiceError(gettext( 'edocument_ubl.msg_unit_not_found', code=unit_code)) else: line.unit = None else: line.quantity = -1 line.unit = None line.product = cls._parse_2_item( credit_note_line.find('./{*}Item'), supplier=supplier) if line.product: line.on_change_product() line.description = '\n'.join(e.text for e in chain( credit_note_line.iterfind('./{*}Item/{*}Name'), credit_note_line.iterfind('./{*}Item/{*}BrandName'), credit_note_line.iterfind('./{*}Item/{*}ModelName'), credit_note_line.iterfind('./{*}Item/{*}Description'), credit_note_line.iterfind( './{*}Item/{*}AdditionalInformation'), credit_note_line.iterfind('./{*}Item/{*}WarrantyInformation'), ) if e is not None and e.text) if not line.product: if line.description: similar_domain = [ ('description', 'ilike', line.description), ('invoice.company', '=', company), ('invoice.type', '=', 'in'), ('invoice.state', 'in', ['validated', 'posted', 'paid']), ] if line.unit: similar_domain.append( ('unit.category', '=', line.unit.category)) similar_lines = Line.search( similar_domain, order=[('invoice.invoice_date', 'DESC')], limit=1) else: similar_lines = [] if similar_lines: similar_line, = similar_lines line.account = similar_line.account line.product = similar_line.product if not line.unit: line.unit = similar_line.unit else: line.account = account_configuration.get_multivalue( 'default_category_account_expense', company=company.id) for line_reference in credit_note_line.iterfind( './{*}OrderLineReference'): line.origin = cls._parse_2_line_reference( line_reference, line, company, supplier=supplier) if line.origin: break line.unit_price = round_price( -Decimal(credit_note_line.findtext('./{*}LineExtensionAmount')) / Decimal(str(line.quantity))) if (credit_note_line.find('./{*}Item/{*}ClassifiedTaxCategory') is not None): tax_categories = credit_note_line.iterfind( './{*}Item/{*}ClassifiedTaxCategory') else: tax_categories = credit_note_line.iterfind( './{*}TaxTotal/{*}TaxSubtotal/{*}TaxCategory') taxes = [] for tax_category in tax_categories: domain = cls._parse_2_tax_category(tax_category) domain.extend([ ['OR', ('group', '=', None), ('group.kind', 'in', ['purchase', 'both']), ], ('company', '=', company.id), ]) try: tax, = Tax.search(domain, limit=1) except ValueError: raise InvoiceError(gettext( 'edocument_ubl.msg_tax_not_found', tax_category=etree.tostring( tax_category, pretty_print=True).decode())) taxes.append(tax) line.taxes = taxes return line @classmethod def _parse_credit_note_2_allowance_charge( cls, allowance_charge, company, currency, supplier=None): pool = Pool() AccountConfiguration = pool.get('account.configuration') Line = pool.get('account.invoice.line') Tax = pool.get('account.tax') account_configuration = AccountConfiguration(1) line = Line( type='line', company=company, currency=currency, invoice_type='in') indicator = allowance_charge.findtext('./{*}ChargeIndicator') line.quantity = {'true': -1, 'false': 1}[indicator] line.account = account_configuration.get_multivalue( 'default_category_account_expense', company=company.id) line.description = allowance_charge.findtext( './{*}AllowanceChargeReason') line.unit_price = round_price(Decimal(allowance_charge.findtext( './{*}Amount'))) taxes = [] tax_categories = allowance_charge.iterfind('./{*}TaxCategory') for tax_category in tax_categories: domain = cls._parse_2_tax_category(tax_category) domain.extend([ ['OR', ('group', '=', None), ('group.kind', 'in', ['purchase', 'both']), ], ('company', '=', company.id), ]) try: tax, = Tax.search(domain, limit=1) except ValueError: raise InvoiceError(gettext( 'edocument_ubl.msg_tax_not_found', tax_category=etree.tostring( tax_category, pretty_print=True).decode())) taxes.append(tax) line.taxes = taxes return line @classmethod def _parse_2_supplier(cls, supplier_party, create=False): pool = Pool() Party = pool.get('party.party') for account_id in filter(None, chain( [supplier_party.find('./{*}CustomerAssignedAccountID')], supplier_party.iterfind('./{*}AdditionalAccountID'))): if account_id.text: try: party, = Party.search([ ('code', '=', account_id.text), ]) except ValueError: pass else: return party party_el = supplier_party.find('./{*}Party') party = cls._parse_2_party(party_el) if not party and create: party = cls._create_2_party(party_el) party.save() return party @classmethod def _parse_2_party(cls, party_el): pool = Pool() Party = pool.get('party.party') for identifier in chain( party_el.iterfind('./{*}PartyIdentification/{*}ID'), party_el.iterfind('./{*}PartyLegalEntity/{*}CompanyID'), party_el.iterfind('./{*}PartyTaxScheme/{*}CompanyID'), ): if identifier.text: domain = [ ('code', '=', identifier.text), ] if schemeId := identifier.get('schemeID'): if type := ISO6523.get(schemeId): domain.append(('type', '=', type)) parties = Party.search([ ('identifiers', 'where', domain), ]) if len(parties) == 1: party, = parties return party @classmethod def _create_2_party(cls, party_el): pool = Pool() Party = pool.get('party.party') party = Party() party.name = ( party_el.findtext('./{*}PartyLegalEntity/{*}RegistrationName') or party_el.findtext('./{*}PartyName/{*}Name')) identifiers = [] identifiers_done = set() for identifier in chain( party_el.iterfind('./{*}PartyIdentification/{*}ID'), party_el.iterfind('./{*}PartyTaxScheme/{*}CompanyID'), party_el.iterfind('./{*}PartyLegalEntity/{*}CompanyID'), ): if identifier.text: identifier_key = (identifier.text, identifier.get('schemeID')) if identifier_key not in identifiers_done: identifiers.append(cls._create_2_party_identifier( identifier)) identifiers_done.add(identifier_key) party.identifiers = identifiers if (address := party_el.find('./{*}PostalAddress') ) is not None: party.addresses = [cls._create_2_address(address)] return party @classmethod def _create_2_party_identifier(cls, identifier): pool = Pool() Identifier = pool.get('party.identifier') if schemeId := identifier.get('schemeID'): type = ISO6523.get(schemeId) else: type = None return Identifier(type=type, code=identifier.text) @classmethod def _parse_2_address(cls, address_el, party): pool = Pool() Address = pool.get('party.address') address = cls._create_2_address(address_el) domain = [('party', '=', party)] for fname in Address._fields: if value := getattr(address, fname, None): domain.append((fname, '=', value)) try: address, = Address.search(domain, limit=1) except ValueError: address.party = party address.save() return address @classmethod def _create_2_address(cls, address_el): pool = Pool() Address = pool.get('party.address') Country = pool.get('country.country') address = Address() if address_el is None: return address address.post_box = address_el.findtext('./{*}Postbox') address.floor_number = address_el.findtext('./{*}Floor') address.room_number = address_el.findtext('./{*}Room') address.street_name = address_el.findtext('./{*}StreetName') address.building_name = address_el.findtext('./{*}BuildingName') address.building_number = address_el.findtext('./{*}BuildingNumber') address.city = address_el.findtext('./{*}CityName') address.postal_code = address_el.findtext('./{*}PostalZone') if (country_code := address_el.findtext( './{*}Country/{*}IdentificationCode[@listId="ISO3166-1"]') ) is not None: try: country, = Country.search([ ('code', '=', country_code), ], limit=1) except ValueError: pass else: address.country = country address.street_unstructured = '\n'.join( (line.text for line in address_el.iterfind('./{*}AddressLine/{*}Line'))) return address @classmethod def _parse_2_tax_identifier(cls, party_tax_schemes, party, create=False): pool = Pool() Identifier = pool.get('party.identifier') tax_identifier_types = party.tax_identifier_types() for party_tax_scheme in party_tax_schemes: company_id = party_tax_scheme.find('./{*}CompanyID') if company_id is not None: scheme_id = company_id.get('schemeID') value = company_id.text for identifier in party.identifiers: if (identifier.type in tax_identifier_types and identifier.iso_6523 == scheme_id and identifier.code == value): return identifier else: if create and scheme_id in ISO6523: identifier = Identifier( party=party, type=ISO6523[scheme_id], code=value) identifier.save() return identifier @classmethod def _parse_2_company(cls, customer_party): pool = Pool() Company = pool.get('company.company') try: CustomerCode = pool.get('party.party.customer_code') except KeyError: CustomerCode = None if CustomerCode: for account_id in filter(None, chain( [customer_party.find('./{*}CustomerAssignedAccountID'), customer_party.find( './{*}SupplierAssignedAccountID')], customer_party.iterfind('./{*}AdditionalAccountID'))): if account_id.text: try: customer_code, = CustomerCode.search([ ('customer_code', '=', account_id.text), ]) except ValueError: pass else: return customer_code.company party_el = customer_party.find('./{*}Party') for identifier in chain( party_el.iterfind('./{*}PartyIdentification/{*}ID'), party_el.iterfind('./{*}PartyTaxScheme/{*}CompanyID'), party_el.iterfind('./{*}PartyLegalEntity/{*}CompanyID'), ): if identifier.text: domain = [ ('code', '=', identifier.text), ] if schemeId := identifier.get('schemeID'): if type := ISO6523.get(schemeId): domain.append(('type', '=', type)) companies = Company.search([ ('party.identifiers', 'where', domain), ]) if len(companies) == 1: company, = companies return company for name in chain( party_el.iterfind('./{*}PartyTaxScheme/{*}RegistrationName'), party_el.iterfind('./{*}PartyName/{*}Name'), ): if name is None: continue companies = Company.search([ ('party.name', '=', name.text), ]) if len(companies) == 1: company, = companies return company @classmethod def _parse_2_payment_term_date(cls, payment_terms): dates = [] for payment_term in payment_terms: if (date := payment_term.findtext('./{*}PaymentDueDate') ) is not None: dates.append(dt.date.fromisoformat(date)) return min(dates, default=None) @classmethod def _parse_2_item(cls, item, supplier=None): pool = Pool() Product = pool.get('product.product') if (identifier := item.find('./{*}StandardItemIdentification/{*}ID') ) is not None: if identifier.get('schemeID') == 'GTIN': try: product, = Product.search([ ('identifiers', 'where', [ ('type', 'in', ['ean', 'isbn', 'ismn']), ('code', '=', identifier.text), ]), ], limit=1) except ValueError: pass else: return product if (code := item.findtext('./{*}BuyersItemIdentification/{*}ID') ) is not None: try: product, = Product.search([ ('code', '=', code), ], limit=1) except ValueError: pass else: return product @classmethod def _parse_2_line_reference( cls, line_reference, line, company, supplier=None): return @classmethod def _parse_2_tax_category(cls, tax_category): domain = [ ('parent', '=', None), ] if (unece_category_code := tax_category.findtext('./{*}ID') ) is not None: domain.append(('unece_category_code', '=', unece_category_code)) if (unece_code := tax_category.findtext('./{*}TaxScheme/{*}ID') ) is not None: domain.append(('unece_code', '=', unece_code)) percent = tax_category.findtext('./{*}Percent') if percent: domain.append(('type', '=', 'percentage')) domain.append(('rate', '=', Decimal(percent) / 100)) return domain @classmethod def _parse_2_tax(cls, tax, company): pool = Pool() Tax = pool.get('account.tax') InvoiceTax = pool.get('account.invoice.tax') invoice_tax = InvoiceTax(manual=False) tax_category = tax.find('./{*}TaxCategory') domain = cls._parse_2_tax_category(tax_category) domain.extend([ ['OR', ('group', '=', None), ('group.kind', 'in', ['purchase', 'both']), ], ('company', '=', company.id), ]) try: invoice_tax.tax, = Tax.search(domain, limit=1) except ValueError: raise InvoiceError(gettext( 'edocument_ubl.msg_tax_not_found', tax_category=etree.tostring( tax_category, pretty_print=True).decode())) invoice_tax.amount = Decimal(tax.findtext('./{*}TaxAmount')) if (taxable_amount := tax.findtext('./{*}TaxableAmount')) is not None: invoice_tax.base = Decimal(taxable_amount) else: # Use tax amount to define the sign of unknown base invoice_tax.base = invoice_tax.amount invoice_tax.on_change_tax() return invoice_tax @classmethod def _parse_2_attachments(cls, root): pool = Pool() Attachment = pool.get('ir.attachment') for name in [ 'DespatchDocumentReference', 'ReceiptDocumentReference', 'ContractDocumentReference', 'AdditionalDocumentReference', 'StatementDocumentReference', 'OriginatorDocumentReference' ]: for document in root.iterfind(f'./{{*}}{name}'): attachment = Attachment() name = ' '.join(filter(None, [ document.findtext('./{*}DocumentType'), document.findtext('./{*}ID'), ])) if (data := document.find( './{*}Attachment/{*}EmbeddedDocumentBinaryObject') ) is not None: mime_code = ( data.get('mimeCode') or 'application/octet-stream') name += mimetypes.guess_extension(mime_code) or '' attachment.type = 'data' data = b64decode(data.text) attachment.data = data elif data := document.findtext( './{*}Attachment/{*}EmbeddedDocument'): name += '.txt' attachment.type = 'data' attachment.data = data elif url := document.findtext( './{*}Attachment/{*}ExternalReference/{*}URI'): attachment.type = 'link' Attachment.link = url attachment.name = name yield attachment @classmethod def _check_invoice_2(cls, root, invoice): pool = Pool() Lang = pool.get('ir.lang') lang = Lang.get() payable_amount = Decimal( root.findtext('./{*}LegalMonetaryTotal/{*}PayableAmount')) prepaid_amount = Decimal( root.findtext('./{*}LegalMonetaryTotal/{*}PrepaidAmount') or 0) amount = payable_amount + prepaid_amount if not getattr(invoice, 'cash_rounding', False): payable_rounding_amount = Decimal( root.findtext( './{*}LegalMonetaryTotal/{*}PayableRoundingAmount') or 0) amount -= payable_rounding_amount if invoice.total_amount != amount: raise InvoiceError(gettext( 'edocument_ubl.msg_invoice_total_amount_different', invoice=invoice.rec_name, total_amount=lang.format_number(invoice.total_amount), amount=lang.format_number(amount))) tax_total = sum(Decimal(amount.text) for amount in root.iterfind( './{*}TaxTotal/{*}TaxAmount')) if invoice.tax_amount != tax_total: raise InvoiceError(gettext( 'edocument_ubl.msg_invoice_tax_amount_different', invoice=invoice.rec_name, tax_amount=lang.format_number(invoice.tax_amount), tax_total=lang.format_number(tax_total))) @classmethod def _check_credit_note_2(cls, root, invoice): pool = Pool() Lang = pool.get('ir.lang') lang = Lang.get() payable_amount = Decimal( root.findtext('./{*}LegalMonetaryTotal/{*}PayableAmount')) prepaid_amount = Decimal( root.findtext('./{*}LegalMonetaryTotal/{*}PrepaidAmount') or 0) amount = payable_amount + prepaid_amount if not getattr(invoice, 'cash_rounding', False): payable_rounding_amount = Decimal( root.findtext( './{*}LegalMonetaryTotal/{*}PayableRoundingAmount') or 0) amount -= payable_rounding_amount if -invoice.total_amount != amount: raise InvoiceError(gettext( 'edocument_ubl.msg_invoice_total_amount_different', invoice=invoice.rec_name, total_amount=lang.format_number(-invoice.total_amount), amount=lang.format_number(amount))) tax_total = sum(Decimal(amount.text) for amount in root.iterfind( './{*}TaxTotal/{*}TaxAmount')) if -invoice.tax_amount != tax_total: raise InvoiceError(gettext( 'edocument_ubl.msg_invoice_tax_amount_different', invoice=invoice.rec_name, tax_amount=lang.format_number(-invoice.tax_amount), tax_total=lang.format_number(tax_total))) class Invoice_Purchase(metaclass=PoolMeta): __name__ = 'edocument.ubl.invoice' @classmethod def _parse_2_item(cls, item, supplier=None): pool = Pool() Product = pool.get('product.product') product = super()._parse_2_item(item, supplier=supplier) if (not product and supplier and (code := item.findtext( './{*}SellersItemIdentification/{*}ID'))): try: product, = Product.search([ ('product_suppliers', 'where', [ ('party', '=', supplier.id), ('code', '=', code), ]), ], limit=1) except ValueError: pass return product @classmethod def _parse_2_line_reference( cls, line_reference, line, company, supplier=None): pool = Pool() PurchaseLine = pool.get('purchase.line') UoM = pool.get('product.uom') origin = super()._parse_2_line_reference( line_reference, line, company, supplier=supplier) if origin: return origin if not line or not line.product or not line.unit: return if numbers := list(filter(None, [ line_reference.findtext('./{*}OrderReference/{*}ID'), line_reference.findtext( './{*}OrderReference/{*}SalesOrderID'), ])): purchase_lines = PurchaseLine.search([ ('purchase.company', '=', company), ('purchase.rec_name', 'in', numbers), ('type', '=', 'line'), ('product', '=', line.product), ]) if purchase_lines: quantities = [] for purchase_line in purchase_lines: quantity = UoM.compute_qty( purchase_line.unit, purchase_line.quantity, line.unit) quantities.append((quantity, purchase_line)) key = itemgetter(0) quantities.sort(key=key) index = bisect.bisect_left(quantities, line.quantity, key=key) if index >= len(quantities): index = -1 origin = quantities[index][1] return origin