Files
tradon/modules/party/party.py
2026-03-14 09:42:12 +00:00

1244 lines
43 KiB
Python

# This file is part of Tryton. The COPYRIGHT file at the top level of
# this repository contains the full copyright notices and license terms.
import logging
import re
import stdnum.exceptions
from sql import Column, Literal
from sql.aggregate import Min
from sql.functions import CharLength
from stdnum import get_cc_module
from stdnum.eu.vat import MEMBER_STATES as EU_MEMBER_STATES
from trytond import backend
from trytond.i18n import gettext
from trytond.model import (
DeactivableMixin, Index, ModelSQL, ModelView, MultiValueMixin, Unique,
ValueMixin, convert_from, fields, sequence_ordered)
from trytond.model.exceptions import AccessError
from trytond.pool import Pool
from trytond.pyson import Bool, Eval
from trytond.tools import is_full_text, lstrip_wildcard
from trytond.transaction import Transaction, inactive_records
from trytond.wizard import Button, StateTransition, StateView, Wizard
from .contact_mechanism import _PHONE_TYPES, _ContactMechanismMixin
from .exceptions import (
EraseError, InvalidIdentifierCode, SimilarityWarning, VIESUnavailable)
logger = logging.getLogger(__name__)
class Party(
DeactivableMixin, _ContactMechanismMixin, ModelSQL, ModelView,
MultiValueMixin):
__name__ = 'party.party'
_contact_mechanism_states = {
'readonly': Eval('id', -1) >= 0,
}
name = fields.Char(
"Name", strip=False,
help="The main identifier of the party.")
code = fields.Char(
"Code", required=True,
states={
'readonly': Eval('code_readonly', True),
},
help="The unique identifier of the party.")
code_readonly = fields.Function(fields.Boolean('Code Readonly'),
'get_code_readonly')
code_alnum = fields.Char("Code Alphanumeric", readonly=True)
code_digit = fields.Integer("Code Digit", readonly=True)
lang = fields.MultiValue(
fields.Many2One('ir.lang', "Language",
help="Used to translate communications with the party."))
langs = fields.One2Many(
'party.party.lang', 'party', "Languages")
identifiers = fields.One2Many(
'party.identifier', 'party', "Identifiers",
help="Add other identifiers of the party.")
tax_identifier = fields.Function(fields.Many2One(
'party.identifier', 'Tax Identifier',
help="The identifier used for tax report."),
'get_tax_identifier', searcher='search_tax_identifier')
addresses = fields.One2Many('party.address', 'party', "Addresses")
contact_mechanisms = fields.One2Many(
'party.contact_mechanism', 'party', "Contact Mechanisms")
categories = fields.Many2Many(
'party.party-party.category', 'party', 'category', "Categories",
help="The categories the party belongs to.")
replaced_by = fields.Many2One('party.party', "Replaced By", readonly=True,
states={
'invisible': ~Eval('replaced_by'),
},
help="The party replacing this one.")
full_name = fields.Function(fields.Char('Full Name'), 'get_full_name')
phone = fields.Function(
fields.Char("Phone", states=_contact_mechanism_states),
'get_contact_mechanism', setter='set_contact_mechanism')
mobile = fields.Function(
fields.Char("Mobile", states=_contact_mechanism_states),
'get_contact_mechanism', setter='set_contact_mechanism')
fax = fields.Function(
fields.Char("Fax", states=_contact_mechanism_states),
'get_contact_mechanism', setter='set_contact_mechanism')
email = fields.Function(
fields.Char("Email", states=_contact_mechanism_states),
'get_contact_mechanism', setter='set_contact_mechanism')
website = fields.Function(
fields.Char("Website", states=_contact_mechanism_states),
'get_contact_mechanism', setter='set_contact_mechanism')
distance = fields.Function(fields.Integer('Distance'), 'get_distance')
del _contact_mechanism_states
@classmethod
def __setup__(cls):
cls.code.search_unaccented = False
super().__setup__()
t = cls.__table__()
cls._sql_constraints = [
('code_uniq', Unique(t, t.code), 'party.msg_party_code_unique')
]
cls._sql_indexes.update({
Index(t, (t.code, Index.Equality(cardinality='high'))),
Index(t, (t.code, Index.Similarity(cardinality='high'))),
})
cls._order.insert(0, ('distance', 'ASC NULLS LAST'))
cls._order.insert(1, ('name', 'ASC'))
cls.active.states.update({
'readonly': Bool(Eval('replaced_by')),
})
@staticmethod
def order_code(tables):
table, _ = tables[None]
return [CharLength(table.code), table.code]
@staticmethod
def default_categories():
return Transaction().context.get('categories', [])
@staticmethod
def default_addresses():
if Transaction().user == 0:
return []
return [{}]
@classmethod
def default_lang(cls, **pattern):
Configuration = Pool().get('party.configuration')
config = Configuration(1)
lang = config.get_multivalue('party_lang', **pattern)
return lang.id if lang else None
@classmethod
def default_code_readonly(cls, **pattern):
Configuration = Pool().get('party.configuration')
config = Configuration(1)
return bool(config.get_multivalue('party_sequence', **pattern))
def get_code_readonly(self, name):
return True
def _get_identifier(self, name, types):
for identifier in self.identifiers:
if identifier.type in types:
return identifier.id
@classmethod
def _search_identifier(cls, name, clause, types):
_, operator, value = clause
nested = clause[0][len(name) + 1:]
domain = [
('identifiers', 'where', [
(nested or 'rec_name', operator, value),
('type', 'in', types),
]),
]
# Add party without identifier
if ((operator == '=' and value is None)
or (operator == 'in' and None in value)):
domain = ['OR',
domain, [
('identifiers', 'not where', [
('type', 'in', types),
]),
],
]
return domain
@classmethod
def tax_identifier_types(cls):
return TAX_IDENTIFIER_TYPES
def get_tax_identifier(self, name):
types = self.tax_identifier_types()
return self._get_identifier(name, types)
@classmethod
def search_tax_identifier(cls, name, clause):
types = cls.tax_identifier_types()
return cls._search_identifier(name, clause, types)
def get_full_name(self, name):
return self.name
def get_contact_mechanism(self, name):
usage = Transaction().context.get('party_contact_mechanism_usage')
mechanism = self.contact_mechanism_get(name, usage)
return mechanism.value if mechanism else ''
@classmethod
def set_contact_mechanism(cls, parties, name, value):
pool = Pool()
ContactMechanism = pool.get('party.contact_mechanism')
usage = Transaction().context.get('party_contact_mechanism_usage')
contact_mechanisms = []
for party in parties:
if getattr(party, name):
type_string = cls.fields_get([name])[name]['string']
raise AccessError(gettext(
'party.msg_party_set_contact_mechanism',
party=party.rec_name,
field=type_string))
if value:
contact_mechanism = None
if usage:
for contact_mechanism in party.contact_mechanisms:
if (contact_mechanism.type == name
and contact_mechanism.value == value):
break
else:
contact_mechanism = None
if not contact_mechanism:
contact_mechanism = ContactMechanism(
party=party,
type=name,
value=value)
if usage:
setattr(contact_mechanism, usage, True)
contact_mechanisms.append(contact_mechanism)
ContactMechanism.save(contact_mechanisms)
@classmethod
def _distance_query(cls, usages=None, party=None, depth=None):
context = Transaction().context
if party is None:
party = context.get('related_party')
if not party:
return
table = cls.__table__()
return table.select(
table.id.as_('to'),
Literal(0).as_('distance'),
where=(table.id == party))
@classmethod
def get_distance(cls, parties, name):
distances = {p.id: None for p in parties}
query = cls._distance_query()
if query:
cursor = Transaction().connection.cursor()
cursor.execute(*query.select(
query.to.as_('to'),
Min(query.distance).as_('distance'),
group_by=[query.to]))
distances.update(cursor)
return distances
@classmethod
def order_distance(cls, tables):
party, _ = tables[None]
key = 'distance'
if key not in tables:
query = cls._distance_query()
if not query:
return []
query = query.select(
query.to.as_('to'),
Min(query.distance).as_('distance'),
group_by=[query.to])
join = party.join(query, type_='LEFT',
condition=query.to == party.id)
tables[key] = {
None: (join.right, join.condition),
}
else:
query, _ = tables[key][None]
return [query.distance]
@classmethod
def index_set_field(cls, name):
index = super().index_set_field(name)
if name in _PHONE_TYPES:
# Phone validation may need the address country
index = cls.index_set_field('addresses') + 1
return index
@classmethod
def _code_sequence(cls, **pattern):
pool = Pool()
Configuration = pool.get('party.configuration')
config = Configuration(1)
return config.get_multivalue('party_sequence', **pattern)
@classmethod
def preprocess_values(cls, mode, values):
values = super().preprocess_values(mode, values)
if mode == 'create':
if not values.get('code'):
if sequence := cls._code_sequence():
values['code'] = sequence.get()
values.setdefault('addresses', None)
return values
def compute_fields(self, field_names=None):
values = super().compute_fields(field_names=field_names)
if (field_names is None or 'code' in field_names):
values['code_alnum'] = (
re.sub(r'[\W_]', '', self.code)
if self.code is not None else None)
try:
values['code_digit'] = int(re.sub(r'\D', '', self.code or ''))
except ValueError:
values['code_digit'] = None
return values
@classmethod
def copy(cls, parties, default=None):
if default is None:
default = {}
else:
default = default.copy()
default.setdefault('code', None)
return super().copy(parties, default=default)
@classmethod
def search_global(cls, text):
for record, rec_name, icon in super().search_global(text):
icon = icon or 'tryton-party'
yield record, rec_name, icon
def get_rec_name(self, name):
if not self.name:
return '[' + self.code + ']'
return self.name
@classmethod
def search_rec_name(cls, name, clause):
_, operator, operand, *extra = clause
if operator.startswith('!') or operator.startswith('not '):
bool_op = 'AND'
else:
bool_op = 'OR'
code_value = operand
if operator.endswith('like') and is_full_text(operand):
code_value = lstrip_wildcard(operand)
return [bool_op,
('code', operator, code_value, *extra),
('identifiers.code', operator, code_value, *extra),
('name', operator, operand, *extra),
('contact_mechanisms.rec_name', operator, operand, *extra),
]
def address_get(self, type=None):
"""
Try to find an address for the given type, if no type matches
the first address is returned.
"""
default_address = None
if self.addresses:
default_address = self.addresses[0]
if type:
for address in self.addresses:
if getattr(address, type):
return address
return default_address
@classmethod
def autocomplete(cls, text, domain=None, limit=None, order=None):
pool = Pool()
Configuration = pool.get('party.configuration')
configuration = Configuration(1)
result = super().autocomplete(
text, domain=domain, limit=limit, order=order)
identifier_types = dict(configuration.get_identifier_types())
def add(type, code):
result.append({
'id': None,
'name': code,
'defaults': {
'identifiers': [{
'type': type,
'code': code,
}],
},
})
eu_vat = get_cc_module('eu', 'vat')
if 'eu_vat' in identifier_types and eu_vat.is_valid(text):
code = eu_vat.compact(text)
add('eu_vat', code)
else:
for country in eu_vat.guess_country(text):
if 'eu_vat' in identifier_types:
code = eu_vat.compact(country + text)
add('eu_vat', code)
elif f'{country}_vat' in identifier_types:
vat = get_cc_module(country, 'vat')
code = vat.compact(text)
add(f'{country}_vat', code)
return result
@classmethod
def default_get(
cls, fields_names=None, with_rec_name=True, with_default=True):
pool = Pool()
Country = pool.get('country.country')
values = super().default_get(
fields_names=fields_names,
with_rec_name=with_rec_name,
with_default=with_default)
eu_vat = get_cc_module('eu', 'vat')
eu_types = {'eu_vat'} | {f'{m}_vat' for m in EU_MEMBER_STATES}
for identifier in values.get('identifiers') or []:
if identifier.get('type') in eu_types and identifier.get('code'):
code = identifier['code']
if identifier['type'] != 'eu_vat':
code = identifier['type'][2:] + code
try:
result = eu_vat.check_vies(code)
except Exception:
logger.debug(
f"Fail to check {identifier['code']}", exc_info=True)
continue
if result['valid']:
values['name'] = result['name']
for address in values.get('addresses') or []:
if not address:
break
else:
address = {}
values.setdefault('addresses', []).insert(0, address)
address['street'] = result['address']
country_code = code[:2]
countries = Country.search([
('code', 'ilike', country_code),
], limit=1)
if countries:
country, = countries
address['country'] = country.id
if with_rec_name:
address.setdefault('country.', {})['rec_name'] = (
country.rec_name)
break
return values
class PartyLang(ModelSQL, ValueMixin):
__name__ = 'party.party.lang'
party = fields.Many2One(
'party.party', "Party", ondelete='CASCADE')
lang = fields.Many2One('ir.lang', "Language")
class PartyCategory(ModelSQL):
__name__ = 'party.party-party.category'
party = fields.Many2One(
'party.party', "Party", ondelete='CASCADE', required=True)
category = fields.Many2One(
'party.category', "Category", ondelete='CASCADE', required=True)
@classmethod
def __register__(cls, module):
# Migration from 7.0: rename to standard name
backend.TableHandler.table_rename('party_category_rel', cls._table)
super().__register__(module)
IDENTIFIER_VAT = [
'ad_nrt',
'al_nipt',
'ar_cuit',
'at_uid',
'au_abn',
'br_cnpj',
'by_unp',
'ca_bn',
'cl_rut',
'cn_uscc',
'co_nit',
'cr_cpj',
'cz_dic',
'dk_cvr',
'do_rnc',
'dz_nif',
'ec_ruc',
'ee_kmkr',
'eg_tn',
'es_nif',
'fi_alv',
'fo_vn',
'fr_tva',
'gh_tin',
'gn_nifp',
'gt_nit',
'hr_oib',
'hu_anum',
'id_npwp',
'il_hp',
'in_gstin',
'is_vsk',
'it_iva',
'jp_cn',
'ke_pin',
'kr_brn',
'lu_tva',
'lv_pvn',
'ma_ice',
'mc_tva',
'me_pib',
'mk_edb',
'mx_rfc',
'nl_btw',
'no_mva',
'nz_ird',
'pe_ruc',
'pl_nip',
'pt_nif',
'py_ruc',
'ro_cf',
'rs_pib',
'ru_inn',
'sg_uen',
'si_ddv',
'sk_dph',
'sm_coe',
'sv_nit',
'tn_mf',
'tr_vkn',
'tw_ubn',
'uy_rut',
've_rif',
'vn_mst',
]
def replace_vat(code):
if code in IDENTIFIER_VAT:
code = code.split('_', 1)[0] + '_vat'
return code
IDENTIFIER_TYPES = [
('ad_nrt', "Andorra Tax Number"),
('al_nipt', "Albanian VAT Number"),
('ar_cuit', "Argentinian Tax Number"),
('ar_dni', "Argentinian National Identity Number"),
('at_businessid', "Austrian Company Register"),
('at_tin', "Austrian Tax Identification"),
('at_uid', "Austrian Umsatzsteuer-Identifikationsnummer"),
('at_vnr', "Austrian Social Security Number"),
('au_abn', "Australian Business Number"),
('au_acn', "Australian Company Number"),
('au_tfn', "Australian Tax File Number"),
('be_bis', "Belgian BIS Number"),
('be_nn', "Belgian National Number"),
('be_vat', "Belgian VAT Number"),
('be_businessid', "Belgian Company Number"),
('bg_egn', "Bulgarian Personal Identity Codes"),
('bg_pnf', "Bulgarian Number of a Foreigner"),
('bg_vat', "Bulgarian VAT Number"),
('br_cnpj', "Brazillian Company Identifier"),
('br_cpf', "Brazillian National Identifier"),
('by_unp', "Belarus VAT Number"),
('ca_bn', "Canadian Business Number"),
('ca_sin', "Canadian Social Insurance Number"),
('ch_ssn', "Swiss Social Security Number"),
('ch_uid', "Swiss Business Identifier"),
('ch_vat', "Swiss VAT Number"),
('cl_rut', "Chilean National Tax Number"),
('cn_ric', "Chinese Resident Identity Card Number"),
('cn_uscc', "Chinese Unified Social Credit Code"),
('co_nit', "Colombian Identity Code"),
('cr_cpf', "Costa Rica Physical Person ID Number"),
('cr_cpj', "Costa Rica Tax Number"),
('cr_cr', "Costa Rica Foreigners ID Number"),
('cu_ni', "Cuban Identity Card Number"),
('cy_vat', "Cypriot VAT Number"),
('cz_dic', "Czech VAT Number"),
('cz_rc', "Czech National Identifier"),
('de_handelsregisternummer', "German Company Register Number"),
('de_idnr', "German Personal Tax Number"),
('de_stnr', "German Tax Number"),
('de_vat', "German VAT Number"),
('dk_cpr', "Danish Citizen Number"),
('dk_cvr', "Danish VAT Number"),
('do_cedula', "Dominican Republic National Identification Number"),
('do_rnc', "Dominican Republic Tax"),
('dz_nif', "Algerian Tax Number"),
('ec_ci', "Ecuadorian Personal Identity Code"),
('ec_ruc', "Ecuadorian Tax Identification"),
('ee_ik', "Estonian Personal ID Number"),
('ee_kmkr', "Estonian VAT Number"),
('ee_registrikood', "Estonian Organisation Registration Code"),
('eg_tn', "Egyptian Tax Registration Number"),
('es_cif', "Spanish Company Tax"),
('es_dni', "Spanish Personal Identity Codes"),
('es_nie', "Spanish Foreigner Number"),
('es_nif', "Spanish VAT Number"),
('eu_at_02', "SEPA Identifier of the Creditor (AT-02)"),
('eu_oss', "European VAT on e-Commerce - One Stop Shop"),
('eu_vat', "European VAT Number"),
('fi_alv', "Finnish VAT Number"),
('fi_associationid', "Finnish Association Identifier"),
('fi_hetu', "Finnish Personal Identity Code"),
('fi_veronumero', "Finnish Individual Tax Number"),
('fi_ytunnus', "Finnish Business Identifier"),
('fo_vn', "Faroese Tax Number"),
('fr_nif', "French Tax Identification Number"),
('fr_nir', "French Personal Identification Number"),
('fr_siren', "French Company Identification Number"),
('fr_siret', "French Company Establishment Identification Number"),
('fr_tva', "French VAT Number"),
('gb_nhs',
"United Kingdom National Health Service Patient Identifier"),
('gb_upn', "English Unique Pupil Number"),
('gb_utr', "United Kingdom Unique Taxpayer Reference"),
('gb_vat', "United Kingdom (and Isle of Man) VAT Number"),
('gh_tin', "Ghanaian Taxpayer Identification Number"),
('gn_nifp', "Guinean Tax Number"),
('gr_amka', "Greek Social Security Number"),
('gr_vat', "Greek VAT Number"),
('gt_nit', "Guatemala Tax Number"),
('hr_oib', "Croatian Identification Number"),
('hu_anum', "Hungarian VAT Number"),
('id_npwp', "Indonesian VAT Number"),
('ie_pps', "Irish Personal Number"),
('ie_vat', "Irish VAT Number"),
('il_hp', "Israeli Company Number"),
('il_idnr', "Israeli Identity Number"),
('in_aadhaar', "Indian Digital Resident Personal Identity Number"),
('in_epic', "Indian Voter ID"),
('in_gstin', "Indian VAT number"),
('in_pan', "Indian Income Tax Identifier"),
('is_kennitala',
"Icelandic Personal and Organisation Identity Code"),
('is_vsk', "Icelandic VAT Number"),
('it_codicefiscale', "Italian Tax Code for Individuals"),
('it_iva', "Italian VAT Number"),
('jp_cn', "Japanese Corporate Number"),
('ke_pin', "Kenya Tax Number"),
('kr_brn', "South Korea Business Registration Number"),
('kr_krn', "South Korean Resident Registration Number"),
('lt_asmens', "Lithuanian Personal Number"),
('lt_pvm', "Lithuanian Legal Entities"),
('lt_vat', "Lithuanian VAT Number"),
('lu_tva', "Luxembourgian VAT Number"),
('lv_pvn', "Latvian VAT Number"),
('ma_ice', "Moroccan Tax Number"),
('mc_tva', "Monacan VAT Number"),
('md_idno', "Moldavian Company Identification Number"),
('me_pib', "Montenegro Tax Number"),
('mk_edb', "Macedonian Tax Number"),
('mt_vat', "Maltese VAT Number"),
('mu_nid', "Mauritian National Identifier"),
('mx_rfc', "Mexican Tax Number"),
('my_nric',
"Malaysian National Registration Identity Card Number"),
('nl_brin', "Dutch School Identification Number"),
('nl_bsn', "Dutch Citizen Identification Number"),
('nl_btw', "Dutch VAT Number"),
('nl_onderwijsnummer', "Dutch Student Identification Number"),
('no_fodselsnummer',
"Norwegian Birth Number, the National Identity Number"),
('no_mva', "Norwegian VAT Number"),
('no_orgnr', "Norwegian Organisation Number"),
('nz_ird', "New Zealand Inland Revenue Department Number"),
('pe_cui', "Peruvian Identity Number"),
('pe_ruc', "Peruvian Company Tax Number"),
('pk_cnic', "Pakistani Computerised National Identity Card Number"),
('pl_nip', "Polish VAT Number"),
('pl_pesel', "Polish National Identification Number"),
('pl_regon', "Polish Register of Economic Units"),
('pt_cc', "Portuguese Identity Number"),
('pt_nif', "Portuguese VAT Number"),
('py_ruc', "Paraguay Tax Number"),
('ro_cf', "Romanian VAT Number"),
('ro_cnp', "Romanian Numerical Personal Code"),
('ro_onrc', "Romanian ONRC Number"),
('rs_pib', "Serbian Tax Identification"),
('ru_inn', "Russian Tax identifier"),
('se_orgnr', "Swedish Company Number"),
('se_personnummer', "Swedish Personal Number"),
('se_vat', "Swedish VAT Number"),
('sg_uen', "Singapore's Unique Entity Number"),
('si_businessid', "Slovenian Corporate Registration Number"),
('si_ddv', "Slovenian VAT Number"),
('si_emso', "Slovenian Unique Master Citizen Number"),
('sk_dph', "Slovak VAT Number"),
('sk_rc', "Slovak Birth Number"),
('sm_coe', "San Marino National Tax Number"),
('sv_nit', "El Salvador Tax Number"),
('th_moa', "Thai Memorandum of Association Number"),
('th_pin', "Thai Personal Identification Number"),
('th_tin', "Thai Taxpayer Identification Number"),
('tn_mf', "Tunisian Tax Number"),
('tr_vkn', "Turkish Tax Identification Number"),
('tr_tckimlik', "Turkish Personal Identification Number"),
('tw_ubn', "Taiwanese Tax Number"),
('ua_edrpou', "Ukrainian Identifier for Enterprises and Organizations"),
('ua_rntrc', "Ukrainian Individual Taxpayer Registration Number"),
('us_atin', "U.S. Adoption Taxpayer Identification Number"),
('us_ein', "U.S. Employer Identification Number"),
('us_itin', "U.S. Individual Taxpayer Identification Number"),
('us_ptin', "U.S. Preparer Tax Identification Number"),
('us_ssn', "U.S. Social Security Number"),
('us_tin', "U.S. Taxpayer Identification Number"),
('uy_rut', "Uruguay Tax Number"),
('ve_rif', "Venezuelan VAT Number"),
('vn_mst', "Vietnam Tax Number"),
('za_idnr', "South African Identity Document Number"),
('za_tin', "South African Tax Identification Number"),
]
IDENTIFIER_TYPES = list(map(
lambda x: (replace_vat(x[0]), x[1]),
IDENTIFIER_TYPES))
TAX_IDENTIFIER_TYPES = [
'ad_nrt',
'al_nipt',
'ar_cuit',
'at_uid',
'au_abn',
'au_acn',
'be_vat',
'bg_vat',
'br_cnpj',
'by_unp',
'ca_bn',
'ch_vat',
'cl_rut',
'cn_uscc',
'co_nit',
'cr_cpj',
'cy_vat',
'cz_dic',
'de_vat',
'dk_cvr',
'do_rnc',
'dz_nif',
'ec_ruc',
'ee_kmkr',
'eg_tn',
'es_nif',
'eu_vat',
'fi_alv',
'fo_vn',
'fr_tva',
'gb_vat',
'gh_tin',
'gn_nifp',
'gr_vat',
'gt_nit',
'hr_oib',
'hu_anum',
'id_npwp',
'ie_vat',
'il_hp',
'in_gstin',
'is_vsk',
'it_iva',
'jp_cn',
'ke_pin',
'kr_brn',
'lt_pvm',
'lu_tva',
'lv_pvn',
'ma_ice',
'mc_tva',
'md_idno',
'me_pib',
'mk_edb',
'mt_vat',
'mx_rfc',
'nl_btw',
'no_mva',
'nz_ird',
'pe_ruc',
'pl_nip',
'pt_nif',
'py_ruc',
'ro_cf',
'rs_pib',
'ru_inn',
'se_vat',
'sg_uen',
'si_ddv',
'sk_dph',
'sm_coe',
'sv_nit',
'th_tin',
'tn_mf',
'tr_vkn',
'tr_vkn',
'tw_ubn',
'ua_edrpou',
'ua_rntrc',
'us_atin',
'us_ein',
'us_itin',
'us_ptin',
'us_ssn',
'us_tin',
'uy_rut',
've_rif',
'vn_mst',
'za_tin',
]
TAX_IDENTIFIER_TYPES = list(map(replace_vat, TAX_IDENTIFIER_TYPES))
class Identifier(sequence_ordered(), DeactivableMixin, ModelSQL, ModelView):
__name__ = 'party.identifier'
_rec_name = 'code'
party = fields.Many2One(
'party.party', "Party", ondelete='CASCADE', required=True,
help="The party identified by this record.")
address = fields.Many2One(
'party.address', "Address", ondelete='CASCADE',
states={
'required': Eval('type_address', False),
'invisible': ~Eval('type_address', True),
},
domain=[
('party', '=', Eval('party', -1)),
],
help="The address identified by this record.")
type = fields.Selection('get_types', 'Type')
type_string = type.translated('type')
type_address = fields.Function(
fields.Boolean("Type of Address"), 'on_change_with_type_address')
code = fields.Char('Code', required=True)
@classmethod
def __setup__(cls):
super().__setup__()
cls.__access__.add('party')
@classmethod
def __register__(cls, module_name):
cursor = Transaction().connection.cursor()
table = cls.__table__()
super().__register__(module_name)
# Migration from 6.8: Use vat alias
for old in IDENTIFIER_VAT:
new = replace_vat(old)
cursor.execute(*table.update(
[table.type], [new],
where=table.type == old))
# Migration from 7.2: Rename uy_ruc into uy_rut
cursor.execute(*table.update([table.type], ['uy_rut'],
where=(table.type == 'uy_ruc')))
# Migration from 7.4: Rename co_rut into co_nit
cursor.execute(*table.update([table.type], ['co_nit'],
where=(table.type == 'co_rut')))
@classmethod
def get_types(cls):
pool = Pool()
Configuration = pool.get('party.configuration')
configuration = Configuration(1)
return [(None, '')] + configuration.get_identifier_types()
@classmethod
def _type_addresses(cls):
return {'fr_siret'}
@fields.depends('address', '_parent_address.party')
def on_change_address(self):
if self.address:
self.party = self.address.party
@fields.depends('type')
def on_change_with_type_address(self, name=None):
return self.type in self._type_addresses()
@fields.depends('type', 'code')
def on_change_with_code(self):
if self.type and '_' in self.type:
module = get_cc_module(*self.type.split('_', 1))
if module:
try:
return module.compact(self.code)
except stdnum.exceptions.ValidationError:
pass
return self.code
def pre_validate(self):
super().pre_validate()
self.check_code()
@fields.depends('type', 'party', 'code')
def check_code(self):
if self.type and '_' in self.type:
module = get_cc_module(*self.type.split('_', 1))
if module:
if not module.is_valid(self.code):
if self.party and self.party.id > 0:
party = self.party.rec_name
else:
party = ''
raise InvalidIdentifierCode(
gettext('party.msg_invalid_code',
type=self.type_string,
code=self.code,
party=party))
@fields.depends(methods=['_notify_duplicate'])
def on_change_notify(self):
notifications = super().on_change_notify()
notifications.extend(self._notify_duplicate())
return notifications
@fields.depends('party', 'code', methods=['on_change_with_code'])
def _notify_duplicate(self):
cls = self.__class__
if self.code:
code = self.on_change_with_code()
others = cls.search([
('id', '!=', self.id),
('party', '!=', self.party),
('type', '!=', None),
('code', '=', code),
], limit=1)
if others:
other, = others
yield ('warning', gettext(
'party.msg_party_identifier_duplicate',
party=other.party.rec_name,
type=other.type_string,
code=other.code))
class CheckVIESResult(ModelView):
__name__ = 'party.check_vies.result'
parties_succeed = fields.Many2Many('party.party', None, None,
'Parties Succeed', readonly=True, states={
'invisible': ~Eval('parties_succeed'),
})
parties_failed = fields.Many2Many('party.party', None, None,
'Parties Failed', readonly=True, states={
'invisible': ~Eval('parties_failed'),
})
class CheckVIES(Wizard):
__name__ = 'party.check_vies'
start_state = 'check'
check = StateTransition()
result = StateView('party.check_vies.result',
'party.check_vies_result', [
Button('OK', 'end', 'tryton-ok', True),
])
def transition_check(self):
parties_succeed = []
parties_failed = []
for party in self.records:
for identifier in party.identifiers:
if identifier.type != 'eu_vat':
continue
eu_vat = get_cc_module('eu', 'vat')
try:
if not eu_vat.check_vies(identifier.code)['valid']:
parties_failed.append(party.id)
else:
parties_succeed.append(party.id)
except Exception as e:
for msg in e.args:
if msg == 'INVALID_INPUT':
parties_failed.append(party.id)
break
elif msg in {
'SERVICE_UNAVAILABLE',
'MS_UNAVAILABLE',
'MS_MAX_CONCURRENT_REQ',
'GLOBAL_MS_MAX_CONCURRENT_REQ',
'TIMEOUT',
'SERVER_BUSY',
}:
raise VIESUnavailable(
gettext('party.msg_vies_unavailable')) from e
else:
raise
self.result.parties_succeed = parties_succeed
self.result.parties_failed = parties_failed
return 'result'
def default_result(self, fields):
return {
'parties_succeed': [p.id for p in self.result.parties_succeed],
'parties_failed': [p.id for p in self.result.parties_failed],
}
class Replace(Wizard):
__name__ = 'party.replace'
start_state = 'ask'
ask = StateView('party.replace.ask', 'party.replace_ask_view_form', [
Button("Cancel", 'end', 'tryton-cancel'),
Button("Replace", 'replace', 'tryton-launch', default=True),
])
replace = StateTransition()
def check_similarity(self):
pool = Pool()
Warning = pool.get('res.user.warning')
source = self.ask.source
destination = self.ask.destination
if source.name != destination.name:
key = 'party.replace name %s %s' % (source.id, destination.id)
if Warning.check(key):
raise SimilarityWarning(
key,
gettext('party.msg_different_name',
source_name=source.name,
destination_name=destination.name))
source_code = (source.tax_identifier.code
if source.tax_identifier else '')
destination_code = (destination.tax_identifier.code
if destination.tax_identifier else '')
if source_code != destination_code:
key = 'party.replace tax_identifier %s %s' % (
source.id, destination.id)
if Warning.check(key):
raise SimilarityWarning(
key,
gettext('party.msg_different_tax_identifier',
source_code=source_code,
destination_code=destination_code))
def transition_replace(self):
pool = Pool()
Address = pool.get('party.address')
ContactMechanism = pool.get('party.contact_mechanism')
Identifier = pool.get('party.identifier')
transaction = Transaction()
self.check_similarity()
source = self.ask.source
destination = self.ask.destination
Address.write(list(source.addresses), {
'active': False,
})
ContactMechanism.write(list(source.contact_mechanisms), {
'active': False,
})
Identifier.write(list(source.identifiers), {
'active': False,
})
source.replaced_by = destination
source.active = False
source.save()
cursor = transaction.connection.cursor()
for model_name, field_name in self.fields_to_replace():
Model = pool.get(model_name)
field = getattr(Model, field_name)
table = Model.__table__()
column = Column(table, field_name)
if field._type == 'reference':
source_value = str(source)
destination_value = str(destination)
else:
source_value = source.id
destination_value = destination.id
where = column == source_value
if transaction.database.has_returning():
returning = [table.id]
else:
cursor.execute(*table.select(table.id, where=where))
ids = [x[0] for x in cursor]
returning = None
cursor.execute(*table.update(
[column],
[destination_value],
where=where,
returning=returning))
if transaction.database.has_returning():
ids = [x[0] for x in cursor]
Model._insert_history(ids)
return 'end'
@classmethod
def fields_to_replace(cls):
return [
('party.address', 'party'),
('party.contact_mechanism', 'party'),
('party.identifier', 'party'),
]
class ReplaceAsk(ModelView):
__name__ = 'party.replace.ask'
source = fields.Many2One('party.party', "Source", required=True,
help="The party to be replaced.")
destination = fields.Many2One('party.party', "Destination", required=True,
domain=[
('id', '!=', Eval('source', -1)),
],
help="The party that replaces.")
@classmethod
def default_source(cls):
context = Transaction().context
if context.get('active_model') == 'party.party':
return context.get('active_id')
@fields.depends('source')
def on_change_source(self):
if self.source and self.source.replaced_by:
self.destination = self.source.replaced_by
class Erase(Wizard):
__name__ = 'party.erase'
start_state = 'ask'
ask = StateView('party.erase.ask', 'party.erase_ask_view_form', [
Button("Cancel", 'end', 'tryton-cancel'),
Button("Erase", 'erase', 'tryton-clear', default=True),
])
erase = StateTransition()
def transition_erase(self):
pool = Pool()
Party = pool.get('party.party')
transaction = Transaction()
cursor = transaction.connection.cursor()
resources = self.get_resources()
parties = replacing = [self.ask.party]
with inactive_records():
while replacing:
replacing = Party.search([
('replaced_by', 'in', list(map(int, replacing))),
])
parties += replacing
for party in parties:
self.check_erase(party)
to_erase = self.to_erase(party.id)
for Model, domain, resource, columns, values in to_erase:
assert issubclass(Model, ModelSQL)
assert len(columns) == len(values)
if 'active' in Model._fields:
records = Model.search(domain)
Model.write(records, {'active': False})
tables, where = Model.search_domain(domain, active_test=False)
from_ = convert_from(None, tables, type_='INNER')
table, _ = tables[None]
query = from_.select(table.id, where=where)
if columns:
model_tables = [Model.__table__()]
if Model._history:
model_tables.append(Model.__table_history__())
for table in model_tables:
sql_columns, sql_values = [], []
for column, value in zip(columns, values):
column = Column(table, column)
sql_columns.append(column)
sql_values.append(
value(column) if callable(value) else value)
cursor.execute(*table.update(
sql_columns, sql_values,
where=table.id.in_(query)))
if resource:
for Resource in resources:
model_tables = [
(Resource.__table__(), Resource.resource)]
if Resource._history:
model_tables.append(
(Resource.__table_history__(),
Resource.resource))
for (table, resource_field) in model_tables:
cursor.execute(*table.delete(
where=table.resource.like(
Model.__name__ + ',%')
& resource_field.sql_id(
table.resource, Model).in_(query)))
transaction.counter += 1
return 'end'
def check_erase(self, party):
if party.active:
raise EraseError(gettext('party.msg_erase_active_party',
party=party.rec_name))
def to_erase(self, party_id):
pool = Pool()
Party = pool.get('party.party')
Identifier = pool.get('party.identifier')
Address = pool.get('party.address')
ContactMechanism = pool.get('party.contact_mechanism')
return [
(Party, [('id', '=', party_id)], True,
['name'],
[None]),
(Identifier, [('party', '=', party_id)], True,
['type', 'code'],
[None, '****']),
(Address, [('party', '=', party_id)], True,
['street_unstructured', 'street_name', 'building_name',
'building_number', 'unit_number', 'floor_number',
'room_number', 'post_box', 'private_bag', 'post_office',
'postal_code', 'city', 'country', 'subdivision'],
[None] * 14),
(ContactMechanism, [('party', '=', party_id)], True,
['value', 'name', 'comment'],
[None, None, None]),
]
@classmethod
def get_resources(cls):
pool = Pool()
Attachment = pool.get('ir.attachment')
Note = pool.get('ir.note')
Avatar = pool.get('ir.avatar')
return [Attachment, Note, Avatar]
class EraseAsk(ModelView):
__name__ = 'party.erase.ask'
party = fields.Many2One('party.party', "Party", required=True,
help="The party to be erased.")
@classmethod
def default_party(cls):
context = Transaction().context
if context.get('active_model') == 'party.party':
return context.get('active_id')