Class: Winproc::Stream
- Inherits:
-
Object
- Object
- Winproc::Stream
- Defined in:
- lib/winproc.rb,
ext/winproc/winproc.c
Overview
----------------------------------------------------------------- Stream ---
Instance Method Summary collapse
- #<<(bytes) ⇒ Object
-
#_read(vmax) ⇒ Object
Stream#_read(maxlen) -> binary String, or nil at EOF.
-
#_write(data) ⇒ Object
Stream#_write(bytes) -> Integer bytes written (writes ALL bytes).
- #close ⇒ Object
- #closed? ⇒ Boolean
- #initialize(*args) ⇒ Object constructor
-
#read(maxlen = 65_536) ⇒ Object
Up to maxlen bytes as a binary (ASCII-8BIT) String; nil at EOF.
- #writable? ⇒ Boolean
-
#write(bytes) ⇒ Object
Write ALL bytes (loops on partial); returns the byte count.
Constructor Details
#initialize(*args) ⇒ Object
562 563 564 565 566 567 568 |
# File 'ext/winproc/winproc.c', line 562
/*
 * Stream#initialize(*args) -- always raises.
 *
 * All arguments are ignored; the body unconditionally raises eError with a
 * message pointing callers at Winproc.spawn / Winproc.pty.
 */
static VALUE
stream_initialize(int argc, VALUE *argv, VALUE self)
{
    static const char no_direct_new[] =
        "winproc: streams are created by Winproc.spawn / Winproc.pty";

    /* Nothing is read from the arguments; the casts only mark them as used. */
    (void)argc;
    (void)argv;
    (void)self;

    rb_raise(eError, "%s", no_direct_new);

    /* Kept so the function still ends in a return of its declared type. */
    return self;
}
|
Instance Method Details
#<<(bytes) ⇒ Object
396 397 398 399 |
# File 'lib/winproc.rb', line 396
# Append operator: hands +bytes+ to #write and evaluates to the stream
# itself, so calls can be chained (stream << a << b).
def <<(bytes)
  tap { write(bytes) }
end
#_read(vmax) ⇒ Object
Stream#_read(maxlen) -> binary String, or nil at EOF.
618 619 620 621 622 623 624 625 626 627 628 629 630 631 632 633 634 635 636 637 638 639 640 641 642 643 644 645 646 647 648 649 650 651 652 653 654 655 656 657 658 659 660 661 662 663 664 665 666 667 668 669 670 671 672 673 674 675 676 |
# File 'ext/winproc/winproc.c', line 618
/*
 * Stream#_read(maxlen) -> binary String, or nil at EOF.
 *
 * Parameters:
 *   self - the Stream object; resolved to its stream_t through stream_live().
 *   vmax - maximum number of bytes to return; converted with NUM2LONG and
 *          required to be in 1..0x7FFFFFFF.
 *
 * Returns:
 *   A String holding the bytes delivered by one completed operation, tagged
 *   ASCII-8BIT, or Qnil when the operation fails with one of
 *   ERROR_BROKEN_PIPE, ERROR_PIPE_NOT_CONNECTED, ERROR_NO_DATA or
 *   ERROR_HANDLE_EOF (treated as end of stream).
 *
 * Raises:
 *   eModeError   - the stream is flagged writable.
 *   rb_eArgError - maxlen is outside 1..0x7FFFFFFF.
 *   eClosed      - s->closing is set at the top of an iteration, or the
 *                  operation ended with ERROR_OPERATION_ABORTED while it is set.
 *   rb_eNoMemError - the private buffer could not be allocated.
 *   Any other failure is reported through raise_gle().
 *
 * The body is a retry loop: a successful operation that transferred 0 bytes,
 * and an aborted operation on a stream that is not closing, both go around
 * again after rb_thread_check_ints().
 */
static VALUE
stream_read(VALUE self, VALUE vmax)
{
stream_t *s = stream_live(self);
long max = NUM2LONG(vmax);
VALUE out;
if (s->writable) rb_raise(eModeError, "winproc: cannot read a write-only stream");
/* The upper bound keeps the later (DWORD)max cast lossless. */
if (max < 1 || max > 0x7FFFFFFFL)
rb_raise(rb_eArgError, "winproc: read length must be 1..0x7FFFFFFF");
for (;;) {
sio_t io;
HANDLE dup = NULL;
char *buf;
/* Re-checked on every iteration, so a retry never starts on a closing stream. */
if (s->closing) rb_raise(eClosed, "winproc: stream is closed");
/* publish op_thread (GVL held), malloc the private buffer */
/* GetCurrentThread() is duplicated into `dup`; that duplicate is what gets
 * stored in s->op_thread below, where stream_close can pick it up. */
if (!DuplicateHandle(GetCurrentProcess(), GetCurrentThread(),
GetCurrentProcess(), &dup, 0, FALSE, DUPLICATE_SAME_ACCESS))
raise_gle("DuplicateHandle", GetLastError());
buf = (char *)malloc((size_t)max);
/* On allocation failure the duplicated handle is closed before raising. */
if (!buf) { CloseHandle(dup); rb_raise(rb_eNoMemError, "winproc: out of memory"); }
s->op_thread = dup;
/* NOTE(review): inflight is set to 1 here and never cleared in this
 * function; presumably sio_fn resets it -- confirm. */
InterlockedExchange(&s->inflight, 1);
/* Fill in the request record handed to sio_fn (is_write = 0 marks a read). */
io.s = s; io.buf = buf; io.cap = (DWORD)max; io.done = 0;
io.gle = 0; io.ok = FALSE; io.is_write = 0;
/* Run sio_fn with the GVL released; sio_ubf is registered as the unblock
 * function and receives the same request record. */
rb_thread_call_without_gvl(sio_fn, &io, sio_ubf, &io);
/* RETIRE resources before anything that can longjmp */
s->op_thread = NULL;
CloseHandle(dup);
if (io.ok) {
/* Success with zero bytes is not reported to the caller: release the
 * buffer, let pending interrupts run, and try again. */
if (io.done == 0) { free(buf); rb_thread_check_ints(); continue; } /* E-27 */
/* NOTE(review): buf is still owned at this point; if rb_str_new can raise
 * (e.g. on allocation failure) the buffer would not be freed -- confirm
 * whether that path needs protecting. */
out = rb_str_new(buf, (long)io.done);
free(buf);
rb_enc_associate(out, rb_ascii8bit_encoding());
return out;
}
/* failure */
{
/* Copy the error code out, then drop the buffer before any return/raise. */
DWORD gle = io.gle;
free(buf);
if (gle == ERROR_BROKEN_PIPE || gle == ERROR_PIPE_NOT_CONNECTED ||
gle == ERROR_NO_DATA || gle == ERROR_HANDLE_EOF)
return Qnil; /* clean EOF */
if (gle == ERROR_OPERATION_ABORTED) {
/* An abort while closing surfaces as eClosed; otherwise it is treated as
 * an interrupt request. */
if (s->closing) rb_raise(eClosed, "winproc: stream is closed");
rb_thread_check_ints(); /* interrupt; may longjmp, else retry */
continue;
}
/* Any other error code is reported under the "ReadFile" label. */
raise_gle("ReadFile", gle);
}
}
}
|
#_write(data) ⇒ Object
Stream#_write(bytes) -> Integer bytes written (writes ALL bytes).
729 730 731 732 733 734 735 736 737 738 739 740 741 742 743 744 745 746 747 748 749 750 751 752 |
# File 'ext/winproc/winproc.c', line 729
/*
 * Stream#_write(bytes) -> Integer bytes written (writes ALL bytes).
 *
 * Parameters:
 *   self - the Stream object; resolved to its stream_t through stream_live().
 *   data - the payload; coerced in place with StringValue().
 *
 * Returns:
 *   INT2FIX(0) for an empty payload, otherwise whatever the
 *   rb_ensure(stream_write_body, ..., stream_write_ensure, ...) call yields.
 *
 * Raises:
 *   eModeError     - the stream is not flagged writable.
 *   rb_eArgError   - the payload is longer than 0x7FFFFFFF bytes.
 *   rb_eNoMemError - the private copy of the payload could not be allocated.
 */
static VALUE
stream_write(VALUE self, VALUE data)
{
    stream_t *strm = stream_live(self);
    swrite_args job;
    long nbytes;

    if (!strm->writable) {
        rb_raise(eModeError, "winproc: cannot write a read-only stream");
    }

    StringValue(data);
    nbytes = RSTRING_LEN(data);
    if ((unsigned long long)nbytes > 0x7FFFFFFFull) {
        rb_raise(rb_eArgError, "winproc: data too large (> 2 GiB)");
    }
    if (nbytes == 0) {
        return INT2FIX(0);
    }

    /*
     * The payload is duplicated into a malloc'd buffer while the GVL is still
     * held, so RSTRING_PTR is never dereferenced in the no-GVL region.
     */
    job.buf = (char *)malloc((size_t)nbytes);
    if (!job.buf) {
        rb_raise(rb_eNoMemError, "winproc: out of memory");
    }
    memcpy(job.buf, RSTRING_PTR(data), (size_t)nbytes);
    job.s = strm;
    job.total = (DWORD)nbytes;

    /*
     * stream_write_ensure runs whether stream_write_body returns or raises;
     * presumably it is what releases job.buf -- verify against its definition.
     */
    return rb_ensure(stream_write_body, (VALUE)&job, stream_write_ensure, (VALUE)&job);
}
|
#close ⇒ Object
767 768 769 770 771 772 773 774 775 776 777 778 779 780 781 782 783 784 785 786 787 788 789 |
# File 'ext/winproc/winproc.c', line 767
/*
 * Stream#close -> nil.
 *
 * Marks the stream as closing, waits out any operation that is flagged as
 * in flight, closes the underlying handle and marks the stream closed.
 * Calling it again on a stream whose `closed` flag is already set returns
 * Qnil immediately without touching anything.
 *
 * Note that the stream_t is obtained with stream_get(), not the
 * stream_live() accessor used by the read/write entry points.
 */
static VALUE
stream_close(VALUE self)
{
stream_t *s = stream_get(self);
if (s->closed) return Qnil;
/* Set before the in-flight check; stream_read tests this flag to raise
 * eClosed instead of starting or retrying an operation. */
s->closing = 1;
if (s->inflight) {
sclose_t c;
HANDLE dup = NULL;
/* Take our OWN duplicate of op_thread under the GVL: it can never
* observe a retired/recycled handle, and keeps the thread object alive
* for the cancel below (§4, E-12). */
/* The DuplicateHandle result is not checked; `dup` starts as NULL and is
 * presumably left that way on failure -- confirm sclose_spin_fn tolerates
 * a NULL handle. */
if (s->op_thread)
DuplicateHandle(GetCurrentProcess(), s->op_thread,
GetCurrentProcess(), &dup, 0, FALSE, DUPLICATE_SAME_ACCESS);
c.s = s; c.dup = dup;
/* Runs sclose_spin_fn with the GVL released and no unblock function
 * (NULL, NULL). */
rb_thread_call_without_gvl(sclose_spin_fn, &c, NULL, NULL);
if (dup) CloseHandle(dup);
}
/* Close the handle at most once: it is reset to INVALID_HANDLE_VALUE afterwards. */
if (s->h != INVALID_HANDLE_VALUE) { CloseHandle(s->h); s->h = INVALID_HANDLE_VALUE; }
s->closed = 1;
return Qnil;
}
|
#closed? ⇒ Boolean
791 |
# File 'ext/winproc/winproc.c', line 791
/* Stream#closed? -- Qtrue when the stream's `closed` flag is set, else Qfalse. */
static VALUE
stream_closed_p(VALUE self)
{
    if (stream_get(self)->closed) {
        return Qtrue;
    }
    return Qfalse;
}
|
#read(maxlen = 65_536) ⇒ Object
Up to maxlen bytes as a binary (ASCII-8BIT) String; nil at EOF. Blocks
until at least 1 byte is available. Cooperative under a scheduler.
387 388 389 |
# File 'lib/winproc.rb', line 387
# Runs the native #_read inside Winproc.run_blocking and returns its result.
# +maxlen+ defaults to 65_536.
def read(maxlen = 65_536)
  Winproc.run_blocking do
    _read(maxlen)
  end
end
#writable? ⇒ Boolean
792 |
# File 'ext/winproc/winproc.c', line 792
/* Stream#writable? -- Qtrue when the stream's `writable` flag is set, else Qfalse. */
static VALUE
stream_writable_p(VALUE self)
{
    if (stream_get(self)->writable) {
        return Qtrue;
    }
    return Qfalse;
}
|
#write(bytes) ⇒ Object
Write ALL bytes (loops on partial); returns the byte count. Binary-safe.
392 393 394 |
# File 'lib/winproc.rb', line 392
# Runs the native #_write inside Winproc.run_blocking and returns its result.
def write(bytes)
  Winproc.run_blocking do
    _write(bytes)
  end
end