Class: CDC::Parallel::ResultCollector

Inherits:
Object
Defined in:
lib/cdc/parallel/result_collector.rb,
sig/cdc/parallel/result_collector.rbs

Overview

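Normalizes values returned by workers into CDC::Core::ProcessorResult objects, and builds shareable success and failure payloads that can safely cross a Ractor boundary.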

Constant Summary

FAILURE_MARKER = :__cdc_parallel_failure__

Internal marker used to identify serialized worker failures.

Returns:

  • (Symbol)

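Class Method Summary

  • .normalize(value) ⇒ CDC::Core::ProcessorResult
    Normalize a worker return value into a ProcessorResult.
  • .worker_failure(error) ⇒ Hash
    Build a shareable failure payload that can safely cross a Ractor boundary.
  • .worker_success(value) ⇒ Object
    Build a shareable success payload that can safely cross a Ractor boundary.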

Class Method Details

.normalize(value) ⇒ CDC::Core::ProcessorResult

Normalize a worker return value into a ProcessorResult.

Parameters:

  • value (Object)

Returns:

  • (CDC::Core::ProcessorResult)

# File 'lib/cdc/parallel/result_collector.rb', line 77

def self.normalize(value)
  if worker_failure?(value)
    CDC::Core::ProcessorResult.failure(
      ProcessorExecutionError.new(
        original_class: value[:class],
        original_message: value[:message],
        original_backtrace: value[:backtrace]
      )
    )
  elsif value.is_a?(CDC::Core::ProcessorResult)
    value
  else
    CDC::Core::ProcessorResult.success(value)
  end
end
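
The three branches above can be exercised with a small self-contained sketch. Everything in the Sketch module is a hypothetical stand-in: Result approximates CDC::Core::ProcessorResult, ExecutionError approximates ProcessorExecutionError, and worker_failure? is an assumed check (a Hash tagged with the failure marker), since its real implementation is not shown on this page.

```ruby
# Hypothetical stand-ins only; the real classes are defined in the cdc gems.
module Sketch
  FAILURE_MARKER = :__cdc_parallel_failure__

  # Assumed minimal shape of CDC::Core::ProcessorResult.
  Result = Struct.new(:status, :payload) do
    def self.success(value)
      new(:success, value)
    end

    def self.failure(error)
      new(:failure, error)
    end
  end

  # Assumed minimal shape of ProcessorExecutionError.
  class ExecutionError < StandardError
    attr_reader :original_class, :original_backtrace

    def initialize(original_class:, original_message:, original_backtrace:)
      super("#{original_class}: #{original_message}")
      @original_class = original_class
      @original_backtrace = original_backtrace
    end
  end

  # Assumption: a failure payload is a Hash whose :type is the marker.
  def self.worker_failure?(value)
    value.is_a?(Hash) && value[:type] == FAILURE_MARKER
  end

  # Same branch order as the documented method.
  def self.normalize(value)
    if worker_failure?(value)
      Result.failure(
        ExecutionError.new(
          original_class: value[:class],
          original_message: value[:message],
          original_backtrace: value[:backtrace]
        )
      )
    elsif value.is_a?(Result)
      value
    else
      Result.success(value)
    end
  end
end

plain   = Sketch.normalize(42)                            # bare value gets wrapped
wrapped = Sketch.normalize(Sketch::Result.success("ok"))  # result passes through
failed  = Sketch.normalize(
  { type: Sketch::FAILURE_MARKER, class: "ArgumentError",
    message: "bad input", backtrace: [] }
)                                                         # payload becomes a failure
```

Checking the failure marker first matters in this sketch: a failure payload is an ordinary Hash, so it would otherwise fall through to the final branch and be wrapped as a success.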

.worker_failure(error) ⇒ Hash

Build a shareable failure payload that can safely cross a Ractor boundary.

Parameters:

  • error (Exception)

Returns:

  • (Hash)

# File 'lib/cdc/parallel/result_collector.rb', line 57

def self.worker_failure(error)
  ::Ractor.make_shareable(
    {
      type: FAILURE_MARKER,
      class: error.class.name,
      message: error.message,
      backtrace: (error.backtrace || []).map { |line| String(line) }.freeze
    }
  )
end
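
As a rough illustration of what such a payload looks like, the sketch below rebuilds the same Hash at the top level from a rescued exception. The top-level worker_failure method and FAILURE_MARKER constant here are local copies for demonstration, not the library's own definitions; Ractor.make_shareable and Ractor.shareable? are the standard Ruby 3.0+ calls.

```ruby
# Local copy of the marker, for illustration only.
FAILURE_MARKER = :__cdc_parallel_failure__

# Illustrative top-level copy of the documented method.
def worker_failure(error)
  Ractor.make_shareable(
    {
      type: FAILURE_MARKER,
      class: error.class.name,
      message: error.message,
      # backtrace is nil for exceptions that were never raised
      backtrace: (error.backtrace || []).map { |line| String(line) }
    }
  )
end

payload =
  begin
    Integer("not a number")
  rescue ArgumentError => e
    worker_failure(e)
  end

# An exception that was never raised has no backtrace.
unraised = worker_failure(RuntimeError.new("never raised"))

puts payload[:class]              # class name as a String, not the Class itself
puts Ractor.shareable?(payload)   # the whole Hash is deeply frozen
puts unraised[:backtrace].inspect
```

The payload carries only strings, a symbol, and an array of strings, so it does not depend on the exception object itself being shareable.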

.worker_success(value) ⇒ Object

Build a shareable success payload that can safely cross a Ractor boundary.

Parameters:

  • value (Object)

Returns:

  • (Object)

# File 'lib/cdc/parallel/result_collector.rb', line 43

def self.worker_success(value)
  ::Ractor.make_shareable(value)
end
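
A short sketch of what Ractor.make_shareable does to a typical success value may help here. The rows data is invented for illustration; the calls are standard Ruby 3.0+.

```ruby
# Invented sample value standing in for a worker's return value.
value = { rows: [{ id: 1, name: "alpha" }, { id: 2, name: "beta" }] }

shared = Ractor.make_shareable(value)

puts shared.equal?(value)             # same object, frozen in place (no copy)
puts Ractor.shareable?(shared)
puts shared[:rows][0][:name].frozen?  # nested strings are frozen too

# Any later mutation attempt raises FrozenError.
mutation_error =
  begin
    shared[:rows] << { id: 3, name: "gamma" }
    nil
  rescue FrozenError => e
    e
  end
```

Because the value is frozen in place rather than copied, code in the worker that still holds a reference to it can no longer mutate it after this call.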