Class: Wurk::Client::Buffered::Drainer
- Inherits:
-
Object
- Object
- Wurk::Client::Buffered::Drainer
- Defined in:
- lib/wurk/client/buffered.rb
Overview
Background drain thread. Wakes every ‘interval` seconds and tries `Buffered.drain!` against a fresh Wurk::Client. drain! already short-circuits on the first ConnectionError, so a still-down Redis just leaves the buffer alone for this tick — no exponential backoff or explicit “reconnect detection” needed; the inner connection retry already lives inside `client.raw_push`.
Constant Summary collapse
- DEFAULT_INTERVAL =
2.0- STOP_JOIN_TIMEOUT =
5.0
Instance Method Summary collapse
-
#initialize(interval: DEFAULT_INTERVAL, client_factory: -> { Wurk::Client.new }) ⇒ Drainer
constructor
A new instance of Drainer.
- #running? ⇒ Boolean
- #start ⇒ Object
- #stop ⇒ Object
Constructor Details
#initialize(interval: DEFAULT_INTERVAL, client_factory: -> { Wurk::Client.new }) ⇒ Drainer
Returns a new instance of Drainer.
233 234 235 236 237 238 239 240 241 242 243 244 |
# File 'lib/wurk/client/buffered.rb', line 233 def initialize(interval: DEFAULT_INTERVAL, client_factory: -> { Wurk::Client.new }) unless interval.is_a?(Numeric) && interval.positive? raise ArgumentError, 'interval must be a positive Numeric' end @interval = interval @client_factory = client_factory @done = false @thread = nil @wake = ConditionVariable.new @lock = Mutex.new end |
Instance Method Details
#running? ⇒ Boolean
267 268 269 |
# File 'lib/wurk/client/buffered.rb', line 267 def running? @thread&.alive? == true end |
#start ⇒ Object
246 247 248 249 250 251 252 253 254 255 256 |
# File 'lib/wurk/client/buffered.rb', line 246 def start @lock.synchronize do return if @thread&.alive? @done = false @thread = Thread.new do Thread.current.name = 'wurk-reliable_push-drainer' run end end end |
#stop ⇒ Object
258 259 260 261 262 263 264 265 |
# File 'lib/wurk/client/buffered.rb', line 258 def stop @lock.synchronize do @done = true @wake.broadcast end @thread&.join(STOP_JOIN_TIMEOUT) @thread = nil end |