Class: Wurk::Configuration

Inherits:
Object
  • Object
Defined in:
lib/wurk/configuration.rb

Overview

Owns runtime knobs (concurrency, queues, timeouts, lifecycle events, error/death handlers) and the registry of Capsules. Single source of truth for everything the swarm / managers / processors need to boot.

Spec: docs/target/sidekiq-free.md §4 (Sidekiq::Config).

Constant Summary

DEFAULTS =

Mirrors Sidekiq::Config::DEFAULTS. Order and keys are part of the drop-in contract — third-party gems read @options via [] / fetch / dig.

{
  labels: Set.new,
  require: '.',
  environment: nil,
  concurrency: 5,
  timeout: 25,
  poll_interval_average: nil,
  average_scheduled_poll_interval: 5,
  on_complex_arguments: :raise,
  max_iteration_runtime: nil,
  error_handlers: [],
  death_handlers: [],
  lifecycle_events: {
    startup: [],
    fork: [],
    quiet: [],
    shutdown: [],
    exit: [],
    heartbeat: [],
    beat: [],
    leader: []
  },
  dead_max_jobs: 10_000,
  dead_timeout_in_seconds: 180 * 24 * 60 * 60,
  reloader: proc { |&b| b.call },
  backtrace_cleaner: ->(bt) { bt },
  logged_job_attributes: %w[bid tags],
  redis_idle_timeout: nil
}.freeze
LIFECYCLE_EVENTS =

:fork fires only inside swarm children, after fork + internal AR/Redis reconnect — apps reopen sockets / non-fork-safe libs there (Ent §7.4).

%i[startup fork quiet shutdown exit heartbeat beat leader].freeze
DEFAULT_THREAD_PRIORITY =
-1
ERROR_HANDLER =

Default error handler. Wraps the report in the thread-local Wurk::Context so logger formatters/JSON layouts can pick up jid/bid/tags. Logs `full_message` (with backtrace) in dev/debug and `detailed_message` in prod, mirroring the Sidekiq behavior so log scrapers built for one work for both.

Spec: docs/target/sidekiq-free.md §4.3.

lambda do |ex, ctx, cfg = Wurk.configuration|
  safe_ctx = ctx || {}
  Wurk::Context.with(safe_ctx) do
    dev = $DEBUG || ENV['WURK_DEBUG'] || cfg.logger.debug?
    msg = dev ? ex.full_message : ex.detailed_message
    cfg.logger.info { msg }
  end
end
HISTORY_DEFAULT_INTERVAL =

Historical metrics snapshotter (Ent §5): default snapshot interval in seconds.

30

Constructor Details

#initialize(options = {}) ⇒ Configuration

Returns a new instance of Configuration.



# File 'lib/wurk/configuration.rb', line 81

def initialize(options = {})
  @options = deep_dup_defaults.merge(options)
  @options[:error_handlers] << ERROR_HANDLER if @options[:error_handlers].empty?
  @capsules = {}
  @directory = {}
  @client_chain = Middleware::Chain.new
  @server_chain = Middleware::Chain.new
  @redis_config = { url: ENV.fetch('REDIS_URL', 'redis://localhost:6379/0') }
  @logger = nil
  @thread_priority = DEFAULT_THREAD_PRIORITY
  @frozen = false
end
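
The constructor merges caller options over a copy of DEFAULTS, so appending a handler or hook on one instance should not touch the frozen constant. `deep_dup_defaults` is private and not shown on this page; the following is a minimal sketch of one plausible shape for it. `DefaultsSketch`, its trimmed defaults hash, and the `deep_dup` helper are hypothetical names used only for illustration.

```ruby
require 'set'

# Hypothetical stand-in for Wurk::Configuration; only the copy-then-merge
# behaviour of the constructor is modelled.
class DefaultsSketch
  DEFAULTS = {
    labels: Set.new,
    concurrency: 5,
    error_handlers: [],
    lifecycle_events: { startup: [], shutdown: [] }
  }.freeze

  attr_reader :options

  def initialize(options = {})
    @options = deep_dup(DEFAULTS).merge(options)
  end

  private

  # Recursively copy hashes, arrays and sets so nested containers are not
  # shared between instances or with the (shallowly) frozen constant.
  def deep_dup(value)
    case value
    when Hash  then value.each_with_object({}) { |(k, v), h| h[k] = deep_dup(v) }
    when Array then value.map { |v| deep_dup(v) }
    when Set   then value.dup
    else value
    end
  end
end

a = DefaultsSketch.new(concurrency: 10)
b = DefaultsSketch.new
a.options[:lifecycle_events][:startup] << -> { :booted }
```

Because `Hash#freeze` is shallow, a plain `DEFAULTS.merge(options)` would share the nested arrays across every instance; the recursive copy is what keeps `b` and the constant untouched here.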

Instance Attribute Details

#capsules ⇒ Object (readonly)

Returns the value of attribute capsules.

# File 'lib/wurk/configuration.rb', line 69

def capsules
  @capsules
end

#directory ⇒ Object (readonly)

Returns the value of attribute directory.

# File 'lib/wurk/configuration.rb', line 69

def directory
  @directory
end

#dogstatsd ⇒ Object

Pro parity: callable that builds the statsd / dogstatsd client. Invoked once per process AFTER fork; see Wurk::Metrics::Statsd.client. Assignable as a Proc, lambda, or any object responding to #call:

config.dogstatsd = -> { Datadog::Statsd.new('host', 8125) }

Spec: docs/target/sidekiq-pro.md §9.1.



# File 'lib/wurk/configuration.rb', line 79

def dogstatsd
  @dogstatsd
end

#logger ⇒ Object

Logger. Returns the configured logger, building the default logger on first access.

# File 'lib/wurk/configuration.rb', line 339

def logger
  @logger ||= default_logger
end

#redis_config ⇒ Object (readonly)

Returns the value of attribute redis_config.

# File 'lib/wurk/configuration.rb', line 69

def redis_config
  @redis_config
end

#super_fetch_callback ⇒ Object (readonly)

Returns the value of attribute super_fetch_callback.

# File 'lib/wurk/configuration.rb', line 69

def super_fetch_callback
  @super_fetch_callback
end

#thread_priority ⇒ Object

Returns the value of attribute thread_priority.

# File 'lib/wurk/configuration.rb', line 70

def thread_priority
  @thread_priority
end

Instance Method Details

#[](key) ⇒ Object

Hash-like options access. Reads `key` from the underlying options hash.

# File 'lib/wurk/configuration.rb', line 96

def [](key) = @options.[](key)

#[]=(key, val) ⇒ Object



# File 'lib/wurk/configuration.rb', line 98

def []=(key, val)
  guard_frozen!
  @options[key] = val
end

#average_scheduled_poll_interval=(interval) ⇒ Object



# File 'lib/wurk/configuration.rb', line 207

def average_scheduled_poll_interval=(interval)
  @options[:average_scheduled_poll_interval] = interval
end

#capsule(name) {|cap| ... } ⇒ Object

Yields:

  • (cap)


# File 'lib/wurk/configuration.rb', line 135

def capsule(name)
  name = name.to_s
  cap = @capsules[name] ||= Capsule.new(name, self)
  yield cap if block_given?
  cap
end
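
Since `#capsule` normalizes the name with `to_s` and memoizes with `||=`, repeated calls with a symbol or a string configure the same capsule. A minimal sketch of that behaviour follows; `CapsuleSketch` and `CapsuleRegistrySketch` are hypothetical stand-ins, and the starting concurrency of 5 and the `['default']` queue list are assumptions rather than the real Capsule defaults.

```ruby
# Hypothetical stand-in for Wurk::Capsule; the real class carries more state.
CapsuleSketch = Struct.new(:name, :concurrency, :queues)

# Hypothetical stand-in for the capsule registry part of the configuration.
class CapsuleRegistrySketch
  attr_reader :capsules

  def initialize
    @capsules = {}
  end

  # Same shape as #capsule above: normalize, memoize, yield, return.
  def capsule(name)
    name = name.to_s
    cap = @capsules[name] ||= CapsuleSketch.new(name, 5, ['default'])
    yield cap if block_given?
    cap
  end

  def total_concurrency
    @capsules.each_value.sum(&:concurrency)
  end
end

config = CapsuleRegistrySketch.new
config.capsule(:mailers) { |cap| cap.concurrency = 2 }
config.capsule('mailers') { |cap| cap.queues = ['mail'] } # same object as above
config.capsule('default')
```

Both blocks above mutate one `mailers` capsule, so later initializers can refine an earlier declaration instead of replacing it.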

#client_middleware {|@client_chain| ... } ⇒ Object

Middleware. Yields the client middleware chain when a block is given, then returns the chain.

Yields:

  • (@client_chain)


# File 'lib/wurk/configuration.rb', line 144

def client_middleware
  yield @client_chain if block_given?
  @client_chain
end

#concurrency ⇒ Object

Default capsule shortcuts. Returns the default capsule's concurrency.

# File 'lib/wurk/configuration.rb', line 115

def concurrency = default_capsule.concurrency

#concurrency=(val) ⇒ Object



# File 'lib/wurk/configuration.rb', line 117

def concurrency=(val)
  default_capsule.concurrency = val
end

#configure_client {|_self| ... } ⇒ Object

Yields:

  • (_self)

Yield Parameters:

  • _self (Wurk::Configuration): the object that the method was called on


# File 'lib/wurk/configuration.rb', line 363

def configure_client(&block)
  yield self if block && !server?
end

#configure_server {|_self| ... } ⇒ Object

Configure blocks (Sidekiq.configure_server / configure_client). Yields the configuration only when the process is running as a server.

Yields:

  • (_self)

Yield Parameters:

  • _self (Wurk::Configuration): the object that the method was called on


# File 'lib/wurk/configuration.rb', line 359

def configure_server(&block)
  yield self if block && server?
end
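
The two configure methods are mirror images gated on `server?`, which is how one initializer file can hold both server-only and client-only settings. A minimal sketch of the gating, using a hypothetical `ConfigureSketch` class whose `server:` constructor keyword is an assumption made for the example:

```ruby
# Hypothetical stand-in; only the server?/configure_* gating is modelled.
class ConfigureSketch
  def initialize(server:)
    @options = { server: server }
  end

  def server?
    @options[:server] == true
  end

  def configure_server
    yield self if block_given? && server?
  end

  def configure_client
    yield self if block_given? && !server?
  end
end

ran = []
[ConfigureSketch.new(server: true), ConfigureSketch.new(server: false)].each do |config|
  config.configure_server { ran << :server }
  config.configure_client { ran << :client }
end
```

Each instance runs exactly one of its two blocks; the other is silently skipped rather than raising.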

#death_handlers ⇒ Object

# File 'lib/wurk/configuration.rb', line 203

def death_handlers
  @options[:death_handlers]
end

#default_capsule ⇒ Object

# File 'lib/wurk/configuration.rb', line 131

def default_capsule(&)
  capsule('default', &)
end

#dig(*keys) ⇒ Object



# File 'lib/wurk/configuration.rb', line 111

def dig(*keys) = @options.dig(*keys)

#error_handlers ⇒ Object

Handlers. Returns the registered error handlers.

# File 'lib/wurk/configuration.rb', line 199

def error_handlers
  @options[:error_handlers]
end

#fetch ⇒ Object

# File 'lib/wurk/configuration.rb', line 103

def fetch(*, &) = @options.fetch(*, &)

#fetch_poll_interval ⇒ Object

# File 'lib/wurk/configuration.rb', line 220

def fetch_poll_interval
  @options[:fetch_poll_interval]
end

#fetch_poll_interval=(seconds) ⇒ Object

Reliable-fetch empty-poll backoff: the BLMOVE block timeout (seconds) used when every served queue is empty. This is the `fetch_poll_interval` knob from Pro super_fetch §3.3. Unset (nil) falls back to the fetcher's default (Wurk::Fetcher::Reliable::TIMEOUT, 2s). Also readable as `config[:fetch_poll_interval]`.



# File 'lib/wurk/configuration.rb', line 216

def fetch_poll_interval=(seconds)
  @options[:fetch_poll_interval] = seconds
end

#freeze! ⇒ Object

# File 'lib/wurk/configuration.rb', line 404

def freeze!
  return self if @frozen

  @capsules.each_value(&:freeze)
  @capsules.freeze
  @options.freeze
  @directory.freeze
  @frozen = true
  self
end

#frozen? ⇒ Boolean

Returns:

  • (Boolean)


# File 'lib/wurk/configuration.rb', line 415

def frozen?
  @frozen
end
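
Many setters on this page call `guard_frozen!` before mutating state, and `freeze!` is what arms that guard. The guard itself is private and not shown here, so the following is only a sketch of the pattern: `FreezeSketch` is a hypothetical class, and raising `FrozenError` with this message is an assumption about what the real guard does.

```ruby
# Hypothetical stand-in; models one guarded setter plus freeze!/frozen?.
class FreezeSketch
  def initialize
    @options = { concurrency: 5 }
    @frozen = false
  end

  def [](key)
    @options[key]
  end

  def []=(key, val)
    guard_frozen!
    @options[key] = val
  end

  # Idempotent: a second call returns self without re-freezing anything.
  def freeze!
    return self if @frozen

    @options.freeze
    @frozen = true
    self
  end

  def frozen?
    @frozen
  end

  private

  # Assumption: the real guard may use a different error class or message.
  def guard_frozen!
    raise FrozenError, 'configuration is frozen' if @frozen
  end
end

config = FreezeSketch.new
config[:concurrency] = 8
config.freeze!

write_after_freeze =
  begin
    config[:concurrency] = 1
    :allowed
  rescue FrozenError
    :rejected
  end
```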

#handle_exception(ex, ctx = {}) ⇒ Object



# File 'lib/wurk/configuration.rb', line 345

def handle_exception(ex, ctx = {})
  if error_handlers.empty?
    logger.error("#{ctx} #{ex.class}: #{ex.message}")
  else
    error_handlers.each do |handler|
      handler.call(ex, ctx, self)
    rescue StandardError => e
      logger.error("error_handler raised: #{e.class}: #{e.message}")
    end
  end
end
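
Each handler is called as `handler.call(ex, ctx, self)` and is rescued individually, so one failing reporter does not stop the handlers after it. The sketch below reproduces only that dispatch loop on a hypothetical `HandlerSketch` class; the two lambdas and the `jid` value are made up for the example.

```ruby
require 'logger'
require 'stringio'

# Hypothetical stand-in that mirrors only the per-handler rescue.
class HandlerSketch
  attr_reader :error_handlers, :logger

  def initialize(logger)
    @error_handlers = []
    @logger = logger
  end

  def handle_exception(ex, ctx = {})
    error_handlers.each do |handler|
      handler.call(ex, ctx, self)
    rescue StandardError => e
      logger.error("error_handler raised: #{e.class}: #{e.message}")
    end
  end
end

log = StringIO.new
config = HandlerSketch.new(Logger.new(log))
seen = []

# First handler fails; second still runs and records the original error.
config.error_handlers << ->(_ex, _ctx, _cfg) { raise 'reporter offline' }
config.error_handlers << ->(ex, ctx, _cfg) { seen << [ex.message, ctx[:jid]] }

config.handle_exception(RuntimeError.new('boom'), { jid: 'abc123' })
```

Handlers take three positional arguments; a two-argument lambda would raise ArgumentError here and be logged as a failed handler, whereas a non-lambda Proc would simply ignore the extra argument.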

#health_check(port:, bind: '0.0.0.0', ready_window: 30) ⇒ Object

Opt-in thin HTTP listener inside the worker process for k8s probes. When called, the Launcher will start a TCP server on `port` bound to `bind`, exposing `GET /live` (200 while not stopping) and `GET /ready` (200 only when Redis is reachable AND a heartbeat fired within `ready_window` seconds).

Off by default; call this in a `configure_server` block to enable it. Spec: docs/target/sidekiq-ent.md §7.1.2.

Raises:

  • (ArgumentError)


# File 'lib/wurk/configuration.rb', line 313

def health_check(port:, bind: '0.0.0.0', ready_window: 30)
  guard_frozen!
  p = Integer(port)
  rw = Integer(ready_window)
  raise ArgumentError, 'port must be between 0 and 65535' unless (0..65535).cover?(p)
  raise ArgumentError, 'ready_window must be > 0' unless rw.positive?

  b = bind.to_s
  raise ArgumentError, 'bind must be a non-empty string' if b.empty?

  @options[:health_check_options] = { port: p, bind: b, ready_window: rw }
end
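
The validation relies on `Integer()`, which accepts numeric strings (handy for values read from ENV) and raises ArgumentError on anything else. A minimal sketch of the same checks as a free function; `normalize_health_check` is a hypothetical name, and the port `7433` is an arbitrary example value.

```ruby
# Hypothetical free function mirroring the checks in #health_check,
# without the frozen guard or the write into @options.
def normalize_health_check(port:, bind: '0.0.0.0', ready_window: 30)
  p = Integer(port)
  rw = Integer(ready_window)
  raise ArgumentError, 'port must be between 0 and 65535' unless (0..65_535).cover?(p)
  raise ArgumentError, 'ready_window must be > 0' unless rw.positive?

  b = bind.to_s
  raise ArgumentError, 'bind must be a non-empty string' if b.empty?

  { port: p, bind: b, ready_window: rw }
end

# A string port is coerced; the default ready_window is kept.
opts = normalize_health_check(port: '7433', bind: '127.0.0.1')

# Out-of-range port, non-numeric port, and zero window are all rejected.
bad_inputs = [{ port: 70_000 }, { port: 'http' }, { port: 7433, ready_window: 0 }]
rejected = bad_inputs.map do |args|
  begin
    normalize_health_check(**args)
    false
  rescue ArgumentError
    true
  end
end
```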

#history_collector ⇒ Object

# File 'lib/wurk/configuration.rb', line 288

def history_collector = @options[:history_collector]

#history_enabled? ⇒ Boolean

Returns:

  • (Boolean)


# File 'lib/wurk/configuration.rb', line 286

def history_enabled? = @options.key?(:history_interval)

#history_interval ⇒ Object

# File 'lib/wurk/configuration.rb', line 287

def history_interval = @options.fetch(:history_interval, HISTORY_DEFAULT_INTERVAL)

#inspect ⇒ Object

# File 'lib/wurk/configuration.rb', line 419

def inspect
  "#<#{self.class} capsules=#{@capsules.keys} concurrency=#{total_concurrency}>"
end

#key?(key) ⇒ Boolean Also known as: has_key?

Returns:

  • (Boolean)


# File 'lib/wurk/configuration.rb', line 104

def key?(key) = @options.key?(key)

#local_redis_pool ⇒ Object

# File 'lib/wurk/configuration.rb', line 165

def local_redis_pool
  @local_redis_pool ||= build_redis_pool(size: 10, name: 'internal')
end

#lookup(name, default_class = nil) ⇒ Object



# File 'lib/wurk/configuration.rb', line 193

def lookup(name, default_class = nil)
  @directory[name] ||= default_class&.new
end

#memory_limit_kb ⇒ Object

Threshold in KB, the unit the swarm compares against /proc/<pid>/statm (pages × 4KB). nil when recycling is disabled.



# File 'lib/wurk/configuration.rb', line 399

def memory_limit_kb
  mb = memory_limit_mb
  mb&.positive? ? mb * 1024 : nil
end

#memory_limit_mb ⇒ Object

Memory-based child recycling (Sidekiq Ent §7.5): the swarm parent TERMs (and respawns) any child whose RSS exceeds this many MB. Set in code or via SIDEKIQ_MAXMEM_MB (WURK_MAXMEM_MB is the native alias); an explicit value wins over the env. nil/0 disables recycling (the default).



# File 'lib/wurk/configuration.rb', line 388

def memory_limit_mb
  @memory_limit_mb || env_memory_limit_mb
end

#memory_limit_mb=(value) ⇒ Object



# File 'lib/wurk/configuration.rb', line 392

def memory_limit_mb=(value)
  guard_frozen!
  @memory_limit_mb = value.nil? ? nil : Integer(value)
end
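
Taken together, the three memory-limit methods resolve a limit in MB (explicit value first, then the environment) and convert it to KB for the swarm. `env_memory_limit_mb` is private and not shown on this page, so the sketch below guesses at it: the lookup order (WURK_MAXMEM_MB before SIDEKIQ_MAXMEM_MB) and the handling of empty strings are assumptions, and `MemoryLimitSketch` is a hypothetical class that takes the environment as a plain hash.

```ruby
# Hypothetical stand-in; the frozen guard is omitted for brevity.
class MemoryLimitSketch
  def initialize(env = ENV)
    @env = env
    @memory_limit_mb = nil
  end

  def memory_limit_mb=(value)
    @memory_limit_mb = value.nil? ? nil : Integer(value)
  end

  # An explicit value wins over the environment.
  def memory_limit_mb
    @memory_limit_mb || env_memory_limit_mb
  end

  # nil and 0 both mean "recycling disabled".
  def memory_limit_kb
    mb = memory_limit_mb
    mb&.positive? ? mb * 1024 : nil
  end

  private

  # Assumed lookup order and parsing; the real method is not shown.
  def env_memory_limit_mb
    raw = @env['WURK_MAXMEM_MB'] || @env['SIDEKIQ_MAXMEM_MB']
    raw.nil? || raw.empty? ? nil : Integer(raw)
  end
end

from_env = MemoryLimitSketch.new({ 'SIDEKIQ_MAXMEM_MB' => '512' })

explicit = MemoryLimitSketch.new({ 'SIDEKIQ_MAXMEM_MB' => '512' })
explicit.memory_limit_mb = 256

disabled = MemoryLimitSketch.new({})
```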

#merge!(other) ⇒ Object



# File 'lib/wurk/configuration.rb', line 106

def merge!(other)
  guard_frozen!
  @options.merge!(other)
end

#new_redis_pool(size, name = 'custom') ⇒ Object



# File 'lib/wurk/configuration.rb', line 178

def new_redis_pool(size, name = 'custom')
  build_redis_pool(size: size, name: name)
end

#on(event, &block) ⇒ Object

Lifecycle hooks. Registers `block` to run on `event`, which must be one of LIFECYCLE_EVENTS.

Raises:

  • (ArgumentError)


# File 'lib/wurk/configuration.rb', line 328

def on(event, &block)
  raise ArgumentError, "block required for on(#{event.inspect})" unless block
  unless LIFECYCLE_EVENTS.include?(event)
    raise ArgumentError, "invalid event #{event.inspect}, must be one of #{LIFECYCLE_EVENTS.inspect}"
  end

  @options[:lifecycle_events][event] << block
end
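
Because LIFECYCLE_EVENTS holds symbols, `on('startup')` with a string is rejected just like an unknown event. A minimal sketch of registration and firing follows; `HooksSketch` is a hypothetical class, and its `fire` method (running hooks in registration order) is an assumption, since this page does not show how the launcher invokes them.

```ruby
# Hypothetical stand-in; only event registration mirrors the method above.
class HooksSketch
  LIFECYCLE_EVENTS = %i[startup fork quiet shutdown exit heartbeat beat leader].freeze

  def initialize
    @hooks = LIFECYCLE_EVENTS.to_h { |event| [event, []] }
  end

  def on(event, &block)
    raise ArgumentError, "block required for on(#{event.inspect})" unless block
    unless LIFECYCLE_EVENTS.include?(event)
      raise ArgumentError, "invalid event #{event.inspect}, must be one of #{LIFECYCLE_EVENTS.inspect}"
    end

    @hooks[event] << block
  end

  # Assumed firing behaviour: call each hook in registration order.
  def fire(event)
    @hooks.fetch(event).each(&:call)
  end
end

config = HooksSketch.new
log = []
config.on(:startup) { log << :connect_db }
config.on(:startup) { log << :warm_cache }
config.fire(:startup)

# A string event name is not in the symbol list, so it is rejected.
string_event_error =
  begin
    config.on('startup') { log << :never }
    nil
  rescue ArgumentError => e
    e.message
  end
```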

#periodic {|@periodic_manager| ... } ⇒ Object

Yields a Wurk::Cron::Manager so the host app can register periodic jobs at boot. Manager state is shared per-process so multiple `config.periodic` blocks accumulate (matches Sidekiq Ent §2.1).

Spec: docs/target/sidekiq-ent.md §2.

Yields:

  • (@periodic_manager)


# File 'lib/wurk/configuration.rb', line 257

def periodic
  require_relative 'cron'
  @periodic_manager ||= Wurk::Cron::Manager.new(self)
  yield @periodic_manager if block_given?
  @periodic_manager
end

#queues ⇒ Object

# File 'lib/wurk/configuration.rb', line 121

def queues = default_capsule.queues

#queues=(val) ⇒ Object



# File 'lib/wurk/configuration.rb', line 123

def queues=(val)
  default_capsule.queues = val
end

#redis ⇒ Object

# File 'lib/wurk/configuration.rb', line 182

def redis(&)
  redis_pool.with(&)
end

#redis=(hash) ⇒ Object

Redis. Merges `hash`, with its keys converted to symbols, into the current Redis configuration.

# File 'lib/wurk/configuration.rb', line 156

def redis=(hash)
  guard_frozen!
  @redis_config = @redis_config.merge(hash.transform_keys(&:to_sym))
end

#redis_pool ⇒ Object

# File 'lib/wurk/configuration.rb', line 161

def redis_pool
  default_capsule.redis_pool
end

#register(name, instance) ⇒ Object

Service locator (extension registry). Stores `instance` under `name` in the directory.

# File 'lib/wurk/configuration.rb', line 188

def register(name, instance)
  guard_frozen!
  @directory[name] = instance
end
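
`#register` pairs with `#lookup`: a lookup memoizes an instance of its `default_class` on first use, and a later `register` overwrites whatever is stored. The following sketch shows that interaction on a hypothetical `DirectorySketch` class; `NullTracer` and `FancyTracer` are made-up extension classes, and the frozen guard is left out.

```ruby
# Hypothetical stand-in for the directory part of the configuration.
class DirectorySketch
  def initialize
    @directory = {}
  end

  def register(name, instance)
    @directory[name] = instance
  end

  def lookup(name, default_class = nil)
    @directory[name] ||= default_class&.new
  end
end

# Hypothetical extension classes.
class NullTracer; end
class FancyTracer; end

config = DirectorySketch.new
first = config.lookup(:tracer, NullTracer)    # builds and stores a NullTracer
second = config.lookup(:tracer, FancyTracer)  # default ignored, entry exists
missing = config.lookup(:unknown)             # no entry, no default

config.register(:tracer, FancyTracer.new)     # explicit registration replaces it
replaced = config.lookup(:tracer)
```

The first caller's default wins until something registers explicitly, so extensions that need a specific implementation should call `register` during configuration rather than rely on lookup order.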

#reliable_scheduler! ⇒ Object

Pro reliable scheduler (§4): promote due jobs from retry/schedule onto their target queue in a single atomic Lua script (ZRANGEBYSCORE+ZREM+LPUSH), closing the pop→push job-loss window of the default poller. Swaps the pluggable `scheduled_enq` for the atomic promoter; idempotent.



# File 'lib/wurk/configuration.rb', line 245

def reliable_scheduler!(*)
  self[:scheduled_enq] = Wurk::Scheduled::ReliableEnq
  nil
end

#reset_redis_pools! ⇒ Object

Disconnect and drop every cached pool — the per-capsule mains plus the config-level internal pool. Used by Wurk::Swarm so the parent never leaks sockets into forks and each child can build fresh ones.



# File 'lib/wurk/configuration.rb', line 172

def reset_redis_pools!
  @capsules.each_value(&:reset_redis_pools!)
  @local_redis_pool&.disconnect!
  @local_redis_pool = nil
end

#retain_history(seconds = HISTORY_DEFAULT_INTERVAL, &block) ⇒ Object

Enables the Ent Historical Metrics snapshotter: every `seconds` the cluster leader emits a statsd-shaped snapshot to the configured `dogstatsd` client. With no block the default §5.2 gauge set is published; a block receives the dogstatsd client `s` and collects custom metrics instead. The Launcher starts the snapshotter only when this has been called.

Spec: docs/target/sidekiq-ent.md §5.1.

Raises:

  • (ArgumentError)


# File 'lib/wurk/configuration.rb', line 276

def retain_history(seconds = HISTORY_DEFAULT_INTERVAL, &block)
  guard_frozen!
  interval = Float(seconds)
  raise ArgumentError, 'retain_history interval must be > 0' unless interval.positive?

  @options[:history_interval] = interval
  @options[:history_collector] = block
  nil
end
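
`history_enabled?` checks for the presence of the `:history_interval` key, so the snapshotter stays off until `retain_history` is called, even though `history_interval` always has a fallback value. A minimal sketch of that relationship on a hypothetical `HistorySketch` class (frozen guard omitted); the `jobs.custom` gauge name in the block is made up, and the block is only stored here, never invoked.

```ruby
# Hypothetical stand-in; mirrors the four history-related methods.
class HistorySketch
  HISTORY_DEFAULT_INTERVAL = 30

  def initialize
    @options = {}
  end

  def retain_history(seconds = HISTORY_DEFAULT_INTERVAL, &block)
    interval = Float(seconds)
    raise ArgumentError, 'retain_history interval must be > 0' unless interval.positive?

    @options[:history_interval] = interval
    @options[:history_collector] = block
    nil
  end

  def history_enabled?
    @options.key?(:history_interval)
  end

  def history_interval
    @options.fetch(:history_interval, HISTORY_DEFAULT_INTERVAL)
  end

  def history_collector
    @options[:history_collector]
  end
end

config = HistorySketch.new
enabled_before = config.history_enabled?
interval_before = config.history_interval

# String input is coerced by Float(); the block becomes the collector.
config.retain_history('10') { |s| s.gauge('jobs.custom', 42) }
```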

#server? ⇒ Boolean

Returns:

  • (Boolean)


# File 'lib/wurk/configuration.rb', line 367

def server?
  @options[:server] == true
end

#server_middleware {|@server_chain| ... } ⇒ Object

Yields:

  • (@server_chain)


# File 'lib/wurk/configuration.rb', line 149

def server_middleware
  yield @server_chain if block_given?
  @server_chain
end

#super_fetch!(&block) ⇒ Object

Sidekiq Pro’s opt-in toggles for reliable fetch and the reliable scheduler. Both are already the default in Wurk — the fetcher is always the reliable BLMOVE fetcher with orphan reclamation, and the scheduler is always atomic Lua — so the toggle itself is a no-op. They exist only so a Pro initializer drops in unchanged instead of raising NoMethodError.

The optional block is Pro’s recovery callback: `|jobstr, pill|`, fired once per orphan recovery (`pill` nil) and once on a poison kill (`pill` responds to .jid/.klass/.count/.queue). The reaper drives it via Wurk::Middleware::PoisonPill.track!. Spec: docs/target/sidekiq-pro.md §3.1.



# File 'lib/wurk/configuration.rb', line 236

def super_fetch!(*, &block)
  @super_fetch_callback = block if block
  nil
end
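
A callback passed to `super_fetch!` has to handle both call shapes described above: `pill` is nil for an orphan recovery and an object for a poison kill. The sketch below exercises a callback directly, without the reaper. It assumes `jobstr` is the JSON-encoded job payload with a `jid` key (the Sidekiq convention; not confirmed on this page), and `PillSketch` is a hypothetical struct standing in for the real pill object.

```ruby
require 'json'

# Hypothetical pill object exposing the four readers named above.
PillSketch = Struct.new(:jid, :klass, :count, :queue)

recovered = []
poisoned = []

# The shape of block one might pass as config.super_fetch! { |jobstr, pill| ... }.
callback = lambda do |jobstr, pill|
  if pill
    poisoned << "#{pill.klass}/#{pill.jid} killed after #{pill.count} attempts"
  else
    # Assumption: jobstr is a JSON job payload containing "jid".
    recovered << JSON.parse(jobstr)['jid']
  end
end

callback.call('{"jid":"a1","class":"HardJob"}', nil)
callback.call('{"jid":"b2","class":"HardJob"}', PillSketch.new('b2', 'HardJob', 3, 'default'))
```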

#topology ⇒ Object

Worker topology for the swarm. When the host hasn’t declared one (the railtie path), default to a single flat fork running the default capsule’s queues + concurrency. Assign a custom Wurk::Topology (via `topology=`) for specialized slots.



# File 'lib/wurk/configuration.rb', line 375

def topology
  @topology ||= default_topology
end

#topology=(value) ⇒ Object



# File 'lib/wurk/configuration.rb', line 379

def topology=(value)
  guard_frozen!
  @topology = value
end

#total_concurrency ⇒ Object

# File 'lib/wurk/configuration.rb', line 127

def total_concurrency
  @capsules.each_value.sum(&:concurrency)
end

#web ⇒ Object

Web UI configuration: the authorization hook and read-only mode. Returns the process-wide `Wurk::Web.config` singleton so `config.web.read_only = true` and the engine middleware share one source of truth. Lazy-requires the web layer to keep standalone boot lean.

Spec: docs/target/sidekiq-ent.md §9.2.



# File 'lib/wurk/configuration.rb', line 298

def web
  require_relative 'web/config'
  Wurk::Web.config
end