Class: DWH::Adapters::ClickHouse

Inherits:
Adapter
  • Object
show all
Defined in:
lib/dwh/adapters/click_house.rb

Overview

ClickHouse adapter for executing analytical queries against ClickHouse databases. Uses the ClickHouse HTTP interface (default port 8123) via Faraday.

Examples:

Basic local connection

DWH.create(:clickhouse, { host: 'localhost', database: 'default' })

With authentication

DWH.create(:clickhouse, {
  host: 'my-clickhouse.example.com',
  port: 8443,
  protocol: 'https',
  database: 'analytics',
  username: 'analyst',
  password: 'secret'
})

Constant Summary collapse

QUERY_FORMAT =
'JSONCompact'.freeze

Constants included from Settings

Settings::BASE_SETTINGS_FILE

Constants included from Functions::Dates

Functions::Dates::DATE_CLASSES, Functions::Dates::TIMESTAMPABLE_UNITS

Instance Attribute Summary

Attributes inherited from Adapter

#config, #settings

Attributes included from Settings

#adapter_settings

Instance Method Summary collapse

Methods inherited from Adapter

#adapter_name, #alter_settings, #close, config, configuration, #connect!, #connect?, #extra_connection_params, #extra_query_params, #initialize, #reset_settings, #table?, #token_expired?, #with_debug, #with_retry

Methods included from Settings

#adapter_name, #aggregate_function?, #aggregate_functions, #load_settings, #reserved?, #reserved_keywords, #settings_file, #settings_file_path, #using_base_settings?

Methods included from Logger

#logger, logger

Methods included from Behaviors

#apply_advanced_filtering_on_array_projections?, #cross_universe_measure_filtering_strategy, #extend_ending_date_to_last_hour_of_day?, #final_measure_filter?, #final_pass_measure_join_type, #greedy_apply_date_filters, #intermediate_measure_filter?, #temp_table_prefix, #temp_table_type

Methods included from Functions

#cast, #cross_join, #gsk, #lower_case, #quote, #string_lit, #trim, #upper_case

Methods included from Functions::Arrays

#array_exclude_list, #array_in_list, #array_unnest_join

Methods included from Functions::Nulls

#if_null, #null_if, #null_if_zero

Methods included from Functions::ExtractDatePart

#extract_day_name, #extract_day_of_month, #extract_day_of_week, #extract_day_of_year, #extract_hour, #extract_minute, #extract_month, #extract_month_name, #extract_quarter, #extract_week_of_year, #extract_year, #extract_year_month

Methods included from Functions::Dates

#adjust_week_start_day, #adjust_week_start_day?, #current_date, #current_time, #current_timestamp, #date_add, #date_data_type, #date_diff, #date_format, #date_format_sql, #date_int?, #date_lit, #date_literal, #date_time_format, #date_time_literal, #date_time_tz_format, #default_week_start_day, #timestamp_lit, #timestamp_literal, #truncate_date, #week_start_day, #week_starts_on_sunday?

Methods included from Capabilities

#supports_array_functions?, #supports_common_table_expressions?, #supports_cross_join?, #supports_full_join?, #supports_sub_queries?, #supports_table_join?, #supports_temp_tables?, #supports_window_functions?

Constructor Details

This class inherits a constructor from DWH::Adapters::Adapter

Instance Method Details

#connection ⇒ Object



29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
# File 'lib/dwh/adapters/click_house.rb', line 29

# Returns the HTTP client for this adapter, building it on first use.
#
# The client is memoized in +@connection+. Each request carries the
# configured user and database as ClickHouse headers; the key header is
# attached only when the config holds a password.
#
# @return [Object] whatever +Faraday.new+ returns for the configured endpoint
def connection
  @connection ||= begin
    http_headers = {
      'Content-Type' => 'text/plain',
      'X-ClickHouse-User' => config[:username],
      'X-ClickHouse-Database' => database
    }
    secret = config[:password]
    http_headers['X-ClickHouse-Key'] = secret if secret

    endpoint = "#{config[:protocol]}://#{config[:host]}:#{config[:port]}"
    Faraday.new(url: endpoint, headers: http_headers, request: { timeout: config[:query_timeout] })
  end
end

#execute(sql, format: :array, retries: 0) ⇒ Object



95
96
97
98
99
100
101
102
# File 'lib/dwh/adapters/click_house.rb', line 95

# Runs +sql+ and returns the result converted by +format_result+.
#
# @param sql [String] statement to run
# @param format [Symbol] handed to +format_result+ together with the raw result
# @param retries [Integer] handed to +with_retry+
# @return [Object] the value produced by +format_result+
# @raise [ExecutionError] for any StandardError raised while running or
#   formatting; an ExecutionError raised underneath is propagated as-is
def execute(sql, format: :array, retries: 0)
  payload = with_debug(sql) do
    with_retry(retries) { execute_raw(sql) }
  end
  format_result(payload, format)
rescue StandardError => e
  # Already the adapter's own error type: let it through untouched.
  raise if e.is_a?(ExecutionError)

  raise ExecutionError, e.message
end

#execute_stream(sql, io, stats: nil, retries: 0) ⇒ Object



104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
# File 'lib/dwh/adapters/click_house.rb', line 104

# Runs +sql+ and writes the result to +io+ as CSV: one header line built
# from the 'name' of each entry in the response's 'meta', followed by one
# line per entry of 'data'.
#
# Only the query itself is wrapped in +with_retry+. The writes happen after
# the query has returned, so a retried attempt can never emit the header or
# rows (or append to +stats+) a second time.
#
# @param sql [String] statement to run
# @param io [#write, #rewind] destination for the CSV lines
# @param stats [#<<, nil] when given, receives every row before it is written
# @param retries [Integer] handed to +with_retry+
# @return [Object] +io+, rewound
# @raise [ExecutionError] for any StandardError raised while querying or
#   writing; an ExecutionError raised underneath is propagated as-is
def execute_stream(sql, io, stats: nil, retries: 0)
  with_debug(sql) do
    raw = with_retry(retries) { execute_raw(sql) }
    header = raw['meta'].map { |col| col['name'] }
    io.write(CSV.generate_line(header))
    raw['data'].each do |row|
      stats << row unless stats.nil?
      io.write(CSV.generate_line(row))
    end
  end
  io.rewind
  io
rescue ExecutionError
  raise
rescue StandardError => e
  raise ExecutionError, e.message
end

#metadata(table, **qualifiers) ⇒ Object



68
69
70
71
72
73
74
75
76
77
78
# File 'lib/dwh/adapters/click_house.rb', line 68

# Describes +table+ and returns it as a Table populated with its columns.
#
# The table is looked up in +qualifiers[:schema]+ when given, otherwise in
# the adapter's +database+; with neither, the bare table name is used.
#
# @param table [String] table name
# @param qualifiers [Hash] optional +:schema+ naming the database to use
# @return [Table] table with one Column (name and data type) per DESCRIBE row
def metadata(table, **qualifiers)
  db = qualifiers[:schema] || database
  full_table = db ? "#{db}.#{table}" : table
  # DESCRIBE returns: name, type, default_type, default_expression, comment, codec_expression, ttl_expression
  res = execute_raw("DESCRIBE TABLE #{full_table}")
  db_table = Table.new(table, schema: db)
  res['data'].each do |row|
    # Only the first two DESCRIBE fields are kept.
    db_table << Column.new(name: row[0], data_type: row[1])
  end
  db_table
end

#stats(table, date_column: nil, **qualifiers) ⇒ Object



80
81
82
83
84
85
86
87
88
89
90
91
92
93
# File 'lib/dwh/adapters/click_house.rb', line 80

# Collects the row count of +table+ and, when +date_column+ is given, the
# minimum and maximum value of that column.
#
# @param table [String] table name
# @param date_column [String, nil] column to take min/max of; skipped when nil
# @param qualifiers [Hash] optional +:schema+ naming the database to use
# @return [TableStats] row_count plus date_start/date_end (nil without a date column)
def stats(table, date_column: nil, **qualifiers)
  schema_name = qualifiers[:schema] || database
  target = schema_name ? "#{schema_name}.#{table}" : table

  select_list = ['count() AS row_count']
  if date_column
    select_list << "min(#{date_column}) AS date_start"
    select_list << "max(#{date_column}) AS date_end"
  end
  query = "SELECT #{select_list.join(', ')} FROM #{target}"

  # Only the first row of the result is read.
  first_row = execute_raw(query)['data'][0]
  total = first_row[0].to_i
  earliest = date_column ? safe_parse_date(first_row[1]) : nil
  latest = date_column ? safe_parse_date(first_row[2]) : nil
  TableStats.new(row_count: total, date_start: earliest, date_end: latest)
end

#stream(sql) {|chunk| ... } ⇒ Object

Executes the given sql and yields the streamed results to the given block.

Parameters:

  • sql (String)

    actual sql

Yields:

  • (chunk)

    Yields a streamed chunk as it streams in. The chunk type might vary depending on the target db and settings

Raises:



125
126
127
128
129
# File 'lib/dwh/adapters/click_house.rb', line 125

# Runs +sql+ and hands each entry of the response's 'data' to the block.
#
# @param sql [String] statement to run
# @yield [chunk] one entry of 'data' per call
# @return [Object] whatever +with_debug+ returns for the wrapped block
def stream(sql, &block)
  with_debug(sql) do
    rows = execute_raw(sql)['data']
    rows.each(&block)
  end
end

#tables(**qualifiers) ⇒ Object



62
63
64
65
66
# File 'lib/dwh/adapters/click_house.rb', line 62

# Lists the names of tables in a database, leaving out entries whose
# engine is 'View' or 'MaterializedView'.
#
# @param qualifiers [Hash] optional +:schema+ naming the database; falls
#   back to the adapter's +database+
# @return [Array] flattened 'data' of the system.tables query
def tables(**qualifiers)
  schema_name = qualifiers[:schema] || database
  excluded_engines = "('View', 'MaterializedView')"
  query = "SELECT name FROM system.tables WHERE database = '#{schema_name}' AND engine NOT IN #{excluded_engines}"
  rows = execute_raw(query)['data']
  rows.flatten
end

#test_connection(raise_exception: false) ⇒ Object



48
49
50
51
52
53
54
55
56
57
58
59
60
# File 'lib/dwh/adapters/click_house.rb', line 48

# Pings the server and reports whether it answered as expected.
#
# The check passes when GET /ping succeeds and the stripped body is 'Ok.'.
# Any Faraday-level failure (connection refused, timeout, SSL, ...) counts
# as a failed check rather than escaping to the caller, so the method
# returns false for every kind of transport error unless +raise_exception+
# is set.
#
# @param raise_exception [Boolean] raise instead of returning false
# @return [Boolean] true when the ping succeeded, false otherwise
# @raise [ConnectionError] on a failed check, only when +raise_exception+ is true
def test_connection(raise_exception: false)
  res = connection.get('/ping')
  # to_s guards against a response without a body.
  return true if res.success? && res.body.to_s.strip == 'Ok.'
  raise ConnectionError, "ClickHouse ping returned: #{res.body}" if raise_exception

  false
rescue Faraday::Error => e
  raise ConnectionError, e.message if raise_exception

  false
end