Class: Pgbus::Process::Heartbeat

Inherits:
Object
  • Object
show all
Defined in:
lib/pgbus/process/heartbeat.rb

Constant Summary collapse

INTERVAL = 60 # seconds
ALIVE_THRESHOLD = 300 # 5 minutes

Instance Attribute Summary collapse

Instance Method Summary collapse

Constructor Details

#initialize(kind:, metadata: {}, on_beat: nil) ⇒ Heartbeat

Returns a new instance of Heartbeat.



14
15
16
17
18
19
# File 'lib/pgbus/process/heartbeat.rb', line 14

# Builds a heartbeat without starting it; @timer stays nil until #start runs.
#
# @param kind [Object] stored unchanged in @kind (presumably the process type
#   recorded at registration — confirm against register_process)
# @param metadata [Hash] stored unchanged in @metadata; defaults to an empty hash
# @param on_beat [#call, nil] optional callable that #beat invokes with no
#   arguments before it writes the heartbeat timestamp
def initialize(kind:, metadata: {}, on_beat: nil)
  @kind = kind
  # The right-hand side was missing here, so Ruby parsed the statement as
  # `@metadata = @on_beat = on_beat` and the metadata argument was discarded.
  @metadata = metadata
  @on_beat = on_beat
  @timer = nil
end

Instance Attribute Details

#process_entry ⇒ Object (readonly)

Returns the value of attribute process_entry.



12
13
14
# File 'lib/pgbus/process/heartbeat.rb', line 12

# Reader for the @process_entry instance variable.
#
# @return [Object, nil] the value currently held in @process_entry, or nil if
#   it was never assigned (presumably set during registration — confirm
#   against register_process)
def process_entry = @process_entry

Instance Method Details

#beat ⇒ Object



32
33
34
35
36
37
38
39
# File 'lib/pgbus/process/heartbeat.rb', line 32

# Performs a single heartbeat.
#
# Does nothing and returns nil while @process_id is unset. Otherwise it runs
# the optional on_beat callback and then writes the current time into
# last_heartbeat_at for the ProcessEntry rows matching @process_id.
#
# Any StandardError raised on the way — including one from the callback — is
# reported through Pgbus.logger as a warning and is not re-raised.
#
# @return [Object, nil] nil when skipped; otherwise the result of update_all,
#   or of the logger call when an error was rescued
def beat
  if @process_id
    @on_beat&.call
    matching_entries = ProcessEntry.where(id: @process_id)
    matching_entries.update_all(last_heartbeat_at: Time.current)
  end
rescue StandardError => error
  Pgbus.logger.warn { "[Pgbus] Heartbeat failed: #{error.message}" }
end

#start ⇒ Object



21
22
23
24
25
# File 'lib/pgbus/process/heartbeat.rb', line 21

# Registers this process, then creates a Concurrent::TimerTask that calls
# #beat with an execution interval of INTERVAL, keeps it in @timer, and
# launches it.
#
# @return [Object] whatever the timer's #execute call returns
def start
  register_process

  heartbeat_task = Concurrent::TimerTask.new(execution_interval: INTERVAL) do
    beat
  end
  @timer = heartbeat_task
  heartbeat_task.execute
end

#stop ⇒ Object



27
28
29
30
# File 'lib/pgbus/process/heartbeat.rb', line 27

# Shuts the timer down when one has been created, then deregisters the
# process. Deregistration runs even when no timer exists.
#
# @return [Object] whatever deregister_process returns
def stop
  @timer.shutdown unless @timer.nil?
  deregister_process
end