Class: Pgbus::Process::Heartbeat

Inherits:
Object
  • Object
show all
Defined in:
lib/pgbus/process/heartbeat.rb

Constant Summary collapse

INTERVAL =

seconds

60
ALIVE_THRESHOLD =

5 minutes

300

Instance Attribute Summary collapse

Instance Method Summary collapse

Constructor Details

#initialize(kind:, metadata: {}, on_beat: nil, loop_tick_supplier: nil) ⇒ Heartbeat

Returns a new instance of Heartbeat.



14
15
16
17
18
19
20
# File 'lib/pgbus/process/heartbeat.rb', line 14

# Builds a heartbeat for one process; nothing is registered or scheduled
# until {#start} is called.
#
# @param kind [Object] stored as +@kind+; not otherwise used in this section
#   (presumably the process type recorded at registration — confirm)
# @param metadata [Hash] base metadata; {#beat} merges a "loop_tick_at" entry
#   into a copy of it when a loop tick supplier is configured
# @param on_beat [#call, nil] optional callback invoked at the start of every beat
# @param loop_tick_supplier [#call, nil] optional callable returning the latest
#   loop tick; its result is converted with +to_f+ when non-nil
def initialize(kind:, metadata: {}, on_beat: nil, loop_tick_supplier: nil)
  @kind = kind
  # The right-hand side was missing here, which made Ruby parse the statement
  # as `@metadata = @on_beat = on_beat` and silently discard `metadata`.
  @metadata = metadata
  @on_beat = on_beat
  @loop_tick_supplier = loop_tick_supplier
  @timer = nil
end

Instance Attribute Details

#process_entry ⇒ Object (readonly)

Returns the value of attribute process_entry.



12
13
14
# File 'lib/pgbus/process/heartbeat.rb', line 12

# Reader for the +@process_entry+ instance variable.
#
# NOTE(review): nothing visible in this section assigns +@process_entry+;
# presumably it is populated when the process is registered — confirm.
#
# @return [Object, nil] the current value of +@process_entry+
def process_entry
  @process_entry
end

Instance Method Details

#beat ⇒ Object



33
34
35
36
37
38
39
40
41
42
43
44
45
# File 'lib/pgbus/process/heartbeat.rb', line 33

# Records one heartbeat for the registered process.
#
# Does nothing when +@process_id+ is unset. Otherwise it invokes the optional
# +on_beat+ callback, then updates +last_heartbeat_at+ on the ProcessEntry row
# whose id is +@process_id+. When a loop tick supplier is configured, the
# update also writes +metadata+: the base metadata merged with a
# "loop_tick_at" entry holding the supplier's value as a Float (or nil).
#
# Any StandardError raised along the way, including one raised by the
# +on_beat+ callback, is logged as a warning and swallowed, so a failing
# callback also skips the row update for that beat.
#
# @return [Object, nil] nil when no process is registered; otherwise the
#   result of +update_all+, or of the logger call on failure
def beat
  return unless @process_id

  @on_beat&.call
  attributes = { last_heartbeat_at: Time.current }
  supplier = @loop_tick_supplier
  if supplier
    loop_tick = supplier.call
    attributes[:metadata] = @metadata.merge("loop_tick_at" => loop_tick&.to_f)
  end
  ProcessEntry.where(id: @process_id).update_all(attributes)
rescue StandardError => error
  Pgbus.logger.warn { "[Pgbus] Heartbeat failed: #{error.message}" }
end

#start ⇒ Object



22
23
24
25
26
# File 'lib/pgbus/process/heartbeat.rb', line 22

# Registers the process, then schedules a recurring task that calls {#beat}
# every INTERVAL seconds and starts it. The task is kept in +@timer+.
#
# @return [Object] whatever the timer task's +execute+ returns
def start
  register_process

  heartbeat_task = Concurrent::TimerTask.new(execution_interval: INTERVAL) do
    beat
  end
  @timer = heartbeat_task
  heartbeat_task.execute
end

#stop ⇒ Object



28
29
30
31
# File 'lib/pgbus/process/heartbeat.rb', line 28

# Shuts down the heartbeat timer, if one was created, and then deregisters
# the process.
#
# @return [Object] whatever +deregister_process+ returns
def stop
  active_timer = @timer
  active_timer.shutdown unless active_timer.nil?
  deregister_process
end