Module: PatientHttp::SolidQueue

Defined in:
lib/patient_http/solid_queue.rb,
lib/patient_http/solid_queue/engine.rb,
lib/patient_http/solid_queue/record.rb,
lib/patient_http/solid_queue/context.rb,
lib/patient_http/solid_queue/gc_lock.rb,
lib/patient_http/solid_queue/request_job.rb,
lib/patient_http/solid_queue/callback_job.rb,
lib/patient_http/solid_queue/task_handler.rb,
lib/patient_http/solid_queue/task_monitor.rb,
lib/patient_http/solid_queue/configuration.rb,
lib/patient_http/solid_queue/lifecycle_hooks.rb,
lib/patient_http/solid_queue/inflight_request.rb,
lib/patient_http/solid_queue/request_executor.rb,
lib/patient_http/solid_queue/processor_observer.rb,
lib/patient_http/solid_queue/task_monitor_thread.rb,
lib/patient_http/solid_queue/process_registration.rb

Defined Under Namespace

Classes: CallbackJob, Configuration, Context, Engine, GcLock, InflightRequest, LifecycleHooks, ProcessRegistration, ProcessorObserver, Record, RequestExecutor, RequestJob, TaskHandler, TaskMonitor, TaskMonitorThread

Constant Summary collapse

VERSION =
File.read(File.join(__dir__, "../../VERSION")).strip

Class Attribute Summary collapse

Class Method Summary collapse

Class Attribute Details

.configurationConfiguration

Return the current configuration, initializing with defaults if necessary.

Returns:



75
76
77
# File 'lib/patient_http/solid_queue.rb', line 75

def configuration
  @configuration ||= Configuration.new
end

.processorPatientHttp::Processor?

This method is part of a private API. You should avoid using this method if possible, as it may be removed or be changed in the future.

Returns the processor instance.

Returns:

  • (PatientHttp::Processor, nil)


275
276
277
# File 'lib/patient_http/solid_queue.rb', line 275

def processor
  @processor
end

Class Method Details

.after_completion {|response| ... } ⇒ Object

Add a callback to be executed after a successful request completion.

Yields:

  • (response)

    block to execute after an HTTP request completes

Yield Parameters:

  • response (PatientHttp::Response)

    the HTTP response



91
92
93
# File 'lib/patient_http/solid_queue.rb', line 91

def after_completion(&block)
  @after_completion_callbacks << block
end

.after_error {|error| ... } ⇒ Object

Add a callback to be executed after a request error.

Yields:

  • (error)

    block to execute after an HTTP request errors

Yield Parameters:

  • error (PatientHttp::Error)

    information about the error



99
100
101
# File 'lib/patient_http/solid_queue.rb', line 99

def after_error(&block)
  @after_error_callbacks << block
end

.configure {|Configuration| ... } ⇒ Configuration

Configure the gem with a block.

Yields:

Returns:



66
67
68
69
70
# File 'lib/patient_http/solid_queue.rb', line 66

def configure
  configuration = Configuration.new
  yield(configuration) if block_given?
  @configuration = configuration
end

.decrypt(value) ⇒ Object

Decrypt a value using the configured encryptor.

Parameters:

  • value (String)

    the encrypted value to decrypt

Returns:

  • (Object)

    the decrypted value



267
268
269
# File 'lib/patient_http/solid_queue.rb', line 267

def decrypt(value)
  configuration.encryptor.decrypt(value)
end

.draining?Boolean

Check if the processor is draining (not accepting new requests).

Returns:

  • (Boolean)


113
114
115
# File 'lib/patient_http/solid_queue.rb', line 113

def draining?
  !!@processor&.draining?
end

.encrypt(value) ⇒ String

Encrypt a value using the configured encryptor.

Parameters:

  • value (Object)

    the value to encrypt

Returns:

  • (String)

    the encrypted value



259
260
261
# File 'lib/patient_http/solid_queue.rb', line 259

def encrypt(value)
  configuration.encryptor.encrypt(value)
end

.execute(request, callback:, callback_args: nil, raise_error_responses: false) ⇒ String

Execute an async HTTP request.

Parameters:

  • request (PatientHttp::Request)

    the HTTP request to execute

  • callback (Class, String)

    Callback service class with on_complete and on_error instance methods, or its fully qualified class name.

  • callback_args (#to_h, nil) (defaults to: nil)

    Arguments to pass to callback

  • raise_error_responses (Boolean) (defaults to: false)

    If true, treats non-2xx responses as errors

Returns:

  • (String)

    the request ID



147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
# File 'lib/patient_http/solid_queue.rb', line 147

def execute(request, callback:, callback_args: nil, raise_error_responses: false)
  PatientHttp::CallbackValidator.validate!(callback)
  callback_name = callback.is_a?(Class) ? callback.name : callback.to_s
  callback_args = PatientHttp::CallbackValidator.validate_callback_args(callback_args)
  request_id = SecureRandom.uuid

  encrypted = encrypt(request.as_json)

  data = if external_storage.enabled?
    external_storage.store(encrypted, max_size: configuration.payload_store_threshold)
  else
    encrypted
  end

  RequestJob.perform_later(data, callback_name, raise_error_responses, callback_args, request_id)

  request_id
end

.external_storagePatientHttp::ExternalStorage

This method is part of a private API. You should avoid using this method if possible, as it may be removed or be changed in the future.

Get an ExternalStorage instance for storing and fetching payloads.

Returns:

  • (PatientHttp::ExternalStorage)


135
136
137
# File 'lib/patient_http/solid_queue.rb', line 135

def external_storage
  @external_storage ||= PatientHttp::ExternalStorage.new(configuration)
end

.invoke_completion_callbacks(response) ⇒ void

This method is part of a private API. You should avoid using this method if possible, as it may be removed or be changed in the future.

This method returns an undefined value.

Invoke the registered completion callbacks.

Parameters:

  • response (PatientHttp::Response)

    the HTTP response



234
235
236
237
238
239
240
# File 'lib/patient_http/solid_queue.rb', line 234

def invoke_completion_callbacks(response)
  @after_completion_callbacks.each do |callback|
    callback.call(response)
  rescue => e
    configuration.logger&.error("[PatientHttp::SolidQueue] after_completion callback error: #{e.class} - #{e.message}")
  end
end

.invoke_error_callbacks(error) ⇒ void

This method is part of a private API. You should avoid using this method if possible, as it may be removed or be changed in the future.

This method returns an undefined value.

Invoke the registered error callbacks.

Parameters:

  • error (PatientHttp::Error)

    information about the error



247
248
249
250
251
252
253
# File 'lib/patient_http/solid_queue.rb', line 247

def invoke_error_callbacks(error)
  @after_error_callbacks.each do |callback|
    callback.call(error)
  rescue => e
    configuration.logger&.error("[PatientHttp::SolidQueue] after_error callback error: #{e.class} - #{e.message}")
  end
end

.quietvoid

This method returns an undefined value.

Signal the processor to drain (stop accepting new requests).



191
192
193
194
195
# File 'lib/patient_http/solid_queue.rb', line 191

def quiet
  return unless running?

  @processor.drain
end

.reset!void

This method is part of a private API. You should avoid using this method if possible, as it may be removed or be changed in the future.

This method returns an undefined value.

Reset all state (useful for testing).



216
217
218
219
220
221
222
223
224
225
226
227
# File 'lib/patient_http/solid_queue.rb', line 216

def reset!
  if @request_handler
    PatientHttp.unregister_handler(@request_handler)
    @request_handler = nil
  end
  @processor&.stop(timeout: 0)
  @processor = nil
  @configuration = nil
  @external_storage = nil
  @after_completion_callbacks = []
  @after_error_callbacks = []
end

.reset_configuration!Configuration

Reset configuration to defaults (useful for testing).

Returns:



82
83
84
85
# File 'lib/patient_http/solid_queue.rb', line 82

def reset_configuration!
  @configuration = nil
  configuration
end

.running?Boolean

Check if the processor is running.

Returns:

  • (Boolean)


106
107
108
# File 'lib/patient_http/solid_queue.rb', line 106

def running?
  !!@processor&.running?
end

.startvoid

This method returns an undefined value.

Start the processor.



169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
# File 'lib/patient_http/solid_queue.rb', line 169

def start
  return if running?

  @processor = PatientHttp::Processor.new(configuration)
  @processor.observe(ProcessorObserver.new(@processor))
  @processor.start

  @request_handler ||= lambda do |request:, callback:, raise_error_responses:, callback_args:|
    execute(
      request,
      callback: callback,
      raise_error_responses: raise_error_responses,
      callback_args: callback_args
    )
  end

  PatientHttp.register_handler(@request_handler)
end

.stop(timeout: nil) ⇒ void

This method returns an undefined value.

Stop the processor gracefully.

Parameters:

  • timeout (Float, nil) (defaults to: nil)

    maximum time to wait for in-flight requests to complete



201
202
203
204
205
206
207
208
209
210
# File 'lib/patient_http/solid_queue.rb', line 201

def stop(timeout: nil)
  return unless @processor

  if @request_handler
    PatientHttp.unregister_handler(@request_handler)
  end

  @processor.stop(timeout: timeout)
  @processor = nil
end

.stopped?Boolean

Check if the processor is stopped.

Returns:

  • (Boolean)


127
128
129
# File 'lib/patient_http/solid_queue.rb', line 127

def stopped?
  @processor.nil? || @processor.stopped?
end

.stopping?Boolean

Check if the processor is stopping.

Returns:

  • (Boolean)


120
121
122
# File 'lib/patient_http/solid_queue.rb', line 120

def stopping?
  !!@processor&.stopping?
end