""" Payment Plan Processor Module This module handles processing of recurring payment plans. """ import logging import threading from concurrent.futures import ThreadPoolExecutor, as_completed from datetime import datetime from typing import Dict, Any, List from .base_processor import BasePaymentProcessor logger = logging.getLogger(__name__) class PaymentPlanProcessor(BasePaymentProcessor): """ Processor for payment plan mode. This class handles processing of recurring payment plans that are due based on their schedule (weekly/fortnightly). """ def __init__(self, *args, **kwargs): """Initialize the payment plan processor.""" super().__init__(*args, **kwargs) self.db_lock = threading.Lock() def process(self) -> Dict[str, Any]: """ Process payment plans that are due today. Returns: Dictionary with processing results """ start_time = datetime.now() self.log_processing_start("Payment Plan") # Query for due payment plans due_plans = self._get_due_payment_plans() if not due_plans: self.logger.info("No payment plans due for processing today") return { 'success': True, 'batch_id': None, 'success_count': 0, 'failed_count': 0, 'total_amount': 0.0, 'duration': (datetime.now() - start_time).total_seconds() } # Create batch for payment plans batch_id = self.payment_repo.create_payment_batch() if batch_id is None: self.logger.error("Failed to create batch for payment plan processing") return { 'success': False, 'batch_id': None, 'success_count': 0, 'failed_count': 0, 'total_amount': 0.0, 'duration': (datetime.now() - start_time).total_seconds() } # Add payments to batch total_amount = sum(abs(plan.get('deposit', 0)) for plan in due_plans) result = self.payment_repo.add_payments_to_batch(due_plans, batch_id) if result['added'] == 0: self.logger.error("Failed to add payment plans to batch") return { 'success': False, 'batch_id': batch_id, 'success_count': 0, 'failed_count': 0, 'total_amount': 0.0, 'duration': (datetime.now() - start_time).total_seconds() } self.logger.info(f"Created payment plan batch {batch_id} with {result['added']} plans (${total_amount:,.2f} total)") # Log batch creation try: from services import log_batch_created log_batch_created(batch_id, "Payment Plan", result['added']) except Exception as e: self.logger.warning(f"Failed to log batch creation: {e}") # Execute payments payments = self.payment_repo.get_payments_by_batch(batch_id) success_count, failed_count = self._process_payments_threaded( payments, self.config.MAX_PAYMENT_THREADS ) # Calculate duration and log completion duration = (datetime.now() - start_time).total_seconds() self.log_processing_complete( "Payment Plan", success_count, failed_count, duration, { 'batch_id': batch_id, 'total_amount': f"${total_amount:,.2f}" } ) return { 'success': True, 'batch_id': batch_id, 'success_count': success_count, 'failed_count': failed_count, 'total_amount': total_amount, 'duration': duration } def _get_due_payment_plans(self) -> List[Dict[str, Any]]: """ Get payment plans that are due for processing today. Returns: List of payment plan dictionaries """ active_plans = self.payment_repo.get_active_payment_plans() due_plans = [] for plan in active_plans: if plan.Start_Date and self._is_payment_day( start_date_string=str(plan.Start_Date.strftime('%Y-%m-%d')), schedule=plan.Frequency ): payment_data = { "customer_id": plan.Splynx_ID, "stripe_customer_id": plan.Stripe_Customer_ID, "deposit": plan.Amount * -1, "stripe_pm": plan.Stripe_Payment_Method, "paymentplan_id": plan.id } due_plans.append(payment_data) self.logger.info(f"Found {len(due_plans)} payment plans due today") return due_plans def _is_payment_day(self, start_date_string: str, schedule: str, date_format: str = "%Y-%m-%d") -> bool: """ Check if today is a payment day based on start date and frequency. Args: start_date_string: The first payment date schedule: Payment frequency ("Weekly" or "Fortnightly") date_format: Format of the date string Returns: True if today is a payment day, False otherwise """ try: if not start_date_string or not schedule: self.logger.error("Missing required parameters for payment day calculation") return False if schedule == "Weekly": num_days = 7 elif schedule == "Fortnightly": num_days = 14 else: self.logger.error(f"Unsupported payment schedule '{schedule}'") return False start_date = datetime.strptime(start_date_string, date_format) today = datetime.now().replace(hour=0, minute=0, second=0, microsecond=0) start_date = start_date.replace(hour=0, minute=0, second=0, microsecond=0) # Calculate days since start date days_since_start = (today - start_date).days # Check if it's a multiple of the payment frequency return days_since_start >= 0 and days_since_start % num_days == 0 except ValueError as e: self.logger.error(f"Error parsing date '{start_date_string}' with format '{date_format}': {e}") return False except Exception as e: self.logger.error(f"Unexpected error in is_payment_day: {e}") return False def _process_payments_threaded(self, payments: List[Any], max_threads: int) -> tuple: """ Process payments using thread pool. Args: payments: List of payment records max_threads: Maximum number of concurrent threads Returns: Tuple of (success_count, failed_count) """ processed_count = 0 failed_count = 0 if not payments: return (0, 0) self.logger.info(f"Processing {len(payments)} payment plan payments using {max_threads} threads") # Prepare payment tasks payment_tasks = [] for pay in payments: payment_data = self.create_payment_data(pay, pay.id) payment_tasks.append(payment_data) # Process with ThreadPoolExecutor with ThreadPoolExecutor(max_workers=max_threads) as executor: future_to_payment = { executor.submit(self._process_single_payment, task): task for task in payment_tasks } # Process results as they complete for future in as_completed(future_to_payment): try: result = future.result(timeout=60) if result['success'] and result['result']: # Immediately commit each successful payment self._update_payment_result(result['payment_id'], result['result']) processed_count += 1 self.logger.info(f"Payment {result['payment_id']} processed ({processed_count}/{len(payments)})") else: failed_count += 1 self.logger.warning(f"Payment {result['payment_id']} failed ({failed_count} failures)") except Exception as e: payment_data = future_to_payment[future] failed_count += 1 self.logger.error(f"Thread exception for payment {payment_data['payment_id']}: {e}") return (processed_count, failed_count) def _process_single_payment(self, payment_data: Dict[str, Any]) -> Dict[str, Any]: """ Process a single payment (thread-safe). Args: payment_data: Dictionary containing payment information Returns: Dictionary with payment result and metadata """ try: # Process payment with Stripe result = self.stripe_processor.process_payment( customer_id=payment_data['customer_id'], amount=payment_data['amount'], currency=payment_data['currency'], description=payment_data['description'], stripe_pm=payment_data['stripe_pm'] ) return { 'payment_id': payment_data['payment_id'], 'result': result, 'success': True } except Exception as e: self.logger.error(f"Payment processing failed for payment ID {payment_data['payment_id']}: {e}") return { 'payment_id': payment_data['payment_id'], 'result': None, 'success': False, 'error': str(e) } def _update_payment_result(self, payment_id: int, result: Dict[str, Any]): """ Thread-safe update of payment result to database. Args: payment_id: Payment ID result: Payment processing result """ with self.db_lock: try: if result: self.handle_payment_result(payment_id, result, payment_type="pay") self.logger.debug(f"Payment {payment_id} result committed to database") else: self.logger.warning(f"No result to commit for payment {payment_id}") except Exception as e: self.logger.error(f"Failed to update payment {payment_id}: {e}")