Class: Async::IO::SharedEndpoint
- Defined in:
- lib/async/io/shared_endpoint.rb
Overview
Pre-connect and pre-bind sockets so that they can be shared between processes.
Instance Attribute Summary collapse
-
#endpoint ⇒ Object
readonly
Returns the value of attribute endpoint.
-
#wrappers ⇒ Object
readonly
Returns the value of attribute wrappers.
Attributes inherited from Endpoint
Class Method Summary collapse
-
.bound(endpoint, backlog: Socket::SOMAXCONN, close_on_exec: false, **options) ⇒ Object
Create a new `SharedEndpoint` by binding to the given endpoint.
-
.connected(endpoint, close_on_exec: false) ⇒ Object
Create a new `SharedEndpoint` by connecting to the given endpoint.
Instance Method Summary collapse
- #accept(backlog = nil, **options, &block) ⇒ Object
- #bind ⇒ Object
-
#close ⇒ Object
Close all the internal wrappers.
- #connect ⇒ Object
-
#initialize(endpoint, wrappers, **options) ⇒ SharedEndpoint
constructor
A new instance of SharedEndpoint.
- #local_address_endpoint(**options) ⇒ Object
- #remote_address_endpoint(**options) ⇒ Object
- #to_s ⇒ Object
Methods inherited from Endpoint
#bound, composite, #connected, #each, each, #hostname, #linger, #local_address, parse, #reuse_address, #reuse_port, socket, ssl, tcp, #timeout, try_convert, udp, unix, #with
Constructor Details
#initialize(endpoint, wrappers, **options) ⇒ SharedEndpoint
Returns a new instance of SharedEndpoint.
43 44 45 46 47 48 |
# File 'lib/async/io/shared_endpoint.rb', line 43 def initialize(endpoint, wrappers, **) super(**) @endpoint = endpoint @wrappers = wrappers end |
Instance Attribute Details
#endpoint ⇒ Object (readonly)
Returns the value of attribute endpoint.
50 51 52 |
# File 'lib/async/io/shared_endpoint.rb', line 50 def endpoint @endpoint end |
#wrappers ⇒ Object (readonly)
Returns the value of attribute wrappers.
51 52 53 |
# File 'lib/async/io/shared_endpoint.rb', line 51 def wrappers @wrappers end |
Class Method Details
.bound(endpoint, backlog: Socket::SOMAXCONN, close_on_exec: false, **options) ⇒ Object
Create a new `SharedEndpoint` by binding to the given endpoint.
15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 |
# File 'lib/async/io/shared_endpoint.rb', line 15 def self.bound(endpoint, backlog: Socket::SOMAXCONN, close_on_exec: false, **) sockets = Array(endpoint.bind(**)) wrappers = sockets.each do |server| # This is somewhat optional. We want to have a generic interface as much as possible so that users of this interface can just call it without knowing a lot of internal details. Therefore, we ignore errors here if it's because the underlying socket does not support the operation. begin server.listen(backlog) rescue Errno::EOPNOTSUPP # Ignore. end server.close_on_exec = close_on_exec server.reactor = nil end return self.new(endpoint, wrappers) end |
.connected(endpoint, close_on_exec: false) ⇒ Object
Create a new `SharedEndpoint` by connecting to the given endpoint.
34 35 36 37 38 39 40 41 |
# File 'lib/async/io/shared_endpoint.rb', line 34 def self.connected(endpoint, close_on_exec: false) wrapper = endpoint.connect wrapper.close_on_exec = close_on_exec wrapper.reactor = nil return self.new(endpoint, [wrapper]) end |
Instance Method Details
#accept(backlog = nil, **options, &block) ⇒ Object
105 106 107 108 109 |
# File 'lib/async/io/shared_endpoint.rb', line 105 def accept(backlog = nil, **, &block) bind do |server| server.accept_each(**, &block) end end |
#bind ⇒ Object
76 77 78 79 80 81 82 83 84 85 |
# File 'lib/async/io/shared_endpoint.rb', line 76 def bind task = Async::Task.current @wrappers.each do |server| task.async do |task| task.annotate "binding to #{server.inspect}" yield server, task end end end |
#close ⇒ Object
Close all the internal wrappers.
71 72 73 74 |
# File 'lib/async/io/shared_endpoint.rb', line 71 def close @wrappers.each(&:close) @wrappers.clear end |
#connect ⇒ Object
87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 |
# File 'lib/async/io/shared_endpoint.rb', line 87 def connect task = Async::Task.current @wrappers.each do |peer| peer = peer.dup task.async do |task| task.annotate "connected to #{peer.inspect} [#{peer.fileno}]" begin yield peer, task ensure peer.close end end end end |
#local_address_endpoint(**options) ⇒ Object
53 54 55 56 57 58 59 60 |
# File 'lib/async/io/shared_endpoint.rb', line 53 def local_address_endpoint(**) endpoints = @wrappers.map do |wrapper| # Forward the options to the internal endpoints: AddressEndpoint.new(wrapper.to_io.local_address, **) end return CompositeEndpoint.new(endpoints) end |
#remote_address_endpoint(**options) ⇒ Object
62 63 64 65 66 67 68 |
# File 'lib/async/io/shared_endpoint.rb', line 62 def remote_address_endpoint(**) endpoints = @wrappers.map do |wrapper| AddressEndpoint.new(wrapper.to_io.remote_address) end return CompositeEndpoint.new(endpoints, **) end |
#to_s ⇒ Object
111 112 113 |
# File 'lib/async/io/shared_endpoint.rb', line 111 def to_s "\#<#{self.class} #{@wrappers.size} descriptors for #{@endpoint}>" end |