在 ETL 数据处理过程中,许多场景无法直接同步处理数据,需要通过消息队列异步处理,这也可以有效实现削峰填谷。
用例#
以下是消息队列的主要用例,如果您的业务需求适用。
解耦系统#
将生产者与消费者解耦,这是一种发布 - 订阅模型。我们可以从这种模式中受益,以获得更好的可扩展性和弹性。
异步处理#
任务不需要立即处理。我们可以从这种模式中受益,将耗时的任务卸载到后台处理。
限流#
可以控制任务的速率,以避免对下游系统(如数据库)的过载。我们可以从中受益,以防止请求的突然激增。
缓冲#
在高峰请求期间暂时存储消息,并在稍后处理它们。
我们可以从中受益,以平滑流量的高峰。
微服务通信#
消息通知在微服务架构中交换数据和命令。
我们可以从这种模式中受益,以实现具有可扩展性和弹性的异步系统。
队列模型选择#
为了更好地实现业务数据交互,有必要熟悉队列模式。
点对点#
生产者 → Q → 消费者
单个生产者只能由单个消费者处理。
例如:aws SQS,rabbitmq
发布 / 订阅#
发布者 → Q(主题) → 订阅者(1-2-3-N)
消息需要通过主题传递给多个消费者组,每个消费者组接收一份消息,组内的消费者将共享消费消息。
例如:aws kinesis,Azure service bus,aws mq,rabbitmq,kafka,azure eventhubs
扇出#
生产者 → 交换机 → Q(1-2-3-N) → 消费者(1-2-3-N)
消息可以被多个独立的消费者处理。
例如:azure service bus,aws sns+sqs,aws mq,rabbitmq
工作队列#
生产者 → Q → 工作负载(1-2-3-N)
消息可以被工作者异步处理,以提高吞吐量和可靠性。
例如:azure storage account queue,aws sqs,rabbitmq,kafka,aws kinesis,azure service bus
优先级队列#
生产者 → 优先级 Q → 消费者
队列中的消息具有权重,可以重新排序以优先处理。
例如:rabbitmq,aws sqs
死信队列(DLQ)#
生产者 → Q → 消费者 - 失败后 --> DLQ
消息在经过一定次数的尝试后仍然失败,则移动到死信队列以便进一步监控和重放。
DLQ 模式帮助我们设置一个次级队列,以捕获无法由主队列处理的消息。
PS. 建议将此模式添加到处理关键消息的任何系统中,如果您选择的队列系统不原生支持 DLQ,您可以使用其他消息队列作为 DLQ。
例如:aws sqs,azure service bus(原生支持 dlq),rabbitmq(插件支持)
消息大小配额超出策略#
消息队列服务 | 最大消息大小 |
---|---|
Amazon SQS | 256 KB |
Azure Service Bus | 256 KB(标准),1 MB(高级) |
Amazon Kinesis | 1 MB |
Apache Kafka | 1 MB(可配置) |
RabbitMQ | 没有严格限制(推荐 < 128 KB) |
Azure Eventhubs | 1 MB(包括元数据) |
Azure Storage Queue | 64 KB |
Tip
尽管某些队列可以扩展消息大小配额,但增加最大消息大小可能会影响性能和内存使用,因此应谨慎进行。
通用策略#
使用压缩#
首先要考虑的是使用消息压缩,而大多数队列原生不支持压缩。
消息队列系统 | 原生压缩支持 | 手动压缩可能 | ||
---|---|---|---|---|
Amazon SQS | 否 | 是 | ||
Azure Service Bus | 否 | 是 | ||
Amazon Kinesis | 否 | 是 | ||
Apache Kafka | 是 | 不适用 | ||
RabbitMQ | 否 | 是 | ||
Azure Storage Queue | 否 | 是 |
算法选择#
-
Gzip:良好的平衡,广泛支持。
-
Snappy:非常快,压缩比低。
-
LZ4:极快,适合高吞吐量。
-
Zstandard(zstd):最佳压缩比,速度合理。
选择最适合您性能和压缩需求的算法。在大多数情况下,Gzip 将是不错的选择。
### 编码
import gzip
import base64
from azure.storage.queue import QueueClient
def compress_message_gzip(message):
compressed_message = gzip.compress(message.encode('utf-8'))
return base64.b64encode(compressed_message).decode('utf-8')
# 示例用法
large_message = "这是一个需要压缩的非常大的消息。"
compressed_message = compress_message_gzip(large_message)
# 将压缩消息发送到Azure存储队列
queue_client = QueueClient.from_connection_string("your_connection_string", "your_queue_name")
queue_client.send_message(compressed_message)
### 解码
def decompress_message_gzip(compressed_message):
compressed_message = base64.b64decode(compressed_message.encode('utf-8'))
return gzip.decompress(compressed_message).decode('utf-8')
# 从Azure存储队列检索消息
messages = queue_client.receive_messages()
for message in messages:
decompressed_message = decompress_message_gzip(message.content)
print(decompressed_message)
queue_client.delete_message(message)
在外部存储中存储大消息#
将大消息存储在外部存储服务(aws s3,azure blob storage,google cloud storage)中,并在队列中发送对消息的引用。
### 发送消息
import boto3
s3_client = boto3.client('s3')
def store_message_in_s3(large_message, bucket_name, object_key):
s3_client.put_object(Bucket=bucket_name, Key=object_key, Body=large_message)
return f's3://{bucket_name}/{object_key}'
def send_reference_to_queue(queue, reference):
queue.send_message(MessageBody=reference)
# 示例用法
large_message = "这是一个需要存储在S3中的非常大的消息。"
bucket_name = "my-bucket"
object_key = "large_message.txt"
reference = store_message_in_s3(large_message, bucket_name, object_key)
send_reference_to_queue(queue, reference)
### 接收消息
def parse_s3_reference(reference):
parts = reference.replace("s3://", "").split("/")
bucket_name = parts[0]
object_key = "/".join(parts[1:])
return bucket_name, object_key
def get_message_from_s3(reference):
bucket_name, object_key = parse_s3_reference(reference)
response = s3_client.get_object(Bucket=bucket_name, Key=object_key)
return response['Body'].read().decode('utf-8')
def receive_reference_from_queue(queue):
messages = queue.receive_messages(MaxNumberOfMessages=1)
return messages[0].body
# 示例用法
reference = receive_reference_from_queue(queue)
large_message = get_message_from_s3(reference)
print(large_message)
分块和重组#
在配额大小限制内对消息进行分块,然后根据块 ID 在消费者中重组。
然而,在分布式编码环境中,块 ID 的处理将变得更加复杂,我们还会遇到块丢失异常问题,因此设计这个消费者系统的复杂性相对较高,因此不建议使用此解决方案。
结论
我们可以结合压缩和大消息处理机制来处理非常大的消息。
### 发送者
def hybrid_send(queue, large_message, bucket_name, object_key, chunk_size):
compressed_message = compress_message(large_message)
if len(compressed_message) > chunk_size:
reference = store_message_in_s3(compressed_message, bucket_name, object_key)
send_reference_to_queue(queue, reference)
else:
send_compressed_message_to_queue(queue, compressed_message)
# 示例用法
large_message = "这是一个需要以混合方式处理的非常大的消息。"
bucket_name = "my-bucket"
object_key = "large_message_compressed.txt"
chunk_size = 256 # 示例块大小
hybrid_send(queue, large_message, bucket_name, object_key, chunk_size)
### 接收者
def hybrid_receive(queue):
message = receive_reference_from_queue(queue)
if message.startswith("s3://"):
compressed_message = get_message_from_s3(message)
else:
compressed_message = message
large_message = decompress_message(compressed_message)
print(large_message)
# 示例用法
hybrid_receive(queue)