Class: OMQ::Routing::Dish

Inherits:
Object
  • Object
show all
Defined in:
lib/omq/routing/dish.rb

Overview

DISH socket routing: group-based receive from RADIO peers.

Sends JOIN/LEAVE commands to connected RADIO peers. Receives two-frame messages (group + body) from RADIO.

Instance Attribute Summary collapse

Instance Method Summary collapse

Constructor Details

#initialize(engine) ⇒ Dish

Returns a new instance of Dish.

Parameters:



18
19
20
21
22
23
# File 'lib/omq/routing/dish.rb', line 18

def initialize(engine)
  @engine      = engine
  @connections = []
  @recv_queue  = Routing.build_queue(engine.options.recv_hwm, :block)
  @groups      = Set.new
end

Instance Attribute Details

#recv_queueAsync::LimitedQueue (readonly)

Returns:

  • (Async::LimitedQueue)


13
14
15
# File 'lib/omq/routing/dish.rb', line 13

def recv_queue
  @recv_queue
end

Instance Method Details

#connection_added(connection) ⇒ Object

Parameters:

  • connection (Protocol::ZMTP::Connection)


46
47
48
49
50
51
52
# File 'lib/omq/routing/dish.rb', line 46

def connection_added(connection)
  @connections << connection
  @groups.each do |group|
    connection.send_command(Protocol::ZMTP::Codec::Command.join(group))
  end
  @engine.start_recv_pump(connection, @recv_queue)
end

#connection_removed(connection) ⇒ Object

Parameters:

  • connection (Protocol::ZMTP::Connection)


57
58
59
# File 'lib/omq/routing/dish.rb', line 57

def connection_removed(connection)
  @connections.delete(connection)
end

#dequeue_recvArray<String>?

Dequeues the next received message. Blocks until one is available.

Returns:

  • (Array<String>, nil)


30
31
32
# File 'lib/omq/routing/dish.rb', line 30

def dequeue_recv
  @recv_queue.dequeue
end

#enqueue(_parts) ⇒ Object

DISH is read-only.



64
65
66
# File 'lib/omq/routing/dish.rb', line 64

def enqueue(_parts)
  raise "DISH sockets cannot send"
end

#join(group) ⇒ Object

Joins a group.

Parameters:

  • group (String)


73
74
75
76
77
78
# File 'lib/omq/routing/dish.rb', line 73

def join(group)
  @groups << group
  @connections.each do |conn|
    conn.send_command(Protocol::ZMTP::Codec::Command.join(group))
  end
end

#leave(group) ⇒ Object

Leaves a group.

Parameters:

  • group (String)


85
86
87
88
89
90
# File 'lib/omq/routing/dish.rb', line 85

def leave(group)
  @groups.delete(group)
  @connections.each do |conn|
    conn.send_command(Protocol::ZMTP::Codec::Command.leave(group))
  end
end

#unblock_recvvoid

This method returns an undefined value.

Wakes a blocked #dequeue_recv with a nil sentinel.



39
40
41
# File 'lib/omq/routing/dish.rb', line 39

def unblock_recv
  @recv_queue.enqueue(nil)
end