Class: JRPC::Transport::Tcp

Inherits:
Base
  • Object
show all
Defined in:
lib/jrpc/transport/tcp.rb

Constant Summary collapse

ConnectionError =

Pull error classes into this scope so `raise ConnectionError` resolves correctly. Without this, Ruby’s constant lookup would find JRPC::ConnectionError (v1) instead.

Base::ConnectionError
Timeout =
Base::Timeout
MalformedFrame =
Base::MalformedFrame
TCP_MD5SIG =

RFC2385 TCP MD5 Signature. TCP_MD5SIG is a Linux socket option (IPPROTO_TCP level); it is absent on platforms that don’t support it, in which case the transport raises ConnectionError when a key is requested. The kernel caps the key at 80 bytes (TCP_MD5SIG_MAXKEYLEN, not exported as a Ruby constant).

defined?(::Socket::TCP_MD5SIG) ? ::Socket::TCP_MD5SIG : nil
TCP_MD5SIG_MAXKEYLEN =
80

Instance Attribute Summary collapse

Instance Method Summary collapse

Constructor Details

#initialize(server, **options) ⇒ Tcp

Returns a new instance of Tcp.



21
22
23
24
25
# File 'lib/jrpc/transport/tcp.rb', line 21

# Builds the transport in a disconnected state.
#
# The zero-argument `super` forwards +server+ and +options+ unchanged to the
# superclass initializer (defined elsewhere); this class only adds its own
# connection state on top.
#
# @param server the server argument, passed through to the superclass
# @param options keyword options, passed through to the superclass
def initialize(server, **options)
  super
  # Start with an empty, mutable binary buffer.
  @read_buffer = String.new(encoding: Encoding::BINARY)
  # No socket until one is assigned elsewhere.
  @socket = nil
end

Instance Attribute Details

#socket ⇒ Object (readonly)

Returns the value of attribute socket.



115
116
117
# File 'lib/jrpc/transport/tcp.rb', line 115

# Read-only accessor for the transport's socket.
#
# @return the current value of @socket; nil when no socket is set
#   (#initialize and #close both assign nil)
def socket
  @socket
end

Instance Method Details

#close ⇒ Object



101
102
103
104
105
106
107
108
109
# File 'lib/jrpc/transport/tcp.rb', line 101

# Closes the socket on a best-effort basis and resets the transport state.
#
# Any StandardError raised while closing the socket is deliberately ignored.
# Afterwards @socket is nil and @read_buffer is a fresh empty binary string,
# so buffered bytes are discarded.
#
# @return [String] the new empty read buffer (value of the last assignment)
def close
  sock = @socket
  unless sock.nil?
    begin
      sock.close
    rescue StandardError
      # Best effort: a failing close must not prevent the state reset below.
    end
  end
  @socket = nil
  @read_buffer = String.new(encoding: Encoding::BINARY)
end

#closed? ⇒ Boolean

Returns:

  • (Boolean)


111
112
113
# File 'lib/jrpc/transport/tcp.rb', line 111

# Reports whether the transport currently has no usable socket.
#
# @return [Boolean] true when @socket is nil; otherwise whatever the
#   socket's own #closed? reports
def closed?
  return true if @socket.nil?

  @socket.closed?
end

#connect ⇒ Object



27
28
29
30
31
32
33
34
35
36
37
38
39
40
# File 'lib/jrpc/transport/tcp.rb', line 27

# Connects to the server, retrying failed attempts.
#
# Makes up to @connect_retry_count + 1 attempts via connect_once (defined
# elsewhere). When @connect_timeout is set it is one total budget shared by
# all attempts and the pauses between them; the same deadline is handed to
# every connect_once call.
#
# The pause between attempts is @connect_retry_interval, capped at the time
# left in the budget, so retrying never sleeps past the overall deadline.
#
# @return whatever the successful connect_once call returns
# @raise [ConnectionError, Timeout] the error from the last attempt, once the
#   attempts or the time budget are exhausted
def connect
  deadline = @connect_timeout ? monotonic_now + @connect_timeout : nil
  attempts_remaining = @connect_retry_count + 1
  begin
    connect_once(deadline)
  rescue ConnectionError, Timeout
    attempts_remaining -= 1
    raise if attempts_remaining <= 0

    pause = @connect_retry_interval
    if deadline
      budget = remaining_time(deadline)
      raise if budget <= 0 # total budget spent

      # Never sleep longer than the time that is actually left.
      pause = budget if budget < pause
    end

    sleep pause
    retry
  end
end

#read_frame(timeout:) ⇒ Object

Raises:



70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
# File 'lib/jrpc/transport/tcp.rb', line 70

# Blocks until a complete frame can be parsed, or the timeout expires.
#
# Each pass first tries to parse a frame from what is already buffered
# (try_parse_frame, defined elsewhere, answers :wait when it needs more
# data), then waits for the socket to become readable and pulls more bytes
# in with fill_buffer (also defined elsewhere).
#
# @param timeout [Numeric, nil] overall time budget; nil means no deadline
# @return the first non-:wait result of try_parse_frame
# @raise [ConnectionError] when no socket is set
# NOTE(review): timeouts are reported through close_and_raise_timeout!, which
# is not visible here; presumably it closes the transport and raises Timeout.
def read_frame(timeout:)
  raise ConnectionError, 'transport closed' if @socket.nil?

  deadline = nil
  deadline = monotonic_now + timeout if timeout

  loop do
    frame = try_parse_frame
    break frame unless frame == :wait

    time_left = remaining_time(deadline)
    close_and_raise_timeout!('read') if time_left && time_left <= 0

    # A falsy answer from wait_readable means the wait ran out of time.
    close_and_raise_timeout!('read') unless @socket.wait_readable(time_left)

    fill_buffer
  end
end

#try_read_frame ⇒ Object

Raises:



89
90
91
92
93
94
95
96
97
98
99
# File 'lib/jrpc/transport/tcp.rb', line 89

# Attempts to read a frame without waiting on the socket.
#
# @return the result of try_parse_frame (defined elsewhere): :wait when a
#   complete frame is not available yet, otherwise the parsed result
# @raise [ConnectionError] when no socket is set
def try_read_frame
  raise ConnectionError, 'transport closed' if @socket.nil?

  # Look at the buffer before reading, so frames that are already buffered
  # are still delivered even if the next read would hit EOF.
  buffered = try_parse_frame
  if buffered == :wait
    # fill_buffer swallows EAGAIN (no data yet); in that case the second
    # parse simply answers :wait again.
    fill_buffer
    try_parse_frame
  else
    buffered
  end
end

#write_frame(bytes, timeout:) ⇒ Object

Raises:



42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
# File 'lib/jrpc/transport/tcp.rb', line 42

# Writes one frame to the socket, honouring an overall timeout.
#
# The payload is framed as "<byte length>:<payload>," and sent with
# non-blocking writes until every byte has been accepted.
#
# @param bytes [String] payload to send
# @param timeout [Numeric, nil] overall time budget; nil means no deadline
# @return [nil]
# @raise [ConnectionError] when no socket is set, or when the write fails
#   with EPIPE, ECONNRESET or IOError (the transport is closed first)
# NOTE(review): timeouts are reported through close_and_raise_timeout!, which
# is not visible here; presumably it closes the transport and raises Timeout.
def write_frame(bytes, timeout:)
  raise ConnectionError, 'transport closed' if @socket.nil?

  frame = "#{bytes.bytesize}:#{bytes},"
  total = frame.bytesize
  sent = 0
  deadline = nil
  deadline = monotonic_now + timeout if timeout

  until sent >= total
    time_left = remaining_time(deadline)
    close_and_raise_timeout!('write') if time_left && time_left <= 0

    # A falsy answer from wait_writable means the wait ran out of time.
    close_and_raise_timeout!('write') unless @socket.wait_writable(time_left)

    begin
      sent += @socket.write_nonblock(frame.byteslice(sent, total - sent))
    rescue IO::WaitWritable
      # Socket not ready after all; go round again and wait for writability.
      # The module is rescued (not the concrete IO::EAGAINWaitWritable) so
      # IO::EWOULDBLOCKWaitWritable is also caught on platforms where
      # EAGAIN != EWOULDBLOCK (macOS/BSD).
    rescue Errno::EPIPE, Errno::ECONNRESET, IOError => e
      close
      raise ConnectionError, "write failed: #{e.class}: #{e.message}"
    end
  end
end