Class: Winproc::Job
- Inherits:
-
Object
- Object
- Winproc::Job
- Defined in:
- lib/winproc.rb,
ext/winproc/winproc.c
Overview
-------------------------------------------------------------------- Job ---
Class Method Summary collapse
-
._create(v_kill, v_mem, v_pmem, v_cpu, v_active, v_time) ⇒ Object
Job._create(kill_on_close, memory, process_memory, cpu_percent, active_processes, cpu_time_100ns) -> Job.
-
.new(kill_on_close: true, memory: nil, process_memory: nil, cpu_percent: nil, active_processes: nil, cpu_time: nil) ⇒ Object
Create an ANONYMOUS job object and apply limits.
Instance Method Summary collapse
- #_assign(vprocess) ⇒ Object
- #_terminate(*args) ⇒ Object
-
#_wait_empty(vms) ⇒ Object
Job#_wait_empty(ms) -> true (empty) | false (timeout).
- #active_processes ⇒ Object
-
#assign(process) ⇒ Object
AssignProcessToJobObject — the fallback path for a process NOT placed at creation.
- #close ⇒ Object
- #closed? ⇒ Boolean
-
#terminate(exit_code = 1) ⇒ Object
TerminateJobObject: kill every process in the job (and nested child jobs).
-
#wait_empty(timeout: nil) ⇒ Object
Block until the job has zero active processes.
Class Method Details
._create(v_kill, v_mem, v_pmem, v_cpu, v_active, v_time) ⇒ Object
Job._create(kill_on_close, memory, process_memory, cpu_percent, active_processes, cpu_time_100ns) -> Job. nil = "unset".
852 853 854 855 856 857 858 859 860 861 862 863 864 865 866 867 868 869 870 871 872 873 874 875 876 877 878 879 880 881 882 883 884 885 886 887 888 889 890 891 892 893 894 895 896 897 898 899 900 901 902 903 904 905 906 907 908 909 910 911 912 913 914 915 916 917 918 919 920 921 922 923 924 925 |
# File 'ext/winproc/winproc.c', line 852
static VALUE
job_create(VALUE klass, VALUE v_kill, VALUE v_mem, VALUE v_pmem,
VALUE v_cpu, VALUE v_active, VALUE v_time)
{
VALUE obj = job_alloc(cJob);
job_t *j = job_get(obj);
JOBOBJECT_ASSOCIATE_COMPLETION_PORT acp;
JOBOBJECT_EXTENDED_LIMIT_INFORMATION eli;
DWORD flags = 0;
j->job = CreateJobObjectW(NULL, NULL);
if (!j->job) raise_gle("CreateJobObject", GetLastError());
j->iocp = CreateIoCompletionPort(INVALID_HANDLE_VALUE, NULL, 0, 1);
if (!j->iocp) {
DWORD gle = GetLastError();
CloseHandle(j->job); j->job = NULL;
raise_gle("CreateIoCompletionPort", gle);
}
/* Associate the port while the job is provably inactive (E-14). */
memset(&acp, 0, sizeof(acp));
acp.CompletionKey = (PVOID)JOB_KEY;
acp.CompletionPort = j->iocp;
if (!SetInformationJobObject(j->job, JobObjectAssociateCompletionPortInformation,
&acp, sizeof(acp))) {
DWORD gle = GetLastError();
CloseHandle(j->iocp); j->iocp = NULL;
CloseHandle(j->job); j->job = NULL;
raise_gle("SetInformationJobObject(port)", gle);
}
/* One extended-limit Set combining KILL_ON_CLOSE / memory / time / active. */
memset(&eli, 0, sizeof(eli));
if (RTEST(v_kill)) flags |= JOB_OBJECT_LIMIT_KILL_ON_JOB_CLOSE;
if (!NIL_P(v_mem)) {
flags |= JOB_OBJECT_LIMIT_JOB_MEMORY;
eli.JobMemoryLimit = (SIZE_T)NUM2ULL(v_mem);
}
if (!NIL_P(v_pmem)) {
flags |= JOB_OBJECT_LIMIT_PROCESS_MEMORY;
eli.ProcessMemoryLimit = (SIZE_T)NUM2ULL(v_pmem);
}
if (!NIL_P(v_active)) {
flags |= JOB_OBJECT_LIMIT_ACTIVE_PROCESS;
eli.BasicLimitInformation.ActiveProcessLimit = NUM2ULONG(v_active);
}
if (!NIL_P(v_time)) {
flags |= JOB_OBJECT_LIMIT_JOB_TIME;
eli.BasicLimitInformation.PerJobUserTimeLimit.QuadPart = (LONGLONG)NUM2LL(v_time);
}
eli.BasicLimitInformation.LimitFlags = flags;
if (flags && !SetInformationJobObject(j->job, JobObjectExtendedLimitInformation,
&eli, sizeof(eli))) {
DWORD gle = GetLastError();
/* job handle already owned by the TypedData; close() will fire
* KILL_ON_JOB_CLOSE (no processes yet) — consistent object. */
raise_gle("SetInformationJobObject(limits)", gle);
}
/* CPU rate control is the LAST Set so a raise (RDP/DFSS error 50 -> the
* Unsupported mapping) leaves a consistent, usable object (E-16). */
if (!NIL_P(v_cpu)) {
JOBOBJECT_CPU_RATE_CONTROL_INFORMATION crc;
memset(&crc, 0, sizeof(crc));
crc.ControlFlags = JOB_OBJECT_CPU_RATE_CONTROL_ENABLE |
JOB_OBJECT_CPU_RATE_CONTROL_HARD_CAP;
crc.CpuRate = (DWORD)(NUM2ULONG(v_cpu) * 100);
if (!SetInformationJobObject(j->job, JobObjectCpuRateControlInformation,
&crc, sizeof(crc)))
raise_gle("SetInformationJobObject(cpu)", GetLastError());
}
return obj;
}
|
.new(kill_on_close: true, memory: nil, process_memory: nil, cpu_percent: nil, active_processes: nil, cpu_time: nil) ⇒ Object
Create an ANONYMOUS job object and apply limits. A private IOCP is created and associated with the job before any process can join, so #wait_empty can never miss messages.
314 315 316 317 318 319 320 321 322 323 324 325 326 327 328 329 330 |
# File 'lib/winproc.rb', line 314 def self.new(kill_on_close: true, memory: nil, process_memory: nil, cpu_percent: nil, active_processes: nil, cpu_time: nil) mem = memory && nonneg_int(memory, "memory") pmem = process_memory && nonneg_int(process_memory, "process_memory") act = active_processes && positive_int(active_processes, "active_processes") cpu = cpu_percent && cpu_percent_int(cpu_percent) time = cpu_time && cpu_time_ticks(cpu_time) job = _create(kill_on_close, mem, pmem, cpu, act, time) return job unless block_given? begin yield job ensure job.close end end |
Instance Method Details
#_assign(vprocess) ⇒ Object
927 928 929 930 931 932 933 934 935 936 937 938 939 940 |
# File 'ext/winproc/winproc.c', line 927
static VALUE
job_assign(VALUE self, VALUE vprocess)
{
job_t *j = job_live(self);
process_t *pr;
if (!rb_typeddata_is_kind_of(vprocess, &process_type))
rb_raise(rb_eTypeError, "winproc: expected a Winproc::Process");
pr = process_get(vprocess);
if (pr->closed || pr->h == INVALID_HANDLE_VALUE)
rb_raise(eClosed, "winproc: process is closed");
if (!AssignProcessToJobObject(j->job, pr->h))
raise_gle("AssignProcessToJobObject", GetLastError());
return self;
}
|
#_terminate(*args) ⇒ Object
942 943 944 945 946 947 948 949 950 951 952 953 |
# File 'ext/winproc/winproc.c', line 942
static VALUE
job_terminate(int argc, VALUE *argv, VALUE self)
{
job_t *j = job_live(self);
VALUE vcode;
UINT code = 1;
rb_scan_args(argc, argv, "01", &vcode);
if (!NIL_P(vcode)) code = (UINT)NUM2UINT(vcode);
if (!TerminateJobObject(j->job, code))
raise_gle("TerminateJobObject", GetLastError());
return self;
}
|
#_wait_empty(vms) ⇒ Object
Job#_wait_empty(ms) -> true (empty) | false (timeout).
1073 1074 1075 1076 1077 1078 1079 1080 1081 1082 1083 1084 1085 1086 1087 1088 1089 1090 1091 1092 1093 |
# File 'ext/winproc/winproc.c', line 1073
static VALUE
job_wait_empty(VALUE self, VALUE vms)
{
job_t *j = job_get(self);
jwait_args a;
if (j->closed || !j->job) rb_raise(eClosed, "winproc: job is closed");
/* Convert the timeout BEFORE claiming the in-flight guard: a raise here
* (e.g. a non-numeric ms) must not leave waiting==1 stuck — that would wedge
* job_close's drain spin forever. NUM2LL is 64-bit so a multi-day ms never
* overflows on LLP64 (where long is 32-bit); the wait loop clamps each slice
* below INFINITE. (E-Slice / the close-wedge fix.) */
a.j = j;
a.ms_in = NUM2LL(vms);
if (InterlockedCompareExchange(&j->waiting, 1, 0) != 0)
rb_raise(eError, "winproc: another wait_empty is already in flight");
return rb_ensure(job_wait_empty_body, (VALUE)&a, job_wait_empty_ensure, (VALUE)j);
}
|
#active_processes ⇒ Object
966 967 968 969 970 971 |
# File 'ext/winproc/winproc.c', line 966
static VALUE
job_active_processes(VALUE self)
{
job_t *j = job_live(self);
return LONG2NUM(job_active_count(j));
}
|
#assign(process) ⇒ Object
AssignProcessToJobObject — the fallback path for a process NOT placed at creation. Prefer Winproc.spawn(job: job).
367 368 369 |
# File 'lib/winproc.rb', line 367 def assign(process) _assign(process) end |
#close ⇒ Object
1109 1110 1111 1112 1113 1114 1115 1116 1117 1118 1119 1120 1121 1122 1123 1124 |
# File 'ext/winproc/winproc.c', line 1109
static VALUE
job_close(VALUE self)
{
job_t *j = job_get(self);
if (j->closed) return Qnil;
j->closing = 1;
if (j->waiting && j->iocp) {
jclose_t c; c.j = j;
PostQueuedCompletionStatus(j->iocp, 0, WAKE_KEY, NULL);
rb_thread_call_without_gvl(jclose_spin_fn, &c, NULL, NULL);
}
if (j->iocp) { CloseHandle(j->iocp); j->iocp = NULL; }
if (j->job) { CloseHandle(j->job); j->job = NULL; } /* kill-on-close fires */
j->closed = 1;
return Qnil;
}
|
#closed? ⇒ Boolean
1126 |
# File 'ext/winproc/winproc.c', line 1126
static VALUE job_closed_p(VALUE self) { return job_get(self)->closed ? Qtrue : Qfalse; }
|
#terminate(exit_code = 1) ⇒ Object
TerminateJobObject: kill every process in the job (and nested child jobs).
372 373 374 |
# File 'lib/winproc.rb', line 372 def terminate(exit_code = 1) _terminate(exit_code) end |
#wait_empty(timeout: nil) ⇒ Object
Block until the job has zero active processes. true on empty, false on timeout. Only ONE wait_empty may be in flight per Job.
378 379 380 |
# File 'lib/winproc.rb', line 378 def wait_empty(timeout: nil) Winproc.run_blocking { _wait_empty(Winproc.ms_for(timeout)) } end |