Class: Thrift::Socket

Inherits:
BaseTransport show all
Defined in:
lib/thrift/transport/socket.rb

Direct Known Subclasses

SSLSocket, UNIXSocket

Instance Attribute Summary collapse

Instance Method Summary collapse

Methods inherited from BaseTransport

#flush, #read_all, #read_byte, #read_into_buffer

Constructor Details

#initialize(host = 'localhost', port = 9090, timeout = nil) ⇒ Socket

Returns a new instance of Socket.



25
26
27
28
29
30
31
# File 'lib/thrift/transport/socket.rb', line 25

# Records the connection settings. No socket is created here; @handle stays
# nil until #open succeeds.
#
# @param host [Object] host to connect to (defaults to 'localhost')
# @param port [Object] port to connect to (defaults to 9090)
# @param timeout [Numeric, nil] seconds; used as the IO.select timeout in
#   #open and as the deadline offset in #read/#write, where nil or 0 selects
#   the blocking code path
def initialize(host = 'localhost', port = 9090, timeout = nil)
  @host, @port, @timeout = host, port, timeout
  # Human-readable endpoint used in error messages.
  @desc = "#{@host}:#{@port}"
  @handle = nil
end

Instance Attribute Details

#handle ⇒ Object

Returns the value of attribute handle.



33
34
35
# File 'lib/thrift/transport/socket.rb', line 33

# Reader for the underlying socket object: nil after construction and after
# #close, the connected socket once #open has succeeded.
#
# @return [Object, nil] current value of @handle
def handle; @handle; end

#timeout ⇒ Object

Returns the value of attribute timeout.



33
34
35
# File 'lib/thrift/transport/socket.rb', line 33

# Reader for the configured timeout (in seconds; nil or 0 makes #read and
# #write use their blocking code paths).
#
# @return [Numeric, nil] current value of @timeout
def timeout; @timeout; end

Instance Method Details

#close ⇒ Object



129
130
131
132
# File 'lib/thrift/transport/socket.rb', line 129

# Closes the underlying handle if there is one and it is still open, then
# forgets it. Safe to call repeatedly.
#
# @return [nil]
def close
  already_gone = @handle.nil? || @handle.closed?
  @handle.close unless already_gone
  @handle = nil
end

#open ⇒ Object

Raises:

  • (TransportException)



35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
# File 'lib/thrift/transport/socket.rb', line 35

# Resolves @host/@port and tries each returned address in turn using a
# non-blocking connect bounded by @timeout. The first socket that connects is
# stored in @handle and returned.
#
# Every socket whose connection attempt fails or times out is closed before
# moving on, so failed attempts do not leak file descriptors. The most recent
# failure (including a timeout) is reported in the raised exception.
#
# Exceptions raised by getaddrinfo itself are not rescued here.
#
# @return [::Socket] the connected socket (also stored in @handle)
# @raise [TransportException] NOT_OPEN when no resolved address could be
#   connected
def open
  last_error = nil

  ::Socket.getaddrinfo(@host, @port, nil, ::Socket::SOCK_STREAM).each do |addrinfo|
    socket = nil
    begin
      # addrinfo[4] is the address family, addrinfo[1] the port and
      # addrinfo[3] the numeric address.
      socket = ::Socket.new(addrinfo[4], ::Socket::SOCK_STREAM, 0)
      socket.setsockopt(::Socket::IPPROTO_TCP, ::Socket::TCP_NODELAY, 1)
      sockaddr = ::Socket.sockaddr_in(addrinfo[1], addrinfo[3])
      begin
        socket.connect_nonblock(sockaddr)
      rescue IO::WaitWritable
        # Connection in progress: wait until the socket becomes writable or
        # the timeout elapses (IO.select returns nil on timeout).
        raise Errno::ETIMEDOUT unless IO.select(nil, [socket], nil, @timeout)

        begin
          # Second call surfaces a connection failure, or EISCONN on success.
          socket.connect_nonblock(sockaddr)
        rescue Errno::EISCONN
          # Already connected: this is the success case.
        end
      end
      return @handle = socket
    rescue StandardError => e
      last_error = e
      # Release the descriptor of the failed attempt before trying the next
      # address.
      socket.close if socket && !socket.closed?
    end
  end

  raise TransportException.new(TransportException::NOT_OPEN, "Could not connect to #{@desc}: #{last_error}")
end

#open? ⇒ Boolean

Returns:

  • (Boolean)


60
61
62
# File 'lib/thrift/transport/socket.rb', line 60

# Reports whether there is a handle and it has not been closed.
#
# @return [Boolean] false when @handle is nil or already closed
def open?
  !(@handle.nil? || @handle.closed?)
end

#read(sz) ⇒ Object

Raises:

  • (TransportException)



96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
# File 'lib/thrift/transport/socket.rb', line 96

# Reads up to +sz+ bytes from the socket.
#
# With no timeout (nil or 0) this is a single blocking readpartial. With a
# timeout, one deadline is computed up front and read_nonblock is retried,
# calling wait_for whenever the handle signals it is not ready.
#
# @param sz [Integer] maximum number of bytes to read
# @return [String] the non-empty data that was read
# @raise [TransportException] NOT_OPEN if the socket is not open or the read
#   fails with any other StandardError (the handle is then closed and
#   dropped); UNKNOWN if the read returned nil or an empty string
def read(sz)
  unless open?
    raise TransportException.new(TransportException::NOT_OPEN, "closed stream")
  end

  chunk =
    begin
      blocking = @timeout.nil? || @timeout == 0
      if blocking
        @handle.readpartial(sz)
      else
        cutoff = Process.clock_gettime(Process::CLOCK_MONOTONIC) + @timeout

        loop do
          begin
            break @handle.read_nonblock(sz)
          rescue IO::WaitReadable
            wait_for(:read, cutoff, sz)
          rescue IO::WaitWritable
            wait_for(:write, cutoff, sz)
          end
        end
      end
    rescue TransportException
      # Re-raise untouched so the generic handler below does not wrap it.
      raise
    rescue StandardError => err
      @handle.close unless @handle.closed?
      @handle = nil
      raise TransportException.new(TransportException::NOT_OPEN, err.message)
    end

  if chunk.nil? || chunk.length == 0
    raise TransportException.new(TransportException::UNKNOWN, "Socket: Could not read #{sz} bytes from #{@desc}")
  end

  chunk
end

#to_io ⇒ Object



134
135
136
# File 'lib/thrift/transport/socket.rb', line 134

# Exposes the IO behind this transport by delegating to the handle's to_io.
#
# @return [Object] whatever @handle.to_io returns
# @raise [IOError] 'closed stream' when there is no handle, or its to_io
#   returns a falsy value
def to_io
  io = @handle&.to_io
  return io if io

  raise IOError, 'closed stream'
end

#to_s ⇒ Object



138
139
140
# File 'lib/thrift/transport/socket.rb', line 138

def to_s
  "socket(#{@host}:#{@port})"
end

#write(str) ⇒ Object

Raises:

  • (TransportException)



64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
# File 'lib/thrift/transport/socket.rb', line 64

# Writes all of +str+ to the socket.
#
# With no timeout (nil or 0) the data is handed to a single blocking
# @handle.write. With a timeout, one deadline is computed up front and the
# data is pushed with write_nonblock until every byte has been accepted,
# calling wait_for whenever the handle signals it is not ready.
#
# Progress is tracked in bytes (bytesize/byteslice), matching the byte count
# write_nonblock returns.
#
# @param str [String] data to send; passed through
#   Bytes.force_binary_encoding first
# @return [Object] the result of @handle.write on the blocking path, or the
#   number of bytes written on the timeout path
# @raise [TransportException] NOT_OPEN if the socket is not open or the write
#   fails with any other StandardError (the handle is then closed if still
#   open, and dropped)
def write(str)
  raise TransportException.new(TransportException::NOT_OPEN, "closed stream") unless open?
  str = Bytes.force_binary_encoding(str)
  begin
    if @timeout.nil? || @timeout == 0
      @handle.write(str)
    else
      deadline = Process.clock_gettime(Process::CLOCK_MONOTONIC) + @timeout
      total = str.bytesize
      len = 0

      while len < total
        begin
          len += @handle.write_nonblock(str.byteslice(len, total - len))
        rescue IO::WaitWritable
          wait_for(:write, deadline, total)
        rescue IO::WaitReadable
          wait_for(:read, deadline, total)
        end
      end

      len
    end
  rescue TransportException
    # Re-raise untouched so the generic handler below does not wrap it.
    raise
  rescue StandardError => e
    # Same guard as #read: never close a handle that is already closed, so the
    # original failure is always the one that gets reported.
    @handle.close unless @handle.closed?
    @handle = nil
    raise TransportException.new(TransportException::NOT_OPEN, e.message)
  end
end