""" Batch Payment Processor Module This module handles batch processing of Direct Debit and Card payments. """ import sys import json import logging import threading from concurrent.futures import ThreadPoolExecutor, as_completed from datetime import datetime from typing import Dict, Any, List from .base_processor import BasePaymentProcessor logger = logging.getLogger(__name__) class BatchPaymentProcessor(BasePaymentProcessor): """ Processor for batch payment mode. This class handles the batch processing of Direct Debit and Card payments, including multi-threaded payment execution. """ def __init__(self, *args, **kwargs): """Initialize the batch payment processor.""" super().__init__(*args, **kwargs) self.db_lock = threading.Lock() def process(self) -> Dict[str, Any]: """ Process batch payments for Direct Debit and Card payment methods. Returns: Dictionary with processing results """ start_time = datetime.now() self.log_processing_start("Batch") print(f"Batch Processor Mode: {self.process_live}") # Create batches for both payment methods if self.process_live: print("LIVE MODE") batch_ids = self._create_payment_batches() else: print("The Sandbox") batch_ids = self._create_payment_batches() #sys.exit() if not batch_ids: self.logger.warning("No batches created for processing") return { 'success': False, 'batch_ids': [], 'success_count': 0, 'failed_count': 0, 'duration': (datetime.now() - start_time).total_seconds() } # Execute payments for all batches success_count, failed_count = self._execute_payment_batches(batch_ids) # Calculate duration and log completion duration = (datetime.now() - start_time).total_seconds() self.log_processing_complete( "Batch", success_count, failed_count, duration, {'batch_ids': batch_ids} ) return { 'success': True, 'batch_ids': batch_ids, 'success_count': success_count, 'failed_count': failed_count, 'duration': duration } def _create_payment_batches(self) -> List[int]: """ Create payment batches for Direct Debit and Card payments. Returns: List of batch IDs """ batch_ids = [] payment_methods = [ self.config.PAYMENT_METHOD_CARD, self.config.PAYMENT_METHOD_DIRECT_DEBIT ] payment_method_names = { self.config.PAYMENT_METHOD_CARD: "Card Payment", self.config.PAYMENT_METHOD_DIRECT_DEBIT: "Direct Debit" } #for pm in payment_methods: # Create batch batch_id = self.payment_repo.create_payment_batch() if batch_id is None: self.logger.error(f"Failed to create batch for payment method {payment_methods}") #continue # Query customers for this payment method if self.process_live: customers = self.payment_repo.query_mysql_customers( mysql_config=self.config.MYSQL_CONFIG, payment_methods=payment_methods, deposit_threshold=self.config.DEPOSIT_THRESHOLD, limit=self.config.DEFAULT_QUERY_LIMIT ) invoices = self.payment_repo.query_mysql_customer_invoices( mysql_config=self.config.MYSQL_CONFIG, limit=self.config.DEFAULT_QUERY_LIMIT ) else: print("Collecting test customer data") #customers = self.payment_repo.get_test_customer_data(payment_method=payment_methods) customers = self.payment_repo.query_mysql_customers( mysql_config=self.config.MYSQL_CONFIG, payment_methods=payment_methods, deposit_threshold=self.config.DEPOSIT_THRESHOLD, limit=self.config.DEFAULT_QUERY_LIMIT ) invoices = self.payment_repo.query_mysql_customer_invoices( mysql_config=self.config.MYSQL_CONFIG, limit=self.config.DEFAULT_QUERY_LIMIT ) if customers: customer_count = len(customers) result = self.payment_repo.add_payments_to_batch(customers, batch_id, invoices) if result['added'] > 0: batch_ids.append(batch_id) #self.logger.info(f"Created batch {batch_id} for {payment_method_names[pm]} with {customer_count} customers") self.logger.info(f"Created batch {batch_id} with {customer_count} customers") # Log batch creation using services try: from services import log_batch_created #log_batch_created(batch_id, payment_method_names[pm], customer_count) log_batch_created(batch_id, str(payment_methods), customer_count) except Exception as e: self.logger.warning(f"Failed to log batch creation: {e}") else: self.logger.warning(f"No payments added to batch {batch_id}") else: #self.logger.info(f"No customers found for {payment_method_names[pm]}") self.logger.info(f"No customers found for {str(payment_methods)}") #sys.exit() return batch_ids def _execute_payment_batches(self, batch_ids: List[int]) -> tuple: """ Execute payments for all provided batch IDs using thread pool. Args: batch_ids: List of batch IDs to process Returns: Tuple of (success_count, failed_count) """ if not batch_ids: self.logger.warning("No valid batches to process") return (0, 0) max_threads = self.config.MAX_PAYMENT_THREADS total_success = 0 total_failed = 0 for batch_id in batch_ids: self.logger.info(f"Processing batch {batch_id}") # Get payments for this batch payments = self.payment_repo.get_payments_by_batch(batch_id) if not payments: self.logger.info(f"No payments found for batch {batch_id}") continue self.logger.info(f"Processing {len(payments)} payments using {max_threads} threads") success_count, failed_count = self._process_payments_threaded( payments, max_threads ) total_success += success_count total_failed += failed_count self.logger.info(f"Batch {batch_id} completed: {success_count}/{len(payments)} successful") return (total_success, total_failed) def _process_payments_threaded(self, payments: List[Any], max_threads: int) -> tuple: """ Process payments using thread pool with immediate commits. Args: payments: List of payment records max_threads: Maximum number of concurrent threads Returns: Tuple of (success_count, failed_count) """ processed_count = 0 failed_count = 0 # Process payments in chunks to avoid timeout issues chunk_size = max_threads * 2 for i in range(0, len(payments), chunk_size): chunk = payments[i:i + chunk_size] self.logger.info(f"Processing chunk {i//chunk_size + 1}: payments {i+1}-{min(i+chunk_size, len(payments))}") # Prepare payment tasks for this chunk payment_tasks = [] for pay in chunk: payment_data = self.create_payment_data(pay, pay.id) payment_tasks.append(payment_data) # Process chunk with ThreadPoolExecutor with ThreadPoolExecutor(max_workers=max_threads) as executor: future_to_payment = { executor.submit(self._process_single_payment, task): task for task in payment_tasks } # Process results as they complete for future in as_completed(future_to_payment): try: result = future.result(timeout=60) if result['success'] and result['result']: # Immediately commit each successful payment self._update_payment_result(result['payment_id'], result['result']) processed_count += 1 self.logger.info(f"Payment {result['payment_id']} processed ({processed_count}/{len(payments)})") else: failed_count += 1 self.logger.warning(f"Payment {result['payment_id']} failed ({failed_count} failures)") except Exception as e: payment_data = future_to_payment[future] failed_count += 1 self.logger.error(f"Thread exception for payment {payment_data['payment_id']}: {e}") self.logger.info(f"Chunk completed: {processed_count} processed, {failed_count} failed") return (processed_count, failed_count) def _process_single_payment(self, payment_data: Dict[str, Any]) -> Dict[str, Any]: """ Process a single payment (thread-safe). Args: payment_data: Dictionary containing payment information Returns: Dictionary with payment result and metadata """ try: # Process payment with Stripe print(f"\n\nBatch Processor - processPaymentResult - Mode: {self.config.MODE}") result = self.stripe_processor.process_payment( customer_id=payment_data['customer_id'], amount=payment_data['amount'], currency=payment_data['currency'], description=payment_data['description'], stripe_pm=payment_data['stripe_pm'] ) return { 'payment_id': payment_data['payment_id'], 'result': result, 'success': True } except Exception as e: self.logger.error(f"Payment processing failed for payment ID {payment_data['payment_id']}: {e}") return { 'payment_id': payment_data['payment_id'], 'result': None, 'success': False, 'error': str(e) } def _update_payment_result(self, payment_id: int, result: Dict[str, Any]): """ Thread-safe update of payment result to database. Args: payment_id: Payment ID result: Payment processing result """ with self.db_lock: try: if result: self.handle_payment_result(payment_id, result, payment_type="pay") self.logger.debug(f"Payment {payment_id} result committed to database") else: self.logger.warning(f"No result to commit for payment {payment_id}") except Exception as e: self.logger.error(f"Failed to update payment {payment_id}: {e}")