Class: Altertable::Lakehouse::Client

Inherits:
Object
  • Object
show all
Defined in:
lib/altertable/lakehouse/client.rb

Constant Summary collapse

DEFAULT_BASE_URL =
"https://api.altertable.ai"
DEFAULT_TIMEOUT =
10

Instance Method Summary collapse

Constructor Details

#initialize(username: nil, password: nil, basic_auth_token: nil, base_url: nil, timeout: nil, user_agent: nil, adapter: nil) ⇒ Client

Returns a new instance of Client.



14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
# File 'lib/altertable/lakehouse/client.rb', line 14

def initialize(username: nil, password: nil, basic_auth_token: nil, base_url: nil, timeout: nil, user_agent: nil, adapter: nil)
  # 1. Try passed basic_auth_token
  # 2. Try passed username/password
  # 3. Try ENV["ALTERTABLE_BASIC_AUTH_TOKEN"]
  # 4. Try ENV["ALTERTABLE_USERNAME"] / ENV["ALTERTABLE_PASSWORD"]

  if basic_auth_token
    @auth_header = "Basic #{basic_auth_token}"
  elsif username && password
    @auth_header = "Basic #{Base64.strict_encode64("#{username}:#{password}")}"
  elsif (env_token = ENV["ALTERTABLE_BASIC_AUTH_TOKEN"])
    @auth_header = "Basic #{env_token}"
  elsif (env_user = ENV["ALTERTABLE_USERNAME"]) && (env_pass = ENV["ALTERTABLE_PASSWORD"])
    @auth_header = "Basic #{Base64.strict_encode64("#{env_user}:#{env_pass}")}"
  else
    raise ConfigurationError, "Authentication credentials required (username/password or basic_auth_token)"
  end

  @base_url = base_url || DEFAULT_BASE_URL
  @timeout = timeout || DEFAULT_TIMEOUT
  @user_agent = user_agent ? "AltertableRuby/#{VERSION} #{user_agent}" : "AltertableRuby/#{VERSION}"
  
  headers = {
    "Authorization" => @auth_header,
    "User-Agent" => @user_agent,
    "Content-Type" => "application/json"
  }

  @adapter = select_adapter(adapter, base_url: @base_url, timeout: @timeout, headers: headers)
end

Instance Method Details

#append(catalog:, schema:, table:, payload:, sync: nil) ⇒ Object

POST /append



46
47
48
49
50
51
52
# File 'lib/altertable/lakehouse/client.rb', line 46

def append(catalog:, schema:, table:, payload:, sync: nil)
  params = { catalog: catalog, schema: schema, table: table }
  params[:sync] = sync unless sync.nil?
  req = Models::AppendRequest.new(payload)
  resp = request(:post, "/append", body: req.to_h, query: params)
  Models::AppendResponse.from_h(resp)
end

#autocomplete(statement:, catalog: nil, schema: nil, session_id: nil, max_suggestions: nil) ⇒ Object

POST /autocomplete



131
132
133
134
135
136
137
138
139
140
141
# File 'lib/altertable/lakehouse/client.rb', line 131

def autocomplete(statement:, catalog: nil, schema: nil, session_id: nil, max_suggestions: nil)
  req = Models::AutocompleteRequest.new(
    statement: statement,
    catalog: catalog,
    schema: schema,
    session_id: session_id,
    max_suggestions: max_suggestions
  )
  resp = request(:post, "/autocomplete", body: req.to_h)
  Models::AutocompleteResponse.from_h(resp)
end

#cancel_query(query_id, session_id:) ⇒ Object

DELETE /query/:query_id



113
114
115
116
# File 'lib/altertable/lakehouse/client.rb', line 113

def cancel_query(query_id, session_id:)
  resp = request(:delete, "/query/#{query_id}", query: { session_id: session_id })
  Models::CancelQueryResponse.from_h(resp)
end

#explain(statement:, catalog: nil, schema: nil, session_id: nil, include_plan: nil) ⇒ Object

POST /explain



144
145
146
147
148
149
150
151
152
153
154
# File 'lib/altertable/lakehouse/client.rb', line 144

def explain(statement:, catalog: nil, schema: nil, session_id: nil, include_plan: nil)
  req = Models::ExplainRequest.new(
    statement: statement,
    catalog: catalog,
    schema: schema,
    session_id: session_id,
    include_plan: include_plan
  )
  resp = request(:post, "/explain", body: req.to_h)
  Models::ExplainResponse.from_h(resp)
end

#get_query(query_id) ⇒ Object

GET /query/:query_id



107
108
109
110
# File 'lib/altertable/lakehouse/client.rb', line 107

def get_query(query_id)
  resp = request(:get, "/query/#{query_id}")
  Models::QueryLogResponse.from_h(resp)
end

#get_task(task_id) ⇒ Object

GET /tasks/:task_id



55
56
57
58
# File 'lib/altertable/lakehouse/client.rb', line 55

def get_task(task_id)
  resp = request(:get, "/tasks/#{task_id}")
  Models::TaskResponse.from_h(resp)
end

#query(statement:, **options) ⇒ Object

POST /query (streamed)



61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
# File 'lib/altertable/lakehouse/client.rb', line 61

def query(statement:, **options)
  req_body = Models::QueryRequest.new(statement: statement, **options).to_h.to_json

  enum = Enumerator.new do |yielder|
    buffer = ""
    
    # Use adapter's stream capability
    resp = @adapter.post("/query", body: req_body) do |chunk, _|
      buffer << chunk
    end

    handle_stream_response(resp, buffer, yielder)
  end

  QueryResult.new(enum)
end

#query_all(statement:, **options) ⇒ Object

POST /query (accumulated)



79
80
81
82
83
84
85
86
87
# File 'lib/altertable/lakehouse/client.rb', line 79

def query_all(statement:, **options)
  result = query(statement: statement, **options)
  rows = result.to_a # Accumulate
  {
    metadata: result.,
    columns: result.columns,
    rows: rows
  }
end

#upload(catalog:, schema:, table:, format:, mode:, file_io:, primary_key: nil) ⇒ Object

POST /upload



90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
# File 'lib/altertable/lakehouse/client.rb', line 90

def upload(catalog:, schema:, table:, format:, mode:, file_io:, primary_key: nil)
  params = {
    catalog: catalog,
    schema: schema,
    table: table,
    format: format,
    mode: mode
  }
  params[:primary_key] = primary_key if primary_key

  body = file_io.respond_to?(:read) ? file_io.read : file_io

  resp = @adapter.post("/upload", body: body, params: params, headers: { "Content-Type" => "application/octet-stream" })
  handle_response(resp)
end

#validate(statement:, catalog: nil, schema: nil, session_id: nil) ⇒ Object

POST /validate



119
120
121
122
123
124
125
126
127
128
# File 'lib/altertable/lakehouse/client.rb', line 119

def validate(statement:, catalog: nil, schema: nil, session_id: nil)
  req = Models::ValidateRequest.new(
    statement: statement,
    catalog: catalog,
    schema: schema,
    session_id: session_id
  )
  resp = request(:post, "/validate", body: req.to_h)
  Models::ValidateResponse.from_h(resp)
end