Class: OMQ::Routing::Gather

Inherits:
Object
  • Object
show all
Defined in:
lib/omq/routing/gather.rb

Overview

GATHER socket routing: fair-queue receive from SCATTER peers.

Instance Attribute Summary collapse

Instance Method Summary collapse

Constructor Details

#initialize(engine) ⇒ Gather

Returns a new instance of Gather.

Parameters:



15
16
17
18
# File 'lib/omq/routing/gather.rb', line 15

def initialize(engine)
  @engine     = engine
  @recv_queue = Routing.build_queue(engine.options.recv_hwm, :block)
end

Instance Attribute Details

#recv_queueAsync::LimitedQueue (readonly)

Returns:

  • (Async::LimitedQueue)


10
11
12
# File 'lib/omq/routing/gather.rb', line 10

def recv_queue
  @recv_queue
end

Instance Method Details

#connection_added(connection) ⇒ Object

Parameters:

  • connection (Protocol::ZMTP::Connection)


41
42
43
# File 'lib/omq/routing/gather.rb', line 41

def connection_added(connection)
  @engine.start_recv_pump(connection, @recv_queue)
end

#connection_removed(connection) ⇒ Object

Parameters:

  • connection (Protocol::ZMTP::Connection)


48
49
# File 'lib/omq/routing/gather.rb', line 48

def connection_removed(connection)
end

#dequeue_recvArray<String>?

Dequeues the next received message. Blocks until one is available.

Returns:

  • (Array<String>, nil)


25
26
27
# File 'lib/omq/routing/gather.rb', line 25

def dequeue_recv
  @recv_queue.dequeue
end

#enqueue(_parts) ⇒ Object

GATHER is read-only.



54
55
56
# File 'lib/omq/routing/gather.rb', line 54

def enqueue(_parts)
  raise "GATHER sockets cannot send"
end

#unblock_recvvoid

This method returns an undefined value.

Wakes a blocked #dequeue_recv with a nil sentinel.



34
35
36
# File 'lib/omq/routing/gather.rb', line 34

def unblock_recv
  @recv_queue.enqueue(nil)
end