Files
tradon/modules/document_incoming/document.py
2026-03-14 09:42:12 +00:00

377 lines
12 KiB
Python

# This file is part of Tryton. The COPYRIGHT file at the top level of
# this repository contains the full copyright notices and license terms.
import mimetypes
from collections import defaultdict
from io import BytesIO
from pypdf import PdfReader, PdfWriter
import trytond.config as config
from trytond.i18n import gettext
from trytond.model import (
DeactivableMixin, ModelSingleton, ModelSQL, ModelView, Workflow, fields)
from trytond.pool import Pool
from trytond.pyson import Eval, If
from trytond.transaction import Transaction
from trytond.wizard import Button, StateTransition, StateView, Wizard
from .exceptions import DocumentIncomingSplitError
if config.getboolean('document_incoming', 'filestore', default=True):
file_id = 'file_id'
store_prefix = config.get(
'document_incoming', 'store_prefix', default=None)
else:
file_id = store_prefix = None
class IncomingConfiguration(ModelSingleton, ModelSQL, ModelView):
__name__ = 'document.incoming.configuration'
class Incoming(DeactivableMixin, Workflow, ModelSQL, ModelView):
__name__ = 'document.incoming'
_states = {
'readonly': Eval('state') != 'draft',
}
name = fields.Char("Name", required=True, states=_states)
company = fields.Many2One('company.company', "Company", states=_states)
data = fields.Binary(
"Data", filename='name',
file_id=file_id, store_prefix=store_prefix,
states={
'required': ~Eval('children'),
'readonly': _states['readonly'],
})
parsed_data = fields.Dict(None, "Parsed Data", readonly=True)
file_id = fields.Char("File ID", readonly=True)
mime_type = fields.Function(
fields.Char("MIME Type"), 'on_change_with_mime_type')
type = fields.Selection([
(None, ""),
('document_incoming', "Unknown"),
], "Type",
states={
'required': Eval('active', True) & (Eval('state') == 'done'),
'readonly': _states['readonly'],
})
source = fields.Char("Source", states=_states)
parent = fields.Many2One(
'document.incoming', "Parent", readonly=True,
domain=[
('active', '=', False),
],
states={
'invisible': ~Eval('parent'),
})
children = fields.One2Many(
'document.incoming', 'parent', "Children", readonly=True,
states={
'invisible': ~Eval('children'),
})
result = fields.Reference(
"Result", selection='get_results', readonly=True,
states={
'required': Eval('active', True) & (Eval('state') == 'done'),
'invisible': ~Eval('result'),
})
state = fields.Selection([
('draft', "Draft"),
('processing', "Processing"),
('done', "Done"),
('cancelled', "Cancelled"),
], "State", required=True, readonly=True)
del _states
@classmethod
def __setup__(cls):
super().__setup__()
cls._transitions |= {
('draft', 'processing'),
('draft', 'cancelled'),
('processing', 'processing'),
('processing', 'done'),
('processing', 'draft'),
('cancelled', 'draft'),
}
cls._buttons.update(
cancel={
'invisible': Eval('state') != 'draft',
'depends': ['state'],
},
draft={
'invisible': ~Eval('state').in_(['processing', 'cancelled']),
'depends': ['state'],
},
split_wizard={
'invisible': (
(Eval('state') != 'draft')
| ~Eval('mime_type').in_(cls._split_mime_types())),
'depends': ['state', 'mime_type'],
},
process={
'pre_validate': [
If(~Eval('type'),
('type', '!=', None),
()),
],
'invisible': Eval('state') != 'draft',
'depends': ['state'],
},
proceed={
'invisible': Eval('state') != 'processing',
'depends': ['state'],
},
)
@classmethod
def __register__(cls, module):
super().__register__(module)
table_h = cls.__table_handler__(module)
# Migration from 7.6: remove not null on data
table_h.not_null_action('data', action='remove')
@fields.depends('name')
def on_change_with_mime_type(self, name=None):
if self.name:
type, _ = mimetypes.guess_type(self.name)
return type
@classmethod
def get_results(cls):
pool = Pool()
IrModel = pool.get('ir.model')
get_name = IrModel.get_name
models = cls._get_results()
return [(None, '')] + [(m, get_name(m)) for m in models]
@classmethod
def _get_results(cls):
return {'document.incoming'}
@classmethod
def default_state(cls):
return 'draft'
@classmethod
def view_attributes(cls):
process_states = cls._buttons['process'].copy()
process_states['invisible'] = Eval('state') != 'draft'
return super().view_attributes() + [
('/form//button[@name="process"]', 'states', process_states),
]
@classmethod
@ModelView.button
@Workflow.transition('cancelled')
def cancel(cls, documents):
pass
@classmethod
@ModelView.button
@Workflow.transition('draft')
def draft(cls, documents):
pass
@classmethod
@ModelView.button_action(
'document_incoming.wizard_document_incoming_split')
def split_wizard(cls, documents):
pass
@classmethod
def _split_mime_types(cls):
return ['application/pdf']
@classmethod
def from_inbound_email(cls, email_, rule):
def sanitize_name(value):
for forbidden_char in cls.name.forbidden_chars:
value = value.replace(forbidden_char, ' ')
return value
message = email_.as_dict()
active = not message.get('attachments')
data = message.get('text', message.get('html'))
if isinstance(data, str):
data = data.encode()
name = sanitize_name(message.get('subject') or 'No Subject')
document = cls(
active=active,
name=name,
company=rule.document_incoming_company,
data=data,
type=rule.document_incoming_type if active else None,
source='inbound_email',
)
children = []
for attachment in message.get('attachments', []):
child = cls(
name=sanitize_name(attachment['filename'] or 'data.bin'),
company=rule.document_incoming_company,
data=attachment['data'],
type=rule.document_incoming_type,
source='inbound_email')
children.append(child)
document.children = children
document.save()
return document
@classmethod
@ModelView.button
@Workflow.transition('processing')
def process(cls, documents, with_children=False):
transaction = Transaction()
context = transaction.context
with transaction.set_context(
queue_batch=context.get('queue_batch', True)):
cls.__queue__.proceed(documents, with_children=with_children)
@classmethod
@ModelView.button
@Workflow.transition('done')
def proceed(cls, documents, with_children=False):
pool = Pool()
Attachment = pool.get('ir.attachment')
results = defaultdict(list)
attachments = []
for document in documents:
if document.result or not document.active:
continue
document.result = getattr(document, f'_process_{document.type}')()
results[document.result.__class__].append(document.result)
attachment = Attachment(
name=document.name,
resource=document.result,
type='data',
data=document.data)
attachments.append(attachment)
for kls, records in results.items():
kls.save(records)
cls.save(documents)
Attachment.save(attachments)
if with_children:
children = list(filter(
lambda d: d.type,
(c for d in documents for c in d.children)))
if children:
cls.process(children, with_children=True)
def _process_document_incoming(self):
self.active = False
self.save()
document, = self.__class__.copy([self], default={
'type': None,
'parent': self.id,
})
return document
@classmethod
def copy(cls, documents, default=None):
default = default.copy() if default is not None else {}
default.setdefault('result')
default.setdefault('parsed_data')
default.setdefault('children')
return super().copy(documents, default=default)
def iter_pages(expression, size):
ranges = set()
for pages in expression.split(','):
pages = pages.split('-')
if not len(pages):
continue
if not pages[0]:
pages[0] = 1
if not pages[-1]:
pages[-1] = size
pages = list(map(int, filter(None, pages)))
ranges.add((
min(max(min(pages) - 1, 0), size),
min(max(max(pages), 0), size)))
ranges = sorted(ranges)
def iter_():
last = 0
for start, end in ranges:
if last != start:
yield range(last, start)
yield range(start, end)
last = end
if last != size:
yield range(last, size)
return iter_()
class IncomingSplit(Wizard):
__name__ = 'document.incoming.split'
start = StateView(
'document.incoming.split.start',
'document_incoming.document_incoming_split_start_view_form', [
Button("Cancel", 'end', 'tryton-cancel'),
Button("Split", 'split', 'tryton-ok', default=True),
])
split = StateTransition()
def default_start(self, fields):
if self.record.mime_type == 'application/pdf':
reader = PdfReader(BytesIO(self.record.data))
if len(reader.pages) == 1:
pages = '1'
else:
pages = '1-%d' % len(reader.pages)
else:
pages = ''
return {
'data': self.record.data,
'pages': pages,
}
def transition_split(self):
pool = Pool()
Document = pool.get('document.incoming')
if self.record.active and self.record.mime_type == 'application/pdf':
self.record.active = False
self.record.save()
reader = PdfReader(BytesIO(self.record.data))
try:
iter_ = iter_pages(self.start.pages, len(reader.pages))
except ValueError as exception:
raise DocumentIncomingSplitError(gettext(
'document_incoming.msg_document_split_invalid_pages',
expression=self.start.pages,
exception=exception)) from exception
for pages in iter_:
writer = PdfWriter()
for i in pages:
page = reader.pages[i]
writer.add_page(page)
data = BytesIO()
writer.write(data)
Document.copy([self.record], default={
'active': True,
'data': data.getvalue(),
'parent': self.record.id,
})
return 'end'
def end(self):
return 'reload'
class IncomingSplitStart(ModelView):
__name__ = 'document.incoming.split.start'
data = fields.Binary("Data", readonly=True)
pages = fields.Char(
"Pages", required=True,
help="List pages to split.\n"
"Ex: 1-3,4,5-6")