Class: Wurk::Client

Inherits:
Object
  • Object
show all
Includes:
JobUtil
Defined in:
lib/wurk/client.rb,
lib/wurk/client/buffered.rb

Overview

Enqueue interface. Pipelined LPUSH / ZADD writes against the canonical Sidekiq Redis schema — never change keys, JSON shape, or score format here: wire-compat is sacred.

Spec: docs/target/sidekiq-free.md §7.

Defined Under Namespace

Modules: Buffered

Constant Summary collapse

DEFAULT_BATCH_SIZE =

Sidekiq mirrors these exactly. Tests against the upstream parity suite depend on the magic numbers, not just behavior.

1_000
SCHEDULED_BATCH_SIZE =
100
SPREAD_INTERVAL_FLOOR =
5

Constants included from JobUtil

JobUtil::RETRY_FOR_MAX, JobUtil::TRANSIENT_ATTRIBUTES

Instance Attribute Summary collapse

Class Method Summary collapse

Instance Method Summary collapse

Methods included from JobUtil

#normalize_item, #now_in_millis, #validate, #verify_json

Constructor Details

#initialize(pool: nil, config: nil, chain: nil) ⇒ Client

Returns a new instance of Client.



24
25
26
27
28
# File 'lib/wurk/client.rb', line 24

def initialize(pool: nil, config: nil, chain: nil)
  @config     = config || Wurk.configuration
  @redis_pool = pool
  @chain      = chain || @config.client_middleware
end

Instance Attribute Details

#redis_poolObject

Returns the value of attribute redis_pool.



22
23
24
# File 'lib/wurk/client.rb', line 22

def redis_pool
  @redis_pool
end

Class Method Details

.enqueue(klass) ⇒ Object



91
# File 'lib/wurk/client.rb', line 91

def enqueue(klass, *) = klass.perform_async(*)

.enqueue_in(interval, klass) ⇒ Object



101
102
103
# File 'lib/wurk/client.rb', line 101

def enqueue_in(interval, klass, *)
  klass.perform_in(interval, *)
end

.enqueue_to(queue, klass) ⇒ Object



93
94
95
# File 'lib/wurk/client.rb', line 93

def enqueue_to(queue, klass, *)
  klass.set(queue: queue.to_s).perform_async(*)
end

.enqueue_to_in(queue, interval, klass) ⇒ Object



97
98
99
# File 'lib/wurk/client.rb', line 97

def enqueue_to_in(queue, interval, klass, *)
  klass.set(queue: queue.to_s).perform_in(interval, *)
end

.push(item) ⇒ Object



89
# File 'lib/wurk/client.rb', line 89

def push(item)        = new.push(item)

.push_bulk(items) ⇒ Object



90
# File 'lib/wurk/client.rb', line 90

def push_bulk(items)  = new.push_bulk(items)

.reliable_push!Object

Activate reliable_push! mode globally. Idempotent — call from the top level of an initializer (NOT inside Wurk.configure_*). Spec: docs/target/sidekiq-pro.md §5.



325
326
327
328
# File 'lib/wurk/client/buffered.rb', line 325

def reliable_push! # rubocop:disable Naming/PredicateMethod
  Buffered.install!
  true
end

.reliable_push?Boolean

Returns:

  • (Boolean)


330
331
332
# File 'lib/wurk/client/buffered.rb', line 330

def reliable_push?
  Buffered.installed?
end

.reliable_push_bufferObject



334
335
336
# File 'lib/wurk/client/buffered.rb', line 334

def reliable_push_buffer
  Buffered.buffer_cap
end

.reliable_push_buffer=(value) ⇒ Object



338
339
340
# File 'lib/wurk/client/buffered.rb', line 338

def reliable_push_buffer=(value)
  Buffered.buffer_cap = value
end

.reliable_push_drainer(interval: Buffered::Drainer::DEFAULT_INTERVAL) ⇒ Object

Start an opt-in background drainer thread. Implicitly enables reliable_push! so callers don’t have to chain the two. Idempotent; calling again replaces the thread with one at the new interval. Spec for reliable_push (sidekiq-pro.md §5) only requires drain on next push — this is a Wurk extension for issue #19’s “Background drain thread flushes on reconnect” so producer-stopped-mid-outage buffers don’t sit idle until next push.



357
358
359
360
361
# File 'lib/wurk/client/buffered.rb', line 357

def reliable_push_drainer(interval: Buffered::Drainer::DEFAULT_INTERVAL)
  Buffered.install!
  Buffered.start_drainer!(interval: interval)
  true
end

.reliable_push_drainer_running?Boolean

Returns:

  • (Boolean)


367
368
369
# File 'lib/wurk/client/buffered.rb', line 367

def reliable_push_drainer_running?
  Buffered.drainer_running?
end

.reliable_push_drainer_stop!Object



363
364
365
# File 'lib/wurk/client/buffered.rb', line 363

def reliable_push_drainer_stop!
  Buffered.stop_drainer!
end

.reliable_push_overflowObject



342
343
344
# File 'lib/wurk/client/buffered.rb', line 342

def reliable_push_overflow
  Buffered.overflow_mode
end

.reliable_push_overflow=(mode) ⇒ Object



346
347
348
# File 'lib/wurk/client/buffered.rb', line 346

def reliable_push_overflow=(mode)
  Buffered.overflow_mode = mode
end

.via(pool) ⇒ Object

Thread-local pool override. Re-entrant calls are rejected — Sidekiq raises here too, because nested ‘via` would silently shadow. The begin/ensure guards the slot so a raise on entry doesn’t clear the outer caller’s pool.

Raises:

  • (ArgumentError)


109
110
111
112
113
114
115
116
117
118
119
# File 'lib/wurk/client.rb', line 109

def via(pool)
  raise ArgumentError, 'pool is required' if pool.nil?
  raise 'Wurk::Client.via is not re-entrant' if Thread.current[:wurk_via_pool]

  Thread.current[:wurk_via_pool] = pool
  begin
    yield
  ensure
    Thread.current[:wurk_via_pool] = nil
  end
end

Instance Method Details

#cancel!(jid) ⇒ Object

Marks an IterableJob as cancelled. Returns the Unix epoch timestamp written. Field name + epoch-second value mirror Sidekiq::IterableJob#cancel! exactly. TTL = CANCELLATION_PERIOD so other workers observe the flag well after the dashboard click that issued the cancel.

Raises:

  • (ArgumentError)


68
69
70
71
72
73
74
75
76
77
# File 'lib/wurk/client.rb', line 68

def cancel!(jid)
  raise ArgumentError, 'jid must be a non-empty String' if jid.nil? || jid.to_s.empty?

  ts = ::Process.clock_gettime(::Process::CLOCK_REALTIME).to_i
  pool.with do |conn|
    conn.call('HSET', "it-#{jid}", 'cancelled', ts)
    conn.call('EXPIRE', "it-#{jid}", Wurk::IterableJob::CANCELLATION_PERIOD)
  end
  ts
end

#flush_batched(payloads) ⇒ Object

Flush batched payloads (each carrying a ‘bid`) to Redis in one pipeline. Public entry point for Wurk::Batch’s autoflush buffer — see #push_batched for the per-job BATCH_PUSH semantics it reuses.



82
83
84
85
86
# File 'lib/wurk/client.rb', line 82

def flush_batched(payloads)
  return if payloads.empty?

  pool.with { |conn| push_batched_pipelined(conn, payloads, now_in_millis) }
end

#middleware {|copy| ... } ⇒ Object

Returns the chain (or a duplicate when a block is given, matching Sidekiq).

Yields:

  • (copy)


31
32
33
34
35
36
37
# File 'lib/wurk/client.rb', line 31

def middleware
  return @chain unless block_given?

  copy = @chain.dup
  yield copy
  copy
end

#push(item) ⇒ String?

Returns jid; nil when client middleware halts the push.

Returns:

  • (String, nil)

    jid; nil when client middleware halts the push.



40
41
42
43
44
45
46
47
48
49
# File 'lib/wurk/client.rb', line 40

def push(item)
  normed  = normalize_item(item)
  payload = invoke_chain(normed)
  return nil unless payload

  verify_json(payload)
  raw_push([payload])
  emit_enqueued([payload])
  payload['jid']
end

#push_bulk(items) ⇒ Array<String, nil>

Returns jids in submission order; nil entries mark middleware-halted jobs.

Parameters:

  • items (Hash)

    keys: class, args (Array<Array>), at?, spread_interval?, batch_size?, jid?

Returns:

  • (Array<String, nil>)

    jids in submission order; nil entries mark middleware-halted jobs.



53
54
55
56
57
58
59
60
61
62
# File 'lib/wurk/client.rb', line 53

def push_bulk(items)
  args = items['args'] || items[:args]
  validate_bulk_shape!(items, args)
  return [] if args.empty?

  at_values = expand_at(items, args.size)
  batch_sz  = items['batch_size'] || items[:batch_size] || (at_values ? SCHEDULED_BATCH_SIZE : DEFAULT_BATCH_SIZE)
  base      = bulk_base(items)
  flush_bulk(args, at_values, base, batch_sz)
end