Class: NNQ::Engine::Reconnect

Inherits:
Object
Defined in:
lib/nnq/engine/reconnect.rb

Overview

Schedules reconnect attempts with exponential back-off.

Runs a background task that loops until a connection is established or the engine is closed. The caller is never blocked: #connect returns immediately and the actual dial happens inside the task.
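
To make "exponential back-off" concrete, here is a minimal sketch of a doubling delay schedule with an upper bound. The helper name `backoff_schedule`, the doubling factor, and the example values are assumptions for illustration only; the library's own `init_delay` and `next_delay` are not shown on this page and may compute delays differently.

```ruby
# Hypothetical helper, not part of NNQ: returns the first `attempts` delays
# of a doubling back-off that starts at `initial` and never exceeds `max`.
def backoff_schedule(initial, max, attempts)
  delays = []
  delay = initial
  attempts.times do
    delays << delay
    delay = [delay * 2, max].min # assumed policy: double, then cap
  end
  delays
end

schedule = backoff_schedule(0.1, 1.0, 6)
puts schedule.inspect
```

With these example values the delay grows until it reaches the cap and then stays there, which keeps a long outage from producing ever longer waits.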

Constructor Details

#initialize(engine, endpoint, options) ⇒ Reconnect

Returns a new instance of Reconnect.



# File 'lib/nnq/engine/reconnect.rb', line 48

def initialize(engine, endpoint, options)
  @engine   = engine
  @endpoint = endpoint
  @options  = options
end

Class Method Details

.schedule(endpoint, options, parent_task, engine, delay: nil) ⇒ Object

Builds a Reconnect for the given endpoint and starts it immediately via #run.

Parameters:

  • endpoint (String)
  • options (Options)
  • parent_task (Async::Task)
  • engine (Engine)
  • delay (Numeric, nil) (defaults to: nil)

    initial delay (defaults to reconnect_interval)



# File 'lib/nnq/engine/reconnect.rb', line 43

def self.schedule(endpoint, options, parent_task, engine, delay: nil)
  new(engine, endpoint, options).run(parent_task, delay: delay)
end

Instance Method Details

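#run(parent_task, delay: nil) ⇒ Object

Starts the reconnect loop in a transient child task of parent_task. Each iteration waits for the current delay, attempts to connect, and on failure increases the delay and emits a :connect_retried monitor event. The loop ends once a connection succeeds or the engine is closed.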



# File 'lib/nnq/engine/reconnect.rb', line 55

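def run(parent_task, delay: nil)
  # Resolve the starting delay and the cap used by the back-off.
  delay, max_delay = init_delay(delay)

  parent_task.async(transient: true, annotation: "nnq reconnect #{@endpoint}") do
    loop do
      break if @engine.closed?
      # Wait before dialing, unless the delay is zero.
      sleep quantized_wait(delay) if delay > 0
      # The engine may have been closed while this task was sleeping.
      break if @engine.closed?
      begin
        @engine.transport_for(@endpoint).connect(@endpoint, @engine)
        break # connected; stop retrying
      rescue *CONNECTION_FAILED, *CONNECTION_LOST => e
        # Grow the delay and report the failed attempt.
        delay = next_delay(delay, max_delay)
        @engine.emit_monitor_event(:connect_retried, endpoint: @endpoint, detail: { interval: delay, error: e })
      end
    end
  rescue Async::Stop
    # The task was stopped; exit quietly.
  end
end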
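
The control flow of #run can be exercised without Async or a real transport. The sketch below is a simplified stand-in, not the library's code: `FakeEngine`, `retry_until_connected`, the use of `IOError` as the failure type, and the doubling policy are all hypothetical, and the real sleep is omitted so the example finishes instantly.

```ruby
# Stand-in for the engine: only the methods the loop touches.
# Everything here is hypothetical scaffolding, not NNQ's API.
class FakeEngine
  attr_reader :events

  def initialize(failures_before_success)
    @remaining_failures = failures_before_success
    @events = []
    @closed = false
  end

  def closed?
    @closed
  end

  # Fails the configured number of times, then succeeds.
  def connect(endpoint)
    if @remaining_failures > 0
      @remaining_failures -= 1
      raise IOError, "cannot reach #{endpoint}"
    end
    true
  end

  def emit_monitor_event(name, **payload)
    @events << [name, payload]
  end
end

# Mirrors the shape of the loop in #run, minus the task and the sleep.
def retry_until_connected(engine, endpoint, delay:, max_delay:)
  loop do
    break if engine.closed?
    # A real loop would sleep for `delay` here.
    begin
      engine.connect(endpoint)
      break
    rescue IOError => e
      delay = [delay * 2, max_delay].min # assumed doubling policy
      engine.emit_monitor_event(:connect_retried, endpoint: endpoint,
                                detail: { interval: delay, error: e })
    end
  end
end

engine = FakeEngine.new(3)
retry_until_connected(engine, "tcp://127.0.0.1:5555", delay: 0.1, max_delay: 1.0)
intervals = engine.events.map { |_name, payload| payload[:detail][:interval] }
puts intervals.inspect
```

As in #run, the event is emitted after the delay has been updated, so each recorded interval is the wait that precedes the next attempt rather than the one that just elapsed.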