Class: CMDx::Executors

Inherits:
Object
  • Object
show all
Defined in:
lib/cmdx/executors.rb,
lib/cmdx/executors/fiber.rb,
lib/cmdx/executors/thread.rb

Overview

Registry of named executors used by ‘:parallel` workflow groups to dispatch tasks concurrently. Ships with built-ins for `:threads` and `:fibers`. Executors are any callable accepting `call(jobs:, concurrency:, on_job:)` and must invoke `on_job.call(job)` for each job, blocking until every job is done.

Defined Under Namespace

Modules: Fiber, Thread

Instance Attribute Summary collapse

Instance Method Summary collapse

Constructor Details

#initializeExecutors

Returns a new instance of Executors.



13
14
15
16
17
18
# File 'lib/cmdx/executors.rb', line 13

def initialize
  @registry = {
    threads: Executors::Thread,
    fibers: Executors::Fiber
  }
end

Instance Attribute Details

#registryObject (readonly)

Returns the value of attribute registry.



11
12
13
# File 'lib/cmdx/executors.rb', line 11

def registry
  @registry
end

Instance Method Details

#deregister(name) ⇒ Executors

Returns self for chaining.

Parameters:

  • name (Symbol)

Returns:



50
51
52
53
# File 'lib/cmdx/executors.rb', line 50

def deregister(name)
  registry.delete(name.to_sym)
  self
end

#empty?Boolean

Returns:

  • (Boolean)


85
86
87
# File 'lib/cmdx/executors.rb', line 85

def empty?
  registry.empty?
end

#initialize_copy(source) ⇒ void

This method returns an undefined value.

Parameters:

  • source (Executors)

    registry to duplicate



22
23
24
# File 'lib/cmdx/executors.rb', line 22

def initialize_copy(source)
  @registry = source.registry.dup
end

#lookup(name) ⇒ #call

Returns the registered executor.

Parameters:

  • name (Symbol)

Returns:

  • (#call)

    the registered executor

Raises:

  • (ArgumentError)

    when ‘name` isn’t registered



58
59
60
61
62
# File 'lib/cmdx/executors.rb', line 58

def lookup(name)
  registry[name] || begin
    raise ArgumentError, "unknown executor: #{name.inspect}"
  end
end

#register(name, callable = nil, &block) { ... } ⇒ Executors

Registers a named executor, overwriting any existing entry.

Parameters:

  • name (Symbol)
  • callable (#call, nil) (defaults to: nil)

    pass either this or a block

  • block (#call, nil)

    executor callable when ‘callable` is omitted

Yields:

  • executor body — ‘call(jobs:, concurrency:, on_job:)`

Returns:

Raises:

  • (ArgumentError)

    when both ‘callable` and a block are given, or when the resolved executor isn’t callable



35
36
37
38
39
40
41
42
43
44
45
46
# File 'lib/cmdx/executors.rb', line 35

def register(name, callable = nil, &block)
  executor = callable || block

  if callable && block
    raise ArgumentError, "provide either a callable or a block, not both"
  elsif !executor.respond_to?(:call)
    raise ArgumentError, "executor must respond to #call"
  end

  registry[name.to_sym] = executor
  self
end

#resolve(spec) ⇒ #call

Resolves a declaration’s ‘:executor` option to a concrete callable. Accepts `nil` (default `:threads`), a Symbol (registry lookup), or any object responding to `#call`.

Parameters:

  • spec (Symbol, #call, nil)

Returns:

  • (#call)

Raises:

  • (ArgumentError)

    when ‘spec` is an unknown symbol or not callable



71
72
73
74
75
76
77
78
79
80
81
82
# File 'lib/cmdx/executors.rb', line 71

def resolve(spec)
  case spec
  when NilClass
    lookup(:threads)
  when Symbol
    lookup(spec)
  else
    return spec if spec.respond_to?(:call)

    raise ArgumentError, "unknown executor: #{spec.inspect}"
  end
end

#sizeInteger

Returns:

  • (Integer)


90
91
92
# File 'lib/cmdx/executors.rb', line 90

def size
  registry.size
end