new logging ...
commit 85d5eface6
parent 87e00ae867
@@ -790,8 +790,6 @@ def mark_as_processed(bucket: str, key: str, worker_name: str, invalid_inboxes:
             MetadataDirective='REPLACE'
         )
 
-        log(f"✓ Marked s3://{bucket}/{key} as processed", 'SUCCESS', worker_name)
-
     except Exception as e:
         log(f"Failed to mark as processed: {e}", 'WARNING', worker_name)
 
@@ -818,8 +816,6 @@ def mark_as_all_invalid(bucket: str, key: str, invalid_inboxes: List[str], worke
             MetadataDirective='REPLACE'
         )
 
-        log(f"✓ Marked s3://{bucket}/{key} as failed (all invalid)", 'SUCCESS', worker_name)
-
     except Exception as e:
         log(f"Failed to mark as all invalid: {e}", 'WARNING', worker_name)
 
@@ -881,17 +877,14 @@ def process_message(domain: str, message: dict, receive_count: int) -> bool:
     bucket = domain_to_bucket_name(domain)
     key = message_id
 
-    log(f"\n{'='*70}", 'INFO', worker_name)
-    log(f"Processing Email (SNS/SES):", 'INFO', worker_name)
-    log(f"  ID: {key}", 'INFO', worker_name)
-    log(f"  Recipients: {len(recipients)} -> {recipients}", 'INFO', worker_name)
-    log(f"  Bucket: {bucket}", 'INFO', worker_name)
+    # Compact single-line log for email processing
+    recipients_str = recipients[0] if len(recipients) == 1 else f"{len(recipients)} recipients"
+    log(f"📧 Processing: {key[:20]}... -> {recipients_str}", 'INFO', worker_name)
 
     # 3. LOAD FROM S3
     try:
         response = s3.get_object(Bucket=bucket, Key=key)
         raw_bytes = response['Body'].read()
-        log(f"✓ Loaded {len(raw_bytes)} bytes from S3", 'INFO', worker_name)
     except s3.exceptions.NoSuchKey:
         if receive_count < 5:
             log(f"⏳ S3 Object not found yet (Attempt {receive_count}). Retrying...", 'WARNING', worker_name)
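
Note: the compact format collapses the recipient list into a single token, either the lone address or a count. A standalone illustration with made-up addresses:

    recipients = ["alice@example.com"]
    recipients_str = recipients[0] if len(recipients) == 1 else f"{len(recipients)} recipients"
    # -> "alice@example.com"

    recipients = ["alice@example.com", "bob@example.com", "carol@example.com"]
    recipients_str = recipients[0] if len(recipients) == 1 else f"{len(recipients)} recipients"
    # -> "3 recipients"
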
@@ -966,20 +959,21 @@ def process_message(domain: str, message: dict, receive_count: int) -> bool:
                 emails_processed.labels(domain=domain, status='temporary_failure').inc()
 
         # 7. RESULT & CLEANUP
-        log(f"📊 Results: {len(successful)} OK, {len(failed_temporary)} TempFail, {len(failed_permanent)} PermFail", 'INFO', worker_name)
-
         if len(successful) > 0:
             mark_as_processed(bucket, key, worker_name, failed_permanent if failed_permanent else None)
-            log(f"✅ Success. Deleted from queue.", 'INFO', worker_name)
+            result_info = f"{len(successful)} OK"
+            if failed_permanent:
+                result_info += f", {len(failed_permanent)} invalid"
+            log(f"✅ Delivered ({result_info})", 'SUCCESS', worker_name)
             return True
 
         elif len(failed_permanent) == len(recipients):
             mark_as_all_invalid(bucket, key, failed_permanent, worker_name)
-            log(f"🛑 All recipients invalid. Deleted from queue.", 'INFO', worker_name)
+            log(f"🛑 All {len(recipients)} recipients invalid", 'ERROR', worker_name)
             return True
 
         else:
-            log(f"🔄 Temporary failures. Keeping in queue.", 'INFO', worker_name)
+            log(f"🔄 Temp failure ({len(failed_temporary)} failed), will retry", 'WARNING', worker_name)
             return False
 
     except Exception as e:
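
Note: the boolean returned here is what drives delete-versus-retry in the poll loop (see the poll_domain hunks below). A minimal sketch of that contract, with the surrounding loop and error handling elided:

    # True  -> terminal outcome (delivered, or all recipients permanently invalid): delete
    # False -> temporary failure: keep the message so SQS redelivers it
    success = process_message(domain, message, receive_count)
    if success:
        sqs.delete_message(QueueUrl=queue_url, ReceiptHandle=receipt_handle)
        messages_processed += 1
    else:
        log(f"⚠ Retry queued (attempt {receive_count}/3)", 'WARNING', worker_name)
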
@@ -991,13 +985,13 @@ def process_message(domain: str, message: dict, receive_count: int) -> bool:
 # DOMAIN POLLER
 # ============================================
 
-def poll_domain(domain: str, queue_url: str, stop_event: threading.Event):
+def poll_domain(domain: str, queue_url: str, stop_event: threading.Event,
+                domain_stats: Dict[str, int] = None, stats_lock: threading.Lock = None):
     """Poll single domain's queue continuously"""
     worker_name = f"worker-{domain}"
     log(f"🚀 Starting poller for {domain}", 'INFO', worker_name)
 
     messages_processed = 0
-    last_activity = time.time()
 
     while not stop_event.is_set():
         try:
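
Note: both new parameters default to None, so existing call sites of poll_domain keep working; only UnifiedWorker opts in. A hypothetical pair of calls (the queue URL is a placeholder):

    stop = threading.Event()
    queue_url = "https://sqs.example/queue"  # placeholder, not a real endpoint

    # Old-style call, still valid thanks to the None defaults
    poll_domain("mail.example.com", queue_url, stop)

    # New-style call that opts in to shared stats
    stats: Dict[str, int] = {}
    lock = threading.Lock()
    poll_domain("mail.example.com", queue_url, stop, stats, lock)
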
@@ -1025,14 +1019,9 @@ def poll_domain(domain: str, queue_url: str, stop_event: threading.Event):
                 pass
 
             if not messages:
-                # Only log every 5 minutes to reduce noise
-                if time.time() - last_activity > 300:
-                    log(f"Idle (processed: {messages_processed})", 'INFO', worker_name)
-                    last_activity = time.time()
                 continue
 
-            log(f"\n✉ Received {len(messages)} message(s) from queue", 'INFO', worker_name)
-            last_activity = time.time()
+            log(f"✉ Received {len(messages)} message(s)", 'INFO', worker_name)
 
             for message in messages:
                 if stop_event.is_set():
@@ -1041,12 +1030,6 @@ def poll_domain(domain: str, queue_url: str, stop_event: threading.Event):
                 receipt_handle = message['ReceiptHandle']
                 receive_count = int(message.get('Attributes', {}).get('ApproximateReceiveCount', 1))
 
-                sent_timestamp = int(message.get('Attributes', {}).get('SentTimestamp', 0)) / 1000
-                queue_time = int(time.time() - sent_timestamp) if sent_timestamp else 0
-
-                if queue_time > 0:
-                    log(f"Message was in queue for {queue_time}s", 'INFO', worker_name)
-
                 if PROMETHEUS_ENABLED:
                     emails_in_flight.inc()
                     start_time = time.time()
@@ -1059,10 +1042,14 @@ def poll_domain(domain: str, queue_url: str, stop_event: threading.Event):
                         QueueUrl=queue_url,
                         ReceiptHandle=receipt_handle
                     )
-                    log("✓ Message deleted from queue", 'INFO', worker_name)
                     messages_processed += 1
+
+                    # Update shared stats
+                    if domain_stats is not None and stats_lock is not None:
+                        with stats_lock:
+                            domain_stats[domain] = messages_processed
                 else:
-                    log(f"⚠ Message kept in queue for retry (attempt {receive_count}/3)", 'WARNING', worker_name)
+                    log(f"⚠ Retry queued (attempt {receive_count}/3)", 'WARNING', worker_name)
 
             except json.JSONDecodeError as e:
                 log(f"✗ Invalid message format: {e}", 'ERROR', worker_name)
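
Note: each poller writes only its own key, so the lock chiefly keeps the reads in _log_status_table (added further down) consistent with these writes. The guarded-write pattern in isolation, reusing the diff's None check (record_progress is a hypothetical helper for illustration):

    import threading
    from typing import Dict, Optional

    def record_progress(domain: str, messages_processed: int,
                        domain_stats: Optional[Dict[str, int]],
                        stats_lock: Optional[threading.Lock]) -> None:
        # Touch shared state only when the caller wired in both the dict and its lock
        if domain_stats is not None and stats_lock is not None:
            with stats_lock:
                domain_stats[domain] = messages_processed
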
@@ -1081,10 +1068,10 @@ def poll_domain(domain: str, queue_url: str, stop_event: threading.Event):
                     processing_time.labels(domain=domain).observe(time.time() - start_time)
 
         except Exception as e:
-            log(f"✗ Error polling {domain}: {e}", 'ERROR', worker_name)
+            log(f"✗ Error polling: {e}", 'ERROR', worker_name)
             time.sleep(5)
 
-    log(f"👋 Stopped poller for {domain} (processed: {messages_processed})", 'INFO', worker_name)
+    log(f"👋 Stopped (processed: {messages_processed})", 'INFO', worker_name)
 
 # ============================================
 # UNIFIED WORKER
@@ -1098,6 +1085,9 @@ class UnifiedWorker:
         self.domains: List[str] = []
         self.queue_urls: Dict[str, str] = {}
         self.poller_threads: List[threading.Thread] = []
+        # Shared stats across all pollers
+        self.domain_stats: Dict[str, int] = {}  # domain -> processed count
+        self.stats_lock = threading.Lock()
 
     def setup(self):
         """Initialize worker"""
@@ -1127,10 +1117,14 @@ class UnifiedWorker:
 
     def start(self):
         """Start all domain pollers"""
+        # Initialize stats for all domains
+        for domain in self.queue_urls.keys():
+            self.domain_stats[domain] = 0
+
         for domain, queue_url in self.queue_urls.items():
             thread = threading.Thread(
                 target=poll_domain,
-                args=(domain, queue_url, self.stop_event),
+                args=(domain, queue_url, self.stop_event, self.domain_stats, self.stats_lock),
                 name=f"poller-{domain}",
                 daemon=True
             )
@@ -1149,12 +1143,29 @@ class UnifiedWorker:
 
                 # Log status summary every 5 minutes
                 if time.time() - last_status_log > status_interval:
-                    active_threads = sum(1 for t in self.poller_threads if t.is_alive())
-                    log(f"📊 Status: {active_threads}/{len(self.poller_threads)} pollers active, {len(self.queue_urls)} domains")
+                    self._log_status_table()
                     last_status_log = time.time()
         except KeyboardInterrupt:
             pass
 
+    def _log_status_table(self):
+        """Log a compact status table"""
+        active_threads = sum(1 for t in self.poller_threads if t.is_alive())
+
+        with self.stats_lock:
+            total_processed = sum(self.domain_stats.values())
+
+            # Build compact stats: only show domains with activity or top domains
+            stats_parts = []
+            for domain in sorted(self.queue_urls.keys()):
+                count = self.domain_stats.get(domain, 0)
+                # Shorten domain for display
+                short_domain = domain.split('.')[0][:12]
+                stats_parts.append(f"{short_domain}:{count}")
+
+        stats_line = " | ".join(stats_parts)
+        log(f"📊 Status: {active_threads}/{len(self.poller_threads)} active, total:{total_processed} | {stats_line}")
+
     def stop(self):
         """Stop gracefully"""
         log("⚠ Stopping worker...")
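
For illustration, re-running the table's formatting with made-up counts shows the shape of the new status line:

    domain_stats = {"mail.example.com": 12, "shop.example.org": 3}  # hypothetical counts
    stats_parts = []
    for domain in sorted(domain_stats):
        short_domain = domain.split('.')[0][:12]  # "mail", "shop"
        stats_parts.append(f"{short_domain}:{domain_stats[domain]}")
    print(" | ".join(stats_parts))
    # -> mail:12 | shop:3
    # Full line from _log_status_table (with 2 of 2 pollers alive):
    # 📊 Status: 2/2 active, total:15 | mail:12 | shop:3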