Module: SwitchmanInstJobs::Switchman::Shard::ClassMethods
- Defined in:
- lib/switchman_inst_jobs/switchman/shard.rb
Instance Method Summary collapse
- #activate!(categories) ⇒ Object
- #clear_cache ⇒ Object
- #delayed_jobs_shards ⇒ Object
-
#hold_jobs!(wait: false) ⇒ Object
Adapted from hold/unhold methods in base delayed jobs base Wait is required to be able to safely move jobs.
- #periodic_clear_shard_cache ⇒ Object
- #skip_delayed_job_auto_activation ⇒ Object
- #unhold_jobs! ⇒ Object
Instance Method Details
#activate!(categories) ⇒ Object
106 107 108 109 110 111 112 113 114 115 116 117 |
# File 'lib/switchman_inst_jobs/switchman/shard.rb', line 106 def activate!(categories) if !@skip_delayed_job_auto_activation && !categories[::Delayed::Backend::ActiveRecord::AbstractJob] && categories[::ActiveRecord::Base] && categories[::ActiveRecord::Base] != ::Switchman::Shard.current(::ActiveRecord::Base) skip_delayed_job_auto_activation do categories[::Delayed::Backend::ActiveRecord::AbstractJob] = categories[::ActiveRecord::Base].delayed_jobs_shard end end super end |
#clear_cache ⇒ Object
101 102 103 104 |
# File 'lib/switchman_inst_jobs/switchman/shard.rb', line 101 def clear_cache super remove_instance_variable(:@delayed_jobs_shards) if instance_variable_defined?(:@delayed_jobs_shards) end |
#delayed_jobs_shards ⇒ Object
135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 |
# File 'lib/switchman_inst_jobs/switchman/shard.rb', line 135 def delayed_jobs_shards return none unless ::Switchman::Shard.columns_hash.key?("delayed_jobs_shard_id") scope = ::Switchman::Shard.unscoped .where(id: ::Switchman::Shard.unscoped .distinct .where.not(delayed_jobs_shard_id: nil) .select(:delayed_jobs_shard_id)) db_jobs_shards = ::Switchman::DatabaseServer.all.map { |db| db.config[:delayed_jobs_shard] }.uniq db_jobs_shards.delete(nil) has_self = db_jobs_shards.delete("self") scope = scope.or(::Switchman::Shard.unscoped.where(id: db_jobs_shards)) unless db_jobs_shards.empty? if has_self self_dbs = ::Switchman::DatabaseServer.select { |db| db.config[:delayed_jobs_shard] == "self" }.map(&:id) scope = scope.or(::Switchman::Shard.unscoped .where(id: ::Switchman::Shard.unscoped.where(delayed_jobs_shard_id: nil, database_server_id: self_dbs) .select(:id))) end @jobs_scope_empty = !scope.exists? unless instance_variable_defined?(:@jobs_scope_empty) return ::Switchman::Shard.where(id: ::Switchman::Shard.default.id) if @jobs_scope_empty ::Switchman::Shard.merge(scope) end |
#hold_jobs!(wait: false) ⇒ Object
Adapted from hold/unhold methods in base delayed jobs base Wait is required to be able to safely move jobs
35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 |
# File 'lib/switchman_inst_jobs/switchman/shard.rb', line 35 def hold_jobs!(wait: false) shards = all.to_a wait_for_caches = false shards.each do |shard| shard.jobs_held = true if shard.changed? shard.save! wait_for_caches = true if wait end end shards_by_jobs_shard(shards).each do |jobs_shard, shard_ids| jobs_shard.activate(::Delayed::Backend::ActiveRecord::AbstractJob) do lock_jobs_for_hold(shard_ids) end end return unless wait # Wait a little over the 60 second in-process shard cache clearing # threshold to ensure that all new jobs are now being enqueued # locked Rails.logger.debug("Waiting for caches to clear") sleep(65) if wait && wait_for_caches shards_by_jobs_shard(shards).each do |jobs_shard, shard_ids| jobs_shard.activate(::Delayed::Backend::ActiveRecord::AbstractJob) do while ::Delayed::Job.where(shard_id: shard_ids) .where.not(locked_at: nil) .where.not(locked_by: ::Delayed::Backend::Base::ON_HOLD_LOCKED_BY).exists? sleep 10 lock_jobs_for_hold(shard_ids) end end end end |
#periodic_clear_shard_cache ⇒ Object
127 128 129 130 131 132 133 |
# File 'lib/switchman_inst_jobs/switchman/shard.rb', line 127 def periodic_clear_shard_cache # TODO: make this configurable @timed_cache ||= TimedCache.new(-> { 60.seconds.ago }) do ::Switchman::Shard.clear_cache end @timed_cache.clear end |
#skip_delayed_job_auto_activation ⇒ Object
119 120 121 122 123 124 125 |
# File 'lib/switchman_inst_jobs/switchman/shard.rb', line 119 def skip_delayed_job_auto_activation was = @skip_delayed_job_auto_activation @skip_delayed_job_auto_activation = true yield ensure @skip_delayed_job_auto_activation = was end |
#unhold_jobs! ⇒ Object
70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 |
# File 'lib/switchman_inst_jobs/switchman/shard.rb', line 70 def unhold_jobs! shards = all.to_a waited = false shards.each do |shard| shard.jobs_held = false next unless shard.changed? shard.save! next if waited # Wait a little over the 60 second in-process shard cache clearing # threshold to ensure that all new jobs are now being enqueued # unlocked Rails.logger.debug("Waiting for caches to clear") sleep(65) waited = true end shards_by_jobs_shard(shards).each do |jobs_shard, shard_ids| jobs_shard.activate(::Delayed::Backend::ActiveRecord::AbstractJob) do ::Delayed::Job.where(locked_by: ::Delayed::Backend::Base::ON_HOLD_LOCKED_BY, shard_id: shard_ids) .in_batches(of: 10_000) .update_all( locked_by: nil, locked_at: nil, attempts: 0, failed_at: nil ) end end end |