Class: Fluent::Plugin::DruidQueryInput

Inherits:
Input
  • Object
show all
Defined in:
lib/fluent/plugin/in_druid_query.rb

Constant Summary collapse

NAME =
'druid_query'
DEFAULT_TAG =
NAME
DEFAULT_INTERVAL =
300
DEFAULT_DRUID_URL =
'http://localhost:8888'
DEFAULT_DRUID_USER =
nil
DEFAULT_DRUID_PASSWORD =
nil
DEFAULT_DRUID_VERIFY_SSL =
true
DEFAULT_DRUID_USER_AGENT =
NAME
DEFAULT_DRUID_TIMEOUT =
30
DEFAULT_CA_CERT =
nil
DEFAULT_QUERY_CACHE =
true
DEFAULT_QUERY_SUBTAG =
nil
DEFAULT_QUERY_GENERATE_RECORD =
true
DEFAULT_QUERY_GENERATE_INFO =
false

Instance Method Summary collapse

Instance Method Details

#check_druid_informationObject

Raises:

  • (Fluent::ConfigError)


89
90
91
# File 'lib/fluent/plugin/in_druid_query.rb', line 89

def check_druid_information
  raise Fluent::ConfigError, 'druid_url should not be empty' if druid_url.empty?
end

#check_druid_queriesObject

Raises:

  • (Fluent::ConfigError)


93
94
95
# File 'lib/fluent/plugin/in_druid_query.rb', line 93

def check_druid_queries
  raise Fluent::ConfigError, 'queries should not be empty' if queries.empty?
end

#configure(conf) ⇒ Object

Raises:

  • (Fluent::ConfigError)


79
80
81
82
83
84
85
86
87
# File 'lib/fluent/plugin/in_druid_query.rb', line 79

def configure(conf)
  super

  raise Fluent::ConfigError, 'tag should not be empty' if tag.empty?
  raise Fluent::ConfigError, 'tag_info should not be empty' if tag_info.empty?

  check_druid_information
  check_druid_queries
end

#druid_clientObject



130
131
132
133
134
135
136
137
138
139
140
141
# File 'lib/fluent/plugin/in_druid_query.rb', line 130

def druid_client
  @druid_client ||= DruidClient::Api.new(
    url: druid_url,
    username: druid_user,
    password: druid_password,
    user_agent: druid_user_agent,
    verify_ssl: druid_verify_ssl,
    timeout: druid_timeout,
    ca_file: ca_cert,
    log: log
  )
end

#emit_query_info(query:, response:, query_time: Fluent::Engine.now) ⇒ Object



155
156
157
158
159
160
161
162
163
164
165
166
167
168
# File 'lib/fluent/plugin/in_druid_query.rb', line 155

def emit_query_info(query:, response:, query_time: Fluent::Engine.now)
  return unless query.generate_info

  current_tag = [tag_info, query.subtag].compact.join('.')
  info_record = {
    'timestamp' => query_time.to_time.utc.iso8601(3),
    'status' => response.success? ? 'success' : 'failure',
    'status_code' => response.status_code,
    'query_duration' => response.duration
  }
  info_record.update({ 'response_rows_count' => response.body.size }) if response.success?
  info_record.update(query.)
  router.emit(current_tag, query_time, info_record)
end

#emit_query_records(query:, response:, query_time: Fluent::Engine.now) ⇒ Object



143
144
145
146
147
148
149
150
151
152
153
# File 'lib/fluent/plugin/in_druid_query.rb', line 143

def emit_query_records(query:, response:, query_time: Fluent::Engine.now)
  return unless query.generate_record
  return unless response.success?

  current_tag = [tag, query.subtag].compact.join('.')
  query_events = MultiEventStream.new
  response.body.each do |response_entry|
    query_events.add(query_time, response_entry.merge(query.))
  end
  router.emit_stream(current_tag, query_events)
end

#query_cache_context(use_cache: true) ⇒ Object



123
124
125
126
127
128
# File 'lib/fluent/plugin/in_druid_query.rb', line 123

def query_cache_context(use_cache: true)
  {
    useCache: use_cache,
    populateCache: use_cache
  }
end

#run_queriesObject



104
105
106
107
108
109
110
# File 'lib/fluent/plugin/in_druid_query.rb', line 104

def run_queries
  queries.each do |query|
    run_query(query)
  rescue StandardError => e
    log.error "while runnig query: #{query.sql}: #{e}"
  end
end

#run_query(query) ⇒ Object



112
113
114
115
116
117
118
119
120
121
# File 'lib/fluent/plugin/in_druid_query.rb', line 112

def run_query(query)
  query_time = Fluent::Engine.now
  response = druid_client.sql.query(
    query: query.sql,
    header: false,
    context: query_cache_context(use_cache: query.cache)
  )
  emit_query_records(query_time: query_time, query: query, response: response)
  emit_query_info(query_time: query_time, query: query, response: response)
end

#startObject



97
98
99
100
101
102
# File 'lib/fluent/plugin/in_druid_query.rb', line 97

def start
  super

  timer_execute(:run_queries_first, 1, repeat: false, &method(:run_queries)) if interval > 60
  timer_execute(:run_queries, interval, repeat: true, &method(:run_queries))
end