Class: OMQ::Engine::RecvPump
- Inherits:
- Object
  - OMQ::Engine::RecvPump
- Defined in:
- lib/omq/engine/recv_pump.rb
Overview
Recv pump for a connection.
For inproc DirectPipe: wires the direct recv path (no fiber spawned). For TCP/IPC: spawns a transient task that reads messages from the connection and enqueues them into recv_queue.
The two-method structure (with/without transform) is intentional for YJIT: it gives the JIT a monomorphic call per routing strategy instead of a megamorphic `transform.call` dispatch inside a shared loop.
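The split described above can be pictured with a small sketch. It is illustrative only: the module and method names are hypothetical and are not the real RecvPump internals, and it shows the shape of the two loops rather than measuring any JIT effect.

```ruby
# Illustrative sketch only: module and method names are hypothetical,
# not the real RecvPump internals.
module PumpShapeSketch
  module_function

  # Shared loop: every message pays for the `if transform` check, and the
  # single `transform.call` site serves every routing strategy.
  def drain_shared(messages, queue, transform)
    messages.each do |msg|
      msg = transform.call(msg) if transform
      queue << msg
    end
    queue
  end

  # Split variant 1: no transform, so the loop never touches a proc.
  def drain_direct(messages, queue)
    messages.each { |msg| queue << msg }
    queue
  end

  # Split variant 2: a transform is always present, so no nil check.
  def drain_with_transform(messages, queue, transform)
    messages.each { |msg| queue << transform.call(msg) }
    queue
  end
end
```

Both split variants produce the same queue contents as the shared loop; the difference is only in which branches and call sites each loop body contains.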
Constant Summary
- FAIRNESS_MESSAGES = 64
  Max messages read from one connection before yielding to the scheduler. Prevents a busy peer from starving its siblings in fair-queue recv sockets.
- FAIRNESS_BYTES = 1 << 20
  Max bytes read from one connection before yielding. Only counted for ZMTP connections (inproc skips the check). Complements FAIRNESS_MESSAGES: small-message floods are bounded by count, large-message floods by bytes.
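A minimal sketch of how the two limits could interact, assuming messages are Arrays of String frames. Only the two constant values come from this page; the module, the `pump_batch` helper, and its return symbols are hypothetical, and the real pump yields to the Async scheduler rather than returning.

```ruby
# Hypothetical sketch: only the two constant values are taken from the docs.
module FairnessSketch
  FAIRNESS_MESSAGES = 64
  FAIRNESS_BYTES    = 1 << 20

  module_function

  # Moves messages from `source` (an Array of messages, each an Array of
  # String frames) into `queue` until one fairness limit is reached.
  # Returns :yield when a limit was hit, :drained when the source ran empty.
  def pump_batch(source, queue, count_bytes: true)
    messages = 0
    bytes    = 0
    while (msg = source.shift)
      queue << msg
      messages += 1
      bytes += msg.sum(&:bytesize) if count_bytes
      return :yield if messages >= FAIRNESS_MESSAGES
      return :yield if count_bytes && bytes >= FAIRNESS_BYTES
    end
    :drained
  end
end

# A flood of tiny messages is cut off by the message count.
small = Array.new(100) { ["x"] }
FairnessSketch.pump_batch(small, [])

# A few large messages are cut off by the byte count instead.
large = Array.new(3) { ["x" * (512 * 1024)] }
FairnessSketch.pump_batch(large, [])
```

With `count_bytes: false`, mirroring the inproc case where the byte check is skipped, only the message count bounds a batch.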
Class Method Summary
- .start(parent, conn, recv_queue, engine, transform) ⇒ Async::Task?
  Public entry point: callers use the class method.
Instance Method Summary
- #initialize(conn, recv_queue, engine) ⇒ RecvPump (constructor)
  A new instance of RecvPump.
- #start(parent_task, transform) ⇒ Async::Task?
  Starts the recv pump.
Constructor Details
#initialize(conn, recv_queue, engine) ⇒ RecvPump
Returns a new instance of RecvPump.
    # File 'lib/omq/engine/recv_pump.rb', line 47
    def initialize(conn, recv_queue, engine)
      @conn = conn
      @recv_queue = recv_queue
      @engine = engine
      @count_bytes = conn.instance_of?(Protocol::ZMTP::Connection)
    end
Class Method Details
.start(parent, conn, recv_queue, engine, transform) ⇒ Async::Task?
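Public entry point: callers use the class method. It builds a pump for the connection and calls #start on it, returning nil when the inproc direct path was wired instead of a task.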
    # File 'lib/omq/engine/recv_pump.rb', line 38
    def self.start(parent, conn, recv_queue, engine, transform)
      new(conn, recv_queue, engine).start(parent, transform)
    end
Instance Method Details
#start(parent_task, transform) ⇒ Async::Task?
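Starts the recv pump. For an inproc DirectPipe that has a peer, assigns recv_queue and transform to the peer's direct-recv slots and returns nil (no task spawned). For TCP/IPC, spawns a task that reads messages, using the transform loop when a transform is given and the direct loop otherwise.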
    # File 'lib/omq/engine/recv_pump.rb', line 62
    def start(parent_task, transform)
      if @conn.is_a?(Transport::Inproc::DirectPipe) && @conn.peer
        @conn.peer.direct_recv_queue = @recv_queue
        @conn.peer.direct_recv_transform = transform
        return nil
      end

      if transform
        start_with_transform(parent_task, transform)
      else
        start_direct(parent_task)
      end
    end
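The dispatch in #start can be exercised with stand-in objects. The sketch below is an assumption-laden illustration: `FakeDirectPipe`, `FakeTcpConn`, and `sketch_start` are hypothetical names, and symbols stand in for the tasks that the real `start_with_transform` and `start_direct` would spawn.

```ruby
# Hypothetical stand-ins, not the real OMQ transport classes.
FakeDirectPipe = Struct.new(:peer, :direct_recv_queue, :direct_recv_transform)
FakeTcpConn    = Struct.new(:name)

# Mirrors the branch order of #start; symbols replace the spawned tasks.
def sketch_start(conn, recv_queue, transform)
  if conn.is_a?(FakeDirectPipe) && conn.peer
    # Inproc with a peer: hand the queue and transform to the peer, no task.
    conn.peer.direct_recv_queue     = recv_queue
    conn.peer.direct_recv_transform = transform
    return nil
  end

  transform ? :task_with_transform : :task_direct
end

# Two inproc endpoints that point at each other.
a = FakeDirectPipe.new
b = FakeDirectPipe.new
a.peer = b
b.peer = a

queue  = []
upcase = ->(msg) { msg.map(&:upcase) }

sketch_start(a, queue, upcase)                  # wires b, returns nil
sketch_start(FakeTcpConn.new("tcp"), queue, nil) # takes the direct-loop branch
```

Note that the queue is stored on the peer, not on the connection passed in, so the sending side can push straight into the receiver's queue.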