在 ETL 數據處理過程中,許多場景無法直接同步處理數據,需要通過消息隊列進行異步處理,這也能有效實現削峰填谷。
使用案例#
以下是消息隊列的主要使用案例,如果您的業務需求可以應用。
解耦系統#
將生產者與消費者解耦,這是一種發布 - 訂閱模型。我們可以從這種模式中獲益,以獲得更好的可擴展性和韌性。
異步處理#
任務不需要立即處理。我們可以從這種模式中獲益,將耗時的任務卸載到背景中處理。
限流#
可以控制任務的速率,以避免對下游系統(如數據庫)的過載。我們可以從中受益,以避免因請求突然激增而感到不堪重負。
緩衝#
在高峰請求期間暫時存儲消息,稍後再處理。
我們可以從中受益,以平滑流量的高峰。
微服務通信#
消息通知在微服務架構中交換數據和命令。
我們可以從這種模式中受益,以實現具有可擴展性和韌性的異步系統。
隊列模型選擇#
為了更好地實現業務數據交互,有必要熟悉隊列模式。
p2p#
生產者 → Q → 消費者
單一生產者只能由單一消費者處理。
例如:aws SQS,rabbitmq
pub/sub#
發布者 → Q(主題)→ 訂閱者(1-2-3-N)
消息需要通過主題傳遞給多個消費者組,每個消費者組接收消息的副本,組內的消費者將共享消費消息。
例如:aws kinesis,Azure service bus,aws mq,rabbitmq,kafka,azure eventhubs
Fanout#
生產者 → 交換 → Q(1-2-3-N)→ 消費者(1-2-3-N)
消息可以由多個獨立的消費者處理。
例如:azure service bus,aws sns+sqs,aws mq,rabbitmq
工作隊列#
生產者 → Q → 工作負載(1-2-3-N)
消息可以由工作者異步處理,以提高吞吐量和可靠性。
例如:azure storage account queue,aws sqs,rabbitmq,kafka,aws kinesis,azure service bus
優先隊列#
生產者 → 優先 Q → 消費者
隊列中的消息具有權重,可以重新排序以由優先消費者處理。
例如:rabbitmq,aws sqs
DLQ(死信隊列)#
生產者 → Q → 消費者 - 失敗則 → DLQ
消息在經過一定次數的嘗試後仍然失敗,則移動到死信隊列以進行進一步監控和重播。
DLQ 模式幫助我們設置一個次要隊列,捕獲無法由主要隊列處理的消息。
PS. 建議將此模式添加到處理關鍵消息的任何系統中,如果您選擇的隊列系統不原生支持 DLQ,您可以使用其他消息隊列作為 DLQ。
例如:aws sqs,azure service bus(原生支持 dlq),rabbitmq(插件支持)。
消息大小配額超限策略#
消息隊列服務 | 最大消息大小 |
---|---|
Amazon SQS | 256 KB |
Azure Service Bus | 256 KB(標準),1 MB(高級) |
Amazon Kinesis | 1 MB |
Apache Kafka | 1 MB(可配置) |
RabbitMQ | 無嚴格限制(建議 < 128 KB) |
Azure Eventhubs | 1 MB(包括元數據) |
Azure Storage Queue | 64 KB |
Tip
雖然某些隊列可以擴展消息大小配額,但增加最大消息大小可能會影響性能和內存使用,因此應謹慎進行。
通用策略#
使用壓縮#
首先要考慮的是使用消息壓縮,而大多數隊列原生不支持壓縮。
消息隊列系統 | 原生壓縮支持 | 手動壓縮可能 | ||
---|---|---|---|---|
Amazon SQS | 否 | 是 | ||
Azure Service Bus | 否 | 是 | ||
Amazon Kinesis | 否 | 是 | ||
Apache Kafka | 是 | 不適用 | ||
RabbitMQ | 否 | 是 | ||
Azure Storage Queue | 否 | 是 |
算法選擇#
-
Gzip:良好的平衡,廣泛支持。
-
Snappy:非常快,壓縮比低。
-
LZ4:極快,適合高吞吐量。
-
Zstandard(zstd):最佳壓縮比,速度合理。
選擇最適合您性能和壓縮需求的算法。在大多數場景中,Gzip 將是良好的選擇。
### 編碼
import gzip
import base64
from azure.storage.queue import QueueClient
def compress_message_gzip(message):
compressed_message = gzip.compress(message.encode('utf-8'))
return base64.b64encode(compressed_message).decode('utf-8')
# 示例用法
large_message = "這是一條需要壓縮的非常大的消息。"
compressed_message = compress_message_gzip(large_message)
# 將壓縮的消息發送到Azure Storage Queue
queue_client = QueueClient.from_connection_string("your_connection_string", "your_queue_name")
queue_client.send_message(compressed_message)
### 解碼
def decompress_message_gzip(compressed_message):
compressed_message = base64.b64decode(compressed_message.encode('utf-8'))
return gzip.decompress(compressed_message).decode('utf-8')
# 從Azure Storage Queue檢索消息
messages = queue_client.receive_messages()
for message in messages:
decompressed_message = decompress_message_gzip(message.content)
print(decompressed_message)
queue_client.delete_message(message)
將大型消息存儲在外部存儲中#
將大型消息存儲在外部存儲服務(aws s3,azure blob storage,google cloud storage)中,並在隊列中發送對該消息的引用。
### 發送消息
import boto3
s3_client = boto3.client('s3')
def store_message_in_s3(large_message, bucket_name, object_key):
s3_client.put_object(Bucket=bucket_name, Key=object_key, Body=large_message)
return f's3://{bucket_name}/{object_key}'
def send_reference_to_queue(queue, reference):
queue.send_message(MessageBody=reference)
# 示例用法
large_message = "這是一條需要存儲在S3中的非常大的消息。"
bucket_name = "my-bucket"
object_key = "large_message.txt"
reference = store_message_in_s3(large_message, bucket_name, object_key)
send_reference_to_queue(queue, reference)
### 接收消息
def parse_s3_reference(reference):
parts = reference.replace("s3://", "").split("/")
bucket_name = parts[0]
object_key = "/".join(parts[1:])
return bucket_name, object_key
def get_message_from_s3(reference):
bucket_name, object_key = parse_s3_reference(reference)
response = s3_client.get_object(Bucket=bucket_name, Key=object_key)
return response['Body'].read().decode('utf-8')
def receive_reference_from_queue(queue):
messages = queue.receive_messages(MaxNumberOfMessages=1)
return messages[0].body
# 示例用法
reference = receive_reference_from_queue(queue)
large_message = get_message_from_s3(reference)
print(large_message)
分塊和重組#
在配額大小限制內將消息分塊,然後根據塊 ID 在消費者中重組。
然而,這種方法在分佈式編碼環境中,塊 ID 的處理將變得更加複雜,我們還會遇到塊丟失異常問題,設計這種消費者系統的複雜性相對較高,因此不建議使用此解決方案。
結論
我們可以混合使用壓縮和大型消息處理機制來處理非常大的消息。
### 發送者
def hybrid_send(queue, large_message, bucket_name, object_key, chunk_size):
compressed_message = compress_message(large_message)
if len(compressed_message) > chunk_size:
reference = store_message_in_s3(compressed_message, bucket_name, object_key)
send_reference_to_queue(queue, reference)
else:
send_compressed_message_to_queue(queue, compressed_message)
# 示例用法
large_message = "這是一條需要以混合方式處理的非常大的消息。"
bucket_name = "my-bucket"
object_key = "large_message_compressed.txt"
chunk_size = 256 # 示例塊大小
hybrid_send(queue, large_message, bucket_name, object_key, chunk_size)
### 接收者
def hybrid_receive(queue):
message = receive_reference_from_queue(queue)
if message.startswith("s3://"):
compressed_message = get_message_from_s3(message)
else:
compressed_message = message
large_message = decompress_message(compressed_message)
print(large_message)
# 示例用法
hybrid_receive(queue)