GarethNg

Gareth Ng

With a bamboo staff and straw sandals, I feel lighter than riding a horse, In a cloak amidst the misty rain, I live my life as it comes.
github
email
x
telegram

在ETL過程中使用消息系統

在 ETL 數據處理過程中,許多場景無法直接同步處理數據,需要通過消息隊列進行異步處理,這也能有效實現削峰填谷。

使用案例#

以下是消息隊列的主要使用案例,如果您的業務需求可以應用。

解耦系統#

將生產者與消費者解耦,這是一種發布 - 訂閱模型。我們可以從這種模式中獲益,以獲得更好的可擴展性和韌性。

異步處理#

任務不需要立即處理。我們可以從這種模式中獲益,將耗時的任務卸載到背景中處理。

限流#

可以控制任務的速率,以避免對下游系統(如數據庫)的過載。我們可以從中受益,以避免因請求突然激增而感到不堪重負。

緩衝#

在高峰請求期間暫時存儲消息,稍後再處理。

我們可以從中受益,以平滑流量的高峰。

微服務通信#

消息通知在微服務架構中交換數據和命令。

我們可以從這種模式中受益,以實現具有可擴展性和韌性的異步系統。

隊列模型選擇#

為了更好地實現業務數據交互,有必要熟悉隊列模式。

p2p#

生產者 → Q → 消費者

單一生產者只能由單一消費者處理。

例如:aws SQS,rabbitmq

pub/sub#

發布者 → Q(主題)→ 訂閱者(1-2-3-N)

消息需要通過主題傳遞給多個消費者組,每個消費者組接收消息的副本,組內的消費者將共享消費消息。

例如:aws kinesis,Azure service bus,aws mq,rabbitmq,kafka,azure eventhubs

Fanout#

生產者 → 交換 → Q(1-2-3-N)→ 消費者(1-2-3-N)

消息可以由多個獨立的消費者處理。

例如:azure service bus,aws sns+sqs,aws mq,rabbitmq

工作隊列#

生產者 → Q → 工作負載(1-2-3-N)

消息可以由工作者異步處理,以提高吞吐量和可靠性。

例如:azure storage account queue,aws sqs,rabbitmq,kafka,aws kinesis,azure service bus

優先隊列#

生產者 → 優先 Q → 消費者

隊列中的消息具有權重,可以重新排序以由優先消費者處理。

例如:rabbitmq,aws sqs

DLQ(死信隊列)#

生產者 → Q → 消費者 - 失敗則 → DLQ

消息在經過一定次數的嘗試後仍然失敗,則移動到死信隊列以進行進一步監控和重播。

DLQ 模式幫助我們設置一個次要隊列,捕獲無法由主要隊列處理的消息。

PS. 建議將此模式添加到處理關鍵消息的任何系統中,如果您選擇的隊列系統不原生支持 DLQ,您可以使用其他消息隊列作為 DLQ。

例如:aws sqs,azure service bus(原生支持 dlq),rabbitmq(插件支持)。

消息大小配額超限策略#

消息隊列服務最大消息大小
Amazon SQS256 KB
Azure Service Bus256 KB(標準),1 MB(高級)
Amazon Kinesis1 MB
Apache Kafka1 MB(可配置)
RabbitMQ無嚴格限制(建議 < 128 KB)
Azure Eventhubs1 MB(包括元數據)
Azure Storage Queue64 KB

Tip

雖然某些隊列可以擴展消息大小配額,但增加最大消息大小可能會影響性能和內存使用,因此應謹慎進行。

通用策略#

使用壓縮#

首先要考慮的是使用消息壓縮,而大多數隊列原生不支持壓縮。

消息隊列系統原生壓縮支持手動壓縮可能
Amazon SQS
Azure Service Bus
Amazon Kinesis
Apache Kafka不適用
RabbitMQ
Azure Storage Queue

算法選擇#

  • Gzip:良好的平衡,廣泛支持。

  • Snappy:非常快,壓縮比低。

  • LZ4:極快,適合高吞吐量。

  • Zstandard(zstd):最佳壓縮比,速度合理。

選擇最適合您性能和壓縮需求的算法。在大多數場景中,Gzip 將是良好的選擇。

### 編碼
import gzip
import base64
from azure.storage.queue import QueueClient

def compress_message_gzip(message):
    compressed_message = gzip.compress(message.encode('utf-8'))
    return base64.b64encode(compressed_message).decode('utf-8')

# 示例用法
large_message = "這是一條需要壓縮的非常大的消息。"
compressed_message = compress_message_gzip(large_message)

# 將壓縮的消息發送到Azure Storage Queue
queue_client = QueueClient.from_connection_string("your_connection_string", "your_queue_name")
queue_client.send_message(compressed_message)


### 解碼
def decompress_message_gzip(compressed_message):
    compressed_message = base64.b64decode(compressed_message.encode('utf-8'))
    return gzip.decompress(compressed_message).decode('utf-8')

# 從Azure Storage Queue檢索消息
messages = queue_client.receive_messages()
for message in messages:
    decompressed_message = decompress_message_gzip(message.content)
    print(decompressed_message)
    queue_client.delete_message(message)

將大型消息存儲在外部存儲中#

將大型消息存儲在外部存儲服務(aws s3,azure blob storage,google cloud storage)中,並在隊列中發送對該消息的引用。

### 發送消息
import boto3

s3_client = boto3.client('s3')

def store_message_in_s3(large_message, bucket_name, object_key):
    s3_client.put_object(Bucket=bucket_name, Key=object_key, Body=large_message)
    return f's3://{bucket_name}/{object_key}'

def send_reference_to_queue(queue, reference):
    queue.send_message(MessageBody=reference)

# 示例用法
large_message = "這是一條需要存儲在S3中的非常大的消息。"
bucket_name = "my-bucket"
object_key = "large_message.txt"
reference = store_message_in_s3(large_message, bucket_name, object_key)
send_reference_to_queue(queue, reference)

### 接收消息
def parse_s3_reference(reference):
    parts = reference.replace("s3://", "").split("/")
    bucket_name = parts[0]
    object_key = "/".join(parts[1:])
    return bucket_name, object_key

def get_message_from_s3(reference):
    bucket_name, object_key = parse_s3_reference(reference)
    response = s3_client.get_object(Bucket=bucket_name, Key=object_key)
    return response['Body'].read().decode('utf-8')

def receive_reference_from_queue(queue):
    messages = queue.receive_messages(MaxNumberOfMessages=1)
    return messages[0].body

# 示例用法
reference = receive_reference_from_queue(queue)
large_message = get_message_from_s3(reference)
print(large_message)

分塊和重組#

在配額大小限制內將消息分塊,然後根據塊 ID 在消費者中重組。

然而,這種方法在分佈式編碼環境中,塊 ID 的處理將變得更加複雜,我們還會遇到塊丟失異常問題,設計這種消費者系統的複雜性相對較高,因此不建議使用此解決方案。

結論
我們可以混合使用壓縮和大型消息處理機制來處理非常大的消息。

### 發送者
def hybrid_send(queue, large_message, bucket_name, object_key, chunk_size):
    compressed_message = compress_message(large_message)
    if len(compressed_message) > chunk_size:
        reference = store_message_in_s3(compressed_message, bucket_name, object_key)
        send_reference_to_queue(queue, reference)
    else:
        send_compressed_message_to_queue(queue, compressed_message)

# 示例用法
large_message = "這是一條需要以混合方式處理的非常大的消息。"
bucket_name = "my-bucket"
object_key = "large_message_compressed.txt"
chunk_size = 256  # 示例塊大小
hybrid_send(queue, large_message, bucket_name, object_key, chunk_size)

### 接收者
def hybrid_receive(queue):
    message = receive_reference_from_queue(queue)
    if message.startswith("s3://"):
        compressed_message = get_message_from_s3(message)
    else:
        compressed_message = message
    large_message = decompress_message(compressed_message)
    print(large_message)

# 示例用法
hybrid_receive(queue)
載入中......
此文章數據所有權由區塊鏈加密技術和智能合約保障僅歸創作者所有。