Class: UmbrellioUtils::ClickHouse::Backends::Base
- Inherits: Object
- Includes: Singleton
- Defined in: lib/umbrellio_utils/click_house/backends/base.rb
Overview
Abstract backend. Each concrete backend (Legacy for the `click_house` gem, Native for the `clickhouse-native` gem) implements the low-level ops (execute / query / insert / describe_table / server_version / tables / admin_execute / config / logger) and a SERVER_ERROR constant used by `log_errors`. Base builds `create_database` and `drop_database` on top of `admin_execute`.
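The contract described in the overview can be sketched with stand-in classes. This is a minimal illustration, not the gem's code: `SketchBase`, `RecordingBackend`, and the `statements` helper are hypothetical names, and raising `NotImplementedError` from the base class is an assumption about how an unimplemented op might behave.

```ruby
require "singleton"

# Stand-in for the abstract backend; NOT the real UmbrellioUtils class.
class SketchBase
  include Singleton

  # High-level helper built on a low-level op, in the spirit of drop_table!.
  def drop_table!(table_name, db_name: "default")
    execute("DROP TABLE #{db_name}.#{table_name}")
  end

  # Low-level op that each concrete backend is expected to override.
  # Raising here is an assumption made for this sketch.
  def execute(_sql)
    raise NotImplementedError, "#{self.class} must implement #execute"
  end
end

# Hypothetical concrete backend that records SQL instead of sending it.
class RecordingBackend < SketchBase
  SERVER_ERROR = StandardError # placeholder for a driver-specific error class

  def statements
    @statements ||= []
  end

  def execute(sql)
    statements << sql
    true
  end
end

RecordingBackend.instance.drop_table!(:events)
puts RecordingBackend.instance.statements.inspect
```

Because the base includes Singleton, each subclass gets its own single instance, so helpers are called as `RecordingBackend.instance.some_method` rather than on a freshly constructed object.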
Defined Under Namespace
Modules: ClickHouseStringEscaping
Instance Method Summary
- #count(dataset) ⇒ Object
- #create_database(name, if_not_exists: false, cluster: nil, engine: nil) ⇒ Object
- #db_name ⇒ Object
- #drop_database(name, if_exists: false, cluster: nil) ⇒ Object
- #drop_table!(table_name, db_name: self.db_name) ⇒ Object
- #from(source, db_name: self.db_name) ⇒ Object
  Concrete backends implement the low-level ops (execute / query / insert / describe_table / server_version / tables / admin_execute / config / logger) and define SERVER_ERROR.
- #on_cluster(sync: false) ⇒ Object
  Returns the `ON CLUSTER <name> [SYNC]` clause for DDL, or `""` if `UmbrellioUtils.config.clickhouse_cluster` is blank or we’re in a Rails test env.
- #optimize_table!(table_name, db_name: self.db_name) ⇒ Object
- #parse_value(value, type:) ⇒ Object
- #pg_table_connection(table, schema: "public") ⇒ Object
- #populate_temp_table!(temp_table_name, dataset, schema: "public") ⇒ Object
- #truncate_table!(table_name, db_name: self.db_name) ⇒ Object
- #with_temp_table(dataset, temp_table_name:, primary_key: [:id], primary_key_types: [:integer]) ⇒ Object
Instance Method Details
#count(dataset) ⇒ Object
    # File 'lib/umbrellio_utils/click_house/backends/base.rb', line 42

    def count(dataset)
      query_value(dataset.select(SQL.ch_count))
    end
#create_database(name, if_not_exists: false, cluster: nil, engine: nil) ⇒ Object
    # File 'lib/umbrellio_utils/click_house/backends/base.rb', line 50

    def create_database(name, if_not_exists: false, cluster: nil, engine: nil)
      admin_execute(
        format(
          "CREATE DATABASE %<exists>s %<name>s %<cluster>s %<engine>s",
          exists: if_not_exists ? "IF NOT EXISTS" : "",
          name:,
          cluster: cluster ? "ON CLUSTER #{cluster}" : "",
          engine: engine ? "ENGINE = #{engine}" : "",
        ),
      )
    end
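To see what statement `create_database` hands to `admin_execute`, the `format` call can be lifted into a standalone helper. This is a sketch under assumptions: `create_database_sql` is a hypothetical name, and it returns the string instead of executing it.

```ruby
# Mirrors the format call in create_database, but returns the SQL string
# instead of passing it to admin_execute (hypothetical helper).
def create_database_sql(name, if_not_exists: false, cluster: nil, engine: nil)
  format(
    "CREATE DATABASE %<exists>s %<name>s %<cluster>s %<engine>s",
    exists: if_not_exists ? "IF NOT EXISTS" : "",
    name: name,
    cluster: cluster ? "ON CLUSTER #{cluster}" : "",
    engine: engine ? "ENGINE = #{engine}" : "",
  )
end

full = create_database_sql(:analytics, if_not_exists: true, cluster: "main", engine: "Atomic")
bare = create_database_sql(:analytics)

puts full
puts bare.inspect # omitted options leave extra spaces in the statement
```

Omitted options are replaced by empty strings, so the statement keeps the surrounding spaces from the template. The values are interpolated as-is, without quoting, so names are best taken from trusted configuration.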
#db_name ⇒ Object
    # File 'lib/umbrellio_utils/click_house/backends/base.rb', line 46

    def db_name
      config.database.to_sym
    end
#drop_database(name, if_exists: false, cluster: nil) ⇒ Object
    # File 'lib/umbrellio_utils/click_house/backends/base.rb', line 62

    def drop_database(name, if_exists: false, cluster: nil)
      admin_execute(
        format(
          "DROP DATABASE %<exists>s %<name>s %<cluster>s",
          exists: if_exists ? "IF EXISTS" : "",
          name:,
          cluster: cluster ? "ON CLUSTER #{cluster}" : "",
        ),
      )
    end
#drop_table!(table_name, db_name: self.db_name) ⇒ Object
    # File 'lib/umbrellio_utils/click_house/backends/base.rb', line 91

    def drop_table!(table_name, db_name: self.db_name)
      execute("DROP TABLE #{db_name}.#{table_name} #{on_cluster(sync: true)}")
    end
#from(source, db_name: self.db_name) ⇒ Object
Concrete backends implement the low-level ops (execute / query / insert / describe_table / server_version / tables / admin_execute / config / logger) and define SERVER_ERROR.
    # File 'lib/umbrellio_utils/click_house/backends/base.rb', line 29

    def from(source, db_name: self.db_name)
      ds =
        case source
        when Symbol
          DB.from(db_name == self.db_name ? SQL[source] : SQL[db_name][source])
        when nil
          DB.dataset
        else
          DB.from(source)
        end

      ds.clone(ch: true).with_extend(ClickHouseStringEscaping)
    end
#on_cluster(sync: false) ⇒ Object
Returns the `ON CLUSTER <name> [SYNC]` clause for DDL, or `""` if `UmbrellioUtils.config.clickhouse_cluster` is blank or we’re in a Rails test env. Test-env suppression saves hundreds of ms per DDL on a single-node CH (each ON CLUSTER op blocks waiting for replicas that don’t exist). The cluster name is still used by callers like Distributed engine declarations, regardless of this clause.
    # File 'lib/umbrellio_utils/click_house/backends/base.rb', line 80

    def on_cluster(sync: false)
      name = UmbrellioUtils.config.clickhouse_cluster
      return "" if name.blank?
      return "" if defined?(Rails) && Rails.env.test?
      sync ? "ON CLUSTER #{name} SYNC" : "ON CLUSTER #{name}"
    end
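The decision logic of `on_cluster` can be tried in isolation with a small sketch. Assumptions: `on_cluster_clause` is a hypothetical helper, and the cluster name and test-env flag are passed as plain arguments, whereas the original reads `UmbrellioUtils.config.clickhouse_cluster` and `Rails.env`.

```ruby
# Standalone version of the on_cluster decision logic (hypothetical helper).
# The blank check imitates ActiveSupport's String#blank? for nil and strings.
def on_cluster_clause(cluster_name, sync: false, test_env: false)
  return "" if cluster_name.nil? || cluster_name.strip.empty?
  return "" if test_env

  sync ? "ON CLUSTER #{cluster_name} SYNC" : "ON CLUSTER #{cluster_name}"
end

# How the clause slots into a DDL statement, as in drop_table!.
puts "DROP TABLE analytics.events #{on_cluster_clause('main', sync: true)}"
puts "DROP TABLE analytics.events #{on_cluster_clause('main', test_env: true)}".inspect
```

When the clause is suppressed, the DDL string simply ends with a trailing space, which is why callers can interpolate the result unconditionally.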
#optimize_table!(table_name, db_name: self.db_name) ⇒ Object
    # File 'lib/umbrellio_utils/click_house/backends/base.rb', line 95

    def optimize_table!(table_name, db_name: self.db_name)
      Timeout.timeout(UmbrellioUtils.config.ch_optimize_timeout) do
        execute("OPTIMIZE TABLE #{db_name}.#{table_name} #{on_cluster} FINAL")
      end
    end
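`optimize_table!` bounds the statement with Ruby's standard `Timeout.timeout`. The sketch below shows that mechanism on its own; `run_with_deadline` is a hypothetical helper, and unlike the original it rescues `Timeout::Error` so the outcome can be printed.

```ruby
require "timeout"

# Runs the block under a deadline, as optimize_table! does around execute.
# Returns :done or :timed_out; the original lets Timeout::Error propagate.
def run_with_deadline(seconds)
  Timeout.timeout(seconds) { yield }
  :done
rescue Timeout::Error
  :timed_out
end

fast = run_with_deadline(1) { :ok }        # finishes well inside the deadline
slow = run_with_deadline(0.05) { sleep 1 } # interrupted after about 50 ms
puts fast, slow
```

The timeout interrupts the calling Ruby code only. Whether the server keeps working on an interrupted `OPTIMIZE` is not covered by the source, so callers may want to treat a timeout as "unknown outcome" rather than "not run".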
#parse_value(value, type:) ⇒ Object
    # File 'lib/umbrellio_utils/click_house/backends/base.rb', line 101

    def parse_value(value, type:)
      case type
      when /Array/ then Array.wrap(value)
      when /DateTime/
        case value
        when String then value.present? ? Time.zone.parse(value) : nil
        else value
        end
      when /String/ then value&.to_s
      else value
      end
    end
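The branch order in `parse_value` matters: the type name is matched against `/Array/` first, then `/DateTime/`, then `/String/`. The following stdlib-only approximation makes that visible. Assumptions: `parse_value_sketch` and `wrap` are hypothetical names, `Time.parse` stands in for `Time.zone.parse`, and `wrap` imitates ActiveSupport's `Array.wrap`.

```ruby
require "time"

# Imitates ActiveSupport's Array.wrap: nil becomes [], arrays pass through,
# anything else is wrapped in a one-element array.
def wrap(value)
  return [] if value.nil?

  value.respond_to?(:to_ary) ? value.to_ary : [value]
end

# Approximation of parse_value without ActiveSupport (hypothetical helper).
def parse_value_sketch(value, type:)
  case type
  when /Array/ then wrap(value)
  when /DateTime/
    if value.is_a?(String)
      value.strip.empty? ? nil : Time.parse(value)
    else
      value
    end
  when /String/ then value&.to_s
  else value
  end
end

p parse_value_sketch("x", type: "Array(String)")   # Array branch wins over String
p parse_value_sketch("", type: "DateTime")         # blank string becomes nil
p parse_value_sketch(42, type: "Nullable(String)") # coerced with to_s
p parse_value_sketch(7, type: "UInt64")            # other types pass through
```

Because matching is by regular expression on the type name, wrapped types such as `Nullable(String)` or `DateTime64(3)` fall into the same branches as their plain counterparts.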
#pg_table_connection(table, schema: "public") ⇒ Object
    # File 'lib/umbrellio_utils/click_house/backends/base.rb', line 114

    def pg_table_connection(table, schema: "public")
      host = ENV["PGHOST"] || DB.opts[:host].presence || "localhost"
      port = DB.opts[:port] || 5432
      # Etc.getlogin returns "root" under non-TTY shells (e.g. rake from
      # a CI runner), which is almost never a real PG role. Prefer $USER.
      login = ENV["USER"].presence || Etc.getlogin
      database = DB.opts[:database].presence || login
      username = DB.opts[:user].presence || login
      password = DB.opts[:password]
      SQL.func(:postgresql, "#{host}:#{port}", database, table, username, password, schema)
    end
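The fallback order used by `pg_table_connection` can be sketched with plain hashes. Assumptions: `pg_settings` and `presence` are hypothetical helpers, `opts` and `env` stand in for `DB.opts` and `ENV`, and `system_login` stands in for `Etc.getlogin`.

```ruby
# Imitates ActiveSupport's Object#presence for nil and strings.
def presence(value)
  value.nil? || value.to_s.strip.empty? ? nil : value
end

# Resolves connection settings in the same order as pg_table_connection
# (hypothetical helper operating on plain hashes).
def pg_settings(opts, env, system_login)
  login = presence(env["USER"]) || system_login
  {
    host: env["PGHOST"] || presence(opts[:host]) || "localhost",
    port: opts[:port] || 5432,
    database: presence(opts[:database]) || login,
    username: presence(opts[:user]) || login,
  }
end

defaults = pg_settings({}, { "USER" => "deploy" }, "root")
explicit = pg_settings(
  { host: "db", user: "app", database: "main" },
  { "PGHOST" => "pg.internal" },
  "root",
)

p defaults
p explicit
```

Two details stand out in this order: `PGHOST` takes priority over the configured host, and it is used without a blank check, so an empty `PGHOST` would be passed through as the host.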
#populate_temp_table!(temp_table_name, dataset, schema: "public") ⇒ Object
    # File 'lib/umbrellio_utils/click_house/backends/base.rb', line 126

    def populate_temp_table!(temp_table_name, dataset, schema: "public")
      execute(<<~SQL.squish)
        INSERT INTO TABLE FUNCTION #{DB.literal(pg_table_connection(temp_table_name, schema:))}
        #{dataset.sql}
      SQL
    end
#truncate_table!(table_name, db_name: self.db_name) ⇒ Object
    # File 'lib/umbrellio_utils/click_house/backends/base.rb', line 87

    def truncate_table!(table_name, db_name: self.db_name)
      execute("TRUNCATE TABLE #{db_name}.#{table_name} #{on_cluster(sync: true)}")
    end
#with_temp_table(dataset, temp_table_name:, primary_key: [:id], primary_key_types: [:integer]) ⇒ Object
    # File 'lib/umbrellio_utils/click_house/backends/base.rb', line 133

    def with_temp_table(
      dataset, temp_table_name:, primary_key: [:id], primary_key_types: [:integer], **, &
    )
      unless DB.table_exists?(temp_table_name)
        UmbrellioUtils::Database.create_temp_table(
          nil, primary_key:, primary_key_types:, temp_table_name:, &
        )
        populate_temp_table!(temp_table_name, dataset)
      end
      UmbrellioUtils::Database.with_temp_table(nil, primary_key:, temp_table_name:, **, &)
    end