Class: Tapsoob::Pull
Instance Attribute Summary
Attributes inherited from Operation
#database_url, #dump_path, #opts
Class Method Summary
collapse
Instance Method Summary
collapse
Methods inherited from Operation
#apply_table_filter, #catch_errors, #completed_tables, #db, #default_chunksize, #exclude_tables, #exiting?, #format_number, #indexes_first?, #initialize, #log, #resuming?, #schema_only?, #setup_signal_trap, #skip_schema?, #store_session, #stream_state, #stream_state=, #table_filter
Class Method Details
.factory(db, state) ⇒ Object
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
|
# File 'lib/tapsoob/operation.rb', line 277
def self.factory(db, state)
if defined?(Sequel::MySQL) && Sequel::MySQL.respond_to?(:convert_invalid_date_time=)
Sequel::MySQL.convert_invalid_date_time = :nil
end
if state.has_key?(:klass)
return eval(state[:klass]).new(db, state)
end
if Tapsoob::Utils.single_integer_primary_key(db, state[:table_name].to_sym)
DataStreamKeyed.new(db, state)
else
DataStream.new(db, state)
end
end
|
Instance Method Details
#fetch_tables_info ⇒ Object
267
268
269
270
271
272
273
274
275
|
# File 'lib/tapsoob/operation.rb', line 267
def fetch_tables_info
tables = db.tables
data = {}
apply_table_filter(tables).each do |table_name|
data[table_name] = db[table_name].count
end
data
end
|
#file_prefix ⇒ Object
158
159
160
|
# File 'lib/tapsoob/operation.rb', line 158
def file_prefix
"pull"
end
|
#pull_data ⇒ Object
194
195
196
197
198
199
200
201
202
203
204
205
206
207
|
# File 'lib/tapsoob/operation.rb', line 194
def pull_data
log.info "Receiving data"
log.info "#{tables.size} tables, #{format_number(record_count)} records"
tables.each do |table_name, count|
progress = (opts[:progress] ? ProgressBar.new(table_name.to_s, count) : nil)
stream = Tapsoob::DataStream.factory(db, {
:chunksize => default_chunksize,
:table_name => table_name
}, { :debug => opts[:debug] })
pull_data_from_table(stream, progress)
end
end
|
#pull_data_from_table(stream, progress) ⇒ Object
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
|
# File 'lib/tapsoob/operation.rb', line 221
def pull_data_from_table(stream, progress)
loop do
begin
exit 0 if exiting?
size = stream.fetch_database do |rows|
if dump_path.nil?
puts JSON.generate(rows)
else
Tapsoob::Utils.export_rows(dump_path, stream.table_name, rows)
end
end
stream.error = false
self.stream_state = stream.to_hash
rescue Tapsoob::CorruptedData => e
log.info "Corrupted Data Received #{e.message}, retrying..."
stream.error = true
next
end
progress.inc(size) if progress && !exiting?
break if stream.complete?
end
progress.finish if progress
completed_tables << stream.table_name.to_s
self.stream_state = {}
end
|
#pull_indexes ⇒ Object
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
|
# File 'lib/tapsoob/operation.rb', line 293
def pull_indexes
log.info "Receiving indexes"
raw_idxs = Tapsoob::Schema.indexes_individual(database_url)
idxs = (raw_idxs && raw_idxs.length >= 2 ? JSON.parse(raw_idxs) : {})
apply_table_filter(idxs).each do |table, indexes|
next unless indexes.size > 0
progress = ProgressBar.new(table, indexes.size)
indexes.each do |idx|
output = Tapsoob::Utils.export_indexes(dump_path, table, idx)
puts output if dump_path.nil? && output
progress.inc(1)
end
progress.finish
end
end
|
#pull_partial_data ⇒ Object
209
210
211
212
213
214
215
216
217
218
219
|
# File 'lib/tapsoob/operation.rb', line 209
def pull_partial_data
return if stream_state == {}
table_name = stream_state[:table_name]
record_count = tables[table_name.to_s]
log.info "Resuming #{table_name}, #{format_number(record_count)} records"
progress = (opts[:progress] ? ProgressBar.new(table_name.to_s, record_count) : nil)
stream = Tapsoob::DataStream.factory(db, stream_state)
pull_data_from_table(stream, progress)
end
|
#pull_reset_sequences ⇒ Object
311
312
313
314
315
316
|
# File 'lib/tapsoob/operation.rb', line 311
def pull_reset_sequences
log.info "Resetting sequences"
output = Tapsoob::Utils.schema_bin(:reset_db_sequences, database_url)
puts output if dump_path.nil? && output
end
|
#pull_schema ⇒ Object
180
181
182
183
184
185
186
187
188
189
190
191
192
|
# File 'lib/tapsoob/operation.rb', line 180
def pull_schema
log.info "Receiving schema"
progress = ProgressBar.new('Schema', tables.size)
tables.each do |table_name, count|
schema_data = Tapsoob::Schema.dump_table(database_url, table_name)
log.debug "Table: #{table_name}\n#{schema_data}\n"
output = Tapsoob::Utils.export_schema(dump_path, table_name, schema_data)
puts output if dump_path.nil? && output
progress.inc(1)
end
progress.finish
end
|
#record_count ⇒ Object
259
260
261
|
# File 'lib/tapsoob/operation.rb', line 259
def record_count
tables_info.values.inject(:+)
end
|
#run ⇒ Object
166
167
168
169
170
171
172
173
174
175
176
177
178
|
# File 'lib/tapsoob/operation.rb', line 166
def run
catch_errors do
unless resuming?
pull_schema if !skip_schema?
pull_indexes if indexes_first? && !skip_schema?
end
setup_signal_trap
pull_partial_data if resuming?
pull_data unless schema_only?
pull_indexes if !indexes_first? && !skip_schema?
pull_reset_sequences
end
end
|
#tables ⇒ Object
250
251
252
253
254
255
256
257
|
# File 'lib/tapsoob/operation.rb', line 250
def tables
h = {}
tables_info.each do |table_name, count|
next if completed_tables.include?(table_name.to_s)
h[table_name.to_s] = count
end
h
end
|
#tables_info ⇒ Object
263
264
265
|
# File 'lib/tapsoob/operation.rb', line 263
def tables_info
opts[:tables_info] ||= fetch_tables_info
end
|
#to_hash ⇒ Object
162
163
164
|
# File 'lib/tapsoob/operation.rb', line 162
def to_hash
super.merge(:remote_tables_info => remote_tables_info)
end
|