Files
tradon/modules/edocument_ubl/edocument.py
2026-03-14 09:42:12 +00:00

1235 lines
47 KiB
Python

# This file is part of Tryton. The COPYRIGHT file at the top level of
# this repository contains the full copyright notices and license terms.
import bisect
import datetime as dt
import mimetypes
import os
from base64 import b64decode, b64encode
from collections import namedtuple
from decimal import Decimal
from io import BytesIO
from itertools import chain, groupby
from operator import itemgetter
import genshi
import genshi.template
# XXX fix: https://genshi.edgewall.org/ticket/582
from genshi.template.astutil import ASTCodeGenerator, ASTTransformer
from lxml import etree
from trytond.i18n import gettext
from trytond.model import Model
from trytond.modules.product import round_price
from trytond.pool import Pool, PoolMeta
from trytond.rpc import RPC
from trytond.tools import cached_property, slugify, sortable_values
from trytond.transaction import Transaction
from .exceptions import InvoiceError
from .party import ISO6523_TYPES
ISO6523 = {v: k for k, v in ISO6523_TYPES.items()}
if not hasattr(ASTCodeGenerator, 'visit_NameConstant'):
def visit_NameConstant(self, node):
if node.value is None:
self._write('None')
elif node.value is True:
self._write('True')
elif node.value is False:
self._write('False')
else:
raise Exception("Unknown NameConstant %r" % (node.value,))
ASTCodeGenerator.visit_NameConstant = visit_NameConstant
if not hasattr(ASTTransformer, 'visit_NameConstant'):
# Re-use visit_Name because _clone is deleted
ASTTransformer.visit_NameConstant = ASTTransformer.visit_Name
loader = genshi.template.TemplateLoader(
os.path.join(os.path.dirname(__file__), 'template'),
auto_reload=True)
def remove_comment(stream):
for kind, data, pos in stream:
if kind is genshi.core.COMMENT:
continue
yield kind, data, pos
class Invoice(Model):
__name__ = 'edocument.ubl.invoice'
__slots__ = ('invoice',)
@classmethod
def __setup__(cls):
super().__setup__()
cls.__rpc__.update({
'render': RPC(instantiate=0),
'parse': RPC(readonly=False, result=int),
})
def __init__(self, invoice):
pool = Pool()
Invoice = pool.get('account.invoice')
super().__init__()
if int(invoice) >= 0:
invoice = Invoice(int(invoice))
with Transaction().set_context(language=invoice.party_lang):
self.invoice = invoice.__class__(int(invoice))
else:
self.invoice = invoice
def render(self, template, specification=None):
if self.invoice.state not in {'posted', 'paid'}:
raise ValueError("Invoice must be posted")
tmpl = self._get_template(template)
if not tmpl:
raise NotImplementedError
return (tmpl.generate(
this=self,
specification=specification,
Decimal=Decimal)
.filter(remove_comment)
.render()
.encode('utf-8'))
def _get_template(self, version):
if self.invoice.sequence_type == 'credit_note':
return loader.load(os.path.join(version, 'CreditNote.xml'))
else:
return loader.load(os.path.join(version, 'Invoice.xml'))
@property
def filename(self):
return f'{slugify(self.invoice.rec_name)}.xml'
@cached_property
def type_code(self):
if self.invoice.type == 'out':
if self.invoice.sequence_type == 'credit_note':
return '381'
else:
return '380'
else:
return '389'
@property
def additional_documents(self):
pool = Pool()
InvoiceReport = pool.get('account.invoice', type='report')
oext, content, _, filename = InvoiceReport.execute(
[self.invoice.id], {})
filename = f'{filename}.{oext}'
mimetype = mimetypes.guess_type(filename)[0]
yield {
'id': self.invoice.number,
'type': 'binary',
'binary': b64encode(content).decode(),
'mimetype': mimetype,
'filename': filename,
}
@cached_property
def accounting_supplier_party(self):
if self.invoice.type == 'out':
return self.invoice.company.party
else:
return self.invoice.party
@cached_property
def accounting_supplier_address(self):
if self.invoice.type == 'out':
return self.invoice.company.party.address_get('invoice')
else:
return self.invoice.invoice_address
@cached_property
def accounting_supplier_tax_identifier(self):
if self.invoice.type == 'out':
return self.invoice.tax_identifier
else:
return self.invoice.party_tax_identifier
@cached_property
def accounting_customer_party(self):
if self.invoice.type == 'out':
return self.invoice.party
else:
return self.invoice.company.party
@cached_property
def accounting_customer_address(self):
if self.invoice.type == 'out':
return self.invoice.invoice_address
else:
return self.invoice.company.party.address_get('invoice')
@cached_property
def accounting_customer_tax_identifier(self):
if self.invoice.type == 'out':
return self.invoice.party_tax_identifier
else:
return self.invoice.tax_identifier
@cached_property
def sale_reference(self):
if hasattr(self.invoice, 'sales'):
return ','.join(s.reference for s in self.invoice.sales)
@cached_property
def sale_number(self):
if hasattr(self.invoice, 'sales'):
return ','.join(s.number for s in self.invoice.sales)
@property
def taxes(self):
class Tax(namedtuple(
'Tax',
('type', 'rate', 'unece_category_code', 'unece_code',
'legal_notice'))):
@classmethod
def from_line(cls, line):
return Tax(
type=line.tax.type,
rate=line.tax.rate,
unece_category_code=line.tax.unece_category_code,
unece_code=line.tax.unece_code,
legal_notice=line.legal_notice or '')
TaxLine = namedtuple('TaxLine', ('base', 'amount', 'tax'))
for group, lines in groupby(sorted(
self.invoice.taxes,
key=sortable_values(self._taxes_key)),
key=self._taxes_key):
tax_lines, tax_amount = [], 0
for tax, lines in groupby(sorted(
lines, key=sortable_values(Tax.from_line)),
key=Tax.from_line):
amount = base = 0
for line in lines:
tax_amount += line.amount
base += line.base
amount += line.amount
tax_lines.append(TaxLine(base=base, amount=amount, tax=tax))
yield group, tax_lines, amount
def _taxes_key(self, line):
return ()
@cached_property
def lines(self):
return [l for l in self.invoice.lines if l.type == 'line']
@classmethod
def parse(cls, document):
pool = Pool()
Attachment = pool.get('ir.attachment')
tree = etree.parse(BytesIO(document))
root = tree.getroot()
namespace = root.nsmap.get(root.prefix)
invoice, attachments = cls.parser(namespace)(root)
invoice.save()
invoice.update_taxes()
cls.checker(namespace)(root, invoice)
attachments = list(attachments)
for attachment in attachments:
attachment.resource = invoice
Attachment.save(attachments)
return invoice
@classmethod
def parser(cls, namespace):
return {
'urn:oasis:names:specification:ubl:schema:xsd:Invoice-2': (
cls._parse_invoice_2),
'urn:oasis:names:specification:ubl:schema:xsd:CreditNote-2': (
cls._parse_credit_note_2),
}.get(namespace)
@classmethod
def checker(cls, namespace):
return {
'urn:oasis:names:specification:ubl:schema:xsd:Invoice-2': (
cls._check_invoice_2),
'urn:oasis:names:specification:ubl:schema:xsd:CreditNote-2': (
cls._check_credit_note_2),
}.get(namespace)
@classmethod
def _parse_invoice_2(cls, root):
pool = Pool()
Invoice = pool.get('account.invoice')
Currency = pool.get('currency.currency')
type_code = root.findtext('./{*}InvoiceTypeCode')
if type_code and type_code != '380':
raise InvoiceError(gettext(
'edocument_ubl.msg_invoice_type_code_unsupported',
type_code=type_code))
invoice = Invoice(type='in')
invoice.reference = root.findtext('./{*}ID')
invoice.invoice_date = dt.date.fromisoformat(
root.findtext('./{*}IssueDate'))
invoice.party = cls._parse_2_supplier(
root.find('./{*}AccountingSupplierParty'), create=True)
invoice.set_journal()
invoice.on_change_party()
invoice.invoice_address = cls._parse_2_address(
root.find(
'./{*}AccountingSupplierParty/{*}Party/{*}PostalAddress'),
party=invoice.party)
invoice.party_tax_identifier = cls._parse_2_tax_identifier(
root.findall(
'./{*}AccountingSupplierParty/{*}Party/{*}PartyTaxScheme'),
party=invoice.party, create=True)
if (seller := root.find('./{*}SellerSupplierParty')) is not None:
supplier = cls._parse_2_supplier(seller)
else:
supplier = invoice.party
if (customer_party := root.find('./{*}AccountingCustomerParty')
) is not None:
invoice.company = cls._parse_2_company(customer_party)
else:
invoice.company = Invoice.default_company()
if not invoice.company:
raise InvoiceError(gettext(
'edocument_ubl.msg_company_not_found',
company=etree.tostring(
customer_party, pretty_print=True).decode()
if customer_party else ''))
if (payee_party := root.find('./{*}PayeeParty')) is not None:
party = cls._parse_2_party(payee_party)
if not party:
party = cls._create_2_party(payee_party)
party.save()
invoice.alternative_payees = [party]
currency_code = root.findtext('./{*}DocumentCurrencyCode')
if not currency_code:
payable_amount = root.find(
'./{*}LegalMonetaryTotal/{*}PayableAmount')
currency_code = payable_amount.get('currencyID')
if currency_code is not None:
try:
invoice.currency, = Currency.search([
('code', '=', currency_code),
], limit=1)
except ValueError:
raise InvoiceError(gettext(
'edocument_ubl.msg_currency_not_found',
code=currency_code))
invoice.payment_term_date = cls._parse_2_payment_term_date(
root.findall('./{*}PaymentTerms'))
lines = [
cls._parse_invoice_2_line(
line,
company=invoice.company,
currency=invoice.currency,
supplier=supplier)
for line in root.iterfind('./{*}InvoiceLine')]
lines += [
cls._parse_invoice_2_allowance_charge(
allowance_charge,
company=invoice.company,
currency=invoice.currency,
supplier=supplier)
for allowance_charge in root.iterfind('./{*}AllowanceCharge')]
invoice.lines = lines
invoice.taxes = [
cls._parse_2_tax(
tax, company=invoice.company)
for tax in root.iterfind('./{*}TaxTotal/{*}TaxSubtotal')]
if (hasattr(Invoice, 'cash_rounding')
and (root.find(
'./{*}LegalMonetaryTotal/{*}PayableRoundingAmount')
is not None)):
invoice.cash_rounding = True
return invoice, cls._parse_2_attachments(root)
@classmethod
def _parse_invoice_2_line(
cls, invoice_line, company, currency, supplier=None):
pool = Pool()
Line = pool.get('account.invoice.line')
UoM = pool.get('product.uom')
Tax = pool.get('account.tax')
AccountConfiguration = pool.get('account.configuration')
account_configuration = AccountConfiguration(1)
line = Line(
type='line', company=company, currency=currency, invoice_type='in')
if (invoiced_quantity := invoice_line.find('./{*}InvoicedQuantity')
) is not None:
line.quantity = float(invoiced_quantity.text)
if (unit_code := invoiced_quantity.get('unitCode')) not in {
None, 'ZZ', 'XZZ'}:
try:
line.unit, = UoM.search([
('unece_code', '=', unit_code),
], limit=1)
except ValueError:
raise InvoiceError(gettext(
'edocument_ubl.msg_unit_not_found',
code=unit_code))
else:
line.unit = None
else:
line.quantity = 1
line.unit = None
line.product = cls._parse_2_item(
invoice_line.find('./{*}Item'), supplier=supplier)
if line.product:
line.on_change_product()
line.description = '\n'.join(e.text for e in chain(
invoice_line.iterfind('./{*}Item/{*}Name'),
invoice_line.iterfind('./{*}Item/{*}BrandName'),
invoice_line.iterfind('./{*}Item/{*}ModelName'),
invoice_line.iterfind('./{*}Item/{*}Description'),
invoice_line.iterfind(
'./{*}Item/{*}AdditionalInformation'),
invoice_line.iterfind('./{*}Item/{*}WarrantyInformation'),
) if e is not None and e.text)
if not line.product:
if line.description:
similar_domain = [
('description', 'ilike', line.description),
('invoice.company', '=', company),
('invoice.type', '=', 'in'),
('invoice.state', 'in',
['validated', 'posted', 'paid']),
]
if line.unit:
similar_domain.append(
('unit.category', '=', line.unit.category))
similar_lines = Line.search(
similar_domain,
order=[('invoice.invoice_date', 'DESC')],
limit=1)
else:
similar_lines = []
if similar_lines:
similar_line, = similar_lines
line.account = similar_line.account
line.product = similar_line.product
if not line.unit:
line.unit = similar_line.unit
else:
line.account = account_configuration.get_multivalue(
'default_category_account_expense',
company=company.id)
for line_reference in invoice_line.iterfind('./{*}OrderLineReference'):
line.origin = cls._parse_2_line_reference(
line_reference, line, company, supplier=supplier)
if line.origin:
break
line.unit_price = round_price(
Decimal(invoice_line.findtext('./{*}LineExtensionAmount'))
/ Decimal(str(line.quantity)))
if invoice_line.find('./{*}Item/{*}ClassifiedTaxCategory') is not None:
tax_categories = invoice_line.iterfind(
'./{*}Item/{*}ClassifiedTaxCategory')
else:
tax_categories = invoice_line.iterfind(
'./{*}TaxTotal/{*}TaxSubtotal/{*}TaxCategory')
taxes = []
for tax_category in tax_categories:
domain = cls._parse_2_tax_category(tax_category)
domain.extend([
['OR',
('group', '=', None),
('group.kind', 'in', ['purchase', 'both']),
],
('company', '=', company.id),
])
try:
tax, = Tax.search(domain, limit=1)
except ValueError:
raise InvoiceError(gettext(
'edocument_ubl.msg_tax_not_found',
tax_category=etree.tostring(
tax_category, pretty_print=True).decode()))
taxes.append(tax)
line.taxes = taxes
return line
@classmethod
def _parse_invoice_2_allowance_charge(
cls, allowance_charge, company, currency, supplier=None):
pool = Pool()
AccountConfiguration = pool.get('account.configuration')
Line = pool.get('account.invoice.line')
Tax = pool.get('account.tax')
account_configuration = AccountConfiguration(1)
line = Line(
type='line', company=company, currency=currency, invoice_type='in')
indicator = allowance_charge.findtext('./{*}ChargeIndicator')
line.quantity = {'true': 1, 'false': -1}[indicator]
line.account = account_configuration.get_multivalue(
'default_category_account_expense',
company=company.id)
line.description = allowance_charge.findtext(
'./{*}AllowanceChargeReason')
line.unit_price = round_price(Decimal(allowance_charge.findtext(
'./{*}Amount')))
taxes = []
tax_categories = allowance_charge.iterfind('./{*}TaxCategory')
for tax_category in tax_categories:
domain = cls._parse_2_tax_category(tax_category)
domain.extend([
['OR',
('group', '=', None),
('group.kind', 'in', ['purchase', 'both']),
],
('company', '=', company.id),
])
try:
tax, = Tax.search(domain, limit=1)
except ValueError:
raise InvoiceError(gettext(
'edocument_ubl.msg_tax_not_found',
tax_category=etree.tostring(
tax_category, pretty_print=True).decode()))
taxes.append(tax)
line.taxes = taxes
return line
@classmethod
def _parse_credit_note_2(cls, root):
pool = Pool()
Invoice = pool.get('account.invoice')
Currency = pool.get('currency.currency')
type_code = root.findtext('./{*}CreditNoteTypeCode')
if type_code and type_code != '381':
raise InvoiceError(gettext(
'edocument_ubl.msg_credit_note_type_code_unsupported',
type_code=type_code))
invoice = Invoice(type='in')
invoice.reference = root.findtext('./{*}ID')
invoice.invoice_date = dt.date.fromisoformat(
root.findtext('./{*}IssueDate'))
invoice.party = cls._parse_2_supplier(
root.find('./{*}AccountingSupplierParty'), create=True)
invoice.set_journal()
invoice.on_change_party()
if (seller := root.find('./{*}SellerSupplierParty')) is not None:
supplier = cls._parse_2_supplier(seller)
else:
supplier = invoice.party
if (customer_party := root.find('./{*}AccountingCustomerParty')
) is not None:
invoice.company = cls._parse_2_company(customer_party)
else:
invoice.company = Invoice.default_company()
if (payee_party := root.find('./{*}PayeeParty')) is not None:
party = cls._parse_2_party(payee_party)
if not party:
party = cls._create_2_party(payee_party)
party.save()
invoice.alternative_payees = [party]
if (currency_code := root.findtext('./{*}DocumentCurrencyCode')
) is not None:
try:
invoice.currency, = Currency.search([
('code', '=', currency_code),
], limit=1)
except ValueError:
raise InvoiceError(gettext(
'edocument_ubl.msg_currency_not_found',
code=currency_code))
invoice.payment_term_date = cls._parse_2_payment_term_date(
root.findall('./{*}PaymentTerms'))
lines = [
cls._parse_credit_note_2_line(
line, company=invoice.company, currency=invoice.currency,
supplier=supplier)
for line in root.iterfind('./{*}CreditNoteLine')]
lines += [
cls._parse_credit_note_2_allowance_charge(
allowance_charge,
company=invoice.company,
currency=invoice.currency,
supplier=supplier)
for allowance_charge in root.iterfind('./{*}AllowanceCharge')]
invoice.lines = lines
invoice.taxes = [
cls._parse_2_tax(
tax, company=invoice.company)
for tax in root.iterfind('./{*}TaxTotal/{*}TaxSubtotal')]
if (hasattr(Invoice, 'cash_rounding')
and root.find(
'./{*}LegalMonetaryTotal/{*}PayableRoundingAmount')
is not None):
invoice.cash_rounding = True
return invoice, cls._parse_2_attachments(root)
@classmethod
def _parse_credit_note_2_line(
cls, credit_note_line, company, currency, supplier=None):
pool = Pool()
Line = pool.get('account.invoice.line')
UoM = pool.get('product.uom')
Tax = pool.get('account.tax')
AccountConfiguration = pool.get('account.configuration')
account_configuration = AccountConfiguration(1)
line = Line(
type='line', company=company, currency=currency, invoice_type='in')
if (credited_quantity := credit_note_line.find('./{*}CreditedQuantity')
) is not None:
line.quantity = -float(credited_quantity.text)
if (unit_code := credited_quantity.get('unitCode')) not in {
None, 'ZZ', 'XZZ'}:
try:
line.unit, = UoM.search([
('unece_code', '=', unit_code),
], limit=1)
except ValueError:
raise InvoiceError(gettext(
'edocument_ubl.msg_unit_not_found',
code=unit_code))
else:
line.unit = None
else:
line.quantity = -1
line.unit = None
line.product = cls._parse_2_item(
credit_note_line.find('./{*}Item'), supplier=supplier)
if line.product:
line.on_change_product()
line.description = '\n'.join(e.text for e in chain(
credit_note_line.iterfind('./{*}Item/{*}Name'),
credit_note_line.iterfind('./{*}Item/{*}BrandName'),
credit_note_line.iterfind('./{*}Item/{*}ModelName'),
credit_note_line.iterfind('./{*}Item/{*}Description'),
credit_note_line.iterfind(
'./{*}Item/{*}AdditionalInformation'),
credit_note_line.iterfind('./{*}Item/{*}WarrantyInformation'),
) if e is not None and e.text)
if not line.product:
if line.description:
similar_domain = [
('description', 'ilike', line.description),
('invoice.company', '=', company),
('invoice.type', '=', 'in'),
('invoice.state', 'in',
['validated', 'posted', 'paid']),
]
if line.unit:
similar_domain.append(
('unit.category', '=', line.unit.category))
similar_lines = Line.search(
similar_domain,
order=[('invoice.invoice_date', 'DESC')],
limit=1)
else:
similar_lines = []
if similar_lines:
similar_line, = similar_lines
line.account = similar_line.account
line.product = similar_line.product
if not line.unit:
line.unit = similar_line.unit
else:
line.account = account_configuration.get_multivalue(
'default_category_account_expense',
company=company.id)
for line_reference in credit_note_line.iterfind(
'./{*}OrderLineReference'):
line.origin = cls._parse_2_line_reference(
line_reference, line, company, supplier=supplier)
if line.origin:
break
line.unit_price = round_price(
-Decimal(credit_note_line.findtext('./{*}LineExtensionAmount'))
/ Decimal(str(line.quantity)))
if (credit_note_line.find('./{*}Item/{*}ClassifiedTaxCategory')
is not None):
tax_categories = credit_note_line.iterfind(
'./{*}Item/{*}ClassifiedTaxCategory')
else:
tax_categories = credit_note_line.iterfind(
'./{*}TaxTotal/{*}TaxSubtotal/{*}TaxCategory')
taxes = []
for tax_category in tax_categories:
domain = cls._parse_2_tax_category(tax_category)
domain.extend([
['OR',
('group', '=', None),
('group.kind', 'in', ['purchase', 'both']),
],
('company', '=', company.id),
])
try:
tax, = Tax.search(domain, limit=1)
except ValueError:
raise InvoiceError(gettext(
'edocument_ubl.msg_tax_not_found',
tax_category=etree.tostring(
tax_category, pretty_print=True).decode()))
taxes.append(tax)
line.taxes = taxes
return line
@classmethod
def _parse_credit_note_2_allowance_charge(
cls, allowance_charge, company, currency, supplier=None):
pool = Pool()
AccountConfiguration = pool.get('account.configuration')
Line = pool.get('account.invoice.line')
Tax = pool.get('account.tax')
account_configuration = AccountConfiguration(1)
line = Line(
type='line', company=company, currency=currency, invoice_type='in')
indicator = allowance_charge.findtext('./{*}ChargeIndicator')
line.quantity = {'true': -1, 'false': 1}[indicator]
line.account = account_configuration.get_multivalue(
'default_category_account_expense',
company=company.id)
line.description = allowance_charge.findtext(
'./{*}AllowanceChargeReason')
line.unit_price = round_price(Decimal(allowance_charge.findtext(
'./{*}Amount')))
taxes = []
tax_categories = allowance_charge.iterfind('./{*}TaxCategory')
for tax_category in tax_categories:
domain = cls._parse_2_tax_category(tax_category)
domain.extend([
['OR',
('group', '=', None),
('group.kind', 'in', ['purchase', 'both']),
],
('company', '=', company.id),
])
try:
tax, = Tax.search(domain, limit=1)
except ValueError:
raise InvoiceError(gettext(
'edocument_ubl.msg_tax_not_found',
tax_category=etree.tostring(
tax_category, pretty_print=True).decode()))
taxes.append(tax)
line.taxes = taxes
return line
@classmethod
def _parse_2_supplier(cls, supplier_party, create=False):
pool = Pool()
Party = pool.get('party.party')
for account_id in filter(None, chain(
[supplier_party.find('./{*}CustomerAssignedAccountID')],
supplier_party.iterfind('./{*}AdditionalAccountID'))):
if account_id.text:
try:
party, = Party.search([
('code', '=', account_id.text),
])
except ValueError:
pass
else:
return party
party_el = supplier_party.find('./{*}Party')
party = cls._parse_2_party(party_el)
if not party and create:
party = cls._create_2_party(party_el)
party.save()
return party
@classmethod
def _parse_2_party(cls, party_el):
pool = Pool()
Party = pool.get('party.party')
for identifier in chain(
party_el.iterfind('./{*}PartyIdentification/{*}ID'),
party_el.iterfind('./{*}PartyLegalEntity/{*}CompanyID'),
party_el.iterfind('./{*}PartyTaxScheme/{*}CompanyID'),
):
if identifier.text:
domain = [
('code', '=', identifier.text),
]
if schemeId := identifier.get('schemeID'):
if type := ISO6523.get(schemeId):
domain.append(('type', '=', type))
parties = Party.search([
('identifiers', 'where', domain),
])
if len(parties) == 1:
party, = parties
return party
@classmethod
def _create_2_party(cls, party_el):
pool = Pool()
Party = pool.get('party.party')
party = Party()
party.name = (
party_el.findtext('./{*}PartyLegalEntity/{*}RegistrationName')
or party_el.findtext('./{*}PartyName/{*}Name'))
identifiers = []
identifiers_done = set()
for identifier in chain(
party_el.iterfind('./{*}PartyIdentification/{*}ID'),
party_el.iterfind('./{*}PartyTaxScheme/{*}CompanyID'),
party_el.iterfind('./{*}PartyLegalEntity/{*}CompanyID'),
):
if identifier.text:
identifier_key = (identifier.text, identifier.get('schemeID'))
if identifier_key not in identifiers_done:
identifiers.append(cls._create_2_party_identifier(
identifier))
identifiers_done.add(identifier_key)
party.identifiers = identifiers
if (address := party_el.find('./{*}PostalAddress')
) is not None:
party.addresses = [cls._create_2_address(address)]
return party
@classmethod
def _create_2_party_identifier(cls, identifier):
pool = Pool()
Identifier = pool.get('party.identifier')
if schemeId := identifier.get('schemeID'):
type = ISO6523.get(schemeId)
else:
type = None
return Identifier(type=type, code=identifier.text)
@classmethod
def _parse_2_address(cls, address_el, party):
pool = Pool()
Address = pool.get('party.address')
address = cls._create_2_address(address_el)
domain = [('party', '=', party)]
for fname in Address._fields:
if value := getattr(address, fname, None):
domain.append((fname, '=', value))
try:
address, = Address.search(domain, limit=1)
except ValueError:
address.party = party
address.save()
return address
@classmethod
def _create_2_address(cls, address_el):
pool = Pool()
Address = pool.get('party.address')
Country = pool.get('country.country')
address = Address()
if address_el is None:
return address
address.post_box = address_el.findtext('./{*}Postbox')
address.floor_number = address_el.findtext('./{*}Floor')
address.room_number = address_el.findtext('./{*}Room')
address.street_name = address_el.findtext('./{*}StreetName')
address.building_name = address_el.findtext('./{*}BuildingName')
address.building_number = address_el.findtext('./{*}BuildingNumber')
address.city = address_el.findtext('./{*}CityName')
address.postal_code = address_el.findtext('./{*}PostalZone')
if (country_code := address_el.findtext(
'./{*}Country/{*}IdentificationCode[@listId="ISO3166-1"]')
) is not None:
try:
country, = Country.search([
('code', '=', country_code),
], limit=1)
except ValueError:
pass
else:
address.country = country
address.street_unstructured = '\n'.join(
(line.text
for line in address_el.iterfind('./{*}AddressLine/{*}Line')))
return address
@classmethod
def _parse_2_tax_identifier(cls, party_tax_schemes, party, create=False):
pool = Pool()
Identifier = pool.get('party.identifier')
tax_identifier_types = party.tax_identifier_types()
for party_tax_scheme in party_tax_schemes:
company_id = party_tax_scheme.find('./{*}CompanyID')
if company_id is not None:
scheme_id = company_id.get('schemeID')
value = company_id.text
for identifier in party.identifiers:
if (identifier.type in tax_identifier_types
and identifier.iso_6523 == scheme_id
and identifier.code == value):
return identifier
else:
if create and scheme_id in ISO6523:
identifier = Identifier(
party=party,
type=ISO6523[scheme_id],
code=value)
identifier.save()
return identifier
@classmethod
def _parse_2_company(cls, customer_party):
pool = Pool()
Company = pool.get('company.company')
try:
CustomerCode = pool.get('party.party.customer_code')
except KeyError:
CustomerCode = None
if CustomerCode:
for account_id in filter(None, chain(
[customer_party.find('./{*}CustomerAssignedAccountID'),
customer_party.find(
'./{*}SupplierAssignedAccountID')],
customer_party.iterfind('./{*}AdditionalAccountID'))):
if account_id.text:
try:
customer_code, = CustomerCode.search([
('customer_code', '=', account_id.text),
])
except ValueError:
pass
else:
return customer_code.company
party_el = customer_party.find('./{*}Party')
for identifier in chain(
party_el.iterfind('./{*}PartyIdentification/{*}ID'),
party_el.iterfind('./{*}PartyTaxScheme/{*}CompanyID'),
party_el.iterfind('./{*}PartyLegalEntity/{*}CompanyID'),
):
if identifier.text:
domain = [
('code', '=', identifier.text),
]
if schemeId := identifier.get('schemeID'):
if type := ISO6523.get(schemeId):
domain.append(('type', '=', type))
companies = Company.search([
('party.identifiers', 'where', domain),
])
if len(companies) == 1:
company, = companies
return company
for name in chain(
party_el.iterfind('./{*}PartyTaxScheme/{*}RegistrationName'),
party_el.iterfind('./{*}PartyName/{*}Name'),
):
if name is None:
continue
companies = Company.search([
('party.name', '=', name.text),
])
if len(companies) == 1:
company, = companies
return company
@classmethod
def _parse_2_payment_term_date(cls, payment_terms):
dates = []
for payment_term in payment_terms:
if (date := payment_term.findtext('./{*}PaymentDueDate')
) is not None:
dates.append(dt.date.fromisoformat(date))
return min(dates, default=None)
@classmethod
def _parse_2_item(cls, item, supplier=None):
pool = Pool()
Product = pool.get('product.product')
if (identifier := item.find('./{*}StandardItemIdentification/{*}ID')
) is not None:
if identifier.get('schemeID') == 'GTIN':
try:
product, = Product.search([
('identifiers', 'where', [
('type', 'in', ['ean', 'isbn', 'ismn']),
('code', '=', identifier.text),
]),
], limit=1)
except ValueError:
pass
else:
return product
if (code := item.findtext('./{*}BuyersItemIdentification/{*}ID')
) is not None:
try:
product, = Product.search([
('code', '=', code),
], limit=1)
except ValueError:
pass
else:
return product
@classmethod
def _parse_2_line_reference(
cls, line_reference, line, company, supplier=None):
return
@classmethod
def _parse_2_tax_category(cls, tax_category):
domain = [
('parent', '=', None),
]
if (unece_category_code := tax_category.findtext('./{*}ID')
) is not None:
domain.append(('unece_category_code', '=', unece_category_code))
if (unece_code := tax_category.findtext('./{*}TaxScheme/{*}ID')
) is not None:
domain.append(('unece_code', '=', unece_code))
percent = tax_category.findtext('./{*}Percent')
if percent:
domain.append(('type', '=', 'percentage'))
domain.append(('rate', '=', Decimal(percent) / 100))
return domain
@classmethod
def _parse_2_tax(cls, tax, company):
pool = Pool()
Tax = pool.get('account.tax')
InvoiceTax = pool.get('account.invoice.tax')
invoice_tax = InvoiceTax(manual=False)
tax_category = tax.find('./{*}TaxCategory')
domain = cls._parse_2_tax_category(tax_category)
domain.extend([
['OR',
('group', '=', None),
('group.kind', 'in', ['purchase', 'both']),
],
('company', '=', company.id),
])
try:
invoice_tax.tax, = Tax.search(domain, limit=1)
except ValueError:
raise InvoiceError(gettext(
'edocument_ubl.msg_tax_not_found',
tax_category=etree.tostring(
tax_category, pretty_print=True).decode()))
invoice_tax.amount = Decimal(tax.findtext('./{*}TaxAmount'))
if (taxable_amount := tax.findtext('./{*}TaxableAmount')) is not None:
invoice_tax.base = Decimal(taxable_amount)
else:
# Use tax amount to define the sign of unknown base
invoice_tax.base = invoice_tax.amount
invoice_tax.on_change_tax()
return invoice_tax
@classmethod
def _parse_2_attachments(cls, root):
pool = Pool()
Attachment = pool.get('ir.attachment')
for name in [
'DespatchDocumentReference',
'ReceiptDocumentReference',
'ContractDocumentReference',
'AdditionalDocumentReference',
'StatementDocumentReference',
'OriginatorDocumentReference'
]:
for document in root.iterfind(f'./{{*}}{name}'):
attachment = Attachment()
name = ' '.join(filter(None, [
document.findtext('./{*}DocumentType'),
document.findtext('./{*}ID'),
]))
if (data := document.find(
'./{*}Attachment/{*}EmbeddedDocumentBinaryObject')
) is not None:
mime_code = (
data.get('mimeCode') or 'application/octet-stream')
name += mimetypes.guess_extension(mime_code) or ''
attachment.type = 'data'
data = b64decode(data.text)
attachment.data = data
elif data := document.findtext(
'./{*}Attachment/{*}EmbeddedDocument'):
name += '.txt'
attachment.type = 'data'
attachment.data = data
elif url := document.findtext(
'./{*}Attachment/{*}ExternalReference/{*}URI'):
attachment.type = 'link'
Attachment.link = url
attachment.name = name
yield attachment
@classmethod
def _check_invoice_2(cls, root, invoice):
pool = Pool()
Lang = pool.get('ir.lang')
lang = Lang.get()
payable_amount = Decimal(
root.findtext('./{*}LegalMonetaryTotal/{*}PayableAmount'))
prepaid_amount = Decimal(
root.findtext('./{*}LegalMonetaryTotal/{*}PrepaidAmount')
or 0)
amount = payable_amount + prepaid_amount
if not getattr(invoice, 'cash_rounding', False):
payable_rounding_amount = Decimal(
root.findtext(
'./{*}LegalMonetaryTotal/{*}PayableRoundingAmount')
or 0)
amount -= payable_rounding_amount
if invoice.total_amount != amount:
raise InvoiceError(gettext(
'edocument_ubl.msg_invoice_total_amount_different',
invoice=invoice.rec_name,
total_amount=lang.format_number(invoice.total_amount),
amount=lang.format_number(amount)))
tax_total = sum(Decimal(amount.text) for amount in root.iterfind(
'./{*}TaxTotal/{*}TaxAmount'))
if invoice.tax_amount != tax_total:
raise InvoiceError(gettext(
'edocument_ubl.msg_invoice_tax_amount_different',
invoice=invoice.rec_name,
tax_amount=lang.format_number(invoice.tax_amount),
tax_total=lang.format_number(tax_total)))
@classmethod
def _check_credit_note_2(cls, root, invoice):
pool = Pool()
Lang = pool.get('ir.lang')
lang = Lang.get()
payable_amount = Decimal(
root.findtext('./{*}LegalMonetaryTotal/{*}PayableAmount'))
prepaid_amount = Decimal(
root.findtext('./{*}LegalMonetaryTotal/{*}PrepaidAmount')
or 0)
amount = payable_amount + prepaid_amount
if not getattr(invoice, 'cash_rounding', False):
payable_rounding_amount = Decimal(
root.findtext(
'./{*}LegalMonetaryTotal/{*}PayableRoundingAmount')
or 0)
amount -= payable_rounding_amount
if -invoice.total_amount != amount:
raise InvoiceError(gettext(
'edocument_ubl.msg_invoice_total_amount_different',
invoice=invoice.rec_name,
total_amount=lang.format_number(-invoice.total_amount),
amount=lang.format_number(amount)))
tax_total = sum(Decimal(amount.text) for amount in root.iterfind(
'./{*}TaxTotal/{*}TaxAmount'))
if -invoice.tax_amount != tax_total:
raise InvoiceError(gettext(
'edocument_ubl.msg_invoice_tax_amount_different',
invoice=invoice.rec_name,
tax_amount=lang.format_number(-invoice.tax_amount),
tax_total=lang.format_number(tax_total)))
class Invoice_Purchase(metaclass=PoolMeta):
__name__ = 'edocument.ubl.invoice'
@classmethod
def _parse_2_item(cls, item, supplier=None):
pool = Pool()
Product = pool.get('product.product')
product = super()._parse_2_item(item, supplier=supplier)
if (not product
and supplier
and (code := item.findtext(
'./{*}SellersItemIdentification/{*}ID'))):
try:
product, = Product.search([
('product_suppliers', 'where', [
('party', '=', supplier.id),
('code', '=', code),
]),
], limit=1)
except ValueError:
pass
return product
@classmethod
def _parse_2_line_reference(
cls, line_reference, line, company, supplier=None):
pool = Pool()
PurchaseLine = pool.get('purchase.line')
UoM = pool.get('product.uom')
origin = super()._parse_2_line_reference(
line_reference, line, company, supplier=supplier)
if origin:
return origin
if not line or not line.product or not line.unit:
return
if numbers := list(filter(None, [
line_reference.findtext('./{*}OrderReference/{*}ID'),
line_reference.findtext(
'./{*}OrderReference/{*}SalesOrderID'),
])):
purchase_lines = PurchaseLine.search([
('purchase.company', '=', company),
('purchase.rec_name', 'in', numbers),
('type', '=', 'line'),
('product', '=', line.product),
])
if purchase_lines:
quantities = []
for purchase_line in purchase_lines:
quantity = UoM.compute_qty(
purchase_line.unit, purchase_line.quantity, line.unit)
quantities.append((quantity, purchase_line))
key = itemgetter(0)
quantities.sort(key=key)
index = bisect.bisect_left(quantities, line.quantity, key=key)
if index >= len(quantities):
index = -1
origin = quantities[index][1]
return origin