Class: PatientHttp::Processor

Inherits:
Object
  • Object
show all
Includes:
RedirectHelper, TimeHelper
Defined in:
lib/patient_http/processor.rb

Overview

Core processor that handles async HTTP requests in a dedicated thread

Constant Summary collapse

DEQUEUE_TIMEOUT =

Timing constants for the reactor loop

1.0

Instance Attribute Summary collapse

Instance Method Summary collapse

Methods included from TimeHelper

#monotonic_time, #wall_clock_time

Constructor Details

#initialize(config) ⇒ void

Initialize the processor.

Parameters:



23
24
25
26
27
28
29
30
31
32
33
34
35
# File 'lib/patient_http/processor.rb', line 23

def initialize(config)
  @config = config
  @lifecycle = LifecycleManager.new
  @queue = Thread::Queue.new
  @reactor_thread = nil
  @inflight_requests = Concurrent::Hash.new
  @pending_tasks = Concurrent::Hash.new
  @tasks_lock = Mutex.new
  @idle_condition = ConditionVariable.new
  @testing_callback = nil
  @http_client = Client.new(self)
  @observers = []
end

Instance Attribute Details

#configConfiguration (readonly)

Returns the configuration object for the processor.

Returns:



13
14
15
# File 'lib/patient_http/processor.rb', line 13

def config
  @config
end

#testing_callbackObject

This method is part of a private API. You should avoid using this method if possible, as it may be removed or be changed in the future.

Callback to invoke after each request. Only available in testing mode.



17
18
19
# File 'lib/patient_http/processor.rb', line 17

def testing_callback
  @testing_callback
end

Instance Method Details

#drainvoid

This method returns an undefined value.

Drain the processor (stop accepting new requests).



113
114
115
116
117
118
119
# File 'lib/patient_http/processor.rb', line 113

def drain
  @tasks_lock.synchronize do
    return unless @lifecycle.drain!
  end

  @config.logger&.info("[PatientHttp] Processor draining (no longer accepting new requests)")
end

#drained?Boolean

Check if processor is drained (draining and idle).

Returns:

  • (Boolean)


181
182
183
# File 'lib/patient_http/processor.rb', line 181

def drained?
  @lifecycle.draining? && idle?
end

#draining?Boolean

Check if processor is draining.

Returns:

  • (Boolean)


174
175
176
# File 'lib/patient_http/processor.rb', line 174

def draining?
  @lifecycle.draining?
end

#enqueue(task) ⇒ void

This method returns an undefined value.

Enqueue a request task for processing.

Parameters:

Raises:



127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
# File 'lib/patient_http/processor.rb', line 127

def enqueue(task)
  @tasks_lock.synchronize do
    raise NotRunningError.new("Cannot enqueue request: processor is #{state}") unless running?

    # Check capacity - raise error if at max connections
    total = @queue.size + @pending_tasks.size + @inflight_requests.size
    if total >= @config.max_connections
      notify_observers { |observer| observer.capacity_exceeded }
      raise MaxCapacityError.new("Cannot enqueue request: already at max capacity (#{@config.max_connections} connections)")
    end

    task.enqueued!
    @queue.push(task)
  end
end

#idle?Boolean

Check if processor is idle (no queued or in-flight requests).

Returns:

  • (Boolean)


195
196
197
198
199
# File 'lib/patient_http/processor.rb', line 195

def idle?
  @tasks_lock.synchronize do
    @queue.empty? && @pending_tasks.empty? && @inflight_requests.empty?
  end
end

#inflight_countInteger

Get the number of in-flight requests (actively executing HTTP calls).

This does not include queued or pending tasks. For the total pipeline count used by the capacity check, see #total_count.

Returns:

  • (Integer)


207
208
209
# File 'lib/patient_http/processor.rb', line 207

def inflight_count
  @inflight_requests.size
end

#inflight_request_idsArray<String>

Get the IDs of in-flight requests.

Returns:

  • (Array<String>)


225
226
227
228
229
# File 'lib/patient_http/processor.rb', line 225

def inflight_request_ids
  @tasks_lock.synchronize do
    @inflight_requests.keys
  end
end

#observe(observer) ⇒ void

This method returns an undefined value.

Add an observer for processor events.

Parameters:



235
236
237
238
239
240
241
242
# File 'lib/patient_http/processor.rb', line 235

def observe(observer)
  @tasks_lock.synchronize do
    raise ArgumentError.new("Observer already added") if @observers.include?(observer)

    @observers << observer
    observer.start if starting? || running?
  end
end

#runObject

This method is part of a private API. You should avoid using this method if possible, as it may be removed or be changed in the future.

Run the processor in a block. This is intended for use in tests to ensure the processor is started and stopped properly.



279
280
281
282
283
284
285
286
# File 'lib/patient_http/processor.rb', line 279

def run
  start
  wait_for_running
  yield
ensure
  stop(timeout: 0)
  wait_for_idle
end

#running?Boolean

Check if processor is running.

Returns:

  • (Boolean)


160
161
162
# File 'lib/patient_http/processor.rb', line 160

def running?
  @lifecycle.running?
end

#startvoid

This method returns an undefined value.

Start the processor.



40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
# File 'lib/patient_http/processor.rb', line 40

def start
  @tasks_lock.synchronize do
    return unless @lifecycle.start!
  end

  @reactor_thread = Thread.new do
    Thread.current.name = "patient-http-processor"
    run_reactor
  rescue => e
    @config.logger&.error("[PatientHttp] Processor error: #{e.message}\n#{e.backtrace.join("\n")}")

    raise if PatientHttp.testing?
  ensure
    @tasks_lock.synchronize { @lifecycle.stopped! } if @reactor_thread == Thread.current
  end

  @tasks_lock.synchronize do
    @lifecycle.running!
    notify_observers { |observer| observer.start }
  end

  # Block until the reactor is ready
  @lifecycle.wait_for_reactor
end

#starting?Boolean

Check if processor is starting.

Returns:

  • (Boolean)


153
154
155
# File 'lib/patient_http/processor.rb', line 153

def starting?
  @lifecycle.starting?
end

#stateSymbol

Get the current processor state.

Returns:

  • (Symbol)

    the current state



146
147
148
# File 'lib/patient_http/processor.rb', line 146

def state
  @lifecycle.state
end

#stop(timeout: nil) ⇒ void

This method returns an undefined value.

Stop the processor.

Parameters:

  • timeout (Numeric, nil) (defaults to: nil)

    how long to wait for in-flight requests (seconds)



69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
# File 'lib/patient_http/processor.rb', line 69

def stop(timeout: nil)
  timeout ||= @config.shutdown_timeout

  # Atomically transition to stopping state under lock to ensure consistency
  # with other state-checking operations
  @tasks_lock.synchronize do
    return unless @lifecycle.stop!
  end

  # Interrupt the reactor's queue wait by pushing a sentinel value
  @queue.push(nil)

  # Wait for in-flight and pending requests to complete.
  # Queue items are not checked here — they will be re-enqueued by
  # reenqueue_remaining_queue_items after the reactor thread exits.
  if timeout > 0
    deadline = monotonic_time + timeout
    @tasks_lock.synchronize do
      loop do
        break if @pending_tasks.empty? && @inflight_requests.empty?
        remaining = deadline - monotonic_time
        break if remaining <= 0
        @idle_condition.wait(@tasks_lock, remaining)
      end
    end
  end

  reenqueue_pending_requests

  @reactor_thread.join(1) if @reactor_thread&.alive?
  @reactor_thread.kill if @reactor_thread&.alive?
  @reactor_thread = nil

  # Drain any items left in the queue after the reactor has exited.
  # This must happen after the reactor thread is done to avoid consuming
  # the nil sentinel that wakes the reactor.
  reenqueue_remaining_queue_items

  notify_observers { |observer| observer.stop }
end

#stopped?Boolean

Check if processor is stopped.

Returns:

  • (Boolean)


167
168
169
# File 'lib/patient_http/processor.rb', line 167

def stopped?
  @lifecycle.stopped?
end

#stopping?Boolean

Check if processor is stopping.

Returns:

  • (Boolean)


188
189
190
# File 'lib/patient_http/processor.rb', line 188

def stopping?
  @lifecycle.stopping?
end

#total_countInteger

Get the total number of tasks in the pipeline (queued + pending + in-flight).

This is the count used by #enqueue for capacity enforcement.

Returns:

  • (Integer)


216
217
218
219
220
# File 'lib/patient_http/processor.rb', line 216

def total_count
  @tasks_lock.synchronize do
    @queue.size + @pending_tasks.size + @inflight_requests.size
  end
end

#wait_for_idle(timeout: 1) ⇒ Boolean

This method is part of a private API. You should avoid using this method if possible, as it may be removed or be changed in the future.

Wait for the queue to be empty and all in-flight requests to complete. This is mainly for use in tests.

Parameters:

  • timeout (Numeric) (defaults to: 1)

    maximum time to wait in seconds (default: 5)

Returns:

  • (Boolean)

    true if processing completed, false if timeout reached



260
261
262
# File 'lib/patient_http/processor.rb', line 260

def wait_for_idle(timeout: 1)
  @lifecycle.wait_for_condition(timeout: timeout) { idle? }
end

#wait_for_processing(timeout: 1) ⇒ Boolean

This method is part of a private API. You should avoid using this method if possible, as it may be removed or be changed in the future.

Wait for at least one request to start processing. This is mainly for use in tests.

Parameters:

  • timeout (Numeric) (defaults to: 1)

    maximum time to wait in seconds (default: 5)

Returns:

  • (Boolean)

    true if a request started processing, false if timeout reached



269
270
271
272
273
# File 'lib/patient_http/processor.rb', line 269

def wait_for_processing(timeout: 1)
  @lifecycle.wait_for_condition(timeout: timeout) do
    !@inflight_requests.empty? || !@pending_tasks.empty?
  end
end

#wait_for_running(timeout: 5) ⇒ Boolean

This method is part of a private API. You should avoid using this method if possible, as it may be removed or be changed in the future.

Wait for the processor to start.

Parameters:

  • timeout (Numeric) (defaults to: 5)

    maximum time to wait in seconds (default: 5)

Returns:

  • (Boolean)

    true if started, false if timeout reached



249
250
251
252
# File 'lib/patient_http/processor.rb', line 249

def wait_for_running(timeout: 5)
  start
  @lifecycle.wait_for_running(timeout: timeout)
end