Class: IO::Event::Selector::URing

Inherits:
Object
  • Object
show all
Defined in:
ext/io/event/selector/uring.c

Instance Method Summary collapse

Constructor Details

#initialize(loop) ⇒ Object



272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
# File 'ext/io/event/selector/uring.c', line 272

VALUE IO_Event_Selector_URing_initialize(VALUE self, VALUE loop) {
	struct IO_Event_Selector_URing *selector = NULL;
	TypedData_Get_Struct(self, struct IO_Event_Selector_URing, &IO_Event_Selector_URing_Type, selector);
	
	IO_Event_Selector_initialize(&selector->backend, self, loop);
	
	unsigned int flags = 0;
	// IORING_SETUP_SINGLE_ISSUER (kernel 6.0+): only the owner thread submits SQEs.
	// Safe here because wakeup() uses eventfd (no ring access from other threads).
#ifdef IORING_SETUP_SINGLE_ISSUER
	flags |= IORING_SETUP_SINGLE_ISSUER;
#endif
	// IORING_SETUP_DEFER_TASKRUN (kernel 6.1+, requires SINGLE_ISSUER): defer io_uring
	// task work to the application thread rather than a kernel thread, reducing
	// cross-CPU signaling overhead.
#ifdef IORING_SETUP_DEFER_TASKRUN
	flags |= IORING_SETUP_DEFER_TASKRUN;
#endif
	// IORING_SETUP_TASKRUN_FLAG (kernel 5.19+, always available alongside
	// DEFER_TASKRUN): the kernel surfaces IORING_SQ_TASKRUN in sq.flags whenever
	// task work is pending, so select() can skip the io_uring_get_events()
	// syscall when there is nothing deferred to flush.
#ifdef IORING_SETUP_TASKRUN_FLAG
	flags |= IORING_SETUP_TASKRUN_FLAG;
#endif
	// IORING_SETUP_SUBMIT_ALL (kernel 5.18+): keep processing the rest of the SQE
	// batch even when one fails, reducing the frequency of short submits.
#ifdef IORING_SETUP_SUBMIT_ALL
	flags |= IORING_SETUP_SUBMIT_ALL;
#endif
	
	int result = io_uring_queue_init(URING_ENTRIES, &selector->ring, flags);
	
#ifdef IORING_SETUP_SUBMIT_ALL
	if (result == -EINVAL) {
		// IORING_SETUP_SUBMIT_ALL was added in Linux 5.18; retry without it.
		if (DEBUG) fprintf(stderr, "IO_Event_Selector_URing_initialize: no IORING_SETUP_SUBMIT_ALL\n");
		flags &= ~IORING_SETUP_SUBMIT_ALL;
		result = io_uring_queue_init(URING_ENTRIES, &selector->ring, flags);
	}
#endif
	
	if (result < 0) {
		rb_syserr_fail(-result, "IO_Event_Selector_URing_initialize:io_uring_queue_init");
	}
	
	selector->owner = getpid();
	
	rb_update_max_fd(selector->ring.ring_fd);
	
	// Interrupt for cross-thread wakeup: another thread calls signal(); the owner
	// thread submits an async read before each blocking wait so the ring wakes up
	// without the waking thread ever touching the SQ.
	IO_Event_Interrupt_open(&selector->interrupt);
	if (selector->interrupt.descriptor < 0) {
		io_uring_queue_exit(&selector->ring);
		selector->ring.ring_fd = -1;
		rb_sys_fail("IO_Event_Selector_URing_initialize:IO_Event_Interrupt_open");
	}
	
	return self;
}

Instance Method Details

#close ⇒ Object



351
352
353
354
355
356
357
358
# File 'ext/io/event/selector/uring.c', line 351

// Close the selector by delegating to close_internal(); always returns nil.
VALUE IO_Event_Selector_URing_close(VALUE self) {
	struct IO_Event_Selector_URing *uring = NULL;
	TypedData_Get_Struct(self, struct IO_Event_Selector_URing, &IO_Event_Selector_URing_Type, uring);
	
	close_internal(uring);
	return Qnil;
}

#closed? ⇒ Boolean

Returns:

  • (Boolean)


360
361
362
363
364
365
# File 'ext/io/event/selector/uring.c', line 360

// Report whether the selector is unusable: either its ring has been closed (ring_fd < 0), or
// the current process is not the one that created it (e.g. after fork).
VALUE IO_Event_Selector_URing_closed_p(VALUE self) {
	struct IO_Event_Selector_URing *uring = NULL;
	TypedData_Get_Struct(self, struct IO_Event_Selector_URing, &IO_Event_Selector_URing_Type, uring);
	
	if (uring->ring.ring_fd < 0) {
		return Qtrue;
	}
	
	if (uring->owner != getpid()) {
		return Qtrue;
	}
	
	return Qfalse;
}

#idle_duration ⇒ Object



342
343
344
345
346
347
348
349
# File 'ext/io/event/selector/uring.c', line 342

// Return, as a Float number of seconds, the time the most recent select() spent blocked waiting
// for events (0.0 if it did not block).
VALUE IO_Event_Selector_URing_idle_duration(VALUE self) {
	struct IO_Event_Selector_URing *uring = NULL;
	TypedData_Get_Struct(self, struct IO_Event_Selector_URing, &IO_Event_Selector_URing_Type, uring);
	
	const struct timespec *idle = &uring->idle_duration;
	double whole_seconds = idle->tv_sec;
	double fraction = idle->tv_nsec / 1000000000.0;
	
	return DBL2NUM(whole_seconds + fraction);
}

#io_close(_descriptor) ⇒ Object



1157
1158
1159
1160
1161
1162
1163
1164
1165
1166
1167
1168
1169
1170
1171
1172
1173
1174
1175
1176
1177
1178
1179
# File 'ext/io/event/selector/uring.c', line 1157

// Close a file descriptor on behalf of the fiber scheduler's `io_close` hook.
//
// The hook is invoked with a raw integer file descriptor (Ruby 4.0+), not the `IO` object.
// When ASYNC_CLOSE is enabled the close is queued on the ring and submitted immediately;
// otherwise close(2) is called directly. The outcome of the close is never reported: this
// always returns true.
VALUE IO_Event_Selector_URing_io_close(VALUE self, VALUE _descriptor) {
	struct IO_Event_Selector_URing *uring = NULL;
	TypedData_Get_Struct(self, struct IO_Event_Selector_URing, &IO_Event_Selector_URing_Type, uring);
	
	int descriptor = RB_NUM2INT(_descriptor);
	
	if (!ASYNC_CLOSE) {
		close(descriptor);
		return Qtrue;
	}
	
	// No completion is attached (user data is NULL), so nothing waits on the result:
	struct io_uring_sqe *sqe = io_get_sqe(uring);
	io_uring_prep_close(sqe, descriptor);
	io_uring_sqe_set_data(sqe, NULL);
	
	// Submit now rather than deferring to the next select cycle: callers may expect the
	// descriptor to be closed as soon as this returns (e.g. right before fork).
	io_uring_submit_now(uring);
	
	return Qtrue;
}

#io_pread(fiber, io, buffer, _from, _length, _offset) ⇒ Object



919
920
921
922
923
924
925
926
927
928
929
930
931
932
933
934
935
936
937
938
939
940
941
942
943
944
945
946
947
948
949
950
951
952
953
954
955
956
957
958
959
960
961
962
963
# File 'ext/io/event/selector/uring.c', line 919

// Read from `io` at file position `_from` into `buffer`, starting `_offset` bytes into the buffer.
//
// `_length` is the minimum number of bytes wanted: reading stops once at least that many bytes
// have arrived, at end of file (a zero-byte read), or when the buffer is full from `_offset`
// onward. While `_length` is non-zero, a "try again" error waits for readability and retries.
//
// Returns rb_fiber_scheduler_io_result(total_bytes_read, 0) on success, or
// rb_fiber_scheduler_io_result(-1, errno) on failure (EINVAL if `_offset` exceeds the buffer).
VALUE IO_Event_Selector_URing_io_pread(VALUE self, VALUE fiber, VALUE io, VALUE buffer, VALUE _from, VALUE _length, VALUE _offset) {
	struct IO_Event_Selector_URing *uring = NULL;
	TypedData_Get_Struct(self, struct IO_Event_Selector_URing, &IO_Event_Selector_URing_Type, uring);
	
	void *base;
	size_t size;
	rb_io_buffer_get_bytes_for_writing(buffer, &base, &size);
	
	size_t remaining = NUM2SIZET(_length);
	size_t position = NUM2SIZET(_offset);
	off_t file_offset = NUM2OFFT(_from);
	size_t transferred = 0;
	
	// Guard against size_t underflow and out-of-bounds pointer arithmetic on (char *)base + position:
	if (position > size) {
		return rb_fiber_scheduler_io_result(-1, EINVAL);
	}
	if (position == size) {
		return rb_fiber_scheduler_io_result(0, 0);
	}
	
	int descriptor = IO_Event_Selector_io_descriptor(io);
	
	for (size_t available = size - position; available > 0; available = size - position) {
		int result = io_read(uring, fiber, descriptor, (char*)base + position, available, file_offset);
		
		if (result == 0) {
			// End of file:
			break;
		}
		
		if (result < 0) {
			if (remaining == 0 || !IO_Event_try_again(-result)) {
				return rb_fiber_scheduler_io_result(-1, -result);
			}
			
			IO_Event_Selector_URing_io_wait(self, fiber, io, RB_INT2NUM(IO_EVENT_READABLE));
			continue;
		}
		
		transferred += result;
		position += result;
		file_offset += result;
		
		// The requested minimum has been satisfied:
		if ((size_t)result >= remaining) {
			break;
		}
		remaining -= result;
	}
	
	return rb_fiber_scheduler_io_result(transferred, 0);
}

#io_pwrite(fiber, io, buffer, _from, _length, _offset) ⇒ Object



1101
1102
1103
1104
1105
1106
1107
1108
1109
1110
1111
1112
1113
1114
1115
1116
1117
1118
1119
1120
1121
1122
1123
1124
1125
1126
1127
1128
1129
1130
1131
1132
1133
1134
1135
1136
1137
1138
1139
1140
1141
1142
1143
1144
1145
1146
1147
1148
1149
# File 'ext/io/event/selector/uring.c', line 1101

// Write `buffer` (from `_offset` bytes in) to `io` at file position `_from`.
//
// `_length` is the minimum number of bytes to write: writing stops once at least that many bytes
// have been written, on a zero-byte write, or when everything from `_offset` to the end of the
// buffer has been written. While `_length` is non-zero, a "try again" error waits for
// writability and retries.
//
// Raises RuntimeError if `_length` exceeds the buffer size. Returns
// rb_fiber_scheduler_io_result(total_bytes_written, 0) on success, or
// rb_fiber_scheduler_io_result(-1, errno) on failure (EINVAL if `_offset` exceeds the buffer).
VALUE IO_Event_Selector_URing_io_pwrite(VALUE self, VALUE fiber, VALUE io, VALUE buffer, VALUE _from, VALUE _length, VALUE _offset) {
	struct IO_Event_Selector_URing *uring = NULL;
	TypedData_Get_Struct(self, struct IO_Event_Selector_URing, &IO_Event_Selector_URing_Type, uring);
	
	const void *base;
	size_t size;
	rb_io_buffer_get_bytes_for_reading(buffer, &base, &size);
	
	size_t remaining = NUM2SIZET(_length);
	size_t position = NUM2SIZET(_offset);
	off_t file_offset = NUM2OFFT(_from);
	size_t transferred = 0;
	
	if (remaining > size) {
		rb_raise(rb_eRuntimeError, "Length exceeds size of buffer!");
	}
	
	// Guard against size_t underflow and out-of-bounds pointer arithmetic on (char *)base + position:
	if (position > size) {
		return rb_fiber_scheduler_io_result(-1, EINVAL);
	}
	if (position == size) {
		return rb_fiber_scheduler_io_result(0, 0);
	}
	
	int descriptor = IO_Event_Selector_io_descriptor(io);
	
	for (size_t available = size - position; available > 0; available = size - position) {
		int result = io_write(uring, fiber, descriptor, (char*)base + position, available, file_offset);
		
		if (result == 0) {
			break;
		}
		
		if (result < 0) {
			if (remaining == 0 || !IO_Event_try_again(-result)) {
				return rb_fiber_scheduler_io_result(-1, -result);
			}
			
			IO_Event_Selector_URing_io_wait(self, fiber, io, RB_INT2NUM(IO_EVENT_WRITABLE));
			continue;
		}
		
		transferred += result;
		position += result;
		file_offset += result;
		
		// The requested minimum has been satisfied:
		if ((size_t)result >= remaining) {
			break;
		}
		remaining -= result;
	}
	
	return rb_fiber_scheduler_io_result(transferred, 0);
}

#io_read(*args) ⇒ Object



906
907
908
909
910
911
912
913
914
915
916
917
# File 'ext/io/event/selector/uring.c', line 906

// Variadic entry point for `#io_read`: accepts 4 or 5 arguments and forwards the first four
// unchanged to IO_Event_Selector_URing_io_read, using the fifth as the buffer offset when it is
// given and 0 otherwise.
static VALUE IO_Event_Selector_URing_io_read_compatible(int argc, VALUE *argv, VALUE self)
{
	rb_check_arity(argc, 4, 5);
	
	VALUE _offset = (argc == 5) ? argv[4] : SIZET2NUM(0);
	
	return IO_Event_Selector_URing_io_read(self, argv[0], argv[1], argv[2], argv[3], _offset);
}

#io_wait(fiber, io, events) ⇒ Object



720
721
722
723
724
725
726
727
728
729
730
731
732
733
734
735
736
737
738
739
740
741
742
743
744
745
746
747
748
749
750
751
# File 'ext/io/event/selector/uring.c', line 720

// Suspend `fiber` until `io` becomes ready for any of `events` (IO_EVENT_* bits).
//
// Queues a one-shot poll request on the ring (submitted lazily, since waiting is expected to take
// a while) and transfers control away. io_wait_ensure runs even if the fiber is resumed with an
// exception. Returns whatever io_wait_transfer produces.
VALUE IO_Event_Selector_URing_io_wait(VALUE self, VALUE fiber, VALUE io, VALUE events) {
	struct IO_Event_Selector_URing *uring = NULL;
	TypedData_Get_Struct(self, struct IO_Event_Selector_URing, &IO_Event_Selector_URing_Type, uring);
	
	int descriptor = IO_Event_Selector_io_descriptor(io);
	short poll_flags = poll_flags_from_events(NUM2INT(events));
	
	if (DEBUG) fprintf(stderr, "IO_Event_Selector_URing_io_wait:io_uring_prep_poll_add(descriptor=%d, flags=%d, fiber=%p)\n", descriptor, poll_flags, (void*)fiber);
	
	struct IO_Event_Selector_URing_Waiting waiting = {.fiber = fiber};
	RB_OBJ_WRITTEN(self, Qundef, fiber);
	
	struct io_wait_arguments arguments = {
		.selector = uring,
		.waiting = &waiting,
		.flags = poll_flags,
	};
	
	struct IO_Event_Selector_URing_Completion *completion = IO_Event_Selector_URing_Completion_acquire(uring, &waiting);
	
	struct io_uring_sqe *sqe = io_get_sqe(uring);
	io_uring_prep_poll_add(sqe, descriptor, poll_flags);
	io_uring_sqe_set_data(sqe, completion);
	io_uring_submit_pending(uring);
	
	return rb_ensure(io_wait_transfer, (VALUE)&arguments, io_wait_ensure, (VALUE)&arguments);
}

#io_write(*args) ⇒ Object



1088
1089
1090
1091
1092
1093
1094
1095
1096
1097
1098
1099
# File 'ext/io/event/selector/uring.c', line 1088

// Variadic entry point for `#io_write`: accepts 4 or 5 arguments and forwards the first four
// unchanged to IO_Event_Selector_URing_io_write, using the fifth as the buffer offset when it is
// given and 0 otherwise.
static VALUE IO_Event_Selector_URing_io_write_compatible(int argc, VALUE *argv, VALUE self)
{
	rb_check_arity(argc, 4, 5);
	
	VALUE _offset = (argc == 5) ? argv[4] : SIZET2NUM(0);
	
	return IO_Event_Selector_URing_io_write(self, argv[0], argv[1], argv[2], argv[3], _offset);
}

#loop ⇒ Object



335
336
337
338
339
340
# File 'ext/io/event/selector/uring.c', line 335

// Return the loop object held by the selector's backend (`backend.loop`).
VALUE IO_Event_Selector_URing_loop(VALUE self) {
	struct IO_Event_Selector_URing *uring = NULL;
	TypedData_Get_Struct(self, struct IO_Event_Selector_URing, &IO_Event_Selector_URing_Type, uring);
	
	VALUE loop = uring->backend.loop;
	return loop;
}

#process_wait(fiber, _pid, _flags) ⇒ Object



589
590
591
592
593
594
595
596
597
598
599
600
601
602
603
604
605
606
607
608
609
610
611
612
613
614
615
616
617
618
619
620
621
622
623
624
625
626
627
628
629
630
631
632
633
634
635
636
637
638
639
640
641
642
643
644
645
# File 'ext/io/event/selector/uring.c', line 589

// Suspend `fiber` until the process(es) selected by `_pid` exit, returning whatever
// process_wait_transfer produces (process_wait_ensure runs even if the fiber is resumed with an
// exception).
//
// Two strategies, chosen at compile time:
// - IO_EVENT_SELECTOR_URING_USE_WAITID: submit an io_uring waitid request with
//   WEXITED | WNOWAIT so the child remains waitable and can be reaped afterwards.
// - Otherwise: open a pidfd for `pid` and poll it for POLLIN|POLLHUP|POLLERR. pidfd_open can only
//   name a single process, so pid <= 0 is delegated to IO_Event_Selector_process_wait.
//
// `_flags` is stored in process_wait_arguments; the kernel request itself does not use it.
// NOTE(review): presumably it is applied when reaping in process_wait_transfer (e.g. WNOHANG) —
// confirm.
//
// Raises SystemCallError if pidfd_open fails.
VALUE IO_Event_Selector_URing_process_wait(VALUE self, VALUE fiber, VALUE _pid, VALUE _flags) {
	struct IO_Event_Selector_URing *selector = NULL;
	TypedData_Get_Struct(self, struct IO_Event_Selector_URing, &IO_Event_Selector_URing_Type, selector);
	
	pid_t pid = NUM2PIDT(_pid);
	int flags = NUM2INT(_flags);
	
#ifndef IO_EVENT_SELECTOR_URING_USE_WAITID
	// `pidfd_open` can only refer to a specific process, so waiting for any child or a process group (pid <= 0) is delegated to the threaded fallback:
	if (pid <= 0) {
		return IO_Event_Selector_process_wait(pid, flags);
	}
	
	// The pidfd becomes readable once the process terminates.
	// NOTE(review): the descriptor is only recorded in process_wait_arguments below; presumably
	// process_wait_ensure closes it. If Completion_acquire or io_get_sqe can raise before
	// rb_ensure is reached, it would leak — confirm.
	int descriptor = pidfd_open(pid, 0);
	if (descriptor < 0) {
		rb_syserr_fail(errno, "IO_Event_Selector_URing_process_wait:pidfd_open");
	}
	rb_update_max_fd(descriptor);
#endif
	
	struct IO_Event_Selector_URing_Waiting waiting = {
		.fiber = fiber,
	};
	
	// GC write barrier for the fiber reference now held on behalf of `self`:
	RB_OBJ_WRITTEN(self, Qundef, fiber);
	
	// The completion is attached to the SQE below so the CQE can find `waiting`:
	struct IO_Event_Selector_URing_Completion *completion = IO_Event_Selector_URing_Completion_acquire(selector, &waiting);
	
	struct process_wait_arguments process_wait_arguments = {
		.selector = selector,
		.waiting = &waiting,
		.pid = pid,
		.flags = flags,
#ifdef IO_EVENT_SELECTOR_URING_USE_WAITID
		.siginfo = {0},
#else
		.descriptor = descriptor,
#endif
	};
	
	struct io_uring_sqe *sqe = io_get_sqe(selector);
	
#ifdef IO_EVENT_SELECTOR_URING_USE_WAITID
	// Translate `pid` (which may be <= 0 on this path) into waitid's (idtype, id) pair:
	id_t id;
	idtype_t idtype = process_waitid_type(pid, &id);
	if (DEBUG) fprintf(stderr, "IO_Event_Selector_URing_process_wait:io_uring_prep_waitid(fiber=%p, idtype=%d, id=%d, flags=%d)\n", (void*)fiber, idtype, (int)id, flags);
	// `WNOWAIT` leaves the child in a waitable state so we can reap it with `rb_process_status_wait` afterwards and build a correct `Process::Status`:
	io_uring_prep_waitid(sqe, idtype, id, &process_wait_arguments.siginfo, WEXITED | WNOWAIT, 0);
#else
	if (DEBUG) fprintf(stderr, "IO_Event_Selector_URing_process_wait:io_uring_prep_poll_add(%p)\n", (void*)fiber);
	io_uring_prep_poll_add(sqe, descriptor, POLLIN|POLLHUP|POLLERR);
#endif
	io_uring_sqe_set_data(sqe, completion);
	// Submission is deferred (pending) rather than forced immediately:
	io_uring_submit_pending(selector);
	
	return rb_ensure(process_wait_transfer, (VALUE)&process_wait_arguments, process_wait_ensure, (VALUE)&process_wait_arguments);
}

#push(fiber) ⇒ Object



391
392
393
394
395
396
397
398
399
# File 'ext/io/event/selector/uring.c', line 391

// Append `fiber` to the backend's ready list so a later select() will resume it; returns nil.
VALUE IO_Event_Selector_URing_push(VALUE self, VALUE fiber)
{
	struct IO_Event_Selector_URing *uring = NULL;
	TypedData_Get_Struct(self, struct IO_Event_Selector_URing, &IO_Event_Selector_URing_Type, uring);
	
	IO_Event_Selector_ready_push(&uring->backend, fiber);
	return Qnil;
}

#raise(*args) ⇒ Object



401
402
403
404
405
406
407
# File 'ext/io/event/selector/uring.c', line 401

// Forward `raise(*args)` to the shared backend implementation, IO_Event_Selector_raise.
VALUE IO_Event_Selector_URing_raise(int argc, VALUE *argv, VALUE self)
{
	struct IO_Event_Selector_URing *uring = NULL;
	TypedData_Get_Struct(self, struct IO_Event_Selector_URing, &IO_Event_Selector_URing_Type, uring);
	
	VALUE result = IO_Event_Selector_raise(&uring->backend, argc, argv);
	return result;
}

#ready? ⇒ Boolean

Returns:

  • (Boolean)


409
410
411
412
413
414
# File 'ext/io/event/selector/uring.c', line 409

// Report whether the backend's ready list is non-empty.
VALUE IO_Event_Selector_URing_ready_p(VALUE self) {
	struct IO_Event_Selector_URing *uring = NULL;
	TypedData_Get_Struct(self, struct IO_Event_Selector_URing, &IO_Event_Selector_URing_Type, uring);
	
	if (uring->backend.ready) {
		return Qtrue;
	}
	
	return Qfalse;
}

#resume(*args) ⇒ Object



375
376
377
378
379
380
381
# File 'ext/io/event/selector/uring.c', line 375

// Forward `resume(*args)` to the shared backend implementation, IO_Event_Selector_resume.
VALUE IO_Event_Selector_URing_resume(int argc, VALUE *argv, VALUE self)
{
	struct IO_Event_Selector_URing *uring = NULL;
	TypedData_Get_Struct(self, struct IO_Event_Selector_URing, &IO_Event_Selector_URing_Type, uring);
	
	VALUE result = IO_Event_Selector_resume(&uring->backend, argc, argv);
	return result;
}

#select(duration) ⇒ Object



1346
1347
1348
1349
1350
1351
1352
1353
1354
1355
1356
1357
1358
1359
1360
1361
1362
1363
1364
1365
1366
1367
1368
1369
1370
1371
1372
1373
1374
1375
1376
1377
1378
1379
1380
1381
1382
1383
1384
1385
1386
1387
1388
1389
1390
1391
1392
1393
1394
1395
1396
1397
1398
1399
1400
1401
1402
1403
1404
1405
1406
1407
1408
1409
1410
1411
1412
1413
# File 'ext/io/event/selector/uring.c', line 1346

// Run one iteration of the event loop and return the number of completions processed
// (as reported by select_process_completions).
//
// 1. Submit any queued SQEs.
// 2. On DEFER_TASKRUN rings, flush deferred task work into the CQ so the non-blocking pass
//    below can see it.
// 3. Flush the ready list, then process completions without blocking.
// 4. Only if step 3 did nothing and the ready list is still empty, and select_blocking_allowed()
//    permits it, block (via select_internal_without_gvl) for up to `duration`, recording the time
//    spent in `idle_duration`, then process completions again.
//
// `duration` is interpreted by make_timeout(). NOTE(review): presumably nil means "no timeout" —
// confirm.
VALUE IO_Event_Selector_URing_select(VALUE self, VALUE duration) {
	struct IO_Event_Selector_URing *selector = NULL;
	TypedData_Get_Struct(self, struct IO_Event_Selector_URing, &IO_Event_Selector_URing_Type, selector);
	
	selector->idle_duration.tv_sec = 0;
	selector->idle_duration.tv_nsec = 0;
	
	// Flush any pending events:
	io_uring_submit_flush(selector);
	
#ifdef IORING_SETUP_DEFER_TASKRUN
	// With DEFER_TASKRUN the kernel holds completions as "deferred task work" rather than placing
	// them directly into the CQ. We need to flush that work into the CQ so the non-blocking
	// select_process_completions below can see it.
	if (selector->ring.flags & IORING_SETUP_DEFER_TASKRUN) {
		// Assume work is pending unless the kernel can tell us otherwise. IORING_SQ_TASKRUN is only
		// ever raised on rings created with IORING_SETUP_TASKRUN_FLAG, so on a ring without that
		// flag, testing it would hide deferred completions from the non-blocking pass.
		int pending = 1;
#ifdef IORING_SETUP_TASKRUN_FLAG
		if (selector->ring.flags & IORING_SETUP_TASKRUN_FLAG) {
			// A relaxed atomic load is enough to check; this lets us skip the io_uring_enter
			// syscall (via io_uring_get_events) when there is nothing deferred to flush.
			unsigned sq_flags = __atomic_load_n(selector->ring.sq.kflags, __ATOMIC_RELAXED);
			pending = (sq_flags & IORING_SQ_TASKRUN) != 0;
		}
#endif
		if (pending) {
			io_uring_get_events(&selector->ring);
		}
	}
#endif
	
	int ready = IO_Event_Selector_ready_flush(&selector->backend);
	
	int completed = select_process_completions(selector);
	
	// If we:
	// 1. Didn't process any ready fibers, and
	// 2. Didn't process any events from non-blocking select (above), and
	// 3. There are no items in the ready list,
	// then we can perform a blocking select.
	if (!ready && !completed && !selector->backend.ready) {
		// We might need to wait for events:
		struct select_arguments arguments = {
			.selector = selector,
			.result = 0,
			.timeout = NULL,
		};
		
		arguments.timeout = make_timeout(duration, &arguments.storage);
		
		if (!selector->backend.ready && select_blocking_allowed(arguments.timeout)) {
			struct timespec start_time;
			IO_Event_Time_current(&start_time);
			
			// This is a blocking operation, we wait for events:
			int result = select_internal_without_gvl(&arguments);
			
			struct timespec end_time;
			IO_Event_Time_current(&end_time);
			IO_Event_Time_elapsed(&start_time, &end_time, &selector->idle_duration);
			
			// After waiting/flushing the SQ, check if there are any completions:
			if (result > 0) {
				completed = select_process_completions(selector);
			}
		}
	}
	
	return RB_INT2NUM(completed);
}

#transfer ⇒ Object



367
368
369
370
371
372
373
# File 'ext/io/event/selector/uring.c', line 367

// Transfer control to the event loop via IO_Event_Selector_loop_yield and return its result.
VALUE IO_Event_Selector_URing_transfer(VALUE self)
{
	struct IO_Event_Selector_URing *uring = NULL;
	TypedData_Get_Struct(self, struct IO_Event_Selector_URing, &IO_Event_Selector_URing_Type, uring);
	
	VALUE result = IO_Event_Selector_loop_yield(&uring->backend);
	return result;
}

#wakeup ⇒ Object



1415
1416
1417
1418
1419
1420
1421
1422
1423
1424
1425
1426
1427
# File 'ext/io/event/selector/uring.c', line 1415

// Wake a selector that is currently blocked in select().
//
// Signalling goes through the interrupt rather than the ring, so the ring's SQ is never touched
// from another thread (required for IORING_SETUP_SINGLE_ISSUER). Returns true if the selector
// was blocked and has been signalled, false otherwise.
VALUE IO_Event_Selector_URing_wakeup(VALUE self) {
	struct IO_Event_Selector_URing *uring = NULL;
	TypedData_Get_Struct(self, struct IO_Event_Selector_URing, &IO_Event_Selector_URing_Type, uring);
	
	if (!uring->blocked) {
		return Qfalse;
	}
	
	IO_Event_Interrupt_signal(&uring->interrupt);
	return Qtrue;
}

#yield ⇒ Object



383
384
385
386
387
388
389
# File 'ext/io/event/selector/uring.c', line 383

// Yield the current fiber via the shared backend helper IO_Event_Selector_yield and return its
// result.
VALUE IO_Event_Selector_URing_yield(VALUE self)
{
	struct IO_Event_Selector_URing *uring = NULL;
	TypedData_Get_Struct(self, struct IO_Event_Selector_URing, &IO_Event_Selector_URing_Type, uring);
	
	VALUE result = IO_Event_Selector_yield(&uring->backend);
	return result;
}