Class: Bolt::Transport::Choria

Inherits:
Base
  • Object
Defined in:
lib/bolt/transport/choria.rb,
lib/bolt/transport/choria/shell.rb,
lib/bolt/transport/choria/client.rb,
lib/bolt/transport/choria/helpers.rb,
lib/bolt/transport/choria/bolt_tasks.rb,
lib/bolt/transport/choria/agent_discovery.rb,
lib/bolt/transport/choria/command_builders.rb

Overview

Choria transport for OpenBolt. Communicates with nodes via Choria’s NATS pub/sub messaging infrastructure using the choria-mcorpc-support gem as the client library. Extends Transport::Base directly (not Simple) because Choria’s pub/sub model doesn’t fit the persistent connection/shell abstraction that Simple assumes.

Available capabilities depend on which agents are installed on the target node:

bolt_tasks agent only: Only run_task works, via the bolt_tasks agent
  which downloads task files from an OpenVox/Puppet Server and executes
  them via task_wrapper. All other operations fail with an actionable
  error directing the user to install the shell agent.

shell agent installed (>= 1.2.1): run_command, run_script, and
  run_task work. run_task uses the bolt_tasks agent by default.
  To run local tasks via the shell agent, set task-agent to 'shell'
  in project config or specify --choria-task-agent shell.

Upload, download, and plans are not yet supported.
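
The capability rules above can be sketched as a small lookup. This is only an illustration, not the transport's own code: the method name available_operations, the SKETCH_ constant, and the agent-name-to-version hash shape are all assumptions made for the example.

```ruby
require 'rubygems'

# Hypothetical sketch: derive the operations a node supports from the
# agents it reports. The hash shape (agent name => version string) and
# the method name are assumptions, not part of the transport's API.
SKETCH_SHELL_MIN_VERSION = Gem::Version.new('1.2.1')

def available_operations(agents)
  ops = []
  # The bolt_tasks agent alone only enables run_task.
  ops << 'run_task' if agents.key?('bolt_tasks')

  # A new enough shell agent enables commands, scripts, and tasks.
  shell_version = agents['shell']
  if shell_version && Gem::Version.new(shell_version) >= SKETCH_SHELL_MIN_VERSION
    ops.push('run_command', 'run_script', 'run_task')
  end

  ops.uniq.sort
end
```

For example, a node reporting only bolt_tasks yields just run_task, while a node whose shell agent is older than 1.2.1 gains nothing from it.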

Constant Summary

SHELL_DONE_STATUSES =

Terminal shell job statuses that indicate the process has finished.

%w[stopped failed].freeze
RPC_FAILURE_RETRIES =

Number of consecutive RPC poll failures before giving up and marking all remaining targets as failed. Used by both polling loops (poll_task_status and wait_for_shell_results).

3
POLL_INTERVAL_SECONDS =

Polling interval between rounds, used by poll_task_status and wait_for_shell_results. Each round makes one batched RPC call regardless of target count, so a 1-second interval balances responsiveness against broker load.

1
WINDOWS_PATH_REGEX =

Matches Windows absolute paths like C:\temp or D:/foo. Used by validate_file_name! and Config::Transport::Choria#absolute_path?.

%r{\A[A-Za-z]:[\\/]}
SHELL_MIN_VERSION =
'1.2.1'
AGENT_MIN_VERSIONS =
{
  'shell' => SHELL_MIN_VERSION
}.freeze
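
How RPC_FAILURE_RETRIES and POLL_INTERVAL_SECONDS might interact in a polling loop is sketched below. The actual poll_task_status and wait_for_shell_results methods are not shown on this page, so everything here is an assumption: the method name poll_until_done, the callable poll argument, and in particular the choice to reset the failure counter after any successful round.

```ruby
# Hypothetical polling loop. `poll` is any callable that takes the pending
# targets and returns a hash of { target => output } for those that have
# finished, or raises when the RPC round itself fails.
SKETCH_RPC_FAILURE_RETRIES = 3
SKETCH_POLL_INTERVAL_SECONDS = 1

def poll_until_done(pending, poll, interval: SKETCH_POLL_INTERVAL_SECONDS)
  results = {}
  consecutive_failures = 0

  until pending.empty?
    begin
      finished = poll.call(pending)
      consecutive_failures = 0 # assumed: a successful round resets the count
      results.merge!(finished)
      pending -= finished.keys
    rescue StandardError => e
      consecutive_failures += 1
      if consecutive_failures >= SKETCH_RPC_FAILURE_RETRIES
        # Give up and mark every remaining target as failed.
        pending.each { |target| results[target] = { error: "polling failed: #{e.message}" } }
        break
      end
    end
    sleep(interval) unless pending.empty?
  end

  results
end
```

Because each round is one call regardless of how many targets are pending, the cost of a short interval grows with the number of rounds rather than the number of targets.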

Instance Attribute Summary

Attributes inherited from Base

#logger

Instance Method Summary

Methods inherited from Base

#assert_batch_size_one, #batch_download, #batch_upload, #connected?, #default_input_method, #run_command, #run_script, #run_task, #select_interpreter, #unwrap_sensitive_args, #with_events

Constructor Details

#initialize ⇒ Choria

Returns a new instance of Choria.



# File 'lib/bolt/transport/choria.rb', line 34

def initialize
  super
  @config_mutex = Mutex.new
  @config_error = nil
  @client_configured = false
  # Serializes RPC calls across batch threads. See the comment on
  # rpc_request in helpers.rb for why this is necessary.
  @rpc_mutex = Mutex.new
  # Multiple batch threads write to this map concurrently when we
  # have more than one collective.
  @agent_cache = Concurrent::Map.new
  @default_collective = nil
end

Instance Method Details

#batch_command(targets, command, options = {}, position = [], &callback) ⇒ Array<Bolt::Result>

Run a command on targets via the shell agent. Assumes all targets in the batch are the same platform (POSIX or Windows). Mixed-platform batches use the first capable target’s platform for command syntax.

Parameters:

  • targets (Array<Bolt::Target>)

    Targets in a single collective batch

  • command (String)

    Shell command to execute

  • options (Hash) (defaults to: {})

    Execution options; supports :env_vars for environment variables

  • position (Array) (defaults to: [])

    Positional info for result tracking

  • callback (Proc)

    Called with :node_start and :node_result events

Returns:

  • (Array<Bolt::Result>)



# File 'lib/bolt/transport/choria/shell.rb', line 19

def batch_command(targets, command, options = {}, position = [], &callback)
  result_opts = { action: 'command', name: command, position: position }
  shell_targets, results = prepare_targets(targets, 'shell', result_opts, &callback)
  return results if shell_targets.empty?

  logger.debug { "Running command via shell agent on #{target_count(shell_targets)}" }

  first_target = shell_targets.first
  timeout = first_target.options['command-timeout']
  command = prepend_env_vars(first_target, command, options[:env_vars], 'run_command env_vars')

  shell_targets.each { |target| callback&.call(type: :node_start, target: target) }

  pending, start_failures = shell_start(shell_targets, command)
  results += emit_results(start_failures, **result_opts, &callback)
  results += emit_results(wait_for_shell_results(pending, timeout), **result_opts, &callback)

  results
end

#batch_connected?(targets) ⇒ Boolean

Override batch_connected? to check all targets in one RPC call. Only used for wait_until_available in plans.

Parameters:

  • targets (Array<Bolt::Target>)

    Targets to check connectivity for

Returns:

  • (Boolean)

    True if all targets responded to ping



# File 'lib/bolt/transport/choria.rb', line 167

def batch_connected?(targets)
  logger.debug { "Checking connectivity for #{target_count(targets)}" }
  first_target = targets.first
  configure_client(first_target)

  response = rpc_request('rpcutil', targets, 'rpcutil.ping') do |client|
    client.ping
  end
  response[:responded].length == targets.length
rescue StandardError => e
  raise if e.is_a?(Bolt::Error)

  logger.warn { "Batch connectivity check failed: #{e.class}: #{e.message}" }
  false
end

#batch_script(targets, script, arguments, options = {}, position = [], &callback) ⇒ Array<Bolt::Result>

Run a script on targets via the shell agent. Assumes all targets in the batch are the same platform (POSIX or Windows). Mixed-platform batches use the first capable target’s platform for infrastructure commands (mkdir, upload, chmod, cleanup).

Parameters:

  • targets (Array<Bolt::Target>)

    Targets in a single collective batch

  • script (String)

    Local path to the script file

  • arguments (Array<String>)

    Command-line arguments to pass to the script

  • options (Hash) (defaults to: {})

    Execution options; supports :script_interpreter

  • position (Array) (defaults to: [])

    Positional info for result tracking

  • callback (Proc)

    Called with :node_start and :node_result events

Returns:

  • (Array<Bolt::Result>)



# File 'lib/bolt/transport/choria/shell.rb', line 51

def batch_script(targets, script, arguments, options = {}, position = [], &callback)
  result_opts = { action: 'script', name: script, position: position }
  shell_targets, results = prepare_targets(targets, 'shell', result_opts, &callback)
  return results if shell_targets.empty?

  logger.debug { "Running script via shell agent on #{target_count(shell_targets)}" }

  first_target = shell_targets.first
  arguments = unwrap_sensitive_args(arguments)
  timeout = first_target.options['command-timeout']
  tmpdir = generate_tmpdir_path(first_target)

  script_content = File.binread(script)

  shell_targets.each { |target| callback&.call(type: :node_start, target: target) }

  begin
    remote_path = join_path(first_target, tmpdir, File.basename(script))
    active_targets = shell_targets.dup

    # Create a temp directory with restricted permissions
    failures = shell_run(active_targets,
                         make_dir_command(first_target, tmpdir),
                         description: 'mkdir tmpdir')
    results += emit_results(failures, **result_opts, &callback)
    active_targets -= failures.keys

    # Upload the script file
    if active_targets.any?
      failures = upload_file_content(active_targets, script_content, remote_path)
      results += emit_results(failures, **result_opts, &callback)
      active_targets -= failures.keys
    end

    # Make the script executable (no-op on Windows)
    chmod_cmd = make_executable_command(first_target, remote_path)
    if active_targets.any? && chmod_cmd
      failures = shell_run(active_targets, chmod_cmd, description: 'chmod script')
      results += emit_results(failures, **result_opts, &callback)
      active_targets -= failures.keys
    end

    # Execute the script asynchronously and poll for completion
    if active_targets.any?
      interpreter = select_interpreter(script, first_target.options['interpreters'])
      cmd_parts = []
      cmd_parts += Array(interpreter).map { |part| escape_arg(first_target, part) } if interpreter && options[:script_interpreter]
      cmd_parts << escape_arg(first_target, remote_path)
      cmd_parts += arguments.map { |arg| escape_arg(first_target, arg) }

      pending, start_failures = shell_start(active_targets, cmd_parts.join(' '))
      results += emit_results(start_failures, **result_opts, &callback)
      results += emit_results(wait_for_shell_results(pending, timeout), **result_opts, &callback)
    end
  ensure
    cleanup_tmpdir(shell_targets, tmpdir)
  end

  results
end
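
The command-assembly step of batch_script (optional interpreter, then the remote script path, then the arguments, each escaped and joined with spaces) can be sketched for POSIX targets as follows. The method name script_command is hypothetical, and the sketch uses Shellwords directly, whereas the transport goes through its platform-aware escape_arg helper, whose implementation is not shown here.

```ruby
require 'shellwords'

# Hypothetical POSIX-only version of the command assembly step.
# `interpreter` may be nil, a string, or an array such as
# ['/usr/bin/env', 'ruby'].
def script_command(remote_path, arguments, interpreter: nil)
  parts = []
  parts += Array(interpreter).map { |part| Shellwords.shellescape(part) } if interpreter
  parts << Shellwords.shellescape(remote_path)
  parts += arguments.map { |arg| Shellwords.shellescape(arg) }
  parts.join(' ')
end
```

Escaping every part individually means an argument containing spaces or shell metacharacters reaches the script as a single word.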

#batch_task(targets, task, arguments, _options = {}, position = [], &callback) ⇒ Array<Bolt::Result>

Override batch_task to handle multiple targets in a single thread over RPC. Implementation grouping (mixed-platform support) is handled internally by run_task_via_bolt_tasks and run_task_via_shell.

Parameters:

  • targets (Array<Bolt::Target>)

    Targets in a single collective batch

  • task (Bolt::Task)

    Task to execute

  • arguments (Hash)

    Task parameter names to values

  • _options (Hash) (defaults to: {})

    Execution options (unused currently, passed through from Base)

  • position (Array) (defaults to: [])

    Positional info for result tracking

  • callback (Proc)

    Called with :node_start and :node_result events

Returns:

  • (Array<Bolt::Result>)

    Results for all targets (successes and failures)



# File 'lib/bolt/transport/choria.rb', line 99

def batch_task(targets, task, arguments, _options = {}, position = [], &callback)
  chosen_agent = targets.first.options['task-agent'] || 'bolt_tasks'
  result_opts = { action: 'task', name: task.name, position: position }

  # The results var here is the error results for incapable targets, to which we'll add in
  # the successful results from the capable targets as we go.
  capable, results = prepare_targets(targets, chosen_agent, result_opts, &callback)

  logger.debug { "Task #{task.name} routing: agent: #{chosen_agent}, #{capable.size} capable / #{targets.size - capable.size} incapable" }

  unless capable.empty?
    capable.each { |target| callback&.call(type: :node_start, target: target) }
    arguments = unwrap_sensitive_args(arguments)

    results += case chosen_agent
               when 'bolt_tasks'
                 run_task_via_bolt_tasks(capable, task, arguments, result_opts, &callback)
               when 'shell'
                 run_task_via_shell(capable, task, arguments, result_opts, &callback)
               else
                 raise Bolt::Error.new(
                   "Unsupported task-agent '#{chosen_agent}'",
                   'bolt/choria-unsupported-agent'
                 )
               end
  end

  results
end

#batch_task_with(targets, task, target_mapping, options = {}, position = [], &callback) ⇒ Array<Bolt::Result>

Override batch_task_with for per-target arguments. Only called from the run_task_with Puppet plan function (no CLI or Ruby API path uses this). Discovery is batched upfront, but execution is sequential per-target because MCollective RPC calls send the same arguments to all targets. A future optimization could batch the download/infra-setup/polling steps while keeping only the start step per-target.

THIS IS NOT YET READY FOR PRODUCTION. The API is stable, but plans are not yet fully supported, and this method runs the task sequentially across targets, which is very inefficient. It had to be implemented now so that the assert_batch_size_one check inherited from Base does not raise.

Parameters:

  • targets (Array<Bolt::Target>)

    Targets in a single collective batch

  • task (Bolt::Task)

    Task to execute

  • target_mapping (Hash{Bolt::Target => Hash})

    Per-target argument hashes

  • options (Hash) (defaults to: {})

    Execution options (passed through from Base)

  • position (Array) (defaults to: [])

    Positional info for result tracking

  • callback (Proc)

    Called with :node_start and :node_result events

Returns:

  • (Array<Bolt::Result>)



# File 'lib/bolt/transport/choria.rb', line 150

def batch_task_with(targets, task, target_mapping, options = {}, position = [], &callback)
  # Pre-warm the agent cache so individual batch_task calls are cache hits
  configure_client(targets.first)
  discover_agents(targets)

  results = []
  targets.each do |target|
    results += batch_task([target], task, target_mapping[target], options, position, &callback)
  end
  results
end

#batches(targets) ⇒ Array<Array<Bolt::Target>>

Group targets by collective so each batch uses a single RPC client scope. MCollective RPC calls are published to a collective-specific NATS subject, so targets in different collectives must be in separate batches. Most deployments have one collective, yielding one batch. Bolt runs each batch in its own thread and @rpc_mutex serializes the RPC calls across threads to prevent response misrouting.

Parameters:

  • targets (Array<Bolt::Target>)

    All targets for this operation

Returns:

  • (Array<Array<Bolt::Target>>)

    Targets grouped by collective



# File 'lib/bolt/transport/choria.rb', line 81

def batches(targets)
  # Populates @default_collective from the Choria config so targets
  # without an explicit collective are grouped correctly.
  configure_client(targets.first)
  targets.group_by { |target| collective_for(target) }.values
end
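
The grouping can be illustrated without Bolt. In this sketch SketchTarget is a hypothetical stand-in for Bolt::Target, and the default collective is passed in explicitly rather than read from a loaded Choria config.

```ruby
# Hypothetical stand-in for Bolt::Target: just a name and an options hash.
SketchTarget = Struct.new(:name, :options)

# Group targets by their explicit collective, falling back to the default.
def group_by_collective(targets, default_collective)
  targets.group_by { |target| target.options['collective'] || default_collective }.values
end
```

Targets without an explicit collective land in the same batch as each other, which is why the default has to be known before grouping.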

#build_result(target, data, action:, name:, position:) ⇒ Bolt::Result

Build a Bolt::Result from an output hash. Handles both success and error cases based on the presence of the :error key.

Parameters:

  • target (Bolt::Target)

    The target this result belongs to

  • data (Hash)

    Output hash with keys :stdout, :stderr, :exitcode, and optionally :error and :error_kind for failures

  • action (String)

    One of ‘task’, ‘command’, or ‘script’

  • name (String)

    Task/command/script name for result metadata

  • position (Array)

    Positional info for result tracking

Returns:

  • (Bolt::Result)



# File 'lib/bolt/transport/choria/helpers.rb', line 79

def build_result(target, data, action:, name:, position:)
  if data[:error]
    Bolt::Result.from_exception(
      target, Bolt::Error.new(data[:error], data[:error_kind]),
      action: action, position: position
    )
  elsif action == 'task'
    Bolt::Result.for_task(target, data[:stdout], data[:stderr],
                          data[:exitcode], name, position)
  elsif %w[command script].include?(action)
    Bolt::Result.for_command(
      target,
      { 'stdout' => data[:stdout], 'stderr' => data[:stderr], 'exit_code' => data[:exitcode] },
      action, name, position
    )
  else
    raise Bolt::Error.new(
      "Unknown action '#{action}' in build_result",
      'bolt/choria-unknown-action'
    )
  end
end
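
The branch order matters: an :error key takes precedence over the action, so a failed task and a failed command both become exception results. The sketch below mirrors only that dispatch, returning a symbol instead of building a Bolt::Result. The method name result_branch and the use of ArgumentError are assumptions for the example.

```ruby
# Hypothetical mirror of the dispatch in build_result. Returns the name of
# the Bolt::Result constructor that would be used for this output hash.
def result_branch(data, action)
  if data[:error]
    :from_exception
  elsif action == 'task'
    :for_task
  elsif %w[command script].include?(action)
    :for_command
  else
    raise ArgumentError, "Unknown action '#{action}'"
  end
end

# Example output hashes in the shape described above.
success_output = { stdout: "ok\n", stderr: '', exitcode: 0 }
failure_output = { stdout: '', stderr: '', exitcode: 1,
                   error: 'download failed', error_kind: 'bolt/choria-download-failed' }
```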

#build_task_command(target, remote_task_path, arguments, input_method, interpreter_options) ⇒ String

Build the full command string for task execution via the shell agent, handling interpreter selection, environment variable injection, and stdin piping.

Parameters:

  • target (Bolt::Target)

    Target (used for platform detection)

  • remote_task_path (String)

    Absolute path to the task executable on the remote node

  • arguments (Hash)

    Task parameter names to values

  • input_method (String)

    How to pass arguments: ‘stdin’, ‘environment’, or ‘both’

  • interpreter_options (Hash{String => String})

    File extension to interpreter path mapping

Returns:

  • (String)

    The fully constructed shell command



# File 'lib/bolt/transport/choria/command_builders.rb', line 150

def build_task_command(target, remote_task_path, arguments, input_method, interpreter_options)
  interpreter = select_interpreter(remote_task_path, interpreter_options)
  cmd = interpreter ?
    "#{Array(interpreter).map { |part| escape_arg(target, part) }.join(' ')} #{escape_arg(target, remote_task_path)}" :
    escape_arg(target, remote_task_path)

  needs_env = Bolt::Task::ENVIRONMENT_METHODS.include?(input_method)
  needs_stdin = Bolt::Task::STDIN_METHODS.include?(input_method)

  if needs_env && needs_stdin && windows_target?(target)
    # On Windows, piping stdin into a multi-statement command
    # requires a script block. Pipeline data doesn't automatically
    # flow through a script block to inner commands, so we
    # explicitly forward $input via a pipe.
    env_params = envify_params(arguments)
    env_params.each_key { |key| validate_env_key!(key, 'task argument') }
    set_stmts = env_params.map { |key, val| "$env:#{key} = '#{ps_escape(val)}'" }
    cmd = stdin_pipe_command(target, arguments.to_json,
                             "{ #{set_stmts.join('; ')}; $input | & #{cmd} }")
  else
    if needs_env
      cmd = prepend_env_vars(target, cmd, envify_params(arguments), 'task argument')
    end

    if needs_stdin
      cmd = stdin_pipe_command(target, arguments.to_json, cmd)
    end
  end

  cmd
end
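
For a POSIX target, the non-Windows branch (environment variables first, then the stdin pipe wrapped around the result) can be sketched as below. Several details are assumptions: the helpers prepend_env_vars and stdin_pipe_command are not shown on this page, so the env prefix and the printf pipe are stand-ins chosen for the example. The PT_ prefix and the input-method lists are believed to match Bolt's envify_params and Bolt::Task constants, but treat them as assumptions too.

```ruby
require 'json'
require 'shellwords'

# Assumed to mirror Bolt::Task::ENVIRONMENT_METHODS and STDIN_METHODS.
SKETCH_ENV_METHODS = %w[both environment].freeze
SKETCH_STDIN_METHODS = %w[both stdin].freeze

# Assumed behaviour: prefix keys with PT_ and JSON-encode non-string values.
def sketch_envify(arguments)
  arguments.each_with_object({}) do |(key, value), env|
    env["PT_#{key}"] = value.is_a?(String) ? value : value.to_json
  end
end

# Hypothetical POSIX-only command builder (no interpreter handling).
def sketch_task_command(remote_task_path, arguments, input_method)
  cmd = Shellwords.shellescape(remote_task_path)

  if SKETCH_ENV_METHODS.include?(input_method)
    assignments = sketch_envify(arguments).map { |k, v| "#{k}=#{Shellwords.shellescape(v)}" }
    cmd = "env #{assignments.join(' ')} #{cmd}" unless assignments.empty?
  end

  if SKETCH_STDIN_METHODS.include?(input_method)
    # Feed the JSON-encoded arguments to the task on stdin.
    cmd = "printf '%s' #{Shellwords.shellescape(arguments.to_json)} | #{cmd}"
  end

  cmd
end
```

Applying the environment prefix before the pipe keeps the variables attached to the task process rather than to the command that produces the stdin data.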

#choria_identity(target) ⇒ Object

Returns the Choria node identity for a target. Uses the transport ‘host’ config if set, falling back to target.host (which Bolt derives from the URI or target name).



# File 'lib/bolt/transport/choria.rb', line 200

def choria_identity(target)
  target.options['host'] || target.host
end

#cleanup_dir_command(target, path) ⇒ String

Build a recursive directory removal command.

Parameters:

  • target (Bolt::Target)

    Used for platform detection

  • path (String)

    Absolute path to the directory

Returns:

  • (String)

    Shell command



# File 'lib/bolt/transport/choria/command_builders.rb', line 40

def cleanup_dir_command(target, path)
  windows_target?(target) ?
    "Remove-Item -Recurse -Force -Path '#{ps_escape(path)}'" :
    "rm -rf #{Shellwords.shellescape(path)}"
end
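
A standalone sketch of the two command shapes follows. The ps_escape helper is not shown on this page, so sketch_ps_escape is an assumption: it doubles single quotes, which is how a single quote is escaped inside a PowerShell single-quoted string. The windows: keyword replaces the transport's windows_target? check for the purposes of the example.

```ruby
require 'shellwords'

# Assumed escaping for a PowerShell single-quoted string: double each quote.
def sketch_ps_escape(value)
  value.to_s.gsub("'", "''")
end

# Hypothetical standalone version of the removal command builder.
def sketch_cleanup_command(path, windows:)
  if windows
    "Remove-Item -Recurse -Force -Path '#{sketch_ps_escape(path)}'"
  else
    "rm -rf #{Shellwords.shellescape(path)}"
  end
end
```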

#cleanup_tmpdir(targets, tmpdir) ⇒ Object

Clean up a remote tmpdir on targets, logging per-target failures. Used in ensure blocks after batch_script and batch_task_shell.

Parameters:

  • targets (Array<Bolt::Target>)

    Targets to clean up on

  • tmpdir (String)

    Absolute path to the temporary directory to remove



# File 'lib/bolt/transport/choria/shell.rb', line 127

def cleanup_tmpdir(targets, tmpdir)
  return unless targets.first.options.fetch('cleanup', true)

  unless File.basename(tmpdir).start_with?('bolt-choria-')
    logger.warn { "Refusing to delete unexpected tmpdir path: #{tmpdir}" }
    return
  end

  begin
    failures = shell_run(targets, cleanup_dir_command(targets.first, tmpdir),
                         description: 'cleanup tmpdir')
    failures.each do |target, failure|
      logger.warn { "Cleanup failed on #{target.safe_name}. Task data may remain in #{tmpdir}. #{failure[:error]}" }
    end
  rescue StandardError => e
    logger.warn { "Cleanup of #{tmpdir} failed on all targets: #{e.message}" }
  end
end
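
The prefix guard is what keeps a bad tmpdir value from turning into a recursive delete of an unrelated directory. A minimal sketch of that check in isolation, with a hypothetical method name:

```ruby
# Hypothetical helper: only directories whose final path component starts
# with the transport's tmpdir prefix are considered safe to remove.
def safe_to_delete?(tmpdir)
  File.basename(tmpdir).start_with?('bolt-choria-')
end
```

Checking the basename rather than the whole path means that values such as / or a path ending in .. are rejected.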

#collective_for(target) ⇒ Object

Returns the collective for a target, used by batches() to group targets. Falls back to the default collective from the loaded config.



# File 'lib/bolt/transport/choria.rb', line 206

def collective_for(target)
  target.options['collective'] || @default_collective
end

#configure_client(target) ⇒ Object

One-time setup of the local MCollective client connection to the NATS broker. MCollective::Config.loadconfig must only be called once since it loads plugins via PluginManager.loadclass, and a second call raises “Plugin already loaded”.

The @client_configured flag is checked twice: once before taking the mutex (fast path to avoid lock overhead on every call after setup) and once inside (handles the race where two batch threads both see false simultaneously and try to configure concurrently).

This method is idempotent, so it can safely be called before any operation that needs the client connection to ensure the client is configured.

Parameters:

  • target (Bolt::Target)

    Any target in the batch (used to read transport options)



# File 'lib/bolt/transport/choria/client.rb', line 26

def configure_client(target)
  return if @client_configured

  @config_mutex.synchronize do
    return if @client_configured
    # If a previous attempt failed after partially initializing
    # MCollective (e.g., plugins loaded but NATS connector failed),
    # retrying loadconfig would hit "Plugin already loaded" errors.
    # Re-raise the original error so the caller gets a clear message.
    raise @config_error if @config_error

    # We do the require here because this is a pretty meaty library, and
    # no need to load it when OpenBolt starts up if the user isn't using
    # the Choria transport.
    require 'mcollective'

    opts = target.options
    config = MCollective::Config.instance

    unless config.configured
      config_file = opts['config-file'] || MCollective::Util.config_file_for_user

      unless File.readable?(config_file)
        msg = if opts['config-file']
                "Choria config file not found or not readable: #{config_file}"
              else
                "Could not find a readable Choria client config file. " \
                "Searched: #{MCollective::Util.config_paths_for_user.join(', ')}. " \
                "Set the 'config-file' option in the Choria transport configuration."
              end
        raise Bolt::Error.new(msg, 'bolt/choria-config-not-found')
      end

      begin
        config.loadconfig(config_file)
      rescue StandardError => e
        @config_error = Bolt::Error.new(
          "Choria client configuration failed: #{e.class}: #{e.message}",
          'bolt/choria-config-failed'
        )
        raise @config_error
      end
      logger.debug { "Loaded Choria client config from #{config_file}" }
    end

    if opts['mcollective-certname']
      ENV['MCOLLECTIVE_CERTNAME'] = opts['mcollective-certname']
      logger.debug { "MCOLLECTIVE_CERTNAME set to #{opts['mcollective-certname']}" }
    end

    if opts['brokers']
      brokers = Array(opts['brokers']).map { |broker| broker.include?(':') ? broker : "#{broker}:4222" }
      config.pluginconf['choria.middleware_hosts'] = brokers.join(',')
      logger.debug { "Choria brokers overridden: #{brokers.join(', ')}" }
    end

    if opts['ssl-ca'] && opts['ssl-cert'] && opts['ssl-key']
      unreadable = %w[ssl-ca ssl-cert ssl-key].find { |key| !File.readable?(opts[key]) }
      if unreadable
        raise Bolt::Error.new(
          "File for #{unreadable} is not readable: #{opts[unreadable]}",
          'bolt/choria-config-failed'
        )
      end

      config.pluginconf['security.provider'] = 'file'
      config.pluginconf['security.file.ca'] = opts['ssl-ca']
      config.pluginconf['security.file.certificate'] = opts['ssl-cert']
      config.pluginconf['security.file.key'] = opts['ssl-key']
      logger.debug { "Using file-based TLS security provider with given SSL override(s)" }
    end

    @default_collective = config.main_collective
    @client_configured = true
  end
end
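
The double-checked flag pattern used here can be shown in isolation. This sketch is not the transport's code: the class name OneTimeSetup and the setup_calls counter are hypothetical, and the counter stands in for the expensive loadconfig call so the effect is observable.

```ruby
# Minimal sketch of one-time setup guarded by a flag and a mutex.
class OneTimeSetup
  attr_reader :setup_calls

  def initialize
    @mutex = Mutex.new
    @configured = false
    @setup_calls = 0
  end

  def configure
    return if @configured # fast path: skip the lock once setup is done

    @mutex.synchronize do
      return if @configured # another thread finished while we waited

      @setup_calls += 1 # stands in for the one-time expensive setup
      @configured = true
    end
  end
end
```

Even when several threads call configure at once, the second check inside the lock ensures the setup body runs a single time.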

#create_rpc_client(agent_name, targets, timeout) ⇒ MCollective::RPC::Client

Create an MCollective::RPC::Client for one or more targets. Accepts a single target or an array. Uses MCollective’s direct addressing mode (client.discover(nodes:)) to skip broadcast discovery and send requests directly to the specified nodes.

When the client is created and the shell agent is not installed on the OpenBolt controller node, MCollective automatically loads the shell DDL bundled with OpenBolt at lib/mcollective/agent/shell.ddl, because that file is on the $LOAD_PATH in the location MCollective’s plugin loader expects. The bolt_tasks DDL is already included in the choria-mcorpc-support gem.

Parameters:

  • agent_name (String)

    MCollective agent name (e.g. ‘shell’, ‘bolt_tasks’)

  • targets (Bolt::Target, Array<Bolt::Target>)

    One or more targets to address

  • timeout (Numeric)

    RPC call timeout in seconds

Returns:

  • (MCollective::RPC::Client)

    Configured client with direct addressing enabled



# File 'lib/bolt/transport/choria/client.rb', line 119

def create_rpc_client(agent_name, targets, timeout)
  targets = Array(targets)
  options = MCollective::Util.default_options
  options[:timeout] = timeout
  options[:verbose] = false
  options[:connection_timeout] = targets.first.options['broker-timeout']

  collective = collective_for(targets.first)
  options[:collective] = collective if collective

  client = MCollective::RPC::Client.new(agent_name, options: options)
  client.progress = false

  identities = targets.map { |target| choria_identity(target) }.uniq
  client.discover(nodes: identities)

  client
end

#discover_agent_list(targets) ⇒ Object

Discover available agents on targets via rpcutil.agent_inventory and populate @agent_cache with agent lists.

Parameters:

  • targets (Array<Bolt::Target>)

    Targets to query for agent inventory



# File 'lib/bolt/transport/choria/agent_discovery.rb', line 43

def discover_agent_list(targets)
  response = rpc_request('rpcutil', targets, 'rpcutil.agent_inventory') do |client|
    client.agent_inventory
  end
  response[:errors].each { |target, output| logger.debug { "agent_inventory failed for #{target.safe_name}: #{output[:error]}" } }

  response[:responded].each do |target, data|
    sender = choria_identity(target)
    agents = filter_agents(sender, data[:agents])
    unless agents
      logger.warn { "Unexpected agent_inventory response from #{sender}. This target will be treated as unreachable." }
      next
    end
    @agent_cache[sender] = { agents: agents }
    logger.debug { "Discovered agents on #{sender}: #{agents.join(', ')}" }
  end
rescue StandardError => e
  raise if e.is_a?(Bolt::Error)

  logger.warn { "Agent discovery failed: #{e.class}: #{e.message}" }
end

#discover_agents(targets) ⇒ Object

Discover agents and detect OS on targets via two batched RPC calls (agent_inventory for agents+versions, get_fact for os.family). Populates @agent_cache with { agents: […], os: ‘redhat’ | ‘windows’ | … }.

Parameters:

  • targets (Array<Bolt::Target>)

    Targets to discover agents on



# File 'lib/bolt/transport/choria/agent_discovery.rb', line 17

def discover_agents(targets)
  uncached = targets.reject { |target| @agent_cache.key?(choria_identity(target)) }
  return if uncached.empty?

  logger.debug { "Discovering agents on #{target_count(uncached)}" }
  discover_agent_list(uncached)
  discover_os_family(uncached)

  uncached.each do |target|
    identity = choria_identity(target)
    logger.warn { "No response from #{identity} during agent discovery" } unless @agent_cache.key?(identity)
  end
end
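
The caching behaviour (only uncached identities trigger discovery, and nodes that never answer stay out of the cache) can be sketched with a plain Hash. The class SketchDiscovery, its inventory argument, and the rpc_calls counter are hypothetical. The transport itself uses a Concurrent::Map and real RPC calls.

```ruby
# Hypothetical discovery cache. `inventory` maps identity => agent list and
# plays the role of the nodes that would answer an agent_inventory request.
class SketchDiscovery
  attr_reader :rpc_calls, :cache

  def initialize(inventory)
    @inventory = inventory
    @cache = {}
    @rpc_calls = 0
  end

  def discover(identities)
    uncached = identities.reject { |id| @cache.key?(id) }
    return if uncached.empty?

    @rpc_calls += 1 # one batched call covers every uncached identity
    uncached.each do |id|
      # Nodes that do not respond are simply never added to the cache.
      @cache[id] = { agents: @inventory[id] } if @inventory.key?(id)
    end
  end
end
```

A second discovery for already-cached identities returns immediately, which is what makes the pre-warming in batch_task_with worthwhile.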

#discover_os_family(targets) ⇒ Object

Detect the OS family on targets via rpcutil.get_fact and update @agent_cache with the result (stored under the :os key).

Parameters:

  • targets (Array<Bolt::Target>)



# File 'lib/bolt/transport/choria/agent_discovery.rb', line 69

def discover_os_family(targets)
  # Only fetch OS for targets that responded to agent_inventory
  responded = targets.select { |target| @agent_cache.key?(choria_identity(target)) }
  return if responded.empty?

  response = rpc_request('rpcutil', responded, 'rpcutil.get_fact') do |client|
    client.get_fact(fact: 'os.family')
  end
  response[:errors].each { |target, output|
    logger.warn {
      "OS detection failed for #{target.safe_name}: #{output[:error]}. Defaulting to POSIX command syntax."
    }
  }

  response[:responded].each do |target, data|
    sender = choria_identity(target)
    os_family = data[:value].to_s.downcase
    if os_family.empty?
      logger.warn { "os.family fact is empty on #{sender}. Defaulting to POSIX command syntax." }
      next
    end
    @agent_cache[sender][:os] = os_family
    logger.debug { "Detected OS on #{sender}: #{os_family}" }
  end
rescue StandardError => e
  raise if e.is_a?(Bolt::Error)

  logger.warn { "OS detection failed: #{e.class}: #{e.message}. Defaulting to POSIX command syntax." }
end

#download(_target, _source, _destination, _options = {}, _position = []) ⇒ Object

Raises:

  • (Bolt::Error)

    Always, because the Choria transport does not yet support download



# File 'lib/bolt/transport/choria.rb', line 190

def download(_target, _source, _destination, _options = {}, _position = [])
  raise Bolt::Error.new(
    'The Choria transport does not yet support download.',
    'bolt/choria-unsupported-operation'
  )
end

#download_and_start_task(targets, task, implementation, arguments, result_opts, &callback) ⇒ Hash

Download task files from the server and start execution for one implementation group via bolt_tasks.download and bolt_tasks.run_no_wait.

Parameters:

  • targets (Array<Bolt::Target>)

    Targets sharing the same implementation

  • task (Bolt::Task)

    Task being executed

  • implementation (Hash)

    Task implementation with ‘path’, ‘name’, ‘input_method’, ‘files’ keys

  • arguments (Hash)

    Task parameter names to values

  • result_opts (Hash)

    Options passed through to emit_results (:action, :name, :position)

  • callback (Proc)

    Called with :node_start and :node_result events

Returns:

  • (Hash)

    with keys:

    • :failed_results [Array<Bolt::Result>] Error results from setup phase

    • :targets [Array<Bolt::Target>] Targets that successfully started

    • :task_id [String, nil] Shared task ID for polling, nil if nothing started



# File 'lib/bolt/transport/choria/bolt_tasks.rb', line 55

def download_and_start_task(targets, task, implementation, arguments, result_opts, &callback)
  environment = targets.first.options['puppet-environment']
  input_method = implementation['input_method']
  impl_files = [{ 'name' => File.basename(implementation['name']), 'path' => implementation['path'] }] +
               (implementation['files'] || [])
  file_specs_json = impl_files.map { |file| task_file_spec(file, task.module_name, environment) }.to_json

  # The failed_results reference will get updated and if we ever end up without
  # any targets left to act on, we can return it immediately.
  failed_results = []
  none_started_result = { failed_results: failed_results, targets: [], task_id: nil }

  # Download task files
  logger.debug { "Downloading task #{task.name} files via bolt_tasks to #{target_count(targets)}" }
  response = rpc_request('bolt_tasks', targets, 'bolt_tasks.download') do |client|
    client.download(task: task.name, files: file_specs_json, environment: environment)
  end
  # The bolt_tasks agent uses reply.fail! with statuscode 1 for download
  # failures, which rpc_request routes to :responded since statuscode 0-1
  # means the action completed. Check rpc_statuscodes to catch these and
  # report the download failure clearly instead of letting run_no_wait
  # fail with a confusing "task not available" error.
  dl_errors = response[:errors]
  response[:rpc_statuscodes].each do |target, code|
    next if code.zero? || dl_errors.key?(target)

    dl_errors[target] = error_output(
      "bolt_tasks.download on #{target.safe_name} failed to download task files",
      'bolt/choria-download-failed'
    )
  end
  # Must use concat rather than += to preserve reference to failed_results for early return
  failed_results.concat(emit_results(dl_errors, **result_opts, &callback))
  remaining = response[:responded].keys - dl_errors.keys
  return none_started_result if remaining.empty?

  # Start task execution
  logger.debug { "Starting task #{task.name} on #{target_count(remaining)}" }
  response = rpc_request('bolt_tasks', remaining, 'bolt_tasks.run_no_wait') do |client|
    client.run_no_wait(task: task.name, input_method: input_method,
                       files: file_specs_json, input: arguments.to_json)
  end
  failed_results.concat(emit_results(response[:errors], **result_opts, &callback))
  return none_started_result if response[:responded].empty?

  # Extract the shared task_id (all targets get the same one from
  # the single run_no_wait call that fanned out to all of them)
  task_id = response[:responded].values.first&.dig(:task_id)
  unless task_id
    no_id_errors = response[:responded].each_with_object({}) do |(target, _), errors|
      errors[target] = error_output(
        "bolt_tasks.run_no_wait on #{target.safe_name} succeeded but returned no task_id",
        'bolt/choria-missing-task-id'
      )
    end
    failed_results.concat(emit_results(no_id_errors, **result_opts, &callback))
    return none_started_result
  end

  logger.debug { "Started task #{task.name} on #{target_count(response[:responded])}, task_id: #{task_id}" }
  { failed_results: failed_results, targets: response[:responded].keys, task_id: task_id }
end

#emit_results(target_outputs, action:, name:, position:, fire_node_start: false, &callback) ⇒ Array<Bolt::Result>

Convert a hash of { target => output } into Results, fire callbacks, and return the Results array. When fire_node_start is true, fires a :node_start callback before each :node_result.

Parameters:

  • target_outputs (Hash{Bolt::Target => Hash})

    Map of targets to output hashes

  • action (String)

    One of ‘task’, ‘command’, or ‘script’

  • name (String)

    Task/command/script name for result metadata

  • position (Array)

    Positional info for result tracking

  • fire_node_start (Boolean) (defaults to: false)

    Whether to emit :node_start before each result

  • callback (Proc)

    Called with :node_start and :node_result events

Returns:

  • (Array<Bolt::Result>)

    Results for all targets in the hash



# File 'lib/bolt/transport/choria/helpers.rb', line 113

def emit_results(target_outputs, action:, name:, position:, fire_node_start: false, &callback)
  target_outputs.map do |target, data|
    callback&.call(type: :node_start, target: target) if fire_node_start
    result = build_result(target, data, action: action, name: name, position: position)
    callback&.call(type: :node_result, result: result)
    result
  end
end
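
The callback ordering of emit_results can be tried in isolation with a minimal sketch. It assumes a hypothetical build_result stand-in that returns a plain Hash instead of a Bolt::Result, and it uses strings in place of Bolt::Target objects.

```ruby
# Hypothetical stand-in for the transport's build_result helper.
# It returns a plain Hash, not a Bolt::Result.
def build_result(target, data, action:, name:, position:)
  { target: target, action: action, name: name, position: position, value: data }
end

# Same body as the method documented above.
def emit_results(target_outputs, action:, name:, position:, fire_node_start: false, &callback)
  target_outputs.map do |target, data|
    callback&.call(type: :node_start, target: target) if fire_node_start
    result = build_result(target, data, action: action, name: name, position: position)
    callback&.call(type: :node_result, result: result)
    result
  end
end

# Record the event types in the order the callback receives them.
events = []
results = emit_results({ 'node1' => { exitcode: 0 } },
                       action: 'command', name: 'uptime', position: [],
                       fire_node_start: true) { |event| events << event[:type] }
```

With fire_node_start left at its default, only the :node_result event is fired for each target.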

#envify_params(params) ⇒ Hash{String => String}

Convert task arguments to PT_-prefixed environment variable hash. Duplicated from Bolt::Shell#envify_params. We don’t use Bolt::Shell classes because they interleave command building with connection-based execution (IO pipes, sudo prompts). With the Choria transport, we just need to build the command and send it via RPC so all the shell agents on the targets can execute it themselves.

Parameters:

  • params (Hash{String => Object})

    Task parameter names to values

Returns:

  • (Hash{String => String})

    Environment variables with PT_ prefix



# File 'lib/bolt/transport/choria/command_builders.rb', line 191

def envify_params(params)
  params.each_with_object({}) do |(key, val), env|
    val = val.to_json unless val.is_a?(String)
    env["PT_#{key}"] = val
  end
end
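
A small stand-alone run of the same conversion shows how non-String values are JSON-encoded. The parameter names and values are made up for illustration.

```ruby
require 'json'

# Same body as the method documented above.
def envify_params(params)
  params.each_with_object({}) do |(key, val), env|
    # Strings pass through untouched; everything else is serialized as JSON.
    val = val.to_json unless val.is_a?(String)
    env["PT_#{key}"] = val
  end
end

env = envify_params('message' => 'hello', 'count' => 3, 'tags' => %w[a b])
```

Here 'message' stays a bare string, while the Integer and the Array become their JSON text.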

#error_output(message, kind, stdout: nil, stderr: nil, exitcode: 1) ⇒ Object

Build an error output hash. When actual output is available (e.g. a command ran but failed), pass it through so the user sees it.



# File 'lib/bolt/transport/choria/helpers.rb', line 129

def error_output(message, kind, stdout: nil, stderr: nil, exitcode: 1)
  output(stdout: stdout, stderr: stderr, exitcode: exitcode)
    .merge(error: message, error_kind: kind)
end
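
The shape of the resulting hash is easiest to see by running both helpers together. This sketch copies output from its definition elsewhere in this class; the message and the 'example/command-failed' kind are hypothetical values.

```ruby
# Copied from the output helper documented in this class.
def output(stdout: nil, stderr: nil, exitcode: nil)
  { stdout: stdout || '', stderr: stderr || '', exitcode: exitcode || 0 }
end

# Same body as the method documented above.
def error_output(message, kind, stdout: nil, stderr: nil, exitcode: 1)
  output(stdout: stdout, stderr: stderr, exitcode: exitcode)
    .merge(error: message, error_kind: kind)
end

# Hypothetical error: a command that ran, wrote to stderr, and exited 2.
failure = error_output('command failed', 'example/command-failed', stderr: 'boom', exitcode: 2)
```

Missing streams default to empty strings, so callers can read :stdout and :stderr without nil checks.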

#escape_arg(target, str) ⇒ String

Escape a string for use as a shell argument on the target platform.

Parameters:

  • target (Bolt::Target)

    Used for platform detection

  • str (String)

    The string to escape

Returns:

  • (String)

    Escaped string (single-quoted on Windows, sh-escaped on POSIX)



# File 'lib/bolt/transport/choria/command_builders.rb', line 105

def escape_arg(target, str)
  windows_target?(target) ? "'#{ps_escape(str)}'" : Shellwords.shellescape(str)
end

#exitcode_from(data, target, context) ⇒ Integer

Extract exit code from RPC response data, defaulting to 1 with a warning if the agent returned nil.

Parameters:

  • data (Hash)

    RPC response data containing :exitcode

  • target (Bolt::Target)

    Target for logging context

  • context (String)

    Human-readable label for the log message

Returns:

  • (Integer)

    The exit code from the data, or 1 if nil



# File 'lib/bolt/transport/choria/helpers.rb', line 141

def exitcode_from(data, target, context)
  exitcode = data[:exitcode] || data['exitcode']
  if exitcode.nil?
    logger.warn {
      "Agent on #{target.safe_name} returned no exit code for #{context}. " \
      "Defaulting to exit code 1. This usually indicates an agent-level error."
    }
    exitcode = 1
  end
  exitcode
end

#extract_task_output(data, target) ⇒ Hash

Extract stdout, stderr, and exitcode from a bolt_tasks task_status response.

Parameters:

  • data (Hash)

    task_status response data with :stdout, :stderr, :exitcode keys

  • target (Bolt::Target)

    Target for logging and stdout unwrapping context

Returns:

  • (Hash)

    Output hash built by output()



# File 'lib/bolt/transport/choria/bolt_tasks.rb', line 170

def extract_task_output(data, target)
  exitcode = exitcode_from(data, target, 'task')
  output(stdout: unwrap_bolt_tasks_stdout(data[:stdout]),
         stderr: data[:stderr], exitcode: exitcode)
end

#filter_agents(sender, agent_list) ⇒ Array<String>?

Filter out agents that don’t meet minimum version requirements.

Parameters:

  • sender (String)

    Choria node identity (for logging)

  • agent_list (Array<Hash>, nil)

    Agent entries from agent_inventory, each with 'agent' (name) and 'version' string keys

Returns:

  • (Array<String>, nil)

    Agent names that meet version requirements, or nil if agent_list is not an Array



# File 'lib/bolt/transport/choria/agent_discovery.rb', line 106

def filter_agents(sender, agent_list)
  return nil unless agent_list.is_a?(Array)

  agent_list.filter_map do |entry|
    name = entry['agent']
    next unless name

    version = entry['version']
    min_version = AGENT_MIN_VERSIONS[name]
    if min_version && !meets_min_version?(version, min_version)
      logger.warn {
        "The '#{name}' agent on #{sender} is version #{version || 'unknown'}, " \
        "but #{min_version} or later is required. It will be treated as unavailable."
      }
      next
    end

    name
  end
end
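
The filtering can be exercised without a Choria network using a simplified sketch. It drops the sender argument and the logging, and it assumes an AGENT_MIN_VERSIONS value of { 'shell' => '1.2.1' }, which is taken from the class overview rather than from the real constant (not shown here).

```ruby
require 'rubygems'

# Assumed value for illustration only; the real constant is not shown in this section.
AGENT_MIN_VERSIONS = { 'shell' => '1.2.1' }.freeze

# Version check without logging. Gem::Version compares segment by segment,
# so '1.10.0' correctly ranks above '1.2.1'.
def meets_min_version?(version, min_version)
  return false unless version

  Gem::Version.new(version) >= Gem::Version.new(min_version)
rescue ArgumentError
  false
end

# Simplified filter_agents: no sender argument, no warnings.
def filter_agents(agent_list)
  return nil unless agent_list.is_a?(Array)

  agent_list.filter_map do |entry|
    name = entry['agent']
    next unless name

    min_version = AGENT_MIN_VERSIONS[name]
    next if min_version && !meets_min_version?(entry['version'], min_version)

    name
  end
end

inventory = [
  { 'agent' => 'bolt_tasks', 'version' => '1.0.0' }, # no minimum configured
  { 'agent' => 'shell', 'version' => '1.2.0' },      # below the assumed minimum
  { 'version' => '9.9.9' }                           # entry without a name
]
available = filter_agents(inventory)
```

Agents without an entry in the minimum-version table pass through regardless of their version.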

#generate_tmpdir_path(target) ⇒ String

Generate a unique remote tmpdir path for batch operations.

Parameters:

  • target (Bolt::Target)

    Target whose platform and tmpdir config determine the base path

Returns:

  • (String)

    Absolute path to a unique temporary directory



# File 'lib/bolt/transport/choria/shell.rb', line 116

def generate_tmpdir_path(target)
  base = target.options['tmpdir']
  base = 'C:\Windows\Temp' if base == '/tmp' && windows_target?(target)
  join_path(target, base, "bolt-choria-#{SecureRandom.uuid}")
end

#has_agent?(target, agent_name) ⇒ Boolean

Returns:

  • (Boolean)


# File 'lib/bolt/transport/choria/agent_discovery.rb', line 31

def has_agent?(target, agent_name)
  @agent_cache[choria_identity(target)]&.dig(:agents)&.include?(agent_name) || false
end

#index_results_by_sender(results, targets, context) ⇒ Hash{String => Hash}

Index RPC results by sender, keeping only the first response per sender and only from the set of expected identities. Logs and discards responses from unexpected senders and duplicates.

Parameters:

  • results (Array<Hash>)

    Raw MCollective RPC result hashes with :sender keys

  • targets (Array<Bolt::Target>)

    Expected targets (used to build the allowed sender set)

  • context (String)

    Human-readable label for log messages

Returns:

  • (Hash{String => Hash})

    Sender identity to first valid RPC result hash



# File 'lib/bolt/transport/choria/client.rb', line 254

def index_results_by_sender(results, targets, context)
  expected = targets.to_set { |target| choria_identity(target) }
  by_sender = {}
  results.each do |result|
    sender = result[:sender]
    unless sender
      logger.warn { "Discarding #{context} response with nil sender" }
      next
    end
    unless expected.include?(sender)
      logger.warn { "Discarding #{context} response from unexpected sender '#{sender}'" }
      next
    end
    if by_sender.key?(sender)
      if result[:data] == by_sender[sender][:data]
        logger.debug { "Ignoring duplicate #{context} response from #{sender}" }
      else
        logger.warn { "Ignoring duplicate #{context} response from #{sender} with different data" }
      end
      next
    end
    by_sender[sender] = result
  end
  by_sender
end
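
The "first response wins" rule can be shown with a reduced sketch. It is a simplification, not the transport's code: identities are passed as plain strings instead of being derived from Bolt::Target objects, and all logging is dropped.

```ruby
require 'set'

# Simplified indexing: keep the first reply per expected sender, drop the rest.
def index_results_by_sender(results, expected_identities)
  expected = expected_identities.to_set
  results.each_with_object({}) do |result, by_sender|
    sender = result[:sender]
    next unless sender && expected.include?(sender) # nil or unexpected sender
    next if by_sender.key?(sender)                  # duplicate reply

    by_sender[sender] = result
  end
end

replies = [
  { sender: 'node1', data: { exitcode: 0 } },
  { sender: 'node1', data: { exitcode: 1 } }, # duplicate with different data
  { sender: 'rogue', data: {} },              # not in the expected set
  { sender: nil, data: {} }                   # missing sender
]
indexed = index_results_by_sender(replies, %w[node1 node2])
```

node2 is absent from the index because it never replied; rpc_request reports such targets as "No response".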

#join_path(target, *parts) ⇒ String

Join path segments using the target platform’s separator. Normalizes embedded forward slashes to backslashes on Windows.

Parameters:

  • target (Bolt::Target)

    Used for platform detection

  • parts (Array<String>)

    Path segments to join

Returns:

  • (String)

    Joined path



# File 'lib/bolt/transport/choria/command_builders.rb', line 115

def join_path(target, *parts)
  sep = windows_target?(target) ? '\\' : '/'
  parts = parts.map { |part| part.tr('/', sep) } if sep != '/'
  parts.join(sep)
end
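
The separator handling can be checked with a sketch that replaces the Bolt::Target argument with a boolean flag. That flag is a hypothetical simplification standing in for windows_target?(target), and the path segments are made up.

```ruby
# Hypothetical variant: `windows` replaces the platform lookup on a Bolt::Target.
def join_path(windows, *parts)
  sep = windows ? '\\' : '/'
  # On Windows, rewrite forward slashes inside each segment as well.
  parts = parts.map { |part| part.tr('/', sep) } if sep != '/'
  parts.join(sep)
end

posix_path = join_path(false, '/tmp', 'bolt-choria-1234', 'task.sh')
windows_path = join_path(true, 'C:/Windows/Temp', 'bolt-choria-1234')
```

POSIX segments are joined as-is, so a segment that already contains backslashes is left unchanged.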

#kill_timed_out_processes(targets) ⇒ Object

Kill processes on timed-out targets. Sequential because each target has a unique handle, requiring a separate shell.kill RPC call per target. A future batched kill action (like shell.statuses) would eliminate this.

Parameters:

  • targets (Hash{Bolt::Target => Hash})

    Timed-out targets mapped to { handle: uuid_string }



# File 'lib/bolt/transport/choria/shell.rb', line 548

def kill_timed_out_processes(targets)
  logger.debug { "Killing timed-out processes on #{target_count(targets)}" }
  targets.each do |target, state|
    rpc_request('shell', target, 'shell.kill') do |client|
      client.kill(handle: state[:handle])
    end
  rescue StandardError => e
    logger.warn { "Failed to kill process on #{target.safe_name}: #{e.message}" }
  end
end

#make_dir_command(target, *paths) ⇒ String

Build a mkdir command for one or more directories.

Parameters:

  • target (Bolt::Target)

    Used for platform detection

  • paths (Array<String>)

    Absolute directory paths to create

Returns:

  • (String)

    Shell command



# File 'lib/bolt/transport/choria/command_builders.rb', line 16

def make_dir_command(target, *paths)
  if windows_target?(target)
    escaped = paths.map { |path| "'#{ps_escape(path)}'" }.join(', ')
    "New-Item -ItemType Directory -Force -Path #{escaped}"
  else
    escaped = paths.map { |path| Shellwords.shellescape(path) }.join(' ')
    "mkdir -m 700 -p #{escaped}"
  end
end
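
A runnable sketch of the two command shapes follows. It assumes a boolean flag in place of the Bolt::Target platform check, copies ps_escape from this class, and uses made-up paths.

```ruby
require 'shellwords'

# Copied from the ps_escape helper documented in this class.
def ps_escape(str)
  str.gsub("'", "''")
end

# Hypothetical variant: `windows` replaces windows_target?(target).
def make_dir_command(windows, *paths)
  if windows
    escaped = paths.map { |path| "'#{ps_escape(path)}'" }.join(', ')
    "New-Item -ItemType Directory -Force -Path #{escaped}"
  else
    escaped = paths.map { |path| Shellwords.shellescape(path) }.join(' ')
    "mkdir -m 700 -p #{escaped}"
  end
end

posix_cmd = make_dir_command(false, '/tmp/bolt dir', '/tmp/other')
windows_cmd = make_dir_command(true, 'C:\Temp\a', 'C:\Temp\b')
```

The POSIX form restricts new directories to the owner (mode 700); the PowerShell form passes all paths as one comma-separated array.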

#make_executable_command(target, path) ⇒ String?

Build a chmod u+x command (sets the execute bit for the owner only). Returns nil on Windows (not needed).

Parameters:

  • target (Bolt::Target)

    Used for platform detection

  • path (String)

    Absolute path to the file

Returns:

  • (String, nil)

    Shell command or nil



# File 'lib/bolt/transport/choria/command_builders.rb', line 31

def make_executable_command(target, path)
  windows_target?(target) ? nil : "chmod u+x #{Shellwords.shellescape(path)}"
end

#meets_min_version?(version, min_version) ⇒ Boolean

Returns:

  • (Boolean)


# File 'lib/bolt/transport/choria/agent_discovery.rb', line 127

def meets_min_version?(version, min_version)
  return false unless version

  Gem::Version.new(version) >= Gem::Version.new(min_version)
rescue ArgumentError => e
  logger.warn { "Could not parse version '#{version}': #{e.message}. Treating agent as unavailable." }
  false
end

#output(stdout: nil, stderr: nil, exitcode: nil) ⇒ Object

Build an output hash from command/task output.



# File 'lib/bolt/transport/choria/helpers.rb', line 123

def output(stdout: nil, stderr: nil, exitcode: nil)
  { stdout: stdout || '', stderr: stderr || '', exitcode: exitcode || 0 }
end

#poll_task_status(targets, task_id, task) ⇒ Hash{Bolt::Target => Hash}

Poll bolt_tasks.task_status until all targets complete or timeout.

Parameters:

  • targets (Array<Bolt::Target>)

    Targets that were started successfully

  • task_id (String)

    Shared task ID from run_no_wait

  • task (Bolt::Task)

    Task being polled (used for timeout and error messages)

Returns:

  • (Hash{Bolt::Target => Hash})

    Output hash for every target (success and error)



# File 'lib/bolt/transport/choria/bolt_tasks.rb', line 124

def poll_task_status(targets, task_id, task)
  timeout = targets.first.options['task-timeout']

  poll_result = poll_with_retries(targets, timeout, 'bolt_tasks.task_status') do |remaining|
    response = rpc_request('bolt_tasks', remaining, 'bolt_tasks.task_status') do |client|
      client.task_status(task_id: task_id)
    end
    next { rpc_failed: true, done: {} } if response[:rpc_failed]

    done = response[:errors].dup

    response[:responded].each do |target, data|
      if data.nil?
        done[target] = error_output(
          "bolt_tasks.task_status on #{target.safe_name} returned success but no data",
          'bolt/choria-missing-data'
        )
        next
      end
      next unless data[:completed]

      done[target] = extract_task_output(data, target)
    end

    { rpc_failed: false, done: done }
  end

  remaining_errors = poll_result[:remaining].each_with_object({}) do |target, errors|
    errors[target] =
      if poll_result[:rpc_persistent_failure]
        error_output("RPC requests to poll task status on #{target.safe_name} failed persistently",
                     'bolt/choria-poll-failed')
      else
        error_output("Task #{task.name} timed out after #{timeout} seconds on #{target.safe_name}",
                     'bolt/choria-task-timeout')
      end
  end

  poll_result[:completed].merge(remaining_errors)
end

#poll_with_retries(targets, timeout, context) ⇒ Hash

Shared polling loop for bolt_tasks and shell polling. Handles sleep timing, round counting, RPC failure retry, and deadline enforcement.

The block receives the remaining targets each round and returns:

{ done: {target => output_hash}, rpc_failed: bool }

Parameters:

  • targets (Array, Hash)

    Initial targets to poll (duped internally)

  • timeout (Numeric)

    Maximum seconds before exiting

  • context (String)

    Label for log messages

Returns:

  • (Hash)

    with keys:

    • :completed [Hash{Bolt::Target => Hash}] All finished target outputs

    • :remaining [Array, Hash] Targets still pending when the loop exited

    • :rpc_persistent_failure [Boolean] True if loop exited due to persistent RPC failures



# File 'lib/bolt/transport/choria/helpers.rb', line 34

def poll_with_retries(targets, timeout, context)
  deadline = Process.clock_gettime(Process::CLOCK_MONOTONIC) + timeout
  remaining = targets.dup
  completed = {}
  poll_failures = 0
  poll_round = 0

  until remaining.empty?
    sleep(POLL_INTERVAL_SECONDS)
    poll_round += 1
    logger.debug { "Poll round #{poll_round}: #{target_count(remaining)} still pending" }

    round = yield(remaining)

    if round[:rpc_failed]
      poll_failures += 1
      logger.warn { "#{context} poll failed (attempt #{poll_failures}/#{RPC_FAILURE_RETRIES})" }
      break if poll_failures >= RPC_FAILURE_RETRIES

      next
    end
    poll_failures = 0

    round[:done].each do |target, output|
      completed[target] = output
      remaining.delete(target)
    end

    break if Process.clock_gettime(Process::CLOCK_MONOTONIC) >= deadline
  end

  { completed: completed, remaining: remaining,
    rpc_persistent_failure: poll_failures >= RPC_FAILURE_RETRIES }
end
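
The retry and completion behaviour of the loop can be simulated without any RPC traffic. This sketch is a trimmed copy under stated assumptions: the context argument and logging are removed, POLL_INTERVAL_SECONDS is shortened so the example finishes quickly, and the "agent" is a scripted list of round results.

```ruby
RPC_FAILURE_RETRIES = 3
POLL_INTERVAL_SECONDS = 0.01 # shortened for the example; the transport uses 1

# Trimmed copy of the polling loop (no logging, no context label).
def poll_with_retries(targets, timeout)
  deadline = Process.clock_gettime(Process::CLOCK_MONOTONIC) + timeout
  remaining = targets.dup
  completed = {}
  poll_failures = 0

  until remaining.empty?
    sleep(POLL_INTERVAL_SECONDS)
    round = yield(remaining)

    if round[:rpc_failed]
      poll_failures += 1
      break if poll_failures >= RPC_FAILURE_RETRIES

      next
    end
    poll_failures = 0 # any successful round resets the failure streak

    round[:done].each do |target, output|
      completed[target] = output
      remaining.delete(target)
    end

    break if Process.clock_gettime(Process::CLOCK_MONOTONIC) >= deadline
  end

  { completed: completed, remaining: remaining,
    rpc_persistent_failure: poll_failures >= RPC_FAILURE_RETRIES }
end

# Scripted rounds: one transient failure, then node1 finishes, then node2.
rounds = [
  { rpc_failed: true, done: {} },
  { rpc_failed: false, done: { 'node1' => { exitcode: 0 } } },
  { rpc_failed: false, done: { 'node2' => { exitcode: 0 } } }
]
recovered = poll_with_retries(%w[node1 node2], 5) { |_remaining| rounds.shift }

# Every round fails: the loop gives up after RPC_FAILURE_RETRIES attempts.
gave_up = poll_with_retries(%w[node1], 5) do |_remaining|
  { rpc_failed: true, done: {} }
end
```

A single failed round does not count against later rounds, because the failure counter resets after each successful poll.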

#powershell_cmd(script) ⇒ String

Wrap a PowerShell script for execution via shell agent. Uses -EncodedCommand with Base64-encoded UTF-16LE (the encoding Microsoft requires for -EncodedCommand) to avoid all quoting issues with cmd.exe and PowerShell metacharacters.

Parameters:

  • script (String)

    PowerShell script to encode and wrap

Returns:

  • (String)

    powershell.exe command with -EncodedCommand



# File 'lib/bolt/transport/choria/command_builders.rb', line 128

def powershell_cmd(script)
  "powershell.exe -NoProfile -NonInteractive -EncodedCommand #{Base64.strict_encode64(script.encode('UTF-16LE'))}"
end
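
A round trip makes the encoding concrete: the sketch below builds the command and then decodes the payload back to the original script. The sample script text is made up, and the decoding step is only there to illustrate the format.

```ruby
require 'base64'

# Same body as the method documented above.
def powershell_cmd(script)
  "powershell.exe -NoProfile -NonInteractive -EncodedCommand #{Base64.strict_encode64(script.encode('UTF-16LE'))}"
end

script = "Write-Output 'hi'"
cmd = powershell_cmd(script)

# The payload is the last whitespace-separated token. Decode it to verify
# that it holds the original script as UTF-16LE bytes.
encoded = cmd.split.last
decoded = Base64.strict_decode64(encoded).force_encoding('UTF-16LE').encode('UTF-8')
```

Because strict Base64 output contains only letters, digits, '+', '/', and '=', the resulting command line needs no further quoting.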

#prepare_targets(targets, agent_name, result_opts, &callback) ⇒ Array

Configure the client, discover agents, partition targets by agent availability, and emit errors for incapable targets.

Parameters:

  • targets (Array<Bolt::Target>)

    Targets to prepare

  • agent_name (String)

    Required agent name (e.g. ‘shell’, ‘bolt_tasks’)

  • result_opts (Hash)

    Options passed through to emit_results (:action, :name, :position)

  • callback (Proc)

    Called with :node_start and :node_result events

Returns:

  • (Array)

    Two-element array:

    • Array<Bolt::Target>

      Targets that have the required agent

    • Array<Bolt::Result>

      Error results for targets that lack the agent



# File 'lib/bolt/transport/choria/client.rb', line 227

def prepare_targets(targets, agent_name, result_opts, &callback)
  configure_client(targets.first)
  discover_agents(targets)

  capable, incapable = targets.partition { |target| has_agent?(target, agent_name) }

  agent_errors = incapable.each_with_object({}) do |target, errors|
    msg = if @agent_cache[choria_identity(target)].nil?
            "No agent information available for #{target.safe_name} (node did not respond to discovery)"
          else
            "The '#{agent_name}' agent is not available on #{target.safe_name}."
          end
    errors[target] = error_output(msg, 'bolt/choria-agent-not-available')
  end
  incapable_results = emit_results(agent_errors, fire_node_start: true, **result_opts, &callback)

  [capable, incapable_results]
end

#prepend_env_vars(target, command, env_vars, context) ⇒ String

Prepend environment variables to a command string. Returns the command unchanged if env_vars is nil or empty.

Parameters:

  • target (Bolt::Target)

    Used for platform detection

  • command (String)

    The command to prepend env vars to

  • env_vars (Hash{String => String}, nil)

    Variable names to values

  • context (String)

    Description for error messages (e.g., ‘task argument’)

Returns:

  • (String)

    Command with env vars prepended



# File 'lib/bolt/transport/choria/command_builders.rb', line 70

def prepend_env_vars(target, command, env_vars, context)
  return command unless env_vars&.any?

  env_vars.each_key { |key| validate_env_key!(key, context) }

  if windows_target?(target)
    set_stmts = env_vars.map { |key, val| "$env:#{key} = '#{ps_escape(val)}'" }
    "#{set_stmts.join('; ')}; & #{command}"
  else
    env_str = env_vars.map { |key, val| "#{key}=#{Shellwords.shellescape(val)}" }.join(' ')
    "/usr/bin/env #{env_str} #{command}"
  end
end
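
The two output shapes can be compared with a simplified sketch. It assumes a boolean flag in place of the Bolt::Target platform check, omits the validate_env_key! call and the context argument, and uses made-up commands and values.

```ruby
require 'shellwords'

# Copied from the ps_escape helper documented in this class.
def ps_escape(str)
  str.gsub("'", "''")
end

# Hypothetical variant: `windows` replaces windows_target?(target),
# and key validation is left out.
def prepend_env_vars(windows, command, env_vars)
  return command unless env_vars&.any?

  if windows
    set_stmts = env_vars.map { |key, val| "$env:#{key} = '#{ps_escape(val)}'" }
    "#{set_stmts.join('; ')}; & #{command}"
  else
    env_str = env_vars.map { |key, val| "#{key}=#{Shellwords.shellescape(val)}" }.join(' ')
    "/usr/bin/env #{env_str} #{command}"
  end
end

env = { 'PT_message' => "it's ok" }
posix_cmd = prepend_env_vars(false, '/tmp/task.sh', env)
windows_cmd = prepend_env_vars(true, "'C:\\Temp\\task.ps1'", env)
unchanged = prepend_env_vars(false, '/tmp/task.sh', {})
```

On POSIX the variables are passed through /usr/bin/env, so they apply only to the launched command; on Windows they are assigned in the PowerShell session before the command is invoked with the call operator.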

#provided_features ⇒ Object

Advertise both shell and powershell so tasks with either requirement can be selected. The per-target selection happens in select_implementation below, which picks the right feature set based on the target’s detected OS.



# File 'lib/bolt/transport/choria.rb', line 52

def provided_features
  %w[shell powershell]
end

#ps_escape(str) ⇒ String

Escape single quotes for use inside PowerShell single-quoted strings.

Parameters:

  • str (String)

    String to escape

Returns:

  • (String)

    String with single quotes doubled



# File 'lib/bolt/transport/choria/command_builders.rb', line 136

def ps_escape(str)
  str.gsub("'", "''")
end

#rpc_request(agent, targets, context) {|MCollective::RPC::Client| ... } ⇒ Hash

Make a batched RPC call and split results into responded and errors. Yields the RPC client so the caller specifies which action to invoke.

Results are split based on MCollective RPC statuscodes:

  • statuscode 0: action completed successfully (:responded)

  • statuscode 1 (RPCAborted): action completed but reported a problem (:responded). The data is preserved rather than discarded because some agents (notably bolt_tasks) use statuscode 1 for application-level failures where the response data is still valid and meaningful (e.g., a task that ran but exited non-zero). Callers must handle this case and not assume :responded means success.

  • statuscode 2-5: RPC infrastructure error (:errors)

  • no response: target didn’t reply (:errors)

  • exception: total RPC failure (rpc_failed: true)

Serialized by @rpc_mutex because MCollective’s NATS connector is a singleton with a shared receive queue. Concurrent RPC calls cause reply channel collisions, cross-thread message confusion, and subscription conflicts. See choria-transport-dev.md for the full explanation.

Parameters:

  • agent (String)

    MCollective agent name (e.g. ‘shell’, ‘bolt_tasks’, ‘rpcutil’)

  • targets (Bolt::Target, Array<Bolt::Target>)

    One or more targets to address

  • context (String)

    Human-readable label for logging (e.g. ‘shell.start’)

Yields:

  • (MCollective::RPC::Client)

    The configured RPC client to invoke an action on

Returns:

  • (Hash)

    with keys:

    • :responded [Hash] Targets where the action completed (statuscode 0-1), mapped to their response data

    • :errors [Hash] Targets with RPC errors or no response, mapped to error output hashes

    • :rpc_failed [Boolean] True when the entire RPC call failed

    • :rpc_statuscodes [Hash] Per-target MCollective RPC statuscodes. Includes all targets that responded (both :responded and :errors). Not populated when rpc_failed is true (no individual responses).



# File 'lib/bolt/transport/choria/client.rb', line 171

def rpc_request(agent, targets, context)
  targets = Array(targets)
  rpc_results = @rpc_mutex.synchronize do
    rpc_timeout = targets.first.options['rpc-timeout']
    client = create_rpc_client(agent, targets, rpc_timeout)
    yield(client)
  end
  by_sender = index_results_by_sender(rpc_results, targets, context)

  responded = {}
  errors = {}
  rpc_statuscodes = {}
  targets.each do |target|
    rpc_result = by_sender[choria_identity(target)]
    if rpc_result.nil?
      errors[target] = error_output(
        "No response from #{target.safe_name} for #{context}",
        'bolt/choria-no-response'
      )
    elsif rpc_result[:statuscode] > 1
      rpc_statuscodes[target] = rpc_result[:statuscode]
      errors[target] = error_output(
        "#{context} on #{target.safe_name} returned RPC error: " \
        "#{rpc_result[:statusmsg]} (code #{rpc_result[:statuscode]})",
        'bolt/choria-rpc-error'
      )
    else
      rpc_statuscodes[target] = rpc_result[:statuscode]
      if rpc_result[:statuscode] == 1
        logger.warn { "#{context} on #{target.safe_name} had RPC status code #{rpc_result[:statuscode]}: #{rpc_result[:statusmsg]}" }
      end
      responded[target] = rpc_result[:data]
    end
  end
  { responded: responded, errors: errors, rpc_failed: false, rpc_statuscodes: rpc_statuscodes }
rescue StandardError => e
  raise if e.is_a?(Bolt::Error)

  logger.warn { "#{context} RPC call failed: #{e.class}: #{e.message}" }
  errors = targets.each_with_object({}) do |target, errs|
    errs[target] = error_output("#{context} failed on #{target.safe_name}: #{e.message}",
                                'bolt/choria-rpc-failed')
  end
  { responded: {}, errors: errors, rpc_failed: true, rpc_statuscodes: {} }
end
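
The statuscode split is the part callers most often need to reason about, so here it is as a stand-alone sketch. It is a simplification with assumed names: partition_replies is hypothetical, identities are plain strings, error values are short strings rather than error output hashes, and no RPC client or mutex is involved.

```ruby
# Hypothetical helper that mirrors only the statuscode partitioning.
def partition_replies(identities, by_sender)
  responded = {}
  errors = {}
  statuscodes = {}

  identities.each do |identity|
    reply = by_sender[identity]
    if reply.nil?
      errors[identity] = 'no response'
      next
    end

    statuscodes[identity] = reply[:statuscode]
    if reply[:statuscode] > 1
      errors[identity] = "RPC error: #{reply[:statusmsg]} (code #{reply[:statuscode]})"
    else
      # Codes 0 and 1 both keep their data; code 1 still needs caller-side checks.
      responded[identity] = reply[:data]
    end
  end

  { responded: responded, errors: errors, rpc_statuscodes: statuscodes }
end

by_sender = {
  'ok'      => { statuscode: 0, statusmsg: 'OK', data: { exitcode: 0 } },
  'aborted' => { statuscode: 1, statusmsg: 'task failed', data: { exitcode: 2 } },
  'broken'  => { statuscode: 5, statusmsg: 'unknown error', data: {} }
}
split = partition_replies(%w[ok aborted broken silent], by_sender)
```

A caller that needs strict success therefore has to check rpc_statuscodes (or the data itself) for every target in :responded, as download_and_start_task does for bolt_tasks.download.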

#run_task_via_bolt_tasks(targets, task, arguments, result_opts, &callback) ⇒ Array<Bolt::Result>

Run a task via the bolt_tasks agent. Groups targets by implementation to support mixed-platform batches. Starts all groups before polling any of them so tasks execute concurrently on nodes across implementations.

Parameters:

  • targets (Array<Bolt::Target>)

    Targets that have the bolt_tasks agent

  • task (Bolt::Task)

    Task to execute

  • arguments (Hash)

    Task parameter names to values

  • result_opts (Hash)

    Options passed through to emit_results (:action, :name, :position)

  • callback (Proc)

    Called with :node_start and :node_result events

Returns:

  • (Array<Bolt::Result>)

    Setup-phase error results plus polled results for every started target



# File 'lib/bolt/transport/choria/bolt_tasks.rb', line 16

def run_task_via_bolt_tasks(targets, task, arguments, result_opts, &callback)
  logger.debug { "Running task #{task.name} via bolt_tasks agent on #{target_count(targets)}" }
  results = []

  # Start all implementation groups. Each gets its own download +
  # run_no_wait sequence. Tasks begin executing on nodes as soon as
  # run_no_wait returns.
  started_groups = []
  targets.group_by { |target| select_implementation(target, task) }.each do |implementation, impl_targets|
    start_result = download_and_start_task(impl_targets, task, implementation,
                                           arguments, result_opts, &callback)
    results += start_result[:failed_results]
    started_groups << start_result if start_result[:task_id]
  end

  # Poll each group. Tasks are already running concurrently on nodes,
  # so wall time is dominated by the longest task, not the sum.
  # Each group has a different task_id, so they must be polled separately.
  started_groups.each do |group|
    output_by_target = poll_task_status(group[:targets], group[:task_id], task)
    results += emit_results(output_by_target, **result_opts, &callback)
  end

  results
end

#run_task_via_shell(targets, task, arguments, result_opts, &callback) ⇒ Array<Bolt::Result>

Run a task via the shell agent. Groups targets by implementation to support mixed-platform batches. Starts all groups before polling so tasks execute concurrently on nodes across implementations.

Parameters:

  • targets (Array<Bolt::Target>)

    Targets that have the shell agent

  • task (Bolt::Task)

    Task to execute

  • arguments (Hash)

    Task parameter names to values

  • result_opts (Hash)

    Options passed through to emit_results (:action, :name, :position)

  • callback (Proc)

    Called with :node_start and :node_result events

Returns:

  • (Array<Bolt::Result>)

    Setup-phase error results plus results for every target that was polled



# File 'lib/bolt/transport/choria/shell.rb', line 156

def run_task_via_shell(targets, task, arguments, result_opts, &callback)
  logger.debug { "Running task #{task.name} via shell agent on #{target_count(targets)}" }
  results = []
  all_pending = {}
  cleanup_entries = []

  # Each implementation group gets its own tmpdir because different
  # platforms need different base paths (e.g., /tmp vs C:\Windows\Temp).
  begin
    targets.group_by { |target| select_implementation(target, task) }.each do |implementation, impl_targets|
      start_result = upload_and_start_task(impl_targets, task, implementation,
                                           arguments, result_opts, &callback)
      results += start_result[:failed_results]
      all_pending.merge!(start_result[:pending])
      cleanup_entries << { targets: impl_targets, tmpdir: start_result[:tmpdir] }
    end

    # Poll all handles in one loop. Unlike bolt_tasks (which needs
    # separate polls per task_id), shell handles are interchangeable.
    unless all_pending.empty?
      timeout = targets.first.options['task-timeout']
      results += emit_results(wait_for_shell_results(all_pending, timeout), **result_opts, &callback)
    end
  ensure
    cleanup_entries.each { |entry| cleanup_tmpdir(entry[:targets], entry[:tmpdir]) }
  end

  results
end

#select_implementation(target, task) ⇒ Hash

Override to select task implementation based on the target’s OS. Other transports rely on inventory features to pick the right implementation, but Choria discovers the OS at runtime via the os.family fact. We pass only the detected platform’s feature so task.select_implementation picks the correct .ps1 or .sh file.

Parameters:

  • target (Bolt::Target)

    Target whose OS determines the implementation

  • task (Bolt::Task)

    Task with platform-specific implementations

Returns:

  • (Hash)

    Selected implementation hash with ‘path’, ‘name’, ‘input_method’, ‘files’ keys



# File 'lib/bolt/transport/choria.rb', line 65

def select_implementation(target, task)
  features = windows_target?(target) ? ['powershell'] : ['shell']
  impl = task.select_implementation(target, features)
  impl['input_method'] ||= default_input_method(impl['path'])
  impl
end

#shell_list(remaining) ⇒ Array

One round of the shell.list RPC action to check which handles have completed. Targets not yet done are omitted from the return value.

Parameters:

  • remaining (Hash{Bolt::Target => Hash})

    Targets still pending, each mapped to { handle: uuid_string }

Returns:

  • (Array)

    Two-element array:

    • done [Hash{Bolt::Target => Hash}] Completed targets mapped to handle state or error hash

    • rpc_failed [Boolean] True when the entire RPC call failed



# File 'lib/bolt/transport/choria/shell.rb', line 427

def shell_list(remaining)
  response = rpc_request('shell', remaining.keys, 'shell.list') do |client|
    client.list
  end
  return [{}, true] if response[:rpc_failed]

  done = response[:errors]
  logger.debug { "shell.list: #{target_count(response[:responded])} responded, #{target_count(done)} failed" } unless done.empty?

  response[:responded].each do |target, data|
    if data.nil?
      done[target] = error_output("shell.list on #{target.safe_name} returned success but no data",
                                  'bolt/choria-missing-data')
      next
    end

    handle = remaining[target][:handle]
    job = data.dig(:jobs, handle)

    unless job
      logger.debug {
        job_handles = data[:jobs]&.keys || []
        "shell.list on #{target.safe_name}: handle #{handle} not found, " \
        "available handles: #{job_handles.inspect}"
      }
      done[target] = error_output(
        "Handle #{handle} not found in shell.list on #{target.safe_name}. " \
        "The process may have been cleaned up or the agent may have restarted.",
        'bolt/choria-handle-not-found'
      )
      next
    end

    status = job['status']&.to_s
    logger.debug { "shell.list on #{target.safe_name}: handle #{handle} status: #{status}" }
    done[target] = remaining[target] if SHELL_DONE_STATUSES.include?(status)
  end

  [done, false]
end
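The per-target decision made inside shell_list can be sketched in isolation. This is a simplified illustration, not the transport's code: classify_job and the sample reply are hypothetical, and only the :jobs => { handle => { 'status' => ... } } shape and the SHELL_DONE_STATUSES values are taken from the listing above.

```ruby
# Terminal statuses, copied from the constant documented for this class.
SHELL_DONE_STATUSES = %w[stopped failed].freeze

# Hypothetical helper: classify one target's shell.list reply for a handle.
def classify_job(data, handle)
  return :missing_data if data.nil?

  job = data.dig(:jobs, handle)
  return :handle_not_found unless job

  SHELL_DONE_STATUSES.include?(job['status']&.to_s) ? :done : :still_running
end

# Made-up reply with one running and one finished job.
reply = { jobs: { 'abc' => { 'status' => 'running' }, 'def' => { 'status' => 'stopped' } } }
puts classify_job(reply, 'abc')
puts classify_job(reply, 'def')
puts classify_job(reply, 'zzz')
```

Only the :done, :missing_data, and :handle_not_found outcomes would put a target into the done hash; a running job leaves the target in remaining for the next round.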

#shell_run(targets, command, description: nil) ⇒ Hash{Bolt::Target => Hash}

Execute a synchronous command on targets via the shell.run RPC action. Used for internal prep/cleanup (mkdir, chmod, etc.) that completes quickly. Returns only failures since successes don’t need to be reported.

Parameters:

  • targets (Array<Bolt::Target>)

    Targets to run the command on

  • command (String)

    Shell command to execute

  • description (String, nil) (defaults to: nil)

    Human-readable label for logging (defaults to command)

Returns:

  • (Hash{Bolt::Target => Hash})

    Failures only; empty hash means all succeeded



# File 'lib/bolt/transport/choria/shell.rb', line 287

def shell_run(targets, command, description: nil)
  label = description || command
  command = powershell_cmd(command) if windows_target?(targets.first)
  response = rpc_request('shell', targets, label) do |client|
    client.run(command: command)
  end

  # Check that the exit code is 0 for each successful RPC response,
  # treating nonzero exit codes as failures.
  failures = response[:errors]
  response[:responded].each do |target, data|
    data ||= {}
    exitcode = exitcode_from(data, target, label)
    next if exitcode.zero?

    failures[target] = error_output(
      "#{label} failed on #{target.safe_name} (exit code #{exitcode}): #{data[:stderr]}",
      'bolt/choria-operation-failed',
      stdout: data[:stdout], stderr: data[:stderr], exitcode: exitcode
    )
  end

  failures
end

#shell_start(targets, command) ⇒ Array

Start an async command on targets via the shell.start RPC action. Returns handles for polling with wait_for_shell_results.

Parameters:

  • targets (Array<Bolt::Target>)

    Targets to start the command on

  • command (String)

    Shell command to execute

Returns:

  • (Array)

    Two-element array:

    • pending [Hash] Targets mapped to { handle: uuid_string }

    • failures [Hash] Targets mapped to error output hashes



# File 'lib/bolt/transport/choria/shell.rb', line 341

def shell_start(targets, command)
  command = powershell_cmd(command) if windows_target?(targets.first)
  response = rpc_request('shell', targets, 'shell.start') do |client|
    client.start(command: command)
  end
  failures = response[:errors]

  pending, no_handle = response[:responded].partition { |_target, data| data&.dig(:handle) }.map(&:to_h)
  pending.each { |target, data| logger.debug { "Started command on #{target.safe_name}, handle: #{data[:handle]}" } }

  no_handle.each_key do |target|
    failures[target] = error_output("shell.start on #{target.safe_name} returned success but no handle",
                                    'bolt/choria-missing-handle')
  end

  [pending, failures]
end
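The partition idiom in shell_start splits a hash of responses into two hashes in a single pass. A minimal sketch with plain strings standing in for Bolt::Target objects (the node names and handle value are made up):

```ruby
# Stand-in for response[:responded]: target => data returned by shell.start.
responded = {
  'node1' => { handle: 'h-1' },
  'node2' => {},   # replied, but without a handle
  'node3' => nil   # replied with no data at all
}

# Hash#partition yields [key, value] pairs and returns two arrays of pairs;
# map(&:to_h) turns each array back into a hash.
pending, no_handle = responded.partition { |_target, data| data&.dig(:handle) }.map(&:to_h)

puts pending.inspect
puts no_handle.keys.inspect
```

The safe-navigation operator matters here: a nil reply lands in no_handle instead of raising NoMethodError.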

#shell_statuses(targets) ⇒ Hash{Bolt::Target => Hash}

Fetch stdout/stderr/exitcode from completed targets via the shell.statuses RPC action. Requires shell agent >= 1.2.1.

Parameters:

  • targets (Hash{Bolt::Target => Hash})

    Completed targets mapped to { handle: uuid_string }

Returns:

  • (Hash{Bolt::Target => Hash})

    Output hash for each target



# File 'lib/bolt/transport/choria/shell.rb', line 473

def shell_statuses(targets)
  handles = targets.transform_values { |data| data[:handle] }
  logger.debug { "Fetching shell.statuses for #{target_count(targets.keys)}" }

  results = {}
  response = rpc_request('shell', targets.keys, 'shell.statuses') do |client|
    client.statuses(handles: handles.values)
  end

  response[:errors].each do |target, fail_output|
    results[target] = fail_output
  end

  response[:responded].each do |target, data|
    statuses = data&.dig(:statuses)
    handle = handles[target]

    unless statuses
      results[target] = error_output(
        "shell.statuses on #{target.safe_name} returned no data",
        'bolt/choria-missing-data'
      )
      next
    end

    status_data = statuses[handle]
    unless status_data
      results[target] = error_output(
        "shell.statuses on #{target.safe_name} did not include handle #{handle}",
        'bolt/choria-missing-data'
      )
      next
    end

    status = status_data['status']&.to_s
    stdout = status_data['stdout']
    stderr = status_data['stderr']
    error_msg = status_data['error']

    if status == 'error'
      results[target] = error_output(
        "Handle #{handle} not found on #{target.safe_name}: #{error_msg}",
        'bolt/choria-handle-not-found'
      )
    elsif status == 'failed'
      results[target] = error_output(
        "Process failed on #{target.safe_name}: #{stderr}",
        'bolt/choria-process-failed',
        stdout: stdout, stderr: stderr, exitcode: 1
      )
    else
      exitcode = exitcode_from(status_data, target, 'shell.statuses')
      results[target] = output(stdout: stdout, stderr: stderr, exitcode: exitcode)
    end
  end

  results
rescue StandardError => e
  raise if e.is_a?(Bolt::Error)

  logger.warn { "shell.statuses RPC call failed: #{e.class}: #{e.message}" }
  targets.each_key do |target|
    results[target] ||= error_output(
      "Fetching output from #{target.safe_name} failed: #{e.class}: #{e.message}",
      'bolt/choria-result-processing-error'
    )
  end
  results
end

#stdin_pipe_command(target, data, command) ⇒ String

Build a command that pipes data to another command via stdin.

Parameters:

  • target (Bolt::Target)

    Used for platform detection

  • data (String)

    Data to pipe (typically JSON task arguments)

  • command (String)

    The command to receive stdin

Returns:

  • (String)

    Shell command with stdin piping



# File 'lib/bolt/transport/choria/command_builders.rb', line 90

def stdin_pipe_command(target, data, command)
  if windows_target?(target)
    # Use a here-string (@'...'@) to avoid escaping issues with
    # large JSON payloads. Content between @' and '@ is literal.
    "@'\n#{data}\n'@ | & #{command}"
  else
    "printf '%s' #{Shellwords.shellescape(data)} | #{command}"
  end
end
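To see why the POSIX branch is safe for arbitrary JSON, the following sketch builds the same command string and then splits it back into words with Shellwords. The posix_stdin_pipe name, the task path, and the argument values are hypothetical.

```ruby
require 'json'
require 'shellwords'

# Same shape as the POSIX branch above: printf emits the data verbatim
# (no trailing newline, no escape processing) and pipes it to the command.
def posix_stdin_pipe(data, command)
  "printf '%s' #{Shellwords.shellescape(data)} | #{command}"
end

# Task arguments containing characters a shell would otherwise interpret.
args = JSON.generate({ 'message' => "it's $HOME; rm -rf /" })
command = posix_stdin_pipe(args, '/tmp/task.sh')
puts command

# Splitting the command as a shell would recovers the JSON unchanged.
words = Shellwords.split(command)
puts words[2] == args
```

Using printf '%s' rather than echo avoids both the trailing newline and any shell-dependent backslash handling.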

#target_count(targets) ⇒ Object



# File 'lib/bolt/transport/choria/helpers.rb', line 16

def target_count(targets)
  count = targets.is_a?(Hash) ? targets.size : targets.length
  "#{count} #{count == 1 ? 'target' : 'targets'}"
end

#task_file_spec(file, module_name, environment) ⇒ Hash

Build a file spec hash for the bolt_tasks download action. Computes the Puppet Server file_content URI based on the file’s module-relative path.

Parameters:

  • file (Hash)

    With ‘name’ (module-relative path) and ‘path’ (local absolute path)

  • module_name (String)

    Task’s module name (used for simple task files)

  • environment (String)

    Puppet environment name for the URI params

Returns:

  • (Hash)

    File spec with ‘filename’, ‘sha256’, ‘size_bytes’, and ‘uri’ keys



# File 'lib/bolt/transport/choria/bolt_tasks.rb', line 183

def task_file_spec(file, module_name, environment)
  file_name = file['name']
  validate_file_name!(file_name)
  file_path = file['path']

  parts = file_name.split('/', 3)
  path = if parts.length == 3
           mod, subdir, rest = parts
           case subdir
           when 'files'
             "/puppet/v3/file_content/modules/#{mod}/#{rest}"
           when 'lib'
             "/puppet/v3/file_content/plugins/#{mod}/#{rest}"
           else
             "/puppet/v3/file_content/tasks/#{mod}/#{rest}"
           end
         else
           "/puppet/v3/file_content/tasks/#{module_name}/#{file_name}"
         end

  {
    'filename' => file_name,
    'sha256' => Digest::SHA256.file(file_path).hexdigest,
    'size_bytes' => File.size(file_path),
    'uri' => {
      'path' => path,
      'params' => { 'environment' => environment }
    }
  }
end
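The path mapping is the part of task_file_spec that is easiest to get wrong, so here is a compact sketch of just that step. The file_content_path helper and the mymod file names are hypothetical; the mount points mirror the case statement above.

```ruby
# Map a module-relative file name to a Puppet Server file_content path.
# 'files' is served from the modules mount, 'lib' from plugins, and
# everything else (including plain task files) from tasks.
def file_content_path(file_name, module_name)
  parts = file_name.split('/', 3)
  return "/puppet/v3/file_content/tasks/#{module_name}/#{file_name}" unless parts.length == 3

  mod, subdir, rest = parts
  mount = { 'files' => 'modules', 'lib' => 'plugins' }.fetch(subdir, 'tasks')
  "/puppet/v3/file_content/#{mount}/#{mod}/#{rest}"
end

puts file_content_path('mymod/files/helper.sh', 'mymod')
puts file_content_path('mymod/lib/puppet_x/util.rb', 'mymod')
puts file_content_path('mymod/tasks/init.sh', 'mymod')
puts file_content_path('init.sh', 'mymod')
```

Note that the limit of 3 passed to split keeps nested paths such as puppet_x/util.rb intact in the final segment.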

#unwrap_bolt_tasks_stdout(agent_stdout) ⇒ String?

Fix double-encoding in the bolt_tasks agent’s wrapper error path.

Normally, create_task_stdout returns a Hash and reply_task_status calls .to_json on it, producing a single JSON string like:

'{"_output":"hello world"}'

But for wrapper errors, create_task_stdout returns an already JSON-encoded String. reply_task_status still calls .to_json on it, encoding it a second time. The result is a JSON string whose value is itself a JSON string:

'"{\\"_error\\":{\\"kind\\":\\"choria.tasks/wrapper-error\\",...}}"'

We parse one layer of JSON. In the normal case, that produces a Hash and we return the original string. In the double-encoded case, it produces a String (the inner JSON), which we return so Result.for_task can parse it.

Parameters:

  • agent_stdout (String, nil)

    JSON-encoded stdout from the bolt_tasks agent

Returns:

  • (String, nil)

    JSON string suitable for Result.for_task



# File 'lib/bolt/transport/choria/bolt_tasks.rb', line 233

def unwrap_bolt_tasks_stdout(agent_stdout)
  return agent_stdout unless agent_stdout.is_a?(String)

  parsed = begin
    JSON.parse(agent_stdout)
  rescue JSON::ParserError
    return agent_stdout
  end

  # Normal case: parsed is a Hash, return the original JSON string.
  # Double-encoded case: parsed is a String (the inner JSON), return it.
  parsed.is_a?(String) ? parsed : agent_stdout
end
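The double-encoding case can be reproduced with nothing but the json library. In this sketch the unwrap method is a condensed stand-in for the method above, and the payloads are invented for illustration.

```ruby
require 'json'

# Condensed stand-in for unwrap_bolt_tasks_stdout: peel exactly one JSON
# layer, and only when that layer is a String.
def unwrap(agent_stdout)
  return agent_stdout unless agent_stdout.is_a?(String)

  parsed = JSON.parse(agent_stdout)
  parsed.is_a?(String) ? parsed : agent_stdout
rescue JSON::ParserError
  agent_stdout
end

# Normal path: a Hash encoded once.
normal = { '_output' => 'hello world' }.to_json

# Wrapper-error path: an already-encoded String encoded a second time.
inner = { '_error' => { 'kind' => 'choria.tasks/wrapper-error' } }.to_json
double = inner.to_json

puts JSON.parse(normal).class
puts JSON.parse(double).class
puts unwrap(double) == inner
```

Returning the original string in the normal case, rather than re-serializing the parsed Hash, avoids any risk of changing key order or number formatting before Result.for_task sees it.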

#upload(_target, _source, _destination, _options = {}, _position = []) ⇒ Object

Raises:

  • (Bolt::Error)

    Always; the Choria transport does not yet support upload



# File 'lib/bolt/transport/choria.rb', line 183

def upload(_target, _source, _destination, _options = {}, _position = [])
  raise Bolt::Error.new(
    'The Choria transport does not yet support upload.',
    'bolt/choria-unsupported-operation'
  )
end

#upload_and_start_task(targets, task, implementation, arguments, result_opts, &callback) ⇒ Hash

Upload task files and start execution for one implementation group.

Parameters:

  • targets (Array<Bolt::Target>)

    Targets sharing the same implementation

  • task (Bolt::Task)

    Task being executed

  • implementation (Hash)

    Task implementation with ‘path’, ‘name’, ‘input_method’, ‘files’ keys

  • arguments (Hash)

    Task parameter names to values

  • result_opts (Hash)

    Options passed through to emit_results (:action, :name, :position)

  • callback (Proc)

    Called with :node_start and :node_result events

Returns:

  • (Hash)

    with keys:

    • :failed_results [Array<Bolt::Result>] Error results from setup phase

    • :pending [Hash] Targets mapped to { handle: uuid } for polling

    • :tmpdir [String] Remote tmpdir path for cleanup



# File 'lib/bolt/transport/choria/shell.rb', line 198

def upload_and_start_task(targets, task, implementation, arguments, result_opts, &callback)
  arguments = arguments.dup
  executable = implementation['path']
  input_method = implementation['input_method']
  extra_files = implementation['files']
  first_target = targets.first
  tmpdir = generate_tmpdir_path(first_target)

  executable_content = File.binread(executable)
  extra_file_contents = {}
  extra_files.each do |file|
    validate_file_name!(file['name'])
    extra_file_contents[file['name']] = File.binread(file['path'])
  end

  failed_results = []
  active_targets = targets.dup
  task_dir = tmpdir

  # Create the tmpdir
  failures = shell_run(active_targets,
                       make_dir_command(first_target, tmpdir),
                       description: 'mkdir tmpdir')
  failed_results += emit_results(failures, **result_opts, &callback)
  active_targets -= failures.keys

  # Tasks with extra files get a module-layout directory tree in
  # tmpdir, and _installdir is set so the task can find them.
  # Simple tasks go directly in tmpdir with no _installdir.
  if active_targets.any? && extra_files.any?
    arguments['_installdir'] = tmpdir
    task_dir = join_path(first_target, tmpdir, task.tasks_dir)

    # Create subdirectories for the task and its dependencies
    extra_dirs = extra_files.map { |file| join_path(first_target, tmpdir, File.dirname(file['name'])) }.uniq
    all_dirs = [task_dir] + extra_dirs
    failures = shell_run(active_targets,
                         make_dir_command(first_target, *all_dirs),
                         description: 'mkdir task dirs')
    failed_results += emit_results(failures, **result_opts, &callback)
    active_targets -= failures.keys

    # Upload each dependency file to its module-relative path
    extra_files.each do |file|
      break if active_targets.empty?

      failures = upload_file_content(active_targets, extra_file_contents[file['name']],
                                     join_path(first_target, tmpdir, file['name']))
      failed_results += emit_results(failures, **result_opts, &callback)
      active_targets -= failures.keys
    end
  end

  # Upload the main task executable
  remote_task_path = join_path(first_target, task_dir, File.basename(executable)) if active_targets.any?
  if remote_task_path
    failures = upload_file_content(active_targets, executable_content, remote_task_path)
    failed_results += emit_results(failures, **result_opts, &callback)
    active_targets -= failures.keys
  end

  # Make the task executable (no-op on Windows)
  chmod_cmd = make_executable_command(first_target, remote_task_path) if remote_task_path
  if active_targets.any? && chmod_cmd
    failures = shell_run(active_targets, chmod_cmd, description: 'chmod task')
    failed_results += emit_results(failures, **result_opts, &callback)
    active_targets -= failures.keys
  end

  # Start the task asynchronously
  pending = {}
  if active_targets.any? && remote_task_path
    full_cmd = build_task_command(first_target, remote_task_path, arguments, input_method,
                                  first_target.options['interpreters'])
    pending, start_failures = shell_start(active_targets, full_cmd)
    failed_results += emit_results(start_failures, **result_opts, &callback)
  end

  { failed_results: failed_results, pending: pending, tmpdir: tmpdir }
end

#upload_file_command(target, content_b64, dest) ⇒ String

Build a command that decodes base64-encoded content and writes the result to a file. Requires the base64 CLI on POSIX targets.

Parameters:

  • target (Bolt::Target)

    Used for platform detection

  • content_b64 (String)

    Base64-encoded file content

  • dest (String)

    Absolute destination path on the remote node

Returns:

  • (String)

    Shell command



# File 'lib/bolt/transport/choria/command_builders.rb', line 53

def upload_file_command(target, content_b64, dest)
  if windows_target?(target)
    "[IO.File]::WriteAllBytes('#{ps_escape(dest)}', " \
      "[Convert]::FromBase64String('#{content_b64}'))"
  else
    "printf '%s' #{Shellwords.shellescape(content_b64)} | base64 -d > #{Shellwords.shellescape(dest)}"
  end
end
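The Windows branch places content_b64 inside single quotes without passing it through ps_escape. That appears to rely on the base64 alphabet containing no quote characters, which the following sketch checks for a payload covering every byte value. The payload is made up; Base64.strict_encode64 is the encoder used by upload_file_content.

```ruby
require 'base64'

# A binary payload containing every possible byte value once.
content = (0..255).to_a.pack('C*')

# strict_encode64 emits no line feeds, unlike Base64.encode64.
encoded = Base64.strict_encode64(content)

# Only letters, digits, '+', '/', and up to two '=' padding characters.
puts encoded.match?(%r{\A[A-Za-z0-9+/]*={0,2}\z})

# Decoding restores the original bytes exactly.
puts Base64.strict_decode64(encoded) == content
```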

#upload_file_content(targets, content, destination) ⇒ Hash{Bolt::Target => Hash}

Upload file content to the same path on multiple targets via base64. The entire file is base64-encoded and sent as a single RPC message, so file size is limited by the NATS maximum message size (1 MB by default, configurable via plugin.choria.network.client_max_payload in the Choria broker config). Base64 adds about 33% overhead, so the effective file size limit is roughly 750 KB with default settings. Once the file-transfer agent is implemented and available on the target, chunked transfers via that agent will replace this approach and remove the size limitation.

Parameters:

  • targets (Array<Bolt::Target>)

    Targets to upload to

  • content (String)

    Raw file content (binary-safe)

  • destination (String)

    Absolute path on the remote node

Returns:

  • (Hash{Bolt::Target => Hash})

    Failures only; empty hash means all succeeded



# File 'lib/bolt/transport/choria/shell.rb', line 326

def upload_file_content(targets, content, destination)
  logger.debug { "Uploading #{content.bytesize} bytes to #{destination} on #{target_count(targets)}" }
  encoded = Base64.strict_encode64(content)
  command = upload_file_command(targets.first, encoded, destination)
  shell_run(targets, command, description: "upload #{destination}")
end
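The size limit described for upload_file_content follows from simple arithmetic, sketched below. The base64_size and max_raw_size helpers are hypothetical, and the sketch assumes the 1 MB default means 1024 * 1024 bytes; the real ceiling is somewhat lower because the surrounding command text and the RPC envelope also count toward the message size.

```ruby
require 'base64'

# Base64 turns every 3 input bytes (rounded up) into 4 output characters.
def base64_size(raw_bytes)
  ((raw_bytes + 2) / 3) * 4
end

# Largest raw payload whose encoding still fits within the limit.
def max_raw_size(limit_bytes)
  (limit_bytes / 4) * 3
end

limit = 1024 * 1024 # assumed default NATS payload limit, in bytes

puts base64_size(750 * 1024) # encoded size of a 750 KiB file
puts max_raw_size(limit)     # upper bound before command and envelope overhead

# Cross-check the formula against the real encoder.
puts Base64.strict_encode64('a' * 1000).bytesize == base64_size(1000)
```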

#validate_env_key!(key, context) ⇒ Object

Validate an environment variable key is safe for shell interpolation.

Parameters:

  • key (String)

    Environment variable name to validate

  • context (String)

    Description for error messages

Raises:

  • (Bolt::Error)

    If the key does not match the safe pattern



# File 'lib/bolt/transport/choria/helpers.rb', line 185

def validate_env_key!(key, context)
  safe_pattern = /\A[A-Za-z_][A-Za-z0-9_]*\z/
  return if safe_pattern.match?(key)

  raise Bolt::Error.new(
    "Unsafe environment variable name '#{key}' in #{context}. " \
    "Names must match #{safe_pattern.source}",
    'bolt/invalid-env-var-name'
  )
end
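A few sample keys show what the pattern accepts and rejects. The key names here are invented; the regular expression is copied from the method above.

```ruby
# Pattern copied from validate_env_key!.
SAFE_ENV_KEY = /\A[A-Za-z_][A-Za-z0-9_]*\z/

accepted = %w[PT_message _private path2]
rejected = ['2fast', 'A-B', 'X; rm -rf /', "OK\n", '']

accepted.each { |key| puts "#{key.inspect} => #{SAFE_ENV_KEY.match?(key)}" }
rejected.each { |key| puts "#{key.inspect} => #{SAFE_ENV_KEY.match?(key)}" }
```

The \A and \z anchors are what make this safe: with ^ and $ (or \Z), a key such as "OK\n" with a trailing newline would pass and inject a line break into the generated command.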

#validate_file_name!(name) ⇒ Object

Validate that a file name contains no null bytes, is not an absolute path, and has no path traversal (..) segments. Checks both POSIX and Windows conventions. Raises Bolt::Error on violations.

Parameters:

  • name (String)

    Task file name to validate



# File 'lib/bolt/transport/choria/helpers.rb', line 158

def validate_file_name!(name)
  if name.include?("\0")
    raise Bolt::Error.new(
      "Invalid null byte in task file name: #{name.inspect}",
      'bolt/invalid-task-filename'
    )
  end

  if name.start_with?('/') || name.match?(WINDOWS_PATH_REGEX)
    raise Bolt::Error.new(
      "Absolute path not allowed in task file name: '#{name}'",
      'bolt/invalid-task-filename'
    )
  end

  if name.split(%r{[/\\]}).include?('..')
    raise Bolt::Error.new(
      "Path traversal detected in task file name: '#{name}'",
      'bolt/path-traversal'
    )
  end
end
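The traversal check compares whole path segments rather than searching for the substring '..', so file names that merely contain two dots are not rejected. A minimal sketch of that one check, using a hypothetical traversal? helper and made-up file names:

```ruby
# Split on both POSIX and Windows separators, then look for a segment
# that is exactly '..'.
def traversal?(name)
  name.split(%r{[/\\]}).include?('..')
end

puts traversal?('mymod/files/../../etc/passwd')
puts traversal?('mymod\\..\\secret.ps1')
puts traversal?('mymod/files/notes..txt')
puts traversal?('mymod/files/..hidden')
```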

#wait_for_shell_results(pending, timeout) ⇒ Hash{Bolt::Target => Hash}

Wait for async shell handles to complete, fetch their output via shell_statuses, and kill timed-out processes.

Parameters:

  • pending (Hash{Bolt::Target => Hash})

    Targets to poll, each mapped to { handle: uuid_string }

  • timeout (Numeric)

    Maximum seconds to wait before killing remaining processes

Returns:

  • (Hash{Bolt::Target => Hash})

    Output hash for every target (success and error)



# File 'lib/bolt/transport/choria/shell.rb', line 365

def wait_for_shell_results(pending, timeout)
  return {} if pending.empty?

  poll_result = poll_with_retries(pending, timeout, 'shell.list') do |remaining|
    completed, rpc_failed = shell_list(remaining)
    next { rpc_failed: true, done: {} } if rpc_failed

    done = {}
    fetch_targets = {}
    completed.each do |target, value|
      if value[:error]
        done[target] = value
      else
        fetch_targets[target] = value
      end
    end

    unless fetch_targets.empty?
      logger.debug { "Fetching output from #{target_count(fetch_targets)}" }
      fetched = shell_statuses(fetch_targets)
      fetch_targets.each_key do |target|
        done[target] = fetched[target] || error_output(
          "Command completed on #{target.safe_name} but output could not be fetched",
          'bolt/choria-result-processing-error'
        )
      end
    end

    { rpc_failed: false, done: done }
  end

  remaining_errors = {}
  unless poll_result[:remaining].empty?
    if poll_result[:rpc_persistent_failure]
      poll_result[:remaining].each_key do |target|
        remaining_errors[target] = error_output(
          "RPC requests to poll shell status on #{target.safe_name} failed persistently",
          'bolt/choria-poll-failed'
        )
      end
    else
      kill_timed_out_processes(poll_result[:remaining])
      poll_result[:remaining].each_key do |target|
        remaining_errors[target] = error_output(
          "Command timed out after #{timeout} seconds on #{target.safe_name}",
          'bolt/choria-command-timeout'
        )
      end
    end
  end

  poll_result[:completed].merge(remaining_errors)
end
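The block passed to poll_with_retries returns { rpc_failed:, done: } each round, and the method then reads :completed, :remaining, and :rpc_persistent_failure from the result. The source of poll_with_retries is not shown here, so the following is only a guess at that contract: poll_sketch is hypothetical, it counts rounds instead of sleeping against a timeout, and it assumes the failure counter resets after any successful round.

```ruby
# Hypothetical polling loop; NOT the transport's poll_with_retries.
def poll_sketch(pending, max_rounds:, failure_limit: 3)
  remaining = pending.dup
  completed = {}
  failures = 0

  max_rounds.times do
    break if remaining.empty?

    round = yield(remaining)
    if round[:rpc_failed]
      failures += 1
      # Give up once the consecutive failure limit is reached.
      if failures >= failure_limit
        return { completed: completed, remaining: remaining, rpc_persistent_failure: true }
      end
      next
    end

    failures = 0 # assumption: only consecutive failures count
    completed.merge!(round[:done])
    remaining = remaining.reject { |target, _| round[:done].key?(target) }
  end

  { completed: completed, remaining: remaining, rpc_persistent_failure: false }
end

# One successful round that finishes 'a', then nothing but RPC failures.
calls = 0
result = poll_sketch({ 'a' => { handle: 'h1' }, 'b' => { handle: 'h2' } }, max_rounds: 5) do |_remaining|
  calls += 1
  calls == 1 ? { rpc_failed: false, done: { 'a' => { exitcode: 0 } } } : { rpc_failed: true, done: {} }
end

puts result.inspect
```

Under this reading, wait_for_shell_results would report 'b' with bolt/choria-poll-failed rather than a timeout, and would not attempt to kill the process.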

#windows_target?(target) ⇒ Boolean

Returns:

  • (Boolean)


# File 'lib/bolt/transport/choria/agent_discovery.rb', line 35

def windows_target?(target)
  @agent_cache[choria_identity(target)]&.dig(:os) == 'windows'
end