Module: Wurk::Client::Buffered

Defined in:
lib/wurk/client/buffered.rb

Overview

Pro feature parity: in-process ring buffer that catches enqueue failures during a Redis outage and replays them on the next push. Activated globally — ‘Wurk::Client.reliable_push!`. Buffer is per-process, in-memory only; crash = lost. Does NOT cover batch creation or batch-context pushes (`bid` on payload): BATCH_PUSH has atomic counter side-effects we can’t safely replay.

Spec: docs/target/sidekiq-pro.md §5.

Defined Under Namespace

Modules: InstanceMethods Classes: Drainer, Overflow

Constant Summary collapse

DEFAULT_BUFFER_CAP =
1_000
DRAINING_KEY =
:wurk_reliable_push_draining
OVERFLOW_MODES =

Overflow modes. ‘:drop_oldest` is the spec default (Sidekiq Pro §5 ring buffer). `:raise` lets callers decide what to do on backpressure — Wurk extension surfaced for issue #19’s “over-cap pushes raise so callers can decide” requirement.

%i[drop_oldest raise].freeze
DEFAULT_OVERFLOW_MODE =
:drop_oldest
INSTALL_MUTEX =

Eagerly initialized: ‘||=` inside an accessor is not atomic — two threads racing first-touch could end up holding distinct Mutex instances and lose all synchronization on the shared buffer.

Mutex.new
BUFFER_MUTEX =
Mutex.new

Class Attribute Summary collapse

Class Method Summary collapse

Class Attribute Details

.buffer_client_factoryObject

Returns the value of attribute buffer_client_factory.



44
45
46
# File 'lib/wurk/client/buffered.rb', line 44

def buffer_client_factory
  @buffer_client_factory
end

Class Method Details

.bufferObject

Internal — visible for tests. Treat as private.



165
166
167
# File 'lib/wurk/client/buffered.rb', line 165

def buffer
  @buffer ||= []
end

.buffer_capObject



62
63
64
# File 'lib/wurk/client/buffered.rb', line 62

def buffer_cap
  @buffer_cap ||= DEFAULT_BUFFER_CAP
end

.buffer_cap=(value) ⇒ Object



66
67
68
69
70
71
72
# File 'lib/wurk/client/buffered.rb', line 66

def buffer_cap=(value)
  unless value.is_a?(Integer) && value.positive?
    raise ArgumentError, 'reliable_push_buffer must be a positive Integer'
  end

  @buffer_cap = value
end

.buffer_sizeObject



74
75
76
# File 'lib/wurk/client/buffered.rb', line 74

def buffer_size
  buffer_mutex.synchronize { buffer.size }
end

.drain!(client) ⇒ Object

Drain payloads through ‘raw_push` on the given client. Stops on the first ConnectionError, preserving order at the head of the buffer so the next push retries the same payload. Emits statsd `jobs.recovered.push` per drained payload.



150
151
152
153
154
155
156
157
158
159
160
161
162
# File 'lib/wurk/client/buffered.rb', line 150

def drain!(client)
  drained = 0
  while (payload = pop_head)
    unless attempt_replay(client, payload)
      buffer_mutex.synchronize { buffer.unshift(payload) }
      break
    end

    Wurk::Metrics::Statsd.increment('jobs.recovered.push')
    drained += 1
  end
  drained
end

.drainer_running?Boolean

Returns:

  • (Boolean)


191
192
193
# File 'lib/wurk/client/buffered.rb', line 191

def drainer_running?
  INSTALL_MUTEX.synchronize { @drainer&.running? == true }
end

.enbuffer(payloads, client: nil) ⇒ Object

Append payloads to the buffer. Behavior on cap exhaustion depends on ‘overflow_mode`:

* :drop_oldest (default, spec) — ring buffer, oldest evicted.
* :raise                       — Overflow raised, buffer left
                                 unchanged for already-appended
                                 siblings in the same call; the
                                 offending payload is attached
                                 to the exception.

Drops batched payloads — caller is expected to re-raise for those. If client is provided, captures its pool for drainer to use by default.



115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
# File 'lib/wurk/client/buffered.rb', line 115

def enbuffer(payloads, client: nil)
  capture_pool_from_client(client)

  cap = buffer_cap
  mode = overflow_mode
  buffer_mutex.synchronize do
    payloads.each do |p|
      if buffer.size >= cap
        raise Overflow, p if mode == :raise

        buffer.shift # :drop_oldest
      end
      buffer << p
    end
  end
end

.install!Object

Idempotent. Prepends the wrapper module into Wurk::Client so push / push_bulk drain the buffer before each call and raw_push catches connection errors. Safe to call from multiple threads.



49
50
51
52
53
54
55
56
# File 'lib/wurk/client/buffered.rb', line 49

def install!
  install_mutex.synchronize do
    return if @installed

    Wurk::Client.prepend(InstanceMethods)
    @installed = true
  end
end

.installed?Boolean

Returns:

  • (Boolean)


58
59
60
# File 'lib/wurk/client/buffered.rb', line 58

def installed?
  @installed == true
end

.overflow_modeObject



78
79
80
# File 'lib/wurk/client/buffered.rb', line 78

def overflow_mode
  @overflow_mode ||= DEFAULT_OVERFLOW_MODE
end

.overflow_mode=(mode) ⇒ Object



82
83
84
85
86
87
88
89
90
91
92
93
94
# File 'lib/wurk/client/buffered.rb', line 82

def overflow_mode=(mode)
  begin
    mode = mode.to_sym
  rescue NoMethodError, TypeError
    raise ArgumentError, "overflow_mode must be one of #{OVERFLOW_MODES.inspect}"
  end

  unless OVERFLOW_MODES.include?(mode)
    raise ArgumentError, "overflow_mode must be one of #{OVERFLOW_MODES.inspect}"
  end

  @overflow_mode = mode
end

.reset!Object



96
97
98
99
100
101
102
103
# File 'lib/wurk/client/buffered.rb', line 96

def reset!
  buffer_mutex.synchronize do
    @buffer = []
    @buffer_cap = nil
    @overflow_mode = nil
    @buffer_client_factory = nil
  end
end

.start_drainer!(interval: Drainer::DEFAULT_INTERVAL, client_factory: nil) ⇒ Object

Start a background drain thread that wakes every ‘interval` seconds and tries to flush the buffer. Idempotent — replaces any prior drainer with one at the new interval. Issue #19 requirement: “Background drain thread flushes on reconnect” —handles the case where push activity stops mid-outage so the passive (drain-on-next-push) path never fires.



175
176
177
178
179
180
181
182
# File 'lib/wurk/client/buffered.rb', line 175

def start_drainer!(interval: Drainer::DEFAULT_INTERVAL, client_factory: nil)
  INSTALL_MUTEX.synchronize do
    @drainer&.stop
    factory = client_factory || buffer_client_factory || -> { Wurk::Client.new }
    @drainer = Drainer.new(interval: interval, client_factory: factory)
    @drainer.start
  end
end

.stop_drainer!Object



184
185
186
187
188
189
# File 'lib/wurk/client/buffered.rb', line 184

def stop_drainer!
  INSTALL_MUTEX.synchronize do
    @drainer&.stop
    @drainer = nil
  end
end