Module: OMQ::Routing

Defined in:
lib/omq/routing.rb,
lib/omq/routing/pub.rb,
lib/omq/routing/rep.rb,
lib/omq/routing/req.rb,
lib/omq/routing/sub.rb,
lib/omq/routing/pair.rb,
lib/omq/routing/pull.rb,
lib/omq/routing/push.rb,
lib/omq/routing/xpub.rb,
lib/omq/routing/xsub.rb,
lib/omq/routing/dealer.rb,
lib/omq/routing/router.rb,
lib/omq/routing/fan_out.rb,
lib/omq/routing/fair_recv.rb,
lib/omq/routing/fair_queue.rb,
lib/omq/routing/round_robin.rb,
lib/omq/routing/conn_send_pump.rb

Overview

Routing strategies for each ZMQ socket type.

Each strategy manages how messages flow between connections and the socket’s send/recv queues.

Defined Under Namespace

Modules: ConnSendPump, FairRecv, FanOut, RoundRobin

Classes: Dealer, FairQueue, Pair, Pub, Pull, Push, Rep, Req, Router, SignalingQueue, Sub, XPub, XSub

Class Attribute Details

.registry ⇒ Hash{Symbol => Class} (readonly)

Returns the plugin registry, which maps socket type symbols to routing strategy classes.

Returns:

  • (Hash{Symbol => Class})

    plugin registry



# File 'lib/omq/routing.rb', line 26

def registry
  @registry
end

Class Method Details

.build_queue(hwm, on_mute) ⇒ Async::Queue, Async::LimitedQueue, DropQueue

Builds a send or recv queue based on the mute strategy. When hwm is nil or 0, returns an unbounded Async::Queue and ignores on_mute.

Parameters:

  • hwm (Integer)

    high water mark (queue capacity); nil or 0 selects an unbounded queue

  • on_mute (Symbol)

    :block, :drop_newest, or :drop_oldest

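Returns:

  • (Async::Queue, Async::LimitedQueue, DropQueue)

    Async::LimitedQueue for :block, DropQueue for :drop_newest or :drop_oldest, or an unbounded Async::Queue when hwm is nil or 0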


# File 'lib/omq/routing.rb', line 47

def self.build_queue(hwm, on_mute)
  return Async::Queue.new if hwm.nil? || hwm == 0

  case on_mute
  when :block
    Async::LimitedQueue.new(hwm)
  when :drop_newest, :drop_oldest
    DropQueue.new(hwm, strategy: on_mute)
  else
    raise ArgumentError, "unknown on_mute strategy: #{on_mute.inspect}"
  end
end
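
To illustrate how the two drop strategies might differ once a queue reaches its high water mark, here is a minimal, self-contained sketch. SketchDropQueue is a hypothetical stand-in written for this example, not the library's DropQueue. It assumes that :drop_newest discards the incoming message and that :drop_oldest evicts the head of the queue to make room; the real class may behave differently.

```ruby
# Hypothetical stand-in for a bounded queue that drops instead of blocking.
# Assumption: :drop_newest discards the incoming item, :drop_oldest evicts
# the oldest queued item. The real DropQueue may behave differently.
class SketchDropQueue
  def initialize(limit, strategy:)
    unless %i[drop_newest drop_oldest].include?(strategy)
      raise ArgumentError, "unknown strategy: #{strategy.inspect}"
    end

    @limit    = limit
    @strategy = strategy
    @items    = []
  end

  # Adds an item; returns true if it was stored, false if it was dropped.
  def enqueue(item)
    if @items.size >= @limit
      return false if @strategy == :drop_newest

      @items.shift # :drop_oldest makes room by evicting the head
    end

    @items << item
    true
  end

  def to_a
    @items.dup
  end
end

newest = SketchDropQueue.new(2, strategy: :drop_newest)
oldest = SketchDropQueue.new(2, strategy: :drop_oldest)

%w[a b c].each do |msg|
  newest.enqueue(msg)
  oldest.enqueue(msg)
end

p newest.to_a # ["a", "b"]
p oldest.to_a # ["b", "c"]
```

With :block, by contrast, the producer would wait for space rather than lose a message, which is why that branch uses Async::LimitedQueue.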

.drain_send_queue(queue, batch) ⇒ void

This method returns an undefined value.

Drains all available messages from queue into batch without blocking. Call after the initial blocking dequeue.

No cap is needed: IO::Stream auto-flushes at 64 KB, so the write buffer hits the wire naturally under sustained load. The explicit flush after the batch pushes out the remainder.

Parameters:

  • queue (Async::LimitedQueue)
  • batch (Array)


# File 'lib/omq/routing.rb', line 72

def self.drain_send_queue(queue, batch)
  loop do
    msg = queue.dequeue(timeout: 0) or break
    batch << msg
  end
end
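
The batching pattern described above (one blocking dequeue, then a non-blocking drain) can be sketched without the async gem. SketchQueue and drain are hypothetical names introduced for this example; the only assumption carried over from the source is that dequeue(timeout: 0) returns nil when nothing is queued.

```ruby
# Hypothetical in-memory queue exposing the dequeue(timeout:) call used above.
# It never blocks: an empty queue yields nil regardless of the timeout.
class SketchQueue
  def initialize(items = [])
    @items = items.dup
  end

  def dequeue(timeout: nil)
    @items.shift
  end
end

# Same loop shape as drain_send_queue: stop at the first nil.
def drain(queue, batch)
  loop do
    msg = queue.dequeue(timeout: 0) or break
    batch << msg
  end
end

queue = SketchQueue.new(%w[m1 m2 m3])

batch = [queue.dequeue] # stands in for the initial blocking dequeue
drain(queue, batch)     # picks up whatever else is already queued

p batch # ["m1", "m2", "m3"]
```

Because the loop stops on any falsy value, a queue that carried nil or false as a message would end the drain early.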

.for(socket_type) ⇒ Class

Returns the routing strategy class for a socket type.

Parameters:

  • socket_type (Symbol)

    e.g. :PAIR, :REQ

Returns:

  • (Class)


# File 'lib/omq/routing.rb', line 85

def self.for(socket_type)
  case socket_type
  when :PAIR
    Pair
  when :REQ
    Req
  when :REP
    Rep
  when :DEALER
    Dealer
  when :ROUTER
    Router
  when :PUB
    Pub
  when :SUB
    Sub
  when :XPUB
    XPub
  when :XSUB
    XSub
  when :PUSH
    Push
  when :PULL
    Pull
  else
    @registry[socket_type] or raise ArgumentError, "unknown socket type: #{socket_type.inspect}"
  end
end

.register(socket_type, strategy_class) ⇒ Object

Registers a routing strategy class for a socket type. Called by omq-draft (and other plugins) at require time.

Parameters:

  • socket_type (Symbol)

    e.g. :RADIO, :CLIENT

  • strategy_class (Class)


# File 'lib/omq/routing.rb', line 35

def register(socket_type, strategy_class)
  @registry[socket_type] = strategy_class
end
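
The interplay between .register and .for can be sketched in miniature. SketchRouting, its Pair and Radio classes, and the BUILT_IN table are hypothetical names for illustration only; the sketch assumes nothing about OMQ beyond the lookup order visible in the source above (built-in types first, then the registry, then ArgumentError).

```ruby
# Hypothetical miniature of the lookup pattern: built-in types are resolved
# first, plugin-registered types second. All names here are illustrative.
module SketchRouting
  class Pair; end
  class Radio; end # stands in for a strategy class supplied by a plugin

  BUILT_IN = { PAIR: Pair }.freeze
  @registry = {}

  class << self
    attr_reader :registry

    def register(socket_type, strategy_class)
      @registry[socket_type] = strategy_class
    end

    def for(socket_type)
      BUILT_IN[socket_type] || @registry[socket_type] or
        raise ArgumentError, "unknown socket type: #{socket_type.inspect}"
    end
  end
end

# A plugin would typically run this at require time.
SketchRouting.register(:RADIO, SketchRouting::Radio)

p SketchRouting.for(:PAIR)  # SketchRouting::Pair
p SketchRouting.for(:RADIO) # SketchRouting::Radio

begin
  SketchRouting.for(:BOGUS)
rescue ArgumentError => e
  puts e.message # unknown socket type: :BOGUS
end
```

One consequence of this ordering is that registering a class under a built-in symbol such as :PAIR has no effect on lookup, since the built-in match is found first.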