Class: Wurk::Heartbeat

Inherits:
Object
Includes:
Component
Defined in:
lib/wurk/heartbeat.rb

Overview

Single-purpose owner of the Redis-side process heartbeat. Lifted out of Wurk::Launcher so the launcher can stay focused on lifecycle and so the heartbeat schema lives in one place readers can grep for.

Each beat is one pipelined round-trip:

SADD   processes <identity>
HSET   <identity>          info concurrency busy beat quiet rss rtt_us
EXPIRE <identity>          60
UNLINK <identity>:work
HSET   <identity>:work     <tid> <json> ...    (only if WORK_STATE non-empty)
EXPIRE <identity>:work     60                  (only if WORK_STATE non-empty)
LPOP   <identity>-signals  × BEAT_PAUSE

The work hash is UNLINK-then-rewritten on every beat: a dropped beat momentarily empties it, and ProcessSet#cleanup compensates by SREM-ing identities whose `info` field has expired.
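
As a rough illustration of the command sequence above, the sketch below builds one beat's commands as plain arrays without touching Redis. The helper name `beat_commands`, its parameters, and the sample values are hypothetical, and it assumes `× BEAT_PAUSE` means one LPOP per count. It is not the code in lib/wurk/heartbeat.rb.

```ruby
require 'json'

# Hypothetical helper (not part of Wurk): returns the commands for one beat
# as arrays, in the order listed in the overview.
def beat_commands(identity, info, work_state, ttl: 60, beat_pause: 10)
  cmds = []
  cmds << ['SADD', 'processes', identity]
  cmds << ['HSET', identity, *info.flat_map { |k, v| [k.to_s, v.to_s] }]
  cmds << ['EXPIRE', identity, ttl]
  cmds << ['UNLINK', "#{identity}:work"]
  unless work_state.empty?
    # The work mirror is only rewritten when there is in-flight work.
    fields = work_state.flat_map { |tid, payload| [tid.to_s, JSON.generate(payload)] }
    cmds << ['HSET', "#{identity}:work", *fields]
    cmds << ['EXPIRE', "#{identity}:work", ttl]
  end
  # Assumption: one LPOP per BEAT_PAUSE count; empty pops come back as nil.
  beat_pause.times { cmds << ['LPOP', "#{identity}-signals"] }
  cmds
end

idle = beat_commands('host:1:abc', { busy: 0 }, {})
busy = beat_commands('host:1:abc', { busy: 1 }, { 'tid1' => { 'queue' => 'default' } })
```

With an empty work state the list has no `HSET`/`EXPIRE` on the `:work` key, which matches the "only if WORK_STATE non-empty" notes.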

Heartbeat owns only the wire. Signals queued by the dashboard are pulled out of the pipelined results and returned to the caller for dispatch; TSTP/TERM semantics live in Launcher.

`:heartbeat` fires once on the first successful beat and again after any network partition (rearmed when beat! rescues). `:beat` fires every tick.
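
The event cadence can be traced with a small simulation. This is a sketch only: `BeatEvents` and `tick` are hypothetical stand-ins that mimic just the rearm flag, with a simulated failure in place of a real Redis error.

```ruby
# Hypothetical stand-in (not Wurk::Heartbeat): records which events would fire.
class BeatEvents
  attr_reader :fired

  def initialize
    @first_heartbeat = true
    @fired = []
  end

  # `ok` simulates whether the Redis round-trip succeeded.
  def tick(ok)
    raise IOError, 'simulated partition' unless ok

    if @first_heartbeat
      @first_heartbeat = false
      @fired << :heartbeat
    end
    @fired << :beat
  rescue IOError
    # A failed beat rearms the one-shot so recovery is observable.
    @first_heartbeat = true
  end
end

sim = BeatEvents.new
[true, true, false, true].each { |ok| sim.tick(ok) }
sim.fired # => [:heartbeat, :beat, :beat, :heartbeat, :beat]
```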

Spec: docs/target/sidekiq-free.md §12 (Launcher#❤️).

Constant Summary

BEAT_PAUSE = 10

Cadence in seconds. Key TTL is 60s, so a process is considered dead after ~6 missed beats.

TTL_SECONDS = 60
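
The "~6 misses" figure follows from the two constants. A minimal sketch of the arithmetic, with hypothetical helper names and the constant values copied in as defaults:

```ruby
# Hypothetical helpers; defaults mirror TTL_SECONDS (60) and BEAT_PAUSE (10).

# Consecutive missed beats before the key's TTL runs out.
def missed_beats_until_expiry(ttl: 60, pause: 10)
  ttl / pause
end

# True once at least `ttl` seconds have passed since the last successful beat.
def ttl_expired?(last_beat_at, now, ttl: 60)
  (now - last_beat_at) >= ttl
end

missed_beats_until_expiry # => 6
```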

Constants included from Component

Component::DEFAULT_THREAD_PRIORITY, Component::PROCESS_NONCE

Instance Attribute Summary

Attributes included from Component

#config

Instance Method Summary

Methods included from Component

#default_tag, #fire_event, #handle_exception, #hostname, #leader?, #logger, #mono_ms, #process_nonce, #real_ms, #redis, #safe_thread, #tid, #watchdog

Constructor Details

#initialize(identity:, config:, started_at: nil, embedded: false, quiet: nil) ⇒ Heartbeat

`quiet:` is a callable so Launcher can keep ownership of its `@done` flag without an awkward setter contract. `info_overrides:` lets the caller (Launcher#embedded, tests) inject fields without forcing Heartbeat to know about every flag the host process tracks.



# File 'lib/wurk/heartbeat.rb', line 46

def initialize(identity:, config:, started_at: nil, embedded: false, quiet: nil)
  @identity = identity
  @config = config
  @started_at = started_at || Time.now.to_f
  @embedded = embedded
  @quiet_proc = quiet || -> { false }
  @first_heartbeat = true
  @rtt_us = 0
end
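
To show why `quiet:` is a callable rather than a plain boolean, here is a minimal sketch. `FakeLauncher` and its methods are hypothetical stand-ins for the host process; only the closure behaviour is the point.

```ruby
# Hypothetical host object standing in for Launcher; only the @done flag matters.
class FakeLauncher
  def initialize
    @done = false
  end

  def quiet!
    @done = true
  end

  # The lambda closes over self, so it reads @done at call time,
  # not at the moment the lambda was created.
  def quiet_proc
    -> { @done }
  end
end

launcher = FakeLauncher.new
quiet = launcher.quiet_proc
before = quiet.call # false: not quieted yet
launcher.quiet!
after = quiet.call  # true: same lambda now sees the updated flag
```

A lambda like this could be passed as `quiet: launcher.quiet_proc`, so each beat sees the current flag without the heartbeat object needing a setter.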

Instance Attribute Details

#identity ⇒ Object (readonly)

Returns the value of attribute identity.



# File 'lib/wurk/heartbeat.rb', line 40

def identity
  @identity
end

#last_beat_at ⇒ Object (readonly)

Returns the value of attribute last_beat_at.



# File 'lib/wurk/heartbeat.rb', line 40

def last_beat_at
  @last_beat_at
end

#rtt_us ⇒ Object (readonly)

Returns the value of attribute rtt_us.



# File 'lib/wurk/heartbeat.rb', line 40

def rtt_us
  @rtt_us
end

Instance Method Details

#beat! ⇒ Object

Pipelined beat. Returns the drained signals as Array<String> on success, or nil if Redis raised. After a failure, the next successful beat refires `:heartbeat` so partition recovery is observable.



# File 'lib/wurk/heartbeat.rb', line 59

def beat!
  sigs, @rtt_us = pipelined_beat
  @last_beat_at = Time.now.to_f
  if @first_heartbeat
    @first_heartbeat = false
    fire_event(:heartbeat)
  end
  fire_event(:beat, oneshot: false)
  emit_statsd_gauges
  sigs.compact
rescue StandardError => e
  @first_heartbeat = true
  handle_exception(e, { context: 'heartbeat' })
  nil
end
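
Because beat! returns either an array of signal names or nil, a caller has to handle both. The sketch below shows one way that could look; `dispatch_signals` and the handler table are hypothetical and are not Launcher's actual dispatch code.

```ruby
# Hypothetical dispatcher: maps drained signal names to caller-supplied handlers
# and returns the names that were actually handled.
def dispatch_signals(sigs, handlers)
  return [] if sigs.nil? # beat! returned nil: Redis raised, nothing to dispatch

  sigs.filter_map do |sig|
    handler = handlers[sig]
    next unless handler # unknown signal names are skipped

    handler.call
    sig
  end
end

actions = []
handlers = {
  'TSTP' => -> { actions << :quiet },
  'TERM' => -> { actions << :stop }
}

handled = dispatch_signals(%w[TSTP UNKNOWN], handlers)
skipped = dispatch_signals(nil, handlers)
```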

#stop! ⇒ Object

Best-effort teardown. SREM removes us from the live list; UNLINK drops the identity hash and its work mirror. Idempotent: if Redis is down the next process boot’s ProcessSet#cleanup prunes us.



# File 'lib/wurk/heartbeat.rb', line 78

def stop!
  redis do |conn|
    conn.pipelined do |pipe|
      pipe.call('SREM', Keys::PROCESSES, @identity)
      pipe.call('UNLINK', @identity, "#{@identity}:work")
    end
  end
rescue StandardError
  nil
end