Class: Iodine::WorkerPool
- Inherits:
-
Object
- Object
- Iodine::WorkerPool
- Defined in:
- ext/iodine/iodine_worker_pool.c
Class Method Summary collapse
-
.__busy(*args) ⇒ nil
Iodine::WorkerPool.__busy(duration: 1.0) -> nil.
Instance Method Summary collapse
-
#close ⇒ Object
pool.close -> nil.
-
#enqueue(blocking_operation_value) { ... } ⇒ Object
pool.enqueue(blocking_operation) { fiber.resume }.
-
#initialize(size) ⇒ Object
constructor
Iodine::WorkerPool.new(size).
-
#size ⇒ Object
pool.size -> Integer.
-
#stats ⇒ Hash
pool.stats -> Hash.
Constructor Details
#initialize(size) ⇒ Object
Iodine::WorkerPool.new(size)
Creates a new worker pool with size threads for executing
blocking operations without holding the GVL.
324 325 326 327 328 329 330 331 332 333 334 335 336 337 338 339 340 341 342 343 344 345 346 347 348 349 350 351 352 353 354 |
# File 'ext/iodine/iodine_worker_pool.c', line 324
/**
 * Iodine::WorkerPool.new(size)
 *
 * Validates `size`, prepares the pool's mutex and condition variable,
 * registers `self` with IodineStore and starts `size` workers through
 * create_worker().
 *
 * Raises:
 * - TypeError (from Check_Type) when `size` is not a Fixnum;
 * - ArgumentError when `size` is zero or negative;
 * - RuntimeError when the pool is already initialized, when a
 *   synchronization primitive cannot be created, or when a worker fails to
 *   start (the pool is closed again before raising).
 *
 * Returns `self`.
 */
static VALUE worker_pool_initialize(VALUE self, VALUE size) {
  Check_Type(size, T_FIXNUM);
  long requested = NUM2LONG(size);
  if (requested <= 0) {
    rb_raise(rb_eArgError, "pool size must be greater than 0");
  }
  size_t max_workers = (size_t)requested;

  struct iodine_worker_pool *pool;
  TypedData_Get_Struct(self, struct iodine_worker_pool,
                       &iodine_worker_pool_type, pool);

  /* Re-running initialize on a live pool would re-create the mutex and
   * condition variable while worker threads may still be using them.
   * NOTE(review): relies on the allocator leaving `initialized` false for a
   * fresh object - confirm against the alloc function. */
  if (pool->initialized) {
    rb_raise(rb_eRuntimeError, "worker pool is already initialized");
  }

  if (pthread_mutex_init(&pool->mutex, NULL) != 0) {
    rb_raise(rb_eRuntimeError, "failed to initialize worker pool mutex");
  }
  if (pthread_cond_init(&pool->work_available, NULL) != 0) {
    /* Do not leave a half-constructed pool behind. */
    pthread_mutex_destroy(&pool->mutex);
    rb_raise(rb_eRuntimeError,
             "failed to initialize worker pool condition variable");
  }

  pool->initialized = true;
  pool->max_workers = max_workers;
  pool->shutdown = false;
  /* Paired with IodineStore.remove(self) in worker_pool_close(). */
  IodineStore.add(self);

  for (size_t i = 0; i < max_workers; i++) {
    if (create_worker(self, pool) != 0) {
      /* Tear down whatever was started before reporting the failure. */
      worker_pool_close(self);
      rb_raise(rb_eRuntimeError, "failed to create worker thread %zu", i);
    }
  }
  return self;
}
|
Class Method Details
.__busy(*args) ⇒ nil
Iodine::WorkerPool.__busy(duration: 1.0) -> nil
Creates a blocking operation for testing that releases the GVL. When called within a fiber scheduler context with a worker pool, this will trigger blocking_operation_wait.
97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 |
# File 'ext/iodine/iodine_worker_pool_test.c', line 97
/**
 * Test helper: runs busy_operation_execute() for `duration` seconds
 * (keyword argument, default 1.0), with busy_operation_cleanup() guaranteed
 * to run afterwards through rb_ensure().
 *
 * A pipe is created up front and both ends are handed to the operation via
 * `struct busy_operation_data`; it is intended for cancellation.
 *
 * Raises a SystemCallError (via rb_sys_fail) when the pipe cannot be created.
 */
static VALUE worker_pool_test_busy(int argc, VALUE *argv, VALUE self) {
  (void)self; /* receiver is unused */

  /* Only an optional keyword hash is accepted ("0:"). */
  VALUE opts = Qnil;
  rb_scan_args(argc, argv, "0:", &opts);

  double duration = 1.0;
  if (!NIL_P(opts)) {
    ID keys[1] = {id_duration};
    VALUE vals[1];
    rb_get_kwargs(opts, keys, 0, 1, vals);
    /* A missing keyword comes back as Qundef; nil also keeps the default. */
    if (vals[0] != Qundef && !NIL_P(vals[0])) {
      duration = NUM2DBL(vals[0]);
    }
  }

  /* Create pipe for cancellation */
  int fds[2];
  if (pipe(fds) != 0) {
    rb_sys_fail("pipe creation failed");
  }

  struct busy_operation_data op = {
      .read_fd = fds[0], .write_fd = fds[1], .duration = duration};
  VALUE op_arg = (VALUE)&op;
  return rb_ensure(busy_operation_execute, op_arg, busy_operation_cleanup,
                   op_arg);
}
|
Instance Method Details
#close ⇒ Object
This is intended for teardown only.
pool.close -> nil
Closes the worker pool during process shutdown. Queued work items that have not started are discarded and their callbacks are not invoked. Work already executing is not cancelled; close waits for worker threads to return naturally.
470 471 472 473 474 475 476 477 478 479 480 481 482 483 484 485 486 487 488 489 490 491 492 493 494 495 496 497 498 499 500 501 502 503 504 505 506 507 508 509 510 511 512 513 514 515 516 517 518 519 520 |
# File 'ext/iodine/iodine_worker_pool.c', line 470
/**
 * pool.close -> nil
 *
 * Shuts the pool down: flags it as shut down and wakes every waiter on
 * `work_available`, joins each worker's Ruby thread, releases the worker
 * list, drops any work still queued (removing each callback from
 * IodineStore without invoking it), destroys the mutex and condition
 * variable and finally removes `self` from IodineStore.
 *
 * Does nothing when the pool is already shut down or was never initialized.
 */
static VALUE worker_pool_close(VALUE self) {
  struct iodine_worker_pool *pool;
  TypedData_Get_Struct(self, struct iodine_worker_pool,
                       &iodine_worker_pool_type, pool);

  if (!pool->initialized || pool->shutdown) {
    return Qnil;
  }

  /* Raise the shutdown flag under the lock and wake all sleeping workers. */
  pthread_mutex_lock(&pool->mutex);
  pool->shutdown = true;
  pthread_cond_broadcast(&pool->work_available);
  pthread_mutex_unlock(&pool->mutex);

  /* Wait for every worker thread to finish. */
  for (struct iodine_worker_pool_worker *w = pool->workers; w; w = w->next) {
    if (NIL_P(w->thread)) {
      continue;
    }
    rb_funcall(w->thread, rb_intern("join"), 0);
  }

  /* Release the worker list; grab the successor before freeing each node. */
  for (struct iodine_worker_pool_worker *w = pool->workers, *after; w;
       w = after) {
    after = w->next;
    fio_free(w);
  }
  pool->workers = NULL;
  pool->worker_count = 0;

  /* Discard work that never started; its callback is dropped, not called. */
  for (struct iodine_worker_pool_work *item = pool->work_head, *after; item;
       item = after) {
    after = item->next;
    IodineStore.remove(item->callback);
    fio_free(item);
  }
  pool->work_tail = NULL;
  pool->work_head = NULL;

  pthread_mutex_destroy(&pool->mutex);
  pthread_cond_destroy(&pool->work_available);
  pool->initialized = false;

  IodineStore.remove(self);
  return Qnil;
}
|
#enqueue(blocking_operation_value) { ... } ⇒ Object
pool.enqueue(blocking_operation) { fiber.resume }
Enqueues a blocking operation to be executed on a background thread. The block is called (with GVL held) after the operation completes.
365 366 367 368 369 370 371 372 373 374 375 376 377 378 379 380 381 382 383 384 385 386 387 388 389 390 391 392 393 394 395 396 397 398 399 400 401 402 403 |
# File 'ext/iodine/iodine_worker_pool.c', line 365
/**
 * pool.enqueue(blocking_operation) { ... } -> true
 *
 * Wraps the given blocking operation and the caller's block in a work item,
 * appends it to the pool's queue under the pool mutex, bumps
 * `submitted_count` and signals `work_available`.
 *
 * The block is captured as a Proc and registered with IodineStore; it is
 * stored in the work item as its callback.
 *
 * Raises:
 * - ArgumentError when no block is given or when
 *   rb_fiber_scheduler_blocking_operation_extract() returns NULL;
 * - RuntimeError when the pool is shut down or was never initialized.
 */
static VALUE worker_pool_enqueue(VALUE self, VALUE blocking_operation_value) {
  if (!rb_block_given_p()) {
    rb_raise(rb_eArgError, "block required");
  }

  struct iodine_worker_pool *pool;
  TypedData_Get_Struct(self, struct iodine_worker_pool,
                       &iodine_worker_pool_type, pool);

  if (pool->shutdown) {
    rb_raise(rb_eRuntimeError, "Worker pool is shut down");
  }
  /* An object that never went through initialize has no usable mutex;
   * locking it below would be undefined behavior. */
  if (!pool->initialized) {
    rb_raise(rb_eRuntimeError, "Worker pool is not initialized");
  }

  rb_fiber_scheduler_blocking_operation_t *blocking_op =
      rb_fiber_scheduler_blocking_operation_extract(blocking_operation_value);
  if (!blocking_op) {
    rb_raise(rb_eArgError, "Invalid blocking operation");
  }

  /* Register the callback only after every raising check has passed, so a
   * rejected call leaves nothing behind in IodineStore. */
  VALUE callback = rb_block_proc();
  IodineStore.add(callback);

  struct iodine_worker_pool_work *work = fio_malloc(sizeof(*work));
  FIO_ASSERT_ALLOC(work);
  work->blocking_operation = blocking_op;
  work->callback = callback;
  work->next = NULL;

  /* Enqueue work */
  pthread_mutex_lock(&pool->mutex);
  enqueue_work(pool, work);
  pool->submitted_count++;
  pthread_cond_signal(&pool->work_available);
  pthread_mutex_unlock(&pool->mutex);

  return Qtrue;
}
|
#size ⇒ Object
pool.size -> Integer
Returns the number of worker threads.
527 528 529 530 531 532 |
# File 'ext/iodine/iodine_worker_pool.c', line 527
/**
 * pool.size -> Integer
 *
 * Reports the pool's current `worker_count` as a Ruby Integer. The field is
 * read directly, without taking the pool mutex.
 */
static VALUE worker_pool_size_method(VALUE self) {
  struct iodine_worker_pool *wp;
  TypedData_Get_Struct(self, struct iodine_worker_pool,
                       &iodine_worker_pool_type, wp);
  const size_t count = wp->worker_count;
  return SIZET2NUM(count);
}
|
#stats ⇒ Hash
pool.stats -> Hash
Returns statistics about the worker pool.
429 430 431 432 433 434 435 436 437 438 439 440 441 442 443 444 445 446 447 448 449 450 451 452 453 454 455 456 457 458 |
# File 'ext/iodine/iodine_worker_pool.c', line 429
/**
 * pool.stats -> Hash
 *
 * Builds a Hash describing the pool, keyed by the symbols for `workers_id`,
 * `queued_id`, `in_progress_id`, `completed_id`, `submitted_id` and
 * `closed_id`.
 *
 * While the pool is initialized, every field is read inside one critical
 * section so the values form a single consistent snapshot. `submitted_count`
 * and `shutdown` are written under this mutex by enqueue and close.
 * NOTE(review): `completed_count` is presumably updated by the worker
 * threads under the same mutex - confirm.
 *
 * For a pool that is not initialized (its mutex must not be touched), the
 * worker, queue and in-progress figures are reported as 0 and the remaining
 * fields are read directly.
 */
static VALUE worker_pool_statistics(VALUE self) {
  struct iodine_worker_pool *pool;
  TypedData_Get_Struct(self, struct iodine_worker_pool,
                       &iodine_worker_pool_type, pool);

  size_t submitted = 0;
  size_t completed = 0;
  size_t in_progress = 0;
  size_t workers = 0;
  size_t queued = 0;
  bool closed = false;

  if (pool->initialized) {
    pthread_mutex_lock(&pool->mutex);
    submitted = pool->submitted_count;
    completed = pool->completed_count;
    closed = pool->shutdown;
    workers = pool->worker_count;
    in_progress = pool->in_progress_count;
    queued = count_queue_size(pool);
    pthread_mutex_unlock(&pool->mutex);
  } else {
    submitted = pool->submitted_count;
    completed = pool->completed_count;
    closed = pool->shutdown;
  }

  /* Hash construction allocates Ruby objects, so it stays outside the lock. */
  VALUE stats = rb_hash_new();
  rb_hash_aset(stats, ID2SYM(workers_id), SIZET2NUM(workers));
  rb_hash_aset(stats, ID2SYM(queued_id), SIZET2NUM(queued));
  rb_hash_aset(stats, ID2SYM(in_progress_id), SIZET2NUM(in_progress));
  rb_hash_aset(stats, ID2SYM(completed_id), SIZET2NUM(completed));
  rb_hash_aset(stats, ID2SYM(submitted_id), SIZET2NUM(submitted));
  rb_hash_aset(stats, ID2SYM(closed_id), closed ? Qtrue : Qfalse);
  return stats;
}
|