Class: OMQ::Routing::Rep

Inherits:
Object
Defined in:
lib/omq/routing/rep.rb

Overview

REP socket routing: fair-queue receive, reply routed back to sender.

REP strips the routing envelope (everything up to and including the empty delimiter) on receive, saves it internally, and restores it on send.
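
The strip-and-restore step described above can be sketched in isolation. This is a minimal illustration that mirrors the splitting logic in #connection_added; `split_envelope` and `restore_envelope` are hypothetical helper names, not part of OMQ's API.

```ruby
# Hypothetical helpers for illustration; they are not part of OMQ's API.
EMPTY = "".b.freeze

# Split a multipart message at the first empty frame.
# Returns [envelope, body]; the delimiter itself is dropped.
# If there is no empty frame, the whole message is treated as envelope.
def split_envelope(msg)
  delimiter = msg.index { |part| part.empty? } || msg.size
  envelope  = msg[0, delimiter]
  body      = msg[(delimiter + 1)..] || []
  [envelope, body]
end

# Rebuild the wire message for a reply: envelope, empty delimiter, reply parts.
def restore_envelope(envelope, parts)
  envelope + [EMPTY] + parts
end

request        = ["id-1".b, "".b, "hello".b]
envelope, body = split_envelope(request)
reply          = restore_envelope(envelope, ["world".b])
```

Here `body` is what the application would see, and `reply` is what would go back on the wire for that request.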

Constant Summary

EMPTY_FRAME =
"".b.freeze

Constructor Details

#initialize(engine) ⇒ Rep

Returns a new instance of Rep.

Parameters:

  • engine

# File 'lib/omq/routing/rep.rb', line 22

def initialize(engine)
  @engine          = engine
  @recv_queue      = Routing.build_queue(engine.options.recv_hwm, :block)
  @pending_replies = []
  @conn_queues     = {}
end

Instance Attribute Details

#recv_queue ⇒ Async::LimitedQueue (readonly)

Returns:

  • (Async::LimitedQueue)


# File 'lib/omq/routing/rep.rb', line 17

def recv_queue
  @recv_queue
end

Instance Method Details

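#connection_added(connection) ⇒ Object

Starts a receive pump for the connection that strips the routing envelope from each incoming message and records it in @pending_replies, then creates the connection's send queue and starts its send pump.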

Parameters:

  • connection (Connection)


# File 'lib/omq/routing/rep.rb', line 50

def connection_added(connection)
  @engine.start_recv_pump(connection, @recv_queue) do |msg|
    delimiter = msg.index { |p| p.empty? } || msg.size
    envelope  = msg[0, delimiter]
    body      = msg[(delimiter + 1)..] || []

    @pending_replies << [connection, envelope]
    body
  end

  q = Routing.build_queue(@engine.options.send_hwm, :block)
  @conn_queues[connection] = q
  ConnSendPump.start(@engine, connection, q)
end

#connection_removed(connection) ⇒ Object

Discards any pending replies owed to the connection and removes its send queue.

Parameters:

  • connection (Connection)


# File 'lib/omq/routing/rep.rb', line 68

def connection_removed(connection)
  @pending_replies.reject! { |r| r[0] == connection }
  @conn_queues.delete(connection)
end

#dequeue_recv ⇒ Array<String>?

Dequeues the next received message. Blocks until one is available. Returns nil when woken by #unblock_recv.

Returns:

  • (Array<String>, nil)


# File 'lib/omq/routing/rep.rb', line 34

def dequeue_recv
  @recv_queue.dequeue
end

#enqueue(parts) ⇒ Object

Enqueues a reply. Routes it to the connection that sent the matching request by consuming the oldest entry in @pending_replies. If no request is pending, the reply is dropped silently.

Parameters:

  • parts (Array<String>)


# File 'lib/omq/routing/rep.rb', line 79

def enqueue(parts)
  reply_info = @pending_replies.shift
  return unless reply_info

  conn, envelope = reply_info
  msg = envelope
  msg << EMPTY_FRAME
  msg.concat(parts)
  @conn_queues[conn]&.enqueue(msg)
end
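
The first-in, first-out matching of replies to requests can be sketched with plain Ruby objects. This is an illustration only: symbols stand in for connections, arrays stand in for the per-connection send queues, and `route_reply` is a hypothetical name.

```ruby
# Stand-ins for illustration only: symbols play the role of connections and
# plain arrays play the role of the per-connection send queues.
pending_replies = []
conn_queues     = { conn_a: [], conn_b: [] }

# Two requests arrive, first from conn_a, then from conn_b.
pending_replies << [:conn_a, ["id-a"]]
pending_replies << [:conn_b, ["id-b"]]

route_reply = lambda do |parts|
  conn, envelope = pending_replies.shift
  next false unless conn # no pending request: the reply is dropped

  # Restore the envelope and the empty delimiter in front of the reply parts.
  conn_queues[conn]&.push(envelope + [""] + parts)
  true
end

route_reply.call(["first"])  # routed to conn_a
route_reply.call(["second"]) # routed to conn_b
dropped = !route_reply.call(["third"]) # nothing pending, so it is dropped
```

Because entries are consumed in arrival order, this scheme relies on the application replying in the same order in which it dequeued the requests.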

#send_queues_drained? ⇒ Boolean

Returns true when all per-connection send queues are empty.

Returns:

  • (Boolean)

    true when all per-connection send queues are empty



# File 'lib/omq/routing/rep.rb', line 93

def send_queues_drained?
  @conn_queues.values.all?(&:empty?)
end
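
One consequence of the `all?` check is worth spelling out: with no connections at all, the result is true, since `all?` on an empty collection is vacuously true. A small sketch, using arrays as stand-in queues and a hypothetical `drained` lambda:

```ruby
# Arrays stand in for the per-connection send queues; both respond to #empty?.
drained = ->(conn_queues) { conn_queues.values.all?(&:empty?) }

no_connections = drained.call({})                      # vacuously true
all_empty      = drained.call({ a: [], b: [] })        # every queue is empty
one_busy       = drained.call({ a: [], b: [["msg"]] }) # one queue still holds a message
```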

#unblock_recv ⇒ void

This method returns an undefined value.

Wakes a blocked #dequeue_recv with a nil sentinel.



# File 'lib/omq/routing/rep.rb', line 43

def unblock_recv
  @recv_queue.enqueue(nil)
end
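
The nil-sentinel pattern used by #unblock_recv can be sketched with Ruby's core `Queue`. This is an analogy under a stated assumption: `Queue` stands in for the Async::LimitedQueue used by this class, and a thread stands in for the blocked consumer.

```ruby
# Ruby's core Queue stands in for the receive queue here; the class
# documented above uses an Async::LimitedQueue instead.
queue = Queue.new

consumer = Thread.new do
  received = []
  # Queue#pop blocks until an item arrives; a nil item ends the loop.
  while (msg = queue.pop)
    received << msg
  end
  received
end

queue.push(["hello"])
queue.push(nil) # sentinel: wakes the consumer and tells it to stop
result = consumer.value
```

A caller of #dequeue_recv would treat a nil return the same way: as a signal to stop receiving rather than as a message.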