Class: Riemann::Tools::RiemannClientWrapper
- Inherits: Object
- Includes: Singleton
- Defined in: lib/riemann/tools/riemann_client_wrapper.rb
Constant Summary
- BACKOFF_TMIN = 0.5
  Minimum delay between reconnection attempts
- BACKOFF_TMAX = 30.0
  Maximum delay
- BACKOFF_FACTOR = 2
- STATE_IDLE = 1
- STATE_RUNNING = 2
- STATE_DRAINING = 3
- ALLOWED_OPTIONS = %i[host port timeout tls tls_key tls_cert tls_ca_cert tls_verify tcp tls].freeze
  These options are transport-related, and SHALL be the same for each tool running in riemann-wrapper. Other options are ignored as far as the wrapper is concerned.
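The three BACKOFF_* constants describe a capped exponential backoff. The sketch below is a minimal illustration of the delays they produce across consecutive failures; `backoff_schedule` is a hypothetical helper written for this example and is not part of the class.

```ruby
# Constants copied from the summary above.
BACKOFF_TMIN = 0.5
BACKOFF_TMAX = 30.0
BACKOFF_FACTOR = 2

# Hypothetical helper (not part of RiemannClientWrapper): list the delay
# applied after each of the first `attempts` consecutive failures.
def backoff_schedule(attempts)
  delay = BACKOFF_TMIN
  Array.new(attempts) do
    current = delay
    # Grow the delay for the next failure, capped at BACKOFF_TMAX.
    delay = [delay * BACKOFF_FACTOR, BACKOFF_TMAX].min
    current
  end
end

p backoff_schedule(8)
# => [0.5, 1.0, 2.0, 4.0, 8.0, 16.0, 30.0, 30.0]
```

In the worker shown under Constructor Details, a successful send resets the delay to BACKOFF_TMIN, so the schedule above only applies to an unbroken run of failures.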
Instance Attribute Summary
- #options ⇒ Object
  Returns the value of attribute options.
- #state ⇒ Object (readonly)
  The wrapper manages a single connection to Riemann. Transport options cannot be adjusted while riemann-wrapper is running, and events should not be enqueued while the system is tearing down.
Instance Method Summary
- #<<(event) ⇒ Object
- #client ⇒ Object
- #drain ⇒ Object
- #initialize ⇒ RiemannClientWrapper (constructor)
  A new instance of RiemannClientWrapper.
Constructor Details
#initialize ⇒ RiemannClientWrapper
Returns a new instance of RiemannClientWrapper.
    # File 'lib/riemann/tools/riemann_client_wrapper.rb', line 48

    def initialize
      @options = nil
      @queue = Queue.new
      @max_bulk_size = 1000
      @state = STATE_IDLE

      @worker = Thread.new do
        Thread.current.abort_on_exception = true
        backoff_delay = BACKOFF_TMIN

        loop do
          events = []

          events << @queue.pop
          events << @queue.pop while !@queue.empty? && events.size < @max_bulk_size

          client.bulk_send(events)
          backoff_delay = BACKOFF_TMIN
        rescue StandardError => e
          sleep(backoff_delay)

          dropped_count = events.size + @queue.size
          @queue.clear
          warn "Dropped #{dropped_count} event#{'s' if dropped_count > 1} due to #{e}"

          backoff_delay *= BACKOFF_FACTOR
          backoff_delay = BACKOFF_TMAX if backoff_delay > BACKOFF_TMAX
        end
      end

      at_exit { drain }
    end
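The batching step of the worker loop can be tried in isolation. The sketch below reuses the two `@queue.pop` lines from the constructor with a plain `Queue`; `next_batch` is a hypothetical helper introduced for illustration, and the bulk size of 3 is chosen only to keep the demo short (the constructor uses 1000).

```ruby
# Hypothetical helper mirroring the worker's batching step.
def next_batch(queue, max_bulk_size)
  events = []
  events << queue.pop # blocks until at least one event is available
  # Then take whatever else is already queued, up to the bulk limit.
  events << queue.pop while !queue.empty? && events.size < max_bulk_size
  events
end

queue = Queue.new
5.times { |i| queue << { service: "demo #{i}" } }

first = next_batch(queue, 3)
second = next_batch(queue, 3)

puts first.size  # 3
puts second.size # 2
```

The first blocking `pop` is what lets the worker sleep while there is nothing to send; the non-blocking loop after it only runs while the queue is non-empty.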
Instance Attribute Details
#options ⇒ Object
Returns the value of attribute options.
    # File 'lib/riemann/tools/riemann_client_wrapper.rb', line 35

    def options
      @options
    end
#state ⇒ Object (readonly)
The wrapper manages a single connection to Riemann. Transport options cannot be adjusted while riemann-wrapper is running, and events should not be enqueued while the system is tearing down. This is achieved with the following simple state machine:

    [idle] --client--> [running] --drain--> [draining]
       ^                                         |
       +-------[#reset (development only)]-------+
    # File 'lib/riemann/tools/riemann_client_wrapper.rb', line 24

    def state
      @state
    end
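The transitions described above can be sketched without a Riemann server. `StateSketch` below is a hypothetical stand-in that copies only the state changes visible in the #client, #<< and #drain sources on this page; the fake client object and the array used as a queue are assumptions made for the example.

```ruby
# Hypothetical stand-in for the wrapper, reduced to its state transitions.
class StateSketch
  STATE_IDLE = 1
  STATE_RUNNING = 2
  STATE_DRAINING = 3

  attr_reader :state, :queue

  def initialize
    @state = STATE_IDLE
    @queue = [] # the real class uses a Queue consumed by a worker thread
  end

  # The first call creates the (fake) client and moves idle -> running.
  def client
    @client ||= begin
      @state = STATE_RUNNING
      Object.new # placeholder for Riemann::Client
    end
  end

  def <<(event)
    raise('Cannot queue events while draining') if state == STATE_DRAINING

    @queue << event
  end

  # The real #drain also waits for the queue to empty; omitted here.
  def drain
    @state = STATE_DRAINING
  end
end

wrapper = StateSketch.new
wrapper << { service: 'demo' } # allowed while idle
wrapper.client                 # idle -> running
wrapper << { service: 'demo' } # allowed while running
wrapper.drain                  # running -> draining

begin
  wrapper << { service: 'late' }
rescue RuntimeError => e
  puts e.message # Cannot queue events while draining
end
```

Because the client is memoized, calling `client` again after `drain` does not move the state back to running in this sketch.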
Instance Method Details
#<<(event) ⇒ Object
    # File 'lib/riemann/tools/riemann_client_wrapper.rb', line 104

    def <<(event)
      raise('Cannot queue events while draining') if state == STATE_DRAINING

      @queue << event
    end
#client ⇒ Object
    # File 'lib/riemann/tools/riemann_client_wrapper.rb', line 81

    def client
      @client ||= begin
        @state = STATE_RUNNING
        r = Riemann::Client.new(
          host: options[:host],
          port: options[:port],
          timeout: options[:timeout],
          ssl: options[:tls],
          key_file: options[:tls_key],
          cert_file: options[:tls_cert],
          ca_file: options[:tls_ca_cert],
          ssl_verify: options[:tls_verify],
        )
        if options[:tcp] || options[:tls]
          r.tcp
        else
          r
        end
      end
    end
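The renaming from tool options to client keywords in #client can be checked without opening a connection. In the sketch below, `OPTION_MAP`, `client_arguments` and `use_tcp?` are hypothetical names introduced for this example; only the key pairs and the `tcp || tls` test come from the source above.

```ruby
# Tool option name => keyword passed to Riemann::Client.new in #client.
OPTION_MAP = {
  host: :host,
  port: :port,
  timeout: :timeout,
  tls: :ssl,
  tls_key: :key_file,
  tls_cert: :cert_file,
  tls_ca_cert: :ca_file,
  tls_verify: :ssl_verify
}.freeze

# Hypothetical helper: build the keyword hash from a tool's options.
def client_arguments(options)
  OPTION_MAP.map { |tool_key, client_key| [client_key, options[tool_key]] }.to_h
end

# Hypothetical helper: same condition #client uses before calling r.tcp.
def use_tcp?(options)
  !!(options[:tcp] || options[:tls])
end

options = { host: 'localhost', port: 5555, tls: true, tls_key: 'client.key' }

args = client_arguments(options)
puts args[:ssl]      # true
puts args[:key_file] # client.key
puts use_tcp?(options)               # true
puts use_tcp?(host: 'localhost')     # false
```

Options that were not supplied come through as `nil`, which matches what `options[:key]` returns in the original method.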
#drain ⇒ Object
    # File 'lib/riemann/tools/riemann_client_wrapper.rb', line 110

    def drain
      @state = STATE_DRAINING
      sleep(1) until @queue.empty? || @worker.stop?
    end
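The wait condition in #drain relies on `Thread#stop?`, which Ruby documents as returning true when the thread is dead or sleeping, so the loop can end either because the queue is empty or because the worker is no longer running. The sketch below reproduces that condition with a stand-in worker; the `:stop` marker and the 0.01 second polling interval are assumptions added so the demo finishes quickly and can join its thread.

```ruby
queue = Queue.new
sent = []

# Stand-in worker: consumes events until it sees the demo-only :stop marker.
worker = Thread.new do
  while (event = queue.pop) != :stop
    sent << event
  end
end

3.times { |i| queue << i }

# Same wait condition as #drain, polling faster than the real 1 second.
sleep(0.01) until queue.empty? || worker.stop?

queue << :stop # demo-only shutdown so the script can join the thread
worker.join

p sent
# => [0, 1, 2]
```

A worker blocked in `Queue#pop` or in the backoff `sleep` also counts as sleeping for `stop?`, which is worth keeping in mind when reasoning about what #drain guarantees at exit.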