Class: PatientHttp::Processor
- Inherits:
-
Object
- Object
- PatientHttp::Processor
- Includes:
- RedirectHelper, TimeHelper
- Defined in:
- lib/patient_http/processor.rb
Overview
Core processor that handles async HTTP requests in a dedicated thread
Constant Summary collapse
- DEQUEUE_TIMEOUT =
Timing constants for the reactor loop
1.0
Instance Attribute Summary collapse
-
#config ⇒ Configuration
readonly
The configuration object for the processor.
-
#testing_callback ⇒ Object
private
Callback to invoke after each request.
Instance Method Summary collapse
-
#drain ⇒ void
Drain the processor (stop accepting new requests).
-
#drained? ⇒ Boolean
Check if processor is drained (draining and idle).
-
#draining? ⇒ Boolean
Check if processor is draining.
-
#enqueue(task) ⇒ void
Enqueue a request task for processing.
-
#idle? ⇒ Boolean
Check if processor is idle (no queued or in-flight requests).
-
#inflight_count ⇒ Integer
Get the number of in-flight requests (actively executing HTTP calls).
-
#inflight_request_ids ⇒ Array<String>
Get the IDs of in-flight requests.
-
#initialize(config) ⇒ void
constructor
Initialize the processor.
-
#observe(observer) ⇒ void
Add an observer for processor events.
-
#run ⇒ Object
private
Run the processor in a block.
-
#running? ⇒ Boolean
Check if processor is running.
-
#start ⇒ void
Start the processor.
-
#starting? ⇒ Boolean
Check if processor is starting.
-
#state ⇒ Symbol
Get the current processor state.
-
#stop(timeout: nil) ⇒ void
Stop the processor.
-
#stopped? ⇒ Boolean
Check if processor is stopped.
-
#stopping? ⇒ Boolean
Check if processor is stopping.
-
#total_count ⇒ Integer
Get the total number of tasks in the pipeline (queued + pending + in-flight).
-
#wait_for_idle(timeout: 1) ⇒ Boolean
private
Wait for the queue to be empty and all in-flight requests to complete.
-
#wait_for_processing(timeout: 1) ⇒ Boolean
private
Wait for at least one request to start processing.
-
#wait_for_running(timeout: 5) ⇒ Boolean
private
Wait for the processor to start.
Methods included from TimeHelper
#monotonic_time, #wall_clock_time
Constructor Details
#initialize(config) ⇒ void
Initialize the processor.
23 24 25 26 27 28 29 30 31 32 33 34 35 |
# File 'lib/patient_http/processor.rb', line 23 def initialize(config) @config = config @lifecycle = LifecycleManager.new @queue = Thread::Queue.new @reactor_thread = nil @inflight_requests = Concurrent::Hash.new @pending_tasks = Concurrent::Hash.new @tasks_lock = Mutex.new @idle_condition = ConditionVariable.new @testing_callback = nil @http_client = Client.new(self) @observers = [] end |
Instance Attribute Details
#config ⇒ Configuration (readonly)
Returns the configuration object for the processor.
13 14 15 |
# File 'lib/patient_http/processor.rb', line 13 def config @config end |
#testing_callback ⇒ Object
This method is part of a private API. You should avoid using this method if possible, as it may be removed or be changed in the future.
Callback to invoke after each request. Only available in testing mode.
17 18 19 |
# File 'lib/patient_http/processor.rb', line 17 def testing_callback @testing_callback end |
Instance Method Details
#drain ⇒ void
This method returns an undefined value.
Drain the processor (stop accepting new requests).
113 114 115 116 117 118 119 |
# File 'lib/patient_http/processor.rb', line 113 def drain @tasks_lock.synchronize do return unless @lifecycle.drain! end @config.logger&.info("[PatientHttp] Processor draining (no longer accepting new requests)") end |
#drained? ⇒ Boolean
Check if processor is drained (draining and idle).
181 182 183 |
# File 'lib/patient_http/processor.rb', line 181 def drained? @lifecycle.draining? && idle? end |
#draining? ⇒ Boolean
Check if processor is draining.
174 175 176 |
# File 'lib/patient_http/processor.rb', line 174 def draining? @lifecycle.draining? end |
#enqueue(task) ⇒ void
This method returns an undefined value.
Enqueue a request task for processing.
127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 |
# File 'lib/patient_http/processor.rb', line 127 def enqueue(task) @tasks_lock.synchronize do raise NotRunningError.new("Cannot enqueue request: processor is #{state}") unless running? # Check capacity - raise error if at max connections total = @queue.size + @pending_tasks.size + @inflight_requests.size if total >= @config.max_connections notify_observers { |observer| observer.capacity_exceeded } raise MaxCapacityError.new("Cannot enqueue request: already at max capacity (#{@config.max_connections} connections)") end task.enqueued! @queue.push(task) end end |
#idle? ⇒ Boolean
Check if processor is idle (no queued or in-flight requests).
195 196 197 198 199 |
# File 'lib/patient_http/processor.rb', line 195 def idle? @tasks_lock.synchronize do @queue.empty? && @pending_tasks.empty? && @inflight_requests.empty? end end |
#inflight_count ⇒ Integer
Get the number of in-flight requests (actively executing HTTP calls).
This does not include queued or pending tasks. For the total pipeline count used by the capacity check, see #total_count.
207 208 209 |
# File 'lib/patient_http/processor.rb', line 207 def inflight_count @inflight_requests.size end |
#inflight_request_ids ⇒ Array<String>
Get the IDs of in-flight requests.
225 226 227 228 229 |
# File 'lib/patient_http/processor.rb', line 225 def inflight_request_ids @tasks_lock.synchronize do @inflight_requests.keys end end |
#observe(observer) ⇒ void
This method returns an undefined value.
Add an observer for processor events.
235 236 237 238 239 240 241 242 |
# File 'lib/patient_http/processor.rb', line 235 def observe(observer) @tasks_lock.synchronize do raise ArgumentError.new("Observer already added") if @observers.include?(observer) @observers << observer observer.start if starting? || running? end end |
#run ⇒ Object
This method is part of a private API. You should avoid using this method if possible, as it may be removed or be changed in the future.
Run the processor in a block. This is intended for use in tests to ensure the processor is started and stopped properly.
279 280 281 282 283 284 285 286 |
# File 'lib/patient_http/processor.rb', line 279 def run start wait_for_running yield ensure stop(timeout: 0) wait_for_idle end |
#running? ⇒ Boolean
Check if processor is running.
160 161 162 |
# File 'lib/patient_http/processor.rb', line 160 def running? @lifecycle.running? end |
#start ⇒ void
This method returns an undefined value.
Start the processor.
40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 |
# File 'lib/patient_http/processor.rb', line 40 def start @tasks_lock.synchronize do return unless @lifecycle.start! end @reactor_thread = Thread.new do Thread.current.name = "patient-http-processor" run_reactor rescue => e @config.logger&.error("[PatientHttp] Processor error: #{e.}\n#{e.backtrace.join("\n")}") raise if PatientHttp.testing? ensure @tasks_lock.synchronize { @lifecycle.stopped! } if @reactor_thread == Thread.current end @tasks_lock.synchronize do @lifecycle.running! notify_observers { |observer| observer.start } end # Block until the reactor is ready @lifecycle.wait_for_reactor end |
#starting? ⇒ Boolean
Check if processor is starting.
153 154 155 |
# File 'lib/patient_http/processor.rb', line 153 def starting? @lifecycle.starting? end |
#state ⇒ Symbol
Get the current processor state.
146 147 148 |
# File 'lib/patient_http/processor.rb', line 146 def state @lifecycle.state end |
#stop(timeout: nil) ⇒ void
This method returns an undefined value.
Stop the processor.
69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 |
# File 'lib/patient_http/processor.rb', line 69 def stop(timeout: nil) timeout ||= @config.shutdown_timeout # Atomically transition to stopping state under lock to ensure consistency # with other state-checking operations @tasks_lock.synchronize do return unless @lifecycle.stop! end # Interrupt the reactor's queue wait by pushing a sentinel value @queue.push(nil) # Wait for in-flight and pending requests to complete. # Queue items are not checked here — they will be re-enqueued by # reenqueue_remaining_queue_items after the reactor thread exits. if timeout > 0 deadline = monotonic_time + timeout @tasks_lock.synchronize do loop do break if @pending_tasks.empty? && @inflight_requests.empty? remaining = deadline - monotonic_time break if remaining <= 0 @idle_condition.wait(@tasks_lock, remaining) end end end reenqueue_pending_requests @reactor_thread.join(1) if @reactor_thread&.alive? @reactor_thread.kill if @reactor_thread&.alive? @reactor_thread = nil # Drain any items left in the queue after the reactor has exited. # This must happen after the reactor thread is done to avoid consuming # the nil sentinel that wakes the reactor. reenqueue_remaining_queue_items notify_observers { |observer| observer.stop } end |
#stopped? ⇒ Boolean
Check if processor is stopped.
167 168 169 |
# File 'lib/patient_http/processor.rb', line 167 def stopped? @lifecycle.stopped? end |
#stopping? ⇒ Boolean
Check if processor is stopping.
188 189 190 |
# File 'lib/patient_http/processor.rb', line 188 def stopping? @lifecycle.stopping? end |
#total_count ⇒ Integer
Get the total number of tasks in the pipeline (queued + pending + in-flight).
This is the count used by #enqueue for capacity enforcement.
216 217 218 219 220 |
# File 'lib/patient_http/processor.rb', line 216 def total_count @tasks_lock.synchronize do @queue.size + @pending_tasks.size + @inflight_requests.size end end |
#wait_for_idle(timeout: 1) ⇒ Boolean
This method is part of a private API. You should avoid using this method if possible, as it may be removed or be changed in the future.
Wait for the queue to be empty and all in-flight requests to complete. This is mainly for use in tests.
260 261 262 |
# File 'lib/patient_http/processor.rb', line 260 def wait_for_idle(timeout: 1) @lifecycle.wait_for_condition(timeout: timeout) { idle? } end |
#wait_for_processing(timeout: 1) ⇒ Boolean
This method is part of a private API. You should avoid using this method if possible, as it may be removed or be changed in the future.
Wait for at least one request to start processing. This is mainly for use in tests.
269 270 271 272 273 |
# File 'lib/patient_http/processor.rb', line 269 def wait_for_processing(timeout: 1) @lifecycle.wait_for_condition(timeout: timeout) do !@inflight_requests.empty? || !@pending_tasks.empty? end end |
#wait_for_running(timeout: 5) ⇒ Boolean
This method is part of a private API. You should avoid using this method if possible, as it may be removed or be changed in the future.
Wait for the processor to start.
249 250 251 252 |
# File 'lib/patient_http/processor.rb', line 249 def wait_for_running(timeout: 5) start @lifecycle.wait_for_running(timeout: timeout) end |