Module: OMQ::Routing::ConnSendPump

Defined in:
lib/omq/routing/conn_send_pump.rb

Overview

Starts a dedicated send pump for one per-connection send queue.

Used by Router and Rep, which have per-connection queues but do not include the RoundRobin mixin.

Class Method Summary collapse

Class Method Details

.start(engine, conn, q, tasks) ⇒ Async::Task

Spawns the pump task and registers it in tasks.

Parameters:

  • engine (Engine)
  • conn (Connection)
  • q (Async::LimitedQueue)
  • tasks (Array)

Returns:

  • (Async::Task)


19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
# File 'lib/omq/routing/conn_send_pump.rb', line 19

def self.start(engine, conn, q, tasks)
  task = engine.spawn_conn_pump_task(conn, annotation: "send pump") do
    loop do
      batch = [q.dequeue]
      Routing.drain_send_queue(q, batch)

      if batch.size == 1
        conn.write_message batch.first
      else
        conn.write_messages batch
      end

      conn.flush

      batch.each { |parts| engine.emit_verbose_msg_sent(conn, parts) }
    end
  end

  tasks << task
  task
end