Files
2026-03-14 09:42:12 +00:00

444 lines
16 KiB
Python

# This file is part of Tryton. The COPYRIGHT file at the top level of
# this repository contains the full copyright notices and license terms.
import locale
from decimal import Decimal
from io import BytesIO
from itertools import zip_longest
from math import ceil
from lxml import etree
from pypdf import PdfReader, PdfWriter
from zeep.exceptions import Fault
from trytond.i18n import gettext
from trytond.model import fields
from trytond.model.exceptions import AccessError
from trytond.modules.stock_package_shipping.exceptions import (
PackingValidationError)
from trytond.modules.stock_package_shipping.stock import address_name
from trytond.pool import Pool, PoolMeta
from trytond.transaction import Transaction
from trytond.wizard import StateAction, StateTransition, Wizard
from .configuration import SHIPMENT_SERVICE, get_client
from .exceptions import DPDError
# Template of the DPD tracking URL. The ``code`` placeholder receives a
# language (optionally ``language_COUNTRY``) code and ``reference`` the
# shipping reference; both are filled in by
# ``Package.get_shipping_tracking_url``.
TRACKING_URL = 'https://tracking.dpd.de/status/%(code)s/parcel/%(reference)s'
def iter_pdf_pages(document):
    """Yield the pages of *document* one by one.

    Readers exposing a ``pages`` attribute are iterated directly; otherwise
    the ``getNumPages()`` / ``getPage(index)`` accessors are used.
    """
    if not hasattr(document, 'pages'):
        # Fallback for readers that only offer the accessor-style API.
        page_count = document.getNumPages()
        yield from (document.getPage(index) for index in range(page_count))
        return
    yield from document.pages
class Package(metaclass=PoolMeta):
    # Extends 'stock.package' with the DPD flavour of the tracking URL.
    __name__ = 'stock.package'

    def get_shipping_tracking_url(self, name):
        """Return the tracking URL of the package.

        The value computed by the parent implementation is returned unless
        the package has a shipping reference and belongs to a saved shipment
        (non-negative id) whose carrier uses the 'dpd' shipping service. In
        that case ``TRACKING_URL`` is filled with a language/country code and
        the shipping reference.
        """
        url = super().get_shipping_tracking_url(name)

        # Guard clauses: anything that is not a referenced DPD package keeps
        # the URL computed by the parent class.
        if not self.shipping_reference:
            return url
        shipment = self.shipment
        if not shipment or shipment.id < 0:
            return url
        carrier = shipment.carrier
        if not carrier or carrier.shipping_service != 'dpd':
            return url

        recipient = shipment.shipping_to
        destination = shipment.shipping_to_address

        # Language of the recipient, falling back on the transaction one.
        lang_code = (
            recipient.lang.code if recipient and recipient.lang
            else Transaction().language)

        if destination and destination.country:
            # Combine the bare language with the destination country code.
            language = lang_code.split('_')[0]
            code = '_'.join((language, destination.country.code))
        else:
            code = lang_code

        return TRACKING_URL % {
            'code': code,
            'reference': self.shipping_reference,
            }
class ShippingDPDMixin:
    # Mixin providing the DPD specific packing validation of a shipment.
    __slots__ = ()

    def validate_packing_dpd(self):
        """Check that the shipping warehouse has an address.

        Raises PackingValidationError when ``self.shipping_warehouse`` has no
        address; returns None otherwise.
        """
        warehouse = self.shipping_warehouse
        if warehouse.address:
            return
        message = gettext(
            'stock_package_shipping_dpd'
            '.msg_warehouse_address_required',
            shipment=self.rec_name,
            warehouse=warehouse.rec_name)
        raise PackingValidationError(message)
class CreateShipping(metaclass=PoolMeta):
    # Routes the generic "create shipping" wizard to the DPD wizard when the
    # carrier of the record uses the 'dpd' shipping service.
    __name__ = 'stock.shipment.create_shipping'

    dpd = StateAction(
        'stock_package_shipping_dpd.act_create_shipping_dpd_wizard')

    def transition_start(self):
        """Return 'dpd' for DPD carriers, else the parent's next state."""
        inherited_state = super().transition_start()
        uses_dpd = self.record.carrier.shipping_service == 'dpd'
        return 'dpd' if uses_dpd else inherited_state

    def do_dpd(self, action):
        """Return *action* with the active record taken from the context."""
        context = Transaction().context
        model = context['active_model']
        active_id = context['active_id']
        data = {
            'model': model,
            'id': active_id,
            'ids': [active_id],
            }
        return action, data
class CreateDPDShipping(Wizard):
    # Wizard sending a shipment to the DPD shipment web service and storing
    # the returned references and labels on the shipment and its packages.
    __name__ = 'stock.shipment.create_shipping.dpd'

    start = StateTransition()

    def transition_start(self):
        """Register the shipment at DPD and store references and labels.

        The order is sent with ``storeOrders``; on an authentication fault
        the token is refreshed and the call is retried once more.

        Raises AccessError when the shipment already has a shipping
        reference, and DPDError when the web service reports a fault or when
        authentication fails twice.
        """
        pool = Pool()
        Package = pool.get('stock.package')

        shipment = self.record
        if shipment.shipping_reference:
            raise AccessError(
                gettext('stock_package_shipping_dpd'
                    '.msg_shipment_has_reference_number',
                    shipment=shipment.rec_name))

        credential = self.get_credential(shipment)
        if not credential.depot or not credential.token:
            credential.update_token()

        carrier = shipment.carrier
        shipping_client = get_client(credential.server, SHIPMENT_SERVICE)
        print_options = self.get_print_options(shipment)
        packages = shipment.root_packages
        shipment_data = self.get_shipment_data(credential, shipment, packages)

        # The message language does not change between attempts, so it is
        # computed once; it is cut to the 5 characters of "ll_CC".
        lang = (credential.company.party.lang.code
            if credential.company.party.lang else 'en')
        lang = locale.normalize(lang)[:5]

        # At most two attempts: the second one uses the refreshed token.
        for _attempt in range(2):
            # Rebuilt on every attempt because update_token() may have
            # replaced the token.
            authentication = {
                'delisId': credential.user_id,
                'authToken': credential.token,
                'messageLanguage': lang,
                }
            try:
                shipment_response = shipping_client.service.storeOrders(
                    print_options, shipment_data, _soapheaders={
                        'authentication': authentication,
                        })
                break
            except Fault as e:
                detail = e.detail
                # Explicit test instead of the ambiguous truth value of an
                # XML element: a detail is usable only if it has children.
                if detail is not None and len(detail):
                    tag = etree.QName(detail[0].tag)
                    if tag.localname == 'authenticationFault':
                        credential.update_token()
                        continue
                raise DPDError(gettext(
                        'stock_package_shipping_dpd.'
                        'msg_dpd_webservice_error',
                        message=e.message)) from e
        else:
            # Both attempts ended with an authentication fault.
            raise DPDError(
                gettext('stock_package_shipping_dpd.msg_dpd_login_error',
                    credential=credential.rec_name))

        response, = shipment_response.shipmentResponses
        if response.faults:
            message = '\n'.join(f.message for f in response.faults)
            raise DPDError(
                gettext('stock_package_shipping_dpd.msg_dpd_webservice_error',
                    message=message))

        # Split the returned PDF into one single-page document per label.
        labels = []
        labels_pdf = BytesIO(shipment_response.output.content)
        reader = PdfReader(labels_pdf)
        for page in iter_pdf_pages(reader):
            new_pdf = PdfWriter()
            new_label = BytesIO()
            new_pdf.add_page(page)
            new_pdf.write(new_label)
            labels.append(new_label)

        shipment.shipping_reference = response.mpsId
        parcels = response.parcelInformation
        # NOTE(review): zip_longest pads with None, so a mismatch between the
        # number of packages, labels and parcels fails here instead of being
        # silently truncated - presumably intentional, confirm.
        for package, label, parcel in zip_longest(packages, labels, parcels):
            package.shipping_label = fields.Binary.cast(label.getvalue())
            package.shipping_label_mimetype = (
                carrier.shipping_label_mimetype)
            package.shipping_reference = parcel.parcelLabelNumber
        Package.save(packages)
        shipment.save()

        return 'end'

    def get_credential_pattern(self, shipment):
        """Return the pattern used to select the credential of *shipment*."""
        return {
            'company': shipment.company.id,
            }

    def get_credential(self, shipment):
        """Return the first DPD credential matching the shipment pattern.

        Returns None when no credential matches.
        """
        pool = Pool()
        DPDCredential = pool.get('carrier.credential.dpd')

        credential_pattern = self.get_credential_pattern(shipment)
        for credential in DPDCredential.search([]):
            if credential.match(credential_pattern):
                return credential

    def get_print_options(self, shipment):
        """Return the print options built from the carrier label settings."""
        return {
            'printOption': [{
                    'outputFormat': shipment.carrier.dpd_output_format,
                    'paperFormat': shipment.carrier.dpd_paper_format,
                    },
                ],
            }

    def shipping_party(self, party, address, usage=None, with_contact=False):
        """Return the dictionary describing *party* at *address*.

        Every text value is cut to the length used for its key. The contact
        is the party full name when it differs from the address name, or
        always when *with_contact* is set. Phone, mobile and e-mail are added
        only when a contact mechanism exists for *usage* and its value fits
        the accepted length.
        """
        if address.street_unstructured:
            street = address.street_single_line
            house_no = ''
        else:
            street = address.street_name or ''
            # Empty address fields may be None; slicing None would raise.
            house_no = address.numbers or ''
        name = address_name(address, party)
        contact = party.full_name if party.full_name != name else ''
        if with_contact and not contact:
            contact = party.full_name
        shipping_party = {
            'name1': name[:50],
            'name2': name[50:85],
            'street': street[:50],
            'houseNo': house_no[:8],
            'country': address.country.code if address.country else '',
            'zipCode': (address.postal_code or '')[:9],
            'city': (address.city or '')[:50],
            'contact': contact[:35],
            }

        phone = address.contact_mechanism_get({'phone', 'mobile'}, usage=usage)
        if phone and len(phone.value) <= 30:
            shipping_party['phone'] = phone.value
        mobile = address.contact_mechanism_get('mobile', usage=usage)
        if mobile and len(mobile.value) <= 30:
            shipping_party['mobile'] = mobile.value
        email = address.contact_mechanism_get('email', usage=usage)
        if email and 5 <= len(email.value) <= 100:
            shipping_party['email'] = email.value

        return shipping_party

    def get_parcel(self, shipment, package):
        """Return the parcel dictionary (weight and volume) of *package*.

        'weight' is the total weight in units of 10 grams and is set only
        when the package has a weight below the accepted maximum. 'volume'
        is built from length, width and height in whole centimeters (rounded
        up), each on 3 digits, and is set only when the three dimensions are
        defined and between 1 and 999.
        """
        pool = Pool()
        UoM = pool.get('product.uom')
        ModelData = pool.get('ir.model.data')

        cm = UoM(ModelData.get_id('product', 'uom_centimeter'))
        kg = UoM(ModelData.get_id('product', 'uom_kilogram'))

        parcel = {}

        if package.total_weight:
            # in grams rounded in 10 gram units
            weight = UoM.compute_qty(
                package.weight_uom, package.total_weight, kg, round=False)
            # Round the product before converting to int: a bare int() would
            # truncate results such as 0.29 * 100 == 28.999999999999996.
            weight = int(round(round(weight, 2) * 100))
            if weight < 1000000000:
                parcel['weight'] = weight

        if (package.length is not None
                and package.width is not None
                and package.height is not None):
            length = ceil(UoM.compute_qty(
                    package.length_uom, package.length, cm, round=False))
            width = ceil(UoM.compute_qty(
                    package.width_uom, package.width, cm, round=False))
            height = ceil(UoM.compute_qty(
                    package.height_uom, package.height, cm, round=False))
            if 1 <= length < 1000 and 1 <= width < 1000 and 1 <= height < 1000:
                parcel['volume'] = int(
                    '%03i%03i%03i' % (length, width, height))

        return parcel

    def get_shipment_data(self, credential, shipment, packages):
        """Return the order sent to ``storeOrders`` for *shipment*.

        It contains the general shipment data (sender, recipient, total
        volume in cubic centimeters and total weight in units of 10 grams),
        one parcel per package and the product and service data.
        """
        pool = Pool()
        UoM = pool.get('product.uom')
        ModelData = pool.get('ir.model.data')

        cm3 = UoM(ModelData.get_id('product', 'uom_cubic_centimeter'))
        kg = UoM(ModelData.get_id('product', 'uom_kilogram'))

        volume = round(UoM.compute_qty(
                shipment.volume_uom, shipment.volume, cm3, round=False))
        weight = round(UoM.compute_qty(
                shipment.weight_uom, shipment.weight, kg, round=False), 2)
        return {
            'generalShipmentData': {
                'identificationNumber': shipment.number,
                'sendingDepot': credential.depot,
                'product': shipment.carrier.dpd_product,
                'mpsVolume': int(volume),
                # round() before int() so that float noise in the product
                # (e.g. 28.999999999999996) is not truncated to 28.
                'mpsWeight': int(round(weight * 100)),
                'sender': self.shipping_party(
                    shipment.company.party,
                    shipment.shipping_warehouse.address),
                'recipient': self.shipping_party(
                    shipment.shipping_to, shipment.shipping_to_address),
                },
            'parcels': [self.get_parcel(shipment, p) for p in packages],
            'productAndServiceData': self.get_product_and_service(shipment),
            }

    def get_product_and_service(self, shipment):
        """Return the product and service data, including notification."""
        return {
            'orderType': 'consignment',
            **self.get_notification(shipment),
            }

    def get_notification(self, shipment, usage=None):
        """Return the 'predict' notification settings of *shipment*.

        An empty dictionary is returned when the carrier has no DPD
        notification configured or when the recipient has no usable contact
        mechanism (present and at most 50 characters long). E-mail
        (channel 1) is tried before SMS (channel 3) unless the carrier
        notification is 'sms', in which case the order is reversed.
        """
        carrier = shipment.carrier
        if not carrier.dpd_notification:
            return {}
        party = shipment.shipping_to
        if party and party.lang:
            lang_code = party.lang.code
        else:
            lang_code = Transaction().language
        lang_code = lang_code.upper()
        # Contact mechanism types to look up when they differ from the
        # channel name itself.
        channel2type = {
            'sms': {'mobile'},
            }

        channels = [
            (1, 'email'),
            (3, 'sms'),
            ]
        if carrier.dpd_notification == 'sms':
            channels = reversed(channels)

        for channel_id, channel in channels:
            mechanism = party.contact_mechanism_get(
                channel2type.get(channel, channel), usage=usage)
            if not mechanism:
                continue
            value = mechanism.value
            if len(value) > 50:
                continue
            return {
                'predict': {
                    'channel': channel_id,
                    'value': value,
                    'language': lang_code,
                    },
                }
        return {}
class CreateDPDShipping_Customs(metaclass=PoolMeta):
    # Adds the customs ("international") data to the DPD order of shipments
    # flagged as customs international.
    __name__ = 'stock.shipment.create_shipping.dpd'

    def get_product_and_service(self, shipment):
        """Return the product and service data with customs information.

        For shipments with ``customs_international`` set, an 'international'
        entry is added with the customs amount (in hundredths of the single
        currency of the customs moves), terms, invoice number and date, the
        optional consignee (customs agent) and consignor details, and one
        invoice line per customs product.
        """
        pool = Pool()
        Date = pool.get('ir.date')
        Currency = pool.get('currency.currency')

        with Transaction().set_context(company=shipment.company.id):
            today = Date.today()

        product_and_service = super().get_product_and_service(shipment)

        if shipment.customs_international:
            invoice_date = shipment.effective_date or today
            # All customs moves must share one currency; the unpacking fails
            # otherwise.
            currency, = {m.currency for m in shipment.customs_moves}

            amount = sum(
                Currency.compute(
                    curr, Decimal(str(v['quantity'])) * price, currency,
                    round=False)
                for (product, price, curr, _), v in
                shipment.customs_products.items())

            international = {
                'parcelType': 0,
                'customsAmount': int(
                    amount.quantize(Decimal('.01')) * Decimal(100)),
                'customsCurrency': currency.code,
                'customsTerms': self.get_customs_terms(shipment),
                'customsPaper': 'A',
                'customsInvoice': shipment.number[:20],
                'customsInvoiceDate': invoice_date.strftime('%Y%m%d'),
                }
            if customs_agent := shipment.customs_agent:
                international.update({
                        'commercialInvoiceConsigneeVatNumber': (
                            customs_agent.tax_identifier.code)[:20],
                        'commercialInvoiceConsignee': self.shipping_party(
                            customs_agent.party,
                            customs_agent.address,
                            with_contact=True),
                        })
            if shipment.tax_identifier:
                international['commercialInvoiceConsignorVatNumber'] = (
                    shipment.tax_identifier.code[:17])
            international['commercialInvoiceConsignor'] = self.shipping_party(
                shipment.company.party, shipment.customs_from_address)
            # Invoice positions are numbered from 1.
            international['additionalInvoiceLines'] = [
                {'customsInvoicePosition': i,
                    **self.get_international_invoice_line(shipment, *k, **v)}
                for i, (k, v) in enumerate(
                    shipment.customs_products.items(), 1)]
            international['numberOfArticle'] = len(
                international['additionalInvoiceLines'])
            product_and_service['international'] = international
        return product_and_service

    def get_customs_terms(self, shipment):
        """Return the customs terms code for the incoterm of *shipment*.

        'DAP' gives '01', 'DDP' gives '03' and 'EXW' gives '05'; None is
        returned for any other incoterm or when there is none.
        """
        if shipment and shipment.incoterm:
            if shipment.incoterm.code == 'DAP':
                return '01'
            elif shipment.incoterm.code == 'DDP':
                return '03'
            elif shipment.incoterm.code == 'EXW':
                return '05'

    def get_international_invoice_line(
            self, shipment, product, price, currency, unit, quantity, weight):
        """Return the customs invoice line of one customs product.

        The line amount is ``price * quantity`` in hundredths, the gross
        weight is *weight* rounded to 2 decimals and multiplied by 100. A
        non-integer *quantity* is reported as a single item.
        """
        tariff_code = product.get_tariff_code({
                'date': shipment.effective_date or shipment.planned_date,
                'country': (
                    shipment.customs_to_country.id
                    if shipment.customs_to_country else None),
                })

        weight = round(weight, 2)

        # The value uses the real quantity, before it is replaced below.
        value = price * Decimal(str(quantity))
        if not quantity.is_integer():
            quantity = 1

        return {
            'quantityItems': int(quantity),
            'customsContent': product.name[:200],
            'customsTarif': tariff_code.code if tariff_code else None,
            'customsAmountLine': int(
                value.quantize(Decimal('.01')) * Decimal(100)),
            'customsOrigin': (
                product.country_of_origin.code_numeric
                if product.country_of_origin else None),
            # round() before int() so that float noise in the product
            # (e.g. 0.29 * 100 == 28.999999999999996) is not truncated.
            'customsGrossWeight': int(round(weight * 100)),
            }