Class: PatientHttp::Sidekiq::TaskMonitor
- Inherits: Object
  - Object
    - PatientHttp::Sidekiq::TaskMonitor
- Defined in: lib/patient_http/sidekiq/task_monitor.rb
Overview
Manages inflight request tracking in Redis for crash recovery.
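This class maintains a sorted set of request IDs scored by their registration or most recent heartbeat timestamp (milliseconds since the epoch) and a hash of serialized Sidekiq job payloads. It also provides a distributed lock so that only one process at a time runs orphan detection, and it automatically re-enqueues requests that were interrupted by process crashes.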
Task ID format: “hostname:pid:hex/request-uuid”
- hostname: sanitized hostname (colons and slashes replaced with dashes)
- pid: process ID
- hex: 16-character random hex string (generated from 8 random bytes) for uniqueness
- request-uuid: unique identifier for the request
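The ID format can be illustrated with a small, self-contained sketch. The helper names `build_lock_identifier`, `build_full_task_id`, and `parse_full_task_id` are hypothetical; they mirror the string handling in `#initialize`, `#full_task_id`, and `.inflight_counts_by_process` rather than reproducing the library's code.

```ruby
require "socket"
require "securerandom"

# Hypothetical helpers for illustration only; not part of the library's API.

# Build the per-process identifier "hostname:pid:hex".
def build_lock_identifier(hostname = Socket.gethostname, pid = Process.pid)
  # Colons and slashes would break later parsing, so replace them with dashes.
  # dup avoids mutating (or failing on) a frozen caller-supplied string.
  safe_host = hostname.dup.force_encoding("UTF-8").tr(":/", "-")
  # SecureRandom.hex(8) returns 16 hex characters (8 random bytes).
  "#{safe_host}:#{pid}:#{SecureRandom.hex(8)}"
end

# Append the request ID after a slash, as full_task_id does.
def build_full_task_id(lock_identifier, request_id)
  "#{lock_identifier}/#{request_id}"
end

# Split a full task ID back into its components.
def parse_full_task_id(full_id)
  process_id, request_id = full_id.split("/", 2)
  host, pid, hex = process_id.split(":", 3)
  {host: host, pid: pid, hex: hex, process_id: process_id, request_id: request_id}
end

lock_id = build_lock_identifier("web:01/a", 4242)
full_id = build_full_task_id(lock_id, "3f2c9a1e-0000-4000-8000-000000000000")
p parse_full_task_id(full_id)
```

Because the hostname is sanitized first, splitting on the first "/" and then on the first two ":" characters recovers each part unambiguously.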
Constant Summary
Redis key prefixes:
- INFLIGHT_INDEX_KEY = "sidekiq:patient_http:inflight_index"
- INFLIGHT_JOBS_KEY = "sidekiq:patient_http:inflight_jobs"
- PROCESS_SET_KEY = "sidekiq:patient_http:processes"
- GC_LOCK_KEY = "sidekiq:patient_http:gc_lock"
- GC_LAST_RUN_KEY = "sidekiq:patient_http:gc_last_run"
- REMOVE_IF_ORPHANED_SCRIPT =
Lua script for atomic orphan removal. Checks if the task is still orphaned (timestamp < threshold) and removes it atomically. This prevents race conditions where a heartbeat could update the timestamp between the check and the removal.
- KEYS[1] = index key (sorted set)
- KEYS[2] = jobs key (hash)
- ARGV[1] = request_id
- ARGV[2] = threshold_ms
Returns: [removed (0/1), job_payload or nil]
<<~LUA
  local index_key = KEYS[1]
  local jobs_key = KEYS[2]
  local request_id = ARGV[1]
  local threshold_ms = tonumber(ARGV[2])

  local current_score = redis.call('ZSCORE', index_key, request_id)
  if not current_score or tonumber(current_score) >= threshold_ms then
    return {0, nil} -- Not orphaned or already removed
  end

  local job_payload = redis.call('HGET', jobs_key, request_id)
  redis.call('ZREM', index_key, request_id)
  redis.call('HDEL', jobs_key, request_id)
  return {1, job_payload}
LUA
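The check-then-remove logic of the script can be modeled in plain Ruby to make its three outcomes concrete. This is a sketch under stated assumptions: `remove_if_orphaned` is a hypothetical name, a Hash of scores stands in for the sorted set, and a Hash of payloads stands in for the jobs hash. In Redis the Lua script runs atomically; this single-threaded model illustrates only the decision logic, not the atomicity.

```ruby
# Hypothetical in-memory model of REMOVE_IF_ORPHANED_SCRIPT.
# index: full task ID => heartbeat score (epoch ms)
# jobs:  full task ID => serialized job payload
def remove_if_orphaned(index, jobs, request_id, threshold_ms)
  score = index[request_id]
  # Missing (already removed) or refreshed by a recent heartbeat: leave it alone.
  return [0, nil] if score.nil? || score >= threshold_ms

  payload = jobs[request_id]
  index.delete(request_id)
  jobs.delete(request_id)
  [1, payload]
end

index = {"host:1:aa/req-1" => 1_000, "host:1:aa/req-2" => 5_000}
jobs = {"host:1:aa/req-1" => '{"class":"MyJob"}', "host:1:aa/req-2" => '{"class":"MyJob"}'}

remove_if_orphaned(index, jobs, "host:1:aa/req-1", 2_000) # => [1, "{\"class\":\"MyJob\"}"]
remove_if_orphaned(index, jobs, "host:1:aa/req-2", 2_000) # => [0, nil] (recent heartbeat)
remove_if_orphaned(index, jobs, "host:1:aa/req-1", 2_000) # => [0, nil] (already removed)
```

One Redis detail worth knowing: when Redis converts a Lua array into a reply, it truncates the array at the first nil, so the `{0, nil}` branch reaches the client as a one-element array. Reading index 1 of that result in Ruby still yields `nil`.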
Instance Attribute Summary
- #config ⇒ Configuration (readonly)
  The configuration object.
Class Method Summary
- .clear_all! ⇒ void (private)
  Clear all registry data.
- .inflight_count ⇒ Integer
  Get the count of inflight requests in Redis.
- .inflight_counts_by_process ⇒ Hash
  Get inflight request counts and maximum connection capacity across all registered processes, grouped by host and PID.
- .registered_process_ids ⇒ Array<String>
  Get all registered process IDs.
- .total_max_connections ⇒ Integer
  Get the total max connections across all processes.
Instance Method Summary
- #acquire_gc_lock ⇒ Boolean
  Try to acquire the distributed garbage collection lock.
- #cleanup_orphaned_requests(orphan_threshold_seconds, logger) ⇒ Integer
  Find and re-enqueue orphaned requests.
- #full_task_id(task_id) ⇒ String
  Build unique task ID for a request task that includes process identifier.
- #gc_needed? ⇒ Boolean
  Check if garbage collection should run based on the last run timestamp.
- #heartbeat_timestamp_for(task) ⇒ Integer? (private)
  Get the heartbeat timestamp for a task.
- #initialize(config) ⇒ TaskMonitor (constructor)
  A new instance of TaskMonitor.
- #ping_process ⇒ void
  Record the current process’s max connections in Redis.
- #record_gc_run ⇒ void
  Record the timestamp of the last GC run in Redis.
- #register(task) ⇒ void
  Register a request as inflight in Redis.
- #registered?(task) ⇒ Boolean (private)
  Check if a task is registered in the inflight registry.
- #registered_task_ids ⇒ Array<String> (private)
  Get all registered task IDs for this registry’s process.
- #release_gc_lock ⇒ Boolean
  Release the garbage collection lock if held by this process.
- #remove_process ⇒ void
  Remove this process’s entry from the process set.
- #unregister(task) ⇒ void
  Unregister a request from Redis (called when request completes).
- #update_heartbeats(task_ids) ⇒ void
  Update heartbeat timestamps for multiple requests in a single pipelined round trip.
Constructor Details
#initialize(config) ⇒ TaskMonitor
Returns a new instance of TaskMonitor.
# File 'lib/patient_http/sidekiq/task_monitor.rb', line 161
def initialize(config)
  @config = config
  hostname = ::Socket.gethostname.force_encoding("UTF-8").tr(":/", "-")
  pid = ::Process.pid
  @lock_identifier = "#{hostname}:#{pid}:#{SecureRandom.hex(8)}".freeze
end
Instance Attribute Details
#config ⇒ Configuration (readonly)
Returns the configuration object.
# File 'lib/patient_http/sidekiq/task_monitor.rb', line 54
def config
  @config
end
Class Method Details
.clear_all! ⇒ void
This method is part of a private API. You should avoid using this method if possible, as it may be removed or be changed in the future.
This method returns an undefined value.
Clear all registry data. Only allowed in the test environment.
# File 'lib/patient_http/sidekiq/task_monitor.rb', line 138
def clear_all!
  unless PatientHttp.testing?
    raise "clear_all! is only allowed in test environment"
  end

  ::Sidekiq.redis do |redis|
    redis.del(INFLIGHT_INDEX_KEY, INFLIGHT_JOBS_KEY, PROCESS_SET_KEY, GC_LOCK_KEY, GC_LAST_RUN_KEY)
  end
end
.inflight_count ⇒ Integer
Get the count of inflight requests in Redis.
# File 'lib/patient_http/sidekiq/task_monitor.rb', line 60
def inflight_count
  ::Sidekiq.redis do |redis|
    redis.zcard(INFLIGHT_INDEX_KEY)
  end
end
.inflight_counts_by_process ⇒ Hash
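Get inflight request counts and maximum connection capacity across all registered processes, grouped by "host:pid". Process IDs whose max-connections key has expired are treated as stale and removed from the process set.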
# File 'lib/patient_http/sidekiq/task_monitor.rb', line 69
def inflight_counts_by_process
  process_ids = nil
  max_connections = nil
  inflight_task_ids = nil

  ::Sidekiq.redis do |redis|
    process_ids = redis.smembers(PROCESS_SET_KEY)
    return {} if process_ids.empty?

    max_keys = process_ids.map { |pid| max_connections_key_for(pid) }
    max_connections = redis.mget(*max_keys)
    inflight_task_ids = redis.zrange(INFLIGHT_INDEX_KEY, 0, -1)
  end

  inflight_by_process_id = inflight_task_ids.group_by do |task_id|
    task_id.split("/", 2).first
  end

  result = {}
  stale_process_ids = []

  process_ids.zip(max_connections).each do |process_id, max_conn|
    if max_conn.nil?
      # Mark for removal if max_conn key doesn't exist (process is gone)
      stale_process_ids << process_id
    else
      host_pid = process_id.split(":", 3).first(2).join(":")
      counts = result[host_pid]
      unless counts
        counts = {inflight: 0, max_capacity: 0}
        result[host_pid] = counts
      end
      counts[:inflight] += inflight_by_process_id[process_id]&.size.to_i
      counts[:max_capacity] += max_conn.to_i
    end
  end

  # Remove stale process IDs from the set
  unless stale_process_ids.empty?
    ::Sidekiq.redis do |redis|
      redis.srem(PROCESS_SET_KEY, stale_process_ids)
    end
  end

  result
end
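The grouping step can be exercised without Redis by feeding it the values the three reads would return. `aggregate_counts` is a hypothetical helper that reproduces the aggregation on plain arrays; it returns the stale IDs instead of issuing the SREM cleanup.

```ruby
# Hypothetical pure-Ruby version of the grouping in .inflight_counts_by_process.
#   process_ids       - members of PROCESS_SET_KEY ("host:pid:hex")
#   max_connections   - MGET results in the same order; nil if the key expired
#   inflight_task_ids - members of INFLIGHT_INDEX_KEY ("host:pid:hex/request-id")
def aggregate_counts(process_ids, max_connections, inflight_task_ids)
  inflight_by_process_id = inflight_task_ids.group_by { |id| id.split("/", 2).first }

  result = {}
  stale = []
  process_ids.zip(max_connections).each do |process_id, max_conn|
    if max_conn.nil?
      stale << process_id # max-connections key expired: treat process as gone
      next
    end

    # Group under "host:pid", dropping the random hex suffix.
    host_pid = process_id.split(":", 3).first(2).join(":")
    counts = (result[host_pid] ||= {inflight: 0, max_capacity: 0})
    counts[:inflight] += inflight_by_process_id.fetch(process_id, []).size
    counts[:max_capacity] += max_conn.to_i
  end
  [result, stale]
end

counts, stale = aggregate_counts(
  ["web1:10:aa", "web1:10:bb", "web2:20:cc"],
  ["5", "3", nil],
  ["web1:10:aa/r1", "web1:10:aa/r2", "web1:10:bb/r3", "web2:20:cc/r4"]
)
# counts["web1:10"] == {inflight: 3, max_capacity: 8}
# stale == ["web2:20:cc"]; its inflight task r4 is not counted
```

Summing `:max_capacity` over the values, as `.total_max_connections` does, gives 8 for this input.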
.registered_process_ids ⇒ Array<String>
Get all registered process IDs.
# File 'lib/patient_http/sidekiq/task_monitor.rb', line 127
def registered_process_ids
  ::Sidekiq.redis do |redis|
    redis.smembers(PROCESS_SET_KEY)
  end
end
.total_max_connections ⇒ Integer
Get the total max connections across all live processes (the sum of the :max_capacity values returned by .inflight_counts_by_process).
# File 'lib/patient_http/sidekiq/task_monitor.rb', line 120
def total_max_connections
  inflight_counts_by_process.values.sum { |data| data[:max_capacity] }
end
Instance Method Details
#acquire_gc_lock ⇒ Boolean
Try to acquire the distributed garbage collection lock.
# File 'lib/patient_http/sidekiq/task_monitor.rb', line 295
def acquire_gc_lock
  ::Sidekiq.redis do |redis|
    # Use SET with NX and EX options directly
    # Returns "OK" if successful with ::Sidekiq.redis, nil if key already exists
    !!redis.set(GC_LOCK_KEY, @lock_identifier, nx: true, ex: gc_lock_ttl)
  end
end
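The NX and EX options together give the lock its two properties: only one holder at a time, and automatic release if the holder crashes. The following is a minimal in-memory stand-in for `SET key value NX EX ttl`, assuming an injectable clock so that expiry can be shown without sleeping; `FakeLockStore` and `set_nx_ex` are hypothetical names.

```ruby
# Hypothetical in-memory model of SET key value NX EX ttl.
class FakeLockStore
  def initialize(clock: -> { Process.clock_gettime(Process::CLOCK_MONOTONIC) })
    @clock = clock
    @entries = {} # key => [value, expires_at]
  end

  # Set the key only if it is absent or expired. Returns true when set.
  def set_nx_ex(key, value, ttl)
    entry = @entries[key]
    return false if entry && entry[1] > @clock.call

    @entries[key] = [value, @clock.call + ttl]
    true
  end

  # Return the value, or nil if the key is absent or expired.
  def get(key)
    entry = @entries[key]
    entry && entry[1] > @clock.call ? entry[0] : nil
  end
end

now = 0
store = FakeLockStore.new(clock: -> { now })
store.set_nx_ex("gc_lock", "host-a:1:aa", 60) # => true  (lock acquired)
store.set_nx_ex("gc_lock", "host-b:2:bb", 60) # => false (held by host-a)
now = 61
store.set_nx_ex("gc_lock", "host-b:2:bb", 60) # => true  (previous lock expired)
```

The TTL means a process that dies while holding the lock blocks garbage collection for at most one lock period.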
#cleanup_orphaned_requests(orphan_threshold_seconds, logger) ⇒ Integer
Find and re-enqueue orphaned requests.
# File 'lib/patient_http/sidekiq/task_monitor.rb', line 368
def cleanup_orphaned_requests(orphan_threshold_seconds, logger)
  = (orphan_threshold_seconds)
  orphaned_requests = fetch_orphaned_requests()
  return 0 if orphaned_requests.empty?

  reenqueue_orphaned_jobs(orphaned_requests, , logger)
end
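The helper that turns `orphan_threshold_seconds` into a cutoff is not visible in the listing above. The following sketch assumes the cutoff is the current time in epoch milliseconds minus the threshold, which matches the `score < threshold_ms` test in REMOVE_IF_ORPHANED_SCRIPT. `orphan_threshold_ms` and `orphaned_ids` are hypothetical names, and a Hash stands in for the sorted set.

```ruby
# Hypothetical: compute the orphan cutoff in epoch milliseconds.
# Assumption: orphaned means "last heartbeat older than now - threshold".
def orphan_threshold_ms(orphan_threshold_seconds, now: Time.now)
  ((now.to_f - orphan_threshold_seconds) * 1000).round
end

# Hypothetical: select candidates whose score is strictly below the cutoff.
# A Redis-side query could use ZRANGEBYSCORE key -inf (threshold_ms
def orphaned_ids(index, threshold_ms)
  index.select { |_id, score| score < threshold_ms }.keys
end

now = Time.at(1_700_000_000)
threshold = orphan_threshold_ms(300, now: now) # 1_699_999_700_000
index = {
  "h:1:aa/stale" => 1_699_999_000_000, # last heartbeat 1000 s before now
  "h:1:aa/fresh" => 1_699_999_990_000  # last heartbeat 10 s before now
}
orphaned_ids(index, threshold) # => ["h:1:aa/stale"]
```

Selecting candidates this way is not atomic, which is why each candidate is then removed through the Lua script: a heartbeat that lands between selection and removal makes the script return 0 and leaves the task in place.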
#full_task_id(task_id) ⇒ String
Build unique task ID for a request task that includes process identifier.
# File 'lib/patient_http/sidekiq/task_monitor.rb', line 272
def full_task_id(task_id)
  "#{@lock_identifier}/#{task_id}"
end
#gc_needed? ⇒ Boolean
Check if garbage collection should run based on the last run timestamp.
Returns true if GC_LAST_RUN_KEY doesn’t exist in Redis or if at least config.heartbeat_interval seconds have elapsed since the last GC run.
# File 'lib/patient_http/sidekiq/task_monitor.rb', line 339
def gc_needed?
  last_run = ::Sidekiq.redis do |redis|
    redis.get(GC_LAST_RUN_KEY)
  end
  return true if last_run.nil?

  last_run_time = Time.at(last_run.to_f / 1000.0)
  Time.now - last_run_time >= config.heartbeat_interval
end
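The timing check and the millisecond encoding written by `#record_gc_run` can be made testable by passing the clock in explicitly. `encode_gc_timestamp` and `gc_needed_at?` are hypothetical helpers that follow the arithmetic shown in the two methods.

```ruby
# Hypothetical: the value record_gc_run stores (epoch milliseconds, floored).
def encode_gc_timestamp(time)
  (time.to_f * 1000).floor
end

# Hypothetical: gc_needed? with "now" and the interval (seconds) passed in.
def gc_needed_at?(last_run_ms, now, heartbeat_interval)
  return true if last_run_ms.nil? # key missing or already expired

  last_run_time = Time.at(last_run_ms.to_f / 1000.0)
  now - last_run_time >= heartbeat_interval # Time - Time yields seconds
end

t0 = Time.at(1_700_000_000)
stored = encode_gc_timestamp(t0).to_s # Redis GET returns a String
gc_needed_at?(nil, t0, 60)         # => true
gc_needed_at?(stored, t0 + 30, 60) # => false
gc_needed_at?(stored, t0 + 60, 60) # => true
```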
#heartbeat_timestamp_for(task) ⇒ Integer?
This method is part of a private API. You should avoid using this method if possible, as it may be removed or be changed in the future.
Get the heartbeat timestamp for a task.
# File 'lib/patient_http/sidekiq/task_monitor.rb', line 251
def heartbeat_timestamp_for(task)
  score = ::Sidekiq.redis do |redis|
    redis.zscore(INFLIGHT_INDEX_KEY, full_task_id(task.id))
  end
  score&.to_i
end
#ping_process ⇒ void
This method returns an undefined value.
Record the current process’s max connections in Redis.
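This is used for monitoring: .inflight_counts_by_process reads these values, and a process whose max-connections key has expired is treated as gone.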
# File 'lib/patient_http/sidekiq/task_monitor.rb', line 281
def ping_process
  ::Sidekiq.redis do |redis|
    redis.multi do |transaction|
      transaction.sadd(PROCESS_SET_KEY, @lock_identifier)
      transaction.set(max_connections_key, @config.max_connections)
      transaction.expire(PROCESS_SET_KEY, inflight_ttl)
      transaction.expire(max_connections_key, process_ttl)
    end
  end
end
#record_gc_run ⇒ void
This method returns an undefined value.
Record the timestamp of the last GC run in Redis.
The timestamp is stored with a TTL slightly longer than the heartbeat interval to coordinate GC execution across multiple processes.
# File 'lib/patient_http/sidekiq/task_monitor.rb', line 356
def record_gc_run
  ::Sidekiq.redis do |redis|
    redis.set(GC_LAST_RUN_KEY, (Time.now.to_f * 1000).floor, ex: gc_last_run_ttl)
  end
end
#register(task) ⇒ void
This method returns an undefined value.
Register a request as inflight in Redis.
# File 'lib/patient_http/sidekiq/task_monitor.rb', line 173
def register(task)
  = (Time.now.to_f * 1000).round
  job_payload = JSON.generate(task.task_handler.sidekiq_job)
  task_id = full_task_id(task.id)

  ::Sidekiq.redis do |redis|
    redis.multi do |transaction|
      transaction.zadd(INFLIGHT_INDEX_KEY, , task_id)
      transaction.hset(INFLIGHT_JOBS_KEY, task_id, job_payload)
      transaction.expire(INFLIGHT_INDEX_KEY, inflight_ttl)
      transaction.expire(INFLIGHT_JOBS_KEY, inflight_ttl)
    end
  end
end
#registered?(task) ⇒ Boolean
This method is part of a private API. You should avoid using this method if possible, as it may be removed or be changed in the future.
Check if a task is registered in the inflight registry.
# File 'lib/patient_http/sidekiq/task_monitor.rb', line 239
def registered?(task)
  ::Sidekiq.redis do |redis|
    !redis.zscore(INFLIGHT_INDEX_KEY, full_task_id(task.id)).nil?
  end
end
#registered_task_ids ⇒ Array<String>
This method is part of a private API. You should avoid using this method if possible, as it may be removed or be changed in the future.
Get all registered task IDs for this registry’s process.
# File 'lib/patient_http/sidekiq/task_monitor.rb', line 262
def registered_task_ids
  ::Sidekiq.redis do |redis|
    redis.zrange(INFLIGHT_INDEX_KEY, 0, -1)
  end.select { |id| id.start_with?("#{@lock_identifier}/") }
end
#release_gc_lock ⇒ Boolean
Release the garbage collection lock if held by this process.
Uses Redis WATCH/MULTI/EXEC for optimistic locking to ensure we only delete the lock if it’s still held by this process.
# File 'lib/patient_http/sidekiq/task_monitor.rb', line 309
def release_gc_lock
  ::Sidekiq.redis do |redis|
    # Watch the lock key for changes
    redis.watch(GC_LOCK_KEY)

    # Get current lock value
    current_value = redis.get(GC_LOCK_KEY)

    if current_value == @lock_identifier
      # Lock is ours, delete it atomically
      result = redis.multi do |transaction|
        transaction.del(GC_LOCK_KEY)
      end

      # MULTI returns nil if transaction was aborted (someone else modified the key)
      # Otherwise returns array with results
      !result.nil?
    else
      # Lock is not ours or doesn't exist
      redis.unwatch
      false
    end
  end
end
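The WATCH/MULTI pattern matters in the window between GET and EXEC: if this process's lock expires and another process acquires it in that window, the DEL must not run. The following is a hypothetical in-memory model of that behavior, assuming a per-key version counter in place of Redis's watched-key tracking (which real Redis does per connection); `OptimisticStore` and `release_lock` are illustrative names.

```ruby
# Hypothetical in-memory model of WATCH/MULTI/EXEC optimistic locking.
class OptimisticStore
  def initialize
    @data = {}
    @versions = Hash.new(0) # bumped on every write to a key
    @watched = {}           # key => version seen at WATCH time
  end

  def get(key)
    @data[key]
  end

  def set(key, value)
    @data[key] = value
    @versions[key] += 1
  end

  def del(key)
    existed = @data.key?(key)
    @data.delete(key)
    @versions[key] += 1
    existed ? 1 : 0
  end

  def watch(key)
    @watched[key] = @versions[key]
  end

  def unwatch
    @watched.clear
  end

  # Run the block only if no watched key changed; nil means "aborted".
  def multi
    changed = @watched.any? { |key, version| @versions[key] != version }
    @watched.clear
    return nil if changed

    [yield(self)]
  end
end

# Mirrors release_gc_lock; `interleave` simulates another process acting
# between GET and EXEC (for example, after our lock expired).
def release_lock(store, key, identifier, interleave: nil)
  store.watch(key)
  if store.get(key) == identifier
    interleave&.call
    !store.multi { |tx| tx.del(key) }.nil?
  else
    store.unwatch
    false
  end
end

store = OptimisticStore.new
store.set("gc_lock", "a")
release_lock(store, "gc_lock", "a", interleave: -> { store.set("gc_lock", "b") }) # => false
store.get("gc_lock") # => "b" (the other process's lock survives)
```

A frequently used alternative is a short Lua script that compares the value and deletes the key in one step, in the same way REMOVE_IF_ORPHANED_SCRIPT combines its check and removal.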
#remove_process ⇒ void
This method returns an undefined value.
Remove this process’s entry from the process set.
# File 'lib/patient_http/sidekiq/task_monitor.rb', line 207
def remove_process
  ::Sidekiq.redis do |redis|
    redis.srem(PROCESS_SET_KEY, @lock_identifier)
    redis.del(max_connections_key)
  end
end
#unregister(task) ⇒ void
This method returns an undefined value.
Unregister a request from Redis (called when request completes).
# File 'lib/patient_http/sidekiq/task_monitor.rb', line 193
def unregister(task)
  task_id = full_task_id(task.id)

  ::Sidekiq.redis do |redis|
    redis.multi do |transaction|
      transaction.zrem(INFLIGHT_INDEX_KEY, task_id)
      transaction.hdel(INFLIGHT_JOBS_KEY, task_id)
    end
  end
end
#update_heartbeats(task_ids) ⇒ void
This method returns an undefined value.
Update heartbeat timestamps for multiple requests in a single pipelined round trip to Redis (the updates are batched, not atomic).
# File 'lib/patient_http/sidekiq/task_monitor.rb', line 219
def update_heartbeats(task_ids)
  return if task_ids.empty?

  = (Time.now.to_f * 1000).round

  ::Sidekiq.redis do |redis|
    redis.pipelined do |pipeline|
      task_ids.each do |task_id|
        pipeline.call("ZADD", INFLIGHT_INDEX_KEY, "XX", , full_task_id(task_id))
      end
    end
  end
end
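The XX flag on ZADD updates only members that already exist and never adds new ones. That is what keeps a heartbeat racing with `#unregister` from resurrecting a completed task. The following is a hypothetical in-memory model, with a Hash standing in for INFLIGHT_INDEX_KEY; `update_heartbeats_in_memory` is an illustrative name.

```ruby
# Hypothetical model of the heartbeat refresh.
# index: full task ID => heartbeat score (epoch ms)
def update_heartbeats_in_memory(index, full_task_ids, now_ms)
  full_task_ids.each do |id|
    # Emulates ZADD ... XX: update existing members only, never insert.
    index[id] = now_ms if index.key?(id)
  end
  index
end

index = {"h:1:aa/r1" => 1_000, "h:1:aa/r2" => 1_000}
index.delete("h:1:aa/r2") # r2 completed and was unregistered
update_heartbeats_in_memory(index, ["h:1:aa/r1", "h:1:aa/r2"], 2_000)
# index now holds only "h:1:aa/r1", with score 2_000
```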