Class: DWH::Adapters::ClickHouse
- Defined in:
- lib/dwh/adapters/click_house.rb
Overview
ClickHouse adapter for executing analytical queries against ClickHouse databases. Uses the ClickHouse HTTP interface (default port 8123) via Faraday.
Constant Summary collapse
- QUERY_FORMAT =
'JSONCompact'.freeze
Constants included from Settings
Constants included from Functions::Dates
Functions::Dates::DATE_CLASSES, Functions::Dates::TIMESTAMPABLE_UNITS
Instance Attribute Summary
Attributes inherited from Adapter
Attributes included from Settings
Instance Method Summary collapse
- #connection ⇒ Object
- #execute(sql, format: :array, retries: 0) ⇒ Object
- #execute_stream(sql, io, stats: nil, retries: 0) ⇒ Object
- #metadata(table, **qualifiers) ⇒ Object
- #stats(table, date_column: nil, **qualifiers) ⇒ Object
-
#stream(sql) {|chunk| ... } ⇒ Object
Executes the given sql and yields the streamed results to the given block.
- #tables(**qualifiers) ⇒ Object
- #test_connection(raise_exception: false) ⇒ Object
Methods inherited from Adapter
#adapter_name, #alter_settings, #close, config, configuration, #connect!, #connect?, #extra_connection_params, #extra_query_params, #initialize, #reset_settings, #table?, #token_expired?, #with_debug, #with_retry
Methods included from Settings
#adapter_name, #aggregate_function?, #aggregate_functions, #load_settings, #reserved?, #reserved_keywords, #settings_file, #settings_file_path, #using_base_settings?
Methods included from Logger
Methods included from Behaviors
#apply_advanced_filtering_on_array_projections?, #cross_universe_measure_filtering_strategy, #extend_ending_date_to_last_hour_of_day?, #final_measure_filter?, #final_pass_measure_join_type, #greedy_apply_date_filters, #intermediate_measure_filter?, #temp_table_prefix, #temp_table_type
Methods included from Functions
#cast, #cross_join, #gsk, #lower_case, #quote, #string_lit, #trim, #upper_case
Methods included from Functions::Arrays
#array_exclude_list, #array_in_list, #array_unnest_join
Methods included from Functions::Nulls
#if_null, #null_if, #null_if_zero
Methods included from Functions::ExtractDatePart
#extract_day_name, #extract_day_of_month, #extract_day_of_week, #extract_day_of_year, #extract_hour, #extract_minute, #extract_month, #extract_month_name, #extract_quarter, #extract_week_of_year, #extract_year, #extract_year_month
Methods included from Functions::Dates
#adjust_week_start_day, #adjust_week_start_day?, #current_date, #current_time, #current_timestamp, #date_add, #date_data_type, #date_diff, #date_format, #date_format_sql, #date_int?, #date_lit, #date_literal, #date_time_format, #date_time_literal, #date_time_tz_format, #default_week_start_day, #timestamp_lit, #timestamp_literal, #truncate_date, #week_start_day, #week_starts_on_sunday?
Methods included from Capabilities
#supports_array_functions?, #supports_common_table_expressions?, #supports_cross_join?, #supports_full_join?, #supports_sub_queries?, #supports_table_join?, #supports_temp_tables?, #supports_window_functions?
Constructor Details
This class inherits a constructor from DWH::Adapters::Adapter
Instance Method Details
#connection ⇒ Object
29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 |
# File 'lib/dwh/adapters/click_house.rb', line 29 def connection return @connection if @connection headers = { 'Content-Type' => 'text/plain', 'X-ClickHouse-User' => config[:username], 'X-ClickHouse-Database' => database } headers['X-ClickHouse-Key'] = config[:password] if config[:password] @connection = Faraday.new( url: "#{config[:protocol]}://#{config[:host]}:#{config[:port]}", headers: headers, request: { timeout: config[:query_timeout] } ) @connection end |
#execute(sql, format: :array, retries: 0) ⇒ Object
95 96 97 98 99 100 101 102 |
# File 'lib/dwh/adapters/click_house.rb', line 95 def execute(sql, format: :array, retries: 0) raw = with_debug(sql) { with_retry(retries) { execute_raw(sql) } } format_result(raw, format) rescue ExecutionError raise rescue StandardError => e raise ExecutionError, e. end |
#execute_stream(sql, io, stats: nil, retries: 0) ⇒ Object
104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 |
# File 'lib/dwh/adapters/click_house.rb', line 104 def execute_stream(sql, io, stats: nil, retries: 0) with_debug(sql) do with_retry(retries) do raw = execute_raw(sql) cols = raw['meta'].map { it['name'] } io.write(CSV.generate_line(cols)) raw['data'].each do |row| stats << row unless stats.nil? io.write(CSV.generate_line(row)) end end end io.rewind io rescue ExecutionError raise rescue StandardError => e raise ExecutionError, e. end |
#metadata(table, **qualifiers) ⇒ Object
68 69 70 71 72 73 74 75 76 77 78 |
# File 'lib/dwh/adapters/click_house.rb', line 68 def (table, **qualifiers) db = qualifiers[:schema] || database full_table = db ? "#{db}.#{table}" : table # DESCRIBE returns: name, type, default_type, default_expression, comment, codec_expression, ttl_expression res = execute_raw("DESCRIBE TABLE #{full_table}") db_table = Table.new(table, schema: db) res['data'].each do |row| db_table << Column.new(name: row[0], data_type: row[1]) end db_table end |
#stats(table, date_column: nil, **qualifiers) ⇒ Object
80 81 82 83 84 85 86 87 88 89 90 91 92 93 |
# File 'lib/dwh/adapters/click_house.rb', line 80 def stats(table, date_column: nil, **qualifiers) db = qualifiers[:schema] || database full_table = db ? "#{db}.#{table}" : table sql = +'SELECT count() AS row_count' sql << ", min(#{date_column}) AS date_start, max(#{date_column}) AS date_end" if date_column sql << " FROM #{full_table}" row = execute_raw(sql)['data'][0] TableStats.new( row_count: row[0].to_i, date_start: date_column ? safe_parse_date(row[1]) : nil, date_end: date_column ? safe_parse_date(row[2]) : nil ) end |
#stream(sql) {|chunk| ... } ⇒ Object
Executes the given sql and yields the streamed results to the given block.
125 126 127 128 129 |
# File 'lib/dwh/adapters/click_house.rb', line 125 def stream(sql, &block) with_debug(sql) do execute_raw(sql)['data'].each(&block) end end |
#tables(**qualifiers) ⇒ Object
62 63 64 65 66 |
# File 'lib/dwh/adapters/click_house.rb', line 62 def tables(**qualifiers) db = qualifiers[:schema] || database sql = "SELECT name FROM system.tables WHERE database = '#{db}' AND engine NOT IN ('View', 'MaterializedView')" execute_raw(sql)['data'].flatten end |
#test_connection(raise_exception: false) ⇒ Object
48 49 50 51 52 53 54 55 56 57 58 59 60 |
# File 'lib/dwh/adapters/click_house.rb', line 48 def test_connection(raise_exception: false) res = connection.get('/ping') unless res.success? && res.body.strip == 'Ok.' raise ConnectionError, "ClickHouse ping returned: #{res.body}" if raise_exception return false end true rescue Faraday::ConnectionFailed => e raise ConnectionError, e. if raise_exception false end |