Class: Aikido::Zen::APIStream

Inherits:
Object
  • Object
show all
Defined in:
lib/aikido/zen/api_stream.rb

Instance Method Summary collapse

Constructor Details

#initialize(config: Aikido::Zen.config, min_backoff: 5, max_backoff: 60, backoff_reset: 30, open_timeout: 5, write_timeout: open_timeout, read_timeout: 70) ⇒ APIStream

Returns a new instance of APIStream.



9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
# File 'lib/aikido/zen/api_stream.rb', line 9

# Records the stream settings and initial state. Nothing is connected and no
# thread is started here.
#
# @param config [Object] configuration object; it is asked for
#   +realtime_settings_updates_endpoint+ and +api_token+, and the other
#   methods of this class log through its +logger+
# @param min_backoff [Numeric] smallest reconnect delay used by {#start!}
# @param max_backoff [Numeric] cap applied to the reconnect delay in {#start!}
# @param backoff_reset [Numeric] connection lifetime above which {#start!}
#   resets the delay to +min_backoff+
# @param open_timeout [Numeric] stored as +@open_timeout+
# @param write_timeout [Numeric] stored as +@write_timeout+; defaults to
#   whatever +open_timeout+ was given
# @param read_timeout [Numeric] stored as +@read_timeout+; {#stop!} also uses
#   it as the limit when joining the worker thread
def initialize(
  config: Aikido::Zen.config,
  min_backoff: 5,
  max_backoff: 60,
  backoff_reset: 30,
  open_timeout: 5,
  write_timeout: open_timeout,
  read_timeout: 70
)
  @config = config

  # Reconnect pacing.
  @min_backoff, @max_backoff, @backoff_reset = min_backoff, max_backoff, backoff_reset

  # Connection timeouts.
  @open_timeout, @write_timeout, @read_timeout = open_timeout, write_timeout, read_timeout

  # Lifecycle state: not running, no worker thread, no open connection.
  @running = Concurrent::AtomicBoolean.new
  @thread = nil
  @http = nil

  # Connection target and credentials are read from the configuration once.
  target = config.realtime_settings_updates_endpoint
  @host, @port = target.host, target.port
  @use_ssl = (target.scheme == "https")
  @token = config.api_token

  @handlers = Concurrent::Array.new
end

Instance Method Details

#can_connect? ⇒ Boolean

Returns whether we could connect to the realtime endpoint.

Returns:

  • (Boolean)

    whether we could connect to the realtime endpoint



41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
# File 'lib/aikido/zen/api_stream.rb', line 41

# Probes the realtime endpoint with a single GET /config request.
#
# The probe uses its own short-lived client with fixed 5 second timeouts and
# no retries. The response is not inspected: the probe succeeds as soon as
# the request completes without raising.
#
# @return [Boolean] true when the request completed, false when it raised
def can_connect?
  probe = Net::HTTP::Get.new("/config")
  probe["Authorization"] = @token

  client = Net::HTTP.new(@host, @port)
  client.use_ssl = @use_ssl
  client.max_retries = 0
  client.open_timeout = 5
  client.write_timeout = 5
  client.read_timeout = 5

  begin
    client.request(probe)
    true
  rescue Timeout::Error, SocketError, IOError, SystemCallError, OpenSSL::OpenSSLError => e
    # Expected connectivity failures are only worth a debug line.
    @config.logger.debug("Error probing realtime endpoint: #{e.class}: #{e.message}")
    false
  rescue => e
    # Anything else is unexpected and is reported as an error.
    @config.logger.error("Error probing realtime endpoint: #{e.class}: #{e.message}")
    false
  end
end

#handle(type, &block) ⇒ Object

Raises:

  • (ArgumentError)


130
131
132
133
134
135
136
# File 'lib/aikido/zen/api_stream.rb', line 130

# Registers a block to run for events whose +:type+ matches +type+.
#
# Matching uses +type === event[:type]+, so +type+ may be a literal value or
# anything with a useful +===+ (a Class, Regexp, Range, ...).
#
# @param type [#===] matcher applied to each event's +:type+ entry
# @yieldparam event [#[]] the event that matched
# @raise [ArgumentError] when no block is given
# @return [Object] the handler collection after the new handler was appended
def handle(type, &block)
  raise ArgumentError, "block required" if block.nil?

  filtered = proc do |event|
    # Events of other types are ignored by this handler.
    next unless type === event[:type]

    block.call(event)
  end

  @handlers.push(filtered)
end

#running? ⇒ Boolean (also known as: started?)

Returns:

  • (Boolean)


65
66
67
# File 'lib/aikido/zen/api_stream.rb', line 65

# Reports whether the stream is currently flagged as running.
#
# @return [Boolean] the result of +true?+ on the +@running+ flag
def running?
  @running.true?
end

#start! ⇒ Object



70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
# File 'lib/aikido/zen/api_stream.rb', line 70

# Starts the background thread that keeps the stream alive.
#
# The thread calls +work+ repeatedly for as long as the stream is flagged as
# running. Whenever +work+ returns or raises a StandardError, the thread
# waits before trying again. The wait is the current delay plus a random
# jitter of up to half that delay.
#
# The delay is reset to +@min_backoff+ when the attempt lasted longer than
# +@backoff_reset+ (measured in whole seconds on the monotonic clock).
# Otherwise it is doubled and capped at +@max_backoff+. The doubling is
# applied before the delay is first used.
#
# @return [Boolean] false when the running flag could not be switched on
#   (+make_true+ returned a falsy value), true once the thread was created
def start!
  return false unless @running.make_true

  @thread = Thread.new do
    delay = @min_backoff

    while running?
      began = Process.clock_gettime(Process::CLOCK_MONOTONIC, :second)

      begin
        work
      rescue IOError => e
        # Logged only while still running; an IOError after the running flag
        # was cleared is ignored.
        @config.logger.debug("Error in API stream: #{e.class}: #{e.message}") if running?
      rescue Timeout::Error, SocketError, SystemCallError, OpenSSL::OpenSSLError => e
        @config.logger.debug("Error in API stream: #{e.class}: #{e.message}")
      rescue => e
        @config.logger.error("Error in API stream: #{e.class}: #{e.message}")
      end

      break unless running?

      elapsed = Process.clock_gettime(Process::CLOCK_MONOTONIC, :second) - began
      delay = (elapsed > @backoff_reset) ? @min_backoff : [delay * 2, @max_backoff].min

      pause = delay + rand * delay / 2

      @config.logger.debug(format("API stream reconnecting in %d seconds", pause.ceil))

      sleep(pause)
    end
  end

  true
end

#stop! ⇒ Object



110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
# File 'lib/aikido/zen/api_stream.rb', line 110

# Stops the stream and waits for the worker thread to finish.
#
# Clears the running flag, closes the current HTTP connection (if any),
# wakes the worker thread in case it is sleeping between reconnects, and then
# joins it for at most +@read_timeout+ seconds.
#
# @return [Boolean] false when the running flag could not be switched off
#   (+make_false+ returned a falsy value), true otherwise
def stop!
  return false unless @running.make_false

  begin
    @http&.finish
  rescue IOError
    # The connection was not open (or was already closed); nothing to do.
  end

  begin
    @thread&.wakeup
  rescue ThreadError
    # The thread has already finished; nothing to wake.
  end

  # Nil-safe like the calls above: the running flag can already be set while
  # the thread has not been assigned yet, and that must not raise here.
  @thread&.join(@read_timeout)

  true
end