Module: OMQ::Routing::ConnSendPump

Defined in:
lib/omq/routing/conn_send_pump.rb

Overview

Starts a dedicated send pump for one per-connection send queue.

Used by Router and Rep, which have per-connection queues but do not include the RoundRobin mixin.

Class Method Summary collapse

Class Method Details

.start(engine, conn, q) ⇒ Async::Task

Spawns the pump task on the connection’s lifecycle barrier so it is torn down with the rest of the connection’s pumps.

Parameters:

  • engine (Engine)
  • conn (Protocol::ZMTP::Connection)
  • q (Async::LimitedQueue)

Returns:

  • (Async::Task)


19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
# File 'lib/omq/routing/conn_send_pump.rb', line 19

def self.start(engine, conn, q)
  engine.spawn_conn_pump_task(conn, annotation: "send pump") do
    batch = []

    loop do
      Routing.dequeue_batch(q, batch)

      if batch.size == 1
        conn.write_message batch.first
      else
        conn.write_messages batch
      end

      conn.flush

      batch.each do |parts|
        engine.emit_verbose_msg_sent(conn, parts)
      end

      batch.clear
    end
  end
end