Class: OMQ::Engine::RecvPump

Inherits:
Object
Defined in:
lib/omq/engine/recv_pump.rb

Overview

Recv pump for a connection.

For an inproc DirectPipe: wires the direct recv path (no task is spawned). For TCP/IPC: spawns a transient task that reads messages from the connection and enqueues them into recv_queue.

The two-method structure (with/without transform) is intentional for YJIT: it gives the JIT a monomorphic call per routing strategy instead of a megamorphic `transform.call` dispatch inside a shared loop.
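The two-loop structure described above can be illustrated with a minimal sketch. `SketchPump`, `run_direct`, and `run_with_transform` are hypothetical names invented for this example; they are not the library's methods, and the sketch shows only the shape of the code, not a measurement of JIT behaviour.

```ruby
# Hypothetical pump: the branch on `transform` happens once, up front,
# and each loop body then has a single, fixed shape.
class SketchPump
  def initialize(source, queue)
    @source = source # any Enumerable of messages (stand-in for a connection)
    @queue  = queue  # anything responding to << (stand-in for recv_queue)
  end

  def start(transform)
    transform ? run_with_transform(transform) : run_direct
  end

  private

  # No transform: the loop body contains no Proc call at all.
  def run_direct
    @source.each { |msg| @queue << msg }
  end

  # Transform present: the loop body always calls the Proc.
  def run_with_transform(transform)
    @source.each { |msg| @queue << transform.call(msg) }
  end
end

plain = []
SketchPump.new([["a"], ["b"]], plain).start(nil)

upcased = []
SketchPump.new([["a"], ["b"]], upcased).start(->(msg) { msg.map(&:upcase) })
```

The alternative, a single loop that checks `transform` on every iteration, would mix both behaviours at one call site, which is what the overview says the design avoids.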

Constant Summary

FAIRNESS_MESSAGES = 256

Max messages read from one connection before yielding to the scheduler. Prevents a busy peer from starving its siblings in fair-queue recv sockets. Symmetric with RoundRobin send batching.

FAIRNESS_BYTES = 512 * 1024

Max bytes (512 KiB) read from one connection before yielding. Only counted for ZMTP connections (inproc skips the check). Complements FAIRNESS_MESSAGES: small-message floods are bounded by count, large-message floods by bytes. Symmetric with RoundRobin send batching.
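How the two limits complement each other can be sketched as follows. This is an illustrative loop, not the pump's actual implementation: `FairnessSketch` and `drain` are hypothetical names, the source is a plain Array instead of a connection, and the "yield to the scheduler" step is modelled by an optional Ruby block. Only the two limit values are taken from the constants above.

```ruby
module FairnessSketch
  FAIRNESS_MESSAGES = 256
  FAIRNESS_BYTES    = 512 * 1024

  # Moves every message from `source` into `sink`. Each message is an Array
  # of String frames. Whenever either budget is used up, the optional block
  # is called (standing in for a scheduler yield) and both counters reset.
  # Returns how many times the budget was exhausted.
  def self.drain(source, sink, count_bytes: true)
    yields = 0
    count  = 0
    bytes  = 0
    source.each do |msg|
      sink << msg
      count += 1
      bytes += msg.sum(&:bytesize) if count_bytes
      next if count < FAIRNESS_MESSAGES && bytes < FAIRNESS_BYTES

      yield if block_given?
      yields += 1
      count = 0
      bytes = 0
    end
    yields
  end
end

small = Array.new(1000) { ["x"] }            # tiny messages: count limit applies
large = Array.new(10)   { ["y" * 131_072] }  # 128 KiB each: byte limit applies

small_yields  = FairnessSketch.drain(small, [])
large_yields  = FairnessSketch.drain(large, [])
inproc_yields = FairnessSketch.drain(large, [], count_bytes: false)
```

With `count_bytes: false` (the analogue of the inproc case, where the byte check is skipped) only the message count can trigger a yield.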

Constructor Details

#initialize(conn, recv_queue, engine) ⇒ RecvPump

Returns a new instance of RecvPump.

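Parameters:

  • conn (Connection, Transport::Inproc::DirectPipe)
  • recv_queue (Async::LimitedQueue)
  • engine (Engine)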

# File 'lib/omq/engine/recv_pump.rb', line 47

def initialize(conn, recv_queue, engine)
  @conn        = conn
  @recv_queue  = recv_queue
  @engine      = engine
  @count_bytes = conn.instance_of?(Protocol::ZMTP::Connection)
end

Class Method Details

.start(parent, conn, recv_queue, engine, transform) ⇒ Async::Task?

Public entry point. Builds a RecvPump for the connection and starts it; callers use this class method.

Parameters:

  • parent (Async::Task, Async::Barrier)

    parent to spawn under

  • conn (Connection, Transport::Inproc::DirectPipe)
  • recv_queue (Async::LimitedQueue)
  • engine (Engine)
  • transform (Proc, nil)

Returns:

  • (Async::Task, nil)

    the spawned task, or nil when the inproc direct path is wired


# File 'lib/omq/engine/recv_pump.rb', line 38

def self.start(parent, conn, recv_queue, engine, transform)
  new(conn, recv_queue, engine).start(parent, transform)
end

Instance Method Details

#start(parent_task, transform) ⇒ Async::Task?

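Starts the recv pump. For an inproc DirectPipe that has a peer, wires the direct recv path onto the peer and returns nil (no task is spawned). For TCP/IPC, spawns a task that reads messages from the connection, using the transform variant of the loop when a transform is given.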

Parameters:

  • parent_task (Async::Task)
  • transform (Proc, nil)

    optional per-message transform

Returns:

  • (Async::Task, nil)

    the spawned task, or nil when the inproc direct path is wired


# File 'lib/omq/engine/recv_pump.rb', line 62

def start(parent_task, transform)
  if @conn.is_a?(Transport::Inproc::DirectPipe) && @conn.peer
    @conn.peer.direct_recv_queue     = @recv_queue
    @conn.peer.direct_recv_transform = transform
    return nil
  end

  if transform
    start_with_transform(parent_task, transform)
  else
    start_direct(parent_task)
  end
end
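
The first branch of the method above can be exercised in isolation with stand-in objects. In this sketch `FakeDirectPipe` and `wire_direct` are hypothetical: the class models only the three accessors that #start touches, and the `:needs_task` symbol merely marks the point where the real method would go on to spawn a task.

```ruby
# Hypothetical stand-in for Transport::Inproc::DirectPipe.
class FakeDirectPipe
  attr_accessor :peer, :direct_recv_queue, :direct_recv_transform
end

# Mirrors the inproc branch of #start: if the connection is a direct pipe
# with a peer, hand the queue and transform to the peer and spawn nothing.
def wire_direct(conn, recv_queue, transform)
  return :needs_task unless conn.is_a?(FakeDirectPipe) && conn.peer

  conn.peer.direct_recv_queue     = recv_queue
  conn.peer.direct_recv_transform = transform
  nil
end

local  = FakeDirectPipe.new
remote = FakeDirectPipe.new
local.peer = remote

queue    = []
result   = wire_direct(local, queue, nil)
unpeered = wire_direct(FakeDirectPipe.new, [], nil)
```

Note that the queue is installed on the peer, not on the connection itself, and that a DirectPipe without a peer does not take the direct path.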