Class: DuckLake::Client
- Inherits: Object (ancestry: Object → DuckLake::Client)
- Defined in:
- lib/ducklake/client.rb
Class Method Summary
- .finalize(dir) ⇒ Object
Instance Method Summary
- #add_data_files(table, data, allow_missing: nil, ignore_extra_columns: nil) ⇒ Object
- #attach(alias_, url) ⇒ Object
- #checkpoint ⇒ Object
- #cleanup_old_files(cleanup_all: false, older_than: nil, dry_run: false) ⇒ Object
- #column_info(table) ⇒ Object
- #current_snapshot ⇒ Object
- #delete_orphaned_files(cleanup_all: false, older_than: nil, dry_run: false) ⇒ Object
- #detach(alias_) ⇒ Object
- #disable_external_access(allowed_directories: [], allowed_paths: []) ⇒ Object
- #disconnect ⇒ Object
- #drop_table(table, if_exists: nil) ⇒ Object — TODO: more DDL methods?
- #duckdb_version ⇒ Object — experimental.
- #expire_snapshots(versions: nil, older_than: nil, dry_run: false) ⇒ Object
- #extension_version ⇒ Object — experimental.
- #flush_inlined_data(table_name: nil) ⇒ Object
- #format_version ⇒ Object
- #initialize(catalog_url:, storage_url:, storage_options: {}, snapshot_version: nil, snapshot_time: nil, data_inlining_row_limit: 0, create_if_not_exists: false, migrate_if_required: true, read_only: false, override_storage_url: false, encrypted: false) ⇒ Client — constructor; returns a new instance of Client.
- #inspect ⇒ Object — hides internal state.
- #last_committed_snapshot ⇒ Object
- #list_files(table, snapshot_version: nil, snapshot_time: nil) ⇒ Object
- #merge_adjacent_files ⇒ Object
- #options ⇒ Object
- #polars(table, snapshot_version: nil, snapshot_time: nil) ⇒ Object — experimental.
- #quote(value) ⇒ Object — quotes a Ruby value as a SQL literal, since libduckdb does not provide such a function. TODO: support more types.
- #quote_identifier(value) ⇒ Object — quotes a SQL identifier, since libduckdb does not provide such a function; see https://duckdb.org/docs/stable/sql/dialect/keywords_and_identifiers.html.
- #rewrite_data_files(table = nil, delete_threshold: nil) ⇒ Object
- #set_option(name, value, table_name: nil) ⇒ Object
- #snapshots ⇒ Object
- #sql(sql, params = []) ⇒ Object
- #table_changes(table, start_snapshot, end_snapshot) ⇒ Object — experimental. TODO: use keyword arguments or a range?
- #table_info ⇒ Object
- #transaction(commit_message: nil, commit_author: nil) ⇒ Object
Constructor Details
#initialize(catalog_url:, storage_url:, storage_options: {}, snapshot_version: nil, snapshot_time: nil, data_inlining_row_limit: 0, create_if_not_exists: false, migrate_if_required: true, read_only: false, override_storage_url: false, encrypted: false) ⇒ Client
Returns a new instance of Client.
3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 |
# NOTE(review): this source listing lost several identifiers during text
# extraction, so it is not valid Ruby as shown. Visible gaps include:
# - the receiver of each `.dup`;
# - the left-hand sides of `= nil`, `= { type: "s3", ... }` and `= {data_path: storage_url}`;
# - the hash variable in the `[:key_id] = ...`-style assignments;
# - the argument of `create_secret()` and its `if` condition;
# - the method called with `(@catalog, "ducklake:#{attach}", )`.
# Consult lib/ducklake/client.rb for the exact code before relying on it.
#
# Builds a client around a new DuckDB database with the DuckLake catalog attached.
# - Parses catalog_url and storage_url with URI.parse.
# - Maps the catalog scheme (postgres/postgresql, mysql/mariadb, sqlite,
#   duckdb) to the extension to install and an attach string. Any other
#   scheme raises ArgumentError.
# - For "s3" storage, moves :aws_access_key_id, :aws_secret_access_key and
#   :region out of a copy of the storage options into an s3 secret definition
#   that uses the credential_chain provider. Any storage option left over
#   (for any scheme) raises ArgumentError.
# - Collects attach settings: data_path, plus read_only, encrypted,
#   snapshot_version / snapshot_time, data_inlining_row_limit,
#   create_if_not_exists (only sent when false), migrate_if_required (only
#   sent when false) and override_data_path, as requested.
# - When read_only is set, creates a database file inside a fresh temporary
#   directory (opened once with an empty block to create it), then reopens it
#   with access_mode READ_ONLY. Per the inline comment, read-only mode cannot
#   be used on an in-memory database. The temporary directory gets a
#   finalizer from self.class.finalize. The Config argument is passed as a
#   keyword on duckdb gem >= 1.5.2 and positionally before that.
# - Otherwise opens an in-memory database.
# - Installs the ducklake extension (and the catalog extension, if any),
#   presumably attaches the catalog under the name "ducklake" (the call name
#   is lost, see above), makes it the default with USE, and detaches the
#   original "memory" database.
# File 'lib/ducklake/client.rb', line 3 def initialize( catalog_url:, storage_url:, storage_options: {}, snapshot_version: nil, snapshot_time: nil, data_inlining_row_limit: 0, create_if_not_exists: false, migrate_if_required: true, # TODO make false in 0.2.0 read_only: false, # experimental override_storage_url: false, # experimental encrypted: false # experimental ) catalog_uri = URI.parse(catalog_url) storage_uri = URI.parse(storage_url) extension = nil case catalog_uri.scheme when "postgres", "postgresql" extension = "postgres" attach = "postgres:#{catalog_uri}" when "mysql", "mariadb" extension = "mysql" attach = "mysql:#{catalog_uri}" when "sqlite" extension = "sqlite" attach = "sqlite:#{catalog_path(catalog_uri)}" when "duckdb" attach = "duckdb:#{catalog_path(catalog_uri)}" else raise ArgumentError, "Unsupported catalog type: #{catalog_uri.scheme}" end @storage_scheme = storage_uri.scheme @storage_options = .dup = nil = .dup case storage_uri.scheme when "s3" # https://duckdb.org/docs/stable/core_extensions/httpfs/s3api.html key_id = .delete(:aws_access_key_id) secret = .delete(:aws_secret_access_key) region = .delete(:region) = { type: "s3", provider: "credential_chain" } [:key_id] = key_id if key_id [:secret] = secret if secret [:region] = region if region end if .any? raise ArgumentError, "Unsupported #{storage_uri.scheme || "file"} storage options: #{.keys.map(&:inspect).join(", ")}" end = {data_path: storage_url} [:read_only] = true if read_only [:encrypted] = 1 if encrypted [:snapshot_version] = snapshot_version if !snapshot_version.nil? [:snapshot_time] = snapshot_time if !snapshot_time.nil? 
[:data_inlining_row_limit] = data_inlining_row_limit [:create_if_not_exists] = false unless create_if_not_exists [:migrate_if_required] = false unless migrate_if_required [:override_data_path] = true if override_storage_url @catalog = "ducklake" @storage_url = storage_url if read_only config = DuckDB::Config.new config["access_mode"] = "READ_ONLY" # make the entire database read-only, not just DuckLake # read-only mode can only be set when the database is opened # and cannot be used on in-memory database, so create a temporary one @tmpdir = Dir.mktmpdir ObjectSpace.define_finalizer(@tmpdir, self.class.finalize(@tmpdir.dup)) dbpath = File.join(@tmpdir, "memory.duckdb") DuckDB::Database.open(dbpath) { } @db = if Gem::Version.new(DuckDB::VERSION) >= Gem::Version.new("1.5.2") DuckDB::Database.open(dbpath, config: config) else DuckDB::Database.open(dbpath, config) end else @db = DuckDB::Database.open end @conn = @db.connect install_extension("ducklake") install_extension(extension) if extension create_secret() if (@catalog, "ducklake:#{attach}", ) execute("USE #{quote_identifier(@catalog)}") detach("memory") end |
Class Method Details
.finalize(dir) ⇒ Object
456 457 458 |
# File 'lib/ducklake/client.rb', line 456
# Builds the proc that the constructor registers as a finalizer for a
# read-only client's temporary directory. It is defined on the class, so the
# closure captures only +dir+ and the class, not a client instance.
#
# @param dir [String] directory to delete recursively when the proc runs
# @return [Proc] cleanup proc (accepts and ignores the object id that
#   ObjectSpace passes to finalizers)
def self.finalize(dir)
  cleanup = proc do
    FileUtils.remove_entry(dir)
  end
  cleanup
end
Instance Method Details
#add_data_files(table, data, allow_missing: nil, ignore_extra_columns: nil) ⇒ Object
373 374 375 376 377 378 379 380 381 382 383 384 385 386 387 388 389 |
# File 'lib/ducklake/client.rb', line 373
# Calls ducklake_add_data_files for +table+ in this catalog.
#
# @param table [String] target table name
# @param data [Object] the data file(s) to register, bound as a query parameter
# @param allow_missing [Boolean, nil] sent as allow_missing unless nil
# @param ignore_extra_columns [Boolean, nil] sent as ignore_extra_columns unless nil
# @return [nil]
def add_data_files(table, data, allow_missing: nil, ignore_extra_columns: nil)
  # Hash#compact drops only nil entries, so an explicit false is still sent.
  named = {allow_missing: allow_missing, ignore_extra_columns: ignore_extra_columns}.compact
  placeholders = %w[? ? ?] + named.keys.map { |key| "#{key} => ?" }
  bindings = [@catalog, table, data] + named.values
  execute("CALL ducklake_add_data_files(#{placeholders.join(", ")})", bindings)
  nil
end
#attach(alias_, url) ⇒ Object
132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 |
# NOTE(review): identifiers were lost from this listing during extraction:
# the variable assigned `{ type: type, read_only: true }`, and the method
# called as `(alias_, url, )` together with its third argument, are missing.
# As shown, `if extension = { ... }` reads like an assignment in a condition.
# That is presumably an artifact of the lost variable name; consult
# lib/ducklake/client.rb for the exact code.
#
# Attaches an external data source under +alias_+. Only postgres/postgresql
# URLs are accepted, and the postgres extension is installed first. Any other
# scheme raises ArgumentError. The visible options request type "postgres"
# and read_only: true.
# File 'lib/ducklake/client.rb', line 132 def attach(alias_, url) type = nil extension = nil uri = URI.parse(url) case uri.scheme when "postgres", "postgresql" type = "postgres" extension = "postgres" else raise ArgumentError, "Unsupported data source type: #{uri.scheme}" end install_extension(extension) if extension = { type: type, read_only: true } (alias_, url, ) end |
#checkpoint ⇒ Object
334 335 336 337 |
# File 'lib/ducklake/client.rb', line 334
# Issues a CHECKPOINT statement on the current connection.
#
# @return [nil]
def checkpoint
  statement = "CHECKPOINT"
  execute(statement)
  nil
end
#cleanup_old_files(cleanup_all: false, older_than: nil, dry_run: false) ⇒ Object
268 269 270 271 272 273 274 275 276 277 278 279 280 281 282 283 284 285 286 287 288 |
# File 'lib/ducklake/client.rb', line 268
# Calls ducklake_cleanup_old_files for this catalog.
#
# @param cleanup_all [Boolean] sent as cleanup_all only when truthy
# @param older_than [Object, nil] sent as older_than unless nil
# @param dry_run [Boolean] sent as dry_run only when truthy
# @return the call's result rows, passed through symbolize_keys
def cleanup_old_files(cleanup_all: false, older_than: nil, dry_run: false)
  named = []
  named << ["cleanup_all", cleanup_all] if cleanup_all
  named << ["older_than", older_than] unless older_than.nil?
  named << ["dry_run", dry_run] if dry_run

  placeholders = ["?"] + named.map { |key, _value| "#{key} => ?" }
  bindings = [@catalog] + named.map { |_key, value| value }
  symbolize_keys execute("CALL ducklake_cleanup_old_files(#{placeholders.join(", ")})", bindings)
end
#column_info(table) ⇒ Object
163 164 165 166 167 168 169 170 171 172 173 174 175 |
# File 'lib/ducklake/client.rb', line 163
# Lists the columns of +table+ in the "main" schema of this catalog, in
# ordinal order, with lower-cased type names.
#
# @param table [String] table name
# @return rows with :name and :type keys (via symbolize_keys)
# @raise [CatalogError] if no columns are found, i.e. the table does not exist
def column_info(table)
  query = <<~SQL
    SELECT column_name AS name, LOWER(data_type) AS type
    FROM information_schema.columns
    WHERE table_catalog = ? AND table_schema = ? AND table_name = ?
    ORDER BY ordinal_position
  SQL
  columns = execute(query, [@catalog, "main", table])
  raise CatalogError, "Table does not exist!" if columns.empty?

  symbolize_keys(columns)
end
#current_snapshot ⇒ Object
197 198 199 |
# File 'lib/ducklake/client.rb', line 197
# Returns the first column of the first row of ducklake_current_snapshot for
# this catalog.
def current_snapshot
  result = execute("SELECT * FROM ducklake_current_snapshot(?)", [@catalog])
  first_row = result.rows[0]
  first_row[0]
end
#delete_orphaned_files(cleanup_all: false, older_than: nil, dry_run: false) ⇒ Object
291 292 293 294 295 296 297 298 299 300 301 302 303 304 305 306 307 308 309 310 311 |
# File 'lib/ducklake/client.rb', line 291
# Calls ducklake_delete_orphaned_files for this catalog.
#
# @param cleanup_all [Boolean] sent as cleanup_all only when truthy
# @param older_than [Object, nil] sent as older_than unless nil
# @param dry_run [Boolean] sent as dry_run only when truthy
# @return the call's result rows, passed through symbolize_keys
def delete_orphaned_files(cleanup_all: false, older_than: nil, dry_run: false)
  named = []
  named << ["cleanup_all", cleanup_all] if cleanup_all
  named << ["older_than", older_than] unless older_than.nil?
  named << ["dry_run", dry_run] if dry_run

  placeholders = ["?"] + named.map { |key, _value| "#{key} => ?" }
  bindings = [@catalog] + named.map { |_key, value| value }
  symbolize_keys execute("CALL ducklake_delete_orphaned_files(#{placeholders.join(", ")})", bindings)
end
#detach(alias_) ⇒ Object
154 155 156 157 |
# File 'lib/ducklake/client.rb', line 154
# Detaches the database attached under +alias_+. The alias is quoted as an
# identifier.
#
# @param alias_ [String] attached database name
# @return [nil]
def detach(alias_)
  statement = "DETACH #{quote_identifier(alias_)}"
  execute(statement)
  nil
end
#disable_external_access(allowed_directories: [], allowed_paths: []) ⇒ Object
108 109 110 111 112 113 114 |
# File 'lib/ducklake/client.rb', line 108
# Restricts DuckDB file access to the given directories and paths, then
# disables external access. The storage URL is always added to the allowed
# directories, so the lake's own data stays reachable.
#
# @param allowed_directories [Array<String>] extra directories to allow
# @param allowed_paths [Array<String>] individual paths to allow
# @return [nil]
def disable_external_access(allowed_directories: [], allowed_paths: [])
  # `+` builds a new array, so the caller's list is not modified.
  directories = allowed_directories + [@storage_url]
  # The allow-lists are set before access is switched off (same order as before).
  [["allowed_directories", directories], ["allowed_paths", allowed_paths]].each do |setting, list|
    execute("SET #{setting} = #{quote_array(list)}")
  end
  execute("SET enable_external_access = false")
  nil
end
#disconnect ⇒ Object
445 446 447 448 449 |
# File 'lib/ducklake/client.rb', line 445
# Disconnects the connection, then closes the database handle.
#
# @return [nil]
def disconnect
  connection = @conn
  database = @db
  connection.disconnect
  database.close
  nil
end
#drop_table(table, if_exists: nil) ⇒ Object
TODO: more DDL methods?
187 188 189 190 |
# File 'lib/ducklake/client.rb', line 187
# Drops +table+, adding IF EXISTS when +if_exists+ is truthy.
# TODO more DDL methods?
#
# @param table [String] table name (quoted as an identifier)
# @param if_exists [Boolean, nil] whether to add IF EXISTS
# @return [nil]
def drop_table(table, if_exists: nil)
  command = if_exists ? "DROP TABLE IF EXISTS" : "DROP TABLE"
  execute("#{command} #{quote_identifier(table)}")
  nil
end
#duckdb_version ⇒ Object
experimental
234 235 236 |
# File 'lib/ducklake/client.rb', line 234
# Experimental: returns the string reported by DuckDB's VERSION() function.
def duckdb_version
  row = execute("SELECT VERSION() AS version").first
  row["version"]
end
#expire_snapshots(versions: nil, older_than: nil, dry_run: false) ⇒ Object
245 246 247 248 249 250 251 252 253 254 255 256 257 258 259 260 261 262 263 264 265 |
# File 'lib/ducklake/client.rb', line 245
# Calls ducklake_expire_snapshots for this catalog.
#
# @param versions [Array, nil] snapshot versions; inlined into the SQL via quote_array
# @param older_than [Object, nil] sent as older_than unless nil
# @param dry_run [Boolean] sent as dry_run only when truthy
# @return the call's result rows, passed through symbolize_keys
def expire_snapshots(versions: nil, older_than: nil, dry_run: false)
  placeholders = ["?"]
  bindings = [@catalog]

  # inline since duckdb gem does not support array params
  placeholders.push("versions => #{quote_array(versions)}") unless versions.nil?

  unless older_than.nil?
    placeholders.push("older_than => ?")
    bindings.push(older_than)
  end

  if dry_run
    placeholders.push("dry_run => ?")
    bindings.push(dry_run)
  end

  symbolize_keys execute("CALL ducklake_expire_snapshots(#{placeholders.join(", ")})", bindings)
end
#extension_version ⇒ Object
experimental
229 230 231 |
# File 'lib/ducklake/client.rb', line 229
# Experimental: returns the ducklake extension's version as listed by
# duckdb_extensions().
def extension_version
  query = "SELECT extension_version FROM duckdb_extensions() WHERE extension_name = ?"
  row = execute(query, ["ducklake"]).first
  row["extension_version"]
end
#flush_inlined_data(table_name: nil) ⇒ Object
340 341 342 343 344 345 346 347 348 349 350 351 |
# File 'lib/ducklake/client.rb', line 340
# Calls ducklake_flush_inlined_data for this catalog, optionally limited to
# one table.
#
# @param table_name [String, nil] sent as table_name unless nil
# @return the call's result rows, passed through symbolize_keys
def flush_inlined_data(table_name: nil)
  placeholders = ["?"]
  bindings = [@catalog]
  unless table_name.nil?
    placeholders.push("table_name => ?")
    bindings.push(table_name)
  end

  # TODO return nil in 0.2.0
  symbolize_keys execute("CALL ducklake_flush_inlined_data(#{placeholders.join(", ")})", bindings)
end
#format_version ⇒ Object
224 225 226 |
# File 'lib/ducklake/client.rb', line 224
# Returns the value of the "version" option from ducklake_options for this
# catalog.
def format_version
  query = "SELECT value FROM ducklake_options(?) WHERE option_name = ?"
  row = execute(query, [@catalog, "version"]).first
  row["value"]
end
#inspect ⇒ Object
Hides internal state (returns to_s).
452 453 454 |
# File 'lib/ducklake/client.rb', line 452
# Hides internal state (connection, paths, options) by returning to_s
# instead of the default instance-variable dump.
def inspect = to_s
#last_committed_snapshot ⇒ Object
201 202 203 |
# File 'lib/ducklake/client.rb', line 201
# Returns the first column of the first row of
# ducklake_last_committed_snapshot for this catalog.
def last_committed_snapshot
  result = execute("SELECT * FROM ducklake_last_committed_snapshot(?)", [@catalog])
  first_row = result.rows[0]
  first_row[0]
end
#list_files(table, snapshot_version: nil, snapshot_time: nil) ⇒ Object
354 355 356 357 358 359 360 361 362 363 364 365 366 367 368 369 370 |
# File 'lib/ducklake/client.rb', line 354
# Lists the files of +table+ via ducklake_list_files, optionally as of a
# snapshot version or time. Callers in this class read :data_file and
# :delete_file from each returned row.
#
# @param table [String] table name
# @param snapshot_version [Integer, nil] sent as snapshot_version unless nil
# @param snapshot_time [Time, Object, nil] sent as snapshot_time unless nil;
#   a Time is converted to UTC first
# @return rows passed through symbolize_keys
def list_files(table, snapshot_version: nil, snapshot_time: nil)
  args = ["?", "?"]
  params = [@catalog, table]

  unless snapshot_version.nil?
    args << "snapshot_version => ?"
    params << snapshot_version
  end

  unless snapshot_time.nil?
    # Time#utc converts its receiver in place, which would silently change the
    # caller's Time object. Time#getutc returns a converted copy instead.
    snapshot_time = snapshot_time.getutc if snapshot_time.is_a?(Time)
    args << "snapshot_time => ?"
    params << snapshot_time
  end

  symbolize_keys execute("SELECT * FROM ducklake_list_files(#{args.join(", ")})", params)
end
#merge_adjacent_files ⇒ Object
239 240 241 242 |
# File 'lib/ducklake/client.rb', line 239
# Calls merge_adjacent_files().
#
# NOTE(review): unlike the other maintenance wrappers here, this does not
# pass @catalog. It presumably relies on the catalog selected by the USE
# statement issued in the constructor — TODO confirm.
#
# @return [nil]
def merge_adjacent_files
  statement = "CALL merge_adjacent_files()"
  execute(statement)
  nil
end
#options ⇒ Object
206 207 208 |
# File 'lib/ducklake/client.rb', line 206
# Returns this catalog's rows from ducklake_options, with keys symbolized.
#
# @return rows passed through symbolize_keys
def options
  symbolize_keys execute("SELECT * FROM ducklake_options(?)", [@catalog])
end
#polars(table, snapshot_version: nil, snapshot_time: nil) ⇒ Object
experimental
392 393 394 395 396 397 398 399 400 401 402 403 404 405 406 407 408 409 410 411 412 |
# NOTE(review): the value passed as `storage_options:` to Polars.scan_parquet
# was lost from this listing during extraction (it reads `storage_options: ,`).
# Consult lib/ducklake/client.rb for the exact code.
#
# Experimental: builds a Polars.scan_parquet scan of +table+ from the rows
# returned by list_files, optionally at snapshot_version / snapshot_time.
# - Each row's :data_file becomes a Parquet source.
# - Rows that have a :delete_file are mapped, by their source index, to a
#   one-element list holding that delete file. The map is passed as
#   "iceberg-position-delete" deletion files.
# - Schema changes are not yet supported (see the TODO in the body).
# File 'lib/ducklake/client.rb', line 392 def polars(table, snapshot_version: nil, snapshot_time: nil) files = list_files(table, snapshot_version:, snapshot_time:) sources = files.map { |v| v[:data_file] } # TODO support schema changes # column_mapping = [ # "iceberg-column-mapping", # nil # ] deletion_files = [ "iceberg-position-delete", files.map.with_index.select { |v, i| v[:delete_file] }.to_h { |v, i| [i, [v[:delete_file]]] } ] Polars.scan_parquet( sources, storage_options: , # allow_missing_columns: true, # extra_columns: "ignore", # _column_mapping: column_mapping, _deletion_files: deletion_files ) end |
#quote(value) ⇒ Object
Quotes a Ruby value as a SQL literal, since libduckdb does not provide such a function. TODO: support more types.
422 423 424 425 426 427 428 429 430 431 432 433 434 435 436 437 438 439 440 441 442 443 |
# File 'lib/ducklake/client.rb', line 422
# Quotes a Ruby value as a SQL literal (libduckdb does not provide such a
# function). TODO support more types.
#
# - nil becomes NULL; true/false become bare keywords.
# - BigDecimal is rendered in plain (non-scientific) notation; other Numerics
#   use to_s.
# - Time is rendered as a UTC ISO 8601 string with nanoseconds; DateTime as
#   ISO 8601 with nanoseconds; Date as YYYY-MM-DD.
# - Everything else goes through encoded; single quotes are doubled and the
#   result is wrapped in single quotes.
#
# @param value [Object] value to quote
# @return [String] SQL literal
def quote(value)
  if value.nil?
    "NULL"
  elsif value == true
    "true"
  elsif value == false
    "false"
  elsif defined?(BigDecimal) && value.is_a?(BigDecimal)
    value.to_s("F")
  elsif value.is_a?(Numeric)
    value.to_s
  else
    text =
      case value
      # getutc returns a UTC copy; Time#utc would convert the caller's object in place.
      when Time then value.getutc.iso8601(9)
      # DateTime must be checked before Date, since DateTime is a subclass of Date.
      when DateTime then value.iso8601(9)
      when Date then value.strftime("%Y-%m-%d")
      else value
      end
    "'#{encoded(text).gsub("'", "''")}'"
  end
end
#quote_identifier(value) ⇒ Object
Quotes a SQL identifier, since libduckdb does not provide such a function. See https://duckdb.org/docs/stable/sql/dialect/keywords_and_identifiers.html.
416 417 418 |
# File 'lib/ducklake/client.rb', line 416
# Quotes +value+ as a SQL identifier: wraps it in double quotes and doubles
# any embedded double quotes (libduckdb does not provide such a function).
# https://duckdb.org/docs/stable/sql/dialect/keywords_and_identifiers.html
#
# @param value [Object] identifier, passed through encoded first
# @return [String] quoted identifier
def quote_identifier(value)
  escaped = encoded(value).gsub('"', '""')
  %("#{escaped}")
end
#rewrite_data_files(table = nil, delete_threshold: nil) ⇒ Object
314 315 316 317 318 319 320 321 322 323 324 325 326 327 328 329 330 |
# File 'lib/ducklake/client.rb', line 314
# Calls ducklake_rewrite_data_files for this catalog, optionally for a
# single table.
#
# @param table [String, nil] table name, passed positionally unless nil
# @param delete_threshold [Numeric, nil] sent as delete_threshold unless nil
# @return [nil]
def rewrite_data_files(table = nil, delete_threshold: nil)
  placeholders = ["?"]
  bindings = [@catalog]

  unless table.nil?
    placeholders.push("?")
    bindings.push(table)
  end

  unless delete_threshold.nil?
    placeholders.push("delete_threshold => ?")
    bindings.push(delete_threshold)
  end

  execute("CALL ducklake_rewrite_data_files(#{placeholders.join(", ")})", bindings)
  nil
end
#set_option(name, value, table_name: nil) ⇒ Object
211 212 213 214 215 216 217 218 219 220 221 222 |
# File 'lib/ducklake/client.rb', line 211
# Sets a DuckLake option via ducklake_set_option, catalog-wide or for one
# table.
#
# @param name [String] option name
# @param value [Object] option value
# @param table_name [String, nil] sent as table_name unless nil
# @return [nil]
def set_option(name, value, table_name: nil)
  placeholders = %w[? ? ?]
  bindings = [@catalog, name, value]
  unless table_name.nil?
    placeholders.push("table_name => ?")
    bindings.push(table_name)
  end
  execute("CALL ducklake_set_option(#{placeholders.join(", ")})", bindings)
  nil
end
#snapshots ⇒ Object
193 194 195 |
# File 'lib/ducklake/client.rb', line 193
# Returns this catalog's rows from ducklake_snapshots, with keys symbolized.
def snapshots
  rows = execute("SELECT * FROM ducklake_snapshots(?)", [@catalog])
  symbolize_keys(rows)
end
#sql(sql, params = []) ⇒ Object
116 117 118 |
# File 'lib/ducklake/client.rb', line 116
# Runs arbitrary SQL with optional bound parameters and returns execute's
# result unchanged.
def sql(sql, params = []) = execute(sql, params)
#table_changes(table, start_snapshot, end_snapshot) ⇒ Object
Experimental. TODO: use keyword arguments or a range?
179 180 181 182 183 184 |
# File 'lib/ducklake/client.rb', line 179
# Experimental: returns the changes to +table+ (schema "main") between two
# snapshots, using ducklake_table_changes.
# TODO use keyword arguments or range?
#
# @param table [String] table name
# @param start_snapshot [Integer] starting snapshot id
# @param end_snapshot [Integer] ending snapshot id
# @return change rows passed through symbolize_keys
def table_changes(table, start_snapshot, end_snapshot)
  rows = execute(
    "SELECT * FROM ducklake_table_changes(?, ?, ?, ?, ?)",
    [@catalog, "main", table, start_snapshot, end_snapshot]
  )
  # only return changes between snapshots: drop rows recorded at start_snapshot itself
  changes = rows.reject { |row| row["snapshot_id"] == start_snapshot }
  symbolize_keys(changes)
end
#table_info ⇒ Object
159 160 161 |
# File 'lib/ducklake/client.rb', line 159
# Returns this catalog's rows from ducklake_table_info, with keys symbolized.
def table_info
  rows = execute("SELECT * FROM ducklake_table_info(?)", [@catalog])
  symbolize_keys(rows)
end
#transaction(commit_message: nil, commit_author: nil) ⇒ Object
120 121 122 123 124 125 126 127 128 129 130 |
# NOTE(review): identifiers were lost from this listing during extraction.
# After `yield` there is a call rendered as `(, ) if ||`; its method name,
# arguments and condition are missing. It presumably records commit_message
# and commit_author when either is given — TODO confirm against
# lib/ducklake/client.rb.
#
# Runs the given block between BEGIN and COMMIT.
# - If anything raises, ROLLBACK is executed and the exception is re-raised,
#   unless it is a Rollback, which is swallowed (presumably so callers can
#   abort a transaction deliberately).
# - On success, returns the result of the COMMIT execute, not the block's
#   value. After a swallowed Rollback it returns nil.
# File 'lib/ducklake/client.rb', line 120 def transaction(commit_message: nil, commit_author: nil) execute("BEGIN") begin yield (, ) if || execute("COMMIT") rescue => e execute("ROLLBACK") raise e unless e.is_a?(Rollback) end end |