Class: Dekiru::DataMigration::Operator

Inherits:
Object
  • Object
show all
Defined in:
lib/dekiru/data_migration/operator.rb,
sig/dekiru/data_migration/operator.rbs

Overview

Data migration operator with transaction control and progress tracking

Direct Known Subclasses

Dekiru::DataMigrationOperator

Defined Under Namespace

Classes: NestedTransactionError

Instance Attribute Summary collapse

Class Method Summary collapse

Instance Method Summary collapse

Constructor Details

#initialize(title, options = {}) ⇒ Operator

Returns a new instance of Operator.

Parameters:

  • title (String)
  • options (Hash[Symbol, untyped]) (defaults to: {})


19
20
21
22
23
24
25
26
27
28
29
30
# File 'lib/dekiru/data_migration/operator.rb', line 19

# Builds an operator for the migration named +title+.
#
# Recognised keys in +options+ (all optional):
#   :logger              - logger to use; when the key is absent a file logger
#                          under Rails.root/log is created, named with the
#                          current minute
#   :output              - stream for console output (defaults to $stdout)
#   :without_transaction - when truthy, #execute skips the transaction wrapper
#                          (defaults to false)
#
# The full options hash is retained in @options for later lookups.
def initialize(title, options = {})
  @title = title
  @options = options
  @logger = options.fetch(:logger) do
    # Only evaluated when no :logger key was supplied at all.
    timestamp = Time.current.strftime("%Y%m%d%H%M")
    Logger.new(Rails.root.join("log/data_migration_#{timestamp}.log"))
  end
  @stream = options.fetch(:output, $stdout)
  @without_transaction = options.fetch(:without_transaction, false)
  # Two-level counter: side-effect type => { value => number of occurrences }.
  @side_effects = Hash.new { |counters, type| counters[type] = Hash.new(0) }
end

Instance Attribute Details

#canceled ⇒ Boolean? (readonly)

Returns the value of attribute canceled.

Returns:

  • (Boolean, nil)


13
14
15
# File 'lib/dekiru/data_migration/operator.rb', line 13

# Reader for @canceled. Returns nil while the instance variable is unset;
# the constructor shown in this file does not assign it.
def canceled
  @canceled
end

#ended_at ⇒ Time? (readonly)

Returns the value of attribute ended_at.

Returns:

  • (Time, nil)


13
14
15
# File 'lib/dekiru/data_migration/operator.rb', line 13

# Reader for @ended_at, which #execute assigns in its ensure clause.
# Returns nil before a run has ended.
def ended_at
  @ended_at
end

#error ⇒ Exception? (readonly)

Returns the value of attribute error.

Returns:

  • (Exception, nil)


13
14
15
# File 'lib/dekiru/data_migration/operator.rb', line 13

# Reader for @error: the StandardError captured by #execute, or nil when
# none has been captured.
def error
  @error
end

#logger ⇒ Logger? (readonly)

Returns the value of attribute logger.

Returns:

  • (Logger, nil)


13
14
15
# File 'lib/dekiru/data_migration/operator.rb', line 13

# Reader for @logger, assigned in the constructor. May be nil when the
# caller passed `logger: nil`; #log guards for that with safe navigation.
def logger
  @logger
end

#result ⇒ Boolean? (readonly)

Returns the value of attribute result.

Returns:

  • (Boolean, nil)


13
14
15
# File 'lib/dekiru/data_migration/operator.rb', line 13

# Reader for @result, the outcome recorded by #execute. Returns nil before
# #execute has assigned it.
def result
  @result
end

#started_at ⇒ Time? (readonly)

Returns the value of attribute started_at.

Returns:

  • (Time, nil)


13
14
15
# File 'lib/dekiru/data_migration/operator.rb', line 13

# Reader for @started_at, which #execute assigns as its first step.
# Returns nil before #execute has been called.
def started_at
  @started_at
end

#stream ⇒ IO (readonly)

Returns the value of attribute stream.

Returns:

  • (IO)


13
14
15
# File 'lib/dekiru/data_migration/operator.rb', line 13

# Reader for @stream, the console output target chosen in the constructor
# (the :output option, or $stdout by default).
def stream
  @stream
end

#title ⇒ String (readonly)

Returns the value of attribute title.

Returns:

  • (String)


13
14
15
# File 'lib/dekiru/data_migration/operator.rb', line 13

# Reader for @title, the migration title passed to the constructor.
def title
  @title
end

Class Method Details

.execute(title, options = {}) { ... } ⇒ Boolean

Parameters:

  • title (String)
  • options (Hash[Symbol, untyped]) (defaults to: {})

Yields:

Yield Returns:

  • (void)

Returns:

  • (Boolean)


15
16
17
# File 'lib/dekiru/data_migration/operator.rb', line 15

# Convenience entry point: builds an operator for +title+ / +options+ and
# immediately runs the given block through its #execute.
#
# @return whatever the instance-level #execute returns
def self.execute(title, options = {}, &block)
  operator = new(title, options)
  operator.execute(&block)
end

Instance Method Details

#cancel! ⇒ void

This method returns an undefined value.

Raises:

  • (ActiveRecord::Rollback)


121
122
123
124
# File 'lib/dekiru/data_migration/operator.rb', line 121

# Logs a cancellation message for this migration, then aborts by raising
# ActiveRecord::Rollback. Never returns normally.
def cancel!
  log "Canceled: #{title}"
  raise ActiveRecord::Rollback
end

#confirm?(message = "Are you sure?") ⇒ Boolean

rubocop:disable Metrics/MethodLength

Parameters:

  • message (String) (defaults to: "Are you sure?")

Returns:

  • (Boolean)


103
104
105
106
107
108
109
110
111
112
113
114
115
# File 'lib/dekiru/data_migration/operator.rb', line 103

# Prompts on the output stream until the operator answers "yes" or "no".
#
# Input is read from $stdin. Any other answer repeats the prompt. "no" is
# routed through #cancel!, which raises instead of returning.
#
# End of input ($stdin.gets returning nil, e.g. a closed or redirected stdin)
# is treated like "no": no answer can ever arrive, so the previous behaviour
# of calling #strip on nil (NoMethodError) is replaced by an explicit cancel.
#
# @param message [String] question shown before the " (yes/no) > " suffix
# @return [Boolean] true once "yes" is entered
def confirm?(message = "Are you sure?") # rubocop:disable Metrics/MethodLength
  loop do
    stream.print "#{message} (yes/no) > "
    case $stdin.gets&.strip
    when "yes"
      newline
      return true
    when "no", nil
      # nil means EOF on stdin; re-prompting would spin forever.
      newline
      cancel!
    end
  end
end

#current_transaction_open? ⇒ Boolean

Returns:

  • (Boolean)


167
168
169
# File 'lib/dekiru/data_migration/operator.rb', line 167

# Asks the configured transaction provider whether a transaction is
# currently open. #execute uses this to reject nested transactions.
def current_transaction_open?
  transaction_provider.current_transaction_open?
end

#duration ⇒ Float

Returns:

  • (Float)


60
61
62
# File 'lib/dekiru/data_migration/operator.rb', line 60

# Elapsed time since #started_at.
#
# Measured up to #ended_at when the run has finished, otherwise up to the
# current time. Requires #started_at to be set.
def duration
  finished_or_now = ended_at || Time.current
  finished_or_now - started_at
end

#each_with_progress(enum, options = {}) ⇒ void

This method returns an undefined value.

rubocop:disable Metrics/MethodLength



64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
# File 'lib/dekiru/data_migration/operator.rb', line 64

# Iterates +enum+, yielding each item to the block while driving a progress
# bar that is written to the operator's output stream.
#
# +options+ is copied before being adjusted and is then passed to
# ::ProgressBar.create:
#   :total  - defaults to enum.size; nil when the size is infinite or when
#             asking for it raises a StandardError
#   :format - defaults to a percentage format when the total is known and to
#             a "??%" format otherwise
#   :output - always overwritten with the operator's stream
#
# The bar is kept in @pb so that #log can write through it while it is
# running.
def each_with_progress(enum, options = {}) # rubocop:disable Metrics/MethodLength
  options = options.dup
  options[:total] ||= begin
    # Ask for the size only once: for lazily-sized enumerators every call may
    # be expensive (for example, it may run a query).
    size = enum.size
    size == Float::INFINITY ? nil : size
  rescue StandardError
    nil
  end
  options[:format] ||= options[:total] ? "%a |%b>>%i| %p%% %t" : "%a |%b>>%i| ??%% %t"
  options[:output] = stream

  @pb = ::ProgressBar.create(options)
  enum.each do |item|
    yield item
    @pb.increment
  end
  @pb.finish
end

#execute { ... } ⇒ Boolean

rubocop:disable Metrics/AbcSize,Metrics/MethodLength

Yields:

Yield Returns:

  • (void)

Returns:

  • (Boolean)


32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
# File 'lib/dekiru/data_migration/operator.rb', line 32

# Runs the migration block and records timing, outcome and any error.
#
# When @without_transaction is truthy the block is run directly and @result
# becomes true. Otherwise the block runs inside
# transaction_provider.within_transaction, followed by a commit confirmation
# prompt; @result is then whatever within_transaction returns.
#
# A StandardError raised along the way (including NestedTransactionError) is
# stored in @error, @result is set to false, and the error is re-raised from
# the ensure clause after the total time has been logged.
def execute(&block) # rubocop:disable Metrics/AbcSize,Metrics/MethodLength
  @started_at = Time.current
  log "Start: #{title} at #{started_at}\n\n"
  if @without_transaction
    run(&block)
    @result = true
  else
    # Refuse to start when a transaction is already open.
    raise NestedTransactionError if current_transaction_open?

    @result = transaction_provider.within_transaction do
      run(&block)
      log "Finished execution: #{title}"
      # confirm? returns true on "yes" and calls cancel! on "no".
      confirm?("\nAre you sure to commit?")
    end
  end
  log "Finished successfully: #{title}" if @result == true
rescue StandardError => e
  @error = e
  @result = false
ensure
  @ended_at = Time.current
  log "Total time: #{duration.round(2)} sec"

  # NOTE(review): @error is never reset at the top of this method, so a second
  # call on the same instance after a failed run would re-raise the old error.
  raise error if error

  # NOTE(review): an explicit return inside ensure also discards any in-flight
  # exception that the rescue above did not capture (anything that is not a
  # StandardError) — confirm that this is intended.
  return @result # rubocop:disable Lint/EnsureReturn
end

#find_each_with_progress(target_scope, options = {}) {|arg0| ... } ⇒ void

This method returns an undefined value.

Parameters:

  • target_scope (Object)
  • options (Hash[Symbol, untyped]) (defaults to: {})

Yields:

Yield Parameters:

  • arg0 (Object)

Yield Returns:

  • (void)


82
83
84
85
86
87
88
89
# File 'lib/dekiru/data_migration/operator.rb', line 82

# Walks +target_scope+ via its find_each enumerator while showing progress.
#
# +options+ is copied first; :batch_size (if present) is removed from the copy
# and forwarded to find_each, and the remaining options go to
# #each_with_progress together with the block.
#
# If `LocalJumpError: no block given (yield)` is raised, the find_each method
# may not be returning an enumerator. Either use each_with_progress directly,
# or fix find_each so that it returns an enumerator.
def find_each_with_progress(target_scope, options = {}, &block)
  progress_options = options.dup
  batch_size = progress_options.delete(:batch_size)
  records =
    if batch_size
      target_scope.find_each(batch_size: batch_size)
    else
      target_scope.find_each
    end
  each_with_progress(records, progress_options, &block)
end

#handle_notification(*args) ⇒ void

This method returns an undefined value.

rubocop:disable Metrics/AbcSize

Parameters:

  • args (Object)


126
127
128
129
130
131
132
133
134
135
# File 'lib/dekiru/data_migration/operator.rb', line 126

# Notification callback: wraps +args+ in an ActiveSupport::Notifications::Event
# and tallies the side effects found in its payload.
#
#   payload[:job]    - counted under :enqueued_jobs by the job's class name
#   payload[:mailer] - counted under :delivered_mailers
#   payload[:sql]    - counted under :write_queries, but only when the
#                      statement starts with insert/update/delete
#                      (case-insensitive, leading whitespace allowed)
def handle_notification(*args)
  payload = ActiveSupport::Notifications::Event.new(*args).payload
  job = payload[:job]
  mailer = payload[:mailer]
  sql = payload[:sql]

  increment_side_effects(:enqueued_jobs, job.class.name) if job
  increment_side_effects(:delivered_mailers, mailer) if mailer
  increment_side_effects(:write_queries, sql) if sql && /\A\s*(insert|update|delete)/i.match?(sql)
end

#increment_side_effects(type, value) ⇒ void

This method returns an undefined value.

Parameters:

  • type (Symbol)
  • value (String)


137
138
139
# File 'lib/dekiru/data_migration/operator.rb', line 137

# Adds one to the counter kept for +value+ under the side-effect +type+.
def increment_side_effects(type, value)
  counters = @side_effects[type]
  counters[value] += 1
end

#log(message) ⇒ void

This method returns an undefined value.

Parameters:

  • message (String)


93
94
95
96
97
98
99
100
101
# File 'lib/dekiru/data_migration/operator.rb', line 93

# Writes +message+ to the console and, when a logger is set, to the log.
#
# While a progress bar exists and has not finished, the message goes through
# the bar's own #log; otherwise it is written to the output stream with puts.
# The logger receives the squished message at info level.
def log(message)
  if !@pb || @pb.finished?
    stream.puts(message)
  else
    @pb.log(message)
  end

  logger&.info(message.squish)
end

#newline ⇒ void

This method returns an undefined value.



117
118
119
# File 'lib/dekiru/data_migration/operator.rb', line 117

# Writes an empty line to the output stream.
def newline
  stream.puts("")
end

#run { ... } ⇒ void

This method returns an undefined value.

Yields:

Yield Returns:

  • (void)


155
156
157
158
159
160
161
# File 'lib/dekiru/data_migration/operator.rb', line 155

# Evaluates the migration block in the context of this operator.
#
# Unless the :warning_side_effects option is set to a falsy value (it defaults
# to true), the block is run through #warning_side_effects so that side
# effects are tallied and reported; otherwise it is simply instance_eval'ed.
def run(&block)
  return instance_eval(&block) unless @options.fetch(:warning_side_effects, true)

  warning_side_effects(&block)
end

#transaction_provider ⇒ Object

Returns:

  • (Object)


163
164
165
# File 'lib/dekiru/data_migration/operator.rb', line 163

# Returns the transaction provider held by Dekiru::DataMigration.configuration.
# It is looked up on every call rather than cached.
def transaction_provider
  Dekiru::DataMigration.configuration.transaction_provider
end

#warning_side_effects { ... } ⇒ void

This method returns an undefined value.

Yields:

Yield Returns:

  • (void)


141
142
143
144
145
146
147
148
149
150
151
152
153
# File 'lib/dekiru/data_migration/operator.rb', line 141

# Runs the block in the operator's context while subscribed to notifications
# whose names start with sql, enqueue or deliver, then reports what was
# tallied.
#
# For every side-effect type a heading is logged, followed by at most 20
# values ordered from the highest call count downwards. The report is only
# reached when the block finishes without raising.
def warning_side_effects(&block)
  subscriber = method(:handle_notification)
  ActiveSupport::Notifications.subscribed(subscriber, /^(sql|enqueue|deliver)/) { instance_eval(&block) }

  @side_effects.each do |name, items|
    newline
    log "#{name.to_s.titlecase}!!"
    most_frequent = items.sort_by { |_value, count| count }.reverse.first(20)
    most_frequent.each { |value, count| log "#{count} call: #{value}" }
  end
end