Class: OMQ::Routing::Pull

Inherits:
Object
Defined in:
lib/omq/routing/pull.rb

Overview

PULL socket routing: fair-queue receive from PUSH peers.

Instance Attribute Summary

Instance Method Summary

Constructor Details

#initialize(engine) ⇒ Pull

Returns a new instance of Pull.

Parameters:

  • engine

# File 'lib/omq/routing/pull.rb', line 10

def initialize(engine)
  @engine     = engine
  @recv_queue = Routing.build_queue(engine.options.recv_hwm, :block)
  @tasks      = []
end
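
The constructor sizes the receive queue from recv_hwm, so a slow reader eventually stalls the producers instead of letting messages pile up. The following is only a sketch of that bounded-queue behaviour: it uses Ruby's stdlib SizedQueue as a stand-in (an assumption; Routing.build_queue is documented above as producing an Async::LimitedQueue), and the recv_hwm value of 2 is hypothetical.

```ruby
# Stand-in for the bounded receive queue. SizedQueue is an assumption
# used for illustration, not what Routing.build_queue returns.
recv_hwm   = 2 # hypothetical high-water mark
recv_queue = SizedQueue.new(recv_hwm)

recv_queue.push(["a"])
recv_queue.push(["b"])

# The queue is now at its high-water mark. A non-blocking push is
# refused here; a blocking push would wait for the reader instead.
full = begin
  recv_queue.push(["c"], true)
  false
rescue ThreadError
  true
end

first = recv_queue.pop  # the reader frees one slot
recv_queue.push(["c"])  # so this push completes without waiting
```

With a blocking policy, the producer side simply waits at the full queue until the reader dequeues.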

Instance Attribute Details

#recv_queue ⇒ Async::LimitedQueue (readonly)

Returns:

  • (Async::LimitedQueue)


# File 'lib/omq/routing/pull.rb', line 19

def recv_queue
  @recv_queue
end

Instance Method Details

#connection_added(connection) ⇒ Object

Parameters:

  • connection (Connection)


# File 'lib/omq/routing/pull.rb', line 44

def connection_added(connection)
  task = @engine.start_recv_pump(connection, @recv_queue)
  @tasks << task if task
end
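
Each added connection gets its own receive pump, and every pump feeds the same queue. A rough sketch of that fan-in shape follows, using stdlib threads and SizedQueue in place of Async tasks and Async::LimitedQueue. The connection names and messages are hypothetical, and the real start_recv_pump is not reproduced here.

```ruby
# Hypothetical stand-ins: each "connection" is a plain array of
# messages, and each pump is a Thread rather than an Async task.
recv_queue  = SizedQueue.new(4)
connections = {
  "push-1" => [["a1"], ["a2"]],
  "push-2" => [["b1"], ["b2"]],
}

# One pump per connection, mirroring connection_added: every pump
# forwards its peer's messages into the same shared queue.
tasks = connections.map do |_name, messages|
  Thread.new { messages.each { |parts| recv_queue.push(parts) } }
end

# The reader drains a single queue and never selects a peer itself.
received = Array.new(4) { recv_queue.pop }
tasks.each(&:join)
```

Messages from different peers may interleave in any order, but the messages from one peer stay in the order that peer sent them.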

#connection_removed(connection) ⇒ Object

Parameters:

  • connection (Connection)


# File 'lib/omq/routing/pull.rb', line 52

def connection_removed(connection)
  # recv pump stops on EOFError via its connection barrier
end

#dequeue_recv ⇒ Array<String>?

Dequeues the next received message, blocking until one is available. Returns nil when #unblock_recv has enqueued its sentinel. This is the Engine-facing contract: Engine must not touch @recv_queue directly.

Returns:

  • (Array<String>, nil)


# File 'lib/omq/routing/pull.rb', line 27

def dequeue_recv
  @recv_queue.dequeue
end

#enqueue(_parts) ⇒ Object

PULL is read-only: this method always raises.



# File 'lib/omq/routing/pull.rb', line 59

def enqueue(_parts)
  raise "PULL sockets cannot send"
end
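
Because raise is called with a bare string, the error is a RuntimeError. A minimal sketch of what a caller would see, using a hypothetical ReadOnlyRouting class that copies only the enqueue body shown above:

```ruby
# Hypothetical stand-in class; only the enqueue body matches the source.
class ReadOnlyRouting
  def enqueue(_parts)
    raise "PULL sockets cannot send"
  end
end

# Capture the error instead of letting it propagate.
error = begin
  ReadOnlyRouting.new.enqueue(["hello"])
  nil
rescue RuntimeError => e
  e
end
```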

#stop ⇒ void

This method returns an undefined value.

Stops all background tasks.



# File 'lib/omq/routing/pull.rb', line 68

def stop
  @tasks.each(&:stop)
  @tasks.clear
end

#unblock_recv ⇒ void

This method returns an undefined value.

Wakes a blocked #dequeue_recv with a nil sentinel. Called by Engine on close or fatal-error propagation.



# File 'lib/omq/routing/pull.rb', line 37

def unblock_recv
  @recv_queue.enqueue(nil)
end
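
The nil sentinel lets a reader blocked in #dequeue_recv tell "closed" apart from "message". The pattern can be sketched with a stdlib Queue and a Thread standing in for the Async queue and reader task (both stand-ins are assumptions, as are the message contents):

```ruby
recv_queue = Queue.new # stand-in for @recv_queue
received   = []

reader = Thread.new do
  # Equivalent of looping on dequeue_recv: a nil result ends the loop.
  while (parts = recv_queue.pop)
    received << parts
  end
  :closed
end

recv_queue.push(["job-1"])
recv_queue.push(["job-2"])
recv_queue.push(nil) # what unblock_recv does: enqueue the sentinel

result = reader.value # joins the reader and returns its last value
```

Since the sentinel travels through the same queue as the data, messages enqueued before it are still delivered before the reader exits.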