Class: PatientHttp::SolidQueue::TaskMonitor

Inherits:
Object
Defined in:
lib/patient_http/solid_queue/task_monitor.rb

Overview

Manages in-flight request tracking in the database for crash recovery.

This class maintains one Active Record row for each in-flight request. It provides distributed locking for orphan detection and automatically re-enqueues requests that were interrupted by a process crash.

Task ID format: “hostname:pid:hex/request-uuid”

  • hostname: sanitized hostname (colons and slashes replaced with dashes)

  • pid: process ID

  • hex: 16-character random hex string (8 random bytes, from SecureRandom.hex(8)) for uniqueness

  • request-uuid: unique identifier for the request
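
The format above can be illustrated with a small standalone sketch. It mirrors the constructor shown later on this page, but `build_lock_identifier` and `parse_full_task_id` are hypothetical helper names introduced here for illustration; they are not part of the gem.

```ruby
require "socket"
require "securerandom"

# Build the per-process prefix: sanitized hostname, PID, and random hex.
# (Hypothetical helper; mirrors what the constructor does.)
def build_lock_identifier
  hostname = Socket.gethostname.dup.force_encoding("UTF-8").tr(":/", "-")
  "#{hostname}:#{Process.pid}:#{SecureRandom.hex(8)}"
end

# Split a full task ID back into its parts (hypothetical helper).
# Safe because colons and slashes were removed from the hostname.
def parse_full_task_id(full_id)
  prefix, request_id = full_id.split("/", 2)
  hostname, pid, hex = prefix.split(":", 3)
  {hostname: hostname, pid: Integer(pid), hex: hex, request_id: request_id}
end

lock_identifier = build_lock_identifier
full_id = "#{lock_identifier}/#{SecureRandom.uuid}"
parts = parse_full_task_id(full_id)
puts parts.inspect
```

Because the hostname is sanitized before it is joined, the first slash always separates the process prefix from the request identifier.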

Constant Summary

GC_LOCK_NAME = "gc"

Constructor Details

#initialize(config) ⇒ TaskMonitor

Returns a new instance of TaskMonitor.



# File 'lib/patient_http/solid_queue/task_monitor.rb', line 22

def initialize(config)
  @config = config
  hostname = ::Socket.gethostname.force_encoding("UTF-8").tr(":/", "-")
  pid = ::Process.pid
  @lock_identifier = "#{hostname}:#{pid}:#{SecureRandom.hex(8)}".freeze
end

Instance Attribute Details

#config ⇒ Configuration (readonly)

Returns the configuration object.

Returns:

  • (Configuration)

    the configuration object

# File 'lib/patient_http/solid_queue/task_monitor.rb', line 20

def config
  @config
end

Class Method Details

.clear_all! ⇒ void

This method is part of a private API. You should avoid using this method if possible, as it may be removed or be changed in the future.

This method returns an undefined value.

Clear all records. Only allowed in test environment.

Raises:

  • (RuntimeError)

    if called outside of test environment



# File 'lib/patient_http/solid_queue/task_monitor.rb', line 201

def self.clear_all!
  unless PatientHttp.testing?
    raise "clear_all! is only allowed in test environment"
  end

  InflightRequest.delete_all
  ProcessRegistration.delete_all
  GcLock.delete_all
end

Instance Method Details

#acquire_gc_lock ⇒ Boolean

Try to acquire the distributed garbage collection lock.

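Uses a single semaphore row and pessimistic locking to ensure only one process can claim the lock at a time. Returns false if another process holds a non-expired lock, or if GC was run recently (within heartbeat_interval). If acquiring the lock raises an error, the error is logged and the method returns false; in the test environment the error is re-raised instead.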

Returns:

  • (Boolean)

    true if lock acquired, false otherwise



# File 'lib/patient_http/solid_queue/task_monitor.rb', line 107

def acquire_gc_lock
  now = Time.current
  expires_at = now + gc_lock_ttl.seconds
  acquired = false

  ensure_gc_lock_row!

  GcLock.transaction do
    lock = GcLock.lock.find_by!(lock_name: GC_LOCK_NAME)

    recent_gc = lock.last_gc_at && lock.last_gc_at > (now - @config.heartbeat_interval)
    next if recent_gc

    lock_held = lock.lock_holder.present? && lock.expires_at.present? && lock.expires_at > now
    next if lock_held

    lock.update!(
      lock_holder: @lock_identifier,
      acquired_at: now,
      expires_at: expires_at
    )
    acquired = true
  end

  acquired
rescue => e
  @config.logger&.error("[PatientHttp::SolidQueue] Failed to acquire GC lock: #{e.message}")
  raise if PatientHttp.testing?
  false
end
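
The two checks made inside the transaction can be sketched without a database. This is a minimal illustration only: `LockRow` is an in-memory stand-in for the lock row, and `gc_lock_available?` is a hypothetical name, not a method of this class. The real method also takes a row lock so that the check and the update happen atomically, which this sketch does not attempt.

```ruby
# In-memory stand-in for the GC lock row (hypothetical; the real row is
# an Active Record model locked inside a transaction).
LockRow = Struct.new(:lock_holder, :expires_at, :last_gc_at, keyword_init: true)

# Returns true when a caller may claim the lock at time `now`.
def gc_lock_available?(row, now:, heartbeat_interval:)
  # Skip if GC finished within the last heartbeat interval.
  return false if row.last_gc_at && row.last_gc_at > now - heartbeat_interval

  # Skip if another holder's lease has not yet expired.
  held = !row.lock_holder.to_s.empty? && row.expires_at && row.expires_at > now
  return false if held

  true
end

now = Time.now
free    = LockRow.new                                                  # never claimed
held    = LockRow.new(lock_holder: "host:1:abcd", expires_at: now + 30) # live lease
expired = LockRow.new(lock_holder: "host:1:abcd", expires_at: now - 1)  # lease ran out
recent  = LockRow.new(last_gc_at: now - 5)                              # GC just ran

results = [free, held, expired, recent].map do |row|
  gc_lock_available?(row, now: now, heartbeat_interval: 60)
end
puts results.inspect
```

An expired lease counts as available, so a process that crashes while holding the lock does not block garbage collection forever.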

#cleanup_orphaned_requests(orphan_threshold_seconds, logger) ⇒ Integer

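Find and re-enqueue orphaned requests. Stale process registrations are pruned first. A request is then treated as orphaned when its heartbeat is older than the threshold and its owning process has no registration with a heartbeat at or after the threshold.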

Parameters:

  • orphan_threshold_seconds (Numeric)

    age threshold for considering a request orphaned

  • logger (Logger)

    logger for output

Returns:

  • (Integer)

    number of orphaned requests re-enqueued



# File 'lib/patient_http/solid_queue/task_monitor.rb', line 154

def cleanup_orphaned_requests(orphan_threshold_seconds, logger)
  threshold = Time.current - orphan_threshold_seconds.seconds

  prune_stale_process_registrations(threshold)

  # Get process IDs with a recent heartbeat
  active_process_ids = ProcessRegistration.where("last_seen_at >= ?", threshold).pluck(:process_id)

  # Find stale requests from processes not in the active set
  orphaned = InflightRequest
    .where("heartbeat_at < ?", threshold)
    .where.not(process_id: active_process_ids)
    .to_a

  return 0 if orphaned.empty?

  reenqueued_count = 0

  orphaned.each do |record|
    reenqueued_count += 1 if reenqueue_orphaned_record(record, threshold, logger)
  end

  reenqueued_count
end
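
The selection rule used by the two queries can be shown with plain Ruby objects. This is a sketch under stated assumptions: `Registration`, `Inflight`, and `select_orphans` are hypothetical in-memory stand-ins for the Active Record models and queries, and the sample IDs are made up.

```ruby
# In-memory stand-ins for the two tables (hypothetical names).
Registration = Struct.new(:process_id, :last_seen_at, keyword_init: true)
Inflight = Struct.new(:task_id, :process_id, :heartbeat_at, keyword_init: true)

# A request is orphaned when its heartbeat is older than the threshold
# and its process has no registration seen at or after the threshold.
def select_orphans(requests, registrations, threshold)
  active_ids = registrations
    .select { |reg| reg.last_seen_at >= threshold }
    .map(&:process_id)

  requests.select do |req|
    req.heartbeat_at < threshold && !active_ids.include?(req.process_id)
  end
end

now = Time.now
threshold = now - 300

registrations = [
  Registration.new(process_id: "a:1:aa", last_seen_at: now - 10),  # alive
  Registration.new(process_id: "b:2:bb", last_seen_at: now - 900)  # stale
]

requests = [
  Inflight.new(task_id: "a:1:aa/r1", process_id: "a:1:aa", heartbeat_at: now - 600), # stale heartbeat, live process
  Inflight.new(task_id: "b:2:bb/r2", process_id: "b:2:bb", heartbeat_at: now - 600), # stale heartbeat, stale process
  Inflight.new(task_id: "b:2:bb/r3", process_id: "b:2:bb", heartbeat_at: now - 10),  # fresh heartbeat
  Inflight.new(task_id: "c:3:cc/r4", process_id: "c:3:cc", heartbeat_at: now - 600)  # stale heartbeat, no registration
]

orphan_ids = select_orphans(requests, registrations, threshold).map(&:task_id)
puts orphan_ids.inspect
```

Requiring both conditions means a slow request owned by a process that is still pinging is left alone, while a request whose process has disappeared is picked up.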

#full_task_id(task_id) ⇒ String

Build unique task ID for a request task that includes process identifier.

Parameters:

  • task_id (String)

    the request task ID

Returns:

  • (String)

    the unique task ID



# File 'lib/patient_http/solid_queue/task_monitor.rb', line 183

def full_task_id(task_id)
  "#{@lock_identifier}/#{task_id}"
end

#ping_process ⇒ void

This method returns an undefined value.

Record or refresh this process’s registration.



# File 'lib/patient_http/solid_queue/task_monitor.rb', line 79

def ping_process
  ProcessRegistration.upsert(
    {process_id: @lock_identifier, max_connections: @config.max_connections, last_seen_at: Time.current},
    unique_by: :process_id
  )
rescue => e
  @config.logger&.error("[PatientHttp::SolidQueue] Failed to ping process: #{e.message}")
  raise if PatientHttp.testing?
end

#register(task) ⇒ void

This method returns an undefined value.

Register a request as inflight in the database.

Parameters:

  • task (PatientHttp::RequestTask)

    the request task to register



# File 'lib/patient_http/solid_queue/task_monitor.rb', line 33

def register(task)
  job_payload = task.task_handler.active_job_data.to_json
  task_id = full_task_id(task.id)
  now = Time.current

  InflightRequest.create!(
    task_id: task_id,
    process_id: @lock_identifier,
    job_payload: job_payload,
    heartbeat_at: now,
    created_at: now
  )
rescue => e
  @config.logger&.error("[PatientHttp::SolidQueue] Failed to register task #{task_id}: #{e.message}")
  raise if PatientHttp.testing?
end

#registered?(task) ⇒ Boolean

This method is part of a private API. You should avoid using this method if possible, as it may be removed or be changed in the future.

Check if a task is registered in the inflight table.

Parameters:

  • task (PatientHttp::RequestTask)

    the request task

Returns:

  • (Boolean)


# File 'lib/patient_http/solid_queue/task_monitor.rb', line 192

def registered?(task)
  InflightRequest.where(task_id: full_task_id(task.id)).exists?
end

#release_gc_lock ⇒ void

This method returns an undefined value.

Release the garbage collection lock if held by this process, and record last_gc_at.



# File 'lib/patient_http/solid_queue/task_monitor.rb', line 141

def release_gc_lock
  GcLock.where(lock_name: GC_LOCK_NAME, lock_holder: @lock_identifier)
    .update_all(last_gc_at: Time.current, lock_holder: nil, acquired_at: nil, expires_at: nil)
rescue => e
  @config.logger&.error("[PatientHttp::SolidQueue] Failed to release GC lock: #{e.message}")
  raise if PatientHttp.testing?
end

#remove_process ⇒ void

This method returns an undefined value.

Remove this process’s registration.



# File 'lib/patient_http/solid_queue/task_monitor.rb', line 92

def remove_process
  ProcessRegistration.where(process_id: @lock_identifier).delete_all
rescue => e
  @config.logger&.error("[PatientHttp::SolidQueue] Failed to remove process: #{e.message}")
  raise if PatientHttp.testing?
end

#unregister(task) ⇒ void

This method returns an undefined value.

Unregister a request from the database (called when request completes).

Parameters:

  • task (PatientHttp::RequestTask)

    the request task to unregister



# File 'lib/patient_http/solid_queue/task_monitor.rb', line 54

def unregister(task)
  task_id = full_task_id(task.id)
  InflightRequest.where(task_id: task_id).delete_all
rescue => e
  @config.logger&.error("[PatientHttp::SolidQueue] Failed to unregister task #{task_id}: #{e.message}")
  raise if PatientHttp.testing?
end

#update_heartbeats(task_ids) ⇒ void

This method returns an undefined value.

Update heartbeat timestamps for multiple requests in a single operation.

Parameters:

  • task_ids (Array<String>)

    the request task IDs to update (unprefixed; each is expanded with full_task_id before the query)



# File 'lib/patient_http/solid_queue/task_monitor.rb', line 66

def update_heartbeats(task_ids)
  return if task_ids.empty?

  full_ids = task_ids.map { |id| full_task_id(id) }
  InflightRequest.where(task_id: full_ids).update_all(heartbeat_at: Time.current)
rescue => e
  @config.logger&.error("[PatientHttp::SolidQueue] Failed to update heartbeats: #{e.message}")
  raise if PatientHttp.testing?
end