DEV Community

Temitope

Building a Fault-Tolerant Job Queue: Node.js Producers, Elixir/OTP Consumers

The pitch for OTP is always the same: "let it crash," nine nines of uptime, Erlang running phone switches. That's all true and also completely useless when you're staring at a blank mix new project wondering how to actually structure the thing.

This tutorial skips the theory tour. We build something real: a distributed job processing system where a Node.js API enqueues work into Redis, and an Elixir/OTP application consumes it — with a supervision tree that keeps the whole thing running when individual workers die, when Redis blips, and when a job payload is malformed.

By the end you'll have:

  • A Node.js producer API with Redis streams (not just lists — we want consumer groups)
  • An Elixir Application with a proper OTP supervision tree
  • A QueueConsumer GenServer that polls Redis and dispatches work
  • A WorkerSupervisor (DynamicSupervisor) that spawns and monitors per-job workers
  • A JobWorker GenServer that processes a job, retries on failure, and dead-letters after max attempts
  • A Telemetry integration so you can see what's actually happening

No Oban, no Exq. We're building the layer below so you understand what those libraries are doing.


Architecture Overview

┌─────────────────────────────────────────────────────────┐
│  Node.js Producer API                                   │
│  POST /jobs  →  Redis XADD  →  Stream: "jobs:work"     │
└─────────────────────────────────────────────────────────┘
                          │ Redis Streams
                          ▼
┌─────────────────────────────────────────────────────────┐
│  Elixir OTP Application                                 │
│                                                         │
│  Application (supervisor)                               │
│  ├── RedisPool (Redix connections)                      │
│  ├── QueueConsumer (GenServer — polls + dispatches)     │
│  └── WorkerSupervisor (DynamicSupervisor)               │
│       ├── JobWorker<job_id_1> (GenServer)               │
│       ├── JobWorker<job_id_2> (GenServer)               │
│       └── JobWorker<job_id_n> (GenServer)               │
└─────────────────────────────────────────────────────────┘

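Redis Streams give us persistent, consumer-group-aware queuing. A job isn't acknowledged until the worker finishes it. If the worker crashes mid-job, the entry stays in the group's pending entries list, and the consumer reclaims it later with XAUTOCLAIM. Redis does not redeliver it on its own.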
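To make the acknowledgement semantics concrete, here is a toy in-memory model of a consumer group's pending entries list. It is not Redis and not part of the tutorial's code. The class and method names (PendingList, deliver, ack, autoclaim) are made up for illustration and only mimic what XREADGROUP, XACK, and XAUTOCLAIM do to the pending list.

```javascript
// Toy model of a consumer group's Pending Entries List (PEL).
// Hypothetical names; real Redis tracks this state server-side.
class PendingList {
  constructor() {
    this.pending = new Map(); // entryId -> { consumer, deliveredAt }
  }

  // Like XREADGROUP: delivering an entry records it as pending.
  deliver(entryId, consumer, now) {
    this.pending.set(entryId, { consumer, deliveredAt: now });
  }

  // Like XACK: returns 1 if the entry was pending, 0 otherwise.
  ack(entryId) {
    return this.pending.delete(entryId) ? 1 : 0;
  }

  // Like XAUTOCLAIM: hand entries idle for at least minIdleMs to `consumer`.
  autoclaim(consumer, minIdleMs, now) {
    const claimed = [];
    for (const [entryId, info] of this.pending) {
      if (now - info.deliveredAt >= minIdleMs) {
        this.pending.set(entryId, { consumer, deliveredAt: now });
        claimed.push(entryId);
      }
    }
    return claimed;
  }
}

const pel = new PendingList();
pel.deliver('1-0', 'consumer-a', 0); // the worker finishes this one
pel.deliver('2-0', 'consumer-a', 0); // the worker crashes on this one
pel.ack('1-0');

// 30 seconds later, a sweep over the pending list finds the orphaned entry.
const reclaimed = pel.autoclaim('consumer-b', 30_000, 30_000);
console.log(reclaimed);
```

The acknowledged entry is gone for good, while the unacknowledged one is still there to be claimed. That is the property the rest of the design leans on.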


Part 1: The Node.js Producer

Project Setup

mkdir job-producer && cd job-producer
npm init -y
npm install express ioredis ulid zod

Redis Stream Producer

We use XADD to append jobs to a Redis stream. Unlike LPUSH/RPUSH, streams give us:

  • Persistent, ordered log of all jobs (not consumed on read)
  • Consumer groups (multiple consumers, each gets different jobs)
  • Built-in pending entry list (PEL) — unacknowledged jobs are trackable
// src/redis.js
const Redis = require('ioredis');

const redis = new Redis({
  host: process.env.REDIS_HOST || 'localhost',
  port: parseInt(process.env.REDIS_PORT || '6379', 10),
  maxRetriesPerRequest: 3,
  retryStrategy: (times) => Math.min(times * 50, 2000),
  lazyConnect: false,
});

redis.on('error',  (err) => console.error('[redis] error:', err.message));
redis.on('connect', ()   => console.log('[redis] connected'));

module.exports = redis;
// src/jobs.js
const { ulid }  = require('ulid');
const redis     = require('./redis');
const { z }     = require('zod');

const STREAM_KEY    = 'jobs:work';
const MAX_LEN       = 10_000;  // cap stream length; trimming drops the oldest entries even if unprocessed

// Job schema — validate before enqueuing
const JobSchema = z.object({
  type:    z.enum(['email', 'report', 'webhook', 'thumbnail']),
  payload: z.record(z.string(), z.unknown()),  // two-argument form works on Zod 3 and Zod 4
  priority: z.number().int().min(1).max(10).default(5),
});

async function enqueueJob(rawInput) {
  const parsed = JobSchema.parse(rawInput);  // throws ZodError if invalid

  const job = {
    id:         ulid(),           // sortable, unique job ID
    type:       parsed.type,
    payload:    JSON.stringify(parsed.payload),
    priority:   String(parsed.priority),
    enqueued_at: new Date().toISOString(),
    attempts:   '0',
  };

  // XADD <stream> MAXLEN ~ 10000 * <field> <value> <field> <value> ...
  // '*' tells Redis to auto-generate the stream entry ID
  const entryId = await redis.xadd(
    STREAM_KEY,
    'MAXLEN', '~', String(MAX_LEN),
    '*',                          // auto-ID
    ...Object.entries(job).flat() // field-value pairs
  );

  console.log(`[jobs] enqueued ${job.type} job=${job.id} entry=${entryId}`);
  return { jobId: job.id, streamEntryId: entryId };
}

async function getJobStats() {
  const [length, groups] = await Promise.all([
    redis.xlen(STREAM_KEY),
    redis.xinfo('GROUPS', STREAM_KEY).catch(() => []),
  ]);

  return { stream: STREAM_KEY, length, consumerGroups: groups };
}

module.exports = { enqueueJob, getJobStats };
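The spread of Object.entries(job).flat() is what turns the job object into the alternating field/value arguments that XADD expects. A small standalone check of that transformation, using a made-up sample job rather than real queue data:

```javascript
// Sample job with the same shape as in enqueueJob (values are illustrative).
const sampleJob = {
  id: 'job-1',
  type: 'email',
  payload: JSON.stringify({ to: 'user@example.com' }),
  attempts: '0',
};

// Object.entries gives [[key, value], ...]; flat() merges one level into
// [key, value, key, value, ...], the argument order XADD expects.
const xaddFields = Object.entries(sampleJob).flat();

console.log(xaddFields);
```

Stream field values are flat strings, which is why the nested payload is passed through JSON.stringify before it reaches this step.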

The API

// src/index.js
const express        = require('express');
const { enqueueJob, getJobStats } = require('./jobs');

const app  = express();
app.use(express.json());

// POST /jobs  — enqueue a new job
app.post('/jobs', async (req, res) => {
  try {
    const result = await enqueueJob(req.body);
    res.status(202).json({ status: 'accepted', ...result });
  } catch (err) {
    if (err.name === 'ZodError') {
      return res.status(422).json({ error: 'Invalid job', issues: err.issues });
    }
    console.error('[api] enqueue error:', err);
    res.status(500).json({ error: 'Internal error' });
  }
});

// GET /jobs/stats  — queue depth and consumer group info
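app.get('/jobs/stats', async (req, res) => {
  try {
    res.json(await getJobStats());
  } catch (err) {
    // XLEN can reject if Redis is unreachable; answer instead of hanging
    console.error('[api] stats error:', err);
    res.status(503).json({ error: 'Stats unavailable' });
  }
});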

const PORT = process.env.PORT || 3000;
app.listen(PORT, () => console.log(`[api] listening on :${PORT}`));

Test it:

node src/index.js &

curl -s -X POST http://localhost:3000/jobs \
  -H 'Content-Type: application/json' \
  -d '{"type":"email","payload":{"to":"user@example.com","template":"welcome"}}' \
  | jq .

# => { "status": "accepted", "jobId": "01HXYZ...", "streamEntryId": "1699...-0" }

Part 2: The Elixir/OTP Consumer

Project Setup

mix new job_consumer --sup   # --sup scaffolds an Application module
cd job_consumer

The --sup flag is important — it generates a JobConsumer.Application module with a supervision tree stub. We'll fill that in.

# mix.exs
defp deps do
  [
    {:redix, "~> 1.4"},
    {:poolboy, "~> 1.5"},
    {:jason, "~> 1.4"},
    {:telemetry, "~> 1.2"},
    {:telemetry_metrics, "~> 0.6"}
  ]
end
Then fetch the dependencies:

mix deps.get

Configuration

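# config/config.exs
# Note: this file is evaluated at build time. In a release, env vars that must be
# read at boot belong in config/runtime.exs. Keep :max_attempts here, because
# JobWorker reads it with Application.compile_env.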
import Config

config :job_consumer,
  redis_host:       System.get_env("REDIS_HOST", "localhost"),
  redis_port:       String.to_integer(System.get_env("REDIS_PORT", "6379")),
  stream_key:       System.get_env("STREAM_KEY", "jobs:work"),
  consumer_group:   System.get_env("CONSUMER_GROUP", "elixir-workers"),
  consumer_name:    System.get_env("CONSUMER_NAME", "consumer-#{:inet.gethostname() |> elem(1)}"),
  max_concurrency:  String.to_integer(System.get_env("MAX_CONCURRENCY", "10")),
  poll_interval_ms: String.to_integer(System.get_env("POLL_INTERVAL_MS", "100")),
  max_attempts:     String.to_integer(System.get_env("MAX_ATTEMPTS", "3"))

The Supervision Tree

This is the heart of the OTP design. Get this right and everything else is pluggable.

# lib/job_consumer/application.ex
defmodule JobConsumer.Application do
  use Application
  require Logger

  @impl true
  def start(_type, _args) do
    config = Application.get_all_env(:job_consumer)

    children = [
      # 1. Redis connection pool: must start before anything that uses Redis
      {JobConsumer.RedisPool, config},

      # 2. Telemetry: attach handlers before any job can emit an event
      JobConsumer.Telemetry,

      # 3. Dynamic supervisor: spawns/monitors per-job worker processes.
      #    Must be up before the consumer's first poll calls into it.
      {JobConsumer.WorkerSupervisor, config},

      # 4. Queue consumer: polls Redis, dispatches to WorkerSupervisor.
      #    Children start in list order, so it goes last.
      {JobConsumer.QueueConsumer, config}
    ]

    # :one_for_one: if one child crashes, only that child is restarted.
    # This is correct here: a crashed QueueConsumer shouldn't kill the WorkerSupervisor
    # and vice versa. The RedisPool restart policy handles reconnection.
    opts = [
      strategy:  :one_for_one,
      name:      JobConsumer.Supervisor,
      max_restarts: 10,
      max_seconds:  60
    ]

    Logger.info("[app] starting supervision tree")
    Supervisor.start_link(children, opts)
  end
end

A note on strategy choice: :one_for_one is right here because our children are loosely coupled — the QueueConsumer and WorkerSupervisor don't share state. If we had children where a crash in one makes the others' state invalid, we'd use :one_for_all (restart everyone) or :rest_for_one (restart the crashed child and all children started after it).
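The three strategies differ only in which siblings get restarted alongside the crashed child. A small model of that rule, written as a plain function; restartSet is a hypothetical helper for illustration, not an OTP API, and the start order in the example is just a sample.

```javascript
// Which children does a supervisor restart when one of them crashes?
// `children` is listed in start order.
function restartSet(strategy, children, crashed) {
  const index = children.indexOf(crashed);
  if (index === -1) throw new Error(`unknown child: ${crashed}`);

  switch (strategy) {
    case 'one_for_one':
      return [crashed];               // only the crashed child
    case 'one_for_all':
      return [...children];           // every child
    case 'rest_for_one':
      return children.slice(index);   // the crashed child and those started after it
    default:
      throw new Error(`unknown strategy: ${strategy}`);
  }
}

// Sample start order, for illustration only.
const children = ['RedisPool', 'WorkerSupervisor', 'QueueConsumer'];

console.log(restartSet('one_for_one', children, 'WorkerSupervisor'));
console.log(restartSet('rest_for_one', children, 'WorkerSupervisor'));
console.log(restartSet('one_for_all', children, 'WorkerSupervisor'));
```

With :rest_for_one, start order becomes part of the failure semantics, which is one more reason to list children deliberately.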


The Redis Connection Pool

We use Poolboy to maintain a pool of Redix connections. A blocking command such as XREADGROUP ties up its connection until it returns, so a pool keeps acks and other commands from queuing behind it.

# lib/job_consumer/redis_pool.ex
defmodule JobConsumer.RedisPool do
  @pool_name :redix_pool

  def child_spec(config) do
    pool_opts = [
      name:          {:local, @pool_name},
      worker_module: JobConsumer.RedisWorker,
      size:          10,    # permanent connections
      max_overflow:  5      # extra connections under burst load
    ]

    redis_opts = [
      host: config[:redis_host],
      port: config[:redis_port]
    ]

    :poolboy.child_spec(@pool_name, pool_opts, redis_opts)
  end

  # Execute a Redis command, borrowing a worker from the pool.
  # The pooled process is a RedisWorker, not a Redix connection, so we
  # send it a GenServer call instead of passing it to Redix directly.
  def command(cmd) do
    :poolboy.transaction(@pool_name, fn worker ->
      GenServer.call(worker, {:command, cmd})
    end)
  end

  # Pipeline multiple commands in one round trip
  def pipeline(cmds) do
    :poolboy.transaction(@pool_name, fn worker ->
      GenServer.call(worker, {:pipeline, cmds})
    end)
  end
end

# lib/job_consumer/redis_worker.ex
defmodule JobConsumer.RedisWorker do
  use GenServer

  def start_link(redis_opts) do
    GenServer.start_link(__MODULE__, redis_opts)
  end

  @impl true
  def init(opts) do
    host = Keyword.get(opts, :host, "localhost")
    port = Keyword.get(opts, :port, 6379)

    case Redix.start_link(host: host, port: port) do
      {:ok, conn}     -> {:ok, conn}
      {:error, reason} -> {:stop, reason}
    end
  end

  # Run the request on this worker's Redix connection and reply with the result
  @impl true
  def handle_call({:command, cmd}, _from, conn) do
    {:reply, Redix.command(conn, cmd), conn}
  end

  def handle_call({:pipeline, cmds}, _from, conn) do
    {:reply, Redix.pipeline(conn, cmds), conn}
  end
end

Ensuring the Consumer Group Exists

Redis consumer groups must be created before XREADGROUP can be called; otherwise Redis answers with a NOGROUP error. We create the group once, in the QueueConsumer's init, using a helper from the Stream module:

# lib/job_consumer/stream.ex
defmodule JobConsumer.Stream do
  require Logger

  @doc """
  Ensure the consumer group exists on the stream.
  XGROUP CREATE with MKSTREAM creates the stream if it doesn't exist yet.
  '$' means 'start from new messages only' (use '0' to reprocess all).
  """
  def ensure_consumer_group!(stream_key, group_name) do
    case JobConsumer.RedisPool.command(
      ["XGROUP", "CREATE", stream_key, group_name, "$", "MKSTREAM"]
    ) do
      {:ok, "OK"} ->
        Logger.info("[stream] created consumer group '#{group_name}' on '#{stream_key}'")

      {:error, %Redix.Error{message: "BUSYGROUP" <> _}} ->
        # Group already exists — this is fine, not an error
        :ok

      {:error, reason} ->
        raise "Failed to create consumer group: #{inspect(reason)}"
    end
  end

  @doc """
  Read up to `count` new messages from the stream via consumer group.
  '>' means 'give me messages not yet delivered to any consumer'.
  """
  def read_new(stream_key, group_name, consumer_name, count \\ 10) do
    JobConsumer.RedisPool.command([
      "XREADGROUP",
      "GROUP",    group_name,
      consumer_name,
      "COUNT",    Integer.to_string(count),
      # Wait at most 2 s for messages. BLOCK 0 would wait forever, which outlasts
      # the default 5 s call timeout and freezes the calling GenServer.
      "BLOCK",    "2000",
      "STREAMS",  stream_key,
      ">"         # deliver only undelivered messages
    ])
  end

  @doc """
  Re-claim messages that have been pending (delivered but not acknowledged)
  for longer than `min_idle_ms`. Used for crash recovery.
  """
  def reclaim_stale(stream_key, group_name, consumer_name, min_idle_ms \\ 30_000) do
    JobConsumer.RedisPool.command([
      "XAUTOCLAIM",
      stream_key,
      group_name,
      consumer_name,
      Integer.to_string(min_idle_ms),
      "0-0",      # start from beginning of PEL
      "COUNT", "100"
    ])
  end

  @doc """
  Acknowledge a message — removes it from the Pending Entry List.
  Call this only after successful processing.
  """
  def ack(stream_key, group_name, entry_id) do
    JobConsumer.RedisPool.command(["XACK", stream_key, group_name, entry_id])
  end
end

The QueueConsumer GenServer

This is the poller. It wakes up, reads a batch of jobs from Redis, spawns a JobWorker for each via the WorkerSupervisor, and loops.

# lib/job_consumer/queue_consumer.ex
defmodule JobConsumer.QueueConsumer do
  use GenServer
  require Logger

  alias JobConsumer.{Stream, WorkerSupervisor}

  @reclaim_interval_ms 30_000  # check for stale pending entries every 30s

  # ── Public API ─────────────────────────────────────────────────────────────

  def start_link(config) do
    GenServer.start_link(__MODULE__, config, name: __MODULE__)
  end

  def status do
    GenServer.call(__MODULE__, :status)
  end

  # ── GenServer Callbacks ────────────────────────────────────────────────────

  @impl true
  def init(config) do
    stream_key     = config[:stream_key]
    group_name     = config[:consumer_group]
    consumer_name  = config[:consumer_name]
    poll_interval  = config[:poll_interval_ms]
    max_concurrent = config[:max_concurrency]

    # Ensure the consumer group exists before we start polling
    Stream.ensure_consumer_group!(stream_key, group_name)

    state = %{
      stream_key:     stream_key,
      group_name:     group_name,
      consumer_name:  consumer_name,
      poll_interval:  poll_interval,
      max_concurrent: max_concurrent,
      dispatched:     0,
      errors:         0
    }

    # Schedule first poll immediately, then reclaim loop
    send(self(), :poll)
    Process.send_after(self(), :reclaim_stale, @reclaim_interval_ms)

    Logger.info("[consumer] started — group=#{group_name} consumer=#{consumer_name}")
    {:ok, state}
  end

  @impl true
  def handle_info(:poll, state) do
    # Backpressure: don't read more jobs than we can handle concurrently
    active_workers = WorkerSupervisor.active_count()

    new_state =
      if active_workers >= state.max_concurrent do
        Logger.debug("[consumer] at capacity (#{active_workers}/#{state.max_concurrent}), skipping poll")
        state
      else
        read_and_dispatch(state)
      end

    # Schedule next poll
    Process.send_after(self(), :poll, state.poll_interval)
    {:noreply, new_state}
  end

  @impl true
  def handle_info(:reclaim_stale, state) do
    new_state =
      case Stream.reclaim_stale(state.stream_key, state.group_name, state.consumer_name) do
        # Redis 7 appends a third element (deleted IDs); Redis 6.2 returns only two
        {:ok, [_next_id, entries | _]} when entries != [] ->
          Logger.warning("[consumer] reclaimed #{length(entries)} stale entries")
          dispatch_entries(entries, state)

        {:ok, _} ->
          state

        {:error, reason} ->
          Logger.error("[consumer] reclaim failed: #{inspect(reason)}")
          %{state | errors: state.errors + 1}
      end

    Process.send_after(self(), :reclaim_stale, @reclaim_interval_ms)
    {:noreply, new_state}
  end

  @impl true
  def handle_call(:status, _from, state) do
    {:reply, Map.take(state, [:dispatched, :errors, :max_concurrent]), state}
  end

  # ── Private ────────────────────────────────────────────────────────────────

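  defp read_and_dispatch(state) do
    # Ask only for as many jobs as there are free worker slots; requesting
    # max_concurrent every time could overshoot the cap by a full batch
    free_slots = max(state.max_concurrent - WorkerSupervisor.active_count(), 1)

    case Stream.read_new(
      state.stream_key,
      state.group_name,
      state.consumer_name,
      free_slots
    ) do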
      {:ok, [[_stream_key, entries]]} ->
        dispatch_entries(entries, state)

      {:ok, nil} ->
        # Timeout with no messages — normal
        state

      {:error, reason} ->
        Logger.error("[consumer] read error: #{inspect(reason)}")
        %{state | errors: state.errors + 1}
    end
  end

  defp dispatch_entries(entries, state) do
    # Redix returns each stream entry as a two-element list: [entry_id, fields]
    Enum.reduce(entries, state, fn [entry_id, fields], acc ->
      job = parse_job(entry_id, fields)

      case WorkerSupervisor.start_worker(job) do
        {:ok, _pid} ->
          Logger.debug("[consumer] dispatched job=#{job.id} entry=#{entry_id}")
          %{acc | dispatched: acc.dispatched + 1}

        {:error, reason} ->
          Logger.error("[consumer] dispatch failed job=#{job.id}: #{inspect(reason)}")
          %{acc | errors: acc.errors + 1}
      end
    end)
  end

  defp parse_job(entry_id, fields) do
    field_map = Enum.chunk_every(fields, 2)
                |> Enum.into(%{}, fn [k, v] -> {k, v} end)

    %{
      stream_entry_id: entry_id,
      id:              field_map["id"],
      type:            field_map["type"],
      payload:         Jason.decode!(field_map["payload"]),
      priority:        String.to_integer(field_map["priority"] || "5"),
      attempts:        String.to_integer(field_map["attempts"] || "0"),
      enqueued_at:     field_map["enqueued_at"]
    }
  end
end

The backpressure check (active_workers >= state.max_concurrent) is critical. Without it, a burst of 10,000 jobs would spawn 10,000 GenServer processes simultaneously. With it, we cap concurrency and let Redis hold the overflow.
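The check is easiest to reason about as arithmetic on free slots. A minimal sketch, where freeSlots is a hypothetical helper and the numbers are illustrative:

```javascript
// How many jobs the consumer can take on right now.
// Hypothetical helper; mirrors the "at capacity" comparison in the consumer.
function freeSlots(maxConcurrent, activeWorkers) {
  return Math.max(maxConcurrent - activeWorkers, 0);
}

const atCapacity = freeSlots(10, 10); // nothing free: skip this poll
const partlyBusy = freeSlots(10, 7);  // three free: request at most three jobs
const idle = freeSlots(10, 0);        // all free: request a full batch

console.log({ atCapacity, partlyBusy, idle });
```

Whatever is not requested stays in the stream as undelivered entries, so the cap costs latency during a burst, not jobs.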


The WorkerSupervisor

A DynamicSupervisor that spawns JobWorker processes on demand and supervises them independently.

# lib/job_consumer/worker_supervisor.ex
defmodule JobConsumer.WorkerSupervisor do
  use DynamicSupervisor
  require Logger

  def start_link(config) do
    DynamicSupervisor.start_link(__MODULE__, config, name: __MODULE__)
  end

  @impl true
  def init(_config) do
    # :one_for_one is the only strategy DynamicSupervisor supports.
    # max_restarts/max_seconds count restarts only. JobWorker is :temporary and is
    # never restarted, so these limits matter only if restartable children are added.
    DynamicSupervisor.init(
      strategy:     :one_for_one,
      max_restarts: 3,
      max_seconds:  5
    )
  end

  @doc "Spawn a supervised JobWorker for the given job map"
  def start_worker(job) do
    spec = {JobConsumer.JobWorker, job}
    DynamicSupervisor.start_child(__MODULE__, spec)
  end

  @doc "Count currently active (living) worker processes"
  def active_count do
    DynamicSupervisor.count_children(__MODULE__).active
  end

  @doc "List all active worker PIDs"
  def list_workers do
    DynamicSupervisor.which_children(__MODULE__)
    |> Enum.map(fn {_id, pid, _type, _modules} -> pid end)
  end
end

The JobWorker GenServer

This is where the actual work happens. Each job gets its own process — isolated heap, isolated failure domain, independent retry logic.

# lib/job_consumer/job_worker.ex
defmodule JobConsumer.JobWorker do
  use GenServer, restart: :temporary  # don't auto-restart crashed workers
  require Logger

  alias JobConsumer.{Stream, DeadLetter}

  @base_retry_delay_ms 1_000
  @max_attempts        Application.compile_env(:job_consumer, :max_attempts, 3)

  # ── Public API ─────────────────────────────────────────────────────────────

  def start_link(job) do
    GenServer.start_link(__MODULE__, job)
  end

  # ── GenServer Callbacks ────────────────────────────────────────────────────

  @impl true
  def init(job) do
    # Process the job immediately after init — don't block the supervisor
    send(self(), :process)
    {:ok, job}
  end

  @impl true
  def handle_info(:process, job) do
    start_time = System.monotonic_time()

    :telemetry.execute(
      [:job_consumer, :job, :start],
      %{system_time: System.system_time()},
      %{job_type: job.type, job_id: job.id}
    )

    result =
      try do
        {:ok, execute_job(job)}
      rescue
        e -> {:error, Exception.format(:error, e, __STACKTRACE__)}
      catch
        :exit, reason -> {:error, "exit: #{inspect(reason)}"}
      end

    duration = System.monotonic_time() - start_time

    case result do
      {:ok, _output} ->
        handle_success(job, duration)

      {:error, reason} ->
        handle_failure(job, reason, duration)
    end

    # Worker is done — stop normally. The supervisor does not restart :temporary workers.
    {:stop, :normal, job}
  end

  # ── Job Dispatch ───────────────────────────────────────────────────────────

  defp execute_job(%{type: "email"} = job) do
    # Simulate: in production, call your mailer here
    %{to: to, template: template} = atomize(job.payload)
    Logger.info("[worker] sending email to=#{to} template=#{template}")
    Process.sleep(100)  # simulate I/O
    %{sent_to: to, template: template}
  end

  defp execute_job(%{type: "report"} = job) do
    %{report_id: id} = atomize(job.payload)
    Logger.info("[worker] generating report id=#{id}")
    Process.sleep(500)
    %{report_id: id, rows: :rand.uniform(10_000)}
  end

  defp execute_job(%{type: "webhook"} = job) do
    %{url: url} = atomize(job.payload)
    Logger.info("[worker] dispatching webhook to=#{url}")
    # In production: HTTP call here; raise on non-2xx for retry
    Process.sleep(200)
    %{url: url, status: 200}
  end

  defp execute_job(%{type: type}) do
    raise "Unknown job type: #{type}"
  end

  # ── Success / Failure Handling ─────────────────────────────────────────────

  defp handle_success(job, duration_native) do
    duration_ms = System.convert_time_unit(duration_native, :native, :millisecond)

    # Acknowledge the message — removes it from Redis PEL
    case Stream.ack(config(:stream_key), config(:consumer_group), job.stream_entry_id) do
      {:ok, 1} ->
        Logger.info("[worker] ✓ job=#{job.id} type=#{job.type} duration=#{duration_ms}ms")

      {:ok, 0} ->
        Logger.warning("[worker] ack returned 0 for job=#{job.id} — already acked?")

      {:error, reason} ->
        Logger.error("[worker] ack failed job=#{job.id}: #{inspect(reason)}")
    end

    :telemetry.execute(
      [:job_consumer, :job, :success],
      %{duration: duration_native},
      %{job_type: job.type, job_id: job.id}
    )
  end

  defp handle_failure(job, reason, duration_native) do
    attempts = job.attempts + 1
    duration_ms = System.convert_time_unit(duration_native, :native, :millisecond)

    Logger.error("[worker] ✗ job=#{job.id} type=#{job.type} attempt=#{attempts} reason=#{inspect(reason)}")

    :telemetry.execute(
      [:job_consumer, :job, :failure],
      %{duration: duration_native},
      %{job_type: job.type, job_id: job.id, attempt: attempts, reason: reason}
    )

    if attempts >= @max_attempts do
      # Max attempts reached — move to dead letter stream, then ack to clear PEL
      Logger.error("[worker] dead-lettering job=#{job.id} after #{attempts} attempts")
      # Record the final attempt count, not the count the job arrived with
      DeadLetter.push(%{job | attempts: attempts}, reason)
      Stream.ack(config(:stream_key), config(:consumer_group), job.stream_entry_id)
    else
      # Exponential backoff retry: re-enqueue with incremented attempts
      # We ack the current entry and re-add to the stream with updated attempts count
      delay_ms = @base_retry_delay_ms * :math.pow(2, attempts) |> round()
      Logger.info("[worker] retrying job=#{job.id} in #{delay_ms}ms (attempt #{attempts}/#{@max_attempts})")

      Stream.ack(config(:stream_key), config(:consumer_group), job.stream_entry_id)

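      # Re-enqueue after the delay in a detached process so we don't block.
      # Caveat: the entry is already acked, so during the delay the job exists
      # only in memory. If the node dies before re_enqueue runs, the retry is lost.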
      job_to_retry = %{job | attempts: attempts}
      Task.start(fn ->
        Process.sleep(delay_ms)
        re_enqueue(job_to_retry)
      end)
    end
  end

  defp re_enqueue(job) do
    fields = [
      "id",          job.id,
      "type",        job.type,
      "payload",     Jason.encode!(job.payload),
      "priority",    Integer.to_string(job.priority),
      "enqueued_at", job.enqueued_at,
      "attempts",    Integer.to_string(job.attempts)
    ]

    JobConsumer.RedisPool.command(
      ["XADD", config(:stream_key), "MAXLEN", "~", "10000", "*" | fields]
    )
  end

  defp atomize(map) do
    Map.new(map, fn {k, v} -> {String.to_existing_atom(k), v} end)
  end

  defp config(key), do: Application.fetch_env!(:job_consumer, key)
end

The restart: :temporary option on use GenServer is essential. It tells the WorkerSupervisor not to automatically restart a worker that exits — whether normally or abnormally. We want full control over retry logic inside the worker itself. Auto-restart would bypass our backoff and dead-letter logic.
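The retry arithmetic in handle_failure is worth spelling out, because the delay is computed from the incremented attempt count. A sketch that mirrors it with the same constants (1,000 ms base, 3 max attempts); nextStep is a hypothetical name used only here:

```javascript
const BASE_RETRY_DELAY_MS = 1000;
const MAX_ATTEMPTS = 3;

// `previousAttempts` is the job's attempts field when the worker picked it up.
function nextStep(previousAttempts) {
  const attempts = previousAttempts + 1;
  if (attempts >= MAX_ATTEMPTS) {
    return { attempts, action: 'dead_letter' };
  }
  // Same formula as the worker: base * 2^attempts
  return { attempts, action: 'retry', delayMs: BASE_RETRY_DELAY_MS * 2 ** attempts };
}

const schedule = [0, 1, 2].map(nextStep);
console.log(schedule);
```

Under these constants a job runs three times in total: the first failure waits 2 seconds, the second waits 4 seconds, and the third goes straight to the dead-letter stream.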


Dead Letter Queue

Jobs that exhaust retries go here for inspection, not into the void:

# lib/job_consumer/dead_letter.ex
defmodule JobConsumer.DeadLetter do
  require Logger

  @stream_key "jobs:dead"

  def push(job, reason) do
    fields = [
      "original_id",  job.id,
      "type",         job.type,
      "payload",      Jason.encode!(job.payload),
      "attempts",     Integer.to_string(job.attempts),
      "failed_at",    DateTime.utc_now() |> DateTime.to_iso8601(),
      "reason",       inspect(reason)
    ]

    case JobConsumer.RedisPool.command(["XADD", @stream_key, "*" | fields]) do
      {:ok, entry_id} ->
        Logger.info("[dead_letter] stored job=#{job.id} at entry=#{entry_id}")
        {:ok, entry_id}

      {:error, reason} ->
        Logger.error("[dead_letter] failed to store job=#{job.id}: #{inspect(reason)}")
        {:error, reason}
    end
  end

  def list(count \\ 100) do
    case JobConsumer.RedisPool.command(["XRANGE", @stream_key, "-", "+", "COUNT", Integer.to_string(count)]) do
      {:ok, entries} -> {:ok, Enum.map(entries, &parse_entry/1)}
      error -> error
    end
  end

  # Redix returns each XRANGE entry as a two-element list, not a tuple
  defp parse_entry([entry_id, fields]) do
    field_map = Enum.chunk_every(fields, 2)
                |> Enum.into(%{}, fn [k, v] -> {k, v} end)
    Map.put(field_map, "entry_id", entry_id)
  end
end
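If you would rather inspect the dead-letter stream from the Node.js side, the same field pairing applies. The sketch below assumes the reply shape that ioredis gives for XRANGE, an array of [id, [field, value, ...]] pairs, and runs on a hand-written sample instead of a live connection. parseEntries is a hypothetical helper.

```javascript
// Turn an XRANGE-style reply into plain objects, one per stream entry.
function parseEntries(reply) {
  return reply.map(([entryId, fields]) => {
    const entry = { entry_id: entryId };
    // Fields arrive flat: [name, value, name, value, ...]
    for (let i = 0; i + 1 < fields.length; i += 2) {
      entry[fields[i]] = fields[i + 1];
    }
    return entry;
  });
}

// Hand-written sample in the assumed reply shape (not real queue data).
const sampleReply = [
  ['1700000000000-0', ['original_id', 'job-1', 'type', 'email', 'attempts', '3']],
];

const deadJobs = parseEntries(sampleReply);
console.log(deadJobs);
```

Against a real server, the input would come from something like redis.xrange('jobs:dead', '-', '+', 'COUNT', 100) using the client from src/redis.js.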

Telemetry

Wire up metrics so you actually know what's happening:

# lib/job_consumer/telemetry.ex
defmodule JobConsumer.Telemetry do
  use GenServer
  require Logger

  def start_link(_opts) do
    GenServer.start_link(__MODULE__, [], name: __MODULE__)
  end

  @impl true
  def init(_) do
    events = [
      [:job_consumer, :job, :start],
      [:job_consumer, :job, :success],
      [:job_consumer, :job, :failure]
    ]

    :telemetry.attach_many(
      "job-consumer-logger",
      events,
      &__MODULE__.handle_event/4,
      nil
    )

    {:ok, %{processed: 0, failed: 0}}
  end

  def handle_event([:job_consumer, :job, :start], _measurements, meta, _config) do
    Logger.debug("[telemetry] job started type=#{meta.job_type} id=#{meta.job_id}")
  end

  def handle_event([:job_consumer, :job, :success], measurements, meta, _config) do
    duration_ms = System.convert_time_unit(measurements.duration, :native, :millisecond)
    Logger.info("[telemetry] job success type=#{meta.job_type} id=#{meta.job_id} duration=#{duration_ms}ms")
    # In production: emit to StatsD, Prometheus, Datadog, etc.
  end

  def handle_event([:job_consumer, :job, :failure], _measurements, meta, _config) do
    Logger.warning("[telemetry] job failure type=#{meta.job_type} id=#{meta.job_id} attempt=#{meta.attempt}")
  end
end

Watching the Supervision Tree in Action

Start the Elixir application:

mix run --no-halt

In another terminal, enqueue a batch of jobs:

for i in $(seq 1 20); do
  curl -s -X POST http://localhost:3000/jobs \
    -H 'Content-Type: application/json' \
    -d "{\"type\":\"email\",\"payload\":{\"to\":\"user${i}@example.com\",\"template\":\"welcome\"}}" \
    > /dev/null
done
echo "Enqueued 20 jobs"

Observe the Elixir logs — you'll see workers spawning, processing, and acknowledging:

[consumer] dispatched job=01HX... entry=1699...-0
[worker] sending email to=user1@example.com template=welcome
[worker] ✓ job=01HX... type=email duration=103ms
[consumer] dispatched job=01HY... entry=1699...-1
...

Now simulate a crash. Start the app with iex -S mix (instead of mix run --no-halt) so you have a shell, then run:

# Kill the QueueConsumer process directly
Process.whereis(JobConsumer.QueueConsumer) |> Process.exit(:kill)

# The Application supervisor restarts it automatically within milliseconds
# Watch the logs:
# [consumer] started — group=elixir-workers consumer=consumer-hostname

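The supervision tree just restarted the consumer. Workers that were already running live under the WorkerSupervisor, so they keep going and acknowledge their jobs as usual. Entries that were delivered but never acknowledged stay in the Redis PEL, and the reclaim_stale loop picks them up once they have been idle for 30 seconds.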


The Failure Matrix

| Failure | What happens | Recovery |
|---|---|---|
| JobWorker crashes mid-job | Job stays in Redis PEL (not acked) | XAUTOCLAIM reclaims after 30s |
| JobWorker raises exception | try/rescue catches it, retry logic runs | Exponential backoff, then dead letter |
| QueueConsumer crashes | App supervisor restarts it | Polls resume; PEL intact in Redis |
| WorkerSupervisor crashes | App supervisor restarts it | All workers lost; PEL covers in-flight jobs |
| Redis connection drops | Redix auto-reconnects; pool returns errors | Consumer logs errors, retries next poll |
| Job exceeds max_attempts | Moved to jobs:dead stream, PEL cleared | Manual inspection + replay |
| Burst of jobs | Backpressure check caps concurrency | Overflow waits in the Redis stream, up to the MAXLEN cap |

Every cell in that table has code behind it in what we built. None of it relies on hope.


Where to Take It Next

  • Priorities: Add a separate stream per priority level (jobs:high, jobs:normal, jobs:low). Poll high-priority first; fall through to lower streams only when high is empty.
  • Observability: Replace the Telemetry logger with a Prometheus exporter. Track queue depth (Redis XLEN), processing rate, p99 duration per job type.
  • Horizontal scaling: Run multiple Elixir nodes, each with a unique consumer_name. Within a consumer group, Redis delivers each new entry to only one consumer, so the nodes split the work without a coordinator.
  • Rate limiting: Add a RateLimiter GenServer that tracks jobs-per-second per job type and blocks the QueueConsumer dispatch when limits are hit.
  • Job cancellation: XDEL a stream entry by ID before it's claimed. Workers should check a cancellation flag at the start of execute_job.
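The priority idea from the list above reduces to a first-non-empty scan over the streams in order. A sketch with stand-in data instead of Redis; readByPriority and the readBatch callback are hypothetical names, and a real version would issue one read per stream.

```javascript
const STREAMS_BY_PRIORITY = ['jobs:high', 'jobs:normal', 'jobs:low'];

// `readBatch(stream)` is a hypothetical function returning an array of entries.
function readByPriority(readBatch) {
  for (const stream of STREAMS_BY_PRIORITY) {
    const entries = readBatch(stream);
    if (entries.length > 0) return { stream, entries };
  }
  return { stream: null, entries: [] };
}

// Stand-in queues: high is empty, so the scan falls through to normal.
const fakeQueues = {
  'jobs:high': [],
  'jobs:normal': ['a', 'b'],
  'jobs:low': ['c'],
};

const picked = readByPriority((stream) => fakeQueues[stream]);
console.log(picked);
```

One consequence of strict fall-through is that a steady flow of high-priority jobs can starve the lower streams, so it may be worth reserving some capacity for them.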

The OTP supervision tree you have now is the skeleton that all of this hangs on. Add a new capability → add a supervised child. Something breaks → the tree heals it. That's the promise, and it's not magic — it's just processes all the way down.
