Class: DuckLake::Client

Inherits:
Object
  • Object
show all
Defined in:
lib/ducklake/client.rb

Class Method Summary collapse

Instance Method Summary collapse

Constructor Details

#initialize(catalog_url:, storage_url:, storage_options: {}, snapshot_version: nil, snapshot_time: nil, data_inlining_row_limit: 0, create_if_not_exists: false, migrate_if_required: true, read_only: false, override_storage_url: false, encrypted: false) ⇒ Client

Returns a new instance of Client.



3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
# File 'lib/ducklake/client.rb', line 3

# Opens a DuckDB database, attaches the DuckLake catalog described by
# catalog_url/storage_url under the alias "ducklake", and makes it the
# default catalog for the connection.
#
# catalog_url - URI string; its scheme selects the catalog backend
#               (postgres/postgresql, mysql/mariadb, sqlite, duckdb)
# storage_url - URI string passed to ATTACH as data_path
# storage_options - only consulted for s3 storage, where
#               :aws_access_key_id, :aws_secret_access_key and :region are
#               accepted; any other key raises ArgumentError
# snapshot_version, snapshot_time - forwarded to ATTACH when not nil
# data_inlining_row_limit - always forwarded to ATTACH
# create_if_not_exists, migrate_if_required - only forwarded (as false)
#               when falsy
# read_only - attaches read-only and opens the whole database read-only
# override_storage_url - forwarded as override_data_path when truthy
# encrypted - forwarded as encrypted = 1 when truthy
#
# Raises ArgumentError for an unsupported catalog scheme or for unsupported
# storage options.
def initialize(
  catalog_url:,
  storage_url:,
  storage_options: {},
  snapshot_version: nil,
  snapshot_time: nil,
  data_inlining_row_limit: 0,
  create_if_not_exists: false,
  migrate_if_required: true, # TODO make false in 0.2.0
  read_only: false, # experimental
  override_storage_url: false, # experimental
  encrypted: false # experimental
)
  catalog_uri = URI.parse(catalog_url)
  storage_uri = URI.parse(storage_url)

  # map the catalog scheme to the DuckDB extension to install (if any)
  # and to the prefix used in the ATTACH target
  extension = nil
  case catalog_uri.scheme
  when "postgres", "postgresql"
    extension = "postgres"
    attach = "postgres:#{catalog_uri}"
  when "mysql", "mariadb"
    extension = "mysql"
    attach = "mysql:#{catalog_uri}"
  when "sqlite"
    extension = "sqlite"
    attach = "sqlite:#{catalog_path(catalog_uri)}"
  when "duckdb"
    # no extension is installed for a duckdb catalog
    attach = "duckdb:#{catalog_path(catalog_uri)}"
  else
    raise ArgumentError, "Unsupported catalog type: #{catalog_uri.scheme}"
  end

  # keep an untouched copy of the caller's options on the instance
  @storage_scheme = storage_uri.scheme
  @storage_options = storage_options.dup

  # work on a second copy so the deletes below do not touch the caller's hash
  secret_options = nil
  storage_options = storage_options.dup

  case storage_uri.scheme
  when "s3"
    # https://duckdb.org/docs/stable/core_extensions/httpfs/s3api.html
    key_id = storage_options.delete(:aws_access_key_id)
    secret = storage_options.delete(:aws_secret_access_key)
    region = storage_options.delete(:region)

    secret_options = {
      type: "s3",
      provider: "credential_chain"
    }
    # only the credentials that were actually supplied are added to the secret
    secret_options[:key_id] = key_id if key_id
    secret_options[:secret] = secret if secret
    secret_options[:region] = region if region
  end

  # anything still left in the working copy was not recognized above
  if storage_options.any?
    raise ArgumentError, "Unsupported #{storage_uri.scheme || "file"} storage options: #{storage_options.keys.map(&:inspect).join(", ")}"
  end

  attach_options = {data_path: storage_url}
  attach_options[:read_only] = true if read_only
  attach_options[:encrypted] = 1 if encrypted
  attach_options[:snapshot_version] = snapshot_version if !snapshot_version.nil?
  attach_options[:snapshot_time] = snapshot_time if !snapshot_time.nil?
  attach_options[:data_inlining_row_limit] = data_inlining_row_limit
  # these two are only sent when disabled
  attach_options[:create_if_not_exists] = false unless create_if_not_exists
  attach_options[:migrate_if_required] = false unless migrate_if_required
  attach_options[:override_data_path] = true if override_storage_url

  @catalog = "ducklake"
  @storage_url = storage_url

  if read_only
    config = DuckDB::Config.new
    config["access_mode"] = "READ_ONLY"

    # make the entire database read-only, not just DuckLake
    # read-only mode can only be set when the database is opened
    # and cannot be used on in-memory database, so create a temporary one
    @tmpdir = Dir.mktmpdir
    # remove the temporary directory once the @tmpdir string is garbage collected;
    # the finalizer gets a copy of the path so it does not reference @tmpdir itself
    ObjectSpace.define_finalizer(@tmpdir, self.class.finalize(@tmpdir.dup))
    dbpath = File.join(@tmpdir, "memory.duckdb")
    # NOTE(review): presumably opened once without the read-only config so the
    # database file exists before it is reopened read-only - confirm
    DuckDB::Database.open(dbpath) { }

    # NOTE(review): the version check suggests the duckdb gem takes the config
    # as a keyword argument from 1.5.2 and positionally before - confirm
    @db =
      if Gem::Version.new(DuckDB::VERSION) >= Gem::Version.new("1.5.2")
        DuckDB::Database.open(dbpath, config: config)
      else
        DuckDB::Database.open(dbpath, config)
      end
  else
    @db = DuckDB::Database.open
  end

  @conn = @db.connect

  install_extension("ducklake")
  install_extension(extension) if extension
  create_secret(secret_options) if secret_options
  attach_with_options(@catalog, "ducklake:#{attach}", attach_options)
  execute("USE #{quote_identifier(@catalog)}")
  # NOTE(review): presumably the initially opened database (in-memory, or the
  # temporary memory.duckdb file) is attached under the name "memory" - confirm
  detach("memory")
end

Class Method Details

.finalize(dir) ⇒ Object



456
457
458
# File 'lib/ducklake/client.rb', line 456

# Builds the callback that deletes +dir+ when invoked.
# The proc only closes over the path it is given; any argument passed to
# the callback is ignored.
def self.finalize(dir)
  proc do |_object_id|
    FileUtils.remove_entry(dir)
  end
end

Instance Method Details

#add_data_files(table, data, allow_missing: nil, ignore_extra_columns: nil) ⇒ Object



373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
# File 'lib/ducklake/client.rb', line 373

# Calls ducklake_add_data_files for +table+ with +data+.
# allow_missing and ignore_extra_columns are only sent when they are not
# nil (so an explicit false is still forwarded). Always returns nil.
def add_data_files(table, data, allow_missing: nil, ignore_extra_columns: nil)
  placeholders = ["?", "?", "?"]
  binds = [@catalog, table, data]

  {allow_missing: allow_missing, ignore_extra_columns: ignore_extra_columns}.each do |name, value|
    next if value.nil?

    placeholders << "#{name} => ?"
    binds << value
  end

  execute("CALL ducklake_add_data_files(#{placeholders.join(", ")})", binds)
  nil
end

#attach(alias_, url) ⇒ Object



132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
# File 'lib/ducklake/client.rb', line 132

# Attaches an external data source read-only under +alias_+.
# Only postgres:// and postgresql:// URLs are accepted; any other scheme
# raises ArgumentError. The postgres extension is installed before attaching.
def attach(alias_, url)
  scheme = URI.parse(url).scheme
  unless ["postgres", "postgresql"].include?(scheme)
    raise ArgumentError, "Unsupported data source type: #{scheme}"
  end

  install_extension("postgres")
  attach_with_options(alias_, url, {type: "postgres", read_only: true})
end

#checkpoint ⇒ Object



334
335
336
337
# File 'lib/ducklake/client.rb', line 334

# Runs a CHECKPOINT statement on the connection. Always returns nil.
def checkpoint
  statement = "CHECKPOINT"
  execute(statement)
  nil
end

#cleanup_old_files(cleanup_all: false, older_than: nil, dry_run: false) ⇒ Object



268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
# File 'lib/ducklake/client.rb', line 268

# Calls ducklake_cleanup_old_files for the attached catalog and returns the
# result with symbolized keys.
# cleanup_all and dry_run are only sent when truthy, older_than only when
# it is not nil.
def cleanup_old_files(cleanup_all: false, older_than: nil, dry_run: false)
  placeholders = ["?"]
  binds = [@catalog]

  # appends one named argument and its bound value
  add_named = lambda do |name, value|
    placeholders << "#{name} => ?"
    binds << value
  end

  add_named.call(:cleanup_all, cleanup_all) if cleanup_all
  add_named.call(:older_than, older_than) unless older_than.nil?
  add_named.call(:dry_run, dry_run) if dry_run

  symbolize_keys execute("CALL ducklake_cleanup_old_files(#{placeholders.join(", ")})", binds)
end

#column_info(table) ⇒ Object



163
164
165
166
167
168
169
170
171
172
173
174
175
# File 'lib/ducklake/client.rb', line 163

# Returns the columns of +table+ (name and lower-cased data type, in
# ordinal order) from information_schema for the "main" schema of the
# attached catalog, with symbolized keys.
# Raises CatalogError when no columns are found.
def column_info(table)
  columns = execute(<<~SQL, [@catalog, "main", table])
    SELECT column_name AS name, LOWER(data_type) AS type
    FROM information_schema.columns
    WHERE table_catalog = ? AND table_schema = ? AND table_name = ?
    ORDER BY ordinal_position
  SQL
  raise CatalogError, "Table does not exist!" if columns.empty?

  symbolize_keys columns
end

#current_snapshot ⇒ Object



197
198
199
# File 'lib/ducklake/client.rb', line 197

# Returns the first column of the first row produced by
# ducklake_current_snapshot for the attached catalog.
def current_snapshot
  result = execute("SELECT * FROM ducklake_current_snapshot(?)", [@catalog])
  first_row = result.rows[0]
  first_row[0]
end

#delete_orphaned_files(cleanup_all: false, older_than: nil, dry_run: false) ⇒ Object



291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
# File 'lib/ducklake/client.rb', line 291

# Calls ducklake_delete_orphaned_files for the attached catalog and returns
# the result with symbolized keys.
# cleanup_all and dry_run are only sent when truthy, older_than only when
# it is not nil.
def delete_orphaned_files(cleanup_all: false, older_than: nil, dry_run: false)
  # insertion order of the hash is the order of the named arguments
  named = {}
  named[:cleanup_all] = cleanup_all if cleanup_all
  named[:older_than] = older_than unless older_than.nil?
  named[:dry_run] = dry_run if dry_run

  placeholders = ["?"] + named.keys.map { |name| "#{name} => ?" }
  binds = [@catalog] + named.values

  symbolize_keys execute("CALL ducklake_delete_orphaned_files(#{placeholders.join(", ")})", binds)
end

#detach(alias_) ⇒ Object



154
155
156
157
# File 'lib/ducklake/client.rb', line 154

# Detaches the database attached as +alias_+ (quoted as an identifier).
# Always returns nil.
def detach(alias_)
  target = quote_identifier(alias_)
  execute("DETACH #{target}")
  nil
end

#disable_external_access(allowed_directories: [], allowed_paths: []) ⇒ Object



108
109
110
111
112
113
114
# File 'lib/ducklake/client.rb', line 108

# Restricts the connection to the given directories and paths, then turns
# external access off. The storage URL is always appended to the allowed
# directories; the caller's array is not modified. Always returns nil.
def disable_external_access(allowed_directories: [], allowed_paths: [])
  directories = allowed_directories + [@storage_url]

  {allowed_directories: directories, allowed_paths: allowed_paths}.each do |setting, entries|
    execute("SET #{setting} = #{quote_array(entries)}")
  end
  # must come last: the allow-lists are set while access is still enabled
  execute("SET enable_external_access = false")
  nil
end

#disconnect ⇒ Object



445
446
447
448
449
# File 'lib/ducklake/client.rb', line 445

# Disconnects the connection and closes the database. Returns nil.
# The database is closed in an ensure clause so it is released even when
# disconnecting the connection raises; that error still propagates.
def disconnect
  @conn.disconnect
  nil
ensure
  @db.close
end

#drop_table(table, if_exists: nil) ⇒ Object

TODO more DDL methods?



187
188
189
190
# File 'lib/ducklake/client.rb', line 187

# Drops +table+ (quoted as an identifier), adding IF EXISTS when
# if_exists is truthy. Always returns nil.
def drop_table(table, if_exists: nil)
  clause = if_exists ? " IF EXISTS" : ""
  execute("DROP TABLE#{clause} #{quote_identifier(table)}")
  nil
end

#duckdb_version ⇒ Object

experimental



234
235
236
# File 'lib/ducklake/client.rb', line 234

# Returns the value of SELECT VERSION() as reported by the connection.
def duckdb_version
  row = execute("SELECT VERSION() AS version").first
  row["version"]
end

#expire_snapshots(versions: nil, older_than: nil, dry_run: false) ⇒ Object



245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
# File 'lib/ducklake/client.rb', line 245

# Calls ducklake_expire_snapshots for the attached catalog and returns the
# result with symbolized keys.
# versions and older_than are only sent when not nil, dry_run only when truthy.
def expire_snapshots(versions: nil, older_than: nil, dry_run: false)
  placeholders = ["?"]
  binds = [@catalog]

  # inline since duckdb gem does not support array params
  placeholders.push("versions => #{quote_array(versions)}") unless versions.nil?

  unless older_than.nil?
    placeholders.push("older_than => ?")
    binds.push(older_than)
  end

  if dry_run
    placeholders.push("dry_run => ?")
    binds.push(dry_run)
  end

  statement = "CALL ducklake_expire_snapshots(#{placeholders.join(", ")})"
  symbolize_keys execute(statement, binds)
end

#extension_version ⇒ Object

experimental



229
230
231
# File 'lib/ducklake/client.rb', line 229

# Returns the extension_version that duckdb_extensions() reports for the
# "ducklake" extension.
def extension_version
  sql = "SELECT extension_version FROM duckdb_extensions() WHERE extension_name = ?"
  row = execute(sql, ["ducklake"]).first
  row["extension_version"]
end

#flush_inlined_data(table_name: nil) ⇒ Object



340
341
342
343
344
345
346
347
348
349
350
351
# File 'lib/ducklake/client.rb', line 340

# Calls ducklake_flush_inlined_data for the attached catalog, limited to
# table_name when it is not nil. Returns the result with symbolized keys.
def flush_inlined_data(table_name: nil)
  placeholders, binds =
    if table_name.nil?
      [["?"], [@catalog]]
    else
      [["?", "table_name => ?"], [@catalog, table_name]]
    end

  # TODO return nil in 0.2.0
  symbolize_keys execute("CALL ducklake_flush_inlined_data(#{placeholders.join(", ")})", binds)
end

#format_version ⇒ Object



224
225
226
# File 'lib/ducklake/client.rb', line 224

# Returns the value of the "version" entry from ducklake_options for the
# attached catalog.
def format_version
  sql = "SELECT value FROM ducklake_options(?) WHERE option_name = ?"
  row = execute(sql, [@catalog, "version"]).first
  row["value"]
end

#inspect ⇒ Object

hide internal state



452
453
454
# File 'lib/ducklake/client.rb', line 452

# Returns to_s instead of the default inspect output, so instance
# variables are not included.
def inspect = to_s

#last_committed_snapshot ⇒ Object



201
202
203
# File 'lib/ducklake/client.rb', line 201

# Returns the first column of the first row produced by
# ducklake_last_committed_snapshot for the attached catalog.
def last_committed_snapshot
  rows = execute("SELECT * FROM ducklake_last_committed_snapshot(?)", [@catalog]).rows
  rows[0][0]
end

#list_files(table, snapshot_version: nil, snapshot_time: nil) ⇒ Object



354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
# File 'lib/ducklake/client.rb', line 354

# Returns the rows of ducklake_list_files for +table+ with symbolized keys.
# snapshot_version and snapshot_time are only sent when not nil.
# A Time given as snapshot_time is sent as its UTC equivalent; the caller's
# object is left untouched.
def list_files(table, snapshot_version: nil, snapshot_time: nil)
  args = ["?", "?"]
  params = [@catalog, table]

  unless snapshot_version.nil?
    args << "snapshot_version => ?"
    params << snapshot_version
  end

  unless snapshot_time.nil?
    # getutc returns a new Time; Time#utc would convert the caller's object
    # in place (and raise FrozenError on a frozen local Time)
    snapshot_time = snapshot_time.getutc if snapshot_time.is_a?(Time)
    args << "snapshot_time => ?"
    params << snapshot_time
  end

  symbolize_keys execute("SELECT * FROM ducklake_list_files(#{args.join(", ")})", params)
end

#merge_adjacent_files ⇒ Object



239
240
241
242
# File 'lib/ducklake/client.rb', line 239

# Runs CALL merge_adjacent_files(). Always returns nil.
# NOTE(review): unlike the other maintenance calls, no catalog argument is
# passed; presumably the call resolves against the current catalog - confirm.
def merge_adjacent_files
  statement = "CALL merge_adjacent_files()"
  execute(statement)
  nil
end

#options ⇒ Object



206
207
208
# File 'lib/ducklake/client.rb', line 206

# Returns the rows of ducklake_options for the attached catalog with
# symbolized keys.
def options
  rows = execute("SELECT * FROM ducklake_options(?)", [@catalog])
  symbolize_keys(rows)
end

#polars(table, snapshot_version: nil, snapshot_time: nil) ⇒ Object

experimental



392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
# File 'lib/ducklake/client.rb', line 392

# Builds a Polars parquet scan over the data files that list_files reports
# for +table+ (optionally at a snapshot version or time).
# Delete files are handed to Polars keyed by the index of the data file
# they belong to.
def polars(table, snapshot_version: nil, snapshot_time: nil)
  files = list_files(table, snapshot_version: snapshot_version, snapshot_time: snapshot_time)

  # TODO support schema changes
  # column_mapping = [
  #   "iceberg-column-mapping",
  #   nil
  # ]

  # index of the data file => list with its delete file; entries without a
  # delete file are left out
  deletes_by_index = {}
  files.each_with_index do |file, index|
    delete_file = file[:delete_file]
    deletes_by_index[index] = [delete_file] if delete_file
  end

  Polars.scan_parquet(
    files.map { |file| file[:data_file] },
    storage_options: polars_storage_options,
    # allow_missing_columns: true,
    # extra_columns: "ignore",
    # _column_mapping: column_mapping,
    _deletion_files: ["iceberg-position-delete", deletes_by_index]
  )
end

#quote(value) ⇒ Object

libduckdb does not provide a quoting function. TODO: support more types



422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
# File 'lib/ducklake/client.rb', line 422

# Renders +value+ as a SQL literal.
# nil becomes NULL, booleans become true/false, numbers are written as-is
# (BigDecimal in plain "F" notation), and everything else becomes a
# single-quoted string with embedded single quotes doubled.
# Time values are written as UTC ISO 8601 with nanoseconds, DateTime as
# ISO 8601 with nanoseconds, Date as YYYY-MM-DD.
# The caller's object is never modified.
def quote(value)
  if value.nil?
    "NULL"
  elsif value == true
    "true"
  elsif value == false
    "false"
  elsif defined?(BigDecimal) && value.is_a?(BigDecimal)
    value.to_s("F")
  elsif value.is_a?(Numeric)
    value.to_s
  else
    if value.is_a?(Time)
      # getutc returns a new Time; Time#utc would convert the caller's object
      # in place (and raise FrozenError on a frozen local Time)
      value = value.getutc.iso8601(9)
    elsif value.is_a?(DateTime)
      # DateTime must be checked before Date since it is a subclass of Date
      value = value.iso8601(9)
    elsif value.is_a?(Date)
      value = value.strftime("%Y-%m-%d")
    end
    "'#{encoded(value).gsub("'", "''")}'"
  end
end

#quote_identifier(value) ⇒ Object



416
417
418
# File 'lib/ducklake/client.rb', line 416

# Wraps +value+ in double quotes for use as a SQL identifier, doubling any
# embedded double quotes.
def quote_identifier(value)
  escaped = encoded(value).gsub('"', '""')
  %("#{escaped}")
end

#rewrite_data_files(table = nil, delete_threshold: nil) ⇒ Object



314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
# File 'lib/ducklake/client.rb', line 314

# Calls ducklake_rewrite_data_files for the attached catalog, optionally
# limited to +table+ and with a delete_threshold. Both are only sent when
# not nil. Always returns nil.
def rewrite_data_files(table = nil, delete_threshold: nil)
  binds = [@catalog]
  binds << table unless table.nil?
  # one positional placeholder per value collected so far
  placeholders = binds.map { "?" }

  unless delete_threshold.nil?
    placeholders << "delete_threshold => ?"
    binds << delete_threshold
  end

  execute("CALL ducklake_rewrite_data_files(#{placeholders.join(", ")})", binds)
  nil
end

#set_option(name, value, table_name: nil) ⇒ Object



211
212
213
214
215
216
217
218
219
220
221
222
# File 'lib/ducklake/client.rb', line 211

# Calls ducklake_set_option to set option +name+ to +value+ on the attached
# catalog, scoped to table_name when it is not nil. Always returns nil.
def set_option(name, value, table_name: nil)
  placeholders = Array.new(3) { "?" }
  binds = [@catalog, name, value]

  unless table_name.nil?
    placeholders.push("table_name => ?")
    binds.push(table_name)
  end

  execute("CALL ducklake_set_option(#{placeholders.join(", ")})", binds)
  nil
end

#snapshots ⇒ Object



193
194
195
# File 'lib/ducklake/client.rb', line 193

# Returns the rows of ducklake_snapshots for the attached catalog with
# symbolized keys.
def snapshots
  rows = execute("SELECT * FROM ducklake_snapshots(?)", [@catalog])
  symbolize_keys(rows)
end

#sql(sql, params = []) ⇒ Object



116
117
118
# File 'lib/ducklake/client.rb', line 116

# Runs +sql+ with the optional bound +params+ and returns whatever
# execute returns.
def sql(sql, params = []) = execute(sql, params)

#table_changes(table, start_snapshot, end_snapshot) ⇒ Object

Experimental. TODO: use keyword arguments or a range?



179
180
181
182
183
184
# File 'lib/ducklake/client.rb', line 179

# Returns the rows of ducklake_table_changes for +table+ in the "main"
# schema between the two snapshots, with symbolized keys.
# Rows whose snapshot_id equals start_snapshot are dropped.
def table_changes(table, start_snapshot, end_snapshot)
  changes = execute(
    "SELECT * FROM ducklake_table_changes(?, ?, ?, ?, ?)",
    [@catalog, "main", table, start_snapshot, end_snapshot]
  )
  # only return changes between snapshots
  after_start = changes.select { |change| change["snapshot_id"] != start_snapshot }
  symbolize_keys after_start
end

#table_info ⇒ Object



159
160
161
# File 'lib/ducklake/client.rb', line 159

# Returns the rows of ducklake_table_info for the attached catalog with
# symbolized keys.
def table_info
  rows = execute("SELECT * FROM ducklake_table_info(?)", [@catalog])
  symbolize_keys(rows)
end

#transaction(commit_message: nil, commit_author: nil) ⇒ Object



120
121
122
123
124
125
126
127
128
129
130
# File 'lib/ducklake/client.rb', line 120

# Runs the given block between BEGIN and COMMIT.
# When commit_message or commit_author is given, the commit message is set
# after the block and before COMMIT.
# Any exception (from the block, from setting the commit message, or from
# COMMIT) triggers a ROLLBACK and is re-raised, except Rollback, which is
# swallowed so callers can abort a transaction quietly.
# Returns the result of COMMIT, or nil after a swallowed Rollback.
def transaction(commit_message: nil, commit_author: nil)
  execute("BEGIN")
  begin
    yield
    set_commit_message(commit_message, commit_author) if commit_message || commit_author
    execute("COMMIT")
  rescue Exception => e
    # Exception (not just StandardError) is rescued on purpose: an Interrupt
    # or SystemExit must not leave the connection inside an open transaction.
    # Everything except Rollback is re-raised unchanged.
    execute("ROLLBACK")
    raise unless e.is_a?(Rollback)
  end
end