Class: Pgbus::Process::Heartbeat
- Inherits:
-
Object
- Object
- Pgbus::Process::Heartbeat
- Defined in:
- lib/pgbus/process/heartbeat.rb
Constant Summary collapse
- INTERVAL =
seconds
60
- ALIVE_THRESHOLD =
5 minutes
300
Instance Attribute Summary collapse
-
#process_entry ⇒ Object
readonly
Returns the value of attribute process_entry.
Instance Method Summary collapse
- #beat ⇒ Object
-
#initialize(kind:, metadata: {}, on_beat: nil, loop_tick_supplier: nil) ⇒ Heartbeat
constructor
A new instance of Heartbeat.
- #start ⇒ Object
- #stop ⇒ Object
Constructor Details
#initialize(kind:, metadata: {}, on_beat: nil, loop_tick_supplier: nil) ⇒ Heartbeat
Returns a new instance of Heartbeat.
14 15 16 17 18 19 20 |
# File 'lib/pgbus/process/heartbeat.rb', line 14 def initialize(kind:, metadata: {}, on_beat: nil, loop_tick_supplier: nil) @kind = kind @metadata = metadata @on_beat = on_beat @loop_tick_supplier = loop_tick_supplier @timer = nil end |
Instance Attribute Details
#process_entry ⇒ Object (readonly)
Returns the value of attribute process_entry.
12 13 14 |
# File 'lib/pgbus/process/heartbeat.rb', line 12 def process_entry @process_entry end |
Instance Method Details
#beat ⇒ Object
33 34 35 36 37 38 39 40 41 42 43 44 45 |
# File 'lib/pgbus/process/heartbeat.rb', line 33 def beat return unless @process_id @on_beat&.call updates = { last_heartbeat_at: Time.current } if @loop_tick_supplier tick = @loop_tick_supplier.call updates[:metadata] = @metadata.merge("loop_tick_at" => tick&.to_f) end ProcessEntry.where(id: @process_id).update_all(updates) rescue StandardError => e Pgbus.logger.warn { "[Pgbus] Heartbeat failed: #{e.message}" } end |
#start ⇒ Object
22 23 24 25 26 |
# File 'lib/pgbus/process/heartbeat.rb', line 22 def start register_process @timer = Concurrent::TimerTask.new(execution_interval: INTERVAL) { beat } @timer.execute end |
#stop ⇒ Object
28 29 30 31 |
# File 'lib/pgbus/process/heartbeat.rb', line 28 def stop @timer&.shutdown deregister_process end |