The Problem Statement
The goal of Redis streaming here is to have multiple consumers processing incoming messages simultaneously.
The current system uses Redis Streams to process incoming messages quickly and must be able to scale its processing power through concurrency within specific time frames (e.g., by duplicating consumer pods). The processing order of incoming messages was ignored in order to push POC releases.
As the business model grew, we encountered a case where two or more messages that must be processed one after another were picked up by different consumers and processed at the same time, resulting in data conflicts. The only restriction is that we must use the current infrastructure and Redis, without introducing a new event messaging platform (e.g., Kafka).
The master, consumers pattern
In the master, consumers pattern, a master acts as a traffic-control proxy that decides which consumer each incoming message is delegated to.
With Redis Streams consumer groups, only consumers in the group can pick up incoming messages from the stream, read them, and then acknowledge them as completed.
To implement the pattern, the master is a glorified consumer. The master is the first to call XREADGROUP with the > ID on incoming messages; it reads each message and then chooses which consumer to delegate it to using XCLAIM. Because consumers never read with >, they do not receive new messages directly. Finally, consumers call XREADGROUP with ID 0 to process the messages delegated to them.
The master must call XREADGROUP itself for two reasons:
- The message needs to be read before the master knows which consumer to deliver it to.
- To XCLAIM a message so that other consumers cannot read it, the message must be in the PEL (Pending Entries List). A message only enters the PEL after a consumer reads it with XREADGROUP.
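The master side of the pattern can be sketched as below. This is a minimal illustration, not a definitive implementation: it assumes `client` is a redis-py style client (for example `redis.Redis(decode_responses=True)` with the default RESP2 reply shape), and `delegate_new_messages` and `pick_consumer` are hypothetical names introduced here.

```python
from typing import Callable

STREAM = "ticket_stream"
GROUP = "concurrency_stream_group"
MASTER = "master"


def delegate_new_messages(client, pick_consumer: Callable[[dict], str]) -> list[tuple[str, str]]:
    """Read new entries as the master, then hand each one to a consumer."""
    # Equivalent to: XREADGROUP GROUP <group> master COUNT 10 STREAMS ticket_stream >
    # Reading places the entries in the PEL under the master's name.
    response = client.xreadgroup(GROUP, MASTER, {STREAM: ">"}, count=10)
    delegated = []
    for _stream, entries in response or []:
        for message_id, fields in entries:
            target = pick_consumer(fields)
            # XCLAIM transfers ownership of the pending entry to the target.
            # min_idle_time=0 claims it immediately.
            client.xclaim(STREAM, GROUP, target, min_idle_time=0, message_ids=[message_id])
            delegated.append((message_id, target))
    return delegated
```

The consumer group must already exist (for example via XGROUP CREATE) before the master reads from it.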
Example implementation
We will be handling messages containing a ticket_id. Messages with the same ticket_id must be handled in order, one by one.
This will also include saving the current session and failure recovery for pending messages.
Redis configs
- ticket_stream: business services XADD messages to this stream. The master then delegates messages from this stream to consumers.
- concurrency_stream_group: the consumer group.
Redis objects for the master:
- tickets: a key/value map of the ticket_ids being processed:
  - ticket_id: the key, with data:
    - consumer_name: the consumer currently processing the ticket_id.
    - message_ids: the list of IDs of in-progress messages with the same ticket_id.
- consumers: a key/value map of consumers:
  - consumer_name: the key, with data:
    - healthURL: the URL the master calls to check whether the consumer is alive.
    - ticket_ids: the list of ticket IDs being processed.
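One possible shape for these two objects is sketched below. The storage format is an assumption: the article does not say how the maps are persisted, so this sketch serializes each value to JSON, which could then be written to a Redis hash field keyed by ticket_id or consumer_name. The class and function names are hypothetical, and `health_url` stands in for healthURL.

```python
import json
from dataclasses import dataclass, field, asdict


@dataclass
class TicketState:
    """Value stored under a ticket_id key in the tickets map."""
    consumer_name: str
    message_ids: list[str] = field(default_factory=list)


@dataclass
class ConsumerState:
    """Value stored under a consumer_name key in the consumers map."""
    health_url: str
    ticket_ids: list[str] = field(default_factory=list)


def dump_state(state) -> str:
    # Serialize either state object to a JSON string (assumed storage format).
    return json.dumps(asdict(state))


def load_ticket(raw: str) -> TicketState:
    return TicketState(**json.loads(raw))


def load_consumer(raw: str) -> ConsumerState:
    return ConsumerState(**json.loads(raw))
```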
Master consumer service
An API service with access to Redis client.
Configs:
- ticket_stream.
- concurrency_stream_group.
- master: the master's consumer name, used to call XREADGROUP.
Endpoints:
- api/consumers/register: consumers call this to register themselves with the master, which updates consumers with the healthURL.
- api/messages/acknowledge: consumers call this to report that a message was consumed successfully. The master updates tickets and consumers to remove the completed message.
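The state changes behind these two endpoints might look like the sketch below, kept free of any web framework. It works on plain in-memory dicts shaped like the tickets and consumers maps; the function names are hypothetical, and releasing a ticket once its last message is acknowledged is an assumption rather than something the design states.

```python
def register_consumer(consumers: dict, consumer_name: str, health_url: str) -> None:
    """Handler body for api/consumers/register."""
    entry = consumers.setdefault(consumer_name, {"healthURL": health_url, "ticket_ids": []})
    # A re-registering consumer keeps its ticket_ids but may report a new URL.
    entry["healthURL"] = health_url


def acknowledge_message(tickets: dict, consumers: dict, ticket_id: str, message_id: str) -> None:
    """Handler body for api/messages/acknowledge."""
    ticket = tickets.get(ticket_id)
    if ticket is None or message_id not in ticket["message_ids"]:
        return  # unknown or already acknowledged: nothing to do
    ticket["message_ids"].remove(message_id)
    if not ticket["message_ids"]:
        # Assumption: with no messages in flight, the ticket is released so a
        # later message for it can go to any consumer.
        del tickets[ticket_id]
        owner = consumers.get(ticket["consumer_name"])
        if owner and ticket_id in owner["ticket_ids"]:
            owner["ticket_ids"].remove(ticket_id)
```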
Delegate Flow:
- First, the master reloads the last session from the tickets and consumers objects in Redis.
- Delegate incoming messages from ticket_stream to consumers:
  - Call XREADGROUP GROUP concurrency_stream_group master STREAMS ticket_stream > to get incoming messages under the master consumer, then parse them for ticket_id.
  - Check tickets for the consumer currently processing that ticket_id. If there is none, choose a random consumer from consumers.
  - The master calls XCLAIM to delegate the message to the selected consumer.
  - Update tickets and consumers with the ticket_id and message_id.
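The selection and bookkeeping steps of this flow can be sketched on in-memory dicts shaped like the tickets and consumers maps. The names `choose_consumer` and `record_delegation` are hypothetical, and the `rng` parameter exists only to make the random choice reproducible in tests.

```python
import random


def choose_consumer(tickets: dict, consumers: dict, ticket_id: str, rng=random) -> str:
    """Stick to the consumer already handling this ticket, else pick one at random."""
    ticket = tickets.get(ticket_id)
    if ticket is not None:
        return ticket["consumer_name"]
    if not consumers:
        raise RuntimeError("no registered consumers")
    return rng.choice(sorted(consumers))


def record_delegation(tickets: dict, consumers: dict, ticket_id: str,
                      message_id: str, consumer_name: str) -> None:
    """Update both maps after the message has been XCLAIMed for consumer_name."""
    ticket = tickets.setdefault(ticket_id, {"consumer_name": consumer_name, "message_ids": []})
    ticket["message_ids"].append(message_id)
    owned = consumers[consumer_name]["ticket_ids"]
    if ticket_id not in owned:
        owned.append(ticket_id)
```

Keeping every message of a ticket on one consumer is what serializes processing per ticket_id: that consumer works through its pending entries one at a time.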
Failure recovery flow:
- Offline consumer recovery, run as a cronjob (e.g., a thread with sleep):
  - Ping each consumer in consumers (through its healthURL) as a health check.
  - If the call to healthURL fails, read that consumer's ticket_ids to find the ticket_ids it was processing.
  - For each ticket_id, look up its in-progress message_ids in tickets, then delegate those messages to another consumer.
- Idle message recovery with XAUTOCLAIM, run as a cronjob (e.g., a thread with sleep), for messages that are in the PEL of ticket_stream but do not exist in the tickets map:
  - Call XAUTOCLAIM to get messages that have been idle longer than min_idle_time.
  - Parse the messages for ticket_id.
  - Delegate the messages to consumers:
    - If the ticket_id exists in the tickets map, check the health of the consumer processing it:
      - If the consumer is alive, delegate the message to that consumer.
      - If the consumer is disconnected, delegate the message and the rest of the messages for that ticket_id to another consumer.
    - If the ticket_id does not exist, delegate the message to a random consumer.
  - Update tickets and consumers with the ticket_id and message_id.
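Both recovery jobs can be sketched as follows, assuming a redis-py style `client` and in-memory dicts shaped like the tickets and consumers maps. The function names and the `is_alive` callback (which would wrap an HTTP call to healthURL) are hypothetical. Taking the first live consumer as the replacement and leaving the offline consumer registered are simplifying assumptions.

```python
STREAM = "ticket_stream"
GROUP = "concurrency_stream_group"


def recover_offline_consumers(client, tickets: dict, consumers: dict, is_alive) -> list:
    """Move every ticket owned by an unreachable consumer to a live one."""
    alive = [name for name, c in consumers.items() if is_alive(c["healthURL"])]
    moved = []
    for name, state in consumers.items():
        if name in alive or not alive:
            continue  # healthy consumer, or nobody available to take over
        target = alive[0]  # assumption: simplest possible replacement choice
        for ticket_id in state["ticket_ids"]:
            ticket = tickets.get(ticket_id)
            if ticket is None:
                continue
            if ticket["message_ids"]:
                # Re-delegate all in-progress messages of the ticket together.
                client.xclaim(STREAM, GROUP, target, min_idle_time=0,
                              message_ids=ticket["message_ids"])
            ticket["consumer_name"] = target
            consumers[target]["ticket_ids"].append(ticket_id)
            moved.append((ticket_id, target))
        state["ticket_ids"] = []
    return moved


def claim_idle_messages(client, min_idle_time_ms: int, count: int = 100) -> list:
    """Claim long-idle pending entries back to the master for re-delegation."""
    response = client.xautoclaim(STREAM, GROUP, "master", min_idle_time_ms,
                                 start_id="0-0", count=count)
    # Assumption: the second element of the reply holds the claimed
    # (message_id, fields) pairs, as redis-py returns them.
    return response[1]
```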
Delegated consumers
An API service with access to Redis client.
Configs:
- ticket_stream.
- concurrency_stream_group.
- consumer: the consumer name, used to call XREADGROUP and XACK.
- master_register_api_url: used to register the consumer with the master.
- master_acknowledge_api_url: used to notify the master that a message has been processed.
Endpoints:
- api/health: used to check whether the consumer is alive.
Flow:
- First, the consumer calls master_register_api_url to register itself with the master, passing its consumer name and its api/health URL.
- Call XREADGROUP GROUP concurrency_stream_group consumer STREAMS ticket_stream 0 to get the messages delegated to it.
- Process each message according to business requirements.
- After processing, call XACK to mark the message as completed and remove it from the PEL.
- Call master_acknowledge_api_url to notify the master that the consumer has finished processing the message.
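One pass of the consumer flow could be sketched as below, assuming a redis-py style `client` with the default RESP2 reply shape. `process_delegated`, `handle`, and `notify_master` are hypothetical names: `handle` stands for the business logic and `notify_master` for an HTTP call to master_acknowledge_api_url.

```python
STREAM = "ticket_stream"
GROUP = "concurrency_stream_group"


def process_delegated(client, consumer_name: str, handle, notify_master) -> list:
    """Process every message currently pending for this consumer, in stream order."""
    # Equivalent to: XREADGROUP GROUP <group> <consumer> STREAMS ticket_stream 0
    # ID 0 returns this consumer's pending entries, i.e. what the master XCLAIMed for it.
    response = client.xreadgroup(GROUP, consumer_name, {STREAM: "0"})
    done = []
    for _stream, entries in response or []:
        for message_id, fields in entries:
            handle(fields)
            # XACK removes the entry from the PEL.
            client.xack(STREAM, GROUP, message_id)
            notify_master(fields.get("ticket_id"), message_id)
            done.append(message_id)
    return done
```

A real consumer would call this in a loop with a short sleep between passes. If `handle` raises, the message stays in the PEL and is returned again on the next pass.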