Class: OMQ::Routing::Dish

Inherits:
Object
Defined in:
lib/omq/routing/dish.rb

Overview

DISH socket routing: group-based receive from RADIO peers.

Sends JOIN/LEAVE commands to connected RADIO peers. Receives two-frame messages (group + body) from RADIO.
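
As a rough illustration of the two-frame shape described above, the sketch below splits a received message into its group and body. It assumes the message arrives as a two-element array of strings (group first, body second) and that nil means "no message". The helper name `split_radio_message` is hypothetical and not part of OMQ.

```ruby
# Hypothetical helper, not part of OMQ: splits a two-frame RADIO message
# into its group and body. Assumes the frames arrive as [group, body].
def split_radio_message(parts)
  return nil if parts.nil? # nil is treated as "no message"

  group, body = parts
  { group: group, body: body }
end
```

A caller could pass the result of #dequeue_recv straight into such a helper and branch on a nil return.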

Constructor Details

#initialize(engine) ⇒ Dish

Returns a new instance of Dish.

Parameters:

  • engine (Engine)


# File 'lib/omq/routing/dish.rb', line 13

def initialize(engine)
  @engine      = engine
  @connections = []
  @recv_queue  = Routing.build_queue(engine.options.recv_hwm, :block)
  @groups      = Set.new
  @tasks       = []
end

Instance Attribute Details

#recv_queue ⇒ Async::LimitedQueue (readonly)

Returns:

  • (Async::LimitedQueue)


# File 'lib/omq/routing/dish.rb', line 24

def recv_queue
  @recv_queue
end

Instance Method Details

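#connection_added(connection) ⇒ Object

Registers a connection, sends it a JOIN command for every group already joined, and starts a receive pump that feeds the receive queue.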

Parameters:

  • connection (Connection)


# File 'lib/omq/routing/dish.rb', line 47

def connection_added(connection)
  @connections << connection
  @groups.each do |group|
    connection.send_command(Protocol::ZMTP::Codec::Command.join(group))
  end
  task = @engine.start_recv_pump(connection, @recv_queue)
  @tasks << task if task
end
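
The replay of joined groups to a newly added connection can be sketched with a toy model. Everything here is an assumption made for illustration: `RecordingConnection` and `ToyDish` are hypothetical stand-ins, commands are plain arrays rather than ZMTP command objects, and the receive pump is left out.

```ruby
require "set"

# Hypothetical stand-in for a connection: records commands instead of sending them.
class RecordingConnection
  attr_reader :commands

  def initialize
    @commands = []
  end

  def send_command(command)
    @commands << command
  end
end

# Toy model of the group replay only; not the OMQ class.
class ToyDish
  def initialize
    @connections = []
    @groups      = Set.new
  end

  def join(group)
    @groups << group
    @connections.each { |conn| conn.send_command([:join, group]) }
  end

  def connection_added(connection)
    @connections << connection
    # Replay every group joined before this peer connected.
    @groups.each { |group| connection.send_command([:join, group]) }
  end
end

def replay_demo
  dish = ToyDish.new
  dish.join("weather")        # no peers yet, so nothing is sent
  late = RecordingConnection.new
  dish.connection_added(late) # "weather" is replayed here
  dish.join("sports")         # sent immediately to the connected peer
  late.commands
end
```

In this model a peer that connects late still learns about every group joined earlier, which is the effect of the `@groups.each` loop in the listing above.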

#connection_removed(connection) ⇒ Object

Removes the connection from the list of connected peers.

Parameters:

  • connection (Connection)


# File 'lib/omq/routing/dish.rb', line 59

def connection_removed(connection)
  @connections.delete(connection)
end

#dequeue_recv ⇒ Array<String>?

Dequeues the next received message, blocking until one is available. Returns nil when woken by #unblock_recv.

Returns:

  • (Array<String>, nil)


# File 'lib/omq/routing/dish.rb', line 31

def dequeue_recv
  @recv_queue.dequeue
end

#enqueue(_parts) ⇒ Object

DISH is read-only: this method always raises a RuntimeError.



# File 'lib/omq/routing/dish.rb', line 66

def enqueue(_parts)
  raise "DISH sockets cannot send"
end
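
Because `raise` with a bare string raises a RuntimeError, a caller that might reach this method can rescue that class. The sketch below is a minimal illustration: `ReadOnlyRouting` is a hypothetical stand-in that copies only the #enqueue body shown above, and `try_send` is an invented helper.

```ruby
# Hypothetical stand-in that mirrors only the #enqueue body shown above.
class ReadOnlyRouting
  def enqueue(_parts)
    raise "DISH sockets cannot send"
  end
end

# Invented helper: returns :sent on success, or the error message on failure.
def try_send(routing, parts)
  routing.enqueue(parts)
  :sent
rescue RuntimeError => e
  e.message
end
```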

#join(group) ⇒ Object

Joins a group and sends a JOIN command to every connected peer.

Parameters:

  • group (String)


# File 'lib/omq/routing/dish.rb', line 75

def join(group)
  @groups << group
  @connections.each do |conn|
    conn.send_command(Protocol::ZMTP::Codec::Command.join(group))
  end
end

#leave(group) ⇒ Object

Leaves a group and sends a LEAVE command to every connected peer.

Parameters:

  • group (String)


# File 'lib/omq/routing/dish.rb', line 87

def leave(group)
  @groups.delete(group)
  @connections.each do |conn|
    conn.send_command(Protocol::ZMTP::Codec::Command.leave(group))
  end
end
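
The interplay between the group set and the commands sent can be sketched as follows. This is a toy model under stated assumptions: `CommandLog` and `GroupBook` are hypothetical names, and commands are plain arrays instead of ZMTP command objects. It shows one consequence of the listings above: the Set ignores a duplicate join, but a command is still sent on every call.

```ruby
require "set"

# Hypothetical stand-in for a connection: records commands instead of sending them.
class CommandLog
  attr_reader :commands

  def initialize
    @commands = []
  end

  def send_command(command)
    @commands << command
  end
end

# Toy model of the join/leave bookkeeping; not the OMQ class.
class GroupBook
  attr_reader :groups

  def initialize(connections)
    @connections = connections
    @groups      = Set.new
  end

  def join(group)
    @groups << group
    @connections.each { |conn| conn.send_command([:join, group]) }
  end

  def leave(group)
    @groups.delete(group)
    @connections.each { |conn| conn.send_command([:leave, group]) }
  end
end

def join_leave_demo
  log  = CommandLog.new
  book = GroupBook.new([log])
  book.join("weather")
  book.join("weather")  # the Set ignores the duplicate, but JOIN is sent again
  book.leave("weather")
  [book.groups.to_a, log.commands]
end
```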

#stop ⇒ Object

Stops all background tasks and clears the task list.



# File 'lib/omq/routing/dish.rb', line 96

def stop
  @tasks.each(&:stop)
  @tasks.clear
end

#unblock_recv ⇒ void

This method returns an undefined value.

Wakes a blocked #dequeue_recv with a nil sentinel.



# File 'lib/omq/routing/dish.rb', line 40

def unblock_recv
  @recv_queue.enqueue(nil)
end
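
The nil-sentinel pattern can be sketched with Ruby's standard `Queue` and a thread. This is an analogy only: `Queue` stands in for Async::LimitedQueue, a thread stands in for the blocked reader, and both helper names are hypothetical.

```ruby
# Standard-library Queue stands in for Async::LimitedQueue; only the
# sentinel idea carries over. Both helper names are hypothetical.
def drain_until_sentinel(queue)
  received = []
  # Queue#pop blocks until an item arrives; a nil item ends the loop.
  while (message = queue.pop)
    received << message
  end
  received
end

def sentinel_demo
  queue    = Queue.new
  consumer = Thread.new { drain_until_sentinel(queue) }
  queue << ["weather", "sunny"] # a normal two-frame message
  queue << nil                  # the sentinel wakes the consumer and stops it
  consumer.value                # joins the thread and returns its result
end
```

A consumer written this way needs no separate shutdown flag, as long as nil is never a valid message.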