Files
tradon/modules/account_stock_eu/account_stock_eu.py
2026-03-14 09:42:12 +00:00

548 lines
18 KiB
Python

# This file is part of Tryton. The COPYRIGHT file at the top level of
# this repository contains the full copyright notices and license terms.
import csv
import zipfile
from io import BytesIO, TextIOWrapper
from itertools import groupby
from sql import Null
from sql.aggregate import Min, Sum
from sql.functions import Extract, Round
from trytond.cache import Cache
from trytond.model import (
Check, DeactivableMixin, Index, ModelSQL, ModelView, Unique, Workflow,
fields)
from trytond.modules.account.exceptions import FiscalYearNotFoundError
from trytond.pool import Pool, PoolMeta
from trytond.pyson import Eval
from trytond.tools import grouped_slice
from trytond.transaction import Transaction
from trytond.wizard import Button, StateTransition, StateView, Wizard
class IntrastatTransaction(DeactivableMixin, ModelSQL, ModelView):
__name__ = 'account.stock.eu.intrastat.transaction'
_rec_name = 'code'
code = fields.Char("Code", size=2, required=True)
parent = fields.Many2One(
'account.stock.eu.intrastat.transaction', "Parent")
description = fields.Char("Name", translate=True, required=True)
_code_cache = Cache(
'account.stock.eu.intrastat.transaction.code', context=False)
@classmethod
def get(cls, code, date=None):
pool = Pool()
Date = pool.get('ir.date')
if date is None:
date = Date.today()
if date.year < 2022:
code = code[:1]
transaction_id = cls._code_cache.get(code, -1)
if transaction_id == -1:
transactions = cls.search([
('code', '=', code)
], limit=1)
if transactions:
transaction, = transactions
cls._code_cache.set(code, transaction.id)
else:
transaction = None
cls._code_cache.set(code, None)
elif transaction_id is not None:
transaction = cls(transaction_id)
else:
transaction = None
return transaction
@classmethod
def on_modification(cls, mode, transactions, field_names=None):
super().on_modification(mode, transactions, field_names=field_names)
cls._code_cache.clear()
class IntrastatTransport(DeactivableMixin, ModelSQL, ModelView):
__name__ = 'account.stock.eu.intrastat.transport'
name = fields.Char("Name", translate=True, required=True)
code = fields.Char("Code", size=2, required=True)
class IntrastatDeclaration(Workflow, ModelSQL, ModelView):
__name__ = 'account.stock.eu.intrastat.declaration'
_states = {
'readonly': Eval('id', -1) >= 0,
}
company = fields.Many2One(
'company.company', "Company", required=True,
states=_states)
country = fields.Many2One(
'country.country', "Country", required=True,
states=_states)
month = fields.Date(
"Month", required=True,
states=_states)
state = fields.Selection([
('opened', "Opened"),
('closed', "Closed"),
], "State", readonly=True, required=True, sort=False)
extended = fields.Function(
fields.Boolean("Extended"),
'on_change_with_extended')
del _states
_get_cache = Cache(
'account.stock.eu.intrastat.declaration.get', context=False)
@classmethod
def __setup__(cls):
super().__setup__()
t = cls.__table__()
cls._sql_constraints += [
('company_country_month_unique',
Unique(t, t.company, t.country, t.month),
'account_stock_eu'
'.msg_intrastat_declaration_company_country_month_unique'),
('month_first_day', Check(t, Extract('DAY', t.month) == 1),
'account_stock_eu'
'.msg_intrastat_declaration_month_first_day'),
]
cls._sql_indexes.add(
Index(
t,
(t.state, Index.Equality(cardinality='low')),
where=t.state == 'opened'))
cls._order = [
('month', 'DESC'),
('company.id', 'DESC'),
('country.id', 'DESC'),
('id', 'DESC'),
]
cls._transitions |= {
('opened', 'closed'),
('closed', 'opened'),
}
cls._buttons.update({
'export': {},
})
@classmethod
def default_state(cls):
return 'opened'
@fields.depends('company', 'month')
def on_change_with_extended(self, name=None):
pool = Pool()
FiscalYear = pool.get('account.fiscalyear')
if self.company and self.month:
try:
fiscalyear = FiscalYear.find(
self.company.id,
date=self.month)
except FiscalYearNotFoundError:
pass
else:
return fiscalyear.intrastat_extended
def get_rec_name(self, name):
pool = Pool()
Lang = pool.get('ir.lang')
lang = Lang.get()
return '-'.join([
self.country.rec_name,
self.company.rec_name,
lang.strftime(self.month, '%m/%Y')])
@classmethod
def get(cls, company, country, date):
month = date.replace(day=1)
key = (company.id, country.id, date)
declaration = cls._get_cache.get(key)
if declaration is None:
declarations = cls.search([
('company', '=', company.id),
('country', '=', country.id),
('month', '=', month),
], limit=1)
if declarations:
declaration, = declarations
else:
declaration = cls(
company=company, country=country, month=month)
declaration.save()
cls._get_cache.set(key, declaration.id)
else:
declaration = cls(declaration)
cls.open([declaration])
return declaration
@classmethod
@Workflow.transition('opened')
def open(cls, declarations):
pass
@classmethod
@Workflow.transition('closed')
def close(cls, declarations):
pass
@classmethod
@ModelView.button_action(
'account_stock_eu.wizard_intrastat_declaration_export')
def export(cls, declarations):
cls.close(declarations)
@classmethod
def on_modification(cls, mode, declarations, field_names=None):
super().on_modification(mode, declarations, field_names=field_names)
cls._get_cache.clear()
class IntrastatDeclarationContext(ModelView):
__name__ = 'account.stock.eu.intrastat.declaration.context'
company = fields.Many2One('company.company', "Company", required=True)
declaration = fields.Many2One(
'account.stock.eu.intrastat.declaration', "Declaration", required=True,
domain=[
('company', '=', Eval('company', -1)),
])
@classmethod
def default_company(cls):
pool = Pool()
Declaration = pool.get('account.stock.eu.intrastat.declaration')
declaration = cls.default_declaration()
if declaration is not None:
declaration = Declaration(declaration)
return declaration.company.id
return Transaction().context.get('company')
@fields.depends('company', 'declaration')
def on_change_company(self):
if not self.company:
self.declaration = None
else:
if self.declaration.company != self.company:
self.declaration = None
@classmethod
def default_declaration(cls):
return Transaction().context.get('declaration')
class IntrastatDeclarationLine(ModelSQL, ModelView):
__name__ = 'account.stock.eu.intrastat.declaration.line'
type = fields.Selection([
('arrival', "Arrival"),
('dispatch', "Dispatch"),
], "Type", sort=False)
country = fields.Many2One('country.country', "Country")
subdivision = fields.Many2One('country.subdivision', "Subdivision")
tariff_code = fields.Many2One('customs.tariff.code', "Tariff Code")
weight = fields.Float("Weight", digits=(None, 3))
value = fields.Numeric("Value", digits=(None, 2))
transaction = fields.Many2One(
'account.stock.eu.intrastat.transaction', "Transaction")
additional_unit = fields.Float("Additional Unit", digits=(None, 3))
country_of_origin = fields.Many2One('country.country', "Country of Origin")
vat = fields.Many2One(
'party.identifier', "VAT",
states={
'invisible': Eval('type') != 'dispatch',
})
@classmethod
def __setup__(cls):
super().__setup__()
cls._order[0:0] = [
('type', 'ASC'),
('country', 'ASC'),
('tariff_code', 'ASC'),
('transaction', 'ASC'),
('country_of_origin', 'ASC'),
('vat', 'ASC'),
]
@classmethod
def view_attributes(cls):
pool = Pool()
Declaration = pool.get('account.stock.eu.intrastat.declaration')
context = Transaction().context
attributes = super().view_attributes()
if context.get('declaration') is not None:
declaration = Declaration(context['declaration'])
if not declaration.extended:
attributes.append(
('//field[@name="transport"]|//field[@name="incoterm"]',
'tree_invisible', True))
return attributes
@classmethod
def table_query(cls):
pool = Pool()
Move = pool.get('stock.move')
context = Transaction().context
move = Move.__table__()
where = move.intrastat_type != Null
where &= move.state == 'done'
where &= move.company == context.get('company')
where &= move.intrastat_declaration == context.get('declaration')
return (move
.select(
Min(move.id).as_('id'),
*cls._columns(move),
where=where,
group_by=cls._group_by(move),
))
@classmethod
def _columns(cls, move):
weight_factor = 10 ** cls.weight.digits[1]
value_precision = cls.value.digits[1]
additional_unit_factor = 10 ** cls.additional_unit.digits[1]
return [
move.intrastat_type.as_('type'),
move.intrastat_country.as_('country'),
move.intrastat_subdivision.as_('subdivision'),
move.intrastat_tariff_code.as_('tariff_code'),
(Round(Sum(move.internal_weight) * weight_factor)
/ weight_factor).as_('weight'),
Round(Sum(move.intrastat_value), value_precision).as_('value'),
move.intrastat_transaction.as_('transaction'),
(Round(
Sum(move.intrastat_additional_unit)
* additional_unit_factor)
/ additional_unit_factor).as_('additional_unit'),
move.intrastat_country_of_origin.as_('country_of_origin'),
move.intrastat_vat.as_('vat'),
]
@classmethod
def _group_by(cls, move):
return [
move.intrastat_type,
move.intrastat_country,
move.intrastat_subdivision,
move.intrastat_tariff_code,
move.intrastat_transaction,
move.intrastat_country_of_origin,
move.intrastat_vat,
]
class IntrastatDeclarationLine_Incoterm(metaclass=PoolMeta):
__name__ = 'account.stock.eu.intrastat.declaration.line'
transport = fields.Many2One(
'account.stock.eu.intrastat.transport', "Transport")
incoterm = fields.Many2One('incoterm.incoterm', "Incoterm")
@classmethod
def _columns(cls, move):
return super()._columns(move) + [
move.intrastat_transport.as_('transport'),
move.intrastat_incoterm.as_('incoterm'),
]
@classmethod
def _group_by(cls, move):
return super()._group_by(move) + [
move.intrastat_transport,
move.intrastat_incoterm,
]
class IntrastatDeclarationExport(Wizard):
__name__ = 'account.stock.eu.intrastat.declaration.export'
start_state = 'generate'
generate = StateTransition()
result = StateView(
'account.stock.eu.intrastat.declaration.export.result',
'account_stock_eu.intrastat_declaration_export_result_view_form', [
Button("Close", 'end', 'tryton-close'),
])
def transition_generate(self):
pool = Pool()
Line = pool.get('account.stock.eu.intrastat.declaration.line')
declaration = self.record
with Transaction().set_context(
company=declaration.company.id,
declaration=declaration.id):
lines = Line.search([], order=[('type', 'ASC')])
content, extension = self.export(lines)
self.result.file = self.result.__class__.file.cast(content)
self.result.filename = f'{declaration.rec_name}.{extension}'
return 'result'
def export(self, lines):
country = self.record.country
method = getattr(
self, 'export_%s' % country.code.lower(), self.export_fallback)
return method(lines)
def export_fallback(self, lines):
data = BytesIO()
writer = csv.writer(
TextIOWrapper(data, encoding='utf-8', write_through=True))
for line in lines:
writer.writerow(self.export_fallback_row(line))
return data.getvalue(), 'csv'
def export_fallback_row(self, line):
dispatch = line.type == 'dispatch'
return [
line.type,
line.country.code,
line.subdivision.intrastat_code,
line.tariff_code.code,
round(line.weight, 3),
round(line.value, 2),
line.transaction.code,
line.additional_unit if line.additional_unit is not None else '',
line.country_of_origin.code
if dispatch and line.country_of_origin else '',
line.vat.code if dispatch and line.vat else '',
]
def default_result(self, fields):
file = self.result.file
self.result.file = None # No need to store in session
return {
'file': file,
'filename': self.result.filename,
}
class IntrastatDeclarationExport_Incoterm(metaclass=PoolMeta):
__name__ = 'account.stock.eu.intrastat.declaration.export'
def export_fallback_row(self, line):
row = super().export_fallback_row(line)
if self.record.extended:
row.append(line.transport.code if line.transport else '')
row.append(line.incoterm.code if line.incoterm else '')
return row
class IntrastatDeclarationExportResult(ModelView):
__name__ = 'account.stock.eu.intrastat.declaration.export.result'
file = fields.Binary("File", readonly=True, filename='filename')
filename = fields.Char("File Name", readonly=True)
class IntrastatDeclarationExport_BE(metaclass=PoolMeta):
__name__ = 'account.stock.eu.intrastat.declaration.export'
def export_be(self, lines):
data = BytesIO()
writer = csv.writer(
TextIOWrapper(data, encoding='utf-8', write_through=True),
delimiter=';')
for line in lines:
writer.writerow(self.export_be_row(line))
return data.getvalue(), 'csv'
def export_be_row(self, line):
dispatch = line.type == 'dispatch'
type = {
'arrival': '19',
'dispatch': '29',
}[line.type]
return [
type,
line.country.code,
line.transaction.code,
line.subdivision.intrastat_code,
line.tariff_code.code,
round(line.weight, 2),
round(line.additional_unit, 2)
if line.additional_unit is not None else '',
round(line.value, 2),
line.country_of_origin.code
if dispatch and line.country_of_origin else '',
line.vat.code if dispatch and line.vat else '',
]
class IntrastatDeclarationExport_BE_Incoterm(metaclass=PoolMeta):
__name__ = 'account.stock.eu.intrastat.declaration.export'
def export_be_row(self, line):
row = super().export_be_row(line)
if self.record.extended:
row.insert(8, line.transport.code if line.transport else '')
row.insert(9, line.incoterm.code if line.incoterm else '')
return row
class IntrastatDeclarationExport_ES(metaclass=PoolMeta):
__name__ = 'account.stock.eu.intrastat.declaration.export'
def export_es(self, lines):
data = BytesIO()
with zipfile.ZipFile(data, 'w') as zip:
for type, lines in groupby(lines, key=lambda l: l.type):
for i, lines in enumerate(grouped_slice(lines, 999)):
content, extension = self._export_es(lines)
filename = f'{type}-{i}.{extension}'
zip.writestr(filename, content)
return data.getvalue(), 'zip'
def _export_es(self, lines):
data = BytesIO()
writer = csv.writer(
TextIOWrapper(data, encoding='utf-8', write_through=True),
delimiter=';')
for line in lines:
writer.writerow(self.export_es_row(line))
return data.getvalue(), 'csv'
def export_es_row(self, line):
dispatch = line.type == 'dispatch'
return [
line.country.code,
line.subdivision.intrastat_code,
'', # incoterm
line.transaction.code,
'', # transport
'',
line.tariff_code.code,
line.country_of_origin.code
if dispatch and line.country_of_origin else '',
'',
round(line.weight, 3),
round(line.additional_unit, 3)
if line.additional_unit is not None else '',
round(line.value, 2),
round(line.value, 2),
line.vat.code if dispatch and line.vat else '',
]
class IntrastatDeclarationExport_ES_Incoterm(metaclass=PoolMeta):
__name__ = 'account.stock.eu.intrastat.declaration.export'
def export_es_row(self, line):
row = super().export_es_row(line)
if self.record.extended:
row[2] = line.incoterm.code if line.incoterm else ''
row[4] = line.transport.code if line.transport else ''
return row