diff --git a/unified-worker/unified_worker.py b/unified-worker/unified_worker.py
index ad78685..1d4cb0d 100644
--- a/unified-worker/unified_worker.py
+++ b/unified-worker/unified_worker.py
@@ -790,8 +790,6 @@ def mark_as_processed(bucket: str, key: str, worker_name: str, invalid_inboxes:
             MetadataDirective='REPLACE'
         )
 
-        log(f"āœ“ Marked s3://{bucket}/{key} as processed", 'SUCCESS', worker_name)
-
     except Exception as e:
         log(f"Failed to mark as processed: {e}", 'WARNING', worker_name)
 
@@ -818,8 +816,6 @@ def mark_as_all_invalid(bucket: str, key: str, invalid_inboxes: List[str], worke
             MetadataDirective='REPLACE'
         )
 
-        log(f"āœ“ Marked s3://{bucket}/{key} as failed (all invalid)", 'SUCCESS', worker_name)
-
     except Exception as e:
         log(f"Failed to mark as all invalid: {e}", 'WARNING', worker_name)
 
@@ -881,17 +877,14 @@ def process_message(domain: str, message: dict, receive_count: int) -> bool:
         bucket = domain_to_bucket_name(domain)
         key = message_id
 
-        log(f"\n{'='*70}", 'INFO', worker_name)
-        log(f"Processing Email (SNS/SES):", 'INFO', worker_name)
-        log(f"  ID: {key}", 'INFO', worker_name)
-        log(f"  Recipients: {len(recipients)} -> {recipients}", 'INFO', worker_name)
-        log(f"  Bucket: {bucket}", 'INFO', worker_name)
+        # Compact single-line log for email processing
+        recipients_str = recipients[0] if len(recipients) == 1 else f"{len(recipients)} recipients"
+        log(f"šŸ“§ Processing: {key[:20]}... -> {recipients_str}", 'INFO', worker_name)
 
         # 3. LOAD FROM S3
         try:
             response = s3.get_object(Bucket=bucket, Key=key)
             raw_bytes = response['Body'].read()
-            log(f"āœ“ Loaded {len(raw_bytes)} bytes from S3", 'INFO', worker_name)
         except s3.exceptions.NoSuchKey:
             if receive_count < 5:
                 log(f"ā³ S3 Object not found yet (Attempt {receive_count}). Retrying...", 'WARNING', worker_name)
@@ -966,20 +959,21 @@ def process_message(domain: str, message: dict, receive_count: int) -> bool:
                     emails_processed.labels(domain=domain, status='temporary_failure').inc()
 
         # 7. RESULT & CLEANUP
-        log(f"šŸ“Š Results: {len(successful)} OK, {len(failed_temporary)} TempFail, {len(failed_permanent)} PermFail", 'INFO', worker_name)
-
         if len(successful) > 0:
             mark_as_processed(bucket, key, worker_name, failed_permanent if failed_permanent else None)
-            log(f"āœ… Success. Deleted from queue.", 'INFO', worker_name)
+            result_info = f"{len(successful)} OK"
+            if failed_permanent:
+                result_info += f", {len(failed_permanent)} invalid"
+            log(f"āœ… Delivered ({result_info})", 'SUCCESS', worker_name)
             return True
 
         elif len(failed_permanent) == len(recipients):
             mark_as_all_invalid(bucket, key, failed_permanent, worker_name)
-            log(f"šŸ›‘ All recipients invalid. Deleted from queue.", 'INFO', worker_name)
+            log(f"šŸ›‘ All {len(recipients)} recipients invalid", 'ERROR', worker_name)
             return True
 
         else:
-            log(f"šŸ”„ Temporary failures. Keeping in queue.", 'INFO', worker_name)
+            log(f"šŸ”„ Temp failure ({len(failed_temporary)} failed), will retry", 'WARNING', worker_name)
             return False
 
     except Exception as e:
@@ -991,13 +985,13 @@ def process_message(domain: str, message: dict, receive_count: int) -> bool:
 # DOMAIN POLLER
 # ============================================
 
-def poll_domain(domain: str, queue_url: str, stop_event: threading.Event):
+def poll_domain(domain: str, queue_url: str, stop_event: threading.Event,
+                domain_stats: Dict[str, int] = None, stats_lock: threading.Lock = None):
     """Poll single domain's queue continuously"""
     worker_name = f"worker-{domain}"
     log(f"šŸš€ Starting poller for {domain}", 'INFO', worker_name)
 
     messages_processed = 0
-    last_activity = time.time()
 
     while not stop_event.is_set():
         try:
@@ -1025,14 +1019,9 @@ def poll_domain(domain: str, queue_url: str, stop_event: threading.Event):
                    pass
 
             if not messages:
-                # Only log every 5 minutes to reduce noise
-                if time.time() - last_activity > 300:
-                    log(f"Idle (processed: {messages_processed})", 'INFO', worker_name)
-                    last_activity = time.time()
                 continue
 
-            log(f"\nāœ‰ Received {len(messages)} message(s) from queue", 'INFO', worker_name)
-            last_activity = time.time()
+            log(f"āœ‰ Received {len(messages)} message(s)", 'INFO', worker_name)
 
             for message in messages:
                 if stop_event.is_set():
@@ -1041,12 +1030,6 @@ def poll_domain(domain: str, queue_url: str, stop_event: threading.Event):
                 receipt_handle = message['ReceiptHandle']
                 receive_count = int(message.get('Attributes', {}).get('ApproximateReceiveCount', 1))
 
-                sent_timestamp = int(message.get('Attributes', {}).get('SentTimestamp', 0)) / 1000
-                queue_time = int(time.time() - sent_timestamp) if sent_timestamp else 0
-
-                if queue_time > 0:
-                    log(f"Message was in queue for {queue_time}s", 'INFO', worker_name)
-
                 if PROMETHEUS_ENABLED:
                     emails_in_flight.inc()
                     start_time = time.time()
@@ -1059,10 +1042,14 @@ def poll_domain(domain: str, queue_url: str, stop_event: threading.Event):
                             QueueUrl=queue_url,
                             ReceiptHandle=receipt_handle
                         )
-                        log("āœ“ Message deleted from queue", 'INFO', worker_name)
                         messages_processed += 1
+
+                        # Update shared stats
+                        if domain_stats is not None and stats_lock is not None:
+                            with stats_lock:
+                                domain_stats[domain] = messages_processed
                     else:
-                        log(f"⚠ Message kept in queue for retry (attempt {receive_count}/3)", 'WARNING', worker_name)
+                        log(f"⚠ Retry queued (attempt {receive_count}/3)", 'WARNING', worker_name)
 
                 except json.JSONDecodeError as e:
                     log(f"āœ— Invalid message format: {e}", 'ERROR', worker_name)
@@ -1081,10 +1068,10 @@ def poll_domain(domain: str, queue_url: str, stop_event: threading.Event):
                             processing_time.labels(domain=domain).observe(time.time() - start_time)
         except Exception as e:
-            log(f"āœ— Error polling {domain}: {e}", 'ERROR', worker_name)
+            log(f"āœ— Error polling: {e}", 'ERROR', worker_name)
             time.sleep(5)
 
-    log(f"šŸ‘‹ Stopped poller for {domain} (processed: {messages_processed})", 'INFO', worker_name)
+    log(f"šŸ‘‹ Stopped (processed: {messages_processed})", 'INFO', worker_name)
 
 # ============================================
 # UNIFIED WORKER
@@ -1098,6 +1085,9 @@ class UnifiedWorker:
         self.domains: List[str] = []
         self.queue_urls: Dict[str, str] = {}
         self.poller_threads: List[threading.Thread] = []
+        # Shared stats across all pollers
+        self.domain_stats: Dict[str, int] = {}  # domain -> processed count
+        self.stats_lock = threading.Lock()
 
     def setup(self):
         """Initialize worker"""
@@ -1127,10 +1117,14 @@
 
     def start(self):
         """Start all domain pollers"""
+        # Initialize stats for all domains
+        for domain in self.queue_urls.keys():
+            self.domain_stats[domain] = 0
+
         for domain, queue_url in self.queue_urls.items():
             thread = threading.Thread(
                 target=poll_domain,
-                args=(domain, queue_url, self.stop_event),
+                args=(domain, queue_url, self.stop_event, self.domain_stats, self.stats_lock),
                 name=f"poller-{domain}",
                 daemon=True
             )
@@ -1149,12 +1143,29 @@
             # Log status summary every 5 minutes
             if time.time() - last_status_log > status_interval:
-                active_threads = sum(1 for t in self.poller_threads if t.is_alive())
-                log(f"šŸ“Š Status: {active_threads}/{len(self.poller_threads)} pollers active, {len(self.queue_urls)} domains")
+                self._log_status_table()
                 last_status_log = time.time()
 
         except KeyboardInterrupt:
             pass
 
+    def _log_status_table(self):
+        """Log a compact status table"""
+        active_threads = sum(1 for t in self.poller_threads if t.is_alive())
+
+        with self.stats_lock:
+            total_processed = sum(self.domain_stats.values())
+
+        # Build a compact per-domain stats line
+        stats_parts = []
+        for domain in sorted(self.queue_urls.keys()):
+            count = self.domain_stats.get(domain, 0)
+            # Shorten domain for display
+            short_domain = domain.split('.')[0][:12]
+            stats_parts.append(f"{short_domain}:{count}")
+
+        stats_line = " | ".join(stats_parts)
+        log(f"šŸ“Š Status: {active_threads}/{len(self.poller_threads)} active, total:{total_processed} | {stats_line}")
+
     def stop(self):
         """Stop gracefully"""
         log("⚠ Stopping worker...")
@@ -1271,4 +1282,4 @@ def main():
     worker.start()
 
 if __name__ == '__main__':
-    main()
+    main()
\ No newline at end of file
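
Reviewer note: below is a minimal, self-contained sketch of the lock-guarded stats pattern this diff threads through poll_domain and _log_status_table. The poller/status_line names and the two example domains are stand-ins for illustration, not the real code; the sketch also holds the lock while building the per-domain parts, which is slightly stricter than _log_status_table's per-key reads.

import threading
from typing import Dict

# Shared across all poller threads; every access is guarded by one lock.
domain_stats: Dict[str, int] = {"example.com": 0, "example.org": 0}
stats_lock = threading.Lock()

def poller(domain: str) -> None:
    """Stand-in for poll_domain: bumps its own counter under the lock."""
    processed = 0
    for _ in range(1000):
        processed += 1
        with stats_lock:  # writer side
            domain_stats[domain] = processed

def status_line() -> str:
    """Stand-in for _log_status_table: snapshot everything under the lock."""
    with stats_lock:  # reader side
        total = sum(domain_stats.values())
        parts = " | ".join(f"{d.split('.')[0][:12]}:{n}"
                           for d, n in sorted(domain_stats.items()))
    return f"total:{total} | {parts}"

threads = [threading.Thread(target=poller, args=(d,)) for d in list(domain_stats)]
for t in threads:
    t.start()
for t in threads:
    t.join()
print(status_line())  # total:2000 | example:1000 | example:1000

Since each poller writes only its own key, the lock mainly protects the reader: without it, sum() could interleave with writes and report a total that never matched any consistent snapshot.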