Class: CDC::Parallel::ProcessorPool

Inherits:
Object
  • Object
show all
Defined in:
lib/cdc/parallel/processor_pool.rb,
sig/cdc/parallel/processor_pool.rbs

Overview

Executes one Ractor-safe processor in pre-warmed persistent Ractor workers.

Defined Under Namespace

Classes: WorkerSlot

Instance Method Summary collapse

Constructor Details

#initialize(processor:, size: Etc.nprocessors, timeout: nil, supervision: true, max_respawns: 3, respawn_window: 60, respawn_cooldown: 5, manage_lifecycle: true) ⇒ void

Parameters:

  • processor (CDC::Core::Processor)
  • size (Integer) (defaults to: Etc.nprocessors)
  • timeout (Numeric, nil) (defaults to: nil)
  • supervision (Boolean) (defaults to: true)
  • max_respawns (Integer) (defaults to: 3)
  • respawn_window (Numeric) (defaults to: 60)
  • respawn_cooldown (Numeric) (defaults to: 5)
  • manage_lifecycle (Boolean) (defaults to: true)


128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
# File 'lib/cdc/parallel/processor_pool.rb', line 128

# Validates the processor, optionally starts it, makes it Ractor-shareable and
# builds one WorkerSlot per configured pool slot.
#
# @param processor [CDC::Core::Processor] must pass validate_processor!
# @param size [Integer] forwarded to Configuration; the configuration's size is the slot count
# @param timeout [Numeric, nil] forwarded to Configuration
# @param supervision [Boolean] forwarded to every WorkerSlot
# @param max_respawns [Integer] forwarded to every WorkerSlot
# @param respawn_window [Numeric] forwarded to every WorkerSlot
# @param respawn_cooldown [Numeric] forwarded to every WorkerSlot
# @param manage_lifecycle [Boolean] when true, processor.start is called here
# @return [void]
def initialize(
  processor:,
  size: Etc.nprocessors,
  timeout: nil,
  supervision: true,
  max_respawns: 3,
  respawn_window: 60,
  respawn_cooldown: 5,
  manage_lifecycle: true
)
  validate_processor!(processor)

  # The processor is started before it is made shareable.
  processor.start if manage_lifecycle
  @manage_lifecycle = manage_lifecycle
  @processor = ::Ractor.make_shareable(processor)
  @configuration = Configuration.new(size: size, timeout: timeout)

  # Every slot receives the same settings; only the index differs.
  slot_options = {
    processor: @processor,
    supervision: supervision,
    max_respawns: max_respawns,
    respawn_window: respawn_window,
    respawn_cooldown: respawn_cooldown
  }
  @slots = Array.new(@configuration.size) do |slot_index|
    WorkerSlot.new(index: slot_index, **slot_options)
  end.freeze
  @workers = @slots.map { |slot| slot.worker }.freeze
  @worker_inboxes = @slots.map { |slot| slot.inbox }.freeze

  # Dispatch state: cursor into @slots, its guarding mutex, and the shutdown flag.
  @next_worker = 0
  @dispatch_mutex = Mutex.new
  @shutdown = false
end

Instance Method Details

#collect_results(reply_port, count, assignments = []) ⇒ Array[result?]

Parameters:

  • reply_port (reply_port)
  • count (Integer)
  • assignments (Array[WorkerSlot?]) (defaults to: [])

Returns:

  • (Array[result?])


338
339
340
341
342
343
344
345
346
347
# File 'lib/cdc/parallel/processor_pool.rb', line 338

# Allocates one result slot per expected reply and delegates filling it to the
# timeout-aware or the blocking collector, depending on @configuration.timeout.
#
# @param reply_port [reply_port] port the replies are received from
# @param count [Integer] number of replies expected
# @param assignments [Array[WorkerSlot?]] forwarded to the chosen collector
# @return [Array[result?]] frozen empty array when count is zero, otherwise
#   whatever the chosen collector returns
def collect_results(reply_port, count, assignments = [])
  pending = Array.new(count)

  if count.zero?
    pending.freeze
  elsif @configuration.timeout
    collect_results_with_timeout(reply_port, pending, assignments)
  else
    collect_results_without_timeout(reply_port, pending, assignments)
  end
end

#collect_results_with_timeout(reply_port, results, assignments = []) ⇒ Array[result?]

Parameters:

  • reply_port (reply_port)
  • results (Array[result?])
  • assignments (Array[WorkerSlot?]) (defaults to: [])

Returns:

  • (Array[result?])


359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
# File 'lib/cdc/parallel/processor_pool.rb', line 359

# Receives up to results.length replies from reply_port under one shared
# deadline of @configuration.timeout seconds, measured on the monotonic clock.
# When the deadline passes or a receive times out, the result of
# timeout_results(results) is returned instead.
#
# @param reply_port [reply_port] port yielding [index, response] pairs
# @param results [Array[result?]] array filled in place at each received index
# @param assignments [Array[WorkerSlot?]] entry at the received index, if any, gets #complete(index)
# @return [Array[result?]] the frozen results array, or the timeout_results array
def collect_results_with_timeout(reply_port, results, assignments = [])
  limit = @configuration.timeout
  return results.freeze unless limit

  expires_at = Process.clock_gettime(Process::CLOCK_MONOTONIC) + limit

  results.length.times do
    # Each receive only gets what is left of the overall budget.
    time_left = expires_at - Process.clock_gettime(Process::CLOCK_MONOTONIC)
    return timeout_results(results) unless time_left.positive?

    slot_index, payload = ::Timeout.timeout(time_left, TimeoutError) do
      reply_port.receive
    end
    results[slot_index] = ResultCollector.normalize(payload)
    assignments[slot_index]&.complete(slot_index)
  rescue TimeoutError
    return timeout_results(results)
  end

  results.freeze
end

#collect_results_without_timeout(reply_port, results, assignments = []) ⇒ Array[result?]

Parameters:

  • reply_port (reply_port)
  • results (Array[result?])
  • assignments (Array[WorkerSlot?]) (defaults to: [])

Returns:

  • (Array[result?])


349
350
351
352
353
354
355
356
357
# File 'lib/cdc/parallel/processor_pool.rb', line 349

# Performs results.length blocking receives on reply_port and stores each
# normalized response at the index that arrived with it.
#
# @param reply_port [reply_port] port yielding [index, response] pairs
# @param results [Array[result?]] array filled in place
# @param assignments [Array[WorkerSlot?]] entry at the received index, if any, gets #complete(index)
# @return [Array[result?]] the same results array, frozen
def collect_results_without_timeout(reply_port, results, assignments = [])
  expected = results.length

  expected.times do
    slot_index, payload = reply_port.receive
    results[slot_index] = ResultCollector.normalize(payload)

    owner = assignments[slot_index]
    owner.complete(slot_index) unless owner.nil?
  end

  results.freeze
end

#degraded? ⇒ Boolean

Whether any worker slot is in crash-loop cooldown.

Returns:

  • (Boolean)


173
174
175
# File 'lib/cdc/parallel/processor_pool.rb', line 173

# Whether any worker slot is in crash-loop cooldown.
#
# @return [Boolean] true as soon as one slot answers true to #degraded?
def degraded?
  @slots.any? { |slot| slot.degraded? }
end

#dispatch(work_items, reply_port) ⇒ Array[WorkerSlot?]

Parameters:

  • work_items (Array[work_item])
  • reply_port (reply_port)

Returns:

  • (Array[WorkerSlot?])


243
244
245
246
247
248
249
250
251
252
253
254
255
# File 'lib/cdc/parallel/processor_pool.rb', line 243

# Hands every work item to the slot chosen by next_worker_slot while holding
# @dispatch_mutex, and reports which slot received which item.
#
# @param work_items [Array[work_item]] items to send; each is sent with its position as index
# @param reply_port [reply_port] passed along to every slot's send_work
# @return [Array[WorkerSlot?]] slot used for each item, in item order
# @raise [ShutdownError] when the pool's shutdown flag is already set
def dispatch(work_items, reply_port)
  @dispatch_mutex.synchronize do
    raise ShutdownError, "processor pool has been shut down" if @shutdown

    work_items.each_with_index.map do |item, index|
      chosen = next_worker_slot
      chosen.send_work(index, item, reply_port)
      chosen
    end
  end
end

#next_worker_slot ⇒ WorkerSlot

Returns:

  • (WorkerSlot)


329
330
331
332
333
334
335
336
# File 'lib/cdc/parallel/processor_pool.rb', line 329

# Returns the slot at the current cursor and advances the cursor, wrapping
# back to 0 once it reaches the number of slots.
#
# @return [WorkerSlot] the slot that was at @next_worker before advancing
def next_worker_slot
  current = @next_worker
  following = current + 1

  @next_worker = following >= @slots.length ? 0 : following
  @slots[current]
end

#process(item) ⇒ CDC::Core::ProcessorResult

Process one work item synchronously.

Parameters:

  • item (Object)

Returns:

  • (CDC::Core::ProcessorResult)


191
192
193
# File 'lib/cdc/parallel/processor_pool.rb', line 191

# Process one work item synchronously by running it through process_many as a
# single-element batch.
#
# @param item [Object] the work item
# @return [CDC::Core::ProcessorResult] the first entry of the batch result
# @raise [IndexError] if process_many returns an empty collection (from fetch)
def process(item)
  batch_results = process_many([item])
  batch_results.fetch(0)
end

#process_many(items) ⇒ Array<CDC::Core::ProcessorResult>

Process many work items using the pre-warmed worker pool.

Parameters:

  • items (Array<Object>)

Returns:

  • (Array<CDC::Core::ProcessorResult>)


208
209
210
211
212
213
214
215
216
217
# File 'lib/cdc/parallel/processor_pool.rb', line 208

# Process many work items using the pre-warmed worker pool.
#
# Each item is made Ractor-shareable, dispatched together with a fresh reply
# port, and the collected results are returned with nil entries removed.
#
# @param items [Array<Object>] work items
# @return [Array<CDC::Core::ProcessorResult>] frozen array of results
def process_many(items)
  shareable_items = items.map { |item| ::Ractor.make_shareable(item) }
  reply_port = ::Ractor::Port.new

  slot_assignments = dispatch(shareable_items, reply_port)
  collected = collect_results(reply_port, shareable_items.length, slot_assignments)

  collected.compact.freeze
ensure
  # reply_port is still nil if an error occurred before the port was created.
  reply_port&.close
end

#respawns ⇒ Integer

Total worker-slot respawns since pool boot.

Returns:

  • (Integer)


166
167
168
# File 'lib/cdc/parallel/processor_pool.rb', line 166

# Total worker-slot respawns since pool boot.
#
# @return [Integer] sum of #respawns over all slots
def respawns
  @slots.sum { |slot| slot.respawns }
end

#shutdown ⇒ void

This method returns an undefined value.

Shut down the pool and call processor lifecycle hooks if manage_lifecycle is true.



226
227
228
229
230
231
232
233
234
235
236
237
238
239
# File 'lib/cdc/parallel/processor_pool.rb', line 226

# Shut down the pool and call processor lifecycle hooks if manage_lifecycle is true.
#
# The first call sets the shutdown flag and signals the workers under
# @dispatch_mutex, then waits for the workers outside the lock. Later calls
# return immediately.
#
# @return [void]
# @raise [StandardError] whatever flush or stop raises; stop still runs when flush fails
def shutdown
  @dispatch_mutex.synchronize do
    return if @shutdown

    @shutdown = true
    signal_workers
  end

  wait_for_workers
  return unless @manage_lifecycle

  begin
    @processor.flush
  ensure
    # A repeated #shutdown returns early above, so this is the only pass on
    # which the processor can be stopped; do it even when flush raised.
    @processor.stop
  end
end

#signal_workers ⇒ void

This method returns an undefined value.



257
258
259
# File 'lib/cdc/parallel/processor_pool.rb', line 257

# Calls #shutdown on every worker slot, in slot order.
#
# @return [void]
def signal_workers
  @slots.each { |slot| slot.shutdown }
end

#timeout_results(results) ⇒ Array[result]

Parameters:

  • results (Array[result?])

Returns:

  • (Array[result])


379
380
381
382
383
384
385
386
387
388
# File 'lib/cdc/parallel/processor_pool.rb', line 379

# Builds a frozen copy of results in which every missing entry is replaced by
# a failure result wrapping one shared TimeoutError.
#
# @param results [Array[result?]] partially filled results; not modified
# @return [Array[result]] frozen array with no missing entries
def timeout_results(results)
  # Counted only for the error message.
  outstanding = results.count(&:nil?)
  shared_error = TimeoutError.new(
    "processor pool timed out after #{@configuration.timeout} seconds waiting for #{outstanding} result(s)"
  )

  filled = results.map do |entry|
    entry || CDC::Core::ProcessorResult.failure(shared_error)
  end
  filled.freeze
end

#validate_processor!(processor) ⇒ void

This method returns an undefined value.

Parameters:

  • processor (CDC::Core::Processor)

Raises:

  • (UnsafeProcessorError)


285
286
287
288
289
290
291
# File 'lib/cdc/parallel/processor_pool.rb', line 285

# Ensures the processor's class both responds to ractor_safe? and answers it
# truthily.
#
# @param processor [CDC::Core::Processor] processor to check
# @return [void]
# @raise [UnsafeProcessorError] when the class lacks ractor_safe? or it returns a falsy value
def validate_processor!(processor)
  klass = processor.class
  declared_safe = klass.respond_to?(:ractor_safe?) && klass.ractor_safe?

  raise UnsafeProcessorError, "#{klass} must declare ractor_safe!" unless declared_safe
end

#wait_for_workers ⇒ void

This method returns an undefined value.



261
262
263
264
265
266
267
# File 'lib/cdc/parallel/processor_pool.rb', line 261

# Waits for the worker slots: bounded by the configured timeout when one is
# set, otherwise by joining every slot in order.
#
# @return [void]
def wait_for_workers
  return wait_for_workers_with_timeout if @configuration.timeout

  @slots.each { |slot| slot.join }
end

#wait_for_workers_with_timeout ⇒ void

This method returns an undefined value.



269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
# File 'lib/cdc/parallel/processor_pool.rb', line 269

# Joins the worker slots one after another under a single deadline of
# @configuration.timeout seconds (monotonic clock). Stops at the first slot
# whose join times out, or as soon as no time is left; later slots are not
# joined.
#
# @return [void]
def wait_for_workers_with_timeout
  limit = @configuration.timeout
  return unless limit

  expires_at = Process.clock_gettime(Process::CLOCK_MONOTONIC) + limit

  @slots.each do |worker_slot|
    # Every join only gets what remains of the shared budget.
    time_left = expires_at - Process.clock_gettime(Process::CLOCK_MONOTONIC)
    break unless time_left.positive?

    ::Timeout.timeout(time_left, TimeoutError) do
      worker_slot.join
    end
  rescue TimeoutError
    break
  end
end