Class: Fluent::Plugin::DruidQueryInput
- Inherits: Input
  - Object
  - Input
  - Fluent::Plugin::DruidQueryInput
- Defined in: lib/fluent/plugin/in_druid_query.rb
Constant Summary collapse
- NAME = 'druid_query'
- DEFAULT_TAG = NAME
- DEFAULT_INTERVAL = 300
- DEFAULT_DRUID_URL = 'http://localhost:8888'
- DEFAULT_DRUID_USER = nil
- DEFAULT_DRUID_PASSWORD = nil
- DEFAULT_DRUID_VERIFY_SSL = true
- DEFAULT_DRUID_USER_AGENT = NAME
- DEFAULT_DRUID_TIMEOUT = 30
- DEFAULT_CA_CERT = nil
- DEFAULT_QUERY_CACHE = true
- DEFAULT_QUERY_SUBTAG = nil
- DEFAULT_QUERY_GENERATE_RECORD = true
- DEFAULT_QUERY_GENERATE_INFO = false
Instance Method Summary collapse
- #check_druid_information ⇒ Object
- #check_druid_queries ⇒ Object
- #configure(conf) ⇒ Object
- #druid_client ⇒ Object
- #emit_query_info(query:, response:, query_time: Fluent::Engine.now) ⇒ Object
- #emit_query_records(query:, response:, query_time: Fluent::Engine.now) ⇒ Object
- #query_cache_context(use_cache: true) ⇒ Object
- #run_queries ⇒ Object
- #run_query(query) ⇒ Object
- #start ⇒ Object
Instance Method Details
#check_druid_information ⇒ Object
89 90 91 |
# File 'lib/fluent/plugin/in_druid_query.rb', line 89

# Validates the Druid connection settings.
#
# @raise [Fluent::ConfigError] when +druid_url+ is empty
# @return [nil] when the settings are acceptable
def check_druid_information
  return unless druid_url.empty?

  raise Fluent::ConfigError, 'druid_url should not be empty'
end
#check_druid_queries ⇒ Object
93 94 95 |
# File 'lib/fluent/plugin/in_druid_query.rb', line 93

# Validates that at least one query has been configured.
#
# @raise [Fluent::ConfigError] when +queries+ is empty
# @return [nil] when at least one query is present
def check_druid_queries
  return unless queries.empty?

  raise Fluent::ConfigError, 'queries should not be empty'
end
#configure(conf) ⇒ Object
79 80 81 82 83 84 85 86 87 |
# File 'lib/fluent/plugin/in_druid_query.rb', line 79

# Applies the plugin configuration and validates it.
#
# Delegates to the parent implementation first, then rejects empty tags and
# runs the Druid connection and query checks.
#
# @param conf [Object] configuration handed to the parent +configure+
# @raise [Fluent::ConfigError] when +tag+ or +tag_info+ is empty, or when
#   one of the Druid checks fails
def configure(conf)
  super

  if tag.empty?
    raise Fluent::ConfigError, 'tag should not be empty'
  end

  if tag_info.empty?
    raise Fluent::ConfigError, 'tag_info should not be empty'
  end

  check_druid_information
  check_druid_queries
end
#druid_client ⇒ Object
130 131 132 133 134 135 136 137 138 139 140 141 |
# File 'lib/fluent/plugin/in_druid_query.rb', line 130

# Returns the Druid API client, building it on first use and reusing the
# same instance afterwards.
#
# @return [DruidClient::Api] client built from the plugin's druid_* settings
def druid_client
  return @druid_client if @druid_client

  client_options = {
    url: druid_url,
    username: druid_user,
    password: druid_password,
    user_agent: druid_user_agent,
    verify_ssl: druid_verify_ssl,
    timeout: druid_timeout,
    ca_file: ca_cert,
    log: log
  }
  @druid_client = DruidClient::Api.new(**client_options)
end
#emit_query_info(query:, response:, query_time: Fluent::Engine.now) ⇒ Object
155 156 157 158 159 160 161 162 163 164 165 166 167 168 |
# Emits one summary ("info") record describing the outcome of a query.
#
# Does nothing unless +query.generate_info+ is truthy. The event tag is
# +tag_info+ joined with +query.subtag+ by '.'; a nil subtag is dropped.
# The record carries 'timestamp' (+query_time+ converted to UTC and formatted
# with +iso8601(3)+), 'status' ('success' or 'failure'), 'status_code' and
# 'query_duration'. 'response_rows_count' (+response.body.size+) is added
# only when +response.success?+ is true. The record is sent with
# +router.emit+.
#
# NOTE(review): this listing reads `info_record.update(query.)` - the method
# name after `query.` appears to have been lost when the page was rendered.
# Confirm against lib/fluent/plugin/in_druid_query.rb before relying on it.
#
# @param query [Object] query definition; +generate_info+ and +subtag+ are read
# @param response [Object] query response; +success?+, +status_code+,
#   +duration+ and +body+ are read
# @param query_time [Object] event time, defaults to +Fluent::Engine.now+
# File 'lib/fluent/plugin/in_druid_query.rb', line 155 def emit_query_info(query:, response:, query_time: Fluent::Engine.now) return unless query.generate_info current_tag = [tag_info, query.subtag].compact.join('.') info_record = { 'timestamp' => query_time.to_time.utc.iso8601(3), 'status' => response.success? ? 'success' : 'failure', 'status_code' => response.status_code, 'query_duration' => response.duration } info_record.update({ 'response_rows_count' => response.body.size }) if response.success? info_record.update(query.) router.emit(current_tag, query_time, info_record) end |
#emit_query_records(query:, response:, query_time: Fluent::Engine.now) ⇒ Object
143 144 145 146 147 148 149 150 151 152 153 |
# Emits the rows returned by a query as a single event stream.
#
# Does nothing unless +query.generate_record+ is truthy and
# +response.success?+ is true. The event tag is +tag+ joined with
# +query.subtag+ by '.'; a nil subtag is dropped. Every entry of
# +response.body+ is added to a MultiEventStream at +query_time+, and the
# stream is sent with +router.emit_stream+.
#
# NOTE(review): this listing reads `response_entry.merge(query.)` - the
# method name after `query.` appears to have been lost when the page was
# rendered. Confirm against lib/fluent/plugin/in_druid_query.rb.
#
# @param query [Object] query definition; +generate_record+ and +subtag+ are read
# @param response [Object] query response; +success?+ and +body+ are read
# @param query_time [Object] event time, defaults to +Fluent::Engine.now+
# File 'lib/fluent/plugin/in_druid_query.rb', line 143 def emit_query_records(query:, response:, query_time: Fluent::Engine.now) return unless query.generate_record return unless response.success? current_tag = [tag, query.subtag].compact.join('.') query_events = MultiEventStream.new response.body.each do |response_entry| query_events.add(query_time, response_entry.merge(query.)) end router.emit_stream(current_tag, query_events) end |
#query_cache_context(use_cache: true) ⇒ Object
123 124 125 126 127 128 |
# File 'lib/fluent/plugin/in_druid_query.rb', line 123

# Builds the cache-related part of a query context.
#
# Both keys always receive the same value.
#
# @param use_cache [Boolean] value assigned to both cache keys
# @return [Hash{Symbol => Boolean}] hash with +:useCache+ and +:populateCache+
def query_cache_context(use_cache: true)
  %i[useCache populateCache].map { |cache_key| [cache_key, use_cache] }.to_h
end
#run_queries ⇒ Object
104 105 106 107 108 109 110 |
# File 'lib/fluent/plugin/in_druid_query.rb', line 104

# Runs every configured query in turn.
#
# A StandardError raised while running one query is logged and does not
# prevent the remaining queries from running.
#
# @return [Object] whatever +queries.each+ returns
def run_queries
  queries.each do |query|
    run_query(query)
  rescue StandardError => e
    # Failures are isolated per query so one bad statement cannot block the rest.
    log.error "while running query: #{query.sql}: #{e}"
  end
end
#run_query(query) ⇒ Object
112 113 114 115 116 117 118 119 120 121 |
# File 'lib/fluent/plugin/in_druid_query.rb', line 112

# Executes one query against Druid and emits its results.
#
# The time is captured before the request is sent, and the same value is
# passed to both emitters.
#
# @param query [Object] query definition; +sql+ and +cache+ are read here
# @return [Object] the result of +emit_query_info+
def run_query(query)
  started_at = Fluent::Engine.now

  sql_api = druid_client.sql
  druid_response = sql_api.query(
    query: query.sql,
    header: false,
    context: query_cache_context(use_cache: query.cache)
  )

  emit_arguments = { query_time: started_at, query: query, response: druid_response }
  emit_query_records(**emit_arguments)
  emit_query_info(**emit_arguments)
end
#start ⇒ Object
97 98 99 100 101 102 |
# File 'lib/fluent/plugin/in_druid_query.rb', line 97

# Starts the plugin and schedules the recurring query runs.
#
# After delegating to the parent +start+, registers a repeating timer that
# calls +run_queries+ every +interval+. When +interval+ is greater than 60,
# a one-shot timer with a delay of 1 is registered as well, presumably so
# the first results do not have to wait for a full interval.
#
# @return [Object] the result of the repeating +timer_execute+ call
def start
  super

  runner = method(:run_queries)

  if interval > 60
    timer_execute(:run_queries_first, 1, repeat: false, &runner)
  end

  timer_execute(:run_queries, interval, repeat: true, &runner)
end