ConnectionResetError Spam, Waiting for messages all 300sec

This commit is contained in:
Andreas Knuth 2026-01-11 18:12:31 -06:00
parent 7dfad647e9
commit 87e00ae867
1 changed files with 35 additions and 40 deletions

View File

@ -139,20 +139,11 @@ sqs = boto3.client('sqs', region_name=config.aws_region)
s3 = boto3.client('s3', region_name=config.aws_region) s3 = boto3.client('s3', region_name=config.aws_region)
ses = boto3.client('ses', region_name=config.aws_region) ses = boto3.client('ses', region_name=config.aws_region)
# DynamoDB - initialized lazily # DynamoDB
dynamodb = boto3.resource('dynamodb', region_name=config.aws_region) dynamodb = boto3.resource('dynamodb', region_name=config.aws_region)
DYNAMODB_AVAILABLE = False DYNAMODB_AVAILABLE = False
rules_table = None rules_table = None
messages_table = None messages_table = None
_dynamodb_initialized = False
def init_dynamodb():
"""Initialize DynamoDB tables (called once from main)"""
global DYNAMODB_AVAILABLE, rules_table, messages_table, _dynamodb_initialized
if _dynamodb_initialized:
return
_dynamodb_initialized = True
try: try:
rules_table = dynamodb.Table(config.rules_table) rules_table = dynamodb.Table(config.rules_table)
@ -161,9 +152,8 @@ def init_dynamodb():
rules_table.table_status rules_table.table_status
messages_table.table_status messages_table.table_status
DYNAMODB_AVAILABLE = True DYNAMODB_AVAILABLE = True
log(f"DynamoDB connected: {config.rules_table}, {config.messages_table}")
except Exception as e: except Exception as e:
log(f"Warning: DynamoDB not available: {e}", 'WARNING') pass # Will be logged at startup
# ============================================ # ============================================
# SMTP CONNECTION POOL # SMTP CONNECTION POOL
@ -1035,8 +1025,9 @@ def poll_domain(domain: str, queue_url: str, stop_event: threading.Event):
pass pass
if not messages: if not messages:
if time.time() - last_activity > 60: # Only log every 5 minutes to reduce noise
log(f"Waiting for messages... (processed: {messages_processed})", 'INFO', worker_name) if time.time() - last_activity > 300:
log(f"Idle (processed: {messages_processed})", 'INFO', worker_name)
last_activity = time.time() last_activity = time.time()
continue continue
@ -1148,10 +1139,19 @@ class UnifiedWorker:
log(f"Started {len(self.poller_threads)} domain pollers") log(f"Started {len(self.poller_threads)} domain pollers")
# Warten bis Shutdown # Periodic status log (every 5 minutes)
last_status_log = time.time()
status_interval = 300 # 5 minutes
try: try:
while not self.stop_event.is_set(): while not self.stop_event.is_set():
self.stop_event.wait(timeout=1) self.stop_event.wait(timeout=10)
# Log status summary every 5 minutes
if time.time() - last_status_log > status_interval:
active_threads = sum(1 for t in self.poller_threads if t.is_alive())
log(f"📊 Status: {active_threads}/{len(self.poller_threads)} pollers active, {len(self.queue_urls)} domains")
last_status_log = time.time()
except KeyboardInterrupt: except KeyboardInterrupt:
pass pass
@ -1209,9 +1209,18 @@ def start_health_server(worker: UnifiedWorker):
self.end_headers() self.end_headers()
def log_message(self, format, *args): def log_message(self, format, *args):
pass # Suppress HTTP logs pass # Suppress HTTP access logs
server = HTTPServer(('0.0.0.0', config.health_port), HealthHandler) class SilentHTTPServer(HTTPServer):
"""HTTP Server that ignores connection reset errors from scanners"""
def handle_error(self, request, client_address):
exc_type = sys.exc_info()[0]
if exc_type in (ConnectionResetError, BrokenPipeError, ConnectionAbortedError):
pass # Silently ignore - these are just scanners/health checks disconnecting
else:
log(f"Health server error from {client_address[0]}: {sys.exc_info()[1]}", 'WARNING')
server = SilentHTTPServer(('0.0.0.0', config.health_port), HealthHandler)
thread = threading.Thread(target=server.serve_forever, daemon=True, name='health-server') thread = threading.Thread(target=server.serve_forever, daemon=True, name='health-server')
thread.start() thread.start()
log(f"Health server on port {config.health_port}") log(f"Health server on port {config.health_port}")
@ -1221,20 +1230,6 @@ def start_health_server(worker: UnifiedWorker):
# ============================================ # ============================================
def main(): def main():
# Use a lock file to prevent double execution
import fcntl
lock_file = '/tmp/unified_worker.lock'
try:
lock_fd = open(lock_file, 'w')
fcntl.flock(lock_fd, fcntl.LOCK_EX | fcntl.LOCK_NB)
except (IOError, OSError):
print("Another instance is already running, exiting.", flush=True)
sys.exit(0)
# Initialize DynamoDB (only once)
init_dynamodb()
worker = UnifiedWorker() worker = UnifiedWorker()
def signal_handler(signum, frame): def signal_handler(signum, frame):