Class: CDC::Parallel::ProcessorPool::WorkerSlot

Inherits:
Object
Defined in:
lib/cdc/parallel/processor_pool.rb,
sig/cdc/parallel/processor_pool.rbs

Overview

One supervised worker position in the pool.

Constructor Details

#initialize(index:, processor:, supervision:, max_respawns:, respawn_window:, respawn_cooldown:) ⇒ void

This method is part of a private API. You should avoid using this method if possible, as it may be removed or be changed in the future.

Create a supervised worker slot.

Parameters:

  • index (Integer)

    Stable slot index inside the owning pool.

  • processor (CDC::Core::Processor)

    Shareable processor instance used by the worker.

  • supervision (Boolean)

    Whether unexpected worker death should trigger respawn.

  • max_respawns (Integer)

    Maximum number of crashes tolerated inside the respawn window; one more crash triggers a cooldown.

  • respawn_window (Numeric)

    Sliding crash-loop window in seconds.

  • respawn_cooldown (Numeric)

    Cooldown duration in seconds after repeated crashes.



# File 'lib/cdc/parallel/processor_pool.rb', line 422

def initialize(index:, processor:, supervision:, max_respawns:, respawn_window:, respawn_cooldown:)
  @index = index
  @processor = processor
  @supervision = supervision
  @max_respawns = Integer(max_respawns)
  @respawn_window = Float(respawn_window)
  @respawn_cooldown = Float(respawn_cooldown)
  @lock = Mutex.new
  @inflight = {}
  @crashes = []
  @respawns = 0
  @shutdown = false
  @degraded_until = nil
  boot!
  @supervisor_thread = Thread.new { supervise }
end

Instance Attribute Details

#respawns ⇒ Integer (readonly)

This method is part of a private API. You should avoid using this method if possible, as it may be removed or be changed in the future.

Number of times this slot has booted a replacement worker.

Returns:

  • (Integer)


# File 'lib/cdc/parallel/processor_pool.rb', line 404

def respawns
  @respawns
end

Instance Method Details

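#boot! ⇒ void

This method is part of a private API. You should avoid using this method if possible, as it may be removed or be changed in the future.

This method returns an undefined value.

Start a worker for this slot's processor, receive the worker inbox over a temporary boot port, and store the worker and inbox under the lock. The boot port is closed afterwards, whether or not the boot succeeded.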



# File 'lib/cdc/parallel/processor_pool.rb', line 577

def boot!
  boot_port = ::Ractor::Port.new
  worker = CDC::Parallel::ProcessorPool.send(:start_worker, @processor, boot_port)
  inbox = boot_port.receive

  @lock.synchronize do
    @worker = worker
    @inbox = inbox
  end
ensure
  boot_port&.close
end

#complete(index) ⇒ void

This method is part of a private API. You should avoid using this method if possible, as it may be removed or be changed in the future.

This method returns an undefined value.

Mark an in-flight work item as completed.

Parameters:

  • index (Integer)


# File 'lib/cdc/parallel/processor_pool.rb', line 491

def complete(index)
  @lock.synchronize { @inflight.delete(index) }
end

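#cool_down_if_needed ⇒ void

This method is part of a private API. You should avoid using this method if possible, as it may be removed or be changed in the future.

This method returns an undefined value.

Record the latest crash and, if the slot is in a crash loop, mark it degraded, sleep for the cooldown duration, and then clear the degraded state.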



# File 'lib/cdc/parallel/processor_pool.rb', line 631

def cool_down_if_needed
  return unless crash_loop?

  deadline = Process.clock_gettime(Process::CLOCK_MONOTONIC) + @respawn_cooldown
  @lock.synchronize { @degraded_until = deadline }
  sleep @respawn_cooldown
  @lock.synchronize { @degraded_until = nil }
end

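#crash_loop? ⇒ Boolean

This method is part of a private API. You should avoid using this method if possible, as it may be removed or be changed in the future.

Record a crash at the current monotonic time, discard crashes older than the respawn window, and return whether the remaining crash count exceeds max_respawns. Calling this predicate has a side effect: every call counts as one crash.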

Returns:

  • (Boolean)


# File 'lib/cdc/parallel/processor_pool.rb', line 640

def crash_loop?
  now = Process.clock_gettime(Process::CLOCK_MONOTONIC)
  @lock.synchronize do
    @crashes = @crashes.select { |timestamp| now - timestamp <= @respawn_window }
    @crashes << now
    @crashes.length > @max_respawns
  end
end
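
The sliding-window check above can be tried in isolation. The following is a minimal sketch, not the library's code: `CrashWindow` and `record` are hypothetical names, and the timestamp is passed in explicitly instead of being read from the monotonic clock so that the result is deterministic.

```ruby
# Hypothetical stand-in for the slot's crash bookkeeping; not part of the library.
class CrashWindow
  def initialize(max_respawns:, respawn_window:)
    @max_respawns = Integer(max_respawns)
    @respawn_window = Float(respawn_window)
    @crashes = []
  end

  # Record a crash at time `now` (seconds) and report whether the number of
  # crashes still inside the window exceeds the allowed maximum.
  def record(now)
    @crashes = @crashes.select { |timestamp| now - timestamp <= @respawn_window }
    @crashes << now
    @crashes.length > @max_respawns
  end
end

window = CrashWindow.new(max_respawns: 2, respawn_window: 10.0)
# Three crashes within 10 seconds trip the detector; a crash much later does not.
results = [0.0, 1.0, 2.0, 20.0].map { |t| window.record(t) }
# results == [false, false, true, false]
```

With `max_respawns: 2`, the first two crashes are tolerated and the third crash inside the window reports a crash loop, which matches the strict `>` comparison in the source.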

#degraded? ⇒ Boolean

This method is part of a private API. You should avoid using this method if possible, as it may be removed or be changed in the future.

Return whether this slot is currently in crash-loop cooldown.

Returns:

  • (Boolean)


# File 'lib/cdc/parallel/processor_pool.rb', line 499

def degraded?
  @lock.synchronize { degraded_locked? }
end

#degraded_locked? ⇒ Boolean

This method is part of a private API. You should avoid using this method if possible, as it may be removed or be changed in the future.

Return whether this slot is degraded while the caller holds the lock.

Returns:

  • (Boolean)


# File 'lib/cdc/parallel/processor_pool.rb', line 507

def degraded_locked?
  degraded_until = @degraded_until
  return false unless degraded_until

  if Process.clock_gettime(Process::CLOCK_MONOTONIC) >= degraded_until
    @degraded_until = nil
    false
  else
    true
  end
end
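
The deadline check above expires lazily: the stored deadline is cleared the first time it is read after it has passed. A minimal sketch of that idea follows, assuming an injected clock so the behaviour is deterministic; `CooldownFlag` and `start` are hypothetical names and locking is omitted for brevity.

```ruby
# Hypothetical stand-in; the real slot reads Process::CLOCK_MONOTONIC under its lock.
class CooldownFlag
  def initialize(clock)
    @clock = clock
    @degraded_until = nil
  end

  # Begin a cooldown that lasts `duration` seconds from the current clock value.
  def start(duration)
    @degraded_until = @clock.call + duration
  end

  def degraded?
    deadline = @degraded_until
    return false unless deadline

    if @clock.call >= deadline
      @degraded_until = nil # expired: clear lazily on read
      false
    else
      true
    end
  end
end

now = 100.0
flag = CooldownFlag.new(-> { now })
flag.start(5.0)
during = flag.degraded? # true while the clock is before 105.0
now = 105.0
after = flag.degraded?  # false once the deadline is reached
```

Because expiry happens on read, the flag stays correct even if nothing explicitly clears it when the cooldown ends.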

#fail_inflight(cause) ⇒ void

This method is part of a private API. You should avoid using this method if possible, as it may be removed or be changed in the future.

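This method returns an undefined value.

Fail every in-flight work item with the given cause. The in-flight table is copied and cleared under the lock, and the failure responses are sent after the lock is released.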

Parameters:

  • cause (Exception)


# File 'lib/cdc/parallel/processor_pool.rb', line 615

def fail_inflight(cause)
  pending = {} # : Hash[Integer, Ractor::Port]
  @lock.synchronize do
    pending = @inflight.dup
    @inflight.clear
  end

  pending.each { |index, reply_port| send_failure(reply_port, index, cause) }
end
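
The method above snapshots and clears the in-flight table while holding the lock, then notifies callers after releasing it. The sketch below illustrates that pattern under stated assumptions: `InflightTable`, `register`, and `fail_all` are hypothetical names, and a plain `Queue` stands in for a `Ractor::Port` reply port.

```ruby
# Hypothetical stand-in for the in-flight table; Queue plays the role of a reply port.
class InflightTable
  def initialize
    @lock = Mutex.new
    @inflight = {}
  end

  def register(index, reply)
    @lock.synchronize { @inflight[index] = reply }
  end

  def size
    @lock.synchronize { @inflight.size }
  end

  # Copy and empty the table under the lock, then notify outside it so a slow
  # reply target cannot stall other users of the lock.
  def fail_all(cause)
    pending = @lock.synchronize do
      snapshot = @inflight.dup
      @inflight.clear
      snapshot
    end

    pending.each { |index, reply| reply << [index, cause] }
  end
end

table = InflightTable.new
replies = Queue.new
table.register(1, replies)
table.register(2, replies)
table.fail_all(RuntimeError.new("worker died"))
# The table is now empty and `replies` holds one failure per registered index.
```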

#inbox ⇒ Ractor::Port

This method is part of a private API. You should avoid using this method if possible, as it may be removed or be changed in the future.

Return the current worker inbox port.

Returns:

  • (Ractor::Port)


# File 'lib/cdc/parallel/processor_pool.rb', line 451

def inbox
  @lock.synchronize { @inbox }
end

#join ⇒ Thread

This method is part of a private API. You should avoid using this method if possible, as it may be removed or be changed in the future.

Wait for the supervisor thread to finish.

Returns:

  • (Thread)


# File 'lib/cdc/parallel/processor_pool.rb', line 571

def join
  @supervisor_thread.join
end

#require_failure(failure) ⇒ Exception

This method is part of a private API. You should avoid using this method if possible, as it may be removed or be changed in the future.

Return a non-nil failure object for Steep and runtime dispatch.

Parameters:

  • failure (Exception, nil)

Returns:

  • (Exception)

Raises:

  • (RuntimeError)

    if no failure was supplied



# File 'lib/cdc/parallel/processor_pool.rb', line 541

def require_failure(failure)
  raise "worker failure unavailable" if failure.nil?

  failure
end

#send_failure(reply_port, index, error) ⇒ void

This method is part of a private API. You should avoid using this method if possible, as it may be removed or be changed in the future.

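This method returns an undefined value.

Send a serialized worker-failure response for one work item to its reply port. A reply port that has already closed is ignored.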

Parameters:

  • reply_port (Ractor::Port)
  • index (Integer)
  • error (Exception)


# File 'lib/cdc/parallel/processor_pool.rb', line 625

def send_failure(reply_port, index, error)
  reply_port << [index, ResultCollector.worker_failure(error)]
rescue Ractor::ClosedError
  nil
end

#send_to_inbox(inbox, message) ⇒ void

This method is part of a private API. You should avoid using this method if possible, as it may be removed or be changed in the future.

This method returns an undefined value.

Send a message to a worker inbox.

Ractor::Port supports both #send and #<<. Some tests replace the inbox with a small sentinel object that only implements #send so the closed inbox path can be exercised without relying on a live port. Keep this helper deliberately typed as an inbox boundary rather than forcing every caller to narrow the test double shape.

Parameters:

  • inbox (#send, nil)
  • message (Object)

Raises:

  • (Ractor::ClosedError)

    when the worker inbox has already closed



# File 'lib/cdc/parallel/processor_pool.rb', line 560

def send_to_inbox(inbox, message)
  raise Ractor::ClosedError, "worker inbox closed" if inbox.nil?

  inbox.send(message)
  nil
end
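
To illustrate the kind of test double the description mentions, here is a minimal sketch. Every name in it is hypothetical: `ClosedInboxDouble`, `RecordingInbox`, and `deliver` are illustrations only, and `InboxClosed` stands in for `Ractor::ClosedError` so that the example runs without Ractor support.

```ruby
# Stand-in error so the sketch runs without Ractor; the real helper raises Ractor::ClosedError.
InboxClosed = Class.new(StandardError)

# Hypothetical double: implements only #send and always behaves like a closed inbox.
class ClosedInboxDouble
  def send(_message)
    raise InboxClosed, "worker inbox closed"
  end
end

# Hypothetical double that records what was sent.
class RecordingInbox
  attr_reader :messages

  def initialize
    @messages = []
  end

  def send(message)
    @messages << message
  end
end

# Same shape as the helper above: a nil inbox counts as closed, otherwise the
# message is forwarded through #send and nil is returned.
def deliver(inbox, message)
  raise InboxClosed, "worker inbox closed" if inbox.nil?

  inbox.send(message)
  nil
end

recorder = RecordingInbox.new
deliver(recorder, [0, :item])
# recorder.messages now contains the forwarded message.
```

Going through `#send` rather than `#<<` is what lets such small doubles stand in for a live port.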

#send_work(index, item, reply_port) ⇒ void

This method is part of a private API. You should avoid using this method if possible, as it may be removed or be changed in the future.

This method returns an undefined value.

Send one indexed work item to the current worker.

If the slot is cooling down or the worker inbox is already closed, a serialized failure response is sent to the caller's reply port instead.

Parameters:

  • index (Integer)
  • item (Object)
  • reply_port (Ractor::Port)


# File 'lib/cdc/parallel/processor_pool.rb', line 465

def send_work(index, item, reply_port)
  target = nil
  immediate_failure = nil

  @lock.synchronize do
    if degraded_locked?
      immediate_failure = RuntimeError.new("worker slot #{@index} is cooling down after repeated crashes")
    else
      @inflight[index] = reply_port
      target = @inbox
    end
  end

  return send_failure(reply_port, index, require_failure(immediate_failure)) if immediate_failure

  send_to_inbox(target, [index, item, reply_port])
rescue Ractor::ClosedError => e
  complete(index)
  send_failure(reply_port, index, e)
end
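
The closed-inbox path above registers the reply port first and rolls the registration back if delivery fails. The sketch below shows only that register-then-roll-back step, under stated assumptions: `MiniSlot`, `inflight_count`, and `DispatchClosed` are hypothetical names, `Queue` objects stand in for ports, and a `nil` inbox models a closed worker inbox. The cooldown branch is left out.

```ruby
# Stand-in for Ractor::ClosedError so the sketch runs without Ractor.
DispatchClosed = Class.new(StandardError)

# Hypothetical, simplified slot; not the library's WorkerSlot.
class MiniSlot
  def initialize(inbox)
    @lock = Mutex.new
    @inflight = {}
    @inbox = inbox
  end

  def inflight_count
    @lock.synchronize { @inflight.size }
  end

  def send_work(index, item, reply)
    # Register the reply target and read the inbox in one critical section.
    target = @lock.synchronize do
      @inflight[index] = reply
      @inbox
    end
    raise DispatchClosed, "worker inbox closed" if target.nil?

    target << [index, item]
    nil
  rescue DispatchClosed => e
    # Delivery failed: undo the registration and tell the caller directly.
    @lock.synchronize { @inflight.delete(index) }
    reply << [index, e]
    nil
  end
end

reply = Queue.new
MiniSlot.new(nil).send_work(7, :payload, reply)
# `reply` now holds [7, <DispatchClosed>] and nothing is left in flight.
```

Rolling back the registration keeps a later failure sweep from reporting the same item a second time.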

#shutdown ⇒ void

This method is part of a private API. You should avoid using this method if possible, as it may be removed or be changed in the future.

This method returns an undefined value.

Request worker shutdown and prevent future respawns.



# File 'lib/cdc/parallel/processor_pool.rb', line 523

def shutdown
  target = nil
  @lock.synchronize do
    @shutdown = true
    target = @inbox
  end

  send_to_inbox(target, nil)
rescue Ractor::ClosedError
  nil
end

#shutting_down? ⇒ Boolean

This method is part of a private API. You should avoid using this method if possible, as it may be removed or be changed in the future.

Return whether shutdown has been requested for this slot.

Returns:

  • (Boolean)


# File 'lib/cdc/parallel/processor_pool.rb', line 649

def shutting_down?
  @lock.synchronize { @shutdown }
end

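#supervise ⇒ void

This method is part of a private API. You should avoid using this method if possible, as it may be removed or be changed in the future.

This method returns an undefined value.

Supervisor loop run on the slot's supervisor thread. It waits for the current worker to die, fails that worker's in-flight items, and, when supervision is enabled, boots a replacement after any required cooldown. The loop exits once shutdown has been requested or when supervision is disabled.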



# File 'lib/cdc/parallel/processor_pool.rb', line 590

def supervise
  loop do
    current_worker = worker
    cause = wait_for_worker_death(current_worker)
    break if shutting_down?

    fail_inflight(cause)
    break unless @supervision

    cool_down_if_needed
    break if shutting_down?

    boot!
    @lock.synchronize { @respawns += 1 }
  end
end

#wait_for_worker_death(current_worker) ⇒ Exception

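This method is part of a private API. You should avoid using this method if possible, as it may be removed or be changed in the future.

Block until the worker Ractor terminates and return the failure to report. If the worker raised, the result is the underlying cause when one is available, otherwise the Ractor::Error itself. If the worker returned normally, the result is a RuntimeError noting the unexpected exit.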

Parameters:

  • current_worker (Ractor)

Returns:

  • (Exception)


# File 'lib/cdc/parallel/processor_pool.rb', line 607

def wait_for_worker_death(current_worker)
  current_worker.value
  RuntimeError.new("worker slot #{@index} exited unexpectedly")
rescue Ractor::Error => e
  cause = e.respond_to?(:cause) ? e.cause : nil
  cause.is_a?(Exception) ? cause : e
end
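
The rescue branch above prefers the wrapped exception's cause over the wrapper itself. The sketch below reproduces that unwrapping with plain exceptions, as an illustration only: `WorkerDied`, `underlying_failure`, and `die_with` are hypothetical names, and `WorkerDied` stands in for the `Ractor::Error` raised when a worker terminates with an exception.

```ruby
# Hypothetical wrapper error standing in for Ractor::Error.
WorkerDied = Class.new(StandardError)

# Prefer the original exception when the wrapper carries one; otherwise
# fall back to the wrapper itself.
def underlying_failure(error)
  cause = error.cause
  cause.is_a?(Exception) ? cause : error
end

# Build a wrapper whose #cause is `original`, the way Ruby links exceptions
# raised inside a rescue block.
def die_with(original)
  begin
    raise original
  rescue StandardError
    raise WorkerDied, "worker terminated"
  end
rescue WorkerDied => e
  e
end

wrapped = die_with(ArgumentError.new("bad row"))
underlying_failure(wrapped)                      # the ArgumentError
underlying_failure(WorkerDied.new("no cause"))   # the WorkerDied itself
```

Reporting the cause gives callers the exception the processor actually raised rather than the transport-level wrapper.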

#worker ⇒ Ractor

This method is part of a private API. You should avoid using this method if possible, as it may be removed or be changed in the future.

Return the current worker Ractor.

Returns:

  • (Ractor)


# File 'lib/cdc/parallel/processor_pool.rb', line 443

def worker
  @lock.synchronize { @worker }
end