Class: Async::IO::Socket
- Inherits:
-
BasicSocket
- Object
- Wrapper
- Generic
- BasicSocket
- Async::IO::Socket
- Includes:
- Server
- Defined in:
- lib/async/io/socket.rb
Constant Summary
Constants inherited from Generic
Instance Attribute Summary
Attributes inherited from Generic
Class Method Summary collapse
-
.accept(*args, backlog: SOMAXCONN, &block) ⇒ Object
Bind to a local address and accept connections in a loop.
-
.bind(local_address, protocol: 0, task: Task.current, **options, &block) ⇒ Object
Bind to a local address.
-
.build(*args, timeout: nil, reuse_address: true, reuse_port: nil, linger: nil, task: Task.current) ⇒ Object
Build and wrap the underlying io.
-
.connect(remote_address, local_address: nil, task: Task.current, **options) ⇒ Object
Establish a connection to a given ‘remote_address`.
- .pair(*args) ⇒ Object
Instance Method Summary collapse
- #accept(timeout: nil, task: Task.current) ⇒ Object
- #accept_nonblock ⇒ Object
- #connect(*args) ⇒ Object
- #connect_nonblock ⇒ Object
- #sysaccept ⇒ Object
Methods included from Server
Methods included from Peer
#connected?, #eof, #eof?, #protocol, #sync, #sync=, #type
Methods inherited from Generic
#<<, #connected?, #dup, #nonblock, #nonblock=, #nonblock?, #read, #readable?, #sysread, #syswrite, #wait, wrap, wrap_blocking_method, wraps, #write
Class Method Details
.accept(*args, backlog: SOMAXCONN, &block) ⇒ Object
Bind to a local address and accept connections in a loop.
162 163 164 165 166 167 168 |
# File 'lib/async/io/socket.rb', line 162 def self.accept(*args, backlog: SOMAXCONN, &block) bind(*args) do |server, task| server.listen(backlog) if backlog server.accept_each(task: task, &block) end end |
.bind(local_address, protocol: 0, task: Task.current, **options, &block) ⇒ Object
Bind to a local address.
141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 |
# File 'lib/async/io/socket.rb', line 141 def self.bind(local_address, protocol: 0, task: Task.current, **, &block) Console.logger.debug(self) {"Binding to #{local_address.inspect}"} wrapper = build(local_address.afamily, local_address.socktype, protocol, **) do |socket| socket.bind(local_address.to_sockaddr) end return wrapper unless block_given? task.async do |task| task.annotate "binding to #{wrapper.local_address.inspect}" begin yield wrapper, task ensure wrapper.close end end end |
.build(*args, timeout: nil, reuse_address: true, reuse_port: nil, linger: nil, task: Task.current) ⇒ Object
Build and wrap the underlying io.
71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 |
# File 'lib/async/io/socket.rb', line 71 def self.build(*args, timeout: nil, reuse_address: true, reuse_port: nil, linger: nil, task: Task.current) socket = wrapped_klass.new(*args) if reuse_address socket.setsockopt(SOL_SOCKET, SO_REUSEADDR, 1) end if reuse_port socket.setsockopt(SOL_SOCKET, SO_REUSEPORT, 1) end if linger socket.setsockopt(SOL_SOCKET, SO_LINGER, linger) end yield socket wrapper = self.new(socket, task.reactor) wrapper.timeout = timeout return wrapper rescue Exception socket.close if socket raise end |
.connect(remote_address, local_address: nil, task: Task.current, **options) ⇒ Object
Establish a connection to a given ‘remote_address`.
103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 |
# File 'lib/async/io/socket.rb', line 103 def self.connect(remote_address, local_address: nil, task: Task.current, **) Console.logger.debug(self) {"Connecting to #{remote_address.inspect}"} task.annotate "connecting to #{remote_address.inspect}" wrapper = build(remote_address.afamily, remote_address.socktype, remote_address.protocol, **) do |socket| if local_address if defined?(IP_BIND_ADDRESS_NO_PORT) # Inform the kernel (Linux 4.2+) to not reserve an ephemeral port when using bind(2) with a port number of 0. The port will later be automatically chosen at connect(2) time, in a way that allows sharing a source port as long as the 4-tuple is unique. socket.setsockopt(SOL_IP, IP_BIND_ADDRESS_NO_PORT, 1) end socket.bind(local_address.to_sockaddr) end end begin wrapper.connect(remote_address.to_sockaddr) task.annotate "connected to #{remote_address.inspect} [fd=#{wrapper.fileno}]" rescue Exception wrapper.close raise end return wrapper unless block_given? begin yield wrapper, task ensure wrapper.close end end |
.pair(*args) ⇒ Object
172 173 174 |
# File 'lib/async/io/socket.rb', line 172 def self.pair(*args) ::Socket.pair(*args).map(&self.method(:new)) end |
Instance Method Details
#accept(timeout: nil, task: Task.current) ⇒ Object
46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 |
# File 'lib/async/io/socket.rb', line 46 def accept(timeout: nil, task: Task.current) peer, address = async_send(:accept_nonblock, timeout: timeout) wrapper = Socket.new(peer, task.reactor) wrapper.timeout = self.timeout return wrapper, address unless block_given? task.async do |task| task.annotate "incoming connection #{address.inspect} [fd=#{wrapper.fileno}]" begin yield wrapper, address ensure wrapper.close end end end |
#accept_nonblock ⇒ Object
65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 |
# File 'lib/async/io/socket.rb', line 65 def accept(timeout: nil, task: Task.current) peer, address = async_send(:accept_nonblock, timeout: timeout) wrapper = Socket.new(peer, task.reactor) wrapper.timeout = self.timeout return wrapper, address unless block_given? task.async do |task| task.annotate "incoming connection #{address.inspect} [fd=#{wrapper.fileno}]" begin yield wrapper, address ensure wrapper.close end end end |
#connect(*args) ⇒ Object
35 36 37 38 39 40 41 |
# File 'lib/async/io/socket.rb', line 35 def connect(*args) begin async_send(:connect_nonblock, *args) rescue Errno::EISCONN # We are now connected. end end |
#connect_nonblock ⇒ Object
43 44 45 46 47 48 49 |
# File 'lib/async/io/socket.rb', line 43 def connect(*args) begin async_send(:connect_nonblock, *args) rescue Errno::EISCONN # We are now connected. end end |
#sysaccept ⇒ Object
66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 |
# File 'lib/async/io/socket.rb', line 66 def accept(timeout: nil, task: Task.current) peer, address = async_send(:accept_nonblock, timeout: timeout) wrapper = Socket.new(peer, task.reactor) wrapper.timeout = self.timeout return wrapper, address unless block_given? task.async do |task| task.annotate "incoming connection #{address.inspect} [fd=#{wrapper.fileno}]" begin yield wrapper, address ensure wrapper.close end end end |