Class: Async::Bus::Protocol::Transaction

Inherits:
Object
  • Object
show all
Defined in:
lib/async/bus/protocol/transaction.rb

Overview

Represents a transaction for a remote procedure call.

Instance Attribute Summary collapse

Instance Method Summary collapse

Constructor Details

#initialize(connection, id, timeout: nil) ⇒ Transaction

Initialize a new transaction.



17
18
19
20
21
22
23
24
25
# File 'lib/async/bus/protocol/transaction.rb', line 17

def initialize(connection, id, timeout: nil)
	@connection = connection
	@id = id
	
	@timeout = timeout
	
	@received = Thread::Queue.new
	@accept = nil
end

Instance Attribute Details

#accept(object, arguments, options, block_given) ⇒ Object (readonly)

Accept a remote procedure invocation.



40
41
42
# File 'lib/async/bus/protocol/transaction.rb', line 40

def accept
  @accept
end

#connectionObject (readonly)

Returns the value of attribute connection.



28
29
30
# File 'lib/async/bus/protocol/transaction.rb', line 28

def connection
  @connection
end

#idObject (readonly)

Returns the value of attribute id.



31
32
33
# File 'lib/async/bus/protocol/transaction.rb', line 31

def id
  @id
end

#receivedObject (readonly)

Returns the value of attribute received.



37
38
39
# File 'lib/async/bus/protocol/transaction.rb', line 37

def received
  @received
end

#The queue of received messages.(queueofreceivedmessages.) ⇒ Object (readonly)



37
# File 'lib/async/bus/protocol/transaction.rb', line 37

attr :received

#The transaction ID.(transactionID.) ⇒ Object (readonly)



31
# File 'lib/async/bus/protocol/transaction.rb', line 31

attr :id

#timeoutObject

Returns the value of attribute timeout.



34
35
36
# File 'lib/async/bus/protocol/transaction.rb', line 34

def timeout
  @timeout
end

Instance Method Details

#closeObject

Close the transaction and clean up resources.



73
74
75
76
77
78
79
80
# File 'lib/async/bus/protocol/transaction.rb', line 73

def close
	if connection = @connection
		@connection = nil
		@received.close
		
		connection.transactions.delete(@id)
	end
end

#invoke(name, arguments, options, &block) ⇒ Object

Invoke a remote procedure.



88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
# File 'lib/async/bus/protocol/transaction.rb', line 88

def invoke(name, arguments, options, &block)
	Console.debug(self){[name, arguments, options, block]}
	
	self.write(Invoke.new(@id, name, arguments, options, block_given?))
	
	while response = self.read
		case response
		when Return
			return response.result
		when Yield
			begin
				result = yield(*response.result)
				self.write(Next.new(@id, result))
			rescue => error
				self.write(Error.new(@id, error))
			end
		when Error
			raise(response.result)
		when Throw
			# Re-throw the tag and value that was thrown on the server side
			# Throw.result contains [tag, value] array
			tag, value = response.result
			throw(tag, value)
		end
	end
end

#push(message) ⇒ Object

Push a message to the transaction’s received queue. Silently ignores messages if the queue is already closed.



66
67
68
69
70
# File 'lib/async/bus/protocol/transaction.rb', line 66

def push(message)
	@received.push(message)
rescue ClosedQueueError
	# Queue is closed (transaction already finished/closed) - ignore silently.
end

#readObject

Read a message from the transaction queue.



44
45
46
47
48
49
50
# File 'lib/async/bus/protocol/transaction.rb', line 44

def read
	if @received.empty?
		@connection.flush
	end
	
	@received.pop(timeout: @timeout)
end

#The accept handler.=(accepthandler. = (value)) ⇒ Object



40
# File 'lib/async/bus/protocol/transaction.rb', line 40

attr :accept

#The connection for this transaction.=(connection) ⇒ Object



28
# File 'lib/async/bus/protocol/transaction.rb', line 28

attr :connection

#The timeout for the transaction.=(timeout) ⇒ Object



34
# File 'lib/async/bus/protocol/transaction.rb', line 34

attr_accessor :timeout

#write(message) ⇒ Object

Write a message to the connection.



55
56
57
58
59
60
61
# File 'lib/async/bus/protocol/transaction.rb', line 55

def write(message)
	if @connection
		@connection.write(message)
	else
		raise RuntimeError, "Transaction is closed!"
	end
end