Class: Wurk::Heartbeat
- Inherits: Object
  - Object
  - Wurk::Heartbeat
- Includes: Component
- Defined in: lib/wurk/heartbeat.rb
Overview
Single-purpose owner of the Redis-side process heartbeat. Lifted out of Wurk::Launcher so the launcher can stay focused on lifecycle and so the heartbeat schema lives in one place readers can grep for.
Each beat is one pipelined round-trip:
SADD processes <identity>
HSET <identity> info concurrency busy beat quiet rss rtt_us
EXPIRE <identity> 60
UNLINK <identity>:work
HSET <identity>:work <tid> <json> ... (only if WORK_STATE non-empty)
EXPIRE <identity>:work 60 (only if WORK_STATE non-empty)
LPOP <identity>-signals × BEAT_PAUSE
The work hash is UNLINK-then-rewritten on every beat, so a dropped beat momentarily empties it; ProcessSet#cleanup compensates by SREM-ing identities whose `info` field has expired.
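The command sequence above can be sketched as a plain list builder. This is a minimal illustration, not the code in lib/wurk/heartbeat.rb: the module name `BeatSketch`, the `commands` method and its arguments are hypothetical, and reading "× BEAT_PAUSE" as "LPOP issued BEAT_PAUSE times" is an assumption.

```ruby
require 'json'

# Hypothetical sketch: only the command order and the 60 s TTL come from the
# overview above; every name in this module is illustrative.
module BeatSketch
  BEAT_PAUSE  = 10
  TTL_SECONDS = 60

  # Returns the commands one beat would queue on the pipeline, in order.
  # `info` is a Hash of heartbeat fields, `work_state` maps tid => payload.
  def self.commands(identity, info, work_state)
    work_key = "#{identity}:work"
    cmds = [
      ['SADD', 'processes', identity],
      ['HSET', identity, *info.flat_map { |k, v| [k.to_s, v.to_s] }],
      ['EXPIRE', identity, TTL_SECONDS],
      ['UNLINK', work_key]
    ]
    # The work mirror is rewritten only when there is something to mirror.
    unless work_state.empty?
      fields = work_state.flat_map { |tid, payload| [tid, JSON.generate(payload)] }
      cmds << ['HSET', work_key, *fields]
      cmds << ['EXPIRE', work_key, TTL_SECONDS]
    end
    # Assumption: the signal list is drained with BEAT_PAUSE LPOP calls.
    BEAT_PAUSE.times { cmds << ['LPOP', "#{identity}-signals"] }
    cmds
  end
end
```

With an empty work state the builder emits only the four fixed commands plus the LPOPs, which is the "momentarily empty" window that ProcessSet#cleanup has to tolerate.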
Heartbeat owns only the wire. Signals queued by the dashboard are pulled out of the pipelined results and returned to the caller for dispatch; TSTP/TERM semantics live in Launcher.
`:heartbeat` fires once on the first successful beat and again after any network partition (rearmed when beat! rescues). `:beat` fires every tick.
Spec: docs/target/sidekiq-free.md §12 (Launcher#❤️).
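The event rules in the overview (`:heartbeat` on the first success and after a partition, `:beat` on every successful tick) can be simulated without Redis. The class `BeatEvents` and its `events` log below are hypothetical stand-ins; a raising block models a failed round-trip.

```ruby
# Hypothetical simulation of the rearm flag; not the real Heartbeat class.
class BeatEvents
  attr_reader :events

  def initialize
    @first_heartbeat = true
    @events = []
  end

  # The block stands in for the Redis round-trip. If it raises, the flag is
  # rearmed so the next success records :heartbeat again.
  def tick
    yield
    if @first_heartbeat
      @first_heartbeat = false
      @events << :heartbeat
    end
    @events << :beat
    true
  rescue StandardError
    @first_heartbeat = true
    nil
  end
end

sim = BeatEvents.new
sim.tick { :ok }                          # first success
sim.tick { :ok }                          # steady state
sim.tick { raise IOError, 'partition' }   # failed beat, flag rearmed
sim.tick { :ok }                          # recovery
p sim.events
# => [:heartbeat, :beat, :beat, :heartbeat, :beat]
```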
Constant Summary
- BEAT_PAUSE = 10
  Cadence in seconds. Key TTL is 60s, so a process is considered dead after ~6 missed beats.
- TTL_SECONDS = 60
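The "~6 misses" figure follows from the two constants: each successful beat resets the key TTL to 60 s, and beats are 10 s apart. A small sketch of that arithmetic, with the module `HeartbeatTiming` and both helper methods being hypothetical names:

```ruby
# Hypothetical helpers; only the two constant values come from this page.
module HeartbeatTiming
  BEAT_PAUSE  = 10
  TTL_SECONDS = 60

  # Consecutive beats that must be missed before the key's TTL lapses.
  def self.misses_until_expiry
    TTL_SECONDS / BEAT_PAUSE
  end

  # True once TTL_SECONDS have elapsed since the last successful beat.
  # Both arguments are timestamps in seconds.
  def self.expired?(last_beat_at, now)
    now - last_beat_at >= TTL_SECONDS
  end
end
```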
Constants included from Component
Component::DEFAULT_THREAD_PRIORITY, Component::PROCESS_NONCE
Instance Attribute Summary
- #identity ⇒ Object (readonly)
  Returns the value of attribute identity.
- #last_beat_at ⇒ Object (readonly)
  Returns the value of attribute last_beat_at.
- #rtt_us ⇒ Object (readonly)
  Returns the value of attribute rtt_us.
Attributes included from Component
Instance Method Summary
- #beat! ⇒ Object
  Pipelined beat.
- #initialize(identity:, config:, started_at: nil, embedded: false, quiet: nil) ⇒ Heartbeat (constructor)
  `quiet:` is a callable so Launcher can keep ownership of its `@done` flag without an awkward setter contract.
- #stop! ⇒ Object
  Best-effort teardown.
Methods included from Component
#default_tag, #fire_event, #handle_exception, #hostname, #leader?, #logger, #mono_ms, #process_nonce, #real_ms, #redis, #safe_thread, #tid, #watchdog
Constructor Details
#initialize(identity:, config:, started_at: nil, embedded: false, quiet: nil) ⇒ Heartbeat
`quiet:` is a callable so Launcher can keep ownership of its `@done` flag without an awkward setter contract. `info_overrides:` lets the caller (Launcher#embedded, tests) inject fields without forcing Heartbeat to know about every flag the host process tracks.
# File 'lib/wurk/heartbeat.rb', line 46
def initialize(identity:, config:, started_at: nil, embedded: false, quiet: nil)
  @identity = identity
  @config = config
  @started_at = started_at || Time.now.to_f
  @embedded = embedded
  @quiet_proc = quiet || -> { false }
  @first_heartbeat = true
  @rtt_us = 0
end
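Passing `quiet:` as a callable means the heartbeat reads the owner's current state on every call instead of holding a copy. A minimal sketch of that contract, assuming two hypothetical classes (`FakeLauncher` for the owner, `QuietReader` for the consumer); neither exists in Wurk.

```ruby
# Hypothetical owner of the @done flag, standing in for Launcher.
class FakeLauncher
  def initialize
    @done = false
  end

  def quiet!
    @done = true
  end

  # The lambda closes over self, so each call sees the current @done.
  def quiet_proc
    -> { @done }
  end
end

# Hypothetical consumer mirroring the `quiet || -> { false }` default.
class QuietReader
  def initialize(quiet: nil)
    @quiet_proc = quiet || -> { false }
  end

  def quiet?
    @quiet_proc.call
  end
end

launcher = FakeLauncher.new
reader = QuietReader.new(quiet: launcher.quiet_proc)
reader.quiet?   # false until the launcher flips its flag
launcher.quiet!
reader.quiet?   # now true, with no setter on the reader
```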
Instance Attribute Details
#identity ⇒ Object (readonly)
Returns the value of attribute identity.
# File 'lib/wurk/heartbeat.rb', line 40
def identity
  @identity
end
#last_beat_at ⇒ Object (readonly)
Returns the value of attribute last_beat_at.
# File 'lib/wurk/heartbeat.rb', line 40
def last_beat_at
  @last_beat_at
end
#rtt_us ⇒ Object (readonly)
Returns the value of attribute rtt_us.
# File 'lib/wurk/heartbeat.rb', line 40
def rtt_us
  @rtt_us
end
Instance Method Details
#beat! ⇒ Object
Pipelined beat. Returns the drained signals as Array<String> on success, or nil if Redis raised. The next successful beat refires `:heartbeat` so partition recovery is observable.
# File 'lib/wurk/heartbeat.rb', line 59
def beat!
  sigs, @rtt_us = pipelined_beat
  @last_beat_at = Time.now.to_f
  if @first_heartbeat
    @first_heartbeat = false
    fire_event(:heartbeat)
  end
  fire_event(:beat, oneshot: false)
  emit_statsd_gauges
  sigs.compact
rescue StandardError => e
  @first_heartbeat = true
  handle_exception(e, { context: 'heartbeat' })
  nil
end
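Because beat! returns nil on failure and an Array<String> on success, a caller has to distinguish the two before dispatching. The sketch below shows one way a caller might do that; `handle_beat_result` is a hypothetical helper, and mapping TSTP to quiet and TERM to stop is an assumption about Launcher's semantics, which this page does not spell out.

```ruby
# Hypothetical caller-side handling of the beat! return value.
def handle_beat_result(signals)
  # nil means the round-trip failed; there is nothing to dispatch.
  return :redis_unavailable if signals.nil?

  signals.map do |sig|
    case sig
    when 'TSTP' then :quiet   # assumed meaning: stop fetching new work
    when 'TERM' then :stop    # assumed meaning: begin shutdown
    else :ignored
    end
  end
end
```

An empty array is a normal result: the beat succeeded and no signals were queued.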
#stop! ⇒ Object
Best-effort teardown. SREM removes us from the live list; UNLINK drops the identity hash and its work mirror. Idempotent: if Redis is down the next process boot’s ProcessSet#cleanup prunes us.
# File 'lib/wurk/heartbeat.rb', line 78
def stop!
  redis do |conn|
    conn.pipelined do |pipe|
      pipe.call('SREM', Keys::PROCESSES, @identity)
      pipe.call('UNLINK', @identity, "#{@identity}:work")
    end
  end
rescue StandardError
  nil
end
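If stop! fails, the identity stays in the processes set until a later cleanup notices that its hash has expired. The following is a rough in-memory model of that pruning step, not the real ProcessSet#cleanup: `prune_stale!` is a hypothetical function, a Set stands in for the Redis set, and a Hash of Hashes stands in for the per-identity hashes.

```ruby
require 'set'

# Hypothetical model: `processes` is the live set, `hashes` maps
# identity => field hash. An identity with no `info` field is treated as
# expired and removed from the set (the SREM step).
def prune_stale!(processes, hashes)
  stale = processes.to_a.reject { |id| hashes.fetch(id, {}).key?('info') }
  stale.each { |id| processes.delete(id) }
  stale
end

processes = Set['host:1:aaa', 'host:2:bbb']
hashes = { 'host:1:aaa' => { 'info' => '{}' } } # host:2:bbb has expired
prune_stale!(processes, hashes)
```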