This commit is contained in:
Andreas Knuth 2026-01-10 18:42:31 -06:00
parent afe33ef381
commit 7dfad647e9
1 changed files with 16 additions and 74 deletions

View File

@ -1104,12 +1104,9 @@ class UnifiedWorker:
def __init__(self):
self.stop_event = threading.Event()
self.executor: Optional[ThreadPoolExecutor] = None
self.domains: List[str] = []
self.queue_urls: Dict[str, str] = {}
self.active_pollers: Dict[str, threading.Thread] = {}
self.poller_lock = threading.Lock()
self.reload_interval = int(os.environ.get('DOMAIN_RELOAD_INTERVAL', '60')) # Sekunden
self.poller_threads: List[threading.Thread] = []
def setup(self):
"""Initialize worker"""
@ -1138,21 +1135,18 @@ class UnifiedWorker:
log(f"Initialized with {len(self.queue_urls)} domains")
def start(self):
"""Start all domain pollers with hot-reload support"""
# Initial pollers starten
"""Start all domain pollers"""
for domain, queue_url in self.queue_urls.items():
self._start_poller(domain, queue_url)
log(f"Started {len(self.active_pollers)} domain pollers")
# Domain reload thread starten
reload_thread = threading.Thread(
target=self._domain_reload_loop,
name='domain-reloader',
thread = threading.Thread(
target=poll_domain,
args=(domain, queue_url, self.stop_event),
name=f"poller-{domain}",
daemon=True
)
reload_thread.start()
log(f"Domain hot-reload enabled (checking every {self.reload_interval}s)")
thread.start()
self.poller_threads.append(thread)
log(f"Started {len(self.poller_threads)} domain pollers")
# Warten bis Shutdown
try:
@ -1161,67 +1155,16 @@ class UnifiedWorker:
except KeyboardInterrupt:
pass
def _start_poller(self, domain: str, queue_url: str):
"""Start a single domain poller thread"""
with self.poller_lock:
if domain in self.active_pollers:
return # Already running
thread = threading.Thread(
target=poll_domain,
args=(domain, queue_url, self.stop_event),
name=f"poller-{domain}",
daemon=True
)
thread.start()
self.active_pollers[domain] = thread
log(f"🚀 Started poller for {domain}", 'INFO', f"worker-{domain}")
def _domain_reload_loop(self):
"""Periodically check for new domains and start pollers"""
while not self.stop_event.is_set():
self.stop_event.wait(timeout=self.reload_interval)
if self.stop_event.is_set():
break
try:
# Aktuelle Domains laden
current_domains = load_domains()
# Neue Domains finden
new_domains = set(current_domains) - set(self.queue_urls.keys())
if new_domains:
log(f"🔄 Hot-reload: Found {len(new_domains)} new domain(s): {', '.join(new_domains)}")
for domain in new_domains:
queue_url = get_queue_url(domain)
if queue_url:
self.queue_urls[domain] = queue_url
self._start_poller(domain, queue_url)
else:
log(f"⚠ Could not find queue for new domain: {domain}", 'WARNING')
# Cleanup: Entfernte Domains (optional - Threads laufen weiter bis restart)
removed_domains = set(self.queue_urls.keys()) - set(current_domains)
if removed_domains:
log(f" Domains removed from config (will stop on restart): {', '.join(removed_domains)}")
except Exception as e:
log(f"Error in domain reload: {e}", 'ERROR')
def stop(self):
"""Stop gracefully"""
log("⚠ Stopping worker...")
self.stop_event.set()
# Warten auf Poller-Threads (max 10 Sekunden)
with self.poller_lock:
for domain, thread in self.active_pollers.items():
for thread in self.poller_threads:
thread.join(timeout=10)
if thread.is_alive():
log(f"Warning: Poller for {domain} did not stop gracefully", 'WARNING')
log(f"Warning: {thread.name} did not stop gracefully", 'WARNING')
smtp_pool.close_all()
log("👋 Worker stopped")
@ -1311,10 +1254,9 @@ def main():
start_health_server(worker)
log(f"\n{'='*70}")
log(f"🚀 UNIFIED EMAIL WORKER (Full Featured)")
log(f"🚀 UNIFIED EMAIL WORKER")
log(f"{'='*70}")
log(f" Domains: {len(worker.queue_urls)}")
log(f" Hot-Reload: Every {worker.reload_interval}s")
log(f" DynamoDB: {'Connected' if DYNAMODB_AVAILABLE else 'Not Available'}")
log(f" SMTP Pool: {config.smtp_pool_size} connections -> {config.smtp_host}:{config.smtp_port}")
log(f" Poll Interval: {config.poll_interval}s")