Class: Winipc::Pipe

Inherits:
Object
  • Object
show all
Defined in:
lib/winipc.rb,
ext/winipc/winipc.c

Defined Under Namespace

Classes: Conn, Listener

Class Method Summary collapse

Class Method Details

._connect(wname, access_rights, want_message, vms) ⇒ Object

Winipc::Pipe._connect(wname, access_rights, want_message, timeout_ms) -> Conn



525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
562
563
564
565
566
567
568
569
570
571
572
573
574
575
576
577
578
579
580
581
582
583
584
585
586
587
# File 'ext/winipc/winipc.c', line 525

static VALUE
pipe_connect(VALUE klass, VALUE wname, VALUE access_rights, VALUE want_message, VALUE vms)
{
    DWORD access = NUM2ULONG(access_rights);
    long ms_in = NUM2LONG(vms);
    int want_msg = RTEST(want_message);
    HANDLE h = INVALID_HANDLE_VALUE;
    VALUE connobj;
    conn_t *c;
    DWORD gle;
    ULONGLONG deadline = (ms_in > 0) ? GetTickCount64() + (ULONGLONG)ms_in : 0;

    /* The wide name is rebuilt each iteration and freed before any raise /
     * interrupt check, so an interrupt (Thread#kill/Timeout) raised mid-loop
     * never leaks it. Each blocking wait is sliced so interrupts are serviced
     * promptly rather than only at the deadline. */
    for (;;) {
        WCHAR *name = to_wide(wname);
        DWORD remaining, slice;
        h = CreateFileW(name, access, 0, NULL, OPEN_EXISTING, FILE_FLAG_OVERLAPPED, NULL);
        if (h != INVALID_HANDLE_VALUE) { xfree(name); break; }
        gle = GetLastError();

        if (!((gle == ERROR_PIPE_BUSY || gle == ERROR_FILE_NOT_FOUND) && ms_in != 0)) {
            xfree(name);
            raise_gle("CreateFile(pipe)", gle);
        }
        /* server busy (all instances in use) or not-yet-present — wait/poll */
        if (ms_in < 0) {
            remaining = INFINITE;
        } else {
            ULONGLONG now = GetTickCount64();
            if (now >= deadline) { xfree(name); raise_gle("Connect(pipe)", gle); }
            remaining = (DWORD)(deadline - now);
        }
        slice = (remaining == INFINITE || remaining > 100) ? 100 : remaining;
        if (gle == ERROR_PIPE_BUSY) {
            wnp_t w;
            w.name = name; w.ms = slice; w.ok = FALSE; w.gle = 0;
            rb_thread_call_without_gvl(wnp_fn, &w, RUBY_UBF_IO, NULL);
        } else {
            sleep_gvl(slice);
        }
        xfree(name);              /* freed before the interrupt check below */
        rb_thread_check_ints();   /* deliver Thread#kill / Timeout promptly */
    }

    connobj = conn_alloc(cConn);
    c = conn_get(connobj);
    c->h = h;
    c->is_server = 0;
    c->read_event = CreateEventW(NULL, TRUE, FALSE, NULL);
    c->write_event = CreateEventW(NULL, TRUE, FALSE, NULL);
    if (!c->read_event || !c->write_event) raise_gle("CreateEvent", GetLastError());

    if (want_msg) {
        DWORD mode = PIPE_READMODE_MESSAGE;
        if (!SetNamedPipeHandleState(c->h, &mode, NULL, NULL))
            raise_gle("SetNamedPipeHandleState", GetLastError());
        c->message_read = 1;
    }
    return connobj;
}

._listen(wname, open_mode, pipe_mode, max_inst, in_buf, out_buf, access) ⇒ Object

Winipc::Pipe._listen(wname, open_mode, pipe_mode, max_inst, in_buf, out_buf, access)



385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
# File 'ext/winipc/winipc.c', line 385

static VALUE
pipe_listen(VALUE klass, VALUE wname, VALUE open_mode, VALUE pipe_mode,
            VALUE max_inst, VALUE in_buf, VALUE out_buf, VALUE access)
{
    VALUE obj = listener_alloc(cListener);
    listener_t *l = listener_get(obj);
    SECURITY_ATTRIBUTES sa;
    PSECURITY_DESCRIPTOR psd = NULL;

    l->open_mode = NUM2ULONG(open_mode);
    l->pipe_mode = NUM2ULONG(pipe_mode);
    l->max_inst  = NUM2ULONG(max_inst);
    l->in_buf    = NUM2ULONG(in_buf);
    l->out_buf   = NUM2ULONG(out_buf);
    l->name      = to_wide(wname); /* already \\.\pipe\... from Ruby */
    if (build_sa(access, &sa, &psd))
        l->psd = psd; /* stored; freed in listener_free */

    l->event = CreateEventW(NULL, TRUE, FALSE, NULL);
    if (!l->event) raise_gle("CreateEvent", GetLastError());
    /* Create the first listening instance now so the name is live immediately. */
    l->pending = listener_make_instance(l);
    return obj;
}

.connect(name, mode: :byte, direction: :duplex, timeout: nil) ⇒ Object

Connect to a named-pipe server. Yields a Conn (auto-closed) if a block is given, else returns it. timeout: nil fails fast if the server isn’t connectable now; a number of seconds retries (server busy or not yet up) until it elapses.



150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
# File 'lib/winipc.rb', line 150

def self.connect(name, mode: :byte, direction: :duplex, timeout: nil)
  access = Winipc.send(:client_access, direction)
  ms =
    if timeout.nil?
      0 # fail fast if the server isn't connectable now
    else
      t = Float(timeout)
      raise ArgumentError, "timeout must be non-negative, got #{timeout.inspect}" if t.negative?

      msr = (t * 1000).round
      msr.zero? && t.positive? ? 1 : msr # a tiny positive timeout still waits, not fail-fast
    end
  conn = Winipc.run_blocking do
    _connect(Winipc.pipe_path(name), access, mode == :message, ms)
  end
  return conn unless block_given?

  begin
    yield conn
  ensure
    conn.close
  end
end

.listen(name, mode: :byte, direction: :duplex, max_instances: :unlimited, in_buffer: 65_536, out_buffer: 65_536, reject_remote: true, access: :owner) ⇒ Object

Create a named-pipe server. Yields a Listener (auto-closed) if a block is given, else returns it.

Raises:

  • (ArgumentError)


126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
# File 'lib/winipc.rb', line 126

def self.listen(name, mode: :byte, direction: :duplex, max_instances: :unlimited,
                in_buffer: 65_536, out_buffer: 65_536, reject_remote: true, access: :owner)
  open_mode = Winipc.send(:server_access, direction) | FILE_FLAG_OVERLAPPED
  pipe_mode = PIPE_WAIT
  pipe_mode |= PIPE_TYPE_MESSAGE | PIPE_READMODE_MESSAGE if mode == :message
  pipe_mode |= PIPE_REJECT_REMOTE_CLIENTS if reject_remote
  max = max_instances == :unlimited ? PIPE_UNLIMITED_INSTANCES : Integer(max_instances)
  raise ArgumentError, "max_instances must be 1..255" unless (1..255).cover?(max)

  listener = _listen(Winipc.pipe_path(name), open_mode, pipe_mode, max,
                     Integer(in_buffer), Integer(out_buffer), access)
  return listener unless block_given?

  begin
    yield listener
  ensure
    listener.close
  end
end