Lightweight Message Queue RedisQueue#
URL Source: https://newlifex.com/core/redisqueue
A message queue is essential middleware for distributed systems. Most message queue products (RocketMQ/RabbitMQ/Kafka, etc.) demand fairly strong technical capability from a team, which makes them a poor fit for small and medium-sized teams, and their support for the .NET stack is limited. A lightweight message queue built on Redis is very simple: it relies only on ordinary Redis operations and requires almost no extra knowledge from the development team!
With the powerful .NET 5 now released, how can the .NET technology stack do without its best message queue partner?
Starting from the high-performance Redis component NewLife.Redis, this article uses express-delivery business scenarios to explain how to use Redis as a message queue in .NET and build an enterprise-grade distributed system architecture!
To run the example code, create a console project and reference NewLife.Redis from NuGet.
Practical Scenarios:
- ZTO Express: RedisQueue is used in big-data computation to batch statistics and relieve database write pressure, handling 1 billion messages per day.
- Deyi Intelligent: RedisQueue decouples distributed systems, with over 300 message topics across more than 50 application systems and 10 million messages per day.
What is a Message Queue#
A message queue is a container that holds messages in transit; its core functions are peak shaving and decoupling!
During the morning peak, delivery trucks arrive at the stations to unload, and many station staff scan arrivals with PDAs, pushing a flood of data into the system (1000 tps), while the interface that notifies the delivery company can only handle 400 tps.
By adding an MQ to hold messages, traffic beyond the system's processing capacity is retained in the queue and processed after the morning peak ends. This is peak shaving!
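The arithmetic behind peak shaving is easy to sketch. Only the 1000 tps and 400 tps figures come from the scenario above; the 2-hour peak length and the 100 tps off-peak arrival rate below are illustrative assumptions:

```python
# Peak-shaving arithmetic: how big the backlog grows during the peak,
# and how long it takes to drain afterwards.
# Assumed: 2-hour peak, 100 tps off-peak arrivals (not from the article).

def backlog_after_peak(in_tps=1000, out_tps=400, peak_seconds=7200):
    """Messages held in the queue when the peak ends."""
    return max(0, (in_tps - out_tps) * peak_seconds)

def drain_seconds(backlog, in_tps=100, out_tps=400):
    """Time to clear the backlog once arrivals fall below capacity."""
    return backlog / (out_tps - in_tps)

peak_backlog = backlog_after_peak()   # 600 msg/s surplus * 7200 s = 4,320,000
hours_to_drain = drain_seconds(peak_backlog) / 3600   # drains at 300 msg/s
print(peak_backlog, hours_to_drain)   # 4320000 messages, 4.0 hours
```

The point of the sketch: the queue only needs to hold the surplus temporarily; as long as average capacity exceeds average load, the backlog always drains.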
In the express cabinet business, after the courier deposits a package, three business actions must follow: deducting system fees, notifying the user via SMS, and pushing a notification to the delivery company. The traditional approach executes these actions sequentially, so if any step fails (e.g., the user's phone is off or the delivery company's interface errors out), the entire deposit flow is delayed or even interrupted, severely hurting user experience.
If instead the interface layer writes the deposit data to an MQ on receipt, the three subsystems can each consume and process the messages independently, perfectly solving the problem; a subsystem failure no longer affects the upstream system! This is decoupling!
In-Memory Message Queue#
The simplest message queue can be implemented using BlockingCollection.
```csharp
public static void Start()
{
    var queue = new BlockingCollection<Area>();

    // Independent thread consumption
    var thread = new Thread(s => Consume(queue));
    thread.Start();

    // Publish messages
    Publish(queue);
}

private static void Publish(BlockingCollection<Area> queue)
{
    var area = new Area { Code = 110000, Name = "Beijing" };
    XTrace.WriteLine("Publish {0} {1}", area.Code, area.Name);
    queue.Add(area);
    Thread.Sleep(1000);

    area = new Area { Code = 310000, Name = "Shanghai" };
    XTrace.WriteLine("Publish {0} {1}", area.Code, area.Name);
    queue.Add(area);
    Thread.Sleep(1000);

    area = new Area { Code = 440100, Name = "Guangzhou" };
    XTrace.WriteLine("Publish {0} {1}", area.Code, area.Name);
    queue.Add(area);
    Thread.Sleep(1000);
}

private static void Consume(BlockingCollection<Area> queue)
{
    while (true)
    {
        var msg = queue.Take();
        if (msg != null)
        {
            XTrace.WriteLine("Consume {0} {1}", msg.Code, msg.Name);
        }
    }
}
```
One message is produced per second (Thread 1), and it is consumed by an independent thread (Thread 9).
Using Redis as a Message Queue#
Redis's LIST structure supports pushing on one side and popping on the other (left in, right out), and with BRPOP for blocking pops, a basic message queue RedisQueue can be built. BRPOP guarantees that each message is consumed once and only once.
GetQueue retrieves the queue, and the Add method publishes messages.
TakeOne pulls a message with a 10-second blocking timeout: if a message arrives within 10 seconds it returns immediately; otherwise it returns null after 10 seconds.
Note: When instantiating Redis as a consumer, a timeout greater than the blocking time must be specified, such as new FullRedis { Timeout = 11_000 }.
The consumer is essentially a big loop that pulls messages. It is recommended to start 8 threads or tasks per topic, each running this loop (the thread count can be adjusted as needed, e.g. 1/2/5/16). Each thread calls GetQueue to obtain its own exclusive queue instance, and all instances jointly consume messages under that topic. The total number of consumers for a topic is therefore the number of deployed instances multiplied by the thread count (8), and can be adjusted freely without any partition-skew issues, making it very flexible.
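The multi-threaded consumption model can be sketched without Redis at all. In this Python stand-in, several threads block on one shared queue (as they would on BRPOP) and every message goes to exactly one consumer; none of this is the NewLife.Redis API:

```python
# Stand-in for the LPUSH/BRPOP pattern: 4 consumer threads jointly drain one
# topic, and each message is delivered to exactly one of them.
import queue
import threading

redis_list = queue.Queue()   # in-memory stand-in for the Redis LIST
consumed = []                # everything any consumer received
lock = threading.Lock()

def consume_loop():
    """One consumer's big loop: block waiting for a message, then process it."""
    while True:
        try:
            msg = redis_list.get(timeout=0.5)   # like BRPOP with a timeout
        except queue.Empty:
            return                              # queue idle: end the demo
        with lock:
            consumed.append(msg)

threads = [threading.Thread(target=consume_loop) for _ in range(4)]
for t in threads:
    t.start()

# Producer side, like LPUSH
for i in range(100):
    redis_list.put(i)

for t in threads:
    t.join()
# Every message was taken by exactly one thread, none twice, none lost.
```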
```csharp
public static void Start(FullRedis redis)
{
    var topic = "EasyQueue";

    // Independent thread consumption
    var thread = new Thread(s => Consume(redis, topic));
    thread.Start();

    // Publish messages. In reality, production and consumption are on different servers
    Publish(redis, topic);
}

private static void Publish(FullRedis redis, String topic)
{
    var queue = redis.GetQueue<Area>(topic);
    queue.Add(new Area { Code = 110000, Name = "Beijing" });
    Thread.Sleep(1000);
    queue.Add(new Area { Code = 310000, Name = "Shanghai" });
    Thread.Sleep(1000);
    queue.Add(new Area { Code = 440100, Name = "Guangzhou" });
    Thread.Sleep(1000);
}

private static void Consume(FullRedis redis, String topic)
{
    var queue = redis.GetQueue<Area>(topic);
    while (true)
    {
        var msg = queue.TakeOne(10);
        if (msg != null)
        {
            XTrace.WriteLine("Consume {0} {1}", msg.Code, msg.Name);
        }
    }
}
```
LPUSH produces messages (Thread 1 inserts into the list), and BRPOP consumes messages (Thread 11 pops from the list), so the messages disappear after being consumed!
Note: The NewLife.Redis message queue defaults to JSON serialization for messages, and during consumption, the message is obtained as a string and then deserialized back into a message object.
From the log timestamps, it can be seen that the time difference between production and consumption is between 1-3 ms, with extremely low latency!
After commenting out the consumption code and rerunning, the published messages can be seen in Redis.
Note: The drawback of RedisQueue is that a message is deleted from Redis as soon as it is consumed. If the consumer fails while processing it, the message is lost! For important business, use the acknowledgement-based RedisReliableQueue.
Reliable Queue#
What if the logistics push subsystem that notifies the delivery company hits an error while processing messages, and messages are lost? Clearly we cannot ask the upstream to send them again!
This calls for a reliable queue, RedisReliableQueue, which supports consumption acknowledgement: a consumed message is not deleted from Redis until the program explicitly acknowledges it.
RedisReliableQueue also uses Redis's LIST structure: LPUSH publishes messages, and BRPOPLPUSH performs a blocking pop while atomically backing the message up to a pending list. On successful acknowledgement the message is deleted from the pending list; if consumption fails, it stays there and automatically returns to the main queue after a certain time, ready to be redelivered. BRPOPLPUSH guarantees that each message is consumed once and only once.
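The pending-list mechanics can be sketched with plain Python lists standing in for the two Redis LISTs. Here take_one plays the role of BRPOPLPUSH, acknowledge plays the LREM confirmation, and requeue_expired plays the automatic 60-second give-back; none of these names are the NewLife.Redis API:

```python
# Sketch of the acknowledgement pattern: every taken message is backed up to
# a pending dict, and only an explicit acknowledge() removes it for good.
main_queue = []      # stand-in for the topic LIST
pending = {}         # message -> time it was handed to a consumer
RETRY_AFTER = 60     # seconds before an unacknowledged message is requeued

def take_one(now):
    """Pop a message and back it up to the pending list (like BRPOPLPUSH)."""
    if not main_queue:
        return None
    msg = main_queue.pop(0)
    pending[msg] = now
    return msg

def acknowledge(msg):
    """Successful consumption: drop the backup copy (like LREM on the Ack list)."""
    pending.pop(msg, None)

def requeue_expired(now):
    """Return timed-out messages to the main queue for redelivery."""
    for msg, taken_at in list(pending.items()):
        if now - taken_at >= RETRY_AFTER:
            del pending[msg]
            main_queue.append(msg)

main_queue.extend(["Beijing", "Shanghai"])
m1 = take_one(now=0)       # "Beijing" moves to the pending list
acknowledge(m1)            # confirmed: gone for good
m2 = take_one(now=1)       # "Shanghai" is taken but never acknowledged...
requeue_expired(now=61)    # ...so it returns to the main queue for redelivery
```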
GetReliableQueue retrieves the queue instance, Add publishes messages, and TakeOneAsync asynchronously pulls a message with a 10-second blocking timeout; Acknowledge is called once processing completes.
Note: When instantiating Redis as a consumer, a timeout greater than the blocking time must be specified, such as new FullRedis { Timeout = 11_000 }.
As with the basic queue, it is recommended to run 8 consumer threads or tasks per topic (or any count, such as 1/2/5/16), each holding its own exclusive queue instance from GetReliableQueue; the total number of consumers is the number of deployed instances multiplied by the thread count, adjustable freely with no partition-skew issues.
```csharp
public static void Start(FullRedis redis)
{
    var topic = "AckQueue";

    // Independent thread consumption
    var source = new CancellationTokenSource();
    Task.Run(() => ConsumeAsync(redis, topic, source.Token));

    // Publish messages. In reality, production and consumption are on different servers
    Publish(redis, topic);

    source.Cancel();
}

private static void Publish(FullRedis redis, String topic)
{
    var queue = redis.GetReliableQueue<Area>(topic);
    queue.Add(new Area { Code = 110000, Name = "Beijing" });
    Thread.Sleep(1000);
    queue.Add(new Area { Code = 310000, Name = "Shanghai" });
    Thread.Sleep(1000);
    queue.Add(new Area { Code = 440100, Name = "Guangzhou" });
    Thread.Sleep(1000);
}

private static async Task ConsumeAsync(FullRedis redis, String topic, CancellationToken token)
{
    var queue = redis.GetReliableQueue<String>(topic);
    while (!token.IsCancellationRequested)
    {
        var mqMsg = await queue.TakeOneAsync(10);
        if (mqMsg != null)
        {
            var msg = mqMsg.ToJsonEntity<Area>();
            XTrace.WriteLine("Consume {0} {1}", msg.Code, msg.Name);
            queue.Acknowledge(mqMsg);
        }
    }
}
```
LPUSH produces messages (Thread 1 inserts into the list), and BRPOPLPUSH consumes them (Threads 6 and 5 pop from the list and insert into the Ack pending list); this backup is the key to not losing messages. After consumption, LREM deletes the message from the Ack pending list as the acknowledgement.
If an exception occurs during consumption, the acknowledgement is never executed, and messages left in the Ack pending list return to the main list after 60 seconds to be redelivered.
Question: what should happen to unacknowledged messages if the application process exits abnormally?
After commenting out the consumption code and rerunning, the published messages can be seen in Redis, just like a normal queue, using the LIST structure.
When processing the "Beijing" message, if there is no Acknowledge confirmation, Redis will show a LIST structure named AckQueue:Ack:* containing this message.
So the reliable queue essentially backs each message up to another LIST at consumption time, and the acknowledgement simply deletes the backup from that pending LIST.
Since its introduction, this reliable queue has covered more than 90% of business needs.
Delayed Queue#
One day, Brother Ma said that if a courier deposits a package and the user does not pick it up within a certain period, the system must charge an overdue pickup fee, which requires a delayed queue.
That pointed us to Redis's ZSET, from which we built RedisDelayQueue: its Add method takes an extra parameter specifying after how many seconds the message becomes consumable, and consumption works much like the reliable queue.
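The ZSET mechanics can be sketched with a Python heap standing in for the sorted set: Add stores each message with score "now + delay", and a message only becomes poppable once its score falls due. The explicit now parameter and function names are illustrative, not the library API:

```python
# Sketch of a ZSET-based delay queue: entries are ordered by due time, and
# take_one only releases a message whose due time has passed.
import heapq

delay_queue = []   # stand-in for the ZSET: (due_time, message), heap-ordered

def add(msg, delay, now):
    """Publish with a delay (like ZADD with score = now + delay)."""
    heapq.heappush(delay_queue, (now + delay, msg))

def take_one(now):
    """Pop the earliest message, but only once its due time has arrived."""
    if delay_queue and delay_queue[0][0] <= now:
        return heapq.heappop(delay_queue)[1]
    return None    # nothing due yet: a real consumer would block and retry

add("Beijing", delay=2, now=0)
add("Shanghai", delay=2, now=1)
first = take_one(now=1)    # None: nothing is due at t=1
second = take_one(now=2)   # "Beijing" becomes visible at t=2
third = take_one(now=3)    # "Shanghai" becomes visible at t=3
```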
As with the reliable queue, it is recommended to run 8 consumer threads or tasks per topic (or any count, such as 1/2/5/16), each holding its own exclusive queue instance from GetDelayQueue; the total number of consumers is the number of deployed instances multiplied by the thread count, adjustable freely with no partition-skew issues.
```csharp
public static void Start(FullRedis redis)
{
    var topic = "DelayQueue";

    // Independent thread consumption
    var source = new CancellationTokenSource();
    Task.Run(() => ConsumeAsync(redis, topic, source.Token));

    // Publish messages. In reality, production and consumption are on different servers
    Publish(redis, topic);

    source.Cancel();
}

private static void Publish(FullRedis redis, String topic)
{
    var queue = redis.GetDelayQueue<Area>(topic);
    queue.Add(new Area { Code = 110000, Name = "Beijing" }, 2);
    Thread.Sleep(1000);
    queue.Add(new Area { Code = 310000, Name = "Shanghai" }, 2);
    Thread.Sleep(1000);
    queue.Add(new Area { Code = 440100, Name = "Guangzhou" }, 2);
    Thread.Sleep(1000);
}

private static async Task ConsumeAsync(FullRedis redis, String topic, CancellationToken token)
{
    var queue = redis.GetDelayQueue<String>(topic);
    while (!token.IsCancellationRequested)
    {
        var mqMsg = await queue.TakeOneAsync(10);
        if (mqMsg != null)
        {
            var msg = mqMsg.ToJsonEntity<Area>();
            XTrace.WriteLine("Consume {0} {1}", msg.Code, msg.Name);
            queue.Acknowledge(mqMsg);
        }
    }
}
```
As shown in the figure, one message is produced per second, and after 2 seconds, the message for Beijing is consumed, followed by Shanghai after another second (exactly 2 seconds after the publication of Shanghai). The message for Guangzhou is missing because the test program exited just 1 second after producing it.
We can see the message for Guangzhou stored in the ZSET structure in Redis.
Multiple Consumer Groups RedisStream#
Another day, the data middle platform team wanted to consume the order queue as well, but could not: a LIST-based queue allows each message to be consumed only once, so if the data middle platform consumed a message, the other business systems would lose it.
So we turned to the STREAM structure introduced in Redis 5.0 and wrapped it as RedisStream.
Note: When instantiating Redis as a consumer, a timeout greater than the blocking time must be specified, such as new FullRedis { Timeout = 11_000 }.
As with the other queues, it is recommended to run 8 consumer threads or tasks per topic (or any count, such as 1/2/5/16), each holding its own exclusive queue instance from GetStream; the total number of consumers is the number of deployed instances multiplied by the thread count, adjustable freely with no partition-skew issues.
```csharp
public static void Start(FullRedis redis)
{
    var topic = "FullQueue";

    // Two consumer groups consume independently
    var source = new CancellationTokenSource();
    {
        var queue = redis.GetStream<Area>(topic);
        queue.Group = "Group1";
        _ = queue.ConsumeAsync(OnConsume, source.Token);
    }
    {
        var queue = redis.GetStream<Area>(topic);
        queue.Group = "Group2";
        _ = queue.ConsumeAsync(OnConsume2, source.Token);
    }

    // Publish messages. In reality, production and consumption are on different servers
    Publish(redis, topic);

    Thread.Sleep(1000);
    source.Cancel();
}

private static void Publish(FullRedis redis, String topic)
{
    var queue = redis.GetStream<Area>(topic);
    queue.Add(new Area { Code = 110000, Name = "Beijing" });
    Thread.Sleep(1000);
    queue.Add(new Area { Code = 310000, Name = "Shanghai" });
    Thread.Sleep(1000);
    queue.Add(new Area { Code = 440100, Name = "Guangzhou" });
    Thread.Sleep(1000);
}

private static void OnConsume(Area area)
{
    XTrace.WriteLine("Group1.Consume {0} {1}", area.Code, area.Name);
}

private static Task OnConsume2(Area area, Message message, CancellationToken token)
{
    XTrace.WriteLine("Group2.Consume {0} {1} Id={2}", area.Code, area.Name, message.Id);
    return Task.CompletedTask;
}
```
The production side is unchanged, but the consumption loop is a little special: messages consumed from a STREAM carry their own Id, and acknowledgement is done with that Id.
In the figure, the red box represents production, and the purple box represents consumption.
Looking at Redis, the STREAM messages are still there after consumption. The data middle platform can consume independently through its own consumer group without causing other systems to lose messages.
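Why STREAM permits multiple groups can be shown in a few lines: the stream is an append-only log, and each consumer group merely advances its own read offset, so consumption by one group removes nothing another group still needs. This Python model mimics XADD/XREADGROUP in memory; it is not the RedisStream API:

```python
# Sketch of STREAM consumer-group semantics: an append-only log plus one
# independent read position per group.
stream = []                               # append-only log of (id, message)
group_pos = {"Group1": 0, "Group2": 0}    # per-group last-delivered offset

def xadd(msg):
    """Append a message with an auto-incrementing Id (like XADD)."""
    stream.append((len(stream) + 1, msg))

def xreadgroup(group):
    """Deliver this group's next undelivered entry, if any (like XREADGROUP)."""
    pos = group_pos[group]
    if pos < len(stream):
        group_pos[group] = pos + 1
        return stream[pos]                # (id, message)
    return None

for name in ("Beijing", "Shanghai", "Guangzhou"):
    xadd(name)

group1 = [xreadgroup("Group1")[1] for _ in range(3)]
group2 = [xreadgroup("Group2")[1] for _ in range(3)]
# Both groups see all three messages, and the log itself is untouched.
```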
Best Practices#
In ZTO's big-data analysis, RedisQueue buffers data waiting to be written to Oracle/MySQL: multi-threaded computation writes to the queue, and a dedicated thread periodically pulls a batch (500 rows) for bulk Insert/Update. This queue handles 1 billion messages per day; with 8 GB of Redis memory allocated, actual usage stays under 100 MB unless consumer failures cause a backlog.
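The batching pattern reads as follows in a minimal Python sketch: a dedicated consumer drains the queue in chunks of up to 500 messages and performs one bulk write per chunk instead of one per row. The queue contents and helper names are stand-ins, not ZTO's actual code:

```python
# Sketch of batch consumption: pull up to batch_size messages at a time so
# the database sees one bulk Insert/Update per batch, not one per row.
import queue

def drain_in_batches(q, batch_size=500):
    """Collect all pending messages into batches for bulk writes."""
    batches = []
    while True:
        batch = []
        while len(batch) < batch_size:
            try:
                batch.append(q.get_nowait())
            except queue.Empty:
                break
        if not batch:
            return batches
        batches.append(batch)   # here a real consumer would bulk-insert batch

q = queue.Queue()
for i in range(1200):           # simulate 1200 buffered rows
    q.put(i)
batches = drain_in_batches(q)   # three batches: 500 + 500 + 200 rows
```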
Deyi Intelligent Technology uses the reliable queue RedisReliableQueue throughout, with about 300 queues spread across dedicated public-cloud Redis master-slave instances with 2 GB of memory each. With a backlog under 100,000 messages, a queue's dedicated Redis instance uses less than 100 MB, occupying almost no memory.
The company's business generates over 1 million orders per day, roughly 10 million messages, and not one message has ever been lost!
Example Code#
Code: https://github.com/NewLifeX/NewLife.Redis/tree/master/QueueDemo