You cannot select more than 25 topics. Topics must start with a letter or number, can include dashes ('-'), and can be up to 35 characters long.
 
 
 

292 lines
10 KiB

"""
Payment Plan Processor Module
This module handles processing of recurring payment plans.
"""
import logging
import threading
from concurrent.futures import ThreadPoolExecutor, as_completed
from datetime import datetime
from typing import Dict, Any, List
from .base_processor import BasePaymentProcessor
logger = logging.getLogger(__name__)
class PaymentPlanProcessor(BasePaymentProcessor):
    """
    Processor for payment plan mode.

    This class handles processing of recurring payment plans that are due
    based on their schedule (weekly/fortnightly).

    NOTE(review): ``self.logger``, ``self.config``, ``self.payment_repo`` and
    ``self.stripe_processor``, as well as ``log_processing_start``,
    ``log_processing_complete``, ``create_payment_data`` and
    ``handle_payment_result``, are not defined in this class; presumably they
    are provided by ``BasePaymentProcessor`` -- confirm against the base class.
    """

    # Days between two consecutive payments for each supported schedule.
    _SCHEDULE_DAYS = {"Weekly": 7, "Fortnightly": 14}

    def __init__(self, *args, **kwargs):
        """
        Initialize the payment plan processor.

        All positional and keyword arguments are forwarded unchanged to the
        base class initializer.
        """
        super().__init__(*args, **kwargs)
        # Held by _update_payment_result while a payment result is written.
        self.db_lock = threading.Lock()

    def process(self) -> Dict[str, Any]:
        """
        Process payment plans that are due today.

        The due plans are collected, placed into a newly created batch, and
        the payments of that batch are then executed through a thread pool.

        Returns:
            Dictionary with the keys ``success``, ``batch_id``,
            ``success_count``, ``failed_count``, ``total_amount`` and
            ``duration`` (seconds). ``success`` is False only when the batch
            could not be created or no plan could be added to it.
        """
        start_time = datetime.now()
        self.log_processing_start("Payment Plan")

        # Query for due payment plans
        due_plans = self._get_due_payment_plans()
        if not due_plans:
            self.logger.info("No payment plans due for processing today")
            return self._early_result(True, None, start_time)

        # Create batch for payment plans
        batch_id = self.payment_repo.create_payment_batch()
        if batch_id is None:
            self.logger.error("Failed to create batch for payment plan processing")
            return self._early_result(False, None, start_time)

        # Add payments to batch. Deposits are stored negated (see
        # _get_due_payment_plans), hence abs() when totalling.
        total_amount = sum(abs(plan.get('deposit', 0)) for plan in due_plans)
        result = self.payment_repo.add_payments_to_batch(due_plans, batch_id)
        if result['added'] == 0:
            self.logger.error("Failed to add payment plans to batch")
            return self._early_result(False, batch_id, start_time)

        self.logger.info(
            f"Created payment plan batch {batch_id} with {result['added']} plans (${total_amount:,.2f} total)"
        )

        # Log batch creation. The import sits inside the try block so that an
        # import failure is treated as non-fatal, just like a logging failure.
        try:
            from services import log_batch_created
            log_batch_created(batch_id, "Payment Plan", result['added'])
        except Exception as e:
            self.logger.warning(f"Failed to log batch creation: {e}")

        # Execute payments
        payments = self.payment_repo.get_payments_by_batch(batch_id)
        success_count, failed_count = self._process_payments_threaded(
            payments,
            self.config.MAX_PAYMENT_THREADS
        )

        # Calculate duration once so the log entry and the return value agree
        duration = (datetime.now() - start_time).total_seconds()
        self.log_processing_complete(
            "Payment Plan",
            success_count,
            failed_count,
            duration,
            {
                'batch_id': batch_id,
                'total_amount': f"${total_amount:,.2f}"
            }
        )

        return {
            'success': True,
            'batch_id': batch_id,
            'success_count': success_count,
            'failed_count': failed_count,
            'total_amount': total_amount,
            'duration': duration
        }

    def _early_result(self, success: bool, batch_id: Any, start_time: datetime) -> Dict[str, Any]:
        """
        Build the result dictionary for a run that ended before any payment
        was executed.

        Args:
            success: Value reported under the ``success`` key
            batch_id: Batch identifier, or None when no batch exists
            start_time: When processing started; used to compute ``duration``

        Returns:
            Result dictionary with zero counts and a zero total amount
        """
        return {
            'success': success,
            'batch_id': batch_id,
            'success_count': 0,
            'failed_count': 0,
            'total_amount': 0.0,
            'duration': (datetime.now() - start_time).total_seconds()
        }

    def _get_due_payment_plans(self) -> List[Dict[str, Any]]:
        """
        Get payment plans that are due for processing today.

        Plans without a start date are skipped.

        Returns:
            List of payment plan dictionaries with the keys ``customer_id``,
            ``stripe_customer_id``, ``deposit``, ``stripe_pm`` and
            ``paymentplan_id``
        """
        active_plans = self.payment_repo.get_active_payment_plans()
        due_plans = []

        for plan in active_plans:
            if not plan.Start_Date:
                continue
            if not self._is_payment_day(
                start_date_string=plan.Start_Date.strftime('%Y-%m-%d'),
                schedule=plan.Frequency
            ):
                continue
            due_plans.append({
                "customer_id": plan.Splynx_ID,
                "stripe_customer_id": plan.Stripe_Customer_ID,
                # The plan amount is stored negated in the deposit field.
                "deposit": plan.Amount * -1,
                "stripe_pm": plan.Stripe_Payment_Method,
                "paymentplan_id": plan.id
            })

        self.logger.info(f"Found {len(due_plans)} payment plans due today")
        return due_plans

    def _is_payment_day(self, start_date_string: str, schedule: str, date_format: str = "%Y-%m-%d") -> bool:
        """
        Check if today is a payment day based on start date and frequency.

        Today is a payment day when the number of whole days since the start
        date is non-negative and a multiple of the schedule length (7 days
        for "Weekly", 14 days for "Fortnightly"). The start date itself
        therefore counts as a payment day.

        Args:
            start_date_string: The first payment date
            schedule: Payment frequency ("Weekly" or "Fortnightly")
            date_format: Format of the date string

        Returns:
            True if today is a payment day, False otherwise. Missing
            arguments, an unsupported schedule and unparsable dates are
            logged and reported as False rather than raised.
        """
        try:
            if not start_date_string or not schedule:
                self.logger.error("Missing required parameters for payment day calculation")
                return False

            num_days = self._SCHEDULE_DAYS.get(schedule)
            if num_days is None:
                self.logger.error(f"Unsupported payment schedule '{schedule}'")
                return False

            # Compare at midnight so only the calendar date matters.
            midnight = {'hour': 0, 'minute': 0, 'second': 0, 'microsecond': 0}
            start_date = datetime.strptime(start_date_string, date_format).replace(**midnight)
            today = datetime.now().replace(**midnight)

            # Calculate days since start date
            days_since_start = (today - start_date).days

            # Check if it's a multiple of the payment frequency
            return days_since_start >= 0 and days_since_start % num_days == 0
        except ValueError as e:
            self.logger.error(f"Error parsing date '{start_date_string}' with format '{date_format}': {e}")
            return False
        except Exception as e:
            self.logger.error(f"Unexpected error in is_payment_day: {e}")
            return False

    def _process_payments_threaded(self, payments: List[Any], max_threads: int) -> tuple:
        """
        Process payments using thread pool.

        Each payment is charged on a worker thread; the results are written
        to the database from the calling thread as they complete. A payment
        only counts as successful when it was charged without an exception,
        produced a result, and that result was stored.

        Args:
            payments: List of payment records
            max_threads: Maximum number of concurrent threads; values below 1
                fall back to a single thread

        Returns:
            Tuple of (success_count, failed_count)
        """
        processed_count = 0
        failed_count = 0

        if not payments:
            return (0, 0)

        # ThreadPoolExecutor raises ValueError for max_workers <= 0. The batch
        # already exists at this point, so degrade to one worker instead of
        # aborting the whole run on a bad configuration value.
        if max_threads is not None and max_threads < 1:
            self.logger.warning(f"Invalid thread count {max_threads}; falling back to 1 thread")
            max_threads = 1

        self.logger.info(f"Processing {len(payments)} payment plan payments using {max_threads} threads")

        # Prepare payment tasks
        payment_tasks = [self.create_payment_data(pay, pay.id) for pay in payments]

        # Process with ThreadPoolExecutor
        with ThreadPoolExecutor(max_workers=max_threads) as executor:
            future_to_payment = {
                executor.submit(self._process_single_payment, task): task
                for task in payment_tasks
            }

            # Process results as they complete
            for future in as_completed(future_to_payment):
                task = future_to_payment[future]
                try:
                    # as_completed only yields finished futures, so result()
                    # returns immediately and needs no timeout.
                    result = future.result()

                    if result['success'] and result['result']:
                        # Immediately commit each successful payment
                        persisted = self._update_payment_result(result['payment_id'], result['result'])
                        # Only an explicit False is a failure, so an override
                        # that still returns None keeps its previous meaning.
                        if persisted is False:
                            failed_count += 1
                            self.logger.error(
                                f"Payment {result['payment_id']} result could not be saved ({failed_count} failures)"
                            )
                        else:
                            processed_count += 1
                            self.logger.info(
                                f"Payment {result['payment_id']} processed ({processed_count}/{len(payments)})"
                            )
                    else:
                        failed_count += 1
                        self.logger.warning(f"Payment {result['payment_id']} failed ({failed_count} failures)")
                except Exception as e:
                    failed_count += 1
                    self.logger.error(f"Thread exception for payment {task['payment_id']}: {e}")

        return (processed_count, failed_count)

    def _process_single_payment(self, payment_data: Dict[str, Any]) -> Dict[str, Any]:
        """
        Process a single payment on a worker thread.

        Exceptions raised by the Stripe processor are caught, logged and
        reported in the returned dictionary instead of propagating.

        Args:
            payment_data: Dictionary containing payment information; the keys
                ``payment_id``, ``customer_id``, ``amount``, ``currency``,
                ``description`` and ``stripe_pm`` are read

        Returns:
            Dictionary with ``payment_id``, ``result`` and ``success``; on
            failure ``result`` is None and an ``error`` message is included
        """
        try:
            # Process payment with Stripe
            result = self.stripe_processor.process_payment(
                customer_id=payment_data['customer_id'],
                amount=payment_data['amount'],
                currency=payment_data['currency'],
                description=payment_data['description'],
                stripe_pm=payment_data['stripe_pm']
            )
        except Exception as e:
            self.logger.error(f"Payment processing failed for payment ID {payment_data['payment_id']}: {e}")
            return {
                'payment_id': payment_data['payment_id'],
                'result': None,
                'success': False,
                'error': str(e)
            }

        return {
            'payment_id': payment_data['payment_id'],
            'result': result,
            'success': True
        }

    def _update_payment_result(self, payment_id: int, result: Dict[str, Any]) -> bool:
        """
        Write a payment result to the database while holding ``db_lock``.

        Errors raised while storing the result are logged, not propagated.

        Args:
            payment_id: Payment ID
            result: Payment processing result

        Returns:
            True if the result was handed to ``handle_payment_result`` without
            an exception, False if there was nothing to store or storing it
            failed
        """
        with self.db_lock:
            try:
                if not result:
                    self.logger.warning(f"No result to commit for payment {payment_id}")
                    return False
                self.handle_payment_result(payment_id, result, payment_type="pay")
                self.logger.debug(f"Payment {payment_id} result committed to database")
                return True
            except Exception as e:
                self.logger.error(f"Failed to update payment {payment_id}: {e}")
                return False