Class: OMQ::Routing::Rep
- Inherits:
-
Object
- Object
- OMQ::Routing::Rep
- Defined in:
- lib/omq/routing/rep.rb
Overview
REP socket routing: fair-queue receive, reply routed back to sender.
REP strips the routing envelope (everything up to and including the empty delimiter) on receive, saves it internally, and restores it on send.
Constant Summary collapse
- EMPTY_FRAME =
"".b.freeze
Instance Attribute Summary collapse
- #recv_queue ⇒ Async::LimitedQueue readonly
Instance Method Summary collapse
- #connection_added(connection) ⇒ Object
- #connection_removed(connection) ⇒ Object
-
#dequeue_recv ⇒ Array<String>?
Dequeues the next received message.
-
#enqueue(parts) ⇒ Object
Enqueues a reply.
-
#initialize(engine) ⇒ Rep
constructor
A new instance of Rep.
-
#send_queues_drained? ⇒ Boolean
True when all per-connection send queues are empty.
-
#unblock_recv ⇒ void
Wakes a blocked #dequeue_recv with a nil sentinel.
Constructor Details
#initialize(engine) ⇒ Rep
Returns a new instance of Rep.
22 23 24 25 26 27 |
# File 'lib/omq/routing/rep.rb', line 22 def initialize(engine) @engine = engine @recv_queue = Routing.build_queue(engine..recv_hwm, :block) @pending_replies = [] @conn_queues = {} end |
Instance Attribute Details
#recv_queue ⇒ Async::LimitedQueue (readonly)
17 18 19 |
# File 'lib/omq/routing/rep.rb', line 17 def recv_queue @recv_queue end |
Instance Method Details
#connection_added(connection) ⇒ Object
50 51 52 53 54 55 56 57 58 59 60 61 62 63 |
# File 'lib/omq/routing/rep.rb', line 50 def connection_added(connection) @engine.start_recv_pump(connection, @recv_queue) do |msg| delimiter = msg.index { |p| p.empty? } || msg.size envelope = msg[0, delimiter] body = msg[(delimiter + 1)..] || [] @pending_replies << [connection, envelope] body end q = Routing.build_queue(@engine..send_hwm, :block) @conn_queues[connection] = q ConnSendPump.start(@engine, connection, q) end |
#connection_removed(connection) ⇒ Object
68 69 70 71 |
# File 'lib/omq/routing/rep.rb', line 68 def connection_removed(connection) @pending_replies.reject! { |r| r[0] == connection } @conn_queues.delete(connection) end |
#dequeue_recv ⇒ Array<String>?
Dequeues the next received message. Blocks until one is available.
34 35 36 |
# File 'lib/omq/routing/rep.rb', line 34 def dequeue_recv @recv_queue.dequeue end |
#enqueue(parts) ⇒ Object
Enqueues a reply. Routes to the connection that sent the matching request by consuming the next pending_reply entry.
79 80 81 82 83 84 85 86 87 88 |
# File 'lib/omq/routing/rep.rb', line 79 def enqueue(parts) reply_info = @pending_replies.shift return unless reply_info conn, envelope = reply_info msg = envelope msg << EMPTY_FRAME msg.concat(parts) @conn_queues[conn]&.enqueue(msg) end |
#send_queues_drained? ⇒ Boolean
Returns true when all per-connection send queues are empty.
93 94 95 |
# File 'lib/omq/routing/rep.rb', line 93 def send_queues_drained? @conn_queues.values.all?(&:empty?) end |
#unblock_recv ⇒ void
This method returns an undefined value.
Wakes a blocked #dequeue_recv with a nil sentinel.
43 44 45 |
# File 'lib/omq/routing/rep.rb', line 43 def unblock_recv @recv_queue.enqueue(nil) end |