GarethNg

Gareth Ng

With a bamboo staff and straw sandals, I feel lighter than riding a horse, In a cloak amidst the misty rain, I live my life as it comes.
github
email
x
telegram

在ETL过程中的消息系统使用

在 ETL 数据处理过程中,许多场景无法直接同步处理数据,需要通过消息队列异步处理,这也可以有效实现削峰填谷。

用例#

以下是消息队列的主要用例,如果您的业务需求适用。

解耦系统#

将生产者与消费者解耦,这是一种发布 - 订阅模型。我们可以从这种模式中受益,以获得更好的可扩展性和弹性。

异步处理#

任务不需要立即处理。我们可以从这种模式中受益,将耗时的任务卸载到后台处理。

限流#

可以控制任务的速率,以避免对下游系统(如数据库)的过载。我们可以从中受益,以防止请求的突然激增。

缓冲#

在高峰请求期间暂时存储消息,并在稍后处理它们。

我们可以从中受益,以平滑流量的高峰。

微服务通信#

消息通知在微服务架构中交换数据和命令。

我们可以从这种模式中受益,以实现具有可扩展性和弹性的异步系统。

队列模型选择#

为了更好地实现业务数据交互,有必要熟悉队列模式。

点对点#

生产者 → Q → 消费者

单个生产者只能由单个消费者处理。

例如:aws SQS,rabbitmq

发布 / 订阅#

发布者 → Q(主题) → 订阅者(1-2-3-N)

消息需要通过主题传递给多个消费者组,每个消费者组接收一份消息,组内的消费者将共享消费消息。

例如:aws kinesis,Azure service bus,aws mq,rabbitmq,kafka,azure eventhubs

扇出#

生产者 → 交换机 → Q(1-2-3-N) → 消费者(1-2-3-N)

消息可以被多个独立的消费者处理。

例如:azure service bus,aws sns+sqs,aws mq,rabbitmq

工作队列#

生产者 → Q → 工作负载(1-2-3-N)

消息可以被工作者异步处理,以提高吞吐量和可靠性。

例如:azure storage account queue,aws sqs,rabbitmq,kafka,aws kinesis,azure service bus

优先级队列#

生产者 → 优先级 Q → 消费者

队列中的消息具有权重,可以重新排序以优先处理。

例如:rabbitmq,aws sqs

死信队列(DLQ)#

生产者 → Q → 消费者 - 失败后 --> DLQ

消息在经过一定次数的尝试后仍然失败,则移动到死信队列以便进一步监控和重放。

DLQ 模式帮助我们设置一个次级队列,以捕获无法由主队列处理的消息。

PS. 建议将此模式添加到处理关键消息的任何系统中,如果您选择的队列系统不原生支持 DLQ,您可以使用其他消息队列作为 DLQ。

例如:aws sqs,azure service bus(原生支持 dlq),rabbitmq(插件支持)

消息大小配额超出策略#

消息队列服务最大消息大小
Amazon SQS256 KB
Azure Service Bus256 KB(标准),1 MB(高级)
Amazon Kinesis1 MB
Apache Kafka1 MB(可配置)
RabbitMQ没有严格限制(推荐 < 128 KB)
Azure Eventhubs1 MB(包括元数据)
Azure Storage Queue64 KB

Tip

尽管某些队列可以扩展消息大小配额,但增加最大消息大小可能会影响性能和内存使用,因此应谨慎进行。

通用策略#

使用压缩#

首先要考虑的是使用消息压缩,而大多数队列原生不支持压缩。

消息队列系统原生压缩支持手动压缩可能
Amazon SQS
Azure Service Bus
Amazon Kinesis
Apache Kafka不适用
RabbitMQ
Azure Storage Queue

算法选择#

  • Gzip:良好的平衡,广泛支持。

  • Snappy:非常快,压缩比低。

  • LZ4:极快,适合高吞吐量。

  • Zstandard(zstd):最佳压缩比,速度合理。

选择最适合您性能和压缩需求的算法。在大多数情况下,Gzip 将是不错的选择。

### 编码
import gzip
import base64
from azure.storage.queue import QueueClient

def compress_message_gzip(message):
    compressed_message = gzip.compress(message.encode('utf-8'))
    return base64.b64encode(compressed_message).decode('utf-8')

# 示例用法
large_message = "这是一个需要压缩的非常大的消息。"
compressed_message = compress_message_gzip(large_message)

# 将压缩消息发送到Azure存储队列
queue_client = QueueClient.from_connection_string("your_connection_string", "your_queue_name")
queue_client.send_message(compressed_message)


### 解码
def decompress_message_gzip(compressed_message):
    compressed_message = base64.b64decode(compressed_message.encode('utf-8'))
    return gzip.decompress(compressed_message).decode('utf-8')

# 从Azure存储队列检索消息
messages = queue_client.receive_messages()
for message in messages:
    decompressed_message = decompress_message_gzip(message.content)
    print(decompressed_message)
    queue_client.delete_message(message)

在外部存储中存储大消息#

将大消息存储在外部存储服务(aws s3,azure blob storage,google cloud storage)中,并在队列中发送对消息的引用。

### 发送消息
import boto3

s3_client = boto3.client('s3')

def store_message_in_s3(large_message, bucket_name, object_key):
    s3_client.put_object(Bucket=bucket_name, Key=object_key, Body=large_message)
    return f's3://{bucket_name}/{object_key}'

def send_reference_to_queue(queue, reference):
    queue.send_message(MessageBody=reference)

# 示例用法
large_message = "这是一个需要存储在S3中的非常大的消息。"
bucket_name = "my-bucket"
object_key = "large_message.txt"
reference = store_message_in_s3(large_message, bucket_name, object_key)
send_reference_to_queue(queue, reference)

### 接收消息
def parse_s3_reference(reference):
    parts = reference.replace("s3://", "").split("/")
    bucket_name = parts[0]
    object_key = "/".join(parts[1:])
    return bucket_name, object_key

def get_message_from_s3(reference):
    bucket_name, object_key = parse_s3_reference(reference)
    response = s3_client.get_object(Bucket=bucket_name, Key=object_key)
    return response['Body'].read().decode('utf-8')

def receive_reference_from_queue(queue):
    messages = queue.receive_messages(MaxNumberOfMessages=1)
    return messages[0].body

# 示例用法
reference = receive_reference_from_queue(queue)
large_message = get_message_from_s3(reference)
print(large_message)

分块和重组#

在配额大小限制内对消息进行分块,然后根据块 ID 在消费者中重组。

然而,在分布式编码环境中,块 ID 的处理将变得更加复杂,我们还会遇到块丢失异常问题,因此设计这个消费者系统的复杂性相对较高,因此不建议使用此解决方案。

结论
我们可以结合压缩和大消息处理机制来处理非常大的消息。

### 发送者
def hybrid_send(queue, large_message, bucket_name, object_key, chunk_size):
    compressed_message = compress_message(large_message)
    if len(compressed_message) > chunk_size:
        reference = store_message_in_s3(compressed_message, bucket_name, object_key)
        send_reference_to_queue(queue, reference)
    else:
        send_compressed_message_to_queue(queue, compressed_message)

# 示例用法
large_message = "这是一个需要以混合方式处理的非常大的消息。"
bucket_name = "my-bucket"
object_key = "large_message_compressed.txt"
chunk_size = 256  # 示例块大小
hybrid_send(queue, large_message, bucket_name, object_key, chunk_size)

### 接收者
def hybrid_receive(queue):
    message = receive_reference_from_queue(queue)
    if message.startswith("s3://"):
        compressed_message = get_message_from_s3(message)
    else:
        compressed_message = message
    large_message = decompress_message(compressed_message)
    print(large_message)

# 示例用法
hybrid_receive(queue)
加载中...
此文章数据所有权由区块链加密技术和智能合约保障仅归创作者所有。