You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
 
 
 

306 lines
11 KiB

"""
Batch Payment Processor Module
This module handles batch processing of Direct Debit and Card payments.
"""
import sys
import json
import logging
import threading
from concurrent.futures import ThreadPoolExecutor, as_completed
from datetime import datetime
from typing import Dict, Any, List
from .base_processor import BasePaymentProcessor
logger = logging.getLogger(__name__)
class BatchPaymentProcessor(BasePaymentProcessor):
    """
    Processor for batch payment mode.

    Creates a payment batch covering Card and Direct Debit customers, then
    executes the payments of that batch through a thread pool, writing each
    successful result back as soon as it completes.

    The class reads several attributes it does not define itself
    (``config``, ``logger``, ``payment_repo``, ``stripe_processor``,
    ``process_live``) and calls helpers such as ``create_payment_data`` and
    ``handle_payment_result``; these are presumably supplied by
    ``BasePaymentProcessor`` -- confirm against the base class.
    """

    def __init__(self, *args, **kwargs):
        """Initialize the processor; all arguments are forwarded to the base class."""
        super().__init__(*args, **kwargs)
        # Guards the result write-back done in _update_payment_result.
        self.db_lock = threading.Lock()

    def process(self) -> Dict[str, Any]:
        """
        Create the payment batch and execute every payment in it.

        Returns:
            Dictionary with the keys ``success``, ``batch_ids``,
            ``success_count``, ``failed_count`` and ``duration`` (seconds).
            ``success`` is False only when no batch was created; it does not
            reflect individual payment failures.
        """
        start_time = datetime.now()
        self.log_processing_start("Batch")
        print(f"Batch Processor Mode: {self.process_live}")

        # Live and sandbox runs build their batches the same way; only the
        # banner printed here differs.
        print("LIVE MODE" if self.process_live else "The Sandbox")
        batch_ids = self._create_payment_batches()

        if not batch_ids:
            self.logger.warning("No batches created for processing")
            return {
                'success': False,
                'batch_ids': [],
                'success_count': 0,
                'failed_count': 0,
                'duration': (datetime.now() - start_time).total_seconds()
            }

        # Execute payments for all batches
        success_count, failed_count = self._execute_payment_batches(batch_ids)

        # Calculate duration and log completion
        duration = (datetime.now() - start_time).total_seconds()
        self.log_processing_complete(
            "Batch",
            success_count,
            failed_count,
            duration,
            {'batch_ids': batch_ids}
        )
        return {
            'success': True,
            'batch_ids': batch_ids,
            'success_count': success_count,
            'failed_count': failed_count,
            'duration': duration
        }

    def _create_payment_batches(self) -> List[int]:
        """
        Create a single payment batch covering Card and Direct Debit customers.

        Returns:
            A list holding the new batch ID when at least one payment was
            added to it. The list is empty when the batch could not be
            created, no customers were found, or no payments were added.
        """
        batch_ids = []
        payment_methods = [
            self.config.PAYMENT_METHOD_CARD,
            self.config.PAYMENT_METHOD_DIRECT_DEBIT
        ]

        batch_id = self.payment_repo.create_payment_batch()
        if batch_id is None:
            self.logger.error(f"Failed to create batch for payment method {payment_methods}")
            # Without a batch there is nothing to attach payments to, so stop
            # here instead of passing None on as a batch ID.
            return batch_ids

        if not self.process_live:
            print("Collecting test customer data")

        # Sandbox mode currently issues the same MySQL queries as live mode.
        customers = self.payment_repo.query_mysql_customers(
            mysql_config=self.config.MYSQL_CONFIG,
            payment_methods=payment_methods,
            deposit_threshold=self.config.DEPOSIT_THRESHOLD,
            limit=self.config.DEFAULT_QUERY_LIMIT
        )
        invoices = self.payment_repo.query_mysql_customer_invoices(
            mysql_config=self.config.MYSQL_CONFIG,
            limit=self.config.DEFAULT_QUERY_LIMIT
        )

        if not customers:
            self.logger.info(f"No customers found for {str(payment_methods)}")
            return batch_ids

        customer_count = len(customers)
        result = self.payment_repo.add_payments_to_batch(customers, batch_id, invoices)
        if result['added'] > 0:
            batch_ids.append(batch_id)
            self.logger.info(f"Created batch {batch_id} with {customer_count} customers")
            # Audit logging is best-effort: a missing or failing services
            # module must not abort the batch that was just created.
            try:
                from services import log_batch_created
                log_batch_created(batch_id, str(payment_methods), customer_count)
            except Exception as e:
                self.logger.warning(f"Failed to log batch creation: {e}")
        else:
            self.logger.warning(f"No payments added to batch {batch_id}")
        return batch_ids

    def _execute_payment_batches(self, batch_ids: List[int]) -> tuple:
        """
        Execute payments for all provided batch IDs using the thread pool.

        Args:
            batch_ids: List of batch IDs to process

        Returns:
            Tuple of (success_count, failed_count) summed over all batches.
            Batches without payments are skipped and contribute nothing.
        """
        if not batch_ids:
            self.logger.warning("No valid batches to process")
            return (0, 0)

        max_threads = self.config.MAX_PAYMENT_THREADS
        total_success = 0
        total_failed = 0
        for batch_id in batch_ids:
            self.logger.info(f"Processing batch {batch_id}")
            payments = self.payment_repo.get_payments_by_batch(batch_id)
            if not payments:
                self.logger.info(f"No payments found for batch {batch_id}")
                continue

            self.logger.info(f"Processing {len(payments)} payments using {max_threads} threads")
            success_count, failed_count = self._process_payments_threaded(
                payments,
                max_threads
            )
            total_success += success_count
            total_failed += failed_count
            self.logger.info(f"Batch {batch_id} completed: {success_count}/{len(payments)} successful")
        return (total_success, total_failed)

    def _process_payments_threaded(self, payments: List[Any], max_threads: int) -> tuple:
        """
        Process payments in chunks, running each chunk on a thread pool.

        Every successful payment is written back immediately through
        ``_update_payment_result`` from the thread that consumes the
        completed futures (the caller's thread), not from the workers.

        Args:
            payments: List of payment records; each must expose an ``id``
            max_threads: Maximum number of concurrent threads. Values below 1
                are treated as 1.

        Returns:
            Tuple of (success_count, failed_count)
        """
        processed_count = 0
        failed_count = 0

        # ThreadPoolExecutor rejects max_workers <= 0 and range() rejects a
        # zero step, so a bad configuration value falls back to one worker.
        worker_count = max(1, max_threads)
        # Process payments in chunks to avoid timeout issues
        chunk_size = worker_count * 2
        total = len(payments)

        for start in range(0, total, chunk_size):
            chunk = payments[start:start + chunk_size]
            self.logger.info(f"Processing chunk {start//chunk_size + 1}: payments {start+1}-{min(start+chunk_size, total)}")

            payment_tasks = [self.create_payment_data(pay, pay.id) for pay in chunk]

            with ThreadPoolExecutor(max_workers=worker_count) as executor:
                future_to_payment = {
                    executor.submit(self._process_single_payment, task): task
                    for task in payment_tasks
                }
                for future in as_completed(future_to_payment):
                    try:
                        # as_completed only yields finished futures, so this
                        # call returns (or raises) without waiting.
                        result = future.result()
                        if result['success'] and result['result']:
                            # Immediately commit each successful payment
                            self._update_payment_result(result['payment_id'], result['result'])
                            processed_count += 1
                            self.logger.info(f"Payment {result['payment_id']} processed ({processed_count}/{len(payments)})")
                        else:
                            failed_count += 1
                            self.logger.warning(f"Payment {result['payment_id']} failed ({failed_count} failures)")
                    except Exception as e:
                        payment_data = future_to_payment[future]
                        failed_count += 1
                        self.logger.error(f"Thread exception for payment {payment_data['payment_id']}: {e}")

            self.logger.info(f"Chunk completed: {processed_count} processed, {failed_count} failed")
        return (processed_count, failed_count)

    def _process_single_payment(self, payment_data: Dict[str, Any]) -> Dict[str, Any]:
        """
        Process a single payment; submitted to the thread pool as a worker task.

        Args:
            payment_data: Dictionary with the keys ``payment_id``,
                ``customer_id``, ``amount``, ``currency``, ``description``
                and ``stripe_pm``

        Returns:
            Dictionary with ``payment_id``, ``result`` and ``success``. When
            processing raises, ``success`` is False, ``result`` is None and
            an ``error`` key carries the exception text; the exception is
            logged and not re-raised.
        """
        try:
            print(f"\n\nBatch Processor - processPaymentResult - Mode: {self.config.MODE}")
            result = self.stripe_processor.process_payment(
                customer_id=payment_data['customer_id'],
                amount=payment_data['amount'],
                currency=payment_data['currency'],
                description=payment_data['description'],
                stripe_pm=payment_data['stripe_pm']
            )
            return {
                'payment_id': payment_data['payment_id'],
                'result': result,
                'success': True
            }
        except Exception as e:
            self.logger.error(f"Payment processing failed for payment ID {payment_data['payment_id']}: {e}")
            return {
                'payment_id': payment_data['payment_id'],
                'result': None,
                'success': False,
                'error': str(e)
            }

    def _update_payment_result(self, payment_id: int, result: Dict[str, Any]):
        """
        Write a payment result back while holding ``db_lock``.

        Any exception raised by the write-back is logged and swallowed, so
        the caller is not told when the update fails.

        Args:
            payment_id: Payment ID
            result: Payment processing result; a falsy value is only logged
        """
        with self.db_lock:
            try:
                if result:
                    self.handle_payment_result(payment_id, result, payment_type="pay")
                    self.logger.debug(f"Payment {payment_id} result committed to database")
                else:
                    self.logger.warning(f"No result to commit for payment {payment_id}")
            except Exception as e:
                self.logger.error(f"Failed to update payment {payment_id}: {e}")