Class: OMQ::Routing::Dealer
- Inherits:
-
Object
- Object
- OMQ::Routing::Dealer
- Includes:
- RoundRobin
- Defined in:
- lib/omq/routing/dealer.rb
Overview
DEALER socket routing: round-robin send, fair-queue receive.
No envelope manipulation — messages pass through unchanged.
Constant Summary
Constants included from RoundRobin
RoundRobin::BATCH_BYTE_CAP, RoundRobin::BATCH_MSG_CAP
Instance Attribute Summary collapse
- #recv_queue ⇒ Async::LimitedQueue readonly
Instance Method Summary collapse
- #connection_added(connection) ⇒ Object
- #connection_removed(connection) ⇒ Object
-
#dequeue_recv ⇒ Array<String>?
Dequeues the next received message.
- #enqueue(parts) ⇒ Object
-
#initialize(engine) ⇒ Dealer
constructor
A new instance of Dealer.
-
#stop ⇒ void
Stops all background tasks.
-
#unblock_recv ⇒ void
Wakes a blocked #dequeue_recv with a nil sentinel.
Methods included from RoundRobin
Constructor Details
#initialize(engine) ⇒ Dealer
Returns a new instance of Dealer.
20 21 22 23 24 25 |
# File 'lib/omq/routing/dealer.rb', line 20 def initialize(engine) @engine = engine @recv_queue = Routing.build_queue(engine..recv_hwm, :block) @tasks = [] init_round_robin(engine) end |
Instance Attribute Details
#recv_queue ⇒ Async::LimitedQueue (readonly)
15 16 17 |
# File 'lib/omq/routing/dealer.rb', line 15 def recv_queue @recv_queue end |
Instance Method Details
#connection_added(connection) ⇒ Object
48 49 50 51 52 |
# File 'lib/omq/routing/dealer.rb', line 48 def connection_added(connection) task = @engine.start_recv_pump(connection, @recv_queue) @tasks << task if task add_round_robin_send_connection(connection) end |
#connection_removed(connection) ⇒ Object
57 58 59 60 |
# File 'lib/omq/routing/dealer.rb', line 57 def connection_removed(connection) @connections.delete(connection) remove_round_robin_send_connection(connection) end |
#dequeue_recv ⇒ Array<String>?
Dequeues the next received message. Blocks until one is available.
32 33 34 |
# File 'lib/omq/routing/dealer.rb', line 32 def dequeue_recv @recv_queue.dequeue end |
#enqueue(parts) ⇒ Object
65 66 67 |
# File 'lib/omq/routing/dealer.rb', line 65 def enqueue(parts) enqueue_round_robin(parts) end |
#stop ⇒ void
This method returns an undefined value.
Stops all background tasks.
74 75 76 77 |
# File 'lib/omq/routing/dealer.rb', line 74 def stop @tasks.each(&:stop) @tasks.clear end |
#unblock_recv ⇒ void
This method returns an undefined value.
Wakes a blocked #dequeue_recv with a nil sentinel.
41 42 43 |
# File 'lib/omq/routing/dealer.rb', line 41 def unblock_recv @recv_queue.enqueue(nil) end |