Using Redis Streams for Event-Driven Architectures

By Admin · Mar 15, 2026 · Updated Apr 23, 2026

Redis Streams, introduced in Redis 5.0, provide an append-only log data structure ideal for event sourcing, message queuing, and real-time data processing. Unlike Redis Pub/Sub, Streams persist messages and support consumer groups for reliable, distributed message processing. This guide covers practical patterns for building event-driven systems with Redis Streams.

Understanding Redis Streams

A Redis Stream is similar to Apache Kafka's log — messages are appended with auto-generated IDs (timestamp-based), can be read by multiple consumers, and persist until explicitly trimmed. Key advantages over other Redis data structures:

  • Persistence — messages survive server restarts (unlike Pub/Sub)
  • Consumer groups — distribute work across multiple consumers with acknowledgment
  • Message IDs — time-ordered, enabling range queries and replay (see the sketch after this list)
  • Bounded growth — a configurable maximum length caps memory use by evicting the oldest entries
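
Because each ID begins with a millisecond timestamp, you can replay history from a point in time by passing a bare timestamp as the start of a range; Redis fills in the sequence number. A minimal ioredis sketch, using the orders stream from the examples below:

import Redis from 'ioredis';
const redis = new Redis();

// Replay everything from the last hour: a bare millisecond
// timestamp is a valid partial stream ID (sequence defaults to 0)
const oneHourAgo = Date.now() - 60 * 60 * 1000;
const entries = await redis.xrange('orders', String(oneHourAgo), '+');
for (const [id, fields] of entries) {
  console.log(id, fields);
}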

Basic Stream Operations

Adding Messages

# Add an event to a stream (auto-generated ID with *)
XADD orders * customer_id 42 product "vps-basic" action "purchase" amount 9.99

# Add with max length cap (approximate trimming with ~)
XADD orders MAXLEN ~ 100000 * customer_id 42 product "vps-basic" action "purchase"

# Response: "1709234567890-0" (timestamp-sequence ID)

Reading Messages

# Read from beginning
XRANGE orders - + COUNT 10

# Read from a specific ID onward
XRANGE orders 1709234567890-0 +

# Read last 5 messages
XREVRANGE orders + - COUNT 5

# Block and wait for new messages (like tail -f)
XREAD BLOCK 5000 COUNT 10 STREAMS orders $

Consumer Groups

Consumer groups enable multiple workers to process stream messages in parallel, with at-least-once delivery enforced through explicit acknowledgment:

# Create a consumer group starting from the beginning
XGROUP CREATE orders order-processors 0

# Alternatively, create from the current position (only new messages);
# MKSTREAM creates the stream if it does not exist. Creating the same
# group twice returns a BUSYGROUP error.
XGROUP CREATE orders order-processors $ MKSTREAM

# Consumer reads messages (each message goes to only one consumer)
XREADGROUP GROUP order-processors worker-1 COUNT 5 BLOCK 2000 STREAMS orders >

# Acknowledge processed messages
XACK orders order-processors 1709234567890-0 1709234567891-0

# Check pending (unacknowledged) messages
XPENDING orders order-processors - + 10

Practical Example: Order Processing Pipeline

Producer (Node.js)

import Redis from 'ioredis';
const redis = new Redis();

async function publishOrder(order) {
  const id = await redis.xadd(
    'orders',
    'MAXLEN', '~', '500000',
    '*',
    'order_id', order.id,
    'customer_id', order.customerId,
    'total', order.total,
    'items', JSON.stringify(order.items),
    'timestamp', Date.now().toString()
  );
  console.log(`Published order ${order.id} with stream ID ${id}`);
  return id;
}
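
A hypothetical call, with the order shape invented for illustration:

await publishOrder({
  id: 'ord-1001',
  customerId: 42,
  total: 9.99,
  items: [{ sku: 'vps-basic', qty: 1 }],
});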

Consumer (Node.js)

import Redis from 'ioredis';
const redis = new Redis();

const GROUP = 'order-processors';
const CONSUMER = process.env.WORKER_ID || 'worker-1';

async function processOrders() {
  // Ensure group exists
  try {
    await redis.xgroup('CREATE', 'orders', GROUP, '0', 'MKSTREAM');
  } catch (e) {
    // A BUSYGROUP reply means the group already exists, which is fine;
    // any other error is real and should surface
    if (!String(e.message).includes('BUSYGROUP')) throw e;
  }

  while (true) {
    const results = await redis.xreadgroup(
      'GROUP', GROUP, CONSUMER,
      'COUNT', 10,
      'BLOCK', 5000,
      'STREAMS', 'orders', '>'
    );

    if (!results) continue;

    for (const [stream, messages] of results) {
      for (const [id, fields] of messages) {
        const order = parseFields(fields);
        try {
          await handleOrder(order);
          await redis.xack('orders', GROUP, id);
        } catch (err) {
          console.error(`Failed to process ${id}:`, err);
          // Message stays pending for retry
        }
      }
    }
  }
}

function parseFields(fields) {
  const obj = {};
  for (let i = 0; i < fields.length; i += 2) {
    obj[fields[i]] = fields[i + 1];
  }
  if (obj.items) obj.items = JSON.parse(obj.items);
  return obj;
}
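
The loop above calls handleOrder, which is left undefined here. A minimal placeholder, assuming the real fulfillment logic lives elsewhere:

// Placeholder business logic for illustration only.
// Throwing leaves the message pending so it will be retried.
async function handleOrder(order) {
  if (!order.order_id) throw new Error('malformed order event');
  console.log(`Fulfilling order ${order.order_id} for customer ${order.customer_id}`);
}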

Handling Failed Messages

Messages that are read but not acknowledged remain in the Pending Entries List (PEL). Implement a dead-letter mechanism for messages that fail repeatedly:

async function claimStaleMessages() {
  // Claim messages idle for more than 60 seconds
  // (XAUTOCLAIM requires Redis 6.2+)
  const stale = await redis.xautoclaim(
    'orders', GROUP, CONSUMER,
    60000,  // min idle time ms
    '0-0',  // start ID
    'COUNT', 10
  );

  // The third element (IDs deleted from the stream) is only
  // returned by Redis 7.0 and later
  const [nextId, messages, deletedIds] = stale;
  for (const [id, fields] of messages) {
    const info = await redis.xpending('orders', GROUP, id, id, 1);
    const deliveryCount = info[0]?.[3] || 0;

    if (deliveryCount > 5) {
      // Give up: move to a dead-letter stream and acknowledge
      await redis.xadd('orders-dlq', '*', ...fields, 'original_id', id, 'error', 'max_retries');
      await redis.xack('orders', GROUP, id);
    } else {
      // Below the retry limit: the claimed message now belongs to
      // this consumer, so run it through the normal handler again
      try {
        await handleOrder(parseFields(fields));
        await redis.xack('orders', GROUP, id);
      } catch {
        // Still failing; leave it pending for the next sweep
      }
    }
  }
}
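
claimStaleMessages needs to run periodically alongside the main read loop; one simple arrangement (the 30-second interval is an example value):

// Sweep for stale pending messages every 30 seconds
setInterval(() => {
  claimStaleMessages().catch((err) =>
    console.error('claim sweep failed:', err)
  );
}, 30000);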

Stream Trimming Strategies

# Trim to approximate max length (efficient)
XADD orders MAXLEN ~ 100000 * key value

# Trim by minimum ID for time-based retention (MINID requires Redis 6.2+)
XTRIM orders MINID ~ 1709000000000-0

# Periodic trimming via cron
redis-cli XTRIM orders MAXLEN ~ 500000
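
Because IDs start with a millisecond timestamp, a MINID threshold for a retention window can be computed from the clock. A sketch with ioredis (the seven-day window is an example value):

import Redis from 'ioredis';
const redis = new Redis();

// Keep roughly the last 7 days of events
const retentionMs = 7 * 24 * 60 * 60 * 1000;
const minId = Date.now() - retentionMs;

// '~' allows approximate trimming, which is much cheaper
await redis.xtrim('orders', 'MINID', '~', String(minId));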

Monitoring Streams

# Stream info
XINFO STREAM orders

# Consumer group details
XINFO GROUPS orders

# Individual consumer stats
XINFO CONSUMERS orders order-processors

# Key metrics to monitor:
# - Stream length (XLEN orders)
# - Pending message count per consumer
# - Consumer lag (last delivered vs. last stream entry)
# - Memory usage (MEMORY USAGE orders)
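
Most clients return XINFO replies as flat field/value arrays, so they need a little parsing. A sketch that reports stream length and per-group pending counts with ioredis (the lag field is only reported by Redis 7.0+):

import Redis from 'ioredis';
const redis = new Redis();

// Turn a flat [field, value, field, value, ...] reply into an object
function toObject(flat) {
  const obj = {};
  for (let i = 0; i < flat.length; i += 2) obj[flat[i]] = flat[i + 1];
  return obj;
}

const length = await redis.xlen('orders');
const groups = (await redis.xinfo('GROUPS', 'orders')).map(toObject);

console.log(`stream length: ${length}`);
for (const g of groups) {
  // 'lag' appears on Redis 7.0+; older servers omit it
  console.log(`${g.name}: pending=${g.pending} lag=${g.lag ?? 'n/a'}`);
}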

Performance Considerations

  • Use MAXLEN ~ (approximate) instead of exact trimming — it is significantly faster as Redis trims in whole radix tree nodes
  • Batch reads with COUNT to reduce round trips
  • Use BLOCK in XREADGROUP to avoid busy-waiting
  • Keep field names short — they are stored with every message
  • For high-throughput streams, consider pipelining XADD commands (see the sketch after this list)
  • Monitor memory usage — each stream entry uses approximately 100-200 bytes of overhead plus field data
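
A minimal pipelining sketch with ioredis; the batch of events is illustrative:

import Redis from 'ioredis';
const redis = new Redis();

const events = [
  { id: 'ord-1', total: 9.99 },
  { id: 'ord-2', total: 19.99 },
];

// Queue the XADDs locally, then flush them in one round trip
const pipeline = redis.pipeline();
for (const event of events) {
  pipeline.xadd('orders', 'MAXLEN', '~', '500000', '*',
    'order_id', event.id, 'total', String(event.total));
}
const results = await pipeline.exec();  // array of [err, streamId] pairs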

Redis Streams vs. Alternatives

Choose Redis Streams when you need sub-millisecond latency, already run Redis, and your throughput stays under roughly 100K messages per second per stream. For higher throughput, replicated durability, or partitioned consumption at larger scale, consider Apache Kafka. For simple task queues, Redis Lists with BLMOVE (the replacement for the deprecated BRPOPLPUSH) may be simpler.
