Class: Wurk::Client::Buffered::Drainer

Inherits:
Object
  • Object
show all
Defined in:
lib/wurk/client/buffered.rb

Overview

Background drain thread. Wakes every `interval` seconds and tries `Buffered.drain!` against a fresh Wurk::Client. `drain!` already short-circuits on the first ConnectionError, so a still-down Redis just leaves the buffer alone for this tick — no exponential backoff or explicit “reconnect detection” is needed; the inner connection retry already lives inside `client.raw_push`.

Constant Summary collapse

DEFAULT_INTERVAL =
2.0
STOP_JOIN_TIMEOUT =
5.0

Instance Method Summary collapse

Constructor Details

#initialize(interval: DEFAULT_INTERVAL, client_factory: -> { Wurk::Client.new }) ⇒ Drainer

Returns a new instance of Drainer.



233
234
235
236
237
238
239
240
241
242
243
244
# File 'lib/wurk/client/buffered.rb', line 233

# Builds a drainer in the stopped state; no thread is created here.
#
# @param interval [Numeric] wake-up period; must be a real, positive number
# @param client_factory [#call] stored as-is in @client_factory; the default
#   lambda builds a new Wurk::Client each time it is called
# @raise [ArgumentError] if interval is not a positive real Numeric
def initialize(interval: DEFAULT_INTERVAL, client_factory: -> { Wurk::Client.new })
  # Complex is a Numeric but does not define #positive?, so without the
  # real? guard a complex interval would surface as NoMethodError instead
  # of the ArgumentError this validation is meant to raise.
  unless interval.is_a?(Numeric) && interval.real? && interval.positive?
    raise ArgumentError, 'interval must be a positive Numeric'
  end

  @interval = interval
  @client_factory = client_factory
  @done = false   # stop flag
  @thread = nil   # worker thread handle, nil until #start is called
  @wake = ConditionVariable.new
  @lock = Mutex.new
end

Instance Method Details

#running? ⇒ Boolean

Returns:

  • (Boolean)


267
268
269
# File 'lib/wurk/client/buffered.rb', line 267

# Reports whether a drain thread has been created and is still alive.
#
# @return [Boolean] strictly true or false (never nil), even when no
#   thread has been started yet
def running?
  worker = @thread
  !worker.nil? && worker.alive?
end

#start ⇒ Object



246
247
248
249
250
251
252
253
254
255
256
# File 'lib/wurk/client/buffered.rb', line 246

# Launches the drain thread unless one is already alive.
#
# The liveness check, the reset of the stop flag and the publication of
# the new thread handle all happen while holding @lock.
#
# @return [Thread, nil] the newly created thread, or nil when a live
#   thread already exists and nothing was started
def start
  @lock.synchronize do
    unless @thread&.alive?
      @done = false
      @thread = Thread.new do
        Thread.current.name = 'wurk-reliable_push-drainer'
        run
      end
    end
  end
end

#stop ⇒ Object



258
259
260
261
262
263
264
265
# File 'lib/wurk/client/buffered.rb', line 258

# Signals the drain thread to finish and waits up to STOP_JOIN_TIMEOUT
# seconds for it to exit.
#
# The thread handle is read and cleared under @lock, the same lock #start
# holds when it publishes the handle. The handle is dropped only once the
# thread has actually finished: if the join times out, @thread is kept so
# #running? keeps reporting the live thread and #start does not spawn a
# second drainer next to it. @done stays true in that case.
#
# @return [nil]
def stop
  worker = @lock.synchronize do
    @done = true
    @wake.broadcast
    @thread
  end
  return unless worker

  # Thread#join returns nil on timeout and the thread itself once finished.
  finished = worker.join(STOP_JOIN_TIMEOUT)
  @lock.synchronize do
    # Clear only the handle we waited on; a concurrent #start may already
    # have replaced it with a freshly started thread.
    @thread = nil if finished && @thread.equal?(worker)
  end
  nil
end