GarethNg

Gareth Ng

With a bamboo staff and straw sandals, I feel lighter than riding a horse, In a cloak amidst the misty rain, I live my life as it comes.

Using Message System in ETL Process

In ETL pipelines, many workloads cannot be processed synchronously and have to be handled asynchronously through a message queue. A queue also absorbs traffic peaks and spreads the load over quieter periods (peak shaving and valley filling).

Use Cases#

These are the primary use cases for message queues. Check whether your business requirements match any of them.

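Decouple Systems#

A queue decouples producers from consumers, as in the pub/sub model: producers publish messages without knowing who consumes them. We can benefit from this pattern to get better scalability and resiliency, because each side can scale or fail independently.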

Asynchronous Processing#

Some tasks do not need to be processed immediately. We can benefit from this pattern by offloading time-consuming tasks to the queue so that they are processed in the background.

Rate Limiting#

The rate at which tasks are processed can be controlled to avoid overloading a downstream system such as a database. We can benefit from this pattern by protecting that system from being overwhelmed by a sudden surge in requests.
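
The idea can be sketched with the standard library alone. This is a minimal sketch, not a production limiter: `drain_at_rate` is a hypothetical helper, the in-memory `deque` stands in for the real queue, and `written.append` stands in for the downstream write.

```python
import time
from collections import deque

def drain_at_rate(messages, handler, max_per_second, sleep=time.sleep):
    """Pass buffered messages to handler, at most max_per_second per second."""
    interval = 1.0 / max_per_second
    next_slot = time.monotonic()      # earliest time the next message may run
    while messages:
        delay = next_slot - time.monotonic()
        if delay > 0:
            sleep(delay)              # wait until the next slot opens
        handler(messages.popleft())
        next_slot = max(next_slot, time.monotonic()) + interval

# Example: a burst of 5 messages reaches the downstream system
# at no more than 10 messages per second
burst = deque(f"row-{n}" for n in range(5))
written = []
drain_at_rate(burst, written.append, max_per_second=10)
print(written)
```

The producer can enqueue as fast as it likes; only the consumer loop decides how quickly the database sees the work.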

Buffering#

Messages are stored temporarily during request peaks and processed later.

We can benefit from this pattern by smoothing out spikes in traffic.

Microservice communication#

Services in a microservice architecture exchange data and commands through messages.

We can benefit from this pattern to implement asynchronous systems with better scalability and resiliency.

Queue Models Selection#

To implement the data exchange the business needs, it helps to be familiar with the common queue models.

p2p#

producer → Q → consumer

Each message is processed by exactly one consumer.

e.g. AWS SQS, RabbitMQ

pub/sub#

publisher → Q(Topic) → subscribers(1-2-3-N)

A message is delivered through a topic to multiple consumer groups. Each consumer group receives its own copy of the message, and the consumers within a group share the work of consuming it.

e.g. AWS Kinesis, Azure Service Bus, Amazon MQ, RabbitMQ, Kafka, Azure Event Hubs
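
To make the group semantics concrete, here is a small in-memory sketch. The `Topic` class and the group names are hypothetical and do not mirror the API of any broker listed above; round-robin is just one simple way to share messages inside a group.

```python
from collections import defaultdict

class Topic:
    """Every group receives each message; consumers inside a group take turns."""

    def __init__(self):
        self._groups = defaultdict(list)   # group name -> consumer callbacks
        self._position = defaultdict(int)  # group name -> round-robin counter

    def subscribe(self, group, consumer):
        self._groups[group].append(consumer)

    def publish(self, message):
        for group, consumers in self._groups.items():
            # Exactly one consumer per group handles this copy of the message
            index = self._position[group] % len(consumers)
            consumers[index](message)
            self._position[group] += 1

# One consumer in "billing", two consumers sharing the load in "analytics"
billing, analytics_a, analytics_b = [], [], []
topic = Topic()
topic.subscribe("billing", billing.append)
topic.subscribe("analytics", analytics_a.append)
topic.subscribe("analytics", analytics_b.append)
for event in ["order-1", "order-2", "order-3"]:
    topic.publish(event)
print(billing, analytics_a, analytics_b)
```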

Fanout#

producer → exchange → Q(1-2-3-N) → consumers(1-2-3-N)

Each message is copied to multiple queues, so it can be processed by multiple independent consumers.

e.g. Azure Service Bus, AWS SNS + SQS, Amazon MQ, RabbitMQ

Work queue#

producer → Q → workers(1-2-3-N)

Messages are processed asynchronously by a pool of workers, which improves throughput and reliability.

e.g. Azure Storage Queue, AWS SQS, RabbitMQ, Kafka, AWS Kinesis, Azure Service Bus
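
A minimal sketch of the work queue model, using threads and `queue.Queue` from the standard library in place of a real broker. `run_workers` is a hypothetical helper, `None` is reserved as a stop signal, and error handling is left out for brevity.

```python
import queue
import threading

def run_workers(tasks, handler, worker_count):
    """Process every task with handler using worker_count competing workers."""
    work_queue = queue.Queue()
    results = []
    lock = threading.Lock()

    def worker():
        while True:
            task = work_queue.get()
            if task is None:           # stop signal: no more work for this worker
                return
            result = handler(task)
            with lock:                 # results is shared between threads
                results.append(result)

    threads = [threading.Thread(target=worker) for _ in range(worker_count)]
    for thread in threads:
        thread.start()
    for task in tasks:
        work_queue.put(task)
    for _ in threads:
        work_queue.put(None)           # one stop signal per worker
    for thread in threads:
        thread.join()
    return results

# Each task is handled by exactly one of the three workers
print(sorted(run_workers(range(10), lambda n: n * n, worker_count=3)))
```

Results arrive in completion order, not submission order, which is why the example sorts them before printing.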

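Priority Queue#

producer → Priority Q → consumers

Messages in the queue carry a weight (priority), and the queue reorders them so that consumers process higher-priority messages first.

e.g. RabbitMQ (native priority support), AWS SQS (no native priority; usually emulated with one queue per priority level)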
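
The reordering can be illustrated with `heapq` from the standard library. This is only a sketch of the behaviour: `PriorityMessageQueue` is a hypothetical class, and the convention that a lower number means higher priority is an assumption of this example.

```python
import heapq
import itertools

class PriorityMessageQueue:
    """Messages come out by priority; equal priorities keep arrival order."""

    def __init__(self):
        self._heap = []
        self._arrival = itertools.count()  # tie-breaker that preserves FIFO order

    def put(self, message, priority):
        # Lower number = higher priority (an assumption of this sketch)
        heapq.heappush(self._heap, (priority, next(self._arrival), message))

    def get(self):
        _, _, message = heapq.heappop(self._heap)
        return message

    def __len__(self):
        return len(self._heap)

q = PriorityMessageQueue()
q.put("nightly-report", priority=5)
q.put("payment-failed", priority=1)
q.put("send-newsletter", priority=5)
while q:
    print(q.get())
```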

DLQ(Dead Letter Queue)#

producer → Q → consumers → (on repeated failure) → DLQ

A message that still fails after a certain number of processing attempts is moved to the dead letter queue, where it can be monitored and replayed later.

The DLQ pattern sets up a secondary queue that captures the messages that cannot be processed from the primary queue.

PS. It is recommended to add this pattern to any system that handles critical messages. If the queue system you choose does not support DLQs natively, you can use another message queue as the DLQ.

e.g. AWS SQS and Azure Service Bus (native DLQ support), RabbitMQ (via dead letter exchanges)
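
The retry-then-park flow can be sketched in memory as follows. `consume_with_dlq` and the shape of the dead letter record are hypothetical; managed queues implement the same idea through a receive count and a redrive or dead-letter setting rather than application code.

```python
from collections import deque

def consume_with_dlq(messages, handler, max_attempts=3):
    """Return (processed, dead_letters) after retrying failed messages."""
    pending = deque((message, 0) for message in messages)
    processed, dead_letters = [], []
    while pending:
        message, attempts = pending.popleft()
        try:
            handler(message)
            processed.append(message)
        except Exception as error:
            attempts += 1
            if attempts >= max_attempts:
                # Give up: keep the message and the failure reason for replay
                dead_letters.append(
                    {"message": message, "attempts": attempts, "error": repr(error)}
                )
            else:
                pending.append((message, attempts))  # retry later
    return processed, dead_letters

def parse_row(row):
    if not row.isdigit():
        raise ValueError(f"not a number: {row}")

processed, dead_letters = consume_with_dlq(["1", "x", "3"], parse_row)
print(processed)
print(dead_letters)
```

Keeping the error text next to the message makes the DLQ useful for monitoring and not only for replay.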

Message Size Quota Exceeding Strategy#

| Message Queue Service | Maximum Message Size |
| --- | --- |
| Amazon SQS | 256 KB |
| Azure Service Bus | 256 KB (Standard), 1 MB (Premium) |
| Amazon Kinesis | 1 MB |
| Apache Kafka | 1 MB (configurable) |
| RabbitMQ | No strict limit (recommended < 128 KB) |
| Azure Event Hubs | 1 MB (including metadata) |
| Azure Storage Queue | 64 KB |

Tip

Some queues allow the maximum message size to be raised, but doing so can hurt performance and increase memory usage, so it should be done with caution.

Generic Strategies#

Using Compression#

Message compression should be the first option to consider, although most queues do not support compression natively.

| Message Queue System | Native Compression Support | Manual Compression Possible |
| --- | --- | --- |
| Amazon SQS | No | Yes |
| Azure Service Bus | No | Yes |
| Amazon Kinesis | No | Yes |
| Apache Kafka | Yes | N/A |
| RabbitMQ | No | Yes |
| Azure Storage Queue | No | Yes |

Algorithm Selection#

  • Gzip: Good balance, widely supported.

  • Snappy: Very fast, lower compression ratio.

  • LZ4: Extremely fast, suitable for high-throughput.

  • Zstandard (zstd): Best compression ratio with reasonable speed.

Choose the algorithm that best fits your performance and compression needs. For most scenarios, Gzip is a good choice.

### encode
import gzip
import base64
from azure.storage.queue import QueueClient

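def compress_message_gzip(message):
    # Gzip the UTF-8 bytes, then base64-encode them so the result is plain
    # text the queue can carry (base64 adds roughly 33% to the size)
    compressed_message = gzip.compress(message.encode('utf-8'))
    return base64.b64encode(compressed_message).decode('utf-8')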

# Example usage
large_message = "This is a very large message that needs to be compressed."
compressed_message = compress_message_gzip(large_message)

# Send compressed message to Azure Storage Queue
queue_client = QueueClient.from_connection_string("your_connection_string", "your_queue_name")
queue_client.send_message(compressed_message)


### decode
def decompress_message_gzip(compressed_message):
    compressed_message = base64.b64decode(compressed_message.encode('utf-8'))
    return gzip.decompress(compressed_message).decode('utf-8')

# Retrieve message from Azure Storage Queue
messages = queue_client.receive_messages()
for message in messages:
    decompressed_message = decompress_message_gzip(message.content)
    print(decompressed_message)
    queue_client.delete_message(message)
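
Compression does not guarantee that a message fits, so it is worth checking the encoded size before sending. The sketch below repeats the gzip plus base64 steps from the code above; `encoded_size` and `fits_in_quota` are hypothetical helpers, and the 64 KB constant is the Azure Storage Queue limit from the size table.

```python
import base64
import gzip
import os

AZURE_STORAGE_QUEUE_LIMIT = 64 * 1024  # bytes, taken from the size table

def encoded_size(message: str) -> int:
    """Size in bytes of the message after gzip and base64 encoding."""
    compressed = gzip.compress(message.encode("utf-8"))
    # base64 turns every 3 bytes into 4 characters, so the payload grows again
    return len(base64.b64encode(compressed))

def fits_in_quota(message: str, limit_bytes: int) -> bool:
    return encoded_size(message) <= limit_bytes

# Repetitive data compresses very well, random-looking data barely at all
repetitive = "abc" * 100_000
noisy = os.urandom(100_000).hex()
print(fits_in_quota(repetitive, AZURE_STORAGE_QUEUE_LIMIT))
print(fits_in_quota(noisy, AZURE_STORAGE_QUEUE_LIMIT))
```

When the check fails, the message has to take another route, such as the external storage approach described next.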

Storing Large Messages in External Storage#

Store the large message in an external storage service (AWS S3, Azure Blob Storage, Google Cloud Storage) and send only a reference to it through the queue.

### send message
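import boto3

s3_client = boto3.client('s3')
# SQS queue used by the examples in this section; the queue name is a placeholder
queue = boto3.resource('sqs').get_queue_by_name(QueueName="your_queue_name")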

def store_message_in_s3(large_message, bucket_name, object_key):
    s3_client.put_object(Bucket=bucket_name, Key=object_key, Body=large_message)
    return f's3://{bucket_name}/{object_key}'

def send_reference_to_queue(queue, reference):
    queue.send_message(MessageBody=reference)

# Example usage
large_message = "This is a very large message that needs to be stored in S3."
bucket_name = "my-bucket"
object_key = "large_message.txt"
reference = store_message_in_s3(large_message, bucket_name, object_key)
send_reference_to_queue(queue, reference)

### receive message
def parse_s3_reference(reference):
    parts = reference.replace("s3://", "").split("/")
    bucket_name = parts[0]
    object_key = "/".join(parts[1:])
    return bucket_name, object_key

def get_message_from_s3(reference):
    bucket_name, object_key = parse_s3_reference(reference)
    response = s3_client.get_object(Bucket=bucket_name, Key=object_key)
    return response['Body'].read().decode('utf-8')

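def receive_reference_from_queue(queue):
    # receive_messages returns an empty list when no message is available
    messages = queue.receive_messages(MaxNumberOfMessages=1)
    if not messages:
        return None
    # A real consumer should also delete the message once it has been processed
    return messages[0].body

# Example usage
reference = receive_reference_from_queue(queue)
if reference is not None:
    large_message = get_message_from_s3(reference)
    print(large_message)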
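
One detail the example glosses over is the object key: with a fixed key such as `large_message.txt`, every new message overwrites the previous one. A possible way around this is to generate a unique key per message. This is a sketch only; `build_object_key`, the `etl-messages` prefix, and the date-based layout are assumptions, not part of the original setup.

```python
import uuid
from datetime import datetime, timezone

def build_object_key(prefix="etl-messages"):
    """Return a key that is unique per message and grouped by UTC date."""
    now = datetime.now(timezone.utc)
    return f"{prefix}/{now:%Y/%m/%d}/{uuid.uuid4().hex}.txt"

# Two messages stored in the same second still get different keys
print(build_object_key())
print(build_object_key())
```

The date prefix also makes it easier to expire old payloads with a storage lifecycle rule.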

Chunk and Reassemble#

Split the message into chunks that fit within the size quota, then reassemble them in the consumer based on chunk id.

In a distributed environment, however, tracking chunk ids becomes much more complex, and lost chunks have to be handled as well. The consumer side is relatively complex to design, so this solution is not recommended.
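
For reference, the basic mechanics look roughly like this. It is a single-process sketch under simplifying assumptions: the chunk fields (`message_id`, `index`, `total`, `data`) and the `Reassembler` class are hypothetical, chunks are plain dicts rather than serialized queue messages, and nothing here expires incomplete messages, which is exactly the part that gets hard in a distributed consumer.

```python
import math
import uuid

def split_into_chunks(payload: bytes, chunk_size: int):
    """Split payload into chunks that each carry enough metadata to reassemble."""
    message_id = uuid.uuid4().hex
    total = max(1, math.ceil(len(payload) / chunk_size))
    return [
        {
            "message_id": message_id,
            "index": i,
            "total": total,
            "data": payload[i * chunk_size:(i + 1) * chunk_size],
        }
        for i in range(total)
    ]

class Reassembler:
    """Collect chunks in any order; return the payload once all have arrived."""

    def __init__(self):
        self._parts = {}  # message_id -> {index: data}

    def add(self, chunk):
        parts = self._parts.setdefault(chunk["message_id"], {})
        parts[chunk["index"]] = chunk["data"]  # a duplicate delivery just overwrites
        if len(parts) < chunk["total"]:
            return None                        # still waiting for more chunks
        del self._parts[chunk["message_id"]]
        return b"".join(parts[i] for i in range(chunk["total"]))

payload = b"x" * 2500
reassembler = Reassembler()
for chunk in reversed(split_into_chunks(payload, 1000)):  # out-of-order delivery
    result = reassembler.add(chunk)
print(result == payload)
```

If one chunk never arrives, `add` keeps returning `None` and the partial data stays in memory, so a real consumer would also need timeouts and shared state across instances.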

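Conclusion
We can combine compression with external storage to handle very large messages: compress first, and fall back to external storage only when the compressed message still exceeds the queue's size limit.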

### sender
def hybrid_send(queue, large_message, bucket_name, object_key, max_size):
    # compress_message_gzip returns base64 text, so len() is its size in bytes
    compressed_message = compress_message_gzip(large_message)
    if len(compressed_message) > max_size:
        # Still too large for the queue: store it in S3 and send the reference
        reference = store_message_in_s3(compressed_message, bucket_name, object_key)
        send_reference_to_queue(queue, reference)
    else:
        # Small enough: send the compressed payload inline
        queue.send_message(MessageBody=compressed_message)

# Example usage
large_message = "This is a very large message that needs to be handled in a hybrid way."
bucket_name = "my-bucket"
object_key = "large_message_compressed.txt"
max_size = 256 * 1024  # Example limit in bytes (256 KB)
hybrid_send(queue, large_message, bucket_name, object_key, max_size)

### receiver
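def hybrid_receive(queue):
    message = receive_reference_from_queue(queue)
    if message is None:
        return  # nothing to process
    # Base64 text never contains ':', so an inline payload cannot start with "s3://"
    if message.startswith("s3://"):
        compressed_message = get_message_from_s3(message)
    else:
        compressed_message = message
    large_message = decompress_message_gzip(compressed_message)
    print(large_message)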

# Example usage
hybrid_receive(queue)