Class: Aikido::Zen::APIStream
- Inherits: Object
  - Object
  - Aikido::Zen::APIStream
- Defined in:
- lib/aikido/zen/api_stream.rb
Instance Method Summary
- #can_connect? ⇒ Boolean
  Whether we could connect to the realtime endpoint.
- #handle(type, &block) ⇒ Object
- #initialize(config: Aikido::Zen.config, min_backoff: 5, max_backoff: 60, backoff_reset: 30, open_timeout: 5, write_timeout: open_timeout, read_timeout: 70) ⇒ APIStream (constructor)
  A new instance of APIStream.
- #running? ⇒ Boolean (also: #started?)
- #start! ⇒ Object
- #stop! ⇒ Object
Constructor Details
#initialize(config: Aikido::Zen.config, min_backoff: 5, max_backoff: 60, backoff_reset: 30, open_timeout: 5, write_timeout: open_timeout, read_timeout: 70) ⇒ APIStream
Returns a new instance of APIStream.
9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 |
# File 'lib/aikido/zen/api_stream.rb', line 9

# Prepares a stream client for the realtime settings-updates endpoint.
#
# Only state is set up here: no thread is spawned and no connection is
# opened. The endpoint's host, port and scheme and the API token are read
# from +config+ once and cached on the instance.
#
# @param config the Zen configuration; must respond to
#   +realtime_settings_updates_endpoint+ (an object exposing +host+, +port+
#   and +scheme+) and +api_token+.
# @param min_backoff [Numeric] initial reconnect delay, in seconds.
# @param max_backoff [Numeric] upper bound for the reconnect delay, in seconds.
# @param backoff_reset [Numeric] a run lasting longer than this many seconds
#   resets the reconnect delay back to +min_backoff+.
# @param open_timeout [Numeric] stored as +@open_timeout+ (presumably the
#   connection-open timeout in seconds — confirm against the worker code).
# @param write_timeout [Numeric] stored as +@write_timeout+; defaults to
#   whatever +open_timeout+ was given.
# @param read_timeout [Numeric] stored as +@read_timeout+; also used as the
#   maximum time +stop!+ waits for the worker thread to finish.
def initialize(
  config: Aikido::Zen.config,
  min_backoff: 5,
  max_backoff: 60,
  backoff_reset: 30,
  open_timeout: 5,
  write_timeout: open_timeout,
  read_timeout: 70
)
  @config = config

  # Reconnect pacing.
  @min_backoff, @max_backoff, @backoff_reset = min_backoff, max_backoff, backoff_reset

  # Socket timeouts.
  @open_timeout, @write_timeout, @read_timeout = open_timeout, write_timeout, read_timeout

  # Runtime state: nothing is running or connected yet.
  @running = Concurrent::AtomicBoolean.new
  @thread = nil
  @http = nil

  # Connection coordinates, resolved once from the configuration.
  uri = @config.realtime_settings_updates_endpoint
  @host = uri.host
  @port = uri.port
  @use_ssl = uri.scheme == "https"
  @token = @config.api_token

  # Callbacks registered through #handle.
  @handlers = Concurrent::Array.new
end
Instance Method Details
#can_connect? ⇒ Boolean
Returns whether we could connect to the realtime endpoint.
41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 |
# File 'lib/aikido/zen/api_stream.rb', line 41

# Probes the realtime endpoint with a single throw-away request.
#
# A fresh Net::HTTP client (independent of +@http+) sends +GET /config+ with
# the API token in the +Authorization+ header, using fixed 5 second
# open/write/read timeouts and no automatic retries. Any response counts as
# success regardless of its status code; only a raised exception makes the
# probe fail.
#
# Expected network-level failures are logged at debug level, anything else
# at error level. No exception escapes from the request itself.
#
# @return [Boolean] +true+ if the request completed, +false+ if it raised.
def can_connect?
  http = Net::HTTP.new(@host, @port)
  http.use_ssl = @use_ssl
  http.open_timeout = 5
  http.write_timeout = 5
  http.read_timeout = 5
  http.max_retries = 0

  request = Net::HTTP::Get.new("/config")
  request["Authorization"] = @token

  begin
    http.request(request)
    return true
  rescue Timeout::Error, SocketError, IOError, SystemCallError, OpenSSL::OpenSSLError => err
    # Ordinary connectivity problems: not worth more than a debug line.
    @config.logger.debug("Error probing realtime endpoint: #{err.class}: #{err.message}")
  rescue => err
    # Anything unexpected is surfaced more loudly, but still not raised.
    @config.logger.error("Error probing realtime endpoint: #{err.class}: #{err.message}")
  end

  false
end
#handle(type, &block) ⇒ Object
130 131 132 133 134 135 136 |
# File 'lib/aikido/zen/api_stream.rb', line 130

# Registers a callback for events of a given type.
#
# The block is wrapped in a filter and appended to +@handlers+. When the
# filter is called with an event, the block only runs if
# <tt>type === event[:type]</tt>, so +type+ may be any object with a
# meaningful +===+.
#
# @param type the matcher compared against <tt>event[:type]</tt> via +===+.
# @yieldparam event the event passed to the filter (indexed with +:type+).
# @raise [ArgumentError] if no block is given.
# @return the handler collection, i.e. the result of <tt>@handlers << ...</tt>.
def handle(type, &block)
  raise ArgumentError, "block required" if block.nil?

  filtered = proc { |event| block.call(event) if type === event[:type] }
  @handlers << filtered
end
#running? ⇒ Boolean Also known as: started?
65 66 67 |
# File 'lib/aikido/zen/api_stream.rb', line 65

# Reports whether the stream is currently flagged as running.
#
# This only reads the +@running+ flag; it does not inspect the worker
# thread or the connection.
#
# @return [Boolean] the result of <tt>@running.true?</tt>.
def running?
  @running.true?
end
#start! ⇒ Object
70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 |
# File 'lib/aikido/zen/api_stream.rb', line 70

# Starts the background thread that keeps the stream alive.
#
# The thread calls +work+ in a loop for as long as +running?+ is true. Every
# time +work+ returns or raises, the thread waits before calling it again:
#
# * if the run lasted longer than +@backoff_reset+ seconds, the delay goes
#   back to +@min_backoff+;
# * otherwise the previous delay is doubled, capped at +@max_backoff+;
# * a random jitter of up to half the delay is added on top.
#
# Exceptions raised by +work+ never kill the thread. Expected network errors
# are logged at debug level, anything else at error level.
#
# @return [Boolean] +false+ if <tt>@running.make_true</tt> reports that the
#   flag was not changed (nothing is started in that case), +true+ once the
#   thread has been spawned.
def start!
  return false unless @running.make_true

  @thread = Thread.new do
    backoff = @min_backoff

    while running?
      time_before = Process.clock_gettime(Process::CLOCK_MONOTONIC, :second)

      begin
        work
      rescue IOError => err
        # stop! closes the connection while the flag is already false; an
        # IOError seen after that point is not worth logging.
        @config.logger.debug("Error in API stream: #{err.class}: #{err.message}") if running?
      rescue Timeout::Error, SocketError, SystemCallError, OpenSSL::OpenSSLError => err
        @config.logger.debug("Error in API stream: #{err.class}: #{err.message}")
      rescue => err
        @config.logger.error("Error in API stream: #{err.class}: #{err.message}")
      end

      # Stopped while working: leave without sleeping.
      break unless running?

      time_after = Process.clock_gettime(Process::CLOCK_MONOTONIC, :second)

      backoff = if time_after - time_before > @backoff_reset
        @min_backoff
      else
        [backoff * 2, @max_backoff].min
      end

      jitter = rand * backoff / 2

      @config.logger.debug("API stream reconnecting in %d seconds" % (backoff + jitter).ceil)
      sleep(backoff + jitter)
    end
  end

  true
end
#stop! ⇒ Object
110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 |
# File 'lib/aikido/zen/api_stream.rb', line 110

# Stops the stream and waits for the worker thread to wind down.
#
# Steps, in order:
#
# 1. flip the +@running+ flag off (returns +false+ right away if
#    <tt>@running.make_false</tt> reports no change);
# 2. close the current HTTP session, if any (presumably so that a pending
#    read in the worker returns early — confirm against the worker code);
# 3. wake the worker thread in case it is sleeping between reconnects;
# 4. wait up to +@read_timeout+ seconds for the thread to finish.
#
# +IOError+ from closing the session and +ThreadError+ from waking the
# thread are deliberately ignored: both only mean there was nothing left to
# close or wake.
#
# @return [Boolean] +false+ if the flag was not changed, +true+ otherwise.
def stop!
  return false unless @running.make_false

  begin
    @http&.finish
  rescue IOError
    # ignore error
  end

  begin
    @thread&.wakeup
  rescue ThreadError
    # ignore error
  end

  # Nil-safe like the wakeup above: the flag may already be set while the
  # thread has not been assigned yet.
  @thread&.join(@read_timeout)

  true
end