You can not select more than 25 topics
Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
306 lines
11 KiB
306 lines
11 KiB
"""
|
|
Batch Payment Processor Module
|
|
|
|
This module handles batch processing of Direct Debit and Card payments.
|
|
"""
|
|
import sys
|
|
import json
|
|
import logging
|
|
import threading
|
|
from concurrent.futures import ThreadPoolExecutor, as_completed
|
|
from datetime import datetime
|
|
from typing import Dict, Any, List
|
|
from .base_processor import BasePaymentProcessor
|
|
|
|
logger = logging.getLogger(__name__)
|
|
|
|
|
|
class BatchPaymentProcessor(BasePaymentProcessor):
|
|
"""
|
|
Processor for batch payment mode.
|
|
|
|
This class handles the batch processing of Direct Debit and Card payments,
|
|
including multi-threaded payment execution.
|
|
"""
|
|
|
|
def __init__(self, *args, **kwargs):
|
|
"""Initialize the batch payment processor."""
|
|
super().__init__(*args, **kwargs)
|
|
self.db_lock = threading.Lock()
|
|
|
|
def process(self) -> Dict[str, Any]:
|
|
"""
|
|
Process batch payments for Direct Debit and Card payment methods.
|
|
|
|
Returns:
|
|
Dictionary with processing results
|
|
"""
|
|
start_time = datetime.now()
|
|
self.log_processing_start("Batch")
|
|
|
|
print(f"Batch Processor Mode: {self.process_live}")
|
|
|
|
# Create batches for both payment methods
|
|
if self.process_live:
|
|
print("LIVE MODE")
|
|
batch_ids = self._create_payment_batches()
|
|
else:
|
|
print("The Sandbox")
|
|
batch_ids = self._create_payment_batches()
|
|
#sys.exit()
|
|
|
|
if not batch_ids:
|
|
self.logger.warning("No batches created for processing")
|
|
return {
|
|
'success': False,
|
|
'batch_ids': [],
|
|
'success_count': 0,
|
|
'failed_count': 0,
|
|
'duration': (datetime.now() - start_time).total_seconds()
|
|
}
|
|
|
|
# Execute payments for all batches
|
|
success_count, failed_count = self._execute_payment_batches(batch_ids)
|
|
|
|
# Calculate duration and log completion
|
|
duration = (datetime.now() - start_time).total_seconds()
|
|
self.log_processing_complete(
|
|
"Batch",
|
|
success_count,
|
|
failed_count,
|
|
duration,
|
|
{'batch_ids': batch_ids}
|
|
)
|
|
|
|
return {
|
|
'success': True,
|
|
'batch_ids': batch_ids,
|
|
'success_count': success_count,
|
|
'failed_count': failed_count,
|
|
'duration': duration
|
|
}
|
|
|
|
def _create_payment_batches(self) -> List[int]:
|
|
"""
|
|
Create payment batches for Direct Debit and Card payments.
|
|
|
|
Returns:
|
|
List of batch IDs
|
|
"""
|
|
batch_ids = []
|
|
payment_methods = [
|
|
self.config.PAYMENT_METHOD_CARD,
|
|
self.config.PAYMENT_METHOD_DIRECT_DEBIT
|
|
]
|
|
|
|
payment_method_names = {
|
|
self.config.PAYMENT_METHOD_CARD: "Card Payment",
|
|
self.config.PAYMENT_METHOD_DIRECT_DEBIT: "Direct Debit"
|
|
}
|
|
|
|
#for pm in payment_methods:
|
|
# Create batch
|
|
batch_id = self.payment_repo.create_payment_batch()
|
|
if batch_id is None:
|
|
self.logger.error(f"Failed to create batch for payment method {payment_methods}")
|
|
#continue
|
|
|
|
# Query customers for this payment method
|
|
if self.process_live:
|
|
customers = self.payment_repo.query_mysql_customers(
|
|
mysql_config=self.config.MYSQL_CONFIG,
|
|
payment_methods=payment_methods,
|
|
deposit_threshold=self.config.DEPOSIT_THRESHOLD,
|
|
limit=self.config.DEFAULT_QUERY_LIMIT
|
|
)
|
|
invoices = self.payment_repo.query_mysql_customer_invoices(
|
|
mysql_config=self.config.MYSQL_CONFIG,
|
|
limit=self.config.DEFAULT_QUERY_LIMIT
|
|
)
|
|
else:
|
|
print("Collecting test customer data")
|
|
#customers = self.payment_repo.get_test_customer_data(payment_method=payment_methods)
|
|
customers = self.payment_repo.query_mysql_customers(
|
|
mysql_config=self.config.MYSQL_CONFIG,
|
|
payment_methods=payment_methods,
|
|
deposit_threshold=self.config.DEPOSIT_THRESHOLD,
|
|
limit=self.config.DEFAULT_QUERY_LIMIT
|
|
)
|
|
invoices = self.payment_repo.query_mysql_customer_invoices(
|
|
mysql_config=self.config.MYSQL_CONFIG,
|
|
limit=self.config.DEFAULT_QUERY_LIMIT
|
|
)
|
|
|
|
if customers:
|
|
customer_count = len(customers)
|
|
result = self.payment_repo.add_payments_to_batch(customers, batch_id, invoices)
|
|
|
|
if result['added'] > 0:
|
|
batch_ids.append(batch_id)
|
|
#self.logger.info(f"Created batch {batch_id} for {payment_method_names[pm]} with {customer_count} customers")
|
|
self.logger.info(f"Created batch {batch_id} with {customer_count} customers")
|
|
|
|
# Log batch creation using services
|
|
try:
|
|
from services import log_batch_created
|
|
#log_batch_created(batch_id, payment_method_names[pm], customer_count)
|
|
log_batch_created(batch_id, str(payment_methods), customer_count)
|
|
except Exception as e:
|
|
self.logger.warning(f"Failed to log batch creation: {e}")
|
|
else:
|
|
self.logger.warning(f"No payments added to batch {batch_id}")
|
|
else:
|
|
#self.logger.info(f"No customers found for {payment_method_names[pm]}")
|
|
self.logger.info(f"No customers found for {str(payment_methods)}")
|
|
sys.exit()
|
|
return batch_ids
|
|
|
|
def _execute_payment_batches(self, batch_ids: List[int]) -> tuple:
|
|
"""
|
|
Execute payments for all provided batch IDs using thread pool.
|
|
|
|
Args:
|
|
batch_ids: List of batch IDs to process
|
|
|
|
Returns:
|
|
Tuple of (success_count, failed_count)
|
|
"""
|
|
if not batch_ids:
|
|
self.logger.warning("No valid batches to process")
|
|
return (0, 0)
|
|
|
|
max_threads = self.config.MAX_PAYMENT_THREADS
|
|
total_success = 0
|
|
total_failed = 0
|
|
|
|
for batch_id in batch_ids:
|
|
self.logger.info(f"Processing batch {batch_id}")
|
|
|
|
# Get payments for this batch
|
|
payments = self.payment_repo.get_payments_by_batch(batch_id)
|
|
if not payments:
|
|
self.logger.info(f"No payments found for batch {batch_id}")
|
|
continue
|
|
|
|
self.logger.info(f"Processing {len(payments)} payments using {max_threads} threads")
|
|
|
|
success_count, failed_count = self._process_payments_threaded(
|
|
payments,
|
|
max_threads
|
|
)
|
|
|
|
total_success += success_count
|
|
total_failed += failed_count
|
|
|
|
self.logger.info(f"Batch {batch_id} completed: {success_count}/{len(payments)} successful")
|
|
|
|
return (total_success, total_failed)
|
|
|
|
def _process_payments_threaded(self, payments: List[Any], max_threads: int) -> tuple:
|
|
"""
|
|
Process payments using thread pool with immediate commits.
|
|
|
|
Args:
|
|
payments: List of payment records
|
|
max_threads: Maximum number of concurrent threads
|
|
|
|
Returns:
|
|
Tuple of (success_count, failed_count)
|
|
"""
|
|
processed_count = 0
|
|
failed_count = 0
|
|
|
|
# Process payments in chunks to avoid timeout issues
|
|
chunk_size = max_threads * 2
|
|
for i in range(0, len(payments), chunk_size):
|
|
chunk = payments[i:i + chunk_size]
|
|
self.logger.info(f"Processing chunk {i//chunk_size + 1}: payments {i+1}-{min(i+chunk_size, len(payments))}")
|
|
|
|
# Prepare payment tasks for this chunk
|
|
payment_tasks = []
|
|
for pay in chunk:
|
|
payment_data = self.create_payment_data(pay, pay.id)
|
|
payment_tasks.append(payment_data)
|
|
|
|
# Process chunk with ThreadPoolExecutor
|
|
with ThreadPoolExecutor(max_workers=max_threads) as executor:
|
|
future_to_payment = {
|
|
executor.submit(self._process_single_payment, task): task
|
|
for task in payment_tasks
|
|
}
|
|
|
|
# Process results as they complete
|
|
for future in as_completed(future_to_payment):
|
|
try:
|
|
result = future.result(timeout=60)
|
|
|
|
if result['success'] and result['result']:
|
|
# Immediately commit each successful payment
|
|
self._update_payment_result(result['payment_id'], result['result'])
|
|
processed_count += 1
|
|
self.logger.info(f"Payment {result['payment_id']} processed ({processed_count}/{len(payments)})")
|
|
else:
|
|
failed_count += 1
|
|
self.logger.warning(f"Payment {result['payment_id']} failed ({failed_count} failures)")
|
|
|
|
except Exception as e:
|
|
payment_data = future_to_payment[future]
|
|
failed_count += 1
|
|
self.logger.error(f"Thread exception for payment {payment_data['payment_id']}: {e}")
|
|
|
|
self.logger.info(f"Chunk completed: {processed_count} processed, {failed_count} failed")
|
|
|
|
return (processed_count, failed_count)
|
|
|
|
def _process_single_payment(self, payment_data: Dict[str, Any]) -> Dict[str, Any]:
|
|
"""
|
|
Process a single payment (thread-safe).
|
|
|
|
Args:
|
|
payment_data: Dictionary containing payment information
|
|
|
|
Returns:
|
|
Dictionary with payment result and metadata
|
|
"""
|
|
try:
|
|
# Process payment with Stripe
|
|
print(f"\n\nBatch Processor - processPaymentResult - Mode: {self.config.MODE}")
|
|
result = self.stripe_processor.process_payment(
|
|
customer_id=payment_data['customer_id'],
|
|
amount=payment_data['amount'],
|
|
currency=payment_data['currency'],
|
|
description=payment_data['description'],
|
|
stripe_pm=payment_data['stripe_pm']
|
|
)
|
|
|
|
return {
|
|
'payment_id': payment_data['payment_id'],
|
|
'result': result,
|
|
'success': True
|
|
}
|
|
except Exception as e:
|
|
self.logger.error(f"Payment processing failed for payment ID {payment_data['payment_id']}: {e}")
|
|
return {
|
|
'payment_id': payment_data['payment_id'],
|
|
'result': None,
|
|
'success': False,
|
|
'error': str(e)
|
|
}
|
|
|
|
def _update_payment_result(self, payment_id: int, result: Dict[str, Any]):
|
|
"""
|
|
Thread-safe update of payment result to database.
|
|
|
|
Args:
|
|
payment_id: Payment ID
|
|
result: Payment processing result
|
|
"""
|
|
with self.db_lock:
|
|
try:
|
|
if result:
|
|
self.handle_payment_result(payment_id, result, payment_type="pay")
|
|
self.logger.debug(f"Payment {payment_id} result committed to database")
|
|
else:
|
|
self.logger.warning(f"No result to commit for payment {payment_id}")
|
|
except Exception as e:
|
|
self.logger.error(f"Failed to update payment {payment_id}: {e}")
|
|
|