Class: PatientHttp::SolidQueue::TaskMonitor
- Inherits: Object
- Defined in: lib/patient_http/solid_queue/task_monitor.rb
Overview
Manages inflight request tracking in the database for crash recovery.
This class maintains Active Record records for each in-flight request. It provides distributed locking for orphan detection and automatic re-enqueueing of requests interrupted by process crashes.
Task ID format: “hostname:pid:hex/request-uuid”
- hostname: sanitized hostname (colons and slashes replaced with dashes)
- pid: process ID
- hex: 16-character random hex string for uniqueness (generated by SecureRandom.hex(8), which encodes 8 random bytes)
- request-uuid: unique identifier for the request
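The ID format above can be illustrated with a minimal sketch. It builds the process identifier the same way the constructor in this document does, and then splits a full task ID back into its parts. The helper names `build_process_identifier` and `parse_full_task_id` are hypothetical and are not part of TaskMonitor. Note that SecureRandom.hex(8) returns 16 hexadecimal characters.

```ruby
require "socket"
require "securerandom"

# Build a "hostname:pid:hex" identifier, mirroring the constructor shown in
# this document. (Hypothetical helper, for illustration only.)
def build_process_identifier
  # Replace ":" and "/" so they cannot clash with the ID separators.
  hostname = Socket.gethostname.dup.force_encoding("UTF-8").tr(":/", "-")
  "#{hostname}:#{Process.pid}:#{SecureRandom.hex(8)}"
end

# Split "hostname:pid:hex/request-uuid" into its parts.
# (Hypothetical helper; TaskMonitor does not expose a parser.)
def parse_full_task_id(full_id)
  process_identifier, request_id = full_id.split("/", 2)
  hostname, pid, hex = process_identifier.split(":", 3)
  {hostname: hostname, pid: Integer(pid), hex: hex, request_id: request_id}
end

process_identifier = build_process_identifier
full_id = "#{process_identifier}/#{SecureRandom.uuid}"
parts = parse_full_task_id(full_id)
puts parts.inspect
```

Because the hostname is sanitized, the first "/" in a full task ID always marks the boundary between the process identifier and the request ID.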
Constant Summary
- GC_LOCK_NAME = "gc"
Instance Attribute Summary
- #config ⇒ Configuration (readonly)
  The configuration object.
Class Method Summary
- .clear_all! ⇒ void (private)
  Clear all records.
Instance Method Summary
- #acquire_gc_lock ⇒ Boolean
  Try to acquire the distributed garbage collection lock.
- #cleanup_orphaned_requests(orphan_threshold_seconds, logger) ⇒ Integer
  Find and re-enqueue orphaned requests.
- #full_task_id(task_id) ⇒ String
  Build a unique task ID for a request task that includes the process identifier.
- #initialize(config) ⇒ TaskMonitor (constructor)
  A new instance of TaskMonitor.
- #ping_process ⇒ void
  Record or refresh this process’s registration.
- #register(task) ⇒ void
  Register a request as inflight in the database.
- #registered?(task) ⇒ Boolean (private)
  Check if a task is registered in the inflight table.
- #release_gc_lock ⇒ void
  Release the garbage collection lock if held by this process, and record last_gc_at.
- #remove_process ⇒ void
  Remove this process’s registration.
- #unregister(task) ⇒ void
  Unregister a request from the database (called when the request completes).
- #update_heartbeats(task_ids) ⇒ void
  Update heartbeat timestamps for multiple requests in a single operation.
Constructor Details
#initialize(config) ⇒ TaskMonitor
Returns a new instance of TaskMonitor.
    # File 'lib/patient_http/solid_queue/task_monitor.rb', line 22

    def initialize(config)
      @config = config
      hostname = ::Socket.gethostname.force_encoding("UTF-8").tr(":/", "-")
      pid = ::Process.pid
      @lock_identifier = "#{hostname}:#{pid}:#{SecureRandom.hex(8)}".freeze
    end
Instance Attribute Details
#config ⇒ Configuration (readonly)
Returns the configuration object.
    # File 'lib/patient_http/solid_queue/task_monitor.rb', line 20

    def config
      @config
    end
Class Method Details
.clear_all! ⇒ void
This method is part of a private API. You should avoid using this method if possible, as it may be removed or be changed in the future.
This method returns an undefined value.
Clear all records. Only allowed in the test environment.
    # File 'lib/patient_http/solid_queue/task_monitor.rb', line 201

    def self.clear_all!
      unless PatientHttp.testing?
        raise "clear_all! is only allowed in test environment"
      end

      InflightRequest.delete_all
      ProcessRegistration.delete_all
      GcLock.delete_all
    end
Instance Method Details
#acquire_gc_lock ⇒ Boolean
Try to acquire the distributed garbage collection lock.
Uses a single semaphore row and pessimistic locking to ensure only one process can claim the lock at a time. Returns false if another process holds a non-expired lock, or if GC was run recently (within heartbeat_interval).
    # File 'lib/patient_http/solid_queue/task_monitor.rb', line 107

    def acquire_gc_lock
      now = Time.current
      expires_at = now + gc_lock_ttl.seconds
      acquired = false

      ensure_gc_lock_row!

      GcLock.transaction do
        lock = GcLock.lock.find_by!(lock_name: GC_LOCK_NAME)

        recent_gc = lock.last_gc_at && lock.last_gc_at > (now - @config.heartbeat_interval)
        next if recent_gc

        lock_held = lock.lock_holder.present? && lock.expires_at.present? && lock.expires_at > now
        next if lock_held

        lock.update!(
          lock_holder: @lock_identifier,
          acquired_at: now,
          expires_at: expires_at
        )
        acquired = true
      end

      acquired
    rescue => e
      @config.logger&.error("[PatientHttp::SolidQueue] Failed to acquire GC lock: #{e.message}")
      raise if PatientHttp.testing?
      false
    end
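The decision made inside the transaction can be sketched without a database. The following is a minimal in-memory illustration, not the library's implementation: `LockRow` and `try_claim` are hypothetical names, times are plain `Time` values, and the sketch omits the pessimistic row lock that provides mutual exclusion between processes in the real method. It also treats any non-nil holder as present, whereas the method above uses `present?`.

```ruby
# In-memory stand-in for the single semaphore row. The struct itself is
# hypothetical; the field names follow the method shown above.
LockRow = Struct.new(:lock_holder, :acquired_at, :expires_at, :last_gc_at, keyword_init: true)

# Claim the row unless GC ran recently or another holder has a live lock.
# Returns true when the caller now holds the lock.
def try_claim(row, holder:, now:, ttl:, heartbeat_interval:)
  # Skip if a GC pass finished within the last heartbeat interval.
  return false if row.last_gc_at && row.last_gc_at > now - heartbeat_interval
  # Skip if someone holds a lock that has not expired yet.
  return false if row.lock_holder && row.expires_at && row.expires_at > now

  row.lock_holder = holder
  row.acquired_at = now
  row.expires_at = now + ttl
  true
end

now = Time.at(1_000)
row = LockRow.new

first = try_claim(row, holder: "a", now: now, ttl: 30, heartbeat_interval: 10)
second = try_claim(row, holder: "b", now: now + 5, ttl: 30, heartbeat_interval: 10)
third = try_claim(row, holder: "b", now: now + 31, ttl: 30, heartbeat_interval: 10)

# A row whose last GC finished 5 seconds ago is skipped for a 10-second interval.
recent = LockRow.new(last_gc_at: now - 5)
fourth = try_claim(recent, holder: "c", now: now, ttl: 30, heartbeat_interval: 10)

puts [first, second, third, fourth].inspect
```

The expiry check is what lets a second process take over when the original holder crashes before releasing the lock.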
#cleanup_orphaned_requests(orphan_threshold_seconds, logger) ⇒ Integer
Find and re-enqueue orphaned requests.
    # File 'lib/patient_http/solid_queue/task_monitor.rb', line 154

    def cleanup_orphaned_requests(orphan_threshold_seconds, logger)
      threshold = Time.current - orphan_threshold_seconds.seconds

      prune_stale_process_registrations(threshold)

      # Get process IDs with a recent heartbeat
      active_process_ids = ProcessRegistration.where("last_seen_at >= ?", threshold).pluck(:process_id)

      # Find stale requests from processes not in the active set
      orphaned = InflightRequest
        .where("heartbeat_at < ?", threshold)
        .where.not(process_id: active_process_ids)
        .to_a

      return 0 if orphaned.empty?

      reenqueued_count = 0
      orphaned.each do |record|
        reenqueued_count += 1 if reenqueue_orphaned_record(record, threshold, logger)
      end

      reenqueued_count
    end
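The selection rule in this method (a request is orphaned only if its own heartbeat is stale and its owning process is not in the active set) can be sketched with plain Ruby objects. This is an illustration under assumptions: `Registration`, `Inflight`, and `select_orphans` are hypothetical names, and the sketch covers only the two queries, not the pruning step or `reenqueue_orphaned_record`, whose behavior is not shown in this document.

```ruby
# Hypothetical in-memory stand-ins for the two tables queried above.
Registration = Struct.new(:process_id, :last_seen_at)
Inflight = Struct.new(:task_id, :process_id, :heartbeat_at)

# Return the inflight records that would be treated as orphaned.
def select_orphans(registrations, inflight, now:, threshold_seconds:)
  threshold = now - threshold_seconds
  # Processes seen at or after the threshold count as alive.
  active = registrations.select { |r| r.last_seen_at >= threshold }.map(&:process_id)
  # Orphan: stale heartbeat AND owning process not alive.
  inflight.select { |req| req.heartbeat_at < threshold && !active.include?(req.process_id) }
end

now = Time.at(1_000)
registrations = [
  Registration.new("p1", Time.at(990)), # alive
  Registration.new("p2", Time.at(900))  # stale
]
inflight = [
  Inflight.new("t1", "p1", Time.at(900)), # stale heartbeat, but process alive
  Inflight.new("t2", "p2", Time.at(900)), # stale heartbeat, process stale
  Inflight.new("t3", "p2", Time.at(950)), # fresh heartbeat
  Inflight.new("t4", "p3", Time.at(100))  # process has no registration at all
]

orphans = select_orphans(registrations, inflight, now: now, threshold_seconds: 60)
puts orphans.map(&:task_id).inspect
```

Requiring both conditions avoids re-enqueueing a request whose heartbeat write was merely delayed while its process is still running.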
#full_task_id(task_id) ⇒ String
Build unique task ID for a request task that includes process identifier.
    # File 'lib/patient_http/solid_queue/task_monitor.rb', line 183

    def full_task_id(task_id)
      "#{@lock_identifier}/#{task_id}"
    end
#ping_process ⇒ void
This method returns an undefined value.
Record or refresh this process’s registration.
    # File 'lib/patient_http/solid_queue/task_monitor.rb', line 79

    def ping_process
      ProcessRegistration.upsert(
        {process_id: @lock_identifier, max_connections: @config.max_connections, last_seen_at: Time.current},
        unique_by: :process_id
      )
    rescue => e
      @config.logger&.error("[PatientHttp::SolidQueue] Failed to ping process: #{e.message}")
      raise if PatientHttp.testing?
    end
#register(task) ⇒ void
This method returns an undefined value.
Register a request as inflight in the database.
    # File 'lib/patient_http/solid_queue/task_monitor.rb', line 33

    def register(task)
      job_payload = task.task_handler.active_job_data.to_json
      task_id = full_task_id(task.id)
      now = Time.current

      InflightRequest.create!(
        task_id: task_id,
        process_id: @lock_identifier,
        job_payload: job_payload,
        heartbeat_at: now,
        created_at: now
      )
    rescue => e
      @config.logger&.error("[PatientHttp::SolidQueue] Failed to register task #{task_id}: #{e.message}")
      raise if PatientHttp.testing?
    end
#registered?(task) ⇒ Boolean
This method is part of a private API. You should avoid using this method if possible, as it may be removed or be changed in the future.
Check if a task is registered in the inflight table.
    # File 'lib/patient_http/solid_queue/task_monitor.rb', line 192

    def registered?(task)
      InflightRequest.where(task_id: full_task_id(task.id)).exists?
    end
#release_gc_lock ⇒ void
This method returns an undefined value.
Release the garbage collection lock if held by this process, and record last_gc_at.
    # File 'lib/patient_http/solid_queue/task_monitor.rb', line 141

    def release_gc_lock
      GcLock.where(lock_name: GC_LOCK_NAME, lock_holder: @lock_identifier)
        .update_all(last_gc_at: Time.current, lock_holder: nil, acquired_at: nil, expires_at: nil)
    rescue => e
      @config.logger&.error("[PatientHttp::SolidQueue] Failed to release GC lock: #{e.message}")
      raise if PatientHttp.testing?
    end
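One way a caller might combine acquire_gc_lock, cleanup_orphaned_requests, and release_gc_lock into a single garbage collection pass is sketched below. This is an assumption about usage, not code from the library: `run_gc_cycle` is a hypothetical driver, and `StubMonitor` is a placeholder that only records which of the three documented methods were called.

```ruby
# Stub exposing the three method names documented here. The bodies are
# placeholders so the sketch runs without a database.
class StubMonitor
  attr_reader :calls

  def initialize(lock_available:)
    @lock_available = lock_available
    @calls = []
  end

  def acquire_gc_lock
    @calls << :acquire_gc_lock
    @lock_available
  end

  def cleanup_orphaned_requests(orphan_threshold_seconds, logger)
    @calls << :cleanup_orphaned_requests
    2 # pretend two requests were re-enqueued
  end

  def release_gc_lock
    @calls << :release_gc_lock
    nil
  end
end

# Hypothetical driver: run one GC pass and return the number of
# re-enqueued requests (0 when another process holds the lock).
def run_gc_cycle(monitor, orphan_threshold_seconds:, logger: nil)
  return 0 unless monitor.acquire_gc_lock

  begin
    monitor.cleanup_orphaned_requests(orphan_threshold_seconds, logger)
  ensure
    # Release even if cleanup raises, so last_gc_at is recorded and the
    # lock does not linger until its TTL expires.
    monitor.release_gc_lock
  end
end

winner = StubMonitor.new(lock_available: true)
loser = StubMonitor.new(lock_available: false)

winner_count = run_gc_cycle(winner, orphan_threshold_seconds: 300)
loser_count = run_gc_cycle(loser, orphan_threshold_seconds: 300)
puts [winner_count, winner.calls, loser_count, loser.calls].inspect
```

Releasing in an `ensure` clause matters here because release_gc_lock is also what records last_gc_at, which acquire_gc_lock uses to skip passes that follow too closely.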
#remove_process ⇒ void
This method returns an undefined value.
Remove this process’s registration.
    # File 'lib/patient_http/solid_queue/task_monitor.rb', line 92

    def remove_process
      ProcessRegistration.where(process_id: @lock_identifier).delete_all
    rescue => e
      @config.logger&.error("[PatientHttp::SolidQueue] Failed to remove process: #{e.message}")
      raise if PatientHttp.testing?
    end
#unregister(task) ⇒ void
This method returns an undefined value.
Unregister a request from the database (called when request completes).
    # File 'lib/patient_http/solid_queue/task_monitor.rb', line 54

    def unregister(task)
      task_id = full_task_id(task.id)
      InflightRequest.where(task_id: task_id).delete_all
    rescue => e
      @config.logger&.error("[PatientHttp::SolidQueue] Failed to unregister task #{task_id}: #{e.message}")
      raise if PatientHttp.testing?
    end
#update_heartbeats(task_ids) ⇒ void
This method returns an undefined value.
Update heartbeat timestamps for multiple requests in a single operation.
    # File 'lib/patient_http/solid_queue/task_monitor.rb', line 66

    def update_heartbeats(task_ids)
      return if task_ids.empty?

      full_ids = task_ids.map { |id| full_task_id(id) }
      InflightRequest.where(task_id: full_ids).update_all(heartbeat_at: Time.current)
    rescue => e
      @config.logger&.error("[PatientHttp::SolidQueue] Failed to update heartbeats: #{e.message}")
      raise if PatientHttp.testing?
    end
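Because each request ID is prefixed with the process identifier before the update, a batch heartbeat only touches rows owned by the calling process. The sketch below illustrates that scoping with an in-memory hash. `FakeInflightTable` and the standalone `full_task_id` helper are hypothetical stand-ins, and the process identifiers are shortened for readability.

```ruby
# Hypothetical in-memory stand-in for the inflight table:
# full task ID => heartbeat time.
class FakeInflightTable
  attr_reader :rows

  def initialize
    @rows = {}
  end

  def insert(full_id, at)
    @rows[full_id] = at
  end

  # Update only existing rows whose IDs are listed, similar in effect to a
  # single UPDATE ... WHERE task_id IN (...).
  def touch(full_ids, at)
    full_ids.each { |id| @rows[id] = at if @rows.key?(id) }
  end
end

# Standalone version of the prefixing shown in #full_task_id.
def full_task_id(process_identifier, task_id)
  "#{process_identifier}/#{task_id}"
end

table = FakeInflightTable.new
table.insert(full_task_id("host-a:1:aaaa", "req-1"), 100)
table.insert(full_task_id("host-b:2:bbbb", "req-1"), 100)

# Process "host-a" refreshes its own requests, including one unknown ID.
ids = ["req-1", "req-missing"].map { |id| full_task_id("host-a:1:aaaa", id) }
table.touch(ids, 200)

puts table.rows.inspect
```

Two processes can therefore track requests with the same request ID without refreshing each other's heartbeats.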