Class: CDC::Parallel::ResultCollector

Inherits:
Object
  • Object
show all
Defined in:
lib/cdc/parallel/result_collector.rb

Overview

Normalizes values returned by workers.

Constant Summary collapse

FAILURE_MARKER =
:__cdc_parallel_failure__

Class Method Summary collapse

Class Method Details

.normalize(value) ⇒ CDC::Core::ProcessorResult

Normalize a worker return value into a ProcessorResult.

Parameters:

  • value (Object)

Returns:

  • (CDC::Core::ProcessorResult)


36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
# File 'lib/cdc/parallel/result_collector.rb', line 36

def self.normalize(value)
  if worker_failure?(value)
    CDC::Core::ProcessorResult.failure(
      ProcessorExecutionError.new(
        original_class: value[:class],
        original_message: value[:message],
        original_backtrace: value[:backtrace]
      )
    )
  elsif value.is_a?(CDC::Core::ProcessorResult)
    value
  else
    CDC::Core::ProcessorResult.success(value)
  end
end

.worker_failure(error) ⇒ Hash

Build a shareable failure payload that can safely cross a Ractor boundary.

Parameters:

  • error (Exception)

Returns:

  • (Hash)


21
22
23
24
25
26
27
28
29
30
# File 'lib/cdc/parallel/result_collector.rb', line 21

def self.worker_failure(error)
  ::Ractor.make_shareable(
    {
      type: FAILURE_MARKER,
      class: error.class.name,
      message: error.message,
      backtrace: (error.backtrace || []).map { |line| String(line) }.freeze
    }
  )
end

.worker_success(value) ⇒ Object

Build a shareable success payload that can safely cross a Ractor boundary.

Parameters:

  • value (Object)

Returns:

  • (Object)


13
14
15
# File 'lib/cdc/parallel/result_collector.rb', line 13

def self.worker_success(value)
  ::Ractor.make_shareable(value)
end