Class: Dekiru::DataMigration::Operator
- Inherits:
-
Object
- Object
- Dekiru::DataMigration::Operator
- Defined in:
- lib/dekiru/data_migration/operator.rb,
sig/dekiru/data_migration/operator.rbs
Overview
Data migration operator with transaction control and progress tracking
Direct Known Subclasses
Defined Under Namespace
Classes: NestedTransactionError
Instance Attribute Summary collapse
-
#canceled ⇒ Boolean?
readonly
Returns the value of attribute canceled.
-
#ended_at ⇒ Time?
readonly
Returns the value of attribute ended_at.
-
#error ⇒ Exception?
readonly
Returns the value of attribute error.
-
#logger ⇒ Logger?
readonly
Returns the value of attribute logger.
-
#result ⇒ Boolean?
readonly
Returns the value of attribute result.
-
#started_at ⇒ Time?
readonly
Returns the value of attribute started_at.
-
#stream ⇒ IO
readonly
Returns the value of attribute stream.
-
#title ⇒ String
readonly
Returns the value of attribute title.
Class Method Summary collapse
- .execute(title, options = {}) { ... } ⇒ Boolean
Instance Method Summary collapse
- #cancel! ⇒ void
-
#confirm?(message = "Are you sure?") ⇒ Boolean
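Prompts "<message> (yes/no) > " on the stream in a loop: returns true on "yes", calls #cancel! on "no", and asks again on any other input. (The source line carries a rubocop:disable Metrics/MethodLength directive.)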
- #current_transaction_open? ⇒ Boolean
- #duration ⇒ Float
-
#each_with_progress(enum, options = {}) ⇒ void
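Iterates enum, yielding each item and advancing a ::ProgressBar written to the stream; the total defaults to enum.size (nil when the size is infinite or cannot be determined). (The source line carries a rubocop:disable Metrics/MethodLength directive.)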
-
#execute { ... } ⇒ Boolean
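Runs the block, inside a transaction with a commit confirmation unless the :without_transaction option is set; logs start, finish and total time, and re-raises any StandardError after recording it in #error. (The source line carries a rubocop:disable Metrics/AbcSize,Metrics/MethodLength directive.)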
- #find_each_with_progress(target_scope, options = {}) {|arg0| ... } ⇒ void
-
#handle_notification(*args) ⇒ void
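Builds an ActiveSupport::Notifications::Event from the arguments and counts it as a side effect: enqueued jobs by job class name, delivered mailers by mailer, and SQL beginning with insert/update/delete as write queries. (The source line carries a rubocop:disable Metrics/AbcSize directive.)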
- #increment_side_effects(type, value) ⇒ void
-
#initialize(title, options = {}) ⇒ Operator
constructor
A new instance of Operator.
- #log(message) ⇒ void
- #newline ⇒ void
- #run { ... } ⇒ void
- #transaction_provider ⇒ Object
- #warning_side_effects { ... } ⇒ void
Constructor Details
#initialize(title, options = {}) ⇒ Operator
Returns a new instance of Operator.
19 20 21 22 23 24 25 26 27 28 29 30 |
# File 'lib/dekiru/data_migration/operator.rb', line 19
def initialize(title, options = {})
  @title = title
  @options = options
  @logger = @options.fetch(:logger) do
    Logger.new(Rails.root.join("log/data_migration_#{Time.current.strftime("%Y%m%d%H%M")}.log"))
  end
  @stream = @options.fetch(:output, $stdout)
  @without_transaction = @options.fetch(:without_transaction, false)
  @side_effects = Hash.new do |hash, key|
    hash[key] = Hash.new(0)
  end
end
Instance Attribute Details
#canceled ⇒ Boolean? (readonly)
Returns the value of attribute canceled.
13 14 15 |
# File 'lib/dekiru/data_migration/operator.rb', line 13 def canceled @canceled end |
#ended_at ⇒ Time? (readonly)
Returns the value of attribute ended_at.
13 14 15 |
# File 'lib/dekiru/data_migration/operator.rb', line 13 def ended_at @ended_at end |
#error ⇒ Exception? (readonly)
Returns the value of attribute error.
13 14 15 |
# File 'lib/dekiru/data_migration/operator.rb', line 13 def error @error end |
#logger ⇒ Logger? (readonly)
Returns the value of attribute logger.
13 14 15 |
# File 'lib/dekiru/data_migration/operator.rb', line 13 def logger @logger end |
#result ⇒ Boolean? (readonly)
Returns the value of attribute result.
13 14 15 |
# File 'lib/dekiru/data_migration/operator.rb', line 13 def result @result end |
#started_at ⇒ Time? (readonly)
Returns the value of attribute started_at.
13 14 15 |
# File 'lib/dekiru/data_migration/operator.rb', line 13 def started_at @started_at end |
#stream ⇒ IO (readonly)
Returns the value of attribute stream.
13 14 15 |
# File 'lib/dekiru/data_migration/operator.rb', line 13 def stream @stream end |
#title ⇒ String (readonly)
Returns the value of attribute title.
13 14 15 |
# File 'lib/dekiru/data_migration/operator.rb', line 13 def title @title end |
Class Method Details
.execute(title, options = {}) { ... } ⇒ Boolean
15 16 17 |
# File 'lib/dekiru/data_migration/operator.rb', line 15
def self.execute(title, options = {}, &block)
  new(title, options).execute(&block)
end
Instance Method Details
#cancel! ⇒ void
This method returns an undefined value.
121 122 123 124 |
# File 'lib/dekiru/data_migration/operator.rb', line 121 def cancel! log "Canceled: #{title}" raise ActiveRecord::Rollback end |
#confirm?(message = "Are you sure?") ⇒ Boolean
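Prompts "<message> (yes/no) > " on the stream in a loop: returns true on "yes", calls #cancel! (which raises ActiveRecord::Rollback) on "no", and asks again on any other input. (The source line carries a rubocop:disable Metrics/MethodLength directive.)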
103 104 105 106 107 108 109 110 111 112 113 114 115 |
# File 'lib/dekiru/data_migration/operator.rb', line 103
def confirm?(message = "Are you sure?") # rubocop:disable Metrics/MethodLength
  loop do
    stream.print "#{message} (yes/no) > "
    case $stdin.gets.strip
    when "yes"
      newline
      return true
    when "no"
      newline
      cancel!
    end
  end
end
#current_transaction_open? ⇒ Boolean
167 168 169 |
# File 'lib/dekiru/data_migration/operator.rb', line 167 def current_transaction_open? transaction_provider.current_transaction_open? end |
#duration ⇒ Float
60 61 62 |
# File 'lib/dekiru/data_migration/operator.rb', line 60 def duration ((ended_at || Time.current) - started_at) end |
#each_with_progress(enum, options = {}) ⇒ void
This method returns an undefined value.
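Iterates enum, yielding each item and incrementing a ::ProgressBar that writes to the stream, then finishes the bar. Unless given in options, :total defaults to enum.size (nil when the size is Float::INFINITY or asking for it raises a StandardError), and :format shows a percentage only when a total is known. (The source line carries a rubocop:disable Metrics/MethodLength directive.)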
64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 |
# File 'lib/dekiru/data_migration/operator.rb', line 64
def each_with_progress(enum, options = {}) # rubocop:disable Metrics/MethodLength
  options = options.dup
  options[:total] ||= begin
    (enum.size == Float::INFINITY ? nil : enum.size)
  rescue StandardError
    nil
  end
  options[:format] ||= options[:total] ? "%a |%b>>%i| %p%% %t" : "%a |%b>>%i| ??%% %t"
  options[:output] = stream
  @pb = ::ProgressBar.create(options)
  enum.each do |item|
    yield item
    @pb.increment
  end
  @pb.finish
end
#execute { ... } ⇒ Boolean
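Records the start time and logs the start, then runs the block. Without a transaction (:without_transaction option) the result is true once the block completes. Otherwise it raises NestedTransactionError if a transaction is already open, and runs the block inside the transaction provider's transaction, asking "Are you sure to commit?" before it ends. Any StandardError is stored in #error, the end time and total duration are always logged, and a stored error is re-raised. (The source line carries a rubocop:disable Metrics/AbcSize,Metrics/MethodLength directive.)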
32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 |
# File 'lib/dekiru/data_migration/operator.rb', line 32 def execute(&block) # rubocop:disable Metrics/AbcSize,Metrics/MethodLength @started_at = Time.current log "Start: #{title} at #{started_at}\n\n" if @without_transaction run(&block) @result = true else raise NestedTransactionError if current_transaction_open? @result = transaction_provider.within_transaction do run(&block) log "Finished execution: #{title}" confirm?("\nAre you sure to commit?") end end log "Finished successfully: #{title}" if @result == true rescue StandardError => e @error = e @result = false ensure @ended_at = Time.current log "Total time: #{duration.round(2)} sec" raise error if error return @result # rubocop:disable Lint/EnsureReturn end |
#find_each_with_progress(target_scope, options = {}) {|arg0| ... } ⇒ void
This method returns an undefined value.
82 83 84 85 86 87 88 89 |
# File 'lib/dekiru/data_migration/operator.rb', line 82
def find_each_with_progress(target_scope, options = {}, &block)
  # `LocalJumpError: no block given (yield)` が出る場合、 find_each メソッドが enumerator を返していない可能性があります
  # 直接 each_with_progress を使うか、 find_each が enumerator を返すように修正してください
  options = options.dup
  batch_size = options.delete(:batch_size)
  scope = batch_size ? target_scope.find_each(batch_size: batch_size) : target_scope.find_each
  each_with_progress(scope, options, &block)
end
#handle_notification(*args) ⇒ void
This method returns an undefined value.
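Builds an ActiveSupport::Notifications::Event from the arguments and records side effects via #increment_side_effects: :enqueued_jobs keyed by the job's class name when the payload has :job, :delivered_mailers keyed by the payload's :mailer, and :write_queries keyed by the SQL text when the payload's :sql begins with insert, update or delete (case-insensitive). (The source line carries a rubocop:disable Metrics/AbcSize directive.)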
126 127 128 129 130 131 132 133 134 135 |
# File 'lib/dekiru/data_migration/operator.rb', line 126 def handle_notification(*args) # rubocop:disable Metrics/AbcSize event = ActiveSupport::Notifications::Event.new(*args) increment_side_effects(:enqueued_jobs, event.payload[:job].class.name) if event.payload[:job] increment_side_effects(:delivered_mailers, event.payload[:mailer]) if event.payload[:mailer] return unless event.payload[:sql] && /\A\s*(insert|update|delete)/i.match?(event.payload[:sql]) increment_side_effects(:write_queries, event.payload[:sql]) end |
#increment_side_effects(type, value) ⇒ void
This method returns an undefined value.
137 138 139 |
# File 'lib/dekiru/data_migration/operator.rb', line 137 def increment_side_effects(type, value) @side_effects[type][value] += 1 end |
#log(message) ⇒ void
This method returns an undefined value.
93 94 95 96 97 98 99 100 101 |
# File 'lib/dekiru/data_migration/operator.rb', line 93
def log(message)
  if @pb && !@pb.finished?
    @pb.log(message)
  else
    stream.puts(message)
  end
  logger&.info(message.squish)
end
#newline ⇒ void
This method returns an undefined value.
117 118 119 |
# File 'lib/dekiru/data_migration/operator.rb', line 117 def newline stream.puts("") end |
#run { ... } ⇒ void
This method returns an undefined value.
155 156 157 158 159 160 161 |
# File 'lib/dekiru/data_migration/operator.rb', line 155 def run(&block) if @options.fetch(:warning_side_effects, true) warning_side_effects(&block) else instance_eval(&block) end end |
#transaction_provider ⇒ Object
163 164 165 |
# File 'lib/dekiru/data_migration/operator.rb', line 163 def transaction_provider Dekiru::DataMigration.configuration.transaction_provider end |
#warning_side_effects { ... } ⇒ void
This method returns an undefined value.
141 142 143 144 145 146 147 148 149 150 151 152 153 |
# File 'lib/dekiru/data_migration/operator.rb', line 141 def warning_side_effects(&block) ActiveSupport::Notifications.subscribed(method(:handle_notification), /^(sql|enqueue|deliver)/) do instance_eval(&block) end @side_effects.each do |name, items| newline log "#{name.to_s.titlecase}!!" items.sort_by { |_v, c| c }.reverse.slice(0, 20).each do |value, count| log "#{count} call: #{value}" end end end |