Class: CDC::Parallel::ProcessorPool::WorkerSlot
- Inherits:
-
Object
- Object
- CDC::Parallel::ProcessorPool::WorkerSlot
- Defined in:
- lib/cdc/parallel/processor_pool.rb,
sig/cdc/parallel/processor_pool.rbs
Overview
One supervised worker position in the pool.
Instance Attribute Summary collapse
-
#respawns ⇒ Integer
readonly
private
Number of times this slot has booted a replacement worker.
Instance Method Summary collapse
- #boot! ⇒ void private
-
#complete(index) ⇒ void
private
Mark an in-flight work item as completed.
- #cool_down_if_needed ⇒ void private
- #crash_loop? ⇒ Boolean private
-
#degraded? ⇒ Boolean
private
Return whether this slot is currently in crash-loop cooldown.
-
#degraded_locked? ⇒ Boolean
private
Return whether this slot is degraded while the caller holds the lock.
- #fail_inflight(cause) ⇒ void private
-
#inbox ⇒ Ractor::Port
private
Return the current worker inbox port.
-
#initialize(index:, processor:, supervision:, max_respawns:, respawn_window:, respawn_cooldown:) ⇒ void
constructor
private
Create a supervised worker slot.
-
#join ⇒ Thread
private
Wait for the supervisor thread to finish.
-
#require_failure(failure) ⇒ Exception
private
Return a non-nil failure object for Steep and runtime dispatch.
- #send_failure(reply_port, index, error) ⇒ void private
-
#send_to_inbox(inbox, message) ⇒ void
private
Send a message to a worker inbox.
-
#send_work(index, item, reply_port) ⇒ void
private
Send one indexed work item to the current worker.
-
#shutdown ⇒ void
private
Request worker shutdown and prevent future respawns.
- #shutting_down? ⇒ Boolean private
- #supervise ⇒ void private
- #wait_for_worker_death(current_worker) ⇒ Exception private
-
#worker ⇒ Ractor
private
Return the current worker Ractor.
Constructor Details
#initialize(index:, processor:, supervision:, max_respawns:, respawn_window:, respawn_cooldown:) ⇒ void
This method is part of a private API. You should avoid using this method if possible, as it may be removed or be changed in the future.
Create a supervised worker slot.
422 423 424 425 426 427 428 429 430 431 432 433 434 435 436 437 |
# File 'lib/cdc/parallel/processor_pool.rb', line 422 def initialize(index:, processor:, supervision:, max_respawns:, respawn_window:, respawn_cooldown:) @index = index @processor = processor @supervision = supervision @max_respawns = Integer(max_respawns) @respawn_window = Float(respawn_window) @respawn_cooldown = Float(respawn_cooldown) @lock = Mutex.new @inflight = {} @crashes = [] @respawns = 0 @shutdown = false @degraded_until = nil boot! @supervisor_thread = Thread.new { supervise } end |
Instance Attribute Details
#respawns ⇒ Integer (readonly)
This method is part of a private API. You should avoid using this method if possible, as it may be removed or be changed in the future.
Number of times this slot has booted a replacement worker.
404 405 406 |
# File 'lib/cdc/parallel/processor_pool.rb', line 404 def respawns @respawns end |
Instance Method Details
#boot! ⇒ void
This method is part of a private API. You should avoid using this method if possible, as it may be removed or be changed in the future.
This method returns an undefined value.
577 578 579 580 581 582 583 584 585 586 587 588 |
# File 'lib/cdc/parallel/processor_pool.rb', line 577 def boot! boot_port = ::Ractor::Port.new worker = CDC::Parallel::ProcessorPool.send(:start_worker, @processor, boot_port) inbox = boot_port.receive @lock.synchronize do @worker = worker @inbox = inbox end ensure boot_port&.close end |
#complete(index) ⇒ void
This method is part of a private API. You should avoid using this method if possible, as it may be removed or be changed in the future.
This method returns an undefined value.
Mark an in-flight work item as completed.
491 492 493 |
# File 'lib/cdc/parallel/processor_pool.rb', line 491 def complete(index) @lock.synchronize { @inflight.delete(index) } end |
#cool_down_if_needed ⇒ void
This method is part of a private API. You should avoid using this method if possible, as it may be removed or be changed in the future.
This method returns an undefined value.
631 632 633 634 635 636 637 638 |
# File 'lib/cdc/parallel/processor_pool.rb', line 631 def cool_down_if_needed return unless crash_loop? deadline = Process.clock_gettime(Process::CLOCK_MONOTONIC) + @respawn_cooldown @lock.synchronize { @degraded_until = deadline } sleep @respawn_cooldown @lock.synchronize { @degraded_until = nil } end |
#crash_loop? ⇒ Boolean
This method is part of a private API. You should avoid using this method if possible, as it may be removed or be changed in the future.
640 641 642 643 644 645 646 647 |
# File 'lib/cdc/parallel/processor_pool.rb', line 640 def crash_loop? now = Process.clock_gettime(Process::CLOCK_MONOTONIC) @lock.synchronize do @crashes = @crashes.select { || now - <= @respawn_window } @crashes << now @crashes.length > @max_respawns end end |
#degraded? ⇒ Boolean
This method is part of a private API. You should avoid using this method if possible, as it may be removed or be changed in the future.
Return whether this slot is currently in crash-loop cooldown.
499 500 501 |
# File 'lib/cdc/parallel/processor_pool.rb', line 499 def degraded? @lock.synchronize { degraded_locked? } end |
#degraded_locked? ⇒ Boolean
This method is part of a private API. You should avoid using this method if possible, as it may be removed or be changed in the future.
Return whether this slot is degraded while the caller holds the lock.
507 508 509 510 511 512 513 514 515 516 517 |
# File 'lib/cdc/parallel/processor_pool.rb', line 507 def degraded_locked? degraded_until = @degraded_until return false unless degraded_until if Process.clock_gettime(Process::CLOCK_MONOTONIC) >= degraded_until @degraded_until = nil false else true end end |
#fail_inflight(cause) ⇒ void
This method is part of a private API. You should avoid using this method if possible, as it may be removed or be changed in the future.
This method returns an undefined value.
615 616 617 618 619 620 621 622 623 |
# File 'lib/cdc/parallel/processor_pool.rb', line 615 def fail_inflight(cause) pending = {} # : Hash[Integer, Ractor::Port] @lock.synchronize do pending = @inflight.dup @inflight.clear end pending.each { |index, reply_port| send_failure(reply_port, index, cause) } end |
#inbox ⇒ Ractor::Port
This method is part of a private API. You should avoid using this method if possible, as it may be removed or be changed in the future.
Return the current worker inbox port.
451 452 453 |
# File 'lib/cdc/parallel/processor_pool.rb', line 451 def inbox @lock.synchronize { @inbox } end |
#join ⇒ Thread
This method is part of a private API. You should avoid using this method if possible, as it may be removed or be changed in the future.
Wait for the supervisor thread to finish.
571 572 573 |
# File 'lib/cdc/parallel/processor_pool.rb', line 571 def join @supervisor_thread.join end |
#require_failure(failure) ⇒ Exception
This method is part of a private API. You should avoid using this method if possible, as it may be removed or be changed in the future.
Return a non-nil failure object for Steep and runtime dispatch.
541 542 543 544 545 |
# File 'lib/cdc/parallel/processor_pool.rb', line 541 def require_failure(failure) raise "worker failure unavailable" if failure.nil? failure end |
#send_failure(reply_port, index, error) ⇒ void
This method is part of a private API. You should avoid using this method if possible, as it may be removed or be changed in the future.
This method returns an undefined value.
625 626 627 628 629 |
# File 'lib/cdc/parallel/processor_pool.rb', line 625 def send_failure(reply_port, index, error) reply_port << [index, ResultCollector.worker_failure(error)] rescue Ractor::ClosedError nil end |
#send_to_inbox(inbox, message) ⇒ void
This method is part of a private API. You should avoid using this method if possible, as it may be removed or be changed in the future.
This method returns an undefined value.
Send a message to a worker inbox.
Ractor::Port supports both #send and #<<. Some tests replace the inbox with a small sentinel object that only implements #send so the closed inbox path can be exercised without relying on a live port. Keep this helper deliberately typed as an inbox boundary rather than forcing every caller to narrow the test double shape.
560 561 562 563 564 565 |
# File 'lib/cdc/parallel/processor_pool.rb', line 560 def send_to_inbox(inbox, ) raise Ractor::ClosedError, "worker inbox closed" if inbox.nil? inbox.send() nil end |
#send_work(index, item, reply_port) ⇒ void
This method is part of a private API. You should avoid using this method if possible, as it may be removed or be changed in the future.
This method returns an undefined value.
Send one indexed work item to the current worker.
If the slot is cooling down or the worker inbox is already closed, a serialized failure response is sent to the caller reply port.
465 466 467 468 469 470 471 472 473 474 475 476 477 478 479 480 481 482 483 484 |
# File 'lib/cdc/parallel/processor_pool.rb', line 465 def send_work(index, item, reply_port) target = nil immediate_failure = nil @lock.synchronize do if degraded_locked? immediate_failure = RuntimeError.new("worker slot #{@index} is cooling down after repeated crashes") else @inflight[index] = reply_port target = @inbox end end return send_failure(reply_port, index, require_failure(immediate_failure)) if immediate_failure send_to_inbox(target, [index, item, reply_port]) rescue Ractor::ClosedError => e complete(index) send_failure(reply_port, index, e) end |
#shutdown ⇒ void
This method is part of a private API. You should avoid using this method if possible, as it may be removed or be changed in the future.
This method returns an undefined value.
Request worker shutdown and prevent future respawns.
523 524 525 526 527 528 529 530 531 532 533 |
# File 'lib/cdc/parallel/processor_pool.rb', line 523 def shutdown target = nil @lock.synchronize do @shutdown = true target = @inbox end send_to_inbox(target, nil) rescue Ractor::ClosedError nil end |
#shutting_down? ⇒ Boolean
This method is part of a private API. You should avoid using this method if possible, as it may be removed or be changed in the future.
649 650 651 |
# File 'lib/cdc/parallel/processor_pool.rb', line 649 def shutting_down? @lock.synchronize { @shutdown } end |
#supervise ⇒ void
This method is part of a private API. You should avoid using this method if possible, as it may be removed or be changed in the future.
This method returns an undefined value.
590 591 592 593 594 595 596 597 598 599 600 601 602 603 604 605 |
# File 'lib/cdc/parallel/processor_pool.rb', line 590 def supervise loop do current_worker = worker cause = wait_for_worker_death(current_worker) break if shutting_down? fail_inflight(cause) break unless @supervision cool_down_if_needed break if shutting_down? boot! @lock.synchronize { @respawns += 1 } end end |
#wait_for_worker_death(current_worker) ⇒ Exception
This method is part of a private API. You should avoid using this method if possible, as it may be removed or be changed in the future.
607 608 609 610 611 612 613 |
# File 'lib/cdc/parallel/processor_pool.rb', line 607 def wait_for_worker_death(current_worker) current_worker.value RuntimeError.new("worker slot #{@index} exited unexpectedly") rescue Ractor::Error => e cause = e.respond_to?(:cause) ? e.cause : nil cause.is_a?(Exception) ? cause : e end |
#worker ⇒ Ractor
This method is part of a private API. You should avoid using this method if possible, as it may be removed or be changed in the future.
Return the current worker Ractor.
443 444 445 |
# File 'lib/cdc/parallel/processor_pool.rb', line 443 def worker @lock.synchronize { @worker } end |