Class: Pulsar::Internal::TcpTransport
- Inherits:
-
Object
- Object
- Pulsar::Internal::TcpTransport
- Defined in:
- lib/pulsar/internal/tcp_transport.rb
Overview
Plain TCP transport for reading and writing broker frames.
Class Method Summary collapse
Instance Method Summary collapse
- #close ⇒ Object
- #closed? ⇒ Boolean
-
#initialize(socket) ⇒ TcpTransport
constructor
A new instance of TcpTransport.
- #read_exact(size, timeout:) ⇒ Object
- #write(bytes) ⇒ Object
Constructor Details
#initialize(socket) ⇒ TcpTransport
Returns a new instance of TcpTransport.
17 18 19 20 21 |
# File 'lib/pulsar/internal/tcp_transport.rb', line 17 def initialize(socket) @socket = socket @mutex = Mutex.new @closed = false end |
Class Method Details
.connect(host:, port:, connection_timeout:) ⇒ Object
10 11 12 13 14 15 |
# File 'lib/pulsar/internal/tcp_transport.rb', line 10 def self.connect(host:, port:, connection_timeout:) socket = Socket.tcp(host, port, connect_timeout: connection_timeout) new(socket) rescue SystemCallError, SocketError, IOError => e raise ConnectionError, "failed to connect to #{host}:#{port}: #{e.}" end |
Instance Method Details
#close ⇒ Object
53 54 55 56 57 58 59 60 61 62 63 |
# File 'lib/pulsar/internal/tcp_transport.rb', line 53 def close socket = @mutex.synchronize do return nil if @closed @closed = true @socket end socket.close unless socket.closed? nil end |
#closed? ⇒ Boolean
65 66 67 |
# File 'lib/pulsar/internal/tcp_transport.rb', line 65 def closed? @mutex.synchronize { @closed } end |
#read_exact(size, timeout:) ⇒ Object
31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 |
# File 'lib/pulsar/internal/tcp_transport.rb', line 31 def read_exact(size, timeout:) ensure_open! Timeout.timeout(timeout, TimeoutError) do buffer = +'' buffer.force_encoding(Encoding::BINARY) while buffer.bytesize < size chunk = @socket.read(size - buffer.bytesize) raise ConnectionError, 'socket closed while reading' if chunk.nil? buffer << chunk end buffer end rescue TimeoutError raise TimeoutError, 'operation timed out' rescue SystemCallError, IOError => e raise ConnectionError, "failed to read from socket: #{e.}" end |
#write(bytes) ⇒ Object
23 24 25 26 27 28 29 |
# File 'lib/pulsar/internal/tcp_transport.rb', line 23 def write(bytes) ensure_open! @socket.write(String(bytes).b) nil rescue SystemCallError, IOError => e raise ConnectionError, "failed to write to socket: #{e.}" end |