Class: OMQ::Routing::Dish

Inherits:
Object
Includes:
FairRecv
Defined in:
lib/omq/routing/dish.rb

Overview

DISH socket routing: group-based receive from RADIO peers.

Sends JOIN/LEAVE commands to connected RADIO peers. Receives two-frame messages (group + body) from RADIO.
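The two-frame layout described above can be illustrated with a minimal sketch. The helper split_radio_message is hypothetical and not part of OMQ; it assumes a received message arrives as an Array of frames, with the group first and the body second.

```ruby
# Hypothetical helper (not part of OMQ): splits a received RADIO message
# into its group frame and its body frame.
def split_radio_message(parts)
  # A RADIO message is expected to carry exactly two frames.
  unless parts.size == 2
    raise ArgumentError, "expected 2 frames (group + body), got #{parts.size}"
  end

  group, body = parts
  [group, body]
end

group, body = split_radio_message(["weather", "sunny"])
puts "#{group}: #{body}"
```

Rejecting any other frame count early keeps a malformed message from being mistaken for a valid group/body pair.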

Instance Attribute Summary

Instance Method Summary

Constructor Details

#initialize(engine) ⇒ Dish

Returns a new instance of Dish.

Parameters:

  • engine (Engine)


# File 'lib/omq/routing/dish.rb', line 15

def initialize(engine)
  @engine      = engine
  @connections = []
  @recv_queue  = FairQueue.new
  @groups      = Set.new
  @tasks       = []
end

Instance Attribute Details

#recv_queue ⇒ FairQueue (readonly)

Returns:

  • (FairQueue)


# File 'lib/omq/routing/dish.rb', line 26

def recv_queue
  @recv_queue
end

Instance Method Details

#connection_added(connection) ⇒ Object

Parameters:

  • connection (Connection)


# File 'lib/omq/routing/dish.rb', line 30

def connection_added(connection)
  @connections << connection
  @groups.each do |group|
    connection.send_command(Protocol::ZMTP::Codec::Command.join(group))
  end
  add_fair_recv_connection(connection)
end
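The method above replays every joined group to a newly added connection, so a RADIO peer that connects late still learns the current subscriptions. A simplified model of that behavior follows. RecordingConnection and DishModel are hypothetical stand-ins, not OMQ classes, and commands are represented as plain arrays instead of Protocol::ZMTP::Codec::Command objects.

```ruby
require "set"

# Hypothetical stand-in for a peer connection; it only records commands.
class RecordingConnection
  attr_reader :commands

  def initialize
    @commands = []
  end

  def send_command(command)
    @commands << command
  end
end

# Simplified model of the routing logic shown above (not the OMQ class).
class DishModel
  def initialize
    @connections = []
    @groups      = Set.new
  end

  def join(group)
    @groups << group
    @connections.each { |conn| conn.send_command([:join, group]) }
  end

  def connection_added(connection)
    @connections << connection
    # Replay every joined group so the late peer learns the subscriptions.
    @groups.each { |group| connection.send_command([:join, group]) }
  end
end

dish = DishModel.new
dish.join("weather")
dish.join("sports")

late_peer = RecordingConnection.new
dish.connection_added(late_peer)
p late_peer.commands # prints [[:join, "weather"], [:join, "sports"]]
```

Because Ruby's Set iterates in insertion order, the replayed JOIN commands arrive in the order the groups were joined.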

#connection_removed(connection) ⇒ Object

Parameters:

  • connection (Connection)


# File 'lib/omq/routing/dish.rb', line 41

def connection_removed(connection)
  @connections.delete(connection)
  @recv_queue.remove_queue(connection)
end

#enqueue(_parts) ⇒ Object

DISH is read-only: this method always raises, because DISH sockets cannot send.



# File 'lib/omq/routing/dish.rb', line 49

def enqueue(_parts)
  raise "DISH sockets cannot send"
end
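Since raise with a String argument raises a RuntimeError, a caller that might be handed a DISH routing object can guard against the failure as sketched below. ReadOnlyRouting and try_send are hypothetical names used only for illustration; the stand-in class mirrors nothing but the enqueue method shown above.

```ruby
# Hypothetical stand-in that mirrors only the enqueue method shown above.
class ReadOnlyRouting
  def enqueue(_parts)
    raise "DISH sockets cannot send"
  end
end

# Hypothetical caller: returns :sent on success, or the error message
# when the routing strategy refuses to send.
def try_send(routing, parts)
  routing.enqueue(parts)
  :sent
rescue RuntimeError => e
  e.message
end

puts try_send(ReadOnlyRouting.new, ["weather", "sunny"])
# prints "DISH sockets cannot send"
```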

#join(group) ⇒ Object

Joins a group.

Parameters:

  • group (String)


# File 'lib/omq/routing/dish.rb', line 58

def join(group)
  @groups << group
  @connections.each do |conn|
    conn.send_command(Protocol::ZMTP::Codec::Command.join(group))
  end
end

#leave(group) ⇒ Object

Leaves a group.

Parameters:

  • group (String)


# File 'lib/omq/routing/dish.rb', line 70

def leave(group)
  @groups.delete(group)
  @connections.each do |conn|
    conn.send_command(Protocol::ZMTP::Codec::Command.leave(group))
  end
end
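Both #join and #leave track membership in a Set, so the local bookkeeping is idempotent even though, as written above, a command is still sent to every connection on each call. The sketch below shows only the Set semantics; GroupBook is a hypothetical name and not part of OMQ.

```ruby
require "set"

# Hypothetical model of the group bookkeeping only (no connections involved).
class GroupBook
  def initialize
    @groups = Set.new
  end

  def join(group)
    @groups << group # adding an existing member leaves the set unchanged
    self
  end

  def leave(group)
    @groups.delete(group) # deleting an unknown member is a no-op
    self
  end

  def groups
    @groups.to_a
  end
end

book = GroupBook.new
book.join("weather").join("weather").join("sports")
book.leave("news") # never joined, so nothing changes
p book.groups # prints ["weather", "sports"]
```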

#stop ⇒ Object

Stops all background tasks.



# File 'lib/omq/routing/dish.rb', line 79

def stop
  @tasks.each(&:stop)
  @tasks.clear
end
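Because the task list is cleared after each task is stopped, a second call to #stop finds nothing left to do. The following sketch models that behavior; CountingTask and TaskOwner are hypothetical stand-ins, and the only assumption about a real task is that it responds to stop.

```ruby
# Hypothetical stand-in for a background task; it only counts stop calls.
class CountingTask
  attr_reader :stops

  def initialize
    @stops = 0
  end

  def stop
    @stops += 1
  end
end

# Simplified model of the stop logic shown above (not the OMQ class).
class TaskOwner
  def initialize(tasks)
    @tasks = tasks
  end

  def stop
    @tasks.each(&:stop)
    @tasks.clear
  end
end

tasks = [CountingTask.new, CountingTask.new]
owner = TaskOwner.new(tasks.dup) # dup so the local array survives the clear

owner.stop
owner.stop # the list is already empty, so no task is stopped twice
p tasks.map(&:stops) # prints [1, 1]
```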