In ETL pipelines, many scenarios cannot process data synchronously and instead rely on message queues for asynchronous processing, which also helps smooth out traffic peaks (peak shaving and valley filling).
Use Cases#
Here are the primary use cases for message queues; check whether they apply to your business requirements.
Decouple Systems#
Decouple producers from consumers, as in the pub/sub model. This pattern gives us better scalability and resiliency.
Asynchronous Processing#
Tasks that do not need to be processed immediately can be offloaded to a queue and handled by time-consuming background work.
Rate Limiting#
The rate at which tasks are processed can be controlled to avoid overloading downstream systems such as a database, protecting them from being overwhelmed by a sudden surge in requests.
Buffering#
Messages are temporarily stored during peak load and processed later, which smooths out spikes in traffic.
Microservice Communication#
Messages are used to exchange data and commands between services in a microservice architecture. This pattern helps us build asynchronous systems with scalability and resiliency.
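To make the asynchronous-processing and decoupling ideas concrete, here is a minimal, self-contained sketch using Python's standard-library queue and a background worker thread; in a real system the in-process queue would be replaced by a managed service such as SQS or Service Bus.

```python
import queue
import threading
import time

task_queue = queue.Queue()  # stand-in for a real message queue service

def worker():
    # Consumer: processes time-consuming tasks in the background
    while True:
        task = task_queue.get()
        if task is None:          # sentinel to stop the worker
            break
        time.sleep(0.1)           # simulate slow work (e.g. sending an email)
        print(f"processed: {task}")
        task_queue.task_done()

threading.Thread(target=worker, daemon=True).start()

# Producer: the request handler returns immediately after enqueueing
for i in range(3):
    task_queue.put(f"task-{i}")

task_queue.join()                 # wait for the backlog to drain
task_queue.put(None)
```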
Queue Models Selection#
To design the business data flow well, we need to be familiar with the common queue models.
P2P (Point-to-Point)#
producer → Q → consumer
Each message from the producer is delivered to and processed by exactly one consumer.
e.g. AWS SQS, RabbitMQ
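A minimal point-to-point sketch with boto3 and SQS (the queue name is a placeholder, and credentials/region are assumed to be configured): each message is received, processed, and deleted by a single consumer.

```python
import boto3

sqs = boto3.client('sqs')
# Placeholder queue name; replace with your own
queue_url = sqs.get_queue_url(QueueName='orders-queue')['QueueUrl']

# Producer: put one message on the queue
sqs.send_message(QueueUrl=queue_url, MessageBody='order-123')

# Consumer: receive, process, then delete so no other consumer sees it again
resp = sqs.receive_message(QueueUrl=queue_url, MaxNumberOfMessages=1, WaitTimeSeconds=5)
for msg in resp.get('Messages', []):
    print('processing', msg['Body'])
    sqs.delete_message(QueueUrl=queue_url, ReceiptHandle=msg['ReceiptHandle'])
```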
Pub/Sub#
publisher → Q(Topic) → subscribers(1-2-3-N)
Messages are delivered to multiple consumer groups through a topic; each consumer group receives a copy of the message, and consumers within a group share the consumption of messages.
e.g. AWS Kinesis, Azure Service Bus, Amazon MQ, RabbitMQ, Kafka, Azure Event Hubs
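A minimal pub/sub sketch using the kafka-python package (broker address, topic name, and group id are placeholders): consumers sharing a group_id split the partitions between them, while each distinct group receives its own copy of the stream.

```python
from kafka import KafkaProducer, KafkaConsumer

# Publisher: every subscribed consumer group gets its own copy of this event
producer = KafkaProducer(bootstrap_servers='localhost:9092')
producer.send('events', b'user-signed-up')
producer.flush()

# Subscriber: consumers sharing group_id split the topic's partitions,
# while a different group_id receives the full stream again
consumer = KafkaConsumer(
    'events',
    bootstrap_servers='localhost:9092',
    group_id='analytics-service',
    auto_offset_reset='earliest',
)
for record in consumer:
    print(record.value)
```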
Fanout#
producer → exchange → Q(1-2-3-N) → consumers(1-2-3-N)
Each message is broadcast to multiple queues and processed by multiple independent consumers.
e.g. Azure Service Bus, AWS SNS+SQS, Amazon MQ, RabbitMQ
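A fanout sketch with RabbitMQ and the pika client (exchange name and host are placeholders): the producer publishes once to a fanout exchange, and every queue bound to it receives a copy.

```python
import pika

connection = pika.BlockingConnection(pika.ConnectionParameters('localhost'))
channel = connection.channel()

# A fanout exchange copies every message to all bound queues
channel.exchange_declare(exchange='broadcast', exchange_type='fanout')

# Each independent consumer binds its own queue to the exchange
result = channel.queue_declare(queue='', exclusive=True)
channel.queue_bind(exchange='broadcast', queue=result.method.queue)

# Producer publishes once; every bound queue receives a copy
channel.basic_publish(exchange='broadcast', routing_key='', body='cache-invalidate')
connection.close()
```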
Work Queue#
producer → Q → workers(1-2-3-N)
Messages are distributed among a pool of workers and processed asynchronously to improve throughput and reliability.
e.g. Azure Storage Queue, AWS SQS, RabbitMQ, Kafka, AWS Kinesis, Azure Service Bus
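A work-queue sketch with pika (queue name is a placeholder): competing workers pull from the same durable queue, prefetch_count=1 gives fair dispatch, and explicit acks allow redelivery if a worker dies mid-task.

```python
import pika

connection = pika.BlockingConnection(pika.ConnectionParameters('localhost'))
channel = connection.channel()
channel.queue_declare(queue='tasks', durable=True)

# Fair dispatch: each worker holds at most one unacknowledged task at a time
channel.basic_qos(prefetch_count=1)

def handle_task(ch, method, properties, body):
    print('working on', body)
    ch.basic_ack(delivery_tag=method.delivery_tag)  # ack so the task is not redelivered

channel.basic_consume(queue='tasks', on_message_callback=handle_task)
channel.start_consuming()
```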
Priority Queue#
producer → Priority Q → consumers
Messages carry a priority (weight) and are reordered so that higher-priority messages are processed by consumers first.
e.g. RabbitMQ (native support), AWS SQS (typically emulated with separate high/low priority queues)
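A RabbitMQ priority-queue sketch with pika (queue name and priority values are illustrative): the queue is declared with the x-max-priority argument, and higher-priority messages are delivered to consumers first.

```python
import pika

connection = pika.BlockingConnection(pika.ConnectionParameters('localhost'))
channel = connection.channel()

# Declare a queue that supports priorities 0-10
channel.queue_declare(queue='jobs', arguments={'x-max-priority': 10})

# Higher-priority messages jump ahead of lower-priority ones
channel.basic_publish(
    exchange='',
    routing_key='jobs',
    body='nightly-report',
    properties=pika.BasicProperties(priority=1),
)
channel.basic_publish(
    exchange='',
    routing_key='jobs',
    body='password-reset-email',
    properties=pika.BasicProperties(priority=9),
)
connection.close()
```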
DLQ (Dead Letter Queue)#
producer → Q → consumers → (on repeated failure) → DLQ
Messages that still fail after a certain number of processing attempts are moved to a dead letter queue for monitoring and replay.
The DLQ pattern sets up a secondary queue that captures messages that cannot be processed from the primary queue.
PS: it is recommended to add this pattern to any system that handles critical messages. If the queue system you choose does not natively support a DLQ, you can use another message queue as the DLQ.
e.g. AWS SQS, Azure Service Bus (native DLQ support), RabbitMQ (via dead letter exchanges)
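A DLQ sketch for SQS with boto3 (queue names and maxReceiveCount are illustrative): the primary queue's RedrivePolicy moves a message to the secondary queue after repeated failed receives.

```python
import boto3
import json

sqs = boto3.client('sqs')

# Secondary queue that captures poison messages
dlq_url = sqs.create_queue(QueueName='orders-dlq')['QueueUrl']
dlq_arn = sqs.get_queue_attributes(
    QueueUrl=dlq_url, AttributeNames=['QueueArn']
)['Attributes']['QueueArn']

# Primary queue: after 5 failed receives a message is moved to the DLQ
sqs.create_queue(
    QueueName='orders-queue',
    Attributes={
        'RedrivePolicy': json.dumps({
            'deadLetterTargetArn': dlq_arn,
            'maxReceiveCount': '5',
        })
    },
)
```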
Handling Messages That Exceed the Size Quota#
| Message Queue Service | Maximum Message Size |
| --- | --- |
| Amazon SQS | 256 KB |
| Azure Service Bus | 256 KB (Standard), 1 MB (Premium) |
| Amazon Kinesis | 1 MB |
| Apache Kafka | 1 MB (configurable) |
| RabbitMQ | No strict limit (recommended < 128 KB) |
| Azure Event Hubs | 1 MB (including metadata) |
| Azure Storage Queue | 64 KB |
Tip
Although some queues allow the message size quota to be extended, increasing the maximum message size can impact performance and memory usage, so it should be done with caution.
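Before reaching for a bigger quota, a simple pre-send guard can route oversized payloads to one of the strategies below; the limit and the two sender callables here are placeholders.

```python
MAX_MESSAGE_BYTES = 256 * 1024  # e.g. the Amazon SQS quota

def send_with_size_guard(send_small, send_large, message: str):
    # Route based on the UTF-8 encoded size, which is what the service counts,
    # instead of raising the queue's maximum message size
    payload = message.encode('utf-8')
    if len(payload) <= MAX_MESSAGE_BYTES:
        send_small(message)
    else:
        send_large(message)  # e.g. compress and/or store in object storage
```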
Generic Strategies#
Using Compression#
The first thing to consider is message compression, although most queues do not support compression natively, as the table below shows.
| Message Queue System | Native Compression Support | Manual Compression Possible |
| --- | --- | --- |
| Amazon SQS | No | Yes |
| Azure Service Bus | No | Yes |
| Amazon Kinesis | No | Yes |
| Apache Kafka | Yes | N/A |
| RabbitMQ | No | Yes |
| Azure Storage Queue | No | Yes |
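Since Kafka is the only system in the table with native support, compression there is just a producer setting; a sketch with kafka-python (broker and topic are placeholders):

```python
from kafka import KafkaProducer

# Kafka compresses message batches on the producer side; consumers
# decompress transparently, so application code stays unchanged
producer = KafkaProducer(
    bootstrap_servers='localhost:9092',
    compression_type='gzip',  # also 'snappy', 'lz4', 'zstd' if the codecs are installed
)
producer.send('events', b'a reasonably large payload ...')
producer.flush()
```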
Algorithm Selection#
- Gzip: Good balance, widely supported.
- Snappy: Very fast, lower compression ratio.
- LZ4: Extremely fast, suitable for high-throughput.
- Zstandard (zstd): Best compression ratio with reasonable speed.
Choose the algorithm that best fits your performance and compression needs. For most scenarios, Gzip is a good choice.
### encode
```python
import gzip
import base64
from azure.storage.queue import QueueClient

def compress_message_gzip(message):
    # Compress with gzip, then base64-encode so the payload is queue-safe text
    compressed_message = gzip.compress(message.encode('utf-8'))
    return base64.b64encode(compressed_message).decode('utf-8')

# Example usage
large_message = "This is a very large message that needs to be compressed."
compressed_message = compress_message_gzip(large_message)

# Send compressed message to Azure Storage Queue
queue_client = QueueClient.from_connection_string("your_connection_string", "your_queue_name")
queue_client.send_message(compressed_message)
```
### decode
```python
def decompress_message_gzip(compressed_message):
    # Reverse the encoding: base64-decode, then gunzip back to text
    compressed_bytes = base64.b64decode(compressed_message.encode('utf-8'))
    return gzip.decompress(compressed_bytes).decode('utf-8')

# Retrieve messages from Azure Storage Queue
messages = queue_client.receive_messages()
for message in messages:
    decompressed_message = decompress_message_gzip(message.content)
    print(decompressed_message)
    queue_client.delete_message(message)
```
Storing Large Messages in External Storage#
Store the large message in an external storage service (AWS S3, Azure Blob Storage, Google Cloud Storage) and send only a reference to it through the queue.
### send message
```python
import boto3

s3_client = boto3.client('s3')
# SQS queue that carries only the reference (claim-check pattern)
sqs = boto3.resource('sqs')
queue = sqs.get_queue_by_name(QueueName='your_queue_name')

def store_message_in_s3(large_message, bucket_name, object_key):
    # Upload the payload to S3 and return an s3:// reference
    s3_client.put_object(Bucket=bucket_name, Key=object_key, Body=large_message)
    return f's3://{bucket_name}/{object_key}'

def send_reference_to_queue(queue, reference):
    queue.send_message(MessageBody=reference)

# Example usage
large_message = "This is a very large message that needs to be stored in S3."
bucket_name = "my-bucket"
object_key = "large_message.txt"
reference = store_message_in_s3(large_message, bucket_name, object_key)
send_reference_to_queue(queue, reference)
```
### receive message
```python
def parse_s3_reference(reference):
    # Split "s3://bucket/key" back into its bucket and object key
    parts = reference.replace("s3://", "").split("/")
    bucket_name = parts[0]
    object_key = "/".join(parts[1:])
    return bucket_name, object_key

def get_message_from_s3(reference):
    bucket_name, object_key = parse_s3_reference(reference)
    response = s3_client.get_object(Bucket=bucket_name, Key=object_key)
    return response['Body'].read().decode('utf-8')

def receive_reference_from_queue(queue):
    messages = queue.receive_messages(MaxNumberOfMessages=1)
    return messages[0].body

# Example usage
reference = receive_reference_from_queue(queue)
large_message = get_message_from_s3(reference)
print(large_message)
```
Chunk and Reassemble#
Split the message into chunks within the quota size limit, then reassemble them on the consumer side based on a chunk ID.
However, in a distributed environment, handling chunk IDs becomes much more complex, and you also have to deal with chunk-loss and ordering issues; the consumer-side design is relatively complicated, so this solution is generally not recommended. A minimal sketch is shown below.
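For completeness, a minimal in-memory sketch of the idea (the envelope format with id/index/total is an illustrative choice): it already hints at the bookkeeping, ordering, and chunk-loss handling that make the pattern costly in practice.

```python
import uuid
import json

def split_into_chunks(message: str, chunk_size: int):
    # Tag every chunk with a shared message id, its index, and the total count
    message_id = str(uuid.uuid4())
    parts = [message[i:i + chunk_size] for i in range(0, len(message), chunk_size)]
    return [
        json.dumps({'id': message_id, 'index': i, 'total': len(parts), 'data': part})
        for i, part in enumerate(parts)
    ]

def reassemble(chunks):
    # Consumer side: group by message id, restore order, detect missing chunks;
    # real systems also need duplicates handling and timeouts
    records = [json.loads(c) for c in chunks]
    records.sort(key=lambda r: r['index'])
    assert len(records) == records[0]['total'], 'chunk loss detected'
    return ''.join(r['data'] for r in records)

chunks = split_into_chunks('x' * 1000, chunk_size=256)
print(reassemble(chunks) == 'x' * 1000)  # True
```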
Conclusion#
In practice, we can combine compression with the external-storage mechanism into a hybrid approach to handle very large messages.
### sender
```python
def hybrid_send(queue, large_message, bucket_name, object_key, size_limit):
    # Compress first; fall back to the S3 reference (claim-check) path
    # only when the compressed payload still exceeds the size limit
    compressed_message = compress_message_gzip(large_message)
    if len(compressed_message) > size_limit:
        reference = store_message_in_s3(compressed_message, bucket_name, object_key)
        send_reference_to_queue(queue, reference)
    else:
        queue.send_message(MessageBody=compressed_message)

# Example usage
large_message = "This is a very large message that needs to be handled in a hybrid way."
bucket_name = "my-bucket"
object_key = "large_message_compressed.txt"
size_limit = 256 * 1024  # Example limit: 256 KB (the SQS quota)
hybrid_send(queue, large_message, bucket_name, object_key, size_limit)
```
### receiver
```python
def hybrid_receive(queue):
    message = receive_reference_from_queue(queue)
    if message.startswith("s3://"):
        # Claim-check path: the body is a reference to the real payload in S3
        compressed_message = get_message_from_s3(message)
    else:
        compressed_message = message
    large_message = decompress_message_gzip(compressed_message)
    print(large_message)

# Example usage
hybrid_receive(queue)
```