Files
2026-03-14 09:42:12 +00:00

2012 lines
74 KiB
Python

# This file is part of Tryton. The COPYRIGHT file at the top level of
# this repository contains the full copyright notices and license terms.
import datetime as dt
import logging
import urllib.parse
import uuid
from decimal import Decimal
from itertools import groupby
from operator import attrgetter
import stripe
import trytond.config as config
from trytond.cache import Cache
from trytond.i18n import gettext
from trytond.model import (
DeactivableMixin, Index, ModelSQL, ModelView, Unique, Workflow, dualmethod,
fields)
from trytond.modules.account_payment.exceptions import (
PaymentValidationError, ProcessError)
from trytond.modules.company.model import (
employee_field, reset_employee, set_employee)
from trytond.modules.currency.fields import Monetary
from trytond.pool import Pool, PoolMeta
from trytond.pyson import Eval, TimeDelta
from trytond.report import Report, get_email
from trytond.rpc import RPC
from trytond.sendmail import send_message_transactional
from trytond.tools import sql_pairing
from trytond.tools.email_ import set_from_header
from trytond.transaction import Transaction
from trytond.url import http_host
from trytond.wizard import Button, StateTransition, StateView, Wizard
from .common import StripeCustomerMethodMixin
from .exceptions import StripeAccountWarning
logger = logging.getLogger(__name__)
stripe.max_network_retries = config.getint(
'account_payment_stripe', 'max_network_retries', default=3)
RETRY_CODES = {'lock_timeout', 'token_in_use'}
STRIPE_VERSION = '2023-08-16'
class Journal(metaclass=PoolMeta):
__name__ = 'account.payment.journal'
stripe_account = fields.Many2One(
'account.payment.stripe.account', "Account", ondelete='RESTRICT',
states={
'required': Eval('process_method') == 'stripe',
'invisible': Eval('process_method') != 'stripe',
})
@classmethod
def __setup__(cls):
super().__setup__()
stripe_method = ('stripe', 'Stripe')
if stripe_method not in cls.process_method.selection:
cls.process_method.selection.append(stripe_method)
class Group(metaclass=PoolMeta):
__name__ = 'account.payment.group'
@classmethod
def __setup__(cls):
super().__setup__()
cls._buttons['succeed']['invisible'] |= (
Eval('process_method') == 'stripe')
class CheckoutMixin:
__slots__ = ()
stripe_checkout_id = fields.Char(
"Stripe Checkout ID", readonly=True, strip=False)
@classmethod
def copy(cls, records, default=None):
if default is None:
default = {}
else:
default = default.copy()
default.setdefault('stripe_checkout_id')
return super().copy(records, default=default)
@classmethod
@ModelView.button_action('account_payment_stripe.url_checkout')
def stripe_checkout(cls, records):
for record in records:
record.stripe_checkout_id = uuid.uuid4().hex
record.save()
return {
'url': record.stripe_checkout_url,
}
@property
def stripe_checkout_url(self):
pool = Pool()
database = Transaction().database.name
ModelData = pool.get('ir.model.data')
URL = pool.get('ir.action.url')
action = URL(ModelData.get_id('account_payment_stripe.url_checkout'))
return action.url % {
'http_host': http_host(),
'database': database,
'model': self.__class__.__name__,
'id': self.stripe_checkout_id,
}
class Payment(StripeCustomerMethodMixin, CheckoutMixin, metaclass=PoolMeta):
__name__ = 'account.payment'
stripe_checkout_needed = fields.Function(
fields.Boolean("Stripe Checkout Needed"),
'on_change_with_stripe_checkout_needed')
stripe_charge_id = fields.Char(
"Stripe Charge ID", readonly=True, strip=False,
states={
'invisible': ((Eval('process_method') != 'stripe')
| ~Eval('stripe_charge_id')),
})
stripe_capture = fields.Boolean(
"Stripe Capture",
states={
'invisible': Eval('process_method') != 'stripe',
'readonly': Eval('state') != 'draft',
})
stripe_captured = fields.Boolean(
"Stripe Captured", readonly=True,
states={
'invisible': Eval('process_method') != 'stripe',
})
stripe_capture_needed = fields.Function(
fields.Boolean(
"Stripe Capture Needed",
states={
'invisible': Eval('process_method') != 'stripe',
}),
'get_stripe_capture_needed')
stripe_token = fields.Char(
"Stripe Token", readonly=True, strip=False,
states={
'invisible': ~Eval('stripe_token'),
})
stripe_payment_intent_id = fields.Char(
"Stripe Payment Intent", readonly=True, strip=False,
states={
'invisible': ~Eval('stripe_payment_intent_id'),
})
stripe_chargeable = fields.Boolean(
"Stripe Chargeable",
states={
'invisible': ((Eval('process_method') != 'stripe')
| ~Eval('stripe_token')),
})
stripe_capturable = fields.Boolean(
"Stripe Capturable",
states={
'invisible': ((Eval('process_method') != 'stripe')
| ~Eval('stripe_payment_intent_id')
| ~Eval('stripe_capture_needed')),
})
stripe_idempotency_key = fields.Char(
"Stripe Idempotency Key", readonly=True, strip=False,
states={
'invisible': Eval('process_method') != 'stripe',
})
stripe_error_message = fields.Char("Stripe Error Message", readonly=True,
states={
'invisible': (
~Eval('stripe_error_message')
| (Eval('state') == 'succeeded')),
})
stripe_error_code = fields.Char("Stripe Error Code", readonly=True,
states={
'invisible': (
~Eval('stripe_error_code')
| (Eval('state') == 'succeeded')),
})
stripe_error_param = fields.Char("Stripe Error Param", readonly=True,
states={
'invisible': (
~Eval('stripe_error_param')
| (Eval('state') == 'succeeded')),
})
stripe_dispute_reason = fields.Char("Stripe Dispute Reason", readonly=True,
states={
'invisible': ~Eval('stripe_dispute_reason'),
})
stripe_dispute_status = fields.Char("Stripe Dispute Status", readonly=True,
states={
'invisible': ~Eval('stripe_dispute_status'),
})
stripe_amount = fields.Function(
fields.Integer(
"Stripe Amount",
states={
'invisible': Eval('process_method') != 'stripe',
}),
'get_stripe_amount', setter='set_stripe_amount')
stripe_refunds = fields.One2Many(
'account.payment.stripe.refund', 'payment', "Refunds",
states={
'invisible': ((Eval('process_method') != 'stripe')
| (~Eval('stripe_charge_id')
& ~Eval('stripe_payment_intent_id'))),
})
@classmethod
def __setup__(cls):
super().__setup__()
cls.stripe_customer.states['readonly'] = (
~Eval('state').in_(['draft', 'submitted', 'approved'])
| Eval('stripe_token')
| Eval('stripe_payment_intent_id'))
cls.stripe_customer_source.states['invisible'] |= (
Eval('stripe_token') | Eval('stripe_payment_intent_id'))
cls.stripe_customer_source.states['readonly'] = (
~Eval('state').in_(['draft', 'submitted', 'approved']))
cls.stripe_customer_source_selection.states['invisible'] |= (
Eval('stripe_token') | Eval('stripe_payment_intent_id'))
cls.stripe_customer_source_selection.states['readonly'] = (
~Eval('state').in_(['draft', 'submitted', 'approved']))
cls.stripe_customer_payment_method.states['invisible'] |= (
Eval('stripe_token'))
cls.stripe_customer_payment_method.states['readonly'] = (
~Eval('state').in_(['draft', 'submitted', 'approved'])
| Eval('stripe_payment_intent_id'))
cls.stripe_customer_payment_method_selection.states['invisible'] |= (
Eval('stripe_token'))
cls.stripe_customer_payment_method_selection.states['readonly'] = (
~Eval('state').in_(['draft', 'submitted', 'approved'])
| Eval('stripe_payment_intent_id'))
cls.amount.states['readonly'] &= ~Eval('stripe_capture_needed')
cls.stripe_amount.states.update(cls.amount.states)
cls._buttons.update({
'stripe_checkout': {
'invisible': ((Eval('process_method') != 'stripe')
| (~Eval('state', 'draft').in_(
['submitted', 'approved', 'processing'])
| ~Eval('stripe_checkout_needed', False))),
'depends': [
'process_method', 'state', 'stripe_checkout_needed'],
},
'stripe_do_capture': {
'invisible': ((Eval('process_method') != 'stripe')
| ((Eval('state', 'draft') != 'processing')
| ~Eval('stripe_capture_needed'))),
'depends': [
'process_method', 'state', 'stripe_capture_needed'],
},
'stripe_do_pull': {
'invisible': ((Eval('process_method') != 'stripe')
| (~Eval('stripe_charge_id')
& ~Eval('stripe_payment_intent_id'))),
'depends': [
'stripe_charge_id', 'stripe_payment_intent_id'],
},
})
@classmethod
def default_stripe_capture(cls):
return True
@classmethod
def default_stripe_captured(cls):
return False
@classmethod
def default_stripe_chargeable(cls):
return False
@classmethod
def default_stripe_capturable(cls):
return False
@classmethod
def default_stripe_idempotency_key(cls):
return uuid.uuid4().hex
@fields.depends('process_method',
'stripe_token', 'stripe_payment_intent_id',
'stripe_customer_source', 'stripe_customer_source_selection',
'stripe_customer_payment_method',
'stripe_customer_payment_method_selection')
def on_change_with_stripe_checkout_needed(self, name=None):
return (self.process_method == 'stripe'
and not self.stripe_token
and not self.stripe_payment_intent_id
and not self.stripe_customer_source
and not self.stripe_customer_payment_method)
def get_stripe_capture_needed(self, name):
return (self.journal.process_method == 'stripe'
and (self.stripe_charge_id
or self.stripe_payment_intent_id)
and not self.stripe_capture
and not self.stripe_captured)
def get_stripe_amount(self, name):
return int(self.amount * 10 ** self.currency.digits)
@classmethod
def set_stripe_amount(cls, payments, name, value):
keyfunc = attrgetter('currency')
payments = sorted(payments, key=keyfunc)
value = Decimal(value)
for currency, payments in groupby(payments, keyfunc):
cls.write(list(payments), {
'amount': value * 10 ** -Decimal(currency.digits),
})
@classmethod
def view_attributes(cls):
return super().view_attributes() + [
('//page[@id="stripe"]', 'states', {
'invisible': Eval('process_method') != 'stripe',
}),
]
@classmethod
def validate_fields(cls, payments, field_names):
super().validate_fields(payments, field_names)
cls.check_stripe_journal(payments, field_names)
@classmethod
def check_stripe_journal(cls, payments, field_names=None):
if field_names and not (field_names & {'kind', 'journal'}):
return
for payment in payments:
if (payment.kind != 'receivable'
and payment.journal.process_method == 'stripe'):
raise PaymentValidationError(
gettext('account_payment_stripe.msg_stripe_receivable',
journal=payment.journal.rec_name,
payment=payment.rec_name))
@classmethod
def preprocess_values(cls, mode, values):
values = super().preprocess_values(mode, values)
if mode == 'create':
if 'stripe_idempotency_key' not in values:
# Ensure to get a different key for each record
# default methods are called only once
values['stripe_idempotency_key'] = (
cls.default_stripe_idempotency_key())
return values
@classmethod
def copy(cls, payments, default=None):
if default is None:
default = {}
else:
default = default.copy()
default.setdefault('stripe_charge_id', None)
default.setdefault('stripe_token', None)
default.setdefault('stripe_payment_intent_id', None)
default.setdefault('stripe_idempotency_key', None)
default.setdefault('stripe_error_message', None)
default.setdefault('stripe_error_code', None)
default.setdefault('stripe_error_param', None)
default.setdefault('stripe_captured', False)
default.setdefault('stripe_chargeable', False)
default.setdefault('stripe_capturable', False)
return super().copy(payments, default=default)
@classmethod
@ModelView.button
@Workflow.transition('draft')
def draft(cls, payments):
super().draft(payments)
for payment in payments:
if payment.stripe_token:
payment.stripe_token = None
payment.stripe_payment_intent_id = None
cls.save(payments)
def process_stripe(self):
assert self.process_method == 'stripe'
if (not self.stripe_token
and not self.stripe_payment_intent_id
and not self.stripe_customer):
account = self.journal.stripe_account
for customer in self.party.stripe_customers:
if (customer.stripe_account == account
and customer.stripe_customer_id):
self.stripe_customer = customer
break
else:
raise ProcessError(
gettext('account_payment_stripe.msg_no_stripe_token',
self=self.rec_name))
self.save()
self.__class__.__queue__.stripe_charge([self])
@classmethod
def stripe_checkout(cls, payments):
for payment in payments:
if not payment.stripe_payment_intent_id:
payment_intent = stripe.PaymentIntent.create(
**payment._payment_intent_parameters(off_session=False))
payment.stripe_payment_intent_id = payment_intent.id
return super().stripe_checkout(payments)
def _send_email_checkout(self, from_=None):
pool = Pool()
Language = pool.get('ir.lang')
from_cfg = config.get('email', 'from')
self.stripe_checkout([self])
emails = self._emails_checkout()
if not emails:
logger.warning("Could not send checkout email for %d", self.id)
return
languages = [self.party.lang or Language.get()]
msg, title = get_email(
'account.payment.stripe.email_checkout', self, languages)
set_from_header(msg, from_cfg, from_ or from_cfg)
msg['To'] = emails
msg['Subject'] = title
send_message_transactional(msg)
def _emails_checkout(self):
emails = []
if self.party.email:
emails.append(self.party.email)
return emails
def _payment_intent_parameters(self, off_session=False):
idempotency_key = None
if self.stripe_idempotency_key:
idempotency_key = 'payment_intent-%s' % self.stripe_idempotency_key
params = {
'api_key': self.journal.stripe_account.secret_key,
'stripe_version': STRIPE_VERSION,
'amount': self.stripe_amount,
'currency': self.currency.code,
'automatic_payment_methods': {
'enabled': True,
},
'capture_method': 'automatic' if self.stripe_capture else 'manual',
'customer': (self.stripe_customer.stripe_customer_id
if self.stripe_customer else None),
'description': self.reference,
'off_session': off_session,
'idempotency_key': idempotency_key,
}
if self.stripe_customer_payment_method:
params['payment_method'] = self.stripe_customer_payment_method
params['confirm'] = True
return params
@classmethod
def stripe_charge(cls, payments=None, off_session=True):
"""Charge stripe payments
The transaction is committed after each payment charge.
"""
pool = Pool()
Customer = pool.get('account.payment.stripe.customer')
if payments is None:
payments = cls.search([
('state', '=', 'processing'),
('journal.process_method', '=', 'stripe'),
['OR',
[
('stripe_token', '!=', None),
('stripe_chargeable', '=', True),
],
('stripe_customer_source', '!=', None),
('stripe_customer_payment_method', '!=', None),
],
('stripe_charge_id', '=', None),
('stripe_payment_intent_id', '=', None),
('company', '=', Transaction().context.get('company')),
])
def create_charge(payment):
charge = stripe.Charge.create(**payment._charge_parameters())
payment.stripe_charge_id = charge.id
payment.stripe_captured = charge.captured
payment.save()
def create_payment_intent(payment):
try:
payment_intent = stripe.PaymentIntent.create(
**payment._payment_intent_parameters(
off_session=off_session))
except stripe.CardError as e:
error = e.json_body.get('error', {})
payment_intent = error.get('payment_intent')
if not payment_intent:
raise
payment.stripe_payment_intent_id = payment_intent['id']
payment.save()
for payment in payments:
# Use clear cache after a commit
payment = cls(payment.id)
if (payment.stripe_charge_id
or payment.stripe_payment_intent_id
or payment.journal.process_method != 'stripe'
or payment.state != 'processing'):
continue
payment.lock()
try:
if ((payment.stripe_token and payment.stripe_chargeable)
or payment.stripe_customer_source):
create_charge(payment)
elif payment.stripe_customer_payment_method:
create_payment_intent(payment)
except (stripe.RateLimitError,
stripe.APIConnectionError) as e:
logger.warning(str(e))
continue
except stripe.StripeError as e:
if e.code in RETRY_CODES:
logger.warning(str(e))
continue
payment.stripe_error_message = str(e)
payment.stripe_error_code = e.code
if isinstance(e, stripe.StripeErrorWithParamCode):
payment.stripe_error_param = e.param
payment.save()
cls.fail([payment])
except Exception:
logger.error(
"Error when processing payment %d", payment.id,
exc_info=True)
continue
Transaction().commit()
customers = [p.stripe_customer for p in payments if p.stripe_customer]
if customers:
Customer.__queue__.find_identical(customers)
def _charge_parameters(self):
source, customer = None, None
if self.stripe_token:
source = self.stripe_token
elif self.stripe_customer_source:
source = self.stripe_customer_source
if self.stripe_customer:
customer = self.stripe_customer.stripe_customer_id
idempotency_key = None
if self.stripe_idempotency_key:
idempotency_key = 'charge-%s' % self.stripe_idempotency_key
return {
'api_key': self.journal.stripe_account.secret_key,
'stripe_version': STRIPE_VERSION,
'amount': self.stripe_amount,
'currency': self.currency.code,
'capture': bool(self.stripe_capture),
'description': self.reference,
'customer': customer,
'source': source,
'idempotency_key': idempotency_key,
}
@classmethod
@ModelView.button
def stripe_do_capture(cls, payments):
cls.write(payments, {
'stripe_capture': True,
})
cls.__queue__.stripe_capture_(payments)
@classmethod
def stripe_capture_(cls, payments=None):
"""Capture stripe payments
The transaction is committed after each payment capture.
"""
if payments is None:
payments = cls.search([
('state', '=', 'processing'),
('journal.process_method', '=', 'stripe'),
['OR',
('stripe_charge_id', '!=', None),
[
('stripe_payment_intent_id', '!=', None),
('stripe_capturable', '=', True),
],
],
('stripe_captured', '=', False),
('stripe_capture', '=', True),
('company', '=', Transaction().context.get('company')),
])
def capture_charge(payment):
charge = stripe.Charge.retrieve(
payment.stripe_charge_id,
api_key=payment.journal.stripe_account.secret_key,
stripe_version=STRIPE_VERSION)
charge.capture(**payment._capture_parameters())
payment.stripe_captured = charge.captured
payment.save()
def capture_intent(payment):
params = payment._capture_parameters()
params['amount_to_capture'] = params.pop('amount')
stripe.PaymentIntent.capture(
payment.stripe_payment_intent_id,
api_key=payment.journal.stripe_account.secret_key,
stripe_version=STRIPE_VERSION,
**params)
payment.stripe_captured = True
payment.save()
for payment in payments:
# Use clear cache after a commit
payment = cls(payment.id)
if (payment.journal.process_method != 'stripe'
or payment.stripe_captured
or payment.state != 'processing'):
continue
payment.lock()
try:
if payment.stripe_charge_id:
capture_charge(payment)
elif payment.stripe_payment_intent_id:
capture_intent(payment)
except (stripe.RateLimitError,
stripe.APIConnectionError) as e:
logger.warning(str(e))
continue
except stripe.StripeError as e:
if e.code in RETRY_CODES:
logger.warning(str(e))
continue
payment.stripe_error_message = str(e)
payment.stripe_error_code = e.code
if isinstance(e, stripe.StripeErrorWithParamCode):
payment.stripe_error_param = e.param
payment.save()
cls.fail([payment])
except Exception:
logger.error(
"Error when capturing payment %d", payment.id,
exc_info=True)
continue
Transaction().commit()
def _capture_parameters(self):
idempotency_key = None
if self.stripe_idempotency_key:
idempotency_key = 'capture-%s' % self.stripe_idempotency_key
return {
'amount': self.stripe_amount,
'idempotency_key': idempotency_key,
}
@property
def stripe_payment_intent(self):
if not self.stripe_payment_intent_id:
return
try:
return stripe.PaymentIntent.retrieve(
self.stripe_payment_intent_id,
expand=['latest_charge'],
api_key=self.journal.stripe_account.secret_key,
stripe_version=STRIPE_VERSION)
except (stripe.RateLimitError,
stripe.APIConnectionError) as e:
logger.warning(str(e))
stripe_intent = stripe_payment_intent
@dualmethod
def stripe_intent_update(cls, payments=None):
pass
@property
def stripe_charge_(self):
if not self.stripe_charge_id:
return
try:
return stripe.Charge.retrieve(
self.stripe_charge_id,
api_key=self.journal.stripe_account.secret_key,
stripe_version=STRIPE_VERSION)
except (stripe.RateLimitError,
stripe.APIConnectionError) as e:
logger.warning(str(e))
@classmethod
@ModelView.button
def stripe_do_pull(cls, payments):
cls.stripe_pull(payments)
@classmethod
def stripe_pull(cls, payments):
"Update payments with Stripe charges"
for payment in payments:
charge = payment.stripe_charge_
if payment_intent := payment.stripe_payment_intent:
charge = payment_intent.latest_charge
if charge:
payment.stripe_update(charge)
def stripe_update(self, charge):
assert (
(charge.id == self.stripe_charge_id)
or (charge.payment_intent == self.stripe_payment_intent_id))
amount = charge.amount - charge.amount_refunded
if (self.state not in {'succeeded', 'failed'}
or self.stripe_amount != amount
or (not amount and self.state != 'failed')):
if self.state == 'succeeded':
self.__class__.proceed([self])
self.stripe_captured = charge.captured
self.stripe_amount = amount
self.save()
if self.amount:
if charge.status == 'succeeded' and charge.captured:
self.__class__.succeed([self])
else:
self.__class__.fail([self])
class Refund(Workflow, ModelSQL, ModelView):
__name__ = 'account.payment.stripe.refund'
payment = fields.Many2One(
'account.payment', "Payment", required=True,
domain=[
('process_method', '=', 'stripe'),
['OR',
('stripe_charge_id', '!=', None),
('stripe_payment_intent_id', '!=', None),
],
],
states={
'readonly': Eval('state') != 'draft',
})
amount = Monetary(
"Amount", currency='currency', digits='currency', required=True,
states={
'readonly': Eval('state') != 'draft',
})
stripe_amount = fields.Function(
fields.Integer("Stripe Amount"), 'get_stripe_amount')
reason = fields.Selection([
(None, ""),
('duplicate', "Duplicate"),
('fraudulent', "Fraudulent"),
('requested_by_customer', "Requested by Customer"),
], "Reason",
states={
'readonly': Eval('state') != 'draft',
})
submitted_by = employee_field(
"Submitted by",
states=['submitted', 'approved', 'processing', 'succeeded', 'failed'])
approved_by = employee_field(
"Approved by",
states=['approved', 'processing', 'succeeded', 'failed'])
state = fields.Selection([
('draft', "Draft"),
('submitted', "Submitted"),
('approved', "Approved"),
('processing', "Processing"),
('succeeded', "Succeeded"),
('failed', "Failed"),
], "State", readonly=True, sort=False)
stripe_idempotency_key = fields.Char(
"Stripe Idempotency Key", readonly=True, strip=False)
stripe_refund_id = fields.Char(
"Stripe Refund ID", readonly=True, strip=False)
stripe_error_message = fields.Char("Stripe Error Message", readonly=True,
states={
'invisible': (
~Eval('stripe_error_message')
| (Eval('state') == 'succeeded')),
})
stripe_error_code = fields.Char("Stripe Error Code", readonly=True,
states={
'invisible': (
~Eval('stripe_error_code')
| (Eval('state') == 'succeeded')),
})
stripe_error_param = fields.Char("Stripe Error Param", readonly=True,
states={
'invisible': (
~Eval('stripe_error_param')
| (Eval('state') == 'succeeded')),
})
currency = fields.Function(
fields.Many2One('currency.currency', "Currency"),
'on_change_with_currency')
company = fields.Function(
fields.Many2One('company.company', "Company"),
'on_change_with_company')
@classmethod
def __setup__(cls):
super().__setup__()
t = cls.__table__()
cls._sql_indexes.add(
Index(
t,
(t.state, Index.Equality(cardinality='low')),
where=t.state.in_([
'draft', 'submitted', 'approved', 'processing'])))
cls.__access__.add('payment')
cls._transitions |= set((
('draft', 'submitted'),
('submitted', 'approved'),
('approved', 'processing'),
('processing', 'succeeded'),
('processing', 'failed'),
('succeeded', 'failed'),
('approved', 'draft'),
))
cls._buttons.update({
'draft': {
'invisible': ~Eval('state').in_(['approved', 'submitted']),
'icon': 'tryton-back',
'depends': ['state'],
},
'submit': {
'invisible': Eval('state') != 'draft',
'icon': 'tryton-forward',
'depends': ['state'],
},
'approve': {
'invisible': Eval('state') != 'submitted',
'icon': 'tryton-forward',
'depends': ['state'],
},
})
def get_stripe_amount(self, name):
return int(self.amount * 10 ** self.currency.digits)
@classmethod
def default_stripe_idempotency_key(cls):
return uuid.uuid4().hex
@fields.depends('payment', '_parent_payment.currency')
def on_change_with_currency(self, name=None):
return self.payment.currency if self.payment else None
@fields.depends('payment', '_parent_payment.company')
def on_change_with_company(self, name=None):
return self.payment.company if self.payment else None
@classmethod
def default_state(cls):
return 'draft'
@classmethod
def preprocess_values(cls, mode, values):
values = super().preprocess_values(mode, values)
if mode == 'create':
if 'stripe_idempotency_key' not in values:
# Ensure to get a different key for each record
# default methods are called only once
values['stripe_idempotency_key'] = (
cls.default_stripe_idempotency_key())
return values
@classmethod
def copy(cls, refunds, default=None):
if default is None:
default = {}
else:
default = default.copy()
default.setdefault('stripe_refund_id')
default.setdefault('stripe_idempotency_key')
default.setdefault('stripe_error_message')
default.setdefault('stripe_error_code')
default.setdefault('stripe_error_param')
return super().copy(refunds, default=default)
@classmethod
@ModelView.button
@Workflow.transition('draft')
@reset_employee('submitted_by', 'approved_by')
def draft(cls, refunds):
pass
@classmethod
@ModelView.button
@Workflow.transition('submitted')
@set_employee('submitted_by')
def submit(cls, refunds):
pass
@classmethod
@ModelView.button
@Workflow.transition('approved')
@set_employee('approved_by')
def approve(cls, refunds):
pass
@classmethod
@Workflow.transition('processing')
def process(cls, refunds):
pass
@classmethod
@Workflow.transition('succeeded')
def succeed(cls, refunds):
pass
@classmethod
@Workflow.transition('failed')
def fail(cls, refunds):
pass
@classmethod
def stripe_create(cls, refunds=None):
"""Create stripe refund
The transaction is committed after each refund.
"""
if not refunds:
refunds = cls.search([('state', '=', 'approved')])
for refund in refunds:
# Use clear cache after a commit
refund = cls(refund.id)
if refund.stripe_refund_id:
continue
refund.lock()
try:
rf = stripe.Refund.create(
api_key=refund.payment.journal.stripe_account.secret_key,
stripe_version=STRIPE_VERSION,
**refund._refund_parameters())
except (stripe.RateLimitError,
stripe.APIConnectionError) as e:
logger.warning(str(e))
continue
except stripe.StripeError as e:
if e.code in RETRY_CODES:
logger.warning(str(e))
continue
refund.stripe_error_message = str(e)
refund.stripe_error_code = e.code
if isinstance(e, stripe.StripeErrorWithParamCode):
refund.stripe_error_param = e.param
cls.process([refund])
cls.fail([refund])
except Exception:
logger.error(
"Error when creating refund %d", refund.id,
exc_info=True)
continue
else:
refund.stripe_refund_id = rf.id
cls.process([refund])
if rf.status == 'succeeded':
cls.succeed([refund])
elif rf.status in {'failed', 'canceled'}:
refund.stripe_error_code = rf['failure_reason']
cls.fail([refund])
refund.save()
Transaction().commit()
def _refund_parameters(self):
idempotency_key = None
if self.stripe_idempotency_key:
idempotency_key = 'refund-%s' % self.stripe_idempotency_key
params = {
'amount': self.stripe_amount,
'reason': self.reason,
'idempotency_key': idempotency_key,
}
payment = self.payment
if payment.stripe_charge_id:
params['charge'] = payment.stripe_charge_id
elif payment.stripe_payment_intent_id:
params['payment_intent'] = payment.stripe_payment_intent_id
return params
class Account(ModelSQL, ModelView):
__name__ = 'account.payment.stripe.account'
name = fields.Char("Name", required=True)
secret_key = fields.Char("Secret Key", required=True, strip=False)
publishable_key = fields.Char(
"Publishable Key", required=True, strip=False)
webhook_identifier = fields.Char("Webhook Identifier", readonly=True)
webhook_endpoint = fields.Function(
fields.Char(
"Webhook Endpoint",
help="The URL to be called by Stripe."),
'on_change_with_webhook_endpoint')
webhook_signing_secret = fields.Char(
"Webhook Signing Secret", strip=False,
states={
'invisible': ~Eval('webhook_identifier'),
},
help="The Stripe's signing secret of the webhook.")
last_event = fields.Char("Last Event", readonly=True, strip=False)
setup_intent_delay = fields.TimeDelta(
"Setup Intent Delay", required=True,
domain=[
('setup_intent_delay', '>=', TimeDelta()),
],
help="The delay before cancelling setup intent not succeeded.")
@classmethod
def __setup__(cls):
super().__setup__()
cls._buttons.update({
'new_identifier': {
'icon': 'tryton-refresh',
},
})
if Pool().test:
cls.__rpc__['webhook'] = RPC(
readonly=False, instantiate=0, check_access=False)
@fields.depends('webhook_identifier')
def on_change_with_webhook_endpoint(self, name=None):
if not self.webhook_identifier:
return ''
# TODO add basic authentication support
url_part = {
'identifier': self.webhook_identifier,
'database_name': Transaction().database.name,
}
return http_host() + (
urllib.parse.quote(
'/%(database_name)s/account_payment_stripe'
'/webhook/%(identifier)s'
% url_part))
@classmethod
def default_setup_intent_delay(cls):
return dt.timedelta(days=30)
@classmethod
def fetch_events(cls):
"""Fetch last events of each account without webhook and process them
The transaction is committed after each event.
"""
accounts = cls.search([
('webhook_identifier', '=', None),
])
for account in accounts:
while True:
events = stripe.Event.list(
api_key=account.secret_key,
stripe_version=STRIPE_VERSION,
ending_before=account.last_event,
limit=100)
if not events:
break
for event in reversed(list(events)):
account.webhook(event)
account.last_event = event.id
account.save()
Transaction().commit()
def webhook(self, payload):
"""This method handles stripe webhook callbacks
The return values are:
- None if the method could not handle payload['type']
- True if the payload has been handled
- False if the webhook should be retried by Stripe
"""
data = payload['data']
type_ = payload['type']
if type_ == 'charge.succeeded':
return self.webhook_charge_succeeded(data)
if type_ == 'charge.captured':
return self.webhook_charge_captured(data)
elif type_ == 'charge.expired':
return self.webhook_charge_expired(data)
elif type_ == 'charge.failed':
return self.webhook_charge_failed(data)
elif type_ == 'charge.pending':
return self.webhook_charge_pending(data)
elif type_ == 'charge.refunded':
return self.webhook_charge_refunded(data)
elif type_ == 'charge.refund.updated':
return self.webhook_charge_refund_updated(data)
elif type_ == 'charge.dispute.created':
return self.webhook_charge_dispute_created(data)
elif type_ == 'charge.dispute.closed':
return self.webhook_charge_dispute_closed(data)
elif type_ == 'source.chargeable':
return self.webhook_source_chargeable(data)
elif type_ == 'source.failed':
return self.webhook_source_failed(data)
elif type_ == 'source.canceled':
return self.webhook_source_canceled(data)
elif type_ == 'payment_intent.succeeded':
return self.webhook_payment_intent_succeeded(data)
elif type_ == 'payment_intent.amount_capturable_updated':
return self.webhook_payment_intent_amount_capturable_updated(data)
elif type_ == 'payment_intent.payment_failed':
return self.webhook_payment_intent_payment_failed(data)
elif type_ == 'payment_intent.canceled':
return self.webhook_payment_intent_canceled(data)
return None
def webhook_charge_succeeded(self, payload, _event='charge.succeeded'):
pool = Pool()
Payment = pool.get('account.payment')
charge = payload['object']
payments = Payment.search([
('stripe_charge_id', '=', charge['id']),
])
if not payments:
payment_intent_id = charge.get('payment_intent')
if payment_intent_id:
found = Payment.search([
('stripe_payment_intent_id', '=', payment_intent_id),
])
# Once payment intent has succeeded or failed,
# only charge events are sent.
payments = [p for p in found
if p.state in {'succeeded', 'failed'}]
if found and not payments:
return True
if not payments:
logger.error("%s: No payment '%s'", _event, charge['id'])
for payment in payments:
if payment.state == 'succeeded':
Payment.proceed([payment])
payment.stripe_captured = charge['captured']
payment.stripe_amount = (
charge['amount'] - charge['amount_refunded'])
payment.stripe_error_code = charge['failure_code']
payment.stripe_error_message = charge['failure_message']
payment.stripe_error_param = None
payment.save()
if payment.amount:
if charge['status'] == 'succeeded' and charge['captured']:
Payment.succeed([payment])
else:
Payment.fail([payment])
return bool(payments)
def webhook_charge_captured(self, payload):
return self.webhook_charge_succeeded(payload, _event='charge.captured')
def webhook_charge_expired(self, payload):
return self.webhook_source_failed(payload)
def webhook_charge_pending(self, payload):
return self.webhook_charge_succeeded(payload, _event='charge.pending')
def webhook_charge_refunded(self, payload):
return self.webhook_charge_succeeded(payload, _event='charge.refunded')
def webhook_charge_refund_updated(self, payload):
pool = Pool()
Payment = pool.get('account.payment')
Refund = pool.get('account.payment.stripe.refund')
payments = []
rf = payload['object']
refunds = Refund.search([
('stripe_refund_id', '=', rf['id']),
])
if not refunds:
logger.error("charge.refund.updated: No refund '%s'", rf['id'])
for refund in refunds:
refund.stripe_error_code = rf.get('failure_reason')
if rf['status'] == 'pending':
Refund.process([refund])
elif rf['status'] == 'succeeded':
Refund.succeed([refund])
elif rf['status'] in {'failed', 'canceled'}:
Refund.fail([refund])
payments.append(refund.payment)
refund.save()
Payment.stripe_pull(payments)
return bool(refunds)
def webhook_charge_failed(self, payload, _event='charge.failed'):
pool = Pool()
Payment = pool.get('account.payment')
charge = payload['object']
payments = Payment.search([
('stripe_charge_id', '=', charge['id']),
])
if not payments:
payment_intent_id = charge.get('payment_intent')
if payment_intent_id:
found = Payment.search([
('stripe_payment_intent_id', '=', payment_intent_id),
])
# Once payment intent has succeeded or failed,
# only charge events are sent.
payments = [p for p in found
if p.state in {'succeeded', 'failed'}]
if found and not payments:
return True
if not payments:
logger.error("%s: No payment '%s'", _event, charge['id'])
for payment in payments:
payment.stripe_error_message = charge['failure_message']
payment.stripe_error_code = charge['failure_code']
payment.stripe_error_param = None
payment.save()
if charge['status'] == 'failed':
Payment.fail([payment])
return bool(payments)
def webhook_charge_dispute_created(self, payload):
pool = Pool()
Payment = pool.get('account.payment')
source = payload['object']
payments = Payment.search([
('stripe_charge_id', '=', source['charge']),
])
if not payments:
charge = stripe.Charge.retrieve(
source['charge'],
api_key=self.secret_key,
stripe_version=STRIPE_VERSION)
if charge.payment_intent:
payments = Payment.search([
('stripe_payment_intent_id', '=',
charge.payment_intent),
])
if not payments:
logger.error(
"charge.dispute.created: No payment '%s'", source['charge'])
for payment in payments:
payment.stripe_dispute_reason = source['reason']
payment.stripe_dispute_status = source['status']
payment.save()
return bool(payments)
def webhook_charge_dispute_closed(self, payload):
pool = Pool()
Payment = pool.get('account.payment')
source = payload['object']
payments = Payment.search([
('stripe_charge_id', '=', source['charge']),
])
if not payments:
charge = stripe.Charge.retrieve(
source['charge'],
api_key=self.secret_key,
stripe_version=STRIPE_VERSION)
if charge.payment_intent:
payments = Payment.search([
('stripe_payment_intent_id', '=',
charge.payment_intent),
])
if not payments:
logger.error(
"charge.dispute.closed: No payment '%s'", source['charge'])
for payment in payments:
payment.stripe_dispute_reason = source['reason']
payment.stripe_dispute_status = source['status']
payment.save()
if source['status'] == 'lost':
Payment.fail([payment])
if payment.stripe_amount > source['amount']:
payment.stripe_amount -= source['amount']
payment.save()
Payment.succeed([payment])
return bool(payments)
def webhook_source_chargeable(self, payload):
pool = Pool()
Payment = pool.get('account.payment')
source = payload['object']
payments = Payment.search([
('stripe_token', '=', source['id']),
])
if payments:
Payment.write(payments, {'stripe_chargeable': True})
return True
def webhook_source_failed(self, payload):
pool = Pool()
Payment = pool.get('account.payment')
source = payload['object']
payments = Payment.search([
('stripe_token', '=', source['id']),
])
for payment in payments:
Payment.fail([payment])
return True
def webhook_source_canceled(self, payload):
pool = Pool()
Payment = pool.get('account.payment')
source = payload['object']
payments = Payment.search([
('stripe_token', '=', source['id']),
])
for payment in payments:
Payment.fail([payment])
return True
def webhook_payment_intent_succeeded(self, payload):
pool = Pool()
Payment = pool.get('account.payment')
payment_intent = payload['object']
payments = Payment.search([
('stripe_payment_intent_id', '=', payment_intent['id']),
])
if not payments:
logger.error(
"payment_intent.succeeded: No payment '%s'",
payment_intent['id'])
for payment in payments:
if payment.state == 'succeeded':
Payment.proceed([payment])
payment.stripe_captured = bool(
payment_intent['amount_received'])
if payment.stripe_amount > payment_intent['amount_received']:
payment.stripe_amount = payment_intent['amount_received']
payment.save()
if payment.amount:
Payment.succeed([payment])
else:
Payment.fail([payment])
return bool(payments)
def webhook_payment_intent_amount_capturable_updated(self, payload):
pool = Pool()
Payment = pool.get('account.payment')
payment_intent = payload['object']
payments = Payment.search([
('stripe_payment_intent_id', '=', payment_intent['id']),
])
if not payments:
logger.error(
"payment_intent.amount_capturable_updated: No payment '%s'",
payment_intent['id'])
for payment in payments:
payment = Payment(payment.id)
if payment.state == 'succeeded':
Payment.proceed([payment])
payment.stripe_capturable = bool(
payment_intent['amount_capturable'])
if payment.stripe_amount > payment_intent['amount_capturable']:
payment.stripe_amount = payment_intent['amount_capturable']
payment.save()
return bool(payments)
def webhook_payment_intent_payment_failed(self, payload):
pool = Pool()
Payment = pool.get('account.payment')
payment_intent = payload['object']
payments = Payment.search([
('stripe_payment_intent_id', '=', payment_intent['id']),
])
if not payments:
logger.error(
"payment_intent.payment_failed: No payment '%s'",
payment_intent['id'])
for payment in payments:
error = payment_intent['last_payment_error']
if error:
payment.stripe_error_message = error['message']
payment.stripe_error_code = error['code']
payment.stripe_error_param = None
payment.save()
if payment_intent['status'] in [
'requires_payment_method', 'requires_source']:
payment._send_email_checkout()
else:
Payment.fail([payment])
return bool(payments)
def webhook_payment_intent_canceled(self, payload):
pool = Pool()
Payment = pool.get('account.payment')
payment_intent = payload['object']
payments = Payment.search([
('stripe_payment_intent_id', '=', payment_intent['id']),
])
if not payments:
logger.error(
"payment_intent.canceled: No payment '%s'",
payment_intent['id'])
for payment in payments:
payment = Payment(payment.id)
Payment.fail([payment])
return bool(payments)
@classmethod
@ModelView.button
def new_identifier(cls, accounts):
for account in accounts:
if account.webhook_identifier:
account.webhook_identifier = None
else:
account.webhook_identifier = uuid.uuid4().hex
cls.save(accounts)
@classmethod
def check_modification(cls, mode, accounts, values=None, external=False):
pool = Pool()
Warning = pool.get('res.user.warning')
super().check_modification(
mode, accounts, values=values, external=external)
if (mode == 'write'
and external
and values.keys() & {'secret_key', 'publishable_key'}):
warning_name = Warning.format('stripe_key', accounts)
if Warning.check(warning_name):
raise StripeAccountWarning(
warning_name,
gettext('account_payment_stripe'
'.msg_stripe_key_modified'))
class Customer(CheckoutMixin, DeactivableMixin, ModelSQL, ModelView):
__name__ = 'account.payment.stripe.customer'
_history = True
_rec_name = 'stripe_customer_id'
party = fields.Many2One('party.party', "Party", required=True,
states={
'readonly': Eval('stripe_customer_id') | Eval('stripe_token'),
})
stripe_account = fields.Many2One(
'account.payment.stripe.account', "Account", required=True,
states={
'readonly': Eval('stripe_customer_id') | Eval('stripe_token'),
})
stripe_checkout_needed = fields.Function(
fields.Boolean("Stripe Checkout Needed"), 'get_stripe_checkout_needed')
stripe_customer_id = fields.Char(
"Stripe Customer ID", strip=False,
states={
'readonly': ((Eval('stripe_customer_id') | Eval('stripe_token'))
& (Eval('id', -1) >= 0)),
})
stripe_token = fields.Char("Stripe Token", readonly=True, strip=False)
stripe_setup_intent_id = fields.Char(
"Stripe SetupIntent ID", readonly=True, strip=False)
stripe_error_message = fields.Char("Stripe Error Message", readonly=True,
states={
'invisible': (
~Eval('stripe_error_message')
| Eval('stripe_customer_id')),
})
stripe_error_code = fields.Char("Stripe Error Code", readonly=True,
states={
'invisible': (
~Eval('stripe_error_code')
| Eval('stripe_customer_id')),
})
stripe_error_param = fields.Char("Stripe Error Param", readonly=True,
states={
'invisible': (
~Eval('stripe_error_param')
| Eval('stripe_customer_id')),
})
identical_customers = fields.Many2Many(
'account.payment.stripe.customer.identical',
'source', 'target', "Identical Customers", readonly=True,
states={
'invisible': ~Eval('identical_customers'),
})
fingerprints = fields.One2Many(
'account.payment.stripe.customer.fingerprint', 'customer',
"Fingerprints", readonly=True)
_sources_cache = Cache(
'account_payment_stripe_customer.sources',
duration=config.getint(
'account_payment_stripe', 'sources_cache', default=15 * 60),
context=False)
_payment_methods_cache = Cache(
'account_payment_stripe_customer.payment_methods',
duration=config.getint(
'account_payment_stripe', 'payment_methods', default=15 * 60),
context=False)
@classmethod
def __setup__(cls):
super().__setup__()
cls._buttons.update({
'stripe_checkout': {
'invisible': ~Eval('stripe_checkout_needed', False),
'depends': ['stripe_checkout_needed'],
},
'stripe_update': {
'invisible': ~Eval('stripe_customer_id'),
'depends': ['stripe_customer_id'],
},
'detach_source': {
'invisible': ~Eval('stripe_customer_id'),
'depends': ['stripe_customer_id'],
},
'find_identical': {
'invisible': ~Eval('stripe_customer_id'),
'depends': ['stripe_customer_id'],
},
})
def get_stripe_checkout_needed(self, name):
return (not self.stripe_customer_id
or not self.stripe_token
or not self.stripe_setup_intent_id)
def get_rec_name(self, name):
name = super().get_rec_name(name)
return self.stripe_customer_id if self.stripe_customer_id else name
@classmethod
def on_modification(cls, mode, customers, field_names=None):
super().on_modification(mode, customers, field_names=field_names)
if mode == 'write':
cls._sources_cache.clear()
cls._payment_methods_cache.clear()
@classmethod
def delete(cls, customers):
ids, on_delete = cls._before_delete(customers)
cls.write(customers, {
'active': False,
})
cls._after_delete(ids, on_delete)
@classmethod
def copy(cls, customers, default=None):
if default is None:
default = {}
else:
default = default.copy()
default.setdefault('stripe_token', None)
default.setdefault('stripe_customer_id', None)
return super().copy(customers, default=default)
@classmethod
def stripe_checkout(cls, customers):
for customer in customers:
if customer.stripe_setup_intent_id:
continue
setup_intent = stripe.SetupIntent.create(
api_key=customer.stripe_account.secret_key,
stripe_version=STRIPE_VERSION)
customer.stripe_setup_intent_id = setup_intent.id
return super().stripe_checkout(customers)
@classmethod
def stripe_create(cls, customers=None):
"""Create stripe customer with token
The transaction is committed after each customer.
"""
if not customers:
customers = cls.search([
('stripe_token', '!=', None),
['OR',
('stripe_customer_id', '=', None),
('stripe_customer_id', '=', ''),
],
])
for customer in customers:
# Use clear cache after a commit
customer = cls(customer.id)
if customer.stripe_customer_id:
continue
customer.lock()
try:
cu = stripe.Customer.create(
api_key=customer.stripe_account.secret_key,
stripe_version=STRIPE_VERSION,
source=customer.stripe_token,
**customer._customer_parameters())
except (stripe.RateLimitError,
stripe.APIConnectionError) as e:
logger.warning(str(e))
continue
except stripe.StripeError as e:
if e.code in RETRY_CODES:
logger.warning(str(e))
continue
customer.stripe_error_message = str(e)
customer.stripe_error_code = e.code
if isinstance(e, stripe.StripeErrorWithParamCode):
customer.stripe_error_param = e.param
customer.stripe_token = None
except Exception:
logger.error(
"Error when creating customer %d", customer.id,
exc_info=True)
continue
else:
customer.stripe_customer_id = cu.id
# TODO add card
customer.save()
Transaction().commit()
cls.__queue__.find_identical(customers)
def _customer_parameters(self):
locales = [pl.lang.code for pl in self.party.langs if pl.lang]
return {
'email': self.party.email,
'name': self.party.name,
'phone': self.party.phone,
'preferred_locales': locales,
}
@classmethod
@ModelView.button
def stripe_update(cls, customers):
for customer in customers:
if not customer.stripe_customer_id:
continue
try:
stripe.Customer.modify(
customer.stripe_customer_id,
api_key=customer.stripe_account.secret_key,
stripe_version=STRIPE_VERSION,
**customer._customer_parameters()
)
except (stripe.RateLimitError,
stripe.APIConnectionError) as e:
logger.warning(str(e))
raise
@classmethod
def stripe_delete(cls, customers=None):
"""Delete customer
The transaction is committed after each customer.
"""
if not customers:
customers = cls.search([
('active', '=', False),
('stripe_customer_id', '!=', None),
])
for customer in customers:
# Use clear cache after a commit
customer = cls(customer.id)
assert not customer.active
customer.lock()
try:
cu = stripe.Customer.retrieve(
api_key=customer.stripe_account.secret_key,
stripe_version=STRIPE_VERSION,
id=customer.stripe_customer_id)
cu.delete()
except (stripe.RateLimitError,
stripe.APIConnectionError) as e:
logger.warning(str(e))
continue
except Exception:
logger.error(
"Error when deleting customer %d", customer.id,
exc_info=True)
continue
customer.stripe_token = None
customer.stripe_customer_id = None
customer.save()
Transaction().commit()
def retrieve(self, **params):
if not self.stripe_customer_id:
return
try:
return stripe.Customer.retrieve(
api_key=self.stripe_account.secret_key,
stripe_version=STRIPE_VERSION,
id=self.stripe_customer_id,
**params)
except (stripe.RateLimitError,
stripe.APIConnectionError) as e:
logger.warning(str(e))
def sources(self):
sources = self._sources_cache.get(self.id)
if sources is not None:
return sources
sources = []
customer = self.retrieve(expand=['sources'])
if customer:
for source in customer.sources:
name = source.id
if source.object == 'card':
name = self._source_name(source)
elif source.object == 'source':
if source.usage != 'reusable':
continue
name = self._source_name(source)
else:
continue
sources.append((source.id, name))
self._sources_cache.set(self.id, sources)
return sources
@classmethod
def _source_name(cls, source):
def card_name(card):
name = card.brand
if card.last4 or card.dynamic_last4:
name += ' ****' + (card.last4 or card.dynamic_last4)
if card.exp_month and card.exp_year:
name += ' %s/%s' % (card.exp_month, card.exp_year)
return name
name = source.id
if source.object == 'card':
name = card_name(source)
elif source.object == 'source':
if source.type == 'card':
name = card_name(source.card)
elif source.type == 'sepa_debit':
name = '****' + source.sepa_debit.last4
return name
@classmethod
@ModelView.button_action(
'account_payment_stripe.wizard_customer_source_detach')
def detach_source(cls, customers):
pass
def delete_source(self, source):
try:
if source in dict(self.payment_methods()):
stripe.PaymentMethod.detach(
source,
api_key=self.stripe_account.secret_key,
stripe_version=STRIPE_VERSION)
else:
stripe.Customer.delete_source(
self.stripe_customer_id,
source,
api_key=self.stripe_account.secret_key,
stripe_version=STRIPE_VERSION)
except (stripe.RateLimitError,
stripe.APIConnectionError) as e:
logger.warning(str(e))
raise
self._sources_cache.clear()
self._payment_methods_cache.clear()
def payment_methods(self):
methods = self._payment_methods_cache.get(self.id)
if methods is not None:
return methods
methods = []
if self.stripe_customer_id:
try:
payment_methods = stripe.PaymentMethod.list(
api_key=self.stripe_account.secret_key,
stripe_version=STRIPE_VERSION,
customer=self.stripe_customer_id)
except (stripe.RateLimitError,
stripe.APIConnectionError) as e:
logger.warning(str(e))
return []
for payment_method in payment_methods:
name = self._payment_method_name(payment_method)
methods.append((payment_method.id, name))
self._payment_methods_cache.set(self.id, methods)
return methods
@classmethod
def _payment_method_name(cls, payment_method):
name = payment_method.id
if payment_method.type == 'card':
card = payment_method.card
name = card.brand
if card.last4:
name += ' ****' + card.last4
if card.exp_month and card.exp_year:
name += ' %s/%s' % (card.exp_month, card.exp_year)
elif payment_method.type == 'sepa_debit':
name = '****' + payment_method.sepa_debit.last4
return name
def get_session(self):
if self.stripe_customer_id:
return stripe.CustomerSession.create(
api_key=self.stripe_account.secret_key,
customer=self.stripe_customer_id,
components={
'payment_element': {
'enabled': True,
'features': self._session_payment_element_features,
},
})
@property
def _session_payment_element_features(self):
return {
'payment_method_redisplay': 'enabled',
'payment_method_save': 'enabled',
'payment_method_save_usage': 'on_session',
'payment_method_remove': 'enabled',
}
@property
def stripe_setup_intent(self):
if not self.stripe_setup_intent_id:
return
try:
return stripe.SetupIntent.retrieve(
self.stripe_setup_intent_id,
api_key=self.stripe_account.secret_key,
stripe_version=STRIPE_VERSION)
except (stripe.RateLimitError,
stripe.APIConnectionError) as e:
logger.warning(str(e))
stripe_intent = stripe_setup_intent
@dualmethod
def stripe_intent_update(cls, customers=None):
"""Update stripe customers with intent
The transaction is committed after each customer."""
if customers is None:
customers = cls.search([
('stripe_setup_intent_id', '!=', None),
])
for customer in customers:
# Use clear cache after commit
customer = cls(customer.id)
setup_intent = customer.stripe_setup_intent
if not setup_intent:
continue
if setup_intent.status not in {'succeeded', 'canceled'}:
delay = customer.stripe_account.setup_intent_delay
expiration = dt.datetime.now() - delay
created = dt.datetime.fromtimestamp(setup_intent.created)
if created < expiration:
setup_intent.cancel()
continue
customer.lock()
try:
if setup_intent.status == 'succeeded':
if customer.stripe_customer_id:
stripe.PaymentMethod.attach(
setup_intent.payment_method,
customer=customer.stripe_customer_id,
api_key=customer.stripe_account.secret_key,
stripe_version=STRIPE_VERSION)
else:
cu = stripe.Customer.create(
api_key=customer.stripe_account.secret_key,
stripe_version=STRIPE_VERSION,
payment_method=setup_intent.payment_method,
**customer._customer_parameters())
customer.stripe_customer_id = cu.id
except (stripe.RateLimitError,
stripe.APIConnectionError) as e:
logger.warning(str(e))
continue
except stripe.StripeError as e:
if e.code in RETRY_CODES:
logger.warning(str(e))
continue
customer.stripe_error_message = str(e)
except Exception:
logger.error(
"Error when updating customer %d", customer.id,
exc_info=True)
continue
else:
customer.stripe_error_message = None
customer.stripe_error_code = None
customer.stripe_error_param = None
customer.stripe_setup_intent_id = None
customer.save()
cls._payment_methods_cache.clear()
Transaction().commit()
def fetch_fingeprints(self):
customer = self.retrieve(expand=['sources'])
if customer:
for source in customer.sources:
if hasattr(source, 'fingerprint'):
yield source.fingerprint
try:
payment_methods = stripe.PaymentMethod.list(
api_key=self.stripe_account.secret_key,
stripe_version=STRIPE_VERSION,
customer=customer.id,
type='card')
except (stripe.RateLimitError,
stripe.APIConnectionError) as e:
logger.warning(str(e))
payment_methods = []
for payment_method in payment_methods:
yield payment_method.card.fingerprint
@classmethod
@ModelView.button
def find_identical(cls, customers):
pool = Pool()
Fingerprint = pool.get('account.payment.stripe.customer.fingerprint')
new = []
for customer in customers:
fingerprints = set(customer.fetch_fingeprints())
fingerprints -= {f.fingerprint for f in customer.fingerprints}
for fingerprint in fingerprints:
new.append(Fingerprint(
customer=customer,
fingerprint=fingerprint))
Fingerprint.save(new)
class CustomerFingerprint(ModelSQL):
__name__ = 'account.payment.stripe.customer.fingerprint'
customer = fields.Many2One(
'account.payment.stripe.customer', "Customer",
required=True, ondelete='CASCADE')
fingerprint = fields.Char("Fingerprint", required=True)
@classmethod
def __setup__(cls):
super().__setup__()
t = cls.__table__()
cls._sql_constraints += [
('customer_fingerprint_unique',
Unique(t, t.customer, t.fingerprint),
'account_payment_stripe.msg_customer_fingerprint_unique'),
]
class CustomerIdentical(ModelSQL):
__name__ = 'account.payment.stripe.customer.identical'
source = fields.Many2One('account.payment.stripe.customer', "Source")
target = fields.Many2One('account.payment.stripe.customer', "Target")
@classmethod
def table_query(cls):
pool = Pool()
Fingerprint = pool.get('account.payment.stripe.customer.fingerprint')
source = Fingerprint.__table__()
target = Fingerprint.__table__()
return (
source
.join(target, condition=source.fingerprint == target.fingerprint)
.select(
sql_pairing(source.id, target.id).as_('id'),
source.customer.as_('source'),
target.customer.as_('target'),
where=source.customer != target.customer))
class CheckoutPage(Report):
__name__ = 'account.payment.stripe.checkout'
class CustomerSourceDetach(Wizard):
__name__ = 'account.payment.stripe.customer.source.detach'
start_state = 'ask'
ask = StateView(
'account.payment.stripe.customer.source.detach.ask',
'account_payment_stripe.customer_source_detach_ask_view_form', [
Button("Cancel", 'end', 'tryton-cancel'),
Button("Detach", 'detach', 'tryton-ok', default=True),
])
detach = StateTransition()
def default_ask(self, fields):
default = {}
if 'customer' in fields:
default['customer'] = self.record.id
return default
def transition_detach(self):
self.record.delete_source(self.ask.source)
return 'end'
class CustomerSourceDetachAsk(ModelView):
__name__ = 'account.payment.stripe.customer.source.detach.ask'
customer = fields.Many2One(
'account.payment.stripe.customer', "Customer", readonly=True)
source = fields.Selection('get_sources', "Source", required=True)
@fields.depends('customer')
def get_sources(self):
sources = [('', '')]
if self.customer:
sources.extend(
dict(set(self.customer.sources())
| set(self.customer.payment_methods())).items())
return sources