ETL データ処理のプロセスでは、多くのシナリオでデータを直接同期的に処理することができず、メッセージキューを介して非同期的に処理する必要があります。これにより、ピークシェービングとバレーフィリングを効果的に実現することもできます。
ユースケース#
ビジネス要件に適用できるメッセージキューの主なユースケースは以下の通りです。
システムの分離#
プロデューサーとコンシューマーを分離し、パブサブモデルを実現します。このパターンから、より良いスケーラビリティとレジリエンシーを得ることができます。
非同期処理#
タスクは即座に処理される必要はありません。このパターンから、時間のかかるタスクをバックグラウンドで処理するためにオフロードすることができます。
レート制限#
タスクは、データベースなどの下流システムが過負荷にならないようにレートを制御できます。突然のリクエストの急増による圧倒から利益を得ることができます。
バッファリング#
ピークリクエスト中にメッセージを一時的に保存し、後で処理します。
トラフィックの急増を平滑化することから利益を得ることができます。
マイクロサービス間通信#
マイクロサービスアーキテクチャ内でサービス間のデータとコマンドを交換するためにメッセージを通知します。
このパターンから、スケーラビリティとレジリエンシーを持つ非同期システムを実装することができます。
キューのモデル選択#
ビジネスデータの相互作用をより良く実現するためには、キューのモードに慣れる必要があります。
p2p#
プロデューサー → Q → コンシューマー
単一のプロデューサーは単一のコンシューマーによってのみ処理されます。
例:aws SQS、rabbitmq
pub/sub#
パブリッシャー → Q(トピック) → サブスクライバー(1-2-3-N)
メッセージはトピックを介して複数のコンシューマーグループに配信される必要があり、各コンシューマーグループはメッセージのコピーを受け取り、グループ内のコンシューマーはメッセージを共有して消費します。
例:aws kinesis、Azure service bus、aws mq、rabbitmq、kafka、azure eventhubs
ファンアウト#
プロデューサー → エクスチェンジ → Q(1-2-3-N) → コンシューマー(1-2-3-N)
メッセージは複数の独立したコンシューマーによって処理されることができます。
例:azure service bus、aws sns+sqs、aws mq、rabbitmq
ワークキュー#
プロデューサー → Q → ワークロード(1-2-3-N)
メッセージはワーカーによって非同期に処理され、スループットと信頼性を向上させることができます。
例:azure storage account queue、aws sqs、rabbitmq、kafka、aws kinesis、azure service bus
プライオリティキュー#
プロデューサー → プライオリティ Q → コンシューマー
キュー内のメッセージには重みがあり、優先度のあるコンシューマーによって処理されるために再順序付けされることができます。
例:rabbitmq、aws sqs
DLQ(デッドレターキュー)#
プロデューサー → Q → コンシューマー - 失敗した場合 → DLQ
特定の回数の試行後に処理されるが、まだ失敗した場合は、さらなる監視と再処理の目的でデッドレターキューに移動します。
DLQ パターンは、プライマリーキューによって処理できないメッセージをキャッチするためのセカンダリーキューを設定するのに役立ちます。
PS. 重要なメッセージを処理するシステムにはこのパターンを追加することをお勧めします。選択したキューシステムがネイティブに DLQ をサポートしていない場合は、他のメッセージキューを DLQ として使用できます。
例:aws sqs、azure service bus(ネイティブサポート DLQ)、rabbitmq(プラグインサポート)
メッセージサイズのクォータ超過戦略#
メッセージキューサービス | 最大メッセージサイズ |
---|---|
Amazon SQS | 256 KB |
Azure Service Bus | 256 KB(スタンダード)、1 MB(プレミアム) |
Amazon Kinesis | 1 MB |
Apache Kafka | 1 MB(設定可能) |
RabbitMQ | 厳密な制限なし(推奨 < 128 KB) |
Azure Eventhubs | 1 MB(メタデータを含む) |
Azure Storage Queue | 64 KB |
Tip
一部のキューはメッセージサイズのクォータを拡張できますが、最大メッセージサイズを増やすことはパフォーマンスとメモリ使用量に影響を与える可能性があるため、注意して行うべきです。
一般的な戦略#
圧縮の使用#
最初に考慮すべきことは、メッセージ圧縮を使用することです。ほとんどのキューはネイティブに圧縮をサポートしていません。
メッセージキューシステム | ネイティブ圧縮サポート | 手動圧縮可能 | ||
---|---|---|---|---|
Amazon SQS | いいえ | はい | ||
Azure Service Bus | いいえ | はい | ||
Amazon Kinesis | いいえ | はい | ||
Apache Kafka | はい | N/A | ||
RabbitMQ | いいえ | はい | ||
Azure Storage Queue | いいえ | はい |
アルゴリズムの選択#
-
Gzip: 良好なバランス、広くサポートされています。
-
Snappy: 非常に速い、圧縮率は低め。
-
LZ4: 極めて速い、高スループットに適しています。
-
Zstandard(zstd): 理にかなった速度で最高の圧縮率。
パフォーマンスと圧縮のニーズに最も適したアルゴリズムを選択してください。ほとんどのシナリオでは、Gzip が良い選択肢となります。
### エンコード
import gzip
import base64
from azure.storage.queue import QueueClient
def compress_message_gzip(message):
compressed_message = gzip.compress(message.encode('utf-8'))
return base64.b64encode(compressed_message).decode('utf-8')
# 使用例
large_message = "これは圧縮する必要がある非常に大きなメッセージです。"
compressed_message = compress_message_gzip(large_message)
# Azure Storage Queueに圧縮メッセージを送信
queue_client = QueueClient.from_connection_string("your_connection_string", "your_queue_name")
queue_client.send_message(compressed_message)
### デコード
def decompress_message_gzip(compressed_message):
compressed_message = base64.b64decode(compressed_message.encode('utf-8'))
return gzip.decompress(compressed_message).decode('utf-8')
# Azure Storage Queueからメッセージを取得
messages = queue_client.receive_messages()
for message in messages:
decompressed_message = decompress_message_gzip(message.content)
print(decompressed_message)
queue_client.delete_message(message)
大きなメッセージを外部ストレージに保存#
大きなメッセージを外部ストレージサービス(aws s3、azure blob storage、google cloud storage)に保存し、キュー内にメッセージへの参照を送信します。
### メッセージ送信
import boto3
s3_client = boto3.client('s3')
def store_message_in_s3(large_message, bucket_name, object_key):
s3_client.put_object(Bucket=bucket_name, Key=object_key, Body=large_message)
return f's3://{bucket_name}/{object_key}'
def send_reference_to_queue(queue, reference):
queue.send_message(MessageBody=reference)
# 使用例
large_message = "これはS3に保存する必要がある非常に大きなメッセージです。"
bucket_name = "my-bucket"
object_key = "large_message.txt"
reference = store_message_in_s3(large_message, bucket_name, object_key)
send_reference_to_queue(queue, reference)
### メッセージ受信
def parse_s3_reference(reference):
parts = reference.replace("s3://", "").split("/")
bucket_name = parts[0]
object_key = "/".join(parts[1:])
return bucket_name, object_key
def get_message_from_s3(reference):
bucket_name, object_key = parse_s3_reference(reference)
response = s3_client.get_object(Bucket=bucket_name, Key=object_key)
return response['Body'].read().decode('utf-8')
def receive_reference_from_queue(queue):
messages = queue.receive_messages(MaxNumberOfMessages=1)
return messages[0].body
# 使用例
reference = receive_reference_from_queue(queue)
large_message = get_message_from_s3(reference)
print(large_message)
チャンクと再構成#
クォータサイズ制限内でメッセージをチャンク化し、コンシューマーでチャンク ID に基づいて再構成します。
ただし、この方法は分散コーディング環境では、チャンク ID の処理が非常に複雑になり、チャンクの損失例外問題にも直面するため、このコンシューマーシステムの設計の複雑さは比較的高く、使用することはお勧めできません。
結論として、圧縮と大きなメッセージ処理メカニズムを組み合わせて非常に大きなメッセージを処理することができます。
### 送信者
def hybrid_send(queue, large_message, bucket_name, object_key, chunk_size):
compressed_message = compress_message(large_message)
if len(compressed_message) > chunk_size:
reference = store_message_in_s3(compressed_message, bucket_name, object_key)
send_reference_to_queue(queue, reference)
else:
send_compressed_message_to_queue(queue, compressed_message)
# 使用例
large_message = "これはハイブリッド方式で処理する必要がある非常に大きなメッセージです。"
bucket_name = "my-bucket"
object_key = "large_message_compressed.txt"
chunk_size = 256 # 例のチャンクサイズ
hybrid_send(queue, large_message, bucket_name, object_key, chunk_size)
### 受信者
def hybrid_receive(queue):
message = receive_reference_from_queue(queue)
if message.startswith("s3://"):
compressed_message = get_message_from_s3(message)
else:
compressed_message = message
large_message = decompress_message(compressed_message)
print(large_message)
# 使用例
hybrid_receive(queue)