nats_worker

Minimal Sneakers-like worker framework for NATS JetStream, powered by nats-pure. Designed to drop into a Rails app: put workers in app/workers, run them via bin/nats_worker or a rake task.

Installation

Add to your Gemfile:

gem "nats_worker"

Usage

# app/workers/order_worker.rb
class OrderWorker
  include NatsWorker::Worker

  from_stream "ORDERS",
    subject:  "orders.>",
    durable:  "order_worker",
    threads:  2,
    fetch:    10,
    ack_wait: 30

  def work(msg)
    payload = JSON.parse(msg.data)
    Order.create!(payload)
    # Auto-ack on success, auto-nak on raised exception.
    # You can still call `ack!(msg)`, `nack!(msg)`, `term!(msg)` manually.
  end
end

Configuration

# config/initializers/nats_worker.rb
NatsWorker.configure do |c|
  c.servers      = ENV.fetch("NATS_URL", "nats://127.0.0.1:4222").split(",")
  c.nats_options = { name: "my-app", reconnect_time_wait: 1 }
  c.logger       = Rails.logger
  c.default_threads       = 1
  c.default_fetch_size    = 10
  c.default_fetch_timeout = 5
  c.default_ack_wait      = 30
  c.shutdown_timeout      = 25
end

Running

Run all registered workers:

bundle exec nats_worker
# or
bundle exec rake nats_worker:work

Run a subset (works with both):

WORKERS=OrderWorker,PaymentWorker bundle exec nats_worker
WORKERS=OrderWorker bundle exec rake nats_worker:work

Outside of Rails, point at a custom boot file:

bundle exec nats_worker -r ./my_app.rb -w OrderWorker

Ack semantics

  • work returns normally → message is acked.
  • work raises → message is nak'd (re-delivered subject to ack_wait).
  • You may call ack!(msg) / nack!(msg) / term!(msg) explicitly; double-ack is harmless.

Signals

SIGINT / SIGTERM trigger graceful shutdown: the runner stops fetching new batches, lets in-flight work calls finish (up to shutdown_timeout seconds), drains the NATS connection and exits.

License

MIT

Tests

Unit tests:

bundle exec rspec spec/worker_spec.rb

End-to-end tests run against a live NATS server with JetStream enabled. The provided devcontainer (.devcontainer/) brings one up alongside the app container, exposing it as nats:4222. To run them:

# from the host:
cd .devcontainer && docker compose -p nats_worker up -d
docker exec -e NATS_URL=nats://nats:4222 nats_worker-app-1 \
  bash -c "cd /workspaces/nats-worker && bundle exec rspec"

If NATS_URL does not point at a reachable server, e2e specs are auto-skipped (the :integration tag is filtered out).