Class: Bricolage::PSQLTask
Overview
We don’t support dynamodb still now
Defined Under Namespace
Classes: SQLAction
Constant Summary
collapse
- GRANT_OPTS =
%w[privilege to]
Constants included
from VacuumLock
VacuumLock::DEFAULT_VACUUM_LOCK_FILE, VacuumLock::DEFAULT_VACUUM_LOCK_TIMEOUT
Instance Attribute Summary
#ds
Instance Method Summary
collapse
-
#analyze_if(enabled, target = '${dest_table}') ⇒ Object
-
#copy_statement(src_ds, src_path, dest_table, format, jsonpath, opts) ⇒ Object
-
#create_dummy_table(target) ⇒ Object
-
#drop(target_table) ⇒ Object
-
#drop_force(target_table) ⇒ Object
-
#drop_force_if(enabled) ⇒ Object
-
#drop_if(enabled) ⇒ Object
-
#drop_obj_force(type, name) ⇒ Object
-
#drop_view_force(target_view) ⇒ Object
-
#drop_view_force_if(enabled) ⇒ Object
-
#each_statement ⇒ Object
-
#exec(stmt) ⇒ Object
-
#explain_source ⇒ Object
-
#format_option(fmt, src_ds, jsonpath) ⇒ Object
-
#format_query(query) ⇒ Object
-
#grant(privilege:, on:, to:) ⇒ Object
-
#grant_if(opts, target) ⇒ Object
-
#json_param(jsonpath) ⇒ Object
-
#load(src_ds, src_path, dest_table, format, jsonpath, opts) ⇒ Object
-
#rename_table(src, dest) ⇒ Object
-
#run ⇒ Object
-
#run_explain ⇒ Object
-
#serialize_vacuum ⇒ Object
-
#source ⇒ Object
-
#support_explain?(statement_kind) ⇒ Boolean
-
#transaction ⇒ Object
-
#truncate_if(enabled, target = '${dest_table}') ⇒ Object
-
#unload(stmt, dest_ds, dest_path, format, opts) ⇒ Object
-
#unload_format_option(format, ds) ⇒ Object
-
#unload_statement(stmt, dest_ds, dest_path, format, opts) ⇒ Object
-
#vacuum_if(enable_vacuum, enable_vacuum_sort, target = '${dest_table}') ⇒ Object
Methods included from VacuumLock
cleanup_vacuum_lock, create_lockfile_cmd, create_vacuum_lock_file, enable_vacuum_lock?, locking?, psql_serialize_vacuum_begin, psql_serialize_vacuum_end, serialize_vacuum, using, #using_vacuum_lock, vacuum_lock_parameters
#bind, #initialize
Instance Method Details
#analyze_if(enabled, target = '${dest_table}') ⇒ Object
362
363
364
|
# File 'lib/bricolage/psqldatasource.rb', line 362
def analyze_if(enabled, target = '${dest_table}')
exec SQLStatement.for_string("analyze #{target};") if enabled
end
|
#copy_statement(src_ds, src_path, dest_table, format, jsonpath, opts) ⇒ Object
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
|
# File 'lib/bricolage/psqldatasource.rb', line 396
def copy_statement(src_ds, src_path, dest_table, format, jsonpath, opts)
unless src_ds.redshift_loader_source?
raise ParameterError, "input data source does not support redshift as bulk loading source: #{src_ds.name}"
end
buf = StringIO.new
buf.puts "copy #{dest_table}"
buf.puts "from '#{src_ds.url(src_path)}'"
buf.puts "credentials '#{src_ds.credential_string}'"
buf.puts format_option(format, src_ds, jsonpath)
opts.each do |opt|
buf.puts opt.to_s
end
buf.puts ';'
buf.string
end
|
#create_dummy_table(target) ⇒ Object
298
299
300
301
302
|
# File 'lib/bricolage/psqldatasource.rb', line 298
def create_dummy_table(target)
exec SQLStatement.for_string(
"create table if not exists #{target} (x int);\n"
)
end
|
#drop(target_table) ⇒ Object
304
305
306
|
# File 'lib/bricolage/psqldatasource.rb', line 304
def drop(target_table)
exec SQLStatement.for_string("drop table #{target_table} cascade;")
end
|
#drop_force(target_table) ⇒ Object
318
319
320
|
# File 'lib/bricolage/psqldatasource.rb', line 318
def drop_force(target_table)
drop_obj_force('table', target_table)
end
|
#drop_force_if(enabled) ⇒ Object
326
327
328
|
# File 'lib/bricolage/psqldatasource.rb', line 326
def drop_force_if(enabled)
drop_force('${dest_table}') if enabled
end
|
#drop_if(enabled) ⇒ Object
308
309
310
|
# File 'lib/bricolage/psqldatasource.rb', line 308
def drop_if(enabled)
drop '${dest_table}' if enabled
end
|
#drop_obj_force(type, name) ⇒ Object
312
313
314
315
316
|
# File 'lib/bricolage/psqldatasource.rb', line 312
def drop_obj_force(type, name)
exec SQLStatement.for_string(
"drop #{type} if exists #{name} cascade;\n"
)
end
|
#drop_view_force(target_view) ⇒ Object
322
323
324
|
# File 'lib/bricolage/psqldatasource.rb', line 322
def drop_view_force(target_view)
drop_obj_force('view', target_view)
end
|
#drop_view_force_if(enabled) ⇒ Object
330
331
332
|
# File 'lib/bricolage/psqldatasource.rb', line 330
def drop_view_force_if(enabled)
drop_view_force('${dest_table}') if enabled
end
|
#each_statement ⇒ Object
238
239
240
241
242
|
# File 'lib/bricolage/psqldatasource.rb', line 238
def each_statement
each_action do |action|
yield action.statement
end
end
|
#exec(stmt) ⇒ Object
234
235
236
|
# File 'lib/bricolage/psqldatasource.rb', line 234
def exec(stmt)
add SQLAction.new(stmt)
end
|
#explain_source ⇒ Object
277
278
279
280
281
282
283
284
285
286
287
288
289
|
# File 'lib/bricolage/psqldatasource.rb', line 277
def explain_source
buf = StringIO.new
each_statement do |stmt|
buf.puts
buf.puts "-- #{stmt.location}" if stmt.location
if support_explain?(stmt.kind)
buf.puts "explain #{stmt.stripped_source}"
else
buf.puts "/* #{stmt.stripped_source} */"
end
end
buf.string
end
|
412
413
414
415
416
417
418
419
420
421
422
423
|
# File 'lib/bricolage/psqldatasource.rb', line 412
def format_option(fmt, src_ds, jsonpath)
case fmt
when 'tsv'
%q(delimiter '\t')
when 'csv'
%q(delimiter ',')
when 'json'
"json '#{json_param(jsonpath)}'"
else
raise ParameterError, "unsupported format: #{fmt}"
end
end
|
469
470
471
|
# File 'lib/bricolage/psqldatasource.rb', line 469
def format_query(query)
query.gsub(/^--.*/, '').strip.gsub(/[ \t]*\n[ \t]*/, ' ').gsub(/\\/,"\\\\\\\\").gsub("'", "\\\\'")
end
|
#grant(privilege:, on:, to:) ⇒ Object
366
367
368
|
# File 'lib/bricolage/psqldatasource.rb', line 366
def grant(privilege:, on:, to:)
exec SQLStatement.for_string("grant #{privilege} on #{on} to #{to};")
end
|
#grant_if(opts, target) ⇒ Object
372
373
374
375
376
377
378
379
380
381
382
383
384
|
# File 'lib/bricolage/psqldatasource.rb', line 372
def grant_if(opts, target)
return unless opts
return if opts.empty?
unknown_keys = opts.keys - GRANT_OPTS
raise ParameterError, "unknown grant options: #{unknown_keys.inspect}" unless unknown_keys.empty?
missing_keys = GRANT_OPTS - opts.keys
raise ParameterError, %Q(missing grant options: #{missing_keys.inspect}) unless missing_keys.empty?
args = {on: target}
opts.each do |k, v|
args[k.intern] = v
end
grant(**args)
end
|
#json_param(jsonpath) ⇒ Object
425
426
427
428
429
430
431
432
433
434
|
# File 'lib/bricolage/psqldatasource.rb', line 425
def json_param(jsonpath)
case jsonpath
when nil
'auto'
when %r{\As3://}
jsonpath
else
src_ds.url(jsonpath)
end
end
|
#load(src_ds, src_path, dest_table, format, jsonpath, opts) ⇒ Object
392
393
394
|
# File 'lib/bricolage/psqldatasource.rb', line 392
def load(src_ds, src_path, dest_table, format, jsonpath, opts)
exec SQLStatement.for_string(copy_statement(src_ds, src_path, dest_table, format, jsonpath, opts))
end
|
#rename_table(src, dest) ⇒ Object
338
339
340
|
# File 'lib/bricolage/psqldatasource.rb', line 338
def rename_table(src, dest)
exec SQLStatement.for_string("alter table #{src} rename to #{dest};")
end
|
#run ⇒ Object
267
268
269
270
271
|
# File 'lib/bricolage/psqldatasource.rb', line 267
def run
VacuumLock.using {
@ds.execute source
}
end
|
#run_explain ⇒ Object
273
274
275
|
# File 'lib/bricolage/psqldatasource.rb', line 273
def run_explain
@ds.execute explain_source
end
|
#serialize_vacuum ⇒ Object
#source ⇒ Object
255
256
257
258
259
260
261
262
263
264
|
# File 'lib/bricolage/psqldatasource.rb', line 255
def source
buf = StringIO.new
buf.puts '\timing on'
each_statement do |stmt|
buf.puts
buf.puts "/* #{stmt.location} */" if stmt.location
buf.puts stmt.stripped_source
end
buf.string
end
|
#support_explain?(statement_kind) ⇒ Boolean
291
292
293
294
295
296
|
# File 'lib/bricolage/psqldatasource.rb', line 291
def support_explain?(statement_kind)
case statement_kind
when 'select', 'insert', 'update', 'delete' then true
else false
end
end
|
#truncate_if(enabled, target = '${dest_table}') ⇒ Object
334
335
336
|
# File 'lib/bricolage/psqldatasource.rb', line 334
def truncate_if(enabled, target = '${dest_table}')
exec SQLStatement.for_string("truncate #{target};") if enabled
end
|
#unload(stmt, dest_ds, dest_path, format, opts) ⇒ Object
436
437
438
|
# File 'lib/bricolage/psqldatasource.rb', line 436
def unload(stmt, dest_ds, dest_path, format, opts)
exec unload_statement(stmt, dest_ds, dest_path, format, opts)
end
|
454
455
456
457
458
459
460
461
462
463
464
465
466
467
|
# File 'lib/bricolage/psqldatasource.rb', line 454
def unload_format_option(format, ds)
case format
when 'tsv'
%q(delimiter '\t')
when 'csv'
%q(delimiter ',')
when 'parquet'
'format as parquet'
when 'json'
'format as json'
else
raise ParameterError, "unsupported format: #{fmt}"
end
end
|
#unload_statement(stmt, dest_ds, dest_path, format, opts) ⇒ Object
440
441
442
443
444
445
446
447
448
449
450
451
452
|
# File 'lib/bricolage/psqldatasource.rb', line 440
def unload_statement(stmt, dest_ds, dest_path, format, opts)
buf = StringIO.new
buf.puts "unload ('#{format_query(stmt.stripped_raw_content)}')"
buf.puts "to '#{dest_ds.url(dest_path)}'"
buf.puts "credentials '#{dest_ds.credential_string}'"
buf.puts unload_format_option(format, dest_ds)
opts.each do |opt|
buf.puts opt.to_s
end
buf.puts ';'
res = StringResource.new(buf.string, stmt.location)
SQLStatement.new(res, stmt.declarations)
end
|
#vacuum_if(enable_vacuum, enable_vacuum_sort, target = '${dest_table}') ⇒ Object
342
343
344
345
346
347
348
349
350
351
352
|
# File 'lib/bricolage/psqldatasource.rb', line 342
def vacuum_if(enable_vacuum, enable_vacuum_sort, target = '${dest_table}')
if enable_vacuum
serialize_vacuum {
exec SQLStatement.for_string("vacuum #{target};")
}
elsif enable_vacuum_sort
serialize_vacuum {
exec SQLStatement.for_string("vacuum sort only #{target};")
}
end
end
|