Module: Rperf

Defined in:
lib/rperf.rb,
lib/rperf/meta.rb,
lib/rperf/table.rb,
lib/rperf/version.rb,
ext/rperf/rperf.c

Defined Under Namespace

Modules: ActiveJobMiddleware, Collapsed, Meta, PProf, Table, Text
Classes: RackMiddleware, SidekiqMiddleware, Viewer

Constant Summary collapse

VM_STATE_LABELS =

VM state integer → label value mapping. These values appear as “%GVL” / “%GC” label keys in label_sets.

{
  1 => ["%GVL", "blocked"],
  2 => ["%GVL", "wait"],
  3 => ["%GC",  "mark"],
  4 => ["%GC",  "sweep"],
}.freeze
TOP_N =
10
VERSION =
"0.10.0"

Class Method Summary collapse

Class Method Details

._aggregate_and_report(root_data = nil) ⇒ Object

root_data: the root process’s own profile data — GC/OS stats in the merged summary come from the root only (same policy as `rperf stat`).



1027
1028
1029
1030
1031
1032
1033
1034
1035
1036
1037
1038
1039
1040
1041
1042
1043
1044
1045
1046
1047
1048
1049
1050
1051
1052
1053
1054
1055
1056
1057
1058
1059
1060
1061
1062
1063
1064
1065
1066
1067
1068
1069
1070
1071
1072
1073
1074
1075
1076
1077
1078
1079
1080
1081
1082
1083
1084
1085
1086
1087
1088
1089
1090
1091
1092
1093
1094
1095
1096
1097
1098
1099
1100
1101
1102
1103
1104
1105
1106
1107
# File 'lib/rperf.rb', line 1027

# Merges every child profile found in RPERF_SESSION_DIR into one profile,
# then prints it (when @stat) and/or writes it (when @output, using @format).
#
# root_data: the root process's own profile data, or nil. When present its
#   :mode/:frequency take precedence, and :gc_stats/:maxrss_mb are copied
#   into the merged result (GC/OS stats come from the root only — same
#   policy as `rperf stat`).
#
# Returns the merged data Hash. Returns nil when there is no session
# directory, when no profile could be loaded, or when aggregation raised;
# in the last case stop() falls back to writing the root's own data.
# The session directory is cleaned up on every path except the
# "no session directory" early return.
def self._aggregate_and_report(root_data = nil)
  session_dir = ENV["RPERF_SESSION_DIR"]
  return unless session_dir && File.directory?(session_dir)

  merged_samples = []
  merged_label_sets = [{}]
  merged_label_sets_index = { {} => 0 }
  total_trigger_count = 0
  total_sampling_count = 0
  total_sampling_time_ns = 0
  max_duration_ns = 0
  total_duration_ns = 0
  total_user_ns = 0
  total_sys_ns = 0
  process_count = 0

  Dir.glob(File.join(session_dir, "profile-*.json.gz")).each do |file|
    begin
      data = load(file)
    rescue StandardError => e
      $stderr.puts "rperf: warning: failed to load #{file}: #{e.message}"
      next
    end
    next unless data
    _merge_into(merged_samples, merged_label_sets, data, merged_label_sets_index)
    total_trigger_count += (data[:trigger_count] || 0)
    total_sampling_count += (data[:sampling_count] || 0)
    total_sampling_time_ns += (data[:sampling_time_ns] || 0)
    d = data[:duration_ns] || 0
    # Processes overlap in time: duration_ns is the longest one, while
    # total_duration_ns sums them all.
    max_duration_ns = d if d > max_duration_ns
    total_duration_ns += d
    total_user_ns += (data[:user_ns] || 0)
    total_sys_ns += (data[:sys_ns] || 0)
    process_count += 1
  end

  if process_count == 0
    # Nothing loadable — remove the session dir here, or stop's empty-dir
    # rmdir would fail on the leftover corrupt files and leak the dir
    _cleanup_session_dir(session_dir)
    return
  end

  # mode/frequency: the root's own profile is authoritative; next come the
  # options remembered by _setup_inherit, and finally the RPERF_* env vars
  # (only set by the CLI or inherit: true). RPERF_MODE is parsed exactly as
  # _restart_in_child parses it (anything but "cpu" means :wall), so the
  # merged profile is labelled with the mode the children actually ran in.
  root = root_data || {}
  saved = @_child_start_opts
  env_mode = ENV["RPERF_MODE"] == "cpu" ? :cpu : :wall
  env_frequency = (ENV["RPERF_FREQUENCY"] || 1000).to_i
  merged_data = {
    mode: root[:mode] || (saved ? saved[:mode] : env_mode),
    frequency: root[:frequency] || (saved ? saved[:frequency] : env_frequency),
    aggregated_samples: merged_samples,
    label_sets: merged_label_sets,
    trigger_count: total_trigger_count,
    sampling_count: total_sampling_count,
    sampling_time_ns: total_sampling_time_ns,
    duration_ns: max_duration_ns,
    total_duration_ns: total_duration_ns,
    user_ns: total_user_ns,
    sys_ns: total_sys_ns,
    process_count: process_count,
  }

  # GC/OS stats are taken from the root process only
  merged_data[:gc_stats] = root[:gc_stats] if root[:gc_stats]
  merged_data[:maxrss_mb] = root[:maxrss_mb] if root[:maxrss_mb]

  print_stat(merged_data) if @stat
  write_data(@output, merged_data, @format) if @output

  _cleanup_session_dir(session_dir)

  merged_data
rescue => e
  $stderr.puts "rperf: warning: failed to aggregate multi-process data: #{e.message}"
  # stop() falls back to writing the root's own data when this returns nil
  _cleanup_session_dir(session_dir)
  nil
end

._c_get_label ⇒ Object

_c_get_label() — get current thread’s label_set_id. Returns 0 if not profiling or thread not yet seen.



1700
1701
1702
1703
1704
1705
1706
1707
1708
1709
# File 'ext/rperf/rperf.c', line 1700

/* _c_get_label(): label_set_id of the calling thread.  Yields 0 when the
 * profiler is not running or the thread has no rperf thread data yet. */
static VALUE
rb_rperf_get_label(VALUE self)
{
    rperf_thread_data_t *td;

    if (g_profiler.running) {
        td = (rperf_thread_data_t *)rb_internal_thread_specific_get(rb_thread_current(), g_profiler.ts_key);
        if (td != NULL) {
            return INT2NUM(td->label_set_id);
        }
    }
    return INT2FIX(0);
}

._c_get_label_sets ⇒ Object

_c_get_label_sets() — get label_sets Ruby Array



1720
1721
1722
1723
1724
# File 'ext/rperf/rperf.c', line 1720

/* _c_get_label_sets(): the label_sets Array last stored by _c_set_label_sets
 * (reset to Qnil at every _c_start). */
static VALUE
rb_rperf_get_label_sets(VALUE self)
{
    VALUE sets = g_profiler.label_sets;
    return sets;
}

._c_profile_dec ⇒ Object

_c_profile_dec() — decrement profile refcount; pause timer on 1→0. Called with GVL held.



1808
1809
1810
1811
1812
1813
1814
1815
1816
1817
1818
# File 'ext/rperf/rperf.c', line 1808

/* _c_profile_dec(): drop one profile reference and disarm the timer when the
 * count reaches zero.  Returns Qfalse (changing nothing) if the profiler is
 * stopped or the count is already zero, otherwise Qtrue.  GVL must be held. */
static VALUE
rb_rperf_profile_dec(VALUE self)
{
    if (!g_profiler.running || g_profiler.profile_refcount <= 0) {
        return Qfalse;
    }
    if (--g_profiler.profile_refcount == 0) {
        rperf_disarm_timer(&g_profiler);
    }
    return Qtrue;
}

._c_profile_inc ⇒ Object

_c_profile_inc() — increment profile refcount; resume timer on 0→1. Called with GVL held.



1794
1795
1796
1797
1798
1799
1800
1801
1802
1803
1804
# File 'ext/rperf/rperf.c', line 1794

/* _c_profile_inc(): add one profile reference.  On the 0 -> 1 transition the
 * thread times are reset and the timer is re-armed.  Returns Qfalse if the
 * profiler is stopped, otherwise Qtrue.  GVL must be held. */
static VALUE
rb_rperf_profile_inc(VALUE self)
{
    if (!g_profiler.running) {
        return Qfalse;
    }
    if (++g_profiler.profile_refcount == 1) {
        rperf_reset_thread_times(&g_profiler);
        rperf_arm_timer(&g_profiler);
    }
    return Qtrue;
}

._c_profiler_wrapper ⇒ Object



1827
1828
1829
1830
1831
# File 'ext/rperf/rperf.c', line 1827

/* _c_profiler_wrapper(): returns the g_profiler_wrapper object. */
static VALUE
rb_rperf_profiler_wrapper(VALUE self)
{
    VALUE wrapper = g_profiler_wrapper;
    return wrapper;
}

._c_running? ⇒ Boolean

_c_running?() — check if profiler is running.

Returns:

  • (Boolean)


1821
1822
1823
1824
1825
# File 'ext/rperf/rperf.c', line 1821

/* _c_running?(): true while the profiler is running, false otherwise. */
static VALUE
rb_rperf_running_p(VALUE self)
{
    if (g_profiler.running) {
        return Qtrue;
    }
    return Qfalse;
}

._c_set_label(vid) ⇒ Object

_c_set_label(label_set_id) — set current thread’s label_set_id. Called from Ruby with GVL held.



1682
1683
1684
1685
1686
1687
1688
1689
1690
1691
1692
1693
1694
1695
1696
# File 'ext/rperf/rperf.c', line 1682

/* _c_set_label(label_set_id): store label_set_id on the calling thread,
 * creating its rperf thread data on demand.  Does nothing when the profiler
 * is stopped.  Always returns vid.  Raises NoMemoryError if the thread data
 * cannot be allocated.  Called from Ruby with the GVL held. */
static VALUE
rb_rperf_set_label(VALUE self, VALUE vid)
{
    int label_set_id;
    VALUE thread;
    rperf_thread_data_t *td;

    if (!g_profiler.running) {
        return vid;
    }

    label_set_id = NUM2INT(vid);
    thread = rb_thread_current();
    td = (rperf_thread_data_t *)rb_internal_thread_specific_get(thread, g_profiler.ts_key);
    if (!td) {
        td = rperf_thread_data_create(&g_profiler, thread);
        if (td == NULL) {
            rb_raise(rb_eNoMemError, "rperf: failed to allocate thread data");
        }
    }
    td->label_set_id = label_set_id;
    return vid;
}

._c_set_label_sets(ary) ⇒ Object

_c_set_label_sets(ary) — store label_sets Ruby Array for result building



1712
1713
1714
1715
1716
1717
# File 'ext/rperf/rperf.c', line 1712

/* _c_set_label_sets(ary): remember the label_sets Array used when building
 * results; returns ary.
 * NOTE(review): a Ruby VALUE stored in a C global must be kept alive/marked
 * elsewhere (presumably via g_profiler_wrapper) — confirm. */
static VALUE
rb_rperf_set_label_sets(VALUE self, VALUE ary)
{
    return g_profiler.label_sets = ary;
}

._c_snapshot(vclear) ⇒ Object



1646
1647
1648
1649
1650
1651
1652
1653
1654
1655
1656
1657
1658
1659
1660
1661
1662
1663
1664
1665
1666
1667
1668
1669
1670
1671
1672
1673
1674
1675
1676
# File 'ext/rperf/rperf.c', line 1646

/* rb_ensure body for _c_snapshot: runs with worker_mutex held.  Flushes the
 * sample buffers, builds the aggregated result and, if vclear is truthy,
 * clears the aggregated data. */
static VALUE
rperf_snapshot_locked_body(VALUE vclear)
{
    VALUE result;

    rperf_flush_buffers(&g_profiler);

    /* Build and clear under the same lock, so the worker cannot aggregate
     * into the table between the two steps. */
    result = rperf_build_aggregated_result(&g_profiler);
    if (RTEST(vclear)) {
        rperf_clear_aggregated_data(&g_profiler);
    }
    return result;
}

/* rb_ensure cleanup for _c_snapshot: always releases worker_mutex. */
static VALUE
rperf_snapshot_unlock(VALUE unused)
{
    (void)unused;
    CHECKED(pthread_mutex_unlock(&g_profiler.worker_mutex));
    return Qnil;
}

/*
 * _c_snapshot(clear) — return the aggregated profile collected so far,
 * without stopping.  Returns nil when not running.  Raises RuntimeError
 * unless started with aggregate: true.  When `clear` is truthy, the
 * aggregated data is reset after it has been captured.
 *
 * Building the result allocates Ruby objects, which can raise.  The locked
 * section therefore runs through rb_ensure, so worker_mutex is released even
 * on an exception.  Otherwise the worker thread, and a later stop() (which
 * locks the same mutex), would block forever.
 */
static VALUE
rb_rperf_snapshot(VALUE self, VALUE vclear)
{
    if (!g_profiler.running) {
        return Qnil;
    }

    if (!g_profiler.aggregate) {
        rb_raise(rb_eRuntimeError, "snapshot requires aggregate mode (aggregate: true)");
    }

    /* GVL is held → no postponed jobs fire → no new samples written.
     * Lock worker_mutex to pause worker thread's aggregation. */
    CHECKED(pthread_mutex_lock(&g_profiler.worker_mutex));
    return rb_ensure(rperf_snapshot_locked_body, vclear, rperf_snapshot_unlock, Qnil);
}

._c_start(vfreq, vmode, vagg, vsig, vdefer) ⇒ Object

_c_start(frequency, mode, aggregate, signal, defer)

frequency: Integer (Hz)
mode:      0 = cpu, 1 = wall
aggregate: 0 or 1
signal:    Integer (RT signal number, 0 = nanosleep, -1 = default)
defer:     if truthy, start with timer paused (profile_refcount = 0)


1242
1243
1244
1245
1246
1247
1248
1249
1250
1251
1252
1253
1254
1255
1256
1257
1258
1259
1260
1261
1262
1263
1264
1265
1266
1267
1268
1269
1270
1271
1272
1273
1274
1275
1276
1277
1278
1279
1280
1281
1282
1283
1284
1285
1286
1287
1288
1289
1290
1291
1292
1293
1294
1295
1296
1297
1298
1299
1300
1301
1302
1303
1304
1305
1306
1307
1308
1309
1310
1311
1312
1313
1314
1315
1316
1317
1318
1319
1320
1321
1322
1323
1324
1325
1326
1327
1328
1329
1330
1331
1332
1333
1334
1335
1336
1337
1338
1339
1340
1341
1342
1343
1344
1345
1346
1347
1348
1349
1350
1351
1352
1353
1354
1355
1356
1357
1358
1359
1360
1361
1362
1363
1364
1365
1366
1367
1368
1369
1370
1371
1372
1373
1374
1375
1376
1377
1378
1379
1380
1381
1382
1383
1384
1385
1386
1387
1388
1389
1390
1391
1392
1393
1394
1395
1396
1397
1398
1399
1400
1401
1402
1403
1404
1405
1406
1407
1408
1409
1410
1411
1412
1413
1414
1415
1416
1417
1418
1419
1420
1421
1422
1423
1424
1425
1426
1427
1428
1429
1430
1431
1432
1433
1434
1435
1436
1437
1438
1439
1440
1441
1442
1443
1444
1445
1446
1447
1448
1449
1450
1451
1452
1453
1454
1455
1456
1457
1458
1459
1460
1461
1462
1463
1464
1465
1466
1467
1468
1469
1470
1471
1472
1473
1474
1475
1476
1477
1478
1479
1480
# File 'ext/rperf/rperf.c', line 1242

/*
 * _c_start(frequency, mode, aggregate, signal, defer) — start profiling.
 *
 * Steps:
 *   - Reset the session counters.
 *   - Initialize worker_mutex/worker_cond and sample buffer 0.  Buffer 1 and
 *     the frame/aggregation tables are initialized only when aggregate is
 *     truthy.
 *   - Register the GC and thread event hooks.
 *   - Create fresh thread data for the calling thread.
 *   - Start sampling, either by delivering a POSIX timer's `signal` to a
 *     dedicated worker thread (signal > 0), or through the
 *     nanosleep/timedwait worker.
 * With a truthy `defer`, profile_refcount starts at 0 and the timer is
 * created unarmed.
 *
 * Returns Qtrue.  Raises RuntimeError if already running or if
 * signal/worker/timer setup fails, and NoMemoryError if an allocation fails.
 * Each failure path releases what was set up before it.
 *
 * NOTE(review): frequency and mode are not validated here.  In the signal
 * path a frequency of 0 divides by zero when the timer interval is computed
 * — confirm the Ruby-side caller rejects non-positive frequencies and
 * unknown modes.
 */
static VALUE
rb_rperf_start(VALUE self, VALUE vfreq, VALUE vmode, VALUE vagg, VALUE vsig, VALUE vdefer)
{
    int frequency = NUM2INT(vfreq);
    enum rperf_mode mode = (enum rperf_mode)NUM2INT(vmode);
    int aggregate = RTEST(vagg) ? 1 : 0;
#if RPERF_USE_TIMER_SIGNAL
    /* Negative selects the default timer signal; 0 falls through to the
     * nanosleep worker (see the `timer_signal > 0` branch below). */
    int sig = NUM2INT(vsig);
    int timer_signal = (sig < 0) ? RPERF_TIMER_SIGNAL_DEFAULT : sig;
#endif

    if (g_profiler.running) {
        rb_raise(rb_eRuntimeError, "Rperf is already running");
    }

    /* Reset per-session settings and statistics */
    g_profiler.frequency = frequency;
    g_profiler.mode = mode;
    g_profiler.aggregate = aggregate;
    g_profiler.next_thread_seq = 0;
    g_profiler.stats.sampling_count = 0;
    g_profiler.stats.sampling_total_ns = 0;
    g_profiler.stats.trigger_count = 0;
    g_profiler.stats.dropped_samples = 0;
    g_profiler.stats.dropped_aggregation = 0;
    atomic_store_explicit(&g_profiler.active_idx, 0, memory_order_relaxed);
    atomic_store_explicit(&g_profiler.swap_ready, 0, memory_order_relaxed);
    g_profiler.label_sets = Qnil;

    /* Initialize worker mutex/cond */
    CHECKED(pthread_mutex_init(&g_profiler.worker_mutex, NULL));
#ifdef __linux__
    {
        /* Use CLOCK_MONOTONIC for pthread_cond_timedwait so that
         * system clock adjustments (NTP etc.) don't affect timer intervals. */
        pthread_condattr_t cond_attr;
        CHECKED(pthread_condattr_init(&cond_attr));
        CHECKED(pthread_condattr_setclock(&cond_attr, CLOCK_MONOTONIC));
        CHECKED(pthread_cond_init(&g_profiler.worker_cond, &cond_attr));
        CHECKED(pthread_condattr_destroy(&cond_attr));
    }
#else
    CHECKED(pthread_cond_init(&g_profiler.worker_cond, NULL));
#endif

    /* Initialize sample buffer(s) */
    if (rperf_sample_buffer_init(&g_profiler.buffers[0]) < 0) {
        CHECKED(pthread_mutex_destroy(&g_profiler.worker_mutex));
        CHECKED(pthread_cond_destroy(&g_profiler.worker_cond));
        rb_raise(rb_eNoMemError, "rperf: failed to allocate sample buffer 0");
    }
    if (aggregate) {
        /* Aggregate mode uses a second sample buffer (see active_idx /
         * swap_ready above) plus the frame and aggregation tables. */
        if (rperf_sample_buffer_init(&g_profiler.buffers[1]) < 0) {
            rperf_sample_buffer_free(&g_profiler.buffers[0]);
            CHECKED(pthread_mutex_destroy(&g_profiler.worker_mutex));
            CHECKED(pthread_cond_destroy(&g_profiler.worker_cond));
            rb_raise(rb_eNoMemError, "rperf: failed to allocate sample buffer 1");
        }

        /* Initialize aggregation structures */
        if (rperf_frame_table_init(&g_profiler.frame_table) < 0) {
            rperf_sample_buffer_free(&g_profiler.buffers[0]);
            rperf_sample_buffer_free(&g_profiler.buffers[1]);
            CHECKED(pthread_mutex_destroy(&g_profiler.worker_mutex));
            CHECKED(pthread_cond_destroy(&g_profiler.worker_cond));
            rb_raise(rb_eNoMemError, "rperf: failed to allocate frame table");
        }
        if (rperf_agg_table_init(&g_profiler.agg_table) < 0) {
            rperf_frame_table_free(&g_profiler.frame_table);
            rperf_sample_buffer_free(&g_profiler.buffers[0]);
            rperf_sample_buffer_free(&g_profiler.buffers[1]);
            CHECKED(pthread_mutex_destroy(&g_profiler.worker_mutex));
            CHECKED(pthread_cond_destroy(&g_profiler.worker_cond));
            rb_raise(rb_eNoMemError, "rperf: failed to allocate aggregation table");
        }
    }

    /* Register GC event hook */
    g_profiler.gc.phase = RPERF_GC_NONE;
    g_profiler.gc.enter_ns = 0;
    rb_add_event_hook(rperf_gc_event_hook,
                      RUBY_INTERNAL_EVENT_GC_START |
                      RUBY_INTERNAL_EVENT_GC_END_MARK |
                      RUBY_INTERNAL_EVENT_GC_END_SWEEP |
                      RUBY_INTERNAL_EVENT_GC_ENTER |
                      RUBY_INTERNAL_EVENT_GC_EXIT,
                      Qnil);

    /* Register thread event hook for all events */
    g_profiler.thread_hook = rb_internal_thread_add_event_hook(
        rperf_thread_event_hook,
        RUBY_INTERNAL_THREAD_EVENT_EXITED |
        RUBY_INTERNAL_THREAD_EVENT_SUSPENDED |
        RUBY_INTERNAL_THREAD_EVENT_READY |
        RUBY_INTERNAL_THREAD_EVENT_RESUMED,
        &g_profiler);

    /* Pre-initialize current thread's time so the first sample is not skipped */
    {
        VALUE cur_thread = rb_thread_current();
        /* A stale td can survive a fork (the atfork child handler does not
         * free the forking thread's data) — free it before creating a fresh
         * one, or it would leak on every fork + restart cycle. */
        rperf_thread_data_t *stale = (rperf_thread_data_t *)rb_internal_thread_specific_get(cur_thread, g_profiler.ts_key);
        if (stale) {
            free(stale);
            rb_internal_thread_specific_set(cur_thread, g_profiler.ts_key, NULL);
        }
        rperf_thread_data_t *td = rperf_thread_data_create(&g_profiler, cur_thread);
        if (!td) {
            /* Undo hooks, tables, buffers and mutex/cond in reverse order */
            rb_remove_event_hook(rperf_gc_event_hook);
            rb_internal_thread_remove_event_hook(g_profiler.thread_hook);
            g_profiler.thread_hook = NULL;
            if (g_profiler.aggregate) {
                rperf_sample_buffer_free(&g_profiler.buffers[1]);
                rperf_frame_table_free(&g_profiler.frame_table);
                rperf_agg_table_free(&g_profiler.agg_table);
            }
            rperf_sample_buffer_free(&g_profiler.buffers[0]);
            CHECKED(pthread_mutex_destroy(&g_profiler.worker_mutex));
            CHECKED(pthread_cond_destroy(&g_profiler.worker_cond));
            rb_raise(rb_eNoMemError, "rperf: failed to allocate thread data");
        }
    }

    /* Wall-clock start (reported as start_time_ns) plus a monotonic start
     * used for duration_ns, so clock adjustments don't skew the duration. */
    clock_gettime(CLOCK_REALTIME, &g_profiler.start_realtime);
    clock_gettime(CLOCK_MONOTONIC, &g_profiler.start_monotonic);

    g_profiler.running = 1;
    /* defer: start with refcount 0 so the timer stays paused until the first
     * _c_profile_inc arms it */
    g_profiler.profile_refcount = RTEST(vdefer) ? 0 : 1;
    g_profiler.worker_paused = 0;

#if RPERF_USE_TIMER_SIGNAL
    g_profiler.timer_signal = timer_signal;

    if (timer_signal > 0) {
        struct sigaction sa;
        struct sigevent sev;
        struct itimerspec its;

        memset(&sa, 0, sizeof(sa));
        sa.sa_handler = rperf_signal_handler;
        sa.sa_flags = SA_RESTART;
        if (sigaction(g_profiler.timer_signal, &sa, &g_profiler.old_sigaction) != 0) {
            g_profiler.running = 0;
            goto timer_fail;
        }

        /* Start worker thread first to get its kernel TID */
        g_profiler.worker_tid = 0;
        if (pthread_create(&g_profiler.worker_thread, NULL,
                           rperf_worker_signal_func, &g_profiler) != 0) {
            g_profiler.running = 0;
            sigaction(g_profiler.timer_signal, &g_profiler.old_sigaction, NULL);
            goto timer_fail;
        }

        /* Wait for worker thread to publish its TID */
        CHECKED(pthread_mutex_lock(&g_profiler.worker_mutex));
        while (g_profiler.worker_tid == 0) {
            CHECKED(pthread_cond_wait(&g_profiler.worker_cond, &g_profiler.worker_mutex));
        }
        CHECKED(pthread_mutex_unlock(&g_profiler.worker_mutex));

        /* Create timer targeting the worker thread via SIGEV_THREAD_ID */
        memset(&sev, 0, sizeof(sev));
        sev.sigev_notify = SIGEV_THREAD_ID;
        sev.sigev_signo = g_profiler.timer_signal;
        sev._sigev_un._tid = g_profiler.worker_tid;
        if (timer_create(CLOCK_MONOTONIC, &sev, &g_profiler.timer_id) != 0) {
            g_profiler.running = 0;
            sigaction(g_profiler.timer_signal, &g_profiler.old_sigaction, NULL);
            /* Signal under the mutex — see rb_rperf_stop for the rationale */
            CHECKED(pthread_mutex_lock(&g_profiler.worker_mutex));
            CHECKED(pthread_cond_signal(&g_profiler.worker_cond));
            CHECKED(pthread_mutex_unlock(&g_profiler.worker_mutex));
            CHECKED(pthread_join(g_profiler.worker_thread, NULL));
            goto timer_fail;
        }

        if (RPERF_PAUSED(&g_profiler)) {
            /* defer mode: create timer but don't arm it */
            its.it_value.tv_sec = 0;
            its.it_value.tv_nsec = 0;
        } else {
            /* Split into sec/nsec: frequency 1 gives a 1s interval, and
             * tv_nsec must be < 1e9 or timer_settime fails with EINVAL */
            long interval_ns = 1000000000L / g_profiler.frequency;
            its.it_value.tv_sec = interval_ns / 1000000000L;
            its.it_value.tv_nsec = interval_ns % 1000000000L;
        }
        /* Periodic timer: the reload interval equals the first expiry */
        its.it_interval = its.it_value;
        if (timer_settime(g_profiler.timer_id, 0, &its, NULL) != 0) {
            timer_delete(g_profiler.timer_id);
            g_profiler.running = 0;
            sigaction(g_profiler.timer_signal, &g_profiler.old_sigaction, NULL);
            /* Signal under the mutex — see rb_rperf_stop for the rationale */
            CHECKED(pthread_mutex_lock(&g_profiler.worker_mutex));
            CHECKED(pthread_cond_signal(&g_profiler.worker_cond));
            CHECKED(pthread_mutex_unlock(&g_profiler.worker_mutex));
            CHECKED(pthread_join(g_profiler.worker_thread, NULL));
            goto timer_fail;
        }
    } else
#endif
    {
        /* Start worker thread (timer via timedwait + aggregation) */
        if (pthread_create(&g_profiler.worker_thread, NULL,
                           rperf_worker_nanosleep_func, &g_profiler) != 0) {
            g_profiler.running = 0;
            goto timer_fail;
        }
    }

    /* Shared failure path, reachable only via `goto timer_fail`.  By the
     * time we jump here, running is 0 and any worker thread has been joined.
     * Undo the calling thread's data, the hooks, the tables, buffer 0 and
     * the mutex/cond, then raise. */
    if (0) {
timer_fail:
        {
            VALUE cur = rb_thread_current();
            rperf_thread_data_t *td = (rperf_thread_data_t *)rb_internal_thread_specific_get(cur, g_profiler.ts_key);
            if (td) {
                free(td);
                rb_internal_thread_specific_set(cur, g_profiler.ts_key, NULL);
            }
        }
        rb_remove_event_hook(rperf_gc_event_hook);
        rb_internal_thread_remove_event_hook(g_profiler.thread_hook);
        g_profiler.thread_hook = NULL;
        if (g_profiler.aggregate) {
            rperf_sample_buffer_free(&g_profiler.buffers[1]);
            rperf_frame_table_free(&g_profiler.frame_table);
            rperf_agg_table_free(&g_profiler.agg_table);
        }
        rperf_sample_buffer_free(&g_profiler.buffers[0]);
        CHECKED(pthread_mutex_destroy(&g_profiler.worker_mutex));
        CHECKED(pthread_cond_destroy(&g_profiler.worker_cond));
        rb_raise(rb_eRuntimeError, "rperf: failed to create timer");
    }

    return Qtrue;
}

._c_stop ⇒ Object



1482
1483
1484
1485
1486
1487
1488
1489
1490
1491
1492
1493
1494
1495
1496
1497
1498
1499
1500
1501
1502
1503
1504
1505
1506
1507
1508
1509
1510
1511
1512
1513
1514
1515
1516
1517
1518
1519
1520
1521
1522
1523
1524
1525
1526
1527
1528
1529
1530
1531
1532
1533
1534
1535
1536
1537
1538
1539
1540
1541
1542
1543
1544
1545
1546
1547
1548
1549
1550
1551
1552
1553
1554
1555
1556
1557
1558
1559
1560
1561
1562
1563
1564
1565
1566
1567
1568
1569
1570
1571
1572
1573
1574
1575
1576
1577
1578
1579
1580
1581
1582
1583
1584
1585
1586
1587
1588
1589
1590
1591
1592
1593
1594
1595
1596
1597
1598
1599
1600
1601
1602
1603
1604
1605
1606
1607
1608
1609
1610
1611
1612
1613
1614
1615
# File 'ext/rperf/rperf.c', line 1482

/*
 * _c_stop() — stop profiling and return the collected data.
 *
 * Returns nil if not running.  Otherwise the teardown runs in this order:
 *   1. Clear `running` and delete the timer (signal mode).
 *   2. Wake and join the worker thread, then destroy worker_mutex/worker_cond.
 *   3. Restore the previous signal handler.
 *   4. Remove the thread and GC hooks.
 *   5. Flush the sample buffers (aggregate mode).
 *   6. Free the rperf thread data of every thread in Thread.list.
 *   7. Build the result.
 *
 * The result is:
 *   - aggregate mode: rperf_build_aggregated_result(); buffer 1 and the
 *     frame/aggregation tables are freed afterwards.
 *   - raw mode: a Hash with :mode, :frequency, counters, :start_time_ns,
 *     :duration_ns, and :raw_samples.  Each sample is
 *     [frames, weight, thread_seq, label_set_id, vm_state], plus :label_sets
 *     when one was set.
 * Sample buffer 0 is freed in both cases.
 *
 * NOTE(review): if building the result raises, the buffers/tables are not
 * freed on that path — confirm whether that leak is acceptable.
 */
static VALUE
rb_rperf_stop(VALUE self)
{
    VALUE result;

    if (!g_profiler.running) {
        return Qnil;
    }

    g_profiler.running = 0;
#if RPERF_USE_TIMER_SIGNAL
    if (g_profiler.timer_signal > 0) {
        /* Delete timer first to stop generating new signals.
         * Do NOT restore signal handler yet — the worker thread may still have
         * pending timer signals.  rperf_signal_handler handles them harmlessly. */
        timer_delete(g_profiler.timer_id);
    }
#endif

    /* Wake and join worker thread.  Signal while holding worker_mutex:
     * the worker re-checks its predicate (running) with the mutex held, so
     * signaling under the mutex guarantees it either sees running == 0 or is
     * already inside cond_wait when the signal fires — no lost wakeup.
     * Any pending timer signals are still handled by rperf_signal_handler
     * (just increments trigger_count + calls rb_postponed_job_trigger). */
    CHECKED(pthread_mutex_lock(&g_profiler.worker_mutex));
    CHECKED(pthread_cond_signal(&g_profiler.worker_cond));
    CHECKED(pthread_mutex_unlock(&g_profiler.worker_mutex));
    CHECKED(pthread_join(g_profiler.worker_thread, NULL));
    CHECKED(pthread_mutex_destroy(&g_profiler.worker_mutex));
    CHECKED(pthread_cond_destroy(&g_profiler.worker_cond));

#if RPERF_USE_TIMER_SIGNAL
    if (g_profiler.timer_signal > 0) {
        /* Worker thread is gone — safe to restore old signal handler now. */
        sigaction(g_profiler.timer_signal, &g_profiler.old_sigaction, NULL);
    }
#endif

    if (g_profiler.thread_hook) {
        rb_internal_thread_remove_event_hook(g_profiler.thread_hook);
        g_profiler.thread_hook = NULL;
    }

    /* Remove GC event hook */
    rb_remove_event_hook(rperf_gc_event_hook);

    if (g_profiler.aggregate) {
        /* Worker thread is joined; no concurrent access. */
        rperf_flush_buffers(&g_profiler);
    }

    /* Clean up thread-specific data for all live threads */
    {
        VALUE threads = rb_funcall(rb_cThread, rb_intern("list"), 0);
        long tc = RARRAY_LEN(threads);
        long ti;
        for (ti = 0; ti < tc; ti++) {
            VALUE thread = RARRAY_AREF(threads, ti);
            rperf_thread_data_t *td = (rperf_thread_data_t *)rb_internal_thread_specific_get(thread, g_profiler.ts_key);
            if (td) {
                free(td);
                rb_internal_thread_specific_set(thread, g_profiler.ts_key, NULL);
            }
        }
    }

    if (g_profiler.aggregate) {
        result = rperf_build_aggregated_result(&g_profiler);

        rperf_sample_buffer_free(&g_profiler.buffers[1]);
        rperf_frame_table_free(&g_profiler.frame_table);
        rperf_agg_table_free(&g_profiler.agg_table);
    } else {
        /* Raw samples path (aggregate: false) */
        VALUE samples_ary;
        size_t i;
        int j;
        rperf_sample_buffer_t *buf = &g_profiler.buffers[0];

        result = rb_hash_new();
        rb_hash_aset(result, ID2SYM(rb_intern("mode")),
                     ID2SYM(rb_intern(g_profiler.mode == RPERF_MODE_WALL ? "wall" : "cpu")));
        rb_hash_aset(result, ID2SYM(rb_intern("frequency")), INT2NUM(g_profiler.frequency));
        rb_hash_aset(result, ID2SYM(rb_intern("trigger_count")), SIZET2NUM(g_profiler.stats.trigger_count));
        rb_hash_aset(result, ID2SYM(rb_intern("sampling_count")), SIZET2NUM(g_profiler.stats.sampling_count));
        rb_hash_aset(result, ID2SYM(rb_intern("sampling_time_ns")), LL2NUM(g_profiler.stats.sampling_total_ns));
        /* Drop counters are only reported when non-zero */
        if (g_profiler.stats.dropped_samples > 0)
            rb_hash_aset(result, ID2SYM(rb_intern("dropped_samples")), SIZET2NUM(g_profiler.stats.dropped_samples));
        if (g_profiler.stats.dropped_aggregation > 0)
            rb_hash_aset(result, ID2SYM(rb_intern("dropped_aggregation")), SIZET2NUM(g_profiler.stats.dropped_aggregation));
        rb_hash_aset(result, ID2SYM(rb_intern("detected_thread_count")), INT2NUM(g_profiler.next_thread_seq));
        {
            /* start_time_ns is wall-clock (CLOCK_REALTIME); duration_ns is
             * measured on CLOCK_MONOTONIC so clock adjustments don't skew it. */
            struct timespec stop_monotonic;
            int64_t start_ns, duration_ns;
            clock_gettime(CLOCK_MONOTONIC, &stop_monotonic);
            start_ns = (int64_t)g_profiler.start_realtime.tv_sec * 1000000000LL
                     + (int64_t)g_profiler.start_realtime.tv_nsec;
            duration_ns = ((int64_t)stop_monotonic.tv_sec - (int64_t)g_profiler.start_monotonic.tv_sec) * 1000000000LL
                        + ((int64_t)stop_monotonic.tv_nsec - (int64_t)g_profiler.start_monotonic.tv_nsec);
            rb_hash_aset(result, ID2SYM(rb_intern("start_time_ns")), LL2NUM(start_ns));
            rb_hash_aset(result, ID2SYM(rb_intern("duration_ns")), LL2NUM(duration_ns));
        }

        samples_ary = rb_ary_new_capa((long)buf->sample_count);
        for (i = 0; i < buf->sample_count; i++) {
            rperf_sample_t *s = &buf->samples[i];
            VALUE frames = rb_ary_new_capa(s->depth);

            for (j = 0; j < s->depth; j++) {
                /* Defensive bound: never read past the shared frame pool */
                if (s->frame_start + j >= buf->frame_pool_count) break;
                VALUE fval = buf->frame_pool[s->frame_start + j];
                rb_ary_push(frames, rperf_resolve_frame(fval));
            }

            /* Sample tuple: [frames, weight, thread_seq, label_set_id, vm_state] */
            VALUE sample = rb_ary_new_capa(5);
            rb_ary_push(sample, frames);
            rb_ary_push(sample, LL2NUM(s->weight));
            rb_ary_push(sample, INT2NUM(s->thread_seq));
            rb_ary_push(sample, INT2NUM(s->label_set_id));
            rb_ary_push(sample, INT2NUM(s->vm_state));
            rb_ary_push(samples_ary, sample);
        }
        rb_hash_aset(result, ID2SYM(rb_intern("raw_samples")), samples_ary);
        if (g_profiler.label_sets != Qnil) {
            rb_hash_aset(result, ID2SYM(rb_intern("label_sets")), g_profiler.label_sets);
        }
    }

    /* Cleanup */
    rperf_sample_buffer_free(&g_profiler.buffers[0]);

    return result;
}

._init_label_sets ⇒ Object

Label set management for per-context profiling. Label sets are stored as an Array of Hashes, indexed by label_set_id. Index 0 is reserved (no labels).



270
271
272
273
# File 'lib/rperf.rb', line 270

# Resets the label-set intern tables: @label_set_table maps id => Hash and
# @label_set_index maps Hash => id. Id 0 is the reserved "no labels" set.
#
# The reserved entry is frozen, like every set added by _intern_label_set.
# Rperf.labels returns table entries directly, so an unfrozen id-0 Hash could
# be mutated by a caller on an unlabeled thread. That would corrupt the
# "no labels" set for every other thread.
def self._init_label_sets
  @label_set_table = [{}.freeze]  # id 0 = no labels
  @label_set_index = { {} => 0 }
end

._intern_label_set(hash) ⇒ Object



275
276
277
278
279
280
281
282
283
# File 'lib/rperf.rb', line 275

# Returns the id for +hash+, registering it as a new label set the first time
# it is seen. The hash is frozen in place so it can safely serve as a lookup
# key. Registering a new set pushes the table to the C extension.
def self._intern_label_set(hash)
  key = hash.freeze
  known_id = @label_set_index[key]
  return known_id if known_id

  new_id = @label_set_table.length
  @label_set_table.push(key)
  _c_set_label_sets(@label_set_table)
  @label_set_index[key] = new_id
end

._on_first_fork ⇒ Object



975
976
977
978
979
980
981
982
983
984
# File 'lib/rperf.rb', line 975

# Sets @_session_dir_created once RPERF_SESSION_DIR names an existing
# directory. Once the flag is set, later calls return immediately.
def self._on_first_fork
  return if @_session_dir_created

  dir = ENV["RPERF_SESSION_DIR"]
  return if dir.nil? || !File.directory?(dir)

  # The root's own @output/@format/@stat stay untouched. stop() writes the
  # root's profile into the session dir in a fixed json.gz format, and
  # applies the user's original settings only to the merged output.
  @_session_dir_created = true
end

._restart_in_child ⇒ Object



986
987
988
989
990
991
992
993
994
995
996
997
998
999
1000
1001
1002
1003
1004
1005
1006
1007
1008
1009
1010
1011
1012
1013
1014
1015
1016
1017
1018
1019
1020
1021
1022
1023
# File 'lib/rperf.rb', line 986

# Restarts profiling inside a forked child, writing to a uniquely named
# json.gz file in the session directory. Does nothing without a session
# directory, or if the profiler is somehow still running. Labels the child's
# samples with its pid and registers an at_exit hook that calls Rperf.stop.
def self._restart_in_child
  dir = ENV["RPERF_SESSION_DIR"]
  return if dir.nil? || !File.directory?(dir)
  return if _c_running?  # should not happen, but guard against it

  # The pthread_atfork child handler has already reset the C-side state;
  # drop the Ruby-side label tables to match.
  @label_set_table = @label_set_index = nil

  require "securerandom"
  # PIDs can be recycled within a long-lived session, so a random suffix
  # keeps an earlier child's profile from being silently overwritten.
  suffix = SecureRandom.hex(4)
  child_output = File.join(dir, "profile-#{Process.pid}-#{suffix}.json.gz")

  # Options remembered by _setup_inherit win (fork preserves module state);
  # CLI-managed sessions always export the RPERF_* env vars as a fallback.
  saved = @_child_start_opts
  opts =
    if saved
      { frequency: saved[:frequency], mode: saved[:mode], aggregate: saved[:aggregate] }
    else
      {
        frequency: (ENV["RPERF_FREQUENCY"] || 1000).to_i,
        mode: ENV["RPERF_MODE"] == "cpu" ? :cpu : :wall,
        aggregate: ENV["RPERF_AGGREGATE"] != "0",
      }
    end
  opts.merge!(output: child_output, format: :json, stat: false, verbose: false)

  signal = saved ? saved[:signal] : _parse_signal_env
  opts[:signal] = signal unless signal.nil?
  defer = saved ? saved[:defer] : ENV["RPERF_DEFER"] == "1"
  opts[:defer] = true if defer

  start(**opts, inherit: false)
  @_session_dir_output = true
  label("%pid": Process.pid.to_s)

  # Without an explicit stop, the child's profile would never be written.
  at_exit { Rperf.stop }
end

.gzip(data) ⇒ Object



527
528
529
530
531
532
533
534
# File 'lib/rperf.rb', line 527

# Compresses +data+ into an in-memory gzip stream and returns it as a
# binary (ASCII-8BIT) String.
def self.gzip(data)
  sink = StringIO.new(String.new(encoding: Encoding::ASCII_8BIT))
  writer = Zlib::GzipWriter.new(sink)
  writer.write(data)
  writer.close  # flushes the gzip footer (also closes the StringIO)
  sink.string
end

.label(**kw, &block) ⇒ Object

Sets labels on the current thread for profiling annotation. With a block: restores previous labels when the block exits. Without a block: sets labels persistently on the current thread. Labels are key-value pairs written into pprof sample labels.

Rperf.label(request: "abc") { handle_request }
Rperf.label(request: "abc")  # persistent set

A nil value removes that key; the given labels are merged into the thread’s existing labels.



313
314
315
316
317
318
319
320
321
322
323
324
325
326
# File 'lib/rperf.rb', line 313

# Sets labels on the current thread for profiling annotation.
#
# With a block: applies the labels, yields, and restores the previous label
# set when the block exits (even on exception); returns the block's value.
# Without a block: sets the labels persistently and returns nil.
# When the profiler is not running, the block (if any) is simply yielded.
#
# The running flag is read exactly once. Checking it twice let a concurrent
# stop() between the two checks return nil without ever running the caller's
# block.
def self.label(**kw, &block)
  unless _c_running?
    return block ? yield : nil
  end

  cur_id, _new_id = _merge_and_set_label(kw)
  return unless block

  begin
    yield
  ensure
    _c_set_label(cur_id)
  end
end

.labels ⇒ Object

Returns the current thread’s labels as a Hash. Returns an empty Hash if no labels are set or profiling is not running.



361
362
363
364
365
# File 'lib/rperf.rb', line 361

# Returns the current thread's labels as a Hash: the interned label set for
# the thread's label_set_id, or an empty Hash when no table exists or the
# id is unknown.
def self.labels
  table = @label_set_table
  return {} unless table

  table.fetch(_c_get_label) { {} }
end

.load(path) ⇒ Object

Load a profile saved by rperf record (.json.gz or .json). Returns the data hash (same format as Rperf.stop / Rperf.snapshot). Warns to stderr if the file was saved by a different rperf version.



486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
# File 'lib/rperf.rb', line 486

# Loads a profile saved by `rperf record` (.json.gz or plain .json) and
# returns the data Hash with symbolized keys.
#
# Gzip is auto-detected from the magic bytes rather than the extension.
# :mode is converted back to a Symbol, and :rperf_version is removed;
# a warning goes to $stderr when it differs from VERSION or is absent.
#
# Raises whatever File.binread, Zlib, or JSON.parse raise on unreadable,
# corrupt, or non-JSON input.
def self.load(path)
  raw_bytes = File.binread(path)
  # Auto-detect gzip by magic bytes (1f 8b). GzipReader.wrap closes the
  # reader — releasing its zlib stream — even if decompression raises.
  raw = if raw_bytes.byteslice(0, 2) == "\x1f\x8b".b
    Zlib::GzipReader.wrap(StringIO.new(raw_bytes), &:read)
  else
    raw_bytes
  end
  require "json"
  data = JSON.parse(raw, symbolize_names: true)
  # symbolize_names only converts keys — :mode round-trips as a String
  # ("wall"), which encoders compare against :wall/:cpu symbols
  data[:mode] = data[:mode].to_sym if data[:mode].is_a?(String)
  saved_version = data.delete(:rperf_version)
  if saved_version && saved_version != VERSION
    $stderr.puts "rperf: warning: file was saved by rperf #{saved_version} (current: #{VERSION})"
  elsif saved_version.nil?
    $stderr.puts "rperf: warning: file has no version info (may be from an older rperf)"
  end
  data
end


615
616
617
618
619
620
621
622
623
624
625
626
627
628
629
630
631
632
633
634
635
636
637
638
639
640
641
642
643
644
645
646
647
648
649
# File 'lib/rperf.rb', line 615

# Prints the `rperf stat` report to $stderr: user/sys/real times, followed
# by the breakdown, runtime, system and footer sections when samples exist.
# The optional detailed report is included when RPERF_STAT_REPORT=1.
def self.print_stat(data)
  samples_raw = data[:aggregated_samples] || []
  elapsed = Process.clock_gettime(Process::CLOCK_MONOTONIC) - @stat_start_mono
  real_ns = (elapsed * 1_000_000_000).to_i

  cpu_now = Process.times
  cpu_base = @stat_start_times || ZERO_TIMES
  user_ns = ((cpu_now.utime - cpu_base.utime) * 1_000_000_000).to_i
  sys_ns = ((cpu_now.stime - cpu_base.stime) * 1_000_000_000).to_i

  # With more than one process, the aggregated user/sys totals (when
  # present) replace this process's own CPU times.
  if (data[:process_count] || 0) > 1 && data[:user_ns]
    user_ns = data[:user_ns]
    sys_ns = data[:sys_ns] || 0
  end

  command = ENV["RPERF_STAT_COMMAND"] || "(unknown)"

  $stderr.puts
  $stderr.puts " Performance stats for '#{command}':"
  $stderr.puts
  { "user" => user_ns, "sys" => sys_ns, "real" => real_ns }.each do |kind, ns|
    $stderr.puts format("  %14s ms   #{kind}", format_ms(ns))
  end

  unless samples_raw.empty?
    breakdown, total_weight = compute_stat_breakdown(samples_raw, data[:label_sets])
    print_stat_breakdown(breakdown, total_weight, data)
    print_stat_runtime_info(data)
    print_stat_system_info(data)
    print_stat_report(data) if ENV["RPERF_STAT_REPORT"] == "1"
    print_stat_footer(samples_raw, real_ns, data)
  end

  $stderr.puts
end


536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
# File 'lib/rperf.rb', line 536

# Prints verbose sampler statistics to $stderr: mode/frequency, the number
# of sampling calls with their total and average cost, then the top tables
# via print_top. Missing keys in +data+ default to 0 (mode to :cpu).
def self.print_stats(data)
  count = data[:sampling_count] || 0
  total_ns = data[:sampling_time_ns] || 0
  mode = data[:mode] || :cpu
  frequency = data[:frequency] || 0

  total_ms = total_ns / 1_000_000.0
  # Divide in floating point: integer `total_ns / count` truncated the
  # remainder before the microsecond conversion and could change the
  # rounded "%.1f" value printed below.
  avg_us = count > 0 ? total_ns / 1000.0 / count : 0.0

  $stderr.puts "[Rperf] mode=#{mode} frequency=#{frequency}Hz"
  $stderr.puts "[Rperf] sampling: #{count} calls, #{format("%.2f", total_ms)}ms total, #{format("%.1f", avg_us)}us/call avg"
  $stderr.puts "[Rperf] samples recorded: #{count}"

  print_top(data)
end

Samples from C are now [[[path_str, label_str], …], weight].



583
584
585
586
587
588
589
590
591
592
# File 'lib/rperf.rb', line 583

# Prints the flat and cumulative top-N tables for data[:aggregated_samples].
# Prints nothing when there are no samples or no cumulative entries.
def self.print_top(data)
  samples_raw = data[:aggregated_samples]
  return unless samples_raw && !samples_raw.empty?

  result = compute_flat_cum(samples_raw)
  return if result[:cum].empty?

  total = result[:total_weight]
  %w[flat cum].each { |kind| print_top_table(kind, result[kind.to_sym], total) }
end


594
595
596
597
598
599
600
601
602
603
604
# File 'lib/rperf.rb', line 594

# Prints the TOP_N heaviest entries of +table+ ({[label, path] => weight})
# to $stderr as milliseconds and as a percentage of +total_weight+.
def self.print_top_table(kind, table, total_weight)
  ranked = table.sort_by { |_key, weight| -weight }.first(TOP_N)
  $stderr.puts "[Rperf] top #{ranked.size} by #{kind}:"
  ranked.each do |(label, path), weight|
    millis = weight / 1_000_000.0
    share = total_weight > 0 ? weight * 100.0 / total_weight : 0.0
    where = path.empty? ? "" : " (#{path})"
    $stderr.puts format("[Rperf]   %8.1fms %5.1f%%  %s%s", millis, share, label, where)
  end
end

.profile(**kw, &block) ⇒ Object

Profiles the given block: activates timer sampling for the duration and optionally applies labels. Use with start(defer: true) to profile only specific sections of code.

Rperf.start(defer: true, mode: :wall)
Rperf.profile(endpoint: "/users") { handle_request }
data = Rperf.stop

Nesting is supported: timer stays active until the outermost profile exits. Requires a block. Raises if profiling is not started.

Raises:

  • (ArgumentError)


338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
# File 'lib/rperf.rb', line 338

# Runs the block with timer sampling active and +kw+ applied as labels.
# Returns the block's value.
#
# Raises ArgumentError without a block, and RuntimeError when profiling has
# not been started. The previous label set is restored however this method
# exits, including when the nesting increment or decrement itself raises.
def self.profile(**kw, &block)
  raise ArgumentError, "Rperf.profile requires a block" unless block
  raise RuntimeError, "Rperf is not started" unless _c_running?

  cur_id, _new_id = _merge_and_set_label(kw)

  begin
    # Increment inside the outer begin so a failing increment still restores
    # the labels; decrement only after a successful increment so the nesting
    # counter stays balanced.
    _c_profile_inc
    begin
      yield
    ensure
      _c_profile_dec
    end
  ensure
    _c_set_label(cur_id)
  end
end

.read_meta(path) ⇒ Object

Read only the meta/summary head of a profile saved by rperf record (.json.gz or .json) without loading the sample body. Returns { meta: Hash|nil, summary: Hash|nil }, or nil for files saved by older rperf versions (no leading meta) or unreadable files.



512
513
514
# File 'lib/rperf.rb', line 512

# Delegates to Meta.read, which reads only the meta/summary head of a saved
# profile without loading the sample body.
def self.read_meta(path) = Meta.read(path)

.running? ⇒ Boolean

Returns true while a profiling session is active (between start and stop).

Returns:

  • (Boolean)


355
356
357
# File 'lib/rperf.rb', line 355

# Reports whether the C sampler currently has an active session.
def self.running? = _c_running?

.save(path, data, format: nil) ⇒ Object

Saves profiling data to a file. format: :json, :pprof, :collapsed, or :text; nil (the default) auto-detects the format from the path extension:

.json.gz   → json (rperf native, gzip compressed, default)
.json      → json (plain text, readable by jq etc.)
.collapsed → collapsed stacks (FlameGraph / speedscope compatible)
.txt       → text report (human/AI readable flat + cumulative table)
.pb.gz     → pprof protobuf (gzip compressed)


416
417
418
# File 'lib/rperf.rb', line 416

# Public entry point for writing profile data; format: nil lets write_data
# choose the format from the path extension.
def self.save(path, data, format: nil) = write_data(path, data, format)

.snapshot(clear: false) ⇒ Object

Returns a snapshot of the current profiling data without stopping. Only works in aggregate mode (the default). Returns nil if not profiling. The returned data has the same format as stop’s return value and can be passed to save(), PProf.encode(), Collapsed.encode(), or Text.encode().

clear: if true, resets aggregated data after taking the snapshot. This allows interval-based profiling where each snapshot covers only the period since the last clear.



242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
# File 'lib/rperf.rb', line 242

# Returns current profile data without stopping, or nil when _c_snapshot
# yields nothing. Adds GC deltas relative to @gc_stat_snapshot_base (if set)
# and peak RSS. With clear: true the baseline moves to the current GC.stat,
# so successive interval snapshots report per-interval GC deltas.
def self.snapshot(clear: false)
  snap = _c_snapshot(clear)
  return unless snap

  base = @gc_stat_snapshot_base
  if base
    current = GC.stat
    snap[:gc_stats] = {
      count: current[:count] - base[:count],
      minor_count: current[:minor_gc_count] - base[:minor_gc_count],
      major_count: current[:major_gc_count] - base[:major_gc_count],
      time_ms: (current[:time] || 0) - (base[:time] || 0),
      allocated_objects: current[:total_allocated_objects] - base[:total_allocated_objects],
      freed_objects: current[:total_freed_objects] - base[:total_freed_objects],
    }
    @gc_stat_snapshot_base = current if clear
  end

  maxrss_kb = get_system_stats[:maxrss_kb]
  snap[:maxrss_mb] = (maxrss_kb / 1024.0).round if maxrss_kb
  merge_vm_state_labels!(snap)
  snap
end

.start(frequency: 1000, mode: :cpu, output: nil, verbose: false, format: nil, stat: false, signal: nil, aggregate: true, defer: false, inherit: :fork) ⇒ Object

Starts profiling. format: :json, :pprof, :collapsed, or :text; nil (the default) auto-detects the format from the output extension:

.json.gz   → json (rperf native, default)
.collapsed → collapsed stacks (FlameGraph / speedscope compatible)
.txt       → text report (human/AI readable flat + cumulative table)
.pb.gz     → pprof protobuf (gzip compressed)

inherit: controls child process profiling.

:fork  — (default) automatically profile forked child processes via Process._fork hook.
         Session dir is created eagerly at start time. Spawned processes are NOT tracked.
true   — profile both forked and spawned Ruby child processes. Sets RUBYOPT=-rrperf
         and RPERF_* env vars so spawned Ruby processes auto-start profiling.
         Use with caution: affects ALL spawned Ruby processes, including independent
         programs that may use rperf themselves.
false  — do not track child processes (single-process mode).

Raises:

  • (ArgumentError)


48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
# File 'lib/rperf.rb', line 48

# Validates the options, records the baselines later used by stop, snapshot
# and print_stat, then starts the C sampler. Child-process tracking is set
# up unless inherit is false or RPERF_SESSION_DIR is already set.
# With a block: runs it, always calls stop (even if the block raises), and
# returns stop's result. Without a block: returns nil, profiling stays on.
def self.start(frequency: 1000, mode: :cpu, output: nil, verbose: false, format: nil, stat: false, signal: nil, aggregate: true, defer: false, inherit: :fork)
  unless frequency.is_a?(Integer) && frequency > 0
    raise ArgumentError, "frequency must be a positive integer (got #{frequency.inspect})"
  end
  raise ArgumentError, "frequency must be <= 10000 (10KHz), got #{frequency}" if frequency > 10_000
  unless %i[cpu wall].include?(mode)
    raise ArgumentError, "mode must be :cpu or :wall, got #{mode.inspect}"
  end
  unless [true, false, :fork].include?(inherit)
    raise ArgumentError, "inherit must be :fork, true, or false, got #{inherit.inspect}"
  end
  c_mode = { cpu: 0, wall: 1 }.fetch(mode)

  # Signal encoding handed to C: nil -> -1, false -> 0, Integer -> itself.
  c_signal =
    case signal
    when nil then -1
    when false then 0
    when Integer then signal
    else raise ArgumentError, "signal must be nil, false, or an Integer, got #{signal.inspect}"
    end

  if c_signal.positive?
    raise ArgumentError, "signal mode is only supported on Linux" unless RUBY_PLATFORM.include?("linux")
    if Signal.list.values_at("KILL", "STOP").compact.include?(c_signal)
      name = begin
        Signal.signame(c_signal)
      rescue StandardError
        c_signal.to_s
      end
      raise ArgumentError, "signal #{c_signal} (#{name}) cannot be caught; use a different signal"
    end
  end

  @verbose = verbose || ENV["RPERF_VERBOSE"] == "1"
  @output, @format, @stat = output, format, stat
  @stat_start_mono = Process.clock_gettime(Process::CLOCK_MONOTONIC)
  @stat_start_times = Process.times
  @gc_stat_start = @gc_stat_snapshot_base = GC.stat
  @label_set_table = @label_set_index = nil
  _c_start(frequency, c_mode, aggregate, c_signal, defer)

  # Child tracking is skipped when this process already runs inside a
  # session (RPERF_SESSION_DIR set) or when inherit is false.
  _setup_inherit(mode, frequency, signal, aggregate, inherit, defer) if inherit && !ENV["RPERF_SESSION_DIR"]

  return unless block_given?

  result = nil
  begin
    yield
  ensure
    result = stop
  end
  result
end

.stop ⇒ Object



106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
# File 'lib/rperf.rb', line 106

# Stops profiling and finalizes the profile data.
#
# Enriches the data returned by the C extension (CPU times, GC/memory stats,
# an aggregated sample view, VM-state labels), then either:
# - as the root process of a multi-process session, writes its own profile
#   into the session dir and returns the merged result of all processes
#   (falling back to the root's own data when aggregation returns nil), or
# - otherwise prints the requested stats, writes @output, and returns this
#   process's data.
# Returns nil when _c_stop returns nothing (presumably profiling was not
# running). _cleanup_session_state runs even if writing output or printing
# stats raises, so session state is not left behind by a failed write.
def self.stop
  # Check if we need to aggregate child process data.
  # @_session_dir_created: fork happened and session dir is active.
  # Otherwise: check for actual child profile files (spawn-only case).
  session_dir = ENV["RPERF_SESSION_DIR"]
  is_root = session_dir && Process.pid.to_s == ENV["RPERF_ROOT_PROCESS"]
  has_child_profiles = is_root && !@_session_dir_created &&
                       File.directory?(session_dir.to_s) &&
                       !Dir.glob(File.join(session_dir.to_s, "profile-*.json.gz")).empty?
  needs_aggregation = is_root && (@_session_dir_created || has_child_profiles)

  data = _c_stop
  return unless data

  begin
    _stop_record_process_stats(data)
    _stop_aggregate_raw_samples(data)
    merge_vm_state_labels!(data)

    if needs_aggregation
      # Root process with children: write root's own profile to session dir
      # (fixed json.gz format), then aggregate all profiles.
      # Root's @output/@format/@stat are preserved for the merged result.
      print_stats(data) if @verbose
      begin
        write_data(File.join(session_dir, "profile-#{Process.pid}.json.gz"), data, :json, internal: true)
      rescue SystemCallError
        # Session dir may have been removed (e.g., test scenario) — continue to aggregation
      end
      aggregated = _aggregate_and_report(data)
      if aggregated.nil?
        # Aggregation failed — fall back to root's own data
        $stderr.puts "rperf: warning: multi-process aggregation failed; writing root process data only"
        write_data(@output, data, @format) if @output
        print_stat(data) if @stat
      end
      return aggregated || data
    end

    print_stats(data) if @verbose
    print_stat(data) if @stat

    if @output
      if @_session_dir_output
        # Child process writing to session dir — tolerate missing dir
        begin
          write_data(@output, data, @format, internal: true)
        rescue SystemCallError
          # Parent may have already cleaned up the session dir (e.g., parent
          # exited first and rm_rf'd it), or disk is full. Silently skip —
          # crashing in at_exit is worse than losing one child's profile.
        end
      else
        write_data(@output, data, @format)
      end
      @output = nil
      @format = nil
    end

    data
  ensure
    _cleanup_session_state
  end
end

# Stores this process's user/sys CPU time, GC deltas since start, and peak
# RSS into +data+ (mutated in place). GC.stat is cumulative over the process
# lifetime, so deltas are taken against @gc_stat_start; maxrss is a
# process-lifetime peak — no delta is possible.
def self._stop_record_process_stats(data)
  times = Process.times
  start_times = @stat_start_times || ZERO_TIMES
  data[:user_ns] = ((times.utime - start_times.utime) * 1_000_000_000).to_i
  data[:sys_ns] = ((times.stime - start_times.stime) * 1_000_000_000).to_i

  if @gc_stat_start
    gc = GC.stat
    data[:gc_stats] = {
      count: gc[:count] - @gc_stat_start[:count],
      minor_count: gc[:minor_gc_count] - @gc_stat_start[:minor_gc_count],
      major_count: gc[:major_gc_count] - @gc_stat_start[:major_gc_count],
      time_ms: (gc[:time] || 0) - (@gc_stat_start[:time] || 0),
      allocated_objects: gc[:total_allocated_objects] - @gc_stat_start[:total_allocated_objects],
      freed_objects: gc[:total_freed_objects] - @gc_stat_start[:total_freed_objects],
    }
    @gc_stat_start = nil
  end
  sys_stats = get_system_stats
  data[:maxrss_mb] = (sys_stats[:maxrss_kb] / 1024.0).round if sys_stats[:maxrss_kb]
end

# When aggregate: false, the C extension returns :raw_samples but not
# :aggregated_samples. Sum raw sample weights per (frames, thread_seq,
# label_set_id, vm_state) so encoders always find :aggregated_samples.
def self._stop_aggregate_raw_samples(data)
  return unless data[:raw_samples] && !data[:aggregated_samples]

  weights = {}
  data[:raw_samples].each do |frames, weight, thread_seq, label_set_id, vm_state|
    key = [frames, thread_seq || 0, label_set_id || 0, vm_state || 0]
    if weights.key?(key)
      weights[key] += weight
    else
      weights[key] = weight
    end
  end
  data[:aggregated_samples] = weights.map { |(frames, ts, lsi, vs), w| [frames, w, ts, lsi, vs] }
end