Class: CDC::Concurrent::ProcessorPool
- Inherits: Object
  - Object
  - CDC::Concurrent::ProcessorPool
- Defined in: lib/cdc/concurrent/processor_pool.rb
Overview
Executes one concurrent-safe processor using Async tasks.
Instance Method Summary
- #initialize(processor:, concurrency: 100, timeout: nil, preserve_order: true) ⇒ ProcessorPool (constructor): A new instance of ProcessorPool.
- #process(event) ⇒ CDC::Core::ProcessorResult
- #process_many(events) ⇒ Array<CDC::Core::ProcessorResult>
- #shutdown ⇒ void
Constructor Details
#initialize(processor:, concurrency: 100, timeout: nil, preserve_order: true) ⇒ ProcessorPool
Returns a new instance of ProcessorPool.
    # File 'lib/cdc/concurrent/processor_pool.rb', line 11
    def initialize(processor:, concurrency: 100, timeout: nil, preserve_order: true)
      validate_processor!(processor)
      @processor = processor
      @configuration = Configuration.new(concurrency:, timeout:, preserve_order:)
      @shutdown = false
    end
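The constructor forwards its keyword arguments with Ruby's shorthand hash syntax (Ruby 3.1 and later), where `concurrency:` is equivalent to `concurrency: concurrency`. The following is a minimal sketch of that pattern only. `SketchConfiguration` and `build_configuration` are hypothetical stand-ins, because the actual `Configuration` class and `validate_processor!` are not shown on this page.

```ruby
# Hypothetical stand-in for Configuration, which is not shown on this page.
SketchConfiguration = Struct.new(:concurrency, :timeout, :preserve_order, keyword_init: true)

# Mirrors the constructor's defaults and forwards them with the
# Ruby 3.1+ shorthand: `concurrency:` means `concurrency: concurrency`.
def build_configuration(concurrency: 100, timeout: nil, preserve_order: true)
  SketchConfiguration.new(concurrency:, timeout:, preserve_order:)
end

DEFAULT_CONFIG = build_configuration
CUSTOM_CONFIG  = build_configuration(concurrency: 10, timeout: 2.5, preserve_order: false)
```

With no arguments, the sketch yields the same defaults as the documented signature: a concurrency of 100, no timeout, and order preservation enabled.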
Instance Method Details
#process(event) ⇒ CDC::Core::ProcessorResult
    # File 'lib/cdc/concurrent/processor_pool.rb', line 21
    def process(event)
      raise ShutdownError, "processor pool has been shut down" if @shutdown

      process_one(event)
    end
#process_many(events) ⇒ Array<CDC::Core::ProcessorResult>
    # File 'lib/cdc/concurrent/processor_pool.rb', line 29
    def process_many(events)
      raise ShutdownError, "processor pool has been shut down" if @shutdown
      return [].freeze if events.empty?

      indexed_results = []
      process_batch(events, indexed_results)

      indexed_results.sort_by!(&:first) if @configuration.preserve_order
      indexed_results.map(&:last).freeze
    end
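The final two lines of `process_many` suggest that `process_batch` appends `[index, result]` pairs to `indexed_results` in whatever order the tasks finish. That is an inference from `sort_by!(&:first)` and `map(&:last)`, since `process_batch` itself is not shown. Under that assumption, the ordering step can be sketched in isolation. `sketch_collect` and the sample data are hypothetical.

```ruby
# Takes [index, result] pairs in completion order and returns the frozen
# result list, restoring input order only when preserve_order is true.
def sketch_collect(completions, preserve_order:)
  indexed_results = completions.dup
  indexed_results.sort_by!(&:first) if preserve_order
  indexed_results.map(&:last).freeze
end

# Assumed completion order: the third event finished first.
COMPLETIONS = [[2, "c"], [0, "a"], [1, "b"]].freeze

ORDERED   = sketch_collect(COMPLETIONS, preserve_order: true)   # => ["a", "b", "c"]
UNORDERED = sketch_collect(COMPLETIONS, preserve_order: false)  # => ["c", "a", "b"]
```

With `preserve_order: false` the sort is skipped, so results come back in completion order. In both cases the returned array is frozen, as is the empty array returned for an empty batch.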
#shutdown ⇒ void
This method returns an undefined value.
    # File 'lib/cdc/concurrent/processor_pool.rb', line 42
    def shutdown
      @shutdown = true
    end
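As shown, `shutdown` only sets a flag, and both `process` and `process_many` check that flag before doing any work. The sketch below reproduces that guard with stand-in names. `SketchPool` and `SketchShutdownError` are hypothetical, and the body of `process` replaces the real `process_one`, which is not shown on this page.

```ruby
# Hypothetical stand-in for ShutdownError.
class SketchShutdownError < StandardError; end

# Minimal stand-in pool that reproduces only the shutdown guard.
class SketchPool
  def initialize
    @shutdown = false
  end

  def process(event)
    raise SketchShutdownError, "processor pool has been shut down" if @shutdown

    "processed #{event}" # placeholder for the real per-event work
  end

  def shutdown
    @shutdown = true
  end
end

pool = SketchPool.new
pool.process(:first) # accepted while the pool is open
pool.shutdown

begin
  pool.process(:second)
rescue SketchShutdownError => e
  warn e.message # the pool rejects work after shutdown
end
```

Because the flag is only ever set to `true`, calling `shutdown` more than once has no further effect in this sketch.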