Module: PatientHttp::SolidQueue
- Defined in:
- lib/patient_http/solid_queue.rb,
lib/patient_http/solid_queue/engine.rb,
lib/patient_http/solid_queue/record.rb,
lib/patient_http/solid_queue/context.rb,
lib/patient_http/solid_queue/gc_lock.rb,
lib/patient_http/solid_queue/request_job.rb,
lib/patient_http/solid_queue/callback_job.rb,
lib/patient_http/solid_queue/task_handler.rb,
lib/patient_http/solid_queue/task_monitor.rb,
lib/patient_http/solid_queue/configuration.rb,
lib/patient_http/solid_queue/lifecycle_hooks.rb,
lib/patient_http/solid_queue/inflight_request.rb,
lib/patient_http/solid_queue/request_executor.rb,
lib/patient_http/solid_queue/processor_observer.rb,
lib/patient_http/solid_queue/task_monitor_thread.rb,
lib/patient_http/solid_queue/process_registration.rb
Defined Under Namespace
Classes: CallbackJob, Configuration, Context, Engine, GcLock, InflightRequest, LifecycleHooks, ProcessRegistration, ProcessorObserver, Record, RequestExecutor, RequestJob, TaskHandler, TaskMonitor, TaskMonitorThread
Constant Summary collapse
- VERSION =
File.read(File.join(__dir__, "../../VERSION")).strip
Class Attribute Summary collapse
-
.configuration ⇒ Configuration
Return the current configuration, initializing with defaults if necessary.
-
.processor ⇒ PatientHttp::Processor?
private
Returns the processor instance.
Class Method Summary collapse
-
.after_completion {|response| ... } ⇒ Object
Add a callback to be executed after a successful request completion.
-
.after_error {|error| ... } ⇒ Object
Add a callback to be executed after a request error.
-
.configure {|Configuration| ... } ⇒ Configuration
Configure the gem with a block.
-
.decrypt(value) ⇒ Object
Decrypt a value using the configured encryptor.
-
.draining? ⇒ Boolean
Check if the processor is draining (not accepting new requests).
-
.encrypt(value) ⇒ String
Encrypt a value using the configured encryptor.
-
.execute(request, callback:, callback_args: nil, raise_error_responses: false) ⇒ String
Execute an async HTTP request.
-
.external_storage ⇒ PatientHttp::ExternalStorage
private
Get an ExternalStorage instance for storing and fetching payloads.
-
.invoke_completion_callbacks(response) ⇒ void
private
Invoke the registered completion callbacks.
-
.invoke_error_callbacks(error) ⇒ void
private
Invoke the registered error callbacks.
-
.quiet ⇒ void
Signal the processor to drain (stop accepting new requests).
-
.reset! ⇒ void
private
Reset all state (useful for testing).
-
.reset_configuration! ⇒ Configuration
Reset configuration to defaults (useful for testing).
-
.running? ⇒ Boolean
Check if the processor is running.
-
.start ⇒ void
Start the processor.
-
.stop(timeout: nil) ⇒ void
Stop the processor gracefully.
-
.stopped? ⇒ Boolean
Check if the processor is stopped.
-
.stopping? ⇒ Boolean
Check if the processor is stopping.
Class Attribute Details
.configuration ⇒ Configuration
Return the current configuration, initializing with defaults if necessary.
75 76 77 |
# File 'lib/patient_http/solid_queue.rb', line 75 def configuration @configuration ||= Configuration.new end |
.processor ⇒ PatientHttp::Processor?
This method is part of a private API. You should avoid using this method if possible, as it may be removed or be changed in the future.
Returns the processor instance.
275 276 277 |
# File 'lib/patient_http/solid_queue.rb', line 275 def processor @processor end |
Class Method Details
.after_completion {|response| ... } ⇒ Object
Add a callback to be executed after a successful request completion.
91 92 93 |
# File 'lib/patient_http/solid_queue.rb', line 91 def after_completion(&block) @after_completion_callbacks << block end |
.after_error {|error| ... } ⇒ Object
Add a callback to be executed after a request error.
99 100 101 |
# File 'lib/patient_http/solid_queue.rb', line 99 def after_error(&block) @after_error_callbacks << block end |
.configure {|Configuration| ... } ⇒ Configuration
Configure the gem with a block.
66 67 68 69 70 |
# File 'lib/patient_http/solid_queue.rb', line 66 def configure configuration = Configuration.new yield(configuration) if block_given? @configuration = configuration end |
.decrypt(value) ⇒ Object
Decrypt a value using the configured encryptor.
267 268 269 |
# File 'lib/patient_http/solid_queue.rb', line 267 def decrypt(value) configuration.encryptor.decrypt(value) end |
.draining? ⇒ Boolean
Check if the processor is draining (not accepting new requests).
113 114 115 |
# File 'lib/patient_http/solid_queue.rb', line 113 def draining? !!@processor&.draining? end |
.encrypt(value) ⇒ String
Encrypt a value using the configured encryptor.
259 260 261 |
# File 'lib/patient_http/solid_queue.rb', line 259 def encrypt(value) configuration.encryptor.encrypt(value) end |
.execute(request, callback:, callback_args: nil, raise_error_responses: false) ⇒ String
Execute an async HTTP request.
147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 |
# File 'lib/patient_http/solid_queue.rb', line 147 def execute(request, callback:, callback_args: nil, raise_error_responses: false) PatientHttp::CallbackValidator.validate!(callback) callback_name = callback.is_a?(Class) ? callback.name : callback.to_s callback_args = PatientHttp::CallbackValidator.validate_callback_args(callback_args) request_id = SecureRandom.uuid encrypted = encrypt(request.as_json) data = if external_storage.enabled? external_storage.store(encrypted, max_size: configuration.payload_store_threshold) else encrypted end RequestJob.perform_later(data, callback_name, raise_error_responses, callback_args, request_id) request_id end |
.external_storage ⇒ PatientHttp::ExternalStorage
This method is part of a private API. You should avoid using this method if possible, as it may be removed or be changed in the future.
Get an ExternalStorage instance for storing and fetching payloads.
135 136 137 |
# File 'lib/patient_http/solid_queue.rb', line 135 def external_storage @external_storage ||= PatientHttp::ExternalStorage.new(configuration) end |
.invoke_completion_callbacks(response) ⇒ void
This method is part of a private API. You should avoid using this method if possible, as it may be removed or be changed in the future.
This method returns an undefined value.
Invoke the registered completion callbacks.
234 235 236 237 238 239 240 |
# File 'lib/patient_http/solid_queue.rb', line 234 def invoke_completion_callbacks(response) @after_completion_callbacks.each do |callback| callback.call(response) rescue => e configuration.logger&.error("[PatientHttp::SolidQueue] after_completion callback error: #{e.class} - #{e.}") end end |
.invoke_error_callbacks(error) ⇒ void
This method is part of a private API. You should avoid using this method if possible, as it may be removed or be changed in the future.
This method returns an undefined value.
Invoke the registered error callbacks.
247 248 249 250 251 252 253 |
# File 'lib/patient_http/solid_queue.rb', line 247 def invoke_error_callbacks(error) @after_error_callbacks.each do |callback| callback.call(error) rescue => e configuration.logger&.error("[PatientHttp::SolidQueue] after_error callback error: #{e.class} - #{e.}") end end |
.quiet ⇒ void
This method returns an undefined value.
Signal the processor to drain (stop accepting new requests).
191 192 193 194 195 |
# File 'lib/patient_http/solid_queue.rb', line 191 def quiet return unless running? @processor.drain end |
.reset! ⇒ void
This method is part of a private API. You should avoid using this method if possible, as it may be removed or be changed in the future.
This method returns an undefined value.
Reset all state (useful for testing).
216 217 218 219 220 221 222 223 224 225 226 227 |
# File 'lib/patient_http/solid_queue.rb', line 216 def reset! if @request_handler PatientHttp.unregister_handler(@request_handler) @request_handler = nil end @processor&.stop(timeout: 0) @processor = nil @configuration = nil @external_storage = nil @after_completion_callbacks = [] @after_error_callbacks = [] end |
.reset_configuration! ⇒ Configuration
Reset configuration to defaults (useful for testing).
82 83 84 85 |
# File 'lib/patient_http/solid_queue.rb', line 82 def reset_configuration! @configuration = nil configuration end |
.running? ⇒ Boolean
Check if the processor is running.
106 107 108 |
# File 'lib/patient_http/solid_queue.rb', line 106 def running? !!@processor&.running? end |
.start ⇒ void
This method returns an undefined value.
Start the processor.
169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 |
# File 'lib/patient_http/solid_queue.rb', line 169 def start return if running? @processor = PatientHttp::Processor.new(configuration) @processor.observe(ProcessorObserver.new(@processor)) @processor.start @request_handler ||= lambda do |request:, callback:, raise_error_responses:, callback_args:| execute( request, callback: callback, raise_error_responses: raise_error_responses, callback_args: callback_args ) end PatientHttp.register_handler(@request_handler) end |
.stop(timeout: nil) ⇒ void
This method returns an undefined value.
Stop the processor gracefully.
201 202 203 204 205 206 207 208 209 210 |
# File 'lib/patient_http/solid_queue.rb', line 201 def stop(timeout: nil) return unless @processor if @request_handler PatientHttp.unregister_handler(@request_handler) end @processor.stop(timeout: timeout) @processor = nil end |
.stopped? ⇒ Boolean
Check if the processor is stopped.
127 128 129 |
# File 'lib/patient_http/solid_queue.rb', line 127 def stopped? @processor.nil? || @processor.stopped? end |
.stopping? ⇒ Boolean
Check if the processor is stopping.
120 121 122 |
# File 'lib/patient_http/solid_queue.rb', line 120 def stopping? !!@processor&.stopping? end |