Class: OMQ::Routing::Peer

Inherits:
Object
  • Object
show all
Defined in:
lib/omq/routing/peer.rb

Overview

PEER socket routing: bidirectional multi-peer with auto-generated 4-byte routing IDs.

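On receive, prepends the routing ID of the originating connection to each message. On send, strips the leading routing ID from the message and routes the remaining parts to the connection that ID identifies.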

Instance Attribute Summary collapse

Instance Method Summary collapse

Constructor Details

#initialize(engine) ⇒ Peer

Returns a new instance of Peer.

Parameters:

  • engine — stored for later use; its options.recv_hwm sizes the receive queue

26
27
28
29
30
31
32
# File 'lib/omq/routing/peer.rb', line 26

# Sets up the routing state for a PEER socket: one shared receive queue
# plus three initially empty lookup tables that are filled in as
# connections are added.
#
# @param engine the engine this strategy works for; its
#   +options.recv_hwm+ is passed to +Routing.build_queue+ to size the
#   receive queue
def initialize(engine)
  @engine = engine

  # Shared inbound queue, built in :block mode with the receive high-water mark.
  recv_limit  = engine.options.recv_hwm
  @recv_queue = Routing.build_queue(recv_limit, :block)

  # routing_id => connection, connection => routing_id, and
  # connection => per-connection send queue.
  @connections_by_routing_id = {}
  @routing_id_by_connection  = {}
  @conn_queues               = {}
end

Instance Attribute Details

#connections_by_routing_id ⇒ Hash{String => Connection} (readonly)

Returns routing_id → connection.

Returns:

  • (Hash{String => Connection})

    routing_id → connection



21
22
23
# File 'lib/omq/routing/peer.rb', line 21

# Read-only accessor for the routing table.
#
# @return [Hash{String => Connection}] routing_id → connection
def connections_by_routing_id = @connections_by_routing_id

#recv_queue ⇒ Async::LimitedQueue (readonly)

Returns:

  • (Async::LimitedQueue)


16
17
18
# File 'lib/omq/routing/peer.rb', line 16

# Read-only accessor for the shared receive queue.
#
# @return [Async::LimitedQueue]
def recv_queue = @recv_queue

Instance Method Details

#connection_added(connection) ⇒ Object

Parameters:

  • connection (Protocol::ZMTP::Connection)


55
56
57
58
59
60
61
62
63
64
65
# File 'lib/omq/routing/peer.rb', line 55

# Registers a new connection: assigns it a random 4-byte routing ID that is
# not already in use, records it in both lookup tables, starts a receive
# pump that prefixes every incoming message with that ID, and starts a send
# pump fed by a dedicated per-connection send queue.
#
# @param connection [Protocol::ZMTP::Connection]
def connection_added(connection)
  # A 4-byte random ID can collide with one already in use. Inserting a
  # duplicate would overwrite the existing routing entry and misroute that
  # peer's traffic, so redraw until the ID is unused.
  routing_id = SecureRandom.bytes(4)
  routing_id = SecureRandom.bytes(4) while @connections_by_routing_id.key?(routing_id)

  @connections_by_routing_id[routing_id] = connection
  @routing_id_by_connection[connection]  = routing_id

  # Every message received on this connection is delivered as [routing_id, *parts].
  @engine.start_recv_pump(connection, @recv_queue) { |msg| [routing_id, *msg] }

  send_queue = Routing.build_queue(@engine.options.send_hwm, :block)
  @conn_queues[connection] = send_queue
  ConnSendPump.start(@engine, connection, send_queue)
end

#connection_removed(connection) ⇒ Object

Parameters:

  • connection (Protocol::ZMTP::Connection)


70
71
72
73
74
# File 'lib/omq/routing/peer.rb', line 70

# Forgets a connection: removes its routing ID from both lookup tables
# (when it had one) and discards its per-connection send queue.
#
# @param connection [Protocol::ZMTP::Connection]
# @return [Object, nil] whatever was stored as the connection's send queue,
#   or nil if none was registered
def connection_removed(connection)
  released_id = @routing_id_by_connection.delete(connection)
  # Only touch the forward table when the connection was actually registered.
  released_id && @connections_by_routing_id.delete(released_id)
  @conn_queues.delete(connection)
end

#dequeue_recv ⇒ Array<String>?

Dequeues the next received message. Blocks until one is available.

Returns:

  • (Array<String>, nil)


39
40
41
# File 'lib/omq/routing/peer.rb', line 39

# Takes the next item off the receive queue by delegating to the queue's
# +dequeue+.
#
# @return [Array<String>, nil]
def dequeue_recv = @recv_queue.dequeue

#enqueue(parts) ⇒ Object

Parameters:

  • parts (Array<String>)


79
80
81
82
83
84
# File 'lib/omq/routing/peer.rb', line 79

# Routes an outgoing message: the first part is the routing ID, the
# remaining parts are handed to the send queue of the connection that ID
# maps to. Messages addressed to an unknown routing ID, or to a connection
# with no send queue, are dropped without error.
#
# @param parts [Array<String>] routing ID followed by the message parts
def enqueue(parts)
  routing_id, *payload = parts
  target = @connections_by_routing_id[routing_id]
  @conn_queues[target]&.enqueue(payload) if target
end

#send_queues_drained? ⇒ Boolean

True when all per-connection send queues are empty.

Returns:

  • (Boolean)


89
90
91
# File 'lib/omq/routing/peer.rb', line 89

# Reports whether every per-connection send queue is empty. Also true when
# there are no connections at all.
#
# @return [Boolean]
def send_queues_drained?
  @conn_queues.each_value.all? { |pending| pending.empty? }
end

#unblock_recv ⇒ void

This method returns an undefined value.

Wakes a blocked #dequeue_recv with a nil sentinel.



48
49
50
# File 'lib/omq/routing/peer.rb', line 48

# Pushes a nil sentinel onto the receive queue so that a caller blocked in
# #dequeue_recv wakes up and receives nil.
#
# @return [void]
def unblock_recv = @recv_queue.enqueue(nil)