Class: OMQ::Routing::Rep

Inherits:
Object
  • Object
show all
Includes:
FairRecv
Defined in:
lib/omq/routing/rep.rb

Overview

REP socket routing: fair-queue receive, reply routed back to sender.

REP strips the routing envelope (everything up to and including the empty delimiter) on receive, saves it internally, and restores it on send.

Constant Summary collapse

EMPTY_FRAME =
"".b.freeze

Instance Attribute Summary collapse

Instance Method Summary collapse

Methods included from FairRecv

#dequeue_recv, #unblock_recv

Constructor Details

#initialize(engine) ⇒ Rep

Returns a new instance of Rep.

Parameters:



24
25
26
27
28
29
30
31
# File 'lib/omq/routing/rep.rb', line 24

def initialize(engine)
  @engine          = engine
  @recv_queue      = FairQueue.new
  @pending_replies = []
  @conn_queues     = {}  # connection => per-connection send queue
  @conn_send_tasks = {}  # connection => send pump task
  @tasks           = []
end

Instance Attribute Details

#recv_queueFairQueue (readonly)

Returns:



19
20
21
# File 'lib/omq/routing/rep.rb', line 19

def recv_queue
  @recv_queue
end

Instance Method Details

#connection_added(connection) ⇒ Object

Parameters:

  • connection (Connection)


36
37
38
39
40
41
42
43
44
45
46
47
48
49
# File 'lib/omq/routing/rep.rb', line 36

def connection_added(connection)
  add_fair_recv_connection(connection) do |msg|
    delimiter = msg.index { |p| p.empty? } || msg.size
    envelope  = msg[0, delimiter]
    body      = msg[(delimiter + 1)..] || []

    @pending_replies << { conn: connection, envelope: envelope }
    body
  end

  q = Routing.build_queue(@engine.options.send_hwm, :block)
  @conn_queues[connection] = q
  @conn_send_tasks[connection] = ConnSendPump.start(@engine, connection, q, @tasks)
end

#connection_removed(connection) ⇒ Object

Parameters:

  • connection (Connection)


54
55
56
57
58
59
# File 'lib/omq/routing/rep.rb', line 54

def connection_removed(connection)
  @pending_replies.reject! { |r| r[:conn] == connection }
  @recv_queue.remove_queue(connection)
  @conn_queues.delete(connection)
  @conn_send_tasks.delete(connection)&.stop
end

#enqueue(parts) ⇒ Object

Enqueues a reply. Routes to the connection that sent the matching request by consuming the next pending_reply entry.

Parameters:

  • parts (Array<String>)


67
68
69
70
71
72
# File 'lib/omq/routing/rep.rb', line 67

def enqueue(parts)
  reply_info = @pending_replies.shift
  return unless reply_info
  conn = reply_info[:conn]
  @conn_queues[conn]&.enqueue([*reply_info[:envelope], EMPTY_FRAME, *parts])
end

#send_queues_drained?Boolean

Returns true when all per-connection send queues are empty.

Returns:

  • (Boolean)

    true when all per-connection send queues are empty



87
88
89
# File 'lib/omq/routing/rep.rb', line 87

def send_queues_drained?
  @conn_queues.values.all?(&:empty?)
end

#stopvoid

This method returns an undefined value.

Stops all background tasks.



79
80
81
82
# File 'lib/omq/routing/rep.rb', line 79

def stop
  @tasks.each(&:stop)
  @tasks.clear
end