Class: OMQ::Routing::Rep

Inherits:
Object
Defined in:
lib/omq/routing/rep.rb

Overview

REP socket routing: fair-queue receive, reply routed back to sender.

REP strips the routing envelope (everything up to and including the empty delimiter) on receive, saves it internally, and restores it on send.
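The strip-and-restore cycle can be sketched in plain Ruby with no OMQ dependency. The helper names `split_envelope` and `restore_envelope` are hypothetical and exist only for illustration; the slicing follows the receive block listed under #connection_added and the reply assembly listed under #enqueue.

```ruby
# Hypothetical helper: split a multipart message into [envelope, body].
# The envelope is every frame before the first empty frame; the empty
# delimiter itself is dropped from both halves.
def split_envelope(msg)
  delimiter = msg.index { |frame| frame.empty? } || msg.size
  envelope  = msg[0, delimiter]
  body      = msg[(delimiter + 1)..] || []
  [envelope, body]
end

# Hypothetical helper: rebuild the wire message for a reply by putting
# the saved envelope and an empty delimiter in front of the reply frames.
def restore_envelope(envelope, parts)
  [*envelope, "".b, *parts]
end

request        = ["peer-1", "", "ping"]
envelope, body = split_envelope(request)
reply          = restore_envelope(envelope, ["pong"])

# A message with no empty frame is treated entirely as envelope.
no_delimiter = split_envelope(["ping"])
```

With this slicing, a message that contains no empty delimiter yields an empty body, because every frame counts as envelope.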

Constant Summary

EMPTY_FRAME =
"".b.freeze

Constructor Details

#initialize(engine) ⇒ Rep

Returns a new instance of Rep.

Parameters:

  • engine

# File 'lib/omq/routing/rep.rb', line 22

def initialize(engine)
  @engine          = engine
  @recv_queue      = Routing.build_queue(engine.options.recv_hwm, :block)
  @pending_replies = []
  @conn_queues     = {}  # connection => per-connection send queue
  @conn_send_tasks = {}  # connection => send pump task
  @tasks           = []
end

Instance Attribute Details

#recv_queue ⇒ Async::LimitedQueue (readonly)

Returns:

  • (Async::LimitedQueue)


# File 'lib/omq/routing/rep.rb', line 17

def recv_queue
  @recv_queue
end

Instance Method Details

#connection_added(connection) ⇒ Object

Parameters:

  • connection (Connection)


# File 'lib/omq/routing/rep.rb', line 52

def connection_added(connection)
  task = @engine.start_recv_pump(connection, @recv_queue) do |msg|
    delimiter = msg.index { |p| p.empty? } || msg.size
    envelope  = msg[0, delimiter]
    body      = msg[(delimiter + 1)..] || []

    @pending_replies << { conn: connection, envelope: envelope }
    body
  end
  @tasks << task if task

  q = Routing.build_queue(@engine.options.send_hwm, :block)
  @conn_queues[connection] = q
  @conn_send_tasks[connection] = ConnSendPump.start(@engine, connection, q, @tasks)
end

#connection_removed(connection) ⇒ Object

Parameters:

  • connection (Connection)


# File 'lib/omq/routing/rep.rb', line 71

def connection_removed(connection)
  @pending_replies.reject! { |r| r[:conn] == connection }
  @conn_queues.delete(connection)
  @conn_send_tasks.delete(connection)&.stop
end

#dequeue_recv ⇒ Array<String>?

Dequeues the next received message. Blocks until one is available.

Returns:

  • (Array<String>, nil)

    the next message, or nil when woken by #unblock_recv


# File 'lib/omq/routing/rep.rb', line 36

def dequeue_recv
  @recv_queue.dequeue
end

#enqueue(parts) ⇒ Object

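Enqueues a reply. Routes it to the connection that sent the matching request by consuming the oldest entry in @pending_replies, then prepends the saved envelope and an empty delimiter frame. If no request is pending, or the connection's send queue is gone, the reply is dropped.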

Parameters:

  • parts (Array<String>)


# File 'lib/omq/routing/rep.rb', line 83

def enqueue(parts)
  reply_info = @pending_replies.shift
  return unless reply_info
  conn = reply_info[:conn]
  @conn_queues[conn]&.enqueue([*reply_info[:envelope], EMPTY_FRAME, *parts])
end
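The first-in, first-out pairing of replies with requests can be sketched as follows. This is an illustration under stated assumptions, not the library's code: plain arrays stand in for the per-connection send queues, the symbols `:conn_a` and `:conn_b` stand in for connection objects, and the `send_reply` lambda returns true or false only to make the outcome visible (the listed method does not promise a return value).

```ruby
# Stand-ins for the router's state (assumption: arrays replace real queues).
pending_replies = []
conn_queues     = { conn_a: [], conn_b: [] }

# Receive side: remember who asked, in arrival order.
pending_replies << { conn: :conn_a, envelope: ["id-a"] }
pending_replies << { conn: :conn_b, envelope: [] }

# Send side: each reply consumes the oldest pending entry.
send_reply = lambda do |parts|
  info = pending_replies.shift
  return false unless info          # no request pending: reply is dropped
  queue = conn_queues[info[:conn]]
  return false unless queue         # connection already removed
  queue << [*info[:envelope], "".b, *parts]
  true
end

first  = send_reply.call(["reply-1"])  # goes to :conn_a
second = send_reply.call(["reply-2"])  # goes to :conn_b
third  = send_reply.call(["reply-3"])  # nothing pending, dropped
```

Because the pairing is purely positional, replies have to be enqueued in the same order the requests were dequeued.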

#send_queues_drained? ⇒ Boolean

Returns true when all per-connection send queues are empty.

Returns:

  • (Boolean)

    true when all per-connection send queues are empty



# File 'lib/omq/routing/rep.rb', line 103

def send_queues_drained?
  @conn_queues.values.all?(&:empty?)
end

#stop ⇒ void

This method returns an undefined value.

Stops all background tasks.



# File 'lib/omq/routing/rep.rb', line 95

def stop
  @tasks.each(&:stop)
  @tasks.clear
end

#unblock_recv ⇒ void

This method returns an undefined value.

Wakes a blocked #dequeue_recv with a nil sentinel.



# File 'lib/omq/routing/rep.rb', line 45

def unblock_recv
  @recv_queue.enqueue(nil)
end
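The nil-sentinel pattern can be sketched with Ruby's standard `Thread::Queue`. This is an assumption for illustration only: the class itself uses an Async::LimitedQueue under a fiber scheduler, whereas the sketch uses threads so that it runs without extra gems.

```ruby
queue = Thread::Queue.new    # stdlib stand-in for the receive queue

consumer = Thread.new do
  received = []
  loop do
    msg = queue.pop          # blocks until something is enqueued
    break if msg.nil?        # nil sentinel: stop waiting
    received << msg
  end
  received
end

queue << ["hello"]           # an ordinary message
queue << nil                 # plays the role of #unblock_recv
result = consumer.value      # joins the thread and returns `received`
```

A caller of #dequeue_recv therefore has to treat nil as "woken up, no message" and not as an empty message.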