Class: OMQ::Routing::XPub

Inherits:
Object
Includes:
FanOut
Defined in:
lib/omq/routing/xpub.rb

Overview

XPUB socket routing: like PUB but exposes subscription messages.

Subscription and unsubscription messages from peers are delivered to the application as data frames: the byte \x01 followed by the prefix for subscribe, and the byte \x00 followed by the prefix for unsubscribe.
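
The frame layout described above can be decoded with a few lines of Ruby. This is only a sketch: decode_subscription_frame is a hypothetical helper written for illustration, not a method provided by OMQ, and it assumes the application receives each frame as a single String.

```ruby
# Hypothetical helper (not part of OMQ): splits a subscription frame
# into its command byte and topic prefix.
def decode_subscription_frame(frame)
  bytes = frame.b                       # work on a binary copy
  return nil if bytes.empty?

  prefix = bytes.byteslice(1, bytes.bytesize - 1)
  case bytes.getbyte(0)
  when 0x01 then [:subscribe, prefix]   # \x01 + prefix
  when 0x00 then [:unsubscribe, prefix] # \x00 + prefix
  end                                   # any other first byte yields nil
end

kind, prefix = decode_subscription_frame("\x01weather".b)
puts "#{kind} #{prefix.inspect}"
```

A frame consisting of only the command byte decodes to an empty prefix.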

The recv_queue is a simple bounded queue because messages come from subscription commands, not from peer data pumps.

Constant Summary

Constants included from FanOut

FanOut::EMPTY_BINARY

Instance Attribute Summary

Attributes included from FanOut

#subscriber_joined

Instance Method Summary

Methods included from FanOut

#send_queues_drained?

Constructor Details

#initialize(engine) ⇒ XPub

Returns a new instance of XPub.

Parameters:

  • engine

# File 'lib/omq/routing/xpub.rb', line 24

def initialize(engine)
  @engine     = engine
  @recv_queue = Routing.build_queue(engine.options.recv_hwm, :block)
  @tasks      = []

  init_fan_out(engine)
end

Instance Attribute Details

#recv_queue ⇒ Async::LimitedQueue (readonly)

Returns:

  • (Async::LimitedQueue)


# File 'lib/omq/routing/xpub.rb', line 19

def recv_queue
  @recv_queue
end

Instance Method Details

#connection_added(connection) ⇒ Object

Parameters:

  • connection (Connection)


# File 'lib/omq/routing/xpub.rb', line 47

def connection_added(connection)
  @connections << connection
  @subscriptions[connection] = Set.new
  start_subscription_listener(connection)
  add_fan_out_send_connection(connection)
end

#connection_removed(connection) ⇒ Object

Parameters:

  • connection (Connection)


# File 'lib/omq/routing/xpub.rb', line 57

def connection_removed(connection)
  @connections.delete(connection)
  @subscriptions.delete(connection)
  remove_fan_out_send_connection(connection)
end
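
The two callbacks above keep one Set of subscriptions per connection. The sketch below shows how such a table could be used to pick recipients for a message. SubscriptionTable and its methods are hypothetical names, and the prefix-matching rule is an assumption based on usual PUB/SUB semantics; the actual matching lives in FanOut, which is not shown here.

```ruby
require "set"

# Hypothetical illustration (not OMQ code): per-connection subscription
# prefixes, mirroring the @subscriptions hash in the listing above.
class SubscriptionTable
  def initialize
    @subscriptions = {}
  end

  # Mirrors connection_added: a new peer starts with no subscriptions.
  def add(connection)
    @subscriptions[connection] = Set.new
  end

  # Mirrors connection_removed: drop the peer and its prefixes.
  def remove(connection)
    @subscriptions.delete(connection)
  end

  def subscribe(connection, prefix)
    @subscriptions.fetch(connection) << prefix
  end

  def unsubscribe(connection, prefix)
    @subscriptions.fetch(connection).delete(prefix)
  end

  # Assumed rule: a peer matches when any of its prefixes starts the topic.
  def matching(topic)
    @subscriptions.select { |_, prefixes|
      prefixes.any? { |prefix| topic.start_with?(prefix) }
    }.keys
  end
end

table = SubscriptionTable.new
table.add(:peer_a)
table.add(:peer_b)
table.subscribe(:peer_a, "weather.")
table.subscribe(:peer_b, "")           # empty prefix matches every topic
p table.matching("weather.london")
```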

#dequeue_recv ⇒ Object

Engine-facing recv contract. Delegates to the bounded queue.



# File 'lib/omq/routing/xpub.rb', line 35

def dequeue_recv
  @recv_queue.dequeue
end

#enqueue(parts) ⇒ Object

Parameters:

  • parts (Array<String>)


# File 'lib/omq/routing/xpub.rb', line 66

def enqueue(parts)
  fan_out_enqueue(parts)
end

#stop ⇒ void

This method returns an undefined value.

Stops all background tasks.



# File 'lib/omq/routing/xpub.rb', line 75

def stop
  @tasks.each(&:stop)
  @tasks.clear
end

#unblock_recv ⇒ Object

Pushes nil onto the recv queue, presumably so that a caller blocked in #dequeue_recv wakes up.



# File 'lib/omq/routing/xpub.rb', line 40

def unblock_recv
  @recv_queue.push(nil)
end
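
Pushing nil, as unblock_recv does, is a common way to wake a consumer that is blocked on a queue. The sketch below shows that pattern with Ruby's standard SizedQueue standing in for Async::LimitedQueue. It assumes the consumer treats nil as a stop signal, which this page does not state, and drain_until_nil is a hypothetical helper.

```ruby
# Hypothetical consumer loop (not OMQ code): pops messages until it
# sees nil, which it treats as a request to stop.
def drain_until_nil(queue)
  received = []
  while (message = queue.pop)   # pop blocks while the queue is empty
    received << message
  end
  received
end

queue    = SizedQueue.new(4)    # bounded, like the recv queue
consumer = Thread.new { drain_until_nil(queue) }

queue.push("\x01weather".b)     # a subscription frame
queue.push(nil)                 # same idea as unblock_recv

p consumer.value                # joins the thread and prints what it saw
```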