104 lines
3.0 KiB
Python
104 lines
3.0 KiB
Python
#!/usr/bin/env python3
|
|
"""
|
|
SQS operations handler
|
|
"""
|
|
|
|
from typing import Optional, List, Dict, Any
|
|
import boto3
|
|
from botocore.exceptions import ClientError
|
|
|
|
from logger import log
|
|
from config import config, domain_to_queue_name
|
|
|
|
|
|
class SQSHandler:
|
|
"""Handles all SQS operations"""
|
|
|
|
def __init__(self):
|
|
self.client = boto3.client('sqs', region_name=config.aws_region)
|
|
|
|
def get_queue_url(self, domain: str) -> Optional[str]:
|
|
"""
|
|
Get SQS queue URL for domain
|
|
|
|
Args:
|
|
domain: Email domain
|
|
|
|
Returns:
|
|
Queue URL or None if not found
|
|
"""
|
|
queue_name = domain_to_queue_name(domain)
|
|
|
|
try:
|
|
response = self.client.get_queue_url(QueueName=queue_name)
|
|
return response['QueueUrl']
|
|
|
|
except ClientError as e:
|
|
if e.response['Error']['Code'] == 'AWS.SimpleQueueService.NonExistentQueue':
|
|
log(f"Queue not found for domain: {domain}", 'WARNING')
|
|
else:
|
|
log(f"Error getting queue URL for {domain}: {e}", 'ERROR')
|
|
return None
|
|
|
|
def receive_messages(self, queue_url: str) -> List[Dict[str, Any]]:
|
|
"""
|
|
Receive messages from queue
|
|
|
|
Args:
|
|
queue_url: SQS Queue URL
|
|
|
|
Returns:
|
|
List of message dictionaries
|
|
"""
|
|
try:
|
|
response = self.client.receive_message(
|
|
QueueUrl=queue_url,
|
|
MaxNumberOfMessages=config.max_messages,
|
|
WaitTimeSeconds=config.poll_interval,
|
|
VisibilityTimeout=config.visibility_timeout,
|
|
AttributeNames=['ApproximateReceiveCount', 'SentTimestamp']
|
|
)
|
|
|
|
return response.get('Messages', [])
|
|
|
|
except Exception as e:
|
|
log(f"Error receiving messages: {e}", 'ERROR')
|
|
return []
|
|
|
|
def delete_message(self, queue_url: str, receipt_handle: str):
|
|
"""
|
|
Delete message from queue
|
|
|
|
Args:
|
|
queue_url: SQS Queue URL
|
|
receipt_handle: Message receipt handle
|
|
"""
|
|
try:
|
|
self.client.delete_message(
|
|
QueueUrl=queue_url,
|
|
ReceiptHandle=receipt_handle
|
|
)
|
|
except Exception as e:
|
|
log(f"Error deleting message: {e}", 'ERROR')
|
|
raise
|
|
|
|
def get_queue_size(self, queue_url: str) -> int:
|
|
"""
|
|
Get approximate number of messages in queue
|
|
|
|
Args:
|
|
queue_url: SQS Queue URL
|
|
|
|
Returns:
|
|
Number of messages (0 if error)
|
|
"""
|
|
try:
|
|
attrs = self.client.get_queue_attributes(
|
|
QueueUrl=queue_url,
|
|
AttributeNames=['ApproximateNumberOfMessages']
|
|
)
|
|
return int(attrs['Attributes'].get('ApproximateNumberOfMessages', 0))
|
|
|
|
except Exception:
|
|
return 0
|