Class: Winloop::Backend
- Inherits:
-
Object
- Object
- Winloop::Backend
- Defined in:
- lib/winloop/ops.rb,
ext/winloop/winloop.c
Overview
The validated, keyword-argument face of the generic-op preparation primitive. Suite convention: validated kwargs live in Ruby; the C bridge (_op_prepare) is underscore-private so the checks can't be skipped.
Instance Method Summary collapse
-
#_op_prepare(vhandle, vtag, vcap) ⇒ Object
Private primitive behind Winloop::Backend#op_prepare (lib/winloop/ops.rb owns the kwargs + range validation).
-
#associate(vhandle) ⇒ Object
Associate
handle(opened FILE_FLAG_OVERLAPPED) with the backend's IOCP under EXT_KEY. -
#cancel(vid) ⇒ Object
Cancel a pending poll.
- #initialize ⇒ Object constructor
-
#op_abandon(vid) ⇒ Object
For a native call that failed SYNCHRONOUSLY (GetLastError != ERROR_IO_PENDING): the kernel never saw the op, no packet will ever arrive — free immediately.
-
#op_cancel(vid) ⇒ Object
CancelIoEx(handle, ov) — targeted, cross-thread-safe.
-
#op_free(vid) ⇒ Object
Retire a COMPLETED op without materializing its data (orphaned completions).
-
#op_prepare(handle, tag: 0, capacity: 0) ⇒ Object
Allocate one backend-owned op record (state :prepared): a zeroed OVERLAPPED, a
tagechoed back in the completion tuple, and an optional embedded data buffer. -
#op_result(vid) ⇒ Object
[bytes, error, data] for a COMPLETED op; frees the record.
- #op_state(vid) ⇒ Object
-
#op_submitted(vid) ⇒ Object
Call after the client's native call returned success OR ERROR_IO_PENDING — both queue exactly one completion packet (no skip-on-success, ever).
-
#poll(io, vevents) ⇒ Object
Arm a one-shot readiness poll on
ioforevents; returns the request id. -
#port_handle ⇒ Object
The raw IOCP HANDLE value — read-only, for tests and diagnostics.
- #shutdown ⇒ Object
- #wait(vtimeout_ms) ⇒ Object
- #wakeup ⇒ Object
Constructor Details
#initialize ⇒ Object
307 308 309 310 311 312 313 314 315 316 317 318 319 320 321 322 323 324 325 326 327 328 329 330 331 |
# File 'ext/winloop/winloop.c', line 307
static VALUE backend_initialize(VALUE self) {
winloop_backend *b;
TypedData_Get_Struct(self, winloop_backend, &backend_type, b);
if (!resolve_ntdll()) rb_raise(eError, "winloop: could not resolve ntdll entry points");
b->iocp = CreateIoCompletionPort(INVALID_HANDLE_VALUE, NULL, 0, 0);
if (!b->iocp) rb_raise(eError, "winloop: CreateIoCompletionPort failed (%lu)", GetLastError());
b->afd = afd_open();
if (b->afd == INVALID_HANDLE_VALUE) {
CloseHandle(b->iocp); b->iocp = NULL;
rb_raise(eError, "winloop: could not open \\Device\\Afd (unsupported platform?)");
}
if (!CreateIoCompletionPort(b->afd, b->iocp, AFD_KEY, 0)) {
CloseHandle(b->afd); CloseHandle(b->iocp); b->afd = INVALID_HANDLE_VALUE; b->iocp = NULL;
rb_raise(eError, "winloop: could not associate AFD handle (%lu)", GetLastError());
}
b->next_id = 0;
b->reqs = st_init_numtable();
/* st_data_t is pointer-sized; numtables hold ids/addresses/handles arm64-clean. */
b->ops_by_id = st_init_numtable();
b->ops_by_addr = st_init_numtable();
b->assoc_handles = st_init_numtable();
b->op_bytes = 0;
b->closed = 0;
return self;
}
|
Instance Method Details
#_op_prepare(vhandle, vtag, vcap) ⇒ Object
Private primitive behind Winloop::Backend#op_prepare (lib/winloop/ops.rb owns the kwargs + range validation). Allocates one op record: zeroed OVERLAPPED + tag + optional embedded buffer; returns [op_id, ov_addr, buf_addr].
434 435 436 437 438 439 440 441 442 443 444 445 446 447 448 449 450 451 452 453 454 455 456 457 458 459 460 461 462 463 464 465 466 467 468 |
# File 'ext/winloop/winloop.c', line 434
static VALUE backend__op_prepare(VALUE self, VALUE vhandle, VALUE vtag, VALUE vcap) {
winloop_backend *b = get_backend_ops(self);
/* All coercion before any allocation (raise hygiene). */
uint64_t hv = NUM2ULL(vhandle);
uint64_t tag = NUM2ULL(vtag);
uint64_t cap = NUM2ULL(vcap);
size_t total;
winloop_op *op;
if (cap > 16ull * 1024 * 1024) /* defensive twin of OP_CAPACITY_MAX (send bypass) */
rb_raise(rb_eArgError, "winloop: capacity %llu exceeds OP_CAPACITY_MAX",
(unsigned long long)cap);
/* The anti-hang check (§3.4 rule 8): an op on a handle that is not on the
* port never completes — not an error, a silent loop freeze. Honestly: a
* handle that was associated, closed and recycled to the same value passes. */
if (!st_lookup(b->assoc_handles, (st_data_t)hv, NULL))
rb_raise(eError, "winloop: handle %llu was never associated with this backend "
"— call #associate first", (unsigned long long)hv);
total = OP_BUF_OFFSET + (size_t)cap;
op = (winloop_op *)xmalloc(total); /* a raise above this line leaks nothing */
memset(op, 0, total); /* zeroed OVERLAPPED (offset 0 reads); buffer never leaks heap */
op->id = ++b->next_id;
op->tag = tag;
op->handle = (HANDLE)(uintptr_t)hv;
op->state = OP_PREPARED;
op->cap = (size_t)cap;
st_insert(b->ops_by_id, (st_data_t)op->id, (st_data_t)op);
/* An OOM longjmp from this second insert leaks one record until the shutdown
* sweep — the identical, accepted exposure as backend_poll's st_insert. */
st_insert(b->ops_by_addr, (st_data_t)(uintptr_t)&op->ov, (st_data_t)op);
b->op_bytes += total;
return rb_ary_new_from_args(3, ULL2NUM(op->id), ULL2NUM((uintptr_t)&op->ov),
cap ? ULL2NUM((uintptr_t)((char *)op + OP_BUF_OFFSET))
: ULL2NUM(0));
}
|
#associate(vhandle) ⇒ Object
Associate handle (opened FILE_FLAG_OVERLAPPED) with the backend's IOCP under
EXT_KEY. Permanent for the handle's lifetime (Win32: one port per handle, no
disassociation); the caller keeps ownership.
414 415 416 417 418 419 420 421 422 423 424 425 426 427 428 429 |
# File 'ext/winloop/winloop.c', line 414
static VALUE backend_associate(VALUE self, VALUE vhandle) {
winloop_backend *b = get_backend_ops(self);
uint64_t hv = NUM2ULL(vhandle); /* coercion first: a raise here owns nothing */
DWORD gle;
if (hv == 0 || hv == UINT64_MAX)
rb_raise(rb_eArgError, "winloop: %llu is not a usable HANDLE value",
(unsigned long long)hv);
if (!CreateIoCompletionPort((HANDLE)(uintptr_t)hv, b->iocp, EXT_KEY, 0)) {
gle = GetLastError(); /* captured before any Ruby allocation */
rb_raise(eError, "winloop: could not associate handle %llu with the completion "
"port (error %lu: already associated with a completion port, or not "
"opened with FILE_FLAG_OVERLAPPED)", (unsigned long long)hv, gle);
}
st_insert(b->assoc_handles, (st_data_t)hv, (st_data_t)1);
return Qtrue;
}
|
#cancel(vid) ⇒ Object
Cancel a pending poll. The cancelled op still posts a (STATUS_CANCELLED) completion, which #wait reaps and frees — so we do NOT free here.
367 368 369 370 371 372 373 374 375 376 |
# File 'ext/winloop/winloop.c', line 367
static VALUE backend_cancel(VALUE self, VALUE vid) {
winloop_backend *b = get_backend(self);
uint64_t id = NUM2ULL(vid);
st_data_t val;
if (st_lookup(b->reqs, (st_data_t)id, &val)) {
winloop_req *req = (winloop_req *)val;
CancelIoEx(b->afd, &req->ov);
}
return Qnil;
}
|
#op_abandon(vid) ⇒ Object
For a native call that failed SYNCHRONOUSLY (GetLastError != ERROR_IO_PENDING): the kernel never saw the op, no packet will ever arrive — free immediately.
484 485 486 487 488 489 490 491 492 493 494 495 |
# File 'ext/winloop/winloop.c', line 484
static VALUE backend_op_abandon(VALUE self, VALUE vid) {
winloop_backend *b = get_backend_ops(self);
winloop_op *op = op_lookup(b, NUM2ULL(vid));
if (op->state == OP_SUBMITTED)
rb_raise(eError, "winloop: op %llu is submitted; a submitted op is retired by "
"its completion, not op_abandon", (unsigned long long)op->id);
if (op->state == OP_COMPLETED)
rb_raise(eError, "winloop: op %llu is completed; retire it with op_result or "
"op_free, not op_abandon", (unsigned long long)op->id);
op_retire(b, op);
return Qtrue;
}
|
#op_cancel(vid) ⇒ Object
CancelIoEx(handle, ov) — targeted, cross-thread-safe. NEVER frees: a cancelled op still posts a packet (usually error 995) which #wait reaps. Returns true if a cancel was issued; false on ERROR_NOT_FOUND (already completed — benign) or ANY other CancelIoEx failure (rb_warn'ed, never raised: await_op's ensure path must never mask an in-flight Timeout/Fiber#raise unwind with a Winloop::Error).
502 503 504 505 506 507 508 509 510 511 512 513 514 515 516 |
# File 'ext/winloop/winloop.c', line 502
static VALUE backend_op_cancel(VALUE self, VALUE vid) {
winloop_backend *b = get_backend_ops(self);
winloop_op *op = op_lookup(b, NUM2ULL(vid));
DWORD gle;
if (op->state == OP_PREPARED)
rb_raise(eError, "winloop: op %llu is prepared; only a submitted op can be "
"cancelled (a prepared op is retired with op_abandon)",
(unsigned long long)op->id);
if (CancelIoEx(op->handle, &op->ov)) return Qtrue;
gle = GetLastError(); /* captured immediately, before any Ruby call */
if (gle != ERROR_NOT_FOUND)
rb_warn("winloop: CancelIoEx for op %llu failed (error %lu) — treated as "
"no-cancel, never an exception", (unsigned long long)op->id, gle);
return Qfalse;
}
|
#op_free(vid) ⇒ Object
Retire a COMPLETED op without materializing its data (orphaned completions).
540 541 542 543 544 545 546 547 548 549 |
# File 'ext/winloop/winloop.c', line 540
static VALUE backend_op_free(VALUE self, VALUE vid) {
winloop_backend *b = get_backend_ops(self);
winloop_op *op = op_lookup(b, NUM2ULL(vid));
if (op->state != OP_COMPLETED)
rb_raise(eError, "winloop: op %llu is %s; op_free is only legal once its "
"completion has been reaped by #wait",
(unsigned long long)op->id, op_state_name(op->state));
op_retire(b, op);
return Qtrue;
}
|
#op_prepare(handle, tag: 0, capacity: 0) ⇒ Object
Allocate one backend-owned op record (state :prepared): a zeroed OVERLAPPED,
a tag echoed back in the completion tuple, and an optional embedded data
buffer. Returns [op_id, ov_addr, buf_addr] (all Integers; buf_addr is 0 when
capacity is 0). handle must have been passed to #associate on this backend
first — an op on a handle that is not on the port never completes (a silent
hang, not an error), so unknown handles raise Winloop::Error up front.
18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 |
# File 'lib/winloop/ops.rb', line 18 def op_prepare(handle, tag: 0, capacity: 0) raise TypeError, "handle must be an Integer (got #{handle.class})" unless handle.is_a?(Integer) raise TypeError, "tag must be an Integer (got #{tag.class})" unless tag.is_a?(Integer) raise TypeError, "capacity must be an Integer (got #{capacity.class})" unless capacity.is_a?(Integer) if handle.zero? || handle == 0xFFFF_FFFF_FFFF_FFFF || !handle.between?(1, 2**64 - 1) raise ArgumentError, "#{handle} is not a usable HANDLE value" end unless tag.between?(0, 2**64 - 1) raise ArgumentError, "tag must be in 0..2**64-1 (got #{tag})" end unless capacity.between?(0, OP_CAPACITY_MAX) raise ArgumentError, "capacity must be in 0..#{OP_CAPACITY_MAX} (got #{capacity})" end _op_prepare(handle, tag, capacity) end |
#op_result(vid) ⇒ Object
[bytes, error, data] for a COMPLETED op; frees the record. The String is built
BEFORE the free so an OOM raise leaves the op intact and retryable. data is
clamped to min(bytes, capacity): a stray packet's bogus byte count cannot make
winloop read past its own buffer.
522 523 524 525 526 527 528 529 530 531 532 533 534 535 536 537 |
# File 'ext/winloop/winloop.c', line 522
static VALUE backend_op_result(VALUE self, VALUE vid) {
winloop_backend *b = get_backend_ops(self);
winloop_op *op = op_lookup(b, NUM2ULL(vid));
VALUE data = Qnil, result;
if (op->state != OP_COMPLETED)
rb_raise(eError, "winloop: op %llu is %s; op_result is only legal once its "
"completion has been reaped by #wait",
(unsigned long long)op->id, op_state_name(op->state));
if (op->cap) {
size_t n = (size_t)op->bytes < op->cap ? (size_t)op->bytes : op->cap;
data = rb_str_new((const char *)op + OP_BUF_OFFSET, (long)n); /* ASCII-8BIT */
}
result = rb_ary_new_from_args(3, ULONG2NUM(op->bytes), ULONG2NUM(op->error), data);
op_retire(b, op);
return result;
}
|
#op_state(vid) ⇒ Object
551 552 553 554 555 556 557 558 559 |
# File 'ext/winloop/winloop.c', line 551
static VALUE backend_op_state(VALUE self, VALUE vid) {
winloop_backend *b = get_backend_ops(self);
winloop_op *op = op_lookup(b, NUM2ULL(vid));
switch (op->state) {
case OP_PREPARED: return ID2SYM(rb_intern("prepared"));
case OP_SUBMITTED: return ID2SYM(rb_intern("submitted"));
default: return ID2SYM(rb_intern("completed"));
}
}
|
#op_submitted(vid) ⇒ Object
Call after the client's native call returned success OR ERROR_IO_PENDING — both queue exactly one completion packet (no skip-on-success, ever).
472 473 474 475 476 477 478 479 480 |
# File 'ext/winloop/winloop.c', line 472
static VALUE backend_op_submitted(VALUE self, VALUE vid) {
winloop_backend *b = get_backend_ops(self);
winloop_op *op = op_lookup(b, NUM2ULL(vid));
if (op->state != OP_PREPARED)
rb_raise(eError, "winloop: op %llu is %s; op_submitted is only legal on a "
"prepared op", (unsigned long long)op->id, op_state_name(op->state));
op->state = OP_SUBMITTED;
return Qtrue;
}
|
#poll(io, vevents) ⇒ Object
Arm a one-shot readiness poll on io for events; returns the request id.
334 335 336 337 338 339 340 341 342 343 344 345 346 347 348 349 350 351 352 353 354 355 356 357 358 359 360 361 362 363 |
# File 'ext/winloop/winloop.c', line 334
static VALUE backend_poll(VALUE self, VALUE io, VALUE vevents) {
winloop_backend *b = get_backend(self);
int events = NUM2INT(vevents);
int fd = rb_io_descriptor(io);
SOCKET sock = rb_w32_get_osfhandle(fd);
if (sock == INVALID_SOCKET || !rb_w32_is_socket(fd))
rb_raise(eError, "winloop: fd %d is not a socket (winloop polls sockets)", fd);
SOCKET base = base_socket(sock);
winloop_req *req = ALLOC(winloop_req);
memset(req, 0, sizeof(*req));
req->id = ++b->next_id;
req->in.Timeout.QuadPart = INT64_MAX; /* stays pending until an event fires */
req->in.NumberOfHandles = 1;
req->in.Exclusive = FALSE;
req->in.Handles[0].Handle = (HANDLE)base;
req->in.Handles[0].Events = ruby_to_afd(events);
NTSTATUS st = pNtDeviceIoControlFile(b->afd, NULL, NULL, &req->ov,
(PIO_STATUS_BLOCK)&req->ov.Internal, IOCTL_AFD_POLL,
&req->in, sizeof(req->in), &req->out, sizeof(req->out));
if (st != STATUS_PENDING && st != 0 /*STATUS_SUCCESS*/) {
xfree(req);
rb_raise(eError, "winloop: AFD poll failed (status 0x%08lX)", (unsigned long)st);
}
/* STATUS_SUCCESS == satisfied synchronously, but a completion packet is still
queued (we never set FILE_SKIP_COMPLETION_PORT_ON_SUCCESS) — handle uniformly. */
st_insert(b->reqs, (st_data_t)req->id, (st_data_t)req);
return ULL2NUM(req->id);
}
|
#port_handle ⇒ Object
The raw IOCP HANDLE value — read-only, for tests and diagnostics. Hard rules: never wait on it, never close it, never associate handles behind winloop's back, never post after shutdown, at most ONE post per op.
564 565 566 567 |
# File 'ext/winloop/winloop.c', line 564
static VALUE backend_port_handle(VALUE self) {
winloop_backend *b = get_backend_ops(self);
return ULL2NUM((uintptr_t)b->iocp);
}
|
#shutdown ⇒ Object
668 669 670 671 672 673 |
# File 'ext/winloop/winloop.c', line 668
static VALUE backend_shutdown(VALUE self) {
winloop_backend *b;
TypedData_Get_Struct(self, winloop_backend, &backend_type, b);
if (b) backend_drain_and_close(b);
return Qnil;
}
|
#wait(vtimeout_ms) ⇒ Object
594 595 596 597 598 599 600 601 602 603 604 605 606 607 608 609 610 611 612 613 614 615 616 617 618 619 620 621 622 623 624 625 626 627 628 629 630 631 632 633 634 635 636 637 638 639 640 641 642 643 644 645 646 647 648 649 650 651 652 653 654 655 656 657 658 659 660 |
# File 'ext/winloop/winloop.c', line 594
static VALUE backend_wait(VALUE self, VALUE vtimeout_ms) {
winloop_backend *b = get_backend(self);
DWORD ms = NIL_P(vtimeout_ms) ? INFINITE : (DWORD)NUM2ULONG(vtimeout_ms);
OVERLAPPED_ENTRY ents[REAP_CAP];
gqcs_args a = { b->iocp, ents, REAP_CAP, 0, ms, 0, FALSE };
rb_thread_call_without_gvl(gqcs_call, &a, gqcs_ubf, b->iocp);
BOOL ok = a.ok;
DWORD err = a.err;
ULONG n = a.n;
VALUE result = rb_ary_new();
if (!ok) {
if (err == WAIT_TIMEOUT) return result;
rb_raise(eError, "winloop: GetQueuedCompletionStatusEx failed (%lu)", err);
}
for (ULONG i = 0; i < n; i++) {
if (ents[i].lpCompletionKey == WAKE_KEY || ents[i].lpOverlapped == NULL) continue;
if (ents[i].lpCompletionKey == EXT_KEY) {
/* Generic-op path (winloop 0.2). Defensive dispatch: verify membership
* in ops_by_addr AND op state before acting — an unknown lpOverlapped
* is never cast, an already-COMPLETED op (double post) is never
* re-completed and never yields a second tuple. The only accepted
* anomaly is a packet for a still-PREPARED op (a standalone embedder
* called #wait between the native submit and op_submitted): the packet
* is real and the memory is ours, so accept it with a warning. */
st_data_t val;
winloop_op *op;
if (!st_lookup(b->ops_by_addr, (st_data_t)(uintptr_t)ents[i].lpOverlapped, &val)) {
rb_warn("winloop: dropped a completion packet for an unknown OVERLAPPED "
"0x%llx (stray or duplicate post)",
(unsigned long long)(uintptr_t)ents[i].lpOverlapped);
continue;
}
op = (winloop_op *)val;
if (op->state == OP_COMPLETED) {
rb_warn("winloop: dropped a duplicate completion packet for op %llu "
"(already completed; bytes/error preserved)",
(unsigned long long)op->id);
continue;
}
if (op->state == OP_PREPARED)
rb_warn("winloop: completion packet reaped for op %llu before "
"op_submitted (protocol violation; accepting the packet)",
(unsigned long long)op->id);
op->state = OP_COMPLETED;
op->bytes = ents[i].dwNumberOfBytesTransferred;
/* Status lives in the OVERLAPPED's Internal (an NTSTATUS), readable
* only after the packet left the port; map it to a familiar Win32
* code (0 ok, 995 cancelled, 1022 rescan, 38 EOF, ...). */
op->error = pRtlNtStatusToDosError((NTSTATUS)op->ov.Internal);
rb_ary_push(result, rb_ary_new_from_args(4, ULL2NUM(op->id),
ULONG2NUM(op->bytes), ULONG2NUM(op->error), ULL2NUM(op->tag)));
continue;
}
/* AFD path — byte-for-byte the pre-0.2 code (only the AFD handle carries
* AFD_KEY on this port). */
winloop_req *req = (winloop_req *)ents[i].lpOverlapped;
st_data_t key = (st_data_t)req->id;
st_delete(b->reqs, &key, NULL);
int events = afd_to_ruby(req->out.Handles[0].Events);
VALUE pair = rb_ary_new_from_args(2, ULL2NUM(req->id), INT2NUM(events));
rb_ary_push(result, pair);
xfree(req);
}
return result;
}
|
#wakeup ⇒ Object
662 663 664 665 666 |
# File 'ext/winloop/winloop.c', line 662
static VALUE backend_wakeup(VALUE self) {
winloop_backend *b = get_backend(self);
PostQueuedCompletionStatus(b->iocp, 0, WAKE_KEY, NULL);
return Qnil;
}
|