Class: Wurk::Capsule

Inherits:
Object
Defined in:
lib/wurk/capsule.rb

Overview

One processing unit: a set of threads + queues sharing a fetcher and a Redis pool. Configurations can hold many capsules; each maps to its own Manager and Processors.

Spec: docs/target/sidekiq-free.md §5 (Sidekiq::Capsule).

Constant Summary

MODES =
%i[strict weighted random].freeze
POOL_OVERHEAD =

Pool size = concurrency + POOL_OVERHEAD. Every processor thread can be parked in a blocking BLMOVE (reliable fetch), holding its slot for ~3s. POOL_OVERHEAD reserves connections for the Launcher’s heartbeat thread + the Scheduled::Poller so they can never be starved by busy fetchers. Without this, concurrency=1 deadlocks immediately (worker holds the only slot, heartbeat times out).

2
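
The sizing rule above can be sketched as plain arithmetic. `main_pool_size` is a hypothetical helper written for illustration, not part of Wurk; only the constant value and the `concurrency + POOL_OVERHEAD` formula come from the source.

```ruby
# Hypothetical helper mirroring the documented sizing rule; not Wurk's API.
POOL_OVERHEAD = 2

def main_pool_size(concurrency)
  concurrency + POOL_OVERHEAD
end

# With concurrency = 1 the single processor may sit in a blocking fetch,
# yet two connections remain for the heartbeat thread and the poller.
[1, 5, 10].each do |c|
  puts "concurrency=#{c} -> pool size #{main_pool_size(c)}"
end
```

Under this rule the number of reserved connections stays constant as concurrency grows, since the heartbeat and poller are per-capsule rather than per-processor.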


Constructor Details

#initialize(name, config) ⇒ Capsule

Returns a new instance of Capsule.



# File 'lib/wurk/capsule.rb', line 19

def initialize(name, config)
  @name = name.to_s
  @config = config
  @concurrency = config[:concurrency] || 5
  @queues = ['default']
  @mode = :strict
  @weights = { 'default' => 0 }
  @fetcher = nil
  @redis_pool = nil
  @local_redis_pool = nil
  @client_chain = nil
  @server_chain = nil
end

Instance Attribute Details

#concurrency ⇒ Object

Returns the value of attribute concurrency.



# File 'lib/wurk/capsule.rb', line 17

def concurrency
  @concurrency
end

#config ⇒ Object (readonly)

Returns the value of attribute config.



# File 'lib/wurk/capsule.rb', line 16

def config
  @config
end

#fetcher ⇒ Object

Returns the value of attribute fetcher.



# File 'lib/wurk/capsule.rb', line 17

def fetcher
  @fetcher
end

#mode ⇒ Object (readonly)

Returns the value of attribute mode.



# File 'lib/wurk/capsule.rb', line 16

def mode
  @mode
end

#name ⇒ Object (readonly)

Returns the value of attribute name.



# File 'lib/wurk/capsule.rb', line 16

def name
  @name
end

#queues ⇒ Object

Returns the value of attribute queues.



# File 'lib/wurk/capsule.rb', line 16

def queues
  @queues
end

#weights ⇒ Object (readonly)

Returns the value of attribute weights.



# File 'lib/wurk/capsule.rb', line 16

def weights
  @weights
end

Instance Method Details

#client_middleware {|chain| ... } ⇒ Object

Yields:

  • (chain)


# File 'lib/wurk/capsule.rb', line 74

def client_middleware
  chain = (@client_chain ||= @config.client_middleware.copy_for(self))
  yield chain if block_given?
  chain
end

#local_redis_pool ⇒ Object



# File 'lib/wurk/capsule.rb', line 101

def local_redis_pool
  @local_redis_pool ||= build_pool(size: @concurrency, name: "#{@name}-local")
end

#logger ⇒ Object



# File 'lib/wurk/capsule.rb', line 124

def logger
  @config.logger
end

#lookup(name) ⇒ Object



# File 'lib/wurk/capsule.rb', line 120

def lookup(name)
  @config.lookup(name)
end

#prepare! ⇒ Object

Materializes everything that lazy-initializes via `||=` and sets the default fetcher before Configuration#freeze! freezes the capsule. Otherwise the first post-freeze access (a fetch tick, a middleware call) hits a nil fetcher or raises FrozenError while building a pool. The swarm's ChildBoot used to do this by hand; centralizing it here also covers the standalone CLI and embedded paths (the bug behind a nil `fetcher` in `exe/wurk`). Idempotent.



# File 'lib/wurk/capsule.rb', line 65

def prepare!
  @fetcher ||= Wurk::Fetcher::Reliable.new(self)
  redis_pool
  local_redis_pool
  client_middleware
  server_middleware
  self
end
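
The interaction between `||=` and freezing can be reproduced in isolation. `LazyThing` and `access_error` below are hypothetical stand-ins written for this illustration; they assume nothing about Wurk beyond the memoization pattern shown above.

```ruby
# Hypothetical stand-in for a capsule; it only illustrates `||=` on a frozen object.
class LazyThing
  def pool
    @pool ||= [:connection] # first call assigns an instance variable
  end

  def prepare!
    pool # force the lazy init while the object is still mutable
    self
  end
end

# Returns the FrozenError raised by `pool`, or nil when the call succeeds.
def access_error(thing)
  thing.pool
  nil
rescue FrozenError => e
  e
end

cold = LazyThing.new.freeze          # frozen before the pool was ever built
warm = LazyThing.new.prepare!.freeze # pool built first, then frozen

puts "cold: #{access_error(cold).class}"
puts "warm: #{access_error(warm).inspect}"
```

Once the instance variable is set, `||=` short-circuits and performs no assignment, which is why the prepared object can be read safely after freezing.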

#queue_specs ⇒ Object

Lossless `name` or `name,weight` specs (unlike `queues`, which is the weight-expanded list). Lets Configuration#topology rebuild a slot that round-trips back through `queues=` without flattening weights.



# File 'lib/wurk/capsule.rb', line 55

def queue_specs
  @weights.map { |q, w| w.positive? ? "#{q},#{w}" : q }
end
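
As a quick illustration, the same mapping can be applied to a sample weights hash outside the class. The standalone `queue_specs(weights)` function and the queue names are assumptions made for this sketch; the block body is copied from the method above.

```ruby
# Standalone copy of the mapping shown above, taking the weights hash as an argument.
def queue_specs(weights)
  weights.map { |q, w| w.positive? ? "#{q},#{w}" : q }
end

# Sample data (hypothetical queue names). A weight of 0 yields the bare name.
weights = { 'critical' => 3, 'default' => 1, 'low' => 0 }
p queue_specs(weights) # => ["critical,3", "default,1", "low"]
```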

#redis ⇒ Object



# File 'lib/wurk/capsule.rb', line 116

def redis(&)
  redis_pool.with(&)
end

#redis_pool ⇒ Object



# File 'lib/wurk/capsule.rb', line 97

def redis_pool
  @redis_pool ||= build_pool(size: @concurrency + POOL_OVERHEAD, name: "#{@name}-main")
end

#reset_redis_pools! ⇒ Object

Disconnects and drops the cached pools. Called by Wurk::Swarm just before fork (parent side: close inherited sockets) and just after fork (child side: rebuild lazily). ConnectionPool#shutdown is terminal, so dropping the reference is required; `redis_pool` will rebuild on the next access.



# File 'lib/wurk/capsule.rb', line 109

def reset_redis_pools!
  @redis_pool&.disconnect!
  @redis_pool = nil
  @local_redis_pool&.disconnect!
  @local_redis_pool = nil
end
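
The disconnect-then-drop pattern can be sketched with stand-in classes. `FakePool` and `Holder` are hypothetical and exist only for this illustration; they assume a pool whose `disconnect!` is terminal, as the description states.

```ruby
# FakePool and Holder are hypothetical stand-ins, not Wurk or connection_pool classes.
class FakePool
  def initialize
    @closed = false
  end

  def disconnect!
    @closed = true # terminal: this instance is never reopened
  end

  def closed?
    @closed
  end
end

class Holder
  def pool
    @pool ||= FakePool.new # rebuilt on first access after a reset
  end

  def reset!
    @pool&.disconnect! # safe navigation: no-op when no pool was built yet
    @pool = nil        # drop the reference so `pool` builds a fresh one
  end
end

holder = Holder.new
old_pool = holder.pool
holder.reset!
puts "old pool closed: #{old_pool.closed?}"
puts "new pool is a different object: #{!holder.pool.equal?(old_pool)}"
```

Without the `@pool = nil` step, the memoized accessor would keep returning the closed instance.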

#server_middleware {|chain| ... } ⇒ Object

Yields:

  • (chain)


# File 'lib/wurk/capsule.rb', line 80

def server_middleware
  # copy_for(self) — not dup — binds the chain's `@config` to this capsule,
  # so middleware that reach for `redis_pool`/`redis`/`logger` resolve them
  # instead of hitting `nil` (a plain dup leaves @config nil).
  chain = (@server_chain ||= @config.server_middleware.copy_for(self))
  yield chain if block_given?
  chain
end
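
A minimal sketch of the copy-and-bind idea follows. `SketchChain`, its `owner` attribute, and `SketchCapsule` are assumptions invented for this example; the real chain class and the exact behavior of `copy_for` are not shown in this page.

```ruby
# Hypothetical chain: `copy_for` returns an independent copy bound to an owner.
class SketchChain
  attr_reader :entries, :owner

  def initialize(owner = nil, entries = [])
    @owner = owner
    @entries = entries
  end

  def add(klass)
    @entries << klass
    self
  end

  def copy_for(owner)
    SketchChain.new(owner, @entries.dup)
  end
end

# Hypothetical capsule using the same memoize-then-yield shape as above.
class SketchCapsule
  def initialize(shared_chain)
    @shared_chain = shared_chain
  end

  def server_middleware
    chain = (@server_chain ||= @shared_chain.copy_for(self))
    yield chain if block_given?
    chain
  end
end

shared = SketchChain.new.add(:logging)
capsule = SketchCapsule.new(shared)
capsule.server_middleware { |chain| chain.add(:metrics) }

p shared.entries                    # the shared chain is left untouched
p capsule.server_middleware.entries # the capsule's copy carries the addition
```

In this sketch the copy carries its owner, so middleware reached through the capsule's chain has something to resolve pools and loggers against.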

#stop ⇒ Object

No-op in OSS. Reserved for Pro/Ent hooks that need to flush per-capsule state on shutdown. Manager#stop invokes this in `ensure` so the contract is honored regardless of how stop unwinds.

Spec: docs/target/sidekiq-free.md §5.



# File 'lib/wurk/capsule.rb', line 133

def stop; end
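
The `ensure` contract can be illustrated with a small stand-alone sketch. `RecordingCapsule` and `stop_manager` are hypothetical names; the actual Manager#stop body is not shown here, so this only demonstrates that a hook placed in `ensure` runs even when shutdown raises.

```ruby
# Hypothetical capsule that records whether its stop hook ran.
class RecordingCapsule
  attr_reader :stopped

  def initialize
    @stopped = false
  end

  def stop
    @stopped = true
  end
end

# Hypothetical stand-in for a manager shutdown that fails midway.
def stop_manager(capsule)
  raise 'worker shutdown failed'
ensure
  capsule.stop # runs whether the body returns normally or raises
end

capsule = RecordingCapsule.new
begin
  stop_manager(capsule)
rescue RuntimeError => e
  puts "shutdown raised: #{e.message}"
end
puts "stop hook ran: #{capsule.stopped}"
```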

#to_h ⇒ Object



# File 'lib/wurk/capsule.rb', line 33

def to_h
  { concurrency: @concurrency, mode: @mode, weights: @weights }
end