You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
 
 
 

532 lines
21 KiB

#!/usr/bin/env python3
"""
External script to query MySQL database (Splynx) for customer billing data.
This script runs independently of the Flask application.
Usage: python query_mysql.py
"""
import pymysql
import sys
import json
import random
import threading
from concurrent.futures import ThreadPoolExecutor, as_completed
from datetime import datetime, timedelta
from stripe_payment_processor import StripePaymentProcessor
from config import Config
from app import create_app, db
from models import Logs, Payments, PaymentBatch, SinglePayments, PaymentPlans
from splynx import Splynx, SPLYNX_URL, SPLYNX_KEY, SPLYNX_SECRET
# Initialize Splynx API
splynx = Splynx(url=SPLYNX_URL, key=SPLYNX_KEY, secret=SPLYNX_SECRET)
# Import constants from config
PAYMENT_METHOD_DIRECT_DEBIT = Config.PAYMENT_METHOD_DIRECT_DEBIT
PAYMENT_METHOD_CARD = Config.PAYMENT_METHOD_CARD
PAYMENT_METHOD_PAYMENT_PLAN = Config.PAYMENT_METHOD_PAYMENT_PLAN
PROCESS_LIVE = Config.PROCESS_LIVE
if PROCESS_LIVE:
api_key = "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"
else:
api_key = "sk_test_51Rsi9gPfYyg6zE1S4ZpaPI1ehpbsHRLsGhysYXKwAWCZ7w6KYgVXy4pV095Nd8tyjUw9AkBhqfxqsIiiWJg5fexI00Dw36vnvx"
test_stripe_customers = ['cus_SoQqMGLmCjiBDZ', 'cus_SoQptxwe8hczGz', 'cus_SoQjeNXkKOdORI', 'cus_SoQiDcSrNRxbPF', 'cus_SoQedaG3q2ecKG', 'cus_SoQeTkzMA7AaLR', 'cus_SoQeijBTETQcGb', 'cus_SoQe259iKMgz7o', 'cus_SoQejTstdXEDTO', 'cus_SoQeQH2ORWBOWX', 'cus_SoQevtyWxqXtpC', 'cus_SoQekOFUHugf26', 'cus_SoPq6Zh0MCUR9W', 'cus_SoPovwUPJmvugz', 'cus_SoPnvGfejhpSR5', 'cus_SoNAgAbkbFo8ZY', 'cus_SoMyDihTxRsa7U', 'cus_SoMVPWxdYstYbr', 'cus_SoMVQ6Xj2dIrCR', 'cus_SoMVmBn1xipFEB', 'cus_SoMVNvZ2Iawb7Y', 'cus_SoMVZupj6wRy5e', 'cus_SoMVqjH7zkc5Qe', 'cus_SoMVkzj0ZUK0Ai', 'cus_SoMVFq3BUD3Njw', 'cus_SoLcrRrvoy9dJ4', 'cus_SoLcqHN1k0WD8j', 'cus_SoLcLtYDZGG32V', 'cus_SoLcG23ilNeMYt', 'cus_SoLcFhtUVzqumj', 'cus_SoLcPgMnuogINl', 'cus_SoLccGTY9mMV7T', 'cus_SoLRxqvJxuKFes', 'cus_SoKs7cjdcvW1oO']
def find_pay_splynx_invoices(splynx_id):
result = splynx.get(url=f"/api/2.0/admin/finance/invoices?main_attributes[customer_id]={splynx_id}&main_attributes[status]=not_paid")
invoice_pay = {
"status": "paid"
}
for pay in result:
res = splynx.put(url=f"/api/2.0/admin/finance/invoices/{pay['id']}", params=invoice_pay)
#print(json.dumps(res,indent=2))
return res
def add_payment_splynx(splynx_id, pi_id, pay_id, amount):
stripe_pay = {
"customer_id": splynx_id,
"amount": amount,
"date": str(datetime.now().strftime('%Y-%m-%d')),
"field_1": pi_id,
"field_2": f"Payment_ID (Batch): {pay_id}"
}
res = splynx.post(url="/api/2.0/admin/finance/payments", params=stripe_pay)
if res:
return res['id']
else:
return False
def handle_database_operation(operation_func, operation_name):
"""
Reusable function to handle database operations with consistent error handling.
Args:
operation_func: Function that performs the database operation
operation_name: String description of the operation for error messages
Returns:
Result of operation_func or None if failed
"""
try:
result = operation_func()
db.session.commit()
return result
except Exception as e:
db.session.rollback()
print(f"{operation_name} failed: {e}")
return None
def is_payment_day(start_date_string, payplan_schedule, date_format="%Y-%m-%d"):
"""
Check if today is a fortnightly payment day based on a start date.
Args:
start_date_string (str): The first payment date
date_format (str): Format of the date string
Returns:
bool: True if today is a payment day, False otherwise
"""
try:
if payplan_schedule == "Weekly":
num_days = 7
elif payplan_schedule == "Fortnightly":
num_days = 14
start_date = datetime.strptime(start_date_string, date_format)
today = datetime.now().replace(hour=0, minute=0, second=0, microsecond=0)
start_date = start_date.replace(hour=0, minute=0, second=0, microsecond=0)
# Calculate days since start date
days_since_start = (today - start_date).days
# Check if it's a multiple of 14 days (fortnight)
return days_since_start >= 0 and days_since_start % num_days == 0
except ValueError as e:
print(f"Error parsing date: {e}")
return False
def query_payplan_customers():
"""Query customer billing data from MySQL database and find Payment Plan customers."""
to_return = []
customers = db.session.query(PaymentPlans).filter(PaymentPlans.Enabled == True).all()
for cust in customers:
if is_payment_day(start_date_string=str(cust.Start_Date.strftime('%Y-%m-%d')), payplan_schedule=cust.Frequency):
blah = {
"customer_id": cust.Splynx_ID,
"stripe_customer_id": cust.Stripe_Customer_ID,
"deposit": cust.Amount*-1,
"stripe_pm": cust.Stripe_Payment_Method,
"paymentplan_id": cust.id
}
to_return.append(blah)
return to_return
def query_splynx_customers(pm):
"""Query customer billing data from MySQL database."""
connection = None
try:
# Connect to MySQL database
connection = pymysql.connect(
host=Config.MYSQL_CONFIG['host'],
database=Config.MYSQL_CONFIG['database'],
user=Config.MYSQL_CONFIG['user'],
password=Config.MYSQL_CONFIG['password'],
port=Config.MYSQL_CONFIG['port'],
autocommit=False,
cursorclass=pymysql.cursors.DictCursor # Return results as dictionaries
)
print("✅ Connected to MySQL database successfully")
print(f"Database: {Config.MYSQL_CONFIG['database']} on {Config.MYSQL_CONFIG['host']}")
print("-" * 80)
## Payment Method:
## 2 - Direct Debit (Automatic)
## 3 - Card Payment (Automatic)
## 9 - Payment Plan
# Execute the query
query = """
SELECT
cb.customer_id,
cb.deposit,
cb.payment_method,
pad.field_1 AS stripe_customer_id
FROM customer_billing cb
LEFT OUTER JOIN payment_account_data pad ON cb.customer_id = pad.customer_id
WHERE cb.payment_method = %s
AND cb.deposit < %s
ORDER BY cb.payment_method ASC
LIMIT %s
"""
with connection.cursor() as cursor:
cursor.execute(query, (pm, Config.DEPOSIT_THRESHOLD, Config.DEFAULT_QUERY_LIMIT))
#cursor.execute(query, (Config.DEFAULT_QUERY_LIMIT))
results = cursor.fetchall()
if results:
print(f"📊 Found {len(results)} rows:")
return results
else:
print("ℹ️ No rows found matching the criteria")
return False
except pymysql.Error as e:
print(f"❌ MySQL Error: {e}")
sys.exit(1)
except Exception as e:
print(f"❌ Unexpected Error: {e}")
sys.exit(1)
finally:
if connection:
connection.close()
print("\n🔒 MySQL connection closed")
def addInitialPayments(customers, batch_id):
added = {"added": 0, "failed": 0}
payments_to_add = []
# Prepare all payments first
for cust in customers:
if PROCESS_LIVE:
stripe_customer_id = cust['stripe_customer_id']
else:
#stripe_customer_id = cust['stripe_customer_id']
stripe_customer_id = test_stripe_customers[random.randint(1, len(test_stripe_customers)-1)]
add_payer = Payments(
PaymentBatch_ID = batch_id,
Splynx_ID = cust['customer_id'],
Stripe_Customer_ID = stripe_customer_id,
Payment_Amount = float(cust['deposit'])*-1,
Stripe_Payment_Method = cust.get('stripe_pm', None),
PaymentPlan_ID = cust.get('paymentplan_id', None)
)
payments_to_add.append(add_payer)
db.session.add(add_payer)
# Atomic commit for entire batch
try:
db.session.commit()
added["added"] = len(payments_to_add)
print(f"✅ Successfully added {len(payments_to_add)} payments to batch {batch_id}")
except Exception as e:
db.session.rollback()
added["failed"] = len(payments_to_add)
print(f"❌ addInitialPayments failed for entire batch {batch_id}: {e}")
print(f"Plutus DB: {json.dumps(added,indent=2)}\n")
def addPaymentBatch():
"""Create a new payment batch and return its ID."""
add_batch = PaymentBatch()
try:
db.session.add(add_batch)
db.session.commit()
return add_batch.id
except Exception as e:
db.session.rollback()
print(f"❌ addPaymentBatch failed: {e}")
return None
def processPaymentResult(pay_id, result, key):
if key == "pay":
payment = db.session.query(Payments).filter(Payments.id == pay_id).first()
elif key == "singlepay":
payment = db.session.query(SinglePayments).filter(SinglePayments.id == pay_id).first()
try:
if result.get('error') and not result.get('needs_fee_update'):
payment.Error = f"Error Type: {result['error_type']}\nError: {result['error']}"
payment.Success = result['success']
payment.PI_JSON = json.dumps(result)
else:
if result.get('needs_fee_update'):
payment.PI_FollowUp = True
payment.Payment_Intent = result['payment_intent_id']
payment.Success = result['success']
if result['success'] and PROCESS_LIVE:
find_pay_splynx_invoices(payment.Splynx_ID)
add_payment_splynx(
splynx_id=payment.Splynx_ID,
pi_id=result['payment_intent_id'],
pay_id=payment.id,
amount=payment.Payment_Amount
)
if result.get('payment_method_type') == "card":
payment.Payment_Method = result['estimated_fee_details']['card_display_brand']
elif result.get('payment_method_type') == "au_becs_debit":
payment.Payment_Method = result['payment_method_type']
if payment.PI_JSON:
combined = {**json.loads(payment.PI_JSON), **result}
payment.PI_JSON = json.dumps(combined)
else:
payment.PI_JSON = json.dumps(result)
if result.get('fee_details'):
payment.Fee_Total = result['fee_details']['total_fee']
for fee_type in result['fee_details']['fee_breakdown']:
if fee_type['type'] == "tax":
payment.Fee_Tax = fee_type['amount']
elif fee_type['type'] == "stripe_fee":
payment.Fee_Stripe = fee_type['amount']
except Exception as e:
print(f"processPaymentResult: {e}\n{json.dumps(result)}")
payment.PI_FollowUp = True
def _update_payment():
return True # Just need to trigger commit, payment is already modified
handle_database_operation(_update_payment, "processPaymentResult")
# Thread lock for database operations
db_lock = threading.Lock()
def process_single_payment(processor, payment_data):
"""
Thread-safe function to process a single payment.
Args:
processor: StripePaymentProcessor instance
payment_data: Dict containing payment information
Returns:
Dict with payment result and metadata
"""
try:
# Process payment with Stripe (thread-safe)
result = processor.process_payment(
customer_id=payment_data['customer_id'],
amount=payment_data['amount'],
currency=payment_data['currency'],
description=payment_data['description'],
stripe_pm=payment_data['stripe_pm']
)
# Return result with payment ID for database update
return {
'payment_id': payment_data['payment_id'],
'result': result,
'success': True
}
except Exception as e:
print(f"❌ Payment processing failed for payment ID {payment_data['payment_id']}: {e}")
return {
'payment_id': payment_data['payment_id'],
'result': None,
'success': False,
'error': str(e)
}
def update_single_payment_result(payment_id, result):
"""
Thread-safe immediate update of single payment result to database.
Commits immediately to ensure data safety.
Args:
payment_id: ID of the payment to update
result: Payment processing result
"""
with db_lock:
try:
if result:
processPaymentResult(pay_id=payment_id, result=result, key="pay")
print(f"✅ Payment {payment_id} result committed to database")
else:
print(f"⚠️ No result to commit for payment {payment_id}")
except Exception as e:
print(f"❌ Failed to update payment {payment_id}: {e}")
def process_batch_mode(processor):
"""Handle batch processing for Direct Debit and Card payments."""
to_run_batches = []
payment_methods = [PAYMENT_METHOD_DIRECT_DEBIT, PAYMENT_METHOD_CARD]
for pm in payment_methods:
batch_id = addPaymentBatch()
if batch_id is not None:
to_run_batches.append(batch_id)
customers = query_splynx_customers(pm)
addInitialPayments(customers=customers, batch_id=batch_id)
else:
print(f"❌ Failed to create batch for payment method {pm}")
return to_run_batches
def process_payplan_mode(processor):
"""Handle payment plan processing."""
to_run_batches = []
batch_id = addPaymentBatch()
if batch_id is not None:
to_run_batches.append(batch_id)
customers = query_payplan_customers()
addInitialPayments(customers=customers, batch_id=batch_id)
else:
print(f"❌ Failed to create batch for payment plan processing")
return to_run_batches
def execute_payment_batches(processor, batch_ids):
"""Execute payments for all provided batch IDs using safe threading with immediate commits."""
if not batch_ids:
print("⚠️ No valid batches to process")
return
max_threads = Config.MAX_PAYMENT_THREADS
for batch in batch_ids:
if batch is None:
print("⚠️ Skipping None batch ID")
continue
cust_pay = db.session.query(Payments).filter(Payments.PaymentBatch_ID == batch).all()
if not cust_pay:
print(f"ℹ️ No payments found for batch {batch}")
continue
print(f"🔄 Processing {len(cust_pay)} payments in batch {batch} using {max_threads} threads")
print(f"📊 Safety Mode: Each payment will be committed immediately to database")
# Process payments in smaller chunks to avoid timeout issues
processed_count = 0
failed_count = 0
# Process payments in chunks
chunk_size = max_threads * 2 # Process 2x thread count at a time
for i in range(0, len(cust_pay), chunk_size):
chunk = cust_pay[i:i + chunk_size]
print(f"🔄 Processing chunk {i//chunk_size + 1}: payments {i+1}-{min(i+chunk_size, len(cust_pay))}")
# Prepare payment data for this chunk
payment_tasks = []
for pay in chunk:
if PROCESS_LIVE:
customer_id = pay.Stripe_Customer_ID
else:
customer_id = pay.Stripe_Customer_ID
#customer_id = test_stripe_customers[random.randint(1, len(test_stripe_customers)-1)]
payment_data = {
'payment_id': pay.id,
'customer_id': customer_id,
'amount': pay.Payment_Amount,
'currency': "aud",
'description': f"Payment ID: {pay.id} - Splynx ID: {pay.Splynx_ID}",
'stripe_pm': pay.Stripe_Payment_Method
}
print(f"payment_data: {json.dumps(payment_data,indent=2)}")
payment_tasks.append(payment_data)
# Process this chunk with ThreadPoolExecutor
with ThreadPoolExecutor(max_workers=max_threads) as executor:
# Submit tasks for this chunk
future_to_payment = {
executor.submit(process_single_payment, processor, task): task
for task in payment_tasks
}
# Process results as they complete (NO TIMEOUT on as_completed)
for future in as_completed(future_to_payment):
try:
result = future.result(timeout=60) # Individual payment timeout
if result['success'] and result['result']:
# IMMEDIATELY commit each successful payment to database
update_single_payment_result(result['payment_id'], result['result'])
processed_count += 1
print(f"✅ Payment {result['payment_id']} processed and committed ({processed_count}/{len(cust_pay)})")
else:
failed_count += 1
print(f"❌ Payment {result['payment_id']} failed ({failed_count} failures total)")
except Exception as e:
payment_data = future_to_payment[future]
failed_count += 1
print(f"❌ Thread exception for payment {payment_data['payment_id']}: {e}")
print(f"📊 Chunk completed: {processed_count} processed, {failed_count} failed")
print(f"✅ Batch {batch} completed: {processed_count}/{len(cust_pay)} payments processed successfully")
def process_payintent_mode(processor):
"""Handle payment intent follow-up processing."""
to_check = {
"pay": db.session.query(Payments).filter(Payments.PI_FollowUp == True).all(),
"singlepay": db.session.query(SinglePayments).filter(SinglePayments.PI_FollowUp == True).all(),
}
#pis = db.session.query(Payments).filter(Payments.PI_FollowUp == True).all()
#to_check.append(pis)
#pis = db.session.query(SinglePayments).filter(SinglePayments.PI_FollowUp == True).all()
#to_check.append(pis)
for key, value in to_check.items():
print(value)
for pi in value:
intent_result = processor.check_payment_intent(pi.Payment_Intent)
print(json.dumps(intent_result, indent=2))
if intent_result['status'] == "succeeded":
pi.PI_FollowUp_JSON = json.dumps(intent_result)
pi.PI_FollowUp = False
pi.PI_Last_Check = datetime.now()
processPaymentResult(pay_id=pi.id, result=intent_result, key=key)
else:
pi.PI_FollowUp_JSON = json.dumps(intent_result)
pi.PI_Last_Check = datetime.now()
db.session.commit()
if __name__ == "__main__":
## Payment Method:
## 2 - Direct Debit (Automatic)
## 3 - Card Payment (Automatic)
## 9 - Payment Plan
### Running Mode
## batch = Monthly Direct Debit/Credit Cards
## payintent = Check outstanding Payment Intents and update
## payplan = Check for Payment Plans to run
try:
if sys.argv[1] == "batch":
running_mode = "batch"
elif sys.argv[1] == "payintent":
running_mode = "payintent"
elif sys.argv[1] == "payplan":
running_mode = "payplan"
else:
print(f"❌ Invalid running mode: {sys.argv[1]}")
print("Valid modes: batch, payintent, payplan")
sys.exit(1)
try:
if sys.argv[2] == "live":
PROCESS_LIVE = True
except:
print("Processing payments against Sandbox")
except IndexError:
print("ℹ️ No running mode specified, defaulting to 'payintent'")
running_mode = "payintent"
# Create Flask application context
app = create_app()
processor = StripePaymentProcessor(api_key=api_key, enable_logging=True)
with app.app_context():
if running_mode == "batch":
batch_ids = process_batch_mode(processor)
execute_payment_batches(processor, batch_ids)
elif running_mode == "payplan":
batch_ids = process_payplan_mode(processor)
execute_payment_batches(processor, batch_ids)
elif running_mode == "payintent":
process_payintent_mode(processor)