Class: OMQ::Routing::Router
- Inherits: Object
- Includes: FairRecv
- Defined in: lib/omq/routing/router.rb
Overview
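Implements identity-based routing for ROUTER sockets.
On receive, the sending peer's identity is prepended to the message as its first frame. On send, the first frame is read as the routing identity that selects the destination peer, and it is stripped before the remaining frames are queued.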
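The framing described above can be illustrated with a minimal, self-contained sketch. ToyRouter and its methods (attach, wrap_incoming, route_outgoing, outbox_for) are hypothetical names invented for this illustration; they are not part of OMQ, and plain arrays stand in for the real per-connection queues.

```ruby
# Toy model of ROUTER-style identity framing. Hypothetical class, not OMQ code.
class ToyRouter
  def initialize
    @outboxes = {} # identity => array of payloads queued for that peer
  end

  # Register a peer under the given identity.
  def attach(identity)
    @outboxes[identity] = []
  end

  # Receive path: prepend the sender's identity to the message frames.
  def wrap_incoming(identity, frames)
    [identity, *frames]
  end

  # Send path: the first frame selects the peer and is stripped before queuing.
  # Returns false when no peer with that identity is attached.
  def route_outgoing(parts)
    identity, *payload = parts
    outbox = @outboxes[identity]
    return false unless outbox

    outbox << payload
    true
  end

  def outbox_for(identity)
    @outboxes.fetch(identity)
  end
end

router = ToyRouter.new
router.attach("worker-1")
router.wrap_incoming("worker-1", ["", "hello"]) # identity becomes frame 0
router.route_outgoing(["worker-1", "", "world"]) # frame 0 is consumed as the route
```

In this sketch an application can reply to a peer by reusing the first frame of a received message as the first frame of the outgoing one.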
Instance Attribute Summary
- #recv_queue ⇒ FairQueue readonly
Instance Method Summary
- #connection_added(connection) ⇒ Object
- #connection_removed(connection) ⇒ Object
- #enqueue(parts) ⇒ Object
  Enqueues a message for sending.
- #initialize(engine) ⇒ Router (constructor)
  A new instance of Router.
- #send_queues_drained? ⇒ Boolean
  True when all per-connection send queues are empty.
- #stop ⇒ void
  Stops all background tasks.
Methods included from FairRecv
Constructor Details
#initialize(engine) ⇒ Router
Returns a new instance of Router.
    # File 'lib/omq/routing/router.rb', line 24
    def initialize(engine)
      @engine = engine
      @recv_queue = FairQueue.new
      @connections_by_identity = {}
      @identity_by_connection = {}
      @conn_queues = {} # connection => per-connection send queue
      @conn_send_tasks = {} # connection => send pump task
      @tasks = []
    end
Instance Attribute Details
#recv_queue ⇒ FairQueue (readonly)
    # File 'lib/omq/routing/router.rb', line 19
    def recv_queue
      @recv_queue
    end
Instance Method Details
#connection_added(connection) ⇒ Object
    # File 'lib/omq/routing/router.rb', line 37
    def connection_added(connection)
      identity = connection.peer_identity
      identity = SecureRandom.bytes(5) if identity.nil? || identity.empty?
      @connections_by_identity[identity] = connection
      @identity_by_connection[connection] = identity

      add_fair_recv_connection(connection) { |msg| [identity, *msg] }

      q = Routing.build_queue(@engine..send_hwm, :block)
      @conn_queues[connection] = q
      @conn_send_tasks[connection] = ConnSendPump.start(@engine, connection, q, @tasks)
    end
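The identity fallback in the listing above can be tried in isolation. The helper name effective_identity is hypothetical and exists only for this sketch; the condition and the SecureRandom.bytes(5) call mirror the listing.

```ruby
require "securerandom"

# Hypothetical helper mirroring the fallback in connection_added:
# a peer that announces no identity (nil or empty) gets 5 random bytes.
def effective_identity(peer_identity)
  return SecureRandom.bytes(5) if peer_identity.nil? || peer_identity.empty?

  peer_identity
end

effective_identity("client-A") # announced identity is kept unchanged
effective_identity(nil)        # 5 random bytes
effective_identity("")         # 5 random bytes
```

Because the generated identity is random binary data, it is only meaningful to the application as an opaque routing key, typically echoed back from a received message.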
#connection_removed(connection) ⇒ Object
    # File 'lib/omq/routing/router.rb', line 53
    def connection_removed(connection)
      identity = @identity_by_connection.delete(connection)
      @connections_by_identity.delete(identity) if identity
      @recv_queue.remove_queue(connection)
      @conn_queues.delete(connection)
      @conn_send_tasks.delete(connection)&.stop
    end
#enqueue(parts) ⇒ Object
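Enqueues a message for sending. The first frame is the routing identity; it is removed before the remaining frames are queued for the matching connection. If no connected peer has that identity, the message is silently dropped, unless router_mandatory? is set, in which case a SocketError is raised.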
    # File 'lib/omq/routing/router.rb', line 66
    def enqueue(parts)
      identity = parts.first
      if @engine..router_mandatory?
        unless @connections_by_identity[identity]
          raise SocketError, "no route to identity #{identity.inspect}"
        end
      end
      conn = @connections_by_identity[identity]
      return unless conn # silently drop if peer disconnected
      @conn_queues[conn]&.enqueue(parts[1..])
    end
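The two outcomes for an unknown identity (silent drop versus an error in mandatory mode) can be sketched as a standalone function. The names route and NoRouteError are hypothetical; NoRouteError stands in for the SocketError raised in the listing, and plain arrays stand in for the per-connection send queues.

```ruby
# Stand-in for SocketError so the sketch has no library dependencies.
class NoRouteError < StandardError; end

# Hypothetical routing function: queues_by_identity maps identity => Array.
# Returns true when the payload was queued, false when it was dropped.
def route(queues_by_identity, parts, mandatory:)
  identity, *payload = parts
  queue = queues_by_identity[identity]

  if queue.nil?
    raise NoRouteError, "no route to identity #{identity.inspect}" if mandatory

    return false # silently drop, as in non-mandatory mode
  end

  queue << payload # the identity frame is not forwarded
  true
end

queues = { "a" => [] }
route(queues, ["a", "hi"], mandatory: false)  # queued for peer "a"
route(queues, ["zz", "hi"], mandatory: false) # dropped, no error
```

Mandatory mode trades silent loss for an exception the caller has to handle, which tends to be useful when a missing peer indicates a bug rather than a normal disconnect.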
#send_queues_drained? ⇒ Boolean
Returns true when all per-connection send queues are empty.
    # File 'lib/omq/routing/router.rb', line 91
    def send_queues_drained?
      @conn_queues.values.all?(&:empty?)
    end
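One consequence of using all? is that the check is vacuously true when there are no connections at all. The sketch below shows this with plain arrays standing in for the send queues; the helper name drained? is hypothetical.

```ruby
# Hypothetical stand-in for send_queues_drained?, using arrays as queues.
def drained?(queues_by_connection)
  queues_by_connection.values.all?(&:empty?)
end

drained?({})                                 # true: no connections, nothing pending
drained?({ conn_a: [], conn_b: ["pending"] }) # false: conn_b still has a message
drained?({ conn_a: [], conn_b: [] })          # true: every queue is empty
```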
#stop ⇒ void
This method returns an undefined value.
Stops all background tasks.
    # File 'lib/omq/routing/router.rb', line 83
    def stop
      @tasks.each(&:stop)
      @tasks.clear
    end