Module: OMQ::Routing

Defined in:
lib/omq/routing.rb,
lib/omq/routing/pub.rb,
lib/omq/routing/rep.rb,
lib/omq/routing/req.rb,
lib/omq/routing/sub.rb,
lib/omq/routing/dish.rb,
lib/omq/routing/pair.rb,
lib/omq/routing/peer.rb,
lib/omq/routing/pull.rb,
lib/omq/routing/push.rb,
lib/omq/routing/xpub.rb,
lib/omq/routing/xsub.rb,
lib/omq/routing/radio.rb,
lib/omq/routing/client.rb,
lib/omq/routing/dealer.rb,
lib/omq/routing/gather.rb,
lib/omq/routing/router.rb,
lib/omq/routing/server.rb,
lib/omq/routing/channel.rb,
lib/omq/routing/fan_out.rb,
lib/omq/routing/scatter.rb,
lib/omq/routing/round_robin.rb,
lib/omq/routing/conn_send_pump.rb

Overview

Routing strategies for each ZMQ socket type.

Each strategy manages how messages flow between connections and the socket’s send/recv queues.

Defined Under Namespace

Modules: ConnSendPump, FanOut, RoundRobin Classes: Channel, Client, Dealer, Dish, Gather, Pair, Peer, Pub, Pull, Push, Radio, Rep, Req, Router, Scatter, Server, Sub, XPub, XSub

Class Attribute Summary collapse

Class Method Summary collapse

Class Attribute Details

.registryHash{Symbol => Class} (readonly)

Returns plugin registry.

Returns:

  • (Hash{Symbol => Class})

    plugin registry



24
25
26
# File 'lib/omq/routing.rb', line 24

def registry
  @registry
end

Class Method Details

.build_queue(hwm, on_mute) ⇒ Async::LimitedQueue, DropQueue

Builds a send or recv queue based on the mute strategy.

Parameters:

  • hwm (Integer)

    high water mark

  • on_mute (Symbol)

    :block, :drop_newest, or :drop_oldest

Returns:



45
46
47
48
49
50
51
52
53
54
55
56
# File 'lib/omq/routing.rb', line 45

def self.build_queue(hwm, on_mute)
  return Async::Queue.new if hwm.nil? || hwm == 0

  case on_mute
  when :block
    Async::LimitedQueue.new(hwm)
  when :drop_newest, :drop_oldest
    DropQueue.new(hwm, strategy: on_mute)
  else
    raise ArgumentError, "unknown on_mute strategy: #{on_mute.inspect}"
  end
end

.dequeue_batch(queue, batch = []) ⇒ void

This method returns an undefined value.

Drains all available messages from queue into batch without Blocks for the first message, then sweeps all immediately available messages into batch without blocking.

No cap is needed: IO::Stream auto-flushes at 64 KB, so the write buffer hits the wire naturally under sustained load. The explicit flush after the batch pushes out the remainder.

Parameters:

  • queue (Async::LimitedQueue)
  • batch (Array) (defaults to: [])


71
72
73
74
75
76
77
78
# File 'lib/omq/routing.rb', line 71

def self.dequeue_batch(queue, batch = [])
  batch << queue.dequeue

  loop do
    msg = queue.dequeue(timeout: 0) or break
    batch << msg
  end
end

.for(socket_type) ⇒ Class

Returns the routing strategy class for a socket type.

Parameters:

  • socket_type (Symbol)

    e.g. :PAIR, :REQ

Returns:

  • (Class)


86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
# File 'lib/omq/routing.rb', line 86

def self.for(socket_type)
  case socket_type
  when :PAIR
    Pair
  when :REQ
    Req
  when :REP
    Rep
  when :DEALER
    Dealer
  when :ROUTER
    Router
  when :PUB
    Pub
  when :SUB
    Sub
  when :XPUB
    XPub
  when :XSUB
    XSub
  when :PUSH
    Push
  when :PULL
    Pull
  else
    @registry[socket_type] or raise ArgumentError, "unknown socket type: #{socket_type.inspect}"
  end
end

.register(socket_type, strategy_class) ⇒ Object

Registers a routing strategy class for a socket type. Called by omq-draft (and other plugins) at require time.

Parameters:

  • socket_type (Symbol)

    e.g. :RADIO, :CLIENT

  • strategy_class (Class)


33
34
35
# File 'lib/omq/routing.rb', line 33

def register(socket_type, strategy_class)
  @registry[socket_type] = strategy_class
end