"""
Log retention and cleanup system for Plutus Payment Processing.
This module provides automated log cleanup, archiving, and retention policies
to manage log file growth and maintain system performance.
"""
import gzip
import shutil
import threading
import time
from datetime import datetime
from pathlib import Path
from typing import Dict, List, Optional

import schedule

from logging_config import get_logger

logger = get_logger('log_retention')

class LogRetentionManager:
    """Manages log file retention, rotation, and cleanup."""

    def __init__(self, config: Optional[Dict] = None):
        """
        Initialize log retention manager.

        Args:
            config: Configuration dictionary with retention policies
        """
        self.config = config or self.get_default_config()
        self.logs_dir = Path(self.config.get('logs_directory', 'logs'))
        self.archive_dir = Path(self.config.get('archive_directory', 'logs/archive'))

        # Ensure directories exist (create parent directories for nested paths)
        self.logs_dir.mkdir(parents=True, exist_ok=True)
        self.archive_dir.mkdir(parents=True, exist_ok=True)

        # Scheduler for automated cleanup
        self._scheduler_thread = None
        self._stop_scheduler = False
    def get_default_config(self) -> Dict:
        """Get default retention configuration."""
        return {
            'logs_directory': 'logs',
            'archive_directory': 'logs/archive',
            'retention_policies': {
                'application.log': {'days': 30, 'compress_after_days': 7},
                'performance.log': {'days': 14, 'compress_after_days': 3},
                'security.log': {'days': 90, 'compress_after_days': 7},
                'plutus_detailed.log': {'days': 21, 'compress_after_days': 7},
                'payment_processing.log': {'days': 60, 'compress_after_days': 14},
                'default': {'days': 30, 'compress_after_days': 7}
            },
            'max_file_size_mb': 100,
            'cleanup_schedule': '02:00',  # Run at 2 AM daily
            'archive_old_logs': True,
            'compress_archives': True
        }
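
    # Example (illustrative only): overriding the defaults for a deployment with
    # tighter disk limits. The keys are the ones defined above; the paths and
    # values here are assumptions, not recommendations.
    #
    #     manager = LogRetentionManager(config={
    #         'logs_directory': '/var/log/plutus',
    #         'archive_directory': '/var/log/plutus/archive',
    #         'retention_policies': {'default': {'days': 14, 'compress_after_days': 2}},
    #         'max_file_size_mb': 50,
    #         'cleanup_schedule': '03:30',
    #         'archive_old_logs': True,
    #         'compress_archives': True,
    #     })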
    def cleanup_logs(self) -> Dict[str, int]:
        """
        Perform log cleanup based on retention policies.

        Returns:
            Dict with cleanup statistics
        """
        stats = {
            'files_compressed': 0,
            'files_archived': 0,
            'files_deleted': 0,
            'space_freed_mb': 0
        }

        try:
            logger.info("Starting log cleanup process")

            # Get all log files
            log_files = self.get_log_files()

            for log_file in log_files:
                try:
                    policy = self.get_retention_policy(log_file.name)
                    file_mtime = datetime.fromtimestamp(log_file.stat().st_mtime)
                    file_age = (datetime.now() - file_mtime).days
                    file_size_mb = log_file.stat().st_size / (1024 * 1024)

                    # Check if file should be deleted
                    if file_age > policy['days']:
                        if self.config.get('archive_old_logs', True):
                            # Archive before deletion
                            if self.archive_log_file(log_file):
                                stats['files_archived'] += 1
                        log_file.unlink()
                        stats['files_deleted'] += 1
                        stats['space_freed_mb'] += file_size_mb
                        logger.info(f"Deleted old log file: {log_file.name} (age: {file_age} days)")

                    # Check if file should be compressed
                    elif file_age > policy['compress_after_days'] and not log_file.name.endswith('.gz'):
                        if self.compress_log_file(log_file):
                            stats['files_compressed'] += 1
                            logger.info(f"Compressed log file: {log_file.name}")

                    # Check file size limits
                    elif file_size_mb > self.config.get('max_file_size_mb', 100):
                        if self.rotate_large_log_file(log_file):
                            logger.info(f"Rotated large log file: {log_file.name} ({file_size_mb:.1f}MB)")

                except Exception as e:
                    logger.error(f"Error processing log file {log_file.name}: {e}")

            logger.info(f"Log cleanup completed: {stats}")
            return stats

        except Exception as e:
            logger.error(f"Error during log cleanup: {e}")
            return stats
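
    # Illustrative return value (numbers are made up):
    #
    #     {'files_compressed': 3, 'files_archived': 2,
    #      'files_deleted': 2, 'space_freed_mb': 148.6}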
    def get_log_files(self) -> List[Path]:
        """Get all log files in the logs directory."""
        log_patterns = ['*.log', '*.log.*']
        log_files = []

        for pattern in log_patterns:
            log_files.extend(self.logs_dir.glob(pattern))

        return log_files

    def get_retention_policy(self, filename: str) -> Dict[str, int]:
        """Get retention policy for a specific log file."""
        policies = self.config.get('retention_policies', {})

        # Check for exact filename match
        if filename in policies:
            return policies[filename]

        # Check for pattern matches (substring match against policy keys)
        for pattern, policy in policies.items():
            if pattern in filename:
                return policy

        # Return default policy
        return policies.get('default', {'days': 30, 'compress_after_days': 7})
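
    # For example, a rotated or compressed file such as 'security.log.gz' has no
    # exact policy entry, but the substring check above maps it to the
    # 'security.log' policy; a name that matches nothing falls back to 'default'.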
    def compress_log_file(self, log_file: Path) -> bool:
        """
        Compress a log file using gzip.

        Args:
            log_file: Path to the log file to compress

        Returns:
            True if compression was successful
        """
        try:
            compressed_file = log_file.with_suffix(log_file.suffix + '.gz')

            with open(log_file, 'rb') as f_in:
                with gzip.open(compressed_file, 'wb') as f_out:
                    shutil.copyfileobj(f_in, f_out)

            # Remove original file after successful compression
            log_file.unlink()
            return True

        except Exception as e:
            logger.error(f"Failed to compress {log_file.name}: {e}")
            return False
    def archive_log_file(self, log_file: Path) -> bool:
        """
        Archive a log file to the archive directory.

        Args:
            log_file: Path to the log file to archive

        Returns:
            True if archiving was successful
        """
        try:
            # Create dated archive subdirectory
            archive_date = datetime.now().strftime('%Y%m')
            archive_subdir = self.archive_dir / archive_date
            archive_subdir.mkdir(exist_ok=True)

            # Generate archive filename with timestamp
            timestamp = datetime.now().strftime('%Y%m%d_%H%M%S')
            archive_name = f"{log_file.stem}_{timestamp}{log_file.suffix}"
            archive_path = archive_subdir / archive_name

            # Copy and optionally compress
            if self.config.get('compress_archives', True):
                archive_path = archive_path.with_suffix(archive_path.suffix + '.gz')
                with open(log_file, 'rb') as f_in:
                    with gzip.open(archive_path, 'wb') as f_out:
                        shutil.copyfileobj(f_in, f_out)
            else:
                shutil.copy2(log_file, archive_path)

            return True

        except Exception as e:
            logger.error(f"Failed to archive {log_file.name}: {e}")
            return False
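
    # With the defaults above, archiving 'payment_processing.log' during the
    # 02:00 run on 2025-01-12 would produce a path like (illustrative only):
    #
    #     logs/archive/202501/payment_processing_20250112_020000.log.gz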
    def rotate_large_log_file(self, log_file: Path) -> bool:
        """
        Rotate a log file that has grown too large.

        Args:
            log_file: Path to the log file to rotate

        Returns:
            True if rotation was successful
        """
        try:
            timestamp = datetime.now().strftime('%Y%m%d_%H%M%S')
            rotated_name = f"{log_file.stem}_{timestamp}{log_file.suffix}"
            rotated_path = log_file.parent / rotated_name

            # Move current log to rotated name
            shutil.move(str(log_file), str(rotated_path))

            # Compress the rotated file
            if self.compress_log_file(rotated_path):
                logger.info(f"Rotated and compressed large log file: {log_file.name}")

            return True

        except Exception as e:
            logger.error(f"Failed to rotate large log file {log_file.name}: {e}")
            return False
    def get_log_statistics(self) -> Dict:
        """Get statistics about log files and disk usage."""
        stats = {
            'total_files': 0,
            'total_size_mb': 0,
            'compressed_files': 0,
            'oldest_log': None,
            'newest_log': None,
            'logs_by_type': {},
            'archive_stats': {
                'total_files': 0,
                'total_size_mb': 0
            }
        }

        try:
            log_files = self.get_log_files()
            oldest_time = None
            newest_time = None

            for log_file in log_files:
                file_stat = log_file.stat()
                file_size_mb = file_stat.st_size / (1024 * 1024)
                file_time = datetime.fromtimestamp(file_stat.st_mtime)

                stats['total_files'] += 1
                stats['total_size_mb'] += file_size_mb

                if log_file.name.endswith('.gz'):
                    stats['compressed_files'] += 1

                # Track oldest and newest
                if oldest_time is None or file_time < oldest_time:
                    oldest_time = file_time
                    stats['oldest_log'] = {'name': log_file.name, 'date': file_time.isoformat()}
                if newest_time is None or file_time > newest_time:
                    newest_time = file_time
                    stats['newest_log'] = {'name': log_file.name, 'date': file_time.isoformat()}

                # Count by log type (prefix before the first underscore)
                log_type = log_file.stem.split('_')[0] if '_' in log_file.stem else log_file.stem
                if log_type not in stats['logs_by_type']:
                    stats['logs_by_type'][log_type] = {'count': 0, 'size_mb': 0}
                stats['logs_by_type'][log_type]['count'] += 1
                stats['logs_by_type'][log_type]['size_mb'] += file_size_mb

            # Get archive statistics
            if self.archive_dir.exists():
                archive_files = list(self.archive_dir.rglob('*'))
                for archive_file in archive_files:
                    if archive_file.is_file():
                        stats['archive_stats']['total_files'] += 1
                        stats['archive_stats']['total_size_mb'] += archive_file.stat().st_size / (1024 * 1024)

            # Round sizes
            stats['total_size_mb'] = round(stats['total_size_mb'], 2)
            stats['archive_stats']['total_size_mb'] = round(stats['archive_stats']['total_size_mb'], 2)
            for log_type in stats['logs_by_type']:
                stats['logs_by_type'][log_type]['size_mb'] = round(stats['logs_by_type'][log_type]['size_mb'], 2)

        except Exception as e:
            logger.error(f"Error getting log statistics: {e}")

        return stats
    def start_scheduled_cleanup(self):
        """Start the scheduled cleanup service."""
        if self._scheduler_thread and self._scheduler_thread.is_alive():
            logger.warning("Scheduled cleanup is already running")
            return

        # Schedule daily cleanup
        schedule.clear()
        cleanup_time = self.config.get('cleanup_schedule', '02:00')
        schedule.every().day.at(cleanup_time).do(self.cleanup_logs)
        logger.info(f"Scheduled daily log cleanup at {cleanup_time}")

        def run_scheduler():
            while not self._stop_scheduler:
                schedule.run_pending()
                time.sleep(60)  # Check every minute

        self._stop_scheduler = False
        self._scheduler_thread = threading.Thread(target=run_scheduler, daemon=True)
        self._scheduler_thread.start()

    def stop_scheduled_cleanup(self):
        """Stop the scheduled cleanup service."""
        self._stop_scheduler = True

        if self._scheduler_thread:
            self._scheduler_thread.join(timeout=5)

        schedule.clear()
        logger.info("Stopped scheduled log cleanup")
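
    # Sketch of scheduler usage in a long-running process (the shutdown hook is
    # an assumption about the host application, not part of this module):
    #
    #     manager = LogRetentionManager()
    #     manager.start_scheduled_cleanup()   # daemon thread, checks every minute
    #     ...
    #     manager.stop_scheduled_cleanup()    # e.g. from an atexit/shutdown hook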
    def emergency_cleanup(self, target_size_mb: int = 500) -> Dict:
        """
        Perform emergency cleanup when disk space is low.

        Args:
            target_size_mb: Target total size for log files in MB

        Returns:
            Dict with cleanup statistics
        """
        logger.warning(f"Starting emergency log cleanup to reduce size to {target_size_mb}MB")
        stats = {'files_deleted': 0, 'space_freed_mb': 0}

        # Get all log files sorted by age (oldest first)
        log_files = self.get_log_files()
        log_files.sort(key=lambda x: x.stat().st_mtime)

        current_size_mb = sum(f.stat().st_size for f in log_files) / (1024 * 1024)

        for log_file in log_files:
            if current_size_mb <= target_size_mb:
                break

            file_size_mb = log_file.stat().st_size / (1024 * 1024)

            # Archive critical logs before deletion
            if any(pattern in log_file.name for pattern in ['security', 'payment_processing']):
                self.archive_log_file(log_file)

            log_file.unlink()
            stats['files_deleted'] += 1
            stats['space_freed_mb'] += file_size_mb
            current_size_mb -= file_size_mb
            logger.info(f"Emergency cleanup: deleted {log_file.name} ({file_size_mb:.1f}MB)")

        logger.warning(f"Emergency cleanup completed: {stats}")
        return stats

# Global retention manager instance
retention_manager = LogRetentionManager()
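
# Sketch: one way to trigger emergency cleanup from a disk-space check. The
# 500 MB free-space threshold is an assumption for illustration, not a project
# setting; shutil.disk_usage() is from the standard library.
#
#     free_mb = shutil.disk_usage(retention_manager.logs_dir).free / (1024 * 1024)
#     if free_mb < 500:
#         retention_manager.emergency_cleanup(target_size_mb=500)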
def initialize_log_retention():
    """Initialize the log retention system."""
    try:
        retention_manager.start_scheduled_cleanup()
        logger.info("Log retention system initialized successfully")
    except Exception as e:
        logger.error(f"Failed to initialize log retention system: {e}")


def get_retention_stats():
    """Get current log retention statistics."""
    return retention_manager.get_log_statistics()


def manual_cleanup():
    """Perform manual log cleanup."""
    return retention_manager.cleanup_logs()
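
# Sketch of how an application might wire these helpers in at startup and expose
# stats from a health/monitoring endpoint. The application structure shown is an
# assumption; only initialize_log_retention() and get_retention_stats() come from
# this module.
#
#     # at application startup
#     initialize_log_retention()
#
#     # e.g. inside a /health or /metrics handler
#     log_stats = get_retention_stats()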

if __name__ == "__main__":
    # Run log cleanup manually
    manager = LogRetentionManager()
    stats = manager.cleanup_logs()
    print(f"Log cleanup completed: {stats}")