Module: Postburner

Defined in:
lib/postburner.rb,
lib/postburner/tube.rb,
lib/postburner/engine.rb,
lib/postburner/runner.rb,
lib/postburner/worker.rb,
lib/postburner/tracked.rb,
lib/postburner/version.rb,
lib/postburner/scheduler.rb,
app/models/postburner/job.rb,
lib/postburner/beanstalkd.rb,
lib/postburner/connection.rb,
lib/postburner/test_helpers.rb,
lib/postburner/time_helpers.rb,
app/models/postburner/mailer.rb,
lib/postburner/advisory_lock.rb,
lib/postburner/configuration.rb,
app/models/postburner/schedule.rb,
lib/postburner/instrumentation.rb,
app/concerns/postburner/logging.rb,
app/concerns/postburner/commands.rb,
app/concerns/postburner/callbacks.rb,
app/concerns/postburner/execution.rb,
app/concerns/postburner/insertion.rb,
app/models/postburner/tracked_job.rb,
lib/postburner/active_job/payload.rb,
app/concerns/postburner/properties.rb,
app/concerns/postburner/statistics.rb,
app/models/postburner/orphaned_job.rb,
app/jobs/postburner/application_job.rb,
lib/postburner/active_job/execution.rb,
lib/postburner/strategies/null_queue.rb,
lib/postburner/strategies/strict_queue.rb,
lib/postburner/strategies/default_queue.rb,
app/models/postburner/application_record.rb,
app/models/postburner/schedule_execution.rb,
app/helpers/postburner/application_helper.rb,
app/mailers/postburner/application_mailer.rb,
app/controllers/postburner/jobs_controller.rb,
lib/postburner/strategies/inline_test_queue.rb,
app/controllers/postburner/static_controller.rb,
lib/postburner/strategies/time_travel_test_queue.rb,
app/controllers/postburner/application_controller.rb

Overview

Postburner - PostgreSQL-backed job queue system built on Beanstalkd.

Postburner is a Ruby on Rails Engine that provides a database-backed job queue with full audit trails, inspection capabilities, and multiple execution strategies. Every job is stored as an ActiveRecord model, enabling database queries, foreign key relationships, and comprehensive statistics tracking.

## Core Concepts

  • **Jobs:** Subclass Job and implement the `perform` method

  • **Queue Strategies:** Control how jobs are executed (async, inline, test modes)

  • **Beanstalkd Integration:** Production queuing via Beanstalkd

  • **Database Persistence:** Full audit trail with timestamps, logs, and errors

  • **Callbacks:** ActiveJob-style lifecycle hooks (enqueue, attempt, processing, processed)

## Queue Strategies

Postburner uses a strategy pattern to control job execution:

  • **DefaultQueue** (default): Async via Beanstalkd, gracefully handles premature execution

  • **StrictQueue**: Async via Beanstalkd, strict premature execution errors

  • **InlineTestQueue**: Inline/synchronous, requires explicit time travel for scheduled jobs

  • **TimeTravelTestQueue**: Inline/synchronous with automatic time travel

  • **NullQueue**: Creates jobs without queueing, manual execution with time travel
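The strategy pattern listed above can be illustrated with a small standalone sketch. Everything in it (`MiniQueue`, `InlineStrategy`, `NullStrategy`, and the `insert` hook) is a hypothetical stand-in chosen for illustration and is not Postburner's actual implementation. The point is only that the job delegates to whichever strategy class is currently active.

```ruby
# Illustrative only: MiniQueue and its strategies are hypothetical stand-ins,
# not Postburner's actual classes.
module MiniQueue
  class << self
    attr_accessor :queue_strategy
  end

  # Runs the job immediately in the calling thread.
  class InlineStrategy
    def self.insert(job)
      job.perform
    end
  end

  # Marks the job as pending but never runs it.
  class NullStrategy
    def self.insert(job)
      job.status = :pending
    end
  end

  class Job
    attr_accessor :status

    def perform
      self.status = :processed
    end

    # The job never decides how it runs; it hands itself to the active strategy.
    def queue!
      MiniQueue.queue_strategy.insert(self)
    end
  end
end

MiniQueue.queue_strategy = MiniQueue::InlineStrategy
inline_job = MiniQueue::Job.new
inline_job.queue!
puts inline_job.status   # processed

MiniQueue.queue_strategy = MiniQueue::NullStrategy
null_job = MiniQueue::Job.new
null_job.queue!
puts null_job.status     # pending
```

Swapping the module-level strategy changes the behavior of every subsequent `queue!` call without touching the job classes themselves.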

## Auto-Detection

Postburner automatically detects Rails test mode and switches to InlineTestQueue when:

  • `Rails.env.test?` is true

  • `ActiveJob::Base.queue_adapter_name == :test`

## Usage

Examples:

Creating and queueing a job

class ProcessPayment < Postburner::Job
  queue 'critical'
  queue_priority 0

  def perform(args)
    payment = Payment.find(args['payment_id'])
    payment.process!
    log "Payment #{payment.id} processed"
  end
end

job = ProcessPayment.create!(args: { 'payment_id' => 123 })
job.queue!(delay: 1.hour)

Switching queue strategies

# Production with graceful premature handling (default)
Postburner.default_strategy!

# Production with strict premature errors
Postburner.strict_strategy!

# Test mode with explicit time control
Postburner.inline_test_strategy!

# Test mode with automatic time travel
Postburner.time_travel_test_strategy!

Using Beanstalkd connection

Postburner.connected do |conn|
  conn.tubes['my.tube'].stats
  conn.tubes['my.tube'].kick(3)
end

Checking if in test mode

if Postburner.testing?
  # Jobs execute inline
else
  # Jobs queued to Beanstalkd
end

Defined Under Namespace

Modules: ActiveJob, ApplicationHelper, Beanstalkd, Callbacks, Commands, Execution, Insertion, Instrumentation, Logging, Properties, Statistics, TestHelpers, TimeHelpers, Tracked

Classes: AdvisoryLock, ApplicationController, ApplicationJob, ApplicationMailer, ApplicationRecord, Configuration, Connection, DefaultQueue, Engine, InlineTestQueue, InstallGenerator, Job, JobsController, Mailer, NullQueue, OrphanedJob, Runner, Schedule, ScheduleExecution, Scheduler, StaticController, StrictQueue, TimeTravelTestQueue, TrackedJob, Tube, Worker

Constant Summary

VERSION = '1.0.0.rc.4'

TestQueue = InlineTestQueue

Backward compatibility alias

ImmediateTestQueue = TimeTravelTestQueue

Backward compatibility alias

Instance Attribute Details

#queue_strategy ⇒ Class

The current queue strategy class used for job execution.

Defaults to DefaultQueue for production, auto-switches to InlineTestQueue in Rails test environment.

Examples:

Postburner.queue_strategy
# => Postburner::DefaultQueue

Returns:

  • (Class)

    One of StrictQueue, DefaultQueue, InlineTestQueue, TimeTravelTestQueue, or NullQueue

See Also:

  • #default_strategy!
  • #strict_strategy!
  • #inline_test_strategy!
  • #time_travel_test_strategy!


# File 'lib/postburner.rb', line 119

mattr_accessor :queue_strategy

Class Method Details

.clear_all!(silent: false) ⇒ Hash

Clears all configured tubes including scheduler.

Convenience method that clears all watched tubes plus the scheduler tube in a single call. Use this to completely reset Postburner’s Beanstalkd state, such as during test setup or when recovering from a stuck state.

Equivalent to:

Postburner.clear_jobs!(Postburner.watched_tube_names + [Postburner.scheduler_tube_name])

Examples:

Clear everything (interactive/console use)

Postburner.clear_all!

Silent mode (test setup or programmatic use)

Postburner.clear_all!(silent: true)

Parameters:

  • silent (Boolean) (defaults to: false)

    If true, suppress output to stdout (default: false)

Returns:

  • (Hash)

    Statistics and results (see Connection#clear_tubes!)

See Also:

  • #clear_jobs!
  • #watched_tube_names
  • #scheduler_tube_name


# File 'lib/postburner.rb', line 484

def self.clear_all!(silent: false)
  all_tubes = watched_tube_names + [scheduler_tube_name]
  clear_jobs!(all_tubes, silent: silent)
end

.clear_jobs!(tube_names = nil, silent: false) ⇒ Hash

Clears jobs from specified tubes or shows stats for all tubes.

High-level method with formatted output. Delegates to Connection#clear_tubes! for the actual work, then pretty-prints the results.

SAFETY: Only allows clearing tubes that are defined in the loaded configuration. This prevents accidentally clearing tubes from other applications or environments sharing the same Beanstalkd server.

Examples:

Show stats only (no clearing) - SAFE

Postburner.clear_jobs!
# Shows stats for ALL tubes on Beanstalkd, but doesn't clear anything

Clear watched tubes only - SAFE

Postburner.clear_jobs!(Postburner.watched_tube_names)
# Only clears tubes defined in your config

Clear scheduler tube (single string)

Postburner.clear_jobs!(Postburner.scheduler_tube_name)
# Clears the scheduler watchdog tube

Clear watched tubes AND scheduler tube

Postburner.clear_jobs!(Postburner.watched_tube_names + [Postburner.scheduler_tube_name])
# Or use the convenience method:
Postburner.clear_all!

Trying to clear unconfigured tube - RAISES ERROR

Postburner.clear_jobs!(['some-other-app-tube'])
# => ArgumentError: Cannot clear tubes not in configuration

Silent mode (programmatic use)

result = Postburner.clear_jobs!(Postburner.watched_tube_names, silent: true)
result[:totals][:total]  # => 42

Parameters:

  • tube_names (Array<String>, String, nil) (defaults to: nil)

    Tube name(s) to clear, or nil to only show stats

  • silent (Boolean) (defaults to: false)

    If true, suppress output to stdout (default: false)

Returns:

  • (Hash)

    Statistics and results (see Connection#clear_tubes!)

Raises:

  • (ArgumentError)

    if tube_names contains tubes that are not defined in the loaded configuration

# File 'lib/postburner.rb', line 448

def self.clear_jobs!(tube_names = nil, silent: false)
  require 'json'

  tube_names = Array(tube_names) if tube_names
  result = connection.clear_tubes!(tube_names)

  unless silent
    puts JSON.pretty_generate(result)
  end

  result
end
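The safety check described above is not visible in the source shown here; it presumably lives in Connection#clear_tubes!. As a rough sketch of what such a guard could look like, the following standalone example rejects any requested tube that is missing from an allowed list. The helper name `assert_configured_tubes!`, the allowed list, and the exact message wording are assumptions made for illustration.

```ruby
# Hypothetical guard: the helper name, the allowed list, and the message
# wording are assumptions, not Postburner's actual code.
def assert_configured_tubes!(requested, allowed)
  unknown = Array(requested) - allowed
  return if unknown.empty?

  raise ArgumentError, "Cannot clear tubes not in configuration: #{unknown.join(', ')}"
end

allowed = ['postburner.test.default', 'postburner.test.scheduler']

# A single string is accepted as well as an array, and passes silently.
assert_configured_tubes!('postburner.test.default', allowed)

begin
  assert_configured_tubes!(['some-other-app-tube'], allowed)
rescue ArgumentError => e
  rejected_message = e.message
end

puts rejected_message
```

Validating before touching Beanstalkd means a typo or a tube owned by another application fails fast instead of deleting jobs.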

.configuration ⇒ Configuration

Returns the global configuration instance.

Returns:

  • (Configuration)



# File 'lib/postburner/configuration.rb', line 276

def self.configuration
  @configuration ||= Configuration.new
end

.configuration=(config) ⇒ Configuration

Sets the global configuration instance.

Parameters:

  • config (Configuration)

Returns:

  • (Configuration)



# File 'lib/postburner/configuration.rb', line 285

def self.configuration=(config)
  @configuration = config
end

.configure {|config| ... } ⇒ void

This method returns an undefined value.

Configures Postburner via block.

Examples:

Postburner.configure do |config|
  config.beanstalk_url = 'beanstalk://localhost:11300'
  config.worker_config = { name: 'default', queues: ['default'], forks: 2, threads: 10 }
end

Yields:

  • (config)

    Configuration instance

Yield Parameters:

  • config (Configuration)



# File 'lib/postburner/configuration.rb', line 302

def self.configure
  yield(configuration)
end

.connected ⇒ Postburner::Connection
.connected {|conn| ... } ⇒ Object

Yields a Beanstalkd connection or returns cached connection.

When called with a block, yields the thread-local connection. When called without a block, returns the thread-local connection.

Examples:

With block (recommended)

Postburner.connected do |conn|
  conn.tubes.to_a.each do |tube|
    puts tube.name
  end
end

Without block

conn = Postburner.connected
conn.tubes['my.tube'].stats

Direct tube operations

Postburner.connected do |conn|
  conn.tubes['critical'].kick(10)
  conn.tubes['background'].stats
end

Overloads:

  • .connected ⇒ Postburner::Connection

    Returns the cached Beanstalkd connection.

    Returns:

    • (Postburner::Connection)

  • .connected {|conn| ... } ⇒ Object

    Yields connection for the duration of the block.

    Yield Parameters:

    • conn (Postburner::Connection)

    Returns:

    • (Object)

      return value of the block

See Also:

  • #connection
  • #disconnect_all!


# File 'lib/postburner.rb', line 368

def self.connected
  if block_given?
    yield connection
  else
    connection
  end
end

.connection ⇒ Postburner::Connection

Note:

Connection is cached in Thread.current (thread-local)

Note:

Automatically reconnects if connection is not active

Returns a cached Beanstalkd connection.

Creates a new Connection using the configured Beanstalkd URL and caches it. Automatically reconnects if the connection is stale.

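For most use cases, prefer #connected, which yields this same thread-local connection to a block. Neither method closes the connection; use #disconnect_all! to release sockets.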

Examples:

conn = Postburner.connection
conn.tubes['my.tube'].stats

Returns:

  • (Postburner::Connection)

Raises:

  • (Beaneater::NotConnected)

    if connection fails

See Also:

  • #connected


# File 'lib/postburner.rb', line 327

def self.connection
  Thread.current[:postburner_connection] ||= Postburner::Connection.new
  conn = Thread.current[:postburner_connection]
  conn.reconnect! unless conn.connected?
  conn
end
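To make the thread-local caching concrete, here is a standalone sketch that follows the same `Thread.current[...] ||=` shape as the method above. `FakeConnection`, `cached_connection`, and the `:demo_connection` key are hypothetical stand-ins so that the example runs without Beanstalkd.

```ruby
# FakeConnection is a hypothetical stand-in for a real Beanstalkd connection.
class FakeConnection
  def initialize
    @open = true
  end

  def connected?
    @open
  end

  def close
    @open = false
  end

  def reconnect!
    @open = true
  end
end

# Same shape as the method above: cache per thread, revive if stale.
def cached_connection
  Thread.current[:demo_connection] ||= FakeConnection.new
  conn = Thread.current[:demo_connection]
  conn.reconnect! unless conn.connected?
  conn
end

main_conn = cached_connection
same_thread_again = cached_connection
other_thread_conn = Thread.new { cached_connection }.value

puts main_conn.equal?(same_thread_again)   # true: cached within a thread
puts main_conn.equal?(other_thread_conn)   # false: each thread gets its own

main_conn.close
puts cached_connection.connected?          # true: stale connection was revived
```

Because each thread holds its own connection object, threads never share a socket, which is also why shutdown has to visit every thread.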

.default_strategy! ⇒ void

Note:

This is the DEFAULT strategy

Note:

Requires Beanstalkd server running

This method returns an undefined value.

Activates default production mode with graceful premature handling.

Sets queue strategy to DefaultQueue (the default), which queues jobs to Beanstalkd and gracefully handles premature execution by re-inserting jobs with appropriate delay.

This is the recommended production strategy and is set by default on initialization.

Examples:

Postburner.default_strategy!
job = MyJob.create!(args: {})
job.queue!(delay: 1.hour)
# Queued to Beanstalkd, automatically re-inserted if picked up early

# File 'lib/postburner.rb', line 242

def self.default_strategy!
  self.queue_strategy = Postburner::DefaultQueue
end
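The difference between graceful and strict premature handling can be sketched as follows. This is a simplified illustration under stated assumptions: `PrematureError`, `graceful_premature`, and `strict_premature` are hypothetical names, and the real strategies operate on job records and Beanstalkd rather than on bare timestamps.

```ruby
# Hypothetical sketch: PrematureError and both handlers are illustrative names,
# not Postburner's API.
class PrematureError < StandardError; end

# Graceful handling: if the job is early, report how long to wait before
# it should be re-inserted; otherwise let it run.
def graceful_premature(run_at, now)
  remaining = run_at - now
  return { action: :perform } unless remaining.positive?

  { action: :reinsert, delay: remaining.ceil }
end

# Strict handling: refuse to run a job before its scheduled time.
def strict_premature(run_at, now)
  raise PrematureError, "job is not due until #{run_at}" if now < run_at

  { action: :perform }
end

now = Time.at(1_000)
run_at = now + 90

early = graceful_premature(run_at, now)
puts early[:action]    # reinsert
puts early[:delay]     # 90

on_time = graceful_premature(run_at, now + 120)
puts on_time[:action]  # perform

begin
  strict_premature(run_at, now)
rescue PrematureError => e
  strict_error = e
end
puts strict_error.class # PrematureError
```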

.disconnect_all! ⇒ void

This method returns an undefined value.

Closes and clears Beanstalkd connections on all known threads.

Call this on worker shutdown to release sockets cleanly. Because connections are thread-local, a single `connection.close` call from the main thread would only close that thread's socket. This method iterates every live thread and closes each thread's connection, if any.

Examples:

Puma worker shutdown (config/puma.rb)

on_worker_shutdown do
  Postburner.disconnect_all!
end


# File 'lib/postburner.rb', line 390

def self.disconnect_all!
  Thread.list.each do |thread|
    if (conn = thread[:postburner_connection])
      begin
        conn.close if conn.respond_to?(:close) && conn.connected?
      rescue => e
        warn "Postburner: failed to close connection on shutdown: #{e.message}" if $VERBOSE
      ensure
        thread[:postburner_connection] = nil
      end
    end
  end
end
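The following standalone sketch shows the same cleanup idea with plain Ruby threads, so it can be run without Beanstalkd. `DemoConn` and the `:demo_conn` key are hypothetical. The two queues exist only to keep the worker threads alive until the main thread has finished cleaning up.

```ruby
# Hypothetical stand-in for a socket-backed connection.
DemoConn = Struct.new(:active) do
  def close
    self.active = false
  end
end

ready = Queue.new
release = Queue.new

workers = 2.times.map do
  Thread.new do
    Thread.current[:demo_conn] = DemoConn.new(true)
    ready << Thread.current[:demo_conn]
    release.pop # stay alive until the main thread has cleaned up
  end
end

# Wait until both workers have stored a connection.
conns = 2.times.map { ready.pop }

# Walk every live thread and close whatever connection it holds.
Thread.list.each do |thread|
  conn = thread[:demo_conn]
  next unless conn

  conn.close
  thread[:demo_conn] = nil
end

still_active = conns.count(&:active)
2.times { release << :done }
workers.each(&:join)

puts still_active # 0
```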

.inline_test_strategy! ⇒ void

This method returns an undefined value.

Activates strict test mode with inline job execution.

Sets queue strategy to InlineTestQueue, which executes jobs synchronously without Beanstalkd. Scheduled jobs (with a future run_at) raise a Postburner::Job::PrematurePerform exception, forcing explicit time management with `travel_to`.

Use this strategy when you want explicit control over time progression in tests and want to catch scheduling bugs by failing loudly.

Examples:

Postburner.inline_test_strategy!
job = MyJob.create!(args: {})
job.queue!  # Executes immediately
assert job.reload.processed_at

With scheduled jobs

Postburner.inline_test_strategy!
job = MyJob.create!(args: {})
job.queue!(delay: 1.hour)
# Raises PrematurePerform - use travel_to instead

# File 'lib/postburner.rb', line 159

def self.inline_test_strategy!
  self.queue_strategy = Postburner::InlineTestQueue
end

.null_strategy! ⇒ void

Note:

Jobs execute when manually triggered via Postburner::Job.perform

Note:

Does not require Beanstalkd to be running

Note:

Useful for batch processing and deferred execution patterns

This method returns an undefined value.

Activates null mode for creating jobs without queueing to Beanstalkd.

Sets queue strategy to NullQueue, which creates job records in the database but does NOT queue them to Beanstalkd. Jobs can be executed later by manually calling Postburner::NullQueue.handle_perform!, which includes automatic time travel for scheduled jobs.

Use this strategy for deferred batch processing, conditional execution, or scenarios where you want to create jobs in advance and execute them manually on-demand.

Examples:

Create job without queueing

Postburner.null_strategy!
job = MyJob.create!(args: {})
job.queue!
# Job created but NOT queued to Beanstalkd
job.bkid  # => nil

Manually execute later

Postburner.null_strategy!
job = MyJob.create!(args: {})
job.queue!(delay: 1.hour)
# Later, manually execute with automatic time travel
Postburner::Job.perform(job.id)
# Job executes as if it's 1 hour in the future

# File 'lib/postburner.rb', line 281

def self.null_strategy!
  self.queue_strategy = Postburner::NullQueue
end

.scheduler_tube_name ⇒ String

Returns the scheduler tube name with environment prefix.

Examples:

Postburner.scheduler_tube_name
# => 'postburner.production.scheduler'

Clear scheduler tube

Postburner.clear_jobs!(Postburner.scheduler_tube_name)

Returns:

  • (String)

    Expanded scheduler tube name



# File 'lib/postburner.rb', line 527

def self.scheduler_tube_name
  configuration.scheduler_tube_name
end

.stats(tube_names = nil) ⇒ Hash

Note:

Beanstalkd tubes are created lazily - they only exist when jobs have been put into them. Tubes that don’t exist yet are silently skipped and won’t appear in the results. This means configured queues (e.g., ‘default’, ‘mailers’) won’t appear in stats until at least one job has been enqueued to them.

Returns detailed statistics about Beanstalkd tubes.

Collects job counts (ready, delayed, buried, reserved) for each tube and provides aggregate totals across all tubes.

Examples:

Get stats for all tubes

stats = Postburner.stats
stats[:totals][:total]  # => 42
stats[:tubes].first[:name]  # => "default"

Get stats for specific tubes

stats = Postburner.stats(Postburner.watched_tube_names)
stats[:tubes].size  # => 3 (only tubes that exist)

Understanding lazy tube creation

# Fresh Beanstalkd with no jobs queued yet:
Postburner.stats(Postburner.watched_tube_names)
# => { tubes: [], totals: { ready: 0, delayed: 0, ... } }

# After queueing a job to 'default':
MyJob.perform_later(123)
Postburner.stats(Postburner.watched_tube_names)
# => { tubes: [{ name: "postburner.development.default", ready: 1, ... }], ... }

Parameters:

  • tube_names (Array<String>, nil) (defaults to: nil)

    Specific tube names to inspect, or nil for all tubes

Returns:

  • (Hash)

    Statistics hash with keys:

    • tubes: Array of hashes with per-tube stats (name, ready, delayed, buried, reserved, total)

    • totals: Hash with aggregated counts across all tubes

Raises:

  • (Beaneater::NotConnected)

    if connection to Beanstalkd fails



# File 'lib/postburner.rb', line 587

def self.stats(tube_names = nil)
  connected do |conn|
    # Get tubes to inspect
    tubes_to_inspect = if tube_names&.any?
      tube_names.map { |name| conn.tubes[name] }
    else
      conn.beanstalk.tubes.all
    end

    result = {
      tubes: [],
      totals: {
        ready: 0,
        delayed: 0,
        buried: 0,
        reserved: 0,
        total: 0
      }
    }

    # Collect stats from each tube
    tubes_to_inspect.each do |tube|
      begin
        stats = tube.stats
        # Note: beaneater transforms hyphenated beanstalkd stats to underscores

        tube_data = {
          name: tube.name,
          ready: stats.current_jobs_ready || 0,
          delayed: stats.current_jobs_delayed || 0,
          buried: stats.current_jobs_buried || 0,
          reserved: stats.current_jobs_reserved || 0,
          total: (stats.current_jobs_ready || 0) +
                 (stats.current_jobs_delayed || 0) +
                 (stats.current_jobs_buried || 0) +
                 (stats.current_jobs_reserved || 0)
        }
      rescue Beaneater::NotFoundError
        # Tube doesn't exist yet, skip it
        next
      end

      result[:tubes] << tube_data

      # Aggregate totals
      result[:totals][:ready] += tube_data[:ready]
      result[:totals][:delayed] += tube_data[:delayed]
      result[:totals][:buried] += tube_data[:buried]
      result[:totals][:reserved] += tube_data[:reserved]
      result[:totals][:total] += tube_data[:total]
    end

    result
  end
end
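To show how the returned hash is shaped and how the totals relate to the per-tube counts, here is a standalone sketch that rebuilds the aggregation from sample data. The tube names and numbers are made up for illustration, and `COUNT_KEYS` is a hypothetical constant; the method above sums the columns incrementally instead.

```ruby
# Sample per-tube counts; the names and numbers are made up for illustration.
tubes = [
  { name: 'postburner.development.default', ready: 3, delayed: 1, buried: 0, reserved: 2 },
  { name: 'postburner.development.mailers', ready: 0, delayed: 4, buried: 1, reserved: 0 }
]

COUNT_KEYS = %i[ready delayed buried reserved].freeze

# Add a per-tube total, then sum every column across tubes.
tubes.each { |tube| tube[:total] = tube.values_at(*COUNT_KEYS).sum }
totals = (COUNT_KEYS + [:total]).to_h { |key| [key, tubes.sum { |tube| tube[key] }] }

result = { tubes: tubes, totals: totals }

puts result[:totals][:total] # 11

# A typical follow-up: list tubes that have buried jobs needing attention.
buried_tubes = result[:tubes].select { |tube| tube[:buried] > 0 }.map { |tube| tube[:name] }
puts buried_tubes.inspect # ["postburner.development.mailers"]
```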

.strict_strategy! ⇒ void

Note:

Requires Beanstalkd server running

Note:

NOT the default production strategy

This method returns an undefined value.

Activates strict production mode with asynchronous job execution.

Sets queue strategy to StrictQueue, which queues jobs to Beanstalkd and raises Postburner::Job::PrematurePerform if a job is executed before its scheduled run_at time.

Use this strategy for production debugging or when you want strict enforcement of scheduling. For most production use cases, prefer #default_strategy! instead.

Examples:

Postburner.strict_strategy!
job = MyJob.create!(args: {})
job.queue!(delay: 1.hour)
# Queued to Beanstalkd, will raise if picked up early

# File 'lib/postburner.rb', line 215

def self.strict_strategy!
  self.queue_strategy = Postburner::StrictQueue
end

.testing? ⇒ Boolean

Checks if currently using a test queue strategy.

Returns true if the current queue strategy is InlineTestQueue or TimeTravelTestQueue, indicating that jobs execute inline/synchronously without Beanstalkd.

Examples:

Postburner.default_strategy!
Postburner.testing?  # => false

Postburner.inline_test_strategy!
Postburner.testing?  # => true

Returns:

  • (Boolean)

    true if using test strategy, false if using production strategy

# File 'lib/postburner.rb', line 303

def self.testing?
  queue_strategy.testing
end

.time_travel_test_strategy! ⇒ void

Note:

Jobs execute in queue order, not scheduled time order

Note:

Requires ActiveSupport::Testing::TimeHelpers

This method returns an undefined value.

Activates test mode with automatic time travel for scheduled jobs.

Sets queue strategy to TimeTravelTestQueue, which executes jobs synchronously and automatically uses time travel for jobs with future run_at timestamps.

Use this strategy for integration/feature tests where you want convenience over explicit time control.

Examples:

Postburner.time_travel_test_strategy!
job = MyJob.create!(args: {})
job.queue!(delay: 1.hour)
# Automatically travels to scheduled time and executes
assert job.reload.processed_at

# File 'lib/postburner.rb', line 187

def self.time_travel_test_strategy!
  self.queue_strategy = Postburner::TimeTravelTestQueue
end

.tube_prefix(env = nil) ⇒ String

Returns the Beanstalkd tube name prefix for the given environment.

Delegates to Postburner::Configuration#tube_prefix. Postburner automatically prefixes all queue names with this value.

Examples:

Postburner.tube_prefix('production')  # => "postburner.production"
Postburner.tube_prefix                # => "postburner.test" (in test env)

Parameters:

  • env (String, Symbol, nil) (defaults to: nil)

    Environment name (defaults to Rails.env or nil)

Returns:

  • (String)

    Tube prefix (e.g., “postburner.production”)

# File 'lib/postburner.rb', line 546

def self.tube_prefix(env = nil)
  configuration.tube_prefix(env)
end

.watched_tube_names ⇒ Array<String>

Returns array of watched tube names with environment prefix.

Expands configured queue names to full tube names and memoizes the result.

Examples:

Postburner.watched_tube_names
# => ['postburner.production.default', 'postburner.production.critical']

Returns:

  • (Array<String>)

    Array of expanded tube names



# File 'lib/postburner.rb', line 499

def self.watched_tube_names
  @__watched_tube_names ||= configuration.queue_names.map { |q| configuration.expand_tube_name(q) }
end
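Judging from the example values in this reference (`postburner.production` as the prefix and `postburner.production.default` as an expanded name), the expansion appears to join prefix, environment, and queue name with dots. The sketch below reproduces that naming under that assumption. `demo_tube_prefix` and `demo_expand_tube_name` are hypothetical helpers, not the Configuration methods themselves.

```ruby
# Hypothetical helpers that mirror the naming seen in the documented examples.
def demo_tube_prefix(env)
  "postburner.#{env}"
end

def demo_expand_tube_name(queue, env)
  "#{demo_tube_prefix(env)}.#{queue}"
end

names = %w[default critical].map { |queue| demo_expand_tube_name(queue, 'production') }
puts names.inspect
# ["postburner.production.default", "postburner.production.critical"]
```

Including the environment in every tube name is what allows several environments to share one Beanstalkd server without reading each other's jobs.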

.watched_tubes ⇒ Array<Beaneater::Tube>

Returns array of watched Beaneater::Tube instances.

Creates Beaneater tube instances for all configured queues and memoizes the result.

Examples:

Postburner.watched_tubes.each { |tube| puts tube.stats }

Returns:

  • (Array<Beaneater::Tube>)

    Array of tube instances



# File 'lib/postburner.rb', line 512

def self.watched_tubes
  @__watched_tubes ||= watched_tube_names.map { |tube_name| connection.tubes[tube_name] }
end