""" Enhanced logging configuration for Plutus Payment Processing System. This module provides structured logging with correlation IDs, performance monitoring, security event tracking, and centralized log management. """ import logging import logging.handlers import json import time import uuid from datetime import datetime, timezone from typing import Dict, Any, Optional, Union from contextlib import contextmanager from functools import wraps import threading import os # Thread-local storage for correlation IDs _thread_local = threading.local() class CorrelatedFormatter(logging.Formatter): """Custom formatter that adds correlation ID and structured data to logs.""" def format(self, record): # Add correlation ID to log record if hasattr(_thread_local, 'correlation_id'): record.correlation_id = _thread_local.correlation_id else: record.correlation_id = 'no-correlation' # Add structured data if present if hasattr(record, 'structured_data'): record.structured_data_str = json.dumps(record.structured_data, default=str) else: record.structured_data_str = '' return super().format(record) class StructuredLogger: """Enhanced logger with structured logging capabilities.""" def __init__(self, name: str): self.logger = logging.getLogger(name) self._setup_logger() def _setup_logger(self): """Configure the logger with enhanced formatting.""" if not self.logger.handlers: # Avoid duplicate handlers # Create logs directory if it doesn't exist os.makedirs('logs', exist_ok=True) # Console handler with structured formatting console_handler = logging.StreamHandler() console_formatter = CorrelatedFormatter( '%(asctime)s - [%(correlation_id)s] - %(name)s - %(levelname)s - %(message)s %(structured_data_str)s' ) console_handler.setFormatter(console_formatter) # File handler with detailed formatting file_handler = logging.handlers.RotatingFileHandler( 'logs/plutus_detailed.log', maxBytes=10*1024*1024, # 10MB backupCount=5 ) file_formatter = CorrelatedFormatter( '%(asctime)s - [%(correlation_id)s] - %(name)s - %(levelname)s - %(funcName)s:%(lineno)d - %(message)s %(structured_data_str)s' ) file_handler.setFormatter(file_formatter) self.logger.addHandler(console_handler) self.logger.addHandler(file_handler) self.logger.setLevel(logging.INFO) def info(self, message: str, **kwargs): """Log info message with optional structured data.""" extra = {'structured_data': kwargs} if kwargs else {} self.logger.info(message, extra=extra) def error(self, message: str, **kwargs): """Log error message with optional structured data.""" extra = {'structured_data': kwargs} if kwargs else {} self.logger.error(message, extra=extra) def warning(self, message: str, **kwargs): """Log warning message with optional structured data.""" extra = {'structured_data': kwargs} if kwargs else {} self.logger.warning(message, extra=extra) def debug(self, message: str, **kwargs): """Log debug message with optional structured data.""" extra = {'structured_data': kwargs} if kwargs else {} self.logger.debug(message, extra=extra) def critical(self, message: str, **kwargs): """Log critical message with optional structured data.""" extra = {'structured_data': kwargs} if kwargs else {} self.logger.critical(message, extra=extra) class SecurityLogger: """Specialized logger for security events.""" def __init__(self): self.logger = StructuredLogger('security') # Ensure logs directory exists os.makedirs('logs', exist_ok=True) # Additional security log file security_handler = logging.handlers.RotatingFileHandler( 'logs/security.log', maxBytes=5*1024*1024, # 5MB backupCount=10 ) security_formatter = CorrelatedFormatter( '%(asctime)s - SECURITY - [%(correlation_id)s] - %(levelname)s - %(message)s %(structured_data_str)s' ) security_handler.setFormatter(security_formatter) self.logger.logger.addHandler(security_handler) def log_login_attempt(self, username: str, success: bool, ip_address: str, user_agent: str = None): """Log login attempts.""" event_type = "LOGIN_SUCCESS" if success else "LOGIN_FAILED" self.logger.info( f"{event_type} for user: {username}", event_type=event_type, username=username, ip_address=ip_address, user_agent=user_agent, timestamp=datetime.now(timezone.utc).isoformat() ) def log_permission_denied(self, username: str, action: str, resource: str, ip_address: str): """Log permission denied events.""" self.logger.warning( f"PERMISSION_DENIED: User {username} attempted {action} on {resource}", event_type="PERMISSION_DENIED", username=username, action=action, resource=resource, ip_address=ip_address, timestamp=datetime.now(timezone.utc).isoformat() ) def log_payment_fraud_alert(self, payment_id: int, customer_id: str, reason: str, amount: float): """Log potential fraud alerts.""" self.logger.critical( f"FRAUD_ALERT: Payment {payment_id} for customer {customer_id}", event_type="FRAUD_ALERT", payment_id=payment_id, customer_id=customer_id, reason=reason, amount=amount, timestamp=datetime.now(timezone.utc).isoformat() ) class PerformanceLogger: """Performance monitoring logger.""" def __init__(self): self.logger = StructuredLogger('performance') # Ensure logs directory exists os.makedirs('logs', exist_ok=True) # Additional performance log file perf_handler = logging.handlers.RotatingFileHandler( 'logs/performance.log', maxBytes=10*1024*1024, # 10MB backupCount=5 ) perf_formatter = CorrelatedFormatter( '%(asctime)s - PERF - [%(correlation_id)s] - %(message)s %(structured_data_str)s' ) perf_handler.setFormatter(perf_formatter) self.logger.logger.addHandler(perf_handler) def log_request_time(self, endpoint: str, method: str, duration_ms: float, status_code: int, user_id: int = None): """Log HTTP request performance.""" self.logger.info( f"REQUEST: {method} {endpoint} - {duration_ms:.2f}ms - {status_code}", endpoint=endpoint, method=method, duration_ms=duration_ms, status_code=status_code, user_id=user_id, timestamp=datetime.now(timezone.utc).isoformat() ) def log_database_query(self, query_type: str, table: str, duration_ms: float, row_count: int = None): """Log database query performance.""" self.logger.info( f"DB_QUERY: {query_type} on {table} - {duration_ms:.2f}ms", query_type=query_type, table=table, duration_ms=duration_ms, row_count=row_count, timestamp=datetime.now(timezone.utc).isoformat() ) def log_stripe_api_call(self, operation: str, duration_ms: float, success: bool, error_code: str = None): """Log Stripe API call performance.""" status = "SUCCESS" if success else f"FAILED ({error_code})" self.logger.info( f"STRIPE_API: {operation} - {duration_ms:.2f}ms - {status}", operation=operation, duration_ms=duration_ms, success=success, error_code=error_code, timestamp=datetime.now(timezone.utc).isoformat() ) # Global logger instances app_logger = StructuredLogger('plutus.app') security_logger = SecurityLogger() performance_logger = PerformanceLogger() def get_logger(name: str) -> StructuredLogger: """Get a structured logger instance.""" return StructuredLogger(name) def set_correlation_id(correlation_id: str = None) -> str: """Set correlation ID for current thread.""" if correlation_id is None: correlation_id = str(uuid.uuid4())[:8] _thread_local.correlation_id = correlation_id return correlation_id def get_correlation_id() -> str: """Get current correlation ID.""" return getattr(_thread_local, 'correlation_id', 'no-correlation') @contextmanager def log_context(correlation_id: str = None): """Context manager for setting correlation ID.""" old_id = get_correlation_id() new_id = set_correlation_id(correlation_id) try: yield new_id finally: _thread_local.correlation_id = old_id def log_performance(operation_name: str = None): """Decorator to log function performance.""" def decorator(func): @wraps(func) def wrapper(*args, **kwargs): name = operation_name or f"{func.__module__}.{func.__name__}" start_time = time.time() try: result = func(*args, **kwargs) duration_ms = (time.time() - start_time) * 1000 performance_logger.logger.info( f"OPERATION: {name} completed in {duration_ms:.2f}ms", operation=name, duration_ms=duration_ms, success=True ) return result except Exception as e: duration_ms = (time.time() - start_time) * 1000 performance_logger.logger.error( f"OPERATION: {name} failed after {duration_ms:.2f}ms - {str(e)}", operation=name, duration_ms=duration_ms, success=False, error=str(e), exception_type=type(e).__name__ ) raise return wrapper return decorator def setup_flask_logging(app): """Setup Flask application logging.""" # Configure Flask's default logger app.logger.handlers.clear() # Add structured logging to Flask flask_logger = get_logger('plutus.flask') # Override Flask's logger class FlaskLogHandler(logging.Handler): def emit(self, record): flask_logger.logger.handle(record) handler = FlaskLogHandler() app.logger.addHandler(handler) app.logger.setLevel(logging.INFO) # Log retention configuration LOG_RETENTION_DAYS = 30 LOG_CLEANUP_SCHEDULE = "0 2 * * *" # Daily at 2 AM def cleanup_old_logs(): """Clean up old log files based on retention policy.""" import glob import os from datetime import datetime, timedelta cutoff_date = datetime.now() - timedelta(days=LOG_RETENTION_DAYS) log_patterns = [ 'logs/*.log', 'logs/*.log.*' ] for pattern in log_patterns: for log_file in glob.glob(pattern): try: file_time = datetime.fromtimestamp(os.path.getctime(log_file)) if file_time < cutoff_date: os.remove(log_file) app_logger.info(f"Cleaned up old log file: {log_file}") except Exception as e: app_logger.error(f"Failed to clean up log file {log_file}: {e}")