Class: Winloop::Backend

Inherits:
Object
  • Object
show all
Defined in:
lib/winloop/ops.rb,
ext/winloop/winloop.c

Overview

The validated, keyword-argument face of the generic-op preparation primitive. Suite convention: validated kwargs live in Ruby; the C bridge (_op_prepare) is underscore-private so the checks can't be skipped.

Instance Method Summary collapse

Constructor Details

#initialize ⇒ Object



307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
# File 'ext/winloop/winloop.c', line 307

/*
 * Backend#initialize: build the native state behind a backend object.
 *
 * Steps, in order: resolve the ntdll entry points, create the completion
 * port, open the AFD handle, associate it with the port under AFD_KEY, then
 * create the lookup tables and reset the counters.
 *
 * Raises eError if any of the first four steps fails. Handles opened by an
 * earlier step are closed (and the fields reset) before raising.
 *
 * Returns self.
 */
static VALUE backend_initialize(VALUE self) {
    winloop_backend *b;
    DWORD gle;
    TypedData_Get_Struct(self, winloop_backend, &backend_type, b);
    if (!resolve_ntdll()) rb_raise(eError, "winloop: could not resolve ntdll entry points");
    b->iocp = CreateIoCompletionPort(INVALID_HANDLE_VALUE, NULL, 0, 0);
    if (!b->iocp) {
        gle = GetLastError(); /* captured immediately, before any Ruby call */
        rb_raise(eError, "winloop: CreateIoCompletionPort failed (%lu)", gle);
    }
    b->afd = afd_open();
    if (b->afd == INVALID_HANDLE_VALUE) {
        CloseHandle(b->iocp); b->iocp = NULL;
        rb_raise(eError, "winloop: could not open \\Device\\Afd (unsupported platform?)");
    }
    if (!CreateIoCompletionPort(b->afd, b->iocp, AFD_KEY, 0)) {
        /* Read the error code BEFORE the cleanup: a successful CloseHandle is
         * allowed to overwrite the thread's last-error value, which would make
         * the message report the wrong (or a zero) code. */
        gle = GetLastError();
        CloseHandle(b->afd);
        CloseHandle(b->iocp);
        b->afd = INVALID_HANDLE_VALUE;
        b->iocp = NULL;
        rb_raise(eError, "winloop: could not associate AFD handle (%lu)", gle);
    }
    b->next_id = 0;
    b->reqs = st_init_numtable();
    /* st_data_t is pointer-sized; numtables hold ids/addresses/handles arm64-clean. */
    b->ops_by_id     = st_init_numtable();
    b->ops_by_addr   = st_init_numtable();
    b->assoc_handles = st_init_numtable();
    b->op_bytes = 0;
    b->closed = 0;
    return self;
}

Instance Method Details

#_op_prepare(vhandle, vtag, vcap) ⇒ Object

Private primitive behind Winloop::Backend#op_prepare (lib/winloop/ops.rb owns the kwargs + range validation). Allocates one op record: zeroed OVERLAPPED + tag + optional embedded buffer; returns [op_id, ov_addr, buf_addr].



434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
# File 'ext/winloop/winloop.c', line 434

/*
 * Backend#_op_prepare(handle, tag, capacity): allocate one op record.
 *
 * The record is a single zero-filled allocation of OP_BUF_OFFSET + capacity
 * bytes; it is registered in both ops_by_id and ops_by_addr (keyed by the
 * address of its OVERLAPPED) and counted in op_bytes.
 *
 * Returns [op_id, ov_addr, buf_addr] as Integers; buf_addr is 0 when the
 * capacity is 0.
 *
 * Raises ArgumentError when capacity exceeds 16 MiB, and eError when the
 * handle is not present in assoc_handles.
 */
static VALUE backend__op_prepare(VALUE self, VALUE vhandle, VALUE vtag, VALUE vcap) {
    winloop_backend *b = get_backend_ops(self);
    /* Every conversion that can raise happens before the allocation, so a
     * raise from here down to xmalloc owns no memory. */
    const uint64_t handle_bits = NUM2ULL(vhandle);
    const uint64_t user_tag    = NUM2ULL(vtag);
    const uint64_t capacity    = NUM2ULL(vcap);
    size_t record_size;
    uintptr_t buf_addr;
    winloop_op *rec;

    /* Defensive twin of OP_CAPACITY_MAX (covers callers that bypass the Ruby
     * wrapper via send). */
    if (capacity > 16ull * 1024 * 1024) {
        rb_raise(rb_eArgError, "winloop: capacity %llu exceeds OP_CAPACITY_MAX",
                 (unsigned long long)capacity);
    }

    /* Anti-hang check (§3.4 rule 8): an op on a handle that is not on the port
     * never completes, which shows up as a silent loop freeze rather than an
     * error. Known limit: a handle that was associated, closed and recycled to
     * the same numeric value still passes this lookup. */
    if (!st_lookup(b->assoc_handles, (st_data_t)handle_bits, NULL)) {
        rb_raise(eError, "winloop: handle %llu was never associated with this backend "
                 "— call #associate first", (unsigned long long)handle_bits);
    }

    record_size = OP_BUF_OFFSET + (size_t)capacity;
    rec = (winloop_op *)xmalloc(record_size);
    /* Zero fill gives a clean OVERLAPPED (offset 0) and guarantees the
     * embedded buffer never exposes stale heap contents. */
    memset(rec, 0, record_size);

    rec->id     = ++b->next_id;
    rec->tag    = user_tag;
    rec->handle = (HANDLE)(uintptr_t)handle_bits;
    rec->state  = OP_PREPARED;
    rec->cap    = (size_t)capacity;

    st_insert(b->ops_by_id, (st_data_t)rec->id, (st_data_t)rec);
    /* If this second insert longjmps on OOM, the record stays allocated until
     * the shutdown sweep; the same accepted exposure as backend_poll's
     * st_insert. */
    st_insert(b->ops_by_addr, (st_data_t)(uintptr_t)&rec->ov, (st_data_t)rec);
    b->op_bytes += record_size;

    buf_addr = 0;
    if (capacity) {
        buf_addr = (uintptr_t)((char *)rec + OP_BUF_OFFSET);
    }
    return rb_ary_new_from_args(3, ULL2NUM(rec->id), ULL2NUM((uintptr_t)&rec->ov),
                                ULL2NUM(buf_addr));
}

#associate(vhandle) ⇒ Object

Associate handle (opened FILE_FLAG_OVERLAPPED) with the backend's IOCP under EXT_KEY. Permanent for the handle's lifetime (Win32: one port per handle, no disassociation); the caller keeps ownership.



414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
# File 'ext/winloop/winloop.c', line 414

/*
 * Backend#associate(handle): put a caller-owned handle on the backend's
 * completion port under EXT_KEY and remember it in assoc_handles.
 *
 * Raises ArgumentError for the values 0 and UINT64_MAX, and eError when
 * CreateIoCompletionPort refuses the handle. Returns true on success.
 */
static VALUE backend_associate(VALUE self, VALUE vhandle) {
    winloop_backend *b = get_backend_ops(self);
    /* Convert first: if this raises, nothing has been acquired yet. */
    const uint64_t raw = NUM2ULL(vhandle);
    HANDLE target;

    if (raw == 0 || raw == UINT64_MAX) {
        rb_raise(rb_eArgError, "winloop: %llu is not a usable HANDLE value",
                 (unsigned long long)raw);
    }

    target = (HANDLE)(uintptr_t)raw;
    if (CreateIoCompletionPort(target, b->iocp, EXT_KEY, 0) == NULL) {
        /* Grab the error code before rb_raise performs any Ruby allocation. */
        const DWORD gle = GetLastError();
        rb_raise(eError, "winloop: could not associate handle %llu with the completion "
                 "port (error %lu: already associated with a completion port, or not "
                 "opened with FILE_FLAG_OVERLAPPED)", (unsigned long long)raw, gle);
    }

    st_insert(b->assoc_handles, (st_data_t)raw, (st_data_t)1);
    return Qtrue;
}

#cancel(vid) ⇒ Object

Cancel a pending poll. The cancelled op still posts a (STATUS_CANCELLED) completion, which #wait reaps and frees — so we do NOT free here.



367
368
369
370
371
372
373
374
375
376
# File 'ext/winloop/winloop.c', line 367

/*
 * Backend#cancel(id): request cancellation of the pending poll with this id.
 *
 * An id that is not in the reqs table is ignored. The request record is not
 * freed here, and the result of CancelIoEx is not inspected. Always returns
 * nil.
 */
static VALUE backend_cancel(VALUE self, VALUE vid) {
    winloop_backend *b = get_backend(self);
    const uint64_t req_id = NUM2ULL(vid);
    st_data_t found;

    if (!st_lookup(b->reqs, (st_data_t)req_id, &found)) {
        return Qnil;
    }
    CancelIoEx(b->afd, &((winloop_req *)found)->ov);
    return Qnil;
}

#op_abandon(vid) ⇒ Object

For a native call that failed SYNCHRONOUSLY (GetLastError != ERROR_IO_PENDING): the kernel never saw the op, no packet will ever arrive — free immediately.



484
485
486
487
488
489
490
491
492
493
494
495
# File 'ext/winloop/winloop.c', line 484

/*
 * Backend#op_abandon(id): retire an op that is neither submitted nor
 * completed.
 *
 * Raises eError when the op is in state OP_SUBMITTED or OP_COMPLETED;
 * otherwise hands the record to op_retire and returns true.
 */
static VALUE backend_op_abandon(VALUE self, VALUE vid) {
    winloop_backend *b = get_backend_ops(self);
    winloop_op *op = op_lookup(b, NUM2ULL(vid));

    switch (op->state) {
      case OP_SUBMITTED:
        rb_raise(eError, "winloop: op %llu is submitted; a submitted op is retired by "
                 "its completion, not op_abandon", (unsigned long long)op->id);
        break; /* not reached: rb_raise does not return */
      case OP_COMPLETED:
        rb_raise(eError, "winloop: op %llu is completed; retire it with op_result or "
                 "op_free, not op_abandon", (unsigned long long)op->id);
        break; /* not reached: rb_raise does not return */
      default:
        break;
    }

    op_retire(b, op);
    return Qtrue;
}

#op_cancel(vid) ⇒ Object

CancelIoEx(handle, ov) — targeted, cross-thread-safe. NEVER frees: a cancelled op still posts a packet (usually error 995) which #wait reaps. Returns true if a cancel was issued; false on ERROR_NOT_FOUND (already completed — benign) or ANY other CancelIoEx failure (rb_warn'ed, never raised: await_op's ensure path must never mask an in-flight Timeout/Fiber#raise unwind with a Winloop::Error).



502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
# File 'ext/winloop/winloop.c', line 502

/*
 * Backend#op_cancel(id): issue CancelIoEx for the op's handle/OVERLAPPED pair.
 *
 * Never frees the record. Returns true when CancelIoEx succeeds and false
 * when it fails; any failure other than ERROR_NOT_FOUND is reported with
 * rb_warn instead of an exception.
 *
 * Raises eError only when the op is still in state OP_PREPARED.
 */
static VALUE backend_op_cancel(VALUE self, VALUE vid) {
    winloop_backend *b = get_backend_ops(self);
    winloop_op *op = op_lookup(b, NUM2ULL(vid));

    if (op->state == OP_PREPARED) {
        rb_raise(eError, "winloop: op %llu is prepared; only a submitted op can be "
                 "cancelled (a prepared op is retired with op_abandon)",
                 (unsigned long long)op->id);
    }

    if (!CancelIoEx(op->handle, &op->ov)) {
        /* Read the error code right away, ahead of any call into Ruby. */
        const DWORD gle = GetLastError();
        if (gle != ERROR_NOT_FOUND) {
            rb_warn("winloop: CancelIoEx for op %llu failed (error %lu) — treated as "
                    "no-cancel, never an exception", (unsigned long long)op->id, gle);
        }
        return Qfalse;
    }
    return Qtrue;
}

#op_free(vid) ⇒ Object

Retire a COMPLETED op without materializing its data (orphaned completions).



540
541
542
543
544
545
546
547
548
549
# File 'ext/winloop/winloop.c', line 540

/*
 * Backend#op_free(id): retire a completed op without building its result.
 *
 * Raises eError unless the op is in state OP_COMPLETED. Returns true.
 */
static VALUE backend_op_free(VALUE self, VALUE vid) {
    winloop_backend *b = get_backend_ops(self);
    winloop_op *op = op_lookup(b, NUM2ULL(vid));
    const int completed = (op->state == OP_COMPLETED);

    if (!completed) {
        rb_raise(eError, "winloop: op %llu is %s; op_free is only legal once its "
                 "completion has been reaped by #wait",
                 (unsigned long long)op->id, op_state_name(op->state));
    }

    op_retire(b, op);
    return Qtrue;
}

#op_prepare(handle, tag: 0, capacity: 0) ⇒ Object

Allocate one backend-owned op record (state :prepared): a zeroed OVERLAPPED, a tag echoed back in the completion tuple, and an optional embedded data buffer. Returns [op_id, ov_addr, buf_addr] (all Integers; buf_addr is 0 when capacity is 0). handle must have been passed to #associate on this backend first — an op on a handle that is not on the port never completes (a silent hang, not an error), so unknown handles raise Winloop::Error up front.

Raises:

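  • (TypeError) — handle, tag or capacity is not an Integer
  • (ArgumentError) — handle, tag or capacity is outside its allowed range
  • (Winloop::Error) — handle was never passed to #associate on this backend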


18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
# File 'lib/winloop/ops.rb', line 18

# Validate the arguments, then delegate to the private +_op_prepare+ bridge.
#
# @param handle [Integer] HANDLE value in 1..2**64-2 (0 and
#   0xFFFF_FFFF_FFFF_FFFF are rejected as unusable)
# @param tag [Integer] value in 0..2**64-1
# @param capacity [Integer] value in 0..OP_CAPACITY_MAX
# @return [Object] whatever +_op_prepare+ returns
# @raise [TypeError] if handle, tag or capacity is not an Integer
# @raise [ArgumentError] if handle, tag or capacity is out of range
def op_prepare(handle, tag: 0, capacity: 0)
  # Type checks run in argument order so the first offending argument wins.
  { "handle" => handle, "tag" => tag, "capacity" => capacity }.each do |label, value|
    next if value.is_a?(Integer)

    raise TypeError, "#{label} must be an Integer (got #{value.class})"
  end

  # One range covers all three rejections: zero, negatives/oversized values,
  # and the all-ones value 0xFFFF_FFFF_FFFF_FFFF.
  usable_handle = handle.between?(1, 0xFFFF_FFFF_FFFF_FFFF - 1)
  raise ArgumentError, "#{handle} is not a usable HANDLE value" unless usable_handle

  tag_in_range = tag.between?(0, 2**64 - 1)
  raise ArgumentError, "tag must be in 0..2**64-1 (got #{tag})" unless tag_in_range

  capacity_in_range = capacity.between?(0, OP_CAPACITY_MAX)
  unless capacity_in_range
    raise ArgumentError, "capacity must be in 0..#{OP_CAPACITY_MAX} (got #{capacity})"
  end

  _op_prepare(handle, tag, capacity)
end

#op_result(vid) ⇒ Object

[bytes, error, data] for a COMPLETED op; frees the record. The String is built BEFORE the free so an OOM raise leaves the op intact and retryable. data is clamped to min(bytes, capacity): a stray packet's bogus byte count cannot make winloop read past its own buffer.



522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
# File 'ext/winloop/winloop.c', line 522

/*
 * Backend#op_result(id): return [bytes, error, data] for a completed op and
 * retire the record.
 *
 * data is nil when the op has no embedded buffer (cap == 0); otherwise it is
 * a String holding the first min(bytes, cap) bytes of that buffer.
 *
 * Raises eError unless the op is in state OP_COMPLETED.
 */
static VALUE backend_op_result(VALUE self, VALUE vid) {
    winloop_backend *b = get_backend_ops(self);
    winloop_op *op = op_lookup(b, NUM2ULL(vid));
    VALUE payload = Qnil;
    VALUE tuple;

    if (op->state != OP_COMPLETED) {
        rb_raise(eError, "winloop: op %llu is %s; op_result is only legal once its "
                 "completion has been reaped by #wait",
                 (unsigned long long)op->id, op_state_name(op->state));
    }

    if (op->cap) {
        /* Clamp to the buffer size so a bogus byte count cannot cause a read
         * past the end of the embedded buffer. */
        size_t len = op->cap;
        if ((size_t)op->bytes < len) {
            len = (size_t)op->bytes;
        }
        payload = rb_str_new((const char *)op + OP_BUF_OFFSET, (long)len); /* ASCII-8BIT */
    }

    /* Both Ruby objects are created before the record is retired: if either
     * allocation raises, the op is still intact and the call can be retried. */
    tuple = rb_ary_new_from_args(3, ULONG2NUM(op->bytes), ULONG2NUM(op->error), payload);
    op_retire(b, op);
    return tuple;
}

#op_state(vid) ⇒ Object



551
552
553
554
555
556
557
558
559
# File 'ext/winloop/winloop.c', line 551

/*
 * Backend#op_state(id): report the op's state as a Symbol.
 *
 * Returns :prepared for OP_PREPARED, :submitted for OP_SUBMITTED and
 * :completed for every other state value.
 */
static VALUE backend_op_state(VALUE self, VALUE vid) {
    winloop_backend *b = get_backend_ops(self);
    winloop_op *op = op_lookup(b, NUM2ULL(vid));
    const char *label = "completed";

    if (op->state == OP_PREPARED) {
        label = "prepared";
    } else if (op->state == OP_SUBMITTED) {
        label = "submitted";
    }
    return ID2SYM(rb_intern(label));
}

#op_submitted(vid) ⇒ Object

Call after the client's native call returned success OR ERROR_IO_PENDING — both queue exactly one completion packet (no skip-on-success, ever).



472
473
474
475
476
477
478
479
480
# File 'ext/winloop/winloop.c', line 472

/*
 * Backend#op_submitted(id): move a prepared op to state OP_SUBMITTED.
 *
 * Raises eError when the op is in any state other than OP_PREPARED.
 * Returns true.
 */
static VALUE backend_op_submitted(VALUE self, VALUE vid) {
    winloop_backend *b = get_backend_ops(self);
    winloop_op *op = op_lookup(b, NUM2ULL(vid));
    const int is_prepared = (op->state == OP_PREPARED);

    if (!is_prepared) {
        rb_raise(eError, "winloop: op %llu is %s; op_submitted is only legal on a "
                 "prepared op", (unsigned long long)op->id, op_state_name(op->state));
    }

    op->state = OP_SUBMITTED;
    return Qtrue;
}

#poll(io, vevents) ⇒ Object

Arm a one-shot readiness poll on io for events; returns the request id.



334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
# File 'ext/winloop/winloop.c', line 334

/*
 * Backend#poll(io, events): arm a one-shot AFD readiness poll on a socket.
 *
 * Allocates a request record, issues IOCTL_AFD_POLL against the backend's
 * AFD handle, and registers the record in the reqs table under a fresh id.
 *
 * Returns the request id as an Integer.
 * Raises eError when io's descriptor is not a socket or when the ioctl
 * returns a status other than STATUS_PENDING / STATUS_SUCCESS (the record is
 * freed before that raise).
 */
static VALUE backend_poll(VALUE self, VALUE io, VALUE vevents) {
    winloop_backend *b = get_backend(self);
    int wanted = NUM2INT(vevents);
    int fd = rb_io_descriptor(io);
    SOCKET s = rb_w32_get_osfhandle(fd);
    if (s == INVALID_SOCKET || !rb_w32_is_socket(fd)) {
        rb_raise(eError, "winloop: fd %d is not a socket (winloop polls sockets)", fd);
    }
    SOCKET base = base_socket(s);

    winloop_req *r = ALLOC(winloop_req);
    memset(r, 0, sizeof(*r));
    r->id = ++b->next_id;
    /* Maximum timeout: the poll stays pending until an event fires. */
    r->in.Timeout.QuadPart = INT64_MAX;
    r->in.NumberOfHandles = 1;
    r->in.Exclusive = FALSE;
    r->in.Handles[0].Handle = (HANDLE)base;
    r->in.Handles[0].Events = ruby_to_afd(wanted);

    NTSTATUS status = pNtDeviceIoControlFile(b->afd, NULL, NULL, &r->ov,
        (PIO_STATUS_BLOCK)&r->ov.Internal, IOCTL_AFD_POLL,
        &r->in, sizeof(r->in), &r->out, sizeof(r->out));
    int accepted = (status == STATUS_PENDING || status == 0 /*STATUS_SUCCESS*/);
    if (!accepted) {
        xfree(r);
        rb_raise(eError, "winloop: AFD poll failed (status 0x%08lX)", (unsigned long)status);
    }

    /* A STATUS_SUCCESS return means the poll was satisfied synchronously, yet
       a completion packet is queued all the same (FILE_SKIP_COMPLETION_PORT_ON_SUCCESS
       is never set), so both outcomes take the same path. */
    st_insert(b->reqs, (st_data_t)r->id, (st_data_t)r);
    return ULL2NUM(r->id);
}

#port_handle ⇒ Object

The raw IOCP HANDLE value — read-only, for tests and diagnostics. Hard rules: never wait on it, never close it, never associate handles behind winloop's back, never post after shutdown, at most ONE post per op.



564
565
566
567
# File 'ext/winloop/winloop.c', line 564

/*
 * Backend#port_handle: return the numeric value of the backend's completion
 * port handle as an Integer.
 */
static VALUE backend_port_handle(VALUE self) {
    winloop_backend *b = get_backend_ops(self);
    const uintptr_t port_bits = (uintptr_t)b->iocp;

    return ULL2NUM(port_bits);
}

#shutdown ⇒ Object



668
669
670
671
672
673
# File 'ext/winloop/winloop.c', line 668

/*
 * Backend#shutdown: hand the backend struct to backend_drain_and_close.
 *
 * Does nothing when the wrapped struct pointer is NULL. Always returns nil.
 */
static VALUE backend_shutdown(VALUE self) {
    winloop_backend *b;

    TypedData_Get_Struct(self, winloop_backend, &backend_type, b);
    if (b != NULL) {
        backend_drain_and_close(b);
    }
    return Qnil;
}

#wait(vtimeout_ms) ⇒ Object



594
595
596
597
598
599
600
601
602
603
604
605
606
607
608
609
610
611
612
613
614
615
616
617
618
619
620
621
622
623
624
625
626
627
628
629
630
631
632
633
634
635
636
637
638
639
640
641
642
643
644
645
646
647
648
649
650
651
652
653
654
655
656
657
658
659
660
# File 'ext/winloop/winloop.c', line 594

/*
 * Backend#wait(timeout_ms): dequeue up to REAP_CAP completion packets from
 * the backend's port and translate them into result tuples.
 *
 * timeout_ms: nil waits with INFINITE; any other value is converted with
 *             NUM2ULONG and cast to DWORD.
 *
 * Returns an Array holding one entry per accepted packet:
 *   - EXT_KEY packets (generic ops): [op_id, bytes, error, tag]
 *   - all remaining packets (AFD polls): [req_id, events]
 * Packets carrying WAKE_KEY or a NULL OVERLAPPED are skipped. The Array is
 * empty when the wait reports WAIT_TIMEOUT.
 *
 * Raises eError when the dequeue call fails for any other reason.
 */
static VALUE backend_wait(VALUE self, VALUE vtimeout_ms) {
    winloop_backend *b = get_backend(self);
    DWORD ms = NIL_P(vtimeout_ms) ? INFINITE : (DWORD)NUM2ULONG(vtimeout_ms);
    OVERLAPPED_ENTRY ents[REAP_CAP];

    /* The blocking dequeue runs without the GVL; gqcs_ubf (given the port
     * handle) is registered as the unblock function. The call's outcome comes
     * back through the ok / err / n fields of the argument struct. */
    gqcs_args a = { b->iocp, ents, REAP_CAP, 0, ms, 0, FALSE };
    rb_thread_call_without_gvl(gqcs_call, &a, gqcs_ubf, b->iocp);
    BOOL ok = a.ok;
    DWORD err = a.err;
    ULONG n = a.n;

    VALUE result = rb_ary_new();
    if (!ok) {
        /* A timeout is a normal outcome: report "nothing ready". */
        if (err == WAIT_TIMEOUT) return result;
        rb_raise(eError, "winloop: GetQueuedCompletionStatusEx failed (%lu)", err);
    }
    for (ULONG i = 0; i < n; i++) {
        /* Wake-up posts and packets without an OVERLAPPED carry no result. */
        if (ents[i].lpCompletionKey == WAKE_KEY || ents[i].lpOverlapped == NULL) continue;
        if (ents[i].lpCompletionKey == EXT_KEY) {
            /* Generic-op path (winloop 0.2). Defensive dispatch: verify membership
             * in ops_by_addr AND op state before acting — an unknown lpOverlapped
             * is never cast, an already-COMPLETED op (double post) is never
             * re-completed and never yields a second tuple. The only accepted
             * anomaly is a packet for a still-PREPARED op (a standalone embedder
             * called #wait between the native submit and op_submitted): the packet
             * is real and the memory is ours, so accept it with a warning. */
            st_data_t val;
            winloop_op *op;
            if (!st_lookup(b->ops_by_addr, (st_data_t)(uintptr_t)ents[i].lpOverlapped, &val)) {
                rb_warn("winloop: dropped a completion packet for an unknown OVERLAPPED "
                        "0x%llx (stray or duplicate post)",
                        (unsigned long long)(uintptr_t)ents[i].lpOverlapped);
                continue;
            }
            op = (winloop_op *)val;
            if (op->state == OP_COMPLETED) {
                rb_warn("winloop: dropped a duplicate completion packet for op %llu "
                        "(already completed; bytes/error preserved)",
                        (unsigned long long)op->id);
                continue;
            }
            if (op->state == OP_PREPARED)
                rb_warn("winloop: completion packet reaped for op %llu before "
                        "op_submitted (protocol violation; accepting the packet)",
                        (unsigned long long)op->id);
            /* The op record is NOT freed here; it only changes state and keeps
             * the byte count and error for a later op_result / op_free. */
            op->state = OP_COMPLETED;
            op->bytes = ents[i].dwNumberOfBytesTransferred;
            /* Status lives in the OVERLAPPED's Internal (an NTSTATUS), readable
             * only after the packet left the port; map it to a familiar Win32
             * code (0 ok, 995 cancelled, 1022 rescan, 38 EOF, ...). */
            op->error = pRtlNtStatusToDosError((NTSTATUS)op->ov.Internal);
            rb_ary_push(result, rb_ary_new_from_args(4, ULL2NUM(op->id),
                        ULONG2NUM(op->bytes), ULONG2NUM(op->error), ULL2NUM(op->tag)));
            continue;
        }
        /* AFD path — byte-for-byte the pre-0.2 code (only the AFD handle carries
         * AFD_KEY on this port). */
        /* The OVERLAPPED pointer is cast straight back to the request record
         * (no table lookup guards the cast), the record is removed from reqs,
         * its events are translated, and the record is freed after the pair
         * has been pushed. */
        winloop_req *req = (winloop_req *)ents[i].lpOverlapped;
        st_data_t key = (st_data_t)req->id;
        st_delete(b->reqs, &key, NULL);
        int events = afd_to_ruby(req->out.Handles[0].Events);
        VALUE pair = rb_ary_new_from_args(2, ULL2NUM(req->id), INT2NUM(events));
        rb_ary_push(result, pair);
        xfree(req);
    }
    return result;
}

#wakeup ⇒ Object



662
663
664
665
666
# File 'ext/winloop/winloop.c', line 662

/*
 * Backend#wakeup: post a zero-byte packet with WAKE_KEY and a NULL
 * OVERLAPPED to the backend's completion port.
 *
 * The result of the post is not inspected. Always returns nil.
 */
static VALUE backend_wakeup(VALUE self) {
    winloop_backend *b = get_backend(self);
    HANDLE port = b->iocp;

    PostQueuedCompletionStatus(port, 0, WAKE_KEY, NULL);
    return Qnil;
}