軽量級メッセージキュー RedisQueue#
Title: 軽量級メッセージキュー RedisQueue
URL Source: https://newlifex.com/core/redisqueue
Markdown Content:
メッセージキュー(Message Queue)は分散システムに欠かせないミドルウェアであり、大部分のメッセージキュー製品(RocketMQ/RabbitMQ/Kafka など)は、チームに比較的強い技術力を要求し、中小チームには適しておらず、.NET 技術のサポートが不十分です。しかし、Redis が実現した軽量級メッセージキューは非常にシンプルで、Redis の通常の操作のみで、開発チームが追加の知識を習得する必要はほとんどありません!
強力な.NET5 のリリースに伴い、.NET 技術スタックに最適なメッセージキューのパートナーがないわけがありません。
この記事では、高性能 Redis コンポーネント NewLife.Redis を基に、宅配便業務のシナリオを借りて、.NET で Redis をメッセージキューとして使用し、企業向けの分散システムアーキテクチャを構築する方法を説明します!
この記事の例のコードは、コンソールプロジェクトを新規作成した後、nuget から NewLife.Redis を参照する必要があります。
実践シナリオ:
中通快遞、RedisQueue は大データ計算に使用され、統計データをバッチアップロードしてデータベースの落盤圧力を軽減し、メッセージ量は毎日 10 億です。
遞易智能、RedisQueue は分散システムのデカップリングに使用され、メッセージトピックは 300 以上、関連するアプリケーションシステムは 50 以上、メッセージ量は毎日 1000 万です。
メッセージキューとは#
メッセージキューとは、メッセージが伝送中に保存されるコンテナであり、その核心的な機能はピークシフトとデカップリングです!
朝のピーク時、宅配便会社のトラックが各ステーションに荷物を降ろし、多くのステーションのスタッフが PDA を使用して到着をスキャンし、大量の情報がシステムに入ります(1000tps)、しかし宅配便会社への通知インターフェースは 400tps の処理能力しかありません。
MQ を追加してメッセージを保存することで、システムの処理能力を超えるメッセージが滞留し、朝のピークが過ぎた後にシステムが処理を完了できます。これがピークシフトです!
宅配ボックスの業務プロセスでは、配達員がボックスに投函した後、システム料金の減算、ユーザーへの SMS 通知、宅配便会社へのプッシュ通知という 3 つの業務アクションを経る必要があります。従来の方法では、これらの業務を順次実行する必要があり、その中のいずれかのステップで異常が発生した場合(例えば、ユーザーの携帯電話がオフになっているか、宅配便会社のインターフェースに障害が発生した場合)、全体の投函プロセスが遅延または中断され、ユーザー体験に深刻な影響を与えます。
インターフェース層が投函データを受け取った後、MQ にメッセージを書き込み、後続の 3 つのサブシステムがそれぞれ消費処理を行うことで、この問題を完璧に解決でき、サブシステムの障害は上流システムに影響を与えません!これがデカップリングです!
メモリメッセージキュー#
最もシンプルなメッセージキューは、BlockingCollection を使用して実装できます。
public static void Start()
{
var queue = new BlockingCollection<Area>();
// 独立スレッドで消費
var thread = new Thread(s =\> Consume(queue));
thread.Start();
// メッセージを発行
Publish(queue);
}
private static void Publish(BlockingCollection<Area> queue)
{
var area = new Area {Code = 110000, Name = "北京市"};
XTrace.WriteLine("Public {0} {1}", area.Code, area.Name);
queue.Add(area);
Thread.Sleep(1000);
area = new Area { Code = 310000, Name = "上海市" };
XTrace.WriteLine("Public {0} {1}", area.Code, area.Name);
queue.Add(area);
Thread.Sleep(1000);
area = new Area { Code = 440100, Name = "広州市" };
XTrace.WriteLine("Public {0} {1}", area.Code, area.Name);
queue.Add(area);
Thread.Sleep(1000);
}
private static void Consume(BlockingCollection<Area> queue)
{
while (true)
{
var msg = queue.Take();
if (msg != null)
{
XTrace.WriteLine("Consume {0} {1}", msg.Code, msg.Name);
}
}
}
毎秒 1 つのメッセージを生成(1 号スレッド)、独立スレッド(9 号スレッド)で消費されます。
Redis をメッセージキューとして使用#
Redis の LIST 構造は、左から入れて右から出す機能を持ち、BRPOP のブロッキングポップを使用することで、最も基本的なメッセージキュー RedisQueue<T> を完成させることができます。BRPOP は各メッセージが消費され、かつ一度だけ消費されることを保証します。
GetQueue でキューを取得した後、Add メソッドでメッセージを発行します。
TakeOne で 1 つのメッセージを消費し、10 秒のブロッキングを指定し、10 秒以内にメッセージがあればすぐに返し、そうでなければ 10 秒のタイムアウト後に空を返します。
注意:消費端としてインスタンス化された Redis は、ブロッキング時間よりも長いタイムアウト時間を指定する必要があります。例えば、new FullRedis {Timeout = 11_000} 。
消費端は実際には大ループで消費され、各トピックに対して 8 つのスレッドまたはタスクを開いて大ループ消費を実行することをお勧めします(8 スレッドはビジネスニーズに応じて他の数字に調整できます。例えば、1/2/5/16)。各スレッドは GetQueue を取得した後、自分専用のキューインスタンスを得て、すべてのインスタンスが共同でそのトピックのメッセージを消費処理します。したがって、メッセージトピックの総消費者数はデプロイインスタンス数にスレッド数 8 を掛けたものであり、必要に応じて増減でき、パーティションの偏り問題がなく、非常に柔軟です。
public static void Start(FullRedis redis)
{
var topic = "EasyQueue";
// 独立スレッドで消費
var thread = new Thread(s =\> Consume(redis, topic));
thread.Start();
// メッセージを発行。実際には生産と消費は異なるサーバーにあります
Publish(redis, topic);
}
private static void Publish(FullRedis redis, String topic)
{
var queue = redis.GetQueue<Area>(topic);
queue.Add(new Area { Code = 110000, Name = "北京市" });
Thread.Sleep(1000);
queue.Add(new Area { Code = 310000, Name = "上海市" });
Thread.Sleep(1000);
queue.Add(new Area { Code = 440100, Name = "広州市" });
Thread.Sleep(1000);
}
private static void Consume(FullRedis redis, String topic)
{
var queue = redis.GetQueue<Area>(topic);
while (true)
{
var msg = queue.TakeOne(10);
if (msg != null)
{
XTrace.WriteLine("Consume {0} {1}", msg.Code, msg.Name);
}
}
}
LPUSH でメッセージを生成(1 号スレッドがリストに挿入)、BRPOP でメッセージを消費(11 号スレッドがリストからポップ)、したがって、メッセージが消費された後は消えます!
注:NewLife.Redis メッセージキューはデフォルトでメッセージを Json シリアライズし、消費時に String としてメッセージを取得し、再びメッセージオブジェクトにデシリアライズします。
ログの時間から見ると、生産と消費の時間差は 1〜3ms の間で、遅延は非常に低いです!
消費コードをコメントアウトして再実行すると、Redis に発行されたメッセージを見ることができます。
注意:RedisQueue の欠点は、メッセージが消費された後に Redis から削除されるため、消費端がメッセージを処理できなかった場合、メッセージが失われるリスクがあります!重要なビジネスには確認が必要な RedisReliableQueue キューを使用してください。
確認が必要なキュー#
宅配便会社の物流プッシュサブシステムがメッセージを処理する際にエラーが発生した場合、メッセージが失われるのはどうすればよいでしょうか?明らかに上流に再度送信させることはできません!
ここで、消費確認をサポートする信頼できるキュー RedisReliableQueue<T> が必要です。消費後、プログラムが明示的に消費を確認しない限り、Redis はメッセージを削除しません。
RedisReliableQueue は Redis の LIST 構造を採用し、LPUSH でメッセージを発行し、BRPOPLPUSH のブロッキングポップを使用して、同時にメッセージを保留リストにバックアップします。消費が成功した後に確認されると、保留リストから削除されます。消費処理が失敗した場合、メッセージは保留リストに滞留し、一定時間後に自動的に主キューに戻され、再度消費が割り当てられます。BRPOPLPUSH は各メッセージが消費され、かつ一度だけ消費されることを保証します。
GetReliableQueue でキューインスタンスを取得した後、Add でメッセージを発行し、TakeOneAsync で 1 つのメッセージを非同期に消費し、10 秒のブロッキングタイムアウトを指定し、処理が完了した後に Acknowledge で確認します。
注意:消費端としてインスタンス化された Redis は、ブロッキング時間よりも長いタイムアウト時間を指定する必要があります。例えば、new FullRedis {Timeout = 11_000} 。
消費端は実際には大ループで消費され、各トピックに対して 8 つのスレッドまたはタスクを開いて大ループ消費を実行することをお勧めします(8 スレッドはビジネスニーズに応じて他の数字に調整できます。例えば、1/2/5/16)。各スレッドは GetReliableQueue を取得した後、自分専用のキューインスタンスを得て、すべてのインスタンスが共同でそのトピックのメッセージを消費処理します。したがって、メッセージトピックの総消費者数はデプロイインスタンス数にスレッド数 8 を掛けたものであり、必要に応じて増減でき、パーティションの偏り問題がなく、非常に柔軟です。
public static void Start(FullRedis redis)
{
var topic = "AckQueue";
// 独立スレッドで消費
var source = new CancellationTokenSource();
Task.Run(() =\> ConsumeAsync(redis, topic, source.Token));
// メッセージを発行。実際には生産と消費は異なるサーバーにあります
Publish(redis, topic);
source.Cancel();
}
private static void Publish(FullRedis redis, String topic)
{
var queue = redis.GetReliableQueue<Area>(topic);
queue.Add(new Area { Code = 110000, Name = "北京市" });
Thread.Sleep(1000);
queue.Add(new Area { Code = 310000, Name = "上海市" });
Thread.Sleep(1000);
queue.Add(new Area { Code = 440100, Name = "広州市" });
Thread.Sleep(1000);
}
private static async Task ConsumeAsync(FullRedis redis, String topic, CancellationToken token)
{
var queue = redis.GetReliableQueue<String>(topic);
while (!token.IsCancellationRequested)
{
var mqMsg = await queue.TakeOneAsync(10);
if (mqMsg != null)
{
var msg = mqMsg.ToJsonEntity<Area\>();
XTrace.WriteLine("Consume {0} {1}", msg.Code, msg.Name);
queue.Acknowledge(mqMsg);
}
}
}
LPUSH でメッセージを生成(1 号スレッドがリストに挿入)、BRPOPLPUSH でメッセージを消費(6 号 5 号スレッドがリストからポップして別の Ack 保留リストに挿入)、これがメッセージを失わないための鍵です。LREM は Ack 保留リストから削除し、消費完了後の確認に使用されます。
消費に異常が発生した場合、確認操作は実行されず、Ack 保留リストに滞留しているメッセージは 60 秒後に主リストに戻り、再度消費が割り当てられます。
脳トレ:アプリケーションプロセスが異常終了した場合、未確認のメッセージはどう処理しますか?
消費コードをコメントアウトして再実行すると、Redis に発行されたメッセージが見え、通常のキューと同様に LIST 構造を使用しています。
「北京市」のメッセージを処理する際、Acknowledge 確認がなければ、Redis には AckQueue:Ack:* という LIST 構造が見え、このメッセージが保存されます。
したがって、信頼できるキューは本質的に、消費時にメッセージを別の LIST にバックアップし、確認操作は待機確認 LIST から削除することです。
この信頼できるキューが登場して以来、基本的に 90% 以上のビジネスニーズを満たすのに十分です。
遅延キュー#
ある日、小馬さんが言いました。配達員がボックスに投函した後、一定時間内にユーザーが受け取らなければ、システムは超過受取手数料を徴収する必要があり、遅延キューが必要です。
そこで Redis の ZSET を思いつき、RedisDelayQueue<T> を作成しました。Add でメッセージを生成する際に、何秒後にそのメッセージを消費できるかを指定するパラメータが追加され、消費の使い方は信頼できるキューと同様です。
消費端は実際には大ループで消費され、各トピックに対して 8 つのスレッドまたはタスクを開いて大ループ消費を実行することをお勧めします(8 スレッドはビジネスニーズに応じて他の数字に調整できます。例えば、1/2/5/16)。各スレッドは GetDelayQueue を取得した後、自分専用のキューインスタンスを得て、すべてのインスタンスが共同でそのトピックのメッセージを消費処理します。したがって、メッセージトピックの総消費者数はデプロイインスタンス数にスレッド数 8 を掛けたものであり、必要に応じて増減でき、パーティションの偏り問題がなく、非常に柔軟です。
public static void Start(FullRedis redis)
{
var topic = "DelayQueue";
// 独立スレッドで消費
var source = new CancellationTokenSource();
Task.Run(() =\> ConsumeAsync(redis, topic, source.Token));
// メッセージを発行。実際には生産と消費は異なるサーバーにあります
Publish(redis, topic);
source.Cancel();
}
private static void Publish(FullRedis redis, String topic)
{
var queue = redis.GetDelayQueue<Area>(topic);
queue.Add(new Area { Code = 110000, Name = "北京市" }, 2);
Thread.Sleep(1000);
queue.Add(new Area { Code = 310000, Name = "上海市" }, 2);
Thread.Sleep(1000);
queue.Add(new Area { Code = 440100, Name = "広州市" }, 2);
Thread.Sleep(1000);
}
private static async Task ConsumeAsync(FullRedis redis, String topic, CancellationToken token)
{
var queue = redis.GetDelayQueue<String>(topic);
while (!token.IsCancellationRequested)
{
var mqMsg = await queue.TakeOneAsync(10);
if (mqMsg != null)
{
var msg = mqMsg.ToJsonEntity<Area\>();
XTrace.WriteLine("Consume {0} {1}", msg.Code, msg.Name);
queue.Acknowledge(mqMsg);
}
}
}
上の図では、毎秒 1 つのメッセージを生成し、2 秒後に北京市を消費し、さらに 1 秒後に上海市を消費しています(上海市の発行からちょうど 2 秒後です)。ここで広州市は欠けています。テストプログラムは広州市を生成した後、1 秒待って終了したためです。
Redis から広州市のメッセージを見ることができ、ZSET 構造に保存されています。
複数消費グループ RedisStream#
またある日、データ中台の仲間が注文キューを消費したいと言いましたが、できません。LIST 構造で作られたキューでは、各メッセージは一度しか消費できないため、データ中台のシステムが消費してしまうと、他のビジネスシステムはメッセージを失ってしまいます。
そこで、Redis5.0 から新たに追加された STREAM 構造を思いつき、RedisStream を再度封装しました。
注意:消費端としてインスタンス化された Redis は、ブロッキング時間よりも長いタイムアウト時間を指定する必要があります。例えば、new FullRedis {Timeout = 11_000} 。
消費端は実際には大ループで消費され、各トピックに対して 8 つのスレッドまたはタスクを開いて大ループ消費を実行することをお勧めします(8 スレッドはビジネスニーズに応じて他の数字に調整できます。例えば、1/2/5/16)。各スレッドは GetStream を取得した後、自分専用のキューインスタンスを得て、すべてのインスタンスが共同でそのトピックのメッセージを消費処理します。したがって、メッセージトピックの総消費者数はデプロイインスタンス数にスレッド数 8 を掛けたものであり、必要に応じて増減でき、パーティションの偏り問題がなく、非常に柔軟です。
public static void Start(FullRedis redis)
{
var topic = "FullQueue";
// 2つの消費グループがそれぞれ独立して消費
var source = new CancellationTokenSource();
{
var queue = redis.GetStream<Area\>(topic);
queue.Group = "Group1";
\_ = queue.ConsumeAsync(OnConsume, source.Token);
}
{
var queue = redis.GetStream<Area\>(topic);
queue.Group = "Group2";
\_ = queue.ConsumeAsync(OnConsume2, source.Token);
}
// メッセージを発行。実際には生産と消費は異なるサーバーにあります
Publish(redis, topic);
Thread.Sleep(1000);
source.Cancel();
}
private static void Publish(FullRedis redis, String topic)
{
var queue = redis.GetStream<Area>(topic);
queue.Add(new Area { Code = 110000, Name = "北京市" });
Thread.Sleep(1000);
queue.Add(new Area { Code = 310000, Name = "上海市" });
Thread.Sleep(1000);
queue.Add(new Area { Code = 440100, Name = "広州市" });
Thread.Sleep(1000);
}
private static void OnConsume(Area area)
{
XTrace.WriteLine("Group1.Consume {0} {1}", area.Code, area.Name);
}
private static Task OnConsume2(Area area, Message message, CancellationToken token)
{
XTrace.WriteLine("Group2.Consume {0} {1} Id={2}", area.Code, area.Name, message.Id);
return Task.CompletedTask;
}
生産プロセスは変わらず、消費の大ループは少し特別で、主に STREAM から消費されたメッセージには独自の Id があり、この Id を確認すれば良いだけです。
上の図で、赤い枠は生産、紫の枠は消費です。
Redis を見てみると、STREAM メッセージがまだそこにあります。データ中台グループは異なる消費グループ Group を使用するだけで独立して消費でき、他のシステムのメッセージを奪う心配はありません。
ベストプラクティス#
RedisQueue は中通の大データ分析で、Oracle/MySql に書き込むのを待つデータをバッファリングするために使用され、多スレッド計算後にキューに書き込み、専用スレッドが定期的に一括(500 行)を取得し、バッチ Insert/Update 操作を実行します。このシステムキューは、毎日 10 億件のメッセージを処理し、Redis のメモリ割り当ては 8G で、実際の使用量は 100M 未満です。消費端の障害が発生しない限り、蓄積は発生しません。
注意:消費端としてインスタンス化された Redis は、ブロッキング時間よりも長いタイムアウト時間を指定する必要があります。例えば、new FullRedis {Timeout = 11_000} 。
遞易智能科技はすべて信頼できるキュー RedisReliableQueue を使用しており、約 300 以上のキューがシステムごとにそれぞれの Redis インスタンスに分散されています。パブリッククラウドの 2G メモリの主従版です。蓄積メッセージが 10 万未満のとき、キュー専用の Redis インスタンスのメモリ使用量は 100M 未満で、ほとんどメモリを占有しません。
会社のビジネスは毎日 100 万件以上の注文をもたらし、それに伴うメッセージ数は約 1000 万件で、メッセージが失われたことはありません!
例のコード#
コード:https://github.com/NewLifeX/NewLife.Redis/tree/master/QueueDemo