You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
 
 
 

1020 lines
40 KiB

from flask import Blueprint, render_template, request, jsonify, flash, redirect, url_for
from flask_login import login_required, current_user
from sqlalchemy import func, case
import json
import pymysql
from app import db
from models import PaymentBatch, Payments, SinglePayments, PaymentPlans
from splynx import Splynx, SPLYNX_URL, SPLYNX_KEY, SPLYNX_SECRET
from stripe_payment_processor import StripePaymentProcessor
from config import Config
from services import log_activity
splynx = Splynx(url=SPLYNX_URL, key=SPLYNX_KEY, secret=SPLYNX_SECRET)
def processPaymentResult(pay_id, result, key):
"""Process payment result and update database record."""
from datetime import datetime
if key == "pay":
payment = db.session.query(Payments).filter(Payments.id == pay_id).first()
elif key == "singlepay":
payment = db.session.query(SinglePayments).filter(SinglePayments.id == pay_id).first()
try:
if result.get('error') and not result.get('needs_fee_update'):
payment.Error = f"Error Type: {result['error_type']}\nError: {result['error']}"
payment.Success = result['success']
payment.PI_JSON = json.dumps(result)
else:
if result.get('needs_fee_update'):
payment.PI_FollowUp = True
payment.Payment_Intent = result['payment_intent_id']
payment.Success = result['success']
if result['success'] and Config.PROCESS_LIVE and key == "singlepay":
# Only update Splynx for successful single payments in live mode
find_pay_splynx_invoices(payment.Splynx_ID)
add_payment_splynx(
splynx_id=payment.Splynx_ID,
pi_id=result['payment_intent_id'],
pay_id=payment.id,
amount=payment.Payment_Amount
)
if result.get('payment_method_type') == "card":
payment.Payment_Method = result['estimated_fee_details']['card_display_brand']
elif result.get('payment_method_type') == "au_becs_debit":
payment.Payment_Method = result['payment_method_type']
if payment.PI_JSON:
combined = {**json.loads(payment.PI_JSON), **result}
payment.PI_JSON = json.dumps(combined)
else:
payment.PI_JSON = json.dumps(result)
if result.get('fee_details'):
payment.Fee_Total = result['fee_details']['total_fee']
for fee_type in result['fee_details']['fee_breakdown']:
if fee_type['type'] == "tax":
payment.Fee_Tax = fee_type['amount']
elif fee_type['type'] == "stripe_fee":
payment.Fee_Stripe = fee_type['amount']
except Exception as e:
print(f"processPaymentResult error: {e}\n{json.dumps(result)}")
payment.PI_FollowUp = True
def find_pay_splynx_invoices(splynx_id):
"""Mark Splynx invoices as paid for the given customer ID."""
result = splynx.get(url=f"/api/2.0/admin/finance/invoices?main_attributes[customer_id]={splynx_id}&main_attributes[status]=not_paid")
invoice_pay = {
"status": "paid"
}
for pay in result:
res = splynx.put(url=f"/api/2.0/admin/finance/invoices/{pay['id']}", params=invoice_pay)
return res
def add_payment_splynx(splynx_id, pi_id, pay_id, amount):
"""Add a payment record to Splynx."""
from datetime import datetime
stripe_pay = {
"customer_id": splynx_id,
"amount": amount,
"date": str(datetime.now().strftime('%Y-%m-%d')),
"field_1": pi_id,
"field_2": f"Single Payment_ID: {pay_id}"
}
res = splynx.post(url="/api/2.0/admin/finance/payments", params=stripe_pay)
if res:
return res['id']
else:
return False
def get_stripe_customer_id(splynx_id):
"""Get Stripe customer ID from MySQL for a given Splynx customer ID."""
connection = None
try:
# Connect to MySQL database
connection = pymysql.connect(
host=Config.MYSQL_CONFIG['host'],
database=Config.MYSQL_CONFIG['database'],
user=Config.MYSQL_CONFIG['user'],
password=Config.MYSQL_CONFIG['password'],
port=Config.MYSQL_CONFIG['port'],
autocommit=False,
cursorclass=pymysql.cursors.DictCursor
)
query = """
SELECT
cb.customer_id,
cb.deposit,
cb.payment_method,
pad.field_1 AS stripe_customer_id
FROM customer_billing cb
LEFT OUTER JOIN payment_account_data pad ON cb.customer_id = pad.customer_id
WHERE cb.customer_id = %s
ORDER BY cb.payment_method ASC
LIMIT 1
"""
with connection.cursor() as cursor:
cursor.execute(query, (splynx_id,))
result = cursor.fetchone()
if result and result['stripe_customer_id']:
return result['stripe_customer_id']
else:
return None
except pymysql.Error as e:
print(f"MySQL Error in get_stripe_customer_id: {e}")
return None
except Exception as e:
print(f"Unexpected Error in get_stripe_customer_id: {e}")
return None
finally:
if connection:
connection.close()
def get_stripe_payment_methods(stripe_customer_id):
"""Get payment methods for a Stripe customer."""
try:
# Initialize Stripe processor
if Config.PROCESS_LIVE:
api_key = "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"
else:
api_key = "sk_test_51Rsi9gPfYyg6zE1S4ZpaPI1ehpbsHRLsGhysYXKwAWCZ7w6KYgVXy4pV095Nd8tyjUw9AkBhqfxqsIiiWJg5fexI00Dw36vnvx"
processor = StripePaymentProcessor(api_key=api_key, enable_logging=False)
# Get payment methods from Stripe
stripe_customer_id = "cus_SoMyDihTxRsa7U"
payment_methods = processor.get_payment_methods(stripe_customer_id)
return payment_methods
except Exception as e:
print(f"Error fetching payment methods: {e}")
return []
main_bp = Blueprint('main', __name__)
@main_bp.app_template_filter('format_json')
def format_json_filter(json_string):
"""Format JSON string with proper indentation."""
if not json_string:
return ''
try:
# Parse the JSON string and format it with indentation
parsed = json.loads(json_string)
return json.dumps(parsed, indent=2, ensure_ascii=False)
except (json.JSONDecodeError, TypeError):
# If it's not valid JSON, return as-is
return json_string
@main_bp.app_template_filter('currency')
def currency_filter(value):
"""Format number as currency with digit grouping."""
if value is None:
return '$0.00'
try:
# Convert to float if it's not already
num_value = float(value)
# Format with comma separators and 2 decimal places
return f"${num_value:,.2f}"
except (ValueError, TypeError):
return '$0.00'
@main_bp.route('/')
@login_required
def index():
return render_template('main/index.html')
@main_bp.route('/batches')
@login_required
def batch_list():
"""Display list of all payment batches with summary information."""
# Query all batches with summary statistics
batches = db.session.query(
PaymentBatch.id,
PaymentBatch.Created,
func.count(Payments.id).label('payment_count'),
func.sum(Payments.Payment_Amount).label('total_amount'),
func.sum(Payments.Fee_Stripe).label('total_fees'),
func.sum(case((Payments.Success == True, 1), else_=0)).label('successful_count'),
func.sum(case((Payments.Success == False, 1), else_=0)).label('failed_count'),
func.sum(case((Payments.Error.isnot(None), 1), else_=0)).label('error_count')
).outerjoin(Payments, PaymentBatch.id == Payments.PaymentBatch_ID)\
.group_by(PaymentBatch.id, PaymentBatch.Created)\
.order_by(PaymentBatch.Created.desc()).all()
return render_template('main/batch_list.html', batches=batches)
@main_bp.route('/batch/<int:batch_id>')
@login_required
def batch_detail(batch_id):
"""Display detailed view of a specific payment batch."""
# Get batch information
batch = PaymentBatch.query.get_or_404(batch_id)
# Get summary statistics for this batch
summary = db.session.query(
func.count(Payments.id).label('payment_count'),
func.sum(Payments.Payment_Amount).label('total_amount'),
func.sum(Payments.Fee_Stripe).label('total_fees'),
func.sum(case((Payments.Success == True, 1), else_=0)).label('successful_count'),
func.sum(case((Payments.Success == False, 1), else_=0)).label('failed_count'),
func.sum(case((Payments.Error.isnot(None), 1), else_=0)).label('error_count')
).filter(Payments.PaymentBatch_ID == batch_id).first()
# Get all payments for this batch ordered by Splynx_ID
payments = Payments.query.filter_by(PaymentBatch_ID=batch_id)\
.order_by(Payments.Splynx_ID.asc()).all()
return render_template('main/batch_detail.html',
batch=batch,
summary=summary,
payments=payments)
@main_bp.route('/single-payment')
@login_required
def single_payment():
"""Display single payment form page."""
return render_template('main/single_payment.html')
@main_bp.route('/single-payments')
@login_required
def single_payments_list():
"""Display list of all single payments with summary information."""
# Query all single payments with user information
from models import Users
payments = db.session.query(
SinglePayments.id,
SinglePayments.Splynx_ID,
SinglePayments.Stripe_Customer_ID,
SinglePayments.Payment_Intent,
SinglePayments.Payment_Method,
SinglePayments.Payment_Amount,
SinglePayments.Fee_Stripe,
SinglePayments.Fee_Total,
SinglePayments.Success,
SinglePayments.Error,
SinglePayments.PI_JSON,
SinglePayments.Created,
Users.FullName.label('processed_by')
).outerjoin(Users, SinglePayments.Who == Users.id)\
.order_by(SinglePayments.Created.desc()).all()
# Calculate summary statistics
total_payments = len(payments)
successful_payments = sum(1 for p in payments if p.Success == True)
failed_payments = sum(1 for p in payments if p.Success == False)
pending_payments = sum(1 for p in payments if p.Success == None)
total_amount = sum(p.Payment_Amount or 0 for p in payments if p.Success == True)
total_fees = sum(p.Fee_Stripe or 0 for p in payments if p.Success == True)
summary = {
'total_payments': total_payments,
'successful_payments': successful_payments,
'failed_payments': failed_payments,
'pending_payments': pending_payments,
'total_amount': total_amount,
'total_fees': total_fees,
'success_rate': (successful_payments / total_payments * 100) if total_payments > 0 else 0
}
return render_template('main/single_payments_list.html', payments=payments, summary=summary)
@main_bp.route('/single-payment/detail/<int:payment_id>')
@login_required
def single_payment_detail(payment_id):
"""Display detailed view of a specific single payment."""
# Get payment information
from models import Users
payment = db.session.query(
SinglePayments.id,
SinglePayments.Splynx_ID,
SinglePayments.Stripe_Customer_ID,
SinglePayments.Payment_Intent,
SinglePayments.PI_FollowUp,
SinglePayments.PI_Last_Check,
SinglePayments.Payment_Method,
SinglePayments.Fee_Tax,
SinglePayments.Fee_Stripe,
SinglePayments.Fee_Total,
SinglePayments.Payment_Amount,
SinglePayments.PI_JSON,
SinglePayments.PI_FollowUp_JSON,
SinglePayments.Error,
SinglePayments.Success,
SinglePayments.Created,
Users.FullName.label('processed_by')
).outerjoin(Users, SinglePayments.Who == Users.id)\
.filter(SinglePayments.id == payment_id).first()
if not payment:
flash('Payment not found.', 'error')
return redirect(url_for('main.single_payments_list'))
return render_template('main/single_payment_detail.html', payment=payment)
@main_bp.route('/payment/detail/<int:payment_id>')
@login_required
def payment_detail(payment_id):
"""Display detailed view of a specific batch payment."""
# Get payment information with all fields needed for the detail view
payment = db.session.query(Payments).filter(Payments.id == payment_id).first()
if not payment:
flash('Payment not found.', 'error')
return redirect(url_for('main.batch_list'))
# Log the payment detail view access
log_activity(
user_id=current_user.id,
action="view_payment_detail",
entity_type="payment",
entity_id=payment_id,
details=f"Viewed batch payment detail for payment ID {payment_id}"
)
return render_template('main/payment_detail.html', payment=payment)
@main_bp.route('/single-payment/check-intent/<int:payment_id>', methods=['POST'])
@login_required
def check_payment_intent(payment_id):
"""Check the status of a payment intent and update the record."""
from datetime import datetime
try:
# Get the payment record
payment = SinglePayments.query.get_or_404(payment_id)
if not payment.Payment_Intent:
return jsonify({'success': False, 'error': 'No payment intent found'}), 400
# Initialize Stripe processor
if Config.PROCESS_LIVE:
api_key = "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"
else:
api_key = "sk_test_51Rsi9gPfYyg6zE1S4ZpaPI1ehpbsHRLsGhysYXKwAWCZ7w6KYgVXy4pV095Nd8tyjUw9AkBhqfxqsIiiWJg5fexI00Dw36vnvx"
processor = StripePaymentProcessor(api_key=api_key, enable_logging=True)
# Check payment intent status
intent_result = processor.check_payment_intent(payment.Payment_Intent)
print(json.dumps(intent_result, indent=2))
if intent_result['status'] == "succeeded":
payment.PI_FollowUp_JSON = json.dumps(intent_result)
payment.PI_FollowUp = False
payment.PI_Last_Check = datetime.now()
processPaymentResult(pay_id=payment.id, result=intent_result, key="singlepay")
else:
payment.PI_FollowUp_JSON = json.dumps(intent_result)
payment.PI_Last_Check = datetime.now()
db.session.commit()
return jsonify({
'success': True,
'status': intent_result['status'],
'payment_succeeded': intent_result['status'] == "succeeded",
'message': f'Payment intent status: {intent_result["status"]}'
})
except Exception as e:
db.session.rollback()
print(f"Check payment intent error: {e}")
return jsonify({'success': False, 'error': 'Failed to check payment intent'}), 500
@main_bp.route('/single-payment/process', methods=['POST'])
@login_required
def process_single_payment():
"""Process a single payment using Stripe."""
try:
# Get form data
splynx_id = request.form.get('splynx_id')
amount = request.form.get('amount')
# Validate inputs
if not splynx_id or not amount:
return jsonify({'success': False, 'error': 'Missing required fields'}), 400
try:
splynx_id = int(splynx_id)
amount = float(amount)
except (ValueError, TypeError):
return jsonify({'success': False, 'error': 'Invalid input format'}), 400
if amount <= 0:
return jsonify({'success': False, 'error': 'Amount must be greater than 0'}), 400
# Get customer details from Splynx
customer_data = splynx.Customer(splynx_id)
if not customer_data:
return jsonify({'success': False, 'error': 'Customer not found in Splynx'}), 404
# Get Stripe customer ID from MySQL
stripe_customer_id = get_stripe_customer_id(splynx_id)
if not stripe_customer_id:
return jsonify({'success': False, 'error': 'Customer does not have a valid Stripe payment method'}), 400
# Create payment record in database
payment_record = SinglePayments(
Splynx_ID=splynx_id,
Stripe_Customer_ID=stripe_customer_id,
Payment_Amount=amount,
Who=current_user.id
)
db.session.add(payment_record)
db.session.commit() # Commit to get the payment ID
# Initialize Stripe processor
if Config.PROCESS_LIVE:
print("LIVE Payment")
api_key = "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"
else:
print("SANDBOX Payment")
api_key = "sk_test_51Rsi9gPfYyg6zE1S4ZpaPI1ehpbsHRLsGhysYXKwAWCZ7w6KYgVXy4pV095Nd8tyjUw9AkBhqfxqsIiiWJg5fexI00Dw36vnvx"
# Use test customer for sandbox
import random
test_customers = ['cus_SoNAgAbkbFo8ZY', 'cus_SoMyDihTxRsa7U', 'cus_SoQedaG3q2ecKG', 'cus_SoMVPWxdYstYbr']
stripe_customer_id = random.choice(test_customers)
processor = StripePaymentProcessor(api_key=api_key, enable_logging=True)
print(f"stripe_customer_id: {stripe_customer_id}")
# Process payment
result = processor.process_payment(
customer_id=stripe_customer_id,
amount=amount,
currency="aud",
description=f"Single Payment - Splynx ID: {splynx_id} - Payment ID: {payment_record.id}"
)
# Update payment record with results
payment_record.Success = result.get('success', False)
payment_record.Payment_Intent = result.get('payment_intent_id')
payment_record.PI_JSON = json.dumps(result)
if result.get('error') and not result.get('needs_fee_update'):
payment_record.Error = f"Error Type: {result.get('error_type', 'Unknown')}\nError: {result['error']}"
if result.get('needs_fee_update'):
payment_record.PI_FollowUp = True
if result.get('payment_method_type') == "card":
payment_record.Payment_Method = result.get('estimated_fee_details', {}).get('card_display_brand', 'card')
elif result.get('payment_method_type') == "au_becs_debit":
payment_record.Payment_Method = result['payment_method_type']
if result.get('fee_details'):
payment_record.Fee_Total = result['fee_details']['total_fee']
for fee_type in result['fee_details']['fee_breakdown']:
if fee_type['type'] == "tax":
payment_record.Fee_Tax = fee_type['amount']
elif fee_type['type'] == "stripe_fee":
payment_record.Fee_Stripe = fee_type['amount']
# Commit the updated payment record
db.session.commit()
# Check if payment was actually successful
if result.get('success'):
# Payment succeeded - update Splynx if in live mode
if Config.PROCESS_LIVE:
try:
# Mark invoices as paid in Splynx
find_pay_splynx_invoices(splynx_id)
# Add payment record to Splynx
splynx_payment_id = add_payment_splynx(
splynx_id=splynx_id,
pi_id=result.get('payment_intent_id'),
pay_id=payment_record.id,
amount=amount
)
if splynx_payment_id:
print(f"✅ Splynx payment record created: {splynx_payment_id}")
else:
print("⚠️ Failed to create Splynx payment record")
except Exception as splynx_error:
print(f"❌ Error updating Splynx: {splynx_error}")
# Continue processing even if Splynx update fails
# Log successful payment
log_activity(
current_user.id,
"PAYMENT_SUCCESS",
"SinglePayment",
payment_record.id,
details=f"Single payment successful: ${amount:,.2f} for customer {splynx_id} ({customer_data.get('name', 'Unknown')})"
)
# Payment succeeded
return jsonify({
'success': True,
'payment_success': True,
'payment_id': payment_record.id,
'payment_intent': result.get('payment_intent_id'),
'amount': amount,
'customer_name': customer_data.get('name'),
'message': f'Payment processed successfully for {customer_data.get("name")}'
})
else:
# Payment failed - log the failure
log_activity(
current_user.id,
"PAYMENT_FAILED",
"SinglePayment",
payment_record.id,
details=f"Single payment failed: ${amount:,.2f} for customer {splynx_id} ({customer_data.get('name', 'Unknown')}) - {result.get('error', 'Unknown error')}"
)
# Payment failed - return the specific error
if result.get('needs_fee_update'):
fee_update = True
else:
fee_update = False
return jsonify({
'success': False,
'payment_success': False,
'fee_update': fee_update,
'payment_id': payment_record.id,
'error': result.get('error', 'Payment failed'),
'error_type': result.get('error_type', 'unknown_error'),
'stripe_error': result.get('error', 'Unknown payment error'),
'customer_name': customer_data.get('name')
}), 422 # 422 Unprocessable Entity for business logic failures
except Exception as e:
db.session.rollback()
print(f"Single payment processing error: {e}")
return jsonify({'success': False, 'error': 'Payment processing failed. Please try again.'}), 500
@main_bp.route('/payment-plans')
@login_required
def payment_plans_list():
"""Display list of all payment plans with summary information."""
from models import Users
# Query all payment plans with user information
plans = db.session.query(
PaymentPlans.id,
PaymentPlans.Splynx_ID,
PaymentPlans.Amount,
PaymentPlans.Frequency,
PaymentPlans.Start_Date,
PaymentPlans.Stripe_Payment_Method,
PaymentPlans.Enabled,
PaymentPlans.Created,
Users.FullName.label('created_by')
).outerjoin(Users, PaymentPlans.Who == Users.id)\
.order_by(PaymentPlans.Created.desc()).all()
# Calculate summary statistics
total_plans = len(plans)
active_plans = sum(1 for p in plans if p.Enabled == True)
inactive_plans = sum(1 for p in plans if p.Enabled == False)
total_recurring_amount = sum(p.Amount or 0 for p in plans if p.Enabled == True)
summary = {
'total_plans': total_plans,
'active_plans': active_plans,
'inactive_plans': inactive_plans,
'total_recurring_amount': total_recurring_amount
}
return render_template('main/payment_plans_list.html', plans=plans, summary=summary)
@main_bp.route('/payment-plans/create')
@login_required
def payment_plans_create():
"""Display payment plan creation form."""
return render_template('main/payment_plans_form.html', edit_mode=False)
@main_bp.route('/payment-plans/create', methods=['POST'])
@login_required
def payment_plans_create_post():
"""Handle payment plan creation."""
try:
# Get form data
splynx_id = request.form.get('splynx_id')
amount = request.form.get('amount')
frequency = request.form.get('frequency')
start_date = request.form.get('start_date')
stripe_payment_method = request.form.get('stripe_payment_method')
# Validate inputs
if not all([splynx_id, amount, frequency, start_date, stripe_payment_method]):
flash('All fields are required.', 'error')
return redirect(url_for('main.payment_plans_create'))
try:
splynx_id = int(splynx_id)
amount = float(amount)
from datetime import datetime
start_date = datetime.strptime(start_date, '%Y-%m-%d')
except (ValueError, TypeError):
flash('Invalid input format.', 'error')
return redirect(url_for('main.payment_plans_create'))
if amount <= 0:
flash('Amount must be greater than 0.', 'error')
return redirect(url_for('main.payment_plans_create'))
# Validate customer exists in Splynx
customer_data = splynx.Customer(splynx_id)
if not customer_data:
flash('Customer not found in Splynx.', 'error')
return redirect(url_for('main.payment_plans_create'))
# Create payment plan record
payment_plan = PaymentPlans(
Splynx_ID=splynx_id,
Amount=amount,
Frequency=frequency,
Start_Date=start_date,
Stripe_Payment_Method=stripe_payment_method,
Who=current_user.id
)
db.session.add(payment_plan)
db.session.commit()
# Log payment plan creation
log_activity(
current_user.id,
"PAYPLAN_CREATED",
"PaymentPlan",
payment_plan.id,
details=f"Payment plan created: ${amount:,.2f} {frequency} for customer {splynx_id} ({customer_data.get('name', 'Unknown')})"
)
flash(f'Payment plan created successfully for {customer_data.get("name", "customer")}.', 'success')
return redirect(url_for('main.payment_plans_detail', plan_id=payment_plan.id))
except Exception as e:
db.session.rollback()
print(f"Payment plan creation error: {e}")
flash('Failed to create payment plan. Please try again.', 'error')
return redirect(url_for('main.payment_plans_create'))
@main_bp.route('/payment-plans/edit/<int:plan_id>')
@login_required
def payment_plans_edit(plan_id):
"""Display payment plan edit form."""
plan = PaymentPlans.query.get_or_404(plan_id)
return render_template('main/payment_plans_form.html', plan=plan, edit_mode=True)
@main_bp.route('/payment-plans/edit/<int:plan_id>', methods=['POST'])
@login_required
def payment_plans_edit_post(plan_id):
"""Handle payment plan updates."""
try:
plan = PaymentPlans.query.get_or_404(plan_id)
# Get form data
amount = request.form.get('amount')
frequency = request.form.get('frequency')
start_date = request.form.get('start_date')
stripe_payment_method = request.form.get('stripe_payment_method')
# Validate inputs
if not all([amount, frequency, start_date, stripe_payment_method]):
flash('All fields are required.', 'error')
return redirect(url_for('main.payment_plans_edit', plan_id=plan_id))
try:
amount = float(amount)
from datetime import datetime
start_date = datetime.strptime(start_date, '%Y-%m-%d')
except (ValueError, TypeError):
flash('Invalid input format.', 'error')
return redirect(url_for('main.payment_plans_edit', plan_id=plan_id))
if amount <= 0:
flash('Amount must be greater than 0.', 'error')
return redirect(url_for('main.payment_plans_edit', plan_id=plan_id))
# Update payment plan
plan.Amount = amount
plan.Frequency = frequency
plan.Start_Date = start_date
plan.Stripe_Payment_Method = stripe_payment_method
db.session.commit()
# Log payment plan update
log_activity(
current_user.id,
"PAYPLAN_UPDATED",
"PaymentPlan",
plan.id,
details=f"Payment plan updated: ${amount:,.2f} {frequency} starting {start_date.strftime('%Y-%m-%d')}"
)
flash('Payment plan updated successfully.', 'success')
return redirect(url_for('main.payment_plans_detail', plan_id=plan.id))
except Exception as e:
db.session.rollback()
print(f"Payment plan update error: {e}")
flash('Failed to update payment plan. Please try again.', 'error')
return redirect(url_for('main.payment_plans_edit', plan_id=plan_id))
@main_bp.route('/payment-plans/delete/<int:plan_id>', methods=['POST'])
@login_required
def payment_plans_delete(plan_id):
"""Handle payment plan deletion (soft delete)."""
try:
plan = PaymentPlans.query.get_or_404(plan_id)
# Soft delete by setting Enabled to False
plan.Enabled = False
db.session.commit()
flash('Payment plan has been disabled.', 'success')
return redirect(url_for('main.payment_plans_list'))
except Exception as e:
db.session.rollback()
print(f"Payment plan deletion error: {e}")
flash('Failed to disable payment plan. Please try again.', 'error')
return redirect(url_for('main.payment_plans_detail', plan_id=plan_id))
@main_bp.route('/payment-plans/toggle/<int:plan_id>', methods=['POST'])
@login_required
def payment_plans_toggle(plan_id):
"""Toggle payment plan enabled status."""
try:
plan = PaymentPlans.query.get_or_404(plan_id)
# Toggle enabled status
plan.Enabled = not plan.Enabled
db.session.commit()
# Log payment plan toggle
action = "PAYPLAN_ENABLED" if plan.Enabled else "PAYPLAN_DISABLED"
log_activity(
current_user.id,
action,
"PaymentPlan",
plan.id,
details=f"Payment plan {'enabled' if plan.Enabled else 'disabled'}: ${plan.Amount:,.2f} {plan.Frequency}"
)
status = "enabled" if plan.Enabled else "disabled"
flash(f'Payment plan has been {status}.', 'success')
return redirect(url_for('main.payment_plans_detail', plan_id=plan_id))
except Exception as e:
db.session.rollback()
print(f"Payment plan toggle error: {e}")
flash('Failed to update payment plan status. Please try again.', 'error')
return redirect(url_for('main.payment_plans_detail', plan_id=plan_id))
@main_bp.route('/payment-plans/detail/<int:plan_id>')
@login_required
def payment_plans_detail(plan_id):
"""Display detailed view of a specific payment plan."""
from models import Users
# Get payment plan with user information
plan = db.session.query(
PaymentPlans.id,
PaymentPlans.Splynx_ID,
PaymentPlans.Amount,
PaymentPlans.Frequency,
PaymentPlans.Start_Date,
PaymentPlans.Stripe_Payment_Method,
PaymentPlans.Enabled,
PaymentPlans.Created,
Users.FullName.label('created_by')
).outerjoin(Users, PaymentPlans.Who == Users.id)\
.filter(PaymentPlans.id == plan_id).first()
if not plan:
flash('Payment plan not found.', 'error')
return redirect(url_for('main.payment_plans_list'))
# Get associated single payments
associated_payments = db.session.query(
Payments.id,
Payments.Payment_Amount,
Payments.Success,
Payments.Error,
Payments.Created,
Payments.Payment_Intent)\
.filter(Payments.PaymentPlan_ID == plan_id)\
.order_by(Payments.Created.desc()).all()
return render_template('main/payment_plans_detail.html',
plan=plan,
associated_payments=associated_payments)
@main_bp.route('/api/stripe-payment-methods/<stripe_customer_id>')
@login_required
def api_stripe_payment_methods(stripe_customer_id):
"""Get Stripe payment methods for a customer."""
try:
payment_methods = get_stripe_payment_methods(stripe_customer_id)
return jsonify({'success': True, 'payment_methods': payment_methods})
except Exception as e:
print(f"Error fetching payment methods: {e}")
return jsonify({'success': False, 'error': 'Failed to fetch payment methods'}), 500
@main_bp.route('/payment/check-intent/<int:payment_id>', methods=['POST'])
@login_required
def check_batch_payment_intent(payment_id):
"""Check the status of a batch payment intent and update the record."""
from datetime import datetime
try:
# Get the payment record from Payments table (batch payments)
payment = Payments.query.get_or_404(payment_id)
if not payment.Payment_Intent:
return jsonify({'success': False, 'error': 'No payment intent found'}), 400
# Initialize Stripe processor with correct API key
if Config.PROCESS_LIVE:
api_key = Config.STRIPE_LIVE_API_KEY
else:
api_key = Config.STRIPE_TEST_API_KEY
processor = StripePaymentProcessor(api_key=api_key, enable_logging=True)
# Check payment intent status
intent_result = processor.check_payment_intent(payment.Payment_Intent)
if intent_result['status'] == "succeeded":
payment.PI_FollowUp_JSON = json.dumps(intent_result)
payment.PI_FollowUp = False
payment.PI_Last_Check = datetime.now()
payment.Success = True
if intent_result.get('charge_id'):
payment.Stripe_Charge_ID = intent_result.get('charge_id')
processPaymentResult(pay_id=payment.id, result=intent_result, key="pay")
elif intent_result['status'] == "failed":
payment.PI_FollowUp_JSON = json.dumps(intent_result)
payment.PI_FollowUp = False
payment.PI_Last_Check = datetime.now()
payment.Success = False
else:
# Still pending
payment.PI_FollowUp_JSON = json.dumps(intent_result)
payment.PI_Last_Check = datetime.now()
db.session.commit()
# Log the intent check activity
log_activity(
user_id=current_user.id,
action="check_payment_intent",
entity_type="payment",
entity_id=payment_id,
details=f"Checked payment intent {payment.Payment_Intent}, status: {intent_result['status']}"
)
return jsonify({
'success': True,
'status': intent_result['status'],
'payment_succeeded': intent_result['status'] == "succeeded",
'message': f'Payment intent status: {intent_result["status"]}'
})
except Exception as e:
db.session.rollback()
print(f"Check batch payment intent error: {e}")
return jsonify({'success': False, 'error': 'Failed to check payment intent'}), 500
@main_bp.route('/payment/refund/<int:payment_id>', methods=['POST'])
@login_required
def process_payment_refund(payment_id):
"""Process a refund for a batch payment."""
from datetime import datetime
import stripe
try:
# Get the payment record from Payments table (batch payments)
payment = Payments.query.get_or_404(payment_id)
# Validate payment can be refunded
if not payment.Success:
return jsonify({'success': False, 'error': 'Cannot refund an unsuccessful payment'}), 400
if payment.Refund:
return jsonify({'success': False, 'error': 'Payment has already been refunded'}), 400
if not payment.Stripe_Charge_ID:
return jsonify({'success': False, 'error': 'No Stripe charge ID found for this payment'}), 400
# Get refund reason from request
data = request.get_json()
reason = data.get('reason', 'requested_by_customer')
# Initialize Stripe with correct API key
if Config.PROCESS_LIVE:
stripe.api_key = Config.STRIPE_LIVE_API_KEY
else:
stripe.api_key = Config.STRIPE_TEST_API_KEY
# Create refund parameters
refund_params = {
'charge': payment.Stripe_Charge_ID,
'reason': reason
}
# Process the refund with Stripe
refund = stripe.Refund.create(**refund_params)
if refund['status'] == "succeeded":
# Update payment record with refund information
payment.Refund = True
payment.Stripe_Refund_ID = refund.id
payment.Stripe_Refund_Created = datetime.fromtimestamp(refund.created)
payment.Refund_JSON = json.dumps(refund)
db.session.commit()
# Log the refund activity
log_activity(
user_id=current_user.id,
action="process_refund",
entity_type="payment",
entity_id=payment_id,
details=f"Processed refund {refund.id} for payment {payment_id}, amount: ${refund.amount/100:.2f}"
)
return jsonify({
'success': True,
'refund_id': refund.id,
'amount_refunded': f"${refund.amount/100:.2f}",
'status': refund.status,
'message': 'Refund processed successfully'
})
else:
# Refund failed
payment.Refund = False
payment.Refund_JSON = json.dumps(refund)
db.session.commit()
return jsonify({
'success': False,
'error': f'Refund failed with status: {refund.status}'
}), 400
except stripe.error.InvalidRequestError as e:
# Handle Stripe-specific errors
error_msg = str(e)
if "has already been refunded" in error_msg:
# Mark as refunded in our database even if Stripe says it's already refunded
payment.Refund = True
payment.Stripe_Refund_Created = datetime.now()
db.session.commit()
return jsonify({'success': False, 'error': 'Payment has already been refunded in Stripe'}), 400
else:
return jsonify({'success': False, 'error': f'Stripe error: {error_msg}'}), 400
except Exception as e:
db.session.rollback()
print(f"Process payment refund error: {e}")
return jsonify({'success': False, 'error': 'Failed to process refund'}), 500
@main_bp.route('/api/splynx/<int:id>')
@login_required
def api_splynx_customer(id):
"""
Get Splynx customer information by ID
Security: Restricted to operational and financial staff who need customer data access
"""
try:
log_activity(current_user.id, "API_ACCESS", "SplynxCustomer", id,
details=f"Accessed Splynx customer API for customer {id}")
print(f"Splynx Customer API: {id}")
res = splynx.Customer(id)
if res:
log_activity(current_user.id, "API_SUCCESS", "SplynxCustomer", id,
details=f"Successfully retrieved Splynx customer {id}")
return res
else:
log_activity(current_user.id, "API_NOT_FOUND", "SplynxCustomer", id,
details=f"Splynx customer {id} not found")
return {"error": "Customer not found"}, 404
except Exception as e:
log_activity(current_user.id, "API_ERROR", "SplynxCustomer", id,
details=f"Splynx customer API error: {str(e)}")
return {"error": "Internal server error"}, 500