Class: DWH::Adapters::Adapter Abstract

Inherits:
Object
Extended by:
Settings
Includes:
Behaviors, Capabilities, Functions, Logger
Defined in:
lib/dwh/adapters.rb

Overview

This class is abstract.

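Adapters base class. All adapters should inherit from this class and implement these core required methods:

  • #connection
  • #test_connection
  • #tables
  • #metadata
  • #stats
  • #execute
  • #execute_stream
  • #stream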

Adapter implementations can declare configuration options, their defaults, and whether each one is required, using the class-level config method. The options are validated, and a ConfigError is raised if there is an issue. Methods that are not implemented raise NotImplementedError.

Additionally, if certain settings need to be overridden, you can add a settings file in a relative directory, such as settings/my_adapter.yml. Alternatively, you can specify an exact settings file location at the class level:

class MyAdapter < DWH::Adapters::Adapter
  settings_file_path "my_dir/my_settings.yml"
end

Examples:

class MyAdapter < DWH::Adapters::Adapter
  config :username, String, required: true, message: "login id of the current user"
  config :port, Integer, required: true, default: 5432
end
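To see what the config declarations above register, the sketch below runs without the gem. MiniAdapter is a hypothetical stand-in, not DWH::Adapters::Adapter: it copies the .config and .configuration bodies documented under Class Method Details, plus a reduced initialize that only symbolizes keys (no settings merge, no validation).

```ruby
# Hypothetical stand-in for the adapter base class; .config and
# .configuration are copied from the documented source.
class MiniAdapter
  attr_reader :config

  # Per-class registry of declared options (each subclass gets its own).
  def self.configuration
    @configuration ||= {}
  end

  def self.config(name, type, options = {})
    configuration[name.to_sym] = {
      type: type,
      required: options[:required] || false,
      default: options[:default],
      message: options[:message] || "Invalid or missing parameter: #{name}",
      allowed: options[:allowed] || []
    }

    # Each declaration also gets an instance-level reader.
    define_method(name.to_sym) do
      config[name.to_sym]
    end
  end

  # Reduced initialize: symbolize keys only (no settings, no validation).
  def initialize(config)
    @config = config.transform_keys(&:to_sym)
  end
end

class MyAdapter < MiniAdapter
  config :username, String, required: true, message: "login id of the current user"
  config :port, Integer, required: true, default: 5432
end

adapter = MyAdapter.new({ "username" => "analyst", "port" => 6543 })
adapter.username                          # => "analyst"
MyAdapter.configuration[:port][:default]  # => 5432
```

Each declaration is stored in the declaring class's own configuration hash and gets a reader method. The reader returns whatever was passed in; whether valid_config? fills in a missing value from :default is not shown on this page.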

Constant Summary

Constants included from Settings

Settings::BASE_SETTINGS_FILE

Constants included from Functions::Dates

Functions::Dates::DATE_CLASSES, Functions::Dates::TIMESTAMPABLE_UNITS

Instance Attribute Summary

Attributes included from Settings

#adapter_settings

Class Method Summary

Instance Method Summary

Methods included from Settings

load_settings, settings_file, settings_file_path, using_base_settings?

Methods included from Logger

#logger, logger

Methods included from Behaviors

#apply_advanced_filtering_on_array_projections?, #cross_universe_measure_filtering_strategy, #extend_ending_date_to_last_hour_of_day?, #final_measure_filter?, #final_pass_measure_join_type, #greedy_apply_date_filters, #intermediate_measure_filter?, #temp_table_prefix, #temp_table_type

Methods included from Functions

#cast, #cross_join, #gsk, #lower_case, #quote, #string_lit, #trim, #upper_case

Methods included from Functions::Arrays

#array_exclude_list, #array_in_list, #array_unnest_join

Methods included from Functions::Nulls

#if_null, #null_if, #null_if_zero

Methods included from Functions::ExtractDatePart

#extract_day_name, #extract_day_of_month, #extract_day_of_week, #extract_day_of_year, #extract_hour, #extract_minute, #extract_month, #extract_month_name, #extract_quarter, #extract_week_of_year, #extract_year, #extract_year_month

Methods included from Functions::Dates

#adjust_week_start_day, #adjust_week_start_day?, #current_date, #current_time, #current_timestamp, #date_add, #date_data_type, #date_diff, #date_format, #date_format_sql, #date_int?, #date_lit, #date_literal, #date_time_format, #date_time_literal, #date_time_tz_format, #default_week_start_day, #timestamp_lit, #timestamp_literal, #truncate_date, #week_start_day, #week_starts_on_sunday?

Methods included from Capabilities

#supports_array_functions?, #supports_common_table_expressions?, #supports_cross_join?, #supports_full_join?, #supports_sub_queries?, #supports_table_join?, #supports_temp_tables?, #supports_window_functions?

Constructor Details

#initialize(config) ⇒ Adapter

Returns a new instance of Adapter.



# File 'lib/dwh/adapters.rb', line 89

def initialize(config)
  @config = config.transform_keys(&:to_sym)
  # Per instance customization of general settings
  # So you can have multiple connections to Trino
  # but exhibit diff behavior
  @settings = self.class.adapter_settings.merge(
    (@config[:settings] || {}).transform_keys(&:to_sym)
  )

  valid_config?
end
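The per-instance override in initialize is a plain Hash#merge in which the instance's :settings win over the class-level defaults. A minimal sketch, assuming hypothetical class-level defaults in CLASS_SETTINGS (in the gem these come from adapter_settings); effective_settings and the setting names are illustrative, not part of the library. The sketch reads :settings from the symbolized config, so a string "settings" key is honored too.

```ruby
# Hypothetical class-level defaults; in the gem these come from adapter_settings.
CLASS_SETTINGS = { quote: '"', supports_temp_tables: true }.freeze

# Mirrors the merge in #initialize: per-instance :settings override class defaults.
def effective_settings(class_settings, config)
  config = config.transform_keys(&:to_sym)
  overrides = (config[:settings] || {}).transform_keys(&:to_sym)
  class_settings.merge(overrides) # returns a new Hash; defaults are not mutated
end

# Two connections to the same kind of database, behaving differently.
analytics = effective_settings(CLASS_SETTINGS, { host: "trino-a" })
reporting = effective_settings(
  CLASS_SETTINGS,
  { "host" => "trino-b", "settings" => { "supports_temp_tables" => false } }
)

analytics[:supports_temp_tables]  # => true
reporting[:supports_temp_tables]  # => false
```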

Instance Attribute Details

#config ⇒ Hash (readonly)

Instance-level configuration.

Returns:

  • (Hash)

    the actual instance configuration



# File 'lib/dwh/adapters.rb', line 80

def config
  @config
end

#settings ⇒ Hash (readonly)

The runtime settings used by the adapter once initialized. Settings can be overridden during initialization. Settings differ from configuration: settings control behavior and syntax, while configuration determines how the adapter connects.

Returns:

  • (Hash)

    symbolized hash of settings



# File 'lib/dwh/adapters.rb', line 107

def settings
  @settings
end

Class Method Details

.config(name, type, options = {}) ⇒ Hash

Defines a configuration option the adapter needs to connect to and query the target database. Also defines an instance reader method of the same name that returns the value from config.

Parameters:

  • name (String, Symbol)

    name of the configuration

  • type (Constant)

    Ruby type (class) of the configuration value

  • options (Hash) (defaults to: {})

    options for the config

Options Hash (options):

  • :required (Boolean)

    Whether the option is required

  • :default (*)

    The default value

  • :message (String)

    The error message or info displayed

  • :allowed (Array)

    Allowed values for the option; defaults to []

Returns:

  • (Hash)


# File 'lib/dwh/adapters.rb', line 58

def self.config(name, type, options = {})
  configuration[name.to_sym] = {
    type: type,
    required: options[:required] || false,
    default: options[:default],
    message: options[:message] || "Invalid or missing parameter: #{name}",
    allowed: options[:allowed] || []
  }

  define_method(name.to_sym) do
    config[name.to_sym]
  end
end

.configuration ⇒ Hash

Gets the adapter's class-level configuration definitions, keyed by option name.

Returns:

  • (Hash)


# File 'lib/dwh/adapters.rb', line 74

def self.configuration
  @configuration ||= {}
end

Instance Method Details

#adapter_name ⇒ String

Adapter name derived from the class name: the last namespace segment, downcased.

Returns:

  • (String)


# File 'lib/dwh/adapters.rb', line 309

def adapter_name
  self.class.name.split('::').last.downcase
end

#alter_settings(changes = {}) ⇒ Hash

Allows an already instantiated adapter to change its current settings. This can be useful when behavior needs to be modified at runtime.

Returns:

  • (Hash)

    the complete settings with changes merged



# File 'lib/dwh/adapters.rb', line 113

def alter_settings(changes = {})
  reset_settings unless @original_settings.nil?
  # Keep a copy: merge! below mutates @settings in place
  @original_settings = @settings.dup
  @settings.merge!(changes)
end

#close ⇒ Object

Close the connection if it was created.



# File 'lib/dwh/adapters.rb', line 156

def close
  @connection&.close
  @connection = nil
end

#connect! ⇒ Boolean

Tests the connection and raises an exception if it fails.

Returns:

  • (Boolean)

Raises:



# File 'lib/dwh/adapters.rb', line 145

def connect!
  test_connection(raise_exception: true)
end

#connect? ⇒ Boolean

Tests whether the database can be connected to.

Returns:

  • (Boolean)



# File 'lib/dwh/adapters.rb', line 151

def connect?
  test_connection(raise_exception: false)
end

#connection ⇒ Object

Creates a connection to the target database and returns the connection object, or self.

Raises:

  • (NotImplementedError)


# File 'lib/dwh/adapters.rb', line 128

def connection
  raise NotImplementedError, "#{self.class} has not implemented method '#{__method__}'"
end

#execute(sql, format: :array, retries: 0) ⇒ Array<Array>, ...

Execute sql on the target database.

Parameters:

  • sql (String)

    actual sql

  • format (Symbol, String) (defaults to: :array)

    return format type

    • :array returns an array of arrays

    • :object returns an array of hashes

    • :csv returns the results as CSV

    • :native returns the native result from the underlying client

      • For example, Postgres via the pg client returns a PG::Result

      • HTTP clients return the HTTP response object

  • retries (Integer) (defaults to: 0)

    number of retries in case of failure

Returns:

  • (Array<Array>, Array<Hash>, CSV, Native)

Raises:

  • (NotImplementedError)



# File 'lib/dwh/adapters.rb', line 174

def execute(sql, format: :array, retries: 0)
  raise NotImplementedError, "#{self.class} has not implemented method '#{__method__}'"
end
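A concrete execute typically fetches column names and rows from its client and then shapes them according to format. A minimal sketch of that last step, covering only :array and :object; format_result is a hypothetical helper, and the :csv and :native branches are left out because they depend on the client.

```ruby
# Hypothetical helper: shape raw client output (column names + row arrays)
# into the :array or :object formats described for #execute.
# Accepts the format as a Symbol or a String.
def format_result(columns, rows, format)
  case format.to_sym
  when :array  then rows
  when :object then rows.map { |row| columns.zip(row).to_h }
  else raise ArgumentError, "unsupported format: #{format}"
  end
end

columns = %w[id name]
rows = [[1, "alpha"], [2, "beta"]]
format_result(columns, rows, :object)
# => [{ "id" => 1, "name" => "alpha" }, { "id" => 2, "name" => "beta" }]
```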

#execute_stream(sql, io, stats: nil, retries: 0) ⇒ IO

Executes sql and streams the results back. Data is written out in CSV format to the provided IO object.

Parameters:

  • sql (String)

    actual sql

  • io (IO)

    IO object to write records to

  • stats (StreamingStats) (defaults to: nil)

    optional; collects stats and preview data

  • retries (Integer) (defaults to: 0)

    number of retries in case of failure

Returns:

  • (IO)

Raises:

  • (NotImplementedError)



# File 'lib/dwh/adapters.rb', line 187

def execute_stream(sql, io, stats: nil, retries: 0)
  raise NotImplementedError, "#{self.class} has not implemented method '#{__method__}'"
end
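A minimal sketch of the execute_stream contract (each row written to the IO as a CSV line, the IO returned), assuming rows arrive from some client as arrays. stream_rows_as_csv is hypothetical, and stats is a plain Hash standing in for StreamingStats, whose API is not shown on this page.

```ruby
require "csv"
require "stringio"

# Hypothetical sketch: write each row to the IO as one CSV line as it
# arrives, optionally count rows, and return the IO.
# `stats` is a plain Hash here, not the gem's StreamingStats.
def stream_rows_as_csv(rows, io, stats: nil)
  rows.each do |row|
    io.write(CSV.generate_line(row))
    stats[:rows] = stats.fetch(:rows, 0) + 1 if stats
  end
  io
end

out = StringIO.new
counts = {}
returned = stream_rows_as_csv([%w[id name], [1, "alpha"], [2, "beta, inc"]], out, stats: counts)
out.string  # => "id,name\n1,alpha\n2,\"beta, inc\"\n"
```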

#extra_connection_params ⇒ Hash

Returns any extra connection params passed in the config object.

Returns:

  • (Hash)

    empty hash by default



# File 'lib/dwh/adapters.rb', line 317

def extra_connection_params
  config[:extra_connection_params] || {}
end

#extra_query_params ⇒ Hash

If the adapter supports it, extra query params from the config are passed on to the executor.

Returns:

  • (Hash)

    empty hash by default



# File 'lib/dwh/adapters.rb', line 325

def extra_query_params
  config[:extra_query_params] || {}
end
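A sketch of how an implementation might fold extra_connection_params into the options it hands to its client, assuming a client that takes a flat options hash. connection_options and the :host, :port, and :sslmode keys are illustrative assumptions.

```ruby
# Hypothetical sketch: build client options from core config keys, then
# layer extra_connection_params on top. Key names are illustrative only.
def connection_options(config)
  base = { host: config[:host], port: config[:port] }
  base.merge(config[:extra_connection_params] || {})
end

config = {
  host: "db.internal",
  port: 5432,
  extra_connection_params: { sslmode: "require" }
}
opts = connection_options(config)
opts[:sslmode]  # => "require"
```

Because the extra params are merged last, they can also override the base options.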

#metadata(table, **qualifiers) ⇒ DWH::Table

Gets the schema structure of a given table. Optional catalog and schema qualifiers can be passed in.

Examples:

metadata("public.big_table")
metadata("big_table")
metadata("big_table", schema: "public")

Parameters:

  • table (String)

    table name

  • qualifiers (Hash)

    a customizable set of options

Options Hash (**qualifiers):

  • :catalog (String)

    optional catalog or equivalent namespace; ignored if the adapter doesn’t support it

  • :schema (String)

    optional schema to scope to; ignored if the adapter doesn’t support it

Returns:

  • (DWH::Table)

Raises:

  • (NotImplementedError)


# File 'lib/dwh/adapters.rb', line 260

def metadata(table, **qualifiers)
  raise NotImplementedError, "#{self.class} has not implemented method '#{__method__}'"
end
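The examples accept both a dotted name ("public.big_table") and explicit qualifiers. A hypothetical helper an implementation could use to normalize the two forms, assuming dots only ever separate catalog, schema, and table (quoted identifiers that contain dots are not handled):

```ruby
# Hypothetical helper: split "catalog.schema.table", "schema.table" or
# "table" into a bare table name plus qualifiers. Explicit qualifiers
# win over parts parsed from the name.
def split_table_name(name, **qualifiers)
  parts = name.split(".")
  table = parts.pop
  schema = parts.pop
  catalog = parts.pop
  parsed = { catalog: catalog, schema: schema }.compact
  [table, parsed.merge(qualifiers)]
end

split_table_name("public.big_table")             # => ["big_table", { schema: "public" }]
split_table_name("big_table", schema: "public")  # => ["big_table", { schema: "public" }]
split_table_name("hive.sales.orders")            # => ["orders", { catalog: "hive", schema: "sales" }]
```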

#reset_settings ⇒ Hash

Restores settings to their state prior to calling alter_settings.

Returns:

  • (Hash)

    the original settings (nil if alter_settings was never called)



# File 'lib/dwh/adapters.rb', line 122

def reset_settings
  @settings = @original_settings if @original_settings
end
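A runnable sketch of the alter/reset cycle. SettingsHolder is a hypothetical stand-in that reproduces alter_settings and reset_settings, storing a dup of the settings before merge!; without the copy, @original_settings and @settings would be the same Hash, merge! would change both, and reset_settings would have nothing to restore. The temp_table_type key is used only for illustration.

```ruby
# Hypothetical stand-in reproducing alter_settings / reset_settings,
# copying the settings before they are mutated with merge!.
class SettingsHolder
  attr_reader :settings

  def initialize(settings)
    @settings = settings
  end

  def alter_settings(changes = {})
    reset_settings unless @original_settings.nil?
    @original_settings = @settings.dup # separate object, unaffected by merge!
    @settings.merge!(changes)
  end

  def reset_settings
    @settings = @original_settings if @original_settings
  end
end

holder = SettingsHolder.new({ temp_table_type: "cte" })
holder.alter_settings(temp_table_type: "temp")
holder.settings[:temp_table_type]  # => "temp"
holder.reset_settings
holder.settings[:temp_table_type]  # => "cte"
```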

#stats(table, date_column: nil, **qualifiers) ⇒ DWH::Table

Returns basic stats for a given table, typically including row_count, date_start, and date_end.

Examples:

stats("public.big_table", date_column: "fact_date")
stats("big_table")
stats("big_table", schema: "public")

Parameters:

  • table (String)

    table name

  • date_column (String) (defaults to: nil)

    optional date column used to find the date range

  • qualifiers (Hash)

    a customizable set of options

Options Hash (**qualifiers):

  • :catalog (String)

    optional catalog or equivalent namespace; ignored if the adapter doesn’t support it

  • :schema (String)

    optional schema to scope to; ignored if the adapter doesn’t support it

Returns:

  • (DWH::Table)

Raises:

  • (NotImplementedError)



# File 'lib/dwh/adapters.rb', line 218

def stats(table, date_column: nil, **qualifiers)
  raise NotImplementedError, "#{self.class} has not implemented method '#{__method__}'"
end
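A sketch of the kind of query a stats implementation might issue, assuming row_count comes from COUNT(*) and date_start/date_end from MIN/MAX over date_column. stats_sql is hypothetical, and it interpolates identifiers unquoted for brevity; a real adapter would quote them.

```ruby
# Hypothetical sketch of a stats query: COUNT(*) for row_count and, when a
# date column is given, MIN/MAX for date_start/date_end.
# Identifiers are interpolated unquoted for brevity only.
def stats_sql(table, date_column: nil)
  columns = ["COUNT(*) AS row_count"]
  if date_column
    columns << "MIN(#{date_column}) AS date_start"
    columns << "MAX(#{date_column}) AS date_end"
  end
  "SELECT #{columns.join(', ')} FROM #{table}"
end

stats_sql("big_table")
# => "SELECT COUNT(*) AS row_count FROM big_table"
stats_sql("public.big_table", date_column: "fact_date")
# => "SELECT COUNT(*) AS row_count, MIN(fact_date) AS date_start, MAX(fact_date) AS date_end FROM public.big_table"
```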

#stream(sql) {|chunk| ... } ⇒ Object

Executes the given sql and yields the streamed results to the given block.

Parameters:

  • sql (String)

    actual sql

Yields:

  • (chunk)

    Yields each chunk as it streams in. The chunk type may vary depending on the target database and settings.

Raises:

  • (NotImplementedError)



# File 'lib/dwh/adapters.rb', line 198

def stream(sql, &block)
  raise NotImplementedError, "#{self.class} has not implemented method '#{__method__}'"
end
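A minimal sketch of the stream contract, assuming the source can be read in fixed-size chunks. Here an in-memory array stands in for the database and each chunk is an array of rows; stream_in_chunks is hypothetical.

```ruby
# Hypothetical sketch: yield each chunk as soon as it is read.
# An in-memory array stands in for the database; a chunk is an array
# of up to chunk_size rows.
def stream_in_chunks(rows, chunk_size: 2)
  raise ArgumentError, "a block is required" unless block_given?

  rows.each_slice(chunk_size) { |chunk| yield chunk }
  nil
end

received = []
stream_in_chunks([[1], [2], [3]]) { |chunk| received << chunk }
received  # => [[[1], [2]], [[3]]]
```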

#table?(table, **qualifiers) ⇒ Boolean

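Checks whether a table exists in the remote database. The check is an exact string match against the names returned by #tables.

Parameters:

  • table (String)

    table name

  • qualifiers (Hash)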

    a customizable set of options

Options Hash (**qualifiers):

  • :catalog (String)

    optional catalog or equivalent namespace; ignored if the adapter doesn’t support it

  • :schema (String)

    optional schema to scope to; ignored if the adapter doesn’t support it

Returns:

  • (Boolean)



# File 'lib/dwh/adapters.rb', line 242

def table?(table, **qualifiers)
  tables(**qualifiers).include?(table)
end
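Since table? delegates to tables, a stub tables is enough to see its behavior. FakeAdapter is a hypothetical stand-in with a made-up table list; table? is copied from above.

```ruby
# Hypothetical stand-in: tables returns a fixed list; table? is the
# documented one-liner, which passes qualifiers through to tables.
class FakeAdapter
  def tables(**_qualifiers)
    %w[big_table fact_sales]
  end

  def table?(table, **qualifiers)
    tables(**qualifiers).include?(table)
  end
end

fake = FakeAdapter.new
fake.table?("big_table")                   # => true
fake.table?("big_table", schema: "public") # => true
fake.table?("BIG_TABLE")                   # => false
```

The comparison is exact, so case differences or a schema prefix ("public.big_table") will not match unless tables returns names in that form.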

#tables(**qualifiers) ⇒ Array<String>

Gets all tables available in the target database. The default catalog and schema from the config are used unless they are specified here.

Parameters:

  • qualifiers (Hash)

    a customizable set of options

Options Hash (**qualifiers):

  • :catalog (String)

    optional catalog or equivalent namespace; ignored if the adapter doesn’t support it

  • :schema (String)

    optional schema to scope to; ignored if the adapter doesn’t support it

Returns:

  • (Array<String>)

Raises:

  • (NotImplementedError)


# File 'lib/dwh/adapters.rb', line 231

def tables(**qualifiers)
  raise NotImplementedError, "#{self.class} has not implemented method '#{__method__}'"
end

#test_connection(raise_exception: false) ⇒ Boolean

Tests the connection to the target database. Returns true if successful; on failure, raises an exception when raise_exception is true, and otherwise returns false.

Returns:

  • (Boolean)

Raises:

  • (NotImplementedError)



# File 'lib/dwh/adapters.rb', line 137

def test_connection(raise_exception: false)
  raise NotImplementedError, "#{self.class} has not implemented method '#{__method__}'"
end
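A hypothetical subclass sketch showing how test_connection can honor raise_exception so that connect! and connect? behave as documented. ConnectionError, SketchAdapter, and ping_database are invented for the example; a real adapter would run a cheap query such as SELECT 1.

```ruby
# Hypothetical error class and adapter, invented for this sketch.
class ConnectionError < StandardError; end

class SketchAdapter
  def initialize(reachable)
    @reachable = reachable
  end

  # Returns true on success; on failure raises or returns false.
  def test_connection(raise_exception: false)
    ping_database
    true
  rescue StandardError => e
    raise ConnectionError, e.message if raise_exception

    false
  end

  # Same bodies as the documented connect! / connect?.
  def connect!
    test_connection(raise_exception: true)
  end

  def connect?
    test_connection(raise_exception: false)
  end

  private

  # Stands in for a cheap round trip such as "SELECT 1".
  def ping_database
    raise IOError, "connection refused" unless @reachable
  end
end

SketchAdapter.new(true).connect?   # => true
SketchAdapter.new(false).connect?  # => false
```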

#token_expired? ⇒ Boolean

For adapters that use access tokens via JWT or OAuth, returns whether the token has expired. Also returns true if no expiry time has been recorded.

Returns:

  • (Boolean)



# File 'lib/dwh/adapters.rb', line 332

def token_expired?
  @token_expires_at.nil? || Time.now >= @token_expires_at
end
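A sketch of how an adapter might populate @token_expires_at from an OAuth-style expires_in value, assuming a safety margin so the token is treated as expired slightly early. TokenHolder, store_token, and EXPIRY_MARGIN are hypothetical; token_expired? takes an optional now: argument here only to make the example testable.

```ruby
# Hypothetical sketch: record an expiry time from expires_in (seconds),
# minus a safety margin, then check it the same way token_expired? does.
class TokenHolder
  EXPIRY_MARGIN = 60 # seconds; an assumed value

  def store_token(expires_in, now: Time.now)
    @token_expires_at = now + expires_in - EXPIRY_MARGIN
  end

  def token_expired?(now: Time.now)
    @token_expires_at.nil? || now >= @token_expires_at
  end
end

holder = TokenHolder.new
holder.token_expired?                      # => true (nothing stored yet)
issued = Time.at(0)
holder.store_token(3600, now: issued)
holder.token_expired?(now: issued + 3000)  # => false
holder.token_expired?(now: issued + 3540)  # => true
```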

#with_debug(sql, &block) ⇒ Object

Wraps an SQL execution with debug logging, including the execution time.

Parameters:

  • sql (String)

    actual sql being executed

Returns:

  • (Object)

    the block's result, typically the execution results (see #execute)



# File 'lib/dwh/adapters.rb', line 293

def with_debug(sql, &block)
  starting = Process.clock_gettime(Process::CLOCK_MONOTONIC)

  logger.debug("=== SQL === \n#{sql}")

  result = block.call

  ending = Process.clock_gettime(Process::CLOCK_MONOTONIC)
  elapsed = ending - starting
  logger.debug("=== FINISHED SQL (#{elapsed.round(1)} secs) ===")

  result
end
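To check what with_debug logs, the sketch below copies its body into a hypothetical DebugHost that writes to a Logger backed by a StringIO.

```ruby
require "logger"
require "stringio"

# Hypothetical host class; with_debug is copied from the documented source.
class DebugHost
  attr_reader :logger

  def initialize(io)
    @logger = Logger.new(io, level: Logger::DEBUG)
  end

  def with_debug(sql, &block)
    starting = Process.clock_gettime(Process::CLOCK_MONOTONIC)
    logger.debug("=== SQL === \n#{sql}")
    result = block.call
    elapsed = Process.clock_gettime(Process::CLOCK_MONOTONIC) - starting
    logger.debug("=== FINISHED SQL (#{elapsed.round(1)} secs) ===")
    result
  end
end

log = StringIO.new
host = DebugHost.new(log)
rows = host.with_debug("SELECT 1") { [[1]] }
rows  # => [[1]]
```

The body has no ensure, so if the block raises, the SQL line is logged but the FINISHED line is not.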

#with_retry(max_attempts = 2, &block) ⇒ Object

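Calls the block, retrying on StandardError until max_attempts total attempts (including the first call) have been made; the last error is then re-raised. If max_attempts is 0, the block is called once without any retry handling.

Parameters:

  • max_attempts (Integer) (defaults to: 2)

    maximum number of attempts, including the first call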



# File 'lib/dwh/adapters.rb', line 269

def with_retry(max_attempts = 2, &block)
  return block.call if max_attempts.zero?

  attempts = 0

  begin
    attempts += 1
    block.call
  rescue StandardError => e
    if attempts < max_attempts
      logger.warn "Attempt #{attempts} failed with error: #{e.message}. Retrying..."
      retry
    else
      logger.error "Failed after #{attempts} attempts with error: #{e.message}"
      raise
    end
  end
end
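The sketch below copies with_retry into a hypothetical RetryHost (with a logger writing to a StringIO) to show that max_attempts counts total attempts, not extra retries: with the default of 2, a block that fails once is called twice.

```ruby
require "logger"
require "stringio"

# Hypothetical host class; with_retry is copied from the documented source.
class RetryHost
  attr_reader :logger

  def initialize
    @logger = Logger.new(StringIO.new)
  end

  def with_retry(max_attempts = 2, &block)
    return block.call if max_attempts.zero?

    attempts = 0
    begin
      attempts += 1
      block.call
    rescue StandardError => e
      if attempts < max_attempts
        logger.warn "Attempt #{attempts} failed with error: #{e.message}. Retrying..."
        retry
      else
        logger.error "Failed after #{attempts} attempts with error: #{e.message}"
        raise
      end
    end
  end
end

calls = 0
result = RetryHost.new.with_retry(2) do
  calls += 1
  raise IOError, "transient" if calls == 1
  :ok
end
result  # => :ok
calls   # => 2
```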