diff --git a/unified-worker/unified_worker.py b/unified-worker/unified_worker.py index 4392aa2..ad78685 100644 --- a/unified-worker/unified_worker.py +++ b/unified-worker/unified_worker.py @@ -139,31 +139,21 @@ sqs = boto3.client('sqs', region_name=config.aws_region) s3 = boto3.client('s3', region_name=config.aws_region) ses = boto3.client('ses', region_name=config.aws_region) -# DynamoDB - initialized lazily +# DynamoDB dynamodb = boto3.resource('dynamodb', region_name=config.aws_region) DYNAMODB_AVAILABLE = False rules_table = None messages_table = None -_dynamodb_initialized = False -def init_dynamodb(): - """Initialize DynamoDB tables (called once from main)""" - global DYNAMODB_AVAILABLE, rules_table, messages_table, _dynamodb_initialized - - if _dynamodb_initialized: - return - _dynamodb_initialized = True - - try: - rules_table = dynamodb.Table(config.rules_table) - messages_table = dynamodb.Table(config.messages_table) - # Test connection - rules_table.table_status - messages_table.table_status - DYNAMODB_AVAILABLE = True - log(f"DynamoDB connected: {config.rules_table}, {config.messages_table}") - except Exception as e: - log(f"Warning: DynamoDB not available: {e}", 'WARNING') +try: + rules_table = dynamodb.Table(config.rules_table) + messages_table = dynamodb.Table(config.messages_table) + # Test connection + rules_table.table_status + messages_table.table_status + DYNAMODB_AVAILABLE = True +except Exception as e: + pass # Will be logged at startup # ============================================ # SMTP CONNECTION POOL @@ -1035,8 +1025,9 @@ def poll_domain(domain: str, queue_url: str, stop_event: threading.Event): pass if not messages: - if time.time() - last_activity > 60: - log(f"Waiting for messages... (processed: {messages_processed})", 'INFO', worker_name) + # Only log every 5 minutes to reduce noise + if time.time() - last_activity > 300: + log(f"Idle (processed: {messages_processed})", 'INFO', worker_name) last_activity = time.time() continue @@ -1148,10 +1139,19 @@ class UnifiedWorker: log(f"Started {len(self.poller_threads)} domain pollers") - # Warten bis Shutdown + # Periodic status log (every 5 minutes) + last_status_log = time.time() + status_interval = 300 # 5 minutes + try: while not self.stop_event.is_set(): - self.stop_event.wait(timeout=1) + self.stop_event.wait(timeout=10) + + # Log status summary every 5 minutes + if time.time() - last_status_log > status_interval: + active_threads = sum(1 for t in self.poller_threads if t.is_alive()) + log(f"📊 Status: {active_threads}/{len(self.poller_threads)} pollers active, {len(self.queue_urls)} domains") + last_status_log = time.time() except KeyboardInterrupt: pass @@ -1209,9 +1209,18 @@ def start_health_server(worker: UnifiedWorker): self.end_headers() def log_message(self, format, *args): - pass # Suppress HTTP logs + pass # Suppress HTTP access logs - server = HTTPServer(('0.0.0.0', config.health_port), HealthHandler) + class SilentHTTPServer(HTTPServer): + """HTTP Server that ignores connection reset errors from scanners""" + def handle_error(self, request, client_address): + exc_type = sys.exc_info()[0] + if exc_type in (ConnectionResetError, BrokenPipeError, ConnectionAbortedError): + pass # Silently ignore - these are just scanners/health checks disconnecting + else: + log(f"Health server error from {client_address[0]}: {sys.exc_info()[1]}", 'WARNING') + + server = SilentHTTPServer(('0.0.0.0', config.health_port), HealthHandler) thread = threading.Thread(target=server.serve_forever, daemon=True, name='health-server') thread.start() log(f"Health server on port {config.health_port}") @@ -1221,20 +1230,6 @@ def start_health_server(worker: UnifiedWorker): # ============================================ def main(): - # Use a lock file to prevent double execution - import fcntl - lock_file = '/tmp/unified_worker.lock' - - try: - lock_fd = open(lock_file, 'w') - fcntl.flock(lock_fd, fcntl.LOCK_EX | fcntl.LOCK_NB) - except (IOError, OSError): - print("Another instance is already running, exiting.", flush=True) - sys.exit(0) - - # Initialize DynamoDB (only once) - init_dynamodb() - worker = UnifiedWorker() def signal_handler(signum, frame):