Class: Async::Bus::Protocol::Connection
- Inherits:
-
Object
- Object
- Async::Bus::Protocol::Connection
- Defined in:
- lib/async/bus/protocol/connection.rb
Overview
Represents a connection between client and server for message passing.
Defined Under Namespace
Instance Attribute Summary collapse
-
#objects ⇒ Object
readonly
Returns the value of attribute objects.
-
#packer ⇒ Object
readonly
Returns the value of attribute packer.
-
#proxies ⇒ Object
readonly
Returns the value of attribute proxies.
- #packer ⇒ Object readonly — The message packer.
- #unpacker ⇒ Object readonly — The message unpacker.
-
#timeout ⇒ Object
Returns the value of attribute timeout.
-
#transactions ⇒ Object
readonly
Returns the value of attribute transactions.
-
#unpacker ⇒ Object
readonly
Returns the value of attribute unpacker.
Class Method Summary collapse
-
.client(peer, **options) ⇒ Object
Create a client-side connection.
-
.server(peer, **options) ⇒ Object
Create a server-side connection.
Instance Method Summary collapse
-
#[](name) ⇒ Object
Generate a proxy for a remotely bound object.
-
#[]=(name, object) ⇒ Object
Explicitly bind an object to a name, such that it could be accessed remotely.
- #transactions ⇒ Object readonly — Active transactions.
-
#bind(name, object) ⇒ Object
Explicitly bind an object to a name, such that it could be accessed remotely.
-
#close ⇒ Object
Close the connection and clean up resources.
-
#flush ⇒ Object
Flush the packer buffer.
-
#initialize(peer, id, wrapper: Wrapper, timeout: nil) ⇒ Connection
constructor
Initialize a new connection.
-
#inspect ⇒ Object
Return a string representation of the connection.
-
#invoke(name, arguments, options = {}, &block) ⇒ Object
Invoke a remote procedure.
-
#next_id ⇒ Object
Get the next transaction ID.
-
#proxy(object) ⇒ Object
Implicitly bind an object with a temporary name, such that it could be accessed remotely.
-
#proxy_name(object) ⇒ Object
Implicitly bind an object with a temporary name, such that it could be accessed remotely.
-
#proxy_object(name) ⇒ Object
Get an object or proxy for a bound object, handling reverse lookup.
-
#run(parent: Task.current) ⇒ Object
Run the connection message loop.
-
#send_release(name) ⇒ Object
Send a release message for a named object.
- #objects ⇒ Object readonly — The bound objects.
- #proxies ⇒ Object readonly — The proxy cache.
- #timeout=(timeout) ⇒ Object — The timeout for transactions.
-
#transaction!(id = self.next_id) ⇒ Object
Create a new transaction.
-
#write(message) ⇒ Object
Write a message to the connection.
Constructor Details
#initialize(peer, id, wrapper: Wrapper, timeout: nil) ⇒ Connection
Initialize a new connection.
48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 |
# File 'lib/async/bus/protocol/connection.rb', line 48

# Initialize a new connection.
#
# @param peer [Object] the peer handed to the wrapper to build the packer and
#   unpacker; it is also what #close closes.
# @param id [Integer] the first transaction id handed out by #next_id
#   (.client passes 1, .server passes 2).
# @param wrapper [Class] instantiated with this connection; must respond to
#   `unpacker(peer)` and `packer(peer)`.
# @param timeout [Object, nil] stored and forwarded to each new transaction.
def initialize(peer, id, wrapper: Wrapper, timeout: nil)
  @peer = peer
  @id = id

  @wrapper = wrapper.new(self)
  @unpacker = @wrapper.unpacker(peer)
  @packer = @wrapper.packer(peer)

  @timeout = timeout

  @transactions = {}
  @objects = {}
  @proxies = ::ObjectSpace::WeakMap.new
  # Names pushed here are sent as release messages by the loop started in #run:
  @finalized = ::Thread::Queue.new
end
Instance Attribute Details
#objects ⇒ Object (readonly)
Returns the value of attribute objects.
97 98 99 |
# File 'lib/async/bus/protocol/connection.rb', line 97

# The bound objects.
#
# @return [Object] the value of attribute objects.
def objects
  @objects
end
#packer ⇒ Object (readonly)
Returns the value of attribute packer.
106 107 108 |
# File 'lib/async/bus/protocol/connection.rb', line 106

# The message packer.
#
# @return [Object] the value of attribute packer.
def packer
  @packer
end
#proxies ⇒ Object (readonly)
Returns the value of attribute proxies.
100 101 102 |
# File 'lib/async/bus/protocol/connection.rb', line 100

# The proxy cache.
#
# @return [Object] the value of attribute proxies.
def proxies
  @proxies
end
#packer ⇒ Object (readonly) — The message packer.
106 |
# File 'lib/async/bus/protocol/connection.rb', line 106

# The message packer (read-only attribute).
attr :packer
#unpacker ⇒ Object (readonly) — The message unpacker.
103 |
# File 'lib/async/bus/protocol/connection.rb', line 103

# The message unpacker (read-only attribute).
attr :unpacker
#timeout ⇒ Object
Returns the value of attribute timeout.
66 67 68 |
# File 'lib/async/bus/protocol/connection.rb', line 66

# The timeout for transactions.
#
# @return [Object] the value of attribute timeout.
def timeout
  @timeout
end
#transactions ⇒ Object (readonly)
Returns the value of attribute transactions.
118 119 120 |
# File 'lib/async/bus/protocol/connection.rb', line 118

# Active transactions.
#
# @return [Object] the value of attribute transactions.
def transactions
  @transactions
end
#unpacker ⇒ Object (readonly)
Returns the value of attribute unpacker.
103 104 105 |
# File 'lib/async/bus/protocol/connection.rb', line 103

# The message unpacker.
#
# @return [Object] the value of attribute unpacker.
def unpacker
  @unpacker
end
Class Method Details
.client(peer, **options) ⇒ Object
Create a client-side connection.
31 32 33 |
# File 'lib/async/bus/protocol/connection.rb', line 31

# Create a client-side connection.
#
# The connection is created with a starting transaction id of 1.
#
# @param peer [Object] passed through to the constructor.
# @param options [Hash] keyword options forwarded to the constructor.
# @return [Object] whatever `new` returns.
def self.client(peer, **options)
  self.new(peer, 1, **options)
end
.server(peer, **options) ⇒ Object
Create a server-side connection.
39 40 41 |
# File 'lib/async/bus/protocol/connection.rb', line 39

# Create a server-side connection.
#
# The connection is created with a starting transaction id of 2.
#
# @param peer [Object] passed through to the constructor.
# @param options [Hash] keyword options forwarded to the constructor.
# @return [Object] whatever `new` returns.
def self.server(peer, **options)
  self.new(peer, 2, **options)
end
Instance Method Details
#[](name) ⇒ Object
Generate a proxy for a remotely bound object.
**This always returns a proxy, even if the object is bound locally.** The object bus is not shared between client and server, so `[]` always returns a proxy to the remote instance.
152 153 154 |
# File 'lib/async/bus/protocol/connection.rb', line 152

# Generate a proxy for a remotely bound object.
#
# This always returns a proxy, even if the object is bound locally.
#
# @param name [Object] the name the object is bound under.
# @return [Object] the result of `proxy_for(name)`.
def [](name)
  return proxy_for(name)
end
#[]=(name, object) ⇒ Object
Explicitly bind an object to a name, such that it could be accessed remotely.
This is the same as #bind but due to the semantics of the `[]=` operator, it does not return a proxy instance.
Explicitly bound objects are not garbage collected until the connection is closed.
140 141 142 |
# File 'lib/async/bus/protocol/connection.rb', line 140

# Explicitly bind an object to a name, such that it could be accessed remotely.
#
# Same binding as #bind, but no proxy is returned.
#
# @param name [Object] the name to bind under.
# @param object [Object] the object to bind; stored wrapped in `Explicit`.
def []=(name, object)
  @objects[name] = Explicit.new(object)
end
#transactions ⇒ Object (readonly) — Active transactions.
118 |
# File 'lib/async/bus/protocol/connection.rb', line 118

# Active transactions (read-only attribute).
attr :transactions
#bind(name, object) ⇒ Object
169 170 171 172 173 174 175 |
# File 'lib/async/bus/protocol/connection.rb', line 169

# Explicitly bind an object to a name, such that it could be accessed remotely.
#
# @param name [Object] the name to bind under.
# @param object [Object] the object to bind; stored wrapped in `Explicit`.
# @return [Object] the result of `proxy_for(name)`.
def bind(name, object)
  # Bind the object into the local object store (explicitly bound, not temporary):
  @objects[name] = Explicit.new(object)

  # Always return a proxy for passing by reference, even for locally bound objects:
  return proxy_for(name)
end
#close ⇒ Object
Close the connection and clean up resources.
82 83 84 85 86 87 88 |
# File 'lib/async/bus/protocol/connection.rb', line 82

# Close the connection and clean up resources.
#
# Every registered transaction is closed first, then the peer.
def close
  @transactions.each_value(&:close)

  @peer.close
end
#flush ⇒ Object
Flush the packer buffer.
69 70 71 |
# File 'lib/async/bus/protocol/connection.rb', line 69

# Flush the packer buffer.
#
# @return [Object] whatever the packer's `flush` returns.
def flush
  @packer.flush
end
#inspect ⇒ Object
Return a string representation of the connection.
92 93 94 |
# File 'lib/async/bus/protocol/connection.rb', line 92 def inspect "#<#{self.class} #{@objects.size} objects>" end |
#invoke(name, arguments, options = {}, &block) ⇒ Object
Invoke a remote procedure.
270 271 272 273 274 275 276 |
# File 'lib/async/bus/protocol/connection.rb', line 270

# Invoke a remote procedure.
#
# A fresh transaction is created for the call and is always closed
# afterwards, whether the call returns or raises.
#
# @param name [Object] the name of the remote object to invoke.
# @param arguments [Object] the arguments passed to the transaction.
# @param options [Hash] the options passed to the transaction.
# @return [Object] whatever the transaction's `invoke` returns.
def invoke(name, arguments, options = {}, &block)
  transaction = self.transaction!

  transaction.invoke(name, arguments, options, &block)
ensure
  # `transaction` is nil if creating it raised:
  transaction&.close
end
#next_id ⇒ Object
Get the next transaction ID.
110 111 112 113 114 115 |
# File 'lib/async/bus/protocol/connection.rb', line 110

# Get the next transaction ID.
#
# The counter advances by 2 on every call, so ids keep the parity of the
# starting id given to the constructor.
#
# @return [Integer] the id to use for the next transaction.
def next_id
  id = @id
  @id += 2

  return id
end
#proxy(object) ⇒ Object
Implicitly bind an object with a temporary name, such that it could be accessed remotely.
Implicitly bound objects are garbage collected when the remote end no longer references them.
This method is similar to #bind but is designed to be used to generate temporary proxies for objects that are not explicitly bound.
185 186 187 188 189 190 191 192 193 |
# File 'lib/async/bus/protocol/connection.rb', line 185

# Implicitly bind an object with a temporary name, such that it could be accessed remotely.
#
# The temporary name is the object's `__id__`. An existing binding under
# that name is kept rather than replaced.
#
# @param object [Object] the object to bind.
# @return [Object] the result of `proxy_for(name)`.
def proxy(object)
  name = object.__id__

  # Bind the object into the local object store (temporary):
  @objects[name] ||= Implicit.new(object)

  # Always return a proxy for passing by reference:
  return proxy_for(name)
end
#proxy_name(object) ⇒ Object
Implicitly bind an object with a temporary name, such that it could be accessed remotely.
Implicitly bound objects are garbage collected when the remote end no longer references them.
This method is similar to #proxy but is designed to be used to generate temporary names for objects that are not explicitly bound during serialization.
203 204 205 206 207 208 209 210 211 |
# File 'lib/async/bus/protocol/connection.rb', line 203

# Implicitly bind an object with a temporary name, such that it could be accessed remotely.
#
# Like #proxy, but returns the temporary name instead of a proxy. The name
# is the object's `__id__`, and an existing binding under it is kept.
#
# @param object [Object] the object to bind.
# @return [Integer] the temporary name.
def proxy_name(object)
  name = object.__id__

  # Bind the object into the local object store (temporary):
  @objects[name] ||= Implicit.new(object)

  # Return the name:
  return name
end
#proxy_object(name) ⇒ Object
Get an object or proxy for a bound object, handling reverse lookup.
If the object is bound locally and the proxy is for this connection, returns the actual object. If the object is bound remotely, or the proxy is from a different connection, returns a proxy. This is used when deserializing proxies to handle round-trip scenarios and avoid name collisions.
222 223 224 225 226 227 228 229 230 231 |
# File 'lib/async/bus/protocol/connection.rb', line 222

# Get an object or proxy for a bound object, handling reverse lookup.
#
# @param name [Object] the name to look up.
# @return [Object] the locally bound object when `name` is bound here,
#   otherwise the result of `proxy_for(name)`.
def proxy_object(name)
  # If the proxy is for this connection and the object is bound locally, return the actual object:
  if entry = @objects[name]
    # This handles round-trip scenarios correctly.
    return entry.object
  end

  # Otherwise, create a proxy for the remote object:
  return proxy_for(name)
end
#run(parent: Task.current) ⇒ Object
Run the connection message loop.
286 287 288 289 290 291 292 293 294 295 296 297 298 299 300 301 302 303 304 305 306 307 308 309 310 311 312 313 314 315 316 317 318 319 320 321 322 323 324 325 326 327 328 329 330 331 332 333 334 335 336 337 338 |
# File 'lib/async/bus/protocol/connection.rb', line 286

# Run the connection message loop.
#
# Starts a child task that sends a release message for each name popped from
# the finalized queue, then dispatches every message read from the unpacker
# until it ends. `IOError` and `EOFError` end the loop quietly. On exit the
# finalizer task is stopped, all transactions are closed and cleared, and the
# proxy cache is replaced with a fresh one.
#
# @param parent [Object] the task used to spawn child tasks via `async`.
def run(parent: Task.current)
  finalizer_task = parent.async do
    while name = @finalized.pop
      self.send_release(name)
    end
  end

  @unpacker.each do |message|
    case message
    when Invoke
      # If the object is not found, send an error response and skip the transaction:
      if object = @objects[message.name]&.object
        transaction = self.transaction!(message.id)

        parent.async(annotation: "Invoke #{message.name}") do
          # $stderr.puts "-> Accepting: #{message.name} #{message.arguments.inspect} #{message.options.inspect}"
          transaction.accept(object, message.arguments, message.options, message.block_given)
        ensure
          # $stderr.puts "<- Accepted: #{message.name}"
          # This will also delete the transaction from @transactions:
          transaction.close
        end
      else
        self.write(Error.new(message.id, NameError.new("Object not found: #{message.name}")))
      end
    when Response
      if transaction = @transactions[message.id]
        transaction.push(message)
      else
        # Stale message - transaction already closed (e.g. timeout) or never existed (ignore silently).
      end
    when Release
      name = message.name
      if @objects[name]&.temporary?
        # Only delete temporary objects, not explicitly bound ones:
        @objects.delete(name)
      end
    else
      Console.error(self, "Unexpected message:", message)
    end
  end
rescue IOError, EOFError
  # Connection closed - this is normal, not an error.
ensure
  finalizer_task&.stop

  @transactions.each_value(&:close)

  @transactions.clear
  @proxies = ::ObjectSpace::WeakMap.new
end
#send_release(name) ⇒ Object
Send a release message for a named object.
280 281 282 |
# File 'lib/async/bus/protocol/connection.rb', line 280

# Send a release message for a named object.
#
# @param name [Object] the name to put in the `Release` message.
def send_release(name)
  self.write(Release.new(name))
end
#objects ⇒ Object (readonly) — The bound objects.
97 |
# File 'lib/async/bus/protocol/connection.rb', line 97

# The bound objects (read-only attribute).
attr :objects
#proxies ⇒ Object (readonly) — The proxy cache.
100 |
# File 'lib/async/bus/protocol/connection.rb', line 100

# The proxy cache (read-only attribute).
attr :proxies
#timeout=(timeout) ⇒ Object — The timeout for transactions.
66 |
# File 'lib/async/bus/protocol/connection.rb', line 66

# The timeout for transactions (readable and writable).
attr_accessor :timeout
#transaction!(id = self.next_id) ⇒ Object
Create a new transaction.
257 258 259 260 261 262 |
# File 'lib/async/bus/protocol/connection.rb', line 257

# Create a new transaction.
#
# The transaction is built with this connection, the given id and the
# connection's timeout, and is registered under that id.
#
# @param id [Integer] the transaction id; defaults to #next_id.
# @return [Transaction] the newly registered transaction.
def transaction!(id = self.next_id)
  transaction = Transaction.new(self, id, timeout: @timeout)
  @transactions[id] = transaction

  return transaction
end
#write(message) ⇒ Object
Write a message to the connection.
75 76 77 78 79 |
# File 'lib/async/bus/protocol/connection.rb', line 75

# Write a message to the connection.
#
# The packer is flushed immediately after the message is written.
#
# @param message [Object] the message handed to the packer.
def write(message)
  # $stderr.puts "Writing: #{message.inspect}"
  @packer.write(message)
  @packer.flush
end