Module: Sidekiq::Routing

Defined in:
lib/sidekiq/routing.rb,
lib/sidekiq/routing/mover.rb,
lib/sidekiq/routing/store.rb,
lib/sidekiq/routing/sweeper.rb,
lib/sidekiq/routing/version.rb,
lib/sidekiq/routing/configuration.rb,
lib/sidekiq/routing/web_extension.rb,
lib/sidekiq/routing/parked_processor.rb,
lib/sidekiq/routing/middleware/client.rb,
lib/sidekiq/routing/middleware/server.rb

Defined Under Namespace

Modules: Auto, Middleware, Mover, Store, WebExtension
Classes: Configuration, ParkedProcessor, Sweeper

Constant Summary

PARKED_QUEUE_DEFAULT =
"routing_parked"
ORIGINAL_QUEUE_KEY =

Keys stamped into the job payload.

"routing_original_queue"
NO_DIVERT_KEY =
"routing_no_divert"
MODE_PARK =
"park"
MODE_BLACKHOLE =
"blackhole"
MODES =
[MODE_PARK, MODE_BLACKHOLE].freeze
VERSION =
"0.1.0"

Class Method Details

.blackhole(klass) ⇒ Object



# File 'lib/sidekiq/routing.rb', line 78

def blackhole(klass)
  write(klass, MODE_BLACKHOLE)
end

.class_name(klass_or_name) ⇒ Object

Resolve the effective job-class name. Prefers the ActiveJob “wrapped” class so a wrapped job is matched by its real class, not the JobWrapper (mirrors Sidekiq’s own display_class). Accepts a Class, String, or nil.



# File 'lib/sidekiq/routing.rb', line 164

def class_name(klass_or_name)
  return klass_or_name if klass_or_name.is_a?(String)

  klass_or_name&.name
end
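
The three accepted input types can be exercised in isolation. The sketch below copies only the method body shown above into a stand-in module so that it runs without Sidekiq; `RoutingNameDemo` and `ExampleJob` are hypothetical names, and the sketch does not model any ActiveJob unwrapping.

```ruby
# Stand-in module: reproduces only the class_name body shown above.
module RoutingNameDemo
  module_function

  def class_name(klass_or_name)
    # Strings are assumed to already be class names and pass through unchanged.
    return klass_or_name if klass_or_name.is_a?(String)

    # Classes answer #name; nil short-circuits to nil via safe navigation.
    klass_or_name&.name
  end
end

# Hypothetical job class used only for illustration.
class ExampleJob; end

RoutingNameDemo.class_name("ExampleJob") # String input
RoutingNameDemo.class_name(ExampleJob)   # Class input
RoutingNameDemo.class_name(nil)          # nil input
```

Both the String and the Class form resolve to the same name, which is what lets callers pass either to the operator methods.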

.configuration ⇒ Object



# File 'lib/sidekiq/routing.rb', line 33

def configuration
  @_configuration ||= Configuration.new
end

.enabled? ⇒ Boolean

Returns:

  • (Boolean)


# File 'lib/sidekiq/routing.rb', line 60

def enabled?
  configuration.enabled
end

.install! ⇒ Object

Registers the manual routing middleware: the client middleware is prepended on both the client and server configurations, and the server middleware is installed on the server configuration. This mirrors Sidekiq::Lock.install! so host apps can add the gem first and opt in to routing per app.



# File 'lib/sidekiq/routing.rb', line 44

def install!
  require "sidekiq/routing/middleware/client"
  require "sidekiq/routing/middleware/server"

  prepend_routing = ->(chain) { chain.prepend Middleware::Client }

  Sidekiq.configure_client do |config|
    config.client_middleware(&prepend_routing)
  end

  Sidekiq.configure_server do |config|
    config.client_middleware(&prepend_routing)
    config.server_middleware { |chain| install_server_middleware(chain) }
  end
end
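
The effect of prepending, as opposed to appending, can be illustrated with a small stand-in chain. This is a sketch under stated assumptions: `DemoChain`, `ExistingMiddleware`, and `RoutingClientMiddleware` are hypothetical names, and the chain is a simplified model rather than Sidekiq's own implementation.

```ruby
# Hypothetical stand-in for a middleware chain; not Sidekiq's implementation.
class DemoChain
  attr_reader :entries

  def initialize
    @entries = []
  end

  # Append to the end of the chain.
  def add(klass)
    @entries << klass unless @entries.include?(klass)
  end

  # Insert at the front of the chain so it runs before existing entries.
  def prepend(klass)
    @entries.delete(klass)
    @entries.unshift(klass)
  end
end

# Placeholder middleware classes, named for illustration only.
class ExistingMiddleware; end
class RoutingClientMiddleware; end

def demo_chain_order
  chain = DemoChain.new
  chain.add ExistingMiddleware
  # Same lambda shape as in install!: one callable reused for every chain.
  prepend_routing = ->(c) { c.prepend RoutingClientMiddleware }
  prepend_routing.call(chain)
  chain.entries
end
```

In this model the routing middleware ends up ahead of anything registered earlier, which is presumably why install! prepends rather than adds.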

.logger ⇒ Object



# File 'lib/sidekiq/routing.rb', line 64

def logger
  configuration.logger
end

.mode(klass) ⇒ Object



# File 'lib/sidekiq/routing.rb', line 98

def mode(klass)
  Store.fetch(class_name(klass))&.fetch("mode", nil)
end

.park(klass) ⇒ Object

Part of the operator API for managing manual routes. Writes a MODE_PARK route for klass.



# File 'lib/sidekiq/routing.rb', line 74

def park(klass)
  write(klass, MODE_PARK)
end

.parked?(klass) ⇒ Boolean

Returns:

  • (Boolean)


# File 'lib/sidekiq/routing.rb', line 90

def parked?(klass)
  mode(klass) == MODE_PARK
end

.parked_breakdown(sample: configuration.breakdown_sample_size) ⇒ Object

Returns a per-class breakdown of the parking queue, for example:

{ "SomeJob" => { "count" => 12, "by_original_queue" => { "within_1_minute" => 12 } } }

Scans at most `sample` jobs (default: Configuration#breakdown_sample_size) rather than the whole queue, because the parking queue can hold millions of jobs during a flood and this method is called on every Web tab load. The result is a distribution over the sampled head of the queue, not exact totals; use parked_size (an O(1) LLEN) for the true total. Pass sample: nil to scan all jobs.



# File 'lib/sidekiq/routing.rb', line 136

def parked_breakdown(sample: configuration.breakdown_sample_size)
  result = Hash.new { |h, k| h[k] = { "count" => 0, "by_original_queue" => Hash.new(0) } }
  scanned = 0
  Sidekiq::Queue.new(parked_queue).each do |job|
    break if sample && scanned >= sample

    klass = job.display_class
    original = job.item[ORIGINAL_QUEUE_KEY] || "unknown"
    result[klass]["count"] += 1
    result[klass]["by_original_queue"][original] += 1
    scanned += 1
  end
  result
end
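
The sampling behaviour can be tried without Redis by running the same loop over plain hashes. In this sketch `sampled_breakdown`, `ORIGINAL_QUEUE_KEY_DEMO`, and `DEMO_JOBS` are hypothetical names, and the `"class"` key stands in for `job.display_class`.

```ruby
# Copy of the ORIGINAL_QUEUE_KEY value listed in the constant summary.
ORIGINAL_QUEUE_KEY_DEMO = "routing_original_queue"

# jobs: an array of plain hashes standing in for Sidekiq job payloads.
def sampled_breakdown(jobs, sample: 2)
  result = Hash.new { |h, k| h[k] = { "count" => 0, "by_original_queue" => Hash.new(0) } }
  scanned = 0
  jobs.each do |job|
    # Stop once the sample budget is spent; sample: nil scans everything.
    break if sample && scanned >= sample

    klass = job["class"]
    original = job[ORIGINAL_QUEUE_KEY_DEMO] || "unknown"
    result[klass]["count"] += 1
    result[klass]["by_original_queue"][original] += 1
    scanned += 1
  end
  result
end

# Invented sample data; the second job lacks the original-queue stamp.
DEMO_JOBS = [
  { "class" => "SomeJob", "routing_original_queue" => "default" },
  { "class" => "SomeJob" },
  { "class" => "OtherJob", "routing_original_queue" => "mailers" }
].freeze

sampled_breakdown(DEMO_JOBS, sample: 2)   # only the first two jobs are counted
sampled_breakdown(DEMO_JOBS, sample: nil) # all three jobs are counted
```

With a sample of 2, `OtherJob` never appears in the result, which illustrates why the breakdown should be read as a distribution over the head of the queue and not as a total.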

.parked_queue ⇒ Object



# File 'lib/sidekiq/routing.rb', line 68

def parked_queue
  configuration.parked_queue
end

.parked_size ⇒ Object

Parking queue introspection. Returns the current size of the parking queue.



# File 'lib/sidekiq/routing.rb', line 126

def parked_size
  Sidekiq::Queue.new(parked_queue).size
end

.process_parked(klass: nil, limit: nil, batch_size: nil) ⇒ Object



# File 'lib/sidekiq/routing.rb', line 157

def process_parked(klass: nil, limit: nil, batch_size: nil)
  ParkedProcessor.new.call(klass: klass && class_name(klass), limit:, batch_size:)
end

.reset_cache! ⇒ Object



# File 'lib/sidekiq/routing.rb', line 117

def reset_cache!
  @snapshot_mutex.synchronize do
    @snapshot = nil
    @snapshot_at = nil
  end
end

.route_for(klass_or_name) ⇒ Object

Returns the route hash ("mode" => …) for a class, or nil if the class has no route. Reads from a process-local snapshot of all routes, refreshed at most once per cache_ttl_seconds.



# File 'lib/sidekiq/routing.rb', line 113

def route_for(klass_or_name)
  snapshot[class_name(klass_or_name)]
end
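
The private `snapshot` method is not shown on this page. The following is a minimal sketch of how a TTL-bounded, mutex-guarded snapshot of this kind could work, assuming a loader block in place of `Store.all`; `SnapshotCacheDemo`, its `loads` counter, and `snapshot_demo` are hypothetical and exist only to make the refresh behaviour observable.

```ruby
# Hypothetical stand-in for the process-local snapshot; names are illustrative.
class SnapshotCacheDemo
  attr_reader :loads

  def initialize(ttl_seconds:, &loader)
    @ttl = ttl_seconds
    @loader = loader # assumed to play the role of Store.all
    @mutex = Mutex.new
    @snapshot = nil
    @snapshot_at = nil
    @loads = 0
  end

  def route_for(name)
    snapshot[name]
  end

  def reset_cache!
    @mutex.synchronize do
      @snapshot = nil
      @snapshot_at = nil
    end
  end

  private

  # Reload when there is no snapshot yet or the current one is older than the TTL.
  def snapshot
    @mutex.synchronize do
      now = Process.clock_gettime(Process::CLOCK_MONOTONIC)
      if @snapshot.nil? || now - @snapshot_at >= @ttl
        @snapshot = @loader.call
        @snapshot_at = now
        @loads += 1
      end
      @snapshot
    end
  end
end

def snapshot_demo
  cache = SnapshotCacheDemo.new(ttl_seconds: 60) { { "SomeJob" => { "mode" => "park" } } }
  first = cache.route_for("SomeJob")
  cache.route_for("SomeJob")    # served from the snapshot, no reload
  loads_before_reset = cache.loads
  cache.reset_cache!
  cache.route_for("MissingJob") # the reset forces one reload
  [first, loads_before_reset, cache.loads]
end
```

The trade-off in this model is that a route change made in another process becomes visible only after the TTL elapses, unless `reset_cache!` is called locally.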

.routed?(klass) ⇒ Boolean

Returns:

  • (Boolean)


# File 'lib/sidekiq/routing.rb', line 94

def routed?(klass)
  !Store.fetch(class_name(klass)).nil?
end

.routes ⇒ Object

All active manual routes, read straight from Redis (uncached) so operators and the Web tab always see the truth.



# File 'lib/sidekiq/routing.rb', line 104

def routes
  Store.all
end

.setup {|configuration| ... } ⇒ Object

Yields:

  • (configuration)



# File 'lib/sidekiq/routing.rb', line 37

def setup
  yield configuration if block_given?
end
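
The memoized configuration plus block-yielding setup pattern can be sketched in isolation. `DemoRouting` and `DemoConfiguration` are hypothetical stand-ins; the attribute names come from readers used elsewhere on this page, while the default values (including reusing "routing_parked" as the parked queue default) are assumptions.

```ruby
# Hypothetical stand-in for Configuration; defaults are assumed, not documented.
DemoConfiguration = Struct.new(:enabled, :parked_queue, :breakdown_sample_size, keyword_init: true)

module DemoRouting
  module_function

  # Memoized so that every caller sees the same configuration object.
  def configuration
    @configuration ||= DemoConfiguration.new(
      enabled: false,
      parked_queue: "routing_parked",
      breakdown_sample_size: 1_000
    )
  end

  # Yields the shared configuration; a call without a block is a no-op.
  def setup
    yield configuration if block_given?
  end

  def enabled?
    configuration.enabled
  end
end

# Typical initializer-style usage.
DemoRouting.setup do |config|
  config.enabled = true
  config.breakdown_sample_size = 500
end
```

Because the block mutates the memoized object, settings applied in `setup` are visible to later calls such as `enabled?`.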

.sweep(klass, queue: nil, limit: nil, batch_size: nil) ⇒ Object

Part of the recovery API. A thin wrapper; the logic lives in Sweeper.



# File 'lib/sidekiq/routing.rb', line 153

def sweep(klass, queue: nil, limit: nil, batch_size: nil)
  Sweeper.new.call(class_name(klass), queue:, limit:, batch_size:)
end

.unpark(klass) ⇒ Object



# File 'lib/sidekiq/routing.rb', line 82

def unpark(klass)
  name = class_name(klass)
  Store.delete(name)
  reset_cache!
  logger.warn("[Routing] unparked #{name}")
  name
end
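
The operator methods on this page (park, blackhole, unpark, mode, parked?, routed?) can be walked through with an in-memory stand-in for Store. This is a sketch, not the gem's code: `OperatorDemo` and `operator_demo` are hypothetical, the private `write` helper is not shown in the source and is assumed here to store a hash with a "mode" key, and cache resets and logging are omitted.

```ruby
# Hypothetical in-memory model of the operator API; no Redis involved.
module OperatorDemo
  MODE_PARK = "park"
  MODE_BLACKHOLE = "blackhole"
  ROUTES = {} # stands in for the Redis-backed Store

  module_function

  # Assumed behaviour of the private write helper: store a route hash.
  def write(name, mode)
    ROUTES[name] = { "mode" => mode }
    name
  end

  def park(name)
    write(name, MODE_PARK)
  end

  def blackhole(name)
    write(name, MODE_BLACKHOLE)
  end

  # Like unpark above, this deletes the route whatever its mode is.
  def unpark(name)
    ROUTES.delete(name)
    name
  end

  def mode(name)
    ROUTES[name]&.fetch("mode", nil)
  end

  def parked?(name)
    mode(name) == MODE_PARK
  end

  def routed?(name)
    !ROUTES[name].nil?
  end
end

def operator_demo
  OperatorDemo.blackhole("SomeJob")
  states = [OperatorDemo.routed?("SomeJob"), OperatorDemo.parked?("SomeJob")]
  OperatorDemo.park("SomeJob") # overwrites the blackhole route
  states << OperatorDemo.parked?("SomeJob")
  OperatorDemo.unpark("SomeJob")
  states << OperatorDemo.routed?("SomeJob")
  states
end
```

In this model a blackholed class is routed but not parked, and unpark clears the route regardless of which mode was set.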