🚀 Cosmonats - lightweight background and stream processing

Cosmonats is a Ruby background job and stream processing framework powered by NATS JetStream. It offers a familiar API for job queues, adds stream processing on top, and addresses the scalability limits of Redis- and database-backed queues through horizontal scaling and disk-backed persistence.



🎯 Why?

With so many job frameworks already available, why build another one? Cosmonats is designed to solve the fundamental scaling problems of Redis- and database-backed job queues while offering both job and stream processing in a single framework.

The Problem with Redis at Scale

  • Single-threaded command processing - All operations serialized, creating contention with many workers
  • Memory-only persistence - Everything must fit in RAM, expensive to scale
  • Vertical scaling only - Can't truly distribute a single queue across nodes
  • Polling overhead - Thousands of blocked connections
  • No native backpressure - Queues can grow unbounded
  • Weak durability - Async replication can lose jobs during failures

Note: Alternatives like Dragonfly solve the threading bottleneck but still face memory/scaling limitations.

The Problem with RDBMS at Scale

  • Database contention - Polling queries compete with application queries for resources
  • Connection pool pressure - Workers consume database connections, starving the application
  • Row-level locking overhead - SELECT FOR UPDATE SKIP LOCKED still scans rows under high concurrency
  • Vacuum/autovacuum impact - High-churn job tables degrade database performance
  • Vertical scaling only - Limited by single database instance capabilities
  • Index bloat - High UPDATE/DELETE volume causes index degradation over time
  • Table bloat - Constant row updates fragment tables, requiring maintenance
  • LISTEN/NOTIFY limitations - 8KB payload limit, no persistence, breaks down at high volumes (10K+ notifications/sec)
  • No native horizontal scaling - Cannot distribute a single job queue across multiple database nodes

Note: Database-backed solutions can work well for moderate workloads, but they run into these fundamental limitations at higher scale.

The Solution

Built on NATS, Cosmonats provides:

  • True horizontal scaling - Distribute streams across cluster nodes
  • Disk-backed persistence - TB-scale queues with memory cache
  • Replicated acknowledgments - Survive multi-node failures
  • Built-in flow control - Automatic backpressure
  • Multi-DC support - Native geo-distribution and superclusters
  • High throughput & low latency - Millions of messages per second
  • Stream processing - Beyond simple job queues

✨ Features

🎪 Job Processing

  • Familiar compatible API - Easy migration from existing codebases
  • Priority queues - Multiple priority levels (critical, high, default, low)
  • Scheduled jobs - Execute jobs at specific times or after delays
  • Automatic retries - Configurable retry strategies with exponential backoff
  • Dead letter queue - Capture permanently failed jobs
  • Job uniqueness - Prevent duplicate job execution
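Exponential backoff means the wait before each retry grows geometrically with the attempt number. The sketch below illustrates the idea only; Cosmonats' actual retry formula is not documented here, so the `backoff_delay` helper, the 1-second base, and the 300-second cap are all assumptions.

```ruby
# Hypothetical helper (not part of Cosmonats): delay in seconds before
# retry number `attempt` (1-based). Doubles each time, capped at `max`
# so late retries don't wait unboundedly long.
def backoff_delay(attempt, base: 1, max: 300)
  raise ArgumentError, "attempt must be >= 1" if attempt < 1

  [base * (2**(attempt - 1)), max].min
end

# Delays for 5 retries with a 1-second base
delays = (1..5).map { |attempt| backoff_delay(attempt) }
p delays # => [1, 2, 4, 8, 16]
```

Production systems often add random jitter on top of this so that many jobs that failed together do not all retry at the same instant.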

🌊 Stream Processing

  • Real-time data streams - Process continuous event streams
  • Batch processing - Handle multiple messages efficiently
  • Message replay - Reprocess messages from any point in time
  • Consumer groups - Multiple consumers with load balancing
  • Exactly-once semantics - With proper configuration
  • Custom serialization - JSON, MessagePack, Protobuf support

📦 Installation

# Gemfile
gem "cosmonats"

Requirements: Ruby 3.1.0+ and a NATS Server with JetStream enabled (see the NATS Server installation guide)

To enable the web UI in a Rails app, add these lines to config/routes.rb:

require "cosmo/web"

Rails.application.routes.draw do
  mount Cosmo::Web => "/cosmo" # access web UI at http://localhost:3000/cosmo
  ...
end

🚀 Quick Start

1. Create config/cosmo.yml and run bundle exec cosmo -S to create streams in NATS:

concurrency: 5
max_retries: 3

consumers:
  jobs:
    default:
      ack_policy: explicit
      max_deliver: 10
      max_ack_pending: 10
      ack_wait: 15
      subject: jobs.%{name}.>

setup:
  jobs:
    default:
      storage: file
      retention: workqueue
      subjects: ["jobs.%{name}.>"]
      allow_direct: true

2. Create a Job in app/workers

class SendEmailJob
  include Cosmo::Job

  options stream: :default, retry: 3, dead: true

  def perform(user_id, email_type)
    # Pretend to send email to user: UserMailer.send(email_type, user_id).deliver_now
    sleep 0.5 # Simulate work
    puts "#{user_id}, #{email_type}"
  end
end

3. Enqueue Jobs

10.times { |i| SendEmailJob.perform_async(i, "welcome") }

4. Run

bundle exec cosmo jobs

💡 Core Concepts

Jobs

Simple background tasks with a familiar API:

class ReportJob
  include Cosmo::Job

  options(
    stream: :critical,  # Stream name
    retry: 5,           # Retry attempts
    dead: true          # Use dead letter queue
  )

  def perform(report_id)
    logger.info "Processing report #{report_id}"
    Report.find(report_id).generate!
  rescue StandardError => e
    logger.error "Failed: #{e.message}"
    raise  # Triggers retry
  end
end

# Usage
ReportJob.perform_async(42)                              # Enqueue now
ReportJob.perform_in(30.minutes, 42)                     # Delayed
ReportJob.perform_at(Time.parse("2026-01-25 10:00"), 42) # Scheduled

Streams

Real-time event processing with powerful features:

class ClicksProcessor
  include Cosmo::Stream

  options(
    stream: :clickstream,
    batch_size: 100,
    start_position: :last,  # :first, :last, :new, or timestamp
    consumer: {
      ack_policy: "explicit",
      max_deliver: 3,
      max_ack_pending: 100,
      subjects: ["events.clicks.>"]
    }
  )

  # Process one message
  def process_one
    data = message.data
    Analytics.track_click(data)
    message.ack  # Success
  end

  # Or process a batch of messages
  def process(messages)
    Analytics.track_click(messages.map(&:data))
    messages.each(&:ack)
  end
end

# Publishing
ClicksProcessor.publish(
  { user_id: 123, page: "/home" },
  subject: "events.clicks.homepage"
)
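Stream subjects and consumer filters in these examples (`events.clicks.>`, `events.*.>`, `jobs.%{name}.>`) use NATS wildcards: `*` matches exactly one dot-separated token, and `>` matches one or more trailing tokens. The NATS server does this matching itself; the sketch below only reproduces those rules to make them concrete, and `subject_matches?` is a hypothetical helper, not part of Cosmonats or the NATS client.

```ruby
# Minimal NATS-style subject matcher (illustration only).
#   "*" matches exactly one token
#   ">" matches one or more trailing tokens and must be the last token
def subject_matches?(pattern, subject)
  pattern_tokens = pattern.split(".")
  subject_tokens = subject.split(".")

  pattern_tokens.each_with_index do |token, i|
    # ">" needs at least one remaining subject token
    return subject_tokens.length > i if token == ">"
    return false if i >= subject_tokens.length
    return false unless token == "*" || token == subject_tokens[i]
  end

  # Without ">", the number of tokens must match exactly
  pattern_tokens.length == subject_tokens.length
end

p subject_matches?("events.clicks.>", "events.clicks.homepage") # => true
p subject_matches?("events.clicks.>", "events.clicks")          # => false
p subject_matches?("events.*.>", "events.views.product.42")     # => true
p subject_matches?("jobs.default.*", "jobs.default.a.b")        # => false
```

So a message published with subject "events.clicks.homepage" is picked up by a consumer filtering on "events.clicks.>", but not by one filtering on "events.clicks".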

# Message acknowledgment strategies (inside process_one or process)
message.ack                          # Success
message.nack(delay: 5_000_000_000)   # Retry (5 seconds in nanoseconds)
message.term                         # Permanent failure, no retry
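Because `nack(delay:)` takes nanoseconds, hand-written literals are easy to get wrong by a factor of ten (`5_000_000_000` vs `500_000_000`). A small conversion helper keeps the intent readable. `nack_delay` is a hypothetical name; the nanosecond unit comes from the comment above.

```ruby
# Hypothetical helper: convert a delay in seconds to whole nanoseconds,
# the unit used by message.nack(delay:) in the examples above.
NANOS_PER_SECOND = 1_000_000_000

def nack_delay(seconds)
  (seconds * NANOS_PER_SECOND).round
end

p nack_delay(5)    # => 5000000000
p nack_delay(30)   # => 30000000000
p nack_delay(0.25) # => 250000000
```

Usage inside a processor would then read `message.nack(delay: nack_delay(5))`.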

Configuration

File-based (config/cosmo.yml):

timeout: 25                 # Shutdown timeout in seconds
concurrency: &concurrency 1 # Number of worker threads
max_retries: &max_retries 3 # Default max retries

stream_config: &stream_config
  storage: file         # storage type (file or memory)
  retention: workqueue  # retention policy (limits, interest, workqueue)
  duplicate_window: 120 # time window for duplicate message detection in seconds
  discard: old          # when the stream is full: old drops the oldest messages, new rejects incoming ones
  allow_direct: true    # enable the direct get API for reading messages, required for web UI
  subjects:
    - jobs.%{name}.>    # subject pattern for stream, %{name} will be replaced with stream name

consumer_config: &consumer_config
  ack_policy: explicit    # ack policy (explicit, none, all); explicit means each message must be acknowledged individually
  max_deliver: 10         # maximum number of times a message will be delivered before it's considered failed
  max_ack_pending: 20     # maximum number of messages with pending ack for this consumer
  ack_wait: 60            # time in seconds to wait for an ack before redelivering the message
  subject: jobs.%{name}.> # subject pattern for consumer, %{name} will be replaced with stream name

consumers:
  jobs:
    critical:
      <<: *consumer_config
      priority: 50
    high:
      <<: *consumer_config
      priority: 30
    default:
      <<: *consumer_config
      priority: 15
    low:
      <<: *consumer_config
      priority: 5
    scheduled:
      <<: *consumer_config
      max_deliver: 1
      max_ack_pending: 100
      ack_wait: 10

setup:
  jobs:
    critical:
      <<: *stream_config
      description: Very critical priority jobs
    high:
      <<: *stream_config
      description: Higher priority jobs
    default:
      <<: *stream_config
      description: Default priority jobs
    low:
      <<: *stream_config
      description: Lower priority jobs
    scheduled:
      <<: *stream_config
      description: Scheduled jobs
    dead:
      <<: *stream_config
      retention: limits
      max_msgs: 10000
      max_age: 604800 # 7d
      description: Broken jobs (DLQ)

development:
  verbose: false
  concurrency: *concurrency

staging:
  verbose: true
  concurrency: 3

production:
  concurrency: 3

Programmatic:

Cosmo::Config.set(:concurrency, 20)
Cosmo::Config.set(:setup, :streams, :custom, { storage: "file", subjects: ["custom.>"] })

Environment variables:

export NATS_URL=nats://localhost:4222
export COSMO_JOBS_FETCH_TIMEOUT=0.1
export COSMO_STREAMS_FETCH_TIMEOUT=0.1
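Environment variables always arrive as strings, so numeric settings need explicit conversion. The sketch below reads the variables above with fallbacks; `cosmo_env` is a hypothetical helper, and the fallback values (other than NATS' standard client port 4222) are assumptions, not documented Cosmonats defaults.

```ruby
# Hypothetical helper: read the variables above with fallbacks.
# Accepts any Hash-like source so it can be exercised without touching ENV.
def cosmo_env(env = ENV)
  {
    nats_url: env.fetch("NATS_URL", "nats://localhost:4222"),
    # Float() raises on malformed input instead of silently returning 0.0
    jobs_fetch_timeout: Float(env.fetch("COSMO_JOBS_FETCH_TIMEOUT", "0.1")),
    streams_fetch_timeout: Float(env.fetch("COSMO_STREAMS_FETCH_TIMEOUT", "0.1"))
  }
end

settings = cosmo_env({ "COSMO_JOBS_FETCH_TIMEOUT" => "0.5" })
p settings[:jobs_fetch_timeout] # => 0.5
p settings[:nats_url]           # => "nats://localhost:4222"
```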

🔧 Advanced Usage

Priority Queues:

class UrgentJob
  include Cosmo::Job
  options stream: :critical  # priority: 50 in config
end

# config/cosmo.yml
consumers:
  jobs:
    critical:
      priority: 50  # Polled more frequently
    default:
      priority: 15
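Higher `priority` values mean a stream is polled more often. One common way to turn such weights into a polling order is smooth weighted round-robin (the scheme nginx uses for upstream weights). The sketch below feeds it the priorities from the config; whether Cosmonats uses this exact algorithm is an assumption (it could, for example, pick streams randomly in proportion to priority instead), and `WeightedPoller` is a hypothetical class.

```ruby
# Smooth weighted round-robin: each round, every stream's running score grows
# by its weight; the highest score is polled and then reduced by the total
# weight. Over `total` rounds each stream is picked exactly `weight` times,
# and the picks are interleaved rather than bunched together.
class WeightedPoller
  def initialize(weights)
    @weights = weights
    @total = weights.values.sum
    @current = weights.transform_values { 0 }
  end

  def next_stream
    @weights.each { |name, weight| @current[name] += weight }
    name, = @current.max_by { |_, score| score }
    @current[name] -= @total
    name
  end
end

poller = WeightedPoller.new(critical: 50, high: 30, default: 15, low: 5)
picks = Array.new(100) { poller.next_stream }

p picks.first(4) # => [:critical, :high, :critical, :default]
# picks.tally counts: critical 50, high 30, default 15, low 5
```

With these weights, `critical` gets half of all polls, yet `low` is never starved: it still gets 5 polls in every cycle of 100.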

Custom Serializers:

require "msgpack" # provided by the msgpack gem

module MessagePackSerializer
  def self.serialize(data)
    MessagePack.pack(data)
  end

  def self.deserialize(payload)
    MessagePack.unpack(payload)
  end
end

class FastStream
  include Cosmo::Stream
  options publisher: { serializer: MessagePackSerializer }
end
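Judging from the MessagePack example, a serializer is any object responding to `serialize` and `deserialize`; this duck-typed contract is inferred from that example rather than from a documented API. A JSON version needs only the standard library. `JsonSerializer` is a hypothetical name.

```ruby
require "json"

# Same two-method interface as MessagePackSerializer above,
# built on Ruby's stdlib JSON so no extra gems are needed.
module JsonSerializer
  def self.serialize(data)
    JSON.generate(data)
  end

  def self.deserialize(payload)
    JSON.parse(payload)
  end
end

payload = JsonSerializer.serialize({ user_id: 123, page: "/home" })
puts payload # {"user_id":123,"page":"/home"}

data = JsonSerializer.deserialize(payload)
puts data["user_id"] # 123 (keys come back as strings, not symbols)
```

It would plug in the same way as the MessagePack one: `options publisher: { serializer: JsonSerializer }`. As with MessagePack, symbol keys become string keys after a round trip, so consumers should read `data["user_id"]`, not `data[:user_id]`.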

Error Handling:

class ResilientJob
  include Cosmo::Job
  options retry: 5, dead: true

  def perform(data)
    process_data(data)
  rescue RetryableError => e
    logger.warn "Retryable: #{e.message}"
    raise  # Will retry
  rescue FatalError => e
    logger.error "Fatal: #{e.message}"
    # Don't raise - won't retry
  end
end
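To reason about how `retry:` and `dead:` interact, it can help to simulate the flow locally. The harness below is a hypothetical in-memory stand-in (no NATS, no Cosmonats): a job that raises is re-run until the retry budget is spent and is then recorded as dead, while a job that rescues its own error counts as done. Whether `retry: 5` means 5 or 6 total runs in Cosmonats is an assumption here (this sketch uses 1 initial run + 5 retries).

```ruby
class RetryableError < StandardError; end
class FatalError < StandardError; end

# Hypothetical harness (not Cosmonats): runs `job` up to 1 + retries times,
# then records it in `dead_letters` and gives up.
def run_with_retries(job, args, retries:, dead_letters:)
  attempts = 0
  begin
    attempts += 1
    job.call(*args)
    :done
  rescue StandardError => e
    retry if attempts <= retries # an exception escaping the job triggers a retry
    dead_letters << { args: args, error: e.message, attempts: attempts }
    :dead
  end
end

dead = []

# Always raises, like the RetryableError branch in ResilientJob
always_fails = ->(_id) { raise RetryableError, "upstream timeout" }

# Fails twice, then succeeds
flaky_calls = 0
flaky = lambda do |_id|
  flaky_calls += 1
  raise RetryableError, "try again" if flaky_calls < 3
  :ok
end

# Rescues its own error without re-raising, like the FatalError branch
swallows_fatal = lambda do |_id|
  raise FatalError, "bad input"
rescue FatalError
  nil # logged and dropped; the harness sees a normal return
end

p run_with_retries(always_fails, [1], retries: 5, dead_letters: dead)   # => :dead
p dead.first[:attempts]                                              # => 6
p run_with_retries(flaky, [2], retries: 5, dead_letters: dead)          # => :done
p run_with_retries(swallows_fatal, [3], retries: 5, dead_letters: dead) # => :done
```

The last case is the trade-off in `ResilientJob`: swallowing `FatalError` stops pointless retries, but the job never reaches the dead letter queue, so the log line is the only record of the failure.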

Testing:

# Synchronous execution
SendEmailJob.perform_sync(123, "test")

# Enqueuing returns the job ID (jid)
jid = SendEmailJob.perform_async(123, "welcome")
assert_kind_of String, jid

🖥️ CLI Reference

# Set up streams
cosmo -C config/cosmo.yml --setup

# Run processors
cosmo -C config/cosmo.yml -c 20 -r ./app/jobs jobs     # Jobs only
cosmo -C config/cosmo.yml -c 20 streams                # Streams only
cosmo -C config/cosmo.yml -c 20                        # Both

Common Flags:

| Flag | Description | Example |
|---|---|---|
| -C, --config PATH | Config file path | -C config/cosmo.yml |
| -c, --concurrency INT | Worker threads | -c 20 |
| -r, --require PATH | Auto-require directory | -r ./app/jobs |
| -t, --timeout NUM | Shutdown timeout (sec) | -t 60 |
| -S, --setup | Set up streams and exit | --setup |

🚢 Deployment

NATS Cluster:

# nats-server.conf
server_name: nats-1   # unique per node; required for clustered JetStream
port: 4222
jetstream {
  store_dir: /var/lib/nats
  max_file: 10G
}
cluster {
  name: cosmo-cluster
  listen: 0.0.0.0:6222
  routes: [nats://nats-2:6222, nats://nats-3:6222]
}

Docker Compose:

services:
  nats:
    image: nats:latest
    command: -js -c /etc/nats/nats-server.conf
    volumes:
      - ./nats.conf:/etc/nats/nats-server.conf
      - nats-data:/var/lib/nats

  worker:
    build: .
    depends_on:
      - nats
    environment:
      NATS_URL: nats://nats:4222
    command: bundle exec cosmo -C config/cosmo.yml -c 20 jobs
    deploy:
      replicas: 3

volumes:
  nats-data:

Systemd Service:

# /etc/systemd/system/cosmo.service
[Unit]
Description=Cosmo Background Processor
After=network.target

[Service]
Type=simple
User=deploy
WorkingDirectory=/var/www/myapp
Environment=RAILS_ENV=production
Environment=NATS_URL=nats://localhost:4222
ExecStart=/usr/local/bin/bundle exec cosmo -C config/cosmo.yml -c 20 jobs
Restart=always
RestartSec=10
StandardOutput=journal
StandardError=journal
SyslogIdentifier=cosmo

[Install]
WantedBy=multi-user.target

Enable and start:

sudo systemctl enable cosmo
sudo systemctl start cosmo
sudo systemctl status cosmo

📊 Monitoring

Structured Logging:

2026-01-23T10:15:30.123Z INFO pid=12345 tid=abc jid=def: start
2026-01-23T10:15:32.456Z INFO pid=12345 tid=abc jid=def elapsed=2.333: done
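Because these lines follow a regular `key=value` layout, they are easy to parse for ad-hoc analysis, such as pulling out `elapsed` to find slow jobs. The parser below is a sketch written against the two sample lines above; `parse_log_line` is a hypothetical helper, and the real log format may include fields or quoting it does not handle.

```ruby
# Hypothetical parser for the sample format above:
#   <ISO8601 timestamp> <LEVEL> key=value ...: <message>
LOG_LINE = /\A(?<time>\S+) (?<level>[A-Z]+) (?<fields>.*?): (?<message>.*)\z/

def parse_log_line(line)
  match = LOG_LINE.match(line)
  return nil unless match

  fields = match[:fields].scan(/(\w+)=(\S+)/).to_h
  {
    time: match[:time],
    level: match[:level],
    message: match[:message],
    **fields.transform_keys(&:to_sym)
  }
end

entry = parse_log_line("2026-01-23T10:15:32.456Z INFO pid=12345 tid=abc jid=def elapsed=2.333: done")
p entry[:message]      # => "done"
p entry[:elapsed].to_f # => 2.333
```

Field values stay strings; convert the ones you need (`elapsed` to Float, `pid` to Integer) at the point of use.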

Stream Metrics:

client = Cosmo::Client.instance
info = client.stream_info("default")

info.state.messages       # Total messages
info.state.bytes          # Total bytes
info.state.consumer_count # Number of consumers

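Prometheus: NATS serves JSON monitoring endpoints (such as /varz and /jsz) on port 8222 when monitoring is enabled (e.g. http_port: 8222); the prometheus-nats-exporter converts them into Prometheus metrics. Useful ones include (exact names depend on the exporter and its flags):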

  • jetstream_server_store_msgs - Messages in stream
  • jetstream_consumer_delivered_msgs - Delivered messages
  • jetstream_consumer_ack_pending - Pending acknowledgments

💼 Examples

Email Queue:

class EmailJob
  include Cosmo::Job
  options stream: :default, retry: 3

  def perform(user_id, template)
    user = User.find(user_id)
    EmailService.deliver(user.email, template)
  end
end

EmailJob.perform_async(123, "welcome")
EmailJob.perform_in(1.day, 123, "followup")

Image Processing Pipeline:

class ImageProcessor
  include Cosmo::Stream
  options(
    stream: :images,
    consumer: { subjects: ["images.uploaded.>"] }
  )

  def process_one
    processed = ImageService.process(message.data["url"])
    publish(processed, subject: "images.processed.optimized")
    message.ack
  rescue => e
    logger.error "Processing failed: #{e.message}"
    message.nack(delay: 30_000_000_000)
  end
end

ImageProcessor.publish({ url: "https://example.com/image.jpg" }, subject: "images.uploaded.user")

Real-Time Analytics:

class AnalyticsAggregator
  include Cosmo::Stream
  options batch_size: 1000, consumer: { subjects: ["events.*.>"] }

  def process(messages)
    events = messages.map(&:data)
    aggregates = events.group_by { |e| e["type"] }.transform_values(&:count)
    Analytics.bulk_insert(aggregates)
    messages.each(&:ack)
  end
end
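The aggregation step in `process` is plain Ruby and can be checked without NATS. The sketch below runs it on sample data (the event shape and type values are assumptions) and compares it with `Enumerable#tally`, an equivalent one-step form that counts directly instead of first building an array of events per type.

```ruby
# Sample payloads shaped like message.data in AnalyticsAggregator
# (the "type" values are illustrative).
events = [
  { "type" => "click" },
  { "type" => "view" },
  { "type" => "click" },
  { "type" => "purchase" },
  { "type" => "click" }
]

# The aggregation used in AnalyticsAggregator#process ...
by_group = events.group_by { |e| e["type"] }.transform_values(&:count)

# ... and an equivalent form using Enumerable#tally (Ruby 2.7+)
by_tally = events.map { |e| e["type"] }.tally

puts by_group == by_tally # => true
puts by_tally["click"]    # => 3
```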
**Made with ❤️ for Ruby** *Blast off Cosmonats! 🚀*