Redis Streams, introduced in Redis 5.0, provide an append-only log data structure ideal for event sourcing, message queuing, and real-time data processing. Unlike Redis Pub/Sub, Streams persist messages and support consumer groups for reliable, distributed message processing. This guide covers practical patterns for building event-driven systems with Redis Streams.
Understanding Redis Streams
A Redis Stream is similar to Apache Kafka's log — messages are appended with auto-generated IDs (timestamp-based), can be read by multiple consumers, and persist until explicitly trimmed. Key advantages over other Redis data structures:
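- Persistence: entries live in the keyspace and, with RDB or AOF persistence enabled, survive server restarts (unlike Pub/Sub, which is fire-and-forget)
- Consumer groups: distribute work across multiple consumers with acknowledgment
- Message IDs: time-ordered, enabling range queries and replay
- Bounded growth: a configurable max length caps stream size by evicting the oldest entries (this is trimming, not producer backpressure)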
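Because auto-generated IDs embed a millisecond timestamp, a reader can recover when an entry was written without storing a separate field. The following is a minimal sketch, assuming IDs of the default `<ms>-<seq>` form; `parseStreamId` is a hypothetical helper and not part of any Redis client.

```javascript
// Hypothetical helper: split an auto-generated stream ID ("<ms>-<seq>")
// into its millisecond timestamp and per-millisecond sequence number.
function parseStreamId(id) {
  const match = /^(\d+)-(\d+)$/.exec(id);
  if (!match) throw new Error(`Not a stream ID: ${id}`);
  return {
    // Present-day millisecond timestamps fit safely in a JS number.
    timestampMs: Number(match[1]),
    // The sequence part is a 64-bit integer, so keep it as a BigInt.
    sequence: BigInt(match[2]),
  };
}

const { timestampMs, sequence } = parseStreamId('1709234567890-0');
console.log(new Date(timestampMs).toISOString(), sequence);
```

This only holds for IDs generated by the server with `*`; explicitly supplied IDs need not carry a meaningful timestamp.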
Basic Stream Operations
Adding Messages
# Add an event to a stream (auto-generated ID with *)
XADD orders * customer_id 42 product "vps-basic" action "purchase" amount 9.99
# Add with max length cap (approximate trimming with ~)
XADD orders MAXLEN ~ 100000 * customer_id 42 product "vps-basic" action "purchase"
# Response: "1709234567890-0" (timestamp-sequence ID)
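XADD takes a flat list of alternating field names and values, so application objects need flattening before they are sent. One possible sketch follows; `toFieldArgs` is a hypothetical helper, and JSON-encoding nested values is an assumption about how the application wants to store them.

```javascript
// Hypothetical helper: flatten an event object into the alternating
// field/value list that XADD expects. Scalars are stringified, objects and
// arrays are JSON-encoded, and null/undefined fields are skipped.
function toFieldArgs(event) {
  const args = [];
  for (const [field, value] of Object.entries(event)) {
    if (value === undefined || value === null) continue;
    const encoded =
      typeof value === 'object' ? JSON.stringify(value) : String(value);
    args.push(field, encoded);
  }
  if (args.length === 0) throw new Error('XADD needs at least one field');
  return args;
}

const args = toFieldArgs({ customer_id: 42, product: 'vps-basic', amount: 9.99 });
console.log(['XADD', 'orders', '*', ...args].join(' '));
// prints: XADD orders * customer_id 42 product vps-basic amount 9.99
```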
Reading Messages
# Read from beginning
XRANGE orders - + COUNT 10
# Read from a specific ID onward
XRANGE orders 1709234567890-0 +
# Read last 5 messages
XREVRANGE orders + - COUNT 5
# Block and wait for new messages (like tail -f)
XREAD BLOCK 5000 COUNT 10 STREAMS orders $
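For replaying a long stream, XRANGE with COUNT can be called repeatedly, resuming after the last ID seen. The sketch below assumes `client` is any object with an ioredis-style `xrange(key, start, end, 'COUNT', n)` method; `readAllEntries` is a hypothetical name. The `(` prefix makes the start ID exclusive and requires Redis 6.2 or later.

```javascript
// Sketch: walk a stream page by page with XRANGE.
// `client` is assumed to expose an ioredis-style xrange() method.
async function* readAllEntries(client, key, pageSize = 100) {
  let start = '-';
  while (true) {
    const page = await client.xrange(key, start, '+', 'COUNT', pageSize);
    for (const [id, fields] of page) {
      yield { id, fields };
    }
    if (page.length < pageSize) return; // short page: reached the end
    start = `(${page[page.length - 1][0]}`; // resume after the last ID seen
  }
}

// Usage (inside an async function, with a connected client):
//   for await (const entry of readAllEntries(redis, 'orders')) { ... }
```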
Consumer Groups
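Consumer groups let multiple workers process stream messages in parallel. Delivery is at-least-once: each new message goes to one consumer in the group and remains pending until it is acknowledged, so handlers should be idempotent.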
# Create a consumer group starting from the beginning
XGROUP CREATE orders order-processors 0
# Alternatively, create from the current position (only new messages); MKSTREAM creates the stream if it does not exist
XGROUP CREATE orders order-processors $ MKSTREAM
# Consumer reads messages (each message goes to only one consumer)
XREADGROUP GROUP order-processors worker-1 COUNT 5 BLOCK 2000 STREAMS orders >
# Acknowledge processed messages
XACK orders order-processors 1709234567890-0 1709234567891-0
# Check pending (unacknowledged) messages
XPENDING orders order-processors - + 10
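The extended form of XPENDING shown above returns one `[id, consumer, idleMs, deliveryCount]` tuple per pending entry. As a rough sketch, these tuples can be turned into objects and filtered for entries that look stuck. `parsePending` and `findStuck` are hypothetical helpers, and the thresholds and sample values are illustrative assumptions.

```javascript
// Hypothetical helper: turn the extended XPENDING reply
// ([id, consumer, idleMs, deliveryCount] per entry) into objects.
function parsePending(reply) {
  return reply.map(([id, consumer, idleMs, deliveryCount]) => ({
    id,
    consumer,
    idleMs: Number(idleMs),
    deliveryCount: Number(deliveryCount),
  }));
}

// Entries idle for too long or delivered too often deserve attention.
function findStuck(entries, { maxIdleMs = 60000, maxDeliveries = 5 } = {}) {
  return entries.filter(
    (e) => e.idleMs > maxIdleMs || e.deliveryCount > maxDeliveries
  );
}

// Illustrative reply shape; the values are made up for this example.
const sample = [
  ['1709234567890-0', 'worker-1', 125000, 2],
  ['1709234567891-0', 'worker-2', 3000, 1],
];
console.log(findStuck(parsePending(sample)).map((e) => e.id));
```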
Practical Example: Order Processing Pipeline
Producer (Node.js)
import Redis from 'ioredis';

const redis = new Redis();

async function publishOrder(order) {
  const id = await redis.xadd(
    'orders',
    'MAXLEN', '~', '500000',
    '*',
    'order_id', order.id,
    'customer_id', order.customerId,
    'total', order.total,
    'items', JSON.stringify(order.items),
    'timestamp', Date.now().toString()
  );
  console.log(`Published order ${order.id} with stream ID ${id}`);
  return id;
}
Consumer (Node.js)
import Redis from 'ioredis';

const redis = new Redis();
const GROUP = 'order-processors';
const CONSUMER = process.env.WORKER_ID || 'worker-1';

// handleOrder(order) is the application's own handler (not shown here)
async function processOrders() {
  // Ensure the group exists; ignore only the "already exists" error
  try {
    await redis.xgroup('CREATE', 'orders', GROUP, '0', 'MKSTREAM');
  } catch (e) {
    if (!String(e.message).includes('BUSYGROUP')) throw e;
  }

  while (true) {
    const results = await redis.xreadgroup(
      'GROUP', GROUP, CONSUMER,
      'COUNT', 10,
      'BLOCK', 5000,
      'STREAMS', 'orders', '>'
    );
    if (!results) continue; // BLOCK timed out with no new messages

    for (const [, messages] of results) {
      for (const [id, fields] of messages) {
        try {
          // Parse inside the try block so malformed JSON cannot crash the loop
          const order = parseFields(fields);
          await handleOrder(order);
          await redis.xack('orders', GROUP, id);
        } catch (err) {
          console.error(`Failed to process ${id}:`, err);
          // Message stays in the pending list until it is claimed and retried
        }
      }
    }
  }
}

// Convert the flat [field, value, ...] reply into an object
function parseFields(fields) {
  const obj = {};
  for (let i = 0; i < fields.length; i += 2) {
    obj[fields[i]] = fields[i + 1];
  }
  if (obj.items) obj.items = JSON.parse(obj.items);
  return obj;
}
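A worker that restarts may still own messages it read but never acknowledged. Calling XREADGROUP with an explicit ID such as `0` instead of `>` returns that consumer's own pending history, so one option is to drain it before reading new messages. The sketch below assumes an ioredis-style client with `xreadgroup` and `xack`; `drainOwnPending` and `handler` are hypothetical names.

```javascript
// Sketch: re-read this consumer's own pending entries by passing an explicit
// ID ("0", then the last ID seen) instead of ">".
async function drainOwnPending(client, stream, group, consumer, handler) {
  let recovered = 0;
  let cursor = '0';
  while (true) {
    const results = await client.xreadgroup(
      'GROUP', group, consumer, 'COUNT', 10, 'STREAMS', stream, cursor
    );
    const messages = results?.[0]?.[1] ?? [];
    if (messages.length === 0) return recovered; // history exhausted
    for (const [id, fields] of messages) {
      cursor = id; // advance so entries that keep failing are not re-read forever
      if (fields === null) {
        // The entry was trimmed or deleted; nothing left to process
        await client.xack(stream, group, id);
        continue;
      }
      try {
        await handler(id, fields);
        await client.xack(stream, group, id);
        recovered += 1;
      } catch (err) {
        console.error(`Still failing ${id}:`, err);
      }
    }
  }
}

// Usage (before entering the main ">" loop):
//   await drainOwnPending(redis, 'orders', GROUP, CONSUMER, myHandler);
```

Entries that fail again stay pending and can be picked up later by a claim or dead-letter pass.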
Handling Failed Messages
Messages that are read but not acknowledged remain in the Pending Entries List (PEL). Implement a dead-letter mechanism for messages that fail repeatedly:
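async function claimStaleMessages(startId = '0-0') {
  // Claim messages idle for more than 60 seconds (XAUTOCLAIM requires Redis 6.2+)
  const [nextId, messages] = await redis.xautoclaim(
    'orders', GROUP, CONSUMER,
    60000,   // min idle time ms
    startId, // start ID
    'COUNT', 10
  );

  for (const [id, fields] of messages) {
    if (!fields) continue; // entry no longer exists in the stream

    const info = await redis.xpending('orders', GROUP, id, id, 1);
    const deliveryCount = Number(info[0]?.[3] ?? 0);

    if (deliveryCount > 5) {
      // Move to dead letter stream
      await redis.xadd('orders-dlq', '*', ...fields, 'original_id', id, 'error', 'max_retries');
      await redis.xack('orders', GROUP, id);
      continue;
    }

    // Under the retry limit: process the claimed message again
    try {
      await handleOrder(parseFields(fields));
      await redis.xack('orders', GROUP, id);
    } catch (err) {
      console.error(`Retry failed for ${id}:`, err);
    }
  }

  return nextId; // pass as startId on the next call to continue the scan
}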
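XAUTOCLAIM returns a cursor as its first reply element, and a value of `0-0` means the scan of the pending list has wrapped around. A full sweep can therefore loop until the cursor returns to `0-0`. This is a minimal sketch assuming Redis 6.2+ and an ioredis-style `xautoclaim` method; `sweepStale` and `onClaimed` are hypothetical names.

```javascript
// Sketch: follow the XAUTOCLAIM cursor until it wraps back to "0-0", so one
// sweep covers the whole pending list rather than only the first batch.
async function sweepStale(client, stream, group, consumer, minIdleMs, onClaimed) {
  let cursor = '0-0';
  let total = 0;
  do {
    const [nextId, messages] = await client.xautoclaim(
      stream, group, consumer, minIdleMs, cursor, 'COUNT', 10
    );
    for (const [id, fields] of messages) {
      if (!fields) continue; // defensive: skip entries without field data
      await onClaimed(id, fields);
      total += 1;
    }
    cursor = nextId;
  } while (cursor !== '0-0');
  return total;
}

// Usage (for example, from a periodic timer):
//   await sweepStale(redis, 'orders', GROUP, CONSUMER, 60000, retryOrDeadLetter);
```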
Stream Trimming Strategies
# Trim to approximate max length (efficient)
XADD orders MAXLEN ~ 100000 * key value
# Trim by minimum ID (time-based retention; MINID requires Redis 6.2+)
XTRIM orders MINID ~ 1709000000000-0
# Periodic trimming via cron
redis-cli XTRIM orders MAXLEN ~ 500000
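Since auto-generated IDs start with a millisecond timestamp, a MINID threshold for "keep the last N days" can be computed from the clock. The sketch below assumes server-generated IDs and a client clock roughly in sync with the Redis server; `minIdForRetention` is a hypothetical helper.

```javascript
// Hypothetical helper: build the MINID threshold for "keep the last N ms".
// Entries with IDs lower than the returned value are eligible for trimming.
function minIdForRetention(retentionMs, nowMs = Date.now()) {
  if (!Number.isInteger(retentionMs) || retentionMs < 0) {
    throw new RangeError('retentionMs must be a non-negative integer');
  }
  return `${Math.max(0, nowMs - retentionMs)}-0`;
}

const SEVEN_DAYS_MS = 7 * 24 * 60 * 60 * 1000;
// With an ioredis-style client (an assumption), the result could be used as:
//   await redis.xtrim('orders', 'MINID', '~', minIdForRetention(SEVEN_DAYS_MS));
console.log(minIdForRetention(SEVEN_DAYS_MS, 1709604800000));
// prints: 1709000000000-0
```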
Monitoring Streams
# Stream info
XINFO STREAM orders
# Consumer group details
XINFO GROUPS orders
# Individual consumer stats
XINFO CONSUMERS orders order-processors
# Key metrics to monitor:
# - Stream length (XLEN orders)
# - Pending message count per consumer
# - Consumer lag (last delivered vs. last stream entry)
# - Memory usage (MEMORY USAGE orders)
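Over RESP2, XINFO replies arrive as flat `[name, value, ...]` arrays, which are awkward to alert on directly. The following sketch converts an XINFO GROUPS reply into objects and flags groups with a large backlog. The helper names and thresholds are hypothetical, and the `lag` field is only reported by Redis 7.0 and later, so a missing or null value is treated as unknown.

```javascript
// Hypothetical helper: convert a flat [name, value, ...] array into an object.
function pairsToObject(flat) {
  const obj = {};
  for (let i = 0; i + 1 < flat.length; i += 2) obj[flat[i]] = flat[i + 1];
  return obj;
}

// Return the names of groups whose backlog exceeds the given thresholds.
function groupsNeedingAttention(
  xinfoGroupsReply,
  { maxPending = 1000, maxLag = 10000 } = {}
) {
  return xinfoGroupsReply
    .map(pairsToObject)
    .filter((g) => g.pending > maxPending || (g.lag ?? 0) > maxLag)
    .map((g) => g.name);
}

// Usage with an ioredis-style client (an assumption):
//   const reply = await redis.xinfo('GROUPS', 'orders');
//   console.log(groupsNeedingAttention(reply));
```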
Performance Considerations
- Use MAXLEN ~ (approximate) instead of exact trimming: it is significantly faster because Redis trims whole radix tree nodes
- Batch reads with COUNT to reduce round trips
- Use BLOCK in XREADGROUP to avoid busy-waiting
- Keep field names short and consistent: entries whose fields match the first entry in a node reuse its field names, while entries with a different field set store their names in full
- For high-throughput streams, consider pipelining XADD commands
- Monitor memory usage: as a rough estimate, each stream entry uses approximately 100-200 bytes of overhead plus field data, so measure with MEMORY USAGE on real data
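The pipelining suggestion can be sketched as follows: many XADD commands are queued on one pipeline and sent in a single round trip. This assumes an ioredis-style client where `pipeline()` returns a chainable object and `exec()` resolves to an array of `[error, result]` pairs; `publishBatch` and its default `maxLen` are hypothetical.

```javascript
// Sketch: publish a batch of events with one network round trip.
async function publishBatch(client, stream, events, maxLen = 500000) {
  const pipeline = client.pipeline();
  for (const event of events) {
    // Flatten each event into alternating field/value strings
    const fields = Object.entries(event).flatMap(([k, v]) => [k, String(v)]);
    pipeline.xadd(stream, 'MAXLEN', '~', maxLen, '*', ...fields);
  }
  const replies = await pipeline.exec();
  const failed = replies.filter(([err]) => err);
  if (failed.length > 0) {
    throw new Error(`${failed.length} of ${events.length} XADD commands failed`);
  }
  return replies.map(([, id]) => id); // generated stream IDs, in order
}

// Usage (with a connected client):
//   const ids = await publishBatch(redis, 'orders', [{ order_id: 1 }, { order_id: 2 }]);
```

A pipeline is not a transaction: commands may succeed or fail individually, which is why the sketch inspects each reply.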
Redis Streams vs. Alternatives
Choose Redis Streams when you need sub-millisecond latency, already use Redis, and your throughput is under ~100K messages/second per stream. For higher throughput or partitioned topics with ordering guaranteed within each partition, consider Apache Kafka. For simple task queues, Redis Lists with BLMOVE (the replacement for BRPOPLPUSH, which is deprecated since Redis 6.2) may be simpler.