Class: OMQ::Routing::Pair

Inherits:
Object
  • Object
show all
Defined in:
lib/omq/routing/pair.rb

Overview

PAIR socket routing: exclusive 1-to-1 bidirectional.

Only one peer connection is allowed at a time. The send queue is socket-level (one shared bounded queue), and a single send pump fiber drains it into the connected peer. On disconnect, the in-flight batch is dropped (matching libzmq).

Instance Attribute Summary collapse

Instance Method Summary collapse

Constructor Details

#initialize(engine) ⇒ Pair

Returns a new instance of Pair.

Parameters:



20
21
22
23
24
25
# File 'lib/omq/routing/pair.rb', line 20

def initialize(engine)
  @engine     = engine
  @connection = nil
  @recv_queue = Routing.build_queue(engine.options.recv_hwm, :block)
  @send_queue = Routing.build_queue(engine.options.send_hwm, :block)
end

Instance Attribute Details

#recv_queueAsync::LimitedQueue (readonly)

Returns:

  • (Async::LimitedQueue)


15
16
17
# File 'lib/omq/routing/pair.rb', line 15

def recv_queue
  @recv_queue
end

Instance Method Details

#connection_added(connection) ⇒ Object

Parameters:

  • connection (Protocol::ZMTP::Connection)

Raises:

  • (RuntimeError)

    if a connection already exists



49
50
51
52
53
54
55
56
57
58
# File 'lib/omq/routing/pair.rb', line 49

def connection_added(connection)
  raise "PAIR allows only one peer" if @connection
  @connection = connection

  @engine.start_recv_pump(connection, @recv_queue)

  unless connection.is_a?(Transport::Inproc::Pipe)
    start_send_pump(connection)
  end
end

#connection_removed(connection) ⇒ Object

Parameters:

  • connection (Protocol::ZMTP::Connection)


63
64
65
# File 'lib/omq/routing/pair.rb', line 63

def connection_removed(connection)
  @connection = nil if @connection == connection
end

#dequeue_recvArray<String>?

Dequeues the next received message. Blocks until one is available.

Returns:

  • (Array<String>, nil)


32
33
34
# File 'lib/omq/routing/pair.rb', line 32

def dequeue_recv
  @recv_queue.dequeue
end

#enqueue(parts) ⇒ Object

Parameters:

  • parts (Array<String>)


70
71
72
73
74
75
76
77
# File 'lib/omq/routing/pair.rb', line 70

def enqueue(parts)
  conn = @connection
  if conn.is_a?(Transport::Inproc::Pipe) && conn.direct_recv_queue
    conn.send_message(parts)
  else
    @send_queue.enqueue(parts)
  end
end

#send_queues_drained?Boolean

Returns true when the shared send queue is empty.

Returns:

  • (Boolean)

    true when the shared send queue is empty



82
83
84
# File 'lib/omq/routing/pair.rb', line 82

def send_queues_drained?
  @send_queue.empty?
end

#unblock_recvvoid

This method returns an undefined value.

Wakes a blocked #dequeue_recv with a nil sentinel.



41
42
43
# File 'lib/omq/routing/pair.rb', line 41

def unblock_recv
  @recv_queue.enqueue(nil)
end