Class: OMQ::Routing::Rep
- Inherits: Object
  - Object
    - OMQ::Routing::Rep
- Includes:
- FairRecv
- Defined in:
- lib/omq/routing/rep.rb
Overview
REP socket routing: fair-queue receive, reply routed back to sender.
REP strips the routing envelope (everything up to and including the empty delimiter) on receive, saves it internally, and restores it on send.
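The envelope handling described above can be sketched in isolation. This is a minimal illustration that mirrors the slicing used in `#connection_added` and `#enqueue`; the helper names `split_envelope` and `wrap_reply` are hypothetical and are not part of `OMQ::Routing::Rep`.

```ruby
EMPTY_FRAME = "".b.freeze

# Split a multipart message at the first empty frame.
# Returns [envelope, body]. If no delimiter is present, the whole
# message is treated as envelope and the body is empty.
def split_envelope(msg)
  delimiter = msg.index(&:empty?) || msg.size
  envelope  = msg[0, delimiter]
  body      = msg[(delimiter + 1)..] || []  # slice is nil when out of range
  [envelope, body]
end

# Restore the saved envelope in front of the reply frames.
def wrap_reply(envelope, parts)
  [*envelope, EMPTY_FRAME, *parts]
end

request        = ["peer-1".b, "".b, "hello".b]
envelope, body = split_envelope(request)
reply          = wrap_reply(envelope, ["world".b])
```

Here `envelope` holds only the routing frames, `body` is what the application sees, and `reply` carries the same routing frames followed by the empty delimiter and the reply payload.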
Constant Summary
- EMPTY_FRAME = "".b.freeze
Instance Attribute Summary
- #recv_queue ⇒ FairQueue (readonly)
Instance Method Summary
- #connection_added(connection) ⇒ Object
- #connection_removed(connection) ⇒ Object
- #enqueue(parts) ⇒ Object
  Enqueues a reply.
- #initialize(engine) ⇒ Rep (constructor)
  A new instance of Rep.
- #send_queues_drained? ⇒ Boolean
  True when all per-connection send queues are empty.
- #stop ⇒ void
  Stops all background tasks.
Methods included from FairRecv
Constructor Details
#initialize(engine) ⇒ Rep
Returns a new instance of Rep.
    # File 'lib/omq/routing/rep.rb', line 24

    def initialize(engine)
      @engine = engine
      @recv_queue = FairQueue.new
      @pending_replies = []
      @conn_queues = {}     # connection => per-connection send queue
      @conn_send_tasks = {} # connection => send pump task
      @tasks = []
    end
Instance Attribute Details
#recv_queue ⇒ FairQueue (readonly)
    # File 'lib/omq/routing/rep.rb', line 19

    def recv_queue
      @recv_queue
    end
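To illustrate what fair-queue receive means in general, here is a small round-robin sketch. It is not the library's `FairQueue`: the class name `SketchFairQueue` and the methods `add_queue`, `push`, and `pop` are assumptions made for this example. Only `remove_queue` appears in the source shown on this page.

```ruby
# Hypothetical round-robin fair queue, for illustration only.
class SketchFairQueue
  def initialize
    @queues = {} # source => array of buffered messages
    @order  = [] # rotation order of sources
  end

  def add_queue(source)
    @queues[source] = []
    @order << source
  end

  def remove_queue(source)
    @queues.delete(source)
    @order.delete(source)
  end

  def push(source, msg)
    @queues.fetch(source) << msg
  end

  # Returns the next message, visiting sources in rotation so that a
  # busy source cannot starve the others. Returns nil if all are empty.
  def pop
    @order.size.times do
      source = @order.shift
      @order << source
      msg = @queues[source].shift
      return msg if msg
    end
    nil
  end
end

fq = SketchFairQueue.new
fq.add_queue(:a)
fq.add_queue(:b)
fq.push(:a, "a1")
fq.push(:a, "a2")
fq.push(:b, "b1")
order = [fq.pop, fq.pop, fq.pop, fq.pop]
```

Although source `:a` queued two messages before `:b` queued one, the rotation interleaves them, so `:b` is served before the second message from `:a`.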
Instance Method Details
#connection_added(connection) ⇒ Object
    # File 'lib/omq/routing/rep.rb', line 36

    def connection_added(connection)
      add_fair_recv_connection(connection) do |msg|
        delimiter = msg.index { |p| p.empty? } || msg.size
        envelope = msg[0, delimiter]
        body = msg[(delimiter + 1)..] || []
        @pending_replies << { conn: connection, envelope: envelope }
        body
      end

      q = Routing.build_queue(@engine..send_hwm, :block)
      @conn_queues[connection] = q
      @conn_send_tasks[connection] = ConnSendPump.start(@engine, connection, q, @tasks)
    end
#connection_removed(connection) ⇒ Object
    # File 'lib/omq/routing/rep.rb', line 54

    def connection_removed(connection)
      @pending_replies.reject! { |r| r[:conn] == connection }
      @recv_queue.remove_queue(connection)
      @conn_queues.delete(connection)
      @conn_send_tasks.delete(connection)&.stop
    end
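The cleanup in `#connection_removed` can be sketched with plain data structures. This is a simplified illustration: symbols stand in for connection objects, arrays stand in for send queues, and the `remove_connection` lambda is a hypothetical name.

```ruby
pending = [
  { conn: :a, envelope: ["id-1"] },
  { conn: :b, envelope: [] },
  { conn: :a, envelope: ["id-2"] },
]
queues = { a: [], b: [] } # stand-ins for per-connection send queues

remove_connection = lambda do |conn|
  # Drop reply slots for requests that can no longer be answered.
  pending.reject! { |r| r[:conn] == conn }
  # Forget the connection's send queue.
  queues.delete(conn)
end

remove_connection.call(:a)
```

Discarding the pending entries for a departed connection keeps later replies from being matched against requests whose sender is gone.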
#enqueue(parts) ⇒ Object
Enqueues a reply. Routes it to the connection that sent the matching request by consuming the oldest entry in @pending_replies. If no request is pending, the reply is dropped.
    # File 'lib/omq/routing/rep.rb', line 67

    def enqueue(parts)
      reply_info = @pending_replies.shift
      return unless reply_info
      conn = reply_info[:conn]
      @conn_queues[conn]&.enqueue([*reply_info[:envelope], EMPTY_FRAME, *parts])
    end
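The first-in, first-out matching between requests and replies can be sketched as follows. This is a simplified model under stated assumptions: symbols stand in for connections, plain arrays stand in for the per-connection send queues, and the `enqueue` lambda is a local illustration rather than the method itself.

```ruby
empty_frame     = "".b.freeze
pending_replies = []               # FIFO of { conn:, envelope: } records
conn_queues     = { a: [], b: [] } # stand-ins for per-connection send queues

# Receive side: record who sent each request, in arrival order.
pending_replies << { conn: :a, envelope: ["id-a"] }
pending_replies << { conn: :b, envelope: [] }

# Send side: each reply consumes the oldest pending record.
enqueue = lambda do |parts|
  info = pending_replies.shift
  next unless info # no pending request: the reply is dropped
  conn_queues[info[:conn]]&.push([*info[:envelope], empty_frame, *parts])
end

enqueue.call(["reply-1"])
enqueue.call(["reply-2"])
enqueue.call(["dropped"])
```

The first reply goes to `:a` with its saved envelope restored, the second goes to `:b` with only the empty delimiter prepended, and the third has no matching request, so nothing is queued.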
#send_queues_drained? ⇒ Boolean
Returns true when all per-connection send queues are empty.
    # File 'lib/omq/routing/rep.rb', line 87

    def send_queues_drained?
      @conn_queues.values.all?(&:empty?)
    end
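One property of this check is worth spelling out: `all?` on an empty collection is true, so a socket with no connections counts as drained. A minimal sketch, using arrays as stand-ins for the send queues (an assumption for illustration):

```ruby
queues  = {} # connection => send queue (arrays used as stand-ins)
drained = -> { queues.values.all?(&:empty?) }

no_connections = drained.call # nothing to drain

queues[:a] = ["msg"]
with_backlog = drained.call   # one queue still holds a message

queues[:a].shift
after_send = drained.call     # every queue is empty again
```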
#stop ⇒ void
This method returns an undefined value.
Stops all background tasks.
    # File 'lib/omq/routing/rep.rb', line 79

    def stop
      @tasks.each(&:stop)
      @tasks.clear
    end