Class: OMQ::Routing::Sub

Inherits:
Object
  • Object
show all
Defined in:
lib/omq/routing/sub.rb

Overview

SUB socket routing: subscription-based receive from PUB peers.

Sends SUBSCRIBE/CANCEL commands to connected PUB peers.

Instance Attribute Summary collapse

Instance Method Summary collapse

Constructor Details

#initialize(engine) ⇒ Sub

Returns a new instance of Sub.

Parameters:

  • engine

18
19
20
21
22
23
24
# File 'lib/omq/routing/sub.rb', line 18

# Sets up the routing state for a SUB socket: the owning engine, the
# receive queue, the connection and subscription sets, and the task list.
#
# @param engine [Object] engine this routing strategy belongs to; it is
#   stored in @engine and consulted by other methods of the class
def initialize(engine)
  @engine = engine

  # Receive side: one FairQueue, created empty.
  @recv_queue = FairQueue.new

  # Bookkeeping collections, all empty until peers and topics are added.
  @connections   = Set.new
  @subscriptions = Set.new
  @tasks         = []
end

Instance Attribute Details

#recv_queue ⇒ FairQueue (readonly)

Returns:

  • (FairQueue)

13
14
15
# File 'lib/omq/routing/sub.rb', line 13

# Read-only accessor for the receive queue.
#
# @return [FairQueue] the object held in @recv_queue
def recv_queue
  @recv_queue
end

Instance Method Details

#connection_added(connection) ⇒ Object

Parameters:

  • connection (Connection)


41
42
43
44
45
46
47
48
49
50
51
# File 'lib/omq/routing/sub.rb', line 41

# Registers a newly connected peer: tracks it, replays the current
# subscriptions to it, gives it its own receive queue, and starts a
# receive pump for it.
#
# @param connection [Connection] the peer connection that was just added
# @return [Array, nil] the task list when a pump task was recorded,
#   otherwise nil
def connection_added(connection)
  @connections.add(connection)

  # Bring the new peer up to date with every prefix subscribed so far.
  @subscriptions.each do |topic|
    command = Protocol::ZMTP::Codec::Command.subscribe(topic)
    connection.send_command(command)
  end

  # Per-connection queue, sized and configured from the engine options.
  high_water_mark = @engine.options.recv_hwm
  mute_policy     = @engine.options.on_mute
  peer_queue      = Routing.build_queue(high_water_mark, mute_policy)

  # The pump writes through a SignalingQueue that wraps the per-connection
  # queue together with the shared receive queue.
  pump_target = SignalingQueue.new(peer_queue, @recv_queue)
  @recv_queue.add_queue(connection, peer_queue)

  # start_recv_pump may return nil/false; only real tasks are tracked.
  pump = @engine.start_recv_pump(connection, pump_target)
  @tasks.push(pump) if pump
end

#connection_removed(connection) ⇒ Object

Parameters:

  • connection (Connection)


56
57
58
59
# File 'lib/omq/routing/sub.rb', line 56

# Forgets a peer: removes it from the tracked connection set and detaches
# its queue from the receive queue.
#
# NOTE(review): nothing here stops or drops the recv-pump task that
# connection_added stored in @tasks for this connection — confirm the
# engine takes care of that.
#
# @param connection [Connection] the peer connection being removed
# @return [Object] whatever @recv_queue.remove_queue returns
def connection_removed(connection)
  @connections.delete(connection)
  @recv_queue.remove_queue(connection)
end

#dequeue_recv ⇒ Object

Engine-facing recv contract. Delegates to the FairQueue.



29
30
31
# File 'lib/omq/routing/sub.rb', line 29

# Engine-facing receive entry point; delegates to the receive queue.
#
# @return [Object] whatever @recv_queue.dequeue returns
def dequeue_recv
  @recv_queue.dequeue
end

#enqueue(_parts) ⇒ Object

SUB is read-only.



64
65
66
# File 'lib/omq/routing/sub.rb', line 64

# Rejects every send attempt: a SUB socket is read-only.
#
# @param _parts [Object] ignored
# @raise [RuntimeError] always, with the message "SUB sockets cannot send"
def enqueue(_parts)
  raise "SUB sockets cannot send"
end

#stop ⇒ void

This method returns an undefined value.

Stops all background tasks.



97
98
99
100
# File 'lib/omq/routing/sub.rb', line 97

# Stops every background task recorded in @tasks, then empties the list.
#
# @return [void]
def stop
  # Array#each hands back its receiver, so the list can be cleared in the
  # same chain once every task has been told to stop.
  @tasks.each(&:stop).clear
end

#subscribe(prefix) ⇒ Object

Subscribes to a topic prefix.

Parameters:

  • prefix (String)


73
74
75
76
77
78
# File 'lib/omq/routing/sub.rb', line 73

# Subscribes to a topic prefix: records it locally and sends a SUBSCRIBE
# command for it to every tracked connection.
#
# @param prefix [String] topic prefix to subscribe to
# @return [Object] the tracked connection collection
def subscribe(prefix)
  @subscriptions.add(prefix)

  @connections.each do |peer|
    # A fresh command is built for each peer.
    command = Protocol::ZMTP::Codec::Command.subscribe(prefix)
    peer.send_command(command)
  end
end

#unblock_recv ⇒ Object



34
35
36
# File 'lib/omq/routing/sub.rb', line 34

# Pushes a nil entry onto the receive queue.
#
# NOTE(review): going by the method name, the nil presumably wakes a
# consumer blocked in dequeue_recv — confirm against FairQueue.
#
# @return [Object] whatever @recv_queue.push returns
def unblock_recv
  @recv_queue.push(nil)
end

#unsubscribe(prefix) ⇒ Object

Unsubscribes from a topic prefix.

Parameters:

  • prefix (String)


85
86
87
88
89
90
# File 'lib/omq/routing/sub.rb', line 85

# Unsubscribes from a topic prefix: drops it from the local subscription
# set and sends a CANCEL command for it to every tracked connection.
# The CANCEL is sent whether or not the prefix was previously recorded.
#
# @param prefix [String] topic prefix to unsubscribe from
# @return [Object] the tracked connection collection
def unsubscribe(prefix)
  @subscriptions.delete(prefix)

  @connections.each do |peer|
    # A fresh command is built for each peer.
    command = Protocol::ZMTP::Codec::Command.cancel(prefix)
    peer.send_command(command)
  end
end