
The content below is an AI-generated translation and may contain errors.

Messaging Technology Supporting Microservices: Redis Streams


Introduction

This article is for day 16 of the airCloset Advent Calendar 2021!
Recently, I have often been involved in integrating multiple services and designing microservices, so I would like to introduce a technology choice we made along the way.

What We Want to Achieve

Now, here is what we wanted to do.

  • Externalize the user authentication infrastructure so that users can log in with a common account across multiple services.
  • Unify the data for items that multiple services have in common.

Note: Discussion about the authentication infrastructure is omitted here.

Problems to Solve

As an approach to this, we considered moving these items out into a common API/DB, which is essentially what is known as a microservice.

Just as I was getting pumped up thinking, "Alright, let's build the API!", I had the following concerns:

  1. Performance: reading user-related data through a microservice might be slower than reading it directly from each application's DB.
  2. Risk: the data is referenced from so many places that switching all of them to API calls carries a high risk of transaction issues, gaps when identifying the affected scope, and bugs.

It was clear that identifying all data references from all applications and replacing them with microservices would be extremely risky and time-consuming, much like major surgery.
Since we couldn't secure enough time to validate and resolve these concerns, we decided to aim for a more realistic approach.

Therefore, we considered a gradual migration method to microservices.

What I came up with was a service that receives messages and, based on their contents, writes to each client application's DB (a data synchronization service).

This approach resolves the read-performance concern, but it is not a full migration to microservices: some parts are migrated while others remain in the existing system.

For example, every time another application wants to use this microservice, the data synchronization service has to be modified. I believe a microservice should know little or nothing about its client applications.

The configuration looks like this:

"Wait, we're moving common items to a shared DB, but keeping references to the client DBs...?"

Our goal this time was to keep the data consistent between each application and the microservice. Treating the redundancy as a performance benefit, we decided to update each existing DB whenever the microservice DB is updated.

Now, finally, to the main topic.

Messaging Between Services

Since we already use Redis in-house, I wondered whether Redis's publish/subscribe features could be used for this messaging.

Pub/Sub vs Streams

Redis offers two mechanisms suited to message processing: Pub/Sub and Streams.

Streams was introduced later than Pub/Sub and improves on it in several ways:

  • Pub/Sub
    • Messages are not stored, so you cannot receive messages that were sent while you were not subscribed.
      • A published message is discarded right away, whether or not any subscriber received it.
  • Streams
    • Messages remain in the stream even after they have been read.
      • Consumers can read messages later, at their own pace.
      • Even if a subscriber goes down, it can pick up the messages it missed once it recovers.
    • Because messages stay in memory, you need to keep an eye on memory usage (for example, by capping the stream length with MAXLEN).
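To make the difference concrete, here is a toy in-memory model written only for this explanation. It is not Redis and not a Redis client, and the class names ToyPubSub and ToyStream are hypothetical. A Pub/Sub message reaches only whoever is subscribed at that moment, while a stream keeps every entry so a reader that was down can catch up from an earlier position.

```javascript
// Toy in-memory model (NOT Redis) contrasting the two delivery styles.

// Pub/Sub: a message goes to whoever is subscribed right now, then is gone.
class ToyPubSub {
  constructor() {
    this.subscribers = new Set();
  }

  subscribe(handler) {
    this.subscribers.add(handler);
  }

  publish(message) {
    for (const handler of this.subscribers) handler(message);
    return this.subscribers.size; // like PUBLISH: how many subscribers got it
  }
}

// Streams: every message is appended to a log that can be read again later.
class ToyStream {
  constructor() {
    this.entries = [];
  }

  add(message) {
    this.entries.push(message);
  }

  readFrom(position) {
    return this.entries.slice(position);
  }
}

// A subscriber that was not listening yet misses the first message...
const pubsub = new ToyPubSub();
const deliveredToNobody = pubsub.publish("sent while nobody was subscribed");
const received = [];
pubsub.subscribe((message) => received.push(message));
pubsub.publish("sent after subscribing");

// ...whereas a stream reader can start from the beginning after recovering.
const stream = new ToyStream();
stream.add("sent while the consumer was down");
stream.add("sent later");
const caughtUp = stream.readFrom(0);
```

Here received holds only the second message, while caughtUp holds both stream entries.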

In conclusion, we adopted Streams because it met several of our requirements.

Pub / Sub

The commands in redis-cli are very simple.

127.0.0.1:36379> SUBSCRIBE channel [channel ...]
127.0.0.1:36379> PUBLISH channel message

Reference: https://redis.io/topics/pubsub

Streams

Streams followed Pub/Sub and became available starting with Redis v5.

Use XADD to append a message to a stream, and XREAD to read messages from it.

127.0.0.1:36379> XADD key [MAXLEN [~] count] <* | ID> field value [field value ...]
127.0.0.1:36379> XREAD [COUNT count] [BLOCK milliseconds] STREAMS key [key ...] ID [ID ...]

As a test, let's add an entry to a stream named a-stream, with the field message set to hello, while capping the stream at 10 entries with MAXLEN (* tells Redis to generate the ID automatically):

127.0.0.1:36379> XADD a-stream MAXLEN 10 * message hello
"1639666330803-0"

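The returned ID has the form <milliseconds>-<sequence>. The sketch below is a simplified, hypothetical model (ToyCappedStream is not a real API) of what * and an exact MAXLEN do: generate IDs that never go backwards, bump the sequence number when two entries share a millisecond, and drop the oldest entries beyond the cap. The clock is injectable so the example is deterministic. The last lines decode the ID above into its UTC timestamp.

```javascript
// Toy model (NOT Redis) of `XADD key MAXLEN n * field value ...`.
class ToyCappedStream {
  constructor(maxlen, clock = () => Date.now()) {
    this.maxlen = maxlen; // like exact MAXLEN: keep at most this many entries
    this.clock = clock;   // injectable so the example is deterministic
    this.entries = [];
    this.lastMs = 0;
    this.lastSeq = -1;
  }

  // Equivalent of `*`: <ms>-<seq>, never going backwards in time.
  nextId() {
    const now = this.clock();
    if (now > this.lastMs) {
      this.lastMs = now;
      this.lastSeq = 0;
    } else {
      this.lastSeq += 1; // same (or earlier) millisecond: bump the sequence
    }
    return `${this.lastMs}-${this.lastSeq}`;
  }

  add(fields) {
    const id = this.nextId();
    this.entries.push({ id, fields });
    // Exact trimming: drop the oldest entries beyond the cap.
    while (this.entries.length > this.maxlen) this.entries.shift();
    return id;
  }
}

let fakeNow = 1000;
const demo = new ToyCappedStream(2, () => fakeNow);
const idA = demo.add({ message: "a" });
const idB = demo.add({ message: "b" }); // same millisecond as "a"
fakeNow = 1001;
const idC = demo.add({ message: "c" }); // pushes "a" out of the capped stream

// The millisecond part of a real ID is a Unix timestamp:
const [ms] = "1639666330803-0".split("-");
const createdAt = new Date(Number(ms)).toISOString();
```

Here idA, idB and idC are "1000-0", "1000-1" and "1001-0", only "b" and "c" remain, and createdAt is "2021-12-16T14:52:10.803Z". Redis also accepts MAXLEN ~ for approximate trimming, which is cheaper than the exact trimming modeled here.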
To read this queue, use XREAD.

127.0.0.1:36379> XREAD STREAMS a-stream 0
1) 1) "a-stream"
   2) 1) 1) "1639666330803-0"
         2) 1) "message"
            2) "hello"

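The form is STREAMS <stream-name> <ID>. XREAD returns only entries whose IDs are greater than the given ID, so specifying 0 reads from the beginning, while passing the ID of an entry you already received returns only the entries added after it.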
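The comparison XREAD applies can be sketched as follows. This is a toy model rather than Redis code, and parseId, compareIds and entriesAfter are hypothetical helpers. It compares the millisecond part first and the sequence part second, using BigInt because the sequence is a 64-bit number. Comparing IDs as plain strings would be wrong, since "1-10" sorts before "1-9" lexicographically.

```javascript
// Toy model (NOT Redis): XREAD returns entries whose ID is strictly greater.
function parseId(id) {
  // "0" is shorthand for "0-0"; a missing sequence is treated as 0 in this model.
  const [ms, seq = "0"] = id.split("-");
  return [BigInt(ms), BigInt(seq)];
}

function compareIds(a, b) {
  const [aMs, aSeq] = parseId(a);
  const [bMs, bSeq] = parseId(b);
  if (aMs !== bMs) return aMs < bMs ? -1 : 1;
  if (aSeq !== bSeq) return aSeq < bSeq ? -1 : 1;
  return 0;
}

function entriesAfter(entries, id) {
  return entries.filter((entry) => compareIds(entry.id, id) > 0);
}

const entries = [
  { id: "1639666330803-0", fields: { message: "hello" } },
  { id: "1639666566585-0", fields: { message: "world" } },
];
const fromStart = entriesAfter(entries, "0");               // both entries
const afterHello = entriesAfter(entries, "1639666330803-0"); // only "world"
```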

127.0.0.1:36379> XADD a-stream MAXLEN 10 * message world
"1639666566585-0"

127.0.0.1:36379> XREAD STREAMS a-stream 0
1) 1) "a-stream"
   2) 1) 1) "1639666330803-0"
         2) 1) "message"
            2) "hello"
      2) 1) "1639666566585-0"
         2) 1) "message"
            2) "world"

127.0.0.1:36379> XREAD STREAMS a-stream 1639666330803-0
1) 1) "a-stream"
   2) 1) 1) "1639666566585-0"
         2) 1) "message"
            2) "world"
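In application code, a client library such as ioredis returns this same nested structure as arrays (assuming its default reply format, which mirrors the redis-cli output above). The hypothetical helper below flattens it into one object per entry, which is usually easier to work with than the raw [field, value, ...] lists.

```javascript
// Flattens an XREAD / XREADGROUP reply of the shape
// [[stream, [[id, [field, value, ...]], ...]], ...] into plain objects.
// A null reply (what a BLOCK timeout yields) becomes an empty array.
function flattenStreamReply(reply) {
  const result = [];
  for (const [stream, streamEntries] of reply ?? []) {
    for (const [id, fieldList] of streamEntries) {
      const pairs = fieldList ?? []; // guard against entries without fields
      const fields = {};
      // Field/value pairs arrive as one flat array: [f1, v1, f2, v2, ...]
      for (let i = 0; i < pairs.length; i += 2) {
        fields[pairs[i]] = pairs[i + 1];
      }
      result.push({ stream, id, fields });
    }
  }
  return result;
}

// The reply of the second XREAD above, written as a JavaScript value:
const reply = [
  ["a-stream", [
    ["1639666330803-0", ["message", "hello"]],
    ["1639666566585-0", ["message", "world"]],
  ]],
];
const flat = flattenStreamReply(reply);
```

Each element of flat then looks like { stream: "a-stream", id: "1639666566585-0", fields: { message: "world" } }.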

To wait for new messages the way a Pub/Sub subscriber does, use BLOCK (a timeout of 0 means wait indefinitely):

XREAD BLOCK 0 STREAMS a-stream $

The $ here is a special ID meaning "the newest ID already in the stream", so only messages added after the XREAD command starts are returned.

As soon as a single XADD delivers a message, the command returns and stops waiting, so it has to be issued again to keep listening.

As mentioned in this article, you can get queue-like behavior with code that waits with $ on the first call and passes the ID of the last received message on every call after that.

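import Redis from "ioredis";

// Connection settings are an assumption (localhost:6379 by default); adjust as needed.
const redis = new Redis();

async function consume(startId = "$") {
  // "$" on the first call: only receive messages added after we start waiting.
  let nextId = startId;

  // Loop instead of recursing, so unresolved promises do not pile up over time.
  while (true) {
    const res = await redis.xread(
      "BLOCK",
      "0", // wait indefinitely
      "COUNT",
      "20", // at most 20 messages per call
      "STREAMS",
      "stream-sample",
      nextId
    );
    if (!res) continue; // null would mean a timeout; cannot happen with BLOCK 0

    for (const [, entries] of res) {
      for (const [id, fields] of entries) {
        nextId = id; // the next call resumes right after the last received ID
        // Some processing (fields is a flat [field, value, ...] array)
      }
    }
  }
}

consume().catch(console.error);
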
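Why pass the last received ID instead of $ every time? The toy simulation below is plain JavaScript, not Redis; simulate is a hypothetical helper and IDs are simplified to 1, 2, 3. It models $ as being resolved to the newest ID when each call starts. A message that arrives while the previous batch is being processed is skipped if the next call uses $ again, but not if it uses the last received ID.

```javascript
// Toy simulation (NOT Redis): IDs are simplified to 1, 2, 3, ...
// `chooseNextId` decides which ID the second XREAD call receives.
function simulate(chooseNextId) {
  const log = [];
  const add = () => log.push(log.length + 1); // append the next ID
  const newestId = () => log.length;          // 0 when the stream is empty

  // A blocking XREAD: "$" is resolved when the call starts, and the call
  // later returns every entry newer than the resolved ID.
  const resolve = (id) => (id === "$" ? newestId() : id);
  const newerThan = (after) => log.filter((id) => id > after);

  const received = [];

  let after = resolve("$"); // first call: only messages added from now on
  add();                    // message 1 arrives and wakes up the first call
  const batch = newerThan(after);
  received.push(...batch);

  add();                    // message 2 arrives while batch 1 is being processed

  after = resolve(chooseNextId(batch)); // the second call starts here
  add();                    // message 3 arrives and wakes up the second call
  received.push(...newerThan(after));

  return received;
}

// Re-sending "$" resolves to 2 on the second call, so message 2 is lost.
const withDollar = simulate(() => "$");
// Passing the last received ID (1) delivers message 2 as well.
const withLastId = simulate((batch) => batch[batch.length - 1]);
```

withDollar ends up as [1, 3] and withLastId as [1, 2, 3].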
Unread / Read Management

So far we have sent messages with XADD and received them with XREAD. But how does this behave when several processes are running?

As shown here, every process running XREAD is subscribed at the same time and receives every message, so the same message is processed once per process and the work cannot simply be split across processes.

To solve this, we use the XGROUP and XREADGROUP commands (consumer groups).

The difference from XREAD is that you first need to create a group using XGROUP.

127.0.0.1:36379> XGROUP CREATE a-stream group1 $
OK

The following command makes Alice, a consumer in group1, read messages that have not yet been delivered to any other consumer in the group.

XREADGROUP GROUP group1 Alice STREAMS a-stream >

The special ID > means "messages that have never been delivered to any consumer in this group".

If you run XADD a few times and then XREADGROUP, it retrieves all the unread messages; running XREADGROUP a second time returns none, because no undelivered messages are left.
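The bookkeeping behind > can be sketched with another toy model (not Redis; ToyConsumerGroup and its methods are hypothetical). The group remembers how far it has handed out entries, so each entry goes to exactly one consumer, and delivered entries stay pending until they are acknowledged, which is what the XACK command does in Redis.

```javascript
// Toy model (NOT Redis) of a consumer group reading with ">".
class ToyConsumerGroup {
  constructor(entries) {
    this.entries = entries;   // the stream, oldest first
    this.lastDelivered = -1;  // index of the last entry handed to any consumer
    this.pending = new Map(); // id -> consumer name, until acknowledged
  }

  // Like XREADGROUP GROUP g <consumer> COUNT n STREAMS key >:
  // only entries never delivered to any consumer in this group.
  readNew(consumer, count = Infinity) {
    const start = this.lastDelivered + 1;
    const batch = this.entries.slice(start, start + count);
    this.lastDelivered += batch.length;
    for (const entry of batch) this.pending.set(entry.id, consumer);
    return batch;
  }

  // Like XACK: returns 1 if the entry was pending, 0 otherwise.
  ack(id) {
    return this.pending.delete(id) ? 1 : 0;
  }
}

const group = new ToyConsumerGroup([
  { id: "1-0", fields: { message: "a" } },
  { id: "2-0", fields: { message: "b" } },
  { id: "3-0", fields: { message: "c" } },
]);
const alice = group.readNew("Alice", 2); // "1-0" and "2-0"
const bob = group.readNew("Bob");        // only "3-0": nothing is delivered twice
const again = group.readNew("Alice");    // empty: no undelivered entries left
```

This is how several processes sharing one group split the work instead of each receiving every message.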

Combining this with the logic mentioned earlier, it looks like the following:

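import Redis from 'ioredis';

const redis = new Redis(); // connection settings are an assumption; adjust as needed

export async function subscribeStream() {
  // Create a group to use xreadgroup. Any error is ignored here; typically it is
  // BUSYGROUP, which Redis returns when the group already exists.
  await redis.xgroup('CREATE', 'a-stream', 'group1', '$', 'MKSTREAM').catch(() => {});

  while (true) {
    // Subscribe only to unread messages using xreadgroup (up to 10 per call)
    const reply = await redis.xreadgroup('GROUP', 'group1', 'consumer', 'BLOCK', '0', 'COUNT', '10', 'STREAMS', 'a-stream', '>');
    if (!reply) continue;

    for (const [, val] of reply) {
      for (const [id, messages] of val) {
        // Some processing
        // Acknowledge the message so it leaves the group's pending entries list
        await redis.xack('a-stream', 'group1', id);
      }
    }
  }
}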

MKSTREAM is an option for XGROUP CREATE that automatically creates the stream if it does not exist yet; without it, creating a group on a missing stream returns an error.

I added it as a bit of a "magic spell" to avoid errors, so you probably don't need to think about it much.
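If you would rather not swallow every error when creating the group, one option is to ignore only the error Redis returns when the group already exists, whose message starts with BUSYGROUP. The sketch below assumes that error text; ensureGroup is a hypothetical helper that takes the create call as a function, so it can be exercised without a Redis server.

```javascript
// Sketch: create the consumer group, but ignore only the "already exists" error.
// `createGroup` is a hypothetical stand-in for a call such as
// () => redis.xgroup("CREATE", "a-stream", "group1", "$", "MKSTREAM")
async function ensureGroup(createGroup) {
  try {
    await createGroup();
    return "created";
  } catch (err) {
    // Redis replies "BUSYGROUP Consumer Group name already exists" in this case.
    if (String(err?.message ?? "").startsWith("BUSYGROUP")) return "exists";
    throw err; // anything else (connection problems, wrong key type, ...) is real
  }
}
```

Unlike .catch(() => {}), this still surfaces unexpected failures at startup.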


Unlike the intuitive Pub/Sub, Streams is feature-rich but uses special IDs such as $ and >, so the best way to learn it is by actually trying it out.

Please give it a try if you are interested!


