Class: OMQ::Routing::Push
- Inherits: Object
- Inheritance chain: Object, OMQ::Routing::Push
- Includes: RoundRobin
- Defined in: lib/omq/routing/push.rb
Overview
PUSH socket routing: round-robin send to PULL peers.
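To make the round-robin idea concrete, here is a minimal sketch of rotating outgoing messages across connected peers. It is not the RoundRobin mixin itself: `FakePeer`, `RoundRobinSketch`, and `send_message` are hypothetical names invented for illustration, and raising when no peer is connected is a simplification.

```ruby
# Hypothetical stand-in for a PULL peer connection: it only records what it is sent.
class FakePeer
  attr_reader :name, :received

  def initialize(name)
    @name = name
    @received = []
  end

  def send_message(parts)
    @received << parts
  end
end

# Illustrative round-robin distributor (an assumption, not OMQ's implementation).
class RoundRobinSketch
  def initialize
    @peers = []
    @index = 0
  end

  def add(peer)
    @peers << peer
  end

  def remove(peer)
    @peers.delete(peer)
    # Keep the cursor inside the (possibly shorter) peer list.
    @index = 0 if @index >= @peers.size
  end

  # Sends one message to the next peer in rotation and returns that peer.
  def enqueue(parts)
    raise "no peers connected" if @peers.empty? # simplification for the sketch

    peer = @peers[@index]
    @index = (@index + 1) % @peers.size
    peer.send_message(parts)
    peer
  end
end

a = FakePeer.new("a")
b = FakePeer.new("b")
rr = RoundRobinSketch.new
rr.add(a)
rr.add(b)
3.times { |i| rr.enqueue(["msg#{i}"]) }
```

With two peers and three messages, the first and third messages go to `a` and the second goes to `b`.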
Constant Summary
Constants included from RoundRobin
RoundRobin::BATCH_BYTE_CAP, RoundRobin::BATCH_MSG_CAP
Instance Method Summary
- #connection_added(connection) ⇒ Object
- #connection_removed(connection) ⇒ Object
- #dequeue_recv ⇒ Object
- #enqueue(parts) ⇒ Object
- #initialize(engine) ⇒ Push (constructor): A new instance of Push.
- #recv_queue ⇒ Object: PUSH is write-only.
- #stop ⇒ Object: Stops all background tasks (send pumps, reapers).
- #unblock_recv ⇒ Object
Methods included from RoundRobin
Constructor Details
#initialize(engine) ⇒ Push
Returns a new instance of Push.
    # File 'lib/omq/routing/push.rb', line 13
    def initialize(engine)
      @engine = engine
      @tasks = []
      init_round_robin(engine)
    end
Instance Method Details
#connection_added(connection) ⇒ Object
    # File 'lib/omq/routing/push.rb', line 39
    def connection_added(connection)
      add_round_robin_send_connection(connection)
      start_reaper(connection)
    end
#connection_removed(connection) ⇒ Object
    # File 'lib/omq/routing/push.rb', line 47
    def connection_removed(connection)
      @connections.delete(connection)
      remove_round_robin_send_connection(connection)
    end
#dequeue_recv ⇒ Object
    # File 'lib/omq/routing/push.rb', line 28
    def dequeue_recv
      raise "PUSH sockets cannot receive"
    end
#enqueue(parts) ⇒ Object
    # File 'lib/omq/routing/push.rb', line 55
    def enqueue(parts)
      enqueue_round_robin(parts)
    end
#recv_queue ⇒ Object
PUSH is write-only. The engine-facing receive contract is still implemented: #recv_queue and #dequeue_recv raise, and #unblock_recv is a no-op that is kept because fatal-error propagation still calls it.
    # File 'lib/omq/routing/push.rb', line 23
    def recv_queue
      raise "PUSH sockets cannot receive"
    end
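The receive contract described above can be seen from the caller's side in a small sketch. `WriteOnlyRoutingSketch` is a hypothetical class that mirrors only the three receive-related methods shown on this page; it is not the real OMQ::Routing::Push and does not involve an engine.

```ruby
# Hypothetical write-only routing object following the same receive contract.
class WriteOnlyRoutingSketch
  def recv_queue
    raise "PUSH sockets cannot receive"
  end

  def dequeue_recv
    raise "PUSH sockets cannot receive"
  end

  # Intentionally empty: nothing can be blocked on a receive, so there is nothing to wake.
  def unblock_recv
  end
end

routing = WriteOnlyRoutingSketch.new

# Safe to call at any time, for example while an error is being propagated.
unblock_result = routing.unblock_recv

# A bare `raise "..."` produces a RuntimeError, which callers can rescue.
begin
  routing.dequeue_recv
rescue RuntimeError => e
  error_message = e.message
end
```

Keeping `unblock_recv` as a no-op rather than raising means shared shutdown or error paths can call it on every routing strategy without special-casing write-only sockets.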
#stop ⇒ Object
Stops all background tasks (send pumps, reapers).
    # File 'lib/omq/routing/push.rb', line 62
    def stop
      @tasks.each(&:stop)
      @tasks.clear
    end
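The stop-then-clear pattern can be illustrated in isolation. In this sketch, `FakeTask` and `TaskOwnerSketch` are hypothetical; the only assumption carried over from the source is that each tracked task responds to `#stop`.

```ruby
# Hypothetical background task: any object that responds to #stop would do.
class FakeTask
  attr_reader :stopped

  def initialize
    @stopped = false
  end

  def stop
    @stopped = true
  end
end

# Hypothetical owner that tracks its tasks the way the #stop method above expects.
class TaskOwnerSketch
  attr_reader :tasks

  def initialize
    @tasks = []
  end

  # Registers a new task and returns it.
  def spawn
    task = FakeTask.new
    @tasks << task
    task
  end

  # Stops every tracked task, then forgets them.
  def stop
    @tasks.each(&:stop)
    @tasks.clear
  end
end

owner = TaskOwnerSketch.new
pump = owner.spawn
reaper = owner.spawn

owner.stop
owner.stop # second call is harmless: the task list is already empty
```

Clearing the list after stopping makes a repeated `stop` call a no-op in this sketch, since there are no tasks left to iterate over.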
#unblock_recv ⇒ Object
    # File 'lib/omq/routing/push.rb', line 33
    def unblock_recv
    end