Class: Winipc::Pipe
- Inherits:
-
Object
- Object
- Winipc::Pipe
- Defined in:
- lib/winipc.rb,
ext/winipc/winipc.c
Defined Under Namespace
Class Method Summary collapse
-
._connect(wname, access_rights, want_message, vms) ⇒ Object
Winipc::Pipe._connect(wname, access_rights, want_message, timeout_ms) -> Conn.
-
._listen(wname, open_mode, pipe_mode, max_inst, in_buf, out_buf, access) ⇒ Object
Winipc::Pipe._listen(wname, open_mode, pipe_mode, max_inst, in_buf, out_buf, access).
-
.connect(name, mode: :byte, direction: :duplex, timeout: nil) ⇒ Object
Connect to a named-pipe server.
-
.listen(name, mode: :byte, direction: :duplex, max_instances: :unlimited, in_buffer: 65_536, out_buffer: 65_536, reject_remote: true, access: :owner) ⇒ Object
Create a named-pipe server.
Class Method Details
._connect(wname, access_rights, want_message, vms) ⇒ Object
Winipc::Pipe._connect(wname, access_rights, want_message, timeout_ms) -> Conn
525 526 527 528 529 530 531 532 533 534 535 536 537 538 539 540 541 542 543 544 545 546 547 548 549 550 551 552 553 554 555 556 557 558 559 560 561 562 563 564 565 566 567 568 569 570 571 572 573 574 575 576 577 578 579 580 581 582 583 584 585 586 587 |
# File 'ext/winipc/winipc.c', line 525
/*
 * Winipc::Pipe._connect(wname, access_rights, want_message, timeout_ms) -> Conn
 *
 * Opens the client end of the named pipe `wname` with CreateFileW
 * (OPEN_EXISTING, FILE_FLAG_OVERLAPPED, share mode 0) and stores the handle
 * in a freshly allocated Conn.
 *
 *   wname         - pipe path; converted with to_wide() on every attempt
 *   access_rights - desired-access mask handed to CreateFileW
 *   want_message  - truthy: switch the handle to PIPE_READMODE_MESSAGE
 *   vms           - timeout in milliseconds:
 *                     0   raise on the first CreateFileW failure,
 *                     > 0 keep retrying until that many ms have elapsed,
 *                     < 0 keep retrying with no deadline
 *
 * Only ERROR_PIPE_BUSY and ERROR_FILE_NOT_FOUND are retried; any other
 * failure is raised through raise_gle() straight away.
 */
static VALUE
pipe_connect(VALUE klass, VALUE wname, VALUE access_rights, VALUE want_message, VALUE vms)
{
    DWORD access = NUM2ULONG(access_rights);
    long ms_in = NUM2LONG(vms);
    int want_msg = RTEST(want_message);
    HANDLE h = INVALID_HANDLE_VALUE;
    VALUE connobj;
    conn_t *c;
    DWORD gle;
    /* Absolute deadline on the GetTickCount64 clock; only consulted when ms_in > 0. */
    ULONGLONG deadline = (ms_in > 0) ? GetTickCount64() + (ULONGLONG)ms_in : 0;

    /* The wide name is rebuilt each iteration and freed before any raise /
     * interrupt check, so an interrupt (Thread#kill/Timeout) raised mid-loop
     * never leaks it. Each blocking wait is sliced so interrupts are serviced
     * promptly rather than only at the deadline. */
    for (;;) {
        WCHAR *name = to_wide(wname);
        DWORD remaining, slice;

        h = CreateFileW(name, access, 0, NULL, OPEN_EXISTING, FILE_FLAG_OVERLAPPED, NULL);
        if (h != INVALID_HANDLE_VALUE) { xfree(name); break; }

        /* Read the error before xfree() or anything else can disturb it. */
        gle = GetLastError();
        if (!((gle == ERROR_PIPE_BUSY || gle == ERROR_FILE_NOT_FOUND) && ms_in != 0)) {
            xfree(name);
            raise_gle("CreateFile(pipe)", gle);
        }

        /* server busy (all instances in use) or not-yet-present — wait/poll */
        if (ms_in < 0) {
            remaining = INFINITE;
        } else {
            ULONGLONG now = GetTickCount64();
            /* Out of time: report the last CreateFileW error. */
            if (now >= deadline) { xfree(name); raise_gle("Connect(pipe)", gle); }
            remaining = (DWORD)(deadline - now);
        }

        /* Never block for more than 100 ms at a time. */
        slice = (remaining == INFINITE || remaining > 100) ? 100 : remaining;
        if (gle == ERROR_PIPE_BUSY) {
            /* NOTE(review): wnp_fn is defined elsewhere; presumably it waits on
             * the pipe for w.ms milliseconds. Its w.ok / w.gle results are not
             * inspected here — the loop simply retries CreateFileW. */
            wnp_t w;
            w.name = name; w.ms = slice; w.ok = FALSE; w.gle = 0;
            rb_thread_call_without_gvl(wnp_fn, &w, RUBY_UBF_IO, NULL);
        } else {
            /* Pipe does not exist yet: plain timed poll. */
            sleep_gvl(slice);
        }
        xfree(name);             /* freed before the interrupt check below */
        rb_thread_check_ints();  /* deliver Thread#kill / Timeout promptly */
    }

    /* NOTE(review): if conn_alloc() can raise, h is not closed on that path — confirm. */
    connobj = conn_alloc(cConn);
    c = conn_get(connobj);
    c->h = h;
    c->is_server = 0;

    /* Two unnamed, manual-reset, initially non-signaled events. Both fields are
     * always assigned before raising, but the error code is captured right
     * after the call that actually failed: a later successful call may
     * overwrite or reset the thread's last-error value. */
    c->read_event = CreateEventW(NULL, TRUE, FALSE, NULL);
    if (!c->read_event) gle = GetLastError();
    c->write_event = CreateEventW(NULL, TRUE, FALSE, NULL);
    if (c->read_event && !c->write_event) gle = GetLastError();
    if (!c->read_event || !c->write_event) raise_gle("CreateEvent", gle);

    if (want_msg) {
        DWORD mode = PIPE_READMODE_MESSAGE;
        if (!SetNamedPipeHandleState(c->h, &mode, NULL, NULL))
            raise_gle("SetNamedPipeHandleState", GetLastError());
        c->message_read = 1;
    }
    return connobj;
}
|
._listen(wname, open_mode, pipe_mode, max_inst, in_buf, out_buf, access) ⇒ Object
Winipc::Pipe._listen(wname, open_mode, pipe_mode, max_inst, in_buf, out_buf, access)
385 386 387 388 389 390 391 392 393 394 395 396 397 398 399 400 401 402 403 404 405 406 407 408 |
# File 'ext/winipc/winipc.c', line 385
/*
 * Winipc::Pipe._listen(wname, open_mode, pipe_mode, max_inst, in_buf, out_buf, access)
 *
 * Allocates a Listener, copies the numeric pipe parameters and the wide pipe
 * name into it, records the security descriptor produced by build_sa() when
 * that call reports success, creates the listener's event and then its first
 * pipe instance. Returns the Listener object.
 *
 * Raises (via raise_gle) when CreateEventW fails.
 */
static VALUE
pipe_listen(VALUE klass, VALUE wname, VALUE open_mode, VALUE pipe_mode,
            VALUE max_inst, VALUE in_buf, VALUE out_buf, VALUE access)
{
    SECURITY_ATTRIBUTES attrs;
    PSECURITY_DESCRIPTOR descriptor = NULL;
    VALUE listener_obj = listener_alloc(cListener);
    listener_t *lst = listener_get(listener_obj);

    /* Numeric settings, converted in argument order. */
    lst->open_mode = NUM2ULONG(open_mode);
    lst->pipe_mode = NUM2ULONG(pipe_mode);
    lst->max_inst  = NUM2ULONG(max_inst);
    lst->in_buf    = NUM2ULONG(in_buf);
    lst->out_buf   = NUM2ULONG(out_buf);

    /* The Ruby side has already produced the full \\.\pipe\... path. */
    lst->name = to_wide(wname);

    /* Keep the descriptor on the listener; listener_free releases it. */
    if (build_sa(access, &attrs, &descriptor)) {
        lst->psd = descriptor;
    }

    /* Unnamed, manual-reset, initially non-signaled event. */
    lst->event = CreateEventW(NULL, TRUE, FALSE, NULL);
    if (lst->event == NULL) {
        raise_gle("CreateEvent", GetLastError());
    }

    /* Bring up the first listening instance right away so the pipe name is live. */
    lst->pending = listener_make_instance(lst);

    return listener_obj;
}
|
.connect(name, mode: :byte, direction: :duplex, timeout: nil) ⇒ Object
Connect to a named-pipe server. Yields a Conn (auto-closed) if a block is given, else returns it. With timeout: nil the call fails fast if the server isn’t connectable now; with a number of seconds it keeps retrying (server busy or not yet up) until that time elapses.
150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 |
# File 'lib/winipc.rb', line 150 def self.connect(name, mode: :byte, direction: :duplex, timeout: nil) access = Winipc.send(:client_access, direction) ms = if timeout.nil? 0 # fail fast if the server isn't connectable now else t = Float(timeout) raise ArgumentError, "timeout must be non-negative, got #{timeout.inspect}" if t.negative? msr = (t * 1000).round msr.zero? && t.positive? ? 1 : msr # a tiny positive timeout still waits, not fail-fast end conn = Winipc.run_blocking do _connect(Winipc.pipe_path(name), access, mode == :message, ms) end return conn unless block_given? begin yield conn ensure conn.close end end |
.listen(name, mode: :byte, direction: :duplex, max_instances: :unlimited, in_buffer: 65_536, out_buffer: 65_536, reject_remote: true, access: :owner) ⇒ Object
Create a named-pipe server. Yields a Listener (auto-closed) if a block is given, else returns it.
126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 |
# File 'lib/winipc.rb', line 126 def self.listen(name, mode: :byte, direction: :duplex, max_instances: :unlimited, in_buffer: 65_536, out_buffer: 65_536, reject_remote: true, access: :owner) open_mode = Winipc.send(:server_access, direction) | FILE_FLAG_OVERLAPPED pipe_mode = PIPE_WAIT pipe_mode |= PIPE_TYPE_MESSAGE | PIPE_READMODE_MESSAGE if mode == :message pipe_mode |= PIPE_REJECT_REMOTE_CLIENTS if reject_remote max = max_instances == :unlimited ? PIPE_UNLIMITED_INSTANCES : Integer(max_instances) raise ArgumentError, "max_instances must be 1..255" unless (1..255).cover?(max) listener = _listen(Winipc.pipe_path(name), open_mode, pipe_mode, max, Integer(in_buffer), Integer(out_buffer), access) return listener unless block_given? begin yield listener ensure listener.close end end |