Class: Tapsoob::Operation

Inherits:
Object
  • Object
show all
Defined in:
lib/tapsoob/operation.rb

Direct Known Subclasses

Pull, Push

Instance Attribute Summary collapse

Class Method Summary collapse

Instance Method Summary collapse

Constructor Details

#initialize(database_url, dump_path = nil, opts = {}) ⇒ Operation

Returns a new instance of Operation.



14
15
16
17
18
19
# File 'lib/tapsoob/operation.rb', line 14

def initialize(database_url, dump_path = nil, opts={})
  @database_url = database_url
  @dump_path    = dump_path
  @opts         = opts
  @exiting      = false
end

Instance Attribute Details

#database_urlObject (readonly)

Returns the value of attribute database_url.



12
13
14
# File 'lib/tapsoob/operation.rb', line 12

def database_url
  @database_url
end

#dump_pathObject (readonly)

Returns the value of attribute dump_path.



12
13
14
# File 'lib/tapsoob/operation.rb', line 12

def dump_path
  @dump_path
end

#optsObject (readonly)

Returns the value of attribute opts.



12
13
14
# File 'lib/tapsoob/operation.rb', line 12

def opts
  @opts
end

Class Method Details

.factory(type, database_url, dump_path, opts) ⇒ Object



164
165
166
167
168
169
170
171
172
173
174
# File 'lib/tapsoob/operation.rb', line 164

def self.factory(type, database_url, dump_path, opts)
  type = :resume if opts[:resume]
  klass = case type
    when :pull   then Tapsoob::Pull
    when :push   then Tapsoob::Push
    when :resume then eval(opts[:klass])
    else raise "Unknown Operation Type -> #{type}"
  end

  klass.new(database_url, dump_path, opts)
end

Instance Method Details

#add_completed_table(table_name) ⇒ Object



146
147
148
149
150
# File 'lib/tapsoob/operation.rb', line 146

def add_completed_table(table_name)
  completed_tables_mutex.synchronize do
    completed_tables << table_name.to_s
  end
end

#apply_table_filter(tables) ⇒ Object



45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
# File 'lib/tapsoob/operation.rb', line 45

def apply_table_filter(tables)
  return tables if table_filter.empty? && exclude_tables.empty?

  if tables.kind_of?(Hash)
    ntables = {}
    tables.each do |t, d|
      if !exclude_tables.include?(t.to_s) && (!table_filter.empty? && table_filter.include?(t.to_s))
        ntables[t] = d
      end
    end
    ntables
  else
    tables.reject { |t| exclude_tables.include?(t.to_s) }.select { |t| table_filter.include?(t.to_s) }
  end
end

#catch_errors(&blk) ⇒ Object



156
157
158
159
160
161
162
# File 'lib/tapsoob/operation.rb', line 156

def catch_errors(&blk)
  begin
    blk.call
  rescue Exception => e
    raise e
  end
end

#completed_tablesObject



108
109
110
# File 'lib/tapsoob/operation.rb', line 108

def completed_tables
  opts[:completed_tables] ||= []
end

#completed_tables_mutexObject



142
143
144
# File 'lib/tapsoob/operation.rb', line 142

def completed_tables_mutex
  @completed_tables_mutex ||= Mutex.new
end

#data?Boolean

Returns:

  • (Boolean)


25
26
27
# File 'lib/tapsoob/operation.rb', line 25

def data?
  opts[:data]
end

#dbObject



120
121
122
123
124
125
126
127
128
129
130
131
132
# File 'lib/tapsoob/operation.rb', line 120

def db
  @db ||= Sequel.connect(database_url, max_connections: parallel_workers * 2)
  @db.extension :schema_dumper
  @db.loggers << Tapsoob.log if opts[:debug]

  # Set parameters
  if @db.uri =~ /oracle/i
    @db << "ALTER SESSION SET NLS_DATE_FORMAT='YYYY-MM-DD HH24:MI:SS'"
    @db << "ALTER SESSION SET NLS_TIMESTAMP_FORMAT='YYYY-MM-DD HH24:MI:SS:FF6'"
  end

  @db
end

#default_chunksizeObject



104
105
106
# File 'lib/tapsoob/operation.rb', line 104

def default_chunksize
  opts[:default_chunksize]
end

#exclude_tablesObject



41
42
43
# File 'lib/tapsoob/operation.rb', line 41

def exclude_tables
  opts[:exclude_tables] || []
end

#exiting?Boolean

Returns:

  • (Boolean)


84
85
86
# File 'lib/tapsoob/operation.rb', line 84

def exiting?
  !!@exiting
end

#file_prefixObject



21
22
23
# File 'lib/tapsoob/operation.rb', line 21

def file_prefix
  "op"
end

#format_number(num) ⇒ Object



152
153
154
# File 'lib/tapsoob/operation.rb', line 152

def format_number(num)
  num.to_s.gsub(/(\d)(?=(\d\d\d)+(?!\d))/, "\\1,")
end

#indexes_first?Boolean

Returns:

  • (Boolean)


33
34
35
# File 'lib/tapsoob/operation.rb', line 33

def indexes_first?
  !!opts[:indexes_first]
end

#logObject



61
62
63
64
# File 'lib/tapsoob/operation.rb', line 61

def log
  Tapsoob.log.level = Logger::DEBUG if opts[:debug]
  Tapsoob.log
end

#parallel?Boolean

Returns:

  • (Boolean)


134
135
136
# File 'lib/tapsoob/operation.rb', line 134

def parallel?
  parallel_workers > 1
end

#parallel_workersObject



138
139
140
# File 'lib/tapsoob/operation.rb', line 138

def parallel_workers
  @parallel_workers ||= [opts[:parallel].to_i, 1].max
end

#resuming?Boolean

Returns:

  • (Boolean)


100
101
102
# File 'lib/tapsoob/operation.rb', line 100

def resuming?
  opts[:resume] == true
end

#schema?Boolean

Returns:

  • (Boolean)


29
30
31
# File 'lib/tapsoob/operation.rb', line 29

def schema?
  opts[:schema]
end

#setup_signal_trapObject



88
89
90
91
92
93
94
95
96
97
98
# File 'lib/tapsoob/operation.rb', line 88

def setup_signal_trap
  trap("INT") {
    puts "\nCompleting current action..."
    @exiting = true
  }

  trap("TERM") {
    puts "\nCompleting current action..."
    @exiting = true
  }
end

#store_sessionObject



66
67
68
69
70
71
72
# File 'lib/tapsoob/operation.rb', line 66

def store_session
  file = "#{file_prefix}_#{Time.now.strftime("%Y%m%d%H%M")}.dat"
  log.info "\nSaving session to #{file}..."
  File.open(file, 'w') do |f|
    f.write(JSON.generate(to_hash))
  end
end

#stream_stateObject



112
113
114
# File 'lib/tapsoob/operation.rb', line 112

def stream_state
  opts[:stream_state] ||= {}
end

#stream_state=(val) ⇒ Object



116
117
118
# File 'lib/tapsoob/operation.rb', line 116

def stream_state=(val)
  opts[:stream_state] = val
end

#table_filterObject



37
38
39
# File 'lib/tapsoob/operation.rb', line 37

def table_filter
  opts[:tables] || []
end

#to_hashObject



74
75
76
77
78
79
80
81
82
# File 'lib/tapsoob/operation.rb', line 74

def to_hash
  {
    :klass            => self.class.to_s,
    :database_url     => database_url,
    :stream_state     => stream_state,
    :completed_tables => completed_tables,
    :table_filter     => table_filter,
  }
end