Class: UmbrellioUtils::ClickHouse::Backends::Base

Inherits:
Object
  • Object
show all
Includes:
Singleton
Defined in:
lib/umbrellio_utils/click_house/backends/base.rb

Overview

Abstract backend. Each concrete backend (Legacy for the `click_house` gem, Native for the `clickhouse-native` gem) implements the low-level ops (execute / query / insert / describe_table / server_version / tables / create_database / drop_database / config / logger) and a SERVER_ERROR constant used by `log_errors`.

Direct Known Subclasses

Legacy, Native

Defined Under Namespace

Modules: ClickHouseStringEscaping

Instance Method Summary collapse

Instance Method Details

#count(dataset) ⇒ Object



42
43
44
# File 'lib/umbrellio_utils/click_house/backends/base.rb', line 42

# Runs a count query for +dataset+.
#
# Calls `dataset.select(SQL.ch_count)` and hands the resulting query to
# `query_value`.
#
# @param dataset [#select] dataset to count
# @return [Object] whatever `query_value` returns for the count query
def count(dataset)
  counting_query = dataset.select(SQL.ch_count)
  query_value(counting_query)
end

#create_database(name, if_not_exists: false, cluster: nil, engine: nil) ⇒ Object



50
51
52
53
54
55
56
57
58
59
60
# File 'lib/umbrellio_utils/click_house/backends/base.rb', line 50

# Builds a `CREATE DATABASE` statement and runs it through `admin_execute`.
#
# Optional clauses that are not requested are rendered as empty strings, so
# the statement keeps the separating spaces around them.
#
# @param name [#to_s] database name, interpolated as-is
# @param if_not_exists [Boolean] adds `IF NOT EXISTS` when truthy
# @param cluster [#to_s, nil] adds `ON CLUSTER <cluster>` when given
# @param engine [#to_s, nil] adds `ENGINE = <engine>` when given
# @return [Object] the result of `admin_execute`
def create_database(name, if_not_exists: false, cluster: nil, engine: nil)
  exists_clause = if_not_exists ? "IF NOT EXISTS" : ""
  cluster_clause = cluster ? "ON CLUSTER #{cluster}" : ""
  engine_clause = engine ? "ENGINE = #{engine}" : ""

  admin_execute("CREATE DATABASE #{exists_clause} #{name} #{cluster_clause} #{engine_clause}")
end

#db_name ⇒ Object



46
47
48
# File 'lib/umbrellio_utils/click_house/backends/base.rb', line 46

# Name of the configured database, as a Symbol.
#
# @return [Symbol] `config.database` converted with `to_sym`
def db_name
  configured_database = config.database
  configured_database.to_sym
end

#drop_database(name, if_exists: false, cluster: nil) ⇒ Object



62
63
64
65
66
67
68
69
70
71
# File 'lib/umbrellio_utils/click_house/backends/base.rb', line 62

# Builds a `DROP DATABASE` statement and runs it through `admin_execute`.
#
# Optional clauses that are not requested are rendered as empty strings, so
# the statement keeps the separating spaces around them.
#
# @param name [#to_s] database name, interpolated as-is
# @param if_exists [Boolean] adds `IF EXISTS` when truthy
# @param cluster [#to_s, nil] adds `ON CLUSTER <cluster>` when given
# @return [Object] the result of `admin_execute`
def drop_database(name, if_exists: false, cluster: nil)
  exists_clause = if_exists ? "IF EXISTS" : ""
  cluster_clause = cluster ? "ON CLUSTER #{cluster}" : ""

  admin_execute("DROP DATABASE #{exists_clause} #{name} #{cluster_clause}")
end

#drop_table!(table_name, db_name: self.db_name) ⇒ Object



91
92
93
# File 'lib/umbrellio_utils/click_house/backends/base.rb', line 91

# Executes `DROP TABLE` for `<db_name>.<table_name>`, followed by the clause
# returned by `on_cluster(sync: true)`.
#
# @param table_name [#to_s] table to drop
# @param db_name [#to_s] database holding the table; defaults to `self.db_name`
# @return [Object] the result of `execute`
def drop_table!(table_name, db_name: self.db_name)
  qualified_name = "#{db_name}.#{table_name}"
  cluster_clause = on_cluster(sync: true)
  execute("DROP TABLE #{qualified_name} #{cluster_clause}")
end

#from(source, db_name: self.db_name) ⇒ Object

Concrete backends implement the low-level ops (execute / query / insert / describe_table / server_version / tables / admin_execute / config / logger) and define SERVER_ERROR.



29
30
31
32
33
34
35
36
37
38
39
40
# File 'lib/umbrellio_utils/click_house/backends/base.rb', line 29

# Builds a dataset for +source+, cloned with the `ch: true` option and
# extended with ClickHouseStringEscaping.
#
# @param source [Symbol, nil, Object] a Symbol is looked up through `SQL[]`
#   (qualified with +db_name+ when it differs from `self.db_name`); nil
#   yields `DB.dataset`; anything else is passed to `DB.from` unchanged
# @param db_name [Object] database used to qualify a Symbol source
# @return [Object] the extended dataset
def from(source, db_name: self.db_name)
  base =
    if source.is_a?(Symbol)
      same_database = db_name == self.db_name
      DB.from(same_database ? SQL[source] : SQL[db_name][source])
    elsif source.nil?
      DB.dataset
    else
      DB.from(source)
    end

  ch_dataset = base.clone(ch: true)
  ch_dataset.with_extend(ClickHouseStringEscaping)
end

#on_cluster(sync: false) ⇒ Object

Returns the `ON CLUSTER <name> [SYNC]` clause for DDL, or `""` if `UmbrellioUtils.config.clickhouse_cluster` is blank or we're in a Rails test env. Test-env suppression saves hundreds of ms per DDL on a single-node CH (each ON CLUSTER op blocks waiting for replicas that don't exist). The cluster name is still used by callers like Distributed engine declarations, regardless of this clause.



80
81
82
83
84
85
# File 'lib/umbrellio_utils/click_house/backends/base.rb', line 80

# Renders the `ON CLUSTER` clause for DDL statements.
#
# Returns an empty string when `UmbrellioUtils.config.clickhouse_cluster` is
# blank, or when Rails is defined and `Rails.env.test?` is true.
#
# @param sync [Boolean] appends ` SYNC` to the clause when truthy
# @return [String] `"ON CLUSTER <name>"`, `"ON CLUSTER <name> SYNC"` or `""`
def on_cluster(sync: false)
  cluster = UmbrellioUtils.config.clickhouse_cluster
  suppressed = cluster.blank? || (defined?(Rails) && Rails.env.test?)
  return "" if suppressed

  clause = "ON CLUSTER #{cluster}"
  sync ? "#{clause} SYNC" : clause
end

#optimize_table!(table_name, db_name: self.db_name) ⇒ Object



95
96
97
98
99
# File 'lib/umbrellio_utils/click_house/backends/base.rb', line 95

# Executes `OPTIMIZE TABLE ... FINAL` for `<db_name>.<table_name>`, including
# the clause returned by `on_cluster`, inside a `Timeout.timeout` block
# limited by `UmbrellioUtils.config.ch_optimize_timeout`.
#
# @param table_name [#to_s] table to optimize
# @param db_name [#to_s] database holding the table; defaults to `self.db_name`
# @return [Object] the result of `execute`
# @raise [Timeout::Error] when the block exceeds the configured limit
def optimize_table!(table_name, db_name: self.db_name)
  time_limit = UmbrellioUtils.config.ch_optimize_timeout

  Timeout.timeout(time_limit) do
    qualified_name = "#{db_name}.#{table_name}"
    execute("OPTIMIZE TABLE #{qualified_name} #{on_cluster} FINAL")
  end
end

#parse_value(value, type:) ⇒ Object



101
102
103
104
105
106
107
108
109
110
111
112
# File 'lib/umbrellio_utils/click_house/backends/base.rb', line 101

# Coerces +value+ according to the column +type+ description.
#
# The type is matched against patterns in this order, first match wins:
# * `/Array/`    - the value is wrapped with `Array.wrap`
# * `/DateTime/` - a String is parsed with `Time.zone.parse` (nil when it is
#   not `present?`); any other value is returned untouched
# * `/String/`   - the value is converted with `to_s`, nil stays nil
# Any other type returns the value unchanged.
#
# @param value [Object] raw value
# @param type [String] type description matched against the patterns above
# @return [Object] the coerced value
def parse_value(value, type:)
  case type
  when /Array/
    Array.wrap(value)
  when /DateTime/
    return value unless value.is_a?(String)

    Time.zone.parse(value) if value.present?
  when /String/
    value&.to_s
  else
    value
  end
end

#pg_table_connection(table, schema: "public") ⇒ Object



114
115
116
117
118
119
120
121
122
123
124
# File 'lib/umbrellio_utils/click_house/backends/base.rb', line 114

# Builds the `postgresql(...)` table-function call (via `SQL.func`) that
# points at +table+ in the PostgreSQL database described by `DB.opts`.
#
# Connection settings are resolved as follows:
# * host     - `ENV["PGHOST"]`, then `DB.opts[:host]`, then "localhost"
# * port     - `DB.opts[:port]`, then 5432
# * database - `DB.opts[:database]`, then the fallback login name
# * username - `DB.opts[:user]`, then the fallback login name
# * password - `DB.opts[:password]` (may be nil)
#
# @param table [Object] table name passed through to the function call
# @param schema [String] schema name passed through to the function call
# @return [Object] the value returned by `SQL.func`
def pg_table_connection(table, schema: "public")
  host = ENV["PGHOST"] || DB.opts[:host].presence || "localhost"
  port = DB.opts[:port] || 5432
  # Etc.getlogin returns "root" under non-TTY shells (e.g. rake from
  # a CI runner), which is almost never a real PG role. Prefer $USER.
  fallback_user = ENV["USER"].presence || Etc.getlogin
  database = DB.opts[:database].presence || fallback_user
  username = DB.opts[:user].presence || fallback_user
  password = DB.opts[:password]
  SQL.func(:postgresql, "#{host}:#{port}", database, table, username, password, schema)
end

#populate_temp_table!(temp_table_name, dataset, schema: "public") ⇒ Object



126
127
128
129
130
131
# File 'lib/umbrellio_utils/click_house/backends/base.rb', line 126

# Executes an `INSERT INTO TABLE FUNCTION` statement whose target is the
# literal form of `pg_table_connection(temp_table_name, schema:)` and whose
# rows come from `dataset.sql`.
#
# @param temp_table_name [Object] table name passed to `pg_table_connection`
# @param dataset [#sql] dataset supplying the query text
# @param schema [String] schema passed to `pg_table_connection`
# @return [Object] the result of `execute`
def populate_temp_table!(temp_table_name, dataset, schema: "public")
  target = DB.literal(pg_table_connection(temp_table_name, schema:))
  statement = <<~SQL.squish
    INSERT INTO TABLE FUNCTION #{target}
    #{dataset.sql}
  SQL
  execute(statement)
end

#truncate_table!(table_name, db_name: self.db_name) ⇒ Object



87
88
89
# File 'lib/umbrellio_utils/click_house/backends/base.rb', line 87

# Executes `TRUNCATE TABLE` for `<db_name>.<table_name>`, followed by the
# clause returned by `on_cluster(sync: true)`.
#
# @param table_name [#to_s] table to truncate
# @param db_name [#to_s] database holding the table; defaults to `self.db_name`
# @return [Object] the result of `execute`
def truncate_table!(table_name, db_name: self.db_name)
  qualified_name = "#{db_name}.#{table_name}"
  cluster_clause = on_cluster(sync: true)
  execute("TRUNCATE TABLE #{qualified_name} #{cluster_clause}")
end

#with_temp_table(dataset, temp_table_name:, primary_key: [:id], primary_key_types: [:integer]) ⇒ Object



133
134
135
136
137
138
139
140
141
142
143
# File 'lib/umbrellio_utils/click_house/backends/base.rb', line 133

# Delegates to `UmbrellioUtils::Database.with_temp_table`, first creating and
# filling the temp table when `DB.table_exists?` reports it is missing.
#
# When the table is missing it is created with
# `UmbrellioUtils::Database.create_temp_table` and filled with
# `populate_temp_table!(temp_table_name, dataset)`.
#
# @param dataset [Object] dataset passed to `populate_temp_table!`
# @param temp_table_name [Object] name of the temp table
# @param primary_key [Array] forwarded to both Database helpers
# @param primary_key_types [Array] forwarded to `create_temp_table` only
# @param options [Hash] extra keywords forwarded to `Database.with_temp_table`
# @param block [Proc] forwarded to both Database helpers
# @return [Object] the result of `UmbrellioUtils::Database.with_temp_table`
def with_temp_table(
  dataset, temp_table_name:, primary_key: [:id], primary_key_types: [:integer], **options, &block
)
  if !DB.table_exists?(temp_table_name)
    UmbrellioUtils::Database.create_temp_table(
      nil, primary_key:, primary_key_types:, temp_table_name:, &block
    )
    populate_temp_table!(temp_table_name, dataset)
  end

  UmbrellioUtils::Database.with_temp_table(
    nil, primary_key:, temp_table_name:, **options, &block
  )
end