Class: Arcp::Transport::MemoryTransport

Inherits:
Base
  • Object
show all
Defined in:
lib/arcp/transport/memory_transport.rb

Overview

In-process transport backed by a pair of `Async::Queue`s. Used for tests and same-process server/client wiring.

Instance Attribute Summary collapse

Class Method Summary collapse

Instance Method Summary collapse

Constructor Details

#initialize(incoming:, outgoing:) ⇒ MemoryTransport

Returns a new instance of MemoryTransport.



12
13
14
15
16
17
18
# File 'lib/arcp/transport/memory_transport.rb', line 12

# Builds a transport around two queue-like objects.
#
# @param incoming [#dequeue, #enqueue, #empty?] queue this transport reads from
# @param outgoing [#enqueue] queue this transport writes to
def initialize(incoming:, outgoing:)
  super()
  @incoming, @outgoing = incoming, outgoing
  # Every envelope handed to #send is also recorded here, in call order.
  @sent = []
  # Flipped to true by the first #close call.
  @closed = false
end

Instance Attribute Details

#sent ⇒ Object (readonly)

Returns the value of attribute sent.



10
11
12
# File 'lib/arcp/transport/memory_transport.rb', line 10

# Reader for the log of envelopes passed to #send.
#
# @return [Object] the value of @sent (the same object, not a copy)
def sent = @sent

Class Method Details

.pair ⇒ Object



20
21
22
23
24
# File 'lib/arcp/transport/memory_transport.rb', line 20

# Creates two transports wired back to back: each one's outgoing queue is
# the other's incoming queue.
#
# @return [Array] a two-element array holding the connected transports
def self.pair
  first_queue, second_queue = Array.new(2) { Async::Queue.new }
  near_end = new(incoming: first_queue, outgoing: second_queue)
  far_end = new(incoming: second_queue, outgoing: first_queue)
  [near_end, far_end]
end

Instance Method Details

#close(reason: nil) ⇒ Object



43
44
45
46
47
48
49
# File 'lib/arcp/transport/memory_transport.rb', line 43

# Marks the transport closed and signals closure through the queues.
# Calling it again after the first close does nothing.
#
# @param reason [Object, nil] accepted but not used by this implementation
# @return [Object, nil] nil when already closed or when the incoming queue
#   still holds items; otherwise whatever the final enqueue returns
def close(reason: nil)
  return if @closed

  @closed = true
  sentinel = :__arcp_close__
  # Let whoever reads our outgoing queue see the close marker.
  @outgoing.enqueue(sentinel)
  # Only push the marker onto our own incoming queue when nothing is
  # pending there; queued items are left to be drained by #receive.
  return unless @incoming.empty?

  @incoming.enqueue(sentinel)
end

#closed? ⇒ Boolean

Returns:

  • (Boolean)


51
# File 'lib/arcp/transport/memory_transport.rb', line 51

# Reports whether #close has been called on this transport.
#
# @return [Boolean] the current value of @closed
def closed?
  @closed
end

#receive ⇒ Object



34
35
36
37
38
39
40
41
# File 'lib/arcp/transport/memory_transport.rb', line 34

# Takes the next item from the incoming queue.
#
# @return [Object, nil] the dequeued item, or nil when this transport is
#   closed with nothing left to read, or when the item is the close marker
def receive
  # Closed and drained: answer immediately without touching the queue.
  return if @closed && @incoming.empty?

  # NOTE(review): presumably dequeue waits until an item is available;
  # confirm against the queue implementation in use.
  message = @incoming.dequeue
  message unless message.equal?(:__arcp_close__)
end

#send(envelope) ⇒ Object

Raises:

  • (IOError)


26
27
28
29
30
31
32
# File 'lib/arcp/transport/memory_transport.rb', line 26

# Records the envelope and places it on the outgoing queue.
#
# Note that this method shares its name with Ruby's built-in Object#send;
# use __send__ when dynamic dispatch is needed on a transport.
#
# @param envelope [Object] the item to deliver
# @return [nil]
# @raise [IOError] if the transport has already been closed
def send(envelope)
  raise IOError, 'transport closed' if @closed

  # Log first, then deliver, so the log holds every envelope passed in.
  @sent.push(envelope)
  @outgoing.enqueue(envelope)
  nil
end