Module: Noiseless::Adapters::ExecutionModules::HttpTransport
- Included in:
- EsCompatibleExecution, TypesenseExecution
- Defined in:
- lib/noiseless/adapters/execution_modules/http_transport.rb
Overview
Shared Async::HTTP connection handling for HTTP-based adapters. Host classes must provide a private default_port method.
Constant Summary collapse
- TRANSPORT_ERRORS =
Low-level failures the Async::HTTP stack raises when it cannot complete a round-trip with the backend (refused/reset connection, DNS failure, transport timeout). These are wrapped into Noiseless::ConnectionError so callers never have to know which HTTP stack is underneath.
[ SystemCallError, SocketError, IOError, IO::TimeoutError, Timeout::Error ].freeze
- DEFAULT_TIMEOUT =
Default per-operation IO timeout (seconds) for the search backend. This is an idle timeout: every socket read/write must make progress within this window. It bounds a stalled/unresponsive backend without capping the total duration of streaming operations (e.g. bulk import), since data keeps flowing during those. Override per-connection with
timeout:. 5
Instance Method Summary collapse
Instance Method Details
#close ⇒ Object
52 53 54 |
# File 'lib/noiseless/adapters/execution_modules/http_transport.rb', line 52 def close @clients&.each_value(&:close) end |
#initialize(hosts: [], timeout: DEFAULT_TIMEOUT, **connection_params) ⇒ Object
33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 |
# File 'lib/noiseless/adapters/execution_modules/http_transport.rb', line 33 def initialize(hosts: [], timeout: DEFAULT_TIMEOUT, **connection_params) # Ensure we always have at least one host hosts_array = Array(hosts) @hosts = hosts_array.empty? ? ["http://localhost:#{default_port}"] : hosts_array @timeout = timeout @connection_params = connection_params # Initialize HTTP clients for each host. The endpoint timeout makes a # stalled backend raise IO::TimeoutError (wrapped below as # ConnectionError) instead of blocking the fiber/reactor indefinitely. @clients = {} @hosts.each do |host| endpoint = Async::HTTP::Endpoint.parse(host, timeout: @timeout) @clients[host] = Async::HTTP::Client.new(endpoint) end super(hosts: @hosts, **connection_params) end |