Class: Postburner::Worker

Inherits:
Object
Defined in:
lib/postburner/worker.rb

Overview

Puma-style worker with configurable forks and threads.

Processes jobs from Beanstalkd queues using a scalable architecture that adapts from development to production. Workers can run in single-process mode (forks: 0) or multi-process mode (forks: 1+), each with configurable thread pools.

## Architecture

### Single Process Mode (forks: 0)

Main Process
└─ Thread Pool (N threads watching all queues)

### Multi-Process Mode (forks: 1+)

Parent Process
├─ Fork 0
│  └─ Thread Pool (N threads watching all queues)
├─ Fork 1
│  └─ Thread Pool (N threads watching all queues)
└─ Fork 2
   └─ Thread Pool (N threads watching all queues)

## Scaling Strategy

Development (forks: 0, threads: 1): Single-threaded, single-process for simplest debugging.

Staging (forks: 0, threads: 10): Multi-threaded, single-process for moderate concurrency.

Production (forks: 4, threads: 10): 4 processes × 10 threads = 40 concurrent jobs.
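The concurrency figures above follow from multiplying processes by threads. A minimal sketch of that arithmetic, assuming that forks: 0 means the main process runs the thread pool itself and that the parent runs no jobs in forked mode (as the architecture diagrams suggest); `max_concurrent_jobs` is a hypothetical helper, not part of Postburner:

```ruby
# Hypothetical helper (not a Postburner API): upper bound on jobs in flight.
# Assumes forks: 0 runs one thread pool in the main process, and that in
# forked mode only the children process jobs.
def max_concurrent_jobs(forks:, threads:)
  processes = forks.zero? ? 1 : forks
  processes * threads
end

puts max_concurrent_jobs(forks: 0, threads: 1)   # development
puts max_concurrent_jobs(forks: 0, threads: 10)  # staging
puts max_concurrent_jobs(forks: 4, threads: 10)  # production
```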

## Connection Model

Each worker thread maintains its own Beanstalkd connection for thread safety.
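The one-connection-per-thread idea can be illustrated roughly as follows. This is a sketch only: `FakeConnection` and `run_thread_pool` are hypothetical stand-ins, and the real worker opens Beanstalkd connections rather than plain Ruby objects.

```ruby
# Hypothetical stand-in for a Beanstalkd connection (illustration only).
class FakeConnection
  attr_reader :owner

  def initialize(owner)
    @owner = owner
  end
end

# Each thread builds its own connection inside the thread body, so no
# connection object is ever shared between threads while jobs run.
def run_thread_pool(thread_count)
  threads = Array.new(thread_count) do |index|
    Thread.new do
      FakeConnection.new(index) # the thread's return value, read via #value
    end
  end
  threads.map(&:value) # joins each thread and collects its connection
end

connections = run_thread_pool(3)
puts connections.map(&:owner).inspect
```

Creating the connection inside the thread avoids any locking around a shared socket, at the cost of one open connection per thread.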

## GC Limit

Workers support a GC limit (gc_limit) that causes the process to exit after processing N jobs. In multi-process mode, the parent automatically respawns exited children. This prevents memory bloat in long-running workers.
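The exit-and-respawn cycle might look roughly like the sketch below. It is an illustration under assumptions, not Postburner's implementation: `run_child` and `supervise` are hypothetical names, the "jobs" are simulated with a counter, and the supervisor stops after a fixed number of generations so the example terminates (a real parent would keep respawning until shutdown). It relies on `fork`, so it needs a platform that supports it.

```ruby
# Hypothetical child body: handle gc_limit jobs, then exit so the operating
# system reclaims all memory held by the process.
def run_child(gc_limit)
  processed = 0
  while processed < gc_limit
    # A real worker would reserve and run a Beanstalkd job here.
    processed += 1
  end
  exit!(0) # leave the child immediately, skipping inherited at_exit hooks
end

# Hypothetical supervisor: start a child, wait for it to exit, start the next.
# Returns the exit status of each generation.
def supervise(generations:, gc_limit:)
  exit_codes = []
  generations.times do
    pid = fork { run_child(gc_limit) }
    _, status = Process.wait2(pid)
    exit_codes << status.exitstatus
  end
  exit_codes
end

puts supervise(generations: 3, gc_limit: 5).inspect
```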

## Scheduler Integration

Workers automatically watch the scheduler tube and process watchdog jobs. On reserve timeout, workers check if a watchdog exists and create one if missing, ensuring the scheduler continues running even if jobs are sparse.

Examples:

Starting a worker programmatically

config = Postburner::Configuration.load_yaml('config/postburner.yml', 'production', 'default')
worker = Postburner::Worker.new(config)
worker.start

Configuration in postburner.yml

production:
  workers:
    default:
      forks: 4
      threads: 10
      gc_limit: 5000
      queues:
        - default
        - critical


Constructor Details

#initialize(config) ⇒ Worker

Initialize a new worker.

Examples:

config = Postburner::Configuration.load_yaml('config/postburner.yml', 'production', 'default')
worker = Postburner::Worker.new(config)

Parameters:

  • config (Postburner::Configuration)

    The worker configuration

# File 'lib/postburner/worker.rb', line 93

def initialize(config)
  @config = config
  @logger = config.logger
  @shutdown = false
  setup_signal_handlers
end

Instance Attribute Details

#config ⇒ Postburner::Configuration (readonly)

Returns the worker configuration.

Returns:

  • (Postburner::Configuration)

    The worker configuration

# File 'lib/postburner/worker.rb', line 83

def config
  @config
end

#logger ⇒ Object (readonly)

Returns the value of attribute logger.

# File 'lib/postburner/worker.rb', line 83

attr_reader :config, :logger

Instance Method Details

#orphaned? ⇒ Boolean

Checks if this process has been orphaned (parent died).

When the parent process dies, the kernel re-parents children to init (PID 1) or a subreaper. Detecting this allows forked children to exit gracefully instead of running indefinitely as orphans.

Compares current parent PID against the PID recorded at fork time (@parent_pid), which handles the case where the parent process itself is PID 1 (e.g., running as a container entrypoint).

Returns:

  • (Boolean)

    true if the parent PID has changed since fork time; falsy otherwise (nil when @parent_pid was never recorded)



# File 'lib/postburner/worker.rb', line 152

def orphaned?
  @parent_pid && Process.ppid != @parent_pid
end
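The parent-PID comparison can be tried in isolation with a small sketch. `OrphanCheck` is a hypothetical class written for this example; it takes the recorded parent PID as a constructor argument instead of capturing it at fork time, and it coerces the result to a strict boolean, which the method above does not do.

```ruby
# Hypothetical illustration of the orphan check (not the Postburner class).
class OrphanCheck
  # parent_pid: the PID recorded when the child was forked, or nil if the
  # process was never forked by a supervising parent.
  def initialize(parent_pid = nil)
    @parent_pid = parent_pid
  end

  # True only when a parent PID was recorded and the kernel now reports a
  # different parent, meaning the original parent has died.
  def orphaned?
    !!(@parent_pid && Process.ppid != @parent_pid)
  end
end

puts OrphanCheck.new(Process.ppid).orphaned? # parent unchanged
puts OrphanCheck.new(nil).orphaned?          # nothing recorded
```

Comparing against the recorded PID, rather than testing for PID 1, keeps the check correct when the legitimate parent is itself PID 1.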

#reconnect_backoff(attempts) ⇒ Integer

Calculates exponential backoff sleep duration for reconnection attempts.

Uses exponential backoff starting at 1 second and doubling each attempt, capped at 32 seconds to prevent excessively long waits.

Parameters:

  • attempts (Integer)

    Number of consecutive failed attempts (0-based)

Returns:

  • (Integer)

    Sleep duration in seconds (1, 2, 4, 8, 16, or 32)



# File 'lib/postburner/worker.rb', line 163

def reconnect_backoff(attempts)
  [2**attempts, 32].min
end
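To make the schedule concrete, the sketch below prints the delays for the first few attempts and wraps the formula in a retry loop. The `reconnect_backoff` body is copied from the listing above; `connect_with_retry`, its `max_attempts` and `sleeper` parameters, and the choice of `IOError` are assumptions made for the example and say nothing about how Postburner actually retries.

```ruby
# Same formula as the method above: 2**attempts seconds, capped at 32.
def reconnect_backoff(attempts)
  [2**attempts, 32].min
end

p (0..6).map { |n| reconnect_backoff(n) } # => [1, 2, 4, 8, 16, 32, 32]

# Hypothetical retry loop: runs the block, and on IOError waits for the
# backoff duration before trying again, giving up after max_attempts tries.
# The sleeper is injectable so the loop can be exercised without real delays.
def connect_with_retry(max_attempts:, sleeper: ->(seconds) { sleep(seconds) })
  attempts = 0
  begin
    yield
  rescue IOError
    raise if attempts + 1 >= max_attempts
    sleeper.call(reconnect_backoff(attempts))
    attempts += 1
    retry
  end
end
```

Because `attempts` is 0-based, the first retry waits 1 second; the cap is reached on the sixth consecutive failure.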

#shutdown ⇒ void

This method returns an undefined value.

Initiates graceful shutdown.

Sets shutdown flag to stop processing new jobs. Current jobs are allowed to finish.



# File 'lib/postburner/worker.rb', line 130

def shutdown
  @shutdown = true
end

#shutdown? ⇒ Boolean

Checks if shutdown has been requested.

Returns:

  • (Boolean)

    true if shutdown requested, false otherwise



# File 'lib/postburner/worker.rb', line 137

def shutdown?
  @shutdown
end
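The way #shutdown and #shutdown? cooperate can be sketched with a simplified processing loop. `LoopSketch` and its `run` method are hypothetical and exist only to show the flag being checked between jobs; the real worker's loop reserves jobs from Beanstalkd and is not shown on this page.

```ruby
# Hypothetical illustration of a flag-based graceful shutdown.
class LoopSketch
  attr_reader :processed

  def initialize
    @shutdown = false
    @processed = 0
  end

  def shutdown
    @shutdown = true
  end

  def shutdown?
    @shutdown
  end

  # Runs jobs in order. The flag is checked only before starting a job, so a
  # job that is already running always finishes.
  def run(jobs)
    jobs.each do |job|
      break if shutdown?
      job.call(self)
      @processed += 1
    end
  end
end

worker = LoopSketch.new
jobs = [->(_w) {}, ->(w) { w.shutdown }, ->(_w) {}]
worker.run(jobs)
puts worker.processed # the third job is never started
```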

#start ⇒ void

This method returns an undefined value.

Starts the worker.

Detects whether to run in single-process mode (forks: 0) or multi-process mode (forks: 1+) and starts accordingly.



# File 'lib/postburner/worker.rb', line 106

def start
  logger.info "[Postburner::Worker] Starting worker '#{worker_config[:name]}'..."
  logger.info "[Postburner::Worker] Queues: #{config.queue_names.join(', ')}"
  logger.info "[Postburner::Worker] Config: #{worker_config[:forks]} forks, #{worker_config[:threads]} threads, gc_limit: #{worker_config[:gc_limit] || 'unlimited'}, timeout: #{worker_config[:timeout]}s"
  log_reloading_status
  all_tubes = config.expanded_tube_names + [config.scheduler_tube_name]
  logger.info "[Postburner] #{config.beanstalk_url} known tubes: #{all_tubes.join(', ')}"
  log_next_scheduler_watchdog
  ensure_watchdog_on_startup!
  log_tube_stats("Startup")

  if worker_config[:forks] > 0
    start_forked_mode
  else
    start_single_process_mode
  end
end