You can not select more than 25 topics
Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
1095 lines
44 KiB
1095 lines
44 KiB
#!/usr/bin/env python3
|
|
"""
|
|
External script to query MySQL database (Splynx) for customer billing data.
|
|
This script runs independently of the Flask application.
|
|
|
|
Usage: python query_mysql.py [mode] [live]
|
|
Modes: batch, payintent, payplan, refund
|
|
"""
|
|
|
|
import pymysql
|
|
import sys
|
|
import json
|
|
import random
|
|
import threading
|
|
import logging
|
|
from concurrent.futures import ThreadPoolExecutor, as_completed
|
|
from datetime import datetime
|
|
from typing import List, Dict, Union, Any
|
|
from stripe_payment_processor import StripePaymentProcessor
|
|
from config import Config
|
|
from app import create_app, db
|
|
from models import Payments, PaymentBatch, SinglePayments, PaymentPlans
|
|
from splynx import Splynx, SPLYNX_URL, SPLYNX_KEY, SPLYNX_SECRET
|
|
from services import (
|
|
log_script_start, log_script_completion, log_batch_created,
|
|
log_payment_intent_followup
|
|
)
|
|
from notification_service import NotificationService
|
|
|
|
# Configure logging
|
|
logging.basicConfig(
|
|
level=logging.INFO,
|
|
format='%(asctime)s - %(name)s - %(levelname)s - %(message)s',
|
|
handlers=[
|
|
logging.FileHandler('payment_processing.log'),
|
|
logging.StreamHandler(sys.stdout)
|
|
]
|
|
)
|
|
logger = logging.getLogger(__name__)
|
|
|
|
# Initialize Splynx API
|
|
splynx = Splynx(url=SPLYNX_URL, key=SPLYNX_KEY, secret=SPLYNX_SECRET)
|
|
|
|
# Import constants from config
|
|
PAYMENT_METHOD_DIRECT_DEBIT = Config.PAYMENT_METHOD_DIRECT_DEBIT
|
|
PAYMENT_METHOD_CARD = Config.PAYMENT_METHOD_CARD
|
|
PAYMENT_METHOD_PAYMENT_PLAN = Config.PAYMENT_METHOD_PAYMENT_PLAN
|
|
PROCESS_LIVE = Config.PROCESS_LIVE
|
|
|
|
# Get Stripe API key from config
|
|
if PROCESS_LIVE:
|
|
api_key = Config.STRIPE_LIVE_API_KEY
|
|
else:
|
|
api_key = Config.STRIPE_TEST_API_KEY
|
|
#test_stripe_customers = ['cus_SoQqMGLmCjiBDZ', 'cus_SoQptxwe8hczGz', 'cus_SoQjeNXkKOdORI', 'cus_SoQiDcSrNRxbPF', 'cus_SoQedaG3q2ecKG', 'cus_SoQeTkzMA7AaLR', 'cus_SoQeijBTETQcGb', 'cus_SoQe259iKMgz7o', 'cus_SoQejTstdXEDTO', 'cus_SoQeQH2ORWBOWX', 'cus_SoQevtyWxqXtpC', 'cus_SoQekOFUHugf26', 'cus_SoPq6Zh0MCUR9W', 'cus_SoPovwUPJmvugz', 'cus_SoPnvGfejhpSR5', 'cus_SoNAgAbkbFo8ZY', 'cus_SoMyDihTxRsa7U', 'cus_SoMVPWxdYstYbr', 'cus_SoMVQ6Xj2dIrCR', 'cus_SoMVmBn1xipFEB', 'cus_SoMVNvZ2Iawb7Y', 'cus_SoMVZupj6wRy5e', 'cus_SoMVqjH7zkc5Qe', 'cus_SoMVkzj0ZUK0Ai', 'cus_SoMVFq3BUD3Njw', 'cus_SoLcrRrvoy9dJ4', 'cus_SoLcqHN1k0WD8j', 'cus_SoLcLtYDZGG32V', 'cus_SoLcG23ilNeMYt', 'cus_SoLcFhtUVzqumj', 'cus_SoLcPgMnuogINl', 'cus_SoLccGTY9mMV7T', 'cus_SoLRxqvJxuKFes', 'cus_SoKs7cjdcvW1oO']
|
|
|
|
|
|
|
|
|
|
|
|
def create_customer_friendly_message(payment_data: dict, error_details: str) -> str:
|
|
"""
|
|
Create a customer-friendly ticket message for failed payments.
|
|
|
|
Args:
|
|
payment_data: Dictionary containing payment information
|
|
error_details: Raw error details
|
|
|
|
Returns:
|
|
str: HTML formatted customer-friendly message
|
|
"""
|
|
try:
|
|
# Import classify_payment_error from main.py
|
|
from blueprints.main import classify_payment_error
|
|
|
|
# Extract payment details
|
|
amount = abs(payment_data.get('amount', 0))
|
|
splynx_id = payment_data.get('splynx_id', 'Unknown')
|
|
|
|
# Parse PI_JSON for payment method details if available
|
|
pi_json = payment_data.get('pi_json')
|
|
payment_method_type = "unknown"
|
|
last4 = "****"
|
|
|
|
if pi_json:
|
|
try:
|
|
parsed_json = json.loads(pi_json)
|
|
payment_method_type = parsed_json.get('payment_method_type', 'unknown')
|
|
|
|
# Get last 4 digits from various possible locations in JSON
|
|
if 'payment_method_details' in parsed_json:
|
|
pm_details = parsed_json['payment_method_details']
|
|
if payment_method_type == 'card' and 'card' in pm_details:
|
|
last4 = pm_details['card'].get('last4', '****')
|
|
elif payment_method_type == 'au_becs_debit' and 'au_becs_debit' in pm_details:
|
|
last4 = pm_details['au_becs_debit'].get('last4', '****')
|
|
elif 'last4' in parsed_json:
|
|
last4 = parsed_json.get('last4', '****')
|
|
except:
|
|
pass
|
|
|
|
# Format payment method for display
|
|
if payment_method_type == 'au_becs_debit':
|
|
payment_method_display = f"Bank Account ending in {last4}"
|
|
elif payment_method_type == 'card':
|
|
payment_method_display = f"Card ending in {last4}"
|
|
else:
|
|
payment_method_display = "Payment method"
|
|
|
|
# Get current datetime
|
|
current_time = datetime.now().strftime("%d/%m/%Y at %I:%M %p")
|
|
|
|
# Get customer-friendly error explanation
|
|
error_classification = classify_payment_error(error_details, pi_json)
|
|
if error_classification:
|
|
error_message = error_classification['message']
|
|
else:
|
|
error_message = "An error occurred during payment processing"
|
|
|
|
# Create customer-friendly HTML message
|
|
customer_message = f"""
|
|
<!DOCTYPE html PUBLIC "-//W3C//DTD HTML 4.0 Transitional//EN" "http://www.w3.org/TR/REC-html40/loose.dtd">
|
|
<html>
|
|
<body>
|
|
<div>Your payment attempt was unsuccessful.</div>
|
|
<div><br></div>
|
|
<div><strong>Payment Details:</strong></div>
|
|
<div>• Amount: ${amount:.2f} AUD</div>
|
|
<div>• Date/Time: {current_time}</div>
|
|
<div>• {payment_method_display}</div>
|
|
<div><br></div>
|
|
<div><strong>Issue:</strong> {error_message}</div>
|
|
<div><br></div>
|
|
<div>Please contact us if you need assistance with your payment.</div>
|
|
</body>
|
|
</html>
|
|
"""
|
|
|
|
return customer_message.strip()
|
|
|
|
except Exception as e:
|
|
# Fallback message if there's any error creating the friendly message
|
|
logger.error(f"Error creating customer-friendly message: {e}")
|
|
return f"""
|
|
<!DOCTYPE html PUBLIC "-//W3C//DTD HTML 4.0 Transitional//EN" "http://www.w3.org/TR/REC-html40/loose.dtd">
|
|
<html>
|
|
<body>
|
|
<div>Your payment attempt was unsuccessful. Please contact us for assistance.</div>
|
|
</body>
|
|
</html>
|
|
"""
|
|
|
|
|
|
|
|
def find_pay_splynx_invoices(splynx_id: int) -> List[Dict[str, Any]]:
|
|
result = splynx.get(url=f"/api/2.0/admin/finance/invoices?main_attributes[customer_id]={splynx_id}&main_attributes[status]=not_paid&main_attributes[status]=pending")
|
|
|
|
invoice_pay = {
|
|
"status": "paid"
|
|
}
|
|
|
|
updated_invoices = []
|
|
for pay in result:
|
|
res = splynx.put(url=f"/api/2.0/admin/finance/invoices/{pay['id']}", params=invoice_pay)
|
|
if res:
|
|
updated_invoices.append(res)
|
|
return updated_invoices
|
|
|
|
def find_set_pending_splynx_invoices(splynx_id: int) -> List[Dict[str, Any]]:
|
|
result = splynx.get(url=f"/api/2.0/admin/finance/invoices?main_attributes[customer_id]={splynx_id}&main_attributes[status]=not_paid")
|
|
|
|
invoice_pay = {
|
|
"status": "pending"
|
|
}
|
|
|
|
updated_invoices = []
|
|
for pay in result:
|
|
res = splynx.put(url=f"/api/2.0/admin/finance/invoices/{pay['id']}", params=invoice_pay)
|
|
if res:
|
|
updated_invoices.append(res)
|
|
return updated_invoices
|
|
|
|
def find_set_pending_splynx_invoices_to_unpaid(splynx_id: int) -> List[Dict[str, Any]]:
|
|
result = splynx.get(url=f"/api/2.0/admin/finance/invoices?main_attributes[customer_id]={splynx_id}&main_attributes[status]=pending")
|
|
|
|
invoice_pay = {
|
|
"status": "not_paid"
|
|
}
|
|
|
|
updated_invoices = []
|
|
for pay in result:
|
|
res = splynx.put(url=f"/api/2.0/admin/finance/invoices/{pay['id']}", params=invoice_pay)
|
|
if res:
|
|
updated_invoices.append(res)
|
|
return updated_invoices
|
|
|
|
def delete_splynx_invoices(splynx_id: int, payintent: str) -> Dict[str, Any]:
|
|
"""Delete Splynx payment records for a given customer and payment intent."""
|
|
try:
|
|
params = {
|
|
'main_attributes': {
|
|
'customer_id': splynx_id,
|
|
'field_1': payintent
|
|
},
|
|
}
|
|
query_string = splynx.build_splynx_query_params(params)
|
|
result = splynx.get(url=f"/api/2.0/admin/finance/payments?{query_string}")
|
|
|
|
if not result:
|
|
logger.warning(f"No Splynx payment found for customer {splynx_id}, payment intent {payintent}")
|
|
return {'success': False, 'error': 'No payment found to delete'}
|
|
|
|
logger.info(f"Found {len(result)} Splynx payment(s) to delete for customer {splynx_id}")
|
|
|
|
delete_success = splynx.delete(url=f"/api/2.0/admin/finance/payments/{result[0]['id']}")
|
|
|
|
if delete_success:
|
|
logger.info(f"Successfully deleted Splynx Payment ID: {result[0]['id']} for customer: {splynx_id}")
|
|
return {
|
|
'success': True,
|
|
'deleted_payment_id': result[0]['id'],
|
|
'customer_id': splynx_id,
|
|
'payment_intent': payintent
|
|
}
|
|
else:
|
|
logger.error(f"Failed to delete Splynx Payment ID: {result[0]['id']} for customer: {splynx_id}")
|
|
return {'success': False, 'error': 'Delete operation failed'}
|
|
|
|
except Exception as e:
|
|
logger.error(f"Error deleting Splynx payment for customer {splynx_id}: {e}")
|
|
return {'success': False, 'error': str(e)}
|
|
|
|
def add_payment_splynx(splynx_id: int, pi_id: str, pay_id: int, amount: float) -> Union[int, bool]:
|
|
stripe_pay = {
|
|
"customer_id": splynx_id,
|
|
"amount": amount,
|
|
"date": str(datetime.now().strftime('%Y-%m-%d')),
|
|
"field_1": pi_id,
|
|
"field_2": f"Payment_ID (Batch): {pay_id}"
|
|
}
|
|
|
|
res = splynx.post(url="/api/2.0/admin/finance/payments", params=stripe_pay)
|
|
if res:
|
|
return res['id']
|
|
else:
|
|
return False
|
|
|
|
def handle_database_operation(operation_func: callable, operation_name: str) -> Any:
|
|
"""
|
|
Reusable function to handle database operations with consistent error handling.
|
|
|
|
Args:
|
|
operation_func: Function that performs the database operation
|
|
operation_name: String description of the operation for error messages
|
|
|
|
Returns:
|
|
Result of operation_func or None if failed
|
|
"""
|
|
try:
|
|
result = operation_func()
|
|
db.session.commit()
|
|
return result
|
|
except Exception as e:
|
|
db.session.rollback()
|
|
logger.error(f"{operation_name} failed: {e}")
|
|
return None
|
|
|
|
def is_payment_day(start_date_string: str, payplan_schedule: str, date_format: str = "%Y-%m-%d") -> bool:
|
|
"""
|
|
Check if today is a payment day based on a start date and frequency.
|
|
|
|
Args:
|
|
start_date_string (str): The first payment date
|
|
payplan_schedule (str): Payment frequency ("Weekly" or "Fortnightly")
|
|
date_format (str): Format of the date string
|
|
|
|
Returns:
|
|
bool: True if today is a payment day, False otherwise
|
|
"""
|
|
try:
|
|
if not start_date_string or not payplan_schedule:
|
|
logger.error("Missing required parameters for payment day calculation")
|
|
return False
|
|
|
|
if payplan_schedule == "Weekly":
|
|
num_days = 7
|
|
elif payplan_schedule == "Fortnightly":
|
|
num_days = 14
|
|
else:
|
|
logger.error(f"Unsupported payment schedule '{payplan_schedule}'")
|
|
return False
|
|
|
|
start_date = datetime.strptime(start_date_string, date_format)
|
|
today = datetime.now().replace(hour=0, minute=0, second=0, microsecond=0)
|
|
start_date = start_date.replace(hour=0, minute=0, second=0, microsecond=0)
|
|
|
|
# Calculate days since start date
|
|
days_since_start = (today - start_date).days
|
|
|
|
# Check if it's a multiple of the payment frequency
|
|
return days_since_start >= 0 and days_since_start % num_days == 0
|
|
|
|
except ValueError as e:
|
|
logger.error(f"Error parsing date '{start_date_string}' with format '{date_format}': {e}")
|
|
return False
|
|
except Exception as e:
|
|
logger.error(f"Unexpected error in is_payment_day: {e}")
|
|
return False
|
|
|
|
|
|
def query_payplan_customers() -> List[Dict[str, Any]]:
|
|
"""Query customer billing data from MySQL database and find Payment Plan customers."""
|
|
to_return = []
|
|
customers = db.session.query(PaymentPlans).filter(PaymentPlans.Enabled == True).all()
|
|
|
|
for cust in customers:
|
|
if cust.Start_Date and is_payment_day(start_date_string=str(cust.Start_Date.strftime('%Y-%m-%d')), payplan_schedule=cust.Frequency):
|
|
payment_data = {
|
|
"customer_id": cust.Splynx_ID,
|
|
"stripe_customer_id": cust.Stripe_Customer_ID,
|
|
"deposit": cust.Amount*-1,
|
|
"stripe_pm": cust.Stripe_Payment_Method,
|
|
"paymentplan_id": cust.id
|
|
}
|
|
to_return.append(payment_data)
|
|
|
|
return to_return
|
|
|
|
|
|
def query_splynx_customers(pm: int) -> Union[List[Dict[str, Any]], bool]:
|
|
"""Query customer billing data from MySQL database."""
|
|
|
|
connection = None
|
|
try:
|
|
# Connect to MySQL database
|
|
connection = pymysql.connect(
|
|
host=Config.MYSQL_CONFIG['host'],
|
|
database=Config.MYSQL_CONFIG['database'],
|
|
user=Config.MYSQL_CONFIG['user'],
|
|
password=Config.MYSQL_CONFIG['password'],
|
|
port=Config.MYSQL_CONFIG['port'],
|
|
autocommit=False,
|
|
cursorclass=pymysql.cursors.DictCursor # Return results as dictionaries
|
|
)
|
|
|
|
logger.info("Connected to MySQL database successfully")
|
|
logger.info(f"Database: {Config.MYSQL_CONFIG['database']} on {Config.MYSQL_CONFIG['host']}")
|
|
logger.info("-" * 80)
|
|
|
|
## Payment Method:
|
|
## 2 - Direct Debit (Automatic)
|
|
## 3 - Card Payment (Automatic)
|
|
## 9 - Payment Plan
|
|
|
|
# Execute the query with DISTINCT to prevent duplicate customers
|
|
query = """
|
|
SELECT DISTINCT
|
|
cb.customer_id,
|
|
cb.deposit,
|
|
cb.payment_method,
|
|
pad.field_1 AS stripe_customer_id
|
|
FROM customer_billing cb
|
|
LEFT OUTER JOIN payment_account_data pad ON cb.customer_id = pad.customer_id
|
|
WHERE cb.payment_method = %s
|
|
AND cb.deposit < %s
|
|
AND pad.field_1 IS NOT NULL
|
|
AND NOT EXISTS (
|
|
SELECT 1
|
|
FROM invoices i
|
|
WHERE i.customer_id = cb.customer_id
|
|
AND i.status = 'pending'
|
|
)
|
|
GROUP BY cb.customer_id, cb.deposit, cb.payment_method, pad.field_1
|
|
ORDER BY cb.payment_method ASC, cb.customer_id ASC
|
|
LIMIT %s
|
|
"""
|
|
|
|
with connection.cursor() as cursor:
|
|
cursor.execute(query, (pm, Config.DEPOSIT_THRESHOLD, Config.DEFAULT_QUERY_LIMIT))
|
|
results = cursor.fetchall()
|
|
|
|
if results:
|
|
logger.info(f"Found {len(results)} rows")
|
|
return results
|
|
else:
|
|
logger.info("No rows found matching the criteria")
|
|
return False
|
|
|
|
except pymysql.Error as e:
|
|
logger.error(f"MySQL Error: {e}")
|
|
sys.exit(1)
|
|
except Exception as e:
|
|
logger.error(f"Unexpected Error: {e}")
|
|
sys.exit(1)
|
|
finally:
|
|
if connection:
|
|
connection.close()
|
|
logger.info("MySQL connection closed")
|
|
|
|
|
|
def addInitialPayments(customers, batch_id):
|
|
added = {"added": 0, "failed": 0}
|
|
payments_to_add = []
|
|
|
|
# Prepare all payments first
|
|
for cust in customers:
|
|
stripe_customer_id = cust['stripe_customer_id']
|
|
|
|
add_payer = Payments(
|
|
PaymentBatch_ID = batch_id,
|
|
Splynx_ID = cust['customer_id'],
|
|
Stripe_Customer_ID = stripe_customer_id,
|
|
Payment_Amount = float(cust['deposit'])*-1,
|
|
Stripe_Payment_Method = cust.get('stripe_pm', None),
|
|
PaymentPlan_ID = cust.get('paymentplan_id', None)
|
|
)
|
|
payments_to_add.append(add_payer)
|
|
db.session.add(add_payer)
|
|
|
|
# Atomic commit for entire batch
|
|
try:
|
|
db.session.commit()
|
|
added["added"] = len(payments_to_add)
|
|
logger.info(f"Successfully added {len(payments_to_add)} payments to batch {batch_id}")
|
|
except Exception as e:
|
|
db.session.rollback()
|
|
added["failed"] = len(payments_to_add)
|
|
logger.error(f"addInitialPayments failed for entire batch {batch_id}: {e}")
|
|
|
|
logger.info(f"Database operation result: {json.dumps(added,indent=2)}")
|
|
|
|
def addPaymentBatch():
|
|
"""Create a new payment batch and return its ID."""
|
|
add_batch = PaymentBatch()
|
|
|
|
try:
|
|
db.session.add(add_batch)
|
|
db.session.commit()
|
|
return add_batch.id
|
|
except Exception as e:
|
|
db.session.rollback()
|
|
logger.error(f"addPaymentBatch failed: {e}")
|
|
return None
|
|
|
|
def processPaymentResult(pay_id, result, key):
|
|
if key == "pay":
|
|
payment = db.session.query(Payments).filter(Payments.id == pay_id).first()
|
|
elif key == "singlepay":
|
|
payment = db.session.query(SinglePayments).filter(SinglePayments.id == pay_id).first()
|
|
#try:
|
|
if result.get('error') and not result.get('needs_fee_update'):
|
|
payment.Error = f"Error Type: {result['error_type']}\nError: {result['error']}"
|
|
payment.Success = result['success']
|
|
payment.PI_JSON = json.dumps(result)
|
|
|
|
# Send notification and create ticket for failed payments
|
|
handle_failed_payment_notification(
|
|
payment_record=payment,
|
|
error_details=payment.Error,
|
|
payment_type=key
|
|
)
|
|
elif result.get('failure_details'):
|
|
payment.Error = f"Error Type: {result.get('failure_details').get('decline_code')}\nError: {result['failure_reason']}"
|
|
payment.Success = result['success']
|
|
payment.PI_JSON = json.dumps(result)
|
|
|
|
# Send notification and create ticket for failed payments
|
|
handle_failed_payment_notification(
|
|
payment_record=payment,
|
|
error_details=payment.Error,
|
|
payment_type=key
|
|
)
|
|
else:
|
|
print("Payment successful!")
|
|
if result.get('needs_fee_update'):
|
|
payment.PI_FollowUp = True
|
|
# Mark invoices as pending when PI_FollowUp is set
|
|
if PROCESS_LIVE:
|
|
find_set_pending_splynx_invoices(payment.Splynx_ID)
|
|
payment.Payment_Intent = result['payment_intent_id']
|
|
payment.Success = result['success']
|
|
if result['success'] and PROCESS_LIVE:
|
|
find_pay_splynx_invoices(payment.Splynx_ID)
|
|
add_payment_splynx(
|
|
splynx_id=payment.Splynx_ID,
|
|
pi_id=result['payment_intent_id'],
|
|
pay_id=payment.id,
|
|
amount=payment.Payment_Amount
|
|
)
|
|
if result.get('payment_method_type') == "card":
|
|
payment.Payment_Method = result['estimated_fee_details']['card_display_brand']
|
|
elif result.get('payment_method_type') == "au_becs_debit":
|
|
payment.Payment_Method = result['payment_method_type']
|
|
if payment.PI_JSON:
|
|
combined = {**json.loads(payment.PI_JSON), **result}
|
|
payment.PI_JSON = json.dumps(combined)
|
|
else:
|
|
payment.PI_JSON = json.dumps(result)
|
|
if result.get('fee_details'):
|
|
payment.Fee_Total = result['fee_details']['total_fee']
|
|
for fee_type in result['fee_details']['fee_breakdown']:
|
|
if fee_type['type'] == "tax":
|
|
payment.Fee_Tax = fee_type['amount']
|
|
elif fee_type['type'] == "stripe_fee":
|
|
payment.Fee_Stripe = fee_type['amount']
|
|
#except Exception as e:
|
|
# logger.error(f"processPaymentResult: {e}\nResult: {json.dumps(result)}")
|
|
# payment.PI_FollowUp = True
|
|
# if PROCESS_LIVE:
|
|
# find_set_pending_splynx_invoices(payment.Splynx_ID)
|
|
|
|
def _update_payment():
|
|
return True # Just need to trigger commit, payment is already modified
|
|
|
|
handle_database_operation(_update_payment, "processPaymentResult")
|
|
|
|
# Thread lock for database operations
|
|
db_lock = threading.Lock()
|
|
|
|
def process_single_payment(processor, payment_data):
|
|
"""
|
|
Thread-safe function to process a single payment.
|
|
|
|
Args:
|
|
processor: StripePaymentProcessor instance
|
|
payment_data: Dict containing payment information
|
|
|
|
Returns:
|
|
Dict with payment result and metadata
|
|
"""
|
|
try:
|
|
# Process payment with Stripe (thread-safe)
|
|
result = processor.process_payment(
|
|
customer_id=payment_data['customer_id'],
|
|
amount=payment_data['amount'],
|
|
currency=payment_data['currency'],
|
|
description=payment_data['description'],
|
|
stripe_pm=payment_data['stripe_pm']
|
|
)
|
|
|
|
# Return result with payment ID for database update
|
|
return {
|
|
'payment_id': payment_data['payment_id'],
|
|
'result': result,
|
|
'success': True
|
|
}
|
|
except Exception as e:
|
|
logger.error(f"Payment processing failed for payment ID {payment_data['payment_id']}: {e}")
|
|
return {
|
|
'payment_id': payment_data['payment_id'],
|
|
'result': None,
|
|
'success': False,
|
|
'error': str(e)
|
|
}
|
|
|
|
def update_single_payment_result(payment_id, result):
|
|
"""
|
|
Thread-safe immediate update of single payment result to database.
|
|
Commits immediately to ensure data safety.
|
|
|
|
Args:
|
|
payment_id: ID of the payment to update
|
|
result: Payment processing result
|
|
"""
|
|
with db_lock:
|
|
try:
|
|
if result:
|
|
processPaymentResult(pay_id=payment_id, result=result, key="pay")
|
|
logger.info(f"Payment {payment_id} result committed to database")
|
|
else:
|
|
logger.warning(f"No result to commit for payment {payment_id}")
|
|
except Exception as e:
|
|
logger.error(f"Failed to update payment {payment_id}: {e}")
|
|
|
|
def process_batch_mode(processor):
|
|
"""Handle batch processing for Direct Debit and Card payments."""
|
|
to_run_batches = []
|
|
payment_methods = [PAYMENT_METHOD_DIRECT_DEBIT, PAYMENT_METHOD_CARD]
|
|
total_customers = 0
|
|
|
|
payment_method_names = {
|
|
PAYMENT_METHOD_DIRECT_DEBIT: "Direct Debit",
|
|
PAYMENT_METHOD_CARD: "Card Payment"
|
|
}
|
|
|
|
for pm in payment_methods:
|
|
batch_id = addPaymentBatch()
|
|
if batch_id is not None:
|
|
to_run_batches.append(batch_id)
|
|
customers = query_splynx_customers(pm)
|
|
if customers:
|
|
customer_count = len(customers)
|
|
total_customers += customer_count
|
|
addInitialPayments(customers=customers, batch_id=batch_id)
|
|
|
|
# Log batch creation
|
|
log_batch_created(batch_id, payment_method_names[pm], customer_count)
|
|
logger.info(f"Created batch {batch_id} for {payment_method_names[pm]} with {customer_count} customers")
|
|
else:
|
|
logger.info(f"No customers found for {payment_method_names[pm]}")
|
|
else:
|
|
logger.error(f"Failed to create batch for payment method {pm}")
|
|
|
|
return to_run_batches, 0, 0, 0.0 # Success/failed counts will be updated during execution
|
|
|
|
def process_payplan_mode(processor):
|
|
"""Handle payment plan processing."""
|
|
to_run_batches = []
|
|
|
|
# Get count of active payment plans for logging (if needed in future)
|
|
|
|
batch_id = addPaymentBatch()
|
|
if batch_id is not None:
|
|
to_run_batches.append(batch_id)
|
|
customers = query_payplan_customers()
|
|
due_plans_count = len(customers) if customers else 0
|
|
|
|
if customers:
|
|
total_amount = sum(abs(c.get('deposit', 0)) for c in customers)
|
|
addInitialPayments(customers=customers, batch_id=batch_id)
|
|
|
|
# Log batch creation for payment plans
|
|
log_batch_created(batch_id, "Payment Plan", due_plans_count)
|
|
logger.info(f"Created payment plan batch {batch_id} with {due_plans_count} due plans (${total_amount:,.2f} total)")
|
|
else:
|
|
logger.info("No payment plans due for processing today")
|
|
total_amount = 0.0
|
|
else:
|
|
logger.error("Failed to create batch for payment plan processing")
|
|
due_plans_count = 0
|
|
total_amount = 0.0
|
|
|
|
return to_run_batches, 0, 0, total_amount # Success/failed counts will be updated during execution
|
|
|
|
def execute_payment_batches(processor, batch_ids):
|
|
"""Execute payments for all provided batch IDs using safe threading with immediate commits."""
|
|
if not batch_ids:
|
|
logger.warning("No valid batches to process")
|
|
return
|
|
|
|
max_threads = Config.MAX_PAYMENT_THREADS
|
|
|
|
for batch in batch_ids:
|
|
if batch is None:
|
|
logger.warning("Skipping None batch ID")
|
|
continue
|
|
|
|
cust_pay = db.session.query(Payments).filter(Payments.PaymentBatch_ID == batch).all()
|
|
if not cust_pay:
|
|
logger.info(f"No payments found for batch {batch}")
|
|
continue
|
|
|
|
logger.info(f"Processing {len(cust_pay)} payments in batch {batch} using {max_threads} threads")
|
|
logger.info("Safety Mode: Each payment will be committed immediately to database")
|
|
|
|
# Process payments in smaller chunks to avoid timeout issues
|
|
processed_count = 0
|
|
failed_count = 0
|
|
|
|
# Process payments in chunks
|
|
chunk_size = max_threads * 2 # Process 2x thread count at a time
|
|
for i in range(0, len(cust_pay), chunk_size):
|
|
chunk = cust_pay[i:i + chunk_size]
|
|
logger.info(f"Processing chunk {i//chunk_size + 1}: payments {i+1}-{min(i+chunk_size, len(cust_pay))}")
|
|
|
|
# Prepare payment data for this chunk
|
|
payment_tasks = []
|
|
|
|
for pay in chunk:
|
|
if PROCESS_LIVE:
|
|
customer_id = pay.Stripe_Customer_ID
|
|
else:
|
|
customer_id = pay.Stripe_Customer_ID
|
|
payment_data = {
|
|
'payment_id': pay.id,
|
|
'customer_id': customer_id,
|
|
'amount': pay.Payment_Amount,
|
|
'currency': "aud",
|
|
'description': f"Payment ID: {pay.id} - Splynx ID: {pay.Splynx_ID}",
|
|
'stripe_pm': pay.Stripe_Payment_Method
|
|
}
|
|
logger.debug(f"payment_data: {json.dumps(payment_data,indent=2)}")
|
|
payment_tasks.append(payment_data)
|
|
|
|
# Process this chunk with ThreadPoolExecutor
|
|
with ThreadPoolExecutor(max_workers=max_threads) as executor:
|
|
# Submit tasks for this chunk
|
|
future_to_payment = {
|
|
executor.submit(process_single_payment, processor, task): task
|
|
for task in payment_tasks
|
|
}
|
|
|
|
# Process results as they complete (NO TIMEOUT on as_completed)
|
|
for future in as_completed(future_to_payment):
|
|
try:
|
|
result = future.result(timeout=60) # Individual payment timeout
|
|
|
|
if result['success'] and result['result']:
|
|
# IMMEDIATELY commit each successful payment to database
|
|
update_single_payment_result(result['payment_id'], result['result'])
|
|
processed_count += 1
|
|
logger.info(f"Payment {result['payment_id']} processed and committed ({processed_count}/{len(cust_pay)})")
|
|
else:
|
|
failed_count += 1
|
|
logger.warning(f"Payment {result['payment_id']} failed ({failed_count} failures total)")
|
|
|
|
except Exception as e:
|
|
payment_data = future_to_payment[future]
|
|
failed_count += 1
|
|
logger.error(f"Thread exception for payment {payment_data['payment_id']}: {e}")
|
|
|
|
logger.info(f"Chunk completed: {processed_count} processed, {failed_count} failed")
|
|
|
|
logger.info(f"Batch {batch} completed: {processed_count}/{len(cust_pay)} payments processed successfully")
|
|
|
|
def process_payintent_mode(processor):
|
|
"""Handle payment intent follow-up processing."""
|
|
to_check = {
|
|
"pay": db.session.query(Payments).filter(Payments.PI_FollowUp == True).all(),
|
|
"singlepay": db.session.query(SinglePayments).filter(SinglePayments.PI_FollowUp == True).all(),
|
|
}
|
|
|
|
total_pending = 0
|
|
succeeded_count = 0
|
|
failed_count = 0
|
|
still_pending = 0
|
|
|
|
for key, value in to_check.items():
|
|
logger.debug(f"Processing payment intent follow-up for {len(value)} {key} items")
|
|
total_pending += len(value)
|
|
|
|
for pi in value:
|
|
try:
|
|
intent_result = processor.check_payment_intent(pi.Payment_Intent)
|
|
logger.debug(f"Intent result: {json.dumps(intent_result, indent=2)}")
|
|
print(f"\n\npayintent result: {json.dumps(intent_result, indent=2)}\n\n")
|
|
if intent_result['status'] == "succeeded":
|
|
pi.PI_FollowUp_JSON = json.dumps(intent_result)
|
|
pi.PI_FollowUp = False
|
|
pi.PI_Last_Check = datetime.now()
|
|
pi.Success = True
|
|
|
|
# Mark invoices as paid when payment intent succeeds
|
|
#if PROCESS_LIVE:
|
|
# find_pay_splynx_invoices(pi.Splynx_ID)
|
|
|
|
#if intent_result.get('charge_id').startswith('ch_'):
|
|
# pi.Stripe_Charge_ID = intent_result.get('charge_id')
|
|
print("\nProcess payment results coz it succeeded")
|
|
processPaymentResult(pay_id=pi.id, result=intent_result, key=key)
|
|
succeeded_count += 1
|
|
elif intent_result['status'] == "failed":
|
|
pi.PI_FollowUp_JSON = json.dumps(intent_result)
|
|
pi.PI_FollowUp = False
|
|
pi.PI_Last_Check = datetime.now()
|
|
failed_count += 1
|
|
else:
|
|
# Still pending
|
|
pi.PI_FollowUp_JSON = json.dumps(intent_result)
|
|
pi.PI_Last_Check = datetime.now()
|
|
if intent_result.get('failure_reason'):
|
|
processPaymentResult(pay_id=pi.id, result=intent_result, key=key)
|
|
pi.PI_FollowUp = False
|
|
pi.Error = json.dumps(intent_result)
|
|
failed_count += 1
|
|
else:
|
|
still_pending += 1
|
|
|
|
db.session.commit()
|
|
except Exception as e:
|
|
logger.error(f"Error processing payment intent {pi.Payment_Intent}: {e}")
|
|
failed_count += 1
|
|
|
|
# Log payment intent follow-up results
|
|
if total_pending > 0:
|
|
log_payment_intent_followup(total_pending, succeeded_count, failed_count, still_pending)
|
|
logger.info(f"Payment intent follow-up completed: {succeeded_count} succeeded, {failed_count} failed, {still_pending} still pending")
|
|
else:
|
|
logger.info("No payment intents requiring follow-up")
|
|
|
|
return succeeded_count, failed_count
|
|
|
|
def process_refund_followup_mode(processor):
|
|
"""Handle refund follow-up processing for pending refunds."""
|
|
to_check = {
|
|
"pay": db.session.query(Payments).filter(Payments.Refund_FollowUp == True).all(),
|
|
"singlepay": db.session.query(SinglePayments).filter(SinglePayments.Refund_FollowUp == True).all(),
|
|
}
|
|
|
|
total_pending = 0
|
|
completed_count = 0
|
|
failed_count = 0
|
|
still_pending = 0
|
|
|
|
for key, value in to_check.items():
|
|
logger.debug(f"Processing refund follow-up for {len(value)} {key} items")
|
|
total_pending += len(value)
|
|
|
|
for refund_record in value:
|
|
try:
|
|
if not refund_record.Stripe_Refund_ID:
|
|
logger.error(f"No Stripe refund ID found for {key} record {refund_record.id}")
|
|
failed_count += 1
|
|
continue
|
|
print(f"refund_record.Stripe_Refund_ID: {refund_record.Stripe_Refund_ID}")
|
|
refund_result = processor.check_refund_status(refund_record.Stripe_Refund_ID)
|
|
logger.debug(f"Refund result: {json.dumps(refund_result, indent=2)}")
|
|
|
|
# Check if the API call was successful
|
|
if not refund_result.get('success', False):
|
|
logger.error(f"Failed to check refund status: {refund_result.get('error', 'Unknown error')}")
|
|
failed_count += 1
|
|
continue
|
|
|
|
if refund_result['status'] == "succeeded":
|
|
# Refund completed successfully
|
|
refund_record.Refund = True
|
|
refund_record.Refund_FollowUp = False
|
|
refund_record.Refund_JSON = json.dumps(refund_result)
|
|
|
|
# Delete associated Splynx payment record if in live mode
|
|
if PROCESS_LIVE and refund_record.Payment_Intent:
|
|
delete_result = delete_splynx_invoices(
|
|
splynx_id=refund_record.Splynx_ID,
|
|
payintent=refund_record.Payment_Intent
|
|
)
|
|
if delete_result.get('success'):
|
|
logger.info(f"Deleted Splynx payment for refund completion: customer {refund_record.Splynx_ID}")
|
|
else:
|
|
logger.warning(f"Failed to delete Splynx payment: {delete_result.get('error')}")
|
|
|
|
completed_count += 1
|
|
logger.info(f"✅ Refund completed: {refund_record.Stripe_Refund_ID}")
|
|
|
|
elif refund_result['status'] in ["failed", "canceled"]:
|
|
# Refund failed
|
|
refund_record.Refund_FollowUp = False
|
|
refund_record.Refund_JSON = json.dumps(refund_result)
|
|
failed_count += 1
|
|
logger.warning(f"❌ Refund failed: {refund_record.Stripe_Refund_ID} - {refund_result['status']}")
|
|
|
|
elif refund_result['status'] == "pending":
|
|
# Still pending - update JSON but keep follow-up flag
|
|
refund_record.Refund_JSON = json.dumps(refund_result)
|
|
still_pending += 1
|
|
logger.info(f"⏳ Refund still pending: {refund_record.Stripe_Refund_ID}")
|
|
|
|
else:
|
|
# Unknown status
|
|
refund_record.Refund_JSON = json.dumps(refund_result)
|
|
still_pending += 1
|
|
logger.warning(f"⚠️ Unknown refund status: {refund_record.Stripe_Refund_ID} - {refund_result['status']}")
|
|
|
|
db.session.commit()
|
|
|
|
except Exception as e:
|
|
logger.error(f"Error processing refund {refund_record.Stripe_Refund_ID}: {e}")
|
|
failed_count += 1
|
|
|
|
# Log refund follow-up results
|
|
if total_pending > 0:
|
|
logger.info(f"Refund follow-up completed: {completed_count} completed, {failed_count} failed, {still_pending} still pending")
|
|
|
|
# Log the activity for tracking
|
|
from services import log_activity
|
|
try:
|
|
log_activity(
|
|
user_id=1, # System user
|
|
action="refund_followup",
|
|
entity_type="script",
|
|
entity_id=None,
|
|
details=f"Processed {total_pending} pending refunds: {completed_count} completed, {failed_count} failed, {still_pending} still pending"
|
|
)
|
|
except Exception as log_error:
|
|
logger.error(f"Failed to log refund follow-up activity: {log_error}")
|
|
else:
|
|
logger.info("No refunds requiring follow-up")
|
|
|
|
return completed_count, failed_count
|
|
|
|
def handle_failed_payment_notification(payment_record, error_details: str, payment_type: str = "batch"):
|
|
"""
|
|
Handle notification and ticket creation for failed payments.
|
|
|
|
Args:
|
|
payment_record: Database payment record (Payments or SinglePayments)
|
|
error_details: Error message details
|
|
payment_type: Type of payment ("batch" or "single")
|
|
"""
|
|
try:
|
|
# Initialize notification service
|
|
notification_service = NotificationService()
|
|
|
|
# Get customer information from Splynx
|
|
try:
|
|
customer_data = splynx.Customer(payment_record.Splynx_ID)
|
|
customer_name = customer_data.get('name', 'Unknown Customer') if customer_data != 'unknown' else 'Unknown Customer'
|
|
except:
|
|
customer_name = 'Unknown Customer'
|
|
|
|
# Prepare payment data for notification
|
|
payment_data = {
|
|
'payment_id': payment_record.id,
|
|
'splynx_id': payment_record.Splynx_ID,
|
|
'amount': abs(payment_record.Payment_Amount),
|
|
'error': error_details,
|
|
'payment_method': payment_record.Payment_Method or 'Unknown',
|
|
'customer_name': customer_name,
|
|
'payment_type': payment_type,
|
|
'stripe_customer_id': payment_record.Stripe_Customer_ID,
|
|
'payment_intent': payment_record.Payment_Intent
|
|
}
|
|
|
|
# Revert pending invoices back to "not_paid" (only in live mode)
|
|
if PROCESS_LIVE:
|
|
updated_invoices = find_set_pending_splynx_invoices_to_unpaid(splynx_id=payment_record.Splynx_ID)
|
|
if updated_invoices:
|
|
logger.info(f"✅ Payment failure pending invoices reverted back to not_paid Splynx ID: {payment_record.Splynx_ID} - PayID: {payment_record.id}")
|
|
else:
|
|
logger.error(f"❌ Failed to send payment failure email for payment {payment_record.id}")
|
|
|
|
# Send email notification (only in live mode)
|
|
if PROCESS_LIVE:
|
|
email_sent = notification_service.send_payment_failure_notification(payment_data)
|
|
if email_sent:
|
|
logger.info(f"✅ Payment failure email sent for payment {payment_record.id}")
|
|
else:
|
|
logger.error(f"❌ Failed to send payment failure email for payment {payment_record.id}")
|
|
|
|
# Create Splynx ticket (only in live mode)
|
|
if PROCESS_LIVE:
|
|
ticket_subject = f"Payment Failure - Customer {payment_record.Splynx_ID} - ${abs(payment_record.Payment_Amount):.2f}"
|
|
internal_message=f"""
|
|
<!DOCTYPE html PUBLIC "-//W3C//DTD HTML 4.0 Transitional//EN" "http://www.w3.org/TR/REC-html40/loose.dtd">
|
|
<html>
|
|
<body>
|
|
<div>Payment processing has failed for customer {customer_name} (ID: {payment_record.Splynx_ID}).</div>
|
|
<div><br></div>
|
|
<div><strong>Payment Details:</strong></div>
|
|
<ul>
|
|
<li>Payment ID: {payment_record.id} ({payment_type}</li>
|
|
<li>Amount: ${abs(payment_record.Payment_Amount):.2f} AUD</li>
|
|
<li>Payment Method: {payment_record.Payment_Method or 'Unknown'}</li>
|
|
<li>Stripe Customer: {payment_record.Stripe_Customer_ID}</li>
|
|
<li>Payment Intent: {payment_record.Payment_Intent or 'N/A'}</li>
|
|
|
|
</ul>
|
|
<div><br></div>
|
|
<div><strong>Error Information:</strong></div>
|
|
<div>{error_details}</div>
|
|
<div><br></div>
|
|
<div>This ticket was automatically created by the Plutus Payment System.</div>
|
|
</body>
|
|
</html>
|
|
"""
|
|
# Create customer-friendly message
|
|
payment_data_for_msg = {
|
|
'amount': payment_data['amount'],
|
|
'splynx_id': payment_data['splynx_id'],
|
|
'pi_json': payment_record.PI_JSON
|
|
}
|
|
customer_message = create_customer_friendly_message(payment_data_for_msg, error_details)
|
|
ticket_result = splynx.create_ticket(
|
|
customer_id = payment_record.Splynx_ID,
|
|
subject = ticket_subject,
|
|
priority = 'medium',
|
|
type_id = 1,
|
|
group_id = 7,
|
|
status_id = 1,
|
|
)
|
|
#splynx.create_ticket(
|
|
# customer_id=payment_record.Splynx_ID,
|
|
# subject=ticket_subject,
|
|
# message=internal_message,
|
|
# priority="medium"
|
|
#)
|
|
|
|
if ticket_result.get('success'):
|
|
logger.info(f"✅ Splynx ticket created: #{ticket_result['ticket_id']} for payment {payment_record.id}")
|
|
|
|
# Optionally store ticket ID in payment record for tracking
|
|
# This would require adding a Splynx_Ticket_ID field to the models
|
|
|
|
## Adds internal note
|
|
add_message = splynx.add_ticket_message(
|
|
ticket_id=ticket_result['ticket_id'],
|
|
message=internal_message,
|
|
is_admin=False,
|
|
hide_for_customer=True,
|
|
message_type="note"
|
|
)
|
|
|
|
#result['ticket_id']
|
|
add_message = splynx.add_ticket_message(
|
|
ticket_id=ticket_result['ticket_id'],
|
|
message=customer_message,
|
|
is_admin=False,
|
|
hide_for_customer=False,
|
|
message_type="message"
|
|
)
|
|
|
|
else:
|
|
logger.error(f"❌ Failed to create Splynx ticket for payment {payment_record.id}: {ticket_result.get('error')}")
|
|
|
|
except Exception as e:
|
|
logger.error(f"Error handling failed payment notification for payment {payment_record.id}: {e}")
|
|
|
|
if __name__ == "__main__":
|
|
## Payment Method:
|
|
## 2 - Direct Debit (Automatic)
|
|
## 3 - Card Payment (Automatic)
|
|
## 9 - Payment Plan
|
|
|
|
### Running Mode
|
|
## batch = Monthly Direct Debit/Credit Cards
|
|
## payintent = Check outstanding Payment Intents and update
|
|
## payplan = Check for Payment Plans to run
|
|
## refund = Check outstanding Refunds and update
|
|
|
|
start_time = datetime.now()
|
|
success_count = 0
|
|
failed_count = 0
|
|
total_amount = 0.0
|
|
batch_ids = []
|
|
errors = []
|
|
|
|
try:
|
|
if sys.argv[1] == "batch":
|
|
running_mode = "batch"
|
|
elif sys.argv[1] == "payintent":
|
|
running_mode = "payintent"
|
|
elif sys.argv[1] == "payplan":
|
|
running_mode = "payplan"
|
|
elif sys.argv[1] == "refund":
|
|
running_mode = "refund"
|
|
else:
|
|
logger.error(f"Invalid running mode: {sys.argv[1]}")
|
|
logger.info("Valid modes: batch, payintent, payplan, refund")
|
|
sys.exit(1)
|
|
try:
|
|
if sys.argv[2] == "live":
|
|
PROCESS_LIVE = True
|
|
except IndexError:
|
|
logger.info("Processing payments against Sandbox")
|
|
except IndexError:
|
|
logger.info("No running mode specified, defaulting to 'payintent'")
|
|
running_mode = "payintent"
|
|
|
|
# Create Flask application context
|
|
app = create_app()
|
|
if PROCESS_LIVE:
|
|
api_key = Config.STRIPE_LIVE_API_KEY
|
|
else:
|
|
api_key = Config.STRIPE_TEST_API_KEY
|
|
print(f"api_key: {api_key}")
|
|
processor = StripePaymentProcessor(api_key=api_key, enable_logging=True)
|
|
|
|
with app.app_context():
|
|
# Log script start
|
|
environment = "live" if PROCESS_LIVE else "sandbox"
|
|
log_script_start("query_mysql.py", running_mode, environment)
|
|
logger.info(f"Starting query_mysql.py in {running_mode} mode ({environment})")
|
|
|
|
try:
|
|
if running_mode == "batch":
|
|
batch_ids, success_count, failed_count, total_amount = process_batch_mode(processor)
|
|
execute_payment_batches(processor, batch_ids)
|
|
elif running_mode == "payplan":
|
|
batch_ids, success_count, failed_count, total_amount = process_payplan_mode(processor)
|
|
execute_payment_batches(processor, batch_ids)
|
|
elif running_mode == "payintent":
|
|
success_count, failed_count = process_payintent_mode(processor)
|
|
elif running_mode == "refund":
|
|
success_count, failed_count = process_refund_followup_mode(processor)
|
|
except Exception as e:
|
|
logger.error(f"Script execution failed: {e}")
|
|
errors.append(str(e))
|
|
failed_count += 1
|
|
|
|
# Calculate execution time and log completion
|
|
end_time = datetime.now()
|
|
duration_seconds = (end_time - start_time).total_seconds()
|
|
|
|
log_script_completion(
|
|
script_name="query_mysql.py",
|
|
mode=running_mode,
|
|
success_count=success_count,
|
|
failed_count=failed_count,
|
|
total_amount=total_amount,
|
|
batch_ids=batch_ids if batch_ids else None,
|
|
duration_seconds=duration_seconds,
|
|
errors=errors if errors else None
|
|
)
|
|
|
|
logger.info(f"Script completed in {duration_seconds:.1f}s: {success_count} successful, {failed_count} failed")
|
|
|