485 lines
18 KiB
Python
485 lines
18 KiB
Python
# This file is part of Tryton. The COPYRIGHT file at the top level of
|
|
# this repository contains the full copyright notices and license terms.
|
|
|
|
import bisect
|
|
import datetime as dt
|
|
import logging
|
|
import re
|
|
from decimal import Decimal
|
|
from operator import itemgetter
|
|
|
|
from trytond.cache import Cache
|
|
from trytond.model import (
|
|
MatchMixin, ModelSQL, ModelView, Workflow, fields, sequence_ordered)
|
|
from trytond.pool import Pool, PoolMeta
|
|
from trytond.pyson import Eval
|
|
|
|
logger = logging.getLogger(__name__)
|
|
|
|
|
|
class Incoming(metaclass=PoolMeta):
|
|
__name__ = 'document.incoming'
|
|
|
|
ocr_service = fields.Many2One(
|
|
'document.incoming.ocr.service', "OCR Service", readonly=True,
|
|
states={
|
|
'invisible': ~Eval('ocr_service'),
|
|
})
|
|
|
|
@classmethod
|
|
def __setup__(cls):
|
|
super().__setup__()
|
|
cls._buttons.update(
|
|
ocr_send_feedback={
|
|
'invisible': ~Eval('ocr_service') | (Eval('state') != 'done'),
|
|
'depends': ['ocr_service', 'state'],
|
|
},
|
|
)
|
|
|
|
@classmethod
|
|
@ModelView.button
|
|
@Workflow.transition('done')
|
|
def proceed(cls, documents, with_children=False):
|
|
pool = Pool()
|
|
Service = pool.get('document.incoming.ocr.service')
|
|
for document in documents:
|
|
service = Service.get_service(document)
|
|
if service:
|
|
document.ocr_service = service
|
|
document.parsed_data = service.process(document)
|
|
cls.save(documents)
|
|
super().proceed(documents, with_children=with_children)
|
|
|
|
def _process_document_incoming(self):
|
|
document = super()._process_document_incoming()
|
|
if self.ocr_service:
|
|
document_data = self.ocr_service.get_document_incoming(self)
|
|
document_type = document_data.get('document_type')
|
|
if (document_type
|
|
and document_type in dict(self.__class__.type.selection)):
|
|
document.type = document_type
|
|
return document
|
|
|
|
@classmethod
|
|
@ModelView.button
|
|
def ocr_send_feedback(cls, documents):
|
|
for document in documents:
|
|
if document.ocr_service:
|
|
document.ocr_service.send_feedback(document)
|
|
|
|
@classmethod
|
|
def copy(cls, documents, default=None):
|
|
default = default.copy() if default is not None else {}
|
|
default.setdefault('ocr_service')
|
|
return super().copy(documents, default=default)
|
|
|
|
|
|
class IncomingSupplierInvoice(metaclass=PoolMeta):
|
|
__name__ = 'document.incoming'
|
|
|
|
def _process_supplier_invoice(self):
|
|
pool = Pool()
|
|
Party = pool.get('party.party')
|
|
Currency = pool.get('currency.currency')
|
|
|
|
invoice = super()._process_supplier_invoice()
|
|
if self.ocr_service:
|
|
invoice_data = self.ocr_service.get_supplier_invoice(self)
|
|
|
|
tax_identifier = invoice_data.get('tax_identifier')
|
|
if tax_identifier and invoice.party:
|
|
tax_identifier_types = Party.tax_identifier_types()
|
|
for identifier in invoice.party.identifiers:
|
|
if (identifier.type in tax_identifier_types
|
|
and identifier.code == tax_identifier):
|
|
invoice.party_tax_identifier = identifier
|
|
|
|
currency = invoice_data.get('currency')
|
|
if currency:
|
|
try:
|
|
invoice.currency, = Currency.search([
|
|
('code', '=', currency),
|
|
])
|
|
except ValueError:
|
|
logger.debug(f"Cannot find currency '{currency}'")
|
|
|
|
invoice.reference = invoice_data.get('number')
|
|
invoice.description = invoice_data.get('description')
|
|
|
|
invoice_date = invoice_data.get('invoice_date')
|
|
if invoice_date:
|
|
try:
|
|
invoice.invoice_date = dt.date.fromisoformat(invoice_date)
|
|
except ValueError:
|
|
logger.debug(f"Cannot parse invoice date '{invoice_date}'")
|
|
|
|
payment_term_date = invoice_data.get('payment_term_date')
|
|
if payment_term_date:
|
|
try:
|
|
invoice.payment_term_date = dt.date.fromisoformat(
|
|
payment_term_date)
|
|
invoice.payment_term = None
|
|
except ValueError:
|
|
logger.debug(
|
|
"Cannot parse payment term date "
|
|
f"'{payment_term_date}'")
|
|
|
|
lines = []
|
|
for parsed_line in invoice_data.get('lines', []):
|
|
line = self._process_supplier_invoice_line(
|
|
invoice, parsed_line, invoice_data)
|
|
if line:
|
|
lines.append(line)
|
|
if not lines:
|
|
line_data = self._process_supplier_invoice_line_single(
|
|
invoice, invoice_data)
|
|
if line_data:
|
|
line = self._process_supplier_invoice_line(
|
|
invoice, line_data, invoice_data)
|
|
if line:
|
|
lines.append(line)
|
|
invoice.lines = lines
|
|
|
|
taxes = []
|
|
for parsed_tax in invoice_data.get('taxes', []):
|
|
tax = self._process_supplier_invoice_tax(
|
|
invoice, parsed_tax)
|
|
if tax:
|
|
taxes.append(tax)
|
|
if taxes:
|
|
for line in invoice.lines:
|
|
line.taxes = None
|
|
invoice.taxes = taxes
|
|
|
|
return invoice
|
|
|
|
def _process_supplier_invoice_line_single(self, invoice, invoice_data):
|
|
total_amount = invoice_data.get('total_amount')
|
|
if total_amount:
|
|
return {'quantity': 1, 'amount': total_amount}
|
|
|
|
def _process_supplier_invoice_line(self, invoice, line_data, invoice_data):
|
|
from trytond.modules.product import round_price
|
|
pool = Pool()
|
|
AccountConfiguration = pool.get('account.configuration')
|
|
InvoiceLine = pool.get('account.invoice.line')
|
|
Product = pool.get('product.product')
|
|
UoM = pool.get('product.uom')
|
|
|
|
account_configuration = AccountConfiguration(1)
|
|
|
|
line = InvoiceLine(
|
|
invoice=invoice,
|
|
currency=invoice.currency,
|
|
company=invoice.company)
|
|
product_name = line_data.get('product_name')
|
|
if product_name:
|
|
try:
|
|
product, = Product.search([
|
|
('rec_name', 'ilike', product_name),
|
|
])
|
|
except ValueError:
|
|
logger.debug(f"Cannot find product '{product_name}'")
|
|
line.product = None
|
|
else:
|
|
line.product = product.id
|
|
line.on_change_product()
|
|
else:
|
|
line.product = None
|
|
line.description = line_data.get('description')
|
|
if not line.product:
|
|
if line.description:
|
|
similar_lines = InvoiceLine.search([
|
|
('description', 'ilike', line.description),
|
|
('invoice.company', '=', invoice.company),
|
|
('invoice.type', '=', invoice.type),
|
|
('invoice.state', 'in',
|
|
['validated', 'posted', 'paid']),
|
|
],
|
|
order=[('invoice.invoice_date', 'DESC')],
|
|
limit=1)
|
|
else:
|
|
similar_lines = []
|
|
if similar_lines:
|
|
similar_line, = similar_lines
|
|
line.account = similar_line.account
|
|
line.product = similar_line.product
|
|
line.unit = similar_line.unit
|
|
line.taxes = similar_line.taxes
|
|
else:
|
|
line.account = account_configuration.get_multivalue(
|
|
'default_category_account_expense',
|
|
company=invoice.company.id)
|
|
line.on_change_account()
|
|
|
|
unit = line_data.get('unit')
|
|
if unit:
|
|
try:
|
|
unit, = UoM.search([
|
|
('rec_name', 'ilike', unit),
|
|
])
|
|
except ValueError:
|
|
logger.debug(f"Cannot find UoM '{unit}'")
|
|
else:
|
|
if (not line.product
|
|
or line.product.default_uom.category == unit.category):
|
|
line.unit = unit
|
|
|
|
quantity = line_data.get('quantity') or 0
|
|
if getattr(line, 'unit', None):
|
|
quantity = line.unit.round(quantity)
|
|
line.quantity = quantity or 1
|
|
|
|
unit_price = line_data.get('unit_price')
|
|
amount = line_data.get('amount')
|
|
if unit_price is not None:
|
|
line.unit_price = round_price(unit_price)
|
|
elif amount is not None:
|
|
line.unit_price = round_price(
|
|
amount / Decimal(str(line.quantity)))
|
|
else:
|
|
line.unit_price = 0
|
|
return line
|
|
|
|
def _process_supplier_invoice_tax(self, invoice, parsed_tax):
|
|
pool = Pool()
|
|
Tax = pool.get('account.tax')
|
|
InvoiceTax = pool.get('account.invoice.tax')
|
|
|
|
invoice_tax = InvoiceTax(invoice=invoice, manual=True)
|
|
|
|
try:
|
|
tax, = Tax.search([
|
|
['OR',
|
|
('group', '=', None),
|
|
('group.kind', 'in', ['purchase', 'both']),
|
|
],
|
|
('company', '=', invoice.company.id),
|
|
('type', '=', parsed_tax.get('type')),
|
|
('amount', '=', parsed_tax.get('amount')),
|
|
('rate', '=', parsed_tax.get('rate')),
|
|
])
|
|
except ValueError:
|
|
logger.debug(f"Cannot find tax for '{parsed_tax}'")
|
|
return
|
|
invoice_tax.tax = tax.id
|
|
invoice_tax.on_change_tax()
|
|
|
|
base = parsed_tax.get('base')
|
|
if base is not None:
|
|
invoice_tax.base = invoice.currency.round(base)
|
|
invoice_tax.on_change_base()
|
|
else:
|
|
amount = parsed_tax.get('amount') or 0
|
|
invoice_tax.base = 0
|
|
invoice_tax.amount = invoice_tax.currency.round(amount)
|
|
return invoice_tax
|
|
|
|
@property
|
|
def supplier_invoice_company(self):
|
|
pool = Pool()
|
|
Company = pool.get('company.company')
|
|
Party = pool.get('party.party')
|
|
Identifier = pool.get('party.identifier')
|
|
|
|
company = super().supplier_invoice_company
|
|
|
|
if self.ocr_service:
|
|
invoice_data = self.ocr_service.get_supplier_invoice(self)
|
|
|
|
company_name = invoice_data.get('company_name')
|
|
if company_name:
|
|
try:
|
|
company, = Company.search([
|
|
('party.rec_name', 'ilike', company_name),
|
|
])
|
|
except ValueError:
|
|
logger.debug(f"Cannot find company '{company_name}'")
|
|
|
|
tax_identifier = invoice_data.get('company_tax_identifier')
|
|
if tax_identifier:
|
|
identifiers = Identifier.search([
|
|
('code', '=', tax_identifier),
|
|
('type', 'in', Party.tax_identifier_types()),
|
|
])
|
|
if len(identifiers) == 1:
|
|
identifier, = identifiers
|
|
try:
|
|
company, = Company.search([
|
|
('party', '=', identifier.party.id),
|
|
])
|
|
except ValueError:
|
|
logger.debug(
|
|
"Cannot find company for party "
|
|
f"'{identifier.party.id}'")
|
|
else:
|
|
logger.debug(f"Cannot find company '{tax_identifier}'")
|
|
return company
|
|
|
|
@property
|
|
def supplier_invoice_party(self):
|
|
pool = Pool()
|
|
Party = pool.get('party.party')
|
|
Identifier = pool.get('party.identifier')
|
|
|
|
party = super().supplier_invoice_party
|
|
|
|
if self.ocr_service:
|
|
invoice_data = self.ocr_service.get_supplier_invoice(self)
|
|
|
|
supplier_name = invoice_data.get('supplier_name')
|
|
if supplier_name:
|
|
try:
|
|
party, = Party.search([
|
|
('rec_name', 'ilike', supplier_name),
|
|
])
|
|
except ValueError:
|
|
logger.debug(f"Cannot find party '{supplier_name}'")
|
|
|
|
tax_identifier = invoice_data.get('tax_identifier')
|
|
if tax_identifier:
|
|
identifiers = Identifier.search([
|
|
('code', '=', tax_identifier),
|
|
('type', 'in', Party.tax_identifier_types()),
|
|
])
|
|
if len(identifiers) == 1:
|
|
identifier, = identifiers
|
|
party = identifier.party
|
|
else:
|
|
logger.debug(f"Cannot find party '{tax_identifier}'")
|
|
return party
|
|
|
|
|
|
class IncomingSupplierInvoicePurchase(metaclass=PoolMeta):
|
|
__name__ = 'document.incoming'
|
|
|
|
def _process_supplier_invoice_line(self, invoice, line_data, invoice_data):
|
|
pool = Pool()
|
|
PurchaseLine = pool.get('purchase.line')
|
|
UoM = pool.get('product.uom')
|
|
|
|
line = super()._process_supplier_invoice_line(
|
|
invoice, line_data, invoice_data)
|
|
|
|
if (line and line.product and line.unit
|
|
and (line_data.get('purchase_orders')
|
|
or line_data.get('purchase_order'))):
|
|
if line_data.get('purchase_order'):
|
|
numbers = [line_data['purchase_order']]
|
|
else:
|
|
numbers = re.split(r'[ ,;]', line_data['purchase_orders'])
|
|
purchase_lines = PurchaseLine.search([
|
|
('purchase.company', '=', invoice.company),
|
|
('purchase.rec_name', 'in', numbers),
|
|
('type', '=', 'line'),
|
|
('product', '=', line.product.id),
|
|
])
|
|
if purchase_lines:
|
|
quantities = []
|
|
for purchase_line in purchase_lines:
|
|
quantity = UoM.compute_qty(
|
|
purchase_line.unit, purchase_line.quantity, line.unit)
|
|
quantities.append((quantity, purchase_line))
|
|
key = itemgetter(0)
|
|
quantities.sort(key=key)
|
|
index = bisect.bisect_left(quantities, line.quantity, key=key)
|
|
if index >= len(quantities):
|
|
index = -1
|
|
line.origin = str(quantities[index][1])
|
|
return line
|
|
|
|
|
|
class IncomingOCRService(sequence_ordered(), ModelSQL, ModelView, MatchMixin):
|
|
__name__ = 'document.incoming.ocr.service'
|
|
|
|
type = fields.Selection([
|
|
(None, ''),
|
|
], "Type")
|
|
company = fields.Many2One(
|
|
'company.company', "Company",
|
|
help="The company for which the service is used.\n"
|
|
"Leave empty for any company.")
|
|
source = fields.Char(
|
|
"Source",
|
|
help="The regular expression to match the document source.\n"
|
|
"Leave empty to allow any source.")
|
|
document_type = fields.Selection(
|
|
'get_document_types', "Type",
|
|
help="The document type to match.\n"
|
|
"Leave empty for any type.")
|
|
_get_service_cache = Cache(
|
|
'document.incoming.ocr.service.get_service', context=False)
|
|
|
|
@classmethod
|
|
def get_document_types(cls):
|
|
pool = Pool()
|
|
Incoming = pool.get('document.incoming')
|
|
return Incoming.fields_get(['type'])['type']['selection']
|
|
|
|
@classmethod
|
|
def get_service(cls, document):
|
|
pattern = cls._get_pattern(document)
|
|
key = tuple(sorted(pattern.items()))
|
|
service_id = cls._get_service_cache.get(key, -1)
|
|
if service_id is None:
|
|
return None
|
|
if service_id >= 0:
|
|
return cls(service_id)
|
|
for service in cls.search([]):
|
|
if service.match(pattern):
|
|
break
|
|
else:
|
|
service = None
|
|
cls._get_service_cache.set(key, service.id if service else None)
|
|
return service
|
|
|
|
@classmethod
|
|
def _get_pattern(cls, document):
|
|
return {
|
|
'company': document.company.id if document.company else None,
|
|
'source': document.source or None,
|
|
'document_type': document.type or None,
|
|
'mime_type': document.mime_type,
|
|
}
|
|
|
|
def match(self, pattern):
|
|
pattern = pattern.copy()
|
|
source = pattern.pop('source', None)
|
|
if (self.source
|
|
and (not source
|
|
or not re.search(self.source, source))):
|
|
return False
|
|
if not self.match_mime_type(pattern.pop('mime_type', None)):
|
|
return False
|
|
return super().match(pattern)
|
|
|
|
def match_mime_type(self, mime_type):
|
|
return True
|
|
|
|
def process(self, document):
|
|
if self.type:
|
|
return getattr(self, f'_process_{self.type}')(document)
|
|
|
|
def get_document_incoming(self, document):
|
|
if self.type:
|
|
return getattr(
|
|
self, f'_get_document_incoming_{self.type}')(document)
|
|
else:
|
|
return {}
|
|
|
|
def get_supplier_invoice(self, document):
|
|
if self.type:
|
|
return getattr(
|
|
self, f'_get_supplier_invoice_{self.type}')(document)
|
|
else:
|
|
return {}
|
|
|
|
def send_feedback(self, document):
|
|
if self.type:
|
|
getattr(self, f'_send_feedback_{self.type}')(document)
|
|
|
|
@classmethod
|
|
def on_modification(cls, mode, services, field_names=None):
|
|
super().on_modification(mode, services, field_names=field_names)
|
|
cls._get_service_cache.clear()
|