Module: Noiseless::Adapters::ExecutionModules::HttpTransport

Included in:
EsCompatibleExecution, TypesenseExecution
Defined in:
lib/noiseless/adapters/execution_modules/http_transport.rb

Overview

Shared Async::HTTP connection handling for HTTP-based adapters. Host classes must provide a private default_port method.

Constant Summary collapse

TRANSPORT_ERRORS =

Low-level failures the Async::HTTP stack raises when it cannot complete a round-trip with the backend (refused/reset connection, DNS failure, transport timeout). These are wrapped into Noiseless::ConnectionError so callers never have to know which HTTP stack is underneath.

[
  SystemCallError,
  SocketError,
  IOError,
  IO::TimeoutError,
  Timeout::Error
].freeze
DEFAULT_TIMEOUT =

Default per-operation IO timeout (seconds) for the search backend. This is an idle timeout: every socket read/write must make progress within this window. It bounds a stalled/unresponsive backend without capping the total duration of streaming operations (e.g. bulk import), since data keeps flowing during those. Override per-connection with timeout:.

5

Instance Method Summary collapse

Instance Method Details

#closeObject



52
53
54
# File 'lib/noiseless/adapters/execution_modules/http_transport.rb', line 52

def close
  @clients&.each_value(&:close)
end

#initialize(hosts: [], timeout: DEFAULT_TIMEOUT, **connection_params) ⇒ Object



33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
# File 'lib/noiseless/adapters/execution_modules/http_transport.rb', line 33

def initialize(hosts: [], timeout: DEFAULT_TIMEOUT, **connection_params)
  # Ensure we always have at least one host
  hosts_array = Array(hosts)
  @hosts = hosts_array.empty? ? ["http://localhost:#{default_port}"] : hosts_array
  @timeout = timeout
  @connection_params = connection_params

  # Initialize HTTP clients for each host. The endpoint timeout makes a
  # stalled backend raise IO::TimeoutError (wrapped below as
  # ConnectionError) instead of blocking the fiber/reactor indefinitely.
  @clients = {}
  @hosts.each do |host|
    endpoint = Async::HTTP::Endpoint.parse(host, timeout: @timeout)
    @clients[host] = Async::HTTP::Client.new(endpoint)
  end

  super(hosts: @hosts, **connection_params)
end