Module: OMQ::Routing
Defined in:
lib/omq/routing.rb,
lib/omq/routing/pub.rb,
lib/omq/routing/rep.rb,
lib/omq/routing/req.rb,
lib/omq/routing/sub.rb,
lib/omq/routing/pair.rb,
lib/omq/routing/pull.rb,
lib/omq/routing/push.rb,
lib/omq/routing/xpub.rb,
lib/omq/routing/xsub.rb,
lib/omq/routing/dealer.rb,
lib/omq/routing/router.rb,
lib/omq/routing/fan_out.rb,
lib/omq/routing/fair_recv.rb,
lib/omq/routing/fair_queue.rb,
lib/omq/routing/round_robin.rb,
lib/omq/routing/conn_send_pump.rb
Overview
Routing strategies for each ZMQ socket type.
Each strategy manages how messages flow between connections and the socket’s send/recv queues.
Defined Under Namespace
Modules: ConnSendPump, FairRecv, FanOut, RoundRobin
Classes: Dealer, FairQueue, Pair, Pub, Pull, Push, Rep, Req, Router, SignalingQueue, Sub, XPub, XSub
Class Attribute Summary
.registry ⇒ Hash{Symbol => Class} (readonly)
Plugin registry mapping socket types to routing strategy classes.
Class Method Summary
.build_queue(hwm, on_mute) ⇒ Async::LimitedQueue, DropQueue
Builds a send or recv queue based on the mute strategy.
.drain_send_queue(queue, batch) ⇒ void
Drains all available messages from queue into batch without blocking.
.for(socket_type) ⇒ Class
Returns the routing strategy class for a socket type.
.register(socket_type, strategy_class) ⇒ Object
Registers a routing strategy class for a socket type.
Class Attribute Details
.registry ⇒ Hash{Symbol => Class} (readonly)
Returns the plugin registry.
    # File 'lib/omq/routing.rb', line 26

    def registry
      @registry
    end
Class Method Details
.build_queue(hwm, on_mute) ⇒ Async::LimitedQueue, DropQueue
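Builds a send or recv queue based on the mute strategy. When hwm is nil or 0, an unbounded Async::Queue is returned regardless of on_mute. An unrecognized on_mute value raises ArgumentError.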
    # File 'lib/omq/routing.rb', line 47

    def self.build_queue(hwm, on_mute)
      return Async::Queue.new if hwm.nil? || hwm == 0

      case on_mute
      when :block
        Async::LimitedQueue.new(hwm)
      when :drop_newest, :drop_oldest
        DropQueue.new(hwm, strategy: on_mute)
      else
        raise ArgumentError, "unknown on_mute strategy: #{on_mute.inspect}"
      end
    end
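The difference between the two drop strategies can be sketched without the library. This is a minimal illustration, not OMQ's DropQueue: `SketchDropQueue` and `sketch_build_queue` are hypothetical names, and the sketch assumes that `:drop_newest` discards the incoming message when the queue is full while `:drop_oldest` evicts the head to make room. The `:block` branch is left out because it depends on Async::LimitedQueue.

```ruby
# Hypothetical bounded queue for illustration only; the real DropQueue may differ.
class SketchDropQueue
  attr_reader :items

  def initialize(limit, strategy:)
    @limit = limit
    @strategy = strategy
    @items = []
  end

  def enqueue(msg)
    if @items.size >= @limit
      return if @strategy == :drop_newest # assumed: discard the incoming message
      @items.shift                        # assumed :drop_oldest: evict the head
    end
    @items << msg
  end
end

# Mirrors the dispatch shape of build_queue, minus the :block branch.
def sketch_build_queue(hwm, on_mute)
  return [] if hwm.nil? || hwm == 0 # plain Array as an unbounded stand-in

  case on_mute
  when :drop_newest, :drop_oldest
    SketchDropQueue.new(hwm, strategy: on_mute)
  else
    raise ArgumentError, "unknown on_mute strategy: #{on_mute.inspect}"
  end
end

newest = sketch_build_queue(2, :drop_newest)
oldest = sketch_build_queue(2, :drop_oldest)
[1, 2, 3].each do |msg|
  newest.enqueue(msg)
  oldest.enqueue(msg)
end
```

Under these assumptions, `newest.items` keeps the first two messages and `oldest.items` keeps the last two.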
.drain_send_queue(queue, batch) ⇒ void
This method returns an undefined value.
Drains all available messages from queue into batch without blocking. Call after the initial blocking dequeue.
No cap is needed: IO::Stream auto-flushes at 64 KB, so the write buffer hits the wire naturally under sustained load. The explicit flush after the batch pushes out the remainder.
    # File 'lib/omq/routing.rb', line 72

    def self.drain_send_queue(queue, batch)
      loop do
        msg = queue.dequeue(timeout: 0) or break
        batch << msg
      end
    end
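The "blocking dequeue first, then drain" pattern described above might look like the following sketch. `SketchQueue` is a hypothetical stand-in for a queue whose `dequeue` accepts `timeout:` and returns nil when nothing is available; it is not the Async queue, and a real queue would block on the first call.

```ruby
# Hypothetical stand-in queue: dequeue returns nil immediately when empty.
class SketchQueue
  def initialize(items = [])
    @items = items.dup
  end

  def dequeue(timeout: nil)
    @items.shift
  end
end

# Same loop shape as drain_send_queue in the listing above.
def drain_send_queue(queue, batch)
  loop do
    msg = queue.dequeue(timeout: 0) or break
    batch << msg
  end
end

queue = SketchQueue.new(%w[a b c])
batch = [queue.dequeue]        # initial dequeue (would block on a real queue)
drain_send_queue(queue, batch) # collect whatever else is already queued
```

After the drain, `batch` holds every message that was queued at that moment, so the caller can write them all and flush once.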
.for(socket_type) ⇒ Class
Returns the routing strategy class for a socket type.
    # File 'lib/omq/routing.rb', line 85

    def self.for(socket_type)
      case socket_type
      when :PAIR   then Pair
      when :REQ    then Req
      when :REP    then Rep
      when :DEALER then Dealer
      when :ROUTER then Router
      when :PUB    then Pub
      when :SUB    then Sub
      when :XPUB   then XPub
      when :XSUB   then XSub
      when :PUSH   then Push
      when :PULL   then Pull
      else
        @registry[socket_type] or raise ArgumentError, "unknown socket type: #{socket_type.inspect}"
      end
    end
.register(socket_type, strategy_class) ⇒ Object
Registers a routing strategy class for a socket type. Called by omq-draft (and other plugins) at require time.
    # File 'lib/omq/routing.rb', line 35

    def register(socket_type, strategy_class)
      @registry[socket_type] = strategy_class
    end
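How a plugin registration interacts with the lookup in `.for` can be sketched with a self-contained stand-in. `SketchRouting`, `SketchScatter`, and the `:SCATTER` socket type are hypothetical names chosen for illustration; only the register-then-fallback shape is taken from the listings above.

```ruby
# Stand-in module reproducing the shape of register / for; not the real OMQ::Routing.
module SketchRouting
  class Pair; end # stand-in for a built-in strategy

  @registry = {}

  def self.registry
    @registry
  end

  def self.register(socket_type, strategy_class)
    @registry[socket_type] = strategy_class
  end

  def self.for(socket_type)
    case socket_type
    when :PAIR then Pair
    else
      # Built-ins are matched first; anything else falls back to the registry.
      @registry[socket_type] or raise ArgumentError, "unknown socket type: #{socket_type.inspect}"
    end
  end
end

class SketchScatter; end # hypothetical plugin-provided strategy

# A plugin would typically do this once, at require time.
SketchRouting.register(:SCATTER, SketchScatter)

strategy = SketchRouting.for(:SCATTER)
```

Because built-in types are matched before the registry is consulted, registering a class under a built-in symbol such as `:PAIR` would have no effect on lookup in this sketch.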