Class: PatientHttp::Sidekiq::TaskMonitor

Inherits:
Object
Defined in:
lib/patient_http/sidekiq/task_monitor.rb

Overview

Manages inflight request tracking in Redis for crash recovery.

This class maintains a sorted set of task IDs scored by their last heartbeat timestamp (in milliseconds) and a hash mapping each task ID to its serialized Sidekiq job payload. It provides a distributed lock for orphan detection and automatically re-enqueues requests that were interrupted by process crashes.

Task ID format: “hostname:pid:hex/request-uuid”

  • hostname: sanitized hostname (colons and slashes replaced with dashes)

  • pid: process ID

  • hex: 16-character random hex string (8 random bytes) for uniqueness

  • request-uuid: unique identifier for the request
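A minimal sketch of building and parsing an ID in this format, using only the Ruby standard library. The helper names build_lock_identifier and parse_task_id are hypothetical; the construction mirrors #initialize and #full_task_id, and the parsing mirrors the split calls in .inflight_counts_by_process.

```ruby
require "socket"
require "securerandom"

# Hypothetical helper: builds the process identifier the way #initialize does
# (sanitized hostname, PID, and SecureRandom.hex(8), i.e. 16 hex characters).
def build_lock_identifier(hostname = Socket.gethostname)
  sanitized = hostname.dup.force_encoding("UTF-8").tr(":/", "-")
  "#{sanitized}:#{Process.pid}:#{SecureRandom.hex(8)}"
end

# Hypothetical helper: splits a full task ID back into its parts. Because the
# hostname can no longer contain ":" or "/", both splits are unambiguous.
def parse_task_id(full_task_id)
  process_id, request_id = full_task_id.split("/", 2)
  hostname, pid, hex = process_id.split(":", 3)
  {process_id: process_id, host_pid: "#{hostname}:#{pid}", hex: hex, request_id: request_id}
end

lock_id = build_lock_identifier("web:01/a")
parts = parse_task_id("#{lock_id}/#{SecureRandom.uuid}")
puts parts[:host_pid]   # prints "web-01-a:<pid of this process>"
puts parts[:hex].length # prints 16
```

Sanitizing the hostname is what keeps the format parseable: the first "/" always ends the process identifier, and the first two ":" always delimit hostname and PID.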

Constant Summary

Redis keys used by the monitor:

INFLIGHT_INDEX_KEY =
"sidekiq:patient_http:inflight_index"
INFLIGHT_JOBS_KEY =
"sidekiq:patient_http:inflight_jobs"
PROCESS_SET_KEY =
"sidekiq:patient_http:processes"
GC_LOCK_KEY =
"sidekiq:patient_http:gc_lock"
GC_LAST_RUN_KEY =
"sidekiq:patient_http:gc_last_run"
REMOVE_IF_ORPHANED_SCRIPT =

Lua script for atomic orphan removal. It checks whether the task is still orphaned (its timestamp is below the threshold) and, if so, removes it in the same atomic step. This prevents a race in which a heartbeat updates the timestamp between the check and the removal.

  • KEYS[1]: index key (sorted set)

  • KEYS[2]: jobs key (hash)

  • ARGV[1]: request_id

  • ARGV[2]: threshold_ms

Returns: [removed (0/1), job_payload or nil]
<<~LUA
  local index_key = KEYS[1]
  local jobs_key = KEYS[2]
  local request_id = ARGV[1]
  local threshold_ms = tonumber(ARGV[2])

  local current_score = redis.call('ZSCORE', index_key, request_id)
  if not current_score or tonumber(current_score) >= threshold_ms then
    return {0, nil}  -- Not orphaned or already removed
  end

  local job_payload = redis.call('HGET', jobs_key, request_id)
  redis.call('ZREM', index_key, request_id)
  redis.call('HDEL', jobs_key, request_id)
  return {1, job_payload}
LUA
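To make the script's decision rule concrete, here is a pure-Ruby sketch of the same check-and-remove logic over two in-memory hashes standing in for the sorted set and the jobs hash. Store and remove_if_orphaned are hypothetical names, and MyJob is a placeholder payload. The sketch is atomic only because it is single-threaded; Redis provides that guarantee by running the whole script without interleaving other commands.

```ruby
# In-memory stand-ins for the two Redis structures:
#   index: task ID => last heartbeat score in milliseconds (the sorted set)
#   jobs:  task ID => serialized Sidekiq job payload (the hash)
Store = Struct.new(:index, :jobs)

# Same decision rule as REMOVE_IF_ORPHANED_SCRIPT: remove only if the entry
# still exists and its score is strictly below the threshold.
def remove_if_orphaned(store, request_id, threshold_ms)
  score = store.index[request_id]
  return [0, nil] if score.nil? || score >= threshold_ms # not orphaned or already removed

  payload = store.jobs[request_id]
  store.index.delete(request_id)
  store.jobs.delete(request_id)
  [1, payload]
end

now_ms = 1_700_000_000_000
store = Store.new(
  {"host:1:aa/old" => now_ms - 600_000, "host:1:aa/fresh" => now_ms - 1_000},
  {"host:1:aa/old" => '{"class":"MyJob"}', "host:1:aa/fresh" => '{"class":"MyJob"}'}
)
threshold_ms = now_ms - 300_000 # e.g. a 300-second orphan threshold

p remove_if_orphaned(store, "host:1:aa/old", threshold_ms)   # prints [1, "{\"class\":\"MyJob\"}"]
p remove_if_orphaned(store, "host:1:aa/fresh", threshold_ms) # prints [0, nil]
```

One detail of the real script: Redis converts a Lua table into a reply only up to its first nil, so {0, nil} reaches the client as the one-element array [0]. Destructuring it in Ruby (removed, payload = result) still yields payload == nil.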

Constructor Details

#initialize(config) ⇒ TaskMonitor

Returns a new instance of TaskMonitor.

Parameters:

  • config (Configuration)

    the configuration object

# File 'lib/patient_http/sidekiq/task_monitor.rb', line 161

def initialize(config)
  @config = config
  hostname = ::Socket.gethostname.force_encoding("UTF-8").tr(":/", "-")
  pid = ::Process.pid
  @lock_identifier = "#{hostname}:#{pid}:#{SecureRandom.hex(8)}".freeze
end

Instance Attribute Details

#config ⇒ Configuration (readonly)

Returns the configuration object.

Returns:

  • (Configuration)

    the configuration object

# File 'lib/patient_http/sidekiq/task_monitor.rb', line 54

def config
  @config
end

Class Method Details

.clear_all! ⇒ void

This method is part of a private API. You should avoid using this method if possible, as it may be removed or be changed in the future.

This method returns an undefined value.

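Clear all TaskMonitor data from Redis: the inflight index and jobs hash, the process set, and the GC lock and last-run keys. Per-process max-connections keys are not deleted. Only allowed in the test environment.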

Raises:

  • (RuntimeError)

    if called outside of test environment



# File 'lib/patient_http/sidekiq/task_monitor.rb', line 138

def clear_all!
  unless PatientHttp.testing?
    raise "clear_all! is only allowed in test environment"
  end

  ::Sidekiq.redis do |redis|
    redis.del(INFLIGHT_INDEX_KEY, INFLIGHT_JOBS_KEY, PROCESS_SET_KEY, GC_LOCK_KEY, GC_LAST_RUN_KEY)
  end
end

.inflight_count ⇒ Integer

Get the total number of inflight requests in Redis, across all processes.

Returns:

  • (Integer)

    number of inflight requests



# File 'lib/patient_http/sidekiq/task_monitor.rb', line 60

def inflight_count
  ::Sidekiq.redis do |redis|
    redis.zcard(INFLIGHT_INDEX_KEY)
  end
end

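.inflight_counts_by_process ⇒ Hash

Get the inflight request count and maximum connection capacity for each registered process, grouped by “hostname:pid”. Process IDs whose max-connections key has expired are treated as stale and removed from the process set.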

Returns:

  • (Hash)

    hash of “hostname:pid” => { inflight: Integer, max_capacity: Integer }



# File 'lib/patient_http/sidekiq/task_monitor.rb', line 69

def inflight_counts_by_process
  process_ids = nil
  max_connections = nil
  inflight_task_ids = nil

  ::Sidekiq.redis do |redis|
    process_ids = redis.smembers(PROCESS_SET_KEY)
    return {} if process_ids.empty?

    max_keys = process_ids.map { |pid| max_connections_key_for(pid) }
    max_connections = redis.mget(*max_keys)

    inflight_task_ids = redis.zrange(INFLIGHT_INDEX_KEY, 0, -1)
  end

  inflight_by_process_id = inflight_task_ids.group_by do |task_id|
    task_id.split("/", 2).first
  end

  result = {}
  stale_process_ids = []

  process_ids.zip(max_connections).each do |process_id, max_conn|
    if max_conn.nil?
      # Mark for removal if max_conn key doesn't exist (process is gone)
      stale_process_ids << process_id
    else
      host_pid = process_id.split(":", 3).first(2).join(":")
      counts = result[host_pid]
      unless counts
        counts = {inflight: 0, max_capacity: 0}
        result[host_pid] = counts
      end
      counts[:inflight] += inflight_by_process_id[process_id]&.size.to_i
      counts[:max_capacity] += max_conn.to_i
    end
  end

  # Remove stale process IDs from the set
  unless stale_process_ids.empty?
    ::Sidekiq.redis do |redis|
      redis.srem(PROCESS_SET_KEY, stale_process_ids)
    end
  end

  result
end
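The grouping step is easier to follow without Redis in the way. The sketch below (aggregate_counts is a hypothetical name) takes the results of the three reads, SMEMBERS, MGET and ZRANGE, as plain arrays and reproduces the same bookkeeping, returning the stale IDs instead of issuing SREM.

```ruby
# Pure-Ruby version of the aggregation in .inflight_counts_by_process.
#   process_ids:       SMEMBERS of the process set
#   max_connections:   MGET of each process's max-connections key (nil = expired)
#   inflight_task_ids: ZRANGE of the inflight index
# Returns [counts_by_host_pid, stale_process_ids].
def aggregate_counts(process_ids, max_connections, inflight_task_ids)
  inflight_by_process_id = inflight_task_ids.group_by { |task_id| task_id.split("/", 2).first }
  result = {}
  stale = []

  process_ids.zip(max_connections).each do |process_id, max_conn|
    if max_conn.nil?
      stale << process_id # max-connections key expired, so the process is gone
      next
    end

    # Identifiers that differ only in the random hex suffix share one entry.
    host_pid = process_id.split(":", 3).first(2).join(":")
    counts = (result[host_pid] ||= {inflight: 0, max_capacity: 0})
    counts[:inflight] += inflight_by_process_id.fetch(process_id, []).size
    counts[:max_capacity] += max_conn.to_i
  end

  [result, stale]
end

result, stale = aggregate_counts(
  ["web-1:100:aaaa", "web-1:100:bbbb", "web-2:200:cccc"],
  ["10", "10", nil],
  ["web-1:100:aaaa/r1", "web-1:100:aaaa/r2", "web-1:100:bbbb/r3", "web-2:200:cccc/r4"]
)
p result.keys                        # prints ["web-1:100"]
p result["web-1:100"][:inflight]     # prints 3
p result["web-1:100"][:max_capacity] # prints 20
p stale                              # prints ["web-2:200:cccc"]
```

Note that inflight entries belonging to a stale process (r4 above) are simply not counted; recovering them is left to orphan cleanup.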

.registered_process_ids ⇒ Array<String>

Get all registered process IDs.

Returns:

  • (Array<String>)

    list of process identifiers



# File 'lib/patient_http/sidekiq/task_monitor.rb', line 127

def registered_process_ids
  ::Sidekiq.redis do |redis|
    redis.smembers(PROCESS_SET_KEY)
  end
end

.total_max_connections ⇒ Integer

Get the total max connections across all active processes.

Returns:

  • (Integer)

    sum of max connections from all active processes



# File 'lib/patient_http/sidekiq/task_monitor.rb', line 120

def total_max_connections
  inflight_counts_by_process.values.sum { |data| data[:max_capacity] }
end

Instance Method Details

#acquire_gc_lock ⇒ Boolean

Try to acquire the distributed garbage collection lock.

Returns:

  • (Boolean)

    true if lock acquired, false otherwise



# File 'lib/patient_http/sidekiq/task_monitor.rb', line 295

def acquire_gc_lock
  ::Sidekiq.redis do |redis|
    # SET with NX and EX: create the key only if it does not exist, with a TTL.
    # With Sidekiq's Redis client this returns "OK" on success and nil if the key
    # already exists; !! converts the result to a Boolean.
    !!redis.set(GC_LOCK_KEY, @lock_identifier, nx: true, ex: gc_lock_ttl)
  end
end

#cleanup_orphaned_requests(orphan_threshold_seconds, logger) ⇒ Integer

Find requests whose last heartbeat is older than orphan_threshold_seconds and re-enqueue their Sidekiq jobs.

Parameters:

  • orphan_threshold_seconds (Numeric)

    age threshold for considering a request orphaned

  • logger (Logger)

    logger for output

Returns:

  • (Integer)

    number of orphaned requests re-enqueued



# File 'lib/patient_http/sidekiq/task_monitor.rb', line 368

def cleanup_orphaned_requests(orphan_threshold_seconds, logger)
  threshold_timestamp_ms = calculate_threshold_timestamp(orphan_threshold_seconds)
  orphaned_requests = fetch_orphaned_requests(threshold_timestamp_ms)

  return 0 if orphaned_requests.empty?

  reenqueue_orphaned_jobs(orphaned_requests, threshold_timestamp_ms, logger)
end
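How the GC-related methods on this page fit together is not spelled out here, so the following is only a plausible sketch: run_gc_cycle is a hypothetical driver, the call order (check gc_needed?, take the lock, clean up, record the run, always release) is an assumption, and FakeMonitor is a stand-in with the same method names so the example runs without Redis or Sidekiq.

```ruby
require "logger"

# Hypothetical driver for one garbage-collection pass. The ordering of calls is
# an assumption; only the method names come from TaskMonitor.
def run_gc_cycle(monitor, orphan_threshold_seconds, logger)
  return 0 unless monitor.gc_needed?      # another process ran GC recently
  return 0 unless monitor.acquire_gc_lock # another process holds the lock

  begin
    count = monitor.cleanup_orphaned_requests(orphan_threshold_seconds, logger)
    monitor.record_gc_run
    count
  ensure
    monitor.release_gc_lock # deletes the lock only if this process still owns it
  end
end

# Hypothetical stand-in that records which methods were called.
class FakeMonitor
  attr_reader :calls

  def initialize(gc_needed: true, lock_free: true, orphans: 2)
    @gc_needed = gc_needed
    @lock_free = lock_free
    @orphans = orphans
    @calls = []
  end

  def gc_needed?
    @calls << :gc_needed?
    @gc_needed
  end

  def acquire_gc_lock
    @calls << :acquire_gc_lock
    @lock_free
  end

  def cleanup_orphaned_requests(_threshold_seconds, logger)
    @calls << :cleanup_orphaned_requests
    logger.info("re-enqueued #{@orphans} orphaned requests")
    @orphans
  end

  def record_gc_run
    @calls << :record_gc_run
  end

  def release_gc_lock
    @calls << :release_gc_lock
    true
  end
end

monitor = FakeMonitor.new
puts run_gc_cycle(monitor, 300, Logger.new($stdout)) # logs one line, then prints 2
p monitor.calls
# prints [:gc_needed?, :acquire_gc_lock, :cleanup_orphaned_requests, :record_gc_run, :release_gc_lock]
```

The ensure clause is the important design choice in this sketch: even if cleanup raises, the lock is released rather than left to expire on its TTL.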

#full_task_id(task_id) ⇒ String

Build a unique task ID by prefixing a request task ID with this process’s lock identifier (“hostname:pid:hex/”).

Parameters:

  • task_id (String)

    the request task ID

Returns:

  • (String)

    the unique task ID



# File 'lib/patient_http/sidekiq/task_monitor.rb', line 272

def full_task_id(task_id)
  "#{@lock_identifier}/#{task_id}"
end

#gc_needed? ⇒ Boolean

Check if garbage collection should run based on the last run timestamp.

Returns true if the GC_LAST_RUN_KEY doesn’t exist in Redis or if enough time has elapsed since the last GC run.

Returns:

  • (Boolean)

    true if GC should run, false otherwise



# File 'lib/patient_http/sidekiq/task_monitor.rb', line 339

def gc_needed?
  last_run = ::Sidekiq.redis do |redis|
    redis.get(GC_LAST_RUN_KEY)
  end

  return true if last_run.nil?

  last_run_time = Time.at(last_run.to_f / 1000.0)
  Time.now - last_run_time >= config.heartbeat_interval
end
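A stand-alone version of the decision, to make the units explicit: #record_gc_run stores milliseconds since the epoch, while comparing against a Time difference means heartbeat_interval is in seconds. gc_needed_at? is a hypothetical name, and passing now in explicitly (instead of calling Time.now) is only there to keep the sketch deterministic.

```ruby
# last_run_ms: raw string stored under GC_LAST_RUN_KEY, or nil if the key is
# missing (never run, or expired after its TTL).
# now: a Time; heartbeat_interval: seconds.
def gc_needed_at?(last_run_ms, now, heartbeat_interval)
  return true if last_run_ms.nil?

  last_run_time = Time.at(last_run_ms.to_f / 1000.0)
  now - last_run_time >= heartbeat_interval
end

last_run_ms = (Time.at(1_700_000_000).to_f * 1000).floor.to_s # what record_gc_run would store
now = Time.at(1_700_000_060)

p gc_needed_at?(nil, now, 60)         # prints true
p gc_needed_at?(last_run_ms, now, 60) # prints true (exactly 60 s elapsed)
p gc_needed_at?(last_run_ms, now, 90) # prints false
```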

#heartbeat_timestamp_for(task) ⇒ Integer?

This method is part of a private API. You should avoid using this method if possible, as it may be removed or be changed in the future.

Get the heartbeat timestamp for a task.

Parameters:

  • task (RequestTask)

    the request task

Returns:

  • (Integer, nil)

    timestamp in milliseconds, or nil if not registered



# File 'lib/patient_http/sidekiq/task_monitor.rb', line 251

def heartbeat_timestamp_for(task)
  score = ::Sidekiq.redis do |redis|
    redis.zscore(INFLIGHT_INDEX_KEY, full_task_id(task.id))
  end
  score&.to_i
end

#ping_process ⇒ void

This method returns an undefined value.

Register this process in the process set and record its max connections in Redis, refreshing the TTLs on both keys.

This data is used for monitoring (see .inflight_counts_by_process).



# File 'lib/patient_http/sidekiq/task_monitor.rb', line 281

def ping_process
  ::Sidekiq.redis do |redis|
    redis.multi do |transaction|
      transaction.sadd(PROCESS_SET_KEY, @lock_identifier)
      transaction.set(max_connections_key, @config.max_connections)
      transaction.expire(PROCESS_SET_KEY, inflight_ttl)
      transaction.expire(max_connections_key, process_ttl)
    end
  end
end

#record_gc_run ⇒ void

This method returns an undefined value.

Record the timestamp of the last GC run in Redis.

The timestamp is stored with a TTL slightly longer than the heartbeat interval to coordinate GC execution across multiple processes.



# File 'lib/patient_http/sidekiq/task_monitor.rb', line 356

def record_gc_run
  ::Sidekiq.redis do |redis|
    redis.set(GC_LAST_RUN_KEY, (Time.now.to_f * 1000).floor, ex: gc_last_run_ttl)
  end
end

#register(task) ⇒ void

This method returns an undefined value.

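Register a request as inflight in Redis: add its full task ID to the index with the current timestamp in milliseconds, store its serialized Sidekiq job in the jobs hash, and refresh the TTL on both keys.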

Parameters:

  • task (RequestTask)

    the request task to register



# File 'lib/patient_http/sidekiq/task_monitor.rb', line 173

def register(task)
  timestamp_ms = (Time.now.to_f * 1000).round
  job_payload = JSON.generate(task.task_handler.sidekiq_job)
  task_id = full_task_id(task.id)

  ::Sidekiq.redis do |redis|
    redis.multi do |transaction|
      transaction.zadd(INFLIGHT_INDEX_KEY, timestamp_ms, task_id)
      transaction.hset(INFLIGHT_JOBS_KEY, task_id, job_payload)
      transaction.expire(INFLIGHT_INDEX_KEY, inflight_ttl)
      transaction.expire(INFLIGHT_JOBS_KEY, inflight_ttl)
    end
  end
end

#registered?(task) ⇒ Boolean

This method is part of a private API. You should avoid using this method if possible, as it may be removed or be changed in the future.

Check if a task is registered in the inflight registry.

Parameters:

  • task (RequestTask)

    the request task

Returns:

  • (Boolean)

    true if registered, false otherwise



# File 'lib/patient_http/sidekiq/task_monitor.rb', line 239

def registered?(task)
  ::Sidekiq.redis do |redis|
    !redis.zscore(INFLIGHT_INDEX_KEY, full_task_id(task.id)).nil?
  end
end

#registered_task_ids ⇒ Array<String>

This method is part of a private API. You should avoid using this method if possible, as it may be removed or be changed in the future.

Get all registered task IDs for this registry’s process.

Returns:

  • (Array<String>)

    list of full task IDs



# File 'lib/patient_http/sidekiq/task_monitor.rb', line 262

def registered_task_ids
  ::Sidekiq.redis do |redis|
    redis.zrange(INFLIGHT_INDEX_KEY, 0, -1)
  end.select { |id| id.start_with?("#{@lock_identifier}/") }
end

#release_gc_lock ⇒ Boolean

Release the garbage collection lock if held by this process.

Uses Redis WATCH/MULTI/EXEC for optimistic locking to ensure we only delete the lock if it’s still held by this process.

Returns:

  • (Boolean)

    true if the lock was released, false otherwise



# File 'lib/patient_http/sidekiq/task_monitor.rb', line 309

def release_gc_lock
  ::Sidekiq.redis do |redis|
    # Watch the lock key for changes
    redis.watch(GC_LOCK_KEY)

    # Get current lock value
    current_value = redis.get(GC_LOCK_KEY)

    if current_value == @lock_identifier
      # Lock is ours, delete it atomically
      result = redis.multi do |transaction|
        transaction.del(GC_LOCK_KEY)
      end
      # MULTI returns nil if transaction was aborted (someone else modified the key)
      # Otherwise returns array with results
      !result.nil?
    else
      # Lock is not ours or doesn't exist
      redis.unwatch
      false
    end
  end
end
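The reason #release_gc_lock checks the stored value before deleting can be shown with an in-memory model of the lock. FakeLock is hypothetical: acquire mimics SET key value NX EX ttl, release mimics a compare-and-delete, and a Mutex stands in for Redis executing each command atomically. The WATCH/MULTI/EXEC approach above achieves the same compare-and-delete optimistically; a small Lua script is another common way to do it.

```ruby
# Hypothetical in-memory model of the GC lock with a TTL and an owner check.
class FakeLock
  def initialize(clock: -> { Process.clock_gettime(Process::CLOCK_MONOTONIC) })
    @clock = clock
    @value = nil
    @expires_at = nil
    @mutex = Mutex.new # stands in for Redis running each command atomically
  end

  # Like SET key identifier NX EX ttl: succeeds only if no live lock exists.
  def acquire(identifier, ttl)
    @mutex.synchronize do
      expire!
      return false if @value

      @value = identifier
      @expires_at = @clock.call + ttl
      true
    end
  end

  # Compare-and-delete: only the current owner may release the lock.
  def release(identifier)
    @mutex.synchronize do
      expire!
      return false unless @value == identifier # not ours, or already expired

      @value = nil
      @expires_at = nil
      true
    end
  end

  private

  def expire!
    return unless @expires_at && @clock.call >= @expires_at

    @value = nil
    @expires_at = nil
  end
end

now = 0.0
lock = FakeLock.new(clock: -> { now })
p lock.acquire("process-a", 30) # prints true
p lock.acquire("process-b", 30) # prints false (held by process-a)
now = 31.0                      # process-a overran the TTL, so the lock expired
p lock.acquire("process-b", 30) # prints true
p lock.release("process-a")     # prints false: a late release must not free process-b's lock
p lock.release("process-b")     # prints true
```

Without the owner check, the late release from process-a would delete process-b's lock and let a third process start a concurrent GC run.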

#remove_process ⇒ void

This method returns an undefined value.

Remove this process’s entry from the process set and delete its max-connections key.



# File 'lib/patient_http/sidekiq/task_monitor.rb', line 207

def remove_process
  ::Sidekiq.redis do |redis|
    redis.srem(PROCESS_SET_KEY, @lock_identifier)
    redis.del(max_connections_key)
  end
end

#unregister(task) ⇒ void

This method returns an undefined value.

Unregister a request from Redis (called when request completes).

Parameters:

  • task (RequestTask)

    the request task to unregister



# File 'lib/patient_http/sidekiq/task_monitor.rb', line 193

def unregister(task)
  task_id = full_task_id(task.id)

  ::Sidekiq.redis do |redis|
    redis.multi do |transaction|
      transaction.zrem(INFLIGHT_INDEX_KEY, task_id)
      transaction.hdel(INFLIGHT_JOBS_KEY, task_id)
    end
  end
end

#update_heartbeats(task_ids) ⇒ void

This method returns an undefined value.

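Update heartbeat timestamps for multiple requests in one pipelined round trip. Each update is a ZADD with the XX flag, so only requests that are still in the index are updated; requests that have already been unregistered are not re-added.

Parameters:

  • task_ids (Array<String>)

    the request task IDs to update (without the process prefix; full_task_id is applied to each)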



# File 'lib/patient_http/sidekiq/task_monitor.rb', line 219

def update_heartbeats(task_ids)
  return if task_ids.empty?

  timestamp_ms = (Time.now.to_f * 1000).round

  ::Sidekiq.redis do |redis|
    redis.pipelined do |pipeline|
      task_ids.each do |task_id|
        pipeline.call("ZADD", INFLIGHT_INDEX_KEY, "XX", timestamp_ms, full_task_id(task_id))
      end
    end
  end
end
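Using XX rather than a plain ZADD matters when a heartbeat races with completion. In the sketch below (FakeIndex is a hypothetical in-memory stand-in for the sorted set), a heartbeat that arrives after #unregister leaves the entry absent instead of resurrecting it; a resurrected index entry would have no job payload and could later be picked up as an orphan. This reading of the design is an inference from the code, not something the documentation states.

```ruby
# Hypothetical in-memory stand-in for the inflight index (sorted set):
# task ID => last heartbeat timestamp in milliseconds.
class FakeIndex
  def initialize
    @scores = {}
  end

  # Like a plain ZADD (used by #register): adds or overwrites the member.
  def add(task_id, score_ms)
    @scores[task_id] = score_ms
  end

  # Like ZREM (used by #unregister).
  def remove(task_id)
    @scores.delete(task_id)
  end

  # Like ZADD ... XX (used by #update_heartbeats): updates existing members only.
  def add_xx(task_id, score_ms)
    @scores[task_id] = score_ms if @scores.key?(task_id)
    nil
  end

  def score(task_id)
    @scores[task_id]
  end
end

index = FakeIndex.new
index.add("host:1:aa/r1", 1_000)               # request registered
snapshot = ["host:1:aa/r1"]                    # heartbeat loop collects IDs...
index.remove("host:1:aa/r1")                   # ...the request completes and unregisters...
snapshot.each { |id| index.add_xx(id, 2_000) } # ...then the late heartbeat arrives
p index.score("host:1:aa/r1")                  # prints nil; a plain ZADD would have re-added it
```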