Class: OMQ::Routing::Rep
- Inherits:
-
Object
- Object
- OMQ::Routing::Rep
- Defined in:
- lib/omq/routing/rep.rb
Overview
REP socket routing: fair-queue receive, reply routed back to sender.
REP strips the routing envelope (everything up to and including the empty delimiter) on receive, saves it internally, and restores it on send.
Constant Summary collapse
- EMPTY_FRAME =
"".b.freeze
Instance Attribute Summary collapse
- #recv_queue ⇒ Async::LimitedQueue readonly
Instance Method Summary collapse
- #connection_added(connection) ⇒ Object
- #connection_removed(connection) ⇒ Object
-
#dequeue_recv ⇒ Array<String>?
Dequeues the next received message.
-
#enqueue(parts) ⇒ Object
Enqueues a reply.
-
#initialize(engine) ⇒ Rep
constructor
A new instance of Rep.
-
#send_queues_drained? ⇒ Boolean
True when all per-connection send queues are empty.
-
#stop ⇒ void
Stops all background tasks.
-
#unblock_recv ⇒ void
Wakes a blocked #dequeue_recv with a nil sentinel.
Constructor Details
#initialize(engine) ⇒ Rep
Returns a new instance of Rep.
22 23 24 25 26 27 28 29 |
# File 'lib/omq/routing/rep.rb', line 22 def initialize(engine) @engine = engine @recv_queue = Routing.build_queue(engine..recv_hwm, :block) @pending_replies = [] @conn_queues = {} # connection => per-connection send queue @conn_send_tasks = {} # connection => send pump task @tasks = [] end |
Instance Attribute Details
#recv_queue ⇒ Async::LimitedQueue (readonly)
17 18 19 |
# File 'lib/omq/routing/rep.rb', line 17 def recv_queue @recv_queue end |
Instance Method Details
#connection_added(connection) ⇒ Object
52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 |
# File 'lib/omq/routing/rep.rb', line 52 def connection_added(connection) task = @engine.start_recv_pump(connection, @recv_queue) do |msg| delimiter = msg.index { |p| p.empty? } || msg.size envelope = msg[0, delimiter] body = msg[(delimiter + 1)..] || [] @pending_replies << { conn: connection, envelope: envelope } body end @tasks << task if task q = Routing.build_queue(@engine..send_hwm, :block) @conn_queues[connection] = q @conn_send_tasks[connection] = ConnSendPump.start(@engine, connection, q, @tasks) end |
#connection_removed(connection) ⇒ Object
71 72 73 74 75 |
# File 'lib/omq/routing/rep.rb', line 71 def connection_removed(connection) @pending_replies.reject! { |r| r[:conn] == connection } @conn_queues.delete(connection) @conn_send_tasks.delete(connection)&.stop end |
#dequeue_recv ⇒ Array<String>?
Dequeues the next received message. Blocks until one is available.
36 37 38 |
# File 'lib/omq/routing/rep.rb', line 36 def dequeue_recv @recv_queue.dequeue end |
#enqueue(parts) ⇒ Object
Enqueues a reply. Routes to the connection that sent the matching request by consuming the next pending_reply entry.
83 84 85 86 87 88 |
# File 'lib/omq/routing/rep.rb', line 83 def enqueue(parts) reply_info = @pending_replies.shift return unless reply_info conn = reply_info[:conn] @conn_queues[conn]&.enqueue([*reply_info[:envelope], EMPTY_FRAME, *parts]) end |
#send_queues_drained? ⇒ Boolean
Returns true when all per-connection send queues are empty.
103 104 105 |
# File 'lib/omq/routing/rep.rb', line 103 def send_queues_drained? @conn_queues.values.all?(&:empty?) end |
#stop ⇒ void
This method returns an undefined value.
Stops all background tasks.
95 96 97 98 |
# File 'lib/omq/routing/rep.rb', line 95 def stop @tasks.each(&:stop) @tasks.clear end |
#unblock_recv ⇒ void
This method returns an undefined value.
Wakes a blocked #dequeue_recv with a nil sentinel.
45 46 47 |
# File 'lib/omq/routing/rep.rb', line 45 def unblock_recv @recv_queue.enqueue(nil) end |