Class: CDC::Parallel::Runtime
- Inherits:
-
Object
- Object
- CDC::Parallel::Runtime
- Defined in:
- lib/cdc/parallel/runtime.rb,
sig/cdc/parallel/runtime.rbs
Overview
High-level Ractor runtime facade for cdc-core processors.
Instance Method Summary collapse
-
#build_pool_options(processor:, size:, timeout:, supervision:, max_respawns:, respawn_window:, respawn_cooldown:) ⇒ Object
Build keyword arguments shared by internal pools.
- #initialize(processor:, size: Etc.nprocessors, timeout: nil, supervision: true, max_respawns: 3, respawn_window: 60, respawn_cooldown: 5) ⇒ void constructor
-
#process(item) ⇒ CDC::Core::ProcessorResult
Process a ChangeEvent or TransactionEnvelope.
-
#process_transaction(transaction) ⇒ CDC::Core::ProcessorResult
Alias for transaction-oriented processing.
-
#shutdown ⇒ void
Shut down all runtime resources and call processor lifecycle hooks.
Constructor Details
#initialize(processor:, size: Etc.nprocessors, timeout: nil, supervision: true, max_respawns: 3, respawn_window: 60, respawn_cooldown: 5) ⇒ void
65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 |
# File 'lib/cdc/parallel/runtime.rb', line 65 def initialize( processor:, size: Etc.nprocessors, timeout: nil, supervision: true, max_respawns: 3, respawn_window: 60, respawn_cooldown: 5 ) @processor = processor @processor.start = ( processor:, size:, timeout:, supervision:, max_respawns:, respawn_window:, respawn_cooldown: ) @processor_pool = ProcessorPool.new(**) @transaction_pool = TransactionPool.new(**) @router = Router.new(processor_pool: @processor_pool, transaction_pool: @transaction_pool) @shutdown = false end |
Instance Method Details
#build_pool_options(processor:, size:, timeout:, supervision:, max_respawns:, respawn_window:, respawn_cooldown:) ⇒ Object
Build keyword arguments shared by internal pools.
154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 |
# File 'lib/cdc/parallel/runtime.rb', line 154 def ( processor:, size:, timeout:, supervision:, max_respawns:, respawn_window:, respawn_cooldown: ) { processor:, size:, timeout:, supervision:, max_respawns:, respawn_window:, respawn_cooldown:, manage_lifecycle: false } end |
#process(item) ⇒ CDC::Core::ProcessorResult #process(item) ⇒ CDC::Core::ProcessorResult
Process a ChangeEvent or TransactionEnvelope.
101 102 103 104 105 |
# File 'lib/cdc/parallel/runtime.rb', line 101 def process(item) raise ShutdownError, "runtime has been shut down" if @shutdown @router.process(item) end |
#process_transaction(transaction) ⇒ CDC::Core::ProcessorResult
Alias for transaction-oriented processing.
115 116 117 |
# File 'lib/cdc/parallel/runtime.rb', line 115 def process_transaction(transaction) process(transaction) end |
#shutdown ⇒ void
This method returns an undefined value.
Shut down all runtime resources and call processor lifecycle hooks.
127 128 129 130 131 132 133 134 135 |
# File 'lib/cdc/parallel/runtime.rb', line 127 def shutdown return if @shutdown @shutdown = true @processor_pool.shutdown @transaction_pool.shutdown @processor.flush @processor.stop end |