Class: Winproc::Job

Inherits:
Object
  • Object
show all
Defined in:
lib/winproc.rb,
ext/winproc/winproc.c

Overview

-------------------------------------------------------------------- Job ---

Class Method Summary collapse

Instance Method Summary collapse

Class Method Details

._create(v_kill, v_mem, v_pmem, v_cpu, v_active, v_time) ⇒ Object

Job._create(kill_on_close, memory, process_memory, cpu_percent, active_processes, cpu_time_100ns) -> Job. nil = "unset".



852
853
854
855
856
857
858
859
860
861
862
863
864
865
866
867
868
869
870
871
872
873
874
875
876
877
878
879
880
881
882
883
884
885
886
887
888
889
890
891
892
893
894
895
896
897
898
899
900
901
902
903
904
905
906
907
908
909
910
911
912
913
914
915
916
917
918
919
920
921
922
923
924
925
# File 'ext/winproc/winproc.c', line 852

static VALUE
job_create(VALUE klass, VALUE v_kill, VALUE v_mem, VALUE v_pmem,
           VALUE v_cpu, VALUE v_active, VALUE v_time)
{
    VALUE obj = job_alloc(cJob);
    job_t *j = job_get(obj);
    JOBOBJECT_ASSOCIATE_COMPLETION_PORT acp;
    JOBOBJECT_EXTENDED_LIMIT_INFORMATION eli;
    DWORD flags = 0;

    j->job = CreateJobObjectW(NULL, NULL);
    if (!j->job) raise_gle("CreateJobObject", GetLastError());

    j->iocp = CreateIoCompletionPort(INVALID_HANDLE_VALUE, NULL, 0, 1);
    if (!j->iocp) {
        DWORD gle = GetLastError();
        CloseHandle(j->job); j->job = NULL;
        raise_gle("CreateIoCompletionPort", gle);
    }

    /* Associate the port while the job is provably inactive (E-14). */
    memset(&acp, 0, sizeof(acp));
    acp.CompletionKey = (PVOID)JOB_KEY;
    acp.CompletionPort = j->iocp;
    if (!SetInformationJobObject(j->job, JobObjectAssociateCompletionPortInformation,
                                 &acp, sizeof(acp))) {
        DWORD gle = GetLastError();
        CloseHandle(j->iocp); j->iocp = NULL;
        CloseHandle(j->job);  j->job = NULL;
        raise_gle("SetInformationJobObject(port)", gle);
    }

    /* One extended-limit Set combining KILL_ON_CLOSE / memory / time / active. */
    memset(&eli, 0, sizeof(eli));
    if (RTEST(v_kill)) flags |= JOB_OBJECT_LIMIT_KILL_ON_JOB_CLOSE;
    if (!NIL_P(v_mem)) {
        flags |= JOB_OBJECT_LIMIT_JOB_MEMORY;
        eli.JobMemoryLimit = (SIZE_T)NUM2ULL(v_mem);
    }
    if (!NIL_P(v_pmem)) {
        flags |= JOB_OBJECT_LIMIT_PROCESS_MEMORY;
        eli.ProcessMemoryLimit = (SIZE_T)NUM2ULL(v_pmem);
    }
    if (!NIL_P(v_active)) {
        flags |= JOB_OBJECT_LIMIT_ACTIVE_PROCESS;
        eli.BasicLimitInformation.ActiveProcessLimit = NUM2ULONG(v_active);
    }
    if (!NIL_P(v_time)) {
        flags |= JOB_OBJECT_LIMIT_JOB_TIME;
        eli.BasicLimitInformation.PerJobUserTimeLimit.QuadPart = (LONGLONG)NUM2LL(v_time);
    }
    eli.BasicLimitInformation.LimitFlags = flags;
    if (flags && !SetInformationJobObject(j->job, JobObjectExtendedLimitInformation,
                                          &eli, sizeof(eli))) {
        DWORD gle = GetLastError();
        /* job handle already owned by the TypedData; close() will fire
         * KILL_ON_JOB_CLOSE (no processes yet) — consistent object. */
        raise_gle("SetInformationJobObject(limits)", gle);
    }

    /* CPU rate control is the LAST Set so a raise (RDP/DFSS error 50 -> the
     * Unsupported mapping) leaves a consistent, usable object (E-16). */
    if (!NIL_P(v_cpu)) {
        JOBOBJECT_CPU_RATE_CONTROL_INFORMATION crc;
        memset(&crc, 0, sizeof(crc));
        crc.ControlFlags = JOB_OBJECT_CPU_RATE_CONTROL_ENABLE |
                           JOB_OBJECT_CPU_RATE_CONTROL_HARD_CAP;
        crc.CpuRate = (DWORD)(NUM2ULONG(v_cpu) * 100);
        if (!SetInformationJobObject(j->job, JobObjectCpuRateControlInformation,
                                     &crc, sizeof(crc)))
            raise_gle("SetInformationJobObject(cpu)", GetLastError());
    }
    return obj;
}

.new(kill_on_close: true, memory: nil, process_memory: nil, cpu_percent: nil, active_processes: nil, cpu_time: nil) ⇒ Object

Create an ANONYMOUS job object and apply limits. A private IOCP is created and associated with the job before any process can join, so #wait_empty can never miss messages.



314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
# File 'lib/winproc.rb', line 314

def self.new(kill_on_close: true, memory: nil, process_memory: nil,
             cpu_percent: nil, active_processes: nil, cpu_time: nil)
  mem  = memory && nonneg_int(memory, "memory")
  pmem = process_memory && nonneg_int(process_memory, "process_memory")
  act  = active_processes && positive_int(active_processes, "active_processes")
  cpu  = cpu_percent && cpu_percent_int(cpu_percent)
  time = cpu_time && cpu_time_ticks(cpu_time)

  job = _create(kill_on_close, mem, pmem, cpu, act, time)
  return job unless block_given?

  begin
    yield job
  ensure
    job.close
  end
end

Instance Method Details

#_assign(vprocess) ⇒ Object



927
928
929
930
931
932
933
934
935
936
937
938
939
940
# File 'ext/winproc/winproc.c', line 927

static VALUE
job_assign(VALUE self, VALUE vprocess)
{
    job_t *j = job_live(self);
    process_t *pr;
    if (!rb_typeddata_is_kind_of(vprocess, &process_type))
        rb_raise(rb_eTypeError, "winproc: expected a Winproc::Process");
    pr = process_get(vprocess);
    if (pr->closed || pr->h == INVALID_HANDLE_VALUE)
        rb_raise(eClosed, "winproc: process is closed");
    if (!AssignProcessToJobObject(j->job, pr->h))
        raise_gle("AssignProcessToJobObject", GetLastError());
    return self;
}

#_terminate(*args) ⇒ Object



942
943
944
945
946
947
948
949
950
951
952
953
# File 'ext/winproc/winproc.c', line 942

static VALUE
job_terminate(int argc, VALUE *argv, VALUE self)
{
    job_t *j = job_live(self);
    VALUE vcode;
    UINT code = 1;
    rb_scan_args(argc, argv, "01", &vcode);
    if (!NIL_P(vcode)) code = (UINT)NUM2UINT(vcode);
    if (!TerminateJobObject(j->job, code))
        raise_gle("TerminateJobObject", GetLastError());
    return self;
}

#_wait_empty(vms) ⇒ Object

Job#_wait_empty(ms) -> true (empty) | false (timeout).



1073
1074
1075
1076
1077
1078
1079
1080
1081
1082
1083
1084
1085
1086
1087
1088
1089
1090
1091
1092
1093
# File 'ext/winproc/winproc.c', line 1073

static VALUE
job_wait_empty(VALUE self, VALUE vms)
{
    job_t *j = job_get(self);
    jwait_args a;

    if (j->closed || !j->job) rb_raise(eClosed, "winproc: job is closed");

    /* Convert the timeout BEFORE claiming the in-flight guard: a raise here
     * (e.g. a non-numeric ms) must not leave waiting==1 stuck — that would wedge
     * job_close's drain spin forever. NUM2LL is 64-bit so a multi-day ms never
     * overflows on LLP64 (where long is 32-bit); the wait loop clamps each slice
     * below INFINITE. (E-Slice / the close-wedge fix.) */
    a.j = j;
    a.ms_in = NUM2LL(vms);

    if (InterlockedCompareExchange(&j->waiting, 1, 0) != 0)
        rb_raise(eError, "winproc: another wait_empty is already in flight");

    return rb_ensure(job_wait_empty_body, (VALUE)&a, job_wait_empty_ensure, (VALUE)j);
}

#active_processesObject



966
967
968
969
970
971
# File 'ext/winproc/winproc.c', line 966

static VALUE
job_active_processes(VALUE self)
{
    job_t *j = job_live(self);
    return LONG2NUM(job_active_count(j));
}

#assign(process) ⇒ Object

AssignProcessToJobObject — the fallback path for a process NOT placed at creation. Prefer Winproc.spawn(job: job).



367
368
369
# File 'lib/winproc.rb', line 367

def assign(process)
  _assign(process)
end

#closeObject



1109
1110
1111
1112
1113
1114
1115
1116
1117
1118
1119
1120
1121
1122
1123
1124
# File 'ext/winproc/winproc.c', line 1109

static VALUE
job_close(VALUE self)
{
    job_t *j = job_get(self);
    if (j->closed) return Qnil;
    j->closing = 1;
    if (j->waiting && j->iocp) {
        jclose_t c; c.j = j;
        PostQueuedCompletionStatus(j->iocp, 0, WAKE_KEY, NULL);
        rb_thread_call_without_gvl(jclose_spin_fn, &c, NULL, NULL);
    }
    if (j->iocp) { CloseHandle(j->iocp); j->iocp = NULL; }
    if (j->job)  { CloseHandle(j->job);  j->job = NULL; } /* kill-on-close fires */
    j->closed = 1;
    return Qnil;
}

#closed?Boolean

Returns:

  • (Boolean)


1126
# File 'ext/winproc/winproc.c', line 1126

static VALUE job_closed_p(VALUE self) { return job_get(self)->closed ? Qtrue : Qfalse; }

#terminate(exit_code = 1) ⇒ Object

TerminateJobObject: kill every process in the job (and nested child jobs).



372
373
374
# File 'lib/winproc.rb', line 372

def terminate(exit_code = 1)
  _terminate(exit_code)
end

#wait_empty(timeout: nil) ⇒ Object

Block until the job has zero active processes. true on empty, false on timeout. Only ONE wait_empty may be in flight per Job.



378
379
380
# File 'lib/winproc.rb', line 378

def wait_empty(timeout: nil)
  Winproc.run_blocking { _wait_empty(Winproc.ms_for(timeout)) }
end