44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
|
# File 'lib/tapsoob/cli/data_stream.rb', line 44
# Push data into the database identified by +database_url+.
#
# When +dump_path+ names an existing directory, the work is delegated to a
# push operation built by Tapsoob::Operation::Base.factory. Otherwise STDIN
# is consumed line by line: every line is parsed as one JSON document whose
# :table_name key selects the target table, and the parsed document is handed
# to a data stream for import. A failure while loading one table is reported
# on STDERR and the next line is processed.
#
# @param database_url connection URL, passed to the push operation or to Sequel.connect
# @param dump_path optional directory to read from; STDIN is used when it is
#   nil or does not name an existing directory
def push(database_url, dump_path = nil)
  opts = parse_opts(options)

  if dump_path && Dir.exist?(dump_path)
    op = Tapsoob::Operation::Base.factory(:push, database_url, dump_path, opts)
    op.push_data
  else
    if opts[:parallel] && opts[:parallel] > 1
      STDERR.puts "Warning: Parallel mode not supported when reading from STDIN"
    end

    STDIN.each_line do |line|
      table = JSON.parse(line, symbolize_names: true)
      table_name = table[:table_name]

      # A fresh connection is opened for every input line and always released
      # in the ensure clause below.
      conn = Sequel.connect(database_url)
      begin
        if conn.uri =~ /mysql/i
          # Raise the MySQL session timeouts before importing.
          conn.run("SET SESSION wait_timeout=28800")
          conn.run("SET SESSION interactive_timeout=28800")
          conn.run("SET SESSION net_read_timeout=3600")
          conn.run("SET SESSION net_write_timeout=3600")
        end

        conn[table_name.to_sym].truncate if opts[:purge]

        stream = Tapsoob::DataStream::Base.factory(
          conn,
          {
            table_name: table_name,
            chunksize: opts[:default_chunksize]
          },
          {
            :"discard-identity" => opts[:"discard-identity"] || false,
            :purge => opts[:purge] || false,
            :debug => opts[:debug]
          }
        )
        stream.import_rows(table)
      rescue StandardError => e
        # Only ordinary errors are treated as "this table failed, keep going".
        # Interrupt (Ctrl-C), SystemExit and similar are deliberately NOT
        # rescued so the user can abort the import.
        Tapsoob.log.debug e.message
        STDERR.puts "Error loading data in #{table_name} : #{e.message}"
      ensure
        conn.disconnect
      end
    end
  end
end
|