Class: Sidekiq::LimitFetch::Heartbeat

Inherits:
Object
  • Object
show all
Defined in:
lib/sidekiq/limit_fetch/heartbeat.rb

Overview

Manages the lifecycle of the heartbeat thread.

Class Method Summary collapse

Instance Method Summary collapse

Constructor Details

#initialize(capsule) ⇒ Heartbeat

Returns a new instance of Heartbeat.

Parameters:

  • capsule (Sidekiq::Capsule)


21
22
23
24
# File 'lib/sidekiq/limit_fetch/heartbeat.rb', line 21

# Sets up the heartbeat state for one capsule. Only instance variables are
# assigned here; no thread is created.
#
# @param capsule [Sidekiq::Capsule] capsule this heartbeat belongs to
def initialize(capsule)
  # Nothing has failed yet, so the heartbeat starts out as "not down".
  @down = false
  @capsule = capsule
end

Class Method Details

.shutdown ⇒ Object

Send a shutdown notification to all heartbeat threads and wait for each of them to finish, for at most twice the configured heartbeat period per thread.



13
14
15
16
17
18
# File 'lib/sidekiq/limit_fetch/heartbeat.rb', line 13

# Sends a shutdown notification to all heartbeat threads and waits for them
# to finish.
#
# Every registered thread is signalled *before* any of them is joined, so a
# thread that is slow to stop does not postpone the notification of the
# remaining threads. Each join is bounded by twice the configured
# heartbeat period.
#
# @return [Object] the result of iterating the registered threads
def self.shutdown
  # Phase 1: notify everybody.
  threads.each { |thread| thread.raise(Shutdown) }

  # Phase 2: wait for each thread, with a per-thread upper bound.
  threads.each do |thread|
    thread.join(LimitFetch.configuration[:heartbeat_period] * 2)
  end
end

.threads ⇒ Set<Thread>

Returns:

  • (Set<Thread>)


8
9
10
# File 'lib/sidekiq/limit_fetch/heartbeat.rb', line 8

# Registry of heartbeat threads, created on first access and reused
# afterwards.
#
# @return [Set<Thread>]
def self.threads
  @threads = Set.new unless @threads
  @threads
end

Instance Method Details

#capsule_meta ⇒ Global::CapsuleMetadata



87
88
89
# File 'lib/sidekiq/limit_fetch/heartbeat.rb', line 87

# Metadata entry for this heartbeat's capsule, looked up in Global.capsule
# by the capsule's name. A truthy result is cached for later calls.
#
# @return [Global::CapsuleMetadata]
def capsule_meta
  return @capsule_meta if @capsule_meta

  @capsule_meta = Global.capsule[@capsule.name]
end

#capsule_sem ⇒ Global::CapsuleSemaphor



82
83
84
# File 'lib/sidekiq/limit_fetch/heartbeat.rb', line 82

# Semaphore object built for this heartbeat's capsule. It is constructed on
# first use and the same instance is returned afterwards.
#
# @return [Global::CapsuleSemaphor]
def capsule_sem
  @capsule_sem = Global::CapsuleSemaphor.new(@capsule) unless @capsule_sem
  @capsule_sem
end

#handle_error(error) ⇒ Object

Parameters:

  • error (StandardError)


71
72
73
74
75
76
77
78
79
# File 'lib/sidekiq/limit_fetch/heartbeat.rb', line 71

# Logs a heartbeat failure and marks the heartbeat as down.
#
# The first failure is logged in full: class, message and, when available,
# the backtrace, one entry per line. While the heartbeat is already marked
# as down, only the error class is logged to keep repeated failures concise.
#
# @param error [StandardError]
# @return [Boolean] always true (the new value of the down flag)
def handle_error(error)
  if @down
    # Already logged the original error, make repeating connection errors more concise
    log :error, "Failed: #{error.class}"
  else
    # Exception#backtrace is nil for an exception that was never raised;
    # splatting nil adds nothing, so the headline is logged on its own.
    report = ["#{error.class}: #{error.message}", *error.backtrace]
    log :error, report.join("\n")
  end
  @down = true
end

#log(level, message) ⇒ Object

Parameters:

  • level (#to_s)
  • message (#to_s)


93
94
95
# File 'lib/sidekiq/limit_fetch/heartbeat.rb', line 93

# Writes a message to Sidekiq's logger, prefixed so heartbeat output is easy
# to spot.
#
# @param level [#to_s] name of the logger method to call (e.g. :info, :error)
# @param message [#to_s] text appended after the prefix
# @return [Object] whatever the logger method returns
def log(level, message)
  line = "[sidekiq-limit_fetch] [heartbeat] #{message}"
  Sidekiq.logger.public_send(level, line)
end

#run ⇒ Object

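Runs a heartbeat once per configured heartbeat period (`LimitFetch.configuration[:heartbeat_period]`) until a shutdown notification arrives, then deregisters the capsule. Some special error handling is included so that the loop can endure a Redis outage.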



36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
# File 'lib/sidekiq/limit_fetch/heartbeat.rb', line 36

# Heartbeat loop.
#
# Each iteration sends a heartbeat through the capsule semaphore and then
# sleeps for the configured heartbeat period. Both steps go through
# #with_error_handling, so a StandardError in either one is reported and the
# loop carries on. LimitFetch::Shutdown ends the loop, after which this
# capsule's UUID is purged.
def run
  loop do
    with_error_handling do
      capsule_sem.heartbeat
      # Reaching this point means the heartbeat went through; only announce
      # recovery if a previous attempt had failed.
      next unless @down

      log :info, "Redis back online, heartbeat completed successfully"
      @down = false
    end

    with_error_handling { Kernel.sleep(LimitFetch.configuration[:heartbeat_period]) }
  rescue LimitFetch::Shutdown
    break
  end

  # Deregister
  with_error_handling do
    capsule_sem.purge([capsule_meta.uuid])
    log :info, "Successfully shut down"
  end
end

#start ⇒ Thread

Returns:

  • (Thread)


27
28
29
30
31
32
# File 'lib/sidekiq/limit_fetch/heartbeat.rb', line 27

# Spawns a thread that executes #run, names it, and adds it to the
# class-level thread registry.
#
# @return [Thread] the newly created heartbeat thread
def start
  heartbeat = Thread.new { run }
  heartbeat.name = "sidekiq-limit_fetch.heartbeat"
  self.class.threads << heartbeat
  heartbeat
end

#with_error_handling ⇒ Boolean

Returns whether block completed successfully.

Returns:

  • (Boolean)

    whether block completed successfully



60
61
62
63
64
65
66
67
68
# File 'lib/sidekiq/limit_fetch/heartbeat.rb', line 60

# Runs the given block, passing any StandardError to #handle_error instead
# of letting it propagate. LimitFetch::Shutdown is deliberately re-raised so
# the caller can react to it.
#
# @yield the work to protect
# @return [Boolean] whether block completed successfully
# @raise [LimitFetch::Shutdown] when the block raises it
def with_error_handling
  begin
    yield
  rescue LimitFetch::Shutdown
    raise
  rescue StandardError => e
    handle_error(e)
    return false
  end

  true
end