Class: Wurk::Capsule
- Inherits:
-
Object
- Object
- Wurk::Capsule
- Defined in:
- lib/wurk/capsule.rb
Overview
One processing unit: a set of threads + queues sharing a fetcher and a Redis pool. Configurations can hold many capsules; each maps to its own Manager and Processors.
Spec: docs/target/sidekiq-free.md §5 (Sidekiq::Capsule).
Constant Summary collapse
- MODES =
%i[strict weighted random].freeze
- POOL_OVERHEAD =
Pool size = concurrency + POOL_OVERHEAD. Every processor thread can be parked in a blocking BLMOVE (reliable fetch), holding its slot for ~3s. POOL_OVERHEAD reserves connections for the Launcher’s heartbeat thread + the Scheduled::Poller so they can never be starved by busy fetchers. Without this, concurrency=1 deadlocks immediately (worker holds the only slot, heartbeat times out).
2
Instance Attribute Summary collapse
-
#concurrency ⇒ Object
Returns the value of attribute concurrency.
-
#config ⇒ Object
readonly
Returns the value of attribute config.
-
#fetcher ⇒ Object
Returns the value of attribute fetcher.
-
#mode ⇒ Object
readonly
Returns the value of attribute mode.
-
#name ⇒ Object
readonly
Returns the value of attribute name.
-
#queues ⇒ Object
Returns the value of attribute queues.
-
#weights ⇒ Object
readonly
Returns the value of attribute weights.
Instance Method Summary collapse
- #client_middleware {|chain| ... } ⇒ Object
-
#initialize(name, config) ⇒ Capsule
constructor
A new instance of Capsule.
- #local_redis_pool ⇒ Object
- #logger ⇒ Object
- #lookup(name) ⇒ Object
-
#prepare! ⇒ Object
Materialize everything that lazy-inits via ‘||=` and default the fetcher, BEFORE Configuration#freeze! freezes the capsule — otherwise the first post-freeze access (a fetch tick, a middleware call) hits a nil fetcher or FrozenErrors building a pool.
-
#queue_specs ⇒ Object
Lossless ‘name` specs (unlike `queues`, which is the weight-expanded list).
- #redis ⇒ Object
- #redis_pool ⇒ Object
-
#reset_redis_pools! ⇒ Object
Disconnect and drop cached pools.
- #server_middleware {|chain| ... } ⇒ Object
-
#stop ⇒ Object
No-op in OSS.
- #to_h ⇒ Object
Constructor Details
#initialize(name, config) ⇒ Capsule
Returns a new instance of Capsule.
19 20 21 22 23 24 25 26 27 28 29 30 31 |
# File 'lib/wurk/capsule.rb', line 19 def initialize(name, config) @name = name.to_s @config = config @concurrency = config[:concurrency] || 5 @queues = ['default'] @mode = :strict @weights = { 'default' => 0 } @fetcher = nil @redis_pool = nil @local_redis_pool = nil @client_chain = nil @server_chain = nil end |
Instance Attribute Details
#concurrency ⇒ Object
Returns the value of attribute concurrency.
17 18 19 |
# File 'lib/wurk/capsule.rb', line 17 def concurrency @concurrency end |
#config ⇒ Object (readonly)
Returns the value of attribute config.
16 17 18 |
# File 'lib/wurk/capsule.rb', line 16 def config @config end |
#fetcher ⇒ Object
Returns the value of attribute fetcher.
17 18 19 |
# File 'lib/wurk/capsule.rb', line 17 def fetcher @fetcher end |
#mode ⇒ Object (readonly)
Returns the value of attribute mode.
16 17 18 |
# File 'lib/wurk/capsule.rb', line 16 def mode @mode end |
#name ⇒ Object (readonly)
Returns the value of attribute name.
16 17 18 |
# File 'lib/wurk/capsule.rb', line 16 def name @name end |
#queues ⇒ Object
Returns the value of attribute queues.
16 17 18 |
# File 'lib/wurk/capsule.rb', line 16 def queues @queues end |
#weights ⇒ Object (readonly)
Returns the value of attribute weights.
16 17 18 |
# File 'lib/wurk/capsule.rb', line 16 def weights @weights end |
Instance Method Details
#client_middleware {|chain| ... } ⇒ Object
74 75 76 77 78 |
# File 'lib/wurk/capsule.rb', line 74 def client_middleware chain = (@client_chain ||= @config.client_middleware.copy_for(self)) yield chain if block_given? chain end |
#local_redis_pool ⇒ Object
101 102 103 |
# File 'lib/wurk/capsule.rb', line 101 def local_redis_pool @local_redis_pool ||= build_pool(size: @concurrency, name: "#{@name}-local") end |
#logger ⇒ Object
124 125 126 |
# File 'lib/wurk/capsule.rb', line 124 def logger @config.logger end |
#lookup(name) ⇒ Object
120 121 122 |
# File 'lib/wurk/capsule.rb', line 120 def lookup(name) @config.lookup(name) end |
#prepare! ⇒ Object
Materialize everything that lazy-inits via ‘||=` and default the fetcher, BEFORE Configuration#freeze! freezes the capsule — otherwise the first post-freeze access (a fetch tick, a middleware call) hits a nil fetcher or FrozenErrors building a pool. The swarm’s ChildBoot used to do this by hand; centralizing it here covers the standalone CLI and embedded paths too (the bug behind a nil ‘fetcher` in `exe/wurk`). Idempotent.
65 66 67 68 69 70 71 72 |
# File 'lib/wurk/capsule.rb', line 65 def prepare! @fetcher ||= Wurk::Fetcher::Reliable.new(self) redis_pool local_redis_pool client_middleware server_middleware self end |
#queue_specs ⇒ Object
Lossless ‘name` specs (unlike `queues`, which is the weight-expanded list). Lets Configuration#topology rebuild a slot that round-trips back through `queues=` without flattening weights.
55 56 57 |
# File 'lib/wurk/capsule.rb', line 55 def queue_specs @weights.map { |q, w| w.positive? ? "#{q},#{w}" : q } end |
#redis ⇒ Object
116 117 118 |
# File 'lib/wurk/capsule.rb', line 116 def redis(&) redis_pool.with(&) end |
#redis_pool ⇒ Object
97 98 99 |
# File 'lib/wurk/capsule.rb', line 97 def redis_pool @redis_pool ||= build_pool(size: @concurrency + POOL_OVERHEAD, name: "#{@name}-main") end |
#reset_redis_pools! ⇒ Object
Disconnect and drop cached pools. Called by Wurk::Swarm just before fork (parent side: close inherited sockets) and just after fork (child side: rebuild lazily). Connection_pool#shutdown is terminal, so dropping the reference is required — ‘redis_pool` will rebuild.
109 110 111 112 113 114 |
# File 'lib/wurk/capsule.rb', line 109 def reset_redis_pools! @redis_pool&.disconnect! @redis_pool = nil @local_redis_pool&.disconnect! @local_redis_pool = nil end |
#server_middleware {|chain| ... } ⇒ Object
80 81 82 83 84 85 86 87 |
# File 'lib/wurk/capsule.rb', line 80 def server_middleware # copy_for(self) — not dup — binds the chain's `@config` to this capsule, # so middleware that reach for `redis_pool`/`redis`/`logger` resolve them # instead of hitting `nil` (a plain dup leaves @config nil). chain = (@server_chain ||= @config.server_middleware.copy_for(self)) yield chain if block_given? chain end |
#stop ⇒ Object
No-op in OSS. Reserved for Pro/Ent hooks that need to flush per-capsule state on shutdown. Manager#stop invokes this in ‘ensure` so the contract is honored regardless of how stop unwinds.
Spec: docs/target/sidekiq-free.md §5.
133 |
# File 'lib/wurk/capsule.rb', line 133 def stop; end |
#to_h ⇒ Object
33 34 35 |
# File 'lib/wurk/capsule.rb', line 33 def to_h { concurrency: @concurrency, mode: @mode, weights: @weights } end |