The notification emails stopped sending at 02:14. Not a crash. The Redis Pub/Sub channel still showed one subscriber. But the subscriber process had silently restarted during a deployment, and Pub/Sub delivered exactly zero messages to the connection that was briefly missing. By the time the team noticed at 08:45, 4,200 onboarding emails had evaporated.
This is not a Redis bug. This is Redis Pub/Sub working exactly as designed. PUBLISH sends a message to every currently connected subscriber. If nobody is listening, the message is gone forever. No queue. No persistence. No replay. This makes Pub/Sub fine for ephemeral notifications (live dashboards, chat typing indicators) and catastrophically wrong for anything you cannot afford to lose.
Redis Streams, introduced in Redis 5.0, fixes this. It is an append-only log with consumer groups, acknowledgment tracking, and configurable retention. It gives you the durability of a message queue without running Kafka, and it fits in a Node.js app with about 150 lines of careful code.
This post covers the three patterns that turn Redis Streams from a “fancy list” into a production-grade message processor: consumer groups that survive crashes, dead-letter handling for poison messages, and backpressure that keeps your Redis memory from filling up when your workers slow down.
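A consumer group is a named cursor over a stream that tracks which messages were delivered to which consumer and which of those have been acknowledged. When a consumer crashes before acknowledging a message, the group knows: the message stays in the group's pending entries list (PEL). Redis does not hand those abandoned messages to a new consumer on its own, but any consumer in the group can explicitly claim them, which is what the recovery loop later in this post does.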
Here is the minimal producer:
import { createClient } from 'redis';
const redis = createClient({ url: process.env.REDIS_URL });
await redis.connect();
async function enqueue(email) {
  await redis.xAdd('notifications:email', '*', {
    to: email.to,
    subject: email.subject,
    body: email.body,
    created_at: Date.now().toString(),
  });
}
The * tells Redis to auto-generate the stream ID: a millisecond timestamp plus a sequence number, as in 1718234512000-0. You can supply your own ID instead, but it must be greater than the last ID in the stream or XADD rejects it, so * is the right default.
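Because every auto-generated ID has the form <milliseconds>-<sequence>, IDs can be parsed and ordered without a round trip to Redis. A small sketch; parseStreamId and compareStreamIds are hypothetical helpers, not part of node-redis:

```javascript
// Hypothetical helpers (not part of node-redis): parse and order complete
// stream IDs of the form "<millisecondsTime>-<sequenceNumber>".
function parseStreamId(id) {
  const match = /^(\d+)-(\d+)$/.exec(id);
  if (!match) throw new Error(`Not a complete stream ID: ${id}`);
  // BigInt avoids precision loss: both parts are 64-bit integers in Redis.
  return { ms: BigInt(match[1]), seq: BigInt(match[2]) };
}

// Returns -1, 0, or 1, so it can be passed straight to Array.prototype.sort.
function compareStreamIds(a, b) {
  const x = parseStreamId(a);
  const y = parseStreamId(b);
  if (x.ms !== y.ms) return x.ms < y.ms ? -1 : 1;
  if (x.seq !== y.seq) return x.seq < y.seq ? -1 : 1;
  return 0;
}

// Entries added in the same millisecond differ only by sequence number.
console.log(compareStreamIds('1718234512000-0', '1718234512000-1')); // -1
console.log(Number(parseStreamId('1718234512000-3').ms)); // 1718234512000
```

Plain string comparison is not a substitute: '10-0' sorts before '9-0' lexicographically even though it is the later ID.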
And the consumer that will not lose messages:
const GROUP = 'email-sender-group';
const CONSUMER = `worker-${process.pid}`;
async function startConsumer() {
  const redis = createClient({ url: process.env.REDIS_URL });
  await redis.connect();
  // Create the group if it does not exist.
  // MKSTREAM creates the stream automatically.
  try {
    await redis.xGroupCreate('notifications:email', GROUP, '0', {
      MKSTREAM: true,
    });
  } catch (err) {
    // BUSYGROUP means the group already exists. That is fine.
    if (!err.message.includes('BUSYGROUP')) throw err;
  }
  console.log(`Consumer ${CONSUMER} listening...`);
  while (true) {
    try {
      // XREADGROUP reads messages assigned to this consumer.
      // Block for up to 5 seconds if no messages are available.
      const results = await redis.xReadGroup(
        GROUP,
        CONSUMER,
        [{ key: 'notifications:email', id: '>' }],
        { COUNT: 10, BLOCK: 5000 }
      );
      if (!results || results.length === 0) continue;
      for (const { messages } of results) {
        for (const msg of messages) {
          await processMessage(msg);
          // Acknowledge after successful processing.
          await redis.xAck('notifications:email', GROUP, msg.id);
        }
      }
    } catch (err) {
      console.error(`Consumer error:`, err);
      // Wait before retrying to avoid tight error loops.
      await sleep(1000);
    }
  }
}
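The critical detail is id: '>'. The > character is a special ID that tells Redis "give me messages that have never been delivered to any consumer in this group." If you pass a concrete ID such as 0 instead, XREADGROUP returns this consumer's own pending entries (messages already delivered to it but not yet acknowledged). That is how a restarted worker that keeps the same consumer name recovers its own backlog. Because CONSUMER here includes process.pid, a restarted worker gets a new name, so its predecessor's pending entries can only be recovered by claiming them.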
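That recovery path only works if the consumer name survives a restart. A minimal sketch of a startup step that works through this consumer's own backlog before switching to '>', assuming the name comes from something stable (for example a hypothetical WORKER_NAME environment variable) and a node-redis v4 style client. drainOwnPending and its handle callback are hypothetical:

```javascript
// Sketch, assuming a node-redis v4 style client: xReadGroup(group, consumer,
// streams, options) resolves to [{ name, messages: [{ id, message }] }] or null,
// and xAck(key, group, id) acknowledges one entry. `handle` is a hypothetical
// per-message callback; if it throws, draining stops and the rest stay pending.
async function drainOwnPending(client, { stream, group, consumer, handle, batch = 100 }) {
  let lastId = '0'; // a concrete ID (not '>') selects this consumer's pending entries
  let drained = 0;
  for (;;) {
    const res = await client.xReadGroup(
      group,
      consumer,
      [{ key: stream, id: lastId }],
      { COUNT: batch }
    );
    const messages = res?.[0]?.messages ?? [];
    if (messages.length === 0) return drained; // no pending entries left for us
    for (const msg of messages) {
      await handle(msg);
      await client.xAck(stream, group, msg.id);
      drained += 1;
    }
    // Continue after the last ID seen so each pending entry is visited once.
    lastId = messages[messages.length - 1].id;
  }
}
```

Call it once at startup, then enter the normal '>' loop. With pid-based names this step finds nothing, and only a claim loop can rescue a dead worker's entries.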
The xAck call is the commit. Until you acknowledge a message, Redis keeps it in the group's pending entries list. Redis does not reassign it by itself when your worker disconnects or stalls; another consumer has to claim it explicitly with XCLAIM or XAUTOCLAIM. Together with such a claim loop, this is the at-least-once guarantee: every message gets processed by at least one consumer, even if the original consumer dies halfway through, and occasionally by more than one, so processing should be safe to repeat.
Here is where the pattern pays for itself. Run this query on a stream that has been processing for a while:
redis-cli XPENDING notifications:email email-sender-group
You will see output like this:
1) (integer) 12            # total pending messages
2) "1718234512000-0"       # oldest pending ID
3) "1718234522000-0"       # newest pending ID
4) 1) 1) "worker-1234"     # pending count per consumer
      2) "11"
   2) 1) "worker-5678"
      2) "1"
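Each pending message has a consumer name, a delivery count, and an idle time, but the summary form above only shows counts per consumer. For per-message details, use the extended form: XPENDING notifications:email email-sender-group - + 10. If worker-1234 has 11 pending messages with idle times over 30 seconds, that worker is probably dead. Redis does not auto-reclaim these messages. You must claim them explicitly, typically from a recovery loop that runs inside your live consumers.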
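A sketch of that judgment call, assuming entries shaped like node-redis v4's xPendingRange() results (for example from redis.xPendingRange('notifications:email', GROUP, '-', '+', 100)). findStaleConsumers is a hypothetical helper, and idle time only measures time since the last delivery, so a live but very slow worker can look stale too:

```javascript
// Assumed entry shape (node-redis v4 xPendingRange):
// { id, owner, millisecondsSinceLastDelivery, deliveriesCounter }
function findStaleConsumers(entries, minIdleMs) {
  const byOwner = new Map();
  for (const e of entries) {
    const s = byOwner.get(e.owner) ??
      { owner: e.owner, count: 0, minIdle: Infinity, maxDeliveries: 0 };
    s.count += 1;
    s.minIdle = Math.min(s.minIdle, e.millisecondsSinceLastDelivery);
    s.maxDeliveries = Math.max(s.maxDeliveries, e.deliveriesCounter);
    byOwner.set(e.owner, s);
  }
  // Flag owners whose most recently delivered entry is still older than the
  // threshold: nothing has been handed to them (or claimed by them) lately.
  return [...byOwner.values()].filter((s) => s.minIdle > minIdleMs);
}

// Illustrative input, not real output.
const entries = [
  { id: '1718234512000-0', owner: 'worker-1234', millisecondsSinceLastDelivery: 45_000, deliveriesCounter: 1 },
  { id: '1718234513000-0', owner: 'worker-1234', millisecondsSinceLastDelivery: 44_000, deliveriesCounter: 2 },
  { id: '1718234522000-0', owner: 'worker-5678', millisecondsSinceLastDelivery: 1_200, deliveriesCounter: 1 },
];
console.log(findStaleConsumers(entries, 30_000).map((s) => s.owner)); // [ 'worker-1234' ]
```

The maxDeliveries field is there because a stale owner holding entries with high delivery counts often points at a poison message rather than a dead process.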
Here is the recovery loop you run alongside your main consumer:
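async function claimStaleMessages() {
  const redis = createClient({ url: process.env.REDIS_URL });
  await redis.connect();
  const BATCH_SIZE = 100;
  const MIN_IDLE_MS = 30_000; // 30 seconds without an ack = presumed dead
  while (true) {
    await sleep(10_000); // Check every 10 seconds
    try {
      // XPENDING returns summary-level data.
      // XAUTOCLAIM does the actual re-assignment, one page per call.
      let cursor = '0-0';
      do {
        const { nextId, messages } = await redis.xAutoClaim(
          'notifications:email',
          GROUP,
          CONSUMER,
          MIN_IDLE_MS,
          cursor,
          { COUNT: BATCH_SIZE }
        );
        cursor = nextId; // '0-0' means the scan is complete
        for (const msg of messages) {
          if (!msg) continue; // entry no longer exists in the stream
          // Claimed entries now belong to this consumer and XREADGROUP with
          // '>' will never return them, so process them here.
          await processMessage(msg);
          await redis.xAck('notifications:email', GROUP, msg.id);
        }
        if (messages.length > 0) {
          console.log(`Claimed ${messages.length} stale messages`);
        }
      } while (cursor !== '0-0');
    } catch (err) {
      console.error(`Claim error:`, err);
    }
  }
}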
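XAUTOCLAIM (added in Redis 6.2) replaces the older multi-step pattern of XPENDING + XCLAIM. It scans the pending entries list, finds messages that have been idle longer than MIN_IDLE_MS, reassigns them to the current consumer, and increments their delivery count. The '0-0' is the start cursor. Each call returns a cursor for the next batch (and 0-0 again once the scan is complete), so you can paginate through millions of pending entries. Claimed messages belong to the claiming consumer from then on, and XREADGROUP with > never returns them again, so the claimer has to process and acknowledge them itself; logging them is not enough.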
This is the safety net. Without it, a crashed consumer takes its unacknowledged messages to the grave. The stream retains them forever (assuming you have not set a trimming policy), but no consumer will touch them unless something explicitly claims them.
Not every message should be retried. If a message has a malformed payload, references a user ID that does not exist, or causes your email provider to return a 400, re-processing it three times will produce three identical failures. These are poison messages, and they need a dead-letter queue (DLQ).
Here is the pattern that moves bad messages out of the main stream:
const MAX_DELIVERIES = 3;
async function processMessage(msg) {
  // node-redis returns each entry's fields as an object of strings
  // ({ to, subject, body, created_at }), so there is nothing to JSON.parse.
  const payload = msg.message;
  // The delivery counter lives in the group's pending entries list,
  // not in the stream ID, so look it up with XPENDING.
  const deliveryCount = await getDeliveryCount(msg.id);
  if (deliveryCount > MAX_DELIVERIES) {
    await deadLetter(msg, 'Max delivery count exceeded');
    // Acknowledge the message to remove it from pending.
    await redis.xAck('notifications:email', GROUP, msg.id);
    return;
  }
  try {
    await sendEmail(payload);
  } catch (err) {
    if (isPermanentFailure(err)) {
      // No point retrying a schema violation or auth error.
      await deadLetter(msg, err.message);
      await redis.xAck('notifications:email', GROUP, msg.id);
    } else {
      // Transient failure. Do not ack. Let the claim loop retry.
      throw err;
    }
  }
}
async function getDeliveryCount(id) {
  // XPENDING key group <id> <id> 1 returns at most the one pending entry
  // for this ID; node-redis exposes its counter as deliveriesCounter.
  const [entry] = await redis.xPendingRange('notifications:email', GROUP, id, id, 1);
  return entry ? entry.deliveriesCounter : 0;
}
async function deadLetter(msg, reason) {
  await redis.xAdd('notifications:email:dlq', '*', {
    original_id: msg.id,
    payload: JSON.stringify(msg.message),
    reason,
    failed_at: Date.now().toString(),
  });
}
function isPermanentFailure(err) {
  // Specific errors that no retry will fix. A rate limit clears on its own,
  // so RATE_LIMITED belongs on the transient (retry) path, not in this list.
  const permanent = ['INVALID_PAYLOAD', 'USER_NOT_FOUND'];
  return permanent.some((code) => err.message.includes(code));
}
The dead-letter stream is a separate stream with the same structure. You monitor it, alert on it, and reprocess messages manually after fixing whatever caused the failure. The alternative (leaving poison messages in the pending list forever) bloats the pending entries list and slows down XAUTOCLAIM scans.
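A minimal sketch of the manual reprocessing step, assuming a node-redis v4 style client (xRange, xAdd, xDel) and DLQ entries whose payload field holds JSON.stringify(msg.message), as deadLetter() writes them. redriveDeadLetters and its filter option are hypothetical names:

```javascript
// Move DLQ entries back onto the main stream once the root cause is fixed.
// Assumes client.xRange(key, start, end, { COUNT }) resolves to [{ id, message }].
async function redriveDeadLetters(client, { dlq, stream, count = 100, filter = () => true }) {
  const entries = await client.xRange(dlq, '-', '+', { COUNT: count });
  let moved = 0;
  for (const entry of entries) {
    if (!filter(entry.message)) continue; // e.g. only retry one failure reason
    // Restore the original field/value strings that deadLetter() serialized.
    const fields = JSON.parse(entry.message.payload);
    // Re-add first, then delete: a crash in between duplicates, never loses.
    await client.xAdd(stream, '*', fields);
    await client.xDel(dlq, entry.id);
    moved += 1;
  }
  return moved;
}
```

Redriven entries get fresh IDs and start with a fresh delivery count, and the add-then-delete order keeps the redrive itself at-least-once, like the rest of the pipeline.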
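A note on the delivery count: Redis does maintain a per-message delivery counter, but only inside the consumer group's pending entries list, and XREADGROUP does not return it with the message. You have two options:
1. XPENDING details. The extended form of XPENDING reports a delivery counter for each pending entry. Redis increments it every time the entry is delivered again (via XREADGROUP with an explicit ID, XCLAIM, or XAUTOCLAIM). XAUTOCLAIM bumps the counter but returns only the entries, so read the count with XPENDING.
2. A payload counter. Add a field such as deliveries: 0 when you produce, and increment it in the consumer before re-enqueuing.
Option 1 is cleaner because it uses Redis’s built-in tracking. But it only counts deliveries within the pending list, meaning messages that were acknowledged and re-added to the stream start at 1 again. For most use cases this is acceptable. If you need an absolute count across all delivery attempts, embed it in the payload.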
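A minimal sketch of option 2, assuming the producer writes a deliveries field set to '0' and that a failed attempt is retried by re-adding the entry. nextAttempt is a hypothetical helper; stream field values are strings, so the counter is parsed and re-serialized:

```javascript
// Decide what to do after a failed attempt, given the entry's fields.
// `deliveries` (hypothetical field) counts attempts made before this one.
function nextAttempt(fields, maxDeliveries) {
  const deliveries = Number.parseInt(fields.deliveries ?? '0', 10) + 1;
  if (deliveries >= maxDeliveries) {
    return { action: 'dead-letter', deliveries };
  }
  // The caller would XADD these fields as a new entry and XACK the old one.
  return { action: 'retry', fields: { ...fields, deliveries: String(deliveries) } };
}

console.log(nextAttempt({ to: 'a@example.com', deliveries: '0' }, 3).action); // retry
console.log(nextAttempt({ to: 'a@example.com', deliveries: '2' }, 3).action); // dead-letter
```

With maxDeliveries = 3 this allows three attempts in total, and the count survives acknowledgment and re-adding because it travels with the payload.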
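The stream grows without bound unless you trim it. A busy stream processing 1,000 messages per second with 1KB payloads accumulates 86.4 million entries per day, roughly 86 GB of raw payload before Redis's per-entry overhead. Without trimming, Redis eventually hits its memory limit and, depending on maxmemory-policy, rejects writes or evicts other keys; with no limit configured, the host runs out of memory and the process can be killed.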
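The arithmetic behind that figure, as a hedged sizing sketch. Both helpers are hypothetical and count payload bytes only; the average entry size fed to maxLenForBudget is an assumption you would measure, for example by dividing MEMORY USAGE on the stream key by XLEN:

```javascript
const SECONDS_PER_DAY = 24 * 60 * 60;

// Raw payload bytes written per day (ignores IDs, field names, node overhead).
function rawBytesPerDay(messagesPerSecond, payloadBytes) {
  return messagesPerSecond * SECONDS_PER_DAY * payloadBytes;
}

// Largest MAXLEN that fits a memory budget, given a measured average entry size.
function maxLenForBudget(budgetBytes, avgEntryBytes) {
  return Math.floor(budgetBytes / avgEntryBytes);
}

console.log(rawBytesPerDay(1000, 1000) / 1e9); // 86.4 (decimal GB)
console.log(maxLenForBudget(2 * 1024 ** 3, 1024)); // 2097152
```

Sizing MAXLEN from a memory budget, rather than picking a round number, ties the trimming threshold to what the Redis instance can actually hold.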
Redis Streams offers three trimming strategies, and you need to pick the right one before the pager goes off at 3am.
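// Keep approximately 100,000 messages. The ~ makes it efficient.
// node-redis v4 takes the ~ modifier as an option, not as a positional argument.
await redis.xTrim('notifications:email', 'MAXLEN', 100_000, { strategyModifier: '~' });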
The tilde is critical. Without it (MAXLEN 100000), Redis trims the stream to exactly 100,000 messages every time you trim. With ~, Redis trims to roughly 100,000 messages (never fewer) and only removes whole macro nodes, the internal radix-tree nodes that each pack many entries together. In practice this makes trimming much cheaper, because Redis frees entire nodes instead of deleting entries one by one inside them.
Set this in a periodic job or at the end of each consumer loop iteration. Do not do it on every produce call, because XTRIM is not free even with ~.
// Remove messages older than 7 days.
const SEVEN_DAYS_MS = 7 * 24 * 60 * 60 * 1000;
const cutoff = Date.now() - SEVEN_DAYS_MS;
// MINID accepts a bare millisecond timestamp as the threshold ID.
await redis.xTrim('notifications:email', 'MINID', cutoff, { strategyModifier: '~' });
MINID removes messages with IDs older than the specified timestamp. This is the right strategy when you care about retention duration (e.g., “keep 7 days of messages for reprocessing”) instead of count. Use ~ here too for the same performance reason.
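There is a subtle bug with naive trimming: you can trim messages that consumers have not processed yet. If a consumer group is 10,000 messages behind and you trim to MAXLEN ~ 5000, roughly the oldest 5,000 of those unread messages disappear before any consumer reads them. Delivered-but-unacknowledged messages are just as exposed, because XTRIM does not consult the pending entries list.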
Check the consumer group lag before trimming:
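async function safeTrim(stream, group, maxLen) {
  const info = await redis.xInfoGroups(stream);
  const groupInfo = info.find((g) => g.name === group);
  // lag (Redis 7.0+) counts entries not yet delivered to the group. It is
  // absent on older servers or clients that do not expose it, and null when
  // Redis cannot compute it. Treat "unknown" as "not safe to trim".
  const lag = groupInfo?.lag;
  if (lag === undefined || lag === null) {
    console.warn('Consumer lag unknown. Skipping trim.');
    return;
  }
  // Only trim if the group is within 20% of maxLen of the stream head.
  if (lag > maxLen * 0.2) {
    console.warn(`Consumer lag is ${lag}. Skipping trim.`);
    return;
  }
  await redis.xTrim(stream, 'MAXLEN', maxLen, { strategyModifier: '~' });
}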
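In Redis 7.0+, XINFO GROUPS returns a lag field: the number of entries in the stream that have not yet been delivered to any consumer in the group. It is a group-level number, not a per-consumer one, and it does not count delivered-but-unacknowledged entries. Whether your Redis client surfaces it depends on the client version, so check before relying on it. Use it. Without consumer-group-aware trimming, you will randomly lose messages that consumers have not processed, and the symptoms (missing data with no errors) are the hardest kind to debug.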
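The lag check only protects entries that were never delivered. Entries that were delivered but never acknowledged can be older than any lag window, and MAXLEN ignores them. A sketch of a stricter alternative: compute a MINID threshold that is no newer than any group's oldest pending ID or last-delivered ID. safeMinId and its input field names are hypothetical; the values would come from XINFO GROUPS (last-delivered-id) and the XPENDING summary (smallest pending ID):

```javascript
// Split "<ms>-<seq>" (or a bare "<ms>") into BigInt parts for comparison.
function toPair(id) {
  const [ms, seq = '0'] = id.split('-');
  return [BigInt(ms), BigInt(seq)];
}

// Return the smaller of two stream IDs.
function minId(a, b) {
  const [am, as] = toPair(a);
  const [bm, bs] = toPair(b);
  return am < bm || (am === bm && as <= bs) ? a : b;
}

// MINID keeps every entry whose ID is >= the threshold, so the threshold must
// not pass any entry a group still needs.
function safeMinId(desiredCutoffMs, groups) {
  let threshold = `${desiredCutoffMs}-0`;
  for (const g of groups) {
    // Undelivered entries are all newer than lastDeliveredId.
    threshold = minId(threshold, g.lastDeliveredId);
    // Delivered-but-unacked entries start at firstPendingId (null if none).
    if (g.firstPendingId) threshold = minId(threshold, g.firstPendingId);
  }
  return threshold;
}

const sevenDaysAgoMs = 1_718_000_000_000; // hypothetical "now minus 7 days"
console.log(safeMinId(sevenDaysAgoMs, [
  { lastDeliveredId: '1718234522000-0', firstPendingId: '1717999000000-3' },
])); // 1717999000000-3
```

The result can be passed as the MINID threshold to XTRIM; with ~, Redis may keep a few more entries than that, never fewer.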
Here is the complete consumer pattern that combines all three mechanisms:
import { createClient } from 'redis';
const STREAM = 'notifications:email';
const GROUP = 'email-sender-group';
const CONSUMER = `worker-${process.pid}`;
const MAX_PENDING_READ = 100; // messages per batch
const MAX_DELIVERIES = 3;
let shuttingDown = false;
async function main() {
  const redis = createClient({ url: process.env.REDIS_URL });
  await redis.connect();
  await ensureGroup(redis);
  // Start claim loop in background.
  const claimHandle = startClaimLoop(redis);
  // Trim loop in background.
  const trimHandle = startTrimLoop(redis);
  process.on('SIGTERM', async () => {
    shuttingDown = true;
    clearInterval(claimHandle);
    clearInterval(trimHandle);
    await redis.quit();
    process.exit(0);
  });
  while (!shuttingDown) {
    try {
      // BLOCK occupies this connection for up to 2 s, and commands from the
      // interval loops queue behind it. A separate client for this blocking
      // read avoids that delay.
      const results = await redis.xReadGroup(
        GROUP,
        CONSUMER,
        [{ key: STREAM, id: '>' }],
        { COUNT: MAX_PENDING_READ, BLOCK: 2000 }
      );
      if (!results) continue;
      for (const { messages } of results) {
        for (const msg of messages) {
          await processOne(redis, msg);
        }
      }
    } catch (err) {
      console.error('Read loop error:', err);
      await sleep(2000);
    }
  }
}
// Ack on success, dead-letter permanent failures, and leave transient
// failures pending so the claim loop retries them.
async function processOne(redis, msg) {
  try {
    await handleMessage(msg);
    await redis.xAck(STREAM, GROUP, msg.id);
  } catch (err) {
    if (isPermanent(err)) {
      await deadLetter(redis, msg, err.message);
      await redis.xAck(STREAM, GROUP, msg.id);
    }
    // Transient: do not ack. Claim loop will retry.
  }
}
async function ensureGroup(redis) {
  try {
    await redis.xGroupCreate(STREAM, GROUP, '0', { MKSTREAM: true });
  } catch (err) {
    if (!err.message.includes('BUSYGROUP')) throw err;
  }
}
function startClaimLoop(redis) {
  return setInterval(async () => {
    try {
      let cursor = '0-0';
      do {
        const { nextId, messages } = await redis.xAutoClaim(
          STREAM, GROUP, CONSUMER, 30_000, cursor,
          { COUNT: 100 }
        );
        cursor = nextId; // '0-0' means the scan is complete
        for (const msg of messages) {
          if (!msg) continue; // entry no longer exists in the stream
          // Claimed entries now belong to this consumer and XREADGROUP '>'
          // never returns them again, so they must be processed here.
          const [entry] = await redis.xPendingRange(STREAM, GROUP, msg.id, msg.id, 1);
          if (entry && entry.deliveriesCounter > MAX_DELIVERIES) {
            await deadLetter(redis, msg, 'Max delivery count exceeded');
            await redis.xAck(STREAM, GROUP, msg.id);
            continue;
          }
          await processOne(redis, msg);
        }
      } while (cursor !== '0-0');
    } catch (err) {
      console.error('Claim loop error:', err);
    }
  }, 10_000);
}
function startTrimLoop(redis) {
  return setInterval(async () => {
    try {
      const info = await redis.xInfoGroups(STREAM);
      const groupInfo = info.find((g) => g.name === GROUP);
      // lag needs Redis 7.0+ and a client that exposes it; if it is
      // missing or null, skip trimming rather than risk losing entries.
      const lag = groupInfo?.lag ?? Infinity;
      if (lag < 1000) {
        await redis.xTrim(STREAM, 'MAXLEN', 50_000, { strategyModifier: '~' });
      }
    } catch (err) {
      console.error('Trim loop error:', err);
    }
  }, 60_000);
}
// -- helpers --
async function handleMessage(msg) {
  // msg.message is an object of field/value strings, not JSON text.
  const data = msg.message;
  // Your actual processing logic here.
  // Throws on failure, returns on success.
}
function isPermanent(err) {
  return ['INVALID', 'NOT_FOUND'].some((c) => err.message.includes(c));
}
async function deadLetter(redis, msg, reason) {
  await redis.xAdd(`${STREAM}:dlq`, '*', {
    original_id: msg.id,
    // Stream field values must be strings, so serialize the field map.
    payload: JSON.stringify(msg.message),
    reason,
    failed_at: Date.now().toString(),
  });
}
function sleep(ms) {
  return new Promise((r) => setTimeout(r, ms));
}
main().catch(console.error);
This single file covers everything you need: at-least-once delivery, dead-letter isolation, crash recovery via auto-claim, and memory safety via consumer-group-aware trimming.
Redis Streams is a great fit when:
It is a poor fit when:
The most common mistake teams make is using Redis Streams for everything because Redis is already in the stack. If your messaging needs include long-term storage, schema evolution, or replay from arbitrary points in time, Kafka’s log-compacted topics and schema registry handle those patterns natively. Redis Streams is a queue, not an event store. Treat it as one.
Redis Pub/Sub drops messages on disconnect. Redis Streams with consumer groups does not. Combining XREADGROUP with XAUTOCLAIM, a dead-letter stream, and MAXLEN ~ trimming gives you a message processing pipeline that survives process crashes, handles poison messages gracefully, and stays within its memory budget. The total implementation is about 150 lines of Node.js. The alternative (running a separate message broker) is a whole new deployment.
Every production stream needs all three patterns. Consumer groups without auto-claim leak pending entries. Consumer groups without dead letters retry poison messages into infinity. Consumer groups without trimming eat all your Redis memory. Implement the trio or do not bother with streams at all.
Building message processing infrastructure that survives process crashes, handles poison messages, and stays within its memory budget is the kind of reliability engineering that separates systems that wake someone up at 3am from systems that quietly self-heal. Yojji’s senior engineering teams design and implement these patterns regularly across the JavaScript ecosystem, building production data pipelines on Redis, Postgres, and cloud infrastructure for clients in e-commerce, fintech, and logistics.
Yojji is an international custom software development company founded in 2016, with offices in Europe, the US, and the UK. Their teams specialize in the full cycle of product delivery from discovery through deployment, with deep expertise in Node.js, TypeScript, React, cloud platforms on AWS, Azure, and Google Cloud, and the production-hardened backend patterns that keep data flowing reliably even when things go wrong.