""" Log retention and cleanup system for Plutus Payment Processing. This module provides automated log cleanup, archiving, and retention policies to manage log file growth and maintain system performance. """ import os import glob import gzip import shutil import logging from datetime import datetime, timedelta from typing import List, Dict, Optional from pathlib import Path import schedule import time import threading from logging_config import get_logger logger = get_logger('log_retention') class LogRetentionManager: """Manages log file retention, rotation, and cleanup.""" def __init__(self, config: Optional[Dict] = None): """ Initialize log retention manager. Args: config: Configuration dictionary with retention policies """ self.config = config or self.get_default_config() self.logs_dir = Path(self.config.get('logs_directory', 'logs')) self.archive_dir = Path(self.config.get('archive_directory', 'logs/archive')) # Ensure directories exist self.logs_dir.mkdir(exist_ok=True) self.archive_dir.mkdir(parents=True, exist_ok=True) # Scheduler for automated cleanup self._scheduler_thread = None self._stop_scheduler = False def get_default_config(self) -> Dict: """Get default retention configuration.""" return { 'logs_directory': 'logs', 'archive_directory': 'logs/archive', 'retention_policies': { 'application.log': {'days': 30, 'compress_after_days': 7}, 'performance.log': {'days': 14, 'compress_after_days': 3}, 'security.log': {'days': 90, 'compress_after_days': 7}, 'plutus_detailed.log': {'days': 21, 'compress_after_days': 7}, 'payment_processing.log': {'days': 60, 'compress_after_days': 14}, 'default': {'days': 30, 'compress_after_days': 7} }, 'max_file_size_mb': 100, 'cleanup_schedule': '02:00', # Run at 2 AM daily 'archive_old_logs': True, 'compress_archives': True } def cleanup_logs(self) -> Dict[str, int]: """ Perform log cleanup based on retention policies. 
    def cleanup_logs(self) -> Dict[str, int]:
        """
        Perform log cleanup based on retention policies.

        Returns:
            Dict with cleanup statistics
        """
        stats = {
            'files_compressed': 0,
            'files_archived': 0,
            'files_deleted': 0,
            'space_freed_mb': 0
        }

        try:
            logger.info("Starting log cleanup process")

            # Get all log files
            log_files = self.get_log_files()

            for log_file in log_files:
                try:
                    policy = self.get_retention_policy(log_file.name)
                    file_mtime = datetime.fromtimestamp(log_file.stat().st_mtime)
                    file_age = (datetime.now() - file_mtime).days
                    file_size_mb = log_file.stat().st_size / (1024 * 1024)

                    # Check if file should be deleted
                    if file_age > policy['days']:
                        if self.config.get('archive_old_logs', True):
                            # Archive before deletion
                            if self.archive_log_file(log_file):
                                stats['files_archived'] += 1

                        log_file.unlink()
                        stats['files_deleted'] += 1
                        stats['space_freed_mb'] += file_size_mb
                        logger.info(f"Deleted old log file: {log_file.name} (age: {file_age} days)")

                    # Check if file should be compressed
                    elif file_age > policy['compress_after_days'] and not log_file.name.endswith('.gz'):
                        if self.compress_log_file(log_file):
                            stats['files_compressed'] += 1
                            logger.info(f"Compressed log file: {log_file.name}")

                    # Check file size limits
                    elif file_size_mb > self.config.get('max_file_size_mb', 100):
                        if self.rotate_large_log_file(log_file):
                            logger.info(f"Rotated large log file: {log_file.name} ({file_size_mb:.1f}MB)")

                except Exception as e:
                    logger.error(f"Error processing log file {log_file.name}: {e}")

            logger.info(f"Log cleanup completed: {stats}")
            return stats

        except Exception as e:
            logger.error(f"Error during log cleanup: {e}")
            return stats

    def get_log_files(self) -> List[Path]:
        """Get all log files in the logs directory."""
        log_patterns = ['*.log', '*.log.*']
        log_files = []

        for pattern in log_patterns:
            log_files.extend(self.logs_dir.glob(pattern))

        return log_files

    def get_retention_policy(self, filename: str) -> Dict[str, int]:
        """Get retention policy for a specific log file."""
        policies = self.config.get('retention_policies', {})

        # Check for exact filename match
        if filename in policies:
            return policies[filename]

        # Check for pattern matches
        for pattern, policy in policies.items():
            if pattern in filename:
                return policy

        # Return default policy
        return policies.get('default', {'days': 30, 'compress_after_days': 7})

    def compress_log_file(self, log_file: Path) -> bool:
        """
        Compress a log file using gzip.

        Args:
            log_file: Path to the log file to compress

        Returns:
            True if compression was successful
        """
        try:
            compressed_file = log_file.with_suffix(log_file.suffix + '.gz')

            with open(log_file, 'rb') as f_in:
                with gzip.open(compressed_file, 'wb') as f_out:
                    shutil.copyfileobj(f_in, f_out)

            # Remove original file after successful compression
            log_file.unlink()
            return True

        except Exception as e:
            logger.error(f"Failed to compress {log_file.name}: {e}")
            return False
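    # Note: compress_log_file() turns e.g. 'application.log' into 'application.log.gz'
    # in the same directory and removes the original. The compressed file still matches
    # the '*.log.*' pattern used by get_log_files(), so it remains subject to the
    # age-based deletion policy, while the '.gz' check in cleanup_logs() prevents it
    # from being compressed a second time.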
    def archive_log_file(self, log_file: Path) -> bool:
        """
        Archive a log file to the archive directory.

        Args:
            log_file: Path to the log file to archive

        Returns:
            True if archiving was successful
        """
        try:
            # Create dated archive subdirectory
            archive_date = datetime.now().strftime('%Y%m')
            archive_subdir = self.archive_dir / archive_date
            archive_subdir.mkdir(exist_ok=True)

            # Generate archive filename with timestamp
            timestamp = datetime.now().strftime('%Y%m%d_%H%M%S')
            archive_name = f"{log_file.stem}_{timestamp}{log_file.suffix}"
            archive_path = archive_subdir / archive_name

            # Copy and optionally compress
            if self.config.get('compress_archives', True):
                archive_path = archive_path.with_suffix(archive_path.suffix + '.gz')
                with open(log_file, 'rb') as f_in:
                    with gzip.open(archive_path, 'wb') as f_out:
                        shutil.copyfileobj(f_in, f_out)
            else:
                shutil.copy2(log_file, archive_path)

            return True

        except Exception as e:
            logger.error(f"Failed to archive {log_file.name}: {e}")
            return False

    def rotate_large_log_file(self, log_file: Path) -> bool:
        """
        Rotate a log file that has grown too large.

        Args:
            log_file: Path to the log file to rotate

        Returns:
            True if rotation was successful
        """
        try:
            timestamp = datetime.now().strftime('%Y%m%d_%H%M%S')
            rotated_name = f"{log_file.stem}_{timestamp}{log_file.suffix}"
            rotated_path = log_file.parent / rotated_name

            # Move current log to rotated name
            shutil.move(str(log_file), str(rotated_path))

            # Compress the rotated file
            if self.compress_log_file(rotated_path):
                logger.info(f"Rotated and compressed large log file: {log_file.name}")

            return True

        except Exception as e:
            logger.error(f"Failed to rotate large log file {log_file.name}: {e}")
            return False

    def get_log_statistics(self) -> Dict:
        """Get statistics about log files and disk usage."""
        stats = {
            'total_files': 0,
            'total_size_mb': 0,
            'compressed_files': 0,
            'oldest_log': None,
            'newest_log': None,
            'logs_by_type': {},
            'archive_stats': {
                'total_files': 0,
                'total_size_mb': 0
            }
        }

        try:
            log_files = self.get_log_files()
            oldest_time = None
            newest_time = None

            for log_file in log_files:
                file_stat = log_file.stat()
                file_size_mb = file_stat.st_size / (1024 * 1024)
                file_time = datetime.fromtimestamp(file_stat.st_mtime)

                stats['total_files'] += 1
                stats['total_size_mb'] += file_size_mb

                if log_file.name.endswith('.gz'):
                    stats['compressed_files'] += 1

                # Track oldest and newest
                if oldest_time is None or file_time < oldest_time:
                    oldest_time = file_time
                    stats['oldest_log'] = {'name': log_file.name, 'date': file_time.isoformat()}

                if newest_time is None or file_time > newest_time:
                    newest_time = file_time
                    stats['newest_log'] = {'name': log_file.name, 'date': file_time.isoformat()}

                # Count by log type
                log_type = log_file.stem.split('_')[0] if '_' in log_file.stem else log_file.stem
                if log_type not in stats['logs_by_type']:
                    stats['logs_by_type'][log_type] = {'count': 0, 'size_mb': 0}

                stats['logs_by_type'][log_type]['count'] += 1
                stats['logs_by_type'][log_type]['size_mb'] += file_size_mb

            # Get archive statistics
            if self.archive_dir.exists():
                archive_files = list(self.archive_dir.rglob('*'))
                for archive_file in archive_files:
                    if archive_file.is_file():
                        stats['archive_stats']['total_files'] += 1
                        stats['archive_stats']['total_size_mb'] += archive_file.stat().st_size / (1024 * 1024)

            # Round sizes
            stats['total_size_mb'] = round(stats['total_size_mb'], 2)
            stats['archive_stats']['total_size_mb'] = round(stats['archive_stats']['total_size_mb'], 2)
            for log_type in stats['logs_by_type']:
                stats['logs_by_type'][log_type]['size_mb'] = round(stats['logs_by_type'][log_type]['size_mb'], 2)

        except Exception as e:
            logger.error(f"Error getting log statistics: {e}")

        return stats
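    # Illustrative shape of the dict returned by get_log_statistics() (values below
    # are made-up examples, not real measurements):
    #
    #   {
    #       'total_files': 6,
    #       'total_size_mb': 42.17,
    #       'compressed_files': 2,
    #       'oldest_log': {'name': 'security.log', 'date': '2024-01-03T02:00:00'},
    #       'newest_log': {'name': 'application.log', 'date': '2024-01-30T14:12:09'},
    #       'logs_by_type': {'application': {'count': 1, 'size_mb': 12.5}, ...},
    #       'archive_stats': {'total_files': 3, 'total_size_mb': 8.02},
    #   }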
    def start_scheduled_cleanup(self):
        """Start the scheduled cleanup service."""
        if self._scheduler_thread and self._scheduler_thread.is_alive():
            logger.warning("Scheduled cleanup is already running")
            return

        # Schedule daily cleanup
        schedule.clear()
        cleanup_time = self.config.get('cleanup_schedule', '02:00')
        schedule.every().day.at(cleanup_time).do(self.cleanup_logs)
        logger.info(f"Scheduled daily log cleanup at {cleanup_time}")

        def run_scheduler():
            while not self._stop_scheduler:
                schedule.run_pending()
                time.sleep(60)  # Check every minute

        self._stop_scheduler = False
        self._scheduler_thread = threading.Thread(target=run_scheduler, daemon=True)
        self._scheduler_thread.start()

    def stop_scheduled_cleanup(self):
        """Stop the scheduled cleanup service."""
        self._stop_scheduler = True
        if self._scheduler_thread:
            self._scheduler_thread.join(timeout=5)
        schedule.clear()
        logger.info("Stopped scheduled log cleanup")

    def emergency_cleanup(self, target_size_mb: int = 500) -> Dict:
        """
        Perform emergency cleanup when disk space is low.

        Args:
            target_size_mb: Target total size for log files in MB

        Returns:
            Dict with cleanup statistics
        """
        logger.warning(f"Starting emergency log cleanup to reduce size to {target_size_mb}MB")

        stats = {'files_deleted': 0, 'space_freed_mb': 0}

        # Get all log files sorted by age (oldest first)
        log_files = self.get_log_files()
        log_files.sort(key=lambda x: x.stat().st_mtime)

        current_size_mb = sum(f.stat().st_size for f in log_files) / (1024 * 1024)

        for log_file in log_files:
            if current_size_mb <= target_size_mb:
                break

            file_size_mb = log_file.stat().st_size / (1024 * 1024)

            # Archive critical logs before deletion
            if any(pattern in log_file.name for pattern in ['security', 'payment_processing']):
                self.archive_log_file(log_file)

            log_file.unlink()
            stats['files_deleted'] += 1
            stats['space_freed_mb'] += file_size_mb
            current_size_mb -= file_size_mb

            logger.info(f"Emergency cleanup: deleted {log_file.name} ({file_size_mb:.1f}MB)")

        logger.warning(f"Emergency cleanup completed: {stats}")
        return stats


# Global retention manager instance
retention_manager = LogRetentionManager()


def initialize_log_retention():
    """Initialize the log retention system."""
    try:
        retention_manager.start_scheduled_cleanup()
        logger.info("Log retention system initialized successfully")
    except Exception as e:
        logger.error(f"Failed to initialize log retention system: {e}")


def get_retention_stats():
    """Get current log retention statistics."""
    return retention_manager.get_log_statistics()


def manual_cleanup():
    """Perform manual log cleanup."""
    return retention_manager.cleanup_logs()


if __name__ == "__main__":
    # Run log cleanup manually
    manager = LogRetentionManager()
    stats = manager.cleanup_logs()
    print(f"Log cleanup completed: {stats}")
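# Illustrative integration sketch (assumed host-application wiring, not part of this
# module's API; assumes the file is saved as log_retention.py): start the background
# cleanup once at startup and stop it on shutdown.
#
#   from log_retention import initialize_log_retention, retention_manager
#
#   initialize_log_retention()                   # schedules daily cleanup via `schedule`
#   ...
#   retention_manager.stop_scheduled_cleanup()   # on shutdown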