Class: Wurk::Client
- Inherits:
-
Object
- Object
- Wurk::Client
- Includes:
- JobUtil
- Defined in:
- lib/wurk/client.rb,
lib/wurk/client/buffered.rb
Overview
Enqueue interface. Pipelined LPUSH / ZADD writes against the canonical Sidekiq Redis schema — never change keys, JSON shape, or score format here: wire-compat is sacred.
Spec: docs/target/sidekiq-free.md §7.
Defined Under Namespace
Modules: Buffered
Constant Summary collapse
- DEFAULT_BATCH_SIZE =
Sidekiq mirrors these exactly. Tests against the upstream parity suite depend on the magic numbers, not just behavior.
1_000- SCHEDULED_BATCH_SIZE =
100- SPREAD_INTERVAL_FLOOR =
5
Constants included from JobUtil
JobUtil::RETRY_FOR_MAX, JobUtil::TRANSIENT_ATTRIBUTES
Instance Attribute Summary collapse
-
#redis_pool ⇒ Object
Returns the value of attribute redis_pool.
Class Method Summary collapse
- .enqueue(klass) ⇒ Object
- .enqueue_in(interval, klass) ⇒ Object
- .enqueue_to(queue, klass) ⇒ Object
- .enqueue_to_in(queue, interval, klass) ⇒ Object
- .push(item) ⇒ Object
- .push_bulk(items) ⇒ Object
-
.reliable_push! ⇒ Object
Activate reliable_push! mode globally.
- .reliable_push? ⇒ Boolean
- .reliable_push_buffer ⇒ Object
- .reliable_push_buffer=(value) ⇒ Object
-
.reliable_push_drainer(interval: Buffered::Drainer::DEFAULT_INTERVAL) ⇒ Object
Start an opt-in background drainer thread.
- .reliable_push_drainer_running? ⇒ Boolean
- .reliable_push_drainer_stop! ⇒ Object
- .reliable_push_overflow ⇒ Object
- .reliable_push_overflow=(mode) ⇒ Object
-
.via(pool) ⇒ Object
Thread-local pool override.
Instance Method Summary collapse
-
#cancel!(jid) ⇒ Object
Marks an IterableJob as cancelled.
-
#flush_batched(payloads) ⇒ Object
Flush batched payloads (each carrying a ‘bid`) to Redis in one pipeline.
-
#initialize(pool: nil, config: nil, chain: nil) ⇒ Client
constructor
A new instance of Client.
-
#middleware {|copy| ... } ⇒ Object
Returns the chain (or a duplicate when a block is given, matching Sidekiq).
-
#push(item) ⇒ String?
Jid; nil when client middleware halts the push.
-
#push_bulk(items) ⇒ Array<String, nil>
Jids in submission order; nil entries mark middleware-halted jobs.
Methods included from JobUtil
#normalize_item, #now_in_millis, #validate, #verify_json
Constructor Details
#initialize(pool: nil, config: nil, chain: nil) ⇒ Client
Returns a new instance of Client.
24 25 26 27 28 |
# File 'lib/wurk/client.rb', line 24 def initialize(pool: nil, config: nil, chain: nil) @config = config || Wurk.configuration @redis_pool = pool @chain = chain || @config.client_middleware end |
Instance Attribute Details
#redis_pool ⇒ Object
Returns the value of attribute redis_pool.
22 23 24 |
# File 'lib/wurk/client.rb', line 22 def redis_pool @redis_pool end |
Class Method Details
.enqueue(klass) ⇒ Object
91 |
# File 'lib/wurk/client.rb', line 91 def enqueue(klass, *) = klass.perform_async(*) |
.enqueue_in(interval, klass) ⇒ Object
101 102 103 |
# File 'lib/wurk/client.rb', line 101 def enqueue_in(interval, klass, *) klass.perform_in(interval, *) end |
.enqueue_to(queue, klass) ⇒ Object
93 94 95 |
# File 'lib/wurk/client.rb', line 93 def enqueue_to(queue, klass, *) klass.set(queue: queue.to_s).perform_async(*) end |
.enqueue_to_in(queue, interval, klass) ⇒ Object
97 98 99 |
# File 'lib/wurk/client.rb', line 97 def enqueue_to_in(queue, interval, klass, *) klass.set(queue: queue.to_s).perform_in(interval, *) end |
.push(item) ⇒ Object
89 |
# File 'lib/wurk/client.rb', line 89 def push(item) = new.push(item) |
.push_bulk(items) ⇒ Object
90 |
# File 'lib/wurk/client.rb', line 90 def push_bulk(items) = new.push_bulk(items) |
.reliable_push! ⇒ Object
Activate reliable_push! mode globally. Idempotent — call from the top level of an initializer (NOT inside Wurk.configure_*). Spec: docs/target/sidekiq-pro.md §5.
325 326 327 328 |
# File 'lib/wurk/client/buffered.rb', line 325 def reliable_push! # rubocop:disable Naming/PredicateMethod Buffered.install! true end |
.reliable_push? ⇒ Boolean
330 331 332 |
# File 'lib/wurk/client/buffered.rb', line 330 def reliable_push? Buffered.installed? end |
.reliable_push_buffer ⇒ Object
334 335 336 |
# File 'lib/wurk/client/buffered.rb', line 334 def reliable_push_buffer Buffered.buffer_cap end |
.reliable_push_buffer=(value) ⇒ Object
338 339 340 |
# File 'lib/wurk/client/buffered.rb', line 338 def reliable_push_buffer=(value) Buffered.buffer_cap = value end |
.reliable_push_drainer(interval: Buffered::Drainer::DEFAULT_INTERVAL) ⇒ Object
Start an opt-in background drainer thread. Implicitly enables reliable_push! so callers don’t have to chain the two. Idempotent; calling again replaces the thread with one at the new interval. Spec for reliable_push (sidekiq-pro.md §5) only requires drain on next push — this is a Wurk extension for issue #19’s “Background drain thread flushes on reconnect” so producer-stopped-mid-outage buffers don’t sit idle until next push.
357 358 359 360 361 |
# File 'lib/wurk/client/buffered.rb', line 357 def reliable_push_drainer(interval: Buffered::Drainer::DEFAULT_INTERVAL) Buffered.install! Buffered.start_drainer!(interval: interval) true end |
.reliable_push_drainer_running? ⇒ Boolean
367 368 369 |
# File 'lib/wurk/client/buffered.rb', line 367 def reliable_push_drainer_running? Buffered.drainer_running? end |
.reliable_push_drainer_stop! ⇒ Object
363 364 365 |
# File 'lib/wurk/client/buffered.rb', line 363 def reliable_push_drainer_stop! Buffered.stop_drainer! end |
.reliable_push_overflow ⇒ Object
342 343 344 |
# File 'lib/wurk/client/buffered.rb', line 342 def reliable_push_overflow Buffered.overflow_mode end |
.reliable_push_overflow=(mode) ⇒ Object
346 347 348 |
# File 'lib/wurk/client/buffered.rb', line 346 def reliable_push_overflow=(mode) Buffered.overflow_mode = mode end |
.via(pool) ⇒ Object
Thread-local pool override. Re-entrant calls are rejected — Sidekiq raises here too, because nested ‘via` would silently shadow. The begin/ensure guards the slot so a raise on entry doesn’t clear the outer caller’s pool.
109 110 111 112 113 114 115 116 117 118 119 |
# File 'lib/wurk/client.rb', line 109 def via(pool) raise ArgumentError, 'pool is required' if pool.nil? raise 'Wurk::Client.via is not re-entrant' if Thread.current[:wurk_via_pool] Thread.current[:wurk_via_pool] = pool begin yield ensure Thread.current[:wurk_via_pool] = nil end end |
Instance Method Details
#cancel!(jid) ⇒ Object
Marks an IterableJob as cancelled. Returns the Unix epoch timestamp written. Field name + epoch-second value mirror Sidekiq::IterableJob#cancel! exactly. TTL = CANCELLATION_PERIOD so other workers observe the flag well after the dashboard click that issued the cancel.
68 69 70 71 72 73 74 75 76 77 |
# File 'lib/wurk/client.rb', line 68 def cancel!(jid) raise ArgumentError, 'jid must be a non-empty String' if jid.nil? || jid.to_s.empty? ts = ::Process.clock_gettime(::Process::CLOCK_REALTIME).to_i pool.with do |conn| conn.call('HSET', "it-#{jid}", 'cancelled', ts) conn.call('EXPIRE', "it-#{jid}", Wurk::IterableJob::CANCELLATION_PERIOD) end ts end |
#flush_batched(payloads) ⇒ Object
Flush batched payloads (each carrying a ‘bid`) to Redis in one pipeline. Public entry point for Wurk::Batch’s autoflush buffer — see #push_batched for the per-job BATCH_PUSH semantics it reuses.
82 83 84 85 86 |
# File 'lib/wurk/client.rb', line 82 def flush_batched(payloads) return if payloads.empty? pool.with { |conn| push_batched_pipelined(conn, payloads, now_in_millis) } end |
#middleware {|copy| ... } ⇒ Object
Returns the chain (or a duplicate when a block is given, matching Sidekiq).
31 32 33 34 35 36 37 |
# File 'lib/wurk/client.rb', line 31 def middleware return @chain unless block_given? copy = @chain.dup yield copy copy end |
#push(item) ⇒ String?
Returns jid; nil when client middleware halts the push.
40 41 42 43 44 45 46 47 48 49 |
# File 'lib/wurk/client.rb', line 40 def push(item) normed = normalize_item(item) payload = invoke_chain(normed) return nil unless payload verify_json(payload) raw_push([payload]) emit_enqueued([payload]) payload['jid'] end |
#push_bulk(items) ⇒ Array<String, nil>
Returns jids in submission order; nil entries mark middleware-halted jobs.
53 54 55 56 57 58 59 60 61 62 |
# File 'lib/wurk/client.rb', line 53 def push_bulk(items) args = items['args'] || items[:args] validate_bulk_shape!(items, args) return [] if args.empty? at_values = (items, args.size) batch_sz = items['batch_size'] || items[:batch_size] || (at_values ? SCHEDULED_BATCH_SIZE : DEFAULT_BATCH_SIZE) base = bulk_base(items) flush_bulk(args, at_values, base, batch_sz) end |