Class: Hyperion::IOUring::HotpathRing

Inherits:
Object
Defined in:
lib/hyperion/io_uring.rb

Overview

Plan #2 — io_uring hot path: multishot accept + multishot recv with kernel-managed buffer rings + send SQEs. One ring per worker; the accept fiber drains the unified completion queue.

Constant Summary

DEFAULT_QUEUE_DEPTH =
1024
DEFAULT_N_BUFS =
512
DEFAULT_BUF_SIZE =
8192
COMPLETION_BYTES =

Layout matches the Rust `#[repr(C)] Completion` struct (hotpath.rs has a compile-time assert that its size is 24):

u8 op_kind | i32 fd | i64 result | i32 buf_id | u32 flags

padded to native alignment. Total: 24 bytes.

24
MAX_BATCH =
64
OP_ACCEPT =

Op-kind values must match Rust's `OpKind` enum in hotpath.rs.

1
OP_RECV =
2
OP_SEND =
3
OP_CLOSE =
4
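
The 24-byte layout documented for COMPLETION_BYTES can be checked with plain `pack`/`unpack`. The following is a minimal sketch, not the library's decoder: the `FORMAT` name and the sample values are made up for illustration, and it assumes a little-endian host, as the `l<`/`q<` directives in `each_completion` do.

```ruby
# Sketch: encode and decode one completion record using the documented
# offsets (op_kind at 0, fd at 4, result at 8, buf_id at 16, flags at 20).
# FORMAT is an illustrative name, not part of Hyperion.
FORMAT = 'Cx3l<q<l<L<' # u8, 3 pad bytes, i32, i64, i32, u32

# Made-up sample: op_kind=2 (OP_RECV), fd=7, result=512, buf_id=3, flags=0
record = [2, 7, 512, 3, 0].pack(FORMAT)
raise 'unexpected record size' unless record.bytesize == 24

op_kind, fd, result, buf_id, flags = record.unpack(FORMAT)
```

The three `x` pad bytes after `op_kind` are what bring `fd` to a 4-byte boundary; without them the record would be 21 bytes and every later offset would be wrong.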

Instance Attribute Summary

Instance Method Summary

Constructor Details

#initialize(queue_depth: DEFAULT_QUEUE_DEPTH, n_bufs: DEFAULT_N_BUFS, buf_size: DEFAULT_BUF_SIZE) ⇒ HotpathRing

Returns a new instance of HotpathRing.

Raises:

  • (Unsupported)

# File 'lib/hyperion/io_uring.rb', line 163

def initialize(queue_depth: DEFAULT_QUEUE_DEPTH,
               n_bufs: DEFAULT_N_BUFS,
               buf_size: DEFAULT_BUF_SIZE)
  raise Unsupported, 'io_uring hotpath not supported on this host' \
    unless IOUring.hotpath_supported?

  @ptr = IOUring.hotpath_ring_new(queue_depth, n_bufs, buf_size)
  raise Unsupported, 'hotpath ring allocation failed' if @ptr.nil?

  @completion_buf = Fiddle::Pointer.malloc(COMPLETION_BYTES * MAX_BATCH,
                                           Fiddle::RUBY_FREE)
  @closed = false
end

Instance Attribute Details

#ptr ⇒ Object (readonly)

Returns the value of attribute ptr.



# File 'lib/hyperion/io_uring.rb', line 256

def ptr
  @ptr
end

Instance Method Details

#close ⇒ Object



# File 'lib/hyperion/io_uring.rb', line 245

def close
  return if @closed
  @closed = true
  IOUring.hotpath_ring_free(@ptr) if @ptr && !@ptr.null?
  @ptr = nil
end

#closed? ⇒ Boolean

Returns:

  • (Boolean)


# File 'lib/hyperion/io_uring.rb', line 252

def closed?
  @closed
end

#copy_buffer(buf_id, len) ⇒ Object

Plan #2 Task 2.3.4: copy `len` bytes from kernel buffer-ring slot `buf_id` into a freshly allocated Ruby String.

The caller MUST call `release_buffer(buf_id)` after this returns so the kernel can reuse the slot. This costs one allocation per recv CQE, which is acceptable as a baseline; the zero-copy variant is deferred.

Raises:

  • (SystemCallError)


# File 'lib/hyperion/io_uring.rb', line 230

def copy_buffer(buf_id, len)
  out = Fiddle::Pointer.malloc(len, Fiddle::RUBY_FREE)
  rc = IOUring.hotpath_copy_buffer(@ptr, buf_id.to_i, len.to_i, out, len.to_i)
  raise SystemCallError.new('hotpath copy_buffer', -rc) if rc.negative?
  out.to_str(rc)
end
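
The copy-then-release contract described above can be wrapped in a small helper so the slot is never leaked. This is only a sketch: `FakeRing` and `take_payload` are hypothetical names standing in for a real HotpathRing so the example runs on any host, and releasing the slot even when the copy raises is an assumption, not something the source states.

```ruby
# Hypothetical stand-in for HotpathRing; it only mimics the two methods
# used below and records which slots were released.
class FakeRing
  attr_reader :released

  def initialize(slots)
    @slots = slots # buf_id => bytes currently held in that slot
    @released = []
  end

  def copy_buffer(buf_id, len)
    @slots.fetch(buf_id).byteslice(0, len)
  end

  def release_buffer(buf_id)
    @released << buf_id
  end
end

# Copy the payload out, then always hand the slot back.
# (Releasing after a failed copy is an assumption of this sketch.)
def take_payload(ring, buf_id, len)
  ring.copy_buffer(buf_id, len)
ensure
  ring.release_buffer(buf_id)
end

ring = FakeRing.new(3 => "GET / HTTP/1.1\r\n")
payload = take_payload(ring, 3, 5)
```

Because the `ensure` clause does not use an explicit `return`, the helper still returns the copied String.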

#each_completion(min_complete: 1, timeout_ms: 100) ⇒ Object

Drain up to MAX_BATCH completions. Yields each completion as a Hash with the keys `op_kind`, `fd`, `result`, `buf_id` and `flags`, and returns the number of completions yielded. The caller is responsible for calling `release_buffer(buf_id)` after consuming a recv buffer.

If wait_completions returns -1 (the ring went unhealthy), this method yields nothing and returns 0; the caller must check `healthy?` to detect that state and fall back to accept4.



# File 'lib/hyperion/io_uring.rb', line 202

def each_completion(min_complete: 1, timeout_ms: 100)
  n = IOUring.hotpath_wait(@ptr, min_complete, timeout_ms,
                           @completion_buf, MAX_BATCH)
  return 0 if n.negative?

  n.times do |i|
    offset = i * COMPLETION_BYTES
    op_kind = @completion_buf[offset, 1].unpack1('C')
    fd      = @completion_buf[offset + 4, 4].unpack1('l<')
    result  = @completion_buf[offset + 8, 8].unpack1('q<')
    buf_id  = @completion_buf[offset + 16, 4].unpack1('l<')
    flags   = @completion_buf[offset + 20, 4].unpack1('L<')
    yield(op_kind: op_kind, fd: fd, result: result,
          buf_id: buf_id, flags: flags)
  end
  n
end
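
A caller typically dispatches on `op_kind` inside the block and checks `healthy?` when nothing was drained. The sketch below uses a hypothetical `StubRing` in place of a real ring so it runs without io_uring; the meaning it gives to `result` (new client fd for accept, byte count for recv) follows common io_uring usage and is an assumption here, not something this page confirms.

```ruby
OP_ACCEPT = 1
OP_RECV   = 2

# Hypothetical stand-in that replays canned completions with the same
# keys each_completion yields.
class StubRing
  def initialize(completions)
    @completions = completions
  end

  def each_completion(min_complete: 1, timeout_ms: 100)
    @completions.each { |c| yield(**c) }
    @completions.size
  end

  def healthy?
    true
  end
end

ring = StubRing.new([
  { op_kind: OP_ACCEPT, fd: 5, result: 9,   buf_id: -1, flags: 0 },
  { op_kind: OP_RECV,   fd: 9, result: 128, buf_id: 4,  flags: 0 }
])

events = []
count = ring.each_completion(timeout_ms: 50) do |c|
  case c[:op_kind]
  when OP_ACCEPT then events << [:accepted, c[:result]]             # assumed: result is the client fd
  when OP_RECV   then events << [:received, c[:buf_id], c[:result]] # assumed: result is the byte count
  end
end

# Zero completions plus an unhealthy ring is the signal to fall back.
fall_back = count.zero? && !ring.healthy?
```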

#force_unhealthy! ⇒ Object



# File 'lib/hyperion/io_uring.rb', line 237

def force_unhealthy!
  IOUring.hotpath_force_unhealthy(@ptr)
end

#healthy? ⇒ Boolean

Returns:

  • (Boolean)


# File 'lib/hyperion/io_uring.rb', line 241

def healthy?
  IOUring.hotpath_is_healthy(@ptr) == 1
end

#release_buffer(buf_id) ⇒ Object



# File 'lib/hyperion/io_uring.rb', line 220

def release_buffer(buf_id)
  IOUring.hotpath_release_buf(@ptr, buf_id.to_i)
end

#submit_accept_multishot(listener_fd) ⇒ Object

Raises:

  • (SystemCallError)


# File 'lib/hyperion/io_uring.rb', line 177

def submit_accept_multishot(listener_fd)
  rc = IOUring.hotpath_submit_accept(@ptr, listener_fd.to_i)
  raise SystemCallError.new('hotpath submit_accept', -rc) if rc.negative?
  nil
end

#submit_recv_multishot(fd) ⇒ Object

Raises:

  • (SystemCallError)


# File 'lib/hyperion/io_uring.rb', line 183

def submit_recv_multishot(fd)
  rc = IOUring.hotpath_submit_recv(@ptr, fd.to_i)
  raise SystemCallError.new('hotpath submit_recv', -rc) if rc.negative?
  nil
end

#submit_send(fd, iov_ptr, iov_count) ⇒ Object

Raises:

  • (SystemCallError)


# File 'lib/hyperion/io_uring.rb', line 189

def submit_send(fd, iov_ptr, iov_count)
  rc = IOUring.hotpath_submit_send(@ptr, fd.to_i, iov_ptr, iov_count.to_i)
  raise SystemCallError.new('hotpath submit_send', -rc) if rc.negative?
  nil
end
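
All three submit methods raise `SystemCallError.new(label, -rc)` on a negative return code. In Ruby, constructing a SystemCallError with a known errno yields the matching `Errno::` subclass, so callers can rescue specific errors. A minimal sketch with a made-up return code (no native call is involved, and the choice of EBADF is purely illustrative):

```ruby
# Pretend the native call returned -EBADF; this value is invented for
# the sketch and does not come from a real submit call.
rc = -Errno::EBADF::Errno

rescued = nil
begin
  # Same pattern as the submit_* methods shown above.
  raise SystemCallError.new('hotpath submit_recv', -rc) if rc.negative?
rescue Errno::EBADF => e
  rescued = e
end
```

Here `rescued` is an `Errno::EBADF` whose message ends with the label passed in, which makes it easy to tell which submit call failed.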