Class: CDC::Parallel::ResultCollector

Inherits:
  • Object
Defined in:
lib/cdc/parallel/result_collector.rb

Overview

Converts raw worker responses into `CDC::Core::ProcessorResult` objects.

Ractors cannot freely share arbitrary mutable Ruby objects. Worker responses must therefore be normalized into shareable payloads before they cross back to the caller. `ResultCollector` owns that small translation boundary.

The worker side uses ResultCollector.worker_success and ResultCollector.worker_failure. The caller side uses ResultCollector.normalize to convert those payloads into the public result contract.

Examples:

Normalizing a processor return value

response = CDC::Parallel::ResultCollector.worker_success(value)
result = CDC::Parallel::ResultCollector.normalize(response)

Normalizing a worker exception

response = CDC::Parallel::ResultCollector.worker_failure(error)
result = CDC::Parallel::ResultCollector.normalize(response)
result.failure? #=> true

Constant Summary

FAILURE_MARKER = :__cdc_parallel_failure__

Internal marker used to identify serialized worker failures.

Returns:

  • (Symbol)

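Class Method Summary

  • .normalize(value) ⇒ CDC::Core::ProcessorResult

    Normalize a worker return value into a `CDC::Core::ProcessorResult`.

  • .worker_failure(error) ⇒ Hash

    Build a shareable failure payload that can safely cross a Ractor boundary.

  • .worker_success(value) ⇒ Object

    Build a shareable success payload that can safely cross a Ractor boundary.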

Class Method Details

.normalize(value) ⇒ CDC::Core::ProcessorResult

Normalize a worker return value into a `CDC::Core::ProcessorResult`.

Failure payloads become failed processor results containing a ProcessorExecutionError. Existing processor results are returned unchanged. Other values are wrapped in a successful processor result.

Parameters:

  • value (Object)

    Raw worker response.

Returns:

  • (CDC::Core::ProcessorResult)


# File 'lib/cdc/parallel/result_collector.rb', line 77

def self.normalize(value)
  if worker_failure?(value)
    CDC::Core::ProcessorResult.failure(
      ProcessorExecutionError.new(
        original_class: value[:class],
        original_message: value[:message],
        original_backtrace: value[:backtrace]
      )
    )
  elsif value.is_a?(CDC::Core::ProcessorResult)
    value
  else
    CDC::Core::ProcessorResult.success(value)
  end
end
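
The three branches can be exercised in isolation with stand-in classes. This is a minimal sketch, not the library's implementation: `NormalizeSketch`, its `Result` and `ExecutionError` classes, and the `worker_failure?` check (assumed here to test for a Hash whose `:type` equals the failure marker) are hypothetical stand-ins for code this page does not show.

```ruby
# Hypothetical stand-ins for CDC::Core::ProcessorResult and
# ProcessorExecutionError, used only to illustrate the dispatch.
module NormalizeSketch
  FAILURE_MARKER = :__cdc_parallel_failure__

  class ExecutionError < StandardError
    attr_reader :original_class, :original_backtrace

    def initialize(original_class:, original_message:, original_backtrace:)
      super("#{original_class}: #{original_message}")
      @original_class = original_class
      @original_backtrace = original_backtrace
    end
  end

  class Result
    attr_reader :value, :error

    def self.success(value)
      new(value: value)
    end

    def self.failure(error)
      new(error: error)
    end

    def initialize(value: nil, error: nil)
      @value = value
      @error = error
    end

    def failure?
      !@error.nil?
    end
  end

  # Assumption: a failure payload is a Hash tagged with the marker.
  def self.worker_failure?(value)
    value.is_a?(Hash) && value[:type] == FAILURE_MARKER
  end

  def self.normalize(value)
    if worker_failure?(value)
      # Branch 1: rebuild an error object from the serialized fields.
      Result.failure(
        ExecutionError.new(
          original_class: value[:class],
          original_message: value[:message],
          original_backtrace: value[:backtrace]
        )
      )
    elsif value.is_a?(Result)
      # Branch 2: an existing result passes through unchanged.
      value
    else
      # Branch 3: any other value is wrapped as a success.
      Result.success(value)
    end
  end
end

failure_payload = {
  type: NormalizeSketch::FAILURE_MARKER,
  class: "ArgumentError",
  message: "bad input",
  backtrace: []
}

failed   = NormalizeSketch.normalize(failure_payload)
existing = NormalizeSketch::Result.success(:already_wrapped)
same     = NormalizeSketch.normalize(existing)
wrapped  = NormalizeSketch.normalize(42)
```

Checking for the failure marker first means a tagged hash is never mistaken for an ordinary success value, which is presumably why the marker uses an unlikely symbol name.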

.worker_failure(error) ⇒ Hash

Build a shareable failure payload that can safely cross a Ractor boundary.

Exceptions themselves are not used as the cross-Ractor payload. Instead, the class name, message, and backtrace are serialized into a simple hash that can be reconstructed as a ProcessorExecutionError by normalize.

Parameters:

  • error (Exception)

    Exception raised inside a worker Ractor.

Returns:

  • (Hash)

    Shareable serialized failure payload.



# File 'lib/cdc/parallel/result_collector.rb', line 57

def self.worker_failure(error)
  ::Ractor.make_shareable(
    {
      type: FAILURE_MARKER,
      class: error.class.name,
      message: error.message,
      backtrace: (error.backtrace || []).map { |line| String(line) }.freeze
    }
  )
end
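
To make the payload shape concrete, the sketch below repeats the method body shown above inside a hypothetical `FailurePayloadSketch` module (so it runs without the gem) and applies it to a rescued exception and to one that was never raised. It requires Ruby 3.0 or later for `Ractor.make_shareable`.

```ruby
# Hypothetical wrapper module; the body mirrors the method listed above.
module FailurePayloadSketch
  FAILURE_MARKER = :__cdc_parallel_failure__

  def self.worker_failure(error)
    ::Ractor.make_shareable(
      {
        type: FAILURE_MARKER,
        class: error.class.name,
        message: error.message,
        backtrace: (error.backtrace || []).map { |line| String(line) }.freeze
      }
    )
  end
end

# A raised and rescued exception carries a backtrace.
rescued = begin
  Integer("not a number")
rescue ArgumentError => e
  e
end

raised_payload = FailurePayloadSketch.worker_failure(rescued)

# An exception that was never raised has a nil backtrace,
# which the `|| []` guard turns into an empty array.
unraised_payload = FailurePayloadSketch.worker_failure(RuntimeError.new("boom"))

raised_payload[:class]              # class name as a String, not the Class
Ractor.shareable?(raised_payload)   # deep-frozen, so safe to send across
unraised_payload[:backtrace]        # empty array
```

Serializing the class as a name rather than passing the exception object keeps the payload to symbols, strings, and arrays of strings, all of which can be deep-frozen.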

.worker_success(value) ⇒ Object

Build a shareable success payload that can safely cross a Ractor boundary.

If the processor already returned a `CDC::Core::ProcessorResult`, that result is preserved (it is only made shareable). Any other shareable value will later be wrapped in a success result by normalize.

Parameters:

  • value (Object)

    Processor return value.

Returns:

  • (Object)

    Shareable success payload.

Raises:

  • (Ractor::Error)

    Raised by Ruby when the value cannot be made shareable.



# File 'lib/cdc/parallel/result_collector.rb', line 43

def self.worker_success(value)
  ::Ractor.make_shareable(value)
end
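
One consequence worth seeing in practice is that `Ractor.make_shareable` deep-freezes the object it is given and returns that same object. The sketch below uses a hypothetical `SuccessPayloadSketch` module that mirrors the one-line body above; the `rows` data is illustrative only.

```ruby
# Hypothetical wrapper module; the body mirrors the method listed above.
module SuccessPayloadSketch
  def self.worker_success(value)
    ::Ractor.make_shareable(value)
  end
end

# Illustrative processor return value with nested mutable objects.
rows = { ids: [1, 2, 3], label: +"batch" }

payload = SuccessPayloadSketch.worker_success(rows)

payload.equal?(rows)      # the same object is returned, not a copy
payload.frozen?           # the hash itself is frozen
payload[:ids].frozen?     # nested array is frozen too
payload[:label].frozen?   # and so is the nested string
```

Because the value is frozen in place, a processor that keeps a reference to its return value can no longer mutate it afterwards. Ruby's `Ractor.make_shareable` also accepts `copy: true` to freeze a deep copy instead; whether that trade-off suits this library is not stated here.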