PatientHttp


Built for APIs that like to think.

Generic async HTTP connection pool for Ruby applications using Fiber-based concurrency.

Motivation

Applications that make HTTP requests from within threaded environments often find that threads block waiting for I/O. A single slow API response holds an entire thread hostage, preventing it from doing other work. When many threads are blocked on HTTP I/O simultaneously, throughput collapses.

PatientHttp solves this by running HTTP requests in a dedicated processor thread that uses Ruby's Fiber scheduler for non-blocking I/O. Application threads hand off HTTP requests to the processor and return immediately. The processor handles hundreds of concurrent HTTP connections using fibers, then notifies the application when responses arrive via a pluggable callback mechanism.

This design keeps application threads free to do other work while HTTP requests are in flight.

Most applications should use this gem through an integration such as patient_http-sidekiq or patient_http-solid_queue. These gems provide a request handler that integrates with their respective job processing systems, so you can enqueue HTTP requests directly from application code without coupling it to the underlying processor implementation. See the Integration section for details.

The patient_llm gem provides an integration for making large language model requests asynchronously. This was the original motivation for building PatientHttp because LLM requests can take much longer than typical HTTP requests.

Quick Start

1. Implement a TaskHandler

The TaskHandler is the integration point between the pool and your application. It defines what happens when a request completes, fails, or needs to be retried.

class MyTaskHandler < PatientHttp::TaskHandler
  def initialize(job_id)
    @job_id = job_id
  end

  def on_complete(response, callback)
    # Enqueue a message for your application to process the response.
    # Keep this lightweight -- don't do heavy processing here since
    # it runs on the processor thread.
    MyJobSystem.enqueue(callback, :on_complete, response.as_json)
  end

  def on_error(error, callback)
    MyJobSystem.enqueue(callback, :on_error, error.as_json)
  end

  def retry
    # Re-enqueue the original job for retry when the processor
    # shuts down with in-flight requests
    MyJobSystem.enqueue_job(@job_id)
  end
end

Important: TaskHandler callbacks run on the processor's reactor thread. They should be lightweight and fast -- typically just enqueuing a message for another system to pick up. Doing heavy processing in a callback will block the reactor and delay other in-flight requests.

2. Create and Enqueue Requests

# Configure the processor
config = PatientHttp::Configuration.new(
  max_connections: 256,
  request_timeout: 60
)

# Start the processor
processor = PatientHttp::Processor.new(config)
processor.start

# Build a request
request = PatientHttp::Request.new(
  :get,
  "https://api.example.com/users/123",
  headers: {"Authorization" => "Bearer token"}
)

# Create a task with your handler
task = PatientHttp::RequestTask.new(
  request: request,
  task_handler: MyTaskHandler.new("job-123"),
  callback: "FetchDataCallback",
  callback_args: {user_id: 123}
)

# Enqueue it
processor.enqueue(task)

3. Process Callbacks

When the HTTP request completes, your TaskHandler#on_complete is called with the Response and callback class name. Your handler is responsible for invoking the callback in whatever way makes sense for your application (e.g., enqueuing a background job).

class FetchDataCallback
  def on_complete(response)
    user_id = response.callback_args[:user_id]
    data = response.json
    User.find(user_id).update!(external_data: data)
  end

  def on_error(error)
    user_id = error.callback_args[:user_id]
    Rails.logger.error("Failed for user #{user_id}: #{error.message}")
  end
end
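
How the handler turns a callback class name into a method call is left to the application. As a minimal sketch of one way to do it, the code below resolves the class by name and invokes the matching method. `CallbackDispatcher` and `EchoCallback` are hypothetical names used only for illustration and are not part of PatientHttp; a real job would typically rebuild the Response or error object from the serialized payload before calling the callback.

```ruby
# Hypothetical dispatcher, not part of PatientHttp.
module CallbackDispatcher
  ALLOWED_METHODS = %i[on_complete on_error].freeze

  # Resolve a callback class from its name and invoke one of its handlers.
  def self.dispatch(callback_name, method_name, payload)
    method_name = method_name.to_sym
    unless ALLOWED_METHODS.include?(method_name)
      raise ArgumentError, "unsupported callback method: #{method_name}"
    end

    Object.const_get(callback_name).new.public_send(method_name, payload)
  end
end

# Stand-in callback used only to exercise the dispatcher.
class EchoCallback
  def on_complete(payload)
    "completed: #{payload["status"]}"
  end

  def on_error(payload)
    "failed: #{payload["message"]}"
  end
end

puts CallbackDispatcher.dispatch("EchoCallback", :on_complete, {"status" => 200})
```

Restricting the method name to a fixed list keeps a queued message from invoking arbitrary methods on the callback class.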

Handling HTTP Error Responses

By default, HTTP error status codes (4xx, 5xx) are treated as completed requests. You can check the status using helper methods on the response:

def on_complete(response)
  if response.success?         # 2xx
    process_data(response.json)
  elsif response.client_error? # 4xx
    handle_client_error(response)
  elsif response.server_error? # 5xx
    handle_server_error(response)
  end
end

To treat non-2xx responses as errors instead, set raise_error_responses: true on the RequestTask:

task = PatientHttp::RequestTask.new(
  request: request,
  task_handler: handler,
  callback: "ApiCallback",
  raise_error_responses: true
)

When enabled, non-2xx responses call TaskHandler#on_error with an HttpError that provides access to the response:

def on_error(error)
  if error.is_a?(PatientHttp::HttpError)
    puts error.status           # HTTP status code
    puts error.url              # Request URL
    puts error.response.body    # Response body
  end
end

Request Templates

For repeated requests to the same API, use RequestTemplate to share configuration:

template = PatientHttp::RequestTemplate.new(
  base_url: "https://api.example.com",
  headers: {"Authorization" => "Bearer #{ENV['API_KEY']}"},
  timeout: 60
)

# Build requests from the template
get_request = template.get("/users/123")
post_request = template.post("/users", json: {name: "John"})

Templates support all HTTP methods (get, post, put, patch, delete) and handle URL joining, header merging, and query parameter encoding.
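
To make the three concerns concrete, here is a rough sketch of URL joining, header merging, and query encoding using only the standard library. `build_request_parts` and its parameters are hypothetical and do not reflect how RequestTemplate is implemented; in particular, the case-sensitive header merge is an assumption made for brevity.

```ruby
require "uri"

# Hypothetical helper that illustrates the three concerns; not the gem's implementation.
def build_request_parts(base_url, path, params: {}, default_headers: {}, headers: {})
  # Join by hand so a base URL that carries its own path prefix is preserved.
  url = "#{base_url.chomp("/")}/#{path.delete_prefix("/")}"
  url += "?#{URI.encode_www_form(params)}" unless params.empty?

  # Request-level headers win over template defaults (case-sensitive in this sketch).
  {url: url, headers: default_headers.merge(headers)}
end

parts = build_request_parts(
  "https://api.example.com/v1/",
  "/users",
  params: {q: "ruby gems", page: 2},
  default_headers: {"Accept" => "application/json"},
  headers: {"Accept" => "text/plain"}
)
puts parts[:url]
puts parts[:headers]
```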

Standard Interface

The PatientHttp module provides a standard interface for building and dispatching requests without needing to directly interact with the processor or task handlers. This allows you to write application code that makes HTTP requests without coupling it to the underlying async processing infrastructure.

You will need to register a request handler with PatientHttp.register_handler that defines how requests are dispatched to your job queue or background processing system. Once registered, you can use the PatientHttp class methods or the RequestHelper mixin to make async HTTP requests with callbacks.

# The handler receives keyword arguments for the request, callback, and any additional callback arguments.
PatientHttp.register_handler do |request:, callback:, callback_args: nil, raise_error_responses: nil|
  # Example integration point. Adapt this to your app.
  # Build a RequestTask and enqueue it to your processor.
  task = PatientHttp::RequestTask.new(
    request: request,
    task_handler: MyTaskHandler.new("job-123"), # pass whatever your handler's initializer requires
    callback: callback,
    callback_args: callback_args,
    raise_error_responses: raise_error_responses
  )

  processor.enqueue(task)
end

# Now you can make requests directly through the PatientHttp interface with the .request,
# .get, .post, .patch, .put, and .delete class methods:
PatientHttp.get(
  "https://api.example.com/users/123",
  callback: FetchUserCallback,
  callback_args: {user_id: 123}
)

If you are using the patient_http-sidekiq gem or the patient_http-solid_queue gem, the appropriate handler will automatically be registered for you.

RequestHelper Mixin

Use PatientHttp::RequestHelper when you want a simple API for creating and dispatching async HTTP requests directly from your class.

  1. Register a request handler with PatientHttp.register_handler that defines how requests are dispatched to your job queue or background processing system.
  2. Include PatientHttp::RequestHelper in your class.
  3. Optionally define a request_template for shared base_url, headers, and timeout.
  4. Call async_get, async_post, async_put, async_patch, async_delete, or async_request.

class ApiClient
  include PatientHttp::RequestHelper

  request_template(
    base_url: "https://api.example.com",
    headers: {"Authorization" => "Bearer #{ENV["API_KEY"]}"},
    timeout: 60
  )

  def fetch_user(user_id)
    async_get(
      "/users/#{user_id}",
      callback: FetchUserCallback,
      callback_args: {user_id: user_id}
    )
  end

  def update_user(user_id, data)
    async_patch(
      "/users/#{user_id}",
      json: data,
      callback: UpdateUserCallback,
      callback_args: {user_id: user_id}
    )
  end
end

Callback Arguments

Pass custom data through the request/response cycle using callback_args:

task = PatientHttp::RequestTask.new(
  request: request,
  task_handler: handler,
  callback: "FetchDataCallback",
  callback_args: {user_id: 123, request_timestamp: Time.now.iso8601}
)

Callback arguments are available on both Response and Error objects:

response.callback_args[:user_id]    # Symbol access
response.callback_args["user_id"]   # String access

Callback args must contain only JSON-native types (nil, true, false, String, Integer, Float, Array, Hash). Hash keys are converted to strings for serialization.
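
The rule above can be sketched as a small recursive check. `CallbackArgs.normalize` is a hypothetical helper, not the gem's API; it stringifies hash keys and rejects any value outside the JSON-native types listed above, so a problem surfaces when the task is built rather than when it is deserialized.

```ruby
require "json"

# Hypothetical validator, not the gem's implementation.
module CallbackArgs
  SCALARS = [NilClass, TrueClass, FalseClass, String, Integer, Float].freeze

  # Return a copy with string keys, raising on values JSON cannot represent natively.
  def self.normalize(value)
    case value
    when Hash
      value.each_with_object({}) { |(k, v), out| out[k.to_s] = normalize(v) }
    when Array
      value.map { |v| normalize(v) }
    when *SCALARS
      value
    else
      raise ArgumentError, "callback_args value is not JSON-native: #{value.class}"
    end
  end
end

args = CallbackArgs.normalize({user_id: 123, tags: ["a", {nested: true}]})
puts JSON.generate(args)
```

Values such as Time or Symbol should be converted by the caller first, for example with `Time.now.iso8601` as in the example above.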

Response and Error Objects

The PatientHttp::Response and error objects are designed to be serializable and deserializable as JSON, making them safe to pass through job queues and across process boundaries. This allows you to enqueue the response or error data in your TaskHandler callbacks and process them asynchronously in another context.

Both response and error objects provide as_json and to_json methods for serialization:

def on_complete(response, callback)
  # Serialize the response for background processing
  MyJobSystem.enqueue(callback, :on_complete, response.as_json)
end

def on_error(error, callback)
  # Serialize the error for background processing
  MyJobSystem.enqueue(callback, :on_error, error.as_json)
end

When deserializing, use the load class methods to reconstruct the objects:

response = PatientHttp::Response.load(json_data)
error = PatientHttp::HttpError.load(json_data)

The Response object includes the HTTP status code, headers, body, and callback arguments. Error objects (HttpError, RedirectError, RequestError) include the error message, context about the request, and callback arguments.

Response bodies are automatically encoded for JSON serialization. Binary content is Base64 encoded, and large text content is gzipped and then Base64 encoded to reduce payload size. Decoding is handled transparently when you access the body or json methods on the Response object.
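
As an illustration of the encoding scheme described above, the following sketch Base64-encodes binary bodies and gzips large text before Base64-encoding it. `BodyCodec`, the 4096-byte threshold, the "encoding" and "value" field names, and the binary detection rule are all assumptions made for this example; the gem's actual wire format may differ. The sketch also assumes text bodies are UTF-8.

```ruby
require "zlib"

# Hypothetical codec; threshold and field names are assumptions.
module BodyCodec
  GZIP_THRESHOLD = 4096 # bytes, illustrative only

  def self.encode(body)
    if body.encoding == Encoding::BINARY || !body.valid_encoding?
      # Binary content: Base64 only ("m0" is strict Base64 without newlines).
      {"encoding" => "base64", "value" => [body].pack("m0")}
    elsif body.bytesize > GZIP_THRESHOLD
      # Large text: gzip first, then Base64.
      {"encoding" => "gzip+base64", "value" => [Zlib.gzip(body)].pack("m0")}
    else
      {"encoding" => "text", "value" => body}
    end
  end

  def self.decode(payload)
    raw = payload["value"]
    case payload["encoding"]
    when "base64"
      raw.unpack1("m0")
    when "gzip+base64"
      # gunzip returns a binary string, so restore the assumed text encoding.
      Zlib.gunzip(raw.unpack1("m0")).force_encoding(Encoding::UTF_8)
    else
      raw
    end
  end
end

encoded = BodyCodec.encode("hello " * 2000)
puts encoded["encoding"]
puts encoded["value"].bytesize
```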

Payload Stores

For large request/response payloads, you can configure external storage to keep serialized JSON payloads small. Payloads exceeding the configured threshold are automatically stored externally and fetched on demand.

If you are using a job queue or background processing system, this allows you to handle large requests or responses without hitting size limits or memory constraints on queue message payloads. The use of external storage is transparent to your application code.

# Register a payload store (see below for options; the file adapter should only be used for development/testing)
config.register_payload_store(:my_store, adapter: :file, directory: "/tmp/payloads")

# Use the ExternalStorage class to set and fetch stored payloads in your callbacks.
storage = PatientHttp::ExternalStorage.new(config)

large_response_data = storage.store(large_response.as_json)
# Returns a reference like: {"$ref" => {"store" => "my_store", "key" => "abc123"}}

small_response_data = storage.store(small_response.as_json, max_size: 1024)
# Will not store the payload and returns the original data hash if the JSON payload is under 1KB.

storage.storage_ref?(large_response_data) # => true
storage.storage_ref?(small_response_data) # => false

storage.fetch(large_response_data) # Fetches the original data from the store
storage.fetch(small_response_data) # Raises an error since this is not a reference

storage.delete(large_response_data) # Deletes the stored payload
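
To show the mechanism behind those calls, here is a minimal in-memory sketch of the size threshold and the `$ref` reference. `SketchStorage` is a hypothetical stand-in and not the gem's ExternalStorage class; the exact boundary behavior (a payload equal to the limit is kept inline) and returning nil for a missing key are assumptions.

```ruby
require "json"
require "securerandom"

# Hypothetical in-memory stand-in for the external storage behavior described above.
class SketchStorage
  def initialize(store_name:, max_size:)
    @store_name = store_name
    @max_size = max_size
    @data = {}
  end

  # Return the hash unchanged when small; otherwise stash it and return a reference.
  def store(hash, max_size: @max_size)
    json = JSON.generate(hash)
    return hash if json.bytesize <= max_size

    key = SecureRandom.uuid
    @data[key] = json
    {"$ref" => {"store" => @store_name, "key" => key}}
  end

  def storage_ref?(hash)
    hash.is_a?(Hash) && hash.keys == ["$ref"]
  end

  def fetch(ref)
    raise ArgumentError, "not a storage reference" unless storage_ref?(ref)

    json = @data[ref["$ref"]["key"]]
    json && JSON.parse(json)
  end

  def delete(ref)
    @data.delete(ref["$ref"]["key"]) if storage_ref?(ref)
    nil
  end
end

storage = SketchStorage.new(store_name: "my_store", max_size: 64)
ref = storage.store({"body" => "x" * 200})
puts storage.storage_ref?(ref)
```

Because the reference is itself a small JSON-native hash, it can travel through a job queue in place of the original payload.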

File Store

For development and testing:

config.register_payload_store(:files, adapter: :file, directory: "/tmp/payloads")

Redis Store

For production with shared state across processes:

redis = RedisClient.new(url: ENV["REDIS_URL"])
config.register_payload_store(:redis, adapter: :redis, redis: redis, ttl: 86400)

Options: redis: (required), ttl: (seconds, optional), key_prefix: (default: "patient_http:payloads:")

S3 Store

For durable storage across instances (requires aws-sdk-s3 gem):

s3 = Aws::S3::Resource.new
bucket = s3.bucket("my-payloads-bucket")
config.register_payload_store(:s3, adapter: :s3, bucket: bucket)

Options: bucket: (required), key_prefix: (default: "patient_http/payloads/")

ActiveRecord Store

For database-backed storage with transactional guarantees:

config.register_payload_store(:database, adapter: :active_record)

This requires a database migration. Copy the migration from the gem:

# db/migrate/XXXXXX_create_patient_http_payloads.rb
class CreatePatientHttpPayloads < ActiveRecord::Migration[7.0]
  def change
    create_table :patient_http_payloads, id: false do |t|
      t.string :key, null: false, limit: 36
      t.text :data, null: false
      t.timestamps
    end

    add_index :patient_http_payloads, :key, unique: true
    add_index :patient_http_payloads, :created_at
  end
end

Options: model: (optional, defaults to built-in PatientHttp::PayloadStore::ActiveRecordStore::Payload)

Custom Stores

Implement your own by subclassing PatientHttp::PayloadStore::Base:

class MyStore < PatientHttp::PayloadStore::Base
  register :my_store, self

  def store(key, data)
    # Store the hash and return the key
  end

  def fetch(key)
    # Return the hash or nil if not found
  end

  def delete(key)
    # Delete the data (idempotent)
  end
end

config.register_payload_store(:custom, adapter: :my_store, **options)

Multiple stores can be registered for migration purposes. The last registered store is used for new writes; all registered stores remain available for reads.

Encryption

When using PatientHttp with a job queue system, request and response data is serialized into the queue (Redis, database, etc.). If this data contains sensitive information, you should encrypt it.

PatientHttp provides encryption helpers through the Configuration object, but it is up to the TaskHandler implementation to ensure that serialized data is actually encrypted. If you are using an integration gem like patient_http-sidekiq or patient_http-solid_queue, the TaskHandler provided by the gem handles encryption automatically — you just need to configure the encryption key or callables on the Configuration object.

If you are writing a custom TaskHandler, use Configuration#encryptor as the helper and call encrypt / decrypt explicitly wherever your handler serializes or deserializes data.

Using an encryption key

The simplest option is encryption_key=, which sets up ActiveSupport::MessageEncryptor automatically using AES-256-GCM:

config = PatientHttp::Configuration.new
config.encryption_key = ENV["PATIENT_HTTP_ENCRYPTION_KEY"]

To support key rotation, pass an array — the first key encrypts new data, and all keys attempt decryption:

config.encryption_key = [ENV["PATIENT_HTTP_ENCRYPTION_KEY"], ENV["PATIENT_HTTP_OLD_KEY"]]
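
The rotation rule (encrypt with the first key, try every key on decrypt) can be sketched independently of the gem. The example below uses OpenSSL's AES-256-GCM directly rather than ActiveSupport::MessageEncryptor, so it is a swapped-in illustration of the rule and not what `encryption_key=` does internally. `RotatingCipher` and its byte layout (IV, then auth tag, then ciphertext) are hypothetical.

```ruby
require "openssl"

# Hypothetical helper showing the rotation rule only.
class RotatingCipher
  IV_BYTES = 12
  TAG_BYTES = 16

  # keys: array of 32-byte strings; the first is the current key.
  def initialize(keys)
    @keys = keys
  end

  # Always encrypts with the first key. The plaintext must be non-empty.
  def encrypt(plaintext)
    cipher = OpenSSL::Cipher.new("aes-256-gcm").encrypt
    cipher.key = @keys.first
    iv = cipher.random_iv
    cipher.auth_data = ""
    ciphertext = cipher.update(plaintext) + cipher.final
    iv + cipher.auth_tag + ciphertext
  end

  # Tries each key in order until one authenticates the data.
  def decrypt(blob)
    @keys.each do |key|
      begin
        return decrypt_with(key, blob)
      rescue OpenSSL::Cipher::CipherError
        next # wrong key or tampered data, so try the next key
      end
    end
    raise ArgumentError, "no configured key could decrypt the data"
  end

  private

  def decrypt_with(key, blob)
    cipher = OpenSSL::Cipher.new("aes-256-gcm").decrypt
    cipher.key = key
    cipher.iv = blob.byteslice(0, IV_BYTES)
    cipher.auth_tag = blob.byteslice(IV_BYTES, TAG_BYTES)
    cipher.auth_data = ""
    cipher.update(blob.byteslice(IV_BYTES + TAG_BYTES, blob.bytesize)) + cipher.final
  end
end

old_key = OpenSSL::Digest::SHA256.digest("old secret")
new_key = OpenSSL::Digest::SHA256.digest("new secret")
blob = RotatingCipher.new([old_key]).encrypt("payload written before rotation")
puts RotatingCipher.new([new_key, old_key]).decrypt(blob)
```

Once no data encrypted with the old key remains in any queue or payload store, the old key can be dropped from the array.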

Using custom callables

For custom encryption libraries, provide callables that accept and return raw bytes (String):

config.encryption { |bytes| MyEncryption.encrypt(bytes) }
config.decryption { |bytes| MyEncryption.decrypt(bytes) }

Or pass any object that responds to #call:

config.encryption(->(bytes) { MyEncryption.encrypt(bytes) })
config.decryption(->(bytes) { MyEncryption.decrypt(bytes) })

Wiring encryption into a custom TaskHandler

If you are writing your own TaskHandler (rather than using one from an integration gem), you must wire in encryption yourself. Configuration#encryptor returns an Encryptor built from the configured callables. Call it directly at every serialization boundary:

class MyTaskHandler < PatientHttp::TaskHandler
  def initialize(job_id, configuration:)
    @job_id = job_id
    @configuration = configuration
  end

  def on_complete(response, callback)
    # Encrypt the serialized response before enqueuing
    encrypted = @configuration.encryptor.encrypt(response.as_json)
    MyJobSystem.enqueue(callback, :on_complete, encrypted)
  end

  def on_error(error, callback)
    encrypted = @configuration.encryptor.encrypt(error.as_json)
    MyJobSystem.enqueue(callback, :on_error, encrypted)
  end

  def retry
    MyJobSystem.enqueue_job(@job_id)
  end
end

# Keep a configuration reference and use config.encryptor where needed
handler = MyTaskHandler.new("job-123", configuration: config)

In your callback, decrypt before processing:

class FetchDataCallback
  def initialize(configuration:)
    @configuration = configuration
  end

  def on_complete(data)
    response = PatientHttp::Response.load(@configuration.encryptor.decrypt(data))
    # ...
  end
end

How it works

Encrypted data is stored as {"__encrypted__" => true, "value" => "<base64>"}. The Encryptor JSON-serializes the original hash, passes the bytes to your callable, and Base64-encodes the result. Decryption reverses the process. Hashes without the "__encrypted__" key are passed through unchanged, so un-encrypted historical data continues to work while you roll out encryption.
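
The envelope steps above can be sketched as follows. `EnvelopeSketch` is a hypothetical class written only to mirror this description; it is not the gem's Encryptor, and the XOR lambda is a toy reversible transform standing in for a real cipher.

```ruby
require "json"

# Hypothetical envelope helper mirroring the description above.
class EnvelopeSketch
  MARKER = "__encrypted__"

  def initialize(encrypt:, decrypt:)
    @encrypt = encrypt
    @decrypt = decrypt
  end

  # JSON-serialize, pass the bytes to the callable, then Base64-encode the result.
  def encrypt(hash)
    bytes = @encrypt.call(JSON.generate(hash))
    {MARKER => true, "value" => [bytes].pack("m0")}
  end

  # Reverse the steps; hashes without the marker pass through unchanged.
  def decrypt(hash)
    return hash unless hash.is_a?(Hash) && hash[MARKER] == true

    json = @decrypt.call(hash["value"].unpack1("m0")).dup.force_encoding(Encoding::UTF_8)
    JSON.parse(json)
  end
end

# Toy reversible transform for demonstration only. Do not use for real data.
TOY_XOR = ->(bytes) { bytes.bytes.map { |b| b ^ 0x5A }.pack("C*") }

envelope = EnvelopeSketch.new(encrypt: TOY_XOR, decrypt: TOY_XOR)
wrapped = envelope.encrypt({"status" => 200})
puts wrapped.keys.inspect
puts envelope.decrypt(wrapped)
```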

Configuration

config = PatientHttp::Configuration.new(
  # Maximum concurrent HTTP requests (default: 256)
  max_connections: 256,

  # Default timeout for HTTP requests in seconds (default: 60)
  request_timeout: 60,

  # Timeout for graceful shutdown in seconds (default: 30)
  shutdown_timeout: 30,

  # Maximum response body size in bytes (default: 1MB)
  max_response_size: 1024 * 1024,

  # Default User-Agent header (default: "PatientHttp")
  user_agent: "MyApp/1.0",

  # Treat non-2xx responses as errors by default (default: false)
  raise_error_responses: false,

  # Maximum redirects to follow (default: 5, 0 disables)
  max_redirects: 5,

  # Maximum number of hosts to maintain persistent connections for (default: 100)
  connection_pool_size: 100,

  # Connection timeout in seconds (default: nil, uses request_timeout)
  connection_timeout: 10,

  # HTTP/HTTPS proxy URL (default: nil)
  proxy_url: "http://proxy.example.com:8080",

  # Retries for failed requests (default: 3)
  retries: 3,

  # Logger instance (default: Logger to STDERR at ERROR level)
  logger: Logger.new($stdout)
)

Tuning Tips

  • max_connections: Each connection uses memory and file descriptors. A well-tuned system can handle thousands of concurrent connections.
  • request_timeout: Set based on expected API response times. AI/LLM APIs may need minutes.
  • connection_pool_size: Increase for applications calling many different API hosts.
  • max_response_size: Keeps memory usage bounded. Large responses may need external payload storage.

Processor Lifecycle

The processor transitions through these states:

stopped -> starting -> running -> draining -> stopping -> stopped

  • stopped: Not processing requests
  • starting: Initializing the reactor thread
  • running: Accepting and processing requests
  • draining: Rejecting new requests, completing in-flight ones
  • stopping: Shutting down, re-enqueuing incomplete requests

processor = PatientHttp::Processor.new(config)

processor.start              # Start processing
processor.running?           # => true

processor.drain              # Stop accepting new requests
processor.draining?          # => true

processor.stop(timeout: 25)  # Graceful shutdown
processor.stopped?           # => true

When the processor stops with in-flight requests, it calls TaskHandler#retry on each incomplete task so they can be re-enqueued.
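
The state cycle can be modeled as a tiny state machine. `LifecycleSketch` is a hypothetical class that follows only the linear sequence documented here; the real Processor may permit other transitions (for example, stopping without draining first), which this sketch does not model.

```ruby
# Hypothetical state machine mirroring the documented cycle; not the gem's Processor.
class LifecycleSketch
  ORDER = %i[stopped starting running draining stopping].freeze

  attr_reader :state

  def initialize
    @state = :stopped
  end

  # Advance to the next state in the documented cycle, wrapping back to :stopped.
  def advance
    @state = ORDER[(ORDER.index(@state) + 1) % ORDER.size]
  end

  # Only the running state accepts new requests.
  def accepting?
    @state == :running
  end
end

lifecycle = LifecycleSketch.new
5.times { puts lifecycle.advance }
```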

Observing the Processor

Register observers to monitor processor events:

class MetricsObserver < PatientHttp::ProcessorObserver
  def request_start(request_task)
    StatsD.increment("http_pool.request.start")
  end

  def request_end(request_task)
    StatsD.timing("http_pool.request.duration", request_task.duration * 1000)
  end

  def request_error(error)
    StatsD.increment("http_pool.request.error")
  end

  def capacity_exceeded
    StatsD.increment("http_pool.capacity_exceeded")
  end
end

processor.observe(MetricsObserver.new)

Testing

Use SynchronousExecutor to execute requests synchronously in tests. This class can be used in place of the async processor for testing your request handling logic without needing to start the full async infrastructure.

It is integrated automatically in the patient_http-sidekiq and patient_http-solid_queue gems.

task = PatientHttp::RequestTask.new(
  request: request,
  task_handler: handler,
  callback: "MyCallback"
)

executor = PatientHttp::SynchronousExecutor.new(
  task,
  config: config,
  on_complete: ->(response) { StatsD.increment("complete") },
  on_error: ->(error) { StatsD.increment("error") }
)

executor.call

Integration

For Sidekiq integration, see the patient_http-sidekiq gem which provides workers, lifecycle hooks, crash recovery, and a Web UI built on this library.

For Solid Queue integration, see the patient_http-solid_queue gem which provides similar functionality for Solid Queue.

When using an integration gem, you can use the standard interface to make requests without coupling your code to the underlying processor or task handler implementations.

For large language model (LLM) requests, see the patient_llm gem which provides an integration for making LLM requests asynchronously via a variety of protocols.

Installation

Add this line to your application's Gemfile:

gem "patient_http"

Then execute:

bundle install

Contributing

Open a pull request on GitHub.

Please use the standardrb syntax and lint your code with standardrb --fix before submitting.

The patient_http-sidekiq and patient_http-solid_queue gems each provide a test application for integration testing.

License

The gem is available as open source under the terms of the MIT License.