email-amazon/unified-worker/email-worker/aws/sqs_handler.py

104 lines
3.0 KiB
Python

#!/usr/bin/env python3
"""
SQS operations handler
"""
from typing import Optional, List, Dict, Any
import boto3
from botocore.exceptions import ClientError
from logger import log
from config import config, domain_to_queue_name
class SQSHandler:
"""Handles all SQS operations"""
def __init__(self):
self.client = boto3.client('sqs', region_name=config.aws_region)
def get_queue_url(self, domain: str) -> Optional[str]:
"""
Get SQS queue URL for domain
Args:
domain: Email domain
Returns:
Queue URL or None if not found
"""
queue_name = domain_to_queue_name(domain)
try:
response = self.client.get_queue_url(QueueName=queue_name)
return response['QueueUrl']
except ClientError as e:
if e.response['Error']['Code'] == 'AWS.SimpleQueueService.NonExistentQueue':
log(f"Queue not found for domain: {domain}", 'WARNING')
else:
log(f"Error getting queue URL for {domain}: {e}", 'ERROR')
return None
def receive_messages(self, queue_url: str) -> List[Dict[str, Any]]:
"""
Receive messages from queue
Args:
queue_url: SQS Queue URL
Returns:
List of message dictionaries
"""
try:
response = self.client.receive_message(
QueueUrl=queue_url,
MaxNumberOfMessages=config.max_messages,
WaitTimeSeconds=config.poll_interval,
VisibilityTimeout=config.visibility_timeout,
AttributeNames=['ApproximateReceiveCount', 'SentTimestamp']
)
return response.get('Messages', [])
except Exception as e:
log(f"Error receiving messages: {e}", 'ERROR')
return []
def delete_message(self, queue_url: str, receipt_handle: str):
"""
Delete message from queue
Args:
queue_url: SQS Queue URL
receipt_handle: Message receipt handle
"""
try:
self.client.delete_message(
QueueUrl=queue_url,
ReceiptHandle=receipt_handle
)
except Exception as e:
log(f"Error deleting message: {e}", 'ERROR')
raise
def get_queue_size(self, queue_url: str) -> int:
"""
Get approximate number of messages in queue
Args:
queue_url: SQS Queue URL
Returns:
Number of messages (0 if error)
"""
try:
attrs = self.client.get_queue_attributes(
QueueUrl=queue_url,
AttributeNames=['ApproximateNumberOfMessages']
)
return int(attrs['Attributes'].get('ApproximateNumberOfMessages', 0))
except Exception:
return 0