Class: Tapsoob::Pull

Inherits:
Operation show all
Defined in:
lib/tapsoob/operation.rb

Instance Attribute Summary

Attributes inherited from Operation

#database_url, #dump_path, #opts

Class Method Summary collapse

Instance Method Summary collapse

Methods inherited from Operation

#apply_table_filter, #catch_errors, #completed_tables, #db, #default_chunksize, #exclude_tables, #exiting?, #format_number, #indexes_first?, #initialize, #log, #resuming?, #setup_signal_trap, #skip_schema?, #store_session, #stream_state, #stream_state=, #table_filter

Constructor Details

This class inherits a constructor from Tapsoob::Operation

Class Method Details

.factory(db, state) ⇒ Object



272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
# File 'lib/tapsoob/operation.rb', line 272

# Builds the stream object used to read one table.
#
# Side effect: when the Sequel MySQL adapter is loaded and exposes
# +convert_invalid_date_time=+, it is set to +:nil+ before the stream is built.
#
# @param db [Object] database handle, passed through to the stream constructor
# @param state [Hash] stream state; +:klass+ (a class name) takes precedence,
#   otherwise +:table_name+ is used to choose the stream class
# @return [Object] an instance of the class named by +state[:klass]+ when that
#   key is present; otherwise a DataStreamKeyed when the table has a single
#   integer primary key, or a DataStream when it does not
# @raise [ArgumentError] if +state[:klass]+ is not a plain constant path
def self.factory(db, state)
  if defined?(Sequel::MySQL) && Sequel::MySQL.respond_to?(:convert_invalid_date_time=)
    Sequel::MySQL.convert_invalid_date_time = :nil
  end

  if state.has_key?(:klass)
    klass_name = state[:klass].to_s
    # SECURITY: the name is evaluated as Ruby code. NOTE(review): the state
    # presumably comes from a persisted session, i.e. from outside this
    # process - confirm. Only a bare constant path (Foo or Foo::Bar) is
    # accepted so that no arbitrary expression can reach eval. eval itself is
    # kept so constants still resolve from this lexical scope as before.
    unless klass_name =~ /\A(?:::)?[A-Z]\w*(?:::[A-Z]\w*)*\z/
      raise ArgumentError, "invalid stream class name: #{klass_name.inspect}"
    end
    return eval(klass_name).new(db, state)
  end

  if Tapsoob::Utils.single_integer_primary_key(db, state[:table_name].to_sym)
    DataStreamKeyed.new(db, state)
  else
    DataStream.new(db, state)
  end
end

Instance Method Details

#fetch_tables_info ⇒ Object



262
263
264
265
266
267
268
269
270
# File 'lib/tapsoob/operation.rb', line 262

# Counts the rows of every table that passes the table filter.
#
# @return [Hash] table name (as returned by +db.tables+) => row count
def fetch_tables_info
  apply_table_filter(db.tables).each_with_object({}) do |name, counts|
    counts[name] = db[name].count
  end
end

#file_prefix ⇒ Object



154
155
156
# File 'lib/tapsoob/operation.rb', line 154

# Prefix string identifying this operation type.
# NOTE(review): presumably used when naming session/dump files - confirm
# against the callers in Operation.
#
# @return [String] always "pull"
def file_prefix
  'pull'
end

#pull_data ⇒ Object



189
190
191
192
193
194
195
196
197
198
199
200
201
202
# File 'lib/tapsoob/operation.rb', line 189

# Pulls the rows of every pending table, one table at a time.
#
# Logs a summary line, then for each entry of +tables+ builds a data stream
# through Tapsoob::DataStream.factory and hands it to +pull_data_from_table+.
# A progress bar is created per table only when +opts[:progress]+ is set.
def pull_data
  log.info "Receiving data"

  log.info "#{tables.size} tables, #{format_number(record_count)} records"

  tables.each_pair do |name, total|
    bar = nil
    bar = ProgressBar.new(name.to_s, total) if opts[:progress]

    table_stream = Tapsoob::DataStream.factory(
      db,
      { :chunksize => default_chunksize, :table_name => name },
      { :debug => opts[:debug] }
    )
    pull_data_from_table(table_stream, bar)
  end
end

#pull_data_from_table(stream, progress) ⇒ Object



216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
# File 'lib/tapsoob/operation.rb', line 216

# Drains one table stream chunk by chunk until the stream reports completion.
#
# Each fetched chunk is printed to standard output as JSON when +dump_path+
# is nil, otherwise it is handed to Tapsoob::Utils.export_rows. After every
# successful chunk the stream state is saved via +stream_state=+. A
# Tapsoob::CorruptedData error marks the stream as errored and the fetch is
# attempted again (there is no retry limit). When the stream is complete the
# table name is appended to +completed_tables+ and the saved stream state is
# reset to an empty hash.
#
# @param stream [Object] stream responding to +fetch_database+, +error=+,
#   +to_hash+, +complete?+ and +table_name+
# @param progress [Object, nil] progress bar (+inc+ / +finish+), or nil
def pull_data_from_table(stream, progress)
  loop do
    begin
      # Terminate the process cleanly once a shutdown has been requested.
      exit 0 if exiting?

      fetched = stream.fetch_database do |rows|
        next puts(JSON.generate(rows)) if dump_path.nil?

        Tapsoob::Utils.export_rows(dump_path, stream.table_name, rows)
      end
      stream.error = false
      self.stream_state = stream.to_hash
    rescue Tapsoob::CorruptedData => corrupted
      log.info "Corrupted Data Received #{corrupted.message}, retrying..."
      stream.error = true
      next
    end

    if progress
      progress.inc(fetched) unless exiting?
    end
    break if stream.complete?
  end

  progress && progress.finish
  completed_tables << stream.table_name.to_s
  self.stream_state = {}
end

#pull_indexes ⇒ Object



288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
# File 'lib/tapsoob/operation.rb', line 288

# Exports the index definitions of every table that passes the table filter.
#
# The index dump returned by Tapsoob::Schema.indexes_individual is parsed as
# JSON (a missing or shorter-than-two-characters dump is treated as "no
# indexes"). Each index of each table is then written with
# Tapsoob::Utils.export_indexes. Tables without indexes are skipped.
#
# A progress bar is shown only when +opts[:progress]+ is set, matching the
# behaviour of +pull_data+ and +pull_partial_data+.
def pull_indexes
  log.info "Receiving indexes"

  raw_idxs = Tapsoob::Schema.indexes_individual(database_url)
  idxs     = (raw_idxs && raw_idxs.length >= 2 ? JSON.parse(raw_idxs) : {})

  apply_table_filter(idxs).each do |table, indexes|
    next unless indexes.size > 0
    progress = (opts[:progress] ? ProgressBar.new(table, indexes.size) : nil)
    indexes.each do |idx|
      Tapsoob::Utils.export_indexes(dump_path, table, idx)
      progress.inc(1) if progress
    end
    progress.finish if progress
  end
end

#pull_partial_data ⇒ Object



204
205
206
207
208
209
210
211
212
213
214
# File 'lib/tapsoob/operation.rb', line 204

# Resumes the table whose stream state was saved by an interrupted run.
#
# Does nothing when the saved stream state is an empty hash. Otherwise the
# stream is rebuilt from the saved state through Tapsoob::DataStream.factory
# and drained with +pull_data_from_table+. A progress bar is created only
# when +opts[:progress]+ is set.
def pull_partial_data
  return if stream_state == {}

  resumed_table = stream_state[:table_name]
  total = tables[resumed_table.to_s]
  log.info "Resuming #{resumed_table}, #{format_number(total)} records"

  bar = nil
  bar = ProgressBar.new(resumed_table.to_s, total) if opts[:progress]

  pull_data_from_table(Tapsoob::DataStream.factory(db, stream_state), bar)
end

#pull_reset_sequences ⇒ Object



305
306
307
308
309
# File 'lib/tapsoob/operation.rb', line 305

# Logs the step and delegates to Tapsoob::Utils.schema_bin with the
# +:reset_db_sequences+ command for +database_url+.
#
# @return [Object] whatever Tapsoob::Utils.schema_bin returns
def pull_reset_sequences
  log.info('Resetting sequences')
  Tapsoob::Utils.schema_bin(:reset_db_sequences, database_url)
end

#pull_schema ⇒ Object



176
177
178
179
180
181
182
183
184
185
186
187
# File 'lib/tapsoob/operation.rb', line 176

# Dumps and exports the schema of every pending table.
#
# For each entry of +tables+ the schema is produced by
# Tapsoob::Schema.dump_table, logged at debug level and written with
# Tapsoob::Utils.export_schema.
#
# A progress bar is shown only when +opts[:progress]+ is set, matching the
# behaviour of +pull_data+ and +pull_partial_data+.
def pull_schema
  log.info "Receiving schema"

  progress = (opts[:progress] ? ProgressBar.new('Schema', tables.size) : nil)
  tables.each do |table_name, _count|
    schema_data = Tapsoob::Schema.dump_table(database_url, table_name)
    log.debug "Table: #{table_name}\n#{schema_data}\n"
    Tapsoob::Utils.export_schema(dump_path, table_name, schema_data)
    progress.inc(1) if progress
  end
  progress.finish if progress
end

#record_count ⇒ Object



254
255
256
# File 'lib/tapsoob/operation.rb', line 254

# Total number of rows across all tables in +tables_info+.
#
# The sum is seeded with 0 so that an empty table set yields 0 instead of
# nil (the result is interpolated through +format_number+ by +pull_data+).
#
# @return [Integer] sum of the per-table row counts, 0 when there are none
def record_count
  tables_info.values.inject(0, :+)
end

#run ⇒ Object



162
163
164
165
166
167
168
169
170
171
172
173
174
# File 'lib/tapsoob/operation.rb', line 162

# Executes the whole pull inside +catch_errors+.
#
# Step order:
# 1. On a fresh (non-resumed) run: schema, then indexes when they are
#    requested first - both skipped when the schema is to be skipped.
# 2. Install the signal trap.
# 3. When resuming: finish the partially pulled table.
# 4. Pull the data of the remaining tables.
# 5. Indexes, when they were not requested first and the schema is not skipped.
# 6. Reset sequences.
def run
  catch_errors do
    unless resuming?
      pull_schema unless skip_schema?
      pull_indexes if indexes_first? && !skip_schema?
    end

    setup_signal_trap
    pull_partial_data if resuming?
    pull_data

    pull_indexes unless indexes_first? || skip_schema?
    pull_reset_sequences
  end
end

#tables ⇒ Object



245
246
247
248
249
250
251
252
# File 'lib/tapsoob/operation.rb', line 245

# Tables that still have to be pulled.
#
# @return [Hash] table name (String) => row count, for every entry of
#   +tables_info+ whose name is not listed in +completed_tables+
def tables
  tables_info.each_with_object({}) do |(name, total), pending|
    key = name.to_s
    pending[key] = total unless completed_tables.include?(key)
  end
end

#tables_info ⇒ Object



258
259
260
# File 'lib/tapsoob/operation.rb', line 258

# Per-table row counts, cached in +opts[:tables_info]+.
#
# The counts are fetched with +fetch_tables_info+ only when the option is
# not already set to a truthy value; the fetched hash is then stored back
# into +opts+.
#
# @return [Hash] the cached or freshly fetched table info
def tables_info
  cached = opts[:tables_info]
  return cached if cached

  opts[:tables_info] = fetch_tables_info
end

#to_hash ⇒ Object



158
159
160
# File 'lib/tapsoob/operation.rb', line 158

# Serialises this operation, adding the per-table row counts to the hash
# produced by the superclass.
#
# The counts are read through +tables_info+; the previously referenced
# +remote_tables_info+ is not defined on this class. The
# +:remote_tables_info+ key is kept unchanged for existing readers.
# NOTE(review): +tables_info+ caches under +opts[:tables_info]+ - confirm
# which key the session-restore code expects.
#
# @return [Hash] the superclass hash merged with +:remote_tables_info+
def to_hash
  super.merge(:remote_tables_info => tables_info)
end