Module: Sidekiq::Routing
- Defined in:
- lib/sidekiq/routing.rb,
lib/sidekiq/routing/mover.rb,
lib/sidekiq/routing/store.rb,
lib/sidekiq/routing/sweeper.rb,
lib/sidekiq/routing/version.rb,
lib/sidekiq/routing/configuration.rb,
lib/sidekiq/routing/web_extension.rb,
lib/sidekiq/routing/parked_processor.rb,
lib/sidekiq/routing/middleware/client.rb,
lib/sidekiq/routing/middleware/server.rb
Defined Under Namespace
Modules: Auto, Middleware, Mover, Store, WebExtension Classes: Configuration, ParkedProcessor, Sweeper
Constant Summary collapse
- PARKED_QUEUE_DEFAULT =
"routing_parked"- ORIGINAL_QUEUE_KEY =
Keys stamped into the job payload.
"routing_original_queue"- NO_DIVERT_KEY =
"routing_no_divert"- MODE_PARK =
"park"- MODE_BLACKHOLE =
"blackhole"- MODES =
[MODE_PARK, MODE_BLACKHOLE].freeze
- VERSION =
"0.1.0"
Class Method Summary collapse
- .blackhole(klass) ⇒ Object
-
.class_name(klass_or_name) ⇒ Object
Resolve the effective job-class name.
- .configuration ⇒ Object
- .enabled? ⇒ Boolean
-
.install! ⇒ Object
Registers the manual routing client + server middleware on both client and server configurations.
- .logger ⇒ Object
- .mode(klass) ⇒ Object
-
.park(klass) ⇒ Object
—- operator API: managing manual routes —-.
- .parked?(klass) ⇒ Boolean
-
.parked_breakdown(sample: configuration.breakdown_sample_size) ⇒ Object
{ “SomeJob” => { “count” => 12, “by_original_queue” => { “within_1_minute” => 12 } } }.
- .parked_queue ⇒ Object
-
.parked_size ⇒ Object
—- parking queue introspection —-.
- .process_parked(klass: nil, limit: nil, batch_size: nil) ⇒ Object
- .reset_cache! ⇒ Object
-
.route_for(klass_or_name) ⇒ Object
Returns the route hash (“mode”=>…) for a class, or nil.
- .routed?(klass) ⇒ Boolean
-
.routes ⇒ Object
All active manual routes, read straight from Redis (uncached) so operators and the Web tab always see the truth.
- .setup {|configuration| ... } ⇒ Object
-
.sweep(klass, queue: nil, limit: nil, batch_size: nil) ⇒ Object
—- recovery (thin wrappers; logic in Sweeper/ParkedProcessor) —-.
- .unpark(klass) ⇒ Object
Class Method Details
.blackhole(klass) ⇒ Object
78 79 80 |
# File 'lib/sidekiq/routing.rb', line 78 def blackhole(klass) write(klass, MODE_BLACKHOLE) end |
.class_name(klass_or_name) ⇒ Object
Resolve the effective job-class name. Prefers the ActiveJob “wrapped” class so a wrapped job is matched by its real class, not the JobWrapper (mirrors Sidekiq’s own display_class). Accepts a Class, String, or nil.
164 165 166 167 168 |
# File 'lib/sidekiq/routing.rb', line 164 def class_name(klass_or_name) return klass_or_name if klass_or_name.is_a?(String) klass_or_name&.name end |
.configuration ⇒ Object
33 34 35 |
# File 'lib/sidekiq/routing.rb', line 33 def configuration @_configuration ||= Configuration.new end |
.enabled? ⇒ Boolean
60 61 62 |
# File 'lib/sidekiq/routing.rb', line 60 def enabled? configuration.enabled end |
.install! ⇒ Object
Registers the manual routing client + server middleware on both client and server configurations. This mirrors Sidekiq::Lock.install! so host apps can add the gem first and opt in to routing per app.
44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 |
# File 'lib/sidekiq/routing.rb', line 44 def install! require "sidekiq/routing/middleware/client" require "sidekiq/routing/middleware/server" prepend_routing = ->(chain) { chain.prepend Middleware::Client } Sidekiq.configure_client do |config| config.client_middleware(&prepend_routing) end Sidekiq.configure_server do |config| config.client_middleware(&prepend_routing) config.server_middleware { |chain| install_server_middleware(chain) } end end |
.logger ⇒ Object
64 65 66 |
# File 'lib/sidekiq/routing.rb', line 64 def logger configuration.logger end |
.mode(klass) ⇒ Object
98 99 100 |
# File 'lib/sidekiq/routing.rb', line 98 def mode(klass) Store.fetch(class_name(klass))&.fetch("mode", nil) end |
.park(klass) ⇒ Object
—- operator API: managing manual routes —-
74 75 76 |
# File 'lib/sidekiq/routing.rb', line 74 def park(klass) write(klass, MODE_PARK) end |
.parked?(klass) ⇒ Boolean
90 91 92 |
# File 'lib/sidekiq/routing.rb', line 90 def parked?(klass) mode(klass) == MODE_PARK end |
.parked_breakdown(sample: configuration.breakdown_sample_size) ⇒ Object
{ “SomeJob” => { “count” => 12, “by_original_queue” => { “within_1_minute” => 12 } } }
Scans at most ‘sample` jobs (default Configuration#breakdown_sample_size), not the whole queue: the parking queue can hold millions during a flood and this is called on every Web tab load. The result is a distribution over the sampled head, not exact totals — use parked_size (O(1) LLEN) for the true total. Pass sample: nil to scan all.
136 137 138 139 140 141 142 143 144 145 146 147 148 149 |
# File 'lib/sidekiq/routing.rb', line 136 def parked_breakdown(sample: configuration.breakdown_sample_size) result = Hash.new { |h, k| h[k] = { "count" => 0, "by_original_queue" => Hash.new(0) } } scanned = 0 Sidekiq::Queue.new(parked_queue).each do |job| break if sample && scanned >= sample klass = job.display_class original = job.item[ORIGINAL_QUEUE_KEY] || "unknown" result[klass]["count"] += 1 result[klass]["by_original_queue"][original] += 1 scanned += 1 end result end |
.parked_queue ⇒ Object
68 69 70 |
# File 'lib/sidekiq/routing.rb', line 68 def parked_queue configuration.parked_queue end |
.parked_size ⇒ Object
—- parking queue introspection —-
126 127 128 |
# File 'lib/sidekiq/routing.rb', line 126 def parked_size Sidekiq::Queue.new(parked_queue).size end |
.process_parked(klass: nil, limit: nil, batch_size: nil) ⇒ Object
157 158 159 |
# File 'lib/sidekiq/routing.rb', line 157 def process_parked(klass: nil, limit: nil, batch_size: nil) ParkedProcessor.new.call(klass: klass && class_name(klass), limit:, batch_size:) end |
.reset_cache! ⇒ Object
117 118 119 120 121 122 |
# File 'lib/sidekiq/routing.rb', line 117 def reset_cache! @snapshot_mutex.synchronize do @snapshot = nil @snapshot_at = nil end end |
.route_for(klass_or_name) ⇒ Object
Returns the route hash (“mode”=>…) for a class, or nil. Reads from a process-local snapshot of all routes, refreshed at most once per cache_ttl_seconds.
113 114 115 |
# File 'lib/sidekiq/routing.rb', line 113 def route_for(klass_or_name) snapshot[class_name(klass_or_name)] end |
.routed?(klass) ⇒ Boolean
94 95 96 |
# File 'lib/sidekiq/routing.rb', line 94 def routed?(klass) !Store.fetch(class_name(klass)).nil? end |
.routes ⇒ Object
All active manual routes, read straight from Redis (uncached) so operators and the Web tab always see the truth.
104 105 106 |
# File 'lib/sidekiq/routing.rb', line 104 def routes Store.all end |
.setup {|configuration| ... } ⇒ Object
37 38 39 |
# File 'lib/sidekiq/routing.rb', line 37 def setup yield configuration if block_given? end |
.sweep(klass, queue: nil, limit: nil, batch_size: nil) ⇒ Object
—- recovery (thin wrappers; logic in Sweeper/ParkedProcessor) —-
153 154 155 |
# File 'lib/sidekiq/routing.rb', line 153 def sweep(klass, queue: nil, limit: nil, batch_size: nil) Sweeper.new.call(class_name(klass), queue:, limit:, batch_size:) end |