Class: DWH::Adapters::Databricks
- Defined in:
- lib/dwh/adapters/databricks.rb
Overview
Databricks adapter for executing SQL queries against Databricks SQL warehouses.
Supports OAuth M2M (service principal) authentication only.
Constant Summary collapse
- DEFAULT_POLL_INTERVAL = 0.25
- MAX_POLL_INTERVAL = 30
- STATEMENTS_API = '/api/2.0/sql/statements'.freeze
Constants included from Settings
Constants included from Functions::Dates
Functions::Dates::DATE_CLASSES, Functions::Dates::TIMESTAMPABLE_UNITS
Instance Attribute Summary
Attributes inherited from Adapter
Attributes included from Settings
Instance Method Summary collapse
- #connection ⇒ Object
-
#execute(sql, format: :array, retries: 0) ⇒ Array<Array>, ...
Execute sql on the target database.
- #execute_stream(sql, io, stats: nil, retries: 0) ⇒ Object
-
#initialize(config) ⇒ Databricks
constructor
A new instance of Databricks.
- #metadata(table, **qualifiers) ⇒ Object
- #stats(table, date_column: nil) ⇒ Object
-
#stream(sql) {|chunk| ... } ⇒ Object
Execute SQL query and yield streamed results.
- #tables(**qualifiers) ⇒ Object
- #test_connection(raise_exception: false) ⇒ Object
Methods inherited from Adapter
#adapter_name, #alter_settings, #close, config, configuration, #connect!, #connect?, #extra_connection_params, #extra_query_params, #reset_settings, #table?, #token_expired?, #with_debug, #with_retry
Methods included from Settings
#adapter_name, #load_settings, #settings_file, #settings_file_path, #using_base_settings?
Methods included from Logger
Methods included from Behaviors
#apply_advanced_filtering_on_array_projections?, #cross_universe_measure_filtering_strategy, #extend_ending_date_to_last_hour_of_day?, #final_measure_filter?, #final_pass_measure_join_type, #greedy_apply_date_filters, #intermediate_measure_filter?, #temp_table_prefix, #temp_table_type
Methods included from Functions
#cast, #cross_join, #gsk, #lower_case, #quote, #string_lit, #trim, #upper_case
Methods included from Functions::Arrays
#array_exclude_list, #array_in_list, #array_unnest_join
Methods included from Functions::Nulls
#if_null, #null_if, #null_if_zero
Methods included from Functions::ExtractDatePart
#extract_day_name, #extract_day_of_month, #extract_day_of_week, #extract_day_of_year, #extract_hour, #extract_minute, #extract_month, #extract_month_name, #extract_quarter, #extract_week_of_year, #extract_year, #extract_year_month
Methods included from Functions::Dates
#adjust_week_start_day, #adjust_week_start_day?, #current_date, #current_time, #current_timestamp, #date_add, #date_data_type, #date_diff, #date_format, #date_format_sql, #date_int?, #date_lit, #date_literal, #date_time_format, #date_time_literal, #date_time_tz_format, #default_week_start_day, #timestamp_lit, #timestamp_literal, #truncate_date, #week_start_day, #week_starts_on_sunday?
Methods included from Capabilities
#supports_array_functions?, #supports_common_table_expressions?, #supports_cross_join?, #supports_full_join?, #supports_sub_queries?, #supports_table_join?, #supports_temp_tables?, #supports_window_functions?
Constructor Details
#initialize(config) ⇒ Databricks
Returns a new instance of Databricks.
34 35 36 37 |
# File 'lib/dwh/adapters/databricks.rb', line 34 def initialize(config) super validate_auth_config end |
Instance Method Details
#connection ⇒ Object
39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 |
# File 'lib/dwh/adapters/databricks.rb', line 39 def connection return @connection if @connection && !token_expired? reset_connection if token_expired? @connection = Faraday.new( url: "https://#{workspace_host}", headers: { 'Content-Type' => 'application/json', 'Authorization' => "Bearer #{auth_token}", 'User-Agent' => config[:client_name] }, request: { timeout: config[:query_timeout] }.merge(extra_connection_params) ) end |
#execute(sql, format: :array, retries: 0) ⇒ Array<Array>, ...
Execute sql on the target database.
67 68 69 70 71 72 73 74 75 76 |
# File 'lib/dwh/adapters/databricks.rb', line 67 def execute(sql, format: :array, retries: 0) result = with_retry(retries + 1) do with_debug(sql) do response = submit_query(sql) fetch_data(handle_query_response(response)) end end format_result(result, format) end |
#execute_stream(sql, io, stats: nil, retries: 0) ⇒ Object
78 79 80 81 82 83 84 85 86 87 88 |
# File 'lib/dwh/adapters/databricks.rb', line 78 def execute_stream(sql, io, stats: nil, retries: 0) with_retry(retries) do with_debug(sql) do response = submit_query(sql) fetch_data(handle_query_response(response), io: io, stats: stats) end end io.rewind io end |
#metadata(table, **qualifiers) ⇒ Object
113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 |
# File 'lib/dwh/adapters/databricks.rb', line 113 def (table, **qualifiers) catalog = qualifiers[:catalog] || config[:catalog] schema = qualifiers[:schema] || config[:schema] raise ConfigError, 'catalog is required for Databricks metadata query' unless catalog db_table = Table.new(table, schema: schema, catalog: catalog) sql = <<~SQL SELECT column_name, data_type, numeric_precision, numeric_scale, character_maximum_length FROM #{catalog}.information_schema.columns WHERE table_name = '#{db_table.physical_name}' SQL sql += " AND table_schema = '#{db_table.schema}'" if db_table.schema columns = execute(sql) columns.each do |col| db_table << Column.new( name: col[0]&.downcase, data_type: col[1]&.downcase, precision: col[2], scale: col[3], max_char_length: col[4] ) end db_table end |
#stats(table, date_column: nil) ⇒ Object
143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 |
# File 'lib/dwh/adapters/databricks.rb', line 143 def stats(table, date_column: nil) date_fields = if date_column ", MIN(#{date_column}) AS date_start, MAX(#{date_column}) AS date_end" else ', NULL AS date_start, NULL AS date_end' end data = execute("SELECT COUNT(*) AS row_count#{date_fields} FROM #{table}") cols = data.first TableStats.new( row_count: cols[0], date_start: cols[1], date_end: cols[2] ) end |
#stream(sql) {|chunk| ... } ⇒ Object
Execute SQL query and yield streamed results.
93 94 95 96 97 98 |
# File 'lib/dwh/adapters/databricks.rb', line 93 def stream(sql, &block) with_debug(sql) do response = submit_query(sql) fetch_data(handle_query_response(response), proc: block) end end |
#tables(**qualifiers) ⇒ Object
100 101 102 103 104 105 106 107 108 109 110 111 |
# File 'lib/dwh/adapters/databricks.rb', line 100 def tables(**qualifiers) catalog = qualifiers[:catalog] || config[:catalog] schema = qualifiers[:schema] || config[:schema] raise ConfigError, 'catalog is required for Databricks tables query' unless catalog sql = "SELECT table_name FROM #{catalog}.information_schema.tables" sql += " WHERE table_schema = '#{schema}'" if schema result = execute(sql) result.flatten end |
#test_connection(raise_exception: false) ⇒ Object
56 57 58 59 60 61 62 63 64 |
# File 'lib/dwh/adapters/databricks.rb', line 56 def test_connection(raise_exception: false) execute('SELECT 1') true rescue StandardError => e raise ConnectionError, "Failed to connect to Databricks: #{e.}" if raise_exception logger.error "Connection test failed: #{e.}" false end |