Class: Async::Bus::Protocol::Connection

Inherits:
Object
  • Object
show all
Defined in:
lib/async/bus/protocol/connection.rb

Overview

Represents a connection between client and server for message passing.

Defined Under Namespace

Classes: Explicit, Implicit

Instance Attribute Summary collapse

Class Method Summary collapse

Instance Method Summary collapse

Constructor Details

#initialize(peer, id, wrapper: Wrapper, timeout: nil) ⇒ Connection

Initialize a new connection.



48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
# File 'lib/async/bus/protocol/connection.rb', line 48

# Initialize a new connection.
#
# @param peer [Object] The underlying peer; it is handed to the wrapper to build the packer and unpacker, and closed by #close.
# @param id [Object] The first transaction ID handed out by #next_id.
# @param wrapper [Class] Instantiated with this connection; must respond to `unpacker(peer)` and `packer(peer)`.
# @param timeout [Object, nil] Passed to every transaction created by #transaction!.
def initialize(peer, id, wrapper: Wrapper, timeout: nil)
	@peer, @id = peer, id
	
	# The wrapper is built first, then asked for the unpacker and the packer (in that order):
	@wrapper = wrapper.new(self)
	@unpacker, @packer = @wrapper.unpacker(peer), @wrapper.packer(peer)
	
	@timeout = timeout
	
	# Transactions registered by #transaction!, keyed by ID:
	@transactions = {}
	
	# Bound objects by name, the proxy cache, and the queue of names drained by #run:
	@objects = {}
	@proxies = ::ObjectSpace::WeakMap.new
	@finalized = ::Thread::Queue.new
end

Instance Attribute Details

#objects ⇒ Object (readonly)

Returns the value of attribute objects.



97
98
99
# File 'lib/async/bus/protocol/connection.rb', line 97

# Reader for the table of bound objects.
#
# @return [Object] The current value of `@objects`.
def objects
  return @objects
end

#packer ⇒ Object (readonly)

Returns the value of attribute packer.



106
107
108
# File 'lib/async/bus/protocol/connection.rb', line 106

# Reader for the message packer.
#
# @return [Object] The current value of `@packer`.
def packer
  return @packer
end

#proxies ⇒ Object (readonly)

Returns the value of attribute proxies.



100
101
102
# File 'lib/async/bus/protocol/connection.rb', line 100

# Reader for the proxy cache.
#
# @return [Object] The current value of `@proxies`.
def proxies
  return @proxies
end

#packer ⇒ Object (readonly)

The message packer.



106
# File 'lib/async/bus/protocol/connection.rb', line 106

# The message packer (exposed read-only).
attr :packer

#unpacker ⇒ Object (readonly)

The message unpacker.



103
# File 'lib/async/bus/protocol/connection.rb', line 103

# The message unpacker (exposed read-only).
attr :unpacker

#timeout ⇒ Object

Returns the value of attribute timeout.



66
67
68
# File 'lib/async/bus/protocol/connection.rb', line 66

# Reader for the timeout applied to transactions.
#
# @return [Object] The current value of `@timeout`.
def timeout
  return @timeout
end

#transactions ⇒ Object (readonly)

Returns the value of attribute transactions.



118
119
120
# File 'lib/async/bus/protocol/connection.rb', line 118

# Reader for the active transactions.
#
# @return [Object] The current value of `@transactions`.
def transactions
  return @transactions
end

#unpacker ⇒ Object (readonly)

Returns the value of attribute unpacker.



103
104
105
# File 'lib/async/bus/protocol/connection.rb', line 103

# Reader for the message unpacker.
#
# @return [Object] The current value of `@unpacker`.
def unpacker
  return @unpacker
end

Class Method Details

.client(peer, **options) ⇒ Object

Create a client-side connection.



31
32
33
# File 'lib/async/bus/protocol/connection.rb', line 31

# Create a client-side connection.
#
# The connection is constructed with an initial ID of 1.
#
# @param peer [Object] The peer passed to the constructor.
# @param options [Hash] Keyword options forwarded unchanged to the constructor.
# @return [Object] Whatever the constructor returns.
def self.client(peer, **options)
	new(peer, 1, **options)
end

.server(peer, **options) ⇒ Object

Create a server-side connection.



39
40
41
# File 'lib/async/bus/protocol/connection.rb', line 39

# Create a server-side connection.
#
# The connection is constructed with an initial ID of 2.
#
# @param peer [Object] The peer passed to the constructor.
# @param options [Hash] Keyword options forwarded unchanged to the constructor.
# @return [Object] Whatever the constructor returns.
def self.server(peer, **options)
	new(peer, 2, **options)
end

Instance Method Details

#[](name) ⇒ Object

Generate a proxy for a remotely bound object.

**This always returns a proxy, even if the object is bound locally.** The object bus is not shared between client and server, so `[]` always returns a proxy to the remote instance.



152
153
154
# File 'lib/async/bus/protocol/connection.rb', line 152

# Generate a proxy for the object bound under the given name.
#
# No lookup in the local object table is performed; the name is passed straight to `proxy_for`.
#
# @param name [Object] The name to build a proxy for.
# @return [Object] The result of `proxy_for(name)`.
def [](name)
	proxy_for(name)
end

#[]=(name, object) ⇒ Object

Explicitly bind an object to a name, such that it could be accessed remotely.

This is the same as #bind but due to the semantics of the `[]=` operator, it does not return a proxy instance.

Explicitly bound objects are not garbage collected until the connection is closed.



140
141
142
# File 'lib/async/bus/protocol/connection.rb', line 140

# Explicitly bind an object to a name in the local object table.
#
# The object is wrapped in an `Explicit` entry; any existing entry under the same name is replaced.
#
# @param name [Object] The name to bind the object under.
# @param object [Object] The object to bind.
def []=(name, object)
	entry = Explicit.new(object)
	@objects[name] = entry
end

#transactions ⇒ Object (readonly)

Active transactions.



118
# File 'lib/async/bus/protocol/connection.rb', line 118

# Active transactions (exposed read-only).
attr :transactions

#bind(name, object) ⇒ Object

Explicitly bind an object to a name, such that it could be accessed remotely.

This method is identical to #[]= but also returns a Proxy instance for the bound object which can be passed by reference.

Explicitly bound objects are not garbage collected until the connection is closed.

Examples:

Binding an object to a name and accessing it remotely.

array_proxy = connection.bind(:items, [1, 2, 3])
connection[:remote].register(array_proxy)


169
170
171
172
173
174
175
# File 'lib/async/bus/protocol/connection.rb', line 169

# Explicitly bind an object to a name and return a proxy for it.
#
# The object is wrapped in an `Explicit` entry and stored in the local object table, replacing any existing entry under the same name.
#
# @param name [Object] The name to bind the object under.
# @param object [Object] The object to bind.
# @return [Object] The result of `proxy_for(name)`, suitable for passing by reference.
def bind(name, object)
	# Store an explicit (not temporary) entry:
	entry = Explicit.new(object)
	@objects[name] = entry
	
	# A proxy is returned even though the object is bound locally:
	proxy_for(name)
end

#close ⇒ Object

Close the connection and clean up resources.



82
83
84
85
86
87
88
# File 'lib/async/bus/protocol/connection.rb', line 82

# Close the connection and clean up resources.
#
# Every registered transaction is closed, then the peer is closed. The peer is closed even when closing a transaction raises; the exception still propagates to the caller.
def close
	# Iterate over a snapshot: closing a transaction may remove it from @transactions.
	@transactions.values.each(&:close)
ensure
	# Always release the peer so a failing transaction cannot leak it:
	@peer.close
end

#flush ⇒ Object

Flush the packer buffer.



69
70
71
# File 'lib/async/bus/protocol/connection.rb', line 69

# Flush the packer buffer.
#
# @return [Object] Whatever the packer's `flush` returns.
def flush
	return @packer.flush
end

#inspect ⇒ Object

Return a string representation of the connection.



92
93
94
# File 'lib/async/bus/protocol/connection.rb', line 92

def inspect
	"#<#{self.class} #{@objects.size} objects>"
end

#invoke(name, arguments, options = {}, &block) ⇒ Object

Invoke a remote procedure.



270
271
272
273
274
275
276
# File 'lib/async/bus/protocol/connection.rb', line 270

# Invoke a remote procedure.
#
# A fresh transaction is created for the call and is always closed afterwards, whether the call returns or raises.
#
# @param name [Object] Forwarded to the transaction's `invoke`.
# @param arguments [Object] Forwarded to the transaction's `invoke`.
# @param options [Hash] Forwarded to the transaction's `invoke`.
# @return [Object] Whatever the transaction's `invoke` returns.
def invoke(name, arguments, options = {}, &block)
	pending = nil
	
	begin
		pending = transaction!
		pending.invoke(name, arguments, options, &block)
	ensure
		# Nothing to close if creating the transaction itself failed:
		pending.close if pending
	end
end

#next_id ⇒ Object

Get the next transaction ID.



110
111
112
113
114
115
# File 'lib/async/bus/protocol/connection.rb', line 110

# Get the next transaction ID.
#
# Returns the current ID and advances the counter by 2.
#
# @return [Object] The ID that was current before the counter advanced.
def next_id
	current = @id
	@id = current + 2
	
	current
end

#proxy(object) ⇒ Object

Implicitly bind an object with a temporary name, such that it could be accessed remotely.

Implicitly bound objects are garbage collected when the remote end no longer references them.

This method is similar to #bind but is designed to be used to generate temporary proxies for objects that are not explicitly bound.



185
186
187
188
189
190
191
192
193
# File 'lib/async/bus/protocol/connection.rb', line 185

# Implicitly bind an object under a temporary name and return a proxy for it.
#
# The name is the object's `__id__`. An existing entry under that name is kept; otherwise an `Implicit` entry is stored.
#
# @param object [Object] The object to bind.
# @return [Object] The result of `proxy_for` for the temporary name.
def proxy(object)
	key = object.__id__
	
	# Only register a temporary entry when nothing is bound under this key yet:
	@objects[key] = Implicit.new(object) unless @objects[key]
	
	proxy_for(key)
end

#proxy_name(object) ⇒ Object

Implicitly bind an object with a temporary name, such that it could be accessed remotely.

Implicitly bound objects are garbage collected when the remote end no longer references them.

This method is similar to #proxy but is designed to be used to generate temporary names for objects that are not explicitly bound during serialization.



203
204
205
206
207
208
209
210
211
# File 'lib/async/bus/protocol/connection.rb', line 203

# Implicitly bind an object under a temporary name and return that name.
#
# The name is the object's `__id__`. An existing entry under that name is kept; otherwise an `Implicit` entry is stored.
#
# @param object [Object] The object to bind.
# @return [Object] The temporary name (the object's `__id__`).
def proxy_name(object)
	key = object.__id__
	
	# Only register a temporary entry when nothing is bound under this key yet:
	@objects[key] = Implicit.new(object) unless @objects[key]
	
	key
end

#proxy_object(name) ⇒ Object

Get an object or proxy for a bound object, handling reverse lookup.

If the object is bound locally and the proxy is for this connection, returns the actual object. If the object is bound remotely, or the proxy is from a different connection, returns a proxy. This is used when deserializing proxies to handle round-trip scenarios and avoid name collisions.



222
223
224
225
226
227
228
229
230
231
# File 'lib/async/bus/protocol/connection.rb', line 222

# Resolve a name to the locally bound object, or to a proxy when nothing is bound locally.
#
# @param name [Object] The name to resolve.
# @return [Object] The entry's object if the name is in the local object table, otherwise the result of `proxy_for(name)`.
def proxy_object(name)
	entry = @objects[name]
	
	# Not bound locally, so hand back a proxy:
	return proxy_for(name) unless entry
	
	# Bound locally (e.g. a round trip), so return the actual object:
	entry.object
end

#run(parent: Task.current) ⇒ Object

Run the connection message loop.



286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
# File 'lib/async/bus/protocol/connection.rb', line 286

# Run the connection message loop.
#
# Starts a child task that pops names from `@finalized` and writes a release message for each, then dispatches every message yielded by the unpacker. `IOError` and `EOFError` end the loop quietly. On exit the finalizer task is stopped, all remaining transactions are closed and cleared, and the proxy cache is replaced with an empty one.
#
# @param parent [Object] The task used to spawn child tasks (defaults to `Task.current`).
def run(parent: Task.current)
	# Drain finalized names in the background; the loop ends when `pop` returns a falsy value:
	finalizer_task = parent.async do
		while name = @finalized.pop
			self.send_release(name)
		end
	end
	
	@unpacker.each do |message|
		case message
		when Invoke
			# If the object is not found, send an error response and skip the transaction:
			if object = @objects[message.name]&.object
				# Register the transaction under the ID carried by the message:
				transaction = self.transaction!(message.id)
				
				# Handle the invocation in its own child task so the loop keeps reading messages:
				parent.async(annotation: "Invoke #{message.name}") do
					# $stderr.puts "-> Accepting: #{message.name} #{message.arguments.inspect} #{message.options.inspect}"
					transaction.accept(object, message.arguments, message.options, message.block_given)
				ensure
					# $stderr.puts "<- Accepted: #{message.name}"
					# This will also delete the transaction from @transactions:
					transaction.close
				end
			else
				self.write(Error.new(message.id, NameError.new("Object not found: #{message.name}")))
			end
		when Response
			# Route the response to the transaction registered under the same ID:
			if transaction = @transactions[message.id]
				transaction.push(message)
			else
				# Stale message - transaction already closed (e.g. timeout) or never existed (ignore silently).
			end
		when Release
			name = message.name
			if @objects[name]&.temporary?
				# Only delete temporary objects, not explicitly bound ones:
				@objects.delete(name)
			end
		else
			Console.error(self, "Unexpected message:", message)
		end
	end
rescue IOError, EOFError
	# Connection closed - this is normal, not an error.
ensure
	# `finalizer_task` is nil if spawning it raised, hence the safe navigation:
	finalizer_task&.stop
	
	@transactions.each do |id, transaction|
		transaction.close
	end
	
	# Drop any remaining transactions and start with an empty proxy cache:
	@transactions.clear
	@proxies = ::ObjectSpace::WeakMap.new
end

#send_release(name) ⇒ Object

Send a release message for a named object.



280
281
282
# File 'lib/async/bus/protocol/connection.rb', line 280

# Send a release message for a named object.
#
# @param name [Object] The name carried by the release message.
# @return [Object] Whatever #write returns.
def send_release(name)
	release = Release.new(name)
	
	write(release)
end

#objects ⇒ Object (readonly)

The bound objects.



97
# File 'lib/async/bus/protocol/connection.rb', line 97

# The bound objects (exposed read-only).
attr :objects

#proxies ⇒ Object (readonly)

The proxy cache.



100
# File 'lib/async/bus/protocol/connection.rb', line 100

# The proxy cache (exposed read-only).
attr :proxies

#timeout=(value) ⇒ Object

The timeout for transactions.



66
# File 'lib/async/bus/protocol/connection.rb', line 66

# The timeout for transactions (readable and writable).
attr_accessor :timeout

#transaction!(id = self.next_id) ⇒ Object

Create a new transaction.



257
258
259
260
261
262
# File 'lib/async/bus/protocol/connection.rb', line 257

# Create a new transaction and register it under its ID.
#
# An existing transaction registered under the same ID is replaced.
#
# @param id [Object] The transaction ID; defaults to the next ID from #next_id.
# @return [Transaction] The newly created transaction, built with this connection's timeout.
def transaction!(id = self.next_id)
	@transactions[id] = Transaction.new(self, id, timeout: @timeout)
end

#write(message) ⇒ Object

Write a message to the connection.



75
76
77
78
79
# File 'lib/async/bus/protocol/connection.rb', line 75

# Write a message to the connection.
#
# The message is written to the packer, which is then flushed immediately.
#
# @param message [Object] The message to write.
# @return [Object] Whatever the packer's `flush` returns.
def write(message)
	packer = @packer
	
	# $stderr.puts "Writing: #{message.inspect}"
	packer.write(message)
	packer.flush
end