Class: Hyperion::IOUring::HotpathRing
- Inherits: Object
  - Object
  - Hyperion::IOUring::HotpathRing
- Defined in: lib/hyperion/io_uring.rb
Overview
Plan #2 — io_uring hot path: multishot accept + multishot recv with kernel-managed buffer rings + send SQEs. One ring per worker; the accept fiber drains the unified completion queue.
Constant Summary
- DEFAULT_QUEUE_DEPTH = 1024
- DEFAULT_N_BUFS = 512
- DEFAULT_BUF_SIZE = 8192
- COMPLETION_BYTES = 24
  Layout matches the Rust `#[repr(C)] Completion` struct (hotpath.rs has a compile-time assert of size = 24):
  u8 op_kind | i32 fd | i64 result | i32 buf_id | u32 flags, padded to native alignment. Total: 24 bytes.
- MAX_BATCH = 64
- OP_ACCEPT = 1
  Op-kind values must match Rust's `OpKind` enum in hotpath.rs.
- OP_RECV = 2
- OP_SEND = 3
- OP_CLOSE = 4
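The 24-byte layout described for COMPLETION_BYTES can be checked with plain `Array#pack` and `String#unpack`, without loading the native extension. The following is a minimal sketch, assuming the little-endian encoding implied by the `l<`/`q<` directives in `each_completion`; `COMPLETION_FORMAT` and the sample values are illustrative and not part of Hyperion.

```ruby
# Illustrative only: COMPLETION_FORMAT is a hypothetical name, not a Hyperion constant.
#   C   u8  op_kind   offset 0
#   x3  3 padding bytes so that fd is 4-byte aligned
#   l<  i32 fd        offset 4
#   q<  i64 result    offset 8
#   l<  i32 buf_id    offset 16
#   L<  u32 flags     offset 20
COMPLETION_FORMAT = 'Cx3l<q<l<L<'

# Build one made-up record: op_kind 2 (OP_RECV), fd 7, result 128, buf_id 3, flags 0.
record = [2, 7, 128, 3, 0].pack(COMPLETION_FORMAT)
puts record.bytesize # 24

# Decode it again; the padding bytes are skipped by the x3 directive.
op_kind, fd, result, buf_id, flags = record.unpack(COMPLETION_FORMAT)
puts [op_kind, fd, result, buf_id, flags].inspect
```

Reading a field at a fixed offset, for example `record[4, 4].unpack1('l<')` for `fd`, gives the same value as the single-format decode.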
Instance Attribute Summary
- #ptr ⇒ Object (readonly)
  Returns the value of attribute ptr.
Instance Method Summary
- #close ⇒ Object
- #closed? ⇒ Boolean
- #copy_buffer(buf_id, len) ⇒ Object
  Plan #2 Task 2.3.4: copy `len` bytes from kernel buffer-ring slot `buf_id` into a freshly allocated Ruby String.
- #each_completion(min_complete: 1, timeout_ms: 100) ⇒ Object
  Drain up to MAX_BATCH completions.
- #force_unhealthy! ⇒ Object
- #healthy? ⇒ Boolean
- #initialize(queue_depth: DEFAULT_QUEUE_DEPTH, n_bufs: DEFAULT_N_BUFS, buf_size: DEFAULT_BUF_SIZE) ⇒ HotpathRing (constructor)
  A new instance of HotpathRing.
- #release_buffer(buf_id) ⇒ Object
- #submit_accept_multishot(listener_fd) ⇒ Object
- #submit_recv_multishot(fd) ⇒ Object
- #submit_send(fd, iov_ptr, iov_count) ⇒ Object
Constructor Details
#initialize(queue_depth: DEFAULT_QUEUE_DEPTH, n_bufs: DEFAULT_N_BUFS, buf_size: DEFAULT_BUF_SIZE) ⇒ HotpathRing
Returns a new instance of HotpathRing.
    # File 'lib/hyperion/io_uring.rb', line 163
    def initialize(queue_depth: DEFAULT_QUEUE_DEPTH,
                   n_bufs: DEFAULT_N_BUFS,
                   buf_size: DEFAULT_BUF_SIZE)
      raise Unsupported, 'io_uring hotpath not supported on this host' \
        unless IOUring.hotpath_supported?

      @ptr = IOUring.hotpath_ring_new(queue_depth, n_bufs, buf_size)
      raise Unsupported, 'hotpath ring allocation failed' if @ptr.nil?

      @completion_buf = Fiddle::Pointer.malloc(COMPLETION_BYTES * MAX_BATCH,
                                               Fiddle::RUBY_FREE)
      @closed = false
    end
Instance Attribute Details
#ptr ⇒ Object (readonly)
Returns the value of attribute ptr.
    # File 'lib/hyperion/io_uring.rb', line 256
    def ptr
      @ptr
    end
Instance Method Details
#close ⇒ Object
    # File 'lib/hyperion/io_uring.rb', line 245
    def close
      return if @closed
      @closed = true
      IOUring.hotpath_ring_free(@ptr) if @ptr && !@ptr.null?
      @ptr = nil
    end
#closed? ⇒ Boolean
    # File 'lib/hyperion/io_uring.rb', line 252
    def closed?
      @closed
    end
#copy_buffer(buf_id, len) ⇒ Object
Plan #2 Task 2.3.4: copy `len` bytes from kernel buffer-ring slot `buf_id` into a freshly allocated Ruby String.
The caller MUST call `release_buffer(buf_id)` after this returns so the kernel can reuse the slot. This costs one allocation per recv CQE, which is acceptable as a baseline; the zero-copy variant is deferred.
    # File 'lib/hyperion/io_uring.rb', line 230
    def copy_buffer(buf_id, len)
      out = Fiddle::Pointer.malloc(len, Fiddle::RUBY_FREE)
      rc = IOUring.hotpath_copy_buffer(@ptr, buf_id.to_i, len.to_i, out, len.to_i)
      raise SystemCallError.new('hotpath copy_buffer', -rc) if rc.negative?
      out.to_str(rc)
    end
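The copy-then-release contract for `copy_buffer` can be wrapped so that the slot is always handed back. The following is a minimal sketch, not Hyperion code: `read_and_release` is a hypothetical helper, and `StubRing` is a stand-in that only mimics the two methods involved so the example runs without io_uring support. Releasing the slot when the copy raises is a design assumption here, not documented behavior.

```ruby
# Hypothetical helper (not part of Hyperion): copy the payload out of the
# buffer-ring slot and release the slot even if the copy raises.
def read_and_release(ring, buf_id, len)
  ring.copy_buffer(buf_id, len)
ensure
  ring.release_buffer(buf_id)
end

# Stand-in for HotpathRing; it serves bytes from a Hash and records releases.
class StubRing
  attr_reader :released

  def initialize(slots)
    @slots = slots
    @released = []
  end

  def copy_buffer(buf_id, len)
    @slots.fetch(buf_id).byteslice(0, len)
  end

  def release_buffer(buf_id)
    @released << buf_id
  end
end

ring = StubRing.new({ 3 => "GET / HTTP/1.1\r\n" })
data = read_and_release(ring, 3, 14)
puts data                  # GET / HTTP/1.1
puts ring.released.inspect # [3]
```

With a real ring, `buf_id` and `len` would come from a recv completion yielded by `each_completion`.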
#each_completion(min_complete: 1, timeout_ms: 100) ⇒ Object
Drain up to MAX_BATCH completions. Yields each as a Hash with the keys `op_kind`, `fd`, `result`, `buf_id`, and `flags`; returns the count yielded. The caller is responsible for calling `release_buffer(buf_id)` after consuming a recv buffer view.
If the native wait call (wait_completions) returns a negative value because the ring went unhealthy, this method yields nothing and returns 0. The caller must then check `healthy?` to detect that state and fall back to accept4.
    # File 'lib/hyperion/io_uring.rb', line 202
    def each_completion(min_complete: 1, timeout_ms: 100)
      n = IOUring.hotpath_wait(@ptr, min_complete, timeout_ms,
                               @completion_buf, MAX_BATCH)
      return 0 if n.negative?

      n.times do |i|
        offset = i * COMPLETION_BYTES
        op_kind = @completion_buf[offset, 1].unpack1('C')
        fd      = @completion_buf[offset + 4, 4].unpack1('l<')
        result  = @completion_buf[offset + 8, 8].unpack1('q<')
        buf_id  = @completion_buf[offset + 16, 4].unpack1('l<')
        flags   = @completion_buf[offset + 20, 4].unpack1('L<')
        yield(op_kind: op_kind, fd: fd, result: result,
              buf_id: buf_id, flags: flags)
      end
      n
    end
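A caller of `each_completion` typically dispatches on `op_kind` and checks `healthy?` when nothing was yielded. The following is a minimal sketch of that loop body, not Hyperion code: `drain_once` and `ScriptedRing` are hypothetical names, the op-kind values are copied from the constant summary, and the sample completions are made up.

```ruby
# Op-kind values copied from the constant summary of this class.
OP_ACCEPT = 1
OP_RECV   = 2
OP_SEND   = 3
OP_CLOSE  = 4

# Hypothetical helper (not part of Hyperion): drain one batch and route each
# completion to a handler keyed by op_kind. Returns :fallback when nothing
# was yielded and the ring reports itself unhealthy.
def drain_once(ring, handlers)
  count = ring.each_completion(min_complete: 1, timeout_ms: 100) do |c|
    handler = handlers[c[:op_kind]]
    handler&.call(c)
  end
  return :fallback if count.zero? && !ring.healthy?

  count
end

# Stand-in for HotpathRing that replays a fixed list of completions.
class ScriptedRing
  def initialize(completions, healthy: true)
    @completions = completions
    @healthy = healthy
  end

  def each_completion(min_complete: 1, timeout_ms: 100)
    @completions.each { |c| yield(c) }
    @completions.size
  end

  def healthy?
    @healthy
  end
end

seen = []
handlers = {
  OP_ACCEPT => ->(c) { seen << [:accept, c[:fd]] },
  OP_RECV   => ->(c) { seen << [:recv, c[:buf_id]] }
}
ring = ScriptedRing.new([
  { op_kind: OP_ACCEPT, fd: 5, result: 9, buf_id: 0, flags: 0 },
  { op_kind: OP_RECV, fd: 9, result: 128, buf_id: 3, flags: 0 }
])

puts drain_once(ring, handlers)                                 # 2
puts drain_once(ScriptedRing.new([], healthy: false), handlers) # fallback
```

A return value of 0 with a healthy ring simply means the wait timed out; only the combination of 0 and `healthy?` being false signals the accept4 fallback.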
#force_unhealthy! ⇒ Object
    # File 'lib/hyperion/io_uring.rb', line 237
    def force_unhealthy!
      IOUring.hotpath_force_unhealthy(@ptr)
    end
#healthy? ⇒ Boolean
    # File 'lib/hyperion/io_uring.rb', line 241
    def healthy?
      IOUring.hotpath_is_healthy(@ptr) == 1
    end
#release_buffer(buf_id) ⇒ Object
    # File 'lib/hyperion/io_uring.rb', line 220
    def release_buffer(buf_id)
      IOUring.hotpath_release_buf(@ptr, buf_id.to_i)
    end
#submit_accept_multishot(listener_fd) ⇒ Object
    # File 'lib/hyperion/io_uring.rb', line 177
    def submit_accept_multishot(listener_fd)
      rc = IOUring.hotpath_submit_accept(@ptr, listener_fd.to_i)
      raise SystemCallError.new('hotpath submit_accept', -rc) if rc.negative?
      nil
    end
#submit_recv_multishot(fd) ⇒ Object
    # File 'lib/hyperion/io_uring.rb', line 183
    def submit_recv_multishot(fd)
      rc = IOUring.hotpath_submit_recv(@ptr, fd.to_i)
      raise SystemCallError.new('hotpath submit_recv', -rc) if rc.negative?
      nil
    end
#submit_send(fd, iov_ptr, iov_count) ⇒ Object
    # File 'lib/hyperion/io_uring.rb', line 189
    def submit_send(fd, iov_ptr, iov_count)
      rc = IOUring.hotpath_submit_send(@ptr, fd.to_i, iov_ptr, iov_count.to_i)
      raise SystemCallError.new('hotpath submit_send', -rc) if rc.negative?
      nil
    end
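The `submit_*` methods and `copy_buffer` turn a negative return code into `SystemCallError.new(label, -rc)`. In Ruby, that constructor returns the matching `Errno` subclass when the number is a known errno, so callers can rescue specific errors. The following is a minimal sketch of this behavior, assuming the native functions return a negated errno as the listings suggest; `raise_if_negative` is a hypothetical helper, not a Hyperion method.

```ruby
# Hypothetical helper mirroring the pattern used by the submit_* methods.
def raise_if_negative(label, rc)
  raise SystemCallError.new(label, -rc) if rc.negative?

  nil
end

# Simulate a native call that failed with EBADF (returned as -EBADF).
caught = begin
  raise_if_negative('hotpath submit_accept', -Errno::EBADF::Errno)
rescue Errno::EBADF => e
  e
end

puts caught.class # Errno::EBADF
puts caught.message
```

Because the raised object is an `Errno::EBADF`, a caller can write `rescue Errno::EBADF` around `submit_recv_multishot(fd)` to handle an already closed descriptor separately from other failures.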