#!/usr/bin/env python3 """ Stripe Payment Processor Class A clean, reusable class for processing single Stripe payments with comprehensive error handling and fee calculations. """ import stripe from stripe import StripeError import os import logging import time import json from datetime import datetime from decimal import Decimal from typing import Dict, Any, Optional class StripePaymentProcessor: """ A clean, focused class for processing individual Stripe payments. Returns comprehensive JSON results for each payment attempt. """ def __init__(self, api_key: Optional[str] = None, enable_logging: bool = False, log_level: int = logging.INFO): """ Initialize the Stripe payment processor. Args: api_key (str, optional): Stripe API key. If None, will use STRIPE_SECRET_KEY env var enable_logging (bool): Whether to enable console logging. Defaults to False log_level (int): Logging level if logging is enabled """ # Set up Stripe API key #print(f"processor api_key: {api_key}") if api_key: stripe.api_key = api_key else: stripe.api_key = os.getenv('STRIPE_SECRET_KEY') #print(f"processor api_key: {stripe.api_key}") if not stripe.api_key: raise ValueError("Stripe API key is required. Provide via api_key parameter or STRIPE_SECRET_KEY environment variable.") # Validate API key format if not (stripe.api_key.startswith('sk_test_') or stripe.api_key.startswith('rk_live_')): raise ValueError("Invalid Stripe API key format. 
def __init__(self, api_key: Optional[str] = None, enable_logging: bool = False, log_level: int = logging.INFO):
    """
    Initialize the Stripe payment processor.

    Args:
        api_key (str, optional): Stripe API key. If None (or empty), the
            STRIPE_SECRET_KEY environment variable is used instead.
        enable_logging (bool): Whether to enable console logging. Defaults to False.
        log_level (int): Logging level used when logging is enabled.

    Raises:
        ValueError: If no key is available, or the key does not start with
            'sk_test_' or 'rk_live_'.
    """
    # The key is stored on the stripe module itself, so it applies to every
    # stripe call made in this process, not only to this instance.
    stripe.api_key = api_key if api_key else os.getenv('STRIPE_SECRET_KEY')

    if not stripe.api_key:
        raise ValueError("Stripe API key is required. Provide via api_key parameter or STRIPE_SECRET_KEY environment variable.")

    # Only test secret keys and live restricted keys are accepted.
    if not stripe.api_key.startswith(('sk_test_', 'rk_live_')):
        raise ValueError("Invalid Stripe API key format. Key should start with 'sk_test_' or 'rk_live_'")

    self.is_test_mode = stripe.api_key.startswith('sk_test_')

    # Logging is opt-in; when disabled, self.logger stays None and _log() is a no-op.
    self.enable_logging = enable_logging
    if enable_logging:
        logging.basicConfig(level=log_level, format='%(asctime)s - %(levelname)s - %(message)s')
        self.logger = logging.getLogger(__name__)
        self.logger.info(f"StripePaymentProcessor initialized ({'TEST' if self.is_test_mode else 'LIVE'} mode)")
    else:
        self.logger = None


def _log(self, level: str, message: str):
    """
    Forward a message to the logger method named by ``level``.

    Does nothing when logging was not enabled at construction time.

    Args:
        level (str): Logger method name, case-insensitive ('info', 'warning', ...).
        message (str): Text to log.
    """
    if self.logger:
        getattr(self.logger, level.lower())(message)


def extract_actual_fees(self, balance_transaction: Any) -> Dict[str, Any]:
    """
    Extract actual fee details from Stripe's balance transaction.
    This provides real fee data instead of estimates.

    Args:
        balance_transaction: Expanded Stripe balance transaction object. A
            falsy value, or an object without a ``fee_details`` attribute
            (for example an unexpanded ID string), yields a note-only result.

    Returns:
        dict: ``{'note': ...}`` when no fee data is available, otherwise a dict
        with 'source', 'total_fee', 'net_amount', 'fee_breakdown',
        'available_on' and 'note'. Amounts are converted from cents.
    """
    if not balance_transaction or not hasattr(balance_transaction, 'fee_details'):
        return {'note': 'No fee details available'}

    def from_cents(value: Any) -> float:
        # Missing or None amounts are treated as zero instead of raising.
        return (value or 0) / 100

    # fee_details may be present but None; treat that as "no breakdown".
    fee_breakdown = [
        {
            'type': getattr(fee_detail, 'type', 'unknown'),
            'description': getattr(fee_detail, 'description', 'Unknown fee'),
            'amount': from_cents(getattr(fee_detail, 'amount', 0)),
            'currency': getattr(fee_detail, 'currency', 'unknown')
        }
        for fee_detail in (balance_transaction.fee_details or [])
    ]

    return {
        'source': 'stripe_actual',
        'total_fee': from_cents(getattr(balance_transaction, 'fee', 0)),
        'net_amount': from_cents(getattr(balance_transaction, 'net', 0)),
        'fee_breakdown': fee_breakdown,
        'available_on': getattr(balance_transaction, 'available_on', None),
        'note': 'Actual fees from Stripe balance transaction'
    }
def calculate_stripe_fees(self, amount: float, payment_method_type: str, payment_method_details: Optional[Any] = None, transaction_successful: bool = True) -> Dict[str, Any]:
    """
    DEPRECATED: Calculate estimated Stripe fees based on payment method type.
    Use extract_actual_fees() with balance transaction for real fee data.

    This method provides fee estimates and is kept for fallback scenarios
    where actual fee data is not available from Stripe.

    Args:
        amount (float): Transaction amount in dollars
        payment_method_type (str): Type of payment method ('card', 'au_becs_debit', etc.)
        payment_method_details: Payment method object exposing a ``card``
            attribute (with ``country``, ``brand``, ``display_brand``). Only
            consulted for card payments.
        transaction_successful (bool): Whether transaction succeeded (affects BECS caps)

    Returns:
        dict: Estimated fee calculation information. Unrecognised payment
        method types get zero fees and the description 'Unknown payment method'.
    """
    fee_info = {
        'payment_method_type': payment_method_type,
        'percentage_fee': 0.0,
        'fixed_fee': 0.0,
        'total_fee': 0.0,
        'fee_description': 'Unknown payment method',
        'capped': False,
        'cap_amount': None,
        'international': False
    }

    if payment_method_type == 'card':
        # Default to domestic card rates
        fee_info['percentage_fee'] = 1.7
        fee_info['fixed_fee'] = 0.30
        fee_info['fee_description'] = 'Domestic credit/debit card'

        # Card data is read via attributes only, so both Stripe objects and
        # plain attribute containers work (dict-style .get() is not required).
        card = getattr(payment_method_details, 'card', None) if payment_method_details else None
        if card:
            card_country = getattr(card, 'country', None)
            self._log('info', f"Card country detected: {card_country}")
            fee_info['card_brand'] = getattr(card, 'brand', None)
            fee_info['card_display_brand'] = getattr(card, 'display_brand', None)

            if card_country and card_country != 'AU':
                fee_info['percentage_fee'] = 3.5
                fee_info['fixed_fee'] = 0.30
                fee_info['fee_description'] = f'International credit/debit card ({card_country})'
                fee_info['international'] = True
            else:
                self._log('info', f"Domestic card confirmed (country: {card_country})")
        else:
            # If we can't determine country, assume domestic for AU-based business
            self._log('info', "Card country not available - assuming domestic")

    elif payment_method_type == 'au_becs_debit':
        fee_info['percentage_fee'] = 1.0
        fee_info['fixed_fee'] = 0.30
        fee_info['fee_description'] = 'Australia BECS Direct Debit'

        # Apply BECS caps based on transaction outcome
        if transaction_successful:
            fee_info['cap_amount'] = 3.50
            fee_info['fee_description'] += ' (capped at $3.50)'
        else:
            fee_info['cap_amount'] = 2.50
            fee_info['fee_description'] += ' (failure/dispute - capped at $2.50)'

    # Percentage component plus fixed component, then the optional cap.
    calculated_fee = amount * (fee_info['percentage_fee'] / 100) + fee_info['fixed_fee']

    cap = fee_info['cap_amount']
    if cap and calculated_fee > cap:
        fee_info['total_fee'] = cap
        fee_info['capped'] = True
    else:
        fee_info['total_fee'] = round(calculated_fee, 2)

    return fee_info
def process_payment(self, customer_id: str, amount: float, currency: str = 'aud', description: Optional[str] = None, wait_for_completion: bool = True, stripe_pm: Optional[str] = None) -> Dict[str, Any]:
    """
    Process a single payment for a customer.

    The customer's default payment method is charged unless ``stripe_pm``
    names a specific payment method.

    Args:
        customer_id (str): Stripe customer ID
        amount (float): Amount in dollars (will be converted to cents internally)
        currency (str): Currency code (default: 'aud')
        description (str, optional): Payment description
        wait_for_completion (bool): If True, will poll for 'processing' payments
            to complete (default: True)
        stripe_pm (str, optional): Payment method ID to charge instead of the
            customer's default payment method

    Returns:
        dict: Comprehensive payment result with success status and details.
        Stripe and unexpected errors are reported through 'error' and
        'error_type' rather than raised.
    """
    transaction_start = datetime.now()

    # Base response structure
    response = {
        'success': False,
        'timestamp': transaction_start.isoformat(),
        'customer_id': customer_id,
        'amount': amount,
        'currency': currency.lower(),
        'description': description,
        'processing_time_seconds': 0.0,
        'test_mode': self.is_test_mode
    }

    def elapsed_seconds() -> float:
        return round((datetime.now() - transaction_start).total_seconds(), 2)

    def fail(error: str, error_type: str, **extra: Any) -> Dict[str, Any]:
        # Shared tail of every exception handler below.
        response.update({
            'error': error,
            'error_type': error_type,
            **extra,
            'processing_time_seconds': elapsed_seconds()
        })
        return response

    try:
        # Validate inputs
        if not customer_id or not isinstance(customer_id, str):
            response['error'] = 'Invalid customer_id provided'
            response['error_type'] = 'validation_error'
            return response

        if amount <= 0:
            response['error'] = 'Amount must be greater than 0'
            response['error_type'] = 'validation_error'
            return response

        # Convert dollars to cents (via str/Decimal to avoid float artefacts)
        amount_cents = int(Decimal(str(amount)) * 100)

        self._log('info', f"Processing payment: {customer_id}, ${amount} {currency.upper()}")

        # Retrieve customer. The full object is deliberately not printed:
        # it contains personal data.
        customer = stripe.Customer.retrieve(customer_id)
        if not customer:
            response['error'] = f'Customer {customer_id} not found'
            response['error_type'] = 'customer_not_found'
            return response

        # Add customer details to response
        response.update({
            'customer_email': customer.email,
            'customer_name': customer.description or customer.name
        })

        if stripe_pm:
            default_payment_method = stripe_pm
        else:
            # Get default payment method; invoice_settings may be empty.
            invoice_settings = customer.invoice_settings
            default_payment_method = invoice_settings.default_payment_method if invoice_settings else None
            if not default_payment_method:
                response['error'] = 'Customer has no default payment method set'
                response['error_type'] = 'no_payment_method'
                return response

        # Retrieve payment method details
        payment_method = stripe.PaymentMethod.retrieve(default_payment_method)
        payment_method_type = payment_method.type

        response.update({
            'payment_method_id': default_payment_method,
            'payment_method_type': payment_method_type
        })

        # Calculate estimated fees before payment
        estimated_fee_details = self.calculate_stripe_fees(
            amount,
            payment_method_type,
            payment_method,
            transaction_successful=True  # Will be updated if payment fails
        )
        estimated_fee_details['source'] = 'estimated'
        estimated_fee_details['note'] = 'Pre-payment estimate'
        response['estimated_fee_details'] = estimated_fee_details

        self._log('info', f"Payment method: {payment_method_type} - {estimated_fee_details['fee_description']}")

        # Prepare Payment Intent parameters
        payment_intent_params = {
            'amount': amount_cents,
            'currency': currency,
            'customer': customer_id,
            'payment_method': default_payment_method,
            'description': description or f"Payment for {customer.description or customer.email}",
            'confirm': True,
            'return_url': 'https://your-website.com/payment-success',
            'off_session': True
        }

        # Add mandate data for BECS Direct Debit
        if payment_method_type == 'au_becs_debit':
            payment_intent_params['mandate_data'] = {
                'customer_acceptance': {
                    'type': 'offline'
                }
            }
            self._log('info', "Added BECS mandate data for offline acceptance")

        # Create and confirm Payment Intent
        payment_intent = stripe.PaymentIntent.create(**payment_intent_params)

        # Add payment intent details
        response.update({
            'payment_intent_id': payment_intent.id,
            'status': payment_intent.status
        })

        if payment_intent.status == 'succeeded':
            response['success'] = True
            self._log('info', f"✅ Payment successful: {payment_intent.id}")

            # Get actual fee details for successful payments
            try:
                # Re-retrieve with expanded balance transaction to get actual fees
                time.sleep(3)
                payment_intent_expanded = stripe.PaymentIntent.retrieve(
                    payment_intent.id,
                    expand=['latest_charge.balance_transaction']
                )

                latest_charge = getattr(payment_intent_expanded, 'latest_charge', None)
                balance_transaction = getattr(latest_charge, 'balance_transaction', None) if latest_charge else None
                actual_fees = self.extract_actual_fees(balance_transaction) if balance_transaction else None

                if actual_fees and 'total_fee' in actual_fees:
                    response['fee_details'] = actual_fees
                    self._log('info', f"Retrieved actual fees: ${actual_fees['total_fee']:.2f}")
                else:
                    # Keep estimated fees if balance transaction not available.
                    # A copy is used so the pre-payment estimate keeps its own note.
                    response['fee_details'] = dict(estimated_fee_details)
                    response['fee_details']['note'] = 'Balance transaction not yet available, showing estimate'
                    # Record this payment for later fee update
                    response['needs_fee_update'] = [customer_id, payment_intent.id]
                    self._log('info', f"Balance transaction not available, using estimates - marked for later update")
            except Exception as e:
                # If we can't get actual fees, keep the estimates and mark for later
                response['fee_details'] = dict(estimated_fee_details)
                response['fee_details']['note'] = f'Could not retrieve actual fees: {str(e)}'
                response['needs_fee_update'] = [customer_id, payment_intent.id]
                self._log('warning', f"Failed to get actual fees: {str(e)} - marked for later update")

        elif payment_intent.status == 'processing' and wait_for_completion:
            # Payment is processing - wait for completion
            self._log('info', f"💭 Payment is processing, waiting for completion...")

            # Use the polling method to wait for completion
            polling_result = self.wait_for_payment_completion(payment_intent.id, customer_id=customer_id, max_wait_seconds=30)

            if polling_result['success']:
                # Update our response with the final polling result
                response.update(polling_result)
                # The polling result already has all the details we need
                if polling_result['status'] == 'succeeded':
                    response['success'] = True
                    self._log('info', f"✅ Payment completed successfully after polling")
                else:
                    response['success'] = False
                    response['error'] = f'Payment completed with status: {polling_result["status"]}'
                    response['error_type'] = 'payment_not_succeeded'
            else:
                # Polling failed - update with polling error info
                response.update(polling_result)
                response['success'] = False
                if 'error' not in response:
                    response['error'] = 'Payment polling failed'
                    response['error_type'] = 'polling_failed'

        else:
            # For failed payments or processing without polling
            if payment_method_type == 'au_becs_debit':
                # Recalculate BECS fees with failure cap for failed payments
                failed_fee_details = self.calculate_stripe_fees(
                    amount,
                    payment_method_type,
                    payment_method,
                    transaction_successful=False
                )
                failed_fee_details['source'] = 'estimated'
                failed_fee_details['note'] = 'Estimated fees for failed BECS payment'
                response['fee_details'] = failed_fee_details
            else:
                # Use estimated fees for other payment types
                response['fee_details'] = dict(estimated_fee_details)

            if payment_intent.status == 'processing':
                response['error'] = f'Payment is processing (polling disabled). Check status later.'
                response['error_type'] = 'payment_processing'
                response['next_action'] = f'Use check_payment_intent("{payment_intent.id}") or wait_for_payment_completion("{payment_intent.id}") to check status'
            else:
                response['error'] = f'Payment not completed. Status: {payment_intent.status}'
                response['error_type'] = 'payment_incomplete'
            self._log('warning', f"⚠️ Payment incomplete: {payment_intent.id} - {payment_intent.status}")

        # Calculate processing time
        response['processing_time_seconds'] = elapsed_seconds()
        response['pi_status'] = payment_intent.status

        return response

    except stripe.CardError as e:
        # Card-specific error (declined, etc.)
        fail(f'Card declined: {e.user_message}', 'card_declined', decline_code=e.code)
        self._log('error', f"❌ Card declined for {customer_id}: {e.user_message}")
        return response

    except stripe.InvalidRequestError as e:
        # Invalid parameters
        fail(f'Invalid request: {str(e)}', 'invalid_request')
        self._log('error', f"❌ Invalid request for {customer_id}: {str(e)}")
        return response

    except stripe.AuthenticationError as e:
        # Authentication with Stripe failed
        fail(f'Authentication failed: {str(e)}', 'authentication_error')
        self._log('error', f"❌ Authentication failed: {str(e)}")
        return response

    except stripe.APIConnectionError as e:
        # Network communication with Stripe failed
        fail(f'Network error: {str(e)}', 'network_error')
        self._log('error', f"❌ Network error: {str(e)}")
        return response

    except stripe.StripeError as e:
        # Other Stripe-specific errors
        fail(f'Stripe error: {str(e)}', 'stripe_error')
        self._log('error', f"❌ Stripe error: {str(e)}")
        return response

    except Exception as e:
        # Unexpected errors
        fail(f'Unexpected error: {str(e)}', 'unexpected_error')
        self._log('error', f"❌ Unexpected error for {customer_id}: {str(e)}")
        return response
def get_customer_info(self, customer_id: str) -> Dict[str, Any]:
    """
    Retrieve customer information including payment methods.

    Args:
        customer_id (str): Stripe customer ID

    Returns:
        dict: On success, 'success': True plus customer fields and a
        'payment_methods' list (at most 10 entries are requested). On failure,
        'success': False with 'error' and 'error_type'; nothing is raised.
    """
    try:
        customer = stripe.Customer.retrieve(customer_id)

        customer_info = {
            'success': True,
            'customer_id': customer.id,
            'email': customer.email,
            'name': customer.description or customer.name,
            'created': customer.created,
            'default_payment_method': customer.invoice_settings.default_payment_method if customer.invoice_settings else None,
            'payment_methods': []
        }

        # Get payment methods
        payment_methods = stripe.PaymentMethod.list(
            customer=customer_id,
            limit=10
        )

        for pm in payment_methods.data:
            pm_info = {
                'id': pm.id,
                'type': pm.type,
                'created': pm.created
            }

            # A payment method only carries the sub-object for its own type;
            # plain attribute access on the absent one raises AttributeError,
            # so both are looked up with a default.
            card = getattr(pm, 'card', None)
            becs = getattr(pm, 'au_becs_debit', None)

            if card:
                pm_info['card'] = {
                    'brand': card.brand,
                    'last4': card.last4,
                    'country': card.country,
                    'exp_month': card.exp_month,
                    'exp_year': card.exp_year
                }
            elif becs:
                pm_info['au_becs_debit'] = {
                    'bsb_number': becs.bsb_number,
                    'last4': becs.last4
                }

            customer_info['payment_methods'].append(pm_info)

        return customer_info

    except stripe.StripeError as e:
        return {
            'success': False,
            'error': f'Stripe error: {str(e)}',
            'error_type': 'stripe_error'
        }
    except Exception as e:
        return {
            'success': False,
            'error': f'Unexpected error: {str(e)}',
            'error_type': 'unexpected_error'
        }
def get_payment_methods(self, customer_id: str) -> list:
    """
    Get payment methods for a Stripe customer.

    Args:
        customer_id (str): Stripe customer ID

    Returns:
        list: One dict per payment method with 'id', 'type', 'created' and,
        for 'card' / 'au_becs_debit' types, a nested dict of type-specific
        details. At most 10 payment methods are requested.

    Raises:
        Errors raised by the Stripe client are not caught here and propagate
        to the caller.
    """
    self._log('info', f"Retrieving payment methods for customer: {customer_id}")

    # Get payment methods for the customer
    payment_methods = stripe.PaymentMethod.list(
        customer=customer_id,
        limit=10
    )

    methods_list = []
    for pm in payment_methods.data:
        pm_info = {
            'id': pm.id,
            'type': pm.type,
            'created': pm.created
        }

        # Dispatch on the declared type; only that type's sub-object is read.
        if pm_info['type'] == "card":
            pm_info['card'] = {
                'brand': pm.card.brand,
                'last4': pm.card.last4,
                'country': pm.card.country,
                'exp_month': pm.card.exp_month,
                'exp_year': pm.card.exp_year
            }
        elif pm_info['type'] == "au_becs_debit":
            pm_info['au_becs_debit'] = {
                'bsb_number': pm.au_becs_debit.bsb_number,
                'last4': pm.au_becs_debit.last4
            }

        methods_list.append(pm_info)

    # Only the count is logged; payment method details are not written to stdout.
    self._log('info', f"Found {len(methods_list)} payment methods")
    return methods_list
def check_payment_intent(self, payment_intent_id: str) -> Dict[str, Any]:
    """
    Check the status and details of a specific payment intent.

    Args:
        payment_intent_id (str): Stripe Payment Intent ID (e.g., 'pi_1234567890')

    Returns:
        dict: Payment intent status and comprehensive details. 'success' is
        False for validation/Stripe errors and also for intents whose status
        is 'canceled', 'failed' or 'requires_payment_method'. Error results
        carry 'error' and 'error_type' and have no 'status' key. 'charge_id'
        is the latest charge's ID whatever its prefix.
    """
    try:
        # Validate input
        if not payment_intent_id or not isinstance(payment_intent_id, str):
            return {
                'success': False,
                'error': 'Invalid payment_intent_id provided',
                'error_type': 'validation_error',
                'timestamp': datetime.now().isoformat()
            }

        if not payment_intent_id.startswith('pi_'):
            return {
                'success': False,
                'error': 'Payment Intent ID must start with "pi_"',
                'error_type': 'validation_error',
                'timestamp': datetime.now().isoformat()
            }

        self._log('info', f"Checking payment intent: {payment_intent_id}")

        # Retrieve the payment intent with expanded balance transaction for fee details
        payment_intent = stripe.PaymentIntent.retrieve(
            payment_intent_id,
            expand=['latest_charge.balance_transaction']
        )

        self._log('info', f"Retrieved payment intent with expanded data")

        # latest_charge is normally the expanded charge object, but tolerate a
        # bare ID string or a missing value.
        latest_charge = getattr(payment_intent, 'latest_charge', None)
        if isinstance(latest_charge, str):
            charge_id = latest_charge
            expanded_charge = None
        elif latest_charge:
            charge_id = getattr(latest_charge, 'id', None)
            expanded_charge = latest_charge
        else:
            charge_id = None
            expanded_charge = None

        # Charge used for receipt/charge details: the legacy 'charges' list
        # when the API version still returns it, otherwise the latest charge.
        legacy_charges = getattr(payment_intent, 'charges', None)
        if legacy_charges and legacy_charges.data:
            charge = legacy_charges.data[0]
        else:
            charge = expanded_charge

        # Base response
        response = {
            'success': True,
            'payment_intent_id': payment_intent.id,
            'status': payment_intent.status,
            'amount': payment_intent.amount / 100,  # Convert from cents to dollars
            'currency': payment_intent.currency,
            'created': datetime.fromtimestamp(payment_intent.created).isoformat(),
            'description': payment_intent.description,
            'customer_id': payment_intent.customer,
            'payment_method_id': payment_intent.payment_method,
            'test_mode': self.is_test_mode,
            'timestamp': datetime.now().isoformat(),
            'charge_id': charge_id
        }

        status = payment_intent.status

        # Add status-specific information
        if status == 'succeeded':
            # NOTE: this is the intent's creation time; the code has no
            # separate completion timestamp.
            response.update({
                'success_date': datetime.fromtimestamp(payment_intent.created).isoformat()
            })

            # Add receipt URL if available
            if charge:
                response['receipt_url'] = getattr(charge, 'receipt_url', None)

            # Get actual fee details from balance transaction if available
            balance_transaction = getattr(expanded_charge, 'balance_transaction', None)

            if balance_transaction:
                # Use actual fee data from Stripe
                actual_fees = self.extract_actual_fees(balance_transaction)
                response['fee_details'] = actual_fees
                # extract_actual_fees may return a note-only dict.
                if 'total_fee' in actual_fees:
                    self._log('info', f"Using actual fee data: ${actual_fees['total_fee']:.2f}")
            elif payment_intent.payment_method:
                # Fallback to calculated fees if balance transaction not available
                try:
                    payment_method = stripe.PaymentMethod.retrieve(payment_intent.payment_method)
                    estimated_fees = self.calculate_stripe_fees(
                        response['amount'],
                        payment_method.type,
                        payment_method,
                        transaction_successful=True
                    )
                    estimated_fees['source'] = 'estimated'
                    estimated_fees['note'] = 'Estimated fees - actual fees not yet available'
                    response['fee_details'] = estimated_fees
                    self._log('info', f"Using estimated fee data: ${estimated_fees['total_fee']:.2f}")
                except Exception as e:
                    # If we can't get payment method details, just note it
                    response['fee_details'] = {
                        'note': 'Fee details unavailable - payment method not accessible',
                        'error': str(e)
                    }
                    self._log('warning', f"Could not retrieve fee details: {str(e)}")
            else:
                response['fee_details'] = {'note': 'No payment method associated with this payment intent'}

        elif status == 'requires_payment_method':
            response['next_action'] = 'Payment method required'

        elif status == 'requires_confirmation':
            response['next_action'] = 'Payment requires confirmation'

        elif status == 'requires_action':
            response['next_action'] = 'Additional action required (e.g., 3D Secure)'
            if payment_intent.next_action:
                response['next_action_details'] = {
                    'type': getattr(payment_intent.next_action, 'type', 'unknown')
                }

        elif status == 'processing':
            response['next_action'] = 'Payment is being processed'

        if status in ['canceled', 'failed', 'requires_payment_method']:
            response['success'] = False
            response['failure_reason'] = 'Payment was canceled or failed'

            # Get failure details if available
            if payment_intent.last_payment_error:
                error = payment_intent.last_payment_error
                response['failure_details'] = {
                    'code': error.code,
                    'message': error.message,
                    'type': error.type,
                    'decline_code': getattr(error, 'decline_code', None)
                }

        # Add charges information if available
        if charge:
            response['charge_details'] = {
                'charge_id': charge.id,
                'paid': getattr(charge, 'paid', False),
                'refunded': getattr(charge, 'refunded', False),
                'amount_refunded': getattr(charge, 'amount_refunded', 0) / 100,  # Convert to dollars
                'failure_code': getattr(charge, 'failure_code', None),
                'failure_message': getattr(charge, 'failure_message', None)
            }

            # Add outcome information if available
            outcome = getattr(charge, 'outcome', None)
            if outcome:
                response['charge_details']['outcome'] = {
                    'network_status': getattr(outcome, 'network_status', None),
                    'reason': getattr(outcome, 'reason', None),
                    'seller_message': getattr(outcome, 'seller_message', None),
                    'type': getattr(outcome, 'type', None)
                }

            # Add payment method details from charge if available
            pm_details = getattr(charge, 'payment_method_details', None)
            if pm_details:
                response['payment_method_details'] = {
                    'type': getattr(pm_details, 'type', 'unknown')
                }

                card = getattr(pm_details, 'card', None)
                becs = getattr(pm_details, 'au_becs_debit', None)
                if card:
                    response['payment_method_details']['card'] = {
                        'brand': getattr(card, 'brand', None),
                        'country': getattr(card, 'country', None),
                        'last4': getattr(card, 'last4', None),
                        'funding': getattr(card, 'funding', None)
                    }
                elif becs:
                    response['payment_method_details']['au_becs_debit'] = {
                        'bsb_number': getattr(becs, 'bsb_number', None),
                        'last4': getattr(becs, 'last4', None)
                    }

        self._log('info', f"Payment intent {payment_intent_id} status: {payment_intent.status}")
        return response

    except stripe.InvalidRequestError as e:
        return {
            'success': False,
            'error': f'Invalid request: {str(e)}',
            'error_type': 'invalid_request',
            'payment_intent_id': payment_intent_id,
            'timestamp': datetime.now().isoformat()
        }
    except stripe.PermissionError as e:
        return {
            'success': False,
            'error': f'Permission denied: {str(e)}',
            'error_type': 'permission_error',
            'payment_intent_id': payment_intent_id,
            'timestamp': datetime.now().isoformat()
        }
    except stripe.StripeError as e:
        return {
            'success': False,
            'error': f'Stripe error: {str(e)}',
            'error_type': 'stripe_error',
            'payment_intent_id': payment_intent_id,
            'timestamp': datetime.now().isoformat()
        }
    except Exception as e:
        return {
            'success': False,
            'error': f'Unexpected error: {str(e)}',
            'error_type': 'unexpected_error',
            'payment_intent_id': payment_intent_id,
            'timestamp': datetime.now().isoformat()
        }
def check_refund_status(self, refund_id: str) -> Dict[str, Any]:
    """
    Check the status and details of a specific refund.

    Args:
        refund_id (str): Stripe Refund ID; must start with 'pyr_' or 're_'
            (e.g., 'pyr_1234567890')

    Returns:
        dict: Refund status and comprehensive details, including the
        'is_complete' / 'is_failed' / 'is_pending' flags. Validation, Stripe
        and unexpected errors are returned as 'success': False with 'error'
        and 'error_type'; nothing is raised.
    """
    try:
        # Validate input
        if not refund_id or not isinstance(refund_id, str):
            return {
                'success': False,
                'error': 'Invalid refund_id provided',
                'error_type': 'validation_error',
                'timestamp': datetime.now().isoformat()
            }

        if not refund_id.startswith(('pyr_', 're_')):
            return {
                'success': False,
                'error': 'Refund ID must start with "pyr_" or "re_"',
                'error_type': 'validation_error',
                'timestamp': datetime.now().isoformat()
            }

        self._log('info', f"Checking refund status: {refund_id}")

        # Retrieve the refund with expanded balance transaction for fee details.
        # The raw refund object is not printed; results go through the response.
        refund = stripe.Refund.retrieve(
            refund_id,
            expand=['balance_transaction']
        )

        self._log('info', f"Retrieved refund with expanded data")

        # Base response
        response = {
            'success': True,
            'refund_id': refund.id,
            'status': refund.status,
            'amount': refund.amount / 100,  # Convert from cents to dollars
            'currency': refund.currency.upper(),
            'reason': refund.reason,
            'failure_reason': getattr(refund, 'failure_reason', None),
            'charge_id': refund.charge,
            'payment_intent_id': refund.payment_intent,
            'created': datetime.fromtimestamp(refund.created).isoformat(),
            'timestamp': datetime.now().isoformat()
        }

        # Add metadata if present
        if refund.metadata:
            response['metadata'] = dict(refund.metadata)

        # Add balance transaction details if available
        balance_txn = getattr(refund, 'balance_transaction', None)
        if balance_txn:
            available_on = balance_txn.available_on
            response['balance_transaction'] = {
                'id': balance_txn.id,
                'net': balance_txn.net / 100,  # Convert from cents
                'fee': balance_txn.fee / 100,  # Convert from cents
                'available_on': datetime.fromtimestamp(available_on).isoformat() if available_on else None
            }

            # Add fee details if available
            fee_details = getattr(balance_txn, 'fee_details', None)
            if fee_details:
                response['fee_details'] = [
                    {
                        'type': fee_detail.type,
                        'amount': fee_detail.amount / 100,  # Convert from cents
                        'currency': fee_detail.currency.upper(),
                        'description': fee_detail.description
                    }
                    for fee_detail in fee_details
                ]

        # Add receipt details if available
        receipt_number = getattr(refund, 'receipt_number', None)
        if receipt_number:
            response['receipt_number'] = receipt_number

        # Determine if refund is complete
        response['is_complete'] = refund.status == 'succeeded'
        response['is_failed'] = refund.status in ['failed', 'canceled']
        response['is_pending'] = refund.status == 'pending'

        # Add pending reason if applicable
        pending_reason = getattr(refund, 'pending_reason', None)
        if pending_reason:
            response['pending_reason'] = pending_reason

        self._log('info', f"✅ Refund status check successful: {refund_id} - {refund.status}")
        return response

    except stripe.InvalidRequestError as e:
        return {
            'success': False,
            'error': f'Invalid request: {str(e)}',
            'error_type': 'invalid_request_error',
            'refund_id': refund_id,
            'timestamp': datetime.now().isoformat()
        }
    except stripe.AuthenticationError as e:
        return {
            'success': False,
            'error': f'Authentication failed: {str(e)}',
            'error_type': 'authentication_error',
            'refund_id': refund_id,
            'timestamp': datetime.now().isoformat()
        }
    except stripe.PermissionError as e:
        return {
            'success': False,
            'error': f'Permission denied: {str(e)}',
            'error_type': 'permission_error',
            'refund_id': refund_id,
            'timestamp': datetime.now().isoformat()
        }
    except stripe.StripeError as e:
        return {
            'success': False,
            'error': f'Stripe error: {str(e)}',
            'error_type': 'stripe_error',
            'refund_id': refund_id,
            'timestamp': datetime.now().isoformat()
        }
    except Exception as e:
        return {
            'success': False,
            'error': f'Unexpected error: {str(e)}',
            'error_type': 'unexpected_error',
            'refund_id': refund_id,
            'timestamp': datetime.now().isoformat()
        }
def wait_for_payment_completion(self, payment_intent_id: str, max_wait_seconds: int = 60, check_interval: int = 5, customer_id: Optional[str] = None) -> Dict[str, Any]:
    """
    Poll a payment intent until it completes or times out.
    Useful for payments that start with 'processing' status.

    Args:
        payment_intent_id (str): Stripe Payment Intent ID
        max_wait_seconds (int): Maximum time to wait in seconds (default: 60)
        check_interval (int): How often to check in seconds (default: 5). A
            value of zero or less disables the polling loop; only the initial
            and final checks are made.
        customer_id (str, optional): Customer ID to include in needs_fee_update tracking

    Returns:
        dict: The last check_payment_intent() result, with a 'polling_info'
        dict added. When polling ends through a status change or a timeout,
        'pi_status' holds the last known payment intent status. If the very
        first check cannot retrieve the intent, its error result is returned
        unchanged.
    """
    start_time = datetime.now()
    attempts = 0
    max_attempts = max_wait_seconds // check_interval if check_interval > 0 else 0

    self._log('info', f"Starting payment polling for {payment_intent_id} (max {max_wait_seconds}s, every {check_interval}s)")

    # Check initial status
    result = self.check_payment_intent(payment_intent_id)

    # A result without 'status' means the intent could not be retrieved at
    # all. (success=False with a status is a real, failed/canceled payment.)
    if 'status' not in result:
        return result

    initial_status = result['status']
    last_status = initial_status
    self._log('info', f"Initial payment status: {initial_status}")

    final_statuses = ['succeeded', 'failed', 'canceled']
    polling_statuses = ['processing', 'requires_action', 'requires_confirmation']

    # If payment is already in a final state, return immediately
    if initial_status in final_statuses:
        result['polling_info'] = {
            'polling_needed': False,
            'initial_status': initial_status,
            'final_status': initial_status,
            'total_wait_time_seconds': 0,
            'attempts': 1
        }
        return result

    if initial_status not in polling_statuses:
        # Status doesn't require polling
        result['polling_info'] = {
            'polling_needed': False,
            'initial_status': initial_status,
            'final_status': initial_status,
            'total_wait_time_seconds': 0,
            'attempts': 1,
            'note': f'Status "{initial_status}" does not require polling'
        }
        return result

    # Polling loop
    while attempts < max_attempts:
        attempts += 1

        # The first attempt runs immediately; later ones wait first.
        if attempts > 1:
            self._log('info', f"Waiting {check_interval} seconds before attempt {attempts}...")
            time.sleep(check_interval)

        # Check current status
        current_result = self.check_payment_intent(payment_intent_id)
        elapsed_time = (datetime.now() - start_time).total_seconds()

        if 'status' not in current_result:
            # Error occurred during polling
            current_result['polling_info'] = {
                'polling_needed': True,
                'initial_status': initial_status,
                'final_status': 'error',
                'total_wait_time_seconds': round(elapsed_time, 2),
                'attempts': attempts,
                'polling_error': 'Failed to check payment status during polling'
            }
            return current_result

        current_status = current_result['status']
        last_status = current_status

        self._log('info', f"Attempt {attempts}: Status = {current_status} (elapsed: {elapsed_time:.1f}s)")

        # Check if we've reached a final status
        if current_status in final_statuses:
            # Payment completed (success or failure)
            current_result['polling_info'] = {
                'polling_needed': True,
                'initial_status': initial_status,
                'final_status': current_status,
                'total_wait_time_seconds': round(elapsed_time, 2),
                'attempts': attempts,
                'completed': True
            }

            if current_status == 'succeeded':
                self._log('info', f"✅ Payment completed successfully after {elapsed_time:.1f}s ({attempts} attempts)")
            else:
                self._log('warning', f"❌ Payment completed with status '{current_status}' after {elapsed_time:.1f}s")
            current_result['pi_status'] = current_status
            return current_result

        # Check if status changed to something that doesn't need polling
        if current_status not in polling_statuses:
            current_result['polling_info'] = {
                'polling_needed': True,
                'initial_status': initial_status,
                'final_status': current_status,
                'total_wait_time_seconds': round(elapsed_time, 2),
                'attempts': attempts,
                'completed': False,
                'note': f'Status changed to "{current_status}" which does not require further polling'
            }
            self._log('info', f"Status changed to '{current_status}', stopping polling")
            current_result['pi_status'] = current_status
            return current_result

    # Timeout reached
    elapsed_time = (datetime.now() - start_time).total_seconds()
    final_result = self.check_payment_intent(payment_intent_id)

    if 'status' in final_result:
        final_status = final_result['status']
        final_result['polling_info'] = {
            'polling_needed': True,
            'initial_status': initial_status,
            'final_status': final_status,
            'total_wait_time_seconds': round(elapsed_time, 2),
            'attempts': attempts,
            'completed': False,
            'timed_out': True,
            'timeout_reason': f'Reached maximum wait time of {max_wait_seconds} seconds'
        }

        # If payment is still processing after timeout and we have customer_id, mark for later review
        if final_status == 'processing' and customer_id:
            final_result['needs_fee_update'] = [customer_id, payment_intent_id]
            self._log('warning', f"⏰ Payment still processing after timeout - marked for later review")

        self._log('warning', f"⏰ Polling timed out after {max_wait_seconds}s. Final status: {final_status}")
        final_result['pi_status'] = final_status
    else:
        # Error on final check
        final_result['polling_info'] = {
            'polling_needed': True,
            'initial_status': initial_status,
            'final_status': 'error',
            'total_wait_time_seconds': round(elapsed_time, 2),
            'attempts': attempts,
            'completed': False,
            'timed_out': True,
            'timeout_reason': 'Timeout reached and final status check failed'
        }

        # If we have customer_id and this might be a processing payment, mark for later
        if customer_id:
            final_result['needs_fee_update'] = [customer_id, payment_intent_id]
            self._log('warning', f"⏰ Final check failed - marked for later review")

        # The final check gave no status, so report the last one seen.
        final_result['pi_status'] = last_status

    return final_result
def create_setup_intent(self, customer_id: str, payment_method_types: Optional[list] = None) -> Dict[str, Any]:
    """
    Create a Setup Intent to collect and save payment method details for future use.

    The customer is retrieved first so that a missing or deleted customer is
    reported as 'customer_not_found' rather than as a generic Stripe error.
    Failures are returned as a result dict instead of being raised.

    Args:
        customer_id (str): Stripe customer ID
        payment_method_types (list, optional): Payment method types to allow
            (e.g., ['card', 'au_becs_debit']). When omitted or empty,
            ['card', 'au_becs_debit'] is used.

    Returns:
        dict: On success, 'success' is True and the dict carries
            'setup_intent_id', 'client_secret' (for the frontend), 'status',
            'customer_id', 'payment_method_types' and 'timestamp'.
            On failure, 'success' is False with 'error' and 'error_type'
            ('validation_error', 'customer_not_found', 'stripe_error' or
            'unexpected_error').
    """
    try:
        if not customer_id or not isinstance(customer_id, str):
            return {
                'success': False,
                'error': 'Invalid customer_id provided',
                'error_type': 'validation_error'
            }

        # Default payment method types if none provided
        payment_method_types = payment_method_types or ['card', 'au_becs_debit']

        self._log('info', f"Creating setup intent for customer: {customer_id}")

        # Verify customer exists
        try:
            customer = stripe.Customer.retrieve(customer_id)
        except stripe.InvalidRequestError:
            return {
                'success': False,
                'error': f'Customer {customer_id} not found',
                'error_type': 'customer_not_found'
            }

        # Retrieving a deleted customer does not raise: Stripe returns a stub
        # object flagged with deleted=True, so it has to be checked explicitly.
        if getattr(customer, 'deleted', False):
            return {
                'success': False,
                'error': f'Customer {customer_id} has been deleted',
                'error_type': 'customer_not_found'
            }

        # Create Setup Intent
        setup_intent = stripe.SetupIntent.create(
            customer=customer_id,
            payment_method_types=payment_method_types,
            usage='off_session'  # For future payments
        )

        response = {
            'success': True,
            'setup_intent_id': setup_intent.id,
            'client_secret': setup_intent.client_secret,
            'status': setup_intent.status,
            'customer_id': customer_id,
            'payment_method_types': payment_method_types,
            'timestamp': datetime.now().isoformat()
        }

        self._log('info', f"✅ Setup intent created: {setup_intent.id}")
        return response

    except stripe.StripeError as e:
        self._log('error', f"Stripe error creating setup intent for {customer_id}: {e}")
        return {
            'success': False,
            'error': f'Stripe error: {str(e)}',
            'error_type': 'stripe_error',
            'customer_id': customer_id,
            'timestamp': datetime.now().isoformat()
        }
    except Exception as e:
        self._log('error', f"Unexpected error creating setup intent for {customer_id}: {e}")
        return {
            'success': False,
            'error': f'Unexpected error: {str(e)}',
            'error_type': 'unexpected_error',
            'customer_id': customer_id,
            'timestamp': datetime.now().isoformat()
        }
def get_setup_intent_status(self, setup_intent_id: str) -> Dict[str, Any]:
    """
    Check the status of a Setup Intent and retrieve payment method details if succeeded.

    Failures are returned as a result dict instead of being raised.

    Args:
        setup_intent_id (str): Stripe Setup Intent ID (must start with 'seti_')

    Returns:
        dict: Always contains 'success'. For a retrieved intent it also carries
            'setup_intent_id', 'status', 'customer_id' and 'timestamp', plus:
            - 'payment_method' (id, type, created and card / au_becs_debit
              details) when the intent succeeded with a payment method
            - 'next_action' while the intent still needs user action or is
              processing
            - 'error' (and 'error_details' when Stripe recorded a setup
              error) with 'success' False when the intent was canceled/failed
            On validation or API failure: 'success' False, 'error' and
            'error_type' ('validation_error', 'stripe_error' or
            'unexpected_error').
    """
    try:
        # isinstance guard: a non-string ID is a validation problem, not an
        # "unexpected error" from calling .startswith on the wrong type.
        if not isinstance(setup_intent_id, str) or not setup_intent_id.startswith('seti_'):
            return {
                'success': False,
                'error': 'Invalid setup_intent_id provided',
                'error_type': 'validation_error'
            }

        self._log('info', f"Checking setup intent status: {setup_intent_id}")

        # Retrieve setup intent
        setup_intent = stripe.SetupIntent.retrieve(setup_intent_id)
        status = setup_intent.status

        response = {
            'success': True,
            'setup_intent_id': setup_intent.id,
            'status': status,
            'customer_id': setup_intent.customer,
            'timestamp': datetime.now().isoformat()
        }

        # If succeeded, get payment method details
        if status == 'succeeded' and setup_intent.payment_method:
            payment_method = stripe.PaymentMethod.retrieve(setup_intent.payment_method)

            pm_details = {
                'id': payment_method.id,
                'type': payment_method.type,
                'created': payment_method.created
            }

            # Only the sub-object matching the payment method type is read
            if pm_details['type'] == "card":
                card = payment_method.card
                pm_details['card'] = {
                    'brand': card.brand,
                    'last4': card.last4,
                    'country': card.country,
                    'exp_month': card.exp_month,
                    'exp_year': card.exp_year
                }
            elif pm_details['type'] == "au_becs_debit":
                becs = payment_method.au_becs_debit
                pm_details['au_becs_debit'] = {
                    'bsb_number': becs.bsb_number,
                    'last4': becs.last4
                }

            response['payment_method'] = pm_details
            self._log('info', f"✅ Setup intent succeeded with payment method: {payment_method.id}")

        # 'requires_action' (e.g. an authentication step) also waits on the user
        elif status in ('requires_payment_method', 'requires_confirmation', 'requires_action'):
            response['next_action'] = 'Setup still requires user action'

        elif status == 'processing':
            response['next_action'] = 'Setup is processing'

        elif status in ('canceled', 'failed'):
            response['success'] = False
            response['error'] = f'Setup intent {status}'

            last_error = getattr(setup_intent, 'last_setup_error', None)
            if last_error:
                response['error_details'] = {
                    'code': last_error.code,
                    'message': last_error.message,
                    'type': last_error.type
                }

        return response

    except stripe.StripeError as e:
        self._log('error', f"Stripe error checking setup intent {setup_intent_id}: {e}")
        return {
            'success': False,
            'error': f'Stripe error: {str(e)}',
            'error_type': 'stripe_error',
            'setup_intent_id': setup_intent_id,
            'timestamp': datetime.now().isoformat()
        }
    except Exception as e:
        self._log('error', f"Unexpected error checking setup intent {setup_intent_id}: {e}")
        return {
            'success': False,
            'error': f'Unexpected error: {str(e)}',
            'error_type': 'unexpected_error',
            'setup_intent_id': setup_intent_id,
            'timestamp': datetime.now().isoformat()
        }
def attach_payment_method(self, payment_method_id: str, customer_id: str) -> Dict[str, Any]:
    """
    Attach a payment method to a customer (if not already attached).

    When Stripe reports the payment method as already attached, it is
    retrieved and its owner is compared with customer_id; success is only
    reported if it belongs to that customer. Failures are returned as a
    result dict instead of being raised.

    Args:
        payment_method_id (str): Stripe Payment Method ID (must start with 'pm_')
        customer_id (str): Stripe customer ID

    Returns:
        dict: On success: 'success' True, 'payment_method_id', 'customer_id',
            'type' and 'timestamp'. On failure: 'success' False, 'error' and
            'error_type' ('validation_error', 'payment_method_conflict',
            'stripe_error' or 'unexpected_error').
    """
    try:
        # isinstance guard: a non-string ID is a validation problem, not an
        # "unexpected error" from calling .startswith on the wrong type.
        if not isinstance(payment_method_id, str) or not payment_method_id.startswith('pm_'):
            return {
                'success': False,
                'error': 'Invalid payment_method_id provided',
                'error_type': 'validation_error'
            }

        if not customer_id:
            return {
                'success': False,
                'error': 'Invalid customer_id provided',
                'error_type': 'validation_error'
            }

        self._log('info', f"Attaching payment method {payment_method_id} to customer {customer_id}")

        # Try to attach (may already be attached)
        try:
            payment_method = stripe.PaymentMethod.attach(
                payment_method_id,
                customer=customer_id
            )
            self._log('info', "✅ Payment method attached successfully")
        except stripe.InvalidRequestError as e:
            if 'already attached' not in str(e).lower():
                raise  # bare raise keeps the original traceback

            # Already attached, retrieve it and confirm who owns it
            payment_method = stripe.PaymentMethod.retrieve(payment_method_id)
            owner = getattr(payment_method, 'customer', None)
            # 'customer' is normally an ID string; tolerate an expanded object
            owner_id = getattr(owner, 'id', owner)
            if owner_id != customer_id:
                self._log('warning', f"Payment method {payment_method_id} is attached to a different customer")
                return {
                    'success': False,
                    'error': f'Payment method {payment_method_id} is already attached to a different customer',
                    'error_type': 'payment_method_conflict',
                    'payment_method_id': payment_method_id,
                    'customer_id': customer_id,
                    'timestamp': datetime.now().isoformat()
                }
            self._log('info', "Payment method was already attached")

        return {
            'success': True,
            'payment_method_id': payment_method.id,
            'customer_id': customer_id,
            'type': payment_method.type,
            'timestamp': datetime.now().isoformat()
        }

    except stripe.StripeError as e:
        self._log('error', f"Stripe error attaching payment method {payment_method_id}: {e}")
        return {
            'success': False,
            'error': f'Stripe error: {str(e)}',
            'error_type': 'stripe_error',
            'payment_method_id': payment_method_id,
            'customer_id': customer_id,
            'timestamp': datetime.now().isoformat()
        }
    except Exception as e:
        self._log('error', f"Unexpected error attaching payment method {payment_method_id}: {e}")
        return {
            'success': False,
            'error': f'Unexpected error: {str(e)}',
            'error_type': 'unexpected_error',
            'payment_method_id': payment_method_id,
            'customer_id': customer_id,
            'timestamp': datetime.now().isoformat()
        }
def set_default_payment_method(self, customer_id: str, payment_method_id: str) -> Dict[str, Any]:
    """
    Make a payment method the customer's default.

    The default is stored on the customer's invoice settings via
    stripe.Customer.modify. Failures are reported in the returned dict.

    Args:
        customer_id (str): Stripe customer ID
        payment_method_id (str): Stripe Payment Method ID

    Returns:
        dict: 'success' True with 'customer_id', 'default_payment_method' and
            'timestamp'; otherwise 'success' False with 'error' and
            'error_type'.
    """

    def _api_failure(message: str, kind: str) -> Dict[str, Any]:
        # Shared shape for failures raised while talking to Stripe
        return {
            'success': False,
            'error': message,
            'error_type': kind,
            'customer_id': customer_id,
            'payment_method_id': payment_method_id,
            'timestamp': datetime.now().isoformat()
        }

    try:
        # Guard clause: both identifiers must be present
        if not (customer_id and payment_method_id):
            return {
                'success': False,
                'error': 'Both customer_id and payment_method_id are required',
                'error_type': 'validation_error'
            }

        self._log('info', f"Setting default payment method for customer {customer_id}")

        # Point the customer's invoice settings at the given payment method
        invoice_settings = {'default_payment_method': payment_method_id}
        stripe.Customer.modify(customer_id, invoice_settings=invoice_settings)

        outcome = {'success': True, 'customer_id': customer_id}
        outcome['default_payment_method'] = payment_method_id
        outcome['timestamp'] = datetime.now().isoformat()
        return outcome

    except stripe.StripeError as exc:
        return _api_failure(f'Stripe error: {str(exc)}', 'stripe_error')
    except Exception as exc:
        return _api_failure(f'Unexpected error: {str(exc)}', 'unexpected_error')
def detach_payment_method(self, payment_method_id: str) -> Dict[str, Any]:
    """
    Detach a payment method from its customer.

    Failures are returned as a result dict instead of being raised.

    Args:
        payment_method_id (str): Stripe Payment Method ID (must start with 'pm_')

    Returns:
        dict: On success: 'success' True, 'payment_method_id', 'detached' True
            and 'timestamp'. On failure: 'success' False, 'error' and
            'error_type' ('validation_error', 'stripe_error' or
            'unexpected_error').
    """
    try:
        # isinstance guard: a non-string ID is a validation problem, not an
        # "unexpected error" from calling .startswith on the wrong type.
        if not isinstance(payment_method_id, str) or not payment_method_id.startswith('pm_'):
            return {
                'success': False,
                'error': 'Invalid payment_method_id provided',
                'error_type': 'validation_error'
            }

        self._log('info', f"Detaching payment method: {payment_method_id}")

        payment_method = stripe.PaymentMethod.detach(payment_method_id)

        return {
            'success': True,
            'payment_method_id': payment_method.id,
            'detached': True,
            'timestamp': datetime.now().isoformat()
        }

    except stripe.StripeError as e:
        self._log('error', f"Stripe error detaching payment method {payment_method_id}: {e}")
        return {
            'success': False,
            'error': f'Stripe error: {str(e)}',
            'error_type': 'stripe_error',
            'payment_method_id': payment_method_id,
            'timestamp': datetime.now().isoformat()
        }
    except Exception as e:
        self._log('error', f"Unexpected error detaching payment method {payment_method_id}: {e}")
        return {
            'success': False,
            'error': f'Unexpected error: {str(e)}',
            'error_type': 'unexpected_error',
            'payment_method_id': payment_method_id,
            'timestamp': datetime.now().isoformat()
        }
def update_payment_fees(self, needs_fee_update) -> Dict[str, Any]:
    """
    Update fees for a payment that was previously marked as needing a fee update.

    The payment intent is re-checked through self.check_payment_intent. From
    that result this method reads 'success', 'status', 'amount', 'currency'
    and, when present, 'fee_details'. A missing or None 'fee_details' is
    treated as "no actual fees yet". Failures are returned as a result dict
    instead of being raised.

    Args:
        needs_fee_update (list | tuple): [customer_id, payment_intent_id]

    Returns:
        dict: If the check itself fails, its result is returned unchanged.
            Otherwise 'success' True with 'payment_intent_id', 'customer_id',
            'status', 'amount', 'currency', 'has_actual_fees', 'is_complete',
            'fee_details', 'timestamp', 'needs_further_updates' and 'note'.
            While the payment is not complete, 'needs_fee_update' is included
            again so the caller can keep tracking it. On bad input or an
            unexpected error: 'success' False, 'error' and 'error_type'
            ('validation_error' or 'update_error').
    """
    try:
        # Parse the identifier
        if not isinstance(needs_fee_update, (list, tuple)) or len(needs_fee_update) != 2:
            return {
                'success': False,
                'error': 'Invalid needs_fee_update format. Expected [customer_id, payment_intent_id]',
                'error_type': 'validation_error'
            }

        customer_id, payment_intent_id = needs_fee_update

        self._log('info', f"Updating fees for {payment_intent_id} (customer: {customer_id})")

        # Get the current payment intent status
        current_result = self.check_payment_intent(payment_intent_id)

        if not current_result['success']:
            return current_result

        # 'fee_details' may be absent or explicitly None; both mean "no fees yet"
        fee_details = current_result.get('fee_details') or {}
        has_actual_fees = fee_details.get('source') == 'stripe_actual'
        is_complete = current_result['status'] in ('succeeded', 'failed', 'canceled')

        update_result = {
            'success': True,
            'payment_intent_id': payment_intent_id,
            'customer_id': customer_id,
            'status': current_result['status'],
            'amount': current_result['amount'],
            'currency': current_result['currency'],
            'has_actual_fees': has_actual_fees,
            'is_complete': is_complete,
            'fee_details': fee_details,
            'timestamp': datetime.now().isoformat()
        }

        # Further updates are needed exactly while the payment is not complete;
        # the fee flag only selects the note and log message.
        update_result['needs_further_updates'] = not is_complete

        if is_complete:
            if has_actual_fees:
                update_result['note'] = 'Payment complete with actual fees'
                self._log('info', f"✅ Payment {payment_intent_id} now complete with actual fees")
            else:
                update_result['note'] = 'Payment complete but actual fees not available'
                self._log('info', f"✅ Payment {payment_intent_id} complete but no actual fees")
        else:
            update_result['needs_fee_update'] = [customer_id, payment_intent_id]  # Keep tracking
            if has_actual_fees:
                update_result['note'] = 'Has actual fees but payment still processing'
                self._log('info', f"⏳ Payment {payment_intent_id} has fees but still processing")
            else:
                update_result['note'] = 'Payment still processing without actual fees'
                self._log('info', f"⏳ Payment {payment_intent_id} still needs updates")

        return update_result

    except Exception as e:
        self._log('error', f"Failed to update payment fees: {e}")
        return {
            'success': False,
            'error': f'Failed to update payment fees: {str(e)}',
            'error_type': 'update_error',
            'needs_fee_update': needs_fee_update,
            'timestamp': datetime.now().isoformat()
        }