Class: SumologicConnection

Inherits:
Object
  • Object
show all
Defined in:
lib/fluent/plugin/out_sumologic.rb

Constant Summary collapse

COMPRESS_DEFLATE =
'deflate'
COMPRESS_GZIP =
'gzip'

Instance Attribute Summary collapse

Instance Method Summary collapse

Constructor Details

#initialize(endpoint, verify_ssl, connect_timeout, receive_timeout, send_timeout, proxy_uri, disable_cookies, sumo_client, compress_enabled, compress_encoding, logger) ⇒ SumologicConnection

Returns a new instance of SumologicConnection.



16
17
18
19
20
21
22
23
24
25
26
27
# File 'lib/fluent/plugin/out_sumologic.rb', line 16

# Sets up a connection object for posting data to an HTTP endpoint.
#
# Stores the endpoint and client name, delegates HTTP client construction to
# #create_http_client, then records the compression settings and the logger.
#
# @param endpoint URL that #publish posts to
# @param verify_ssl forwarded to #create_http_client
# @param connect_timeout forwarded to #create_http_client
# @param receive_timeout forwarded to #create_http_client
# @param send_timeout forwarded to #create_http_client
# @param proxy_uri forwarded to #create_http_client
# @param disable_cookies forwarded to #create_http_client
# @param sumo_client value sent in the 'X-Sumo-Client' header
# @param compress_enabled whether #compress encodes request bodies
# @param compress_encoding 'gzip' or 'deflate' (any letter case); a nil or
#   false value falls back to gzip
# @param logger object used by #publish to emit warnings
# @raise [RuntimeError] when the lower-cased encoding is neither gzip nor deflate
def initialize(endpoint, verify_ssl, connect_timeout, receive_timeout, send_timeout, proxy_uri, disable_cookies, sumo_client, compress_enabled, compress_encoding, logger)
  @endpoint = endpoint
  @sumo_client = sumo_client
  create_http_client(verify_ssl, connect_timeout, receive_timeout, send_timeout, proxy_uri, disable_cookies)
  @compress = compress_enabled

  # Fall back to gzip when no encoding was supplied; matching is case-insensitive.
  requested_encoding = compress_encoding || COMPRESS_GZIP
  @compress_encoding = requested_encoding.downcase
  @logger = logger

  supported_encodings = [COMPRESS_DEFLATE, COMPRESS_GZIP]
  return if supported_encodings.include?(@compress_encoding)

  raise "Invalid compression encoding #{@compress_encoding} must be gzip or deflate"
end

Instance Attribute Details

#http ⇒ Object (readonly)

Returns the value of attribute http.



11
12
13
# File 'lib/fluent/plugin/out_sumologic.rb', line 11

# Reader for the @http instance variable, which #create_http_client assigns.
def http
  @http
end

Instance Method Details

#compress(content) ⇒ Object



108
109
110
111
112
113
114
115
116
117
118
119
# File 'lib/fluent/plugin/out_sumologic.rb', line 108

# Encodes a request body according to the configured compression settings.
#
# @param content the payload to encode
# @return content itself when compression is disabled; otherwise the gzip
#   output of #gzip or the Zlib deflate output, depending on the configured
#   encoding
def compress(content)
  return content unless @compress

  # #gzip already returns the finished byte string, so it is handed back
  # as-is rather than being expanded into a byte array and packed again.
  return gzip(content) if @compress_encoding == COMPRESS_GZIP

  Zlib::Deflate.deflate(content)
end

#create_http_client(verify_ssl, connect_timeout, receive_timeout, send_timeout, proxy_uri, disable_cookies) ⇒ Object



97
98
99
100
101
102
103
104
105
106
# File 'lib/fluent/plugin/out_sumologic.rb', line 97

# Builds the HTTPClient instance exposed through #http and applies the
# connection settings to it.
#
# @param verify_ssl translated into an SSL verify mode by #ssl_options
# @param connect_timeout assigned to the client's connect_timeout
# @param receive_timeout assigned to the client's receive_timeout
# @param send_timeout assigned to the client's send_timeout
# @param proxy_uri passed to HTTPClient.new
# @param disable_cookies when truthy, the client's cookie manager is set to nil
def create_http_client(verify_ssl, connect_timeout, receive_timeout, send_timeout, proxy_uri, disable_cookies)
  # Store the client first so @http is populated before any setting is applied.
  client = @http = HTTPClient.new(proxy_uri)
  client.ssl_config.verify_mode = ssl_options(verify_ssl)
  client.connect_timeout = connect_timeout
  client.receive_timeout = receive_timeout
  client.send_timeout = send_timeout
  # Clear the cookie manager when cookies are disabled.
  client.cookie_manager = nil if disable_cookies
end

#gzip(content) ⇒ Object

Returns content gzip-compressed as a byte string; called by #compress when the gzip encoding is selected.



121
122
123
124
125
126
127
128
129
# File 'lib/fluent/plugin/out_sumologic.rb', line 121

# Gzip-compresses content and returns the compressed bytes.
#
# @param content data handed to Zlib::GzipWriter#write
# @return [String] binary-encoded gzip stream
def gzip(content)
  # StringIO.new's first argument is the initial buffer, not an open mode, so
  # start from a fresh, mutable, binary-encoded string instead of a literal.
  buffer = StringIO.new(''.b)

  # The block form closes the writer (flushing the gzip trailer) even when
  # writing raises.
  Zlib::GzipWriter.wrap(buffer) do |gz|
    gz.mtime = 1 # Ensure that for same content there is same output
    gz.write(content)
  end

  # The buffer is already binary, so no byte-array round trip is needed.
  buffer.string
end

#publish(raw_data, source_host = nil, source_category = nil, source_name = nil, data_type, metric_data_type, collected_fields, dimensions) ⇒ Object



29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
# File 'lib/fluent/plugin/out_sumologic.rb', line 29

# Posts raw_data to the configured endpoint and inspects the reply.
#
# The body is passed through #compress and the headers come from
# #request_headers. A non-OK response raises; an OK response with a
# non-empty body is logged as a warning.
#
# @param raw_data payload to send
# @param source_host forwarded to #request_headers
# @param source_category forwarded to #request_headers
# @param source_name forwarded to #request_headers
# @param data_type forwarded to #request_headers
# @param metric_data_type forwarded to #request_headers
# @param collected_fields forwarded to #request_headers
# @param dimensions forwarded to #request_headers
# @raise [RuntimeError] when the response is not OK
def publish(raw_data, source_host=nil, source_category=nil, source_name=nil, data_type, metric_data_type, collected_fields, dimensions)
  response = http.post(@endpoint, compress(raw_data), request_headers(source_host, source_category, source_name, data_type, metric_data_type, collected_fields, dimensions))
  unless response.ok?
    raise RuntimeError, "Failed to send data to HTTP Source. #{response.code} - #{response.body}"
  end

  # The request succeeded; an empty body means there is nothing to report.
  content = response.content
  return if content.nil? || content.empty?

  # The body comes from the remote side, so use JSON.parse rather than
  # JSON.load, which is meant for trusted input.
  begin
    response_map = JSON.parse(content)
  rescue JSON::ParserError
    @logger.warn "Error decoding receiver response: #{content}"
    return
  end

  # Valid JSON that is not an object (array, string, null, ...) has no keys to
  # report. Log it instead of failing a request that was already accepted.
  unless response_map.is_a?(Hash)
    @logger.warn "Unexpected receiver response: #{content}"
    return
  end

  # Log a warning listing whichever of the known keys are present.
  response_keys = ["id", "code", "status", "message", "errors"]
  present_keys = response_keys.select { |key| response_map.key?(key) }
  log_params_str = present_keys.map { |key| "#{key}: #{response_map[key]}" }.join(", ")
  @logger.warn "There was an issue sending data: #{log_params_str}"
end

#request_headers(source_host, source_category, source_name, data_type, metric_data_format, collected_fields, dimensions) ⇒ Object



59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
# File 'lib/fluent/plugin/out_sumologic.rb', line 59

# Assembles the HTTP header hash for one request.
#
# The four X-Sumo-Name/Category/Host/Client headers are always present (their
# values may be nil). Content-Encoding is added when compression is enabled.
# For the 'metrics' data type a Content-Type matching the metric format is
# added, plus X-Sumo-Dimensions when dimensions is not nil. X-Sumo-Fields is
# added for any data type when collected_fields is not nil.
#
# @param source_host value for 'X-Sumo-Host'
# @param source_category value for 'X-Sumo-Category'
# @param source_name value for 'X-Sumo-Name'
# @param data_type 'metrics' enables the metric-specific headers
# @param metric_data_format 'graphite', 'carbon2' or 'prometheus'; only
#   checked when data_type is 'metrics'
# @param collected_fields value for 'X-Sumo-Fields', skipped when nil
# @param dimensions value for 'X-Sumo-Dimensions', skipped when nil
# @return [Hash] header names mapped to values
# @raise [RuntimeError] for a metrics request with an unknown format
def request_headers(source_host, source_category, source_name, data_type, metric_data_format, collected_fields, dimensions)
  headers = {}
  headers['X-Sumo-Name'] = source_name
  headers['X-Sumo-Category'] = source_category
  headers['X-Sumo-Host'] = source_host
  headers['X-Sumo-Client'] = @sumo_client
  headers['Content-Encoding'] = @compress_encoding if @compress

  if data_type == 'metrics'
    # Each supported format maps to a content type that ends in its own name.
    known_format = %w[graphite carbon2 prometheus].find { |name| name == metric_data_format }
    if known_format.nil?
      raise RuntimeError, "Invalid #{metric_data_format}, must be graphite or carbon2 or prometheus"
    end

    headers['Content-Type'] = "application/vnd.sumologic.#{known_format}"
    headers['X-Sumo-Dimensions'] = dimensions unless dimensions.nil?
  end

  headers['X-Sumo-Fields'] = collected_fields unless collected_fields.nil?
  headers
end

#ssl_options(verify_ssl) ⇒ Object



93
94
95
# File 'lib/fluent/plugin/out_sumologic.rb', line 93

# Maps the verify_ssl setting onto an OpenSSL verify mode.
#
# @param verify_ssl the configured flag
# @return OpenSSL::SSL::VERIFY_PEER when verify_ssl equals true, otherwise
#   OpenSSL::SSL::VERIFY_NONE
def ssl_options(verify_ssl)
  # The comparison is against true itself, so other truthy values (for
  # example the string "true") still select VERIFY_NONE.
  if verify_ssl == true
    OpenSSL::SSL::VERIFY_PEER
  else
    OpenSSL::SSL::VERIFY_NONE
  end
end