Class: UmbrellioUtils::ClickHouse::Backends::Base

Inherits:
Object
  • Object
show all
Includes:
Singleton
Defined in:
lib/umbrellio_utils/click_house/backends/base.rb

Overview

Abstract backend. Each concrete backend (Legacy for the ‘click_house` gem, Native for the `clickhouse-native` gem) implements the low-level ops (execute / query / insert / describe_table / server_version / tables / create_database / drop_database / config / logger) and a SERVER_ERROR constant used by `log_errors`.

Direct Known Subclasses

Legacy, Native

Instance Method Summary collapse

Instance Method Details

#count(dataset) ⇒ Object



33
34
35
# File 'lib/umbrellio_utils/click_house/backends/base.rb', line 33

def count(dataset)
  query_value(dataset.select(SQL.ch_count))
end

#create_database(name, if_not_exists: false, cluster: nil, engine: nil) ⇒ Object



41
42
43
44
45
46
47
48
49
50
51
# File 'lib/umbrellio_utils/click_house/backends/base.rb', line 41

def create_database(name, if_not_exists: false, cluster: nil, engine: nil)
  admin_execute(
    format(
      "CREATE DATABASE %<exists>s %<name>s %<cluster>s %<engine>s",
      exists: if_not_exists ? "IF NOT EXISTS" : "",
      name:,
      cluster: cluster ? "ON CLUSTER #{cluster}" : "",
      engine: engine ? "ENGINE = #{engine}" : "",
    ),
  )
end

#db_nameObject



37
38
39
# File 'lib/umbrellio_utils/click_house/backends/base.rb', line 37

def db_name
  config.database.to_sym
end

#drop_database(name, if_exists: false, cluster: nil) ⇒ Object



53
54
55
56
57
58
59
60
61
62
# File 'lib/umbrellio_utils/click_house/backends/base.rb', line 53

def drop_database(name, if_exists: false, cluster: nil)
  admin_execute(
    format(
      "DROP DATABASE %<exists>s %<name>s %<cluster>s",
      exists: if_exists ? "IF EXISTS" : "",
      name:,
      cluster: cluster ? "ON CLUSTER #{cluster}" : "",
    ),
  )
end

#drop_table!(table_name, db_name: self.db_name) ⇒ Object



82
83
84
# File 'lib/umbrellio_utils/click_house/backends/base.rb', line 82

def drop_table!(table_name, db_name: self.db_name)
  execute("DROP TABLE #{db_name}.#{table_name} #{on_cluster(sync: true)}")
end

#from(source, db_name: self.db_name) ⇒ Object

Concrete backends implement the low-level ops (execute / query / insert / describe_table / server_version / tables / admin_execute / config / logger) and define SERVER_ERROR.



20
21
22
23
24
25
26
27
28
29
30
31
# File 'lib/umbrellio_utils/click_house/backends/base.rb', line 20

def from(source, db_name: self.db_name)
  ds =
    case source
    when Symbol
      DB.from(db_name == self.db_name ? SQL[source] : SQL[db_name][source])
    when nil
      DB.dataset
    else
      DB.from(source)
    end
  ds.clone(ch: true)
end

#on_cluster(sync: false) ⇒ Object

Returns the ‘ON CLUSTER <name> [SYNC]` clause for DDL, or “” if `UmbrellioUtils.config.clickhouse_cluster` is blank or we’re in a Rails test env. Test-env suppression saves hundreds of ms per DDL on a single-node CH (each ON CLUSTER op blocks waiting for replicas that don’t exist). The cluster name is still used by callers like Distributed engine declarations, regardless of this clause.



71
72
73
74
75
76
# File 'lib/umbrellio_utils/click_house/backends/base.rb', line 71

def on_cluster(sync: false)
  name = UmbrellioUtils.config.clickhouse_cluster
  return "" if name.blank?
  return "" if defined?(Rails) && Rails.env.test?
  sync ? "ON CLUSTER #{name} SYNC" : "ON CLUSTER #{name}"
end

#optimize_table!(table_name, db_name: self.db_name) ⇒ Object



86
87
88
89
90
# File 'lib/umbrellio_utils/click_house/backends/base.rb', line 86

def optimize_table!(table_name, db_name: self.db_name)
  Timeout.timeout(UmbrellioUtils.config.ch_optimize_timeout) do
    execute("OPTIMIZE TABLE #{db_name}.#{table_name} #{on_cluster} FINAL")
  end
end

#parse_value(value, type:) ⇒ Object



92
93
94
95
96
97
98
99
100
101
102
103
# File 'lib/umbrellio_utils/click_house/backends/base.rb', line 92

def parse_value(value, type:)
  case type
  when /Array/ then Array.wrap(value)
  when /DateTime/
    case value
    when String then value.present? ? Time.zone.parse(value) : nil
    else value
    end
  when /String/ then value&.to_s
  else value
  end
end

#pg_table_connection(table, schema: "public") ⇒ Object



105
106
107
108
109
110
111
112
113
114
115
# File 'lib/umbrellio_utils/click_house/backends/base.rb', line 105

def pg_table_connection(table, schema: "public")
  host = ENV["PGHOST"] || DB.opts[:host].presence || "localhost"
  port = DB.opts[:port] || 5432
  # Etc.getlogin returns "root" under non-TTY shells (e.g. rake from
  # a CI runner), which is almost never a real PG role. Prefer $USER.
   = ENV["USER"].presence || Etc.getlogin
  database = DB.opts[:database].presence || 
  username = DB.opts[:user].presence || 
  password = DB.opts[:password]
  SQL.func(:postgresql, "#{host}:#{port}", database, table, username, password, schema)
end

#populate_temp_table!(temp_table_name, dataset, schema: "public") ⇒ Object



117
118
119
120
121
122
# File 'lib/umbrellio_utils/click_house/backends/base.rb', line 117

def populate_temp_table!(temp_table_name, dataset, schema: "public")
  execute(<<~SQL.squish)
    INSERT INTO TABLE FUNCTION #{DB.literal(pg_table_connection(temp_table_name, schema:))}
    #{dataset.sql}
  SQL
end

#truncate_table!(table_name, db_name: self.db_name) ⇒ Object



78
79
80
# File 'lib/umbrellio_utils/click_house/backends/base.rb', line 78

def truncate_table!(table_name, db_name: self.db_name)
  execute("TRUNCATE TABLE #{db_name}.#{table_name} #{on_cluster(sync: true)}")
end

#with_temp_table(dataset, temp_table_name:, primary_key: [:id], primary_key_types: [:integer]) ⇒ Object



124
125
126
127
128
129
130
131
132
133
134
# File 'lib/umbrellio_utils/click_house/backends/base.rb', line 124

def with_temp_table(
  dataset, temp_table_name:, primary_key: [:id], primary_key_types: [:integer], **, &
)
  unless DB.table_exists?(temp_table_name)
    UmbrellioUtils::Database.create_temp_table(
      nil, primary_key:, primary_key_types:, temp_table_name:, &
    )
    populate_temp_table!(temp_table_name, dataset)
  end
  UmbrellioUtils::Database.with_temp_table(nil, primary_key:, temp_table_name:, **, &)
end