Class: DWH::Adapters::Adapter Abstract
- Inherits:
-
Object
- Object
- DWH::Adapters::Adapter
- Extended by:
- Settings
- Includes:
- Behaviors, Capabilities, Functions, Logger
- Defined in:
- lib/dwh/adapters.rb
Overview
Adapters base class. All adapters should inherit from this class and implement the core required methods, which raise NotImplementedError in this base class: #connection, #test_connection, #tables, #stats, #metadata, #execute, #execute_stream, and #stream.
Adapter implementations can declare each configuration option, its default, and whether it is required, using the class-level method .config. The declared options are validated when the adapter is instantiated, and a ConfigError is raised if there is an issue. Required methods that are not implemented raise NotImplementedError.
Additionally, if certain settings need to be overridden, you can add a settings file in a relative directory like so: settings/my_adapter.yml. Or you can specify an exact settings file location at the class level:
class MyAdapter < DWH::Adapters::Adapter
  settings_file_path "my_dir/my_settings.yml"
end
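The "implement the required methods or get NotImplementedError" contract from the overview can be illustrated without the gem installed. This is only a sketch: `SketchBaseAdapter` and `HalfDoneAdapter` are hypothetical stand-ins, and the base class here mirrors nothing of the real one beyond raising until a method is overridden.

```ruby
# Stand-in for the abstract base class (hypothetical, not the library's code).
class SketchBaseAdapter
  REQUIRED = %i[connection test_connection tables execute].freeze

  # Every required method raises until a subclass overrides it.
  REQUIRED.each do |name|
    define_method(name) do |*_args, **_opts|
      raise NotImplementedError, "#{self.class} has not implemented method '#{name}'"
    end
  end
end

# Hypothetical adapter that implements only part of the contract.
class HalfDoneAdapter < SketchBaseAdapter
  def tables(**_qualifiers)
    %w[orders customers]
  end
end

adapter = HalfDoneAdapter.new
puts adapter.tables.inspect

begin
  adapter.execute('select 1')
rescue NotImplementedError => e
  # NotImplementedError is a ScriptError, so it must be rescued explicitly.
  puts e.message
end
```

Note that a bare `rescue` would not catch this error, because NotImplementedError does not descend from StandardError.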
Direct Known Subclasses
Athena, Databricks, Druid, DuckDb, MySql, Postgres, Snowflake, SqlServer, Sqlite, Trino
Constant Summary
Constants included from Settings
Constants included from Functions::Dates
Functions::Dates::DATE_CLASSES, Functions::Dates::TIMESTAMPABLE_UNITS
Instance Attribute Summary
-
#config ⇒ Hash
readonly
Instance level configurations.
-
#settings ⇒ Hash
readonly
These are the actual runtime settings used by the adapter once initialized.
Attributes included from Settings
Class Method Summary
-
.config(name, type, options = {}) ⇒ Hash
Define the configurations required for the adapter to connect and query target database.
-
.configuration ⇒ Hash
Get the adapter class level configuration settings.
Instance Method Summary
-
#adapter_name ⇒ String
Adapter name from the class name.
-
#alter_settings(changes = {}) ⇒ Hash
Allows an already instantiated adapter to change its current settings.
-
#close ⇒ Object
Close the connection if it was created.
-
#connect! ⇒ Boolean
Test connection and raise exception if connection fails.
-
#connect? ⇒ Boolean
Tests whether the database can be connected to.
-
#connection ⇒ Object
Creates a connection to the target database and returns the connection object or self.
-
#execute(sql, format: :array, retries: 0) ⇒ Array<Array>, ...
Execute sql on the target database.
-
#execute_stream(sql, io, stats: nil, retries: 0) ⇒ IO
Execute sql and stream responses back.
-
#extra_connection_params ⇒ Hash
If any extra connection params were passed in the config object, this will return it.
-
#extra_query_params ⇒ Hash
If the adapter supports it, will pass on extra query params from the config to the executor.
-
#initialize(config) ⇒ Adapter
constructor
A new instance of Adapter.
-
#metadata(table, **qualifiers) ⇒ DWH::Table
Get the schema structure of a given table.
-
#reset_settings ⇒ Hash
Restores settings to their original state prior to running alter_settings.
-
#stats(table, date_column: nil, **qualifiers) ⇒ DWH::Table
Returns basic stats of a given table.
-
#stream(sql) {|chunk| ... } ⇒ Object
Executes the given sql and yields the streamed results to the given block.
-
#table?(table, **qualifiers) ⇒ Boolean
Check if table exists in remote db.
-
#tables(**qualifiers) ⇒ Array<String>
Get all tables available in the target db.
-
#test_connection(raise_exception: false) ⇒ Boolean
Tests the connection to the target database. Returns true if successful; on failure, raises an exception when raise_exception is true and returns false otherwise.
-
#token_expired? ⇒ Boolean
For adapters that use access tokens via JWT or OAuth, this returns whether the token is expired.
-
#with_debug(sql, &block) ⇒ Object
Wraps an SQL execution with debug logging.
-
#with_retry(max_attempts = 2, &block) ⇒ Object
Will call the block with retries given by the max attempts param.
Methods included from Settings
load_settings, settings_file, settings_file_path, using_base_settings?
Methods included from Logger
Methods included from Behaviors
#apply_advanced_filtering_on_array_projections?, #cross_universe_measure_filtering_strategy, #extend_ending_date_to_last_hour_of_day?, #final_measure_filter?, #final_pass_measure_join_type, #greedy_apply_date_filters, #intermediate_measure_filter?, #temp_table_prefix, #temp_table_type
Methods included from Functions
#cast, #cross_join, #gsk, #lower_case, #quote, #string_lit, #trim, #upper_case
Methods included from Functions::Arrays
#array_exclude_list, #array_in_list, #array_unnest_join
Methods included from Functions::Nulls
#if_null, #null_if, #null_if_zero
Methods included from Functions::ExtractDatePart
#extract_day_name, #extract_day_of_month, #extract_day_of_week, #extract_day_of_year, #extract_hour, #extract_minute, #extract_month, #extract_month_name, #extract_quarter, #extract_week_of_year, #extract_year, #extract_year_month
Methods included from Functions::Dates
#adjust_week_start_day, #adjust_week_start_day?, #current_date, #current_time, #current_timestamp, #date_add, #date_data_type, #date_diff, #date_format, #date_format_sql, #date_int?, #date_lit, #date_literal, #date_time_format, #date_time_literal, #date_time_tz_format, #default_week_start_day, #timestamp_lit, #timestamp_literal, #truncate_date, #week_start_day, #week_starts_on_sunday?
Methods included from Capabilities
#supports_array_functions?, #supports_common_table_expressions?, #supports_cross_join?, #supports_full_join?, #supports_sub_queries?, #supports_table_join?, #supports_temp_tables?, #supports_window_functions?
Constructor Details
#initialize(config) ⇒ Adapter
Returns a new instance of Adapter.
# File 'lib/dwh/adapters.rb', line 89
def initialize(config)
  @config = config.transform_keys(&:to_sym)
  # Per instance customization of general settings
  # So you can have multiple connections to Trino
  # but exhibit diff behavior
  @settings = self.class.adapter_settings.merge(
    (config[:settings] || {}).transform_keys(&:to_sym)
  )

  valid_config?
end
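The per-instance settings override in the constructor comes down to symbolizing keys and merging over class-level defaults. The sketch below isolates that step. The helper name `merged_settings` and the setting keys and values are hypothetical, chosen only for illustration.

```ruby
# Merge per-instance overrides over class-level defaults.
# Override keys may arrive as strings (for example from YAML or JSON).
def merged_settings(defaults, config)
  overrides = (config[:settings] || {}).transform_keys(&:to_sym)
  defaults.merge(overrides) # non-destructive: defaults stay untouched
end

defaults = { temp_table_type: 'cte', quote: '"' } # hypothetical defaults
config   = { host: 'localhost', settings: { 'temp_table_type' => 'table' } }

settings = merged_settings(defaults, config)
puts settings.inspect
puts defaults.inspect
```

Using `merge` rather than `merge!` means two adapter instances built from the same class can carry different settings without affecting each other.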
Instance Attribute Details
#config ⇒ Hash (readonly)
Instance level configurations
# File 'lib/dwh/adapters.rb', line 80
def config
  @config
end
#settings ⇒ Hash (readonly)
These are the actual runtime settings used by the adapter once initialized. During initialization, settings can be overridden. Settings differ from configuration in that settings control behaviour and syntax, while configuration determines how we connect.
# File 'lib/dwh/adapters.rb', line 107
def settings
  @settings
end
Class Method Details
.config(name, type, options = {}) ⇒ Hash
Define the configurations required for the adapter to connect and query target database.
# File 'lib/dwh/adapters.rb', line 58
def self.config(name, type, options = {})
  configuration[name.to_sym] = {
    type: type,
    required: options[:required] || false,
    default: options[:default],
    message: options[:message] || "Invalid or missing parameter: #{name}",
    allowed: options[:allowed] || []
  }

  define_method(name.to_sym) do
    config[name.to_sym]
  end
end
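To see how a class-level declaration like this behaves, here is a reduced, self-contained sketch of the same pattern. `SketchAdapter` and `SketchPostgres` are hypothetical stand-ins, the option names are invented, and validation and default handling are deliberately left out because they are not shown in the listing above.

```ruby
# Reduced stand-in for the declaration pattern (not the library's class).
class SketchAdapter
  # Per-class registry; each subclass gets its own Hash.
  def self.configuration
    @configuration ||= {}
  end

  # Registers the option and defines a reader that looks it up in #config.
  def self.config(name, type, options = {})
    configuration[name.to_sym] = {
      type: type,
      required: options[:required] || false,
      default: options[:default]
    }
    define_method(name.to_sym) { config[name.to_sym] }
  end

  attr_reader :config

  def initialize(config)
    @config = config.transform_keys(&:to_sym)
  end
end

class SketchPostgres < SketchAdapter
  config :host, String, required: true
  config :port, Integer, default: 5432
end

pg = SketchPostgres.new('host' => 'db.internal')
puts pg.host
puts SketchPostgres.configuration.keys.inspect
```

Because the registry lives in a class-level instance variable, declarations made on one adapter class do not leak into its siblings or into the base class.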
.configuration ⇒ Hash
Get the adapter class level configuration settings
# File 'lib/dwh/adapters.rb', line 74
def self.configuration
  @configuration ||= {}
end
Instance Method Details
#adapter_name ⇒ String
Adapter name from the class name
# File 'lib/dwh/adapters.rb', line 309
def adapter_name
  self.class.name.split('::').last.downcase
end
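As a quick illustration of what that expression yields for a namespaced class, the sketch below uses a hypothetical `SketchDWH` namespace so it runs without the gem.

```ruby
module SketchDWH
  module Adapters
    # Hypothetical class used only to show the name derivation.
    class SqlServer
      def adapter_name
        # "SketchDWH::Adapters::SqlServer" -> "SqlServer" -> "sqlserver"
        self.class.name.split('::').last.downcase
      end
    end
  end
end

puts SketchDWH::Adapters::SqlServer.new.adapter_name # prints "sqlserver"
```

The result is lowercased without inserting underscores, so a class named SqlServer maps to "sqlserver" rather than "sql_server".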
#alter_settings(changes = {}) ⇒ Hash
Allows an already instantiated adapter to change its current settings. This can be useful when behavior needs to be modified at runtime.
# File 'lib/dwh/adapters.rb', line 113
def alter_settings(changes = {})
  reset_settings unless @original_settings.nil?
  @original_settings = @settings
  @settings.merge!(changes)
end
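The snapshot-then-restore idea behind alter_settings and reset_settings can be sketched in isolation. `SettingsHolder` and its setting keys are hypothetical. One deliberate difference from the listing above: this sketch takes the snapshot with `dup`, so that the saved Hash is a separate object from the one `merge!` mutates. That is this sketch's choice, not a statement about the library.

```ruby
# Hypothetical holder demonstrating alter/reset semantics.
class SettingsHolder
  attr_reader :settings

  def initialize(settings)
    @settings = settings
  end

  def alter_settings(changes = {})
    # Start from the original state so successive calls do not accumulate.
    reset_settings unless @original_settings.nil?
    @original_settings = @settings.dup # independent snapshot (assumption)
    @settings.merge!(changes)
  end

  def reset_settings
    @settings = @original_settings if @original_settings
  end
end

holder = SettingsHolder.new({ quote: '"', week_start: 'monday' })
holder.alter_settings(quote: '`')
puts holder.settings.inspect
holder.reset_settings
puts holder.settings.inspect
```

With an independent snapshot, a second alter_settings call first rolls back the previous change, and reset_settings returns the original values.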
#close ⇒ Object
Close the connection if it was created.
# File 'lib/dwh/adapters.rb', line 156
def close
  @connection&.close
  @connection = nil
end
#connect! ⇒ Boolean
Test connection and raise exception if connection fails.
# File 'lib/dwh/adapters.rb', line 145
def connect!
  test_connection(raise_exception: true)
end
#connect? ⇒ Boolean
Tests whether the database can be connected to.
# File 'lib/dwh/adapters.rb', line 151
def connect?
  test_connection(raise_exception: false)
end
#connection ⇒ Object
Creates a connection to the target database and returns the connection object or self
# File 'lib/dwh/adapters.rb', line 128
def connection
  raise NotImplementedError, "#{self.class} has not implemented method '#{__method__}'"
end
#execute(sql, format: :array, retries: 0) ⇒ Array<Array>, ...
Execute sql on the target database.
# File 'lib/dwh/adapters.rb', line 174
def execute(sql, format: :array, retries: 0)
  raise NotImplementedError, "#{self.class} has not implemented method '#{__method__}'"
end
#execute_stream(sql, io, stats: nil, retries: 0) ⇒ IO
Execute sql and stream responses back. Data is written out in CSV format to the provided IO object.
# File 'lib/dwh/adapters.rb', line 187
def execute_stream(sql, io, stats: nil, retries: 0)
  raise NotImplementedError, "#{self.class} has not implemented method '#{__method__}'"
end
#extra_connection_params ⇒ Hash
If any extra connection params were passed in the config object, this will return it.
# File 'lib/dwh/adapters.rb', line 317
def extra_connection_params
  config[:extra_connection_params] || {}
end
#extra_query_params ⇒ Hash
If the adapter supports it, will pass on extra query params from the config to the executor.
# File 'lib/dwh/adapters.rb', line 325
def extra_query_params
  config[:extra_query_params] || {}
end
#metadata(table, **qualifiers) ⇒ DWH::Table
Get the schema structure of a given table. Pass in optional catalog and schema info.
Example:
metadata("public.big_table")
metadata("big_table")
metadata("big_table", schema: "public")
# File 'lib/dwh/adapters.rb', line 260
def metadata(table, **qualifiers)
  raise NotImplementedError, "#{self.class} has not implemented method '#{__method__}'"
end
#reset_settings ⇒ Hash
Restores settings to their original state prior to running alter_settings.
# File 'lib/dwh/adapters.rb', line 122
def reset_settings
  @settings = @original_settings if @original_settings
end
#stats(table, date_column: nil, **qualifiers) ⇒ DWH::Table
Returns basic stats of a given table. Will typically include row_count, date_start, and date_end.
# File 'lib/dwh/adapters.rb', line 218
def stats(table, date_column: nil, **qualifiers)
  raise NotImplementedError, "#{self.class} has not implemented method '#{__method__}'"
end
#stream(sql) {|chunk| ... } ⇒ Object
Executes the given sql and yields the streamed results to the given block.
# File 'lib/dwh/adapters.rb', line 198
def stream(sql, &block)
  raise NotImplementedError, "#{self.class} has not implemented method '#{__method__}'"
end
#table?(table, **qualifiers) ⇒ Boolean
Check if table exists in remote db.
# File 'lib/dwh/adapters.rb', line 242
def table?(table, **qualifiers)
  tables(**qualifiers).include?(table)
end
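Since table? delegates to tables and then does a plain membership check, its behavior depends on what names tables returns. The sketch below shows this with a hypothetical `SketchCatalog` class and an invented in-memory table list. The `schema:` qualifier and its 'public' default are assumptions for illustration.

```ruby
# Hypothetical in-memory catalog standing in for a real adapter.
class SketchCatalog
  CATALOG = {
    'public'  => %w[orders customers],
    'staging' => %w[orders_raw]
  }.freeze

  # Returns unqualified table names for the requested schema.
  def tables(**qualifiers)
    CATALOG.fetch(qualifiers.fetch(:schema, 'public'), [])
  end

  def table?(table, **qualifiers)
    tables(**qualifiers).include?(table)
  end
end

catalog = SketchCatalog.new
puts catalog.table?('orders')                       # true
puts catalog.table?('orders_raw')                   # false
puts catalog.table?('orders_raw', schema: 'staging') # true
puts catalog.table?('public.orders')                # false
```

The last call shows that a qualified name only matches if tables returns names in that same form, because `include?` compares exact strings.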
#tables(**qualifiers) ⇒ Array<String>
Get all tables available in the target db. It uses the default catalog and schema from the config unless they are specified here.
# File 'lib/dwh/adapters.rb', line 231
def tables(**qualifiers)
  raise NotImplementedError, "#{self.class} has not implemented method '#{__method__}'"
end
#test_connection(raise_exception: false) ⇒ Boolean
Tests the connection to the target database. Returns true if successful; on failure, raises an exception when raise_exception is true and returns false otherwise.
# File 'lib/dwh/adapters.rb', line 137
def test_connection(raise_exception: false)
  raise NotImplementedError, "#{self.class} has not implemented method '#{__method__}'"
end
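The base class leaves test_connection unimplemented, so each adapter decides how to honor the raise_exception flag. One possible shape is sketched below. `SketchConnAdapter`, its `reachable` constructor argument, and the IOError are hypothetical stand-ins for a real connection attempt.

```ruby
# Hypothetical adapter showing one way to honor raise_exception.
class SketchConnAdapter
  def initialize(reachable)
    @reachable = reachable
  end

  def test_connection(raise_exception: false)
    # Stand-in for an actual connection attempt.
    raise IOError, 'cannot reach database' unless @reachable

    true
  rescue StandardError => e
    raise e if raise_exception

    false
  end

  # Same wrappers as in the base class listing.
  def connect!
    test_connection(raise_exception: true)
  end

  def connect?
    test_connection(raise_exception: false)
  end
end

puts SketchConnAdapter.new(true).connect?
puts SketchConnAdapter.new(false).connect?
```

With this shape, connect? is safe to use in a conditional, while connect! surfaces the underlying error to the caller.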
#token_expired? ⇒ Boolean
For adapters that use access tokens via JWT or OAuth, this returns whether the token is expired.
# File 'lib/dwh/adapters.rb', line 332
def token_expired?
  @token_expires_at.nil? || Time.now >= @token_expires_at
end
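The expiry check treats a missing expiry time the same as an expired token. The sketch below exercises the three cases with a hypothetical `TokenHolder` class. How a real adapter sets `@token_expires_at` is not shown in this page and is assumed here to be a Time.

```ruby
# Hypothetical holder reproducing the expiry check in isolation.
class TokenHolder
  def initialize(expires_at = nil)
    @token_expires_at = expires_at # assumed to be a Time or nil
  end

  def token_expired?
    @token_expires_at.nil? || Time.now >= @token_expires_at
  end
end

puts TokenHolder.new.token_expired?                  # no token yet
puts TokenHolder.new(Time.now + 3600).token_expired? # valid for an hour
puts TokenHolder.new(Time.now - 1).token_expired?    # already past
```

Treating nil as expired means an adapter can use a single check to decide whether to fetch a first token or refresh an old one.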
#with_debug(sql, &block) ⇒ Object
Wraps an SQL execution with debug logging. It will also include execution time.
# File 'lib/dwh/adapters.rb', line 293
def with_debug(sql, &block)
  starting = Process.clock_gettime(Process::CLOCK_MONOTONIC)

  logger.debug("=== SQL === \n#{sql}")

  result = block.call

  ending = Process.clock_gettime(Process::CLOCK_MONOTONIC)
  elapsed = ending - starting

  logger.debug("=== FINISHED SQL (#{elapsed.round(1)} secs) ===")

  result
end
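The timing part of this wrapper relies on the monotonic clock, which is not affected by system clock adjustments. A stripped-down sketch of the same pattern follows. The `timed` helper and its `log:` array (used instead of a logger so the example has no dependencies) are hypothetical.

```ruby
# Runs the block, records elapsed wall time, and returns the block's result.
def timed(label, log: [])
  starting = Process.clock_gettime(Process::CLOCK_MONOTONIC)
  result = yield
  elapsed = Process.clock_gettime(Process::CLOCK_MONOTONIC) - starting
  log << "#{label} finished in #{elapsed.round(1)} secs"
  result
end

messages = []
value = timed('select 1', log: messages) { 1 + 1 }
puts value
puts messages.first
```

Returning the block's result unchanged is what lets a wrapper like this sit around an execute call without altering what the caller receives.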
#with_retry(max_attempts = 2, &block) ⇒ Object
Calls the block, retrying up to the number of attempts given by max_attempts. If max_attempts is 0, the block is called once with no retry handling.
# File 'lib/dwh/adapters.rb', line 269
def with_retry(max_attempts = 2, &block)
  return block.call if max_attempts.zero?

  attempts = 0

  begin
    attempts += 1
    block.call
  rescue StandardError => e
    if attempts < max_attempts
      logger.warn "Attempt #{attempts} failed with error: #{e.message}. Retrying..."
      retry
    else
      logger.error "Failed after #{attempts} attempts with error: #{e.message}"
      raise
    end
  end
end
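To make the attempt counting concrete, the sketch below reproduces the retry loop as a standalone method and runs it against a block that fails twice before succeeding. `with_retry_sketch` is a hypothetical name, and `warn` stands in for the logger.

```ruby
# Standalone version of the retry loop; max_attempts counts total attempts.
def with_retry_sketch(max_attempts = 2)
  return yield if max_attempts.zero?

  attempts = 0
  begin
    attempts += 1
    yield
  rescue StandardError => e
    if attempts < max_attempts
      warn "Attempt #{attempts} failed with error: #{e.message}. Retrying..."
      retry # re-runs the begin block; attempts is kept because it lives outside
    else
      raise
    end
  end
end

calls = 0
result = with_retry_sketch(3) do
  calls += 1
  raise 'transient failure' if calls < 3

  :ok
end

puts result.inspect
puts calls
```

Note that max_attempts is the total number of tries, so a value of 1 behaves like a single call whose error is re-raised immediately.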