Class: Sidekiq::LimitFetch::Heartbeat
- Inherits: Object
- Hierarchy: Object → Sidekiq::LimitFetch::Heartbeat
- Defined in:
- lib/sidekiq/limit_fetch/heartbeat.rb
Overview
Manages lifecycle of the heartbeat thread.
Class Method Summary collapse
-
.shutdown ⇒ Object
Send a shutdown notification to all heartbeat threads.
- .threads ⇒ Set<Thread>
Instance Method Summary collapse
- #capsule_meta ⇒ Global::CapsuleMetadata
- #capsule_sem ⇒ Global::CapsuleSemaphor
- #handle_error(error) ⇒ Object
-
#initialize(capsule) ⇒ Heartbeat
constructor
A new instance of Heartbeat.
- #log(level, message) ⇒ Object
-
#run ⇒ Object
Run a heartbeat every 15s.
- #start ⇒ Thread
-
#with_error_handling ⇒ Boolean
Whether block completed successfully.
Constructor Details
#initialize(capsule) ⇒ Heartbeat
Returns a new instance of Heartbeat.
21 22 23 24 |
# File 'lib/sidekiq/limit_fetch/heartbeat.rb', line 21 def initialize(capsule) @capsule = capsule @down = false end |
Class Method Details
.shutdown ⇒ Object
Send a shutdown notification to all heartbeat threads.
13 14 15 16 17 18 |
# File 'lib/sidekiq/limit_fetch/heartbeat.rb', line 13 def self.shutdown threads.each do |thread| thread.raise(Shutdown) thread.join(LimitFetch.configuration[:heartbeat_period] * 2) end end |
.threads ⇒ Set<Thread>
8 9 10 |
# File 'lib/sidekiq/limit_fetch/heartbeat.rb', line 8 def self.threads @threads ||= Set.new end |
Instance Method Details
#capsule_meta ⇒ Global::CapsuleMetadata
87 88 89 |
# File 'lib/sidekiq/limit_fetch/heartbeat.rb', line 87 def capsule_meta @capsule_meta ||= Global.capsule[@capsule.name] end |
#capsule_sem ⇒ Global::CapsuleSemaphor
82 83 84 |
# File 'lib/sidekiq/limit_fetch/heartbeat.rb', line 82 def capsule_sem @capsule_sem ||= Global::CapsuleSemaphor.new(@capsule) end |
#handle_error(error) ⇒ Object
71 72 73 74 75 76 77 78 79 |
# File 'lib/sidekiq/limit_fetch/heartbeat.rb', line 71 def handle_error(error) if @down # Already logged the original error, make repeating connection errors more concise log :error, "Failed: #{error.class}" else log :error, "#{error.class}: #{error.message}#{error.backtrace.join("\n")}" end @down = true end |
#log(level, message) ⇒ Object
93 94 95 |
# File 'lib/sidekiq/limit_fetch/heartbeat.rb', line 93 def log(level, message) Sidekiq.logger.public_send(level, "[sidekiq-limit_fetch] [heartbeat] #{message}") end |
#run ⇒ Object
Run a heartbeat every 15s. Special error handling is included so that the heartbeat loop can endure a Redis outage and resume once Redis is back online.
36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 |
# File 'lib/sidekiq/limit_fetch/heartbeat.rb', line 36 def run loop do begin with_error_handling do capsule_sem.heartbeat if @down log :info, "Redis back online, heartbeat completed successfully" @down = false end end with_error_handling { Kernel.sleep(LimitFetch.configuration[:heartbeat_period]) } rescue LimitFetch::Shutdown break end end with_error_handling do capsule_sem.purge([capsule_meta.uuid]) # Deregister log :info, "Successfully shut down" end end |
#start ⇒ Thread
27 28 29 30 31 32 |
# File 'lib/sidekiq/limit_fetch/heartbeat.rb', line 27 def start Thread.new { run }.tap do |thread| thread.name = "sidekiq-limit_fetch.heartbeat" self.class.threads << thread end end |
#with_error_handling ⇒ Boolean
Returns whether the block completed successfully.
60 61 62 63 64 65 66 67 68 |
# File 'lib/sidekiq/limit_fetch/heartbeat.rb', line 60 def with_error_handling yield true rescue LimitFetch::Shutdown raise rescue StandardError => error handle_error(error) false end |