You can not select more than 25 topics
Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
1386 lines
61 KiB
1386 lines
61 KiB
#!/usr/bin/env python3
|
|
"""
|
|
Stripe Payment Processor Class
|
|
A clean, reusable class for processing single Stripe payments with comprehensive error handling and fee calculations.
|
|
"""
|
|
|
|
import stripe
|
|
from stripe import StripeError
|
|
import os
|
|
import logging
|
|
import time
|
|
import json
|
|
from datetime import datetime
|
|
from decimal import Decimal
|
|
from typing import Dict, Any, Optional
|
|
|
|
|
|
class StripePaymentProcessor:
|
|
"""
|
|
A clean, focused class for processing individual Stripe payments.
|
|
Returns comprehensive JSON results for each payment attempt.
|
|
"""
|
|
|
|
def __init__(self, api_key: Optional[str] = None, enable_logging: bool = False,
             log_level: int = logging.INFO):
    """
    Configure the Stripe client key and, optionally, console logging.

    Args:
        api_key (str, optional): Stripe API key. When omitted or empty, the
            STRIPE_SECRET_KEY environment variable is read instead.
        enable_logging (bool): Turn console logging on. Defaults to False.
        log_level (int): Level handed to logging.basicConfig when logging is on.

    Raises:
        ValueError: If no key is available, or the key does not start with
            'sk_test_' or 'rk_live_'.
    """
    # The key is assigned to the module-level stripe.api_key attribute.
    stripe.api_key = api_key if api_key else os.getenv('STRIPE_SECRET_KEY')
    configured_key = stripe.api_key

    if not configured_key:
        raise ValueError("Stripe API key is required. Provide via api_key parameter or STRIPE_SECRET_KEY environment variable.")

    # Only secret test keys and restricted live keys pass this prefix check.
    if not configured_key.startswith(('sk_test_', 'rk_live_')):
        raise ValueError("Invalid Stripe API key format. Key should start with 'sk_test_' or 'rk_live_'")

    self.is_test_mode = configured_key.startswith('sk_test_')
    self.enable_logging = enable_logging

    if not enable_logging:
        # _log() treats a missing logger as "logging disabled".
        self.logger = None
        return

    logging.basicConfig(level=log_level, format='%(asctime)s - %(levelname)s - %(message)s')
    self.logger = logging.getLogger(__name__)
    self.logger.info(f"StripePaymentProcessor initialized ({'TEST' if self.is_test_mode else 'LIVE'} mode)")
|
|
|
|
def _log(self, level: str, message: str):
|
|
"""Internal logging method"""
|
|
if self.logger:
|
|
getattr(self.logger, level.lower())(message)
|
|
|
|
def extract_actual_fees(self, balance_transaction: Any) -> Dict[str, Any]:
    """
    Summarise the fees recorded on a Stripe balance transaction.

    Args:
        balance_transaction: Object exposing ``fee``, ``net``, ``available_on``
            and an iterable ``fee_details`` whose items expose ``type``,
            ``description``, ``amount`` and ``currency``.

    Returns:
        dict: ``{'note': ...}`` only when the argument is falsy or has no
        ``fee_details`` attribute; otherwise the total fee, net amount and a
        per-fee breakdown, with amounts divided by 100 (cents to dollars).
    """
    if not (balance_transaction and hasattr(balance_transaction, 'fee_details')):
        return {'note': 'No fee details available'}

    def in_dollars(source: Any, field: str) -> float:
        # Amounts are stored in cents; a missing attribute counts as zero.
        return getattr(source, field, 0) / 100

    fee_total = in_dollars(balance_transaction, 'fee')

    itemised = [
        {
            'type': getattr(entry, 'type', 'unknown'),
            'description': getattr(entry, 'description', 'Unknown fee'),
            'amount': in_dollars(entry, 'amount'),
            'currency': getattr(entry, 'currency', 'unknown'),
        }
        for entry in balance_transaction.fee_details
    ]

    summary: Dict[str, Any] = {'source': 'stripe_actual'}
    summary['total_fee'] = fee_total
    summary['net_amount'] = in_dollars(balance_transaction, 'net')
    summary['fee_breakdown'] = itemised
    summary['available_on'] = getattr(balance_transaction, 'available_on', None)
    summary['note'] = 'Actual fees from Stripe balance transaction'
    return summary
|
|
|
|
def calculate_stripe_fees(self, amount: float, payment_method_type: str,
                          payment_method_details: Optional[Any] = None,
                          transaction_successful: bool = True) -> Dict[str, Any]:
    """
    DEPRECATED: Calculate estimated Stripe fees based on payment method type.
    Use extract_actual_fees() with a balance transaction for real fee data.

    This method provides fee estimates and is kept for fallback scenarios
    where actual fee data is not available from Stripe.

    Args:
        amount (float): Transaction amount in dollars
        payment_method_type (str): Type of payment method ('card', 'au_becs_debit', etc.)
        payment_method_details: Object with a ``card`` attribute (e.g. a Stripe
            PaymentMethod); the card's ``country``, ``brand`` and
            ``display_brand`` attributes are read when present
        transaction_successful (bool): Whether the transaction succeeded
            (selects the BECS cap: 3.50 on success, 2.50 otherwise)

    Returns:
        dict: Estimated fee information. Any payment method type other than
        'card' or 'au_becs_debit' yields zero fees with the description
        'Unknown payment method'.
    """
    fee_info = {
        'payment_method_type': payment_method_type,
        'percentage_fee': 0.0,
        'fixed_fee': 0.0,
        'total_fee': 0.0,
        'fee_description': 'Unknown payment method',
        'capped': False,
        'cap_amount': None,
        'international': False
    }

    if payment_method_type == 'card':
        # Default to domestic card rates
        fee_info['percentage_fee'] = 1.7
        fee_info['fixed_fee'] = 0.30
        fee_info['fee_description'] = 'Domestic credit/debit card'

        # Attribute access throughout: the previous mix of attribute access and
        # dict-style .get() failed for any details object that is not a mapping.
        card = getattr(payment_method_details, 'card', None) if payment_method_details else None

        if card:
            card_country = getattr(card, 'country', None)
            self._log('info', f"Card country detected: {card_country}")
            fee_info['card_brand'] = getattr(card, 'brand', None)
            fee_info['card_display_brand'] = getattr(card, 'display_brand', None)

            # Any country other than 'AU' is charged the international rate
            if card_country and card_country != 'AU':
                fee_info['percentage_fee'] = 3.5
                fee_info['fixed_fee'] = 0.30
                fee_info['fee_description'] = f'International credit/debit card ({card_country})'
                fee_info['international'] = True
            else:
                self._log('info', f"Domestic card confirmed (country: {card_country})")
        else:
            # If we can't determine country, assume domestic for AU-based business
            self._log('info', "Card country not available - assuming domestic")

    elif payment_method_type == 'au_becs_debit':
        fee_info['percentage_fee'] = 1.0
        fee_info['fixed_fee'] = 0.30
        fee_info['fee_description'] = 'Australia BECS Direct Debit'

        # Apply BECS caps based on transaction outcome
        if transaction_successful:
            fee_info['cap_amount'] = 3.50
            fee_info['fee_description'] += ' (capped at $3.50)'
        else:
            fee_info['cap_amount'] = 2.50
            fee_info['fee_description'] += ' (failure/dispute - capped at $2.50)'

    # Percentage component plus the fixed component
    percentage_amount = amount * (fee_info['percentage_fee'] / 100)
    calculated_fee = percentage_amount + fee_info['fixed_fee']

    # Apply cap if applicable
    cap = fee_info['cap_amount']
    if cap and calculated_fee > cap:
        fee_info['total_fee'] = cap
        fee_info['capped'] = True
    else:
        fee_info['total_fee'] = round(calculated_fee, 2)

    return fee_info
|
|
|
|
def process_payment(self, customer_id: str, amount: float, currency: str = 'aud',
                    description: Optional[str] = None, wait_for_completion: bool = True,
                    stripe_pm: Optional[str] = None) -> Dict[str, Any]:
    """
    Create and confirm a single off-session Stripe PaymentIntent for a customer.

    The charge uses ``stripe_pm`` when given, otherwise the customer's default
    invoice payment method. No exception escapes this method: every failure is
    reported through the returned dict.

    Args:
        customer_id (str): Stripe customer ID
        amount (float): Amount in dollars (converted to cents internally;
            fractions of a cent are truncated)
        currency (str): Currency code (default: 'aud')
        description (str, optional): Payment description
        wait_for_completion (bool): If True, poll a 'processing' payment via
            wait_for_payment_completion() for up to 30 seconds (default: True)
        stripe_pm (str, optional): Payment method ID to charge instead of the
            customer's default payment method

    Returns:
        dict: Always contains 'success', 'timestamp', 'customer_id', 'amount',
        'currency', 'description', 'processing_time_seconds' and 'test_mode'.
        Failures add 'error' and 'error_type'. Once a payment intent exists,
        'payment_intent_id', 'status', 'pi_status' and fee information are added.
    """
    transaction_start = datetime.now()

    # Base response structure
    response = {
        'success': False,
        'timestamp': transaction_start.isoformat(),
        'customer_id': customer_id,
        'amount': amount,
        'currency': currency.lower(),
        'description': description,
        'processing_time_seconds': 0.0,
        'test_mode': self.is_test_mode
    }

    def _record_failure(error: str, error_type: str, **extra: Any) -> None:
        """Store an error, any extra fields and the elapsed time on the response."""
        elapsed = (datetime.now() - transaction_start).total_seconds()
        response.update({'error': error, 'error_type': error_type})
        response.update(extra)
        response['processing_time_seconds'] = round(elapsed, 2)

    try:
        # Validate inputs (no Stripe call is made for invalid input)
        if not customer_id or not isinstance(customer_id, str):
            response['error'] = 'Invalid customer_id provided'
            response['error_type'] = 'validation_error'
            return response

        if amount <= 0:
            response['error'] = 'Amount must be greater than 0'
            response['error_type'] = 'validation_error'
            return response

        # Convert dollars to cents; going through str() avoids binary-float
        # artefacts, and int() truncates any sub-cent remainder.
        amount_cents = int(Decimal(str(amount)) * 100)

        self._log('info', f"Processing payment: {customer_id}, ${amount} {currency.upper()}")

        # Retrieve customer. The object is deliberately not printed or dumped:
        # it holds personal data, and console output is governed by enable_logging.
        customer = stripe.Customer.retrieve(customer_id)

        # An object flagged ``deleted`` is treated the same as a missing customer.
        if not customer or getattr(customer, 'deleted', False):
            response['error'] = f'Customer {customer_id} not found'
            response['error_type'] = 'customer_not_found'
            return response

        # Add customer details to response
        response.update({
            'customer_email': customer.email,
            'customer_name': customer.description or customer.name
        })

        if stripe_pm:
            default_payment_method = stripe_pm
        else:
            # Fall back to the customer's default payment method, tolerating
            # a missing/empty invoice_settings object.
            invoice_settings = getattr(customer, 'invoice_settings', None)
            default_payment_method = (
                invoice_settings.default_payment_method if invoice_settings else None
            )

        if not default_payment_method:
            response['error'] = 'Customer has no default payment method set'
            response['error_type'] = 'no_payment_method'
            return response

        # Retrieve payment method details
        payment_method = stripe.PaymentMethod.retrieve(default_payment_method)
        payment_method_type = payment_method.type

        response.update({
            'payment_method_id': default_payment_method,
            'payment_method_type': payment_method_type
        })

        # Calculate estimated fees before payment
        estimated_fee_details = self.calculate_stripe_fees(
            amount,
            payment_method_type,
            payment_method,
            transaction_successful=True  # Recalculated below if a BECS payment fails
        )
        estimated_fee_details['source'] = 'estimated'
        estimated_fee_details['note'] = 'Pre-payment estimate'

        response['estimated_fee_details'] = estimated_fee_details

        self._log('info', f"Payment method: {payment_method_type} - {estimated_fee_details['fee_description']}")

        # Prepare Payment Intent parameters
        payment_intent_params = {
            'amount': amount_cents,
            'currency': currency,
            'customer': customer_id,
            'payment_method': default_payment_method,
            'description': description or f"Payment for {customer.description or customer.email}",
            'confirm': True,
            'return_url': 'https://your-website.com/payment-success',
            'off_session': True
        }

        # Add mandate data for BECS Direct Debit
        if payment_method_type == 'au_becs_debit':
            payment_intent_params['mandate_data'] = {
                'customer_acceptance': {
                    'type': 'offline'
                }
            }
            self._log('info', "Added BECS mandate data for offline acceptance")

        # Create and confirm Payment Intent
        payment_intent = stripe.PaymentIntent.create(**payment_intent_params)

        # Add payment intent details
        response.update({
            'payment_intent_id': payment_intent.id,
            'status': payment_intent.status
        })

        if payment_intent.status == 'succeeded':
            response['success'] = True
            self._log('info', f"✅ Payment successful: {payment_intent.id}")

            # Get actual fee details for successful payments
            try:
                # Fixed pause before re-reading the intent; presumably gives Stripe
                # time to attach the balance transaction — TODO confirm.
                time.sleep(3)
                payment_intent_expanded = stripe.PaymentIntent.retrieve(
                    payment_intent.id,
                    expand=['latest_charge.balance_transaction']
                )

                latest_charge = getattr(payment_intent_expanded, 'latest_charge', None)
                balance_transaction = (
                    getattr(latest_charge, 'balance_transaction', None) if latest_charge else None
                )

                if balance_transaction:
                    actual_fees = self.extract_actual_fees(balance_transaction)
                    response['fee_details'] = actual_fees
                    self._log('info', f"Retrieved actual fees: ${actual_fees['total_fee']:.2f}")
                else:
                    # Keep estimated fees if balance transaction not available.
                    # Work on a copy so the note below does not also overwrite
                    # the note stored under 'estimated_fee_details'.
                    response['fee_details'] = dict(estimated_fee_details)
                    response['fee_details']['note'] = 'Balance transaction not yet available, showing estimate'
                    # Record this payment for later fee update
                    response['needs_fee_update'] = [customer_id, payment_intent.id]
                    self._log('info', "Balance transaction not available, using estimates - marked for later update")

            except Exception as e:
                # Fee lookup is best-effort: the payment itself already succeeded,
                # so keep the estimates (as a copy) and mark for later update.
                response['fee_details'] = dict(estimated_fee_details)
                response['fee_details']['note'] = f'Could not retrieve actual fees: {str(e)}'
                response['needs_fee_update'] = [customer_id, payment_intent.id]
                self._log('warning', f"Failed to get actual fees: {str(e)} - marked for later update")

        elif payment_intent.status == 'processing' and wait_for_completion:
            # Payment is processing - wait for completion
            self._log('info', "💭 Payment is processing, waiting for completion...")

            polling_result = self.wait_for_payment_completion(
                payment_intent.id, customer_id=customer_id, max_wait_seconds=30
            )

            # The polling result is merged into (and may overwrite) the response.
            response.update(polling_result)

            if polling_result['success']:
                if polling_result['status'] == 'succeeded':
                    response['success'] = True
                    self._log('info', "✅ Payment completed successfully after polling")
                else:
                    response['success'] = False
                    response['error'] = f'Payment completed with status: {polling_result["status"]}'
                    response['error_type'] = 'payment_not_succeeded'
            else:
                # Polling failed - keep the polling error if it supplied one
                response['success'] = False
                if 'error' not in response:
                    response['error'] = 'Payment polling failed'
                    response['error_type'] = 'polling_failed'

        else:
            # For failed payments or processing without polling
            if payment_method_type == 'au_becs_debit':
                # Recalculate BECS fees with failure cap for failed payments
                failed_fee_details = self.calculate_stripe_fees(
                    amount,
                    payment_method_type,
                    payment_method,
                    transaction_successful=False
                )
                failed_fee_details['source'] = 'estimated'
                failed_fee_details['note'] = 'Estimated fees for failed BECS payment'
                response['fee_details'] = failed_fee_details
            else:
                # Use estimated fees for other payment types
                response['fee_details'] = dict(estimated_fee_details)

            if payment_intent.status == 'processing':
                response['error'] = 'Payment is processing (polling disabled). Check status later.'
                response['error_type'] = 'payment_processing'
                response['next_action'] = f'Use check_payment_intent("{payment_intent.id}") or wait_for_payment_completion("{payment_intent.id}") to check status'
            else:
                response['error'] = f'Payment not completed. Status: {payment_intent.status}'
                response['error_type'] = 'payment_incomplete'

            self._log('warning', f"⚠️ Payment incomplete: {payment_intent.id} - {payment_intent.status}")

        # Calculate processing time
        processing_time = (datetime.now() - transaction_start).total_seconds()
        response['processing_time_seconds'] = round(processing_time, 2)
        response['pi_status'] = payment_intent.status
        return response

    except stripe.CardError as e:
        # Card-specific error (declined, etc.)
        _record_failure(f'Card declined: {e.user_message}', 'card_declined', decline_code=e.code)
        self._log('error', f"❌ Card declined for {customer_id}: {e.user_message}")
        return response

    except stripe.InvalidRequestError as e:
        # Invalid parameters
        _record_failure(f'Invalid request: {str(e)}', 'invalid_request')
        self._log('error', f"❌ Invalid request for {customer_id}: {str(e)}")
        return response

    except stripe.AuthenticationError as e:
        # Authentication with Stripe failed
        _record_failure(f'Authentication failed: {str(e)}', 'authentication_error')
        self._log('error', f"❌ Authentication failed: {str(e)}")
        return response

    except stripe.APIConnectionError as e:
        # Network communication with Stripe failed
        _record_failure(f'Network error: {str(e)}', 'network_error')
        self._log('error', f"❌ Network error: {str(e)}")
        return response

    except stripe.StripeError as e:
        # Other Stripe-specific errors
        _record_failure(f'Stripe error: {str(e)}', 'stripe_error')
        self._log('error', f"❌ Stripe error: {str(e)}")
        return response

    except Exception as e:
        # Top-level boundary: callers rely on always receiving a result dict
        _record_failure(f'Unexpected error: {str(e)}', 'unexpected_error')
        self._log('error', f"❌ Unexpected error for {customer_id}: {str(e)}")
        return response
|
|
|
|
def get_customer_info(self, customer_id: str) -> Dict[str, Any]:
    """
    Retrieve a customer's basic details together with up to 10 payment methods.

    Args:
        customer_id (str): Stripe customer ID

    Returns:
        dict: On success, 'success': True plus 'customer_id', 'email', 'name',
        'created', 'default_payment_method' and a 'payment_methods' list.
        On any error, 'success': False plus 'error' and 'error_type'
        ('stripe_error' or 'unexpected_error'); nothing is raised.
    """
    try:
        customer = stripe.Customer.retrieve(customer_id)

        customer_info = {
            'success': True,
            'customer_id': customer.id,
            'email': customer.email,
            'name': customer.description or customer.name,
            'created': customer.created,
            'default_payment_method': customer.invoice_settings.default_payment_method if customer.invoice_settings else None,
            'payment_methods': []
        }

        # Get payment methods (first page only, at most 10)
        payment_methods = stripe.PaymentMethod.list(
            customer=customer_id,
            limit=10
        )

        for pm in payment_methods.data:
            pm_info = {
                'id': pm.id,
                'type': pm.type,
                'created': pm.created
            }

            # Branch on the declared type rather than probing ``pm.card``:
            # reading an attribute the object does not carry raises
            # AttributeError, which previously failed the whole lookup.
            if pm.type == 'card':
                card = pm.card
                pm_info['card'] = {
                    'brand': card.brand,
                    'last4': card.last4,
                    'country': card.country,
                    'exp_month': card.exp_month,
                    'exp_year': card.exp_year
                }
            elif pm.type == 'au_becs_debit':
                becs = pm.au_becs_debit
                pm_info['au_becs_debit'] = {
                    'bsb_number': becs.bsb_number,
                    'last4': becs.last4
                }

            customer_info['payment_methods'].append(pm_info)

        return customer_info

    except stripe.StripeError as e:
        return {
            'success': False,
            'error': f'Stripe error: {str(e)}',
            'error_type': 'stripe_error'
        }
    except Exception as e:
        # Top-level boundary: callers expect a result dict, never an exception
        return {
            'success': False,
            'error': f'Unexpected error: {str(e)}',
            'error_type': 'unexpected_error'
        }
|
|
|
|
def get_payment_methods(self, customer_id: str) -> list:
    """
    List up to 10 payment methods attached to a Stripe customer.

    Errors raised by the Stripe call are not caught here and propagate to the
    caller.

    Args:
        customer_id (str): Stripe customer ID

    Returns:
        list: One dict per payment method with 'id', 'type' and 'created',
        plus a 'card' or 'au_becs_debit' sub-dict for those two types.
    """
    self._log('info', f"Retrieving payment methods for customer: {customer_id}")

    # Get payment methods for the customer (first page only)
    payment_methods = stripe.PaymentMethod.list(
        customer=customer_id,
        limit=10
    )

    methods_list = []

    for pm in payment_methods.data:
        pm_info = {
            'id': pm.id,
            'type': pm.type,
            'created': pm.created
        }

        if pm.type == "card":
            card = pm.card
            pm_info['card'] = {
                'brand': card.brand,
                'last4': card.last4,
                'country': card.country,
                'exp_month': card.exp_month,
                'exp_year': card.exp_year
            }
        elif pm.type == "au_becs_debit":
            becs = pm.au_becs_debit
            pm_info['au_becs_debit'] = {
                'bsb_number': becs.bsb_number,
                'last4': becs.last4
            }

        methods_list.append(pm_info)

    # Only the count is logged; the list holds card and bank details and is
    # no longer printed to stdout.
    self._log('info', f"Found {len(methods_list)} payment methods")
    return methods_list
|
|
|
|
def check_payment_intent(self, payment_intent_id: str) -> Dict[str, Any]:
    """
    Check the status and details of a specific payment intent.

    The intent is retrieved with ``latest_charge.balance_transaction`` expanded
    so fee data can be read from the same response. Charge-level details
    (receipt URL, outcome, payment method details) come from the legacy
    ``charges`` list when the intent carries one, otherwise from the expanded
    ``latest_charge``.

    Args:
        payment_intent_id (str): Stripe Payment Intent ID (e.g., 'pi_1234567890')

    Returns:
        dict: Status and details of the intent. 'success' is False for invalid
        input, for any Stripe/unexpected error, and for the statuses
        'canceled', 'failed' and 'requires_payment_method'. Nothing is raised.
    """

    def _failure(message: str, error_type: str, include_id: bool = True) -> Dict[str, Any]:
        """Build the error result shared by validation failures and exception handlers."""
        result: Dict[str, Any] = {
            'success': False,
            'error': message,
            'error_type': error_type,
        }
        if include_id:
            result['payment_intent_id'] = payment_intent_id
        result['timestamp'] = datetime.now().isoformat()
        return result

    try:
        # Validate input (validation errors do not echo the ID back)
        if not payment_intent_id or not isinstance(payment_intent_id, str):
            return _failure('Invalid payment_intent_id provided', 'validation_error', include_id=False)

        if not payment_intent_id.startswith('pi_'):
            return _failure('Payment Intent ID must start with "pi_"', 'validation_error', include_id=False)

        self._log('info', f"Checking payment intent: {payment_intent_id}")

        # Retrieve the payment intent with expanded balance transaction for fee details
        payment_intent = stripe.PaymentIntent.retrieve(
            payment_intent_id,
            expand=['latest_charge.balance_transaction']
        )

        self._log('info', "Retrieved payment intent with expanded data")

        # latest_charge is requested expanded, but tolerate a bare ID string.
        latest_charge = getattr(payment_intent, 'latest_charge', None)
        if isinstance(latest_charge, str):
            latest_charge_id = latest_charge
            latest_charge_obj = None
        else:
            latest_charge_obj = latest_charge if latest_charge else None
            latest_charge_id = getattr(latest_charge_obj, 'id', None) if latest_charge_obj else None

        # NOTE(review): only IDs with the 'ch_' prefix are reported as charge_id;
        # confirm whether charges with other ID prefixes should be accepted too.
        stripe_charge_id = None
        if latest_charge_id and latest_charge_id.startswith('ch_'):
            stripe_charge_id = latest_charge_id

        # Pick the charge used for receipt/outcome details: first entry of the
        # legacy ``charges`` list if present, else the expanded latest charge.
        legacy_charges = getattr(payment_intent, 'charges', None)
        if legacy_charges and legacy_charges.data:
            charge = legacy_charges.data[0]
        else:
            charge = latest_charge_obj

        status = payment_intent.status

        # Base response
        response = {
            'success': True,
            'payment_intent_id': payment_intent.id,
            'status': status,
            'amount': payment_intent.amount / 100,  # Convert from cents to dollars
            'currency': payment_intent.currency,
            'created': datetime.fromtimestamp(payment_intent.created).isoformat(),
            'description': payment_intent.description,
            'customer_id': payment_intent.customer,
            'payment_method_id': payment_intent.payment_method,
            'test_mode': self.is_test_mode,
            'timestamp': datetime.now().isoformat(),
            'charge_id': stripe_charge_id
        }

        # Add status-specific information
        if status == 'succeeded':
            # NOTE: this is the intent's creation time, not the time it succeeded.
            response['success_date'] = datetime.fromtimestamp(payment_intent.created).isoformat()

            # Add receipt URL if available
            if charge is not None:
                response['receipt_url'] = getattr(charge, 'receipt_url', None)

            balance_transaction = (
                getattr(latest_charge_obj, 'balance_transaction', None)
                if latest_charge_obj is not None else None
            )

            if balance_transaction:
                # Use actual fee data from Stripe
                actual_fees = self.extract_actual_fees(balance_transaction)
                response['fee_details'] = actual_fees
                # .get(): extract_actual_fees may return a note-only dict
                self._log('info', f"Using actual fee data: ${actual_fees.get('total_fee', 0):.2f}")

            elif payment_intent.payment_method:
                # Fallback to calculated fees if balance transaction not available
                try:
                    payment_method = stripe.PaymentMethod.retrieve(payment_intent.payment_method)
                    estimated_fees = self.calculate_stripe_fees(
                        response['amount'],
                        payment_method.type,
                        payment_method,
                        transaction_successful=True
                    )
                    estimated_fees['source'] = 'estimated'
                    estimated_fees['note'] = 'Estimated fees - actual fees not yet available'
                    response['fee_details'] = estimated_fees
                    self._log('info', f"Using estimated fee data: ${estimated_fees['total_fee']:.2f}")
                except Exception as e:
                    # Fee estimation is best-effort; record why it is missing
                    response['fee_details'] = {
                        'note': 'Fee details unavailable - payment method not accessible',
                        'error': str(e)
                    }
                    self._log('warning', f"Could not retrieve fee details: {str(e)}")
            else:
                response['fee_details'] = {'note': 'No payment method associated with this payment intent'}

        elif status == 'requires_payment_method':
            response['next_action'] = 'Payment method required'

        elif status == 'requires_confirmation':
            response['next_action'] = 'Payment requires confirmation'

        elif status == 'requires_action':
            response['next_action'] = 'Additional action required (e.g., 3D Secure)'
            if payment_intent.next_action:
                response['next_action_details'] = {
                    'type': getattr(payment_intent.next_action, 'type', 'unknown')
                }

        elif status == 'processing':
            response['next_action'] = 'Payment is being processed'

        if status in ('canceled', 'failed', 'requires_payment_method'):
            response['success'] = False
            response['failure_reason'] = 'Payment was canceled or failed'

            # Get failure details if available
            error = payment_intent.last_payment_error
            if error:
                response['failure_details'] = {
                    'code': getattr(error, 'code', None),
                    'message': getattr(error, 'message', None),
                    'type': getattr(error, 'type', None),
                    'decline_code': getattr(error, 'decline_code', None)
                }

        # Add charge information if available
        if charge is not None:
            response['charge_details'] = {
                'charge_id': charge.id,
                'paid': getattr(charge, 'paid', False),
                'refunded': getattr(charge, 'refunded', False),
                'amount_refunded': getattr(charge, 'amount_refunded', 0) / 100,  # Convert to dollars
                'failure_code': getattr(charge, 'failure_code', None),
                'failure_message': getattr(charge, 'failure_message', None)
            }

            # Add outcome information if available
            outcome = getattr(charge, 'outcome', None)
            if outcome:
                response['charge_details']['outcome'] = {
                    'network_status': getattr(outcome, 'network_status', None),
                    'reason': getattr(outcome, 'reason', None),
                    'seller_message': getattr(outcome, 'seller_message', None),
                    'type': getattr(outcome, 'type', None)
                }

            # Add payment method details from charge if available
            pm_details = getattr(charge, 'payment_method_details', None)
            if pm_details:
                response['payment_method_details'] = {
                    'type': getattr(pm_details, 'type', 'unknown')
                }

                card = getattr(pm_details, 'card', None)
                becs = getattr(pm_details, 'au_becs_debit', None)
                if card:
                    response['payment_method_details']['card'] = {
                        'brand': getattr(card, 'brand', None),
                        'country': getattr(card, 'country', None),
                        'last4': getattr(card, 'last4', None),
                        'funding': getattr(card, 'funding', None)
                    }
                elif becs:
                    response['payment_method_details']['au_becs_debit'] = {
                        'bsb_number': getattr(becs, 'bsb_number', None),
                        'last4': getattr(becs, 'last4', None)
                    }

        self._log('info', f"Payment intent {payment_intent_id} status: {status}")
        return response

    except stripe.InvalidRequestError as e:
        return _failure(f'Invalid request: {str(e)}', 'invalid_request')

    except stripe.PermissionError as e:
        return _failure(f'Permission denied: {str(e)}', 'permission_error')

    except stripe.StripeError as e:
        return _failure(f'Stripe error: {str(e)}', 'stripe_error')

    except Exception as e:
        # Top-level boundary: callers expect a result dict, never an exception
        return _failure(f'Unexpected error: {str(e)}', 'unexpected_error')
|
|
|
|
def wait_for_payment_completion(self, payment_intent_id: str, max_wait_seconds: int = 60,
                                check_interval: int = 5, customer_id: Optional[str] = None) -> Dict[str, Any]:
    """
    Poll a payment intent until it completes or times out.
    Useful for payments that start with 'processing' status.

    The number of polling rounds is ``max_wait_seconds // check_interval``. The first
    round runs immediately after the initial status check; every later round sleeps
    ``check_interval`` seconds first.

    Args:
        payment_intent_id (str): Stripe Payment Intent ID
        max_wait_seconds (int): Maximum time to wait in seconds (default: 60)
        check_interval (int): How often to check in seconds (default: 5). Must be > 0
        customer_id (str, optional): Customer ID to include in needs_fee_update tracking

    Returns:
        dict: The result of the last ``check_payment_intent`` call, extended with:
            - 'polling_info': metadata about the polling run (attempts, elapsed time, ...)
            - 'pi_status': the status polling ended on, set when polling stops on a
              status change or times out ('error' if the final check itself failed)
            - 'needs_fee_update': [customer_id, payment_intent_id] when the wait timed
              out and the payment should be re-examined later
        If ``check_interval`` is not positive, a 'validation_error' result is returned
        without contacting Stripe.
    """
    # A zero interval would divide by zero below and a negative one makes no sense.
    if check_interval <= 0:
        return {
            'success': False,
            'error': 'check_interval must be greater than zero',
            'error_type': 'validation_error',
            'payment_intent_id': payment_intent_id,
            'timestamp': datetime.now().isoformat()
        }

    start_time = datetime.now()
    attempts = 0
    max_attempts = max_wait_seconds // check_interval

    self._log('info', f"Starting payment polling for {payment_intent_id} (max {max_wait_seconds}s, every {check_interval}s)")

    # Check initial status
    result = self.check_payment_intent(payment_intent_id)

    if not result['success']:
        # If we can't even check the payment, return the error
        return result

    initial_status = result['status']
    self._log('info', f"Initial payment status: {initial_status}")

    # If payment is already in a final state, return immediately
    final_statuses = ['succeeded', 'failed', 'canceled']
    if initial_status in final_statuses:
        result['polling_info'] = {
            'polling_needed': False,
            'initial_status': initial_status,
            'final_status': initial_status,
            'total_wait_time_seconds': 0,
            'attempts': 1
        }
        return result

    # Only these statuses are expected to change without further caller input
    polling_statuses = ['processing', 'requires_action', 'requires_confirmation']

    if initial_status not in polling_statuses:
        # Status doesn't require polling
        result['polling_info'] = {
            'polling_needed': False,
            'initial_status': initial_status,
            'final_status': initial_status,
            'total_wait_time_seconds': 0,
            'attempts': 1,
            'note': f'Status "{initial_status}" does not require polling'
        }
        return result

    # Polling loop
    while attempts < max_attempts:
        attempts += 1

        # Wait before checking (the first round follows the initial check directly)
        if attempts > 1:
            self._log('info', f"Waiting {check_interval} seconds before attempt {attempts}...")
            time.sleep(check_interval)

        # Check current status
        current_result = self.check_payment_intent(payment_intent_id)
        elapsed_time = (datetime.now() - start_time).total_seconds()

        if not current_result['success']:
            # Error occurred during polling
            current_result['polling_info'] = {
                'polling_needed': True,
                'initial_status': initial_status,
                'final_status': 'error',
                'total_wait_time_seconds': round(elapsed_time, 2),
                'attempts': attempts,
                'polling_error': 'Failed to check payment status during polling'
            }
            return current_result

        current_status = current_result['status']

        self._log('info', f"Attempt {attempts}: Status = {current_status} (elapsed: {elapsed_time:.1f}s)")

        # Check if we've reached a final status
        if current_status in final_statuses:
            # Payment completed (success or failure)
            current_result['polling_info'] = {
                'polling_needed': True,
                'initial_status': initial_status,
                'final_status': current_status,
                'total_wait_time_seconds': round(elapsed_time, 2),
                'attempts': attempts,
                'completed': True
            }

            if current_status == 'succeeded':
                self._log('info', f"✅ Payment completed successfully after {elapsed_time:.1f}s ({attempts} attempts)")
            else:
                self._log('warning', f"❌ Payment completed with status '{current_status}' after {elapsed_time:.1f}s")
            current_result['pi_status'] = current_status
            return current_result

        # Check if status changed to something that doesn't need polling
        if current_status not in polling_statuses:
            current_result['polling_info'] = {
                'polling_needed': True,
                'initial_status': initial_status,
                'final_status': current_status,
                'total_wait_time_seconds': round(elapsed_time, 2),
                'attempts': attempts,
                'completed': False,
                'note': f'Status changed to "{current_status}" which does not require further polling'
            }
            self._log('info', f"Status changed to '{current_status}', stopping polling")
            current_result['pi_status'] = current_status
            return current_result

    # Timeout reached: take one last reading so the caller gets the freshest status
    elapsed_time = (datetime.now() - start_time).total_seconds()
    final_result = self.check_payment_intent(payment_intent_id)

    if final_result['success']:
        final_status = final_result['status']
        final_result['polling_info'] = {
            'polling_needed': True,
            'initial_status': initial_status,
            'final_status': final_status,
            'total_wait_time_seconds': round(elapsed_time, 2),
            'attempts': attempts,
            'completed': False,
            'timed_out': True,
            'timeout_reason': f'Reached maximum wait time of {max_wait_seconds} seconds'
        }

        # If payment is still processing after timeout and we have customer_id, mark for later review
        if final_status == 'processing' and customer_id:
            final_result['needs_fee_update'] = [customer_id, payment_intent_id]
            self._log('warning', "⏰ Payment still processing after timeout - marked for later review")

        self._log('warning', f"⏰ Polling timed out after {max_wait_seconds}s. Final status: {final_status}")
    else:
        # Error on final check: no real status is known, mirror polling_info
        final_status = 'error'
        final_result['polling_info'] = {
            'polling_needed': True,
            'initial_status': initial_status,
            'final_status': 'error',
            'total_wait_time_seconds': round(elapsed_time, 2),
            'attempts': attempts,
            'completed': False,
            'timed_out': True,
            'timeout_reason': 'Timeout reached and final status check failed'
        }

        # If we have customer_id and this might be a processing payment, mark for later
        if customer_id:
            final_result['needs_fee_update'] = [customer_id, payment_intent_id]
            self._log('warning', "⏰ Final check failed - marked for later review")

    # Record the status on the dict that is actually returned. The loop-local
    # result may not exist at all when no polling round was executed.
    final_result['pi_status'] = final_status
    return final_result
|
|
|
|
def create_setup_intent(self, customer_id: str, payment_method_types: Optional[list] = None) -> Dict[str, Any]:
    """
    Create a Setup Intent to collect and save payment method details for future use.

    The customer is retrieved first; a missing or deleted customer yields a
    'customer_not_found' result and no Setup Intent is created.

    Args:
        customer_id (str): Stripe customer ID
        payment_method_types (list, optional): List of payment method types
            (e.g., ['card', 'au_becs_debit']). Defaults to ['card', 'au_becs_debit']
            when omitted or empty

    Returns:
        dict: Setup Intent creation result with client_secret for frontend. On failure
            'success' is False and 'error' / 'error_type' describe the problem
    """
    try:
        if not customer_id or not isinstance(customer_id, str):
            return {
                'success': False,
                'error': 'Invalid customer_id provided',
                'error_type': 'validation_error'
            }

        # Default payment method types if none provided
        if not payment_method_types:
            payment_method_types = ['card', 'au_becs_debit']

        self._log('info', f"Creating setup intent for customer: {customer_id}")

        # Verify customer exists
        try:
            customer = stripe.Customer.retrieve(customer_id)
        except stripe.InvalidRequestError:
            return {
                'success': False,
                'error': f'Customer {customer_id} not found',
                'error_type': 'customer_not_found'
            }

        # Stripe still returns deleted customers from retrieve, flagged with
        # deleted=True; they cannot be used for new Setup Intents.
        if getattr(customer, 'deleted', False):
            return {
                'success': False,
                'error': f'Customer {customer_id} has been deleted',
                'error_type': 'customer_not_found'
            }

        # Create Setup Intent
        setup_intent = stripe.SetupIntent.create(
            customer=customer_id,
            payment_method_types=payment_method_types,
            usage='off_session'  # For future payments
        )

        response = {
            'success': True,
            'setup_intent_id': setup_intent.id,
            'client_secret': setup_intent.client_secret,
            'status': setup_intent.status,
            'customer_id': customer_id,
            'payment_method_types': payment_method_types,
            'timestamp': datetime.now().isoformat()
        }

        self._log('info', f"✅ Setup intent created: {setup_intent.id}")
        return response

    except stripe.StripeError as e:
        return {
            'success': False,
            'error': f'Stripe error: {str(e)}',
            'error_type': 'stripe_error',
            'customer_id': customer_id,
            'timestamp': datetime.now().isoformat()
        }
    except Exception as e:
        return {
            'success': False,
            'error': f'Unexpected error: {str(e)}',
            'error_type': 'unexpected_error',
            'customer_id': customer_id,
            'timestamp': datetime.now().isoformat()
        }
|
|
|
|
def get_setup_intent_status(self, setup_intent_id: str) -> Dict[str, Any]:
    """
    Check the status of a Setup Intent and retrieve payment method details if succeeded.

    Args:
        setup_intent_id (str): Stripe Setup Intent ID (must start with 'seti_')

    Returns:
        dict: Setup Intent status and payment method details. Depending on status:
            - 'succeeded' with a payment method: adds 'payment_method' (id, type,
              created, plus 'card' or 'au_becs_debit' details)
            - 'requires_payment_method' / 'requires_confirmation' / 'requires_action':
              adds 'next_action' noting that user action is still needed
            - 'processing': adds 'next_action' noting that setup is processing
            - 'canceled' / 'failed': 'success' becomes False and 'error' (and, when
              available, 'error_details') are added
    """
    try:
        if not setup_intent_id or not setup_intent_id.startswith('seti_'):
            return {
                'success': False,
                'error': 'Invalid setup_intent_id provided',
                'error_type': 'validation_error'
            }

        self._log('info', f"Checking setup intent status: {setup_intent_id}")

        # Retrieve setup intent
        setup_intent = stripe.SetupIntent.retrieve(setup_intent_id)

        response = {
            'success': True,
            'setup_intent_id': setup_intent.id,
            'status': setup_intent.status,
            'customer_id': setup_intent.customer,
            'timestamp': datetime.now().isoformat()
        }

        # If succeeded, get payment method details
        if setup_intent.status == 'succeeded' and setup_intent.payment_method:
            payment_method = stripe.PaymentMethod.retrieve(setup_intent.payment_method)

            pm_details = {
                'id': payment_method.id,
                'type': payment_method.type,
                'created': payment_method.created
            }

            # Branch on the declared type rather than on attribute presence
            if pm_details['type'] == "card":
                pm_details['card'] = {
                    'brand': payment_method.card.brand,
                    'last4': payment_method.card.last4,
                    'country': payment_method.card.country,
                    'exp_month': payment_method.card.exp_month,
                    'exp_year': payment_method.card.exp_year
                }
            elif pm_details['type'] == "au_becs_debit":
                pm_details['au_becs_debit'] = {
                    'bsb_number': payment_method.au_becs_debit.bsb_number,
                    'last4': payment_method.au_becs_debit.last4
                }

            response['payment_method'] = pm_details
            self._log('info', f"✅ Setup intent succeeded with payment method: {payment_method.id}")

        # 'requires_action' (e.g. an authentication step) also waits on the user
        elif setup_intent.status in ['requires_payment_method', 'requires_confirmation', 'requires_action']:
            response['next_action'] = 'Setup still requires user action'

        elif setup_intent.status == 'processing':
            response['next_action'] = 'Setup is processing'

        elif setup_intent.status in ['canceled', 'failed']:
            response['success'] = False
            response['error'] = f'Setup intent {setup_intent.status}'
            if setup_intent.last_setup_error:
                response['error_details'] = {
                    'code': setup_intent.last_setup_error.code,
                    'message': setup_intent.last_setup_error.message,
                    'type': setup_intent.last_setup_error.type
                }

        return response

    except stripe.StripeError as e:
        return {
            'success': False,
            'error': f'Stripe error: {str(e)}',
            'error_type': 'stripe_error',
            'setup_intent_id': setup_intent_id,
            'timestamp': datetime.now().isoformat()
        }
    except Exception as e:
        return {
            'success': False,
            'error': f'Unexpected error: {str(e)}',
            'error_type': 'unexpected_error',
            'setup_intent_id': setup_intent_id,
            'timestamp': datetime.now().isoformat()
        }
|
|
|
|
def attach_payment_method(self, payment_method_id: str, customer_id: str) -> Dict[str, Any]:
    """
    Attach a payment method to a customer (if not already attached).

    When Stripe reports that the payment method is already attached, the payment
    method is retrieved and its owner is compared with ``customer_id``; the call only
    succeeds if it belongs to that same customer.

    Args:
        payment_method_id (str): Stripe Payment Method ID (must start with 'pm_')
        customer_id (str): Stripe customer ID

    Returns:
        dict: Attachment result. On success contains 'payment_method_id',
            'customer_id', 'type' and 'timestamp'; on failure 'success' is False and
            'error' / 'error_type' describe the problem
    """
    try:
        if not payment_method_id or not payment_method_id.startswith('pm_'):
            return {
                'success': False,
                'error': 'Invalid payment_method_id provided',
                'error_type': 'validation_error'
            }

        if not customer_id:
            return {
                'success': False,
                'error': 'Invalid customer_id provided',
                'error_type': 'validation_error'
            }

        self._log('info', f"Attaching payment method {payment_method_id} to customer {customer_id}")

        # Try to attach (may already be attached)
        try:
            payment_method = stripe.PaymentMethod.attach(
                payment_method_id,
                customer=customer_id
            )
            self._log('info', "✅ Payment method attached successfully")
        except stripe.InvalidRequestError as e:
            if 'already attached' not in str(e).lower():
                # Bare raise keeps the original traceback for the outer handlers
                raise

            # Already attached somewhere: retrieve it and confirm the owner
            payment_method = stripe.PaymentMethod.retrieve(payment_method_id)
            owner = getattr(payment_method, 'customer', None)
            # 'customer' is normally an ID string; tolerate an expanded object too
            owner_id = getattr(owner, 'id', owner)
            if owner_id != customer_id:
                return {
                    'success': False,
                    'error': f'Payment method {payment_method_id} is already attached to a different customer',
                    'error_type': 'invalid_request',
                    'payment_method_id': payment_method_id,
                    'customer_id': customer_id,
                    'timestamp': datetime.now().isoformat()
                }
            self._log('info', "Payment method was already attached")

        return {
            'success': True,
            'payment_method_id': payment_method.id,
            'customer_id': customer_id,
            'type': payment_method.type,
            'timestamp': datetime.now().isoformat()
        }

    except stripe.StripeError as e:
        return {
            'success': False,
            'error': f'Stripe error: {str(e)}',
            'error_type': 'stripe_error',
            'payment_method_id': payment_method_id,
            'customer_id': customer_id,
            'timestamp': datetime.now().isoformat()
        }
    except Exception as e:
        return {
            'success': False,
            'error': f'Unexpected error: {str(e)}',
            'error_type': 'unexpected_error',
            'payment_method_id': payment_method_id,
            'customer_id': customer_id,
            'timestamp': datetime.now().isoformat()
        }
|
|
|
|
def set_default_payment_method(self, customer_id: str, payment_method_id: str) -> Dict[str, Any]:
    """
    Make a payment method the customer's default.

    The default is written to the customer's ``invoice_settings.default_payment_method``
    via ``stripe.Customer.modify``.

    Args:
        customer_id (str): Stripe customer ID
        payment_method_id (str): Stripe Payment Method ID

    Returns:
        dict: Update result. 'success' is True with the customer and payment method
            IDs echoed back, or False with 'error' and 'error_type' set
    """
    def failure(message: str, error_type: str) -> Dict[str, Any]:
        # Common shape of the two exception results below
        return {
            'success': False,
            'error': message,
            'error_type': error_type,
            'customer_id': customer_id,
            'payment_method_id': payment_method_id,
            'timestamp': datetime.now().isoformat()
        }

    try:
        if not (customer_id and payment_method_id):
            return {
                'success': False,
                'error': 'Both customer_id and payment_method_id are required',
                'error_type': 'validation_error'
            }

        self._log('info', f"Setting default payment method for customer {customer_id}")

        # The modified customer object is not needed; only the side effect matters
        new_settings = {'default_payment_method': payment_method_id}
        stripe.Customer.modify(customer_id, invoice_settings=new_settings)

        outcome = {'success': True}
        outcome['customer_id'] = customer_id
        outcome['default_payment_method'] = payment_method_id
        outcome['timestamp'] = datetime.now().isoformat()
        return outcome

    except stripe.StripeError as stripe_exc:
        return failure(f'Stripe error: {str(stripe_exc)}', 'stripe_error')
    except Exception as exc:
        return failure(f'Unexpected error: {str(exc)}', 'unexpected_error')
|
|
|
|
def detach_payment_method(self, payment_method_id: str) -> Dict[str, Any]:
    """
    Remove a payment method from the customer it is attached to.

    Args:
        payment_method_id (str): Stripe Payment Method ID (must start with 'pm_')

    Returns:
        dict: Detachment result. 'success' is True with 'detached': True, or False
            with 'error' and 'error_type' set
    """
    def failure(message: str, error_type: str) -> Dict[str, Any]:
        # Common shape of the two exception results below
        return {
            'success': False,
            'error': message,
            'error_type': error_type,
            'payment_method_id': payment_method_id,
            'timestamp': datetime.now().isoformat()
        }

    try:
        looks_valid = payment_method_id and payment_method_id.startswith('pm_')
        if not looks_valid:
            return {
                'success': False,
                'error': 'Invalid payment_method_id provided',
                'error_type': 'validation_error'
            }

        self._log('info', f"Detaching payment method: {payment_method_id}")

        detached_pm = stripe.PaymentMethod.detach(payment_method_id)

        outcome = {'success': True}
        outcome['payment_method_id'] = detached_pm.id
        outcome['detached'] = True
        outcome['timestamp'] = datetime.now().isoformat()
        return outcome

    except stripe.StripeError as stripe_exc:
        return failure(f'Stripe error: {str(stripe_exc)}', 'stripe_error')
    except Exception as exc:
        return failure(f'Unexpected error: {str(exc)}', 'unexpected_error')
|
|
|
|
def update_payment_fees(self, needs_fee_update) -> Dict[str, Any]:
    """
    Re-check a payment that was earlier flagged as needing a fee update.

    The payment intent is looked up again through ``check_payment_intent``. The
    payment counts as complete when its status is 'succeeded', 'failed' or
    'canceled', and as having actual fees when ``fee_details['source']`` equals
    'stripe_actual'.

    Args:
        needs_fee_update (list): List containing [customer_id, payment_intent_id]

    Returns:
        dict: Updated payment information with actual fees if available. Includes
            'needs_further_updates'; while that is True the tracking pair is echoed
            back under 'needs_fee_update'. If the lookup fails, the failed lookup
            result is returned unchanged
    """
    try:
        # The identifier must be a two-item list/tuple
        well_formed = isinstance(needs_fee_update, (list, tuple)) and len(needs_fee_update) == 2
        if not well_formed:
            return {
                'success': False,
                'error': 'Invalid needs_fee_update format. Expected [customer_id, payment_intent_id]',
                'error_type': 'validation_error'
            }

        customer_id, payment_intent_id = needs_fee_update

        self._log('info', f"Updating fees for {payment_intent_id} (customer: {customer_id})")

        # Fetch the current state of the payment intent
        snapshot = self.check_payment_intent(payment_intent_id)
        if not snapshot['success']:
            return snapshot

        # Work out whether fees are final and whether the payment has finished
        fees_are_actual = snapshot.get('fee_details', {}).get('source') == 'stripe_actual'
        finished = snapshot['status'] in ('succeeded', 'failed', 'canceled')

        summary = {
            'success': True,
            'payment_intent_id': payment_intent_id,
            'customer_id': customer_id,
            'status': snapshot['status'],
            'amount': snapshot['amount'],
            'currency': snapshot['currency'],
            'has_actual_fees': fees_are_actual,
            'is_complete': finished,
            'fee_details': snapshot.get('fee_details', {}),
            'timestamp': datetime.now().isoformat()
        }

        # A finished payment never needs another pass; an unfinished one always does
        if finished:
            summary['needs_further_updates'] = False
            if fees_are_actual:
                note = 'Payment complete with actual fees'
                log_line = f"✅ Payment {payment_intent_id} now complete with actual fees"
            else:
                note = 'Payment complete but actual fees not available'
                log_line = f"✅ Payment {payment_intent_id} complete but no actual fees"
        else:
            summary['needs_further_updates'] = True
            summary['needs_fee_update'] = [customer_id, payment_intent_id]  # Keep tracking
            if fees_are_actual:
                note = 'Has actual fees but payment still processing'
                log_line = f"⏳ Payment {payment_intent_id} has fees but still processing"
            else:
                note = 'Payment still processing without actual fees'
                log_line = f"⏳ Payment {payment_intent_id} still needs updates"

        summary['note'] = note
        self._log('info', log_line)

        return summary

    except Exception as exc:
        return {
            'success': False,
            'error': f'Failed to update payment fees: {str(exc)}',
            'error_type': 'update_error',
            'needs_fee_update': needs_fee_update,
            'timestamp': datetime.now().isoformat()
        }
|