Class: Winproc::Stream

Inherits:
Object
  • Object
show all
Defined in:
lib/winproc.rb,
ext/winproc/winproc.c

Overview

----------------------------------------------------------------- Stream ---

Instance Method Summary collapse

Constructor Details

#initialize(*args) ⇒ Object



562
563
564
565
566
567
568
# File 'ext/winproc/winproc.c', line 562

/*
 * Stream#initialize(*args) -- always raises eError.
 *
 * Streams cannot be constructed directly; the error message points the
 * caller at Winproc.spawn / Winproc.pty instead.
 */
static VALUE
stream_initialize(int argc, VALUE *argv, VALUE self)
{
    /* The arguments are deliberately ignored. */
    (void)argc;
    (void)argv;
    (void)self;

    rb_raise(eError, "winproc: streams are created by Winproc.spawn / Winproc.pty");

    /* rb_raise does not return; this only gives every path a VALUE. */
    return self;
}

Instance Method Details

#<<(bytes) ⇒ Object



396
397
398
399
# File 'lib/winproc.rb', line 396

# Writes +bytes+ through #write and returns the stream itself, so calls
# can be chained (stream << a << b).
def <<(bytes)
  tap { write(bytes) }
end

#_read(vmax) ⇒ Object

Stream#_read(maxlen) -> binary String, or nil at EOF.



618
619
620
621
622
623
624
625
626
627
628
629
630
631
632
633
634
635
636
637
638
639
640
641
642
643
644
645
646
647
648
649
650
651
652
653
654
655
656
657
658
659
660
661
662
663
664
665
666
667
668
669
670
671
672
673
674
675
676
# File 'ext/winproc/winproc.c', line 618

/* Arguments handed to stream_read_mkstr() through rb_protect(). */
typedef struct {
    const char *buf; /* private malloc'd buffer holding the bytes read */
    long len;        /* number of valid bytes in buf */
} sread_mkstr_t;

/*
 * Build the binary (ASCII-8BIT) result String from the private buffer.
 * Runs under rb_protect() so that an allocation failure inside the VM
 * cannot unwind past the caller's free(buf).
 */
static VALUE
stream_read_mkstr(VALUE arg)
{
    sread_mkstr_t *a = (sread_mkstr_t *)arg;
    VALUE str = rb_str_new(a->buf, a->len);

    rb_enc_associate(str, rb_ascii8bit_encoding());
    return str;
}

/*
 * Stream#_read(maxlen) -> binary String, or nil at EOF.
 *
 * vmax: read length, must be in 1..0x7FFFFFFF.
 *
 * Raises eModeError on a writable stream, ArgumentError for an out-of-range
 * length, eClosed when the stream is being closed, NoMemoryError when a
 * buffer cannot be allocated, and whatever raise_gle() raises for a failed
 * DuplicateHandle or a non-EOF read error.
 *
 * Each attempt uses a private malloc'd buffer and a duplicate of the current
 * thread handle published in s->op_thread while the blocking call runs
 * without the GVL. Both are released before any call that may longjmp.
 */
static VALUE
stream_read(VALUE self, VALUE vmax)
{
    stream_t *s = stream_live(self);
    long max = NUM2LONG(vmax);
    VALUE out;

    if (s->writable) rb_raise(eModeError, "winproc: cannot read a write-only stream");
    if (max < 1 || max > 0x7FFFFFFFL)
        rb_raise(rb_eArgError, "winproc: read length must be 1..0x7FFFFFFF");

    for (;;) {
        sio_t io;
        HANDLE dup = NULL;
        char *buf;

        if (s->closing) rb_raise(eClosed, "winproc: stream is closed");

        /* publish op_thread (GVL held), malloc the private buffer */
        if (!DuplicateHandle(GetCurrentProcess(), GetCurrentThread(),
                             GetCurrentProcess(), &dup, 0, FALSE, DUPLICATE_SAME_ACCESS))
            raise_gle("DuplicateHandle", GetLastError());
        buf = (char *)malloc((size_t)max);
        if (!buf) { CloseHandle(dup); rb_raise(rb_eNoMemError, "winproc: out of memory"); }

        s->op_thread = dup;
        InterlockedExchange(&s->inflight, 1);

        io.s = s; io.buf = buf; io.cap = (DWORD)max; io.done = 0;
        io.gle = 0; io.ok = FALSE; io.is_write = 0;
        rb_thread_call_without_gvl(sio_fn, &io, sio_ubf, &io);

        /* RETIRE resources before anything that can longjmp */
        s->op_thread = NULL;
        CloseHandle(dup);

        if (io.ok) {
            sread_mkstr_t mk;
            int state = 0;

            /* Zero-byte success is not EOF: poll interrupts and retry. */
            if (io.done == 0) { free(buf); rb_thread_check_ints(); continue; } /* E-27 */

            /* rb_str_new can raise (NoMemoryError); build the String under
             * rb_protect so buf is freed on that path too, then re-raise. */
            mk.buf = buf;
            mk.len = (long)io.done;
            out = rb_protect(stream_read_mkstr, (VALUE)&mk, &state);
            free(buf);
            if (state) rb_jump_tag(state);
            return out;
        }
        /* failure */
        {
            DWORD gle = io.gle;
            free(buf);
            if (gle == ERROR_BROKEN_PIPE || gle == ERROR_PIPE_NOT_CONNECTED ||
                gle == ERROR_NO_DATA || gle == ERROR_HANDLE_EOF)
                return Qnil; /* clean EOF */
            if (gle == ERROR_OPERATION_ABORTED) {
                /* The read was cancelled: either a close is in progress or
                 * an interrupt is pending. */
                if (s->closing) rb_raise(eClosed, "winproc: stream is closed");
                rb_thread_check_ints(); /* interrupt; may longjmp, else retry */
                continue;
            }
            raise_gle("ReadFile", gle);
        }
    }
}

#_write(data) ⇒ Object

Stream#_write(bytes) -> Integer bytes written (writes ALL bytes).



729
730
731
732
733
734
735
736
737
738
739
740
741
742
743
744
745
746
747
748
749
750
751
752
# File 'ext/winproc/winproc.c', line 729

/*
 * Stream#_write(bytes) -> Integer bytes written (writes ALL bytes).
 *
 * data: converted in place with StringValue; at most 0x7FFFFFFF bytes.
 *
 * Raises eModeError on a read-only stream, ArgumentError when the data is
 * too large, and NoMemoryError when the private copy cannot be allocated.
 * An empty string returns 0 without touching the stream.
 */
static VALUE
stream_write(VALUE self, VALUE data)
{
    stream_t *stream = stream_live(self);
    swrite_args args;
    long nbytes;

    if (!stream->writable) {
        rb_raise(eModeError, "winproc: cannot write a read-only stream");
    }

    StringValue(data);
    nbytes = RSTRING_LEN(data);
    if ((unsigned long long)nbytes > 0x7FFFFFFFull) {
        rb_raise(rb_eArgError, "winproc: data too large (> 2 GiB)");
    }
    if (nbytes == 0) {
        return INT2FIX(0);
    }

    /* Snapshot the bytes into a private buffer while the GVL is still held,
     * so RSTRING_PTR is never used across the no-GVL region. */
    args.buf = (char *)malloc((size_t)nbytes);
    if (args.buf == NULL) {
        rb_raise(rb_eNoMemError, "winproc: out of memory");
    }
    memcpy(args.buf, RSTRING_PTR(data), (size_t)nbytes);
    args.s = stream;
    args.total = (DWORD)nbytes;

    /* stream_write_ensure runs whether or not the body raises; presumably it
     * releases args.buf -- confirm against its definition. */
    return rb_ensure(stream_write_body, (VALUE)&args, stream_write_ensure, (VALUE)&args);
}

#close ⇒ Object



767
768
769
770
771
772
773
774
775
776
777
778
779
780
781
782
783
784
785
786
787
788
789
# File 'ext/winproc/winproc.c', line 767

/*
 * Stream#close -> nil.
 *
 * Returns immediately if the stream is already closed. Otherwise it sets
 * `closing`, and if an operation is in flight hands off to sclose_spin_fn
 * without the GVL. It then closes the underlying handle (if still valid)
 * and sets `closed`.
 */
static VALUE
stream_close(VALUE self)
{
    stream_t *s = stream_get(self);

    if (s->closed) {
        return Qnil;
    }

    s->closing = 1;

    if (s->inflight) {
        sclose_t spin;
        HANDLE thread_dup = NULL;

        /* Take our OWN duplicate of op_thread while the GVL is held: it can
         * never observe a retired/recycled handle, and it keeps the thread
         * object alive for the cancel below (§4, E-12). A failed duplicate
         * leaves thread_dup NULL. */
        if (s->op_thread != NULL) {
            DuplicateHandle(GetCurrentProcess(), s->op_thread,
                            GetCurrentProcess(), &thread_dup, 0, FALSE,
                            DUPLICATE_SAME_ACCESS);
        }

        spin.s = s;
        spin.dup = thread_dup;
        rb_thread_call_without_gvl(sclose_spin_fn, &spin, NULL, NULL);

        if (thread_dup != NULL) {
            CloseHandle(thread_dup);
        }
    }

    if (s->h != INVALID_HANDLE_VALUE) {
        CloseHandle(s->h);
        s->h = INVALID_HANDLE_VALUE;
    }

    s->closed = 1;
    return Qnil;
}

#closed? ⇒ Boolean

Returns:

  • (Boolean)


791
# File 'ext/winproc/winproc.c', line 791

/* Stream#closed? -> true when the stream's `closed` flag is set, else false. */
static VALUE
stream_closed_p(VALUE self)
{
    const stream_t *st = stream_get(self);

    if (st->closed) {
        return Qtrue;
    }
    return Qfalse;
}

#read(maxlen = 65_536) ⇒ Object

Up to maxlen bytes as a binary (ASCII-8BIT) String; nil at EOF. Blocks until at least 1 byte. Cooperative under a scheduler.



387
388
389
# File 'lib/winproc.rb', line 387

# Reads up to +maxlen+ bytes (default 65_536) by running #_read inside
# Winproc.run_blocking and returning that call's result.
def read(maxlen = 65_536)
  Winproc.run_blocking do
    _read(maxlen)
  end
end

#writable? ⇒ Boolean

Returns:

  • (Boolean)


792
# File 'ext/winproc/winproc.c', line 792

/* Stream#writable? -> true when the stream's `writable` flag is set, else false. */
static VALUE
stream_writable_p(VALUE self)
{
    const stream_t *st = stream_get(self);

    if (st->writable) {
        return Qtrue;
    }
    return Qfalse;
}

#write(bytes) ⇒ Object

Write ALL bytes (loops on partial); returns the byte count. Binary-safe.



392
393
394
# File 'lib/winproc.rb', line 392

# Writes +bytes+ by running #_write inside Winproc.run_blocking and
# returning that call's result.
def write(bytes)
  Winproc.run_blocking do
    _write(bytes)
  end
end