Class: CDC::Parallel::ResultCollector
Inherits: Object
- Object
- CDC::Parallel::ResultCollector
Defined in: lib/cdc/parallel/result_collector.rb
Overview
Converts raw worker responses into `CDC::Core::ProcessorResult` objects.
Ractors cannot freely share arbitrary mutable Ruby objects. Worker responses must therefore be normalized into shareable payloads before they cross back to the caller. `ResultCollector` owns that small translation boundary.
The worker side uses ResultCollector.worker_success and ResultCollector.worker_failure. The caller side uses ResultCollector.normalize to convert those payloads into the public result contract.
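To make the shareability constraint concrete, here is a minimal sketch using only Ruby's built-in `Ractor.shareable?` and `Ractor.make_shareable` (Ruby 3.0 or later). The payload shapes are invented for illustration and are not taken from the CDC codebase.

```ruby
# Illustrative values only; none of these come from the CDC codebase.
mutable_payload = { rows: ["a".dup, "b".dup] }        # nothing is frozen
shallow_frozen  = { rows: ["a".dup, "b".dup] }.freeze # only the outer Hash is frozen
deep_frozen     = Ractor.make_shareable({ rows: ["a".dup, "b".dup] })

mutable_ok = Ractor.shareable?(mutable_payload) # false
shallow_ok = Ractor.shareable?(shallow_frozen)  # false: nested Array and Strings are still mutable
deep_ok    = Ractor.shareable?(deep_frozen)     # true: every reachable object is frozen
```

A shallow `freeze` is not enough, which is presumably why the worker-side helpers go through `Ractor.make_shareable` rather than freezing the top-level object.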
Constant Summary
- FAILURE_MARKER = :__cdc_parallel_failure__
  Internal marker used to identify serialized worker failures.
Class Method Summary
- .normalize(value) ⇒ CDC::Core::ProcessorResult
  Normalize a worker return value into a `CDC::Core::ProcessorResult`.
- .worker_failure(error) ⇒ Hash
  Build a shareable failure payload that can safely cross a Ractor boundary.
- .worker_success(value) ⇒ Object
  Build a shareable success payload that can safely cross a Ractor boundary.
Class Method Details
.normalize(value) ⇒ CDC::Core::ProcessorResult
Normalize a worker return value into a `CDC::Core::ProcessorResult`.
Failure payloads become failed processor results containing a ProcessorExecutionError. Existing processor results are returned unchanged. Other values are wrapped in a successful processor result.
    # File 'lib/cdc/parallel/result_collector.rb', line 77

    def self.normalize(value)
      if worker_failure?(value)
        CDC::Core::ProcessorResult.failure(
          ProcessorExecutionError.new(
            original_class: value[:class],
            original_message: value[:message],
            original_backtrace: value[:backtrace]
          )
        )
      elsif value.is_a?(CDC::Core::ProcessorResult)
        value
      else
        CDC::Core::ProcessorResult.success(value)
      end
    end
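The three branches can be exercised with a self-contained sketch. Everything prefixed `Sketch` or `sketch_` is a hypothetical stand-in, not the real `CDC::Core::ProcessorResult` or `ProcessorExecutionError`, and the marker check is an assumption about what the undocumented `worker_failure?` predicate does.

```ruby
# Hypothetical stand-in for CDC::Core::ProcessorResult.
SketchResult = Struct.new(:status, :value, :error) do
  def self.success(value)
    new(:ok, value, nil)
  end

  def self.failure(error)
    new(:failed, nil, error)
  end
end

# Hypothetical stand-in for ProcessorExecutionError.
class SketchExecutionError < StandardError
  attr_reader :original_class, :original_backtrace

  def initialize(original_class:, original_message:, original_backtrace:)
    super("#{original_class}: #{original_message}")
    @original_class = original_class
    @original_backtrace = original_backtrace
  end
end

SKETCH_MARKER = :__cdc_parallel_failure__ # mirrors FAILURE_MARKER

def sketch_normalize(value)
  # Assumption: a failure payload is a Hash tagged with the marker under :type.
  if value.is_a?(Hash) && value[:type] == SKETCH_MARKER
    SketchResult.failure(
      SketchExecutionError.new(
        original_class: value[:class],
        original_message: value[:message],
        original_backtrace: value[:backtrace]
      )
    )
  elsif value.is_a?(SketchResult)
    value # existing results pass through untouched
  else
    SketchResult.success(value)
  end
end

failed = sketch_normalize(
  { type: SKETCH_MARKER, class: "ArgumentError", message: "bad row", backtrace: [] }
)
existing    = SketchResult.success(42)
passthrough = sketch_normalize(existing)
wrapped     = sketch_normalize([1, 2, 3])
plain_hash  = sketch_normalize({ id: 7 }) # no marker, so treated as an ordinary value
```

Under these assumptions, `failed.status` is `:failed`, `passthrough` is the same object as `existing`, and both `wrapped` and `plain_hash` come back as successful results.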
.worker_failure(error) ⇒ Hash
Build a shareable failure payload that can safely cross a Ractor boundary.
Exceptions themselves are not used as the cross-Ractor payload. Instead, the class name, message, and backtrace are serialized into a simple hash that can be reconstructed as a ProcessorExecutionError by normalize.
    # File 'lib/cdc/parallel/result_collector.rb', line 57

    def self.worker_failure(error)
      ::Ractor.make_shareable(
        {
          type: FAILURE_MARKER,
          class: error.class.name,
          message: error.message,
          backtrace: (error.backtrace || []).map { |line| String(line) }.freeze
        }
      )
    end
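As a rough illustration of the payload this produces, the sketch below re-implements the same hash shape outside the library. `sketch_worker_failure` and `PAYLOAD_MARKER` are hypothetical names; only `Ractor.make_shareable` and `Ractor.shareable?` are real Ruby APIs.

```ruby
PAYLOAD_MARKER = :__cdc_parallel_failure__ # mirrors FAILURE_MARKER

# Hypothetical re-implementation of the payload shape described above.
def sketch_worker_failure(error)
  Ractor.make_shareable(
    {
      type: PAYLOAD_MARKER,
      class: error.class.name,
      message: error.message,
      # An exception that was never raised has a nil backtrace.
      backtrace: (error.backtrace || []).map { |line| String(line) }
    }
  )
end

# Capture a real exception raised by the standard library.
raised = begin
  Integer("not a number")
rescue ArgumentError => e
  e
end

exception_shareable = Ractor.shareable?(raised) # false: the exception object is mutable
payload             = sketch_worker_failure(raised)
payload_shareable   = Ractor.shareable?(payload) # true

# Built with .new but never raised, so there is no backtrace to copy.
never_raised = sketch_worker_failure(RuntimeError.new("built but not raised"))
```

The payload keeps only strings and symbols, so it stays shareable regardless of what instance state the original exception carried.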
.worker_success(value) ⇒ Object
Build a shareable success payload that can safely cross a Ractor boundary.
If the processor already returned a `CDC::Core::ProcessorResult`, that result is preserved. Any other shareable value will later be wrapped in a success result by normalize.
    # File 'lib/cdc/parallel/result_collector.rb', line 43

    def self.worker_success(value)
      ::Ractor.make_shareable(value)
    end
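One consequence worth noting: `Ractor.make_shareable` without `copy: true` deep-freezes the object it is given and returns that same object. The sketch below shows the effect on an invented `rows` value; how the real workers treat their return values afterwards is not stated in the source.

```ruby
# Illustrative data; not taken from the CDC codebase.
rows   = [{ id: 1, name: "alpha".dup }]
shared = Ractor.make_shareable(rows)

same_object = shared.equal?(rows)          # true: frozen in place, no copy made
deep_frozen = rows.first[:name].frozen?    # true: nested objects are frozen too

# Any later mutation attempt in the worker raises FrozenError.
mutation_error = begin
  rows << { id: 2 }
  nil
rescue FrozenError => e
  e.class
end

# Ruby also offers copy: true, which freezes a deep copy and leaves the original alone.
original = [{ id: 3, name: "beta".dup }]
copied   = Ractor.make_shareable(original, copy: true)
original_still_mutable = !original.frozen?
```

Because the documented method freezes in place, a processor that wants to keep mutating its own data after returning would need to hand over a copy itself.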