#!/usr/bin/env python3
"""
External script to query MySQL database (Splynx) for customer billing data.
This script runs independently of the Flask application.

Usage:
    python query_mysql.py [batch|payintent|payplan] [live]

Running modes:
    batch     - Monthly Direct Debit / Credit Card payments
    payintent - Check outstanding Payment Intents and update (default)
    payplan   - Check for Payment Plans to run

Passing ``live`` as the second argument forces live processing; otherwise
``Config.PROCESS_LIVE`` decides.
"""

import json
import os
import random
import sys
import threading
from concurrent.futures import ThreadPoolExecutor, as_completed
from datetime import datetime, timedelta

import pymysql

from stripe_payment_processor import StripePaymentProcessor
from config import Config
from app import create_app, db
from models import Logs, Payments, PaymentBatch, SinglePayments, PaymentPlans
from splynx import Splynx, SPLYNX_URL, SPLYNX_KEY, SPLYNX_SECRET

# Initialize Splynx API
splynx = Splynx(url=SPLYNX_URL, key=SPLYNX_KEY, secret=SPLYNX_SECRET)

# Import constants from config
PAYMENT_METHOD_DIRECT_DEBIT = Config.PAYMENT_METHOD_DIRECT_DEBIT
PAYMENT_METHOD_CARD = Config.PAYMENT_METHOD_CARD
PAYMENT_METHOD_PAYMENT_PLAN = Config.PAYMENT_METHOD_PAYMENT_PLAN
PROCESS_LIVE = Config.PROCESS_LIVE

# SECURITY: API keys must not live in source control. The environment
# variables below take precedence; the literals are kept only so existing
# deployments keep working. Rotate these keys and delete the fallbacks.
_LIVE_API_KEY_ENV = "STRIPE_LIVE_API_KEY"
_TEST_API_KEY_ENV = "STRIPE_TEST_API_KEY"
_LIVE_API_KEY_FALLBACK = "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"
_TEST_API_KEY_FALLBACK = "sk_test_51Rsi9gPfYyg6zE1S4ZpaPI1ehpbsHRLsGhysYXKwAWCZ7w6KYgVXy4pV095Nd8tyjUw9AkBhqfxqsIiiWJg5fexI00Dw36vnvx"


def _select_api_key(live):
    """Return the Stripe API key matching the requested processing mode.

    Args:
        live: True for the live key, False for the sandbox key

    Returns:
        The key from the environment if set (and non-empty), otherwise the
        built-in fallback.
    """
    if live:
        return os.environ.get(_LIVE_API_KEY_ENV) or _LIVE_API_KEY_FALLBACK
    return os.environ.get(_TEST_API_KEY_ENV) or _TEST_API_KEY_FALLBACK


api_key = _select_api_key(PROCESS_LIVE)

# Sandbox Stripe customers substituted for real customers when not live.
test_stripe_customers = [
    'cus_SoQqMGLmCjiBDZ', 'cus_SoQptxwe8hczGz', 'cus_SoQjeNXkKOdORI',
    'cus_SoQiDcSrNRxbPF', 'cus_SoQedaG3q2ecKG', 'cus_SoQeTkzMA7AaLR',
    'cus_SoQeijBTETQcGb', 'cus_SoQe259iKMgz7o', 'cus_SoQejTstdXEDTO',
    'cus_SoQeQH2ORWBOWX', 'cus_SoQevtyWxqXtpC', 'cus_SoQekOFUHugf26',
    'cus_SoPq6Zh0MCUR9W', 'cus_SoPovwUPJmvugz', 'cus_SoPnvGfejhpSR5',
    'cus_SoNAgAbkbFo8ZY', 'cus_SoMyDihTxRsa7U', 'cus_SoMVPWxdYstYbr',
    'cus_SoMVQ6Xj2dIrCR', 'cus_SoMVmBn1xipFEB', 'cus_SoMVNvZ2Iawb7Y',
    'cus_SoMVZupj6wRy5e', 'cus_SoMVqjH7zkc5Qe', 'cus_SoMVkzj0ZUK0Ai',
    'cus_SoMVFq3BUD3Njw', 'cus_SoLcrRrvoy9dJ4', 'cus_SoLcqHN1k0WD8j',
    'cus_SoLcLtYDZGG32V', 'cus_SoLcG23ilNeMYt', 'cus_SoLcFhtUVzqumj',
    'cus_SoLcPgMnuogINl', 'cus_SoLccGTY9mMV7T', 'cus_SoLRxqvJxuKFes',
    'cus_SoKs7cjdcvW1oO',
]

# Days between payments for each supported payment plan frequency.
_PAYPLAN_INTERVAL_DAYS = {"Weekly": 7, "Fortnightly": 14}

VALID_RUNNING_MODES = ("batch", "payintent", "payplan")
DEFAULT_RUNNING_MODE = "payintent"


def find_pay_splynx_invoices(splynx_id):
    """
    Mark every unpaid Splynx invoice of a customer as paid.

    Args:
        splynx_id: Splynx customer ID whose ``not_paid`` invoices are updated

    Returns:
        Response of the last invoice update, or None when the lookup returned
        no invoices (previously this case raised because nothing was assigned).
    """
    result = splynx.get(url=f"/api/2.0/admin/finance/invoices?main_attributes[customer_id]={splynx_id}&main_attributes[status]=not_paid")

    invoice_pay = {
        "status": "paid"
    }

    res = None
    # `result or []` tolerates an empty / falsy response from the API wrapper.
    for pay in result or []:
        res = splynx.put(url=f"/api/2.0/admin/finance/invoices/{pay['id']}", params=invoice_pay)
    return res


def add_payment_splynx(splynx_id, pi_id, pay_id, amount):
    """
    Record a payment against a customer in Splynx.

    Args:
        splynx_id: Splynx customer ID
        pi_id: Stripe payment intent ID, stored in ``field_1``
        pay_id: Local payment record ID, stored in ``field_2``
        amount: Payment amount

    Returns:
        The ``id`` of the created Splynx payment, or False when the API call
        returned a falsy response.
    """
    stripe_pay = {
        "customer_id": splynx_id,
        "amount": amount,
        "date": datetime.now().strftime('%Y-%m-%d'),
        "field_1": pi_id,
        "field_2": f"Payment_ID (Batch): {pay_id}"
    }

    res = splynx.post(url="/api/2.0/admin/finance/payments", params=stripe_pay)
    if res:
        return res['id']
    return False


def handle_database_operation(operation_func, operation_name):
    """
    Reusable function to handle database operations with consistent error handling.

    Args:
        operation_func: Function that performs the database operation
        operation_name: String description of the operation for error messages

    Returns:
        Result of operation_func or None if failed
    """
    try:
        result = operation_func()
        db.session.commit()
        return result
    except Exception as e:
        db.session.rollback()
        print(f"❌ {operation_name} failed: {e}")
        return None


def is_payment_day(start_date_string, payplan_schedule, date_format="%Y-%m-%d"):
    """
    Check if today is a scheduled payment day based on a start date.

    Args:
        start_date_string (str): The first payment date
        payplan_schedule (str): Payment frequency, "Weekly" or "Fortnightly"
        date_format (str): Format of the date string

    Returns:
        bool: True if today is a payment day, False otherwise (including when
        the schedule is not recognised or the date cannot be parsed)
    """
    num_days = _PAYPLAN_INTERVAL_DAYS.get(payplan_schedule)
    if num_days is None:
        # Previously an unknown schedule raised UnboundLocalError and aborted the run.
        print(f"Unknown payment plan schedule: {payplan_schedule!r}")
        return False

    try:
        start_date = datetime.strptime(start_date_string, date_format)
    except ValueError as e:
        print(f"Error parsing date: {e}")
        return False

    # Compare whole days only.
    today = datetime.now().replace(hour=0, minute=0, second=0, microsecond=0)
    start_date = start_date.replace(hour=0, minute=0, second=0, microsecond=0)

    # Calculate days since start date
    days_since_start = (today - start_date).days

    # Payment days fall on the start date and every num_days after it.
    return days_since_start >= 0 and days_since_start % num_days == 0


def query_payplan_customers():
    """
    Find enabled Payment Plan customers whose payment falls due today.

    Returns:
        List of dicts with keys ``customer_id``, ``stripe_customer_id``,
        ``deposit`` (the plan amount negated, matching the sign convention
        addInitialPayments expects), ``stripe_pm`` and ``paymentplan_id``.
    """
    plans = db.session.query(PaymentPlans).filter(PaymentPlans.Enabled == True).all()

    due_today = []
    for plan in plans:
        start_date = plan.Start_Date.strftime('%Y-%m-%d')
        if not is_payment_day(start_date_string=start_date, payplan_schedule=plan.Frequency):
            continue
        due_today.append({
            "customer_id": plan.Splynx_ID,
            "stripe_customer_id": plan.Stripe_Customer_ID,
            "deposit": plan.Amount*-1,
            "stripe_pm": plan.Stripe_Payment_Method,
            "paymentplan_id": plan.id
        })
    return due_today


def query_splynx_customers(pm):
    """
    Query customer billing data from MySQL database.

    Args:
        pm: Payment method ID to filter on
            (2 - Direct Debit (Automatic), 3 - Card Payment (Automatic),
            9 - Payment Plan)

    Returns:
        List of row dicts (``customer_id``, ``deposit``, ``payment_method``,
        ``stripe_customer_id``) or False when no rows match.

    Exits the process with status 1 on any database or unexpected error.
    """
    connection = None

    try:
        # Connect to MySQL database
        connection = pymysql.connect(
            host=Config.MYSQL_CONFIG['host'],
            database=Config.MYSQL_CONFIG['database'],
            user=Config.MYSQL_CONFIG['user'],
            password=Config.MYSQL_CONFIG['password'],
            port=Config.MYSQL_CONFIG['port'],
            autocommit=False,
            cursorclass=pymysql.cursors.DictCursor  # Return results as dictionaries
        )

        print("✅ Connected to MySQL database successfully")
        print(f"Database: {Config.MYSQL_CONFIG['database']} on {Config.MYSQL_CONFIG['host']}")
        print("-" * 80)

        query = """
        SELECT
            cb.customer_id,
            cb.deposit,
            cb.payment_method,
            pad.field_1 AS stripe_customer_id
        FROM customer_billing cb
        LEFT OUTER JOIN payment_account_data pad ON cb.customer_id = pad.customer_id
        WHERE cb.payment_method = %s
        AND cb.deposit < %s
        ORDER BY cb.payment_method ASC
        LIMIT %s
        """

        with connection.cursor() as cursor:
            cursor.execute(query, (pm, Config.DEPOSIT_THRESHOLD, Config.DEFAULT_QUERY_LIMIT))
            results = cursor.fetchall()

        if results:
            print(f"📊 Found {len(results)} rows:")
            return results

        print("ℹ️ No rows found matching the criteria")
        return False

    except pymysql.Error as e:
        print(f"❌ MySQL Error: {e}")
        sys.exit(1)
    except Exception as e:
        print(f"❌ Unexpected Error: {e}")
        sys.exit(1)
    finally:
        if connection:
            connection.close()
            print("\n🔒 MySQL connection closed")


def addInitialPayments(customers, batch_id):
    """
    Create one pending Payments row per customer in a single transaction.

    Args:
        customers: Iterable of customer dicts (``customer_id``, ``deposit``,
            ``stripe_customer_id`` and optionally ``stripe_pm`` /
            ``paymentplan_id``). A falsy value (e.g. the False returned by
            query_splynx_customers when nothing matched) is treated as empty.
        batch_id: ID of the PaymentBatch the payments belong to

    Returns:
        Dict ``{"added": int, "failed": int}``; the whole batch is either
        added or failed because it is committed atomically.
    """
    added = {"added": 0, "failed": 0}
    payments_to_add = []

    # Prepare all payments first. `customers or []` avoids iterating over False.
    for cust in customers or []:
        if PROCESS_LIVE:
            stripe_customer_id = cust['stripe_customer_id']
        else:
            # Sandbox: substitute a random test customer (every entry eligible).
            stripe_customer_id = random.choice(test_stripe_customers)

        add_payer = Payments(
            PaymentBatch_ID=batch_id,
            Splynx_ID=cust['customer_id'],
            Stripe_Customer_ID=stripe_customer_id,
            # Deposits are negative balances; the charge is the positive amount.
            Payment_Amount=float(cust['deposit'])*-1,
            Stripe_Payment_Method=cust.get('stripe_pm', None),
            PaymentPlan_ID=cust.get('paymentplan_id', None)
        )
        payments_to_add.append(add_payer)
        db.session.add(add_payer)

    # Atomic commit for entire batch
    try:
        db.session.commit()
        added["added"] = len(payments_to_add)
        print(f"✅ Successfully added {len(payments_to_add)} payments to batch {batch_id}")
    except Exception as e:
        db.session.rollback()
        added["failed"] = len(payments_to_add)
        print(f"❌ addInitialPayments failed for entire batch {batch_id}: {e}")

    print(f"Plutus DB: {json.dumps(added,indent=2)}\n")
    return added


def addPaymentBatch():
    """Create a new payment batch and return its ID (None on failure)."""
    add_batch = PaymentBatch()

    try:
        db.session.add(add_batch)
        db.session.commit()
        return add_batch.id
    except Exception as e:
        db.session.rollback()
        print(f"❌ addPaymentBatch failed: {e}")
        return None


def processPaymentResult(pay_id, result, key):
    """
    Apply a Stripe processing result to its payment record and commit it.

    Args:
        pay_id: ID of the payment record
        result: Result dict from the payment processor
        key: "pay" for Payments rows, "singlepay" for SinglePayments rows

    Returns:
        bool: True when the record was found and the commit succeeded,
        False otherwise.

    Any error while applying the result is printed and the record is flagged
    with ``PI_FollowUp = True`` so the payintent mode re-checks it later.
    """
    model = {"pay": Payments, "singlepay": SinglePayments}.get(key)
    if model is None:
        # Previously an unknown key left `payment` unbound and raised.
        print(f"❌ processPaymentResult: unknown payment key {key!r}")
        return False

    payment = db.session.query(model).filter(model.id == pay_id).first()
    if payment is None:
        # Previously this raised inside the except handler below.
        print(f"❌ processPaymentResult: no '{key}' payment found with id {pay_id}")
        return False

    try:
        if result.get('error') and not result.get('needs_fee_update'):
            payment.Error = f"Error Type: {result['error_type']}\nError: {result['error']}"
            payment.Success = result['success']
            payment.PI_JSON = json.dumps(result)
        else:
            if result.get('needs_fee_update'):
                payment.PI_FollowUp = True
            payment.Payment_Intent = result['payment_intent_id']
            payment.Success = result['success']

            # Only touch Splynx for real, successful charges.
            if result['success'] and PROCESS_LIVE:
                find_pay_splynx_invoices(payment.Splynx_ID)
                add_payment_splynx(
                    splynx_id=payment.Splynx_ID,
                    pi_id=result['payment_intent_id'],
                    pay_id=payment.id,
                    amount=payment.Payment_Amount
                )

            method_type = result.get('payment_method_type')
            if method_type == "card":
                payment.Payment_Method = result['estimated_fee_details']['card_display_brand']
            elif method_type == "au_becs_debit":
                payment.Payment_Method = method_type

            # Merge with any previously stored result; new values win.
            if payment.PI_JSON:
                combined = {**json.loads(payment.PI_JSON), **result}
                payment.PI_JSON = json.dumps(combined)
            else:
                payment.PI_JSON = json.dumps(result)

            fee_details = result.get('fee_details')
            if fee_details:
                payment.Fee_Total = fee_details['total_fee']
                for fee_type in fee_details['fee_breakdown']:
                    if fee_type['type'] == "tax":
                        payment.Fee_Tax = fee_type['amount']
                    elif fee_type['type'] == "stripe_fee":
                        payment.Fee_Stripe = fee_type['amount']
    except Exception as e:
        print(f"processPaymentResult: {e}\n{json.dumps(result)}")
        payment.PI_FollowUp = True

    def _update_payment():
        return True  # Just need to trigger commit, payment is already modified

    return handle_database_operation(_update_payment, "processPaymentResult") is True


# Thread lock for database operations
db_lock = threading.Lock()


def process_single_payment(processor, payment_data):
    """
    Process a single payment with Stripe; runs inside a worker thread.

    Args:
        processor: StripePaymentProcessor instance
        payment_data: Dict containing payment information (``payment_id``,
            ``customer_id``, ``amount``, ``currency``, ``description``,
            ``stripe_pm``)

    Returns:
        Dict with ``payment_id``, ``result`` and ``success``; when the
        processor raised, ``result`` is None, ``success`` is False and
        ``error`` holds the exception text.
    """
    try:
        result = processor.process_payment(
            customer_id=payment_data['customer_id'],
            amount=payment_data['amount'],
            currency=payment_data['currency'],
            description=payment_data['description'],
            stripe_pm=payment_data['stripe_pm']
        )

        # Return result with payment ID for database update
        return {
            'payment_id': payment_data['payment_id'],
            'result': result,
            'success': True
        }
    except Exception as e:
        print(f"❌ Payment processing failed for payment ID {payment_data['payment_id']}: {e}")
        return {
            'payment_id': payment_data['payment_id'],
            'result': None,
            'success': False,
            'error': str(e)
        }


def update_single_payment_result(payment_id, result):
    """
    Immediately persist a single payment result, serialised by ``db_lock``.

    Args:
        payment_id: ID of the payment to update
        result: Payment processing result
    """
    with db_lock:
        try:
            if not result:
                print(f"⚠️ No result to commit for payment {payment_id}")
            elif processPaymentResult(pay_id=payment_id, result=result, key="pay"):
                print(f"✅ Payment {payment_id} result committed to database")
            else:
                # processPaymentResult already printed the underlying cause.
                print(f"❌ Failed to update payment {payment_id}: result was not committed")
        except Exception as e:
            print(f"❌ Failed to update payment {payment_id}: {e}")


def process_batch_mode(processor):
    """
    Create one payment batch each for Direct Debit and Card customers.

    Args:
        processor: StripePaymentProcessor instance (unused here; kept for a
            signature parallel to the other mode handlers)

    Returns:
        List of batch IDs that were created.
    """
    to_run_batches = []
    payment_methods = [PAYMENT_METHOD_DIRECT_DEBIT, PAYMENT_METHOD_CARD]

    for pm in payment_methods:
        batch_id = addPaymentBatch()
        if batch_id is None:
            print(f"❌ Failed to create batch for payment method {pm}")
            continue

        to_run_batches.append(batch_id)
        customers = query_splynx_customers(pm)
        # addInitialPayments tolerates the False returned when no rows match.
        addInitialPayments(customers=customers, batch_id=batch_id)

    return to_run_batches


def process_payplan_mode(processor):
    """
    Create a payment batch for Payment Plan customers due today.

    Args:
        processor: StripePaymentProcessor instance (unused here)

    Returns:
        List containing the created batch ID, or an empty list on failure.
    """
    to_run_batches = []

    batch_id = addPaymentBatch()
    if batch_id is None:
        print("❌ Failed to create batch for payment plan processing")
        return to_run_batches

    to_run_batches.append(batch_id)
    customers = query_payplan_customers()
    addInitialPayments(customers=customers, batch_id=batch_id)

    return to_run_batches


def execute_payment_batches(processor, batch_ids):
    """
    Execute payments for all provided batch IDs using a thread pool.

    Stripe calls run in worker threads; each result is written to the
    database from the calling thread as soon as its future completes.

    Args:
        processor: StripePaymentProcessor instance
        batch_ids: Iterable of PaymentBatch IDs (None entries are skipped)
    """
    if not batch_ids:
        print("⚠️ No valid batches to process")
        return

    # Guard against a zero/negative setting, which would make the chunk step invalid.
    max_threads = max(1, Config.MAX_PAYMENT_THREADS)
    chunk_size = max_threads * 2  # Process 2x thread count at a time

    for batch in batch_ids:
        if batch is None:
            print("⚠️ Skipping None batch ID")
            continue

        cust_pay = db.session.query(Payments).filter(Payments.PaymentBatch_ID == batch).all()
        if not cust_pay:
            print(f"ℹ️ No payments found for batch {batch}")
            continue

        print(f"🔄 Processing {len(cust_pay)} payments in batch {batch} using {max_threads} threads")
        print(f"📊 Safety Mode: Each payment will be committed immediately to database")

        processed_count = 0
        failed_count = 0

        # Process payments in smaller chunks to avoid timeout issues
        for i in range(0, len(cust_pay), chunk_size):
            chunk = cust_pay[i:i + chunk_size]
            print(f"🔄 Processing chunk {i//chunk_size + 1}: payments {i+1}-{min(i+chunk_size, len(cust_pay))}")

            # Prepare payment data for this chunk
            payment_tasks = []
            for pay in chunk:
                payment_data = {
                    'payment_id': pay.id,
                    'customer_id': pay.Stripe_Customer_ID,
                    'amount': pay.Payment_Amount,
                    'currency': "aud",
                    'description': f"Payment ID: {pay.id} - Splynx ID: {pay.Splynx_ID}",
                    'stripe_pm': pay.Stripe_Payment_Method
                }
                print(f"payment_data: {json.dumps(payment_data,indent=2)}")
                payment_tasks.append(payment_data)

            with ThreadPoolExecutor(max_workers=max_threads) as executor:
                future_to_payment = {
                    executor.submit(process_single_payment, processor, task): task
                    for task in payment_tasks
                }

                # as_completed yields only finished futures, so result() returns at once.
                for future in as_completed(future_to_payment):
                    try:
                        result = future.result()

                        if result['success'] and result['result']:
                            # IMMEDIATELY commit each successful payment to database
                            update_single_payment_result(result['payment_id'], result['result'])
                            processed_count += 1
                            print(f"✅ Payment {result['payment_id']} processed and committed ({processed_count}/{len(cust_pay)})")
                        else:
                            failed_count += 1
                            print(f"❌ Payment {result['payment_id']} failed ({failed_count} failures total)")

                    except Exception as e:
                        payment_data = future_to_payment[future]
                        failed_count += 1
                        print(f"❌ Thread exception for payment {payment_data['payment_id']}: {e}")

            print(f"📊 Chunk completed: {processed_count} processed, {failed_count} failed")

        print(f"✅ Batch {batch} completed: {processed_count}/{len(cust_pay)} payments processed successfully")


def process_payintent_mode(processor):
    """
    Re-check every payment flagged for payment intent follow-up.

    Args:
        processor: StripePaymentProcessor instance

    For each flagged Payments / SinglePayments row the intent is fetched
    again; the response and check time are stored, and when the intent has
    succeeded the follow-up flag is cleared and the result is applied via
    processPaymentResult. Each row is committed before the next is checked.
    """
    to_check = {
        "pay": db.session.query(Payments).filter(Payments.PI_FollowUp == True).all(),
        "singlepay": db.session.query(SinglePayments).filter(SinglePayments.PI_FollowUp == True).all(),
    }

    for key, value in to_check.items():
        print(value)
        for pi in value:
            intent_result = processor.check_payment_intent(pi.Payment_Intent)
            print(json.dumps(intent_result, indent=2))

            pi.PI_FollowUp_JSON = json.dumps(intent_result)
            pi.PI_Last_Check = datetime.now()
            if intent_result['status'] == "succeeded":
                pi.PI_FollowUp = False
                processPaymentResult(pay_id=pi.id, result=intent_result, key=key)

            # Persist each check so progress survives a later failure.
            db.session.commit()


def _parse_cli(argv):
    """
    Parse command line arguments.

    Args:
        argv: Argument list in ``sys.argv`` layout

    Returns:
        Tuple ``(running_mode, live_requested)``.

    Exits with status 1 when the running mode is not recognised.
    """
    if len(argv) < 2:
        print("ℹ️ No running mode specified, defaulting to 'payintent'")
        return DEFAULT_RUNNING_MODE, False

    running_mode = argv[1]
    if running_mode not in VALID_RUNNING_MODES:
        print(f"❌ Invalid running mode: {argv[1]}")
        print("Valid modes: batch, payintent, payplan")
        sys.exit(1)

    live_requested = len(argv) > 2 and argv[2] == "live"
    return running_mode, live_requested


def main(argv=None):
    """
    Script entry point: pick the running mode and process payments.

    Args:
        argv: Argument list in ``sys.argv`` layout; defaults to ``sys.argv``
    """
    global PROCESS_LIVE, api_key

    running_mode, live_requested = _parse_cli(sys.argv if argv is None else argv)

    if live_requested:
        PROCESS_LIVE = True
    if not PROCESS_LIVE:
        print("Processing payments against Sandbox")

    # Re-select the key now that the mode is final; the import-time choice
    # only reflected Config.PROCESS_LIVE and ignored the CLI override.
    api_key = _select_api_key(PROCESS_LIVE)

    # Create Flask application context
    app = create_app()
    processor = StripePaymentProcessor(api_key=api_key, enable_logging=True)

    with app.app_context():
        if running_mode == "batch":
            batch_ids = process_batch_mode(processor)
            execute_payment_batches(processor, batch_ids)
        elif running_mode == "payplan":
            batch_ids = process_payplan_mode(processor)
            execute_payment_batches(processor, batch_ids)
        elif running_mode == "payintent":
            process_payintent_mode(processor)


if __name__ == "__main__":
    main()