Class: DWH::Adapters::Databricks

Inherits:
Adapter
  • Object
show all
Defined in:
lib/dwh/adapters/databricks.rb

Overview

Databricks adapter for executing SQL queries against Databricks SQL warehouses.

Supports OAuth M2M (service principal) authentication only.

Examples:

Connection with OAuth (service principal)

DWH.create(:databricks, {
  host: 'adb-1234567890123456.7.azuredatabricks.net',
  warehouse: 'abc123def456',
  oauth_client_id: 'service-principal-app-id',
  oauth_client_secret: 'your-oauth-secret-here',
  catalog: 'main',
  schema: 'default'
})

Constant Summary collapse

DEFAULT_POLL_INTERVAL =
0.25
MAX_POLL_INTERVAL =
30
STATEMENTS_API =
'/api/2.0/sql/statements'.freeze

Constants included from Settings

Settings::BASE_SETTINGS_FILE

Constants included from Functions::Dates

Functions::Dates::DATE_CLASSES, Functions::Dates::TIMESTAMPABLE_UNITS

Instance Attribute Summary

Attributes inherited from Adapter

#config, #settings

Attributes included from Settings

#adapter_settings

Instance Method Summary collapse

Methods inherited from Adapter

#adapter_name, #alter_settings, #close, config, configuration, #connect!, #connect?, #extra_connection_params, #extra_query_params, #reset_settings, #table?, #token_expired?, #with_debug, #with_retry

Methods included from Settings

#adapter_name, #load_settings, #settings_file, #settings_file_path, #using_base_settings?

Methods included from Logger

#logger, logger

Methods included from Behaviors

#apply_advanced_filtering_on_array_projections?, #cross_universe_measure_filtering_strategy, #extend_ending_date_to_last_hour_of_day?, #final_measure_filter?, #final_pass_measure_join_type, #greedy_apply_date_filters, #intermediate_measure_filter?, #temp_table_prefix, #temp_table_type

Methods included from Functions

#cast, #cross_join, #gsk, #lower_case, #quote, #string_lit, #trim, #upper_case

Methods included from Functions::Arrays

#array_exclude_list, #array_in_list, #array_unnest_join

Methods included from Functions::Nulls

#if_null, #null_if, #null_if_zero

Methods included from Functions::ExtractDatePart

#extract_day_name, #extract_day_of_month, #extract_day_of_week, #extract_day_of_year, #extract_hour, #extract_minute, #extract_month, #extract_month_name, #extract_quarter, #extract_week_of_year, #extract_year, #extract_year_month

Methods included from Functions::Dates

#adjust_week_start_day, #adjust_week_start_day?, #current_date, #current_time, #current_timestamp, #date_add, #date_data_type, #date_diff, #date_format, #date_format_sql, #date_int?, #date_lit, #date_literal, #date_time_format, #date_time_literal, #date_time_tz_format, #default_week_start_day, #timestamp_lit, #timestamp_literal, #truncate_date, #week_start_day, #week_starts_on_sunday?

Methods included from Capabilities

#supports_array_functions?, #supports_common_table_expressions?, #supports_cross_join?, #supports_full_join?, #supports_sub_queries?, #supports_table_join?, #supports_temp_tables?, #supports_window_functions?

Constructor Details

#initialize(config) ⇒ Databricks

Returns a new instance of Databricks.



34
35
36
37
# File 'lib/dwh/adapters/databricks.rb', line 34

# Builds the adapter from the given configuration hash and ensures the
# OAuth M2M (service principal) credentials this adapter requires are
# present before any connection is attempted.
#
# @param config [Hash] adapter configuration (host, warehouse,
#   oauth_client_id, oauth_client_secret, catalog, schema, ...)
# @raise [StandardError] presumably a ConfigError from
#   #validate_auth_config when required auth settings are missing --
#   TODO confirm against validate_auth_config
def initialize(config)
  super
  validate_auth_config
end

Instance Method Details

#connectionObject



39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
# File 'lib/dwh/adapters/databricks.rb', line 39

# Memoized Faraday HTTP client for the Databricks workspace.
#
# The cached client is reused while its bearer token is still valid;
# when #token_expired? reports true the stale state is reset and a new
# client is built with a fresh Authorization header.
def connection
  usable = @connection && !token_expired?
  return @connection if usable

  reset_connection if token_expired?

  request_headers = {
    'Content-Type' => 'application/json',
    'Authorization' => "Bearer #{auth_token}",
    'User-Agent' => config[:client_name]
  }
  request_options = { timeout: config[:query_timeout] }.merge(extra_connection_params)

  @connection = Faraday.new(
    url: "https://#{workspace_host}",
    headers: request_headers,
    request: request_options
  )
end

#execute(sql, format: :array, retries: 0) ⇒ Array<Array>, ...

Execute sql on the target database.

Parameters:

  • sql (String)

    actual sql

  • format (Symbol, String) (defaults to: :array)

    return format type

  • array returns an array of arrays

    • object returns array of Hashes

    • csv returns as csv

    • native returns the native result from any clients used

      • For example: Postgres using pg client will return PG::Result

      • HTTP clients will return the HTTP response object

  • retries (Integer) (defaults to: 0)

    number of retries in case of failure. Default is 0

Returns:

  • (Array<Array>, Hash, CSV, Native)

Raises:



67
68
69
70
71
72
73
74
75
76
# File 'lib/dwh/adapters/databricks.rb', line 67

# Execute sql on the target database.
#
# @param sql [String] actual sql
# @param format [Symbol, String] return format type
#   (:array, :object, :csv, or :native)
# @param retries [Integer] number of retries in case of failure (default 0)
# @return [Array<Array>, Hash, CSV, Object] result in the requested format
def execute(sql, format: :array, retries: 0)
  # with_retry takes a total attempt count, hence retries + 1.
  attempts = retries + 1
  raw = with_retry(attempts) do
    with_debug(sql) do
      fetch_data(handle_query_response(submit_query(sql)))
    end
  end
  format_result(raw, format)
end

#execute_stream(sql, io, stats: nil, retries: 0) ⇒ Object



78
79
80
81
82
83
84
85
86
87
88
# File 'lib/dwh/adapters/databricks.rb', line 78

# Execute sql and write the fetched result data into the given IO object.
#
# @param sql [String] query to run
# @param io [IO] destination the fetched data is written to
# @param stats [Object, nil] optional stats collector forwarded to
#   fetch_data -- semantics defined there; verify against fetch_data
# @param retries [Integer] number of retries in case of failure (default 0)
# @return [IO] the same io, rewound to the beginning
def execute_stream(sql, io, stats: nil, retries: 0)
  # Fix: pass retries + 1 so with_retry receives a total attempt count,
  # matching #execute. The previous bare `retries` meant the default of 0
  # requested zero attempts, inconsistent with the sibling method.
  with_retry(retries + 1) do
    with_debug(sql) do
      response = submit_query(sql)
      fetch_data(handle_query_response(response), io: io, stats: stats)
    end
  end

  io.rewind
  io
end

#metadata(table, **qualifiers) ⇒ Object

Raises:



113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
# File 'lib/dwh/adapters/databricks.rb', line 113

# Fetch column metadata for a table from the catalog's information_schema.
#
# @param table [String] table name (optionally qualified)
# @param qualifiers [Hash] :catalog and/or :schema overrides; falls back
#   to the configured defaults
# @return [Table] table populated with Column entries
# @raise [ConfigError] when no catalog is available
def metadata(table, **qualifiers)
  # Fix: the method name was missing (`def (table, ...)`), which is not
  # valid Ruby; restored to `metadata` per the documented signature.
  catalog = qualifiers[:catalog] || config[:catalog]
  schema = qualifiers[:schema] || config[:schema]

  raise ConfigError, 'catalog is required for Databricks metadata query' unless catalog

  db_table = Table.new(table, schema: schema, catalog: catalog)

  # NOTE(review): identifiers are interpolated directly into the SQL;
  # callers must not pass untrusted table/schema/catalog names.
  sql = <<~SQL
    SELECT column_name, data_type, numeric_precision, numeric_scale, character_maximum_length
    FROM #{catalog}.information_schema.columns
    WHERE table_name = '#{db_table.physical_name}'
  SQL
  sql += " AND table_schema = '#{db_table.schema}'" if db_table.schema

  columns = execute(sql)

  # Rows come back positionally: name, type, precision, scale, max length.
  columns.each do |col|
    db_table << Column.new(
      name: col[0]&.downcase,
      data_type: col[1]&.downcase,
      precision: col[2],
      scale: col[3],
      max_char_length: col[4]
    )
  end

  db_table
end

#stats(table, date_column: nil) ⇒ Object



143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
# File 'lib/dwh/adapters/databricks.rb', line 143

# Basic statistics for a table: row count plus, when a date column is
# given, its minimum and maximum values.
#
# @param table [String] table to profile
# @param date_column [String, nil] optional column used for the date range
# @return [TableStats]
def stats(table, date_column: nil)
  date_projection =
    if date_column
      ", MIN(#{date_column}) AS date_start, MAX(#{date_column}) AS date_end"
    else
      ', NULL AS date_start, NULL AS date_end'
    end

  row = execute("SELECT COUNT(*) AS row_count#{date_projection} FROM #{table}").first

  TableStats.new(
    row_count: row[0],
    date_start: row[1],
    date_end: row[2]
  )
end

#stream(sql) {|chunk| ... } ⇒ Object

Execute SQL query and yield streamed results

Parameters:

  • sql (String)

    SQL query to execute

Yields:

  • (chunk)

    yields each chunk of data as it’s processed



93
94
95
96
97
98
# File 'lib/dwh/adapters/databricks.rb', line 93

# Execute SQL and hand the incoming data chunks to the given block as
# they are processed.
#
# @param sql [String] SQL query to execute
# @yield [chunk] each chunk of data as it is fetched
def stream(sql, &block)
  with_debug(sql) do
    fetch_data(handle_query_response(submit_query(sql)), proc: block)
  end
end

#tables(**qualifiers) ⇒ Object

Raises:



100
101
102
103
104
105
106
107
108
109
110
111
# File 'lib/dwh/adapters/databricks.rb', line 100

# List table names in the given (or configured) catalog and schema via
# information_schema.
#
# @param qualifiers [Hash] :catalog and/or :schema overrides
# @return [Array<String>] flat list of table names
# @raise [ConfigError] when no catalog is available
def tables(**qualifiers)
  catalog = qualifiers[:catalog] || config[:catalog]
  schema = qualifiers[:schema] || config[:schema]

  raise ConfigError, 'catalog is required for Databricks tables query' unless catalog

  query = +"SELECT table_name FROM #{catalog}.information_schema.tables"
  query << " WHERE table_schema = '#{schema}'" if schema

  execute(query).flatten
end

#test_connection(raise_exception: false) ⇒ Object



56
57
58
59
60
61
62
63
64
# File 'lib/dwh/adapters/databricks.rb', line 56

# Probe the warehouse with a trivial query.
#
# @param raise_exception [Boolean] when true, failures raise
#   ConnectionError instead of returning false
# @return [Boolean] true when the probe query succeeds
# @raise [ConnectionError] on failure, only if raise_exception is true
def test_connection(raise_exception: false)
  execute('SELECT 1')
  true
rescue StandardError => e
  if raise_exception
    raise ConnectionError, "Failed to connect to Databricks: #{e.message}"
  end

  logger.error "Connection test failed: #{e.message}"
  false
end