Class: Riemann::Tools::RiemannClientWrapper

Inherits:
Object
Includes:
Singleton
Defined in:
lib/riemann/tools/riemann_client_wrapper.rb

Constant Summary

BACKOFF_TMIN = 0.5

Minimum delay between reconnection attempts, in seconds.

BACKOFF_TMAX = 30.0

Maximum delay between reconnection attempts, in seconds.

BACKOFF_FACTOR = 2
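
The three backoff constants combine into a capped exponential delay. The sketch below reproduces the arithmetic used by the worker loop in #initialize; the helper name `backoff_delays` is hypothetical and is not part of the wrapper.

```ruby
BACKOFF_TMIN = 0.5
BACKOFF_TMAX = 30.0
BACKOFF_FACTOR = 2

# Hypothetical helper: delays slept after each of `failures` consecutive
# failed sends, following the same multiply-then-cap rule as the worker.
def backoff_delays(failures)
  delay = BACKOFF_TMIN
  Array.new(failures) do
    current = delay
    delay = [delay * BACKOFF_FACTOR, BACKOFF_TMAX].min
    current
  end
end

p backoff_delays(8)
# => [0.5, 1.0, 2.0, 4.0, 8.0, 16.0, 30.0, 30.0]
```

A successful send resets the delay to BACKOFF_TMIN, so the sequence restarts from 0.5 after any recovery.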
STATE_IDLE = 1

STATE_RUNNING = 2

STATE_DRAINING = 3
ALLOWED_OPTIONS = %i[host port timeout tls tls_key tls_cert tls_ca_cert tls_verify tcp tls].freeze

These options are transport-related, and SHALL be the same for each tool running in riemann-wrapper. Other options are ignored as far as the wrapper is concerned.
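
One way to picture the "same for each tool" rule is to reduce each tool's options to the allowed keys and compare the results. This page does not show how the wrapper itself performs that check, so the following is only a sketch: `transport_options`, the sample hashes, and the `interval` key are assumptions made for illustration.

```ruby
ALLOWED_OPTIONS = %i[host port timeout tls tls_key tls_cert tls_ca_cert tls_verify tcp tls].freeze

# Hypothetical helper: keep only the transport-related keys.
def transport_options(opts)
  opts.slice(*ALLOWED_OPTIONS)
end

# Two made-up tools that differ only in a non-transport option.
tool_a = { host: 'riemann.example.com', port: 5555, tcp: true, interval: 5 }
tool_b = { host: 'riemann.example.com', port: 5555, tcp: true, interval: 60 }

p transport_options(tool_a)
# => {:host=>"riemann.example.com", :port=>5555, :tcp=>true} (exact formatting varies by Ruby version)
p transport_options(tool_a) == transport_options(tool_b)
# => true
```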

Constructor Details

#initialize ⇒ RiemannClientWrapper

Returns a new instance of RiemannClientWrapper.



# File 'lib/riemann/tools/riemann_client_wrapper.rb', line 48

def initialize
  @options = nil
  @queue = Queue.new
  @max_bulk_size = 1000
  @state = STATE_IDLE

  @worker = Thread.new do
    Thread.current.abort_on_exception = true
    backoff_delay = BACKOFF_TMIN

    loop do
      events = []

      events << @queue.pop
      events << @queue.pop while !@queue.empty? && events.size < @max_bulk_size

      client.bulk_send(events)
      backoff_delay = BACKOFF_TMIN
    rescue StandardError => e
      sleep(backoff_delay)

      dropped_count = events.size + @queue.size
      @queue.clear
      warn "Dropped #{dropped_count} event#{'s' if dropped_count > 1} due to #{e}"

      backoff_delay *= BACKOFF_FACTOR
      backoff_delay = BACKOFF_TMAX if backoff_delay > BACKOFF_TMAX
    end
  end

  at_exit { drain }
end
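
The worker builds each batch by blocking on the first event and then taking whatever else is already queued, up to @max_bulk_size. A minimal sketch of that batching step in isolation follows; `next_batch` is a hypothetical name, and the bulk size of 4 is chosen only to keep the output short.

```ruby
# Hypothetical helper mirroring the two `@queue.pop` lines of the worker loop.
def next_batch(queue, max_bulk_size)
  batch = [queue.pop] # blocks until at least one event is available
  batch << queue.pop while !queue.empty? && batch.size < max_bulk_size
  batch
end

queue = Queue.new
10.times { |i| queue << i }

batches = []
batches << next_batch(queue, 4) until queue.empty?
p batches
# => [[0, 1, 2, 3], [4, 5, 6, 7], [8, 9]]
```

Note that on failure the worker above does not retry: after sleeping for the backoff delay it discards the current batch together with everything still queued, and reports the number of dropped events with `warn`.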

Instance Attribute Details

#options ⇒ Object

Returns the value of attribute options.



# File 'lib/riemann/tools/riemann_client_wrapper.rb', line 35

def options
  @options
end

#state ⇒ Object (readonly)

The wrapper manages a single connection to Riemann. Transport options cannot be adjusted while riemann-wrapper is running, and events should not be enqueued while the system is tearing down. This is enforced by a simple state machine:

[idle] --client--> [running] --drain--> [draining]
   ^                                         |
   +-------[#reset (development only)]-------+


# File 'lib/riemann/tools/riemann_client_wrapper.rb', line 24

def state
  @state
end
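
The transitions can be exercised without a Riemann server. The class below is a hypothetical stand-in (`StateSketch` is not part of riemann-tools) that keeps only the state changes shown on this page and replaces the real client with a placeholder symbol; #reset is left out because its body is not shown here.

```ruby
# Hypothetical stand-in for the wrapper: state transitions only.
class StateSketch
  STATE_IDLE = 1
  STATE_RUNNING = 2
  STATE_DRAINING = 3

  attr_reader :state

  def initialize
    @state = STATE_IDLE
    @queue = Queue.new
  end

  # Building the client is what moves the state from idle to running.
  def client
    @client ||= begin
      @state = STATE_RUNNING
      :fake_client # placeholder, no network connection is opened
    end
  end

  def drain
    @state = STATE_DRAINING
  end

  def <<(event)
    raise('Cannot queue events while draining') if state == STATE_DRAINING

    @queue << event
  end
end

sketch = StateSketch.new
sketch << { service: 'demo' } # accepted while idle
sketch.client                 # idle -> running
sketch.drain                  # running -> draining

begin
  sketch << { service: 'late' }
rescue RuntimeError => e
  puts e.message # prints "Cannot queue events while draining"
end
```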

Instance Method Details

#<<(event) ⇒ Object



# File 'lib/riemann/tools/riemann_client_wrapper.rb', line 104

def <<(event)
  raise('Cannot queue events while draining') if state == STATE_DRAINING

  @queue << event
end

#client ⇒ Object



# File 'lib/riemann/tools/riemann_client_wrapper.rb', line 81

def client
  @client ||= begin
    @state = STATE_RUNNING

    r = Riemann::Client.new(
      host: options[:host],
      port: options[:port],
      timeout: options[:timeout],
      ssl: options[:tls],
      key_file: options[:tls_key],
      cert_file: options[:tls_cert],
      ca_file: options[:tls_ca_cert],
      ssl_verify: options[:tls_verify],
    )

    if options[:tcp] || options[:tls]
      r.tcp
    else
      r
    end
  end
end
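
The method above renames the wrapper's `tls_*` options to the keyword names passed to `Riemann::Client.new`, and selects the TCP transport whenever `tcp` or `tls` is set. The renaming can be sketched on its own, without loading the client gem; `OPTION_TO_CLIENT_KEY`, `client_arguments`, and `tcp_transport?` are hypothetical names, and the mapping is copied from the call shown above.

```ruby
# Wrapper option name => keyword passed to Riemann::Client.new (from #client).
OPTION_TO_CLIENT_KEY = {
  host: :host,
  port: :port,
  timeout: :timeout,
  tls: :ssl,
  tls_key: :key_file,
  tls_cert: :cert_file,
  tls_ca_cert: :ca_file,
  tls_verify: :ssl_verify
}.freeze

# Hypothetical helper: build the keyword hash; unset options map to nil,
# as they would with options[:key] in the original method.
def client_arguments(options)
  OPTION_TO_CLIENT_KEY.to_h { |option, key| [key, options[option]] }
end

# Hypothetical helper: TLS implies TCP, matching `options[:tcp] || options[:tls]`.
def tcp_transport?(options)
  !!(options[:tcp] || options[:tls])
end

opts = { host: 'riemann.example.com', port: 5554, tls: true, tls_ca_cert: '/path/to/ca.pem' }
args = client_arguments(opts)
p args[:ssl]           # => true
p args[:ca_file]       # => "/path/to/ca.pem"
p tcp_transport?(opts) # => true
```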

#drain ⇒ Object



# File 'lib/riemann/tools/riemann_client_wrapper.rb', line 110

def drain
  @state = STATE_DRAINING
  sleep(1) until @queue.empty? || @worker.stop?
end
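
The wait condition relies on `Thread#stop?`, which is true when a thread is dead or sleeping, so #drain returns once the queue is empty or the worker can make no further progress (for example, while it sleeps during backoff). The following sketch shows the same polling pattern with a toy worker; the variable names and the shorter polling interval are illustrative assumptions.

```ruby
queue = Queue.new
5.times { |i| queue << i } # events queued before the worker starts

processed = []
worker = Thread.new do
  loop { processed << queue.pop } # blocks (sleeps) once the queue is empty
end

# Same shape as #drain, with a shorter polling interval.
sleep(0.01) until queue.empty? || worker.stop?

# Extra step for this demo only: wait until the worker is blocked on the
# empty queue, so the last popped event has been recorded.
sleep(0.01) until worker.stop?

p processed
worker.kill
```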