Class: CDC::Parallel::Runtime

Inherits:
Object
Defined in:
lib/cdc/parallel/runtime.rb,
sig/cdc/parallel/runtime.rbs

Overview

High-level Ractor runtime facade for cdc-core processors.

Constructor Details

#initialize(processor:, size: Etc.nprocessors, timeout: nil, supervision: true, max_respawns: 3, respawn_window: 60, respawn_cooldown: 5) ⇒ void

Parameters:

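  • processor (CDC::Core::Processor)
  • size (Integer) (defaults to: Etc.nprocessors)
  • timeout (Numeric, nil) (defaults to: nil)
  • supervision (defaults to: true)
  • max_respawns (defaults to: 3)
  • respawn_window (defaults to: 60)
  • respawn_cooldown (defaults to: 5)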


# File 'lib/cdc/parallel/runtime.rb', line 65

def initialize(
  processor:,
  size: Etc.nprocessors,
  timeout: nil,
  supervision: true,
  max_respawns: 3,
  respawn_window: 60,
  respawn_cooldown: 5
)
  @processor = processor
  @processor.start

  pool_options = build_pool_options(
    processor:, size:, timeout:, supervision:,
    max_respawns:, respawn_window:, respawn_cooldown:
  )

  @processor_pool = ProcessorPool.new(**pool_options)
  @transaction_pool = TransactionPool.new(**pool_options)
  @router = Router.new(processor_pool: @processor_pool, transaction_pool: @transaction_pool)
  @shutdown = false
end

Instance Method Details

#build_pool_options(processor:, size:, timeout:, supervision:, max_respawns:, respawn_window:, respawn_cooldown:) ⇒ Object

Build the keyword arguments shared by the internal pools: the constructor options passed through unchanged, plus manage_lifecycle: false.



# File 'lib/cdc/parallel/runtime.rb', line 154

def build_pool_options(
  processor:,
  size:,
  timeout:,
  supervision:,
  max_respawns:,
  respawn_window:,
  respawn_cooldown:
)
  {
    processor:,
    size:,
    timeout:,
    supervision:,
    max_respawns:,
    respawn_window:,
    respawn_cooldown:,
    manage_lifecycle: false
  }
end
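
The following is a minimal sketch of how one options hash can configure two pools identically, as the constructor does with ProcessorPool and TransactionPool. FakePool and shared_pool_options are hypothetical stand-ins introduced for illustration; the real pool constructors are not shown on this page, and the hash is written with explicit key/value pairs rather than the shorthand used in the source.

```ruby
# Hypothetical stand-in for ProcessorPool / TransactionPool. It only records
# the keyword arguments it was given.
FakePool = Struct.new(
  :processor, :size, :timeout, :supervision,
  :max_respawns, :respawn_window, :respawn_cooldown, :manage_lifecycle,
  keyword_init: true
)

# Hypothetical helper with the same result shape as build_pool_options.
# The defaults mirror those in the documented constructor signature.
def shared_pool_options(processor:, size:, timeout:)
  {
    processor: processor,
    size: size,
    timeout: timeout,
    supervision: true,
    max_respawns: 3,
    respawn_window: 60,
    respawn_cooldown: 5,
    manage_lifecycle: false # added for every pool, as in the source
  }
end

options = shared_pool_options(processor: :my_processor, size: 2, timeout: nil)

# Splatting the same hash into both constructors gives two distinct pool
# objects with identical configuration.
processor_pool = FakePool.new(**options)
transaction_pool = FakePool.new(**options)
```

Building the hash once keeps the two pools from drifting apart when an option is added or renamed.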

#process(item) ⇒ CDC::Core::ProcessorResult

Process a ChangeEvent or TransactionEnvelope.

Overloads:

  • #process(item) ⇒ CDC::Core::ProcessorResult

    Parameters:

    • item (work_item)

    Returns:

    • (CDC::Core::ProcessorResult)
  • #process(item) ⇒ CDC::Core::ProcessorResult

    Parameters:

    • item (Object)

    Returns:

    • (CDC::Core::ProcessorResult)

Parameters:

  • item (CDC::Core::ChangeEvent, CDC::Core::TransactionEnvelope)

Returns:

  • (CDC::Core::ProcessorResult)


# File 'lib/cdc/parallel/runtime.rb', line 101

def process(item)
  raise ShutdownError, "runtime has been shut down" if @shutdown

  @router.process(item)
end
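
Router's implementation is not shown on this page. As an illustration only, the sketch below shows one way a router could dispatch the two accepted item types to the two pools. Every name in it (SketchRouter, RecordingPool, the Struct stand-ins for the CDC::Core types, and the pool-level process method) is an assumption, not the library's API.

```ruby
# Hypothetical stand-ins for CDC::Core::ChangeEvent and
# CDC::Core::TransactionEnvelope.
ChangeEvent = Struct.new(:table, :payload)
TransactionEnvelope = Struct.new(:events)

# Hypothetical pool that records what it was asked to process.
class RecordingPool
  attr_reader :received

  def initialize
    @received = []
  end

  def process(item)
    @received << item
    :ok
  end
end

# Assumed dispatch rule: envelopes go to the transaction pool, single
# events go to the processor pool, anything else is rejected.
class SketchRouter
  def initialize(processor_pool:, transaction_pool:)
    @processor_pool = processor_pool
    @transaction_pool = transaction_pool
  end

  def process(item)
    case item
    when TransactionEnvelope then @transaction_pool.process(item)
    when ChangeEvent then @processor_pool.process(item)
    else raise ArgumentError, "unsupported work item: #{item.class}"
    end
  end
end

events = RecordingPool.new
transactions = RecordingPool.new
router = SketchRouter.new(processor_pool: events, transaction_pool: transactions)

router.process(ChangeEvent.new("users", { id: 1 }))
router.process(TransactionEnvelope.new([]))
```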

#process_transaction(transaction) ⇒ CDC::Core::ProcessorResult

Convenience wrapper for transaction-oriented processing; delegates to #process.

Parameters:

  • transaction (CDC::Core::TransactionEnvelope)

Returns:

  • (CDC::Core::ProcessorResult)


# File 'lib/cdc/parallel/runtime.rb', line 115

def process_transaction(transaction)
  process(transaction)
end

#shutdown ⇒ void

This method returns an undefined value.

Shut down both internal pools, then call the processor's flush and stop lifecycle hooks. Calling it again after the first shutdown is a no-op.



# File 'lib/cdc/parallel/runtime.rb', line 127

def shutdown
  return if @shutdown

  @shutdown = true
  @processor_pool.shutdown
  @transaction_pool.shutdown
  @processor.flush
  @processor.stop
end
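
The shutdown order and the guard in #process can be observed with a small stand-in. The sketch below copies the documented method bodies into a hypothetical MiniRuntime whose collaborators only log the calls they receive. LoggingDouble, MiniRuntime, and SketchShutdownError are illustrative names and not part of the library.

```ruby
# Hypothetical error class standing in for ShutdownError.
class SketchShutdownError < StandardError; end

# Hypothetical collaborator that appends "<name>.<hook>" to a shared log.
class LoggingDouble
  def initialize(name, log)
    @name = name
    @log = log
  end

  %i[shutdown flush stop].each do |hook|
    define_method(hook) { @log << "#{@name}.#{hook}" }
  end
end

# Stand-in runtime reusing the documented #process guard and #shutdown body.
class MiniRuntime
  def initialize(log)
    @processor_pool = LoggingDouble.new("processor_pool", log)
    @transaction_pool = LoggingDouble.new("transaction_pool", log)
    @processor = LoggingDouble.new("processor", log)
    @shutdown = false
  end

  def process(item)
    raise SketchShutdownError, "runtime has been shut down" if @shutdown

    item # the real method hands the item to the router instead
  end

  def shutdown
    return if @shutdown

    @shutdown = true
    @processor_pool.shutdown
    @transaction_pool.shutdown
    @processor.flush
    @processor.stop
  end
end

log = []
runtime = MiniRuntime.new(log)
runtime.shutdown
runtime.shutdown # second call returns early and logs nothing

begin
  runtime.process(:late_item)
rescue SketchShutdownError => e
  warn "rejected after shutdown: #{e.message}"
end
```

In this sketch the pools are shut down before the processor is flushed and stopped, matching the order of calls in the source.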