Class: Bolt::Transport::Choria
- Defined in:
- lib/bolt/transport/choria.rb,
lib/bolt/transport/choria/shell.rb,
lib/bolt/transport/choria/client.rb,
lib/bolt/transport/choria/helpers.rb,
lib/bolt/transport/choria/bolt_tasks.rb,
lib/bolt/transport/choria/agent_discovery.rb,
lib/bolt/transport/choria/command_builders.rb
Overview
Choria transport for OpenBolt. Communicates with nodes via Choria’s NATS pub/sub messaging infrastructure using the choria-mcorpc-support gem as the client library. Extends Transport::Base directly (not Simple) because Choria’s pub/sub model doesn’t fit the persistent connection/shell abstraction that Simple assumes.
Available capabilities depend on which agents are installed on the target node:
- bolt_tasks agent only: Only run_task works, via the bolt_tasks agent, which downloads task files from an OpenVox/Puppet Server and executes them via task_wrapper. All other operations fail with an actionable error directing the user to install the shell agent.
- shell agent installed (>= 1.2.1): run_command, run_script, and run_task work. run_task uses the bolt_tasks agent by default. To run local tasks via the shell agent, set task-agent to 'shell' in project config or specify --choria-task-agent shell.
Upload, download, and plans are not yet supported.
Constant Summary
- SHELL_DONE_STATUSES =
Terminal shell job statuses that indicate the process has finished.
%w[stopped failed].freeze
- RPC_FAILURE_RETRIES =
Number of consecutive RPC poll failures before giving up and marking all remaining targets as failed. Used by both polling loops (poll_task_status and wait_for_shell_results).
3
- POLL_INTERVAL_SECONDS =
Polling interval between rounds, used by poll_task_status and wait_for_shell_results. Each round makes one batched RPC call regardless of target count, so a 1-second interval balances responsiveness against broker load.
1
- WINDOWS_PATH_REGEX =
Matches Windows absolute paths like C:\temp or D:/foo. Used by validate_file_name! and Config::Transport::Choria#absolute_path?.
%r{\A[A-Za-z]:[\\/]}
- SHELL_MIN_VERSION =
'1.2.1'
- AGENT_MIN_VERSIONS =
{ 'shell' => SHELL_MIN_VERSION }.freeze
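The two polling constants work together: each round is one batched RPC call, consecutive RPC failures are counted against RPC_FAILURE_RETRIES, and an overall timeout bounds the loop. A minimal sketch of that control flow, assuming the per-round RPC is supplied as a block. `poll_with_retries_sketch`, its block protocol, and its `[done, failed]` return shape are hypothetical; they are not the signature of the transport's poll_with_retries.

```ruby
# Hypothetical sketch, not the transport's poll_with_retries. The block stands
# in for one batched RPC round: it returns { key => result } for keys that
# finished during that round, or raises if the RPC call itself failed.
RPC_FAILURE_RETRIES = 3
POLL_INTERVAL_SECONDS = 1

def poll_with_retries_sketch(pending, timeout, interval: POLL_INTERVAL_SECONDS)
  remaining = pending.dup
  done = {}
  failures = 0
  deadline = Process.clock_gettime(Process::CLOCK_MONOTONIC) + timeout

  until remaining.empty?
    begin
      finished = yield(remaining.dup)
      failures = 0 # only consecutive failures count toward the limit
      finished.each do |key, result|
        done[key] = result
        remaining.delete(key)
      end
    rescue StandardError => e
      failures += 1
      if failures >= RPC_FAILURE_RETRIES
        # Give up and mark every still-pending key as failed
        return [done, remaining.to_h { |key| [key, "RPC failed #{failures} times: #{e.message}"] }]
      end
    end
    break if remaining.empty?

    if Process.clock_gettime(Process::CLOCK_MONOTONIC) >= deadline
      return [done, remaining.to_h { |key| [key, 'timed out'] }]
    end

    sleep(interval) # one batched call per interval, however many keys remain
  end
  [done, {}]
end
```

A successful round resets the failure counter, so a flaky broker only aborts the loop after three failures in a row.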
Instance Attribute Summary
Attributes inherited from Base
Instance Method Summary
-
#batch_command(targets, command, options = {}, position = [], &callback) ⇒ Array<Bolt::Result>
Run a command on targets via the shell agent.
-
#batch_connected?(targets) ⇒ Boolean
Override batch_connected? to check all targets in one RPC call.
-
#batch_script(targets, script, arguments, options = {}, position = [], &callback) ⇒ Array<Bolt::Result>
Run a script on targets via the shell agent.
-
#batch_task(targets, task, arguments, _options = {}, position = [], &callback) ⇒ Array<Bolt::Result>
Override batch_task to handle multiple targets in one thread using batched RPC calls.
-
#batch_task_with(targets, task, target_mapping, options = {}, position = [], &callback) ⇒ Array<Bolt::Result>
Override batch_task_with for per-target arguments.
-
#batches(targets) ⇒ Array<Array<Bolt::Target>>
Group targets by collective so each batch uses a single RPC client scope.
-
#build_result(target, data, action:, name:, position:) ⇒ Bolt::Result
Build a Bolt::Result from an output hash.
-
#build_task_command(target, remote_task_path, arguments, input_method, interpreter_options) ⇒ String
Build the full command string for task execution via the shell agent, handling interpreter selection, environment variable injection, and stdin piping.
-
#choria_identity(target) ⇒ Object
Returns the Choria node identity for a target.
-
#cleanup_dir_command(target, path) ⇒ String
Build a recursive directory removal command.
-
#cleanup_tmpdir(targets, tmpdir) ⇒ Object
Clean up a remote tmpdir on targets, logging per-target failures.
-
#collective_for(target) ⇒ Object
Returns the collective for a target, used by batches() to group targets.
-
#configure_client(target) ⇒ Object
One-time setup of the local MCollective client connection to the NATS broker.
-
#create_rpc_client(agent_name, targets, timeout) ⇒ MCollective::RPC::Client
Create an MCollective::RPC::Client for one or more targets.
-
#discover_agent_list(targets) ⇒ Object
Discover available agents on targets via rpcutil.agent_inventory and populate @agent_cache with agent lists.
-
#discover_agents(targets) ⇒ Object
Discover agents and detect OS on targets via two batched RPC calls (agent_inventory for agents+versions, get_fact for os.family).
-
#discover_os_family(targets) ⇒ Object
Detect the OS family on targets via rpcutil.get_fact and update the :os entry in @agent_cache.
- #download(_target, _source, _destination, _options = {}, _position = []) ⇒ Object
-
#download_and_start_task(targets, task, implementation, arguments, result_opts, &callback) ⇒ Hash
Download task files from the server and start execution for one implementation group via bolt_tasks.download and bolt_tasks.run_no_wait.
-
#emit_results(target_outputs, action:, name:, position:, fire_node_start: false, &callback) ⇒ Array<Bolt::Result>
Convert a hash of { target => output } into Results, fire callbacks, and return the Results array.
-
#envify_params(params) ⇒ Hash{String => String}
Convert task arguments to PT_-prefixed environment variable hash.
-
#error_output(message, kind, stdout: nil, stderr: nil, exitcode: 1) ⇒ Object
Build an error output hash.
-
#escape_arg(target, str) ⇒ String
Escape a string for use as a shell argument on the target platform.
-
#exitcode_from(data, target, context) ⇒ Integer
Extract exit code from RPC response data, defaulting to 1 with a warning if the agent returned nil.
-
#extract_task_output(data, target) ⇒ Hash
Extract stdout, stderr, and exitcode from a bolt_tasks task_status response.
-
#filter_agents(sender, agent_list) ⇒ Array<String>?
Filter out agents that don’t meet minimum version requirements.
-
#generate_tmpdir_path(target) ⇒ String
Generate a unique remote tmpdir path for batch operations.
- #has_agent?(target, agent_name) ⇒ Boolean
-
#index_results_by_sender(results, targets, context) ⇒ Hash{String => Hash}
Index RPC results by sender, keeping only the first response per sender and only from the set of expected identities.
-
#initialize ⇒ Choria
constructor
A new instance of Choria.
-
#join_path(target, *parts) ⇒ String
Join path segments using the target platform’s separator.
-
#kill_timed_out_processes(targets) ⇒ Object
Kill processes on timed-out targets.
-
#make_dir_command(target, *paths) ⇒ String
Build a mkdir command for one or more directories.
-
#make_executable_command(target, path) ⇒ String?
Build a chmod +x command.
- #meets_min_version?(version, min_version) ⇒ Boolean
-
#output(stdout: nil, stderr: nil, exitcode: nil) ⇒ Object
Build an output hash from command/task output.
-
#poll_task_status(targets, task_id, task) ⇒ Hash{Bolt::Target => Hash}
Poll bolt_tasks.task_status until all targets complete or timeout.
-
#poll_with_retries(targets, timeout, context) ⇒ Hash
Shared polling loop for bolt_tasks and shell polling.
-
#powershell_cmd(script) ⇒ String
Wrap a PowerShell script for execution via shell agent.
-
#prepare_targets(targets, agent_name, result_opts, &callback) ⇒ Array
Configure the client, discover agents, partition targets by agent availability, and emit errors for incapable targets.
-
#prepend_env_vars(target, command, env_vars, context) ⇒ String
Prepend environment variables to a command string.
-
#provided_features ⇒ Object
Advertise both shell and powershell so tasks with either requirement can be selected.
-
#ps_escape(str) ⇒ String
Escape single quotes for use inside PowerShell single-quoted strings.
-
#rpc_request(agent, targets, context) {|MCollective::RPC::Client| ... } ⇒ Hash
Make a batched RPC call and split results into responded and errors.
-
#run_task_via_bolt_tasks(targets, task, arguments, result_opts, &callback) ⇒ Array<Bolt::Result>
Run a task via the bolt_tasks agent.
-
#run_task_via_shell(targets, task, arguments, result_opts, &callback) ⇒ Array<Bolt::Result>
Run a task via the shell agent.
-
#select_implementation(target, task) ⇒ Hash
Override to select task implementation based on the target’s OS.
-
#shell_list(remaining) ⇒ Array
One round of the shell.list RPC action to check which handles have completed.
-
#shell_run(targets, command, description: nil) ⇒ Hash{Bolt::Target => Hash}
Execute a synchronous command on targets via the shell.run RPC action.
-
#shell_start(targets, command) ⇒ Array
Start an async command on targets via the shell.start RPC action.
-
#shell_statuses(targets) ⇒ Hash{Bolt::Target => Hash}
Fetch stdout/stderr/exitcode from completed targets via the shell.statuses RPC action.
-
#stdin_pipe_command(target, data, command) ⇒ String
Build a command that pipes data to another command via stdin.
- #target_count(targets) ⇒ Object
-
#task_file_spec(file, module_name, environment) ⇒ Hash
Build a file spec hash for the bolt_tasks download action.
-
#unwrap_bolt_tasks_stdout(agent_stdout) ⇒ String?
Fix double-encoding in the bolt_tasks agent’s wrapper error path.
- #upload(_target, _source, _destination, _options = {}, _position = []) ⇒ Object
-
#upload_and_start_task(targets, task, implementation, arguments, result_opts, &callback) ⇒ Hash
Upload task files and start execution for one implementation group.
-
#upload_file_command(target, content_b64, dest) ⇒ String
Build a command that decodes base64-encoded content and writes it to a file.
-
#upload_file_content(targets, content, destination) ⇒ Hash{Bolt::Target => Hash}
Upload file content to the same path on multiple targets via base64.
-
#validate_env_key!(key, context) ⇒ Object
Validate an environment variable key is safe for shell interpolation.
-
#validate_file_name!(name) ⇒ Object
Validate that a file name does not contain path traversal sequences or absolute paths.
-
#wait_for_shell_results(pending, timeout) ⇒ Hash{Bolt::Target => Hash}
Wait for async shell handles to complete, fetch their output via shell_statuses, and kill timed-out processes.
- #windows_target?(target) ⇒ Boolean
Methods inherited from Base
#assert_batch_size_one, #batch_download, #batch_upload, #connected?, #default_input_method, #run_command, #run_script, #run_task, #select_interpreter, #unwrap_sensitive_args, #with_events
Constructor Details
#initialize ⇒ Choria
Returns a new instance of Choria.
# File 'lib/bolt/transport/choria.rb', line 34
def initialize
  super
  @config_mutex = Mutex.new
  @config_error = nil
  @client_configured = false
  # Serializes RPC calls across batch threads. See the comment on
  # rpc_request in helpers.rb for why this is necessary.
  @rpc_mutex = Mutex.new
  # Multiple batch threads write to this map concurrently when we
  # have more than one collective.
  @agent_cache = Concurrent::Map.new
  @default_collective = nil
end
Instance Method Details
#batch_command(targets, command, options = {}, position = [], &callback) ⇒ Array<Bolt::Result>
Run a command on targets via the shell agent. Assumes all targets in the batch are the same platform (POSIX or Windows). Mixed-platform batches use the first capable target’s platform for command syntax.
# File 'lib/bolt/transport/choria/shell.rb', line 19
def batch_command(targets, command, options = {}, position = [], &callback)
  result_opts = { action: 'command', name: command, position: position }
  shell_targets, results = prepare_targets(targets, 'shell', result_opts, &callback)
  return results if shell_targets.empty?

  logger.debug { "Running command via shell agent on #{target_count(shell_targets)}" }

  first_target = shell_targets.first
  timeout = first_target.options['command-timeout']
  command = prepend_env_vars(first_target, command, options[:env_vars], 'run_command env_vars')

  shell_targets.each { |target| callback&.call(type: :node_start, target: target) }

  pending, start_failures = shell_start(shell_targets, command)
  results += emit_results(start_failures, **result_opts, &callback)
  results += emit_results(wait_for_shell_results(pending, timeout), **result_opts, &callback)
  results
end
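prepend_env_vars itself is not shown on this page. On POSIX targets, one plausible shape is `KEY=value ... command`, with each value shell-escaped and each key validated first, because keys are interpolated unquoted. The sketch below assumes that shape; `prepend_env_vars_sketch` and `ENV_KEY_PATTERN` are hypothetical, and Windows targets would need `$env:KEY = '...'` statements instead, as build_task_command uses.

```ruby
require 'shellwords'

# Hypothetical POSIX-only sketch; the accepted key pattern is an assumption
# about what validate_env_key! treats as safe.
ENV_KEY_PATTERN = /\A[A-Za-z_][A-Za-z0-9_]*\z/

def prepend_env_vars_sketch(command, env_vars)
  return command if env_vars.nil? || env_vars.empty?

  assignments = env_vars.map do |key, value|
    key = key.to_s
    unless key.match?(ENV_KEY_PATTERN)
      # Reject rather than escape: keys are interpolated unquoted
      raise ArgumentError, "Invalid environment variable name: #{key.inspect}"
    end

    # Values are escaped as a single shell word, so spaces and quotes survive
    "#{key}=#{Shellwords.shellescape(value.to_s)}"
  end
  "#{assignments.join(' ')} #{command}"
end
```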
#batch_connected?(targets) ⇒ Boolean
Override batch_connected? to check all targets in one RPC call. Only used for wait_until_available in plans.
# File 'lib/bolt/transport/choria.rb', line 167
def batch_connected?(targets)
  logger.debug { "Checking connectivity for #{target_count(targets)}" }
  first_target = targets.first
  configure_client(first_target)

  response = rpc_request('rpcutil', targets, 'rpcutil.ping') do |client|
    client.ping
  end
  response[:responded].length == targets.length
rescue StandardError => e
  raise if e.is_a?(Bolt::Error)

  logger.warn { "Batch connectivity check failed: #{e.class}: #{e.message}" }
  false
end
#batch_script(targets, script, arguments, options = {}, position = [], &callback) ⇒ Array<Bolt::Result>
Run a script on targets via the shell agent. Assumes all targets in the batch are the same platform (POSIX or Windows). Mixed-platform batches use the first capable target’s platform for infrastructure commands (mkdir, upload, chmod, cleanup).
# File 'lib/bolt/transport/choria/shell.rb', line 51
def batch_script(targets, script, arguments, options = {}, position = [], &callback)
  result_opts = { action: 'script', name: script, position: position }
  shell_targets, results = prepare_targets(targets, 'shell', result_opts, &callback)
  return results if shell_targets.empty?

  logger.debug { "Running script via shell agent on #{target_count(shell_targets)}" }

  first_target = shell_targets.first
  arguments = unwrap_sensitive_args(arguments)
  timeout = first_target.options['command-timeout']
  tmpdir = generate_tmpdir_path(first_target)
  script_content = File.binread(script)

  shell_targets.each { |target| callback&.call(type: :node_start, target: target) }

  begin
    remote_path = join_path(first_target, tmpdir, File.basename(script))
    active_targets = shell_targets.dup

    # Create a temp directory with restricted permissions
    failures = shell_run(active_targets, make_dir_command(first_target, tmpdir), description: 'mkdir tmpdir')
    results += emit_results(failures, **result_opts, &callback)
    active_targets -= failures.keys

    # Upload the script file
    if active_targets.any?
      failures = upload_file_content(active_targets, script_content, remote_path)
      results += emit_results(failures, **result_opts, &callback)
      active_targets -= failures.keys
    end

    # Make the script executable (no-op on Windows)
    chmod_cmd = make_executable_command(first_target, remote_path)
    if active_targets.any? && chmod_cmd
      failures = shell_run(active_targets, chmod_cmd, description: 'chmod script')
      results += emit_results(failures, **result_opts, &callback)
      active_targets -= failures.keys
    end

    # Execute the script asynchronously and poll for completion
    if active_targets.any?
      interpreter = select_interpreter(script, first_target.options['interpreters'])
      cmd_parts = []
      cmd_parts += Array(interpreter).map { |part| escape_arg(first_target, part) } if interpreter && options[:script_interpreter]
      cmd_parts << escape_arg(first_target, remote_path)
      cmd_parts += arguments.map { |arg| escape_arg(first_target, arg) }

      pending, start_failures = shell_start(active_targets, cmd_parts.join(' '))
      results += emit_results(start_failures, **result_opts, &callback)
      results += emit_results(wait_for_shell_results(pending, timeout), **result_opts, &callback)
    end
  ensure
    cleanup_tmpdir(shell_targets, tmpdir)
  end

  results
end
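The execution step of batch_script builds one command line from optional interpreter parts, the uploaded script path, and the escaped arguments. A POSIX-only sketch of that assembly, using Ruby's Shellwords in place of escape_arg (an assumption about how escape_arg behaves on POSIX targets); `script_command_sketch` is hypothetical. In the real method the interpreter parts are only included when options[:script_interpreter] is set.

```ruby
require 'shellwords'

# Hypothetical POSIX-only sketch of the command batch_script starts:
# [interpreter parts...] <remote script path> [arguments...], each escaped
# as a separate shell word and joined with single spaces.
def script_command_sketch(remote_path, arguments, interpreter: nil)
  parts = []
  parts += Array(interpreter).map { |part| Shellwords.shellescape(part) } if interpreter
  parts << Shellwords.shellescape(remote_path)
  parts += arguments.map { |arg| Shellwords.shellescape(arg.to_s) }
  parts.join(' ')
end
```

Escaping each part separately keeps an empty argument as `''` rather than dropping it, and keeps an argument containing spaces as one word.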
#batch_task(targets, task, arguments, _options = {}, position = [], &callback) ⇒ Array<Bolt::Result>
Override batch_task to handle multiple targets in one thread using batched RPC calls. Implementation grouping (mixed-platform support) is handled internally by run_task_via_bolt_tasks and run_task_via_shell.
# File 'lib/bolt/transport/choria.rb', line 99
def batch_task(targets, task, arguments, _options = {}, position = [], &callback)
  chosen_agent = targets.first.options['task-agent'] || 'bolt_tasks'
  result_opts = { action: 'task', name: task.name, position: position }

  # The results var here is the error results for incapable targets, to which we'll add in
  # the successful results from the capable targets as we go.
  capable, results = prepare_targets(targets, chosen_agent, result_opts, &callback)
  logger.debug { "Task #{task.name} routing: agent: #{chosen_agent}, #{capable.size} capable / #{targets.size - capable.size} incapable" }

  unless capable.empty?
    capable.each { |target| callback&.call(type: :node_start, target: target) }
    arguments = unwrap_sensitive_args(arguments)

    results += case chosen_agent
               when 'bolt_tasks'
                 run_task_via_bolt_tasks(capable, task, arguments, result_opts, &callback)
               when 'shell'
                 run_task_via_shell(capable, task, arguments, result_opts, &callback)
               else
                 raise Bolt::Error.new(
                   "Unsupported task-agent '#{chosen_agent}'",
                   'bolt/choria-unsupported-agent'
                 )
               end
  end

  results
end
#batch_task_with(targets, task, target_mapping, options = {}, position = [], &callback) ⇒ Array<Bolt::Result>
Override batch_task_with for per-target arguments. Only called from the run_task_with Puppet plan function (no CLI or Ruby API path uses this). Discovery is batched upfront, but execution is sequential per-target because MCollective RPC calls send the same arguments to all targets. A future optimization could batch the download/infra-setup/polling steps while keeping only the start step per-target.
THIS IS NOT YET READY FOR PRODUCTION. The API is stable, but we don’t yet have full plan support, and this runs the task sequentially across targets, which is very inefficient. It had to be implemented now, though, to keep assert_batch_size_one in the Base implementation from raising an error.
# File 'lib/bolt/transport/choria.rb', line 150
def batch_task_with(targets, task, target_mapping, options = {}, position = [], &callback)
  # Pre-warm the agent cache so individual batch_task calls are cache hits
  configure_client(targets.first)
  discover_agents(targets)

  results = []
  targets.each do |target|
    results += batch_task([target], task, target_mapping[target], options, position, &callback)
  end
  results
end
#batches(targets) ⇒ Array<Array<Bolt::Target>>
Group targets by collective so each batch uses a single RPC client scope. MCollective RPC calls are published to a collective-specific NATS subject, so targets in different collectives must be in separate batches. Most deployments have one collective, yielding one batch. Bolt runs each batch in its own thread and @rpc_mutex serializes the RPC calls across threads to prevent response misrouting.
# File 'lib/bolt/transport/choria.rb', line 81
def batches(targets)
  # Populates @default_collective from the Choria config so targets
  # without an explicit collective are grouped correctly.
  configure_client(targets.first)
  targets.group_by { |target| collective_for(target) }.values
end
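Grouping by collective is a plain group_by with a fallback to the default collective. A small sketch, using a stand-in `FakeTarget` struct (hypothetical) instead of Bolt::Target and passing the default collective explicitly rather than reading @default_collective:

```ruby
# Hypothetical stand-in for Bolt::Target: only the fields batching needs.
FakeTarget = Struct.new(:name, :collective)

# Targets with no explicit collective fall back to the default one,
# mirroring collective_for, so they share a batch with targets that
# name the default collective explicitly.
def batches_sketch(targets, default_collective)
  targets.group_by { |target| target.collective || default_collective }.values
end
```

This is why batches calls configure_client first: without a loaded default collective, targets lacking an explicit collective would be grouped under nil instead.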
#build_result(target, data, action:, name:, position:) ⇒ Bolt::Result
Build a Bolt::Result from an output hash. Handles both success and error cases based on the presence of the :error key.
# File 'lib/bolt/transport/choria/helpers.rb', line 79
def build_result(target, data, action:, name:, position:)
  if data[:error]
    Bolt::Result.from_exception(
      target, Bolt::Error.new(data[:error], data[:error_kind]),
      action: action, position: position
    )
  elsif action == 'task'
    Bolt::Result.for_task(target, data[:stdout], data[:stderr], data[:exitcode], name, position)
  elsif %w[command script].include?(action)
    Bolt::Result.for_command(
      target,
      { 'stdout' => data[:stdout], 'stderr' => data[:stderr], 'exit_code' => data[:exitcode] },
      action, name, position
    )
  else
    raise Bolt::Error.new(
      "Unknown action '#{action}' in build_result",
      'bolt/choria-unknown-action'
    )
  end
end
#build_task_command(target, remote_task_path, arguments, input_method, interpreter_options) ⇒ String
Build the full command string for task execution via the shell agent, handling interpreter selection, environment variable injection, and stdin piping.
# File 'lib/bolt/transport/choria/command_builders.rb', line 150
def build_task_command(target, remote_task_path, arguments, input_method, interpreter_options)
  interpreter = select_interpreter(remote_task_path, interpreter_options)
  cmd = interpreter ? "#{Array(interpreter).map { |part| escape_arg(target, part) }.join(' ')} #{escape_arg(target, remote_task_path)}" : escape_arg(target, remote_task_path)

  needs_env = Bolt::Task::ENVIRONMENT_METHODS.include?(input_method)
  needs_stdin = Bolt::Task::STDIN_METHODS.include?(input_method)

  if needs_env && needs_stdin && windows_target?(target)
    # On Windows, piping stdin into a multi-statement command
    # requires a script block. Pipeline data doesn't automatically
    # flow through a script block to inner commands, so we
    # explicitly forward $input via a pipe.
    env_params = envify_params(arguments)
    env_params.each_key { |key| validate_env_key!(key, 'task argument') }
    set_stmts = env_params.map { |key, val| "$env:#{key} = '#{ps_escape(val)}'" }
    cmd = stdin_pipe_command(target, arguments.to_json, "{ #{set_stmts.join('; ')}; $input | & #{cmd} }")
  else
    if needs_env
      cmd = prepend_env_vars(target, cmd, envify_params(arguments), 'task argument')
    end
    if needs_stdin
      cmd = stdin_pipe_command(target, arguments.to_json, cmd)
    end
  end

  cmd
end
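Depending on input_method, task arguments reach the task as PT_-prefixed environment variables, as JSON on stdin, or both. envify_params is only listed in the summary; the sketch below assumes it prefixes each key with PT_ and JSON-encodes any value that is not already a String. `envify_params_sketch` is hypothetical.

```ruby
require 'json'

# Hypothetical sketch of envify_params: environment variables can only hold
# strings, so structured values (numbers, arrays, hashes) are JSON-encoded.
def envify_params_sketch(params)
  params.each_with_object({}) do |(key, value), env|
    env["PT_#{key}"] = value.is_a?(String) ? value : value.to_json
  end
end
```

For the stdin path no per-key conversion is needed: build_task_command sends `arguments.to_json` as a single document.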
#choria_identity(target) ⇒ Object
Returns the Choria node identity for a target. Uses the transport ‘host’ config if set, falling back to target.host (which Bolt derives from the URI or target name).
# File 'lib/bolt/transport/choria.rb', line 200
def choria_identity(target)
  target.options['host'] || target.host
end
#cleanup_dir_command(target, path) ⇒ String
Build a recursive directory removal command.
# File 'lib/bolt/transport/choria/command_builders.rb', line 40
def cleanup_dir_command(target, path)
  windows_target?(target) ?
    "Remove-Item -Recurse -Force -Path '#{ps_escape(path)}'" :
    "rm -rf #{Shellwords.shellescape(path)}"
end
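cleanup_dir_command relies on ps_escape for Windows paths. A minimal sketch of both, assuming ps_escape doubles single quotes, which is how a literal quote is written inside a PowerShell single-quoted string. The sketch also doubles the typographic single quotes (U+2018 to U+201B) because PowerShell treats them as quote characters too; whether the real ps_escape does so is not stated here. `ps_escape_sketch` and `cleanup_dir_command_sketch` are hypothetical, and the platform is passed as a flag instead of calling windows_target?.

```ruby
require 'shellwords'

# Hypothetical sketch: inside a PowerShell '...' string, a doubled quote
# character stands for one literal quote; nothing else is special.
def ps_escape_sketch(str)
  str.to_s.gsub(/['\u2018\u2019\u201A\u201B]/) { |quote| quote * 2 }
end

def cleanup_dir_command_sketch(path, windows:)
  if windows
    "Remove-Item -Recurse -Force -Path '#{ps_escape_sketch(path)}'"
  else
    "rm -rf #{Shellwords.shellescape(path)}"
  end
end
```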
#cleanup_tmpdir(targets, tmpdir) ⇒ Object
Clean up a remote tmpdir on targets, logging per-target failures. Used in ensure blocks after batch_script and batch_task_shell.
# File 'lib/bolt/transport/choria/shell.rb', line 127
def cleanup_tmpdir(targets, tmpdir)
  return unless targets.first.options.fetch('cleanup', true)

  unless File.basename(tmpdir).start_with?('bolt-choria-')
    logger.warn { "Refusing to delete unexpected tmpdir path: #{tmpdir}" }
    return
  end

  begin
    failures = shell_run(targets, cleanup_dir_command(targets.first, tmpdir), description: 'cleanup tmpdir')
    failures.each do |target, failure|
      logger.warn { "Cleanup failed on #{target.safe_name}. Task data may remain in #{tmpdir}. #{failure[:error]}" }
    end
  rescue StandardError => e
    logger.warn { "Cleanup of #{tmpdir} failed on all targets: #{e.message}" }
  end
end
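The basename check in cleanup_tmpdir only works if generate_tmpdir_path always produces a directory name starting with `bolt-choria-`. A POSIX-only sketch of that pairing, assuming a `/tmp` base and a UUID suffix; both are hypothetical, since the real path format is not shown here, as are `generate_tmpdir_path_sketch` and `tmpdir_deletable_sketch?`.

```ruby
require 'securerandom'

# Hypothetical: base directory and suffix format are assumptions.
TMPDIR_PREFIX = 'bolt-choria-'

def generate_tmpdir_path_sketch(base = '/tmp')
  File.join(base, "#{TMPDIR_PREFIX}#{SecureRandom.uuid}")
end

# Mirrors the guard in cleanup_tmpdir: only the final path component
# is checked, so anything not created by the generator is refused.
def tmpdir_deletable_sketch?(tmpdir)
  File.basename(tmpdir).start_with?(TMPDIR_PREFIX)
end
```

Because the guard inspects only the last component, a path such as `/tmp/bolt-choria-x/..` is refused (its basename is `..`), as are `/` and `/etc`.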
#collective_for(target) ⇒ Object
Returns the collective for a target, used by batches() to group targets. Falls back to the default collective from the loaded config.
# File 'lib/bolt/transport/choria.rb', line 206
def collective_for(target)
  target.options['collective'] || @default_collective
end
#configure_client(target) ⇒ Object
One-time setup of the local MCollective client connection to the NATS broker. MCollective::Config.loadconfig must only be called once since it loads plugins via PluginManager.loadclass, and a second call raises “Plugin already loaded”.
The @client_configured flag is checked twice: once before taking the mutex (fast path to avoid lock overhead on every call after setup) and once inside (handles the race where two batch threads both see false simultaneously and try to configure concurrently).
This function is idempotent, so it should be called before any operation that needs the client connection to ensure it is configured correctly.
# File 'lib/bolt/transport/choria/client.rb', line 26
def configure_client(target)
  return if @client_configured

  @config_mutex.synchronize do
    return if @client_configured

    # If a previous attempt failed after partially initializing
    # MCollective (e.g., plugins loaded but NATS connector failed),
    # retrying loadconfig would hit "Plugin already loaded" errors.
    # Re-raise the original error so the caller gets a clear message.
    raise @config_error if @config_error

    # We do the require here because this is a pretty meaty library, and
    # no need to load it when OpenBolt starts up if the user isn't using
    # the Choria transport.
    require 'mcollective'

    opts = target.options
    config = MCollective::Config.instance

    unless config.configured
      config_file = opts['config-file'] || MCollective::Util.config_file_for_user

      unless File.readable?(config_file)
        msg = if opts['config-file']
                "Choria config file not found or not readable: #{config_file}"
              else
                "Could not find a readable Choria client config file. " \
                  "Searched: #{MCollective::Util.config_paths_for_user.join(', ')}. " \
                  "Set the 'config-file' option in the Choria transport configuration."
              end
        raise Bolt::Error.new(msg, 'bolt/choria-config-not-found')
      end

      begin
        config.loadconfig(config_file)
      rescue StandardError => e
        @config_error = Bolt::Error.new(
          "Choria client configuration failed: #{e.class}: #{e.message}",
          'bolt/choria-config-failed'
        )
        raise @config_error
      end
      logger.debug { "Loaded Choria client config from #{config_file}" }
    end

    if opts['mcollective-certname']
      ENV['MCOLLECTIVE_CERTNAME'] = opts['mcollective-certname']
      logger.debug { "MCOLLECTIVE_CERTNAME set to #{opts['mcollective-certname']}" }
    end

    if opts['brokers']
      brokers = Array(opts['brokers']).map { |broker| broker.include?(':') ? broker : "#{broker}:4222" }
      config.pluginconf['choria.middleware_hosts'] = brokers.join(',')
      logger.debug { "Choria brokers overridden: #{brokers.join(', ')}" }
    end

    if opts['ssl-ca'] && opts['ssl-cert'] && opts['ssl-key']
      unreadable = %w[ssl-ca ssl-cert ssl-key].find { |key| !File.readable?(opts[key]) }
      if unreadable
        raise Bolt::Error.new(
          "File for #{unreadable} is not readable: #{opts[unreadable]}",
          'bolt/choria-config-failed'
        )
      end
      config.pluginconf['security.provider'] = 'file'
      config.pluginconf['security.file.ca'] = opts['ssl-ca']
      config.pluginconf['security.file.certificate'] = opts['ssl-cert']
      config.pluginconf['security.file.key'] = opts['ssl-key']
      logger.debug { "Using file-based TLS security provider with given SSL override(s)" }
    end

    @default_collective = config.main_collective
    @client_configured = true
  end
end
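The double-checked flag described above can be isolated from MCollective. A self-contained sketch, where `OnceConfigurator` is hypothetical and the setup block stands in for loadconfig and the pluginconf overrides. As a simplification it memoizes every setup error, whereas configure_client only memoizes loadconfig failures (a missing config file can be fixed and retried).

```ruby
# Hypothetical sketch of once-only setup with a lock-free fast path.
class OnceConfigurator
  attr_reader :setup_calls

  def initialize(&setup)
    @setup = setup
    @mutex = Mutex.new
    @configured = false
    @error = nil
    @setup_calls = 0
  end

  def configure
    return if @configured # fast path: no lock once setup has succeeded

    @mutex.synchronize do
      return if @configured # another thread finished setup while we waited
      raise @error if @error # never retry a setup that already failed

      begin
        @setup_calls += 1
        @setup.call
      rescue StandardError => e
        @error = e
        raise
      end
      @configured = true
    end
  end
end
```

The inner check is what prevents a second setup when several batch threads pass the outer check at the same time.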
#create_rpc_client(agent_name, targets, timeout) ⇒ MCollective::RPC::Client
Create an MCollective::RPC::Client for one or more targets. Accepts a single target or an array. Uses MCollective’s direct addressing mode (client.discover(nodes:)) to skip broadcast discovery and send requests directly to the specified nodes.
Note that when the client is created, if the shell agent isn’t already installed on the OpenBolt controller node, then the shell DDL that we bundle with OpenBolt at lib/mcollective/agent/shell.ddl automatically gets loaded since it’s on the $LOAD_PATH and in the right place for MCollective’s plugin loading. The bolt_tasks DDL is already included in the choria-mcorpc-support gem.
# File 'lib/bolt/transport/choria/client.rb', line 119
def create_rpc_client(agent_name, targets, timeout)
  targets = Array(targets)
  options = MCollective::Util.default_options
  options[:timeout] = timeout
  options[:verbose] = false
  options[:connection_timeout] = targets.first.options['broker-timeout']
  collective = collective_for(targets.first)
  options[:collective] = collective if collective

  client = MCollective::RPC::Client.new(agent_name, options: options)
  client.progress = false

  identities = targets.map { |target| choria_identity(target) }.uniq
  client.discover(nodes: identities)
  client
end
#discover_agent_list(targets) ⇒ Object
Discover available agents on targets via rpcutil.agent_inventory and populate @agent_cache with agent lists.
# File 'lib/bolt/transport/choria/agent_discovery.rb', line 43
def discover_agent_list(targets)
  response = rpc_request('rpcutil', targets, 'rpcutil.agent_inventory') do |client|
    client.agent_inventory
  end

  response[:errors].each { |target, output| logger.debug { "agent_inventory failed for #{target.safe_name}: #{output[:error]}" } }

  response[:responded].each do |target, data|
    sender = choria_identity(target)
    agents = filter_agents(sender, data[:agents])
    unless agents
      logger.warn { "Unexpected agent_inventory response from #{sender}. This target will be treated as unreachable." }
      next
    end

    @agent_cache[sender] = { agents: agents }
    logger.debug { "Discovered agents on #{sender}: #{agents.join(', ')}" }
  end
rescue StandardError => e
  raise if e.is_a?(Bolt::Error)

  logger.warn { "Agent discovery failed: #{e.class}: #{e.message}" }
end
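filter_agents checks each installed agent's version against AGENT_MIN_VERSIONS. A sketch using Gem::Version, which compares segment by segment numerically, so 1.10.0 sorts after 1.2.1 (a plain string comparison would get this wrong). The inventory shape, an Array of hashes with :agent and :version keys, is an assumption about what rpcutil.agent_inventory returns; `filter_agents_sketch` and `meets_min_version_sketch?` are hypothetical, and the real filter_agents also takes the sender for logging.

```ruby
require 'rubygems' # provides Gem::Version (loaded by default in most Rubies)

SHELL_MIN_VERSION = '1.2.1'
AGENT_MIN_VERSIONS = { 'shell' => SHELL_MIN_VERSION }.freeze

def meets_min_version_sketch?(version, min_version)
  Gem::Version.new(version.to_s) >= Gem::Version.new(min_version)
rescue ArgumentError
  false # an unparseable version string is treated as too old
end

# Returns the names of agents that pass their minimum-version check (agents
# without an entry in AGENT_MIN_VERSIONS always pass), or nil when the
# inventory is not an Array, signalling an unexpected response.
def filter_agents_sketch(inventory)
  return nil unless inventory.is_a?(Array)

  inventory.filter_map do |entry|
    name = entry[:agent]
    min = AGENT_MIN_VERSIONS[name]
    name if min.nil? || meets_min_version_sketch?(entry[:version], min)
  end
end
```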
#discover_agents(targets) ⇒ Object
Discover agents and detect OS on targets via two batched RPC calls (agent_inventory for agents+versions, get_fact for os.family). Populates @agent_cache with { agents: […], os: ‘redhat’ | ‘windows’ | … }.
# File 'lib/bolt/transport/choria/agent_discovery.rb', line 17
def discover_agents(targets)
  uncached = targets.reject { |target| @agent_cache.key?(choria_identity(target)) }
  return if uncached.empty?

  logger.debug { "Discovering agents on #{target_count(uncached)}" }

  discover_agent_list(uncached)
  discover_os_family(uncached)

  uncached.each do |target|
    identity = choria_identity(target)
    logger.warn { "No response from #{identity} during agent discovery" } unless @agent_cache.key?(identity)
  end
end
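discover_agents only queries identities that are missing from the cache, and an identity that never answers is never cached, so the next call queries it again. A sketch of that cache-aware flow, with a plain Hash standing in for the Concurrent::Map and a lambda standing in for the agent_inventory round (both hypothetical, as is `discover_agents_sketch`); the OS detection round is omitted.

```ruby
# Hypothetical sketch: inventory_call takes the uncached identities and
# returns { identity => [agent names] } for those that responded.
def discover_agents_sketch(identities, cache, inventory_call)
  uncached = identities.reject { |id| cache.key?(id) }
  return [] if uncached.empty? # everything cached: no RPC round at all

  inventory_call.call(uncached).each { |id, agents| cache[id] = { agents: agents } }
  uncached.reject { |id| cache.key?(id) } # identities that never answered
end
```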
#discover_os_family(targets) ⇒ Object
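Detect the OS family on targets via rpcutil.get_fact and update the :os entry in @agent_cache for each target that responds. Targets whose OS cannot be determined default to POSIX command syntax.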
# File 'lib/bolt/transport/choria/agent_discovery.rb', line 69
def discover_os_family(targets)
  # Only fetch OS for targets that responded to agent_inventory
  responded = targets.select { |target| @agent_cache.key?(choria_identity(target)) }
  return if responded.empty?

  response = rpc_request('rpcutil', responded, 'rpcutil.get_fact') do |client|
    client.get_fact(fact: 'os.family')
  end

  response[:errors].each { |target, output| logger.warn { "OS detection failed for #{target.safe_name}: #{output[:error]}. Defaulting to POSIX command syntax." } }

  response[:responded].each do |target, data|
    sender = choria_identity(target)
    os_family = data[:value].to_s.downcase
    if os_family.empty?
      logger.warn { "os.family fact is empty on #{sender}. Defaulting to POSIX command syntax." }
      next
    end

    @agent_cache[sender][:os] = os_family
    logger.debug { "Detected OS on #{sender}: #{os_family}" }
  end
rescue StandardError => e
  raise if e.is_a?(Bolt::Error)

  logger.warn { "OS detection failed: #{e.class}: #{e.message}. Defaulting to POSIX command syntax." }
end
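Platform-specific command syntax presumably keys off the cached :os value, with a missing or empty os.family falling back to POSIX as the warnings above state. A sketch of how windows_target? and join_path could read that cache, assuming an exact `'windows'` match selects Windows syntax and backslashes join Windows paths; `windows_target_sketch?` and `join_path_sketch` are hypothetical stand-ins keyed by identity rather than by Bolt::Target.

```ruby
# Hypothetical: only an exact 'windows' os.family selects Windows syntax.
# Hash#dig returns nil for unknown identities or a missing :os key, so
# those cases fall through to POSIX.
def windows_target_sketch?(cache, identity)
  cache.dig(identity, :os) == 'windows'
end

def join_path_sketch(cache, identity, *parts)
  windows_target_sketch?(cache, identity) ? parts.join('\\') : File.join(*parts)
end
```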
#download(_target, _source, _destination, _options = {}, _position = []) ⇒ Object
# File 'lib/bolt/transport/choria.rb', line 190
def download(_target, _source, _destination, _options = {}, _position = [])
  raise Bolt::Error.new(
    'The Choria transport does not yet support download.',
    'bolt/choria-unsupported-operation'
  )
end
#download_and_start_task(targets, task, implementation, arguments, result_opts, &callback) ⇒ Hash
Download task files from the server and start execution for one implementation group via bolt_tasks.download and bolt_tasks.run_no_wait.
# File 'lib/bolt/transport/choria/bolt_tasks.rb', line 55 def download_and_start_task(targets, task, implementation, arguments, result_opts, &callback) environment = targets.first.['puppet-environment'] input_method = implementation['input_method'] impl_files = [{ 'name' => File.basename(implementation['name']), 'path' => implementation['path'] }] + (implementation['files'] || []) file_specs_json = impl_files.map { |file| task_file_spec(file, task.module_name, environment) }.to_json # The failed_results reference will get updated and if we ever end up without # any targets left to act on, we can return it immediately. failed_results = [] none_started_result = { failed_results: failed_results, targets: [], task_id: nil } # Download task files logger.debug { "Downloading task #{task.name} files via bolt_tasks to #{target_count(targets)}" } response = rpc_request('bolt_tasks', targets, 'bolt_tasks.download') do |client| client.download(task: task.name, files: file_specs_json, environment: environment) end # The bolt_tasks agent uses reply.fail! with statuscode 1 for download # failures, which rpc_request routes to :responded since statuscode 0-1 # means the action completed. Check rpc_statuscodes to catch these and # report the download failure clearly instead of letting run_no_wait # fail with a confusing "task not available" error. dl_errors = response[:errors] response[:rpc_statuscodes].each do |target, code| next if code.zero? || dl_errors.key?(target) dl_errors[target] = error_output( "bolt_tasks.download on #{target.safe_name} failed to download task files", 'bolt/choria-download-failed' ) end # Must use concat rather than += to preserve reference to failed_results for early return failed_results.concat(emit_results(dl_errors, **result_opts, &callback)) remaining = response[:responded].keys - dl_errors.keys return none_started_result if remaining.empty? 
# Start task execution logger.debug { "Starting task #{task.name} on #{target_count(remaining)}" } response = rpc_request('bolt_tasks', remaining, 'bolt_tasks.run_no_wait') do |client| client.run_no_wait(task: task.name, input_method: input_method, files: file_specs_json, input: arguments.to_json) end failed_results.concat(emit_results(response[:errors], **result_opts, &callback)) return none_started_result if response[:responded].empty? # Extract the shared task_id (all targets get the same one from # the single run_no_wait call that fanned out to all of them) task_id = response[:responded].values.first&.dig(:task_id) unless task_id no_id_errors = response[:responded].each_with_object({}) do |(target, _), errors| errors[target] = error_output( "bolt_tasks.run_no_wait on #{target.safe_name} succeeded but returned no task_id", 'bolt/choria-missing-task-id' ) end failed_results.concat(emit_results(no_id_errors, **result_opts, &callback)) return none_started_result end logger.debug { "Started task #{task.name} on #{target_count(response[:responded])}, task_id: #{task_id}" } { failed_results: failed_results, targets: response[:responded].keys, task_id: task_id } end |
#emit_results(target_outputs, action:, name:, position:, fire_node_start: false, &callback) ⇒ Array<Bolt::Result>
Convert a hash of { target => output } into Results, fire callbacks, and return the Results array. When fire_node_start is true, fires a :node_start callback before each :node_result.
```ruby
# File 'lib/bolt/transport/choria/helpers.rb', line 113

def emit_results(target_outputs, action:, name:, position:, fire_node_start: false, &callback)
  target_outputs.map do |target, data|
    callback&.call(type: :node_start, target: target) if fire_node_start
    result = build_result(target, data, action: action, name: name, position: position)
    callback&.call(type: :node_result, result: result)
    result
  end
end
```
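The callback ordering in emit_results can be checked in isolation. The sketch below is a simplified, standalone version: SketchResult is a hypothetical stand-in for the Bolt::Result that build_result creates, and the name: and position: keywords are dropped. It shows that with fire_node_start: true each target produces a :node_start event immediately followed by its :node_result event.

```ruby
# Hypothetical stand-in for Bolt::Result; it only records what build_result would receive.
SketchResult = Struct.new(:target, :data, :action, keyword_init: true)

# Simplified sketch of emit_results (no name:/position:, no real build_result).
def emit_results(target_outputs, action:, fire_node_start: false, &callback)
  target_outputs.map do |target, data|
    callback&.call(type: :node_start, target: target) if fire_node_start
    result = SketchResult.new(target: target, data: data, action: action)
    callback&.call(type: :node_result, result: result)
    result
  end
end

events = []
results = emit_results({ 'web1' => { exitcode: 0 }, 'web2' => { exitcode: 1 } },
                       action: 'command', fire_node_start: true) { |event| events << event[:type] }
# events is [:node_start, :node_result, :node_start, :node_result]
```

Because the callback is invoked with the safe-navigation operator, calling the method without a block simply returns the results.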
#envify_params(params) ⇒ Hash{String => String}
Convert task arguments to a hash of PT_-prefixed environment variables. Duplicated from Bolt::Shell#envify_params. We don't use the Bolt::Shell classes because they interleave command building with connection-based execution (IO pipes, sudo prompts). The Choria transport only needs to build the command string and send it over RPC; the shell agent on each target then executes it.
```ruby
# File 'lib/bolt/transport/choria/command_builders.rb', line 191

def envify_params(params)
  params.each_with_object({}) do |(key, val), env|
    val = val.to_json unless val.is_a?(String)
    env["PT_#{key}"] = val
  end
end
```
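To make the conversion rule concrete, here is a standalone copy of envify_params with a sample call. It shows that String values pass through unchanged while every other value (numbers, arrays, booleans, nil) is JSON-encoded, so the task always receives strings in its environment.

```ruby
require 'json'

# Standalone copy of envify_params: each parameter becomes PT_<name>;
# non-String values are JSON-encoded.
def envify_params(params)
  params.each_with_object({}) do |(key, val), env|
    val = val.to_json unless val.is_a?(String)
    env["PT_#{key}"] = val
  end
end

env = envify_params('message' => 'hello', 'count' => 3, 'tags' => %w[a b], 'debug' => true)
# env == { "PT_message" => "hello", "PT_count" => "3",
#          "PT_tags" => "[\"a\",\"b\"]", "PT_debug" => "true" }
```

One consequence of this rule is that a nil parameter arrives as the literal string "null" rather than as an unset variable.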
#error_output(message, kind, stdout: nil, stderr: nil, exitcode: 1) ⇒ Object
Build an error output hash. When actual output is available (e.g. a command ran but failed), pass it through so the user sees it.
```ruby
# File 'lib/bolt/transport/choria/helpers.rb', line 129

def error_output(message, kind, stdout: nil, stderr: nil, exitcode: 1)
  output(stdout: stdout, stderr: stderr, exitcode: exitcode)
    .merge(error: message, error_kind: kind)
end
```
#escape_arg(target, str) ⇒ String
Escape a string for use as a shell argument on the target platform.
```ruby
# File 'lib/bolt/transport/choria/command_builders.rb', line 105

def escape_arg(target, str)
  windows_target?(target) ? "'#{ps_escape(str)}'" : Shellwords.shellescape(str)
end
```
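The two escaping strategies used by escape_arg differ noticeably. The sketch below reproduces escape_arg and ps_escape with one assumption: a boolean windows flag stands in for windows_target?(target). POSIX arguments are backslash-escaped by Shellwords, while PowerShell arguments are wrapped in single quotes with embedded quotes doubled.

```ruby
require 'shellwords'

# Sketch only: `windows` replaces the windows_target?(target) check.
def ps_escape(str)
  str.gsub("'", "''") # PowerShell: a literal ' inside '...' is written as ''
end

def escape_arg(windows, str)
  windows ? "'#{ps_escape(str)}'" : Shellwords.shellescape(str)
end

posix = escape_arg(false, "it's a test") # backslash-escapes the quote and both spaces
pwsh  = escape_arg(true, "it's a test")  # "'it''s a test'"
```

A quick way to confirm the POSIX form is safe is to split it again with Shellwords.split, which should yield the original string as a single word.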
#exitcode_from(data, target, context) ⇒ Integer
Extract exit code from RPC response data, defaulting to 1 with a warning if the agent returned nil.
```ruby
# File 'lib/bolt/transport/choria/helpers.rb', line 141

def exitcode_from(data, target, context)
  exitcode = data[:exitcode] || data['exitcode']
  if exitcode.nil?
    logger.warn {
      "Agent on #{target.safe_name} returned no exit code for #{context}. " \
        "Defaulting to exit code 1. This usually indicates an agent-level error."
    }
    exitcode = 1
  end
  exitcode
end
```
#extract_task_output(data, target) ⇒ Hash
Extract stdout, stderr, and exitcode from a bolt_tasks task_status response.
```ruby
# File 'lib/bolt/transport/choria/bolt_tasks.rb', line 170

def extract_task_output(data, target)
  exitcode = exitcode_from(data, target, 'task')
  output(stdout: unwrap_bolt_tasks_stdout(data[:stdout]), stderr: data[:stderr], exitcode: exitcode)
end
```
#filter_agents(sender, agent_list) ⇒ Array<String>?
Filter out agents that don’t meet minimum version requirements.
```ruby
# File 'lib/bolt/transport/choria/agent_discovery.rb', line 106

def filter_agents(sender, agent_list)
  return nil unless agent_list.is_a?(Array)

  agent_list.filter_map do |entry|
    name = entry['agent']
    next unless name

    version = entry['version']
    min_version = AGENT_MIN_VERSIONS[name]
    if min_version && !meets_min_version?(version, min_version)
      logger.warn {
        "The '#{name}' agent on #{sender} is version #{version || 'unknown'}, " \
          "but #{min_version} or later is required. It will be treated as unavailable."
      }
      next
    end
    name
  end
end
```
#generate_tmpdir_path(target) ⇒ String
Generate a unique remote tmpdir path for batch operations.
```ruby
# File 'lib/bolt/transport/choria/shell.rb', line 116

def generate_tmpdir_path(target)
  base = target.options['tmpdir']
  base = 'C:\Windows\Temp' if base == '/tmp' && windows_target?(target)
  join_path(target, base, "bolt-choria-#{SecureRandom.uuid}")
end
```
#has_agent?(target, agent_name) ⇒ Boolean
```ruby
# File 'lib/bolt/transport/choria/agent_discovery.rb', line 31

def has_agent?(target, agent_name)
  @agent_cache[choria_identity(target)]&.dig(:agents)&.include?(agent_name) || false
end
```
#index_results_by_sender(results, targets, context) ⇒ Hash{String => Hash}
Index RPC results by sender, keeping only the first response per sender and only from the set of expected identities. Logs and discards responses from unexpected senders and duplicates.
```ruby
# File 'lib/bolt/transport/choria/client.rb', line 254

def index_results_by_sender(results, targets, context)
  expected = targets.to_set { |target| choria_identity(target) }
  by_sender = {}

  results.each do |result|
    sender = result[:sender]
    unless sender
      logger.warn { "Discarding #{context} response with nil sender" }
      next
    end

    unless expected.include?(sender)
      logger.warn { "Discarding #{context} response from unexpected sender '#{sender}'" }
      next
    end

    if by_sender.key?(sender)
      if result[:data] == by_sender[sender][:data]
        logger.debug { "Ignoring duplicate #{context} response from #{sender}" }
      else
        logger.warn { "Ignoring duplicate #{context} response from #{sender} with different data" }
      end
      next
    end

    by_sender[sender] = result
  end

  by_sender
end
```
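The filtering rules of index_results_by_sender can be illustrated with a reduced sketch. It assumes targets are plain identity strings (the real method maps Bolt::Target objects through choria_identity) and omits logging. Responses with no sender, responses from outside the target set, and any later duplicate are discarded, so the first reply per sender wins.

```ruby
require 'set'

# Reduced sketch: identities are plain strings and nothing is logged.
def index_results_by_sender(results, expected_identities)
  expected = expected_identities.to_set
  results.each_with_object({}) do |result, by_sender|
    sender = result[:sender]
    next if sender.nil?                   # no sender: discard
    next unless expected.include?(sender) # unexpected sender: discard
    next if by_sender.key?(sender)        # duplicate: the first response wins

    by_sender[sender] = result
  end
end

results = [
  { sender: 'node1', data: { v: 1 } },
  { sender: 'node1', data: { v: 2 } }, # duplicate, ignored
  { sender: 'rogue', data: { v: 3 } }, # not in the target set, ignored
  { sender: nil, data: {} },           # no sender, ignored
  { sender: 'node2', data: { v: 4 } }
]
indexed = index_results_by_sender(results, %w[node1 node2])
# indexed.keys == ["node1", "node2"]; indexed["node1"][:data] == { v: 1 }
```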
#join_path(target, *parts) ⇒ String
Join path segments using the target platform’s separator. Normalizes embedded forward slashes to backslashes on Windows.
```ruby
# File 'lib/bolt/transport/choria/command_builders.rb', line 115

def join_path(target, *parts)
  sep = windows_target?(target) ? '\\' : '/'
  parts = parts.map { |part| part.tr('/', sep) } if sep != '/'
  parts.join(sep)
end
```
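join_path works together with generate_tmpdir_path to build per-run directories. The sketch below combines the two, with two assumptions: a boolean windows flag replaces windows_target?(target), and the configured tmpdir is passed as an argument (defaulting to /tmp) instead of being read from the target's options. On Windows the default /tmp is swapped for C:\Windows\Temp, and any forward slashes in later segments become backslashes.

```ruby
require 'securerandom'

# Sketch only: `windows` replaces windows_target?(target); `tmpdir` replaces the target option.
def join_path(windows, *parts)
  sep = windows ? '\\' : '/'
  parts = parts.map { |part| part.tr('/', sep) } if sep != '/'
  parts.join(sep)
end

def generate_tmpdir_path(windows, tmpdir = '/tmp')
  base = tmpdir
  base = 'C:\Windows\Temp' if base == '/tmp' && windows
  join_path(windows, base, "bolt-choria-#{SecureRandom.uuid}")
end

join_path(false, '/tmp', 'mymod/tasks')           # => "/tmp/mymod/tasks"
join_path(true, 'C:\Windows\Temp', 'mymod/tasks') # => "C:\\Windows\\Temp\\mymod\\tasks"
```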
#kill_timed_out_processes(targets) ⇒ Object
Kill processes on timed-out targets. Sequential because each target has a unique handle, requiring a separate shell.kill RPC call per target. A future batched kill action (like shell.statuses) would eliminate this.
```ruby
# File 'lib/bolt/transport/choria/shell.rb', line 548

def kill_timed_out_processes(targets)
  logger.debug { "Killing timed-out processes on #{target_count(targets)}" }
  targets.each do |target, state|
    rpc_request('shell', target, 'shell.kill') do |client|
      client.kill(handle: state[:handle])
    end
  rescue StandardError => e
    logger.warn { "Failed to kill process on #{target.safe_name}: #{e.message}" }
  end
end
```
#make_dir_command(target, *paths) ⇒ String
Build a mkdir command for one or more directories.
```ruby
# File 'lib/bolt/transport/choria/command_builders.rb', line 16

def make_dir_command(target, *paths)
  if windows_target?(target)
    escaped = paths.map { |path| "'#{ps_escape(path)}'" }.join(', ')
    "New-Item -ItemType Directory -Force -Path #{escaped}"
  else
    escaped = paths.map { |path| Shellwords.shellescape(path) }.join(' ')
    "mkdir -m 700 -p #{escaped}"
  end
end
```
#make_executable_command(target, path) ⇒ String?
Build a chmod +x command. Returns nil on Windows (not needed).
```ruby
# File 'lib/bolt/transport/choria/command_builders.rb', line 31

def make_executable_command(target, path)
  windows_target?(target) ? nil : "chmod u+x #{Shellwords.shellescape(path)}"
end
```
#meets_min_version?(version, min_version) ⇒ Boolean
```ruby
# File 'lib/bolt/transport/choria/agent_discovery.rb', line 127

def meets_min_version?(version, min_version)
  return false unless version

  Gem::Version.new(version) >= Gem::Version.new(min_version)
rescue ArgumentError => e
  logger.warn { "Could not parse version '#{version}': #{e.message}. Treating agent as unavailable." }
  false
end
```
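The version gate used by filter_agents relies on Gem::Version, which compares numerically by segment (so 1.10.0 is newer than 1.2.1), and which raises ArgumentError for malformed strings. The sketch below pairs both methods with logging removed. The AGENT_MIN_VERSIONS table here is hypothetical: the overview only states that the shell agent needs version 1.2.1 or later, and the real constant may contain other entries.

```ruby
# Hypothetical table: only the shell agent's 1.2.1 minimum comes from the overview.
AGENT_MIN_VERSIONS = { 'shell' => '1.2.1' }.freeze

def meets_min_version?(version, min_version)
  return false unless version

  Gem::Version.new(version) >= Gem::Version.new(min_version)
rescue ArgumentError
  false # unparseable version string: treat the agent as unavailable
end

# Sketch of filter_agents without the sender argument and logging.
def filter_agents(agent_list)
  return nil unless agent_list.is_a?(Array)

  agent_list.filter_map do |entry|
    name = entry['agent']
    next unless name

    min_version = AGENT_MIN_VERSIONS[name]
    next if min_version && !meets_min_version?(entry['version'], min_version)

    name
  end
end

filter_agents([{ 'agent' => 'shell', 'version' => '1.2.0' },
               { 'agent' => 'bolt_tasks', 'version' => '0.5.0' },
               { 'agent' => 'rpcutil' }])
# => ["bolt_tasks", "rpcutil"]
```

Agents without an entry in the table pass through regardless of version, which is why rpcutil survives even without a version field.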
#output(stdout: nil, stderr: nil, exitcode: nil) ⇒ Object
Build an output hash from command/task output.
```ruby
# File 'lib/bolt/transport/choria/helpers.rb', line 123

def output(stdout: nil, stderr: nil, exitcode: nil)
  { stdout: stdout || '', stderr: stderr || '', exitcode: exitcode || 0 }
end
```
#poll_task_status(targets, task_id, task) ⇒ Hash{Bolt::Target => Hash}
Poll bolt_tasks.task_status until all targets complete or timeout.
```ruby
# File 'lib/bolt/transport/choria/bolt_tasks.rb', line 124

def poll_task_status(targets, task_id, task)
  timeout = targets.first.options['task-timeout']

  poll_result = poll_with_retries(targets, timeout, 'bolt_tasks.task_status') do |remaining|
    response = rpc_request('bolt_tasks', remaining, 'bolt_tasks.task_status') do |client|
      client.task_status(task_id: task_id)
    end
    next { rpc_failed: true, done: {} } if response[:rpc_failed]

    done = response[:errors].dup
    response[:responded].each do |target, data|
      if data.nil?
        done[target] = error_output(
          "bolt_tasks.task_status on #{target.safe_name} returned success but no data",
          'bolt/choria-missing-data'
        )
        next
      end
      next unless data[:completed]

      done[target] = extract_task_output(data, target)
    end

    { rpc_failed: false, done: done }
  end

  remaining_errors = poll_result[:remaining].each_with_object({}) do |target, errors|
    errors[target] = if poll_result[:rpc_persistent_failure]
                       error_output("RPC requests to poll task status on #{target.safe_name} failed persistently",
                                    'bolt/choria-poll-failed')
                     else
                       error_output("Task #{task.name} timed out after #{timeout} seconds on #{target.safe_name}",
                                    'bolt/choria-task-timeout')
                     end
  end

  poll_result[:completed].merge(remaining_errors)
end
```
#poll_with_retries(targets, timeout, context) ⇒ Hash
Shared polling loop for bolt_tasks and shell polling. Handles sleep timing, round counting, RPC failure retry, and deadline enforcement.
The block receives the remaining targets each round and returns:
{ done: {target => output_hash}, rpc_failed: bool }
```ruby
# File 'lib/bolt/transport/choria/helpers.rb', line 34

def poll_with_retries(targets, timeout, context)
  deadline = Process.clock_gettime(Process::CLOCK_MONOTONIC) + timeout
  remaining = targets.dup
  completed = {}
  poll_failures = 0
  poll_round = 0

  until remaining.empty?
    sleep(POLL_INTERVAL_SECONDS)
    poll_round += 1
    logger.debug { "Poll round #{poll_round}: #{target_count(remaining)} still pending" }

    round = yield(remaining)

    if round[:rpc_failed]
      poll_failures += 1
      logger.warn { "#{context} poll failed (attempt #{poll_failures}/#{RPC_FAILURE_RETRIES})" }
      break if poll_failures >= RPC_FAILURE_RETRIES

      next
    end

    poll_failures = 0
    round[:done].each do |target, output|
      completed[target] = output
      remaining.delete(target)
    end

    break if Process.clock_gettime(Process::CLOCK_MONOTONIC) >= deadline
  end

  { completed: completed, remaining: remaining, rpc_persistent_failure: poll_failures >= RPC_FAILURE_RETRIES }
end
```
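The control flow of poll_with_retries (consecutive-failure counting, reset on success, deadline check after each successful round) is easier to see with a simulated agent. This standalone sketch makes two assumptions for illustration: the poll interval and retry limit are keyword arguments instead of the POLL_INTERVAL_SECONDS and RPC_FAILURE_RETRIES constants, and logging is omitted.

```ruby
# Standalone sketch of poll_with_retries; interval and max_failures are assumed parameters.
def poll_with_retries(targets, timeout, interval: 1, max_failures: 3)
  deadline = Process.clock_gettime(Process::CLOCK_MONOTONIC) + timeout
  remaining = targets.dup
  completed = {}
  failures = 0

  until remaining.empty?
    sleep(interval)
    round = yield(remaining)

    if round[:rpc_failed]
      failures += 1
      break if failures >= max_failures

      next # retry next round; the deadline is not checked on failed rounds
    end

    failures = 0 # only consecutive failures count toward the limit
    round[:done].each do |target, output|
      completed[target] = output
      remaining.delete(target)
    end
    break if Process.clock_gettime(Process::CLOCK_MONOTONIC) >= deadline
  end

  { completed: completed, remaining: remaining, rpc_persistent_failure: failures >= max_failures }
end

# Simulated agent: each round reports exactly one more target as finished.
rounds = 0
result = poll_with_retries(%w[node1 node2], 10, interval: 0) do |pending|
  rounds += 1
  { rpc_failed: false, done: { pending.first => { exitcode: 0 } } }
end
# result[:completed].keys == ["node1", "node2"], result[:remaining] == [], rounds == 2
```

Since failed rounds skip the deadline check, a persistently failing broker is bounded by the retry limit rather than by the timeout.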
#powershell_cmd(script) ⇒ String
Wrap a PowerShell script for execution via shell agent. Uses -EncodedCommand with Base64-encoded UTF-16LE (the encoding Microsoft requires for -EncodedCommand) to avoid all quoting issues with cmd.exe and PowerShell metacharacters.
```ruby
# File 'lib/bolt/transport/choria/command_builders.rb', line 128

def powershell_cmd(script)
  "powershell.exe -NoProfile -NonInteractive -EncodedCommand #{Base64.strict_encode64(script.encode('UTF-16LE'))}"
end
```
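The sketch below reproduces powershell_cmd and round-trips a script full of quoting hazards, to show why -EncodedCommand sidesteps escaping entirely. It uses Array#pack('m0'), which produces the same strict (newline-free) Base64 as Base64.strict_encode64, so the example does not depend on the base64 library being installed.

```ruby
# Sketch of powershell_cmd; pack('m0') is equivalent to Base64.strict_encode64.
def powershell_cmd(script)
  encoded = [script.encode('UTF-16LE')].pack('m0')
  "powershell.exe -NoProfile -NonInteractive -EncodedCommand #{encoded}"
end

script = "Write-Output 'it''s \"quoted\" & piped | fine'"
cmd = powershell_cmd(script)

# Decoding the payload recovers the script exactly, so neither cmd.exe nor
# PowerShell ever parses quotes, ampersands, or pipes in the original text.
payload = cmd.split.last
decoded = payload.unpack1('m0').force_encoding('UTF-16LE').encode('UTF-8')
```

The Base64 alphabet contains no spaces or shell metacharacters, so the encoded payload is always safe to place on the command line as-is.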
#prepare_targets(targets, agent_name, result_opts, &callback) ⇒ Array
Configure the client, discover agents, partition targets by agent availability, and emit errors for incapable targets.
```ruby
# File 'lib/bolt/transport/choria/client.rb', line 227

def prepare_targets(targets, agent_name, result_opts, &callback)
  configure_client(targets.first)
  discover_agents(targets)

  capable, incapable = targets.partition { |target| has_agent?(target, agent_name) }

  agent_errors = incapable.each_with_object({}) do |target, errors|
    msg = if @agent_cache[choria_identity(target)].nil?
            "No agent information available for #{target.safe_name} (node did not respond to discovery)"
          else
            "The '#{agent_name}' agent is not available on #{target.safe_name}."
          end
    errors[target] = error_output(msg, 'bolt/choria-agent-not-available')
  end
  incapable_results = emit_results(agent_errors, fire_node_start: true, **result_opts, &callback)

  [capable, incapable_results]
end
```
#prepend_env_vars(target, command, env_vars, context) ⇒ String
Prepend environment variables to a command string. Returns the command unchanged if env_vars is nil or empty.
```ruby
# File 'lib/bolt/transport/choria/command_builders.rb', line 70

def prepend_env_vars(target, command, env_vars, context)
  return command unless env_vars&.any?

  env_vars.each_key { |key| validate_env_key!(key, context) }

  if windows_target?(target)
    set_stmts = env_vars.map { |key, val| "$env:#{key} = '#{ps_escape(val)}'" }
    "#{set_stmts.join('; ')}; & #{command}"
  else
    env_str = env_vars.map { |key, val| "#{key}=#{Shellwords.shellescape(val)}" }.join(' ')
    "/usr/bin/env #{env_str} #{command}"
  end
end
```
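Here is a standalone sketch of prepend_env_vars that shows the two command shapes it produces. Assumptions: a boolean windows flag replaces windows_target?(target), and ENV_KEY_PATTERN is a hypothetical stand-in for validate_env_key!, whose actual rules are not shown on this page. On POSIX the variables are passed through /usr/bin/env; on Windows they are set with $env: assignments before invoking the command with the call operator.

```ruby
require 'shellwords'

# Hypothetical validation rule standing in for validate_env_key!.
ENV_KEY_PATTERN = /\A[A-Za-z_][A-Za-z0-9_]*\z/

def prepend_env_vars(windows, command, env_vars)
  return command unless env_vars&.any?

  env_vars.each_key do |key|
    raise ArgumentError, "Invalid environment variable name: #{key}" unless key.match?(ENV_KEY_PATTERN)
  end

  if windows
    # PowerShell: single-quoted values with embedded quotes doubled (as ps_escape does)
    set_stmts = env_vars.map { |key, val| "$env:#{key} = '#{val.gsub("'", "''")}'" }
    "#{set_stmts.join('; ')}; & #{command}"
  else
    env_str = env_vars.map { |key, val| "#{key}=#{Shellwords.shellescape(val)}" }.join(' ')
    "/usr/bin/env #{env_str} #{command}"
  end
end

prepend_env_vars(false, '/tmp/task.sh', 'PT_name' => "O'Brien")
# => "/usr/bin/env PT_name=O\\'Brien /tmp/task.sh"
prepend_env_vars(true, 'C:\task.ps1', 'PT_name' => "O'Brien")
# => "$env:PT_name = 'O''Brien'; & C:\\task.ps1"
```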
#provided_features ⇒ Object
Advertise both shell and powershell so tasks with either requirement can be selected. The per-target selection happens in select_implementation below, which picks the right feature set based on the target’s detected OS.
```ruby
# File 'lib/bolt/transport/choria.rb', line 52

def provided_features
  %w[shell powershell]
end
```
#ps_escape(str) ⇒ String
Escape single quotes for use inside PowerShell single-quoted strings.
```ruby
# File 'lib/bolt/transport/choria/command_builders.rb', line 136

def ps_escape(str)
  str.gsub("'", "''")
end
```
#rpc_request(agent, targets, context) {|MCollective::RPC::Client| ... } ⇒ Hash
Make a batched RPC call and split results into responded and errors. Yields the RPC client so the caller specifies which action to invoke.
Results are split based on MCollective RPC statuscodes:
- statuscode 0: action completed successfully (:responded)
- statuscode 1 (RPCAborted): action completed but reported a problem (:responded). The data is preserved rather than discarded because some agents (notably bolt_tasks) use statuscode 1 for application-level failures where the response data is still valid and meaningful (e.g., a task that ran but exited non-zero). Callers must handle this case and not assume :responded means success.
- statuscode 2-5: RPC infrastructure error (:errors)
- no response: target didn't reply (:errors)
- exception: total RPC failure (rpc_failed: true)
Serialized by @rpc_mutex because MCollective’s NATS connector is a singleton with a shared receive queue. Concurrent RPC calls cause reply channel collisions, cross-thread message confusion, and subscription conflicts. See choria-transport-dev.md for the full explanation.
```ruby
# File 'lib/bolt/transport/choria/client.rb', line 171

def rpc_request(agent, targets, context)
  targets = Array(targets)
  rpc_results = @rpc_mutex.synchronize do
    rpc_timeout = targets.first.options['rpc-timeout']
    client = create_rpc_client(agent, targets, rpc_timeout)
    yield(client)
  end

  by_sender = index_results_by_sender(rpc_results, targets, context)
  responded = {}
  errors = {}
  rpc_statuscodes = {}

  targets.each do |target|
    rpc_result = by_sender[choria_identity(target)]

    if rpc_result.nil?
      errors[target] = error_output(
        "No response from #{target.safe_name} for #{context}",
        'bolt/choria-no-response'
      )
    elsif rpc_result[:statuscode] > 1
      rpc_statuscodes[target] = rpc_result[:statuscode]
      errors[target] = error_output(
        "#{context} on #{target.safe_name} returned RPC error: " \
        "#{rpc_result[:statusmsg]} (code #{rpc_result[:statuscode]})",
        'bolt/choria-rpc-error'
      )
    else
      rpc_statuscodes[target] = rpc_result[:statuscode]
      if rpc_result[:statuscode] == 1
        logger.warn { "#{context} on #{target.safe_name} had RPC status code #{rpc_result[:statuscode]}: #{rpc_result[:statusmsg]}" }
      end
      responded[target] = rpc_result[:data]
    end
  end

  { responded: responded, errors: errors, rpc_failed: false, rpc_statuscodes: rpc_statuscodes }
rescue StandardError => e
  raise if e.is_a?(Bolt::Error)

  logger.warn { "#{context} RPC call failed: #{e.class}: #{e.message}" }
  errors = targets.each_with_object({}) do |target, errs|
    errs[target] = error_output("#{context} failed on #{target.safe_name}: #{e.message}", 'bolt/choria-rpc-failed')
  end
  { responded: {}, errors: errors, rpc_failed: true, rpc_statuscodes: {} }
end
```
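The statuscode routing in rpc_request can be exercised without a broker. The sketch below isolates just that step in a hypothetical helper, partition_replies, under these assumptions: replies are already indexed by sender (as index_results_by_sender does), targets are identity strings, and the error hashes are trimmed-down versions of error_output. It shows that a statuscode 1 reply still lands in :responded with its data intact.

```ruby
# Hypothetical helper isolating rpc_request's statuscode routing.
def partition_replies(by_sender, targets, context)
  responded = {}
  errors = {}
  targets.each do |target|
    reply = by_sender[target]
    if reply.nil?
      errors[target] = { error: "No response from #{target} for #{context}",
                         error_kind: 'bolt/choria-no-response' }
    elsif reply[:statuscode] > 1
      # 2-5: RPC infrastructure error, so the data is not used
      errors[target] = { error: "#{context} on #{target} returned RPC error: " \
                                "#{reply[:statusmsg]} (code #{reply[:statuscode]})",
                         error_kind: 'bolt/choria-rpc-error' }
    else
      # 0 = OK, 1 = RPCAborted: the action ran, so its data is kept for the caller to inspect
      responded[target] = reply[:data]
    end
  end
  { responded: responded, errors: errors }
end

replies = {
  'node1' => { statuscode: 0, data: { exitcode: 0 } },
  'node2' => { statuscode: 1, statusmsg: 'Task failed', data: { exitcode: 2 } },
  'node3' => { statuscode: 2, statusmsg: 'Unknown action' }
}
split = partition_replies(replies, %w[node1 node2 node3 node4], 'shell.run')
# split[:responded].keys == ["node1", "node2"]; split[:errors].keys == ["node3", "node4"]
```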
#run_task_via_bolt_tasks(targets, task, arguments, result_opts, &callback) ⇒ Array<Bolt::Result>
Run a task via the bolt_tasks agent. Groups targets by implementation to support mixed-platform batches. Starts all groups before polling any of them so tasks execute concurrently on nodes across implementations.
```ruby
# File 'lib/bolt/transport/choria/bolt_tasks.rb', line 16

def run_task_via_bolt_tasks(targets, task, arguments, result_opts, &callback)
  logger.debug { "Running task #{task.name} via bolt_tasks agent on #{target_count(targets)}" }
  results = []

  # Start all implementation groups. Each gets its own download +
  # run_no_wait sequence. Tasks begin executing on nodes as soon as
  # run_no_wait returns.
  started_groups = []
  targets.group_by { |target| select_implementation(target, task) }.each do |implementation, impl_targets|
    start_result = download_and_start_task(impl_targets, task, implementation, arguments, result_opts, &callback)
    results += start_result[:failed_results]
    started_groups << start_result if start_result[:task_id]
  end

  # Poll each group. Tasks are already running concurrently on nodes,
  # so wall time is dominated by the longest task, not the sum.
  # Each group has a different task_id, so they must be polled separately.
  started_groups.each do |group|
    output_by_target = poll_task_status(group[:targets], group[:task_id], task)
    results += emit_results(output_by_target, **result_opts, &callback)
  end

  results
end
```
#run_task_via_shell(targets, task, arguments, result_opts, &callback) ⇒ Array<Bolt::Result>
Run a task via the shell agent. Groups targets by implementation to support mixed-platform batches. Starts all groups before polling so tasks execute concurrently on nodes across implementations.
```ruby
# File 'lib/bolt/transport/choria/shell.rb', line 156

def run_task_via_shell(targets, task, arguments, result_opts, &callback)
  logger.debug { "Running task #{task.name} via shell agent on #{target_count(targets)}" }
  results = []
  all_pending = {}
  cleanup_entries = []

  # Each implementation group gets its own tmpdir because different
  # platforms need different base paths (e.g., /tmp vs C:\Windows\Temp).
  begin
    targets.group_by { |target| select_implementation(target, task) }.each do |implementation, impl_targets|
      start_result = upload_and_start_task(impl_targets, task, implementation, arguments, result_opts, &callback)
      results += start_result[:failed_results]
      all_pending.merge!(start_result[:pending])
      cleanup_entries << { targets: impl_targets, tmpdir: start_result[:tmpdir] }
    end

    # Poll all handles in one loop. Unlike bolt_tasks (which needs
    # separate polls per task_id), shell handles are interchangeable.
    unless all_pending.empty?
      timeout = targets.first.options['task-timeout']
      results += emit_results(wait_for_shell_results(all_pending, timeout), **result_opts, &callback)
    end
  ensure
    cleanup_entries.each { |entry| cleanup_tmpdir(entry[:targets], entry[:tmpdir]) }
  end

  results
end
```
#select_implementation(target, task) ⇒ Hash
Override to select task implementation based on the target’s OS. Other transports rely on inventory features to pick the right implementation, but Choria discovers the OS at runtime via the os.family fact. We pass only the detected platform’s feature so task.select_implementation picks the correct .ps1 or .sh file.
```ruby
# File 'lib/bolt/transport/choria.rb', line 65

def select_implementation(target, task)
  features = windows_target?(target) ? ['powershell'] : ['shell']
  impl = task.select_implementation(target, features)
  impl['input_method'] ||= default_input_method(impl['path'])
  impl
end
```
#shell_list(remaining) ⇒ Array
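One round of the shell.list RPC action to check which handles have completed. Returns [done, rpc_failed]: done maps each target whose job has finished to its pending entry, and each target that errored to error output. Targets whose jobs are still running are omitted from done.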
```ruby
# File 'lib/bolt/transport/choria/shell.rb', line 427

def shell_list(remaining)
  response = rpc_request('shell', remaining.keys, 'shell.list') do |client|
    client.list
  end
  return [{}, true] if response[:rpc_failed]

  done = response[:errors]
  logger.debug { "shell.list: #{target_count(response[:responded])} responded, #{target_count(done)} failed" } unless done.empty?

  response[:responded].each do |target, data|
    if data.nil?
      done[target] = error_output("shell.list on #{target.safe_name} returned success but no data",
                                  'bolt/choria-missing-data')
      next
    end

    handle = remaining[target][:handle]
    job = data.dig(:jobs, handle)
    unless job
      logger.debug {
        job_handles = data[:jobs]&.keys || []
        "shell.list on #{target.safe_name}: handle #{handle} not found, " \
          "available handles: #{job_handles.inspect}"
      }
      done[target] = error_output(
        "Handle #{handle} not found in shell.list on #{target.safe_name}. " \
        "The process may have been cleaned up or the agent may have restarted.",
        'bolt/choria-handle-not-found'
      )
      next
    end

    status = job['status']&.to_s
    logger.debug { "shell.list on #{target.safe_name}: handle #{handle} status: #{status}" }
    done[target] = remaining[target] if SHELL_DONE_STATUSES.include?(status)
  end

  [done, false]
end
```
#shell_run(targets, command, description: nil) ⇒ Hash{Bolt::Target => Hash}
Execute a synchronous command on targets via the shell.run RPC action. Used for internal prep/cleanup (mkdir, chmod, etc.) that completes quickly. Returns only failures since successes don’t need to be reported.
```ruby
# File 'lib/bolt/transport/choria/shell.rb', line 287

def shell_run(targets, command, description: nil)
  label = description || command
  command = powershell_cmd(command) if windows_target?(targets.first)

  response = rpc_request('shell', targets, label) do |client|
    client.run(command: command)
  end

  # Check that the exit code is 0 for each successful RPC response,
  # treating nonzero exit codes as failures.
  failures = response[:errors]
  response[:responded].each do |target, data|
    data ||= {}
    exitcode = exitcode_from(data, target, label)
    next if exitcode.zero?

    failures[target] = error_output(
      "#{label} failed on #{target.safe_name} (exit code #{exitcode}): #{data[:stderr]}",
      'bolt/choria-operation-failed',
      stdout: data[:stdout], stderr: data[:stderr], exitcode: exitcode
    )
  end
  failures
end
```
#shell_start(targets, command) ⇒ Array
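Start an async command on targets via the shell.start RPC action. Returns [pending, failures]: pending maps each started target to its response data, including the handle used for polling with wait_for_shell_results; failures maps each target that did not start to error output.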
```ruby
# File 'lib/bolt/transport/choria/shell.rb', line 341

def shell_start(targets, command)
  command = powershell_cmd(command) if windows_target?(targets.first)

  response = rpc_request('shell', targets, 'shell.start') do |client|
    client.start(command: command)
  end

  failures = response[:errors]
  pending, no_handle = response[:responded].partition { |_target, data| data&.dig(:handle) }.map(&:to_h)
  pending.each { |target, data| logger.debug { "Started command on #{target.safe_name}, handle: #{data[:handle]}" } }
  no_handle.each_key do |target|
    failures[target] = error_output("shell.start on #{target.safe_name} returned success but no handle",
                                    'bolt/choria-missing-handle')
  end

  [pending, failures]
end
```
#shell_statuses(targets) ⇒ Hash{Bolt::Target => Hash}
Fetch stdout/stderr/exitcode from completed targets via the shell.statuses RPC action. Requires shell agent >= 1.2.1.
```ruby
# File 'lib/bolt/transport/choria/shell.rb', line 473

def shell_statuses(targets)
  handles = targets.transform_values { |data| data[:handle] }
  logger.debug { "Fetching shell.statuses for #{target_count(targets.keys)}" }
  results = {}

  response = rpc_request('shell', targets.keys, 'shell.statuses') do |client|
    client.statuses(handles: handles.values)
  end

  response[:errors].each do |target, fail_output|
    results[target] = fail_output
  end

  response[:responded].each do |target, data|
    statuses = data&.dig(:statuses)
    handle = handles[target]

    unless statuses
      results[target] = error_output(
        "shell.statuses on #{target.safe_name} returned no data",
        'bolt/choria-missing-data'
      )
      next
    end

    status_data = statuses[handle]
    unless status_data
      results[target] = error_output(
        "shell.statuses on #{target.safe_name} did not include handle #{handle}",
        'bolt/choria-missing-data'
      )
      next
    end

    status = status_data['status']&.to_s
    stdout = status_data['stdout']
    stderr = status_data['stderr']
    error_msg = status_data['error']

    if status == 'error'
      results[target] = error_output(
        "Handle #{handle} not found on #{target.safe_name}: #{error_msg}",
        'bolt/choria-handle-not-found'
      )
    elsif status == 'failed'
      results[target] = error_output(
        "Process failed on #{target.safe_name}: #{stderr}",
        'bolt/choria-process-failed',
        stdout: stdout, stderr: stderr, exitcode: 1
      )
    else
      exitcode = exitcode_from(status_data, target, 'shell.statuses')
      results[target] = output(stdout: stdout, stderr: stderr, exitcode: exitcode)
    end
  end

  results
rescue StandardError => e
  raise if e.is_a?(Bolt::Error)

  logger.warn { "shell.statuses RPC call failed: #{e.class}: #{e.message}" }
  targets.each_key do |target|
    results[target] ||= error_output(
      "Fetching output from #{target.safe_name} failed: #{e.class}: #{e.message}",
      'bolt/choria-result-processing-error'
    )
  end
  results
end
```
#stdin_pipe_command(target, data, command) ⇒ String
Build a command that pipes data to another command via stdin.
```ruby
# File 'lib/bolt/transport/choria/command_builders.rb', line 90

def stdin_pipe_command(target, data, command)
  if windows_target?(target)
    # Use a here-string (@'...'@) to avoid escaping issues with
    # large JSON payloads. Content between @' and '@ is literal.
    "@'\n#{data}\n'@ | & #{command}"
  else
    "printf '%s' #{Shellwords.shellescape(data)} | #{command}"
  end
end
```
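To see what the stdin pipe looks like for a typical task input, the sketch below reproduces stdin_pipe_command with a boolean windows flag in place of windows_target?(target) (an assumption for illustration) and feeds it a JSON payload containing a single quote.

```ruby
require 'json'
require 'shellwords'

# Sketch only: `windows` replaces windows_target?(target).
def stdin_pipe_command(windows, data, command)
  if windows
    # PowerShell single-quoted here-string: everything between @' and '@ is literal
    "@'\n#{data}\n'@ | & #{command}"
  else
    # printf '%s' writes the data verbatim, with no escape-sequence interpretation
    "printf '%s' #{Shellwords.shellescape(data)} | #{command}"
  end
end

input = { 'message' => "it's here" }.to_json
posix = stdin_pipe_command(false, input, '/tmp/task.sh')
pwsh  = stdin_pipe_command(true, input, 'C:\task.ps1')
```

The here-string form relies on the payload never containing a line that starts with '@. JSON generated by to_json is a single line (newlines inside strings are escaped as \n), so that terminator cannot appear inside the payload.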
#target_count(targets) ⇒ Object
```ruby
# File 'lib/bolt/transport/choria/helpers.rb', line 16

def target_count(targets)
  count = targets.is_a?(Hash) ? targets.size : targets.length
  "#{count} #{count == 1 ? 'target' : 'targets'}"
end
```
#task_file_spec(file, module_name, environment) ⇒ Hash
Build a file spec hash for the bolt_tasks download action. Computes the Puppet Server file_content URI based on the file’s module-relative path.
```ruby
# File 'lib/bolt/transport/choria/bolt_tasks.rb', line 183

def task_file_spec(file, module_name, environment)
  file_name = file['name']
  validate_file_name!(file_name)
  file_path = file['path']
  parts = file_name.split('/', 3)

  path = if parts.length == 3
           mod, subdir, rest = parts
           case subdir
           when 'files'
             "/puppet/v3/file_content/modules/#{mod}/#{rest}"
           when 'lib'
             "/puppet/v3/file_content/plugins/#{mod}/#{rest}"
           else
             "/puppet/v3/file_content/tasks/#{mod}/#{rest}"
           end
         else
           "/puppet/v3/file_content/tasks/#{module_name}/#{file_name}"
         end

  {
    'filename' => file_name,
    'sha256' => Digest::SHA256.file(file_path).hexdigest,
    'size_bytes' => File.size(file_path),
    'uri' => {
      'path' => path,
      'params' => { 'environment' => environment }
    }
  }
end
```
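The URI-path mapping at the heart of task_file_spec can be tested on its own. The sketch below extracts it into a hypothetical helper, file_content_path; the SHA-256 and size fields are omitted, and file names are assumed to have already passed validate_file_name!. Names of the form module/files/... map to the modules mount, module/lib/... to the plugins mount, other three-part names to the tasks mount, and bare names to the task's own module.

```ruby
# Hypothetical helper holding only the path logic from task_file_spec.
def file_content_path(file_name, module_name)
  parts = file_name.split('/', 3)
  return "/puppet/v3/file_content/tasks/#{module_name}/#{file_name}" unless parts.length == 3

  mod, subdir, rest = parts
  case subdir
  when 'files' then "/puppet/v3/file_content/modules/#{mod}/#{rest}"
  when 'lib'   then "/puppet/v3/file_content/plugins/#{mod}/#{rest}"
  else              "/puppet/v3/file_content/tasks/#{mod}/#{rest}"
  end
end

file_content_path('init.sh', 'mymod')
# => "/puppet/v3/file_content/tasks/mymod/init.sh"
file_content_path('ruby_task_helper/lib/task_helper.rb', 'mymod')
# => "/puppet/v3/file_content/plugins/ruby_task_helper/task_helper.rb"
```

Because split is limited to three parts, deeper paths such as helpers/files/sub/x.sh keep their remaining directories in rest.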
#unwrap_bolt_tasks_stdout(agent_stdout) ⇒ String?
Fix double-encoding in the bolt_tasks agent’s wrapper error path.
Normally, create_task_stdout returns a Hash and reply_task_status calls .to_json on it, producing a single JSON string like:
'{"_output":"hello world"}'
But for wrapper errors, create_task_stdout returns an already JSON-encoded String. reply_task_status still calls .to_json on it, encoding it a second time. The result is a JSON string whose value is itself a JSON string:
'"{\\"_error\\":{\\"kind\\":\\"choria.tasks/wrapper-error\\",...}}"'
We parse one layer of JSON. In the normal case, that produces a Hash and we return the original string. In the double-encoded case, it produces a String (the inner JSON), which we return so Result.for_task can parse it.
# File 'lib/bolt/transport/choria/bolt_tasks.rb', line 233

def unwrap_bolt_tasks_stdout(agent_stdout)
  return agent_stdout unless agent_stdout.is_a?(String)

  parsed = begin
    JSON.parse(agent_stdout)
  rescue JSON::ParserError
    return agent_stdout
  end

  # Normal case: parsed is a Hash, return the original JSON string.
  # Double-encoded case: parsed is a String (the inner JSON), return it.
  parsed.is_a?(String) ? parsed : agent_stdout
end
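A standalone sketch that reproduces the double encoding described above and the one-layer unwrap. The unwrap method copies the logic of unwrap_bolt_tasks_stdout under a hypothetical name; the payloads are illustrative and are not captured agent output.

```ruby
require 'json'

# Hypothetical copy of unwrap_bolt_tasks_stdout, for illustration only.
def unwrap(agent_stdout)
  return agent_stdout unless agent_stdout.is_a?(String)

  parsed = begin
    JSON.parse(agent_stdout)
  rescue JSON::ParserError
    return agent_stdout # not JSON at all: pass through unchanged
  end
  parsed.is_a?(String) ? parsed : agent_stdout
end

# Normal path: a Hash encoded once.
normal = { '_output' => 'hello world' }.to_json
# Wrapper-error path: the already-encoded String is encoded a second time.
double = { '_error' => { 'kind' => 'choria.tasks/wrapper-error' } }.to_json.to_json

puts unwrap(normal) # unchanged: {"_output":"hello world"}
puts unwrap(double) # one layer removed: {"_error":{"kind":"choria.tasks/wrapper-error"}}
```

Parsing exactly one layer is enough: a correctly encoded payload parses to a Hash and is returned untouched, so the unwrap is a no-op on the normal path.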
#upload(_target, _source, _destination, _options = {}, _position = []) ⇒ Object
# File 'lib/bolt/transport/choria.rb', line 183

def upload(_target, _source, _destination, _options = {}, _position = [])
  raise Bolt::Error.new(
    'The Choria transport does not yet support upload.',
    'bolt/choria-unsupported-operation'
  )
end
#upload_and_start_task(targets, task, implementation, arguments, result_opts, &callback) ⇒ Hash
Upload task files and start execution for one implementation group.
# File 'lib/bolt/transport/choria/shell.rb', line 198

def upload_and_start_task(targets, task, implementation, arguments, result_opts, &callback)
  arguments = arguments.dup
  executable = implementation['path']
  input_method = implementation['input_method']
  extra_files = implementation['files']
  first_target = targets.first
  tmpdir = generate_tmpdir_path(first_target)

  executable_content = File.binread(executable)
  extra_file_contents = {}
  extra_files.each do |file|
    validate_file_name!(file['name'])
    extra_file_contents[file['name']] = File.binread(file['path'])
  end

  failed_results = []
  active_targets = targets.dup
  task_dir = tmpdir

  # Create the tmpdir
  failures = shell_run(active_targets, make_dir_command(first_target, tmpdir), description: 'mkdir tmpdir')
  failed_results += emit_results(failures, **result_opts, &callback)
  active_targets -= failures.keys

  # Tasks with extra files get a module-layout directory tree in
  # tmpdir, and _installdir is set so the task can find them.
  # Simple tasks go directly in tmpdir with no _installdir.
  if active_targets.any? && extra_files.any?
    arguments['_installdir'] = tmpdir
    task_dir = join_path(first_target, tmpdir, task.tasks_dir)

    # Create subdirectories for the task and its dependencies
    extra_dirs = extra_files.map { |file| join_path(first_target, tmpdir, File.dirname(file['name'])) }.uniq
    all_dirs = [task_dir] + extra_dirs
    failures = shell_run(active_targets, make_dir_command(first_target, *all_dirs), description: 'mkdir task dirs')
    failed_results += emit_results(failures, **result_opts, &callback)
    active_targets -= failures.keys

    # Upload each dependency file to its module-relative path
    extra_files.each do |file|
      break if active_targets.empty?

      failures = upload_file_content(active_targets, extra_file_contents[file['name']],
                                     join_path(first_target, tmpdir, file['name']))
      failed_results += emit_results(failures, **result_opts, &callback)
      active_targets -= failures.keys
    end
  end

  # Upload the main task executable
  remote_task_path = join_path(first_target, task_dir, File.basename(executable)) if active_targets.any?
  if remote_task_path
    failures = upload_file_content(active_targets, executable_content, remote_task_path)
    failed_results += emit_results(failures, **result_opts, &callback)
    active_targets -= failures.keys
  end

  # Make the task executable (no-op on Windows)
  chmod_cmd = make_executable_command(first_target, remote_task_path) if remote_task_path
  if active_targets.any? && chmod_cmd
    failures = shell_run(active_targets, chmod_cmd, description: 'chmod task')
    failed_results += emit_results(failures, **result_opts, &callback)
    active_targets -= failures.keys
  end

  # Start the task asynchronously
  pending = {}
  if active_targets.any? && remote_task_path
    full_cmd = build_task_command(first_target, remote_task_path, arguments, input_method,
                                  first_target.options['interpreters'])
    pending, start_failures = shell_start(active_targets, full_cmd)
    failed_results += emit_results(start_failures, **result_opts, &callback)
  end

  { failed_results: failed_results, pending: pending, tmpdir: tmpdir }
end
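upload_and_start_task repeatedly narrows active_targets: each step runs only on targets that survived the previous steps, and failures are reported right away instead of aborting the whole batch. A minimal sketch of that pattern, assuming each step returns a Hash of failed target to error message; run_pipeline and the step lambdas are hypothetical stand-ins for shell_run and upload_file_content.

```ruby
# Hypothetical sketch of the "narrow active targets" pattern.
# Each step receives the targets still active and returns
# { target => error } for the ones that failed at that step.
def run_pipeline(targets, steps)
  active = targets.dup
  failed = {}
  steps.each do |name, step|
    break if active.empty? # nothing left to do

    failures = step.call(active)
    failures.each { |target, error| failed[target] = "#{name}: #{error}" }
    active -= failures.keys # later steps never see failed targets
  end
  { active: active, failed: failed }
end

steps = [
  ['mkdir',  ->(ts) { ts.include?('web2') ? { 'web2' => 'permission denied' } : {} }],
  ['upload', ->(ts) { ts.include?('web3') ? { 'web3' => 'payload too large' } : {} }],
  ['chmod',  ->(_ts) { {} }]
]

result = run_pipeline(%w[web1 web2 web3], steps)
p result[:active] # prints ["web1"]
```

Because failures are recorded per step, web2 and web3 each get a result naming the step that failed, while web1 continues on to the start phase.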
#upload_file_command(target, content_b64, dest) ⇒ String
Build a command that decodes base64-encoded content and writes the result to dest. On Windows targets the command uses PowerShell's [Convert]::FromBase64String; on POSIX targets it requires the base64 CLI.
# File 'lib/bolt/transport/choria/command_builders.rb', line 53

def upload_file_command(target, content_b64, dest)
  if windows_target?(target)
    "[IO.File]::WriteAllBytes('#{ps_escape(dest)}', " \
      "[Convert]::FromBase64String('#{content_b64}'))"
  else
    "printf '%s' #{Shellwords.shellescape(content_b64)} | base64 -d > #{Shellwords.shellescape(dest)}"
  end
end
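A sketch of the POSIX branch on its own, to show what the generated command looks like to a shell. posix_upload_command is a hypothetical name; [data].pack('m0') is the core-Ruby form of Base64.strict_encode64 and is used here only to avoid a gem dependency. The escaping matters mostly for dest, which may contain spaces or shell metacharacters.

```ruby
require 'shellwords'

# Hypothetical copy of the POSIX branch of upload_file_command.
def posix_upload_command(content_b64, dest)
  "printf '%s' #{Shellwords.shellescape(content_b64)} | base64 -d > #{Shellwords.shellescape(dest)}"
end

content = "#!/bin/sh\necho hi\n"
encoded = [content].pack('m0') # strict base64: no line breaks
cmd = posix_upload_command(encoded, '/tmp/my task.sh')
puts cmd

# Splitting the command the way a POSIX shell tokenizes words shows that the
# space in the destination survives as part of a single argument.
p Shellwords.shellsplit(cmd).last # prints "/tmp/my task.sh"
```

Using printf '%s' instead of echo avoids platform differences in how echo treats its argument, and strict base64 keeps the payload on one line.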
#upload_file_content(targets, content, destination) ⇒ Hash{Bolt::Target => Hash}
Upload file content to the same path on multiple targets via base64. The entire file is base64-encoded and sent as a single RPC message, so file size is limited by the NATS max message size (default 1MB, configurable via plugin.choria.network.client_max_payload in the Choria broker config). Base64 adds ~33% overhead, so the effective file size limit is roughly 750KB with default settings. Once the file-transfer agent is implemented, uploads will use chunked transfers through that agent when it is available, removing this size limit.
# File 'lib/bolt/transport/choria/shell.rb', line 326

def upload_file_content(targets, content, destination)
  logger.debug { "Uploading #{content.bytesize} bytes to #{destination} on #{target_count(targets)}" }
  encoded = Base64.strict_encode64(content)
  command = upload_file_command(targets.first, encoded, destination)
  shell_run(targets, command, description: "upload #{destination}")
end
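The ~750KB figure follows from strict base64 producing 4 output bytes for every 3 input bytes. A small calculation, assuming the limit applies to the encoded payload alone; the command text and RPC envelope also count toward the real message size, so the true ceiling is somewhat lower. Both helper names are hypothetical.

```ruby
# Encoded size of n raw bytes under strict base64: 4 bytes per 3-byte group,
# with the final partial group padded up to a full 4 bytes.
def encoded_size(raw_bytes)
  4 * ((raw_bytes + 2) / 3)
end

# Largest raw size whose encoding still fits in limit bytes.
def max_raw_bytes(limit)
  (limit / 4) * 3
end

puts max_raw_bytes(1_000_000) # prints 750000 (1MB read as 10^6 bytes)
puts max_raw_bytes(1_048_576) # prints 786432 (1MB read as 2^20 bytes)
```

Either reading lands near the 750KB quoted above; subtracting a few hundred bytes for the command wrapper gives a safer practical limit.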
#validate_env_key!(key, context) ⇒ Object
Validate that an environment variable key is safe for shell interpolation.
# File 'lib/bolt/transport/choria/helpers.rb', line 185

def validate_env_key!(key, context)
  safe_pattern = /\A[A-Za-z_][A-Za-z0-9_]*\z/
  return if safe_pattern.match?(key)

  raise Bolt::Error.new(
    "Unsafe environment variable name '#{key}' in #{context}. " \
    "Names must match #{safe_pattern.source}",
    'bolt/invalid-env-var-name'
  )
end
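A quick illustration of what safe_pattern accepts and rejects. The anchors matter: \A and \z match the start and end of the whole string, whereas ^ and $ match at line boundaries, so a name containing a newline would slip past a ^...$ pattern. SAFE_ENV_KEY is a local copy of the pattern for this sketch.

```ruby
# Local copy of safe_pattern from validate_env_key!.
SAFE_ENV_KEY = /\A[A-Za-z_][A-Za-z0-9_]*\z/

%w[PATH _private MY_VAR2].each do |key|
  puts "#{key.inspect} accepted: #{SAFE_ENV_KEY.match?(key)}" # all true
end

['2FAST', 'MY-VAR', 'X;rm -rf /', "A\nB", ''].each do |key|
  puts "#{key.inspect} accepted: #{SAFE_ENV_KEY.match?(key)}" # all false
end

# The line-anchored variant wrongly accepts a multi-line name,
# because "A" alone satisfies ^...$ on the first line.
puts(/^[A-Za-z_][A-Za-z0-9_]*$/.match?("A\nB")) # prints true
```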
#validate_file_name!(name) ⇒ Object
Validate that a file name does not contain path traversal sequences or absolute paths. Checks both POSIX and Windows conventions. Raises Bolt::Error on violations.
# File 'lib/bolt/transport/choria/helpers.rb', line 158

def validate_file_name!(name)
  if name.include?("\0")
    raise Bolt::Error.new(
      "Invalid null byte in task file name: #{name.inspect}",
      'bolt/invalid-task-filename'
    )
  end

  if name.start_with?('/') || name.match?(WINDOWS_PATH_REGEX)
    raise Bolt::Error.new(
      "Absolute path not allowed in task file name: '#{name}'",
      'bolt/invalid-task-filename'
    )
  end

  if name.split(%r{[/\\]}).include?('..')
    raise Bolt::Error.new(
      "Path traversal detected in task file name: '#{name}'",
      'bolt/path-traversal'
    )
  end
end
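The same three checks, rewritten as a non-raising predicate to show which inputs trip which rule. WINDOWS_PATH_REGEX is not defined in this section, so the sketch substitutes an assumed drive-letter pattern; file_name_problem is a hypothetical helper.

```ruby
# Assumption: a stand-in for WINDOWS_PATH_REGEX (drive letter plus colon).
ASSUMED_WINDOWS_PATH_REGEX = /\A[A-Za-z]:/

# Returns the first rule a name violates, or nil if it is acceptable.
def file_name_problem(name)
  return :null_byte if name.include?("\0")
  return :absolute if name.start_with?('/') || name.match?(ASSUMED_WINDOWS_PATH_REGEX)
  # Split on both separators so Windows-style traversal is caught too.
  return :traversal if name.split(%r{[/\\]}).include?('..')

  nil
end

examples = [
  'mymod/files/helper.sh', # nil
  '/etc/passwd',           # :absolute
  'C:\\temp\\x.ps1',       # :absolute
  'D:/foo',                # :absolute
  'mymod/../../etc/shadow', # :traversal
  'mymod\\..\\secret',     # :traversal
  "mymod/a\0b",            # :null_byte
  'mymod/..hidden'         # nil: ".." must be a whole path component
]
examples.each { |name| puts "#{name.inspect} -> #{file_name_problem(name).inspect}" }
```

Comparing whole components against ".." (rather than searching for the substring) avoids rejecting legitimate names such as ..hidden.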
#wait_for_shell_results(pending, timeout) ⇒ Hash{Bolt::Target => Hash}
Wait for async shell handles to complete, fetch their output via shell_statuses, and kill timed-out processes.
# File 'lib/bolt/transport/choria/shell.rb', line 365

def wait_for_shell_results(pending, timeout)
  return {} if pending.empty?

  poll_result = poll_with_retries(pending, timeout, 'shell.list') do |remaining|
    completed, rpc_failed = shell_list(remaining)
    next { rpc_failed: true, done: {} } if rpc_failed

    done = {}
    fetch_targets = {}
    completed.each do |target, value|
      if value[:error]
        done[target] = value
      else
        fetch_targets[target] = value
      end
    end

    unless fetch_targets.empty?
      logger.debug { "Fetching output from #{target_count(fetch_targets)}" }
      fetched = shell_statuses(fetch_targets)
      fetch_targets.each_key do |target|
        done[target] = fetched[target] || error_output(
          "Command completed on #{target.safe_name} but output could not be fetched",
          'bolt/choria-result-processing-error'
        )
      end
    end

    { rpc_failed: false, done: done }
  end

  remaining_errors = {}
  unless poll_result[:remaining].empty?
    if poll_result[:rpc_persistent_failure]
      poll_result[:remaining].each_key do |target|
        remaining_errors[target] = error_output(
          "RPC requests to poll shell status on #{target.safe_name} failed persistently",
          'bolt/choria-poll-failed'
        )
      end
    else
      kill_timed_out_processes(poll_result[:remaining])
      poll_result[:remaining].each_key do |target|
        remaining_errors[target] = error_output(
          "Command timed out after #{timeout} seconds on #{target.safe_name}",
          'bolt/choria-command-timeout'
        )
      end
    end
  end

  poll_result[:completed].merge(remaining_errors)
end
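wait_for_shell_results delegates the loop itself to poll_with_retries, which is not shown in this section. The sketch below is a hypothetical reconstruction inferred only from how it is called here: the block receives the still-pending targets and returns { rpc_failed:, done: }, and the result carries :completed, :remaining, and :rpc_persistent_failure. It gives up after RPC_FAILURE_RETRIES (3) consecutive RPC failures and sleeps POLL_INTERVAL_SECONDS (1) between rounds; the clock and sleeper parameters are assumptions added so the loop can be exercised without real delays.

```ruby
RETRIES = 3 # mirrors RPC_FAILURE_RETRIES

# Hypothetical sketch, not the transport's actual poll_with_retries.
def poll_with_retries_sketch(pending, timeout, interval: 1,
                             clock: -> { Process.clock_gettime(Process::CLOCK_MONOTONIC) },
                             sleeper: ->(seconds) { sleep(seconds) })
  remaining = pending.dup
  completed = {}
  failures = 0
  deadline = clock.call + timeout

  until remaining.empty?
    round = yield(remaining) # one batched RPC round for all remaining targets
    if round[:rpc_failed]
      failures += 1
      if failures >= RETRIES
        return { completed: completed, remaining: remaining, rpc_persistent_failure: true }
      end
    else
      failures = 0 # only consecutive failures count
      completed.merge!(round[:done])
      remaining = remaining.reject { |target, _| round[:done].key?(target) }
    end
    break if remaining.empty? || clock.call >= deadline

    sleeper.call(interval)
  end

  # Anything still in remaining here has timed out.
  { completed: completed, remaining: remaining, rpc_persistent_failure: false }
end
```

Resetting the failure counter on any successful round means a single flaky broker response does not fail the run, while a broker that stays unreachable is detected after three rounds rather than waiting out the full timeout.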
#windows_target?(target) ⇒ Boolean
# File 'lib/bolt/transport/choria/agent_discovery.rb', line 35

def windows_target?(target)
  @agent_cache[choria_identity(target)]&.dig(:os) == 'windows'
end
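Because of the safe-navigation operator, a target whose identity is not yet in @agent_cache yields nil, and nil == 'windows' is false, so undiscovered targets get the POSIX command builders. A sketch with a hypothetical cache shaped as windows_target? implies (identity => { os: ... }):

```ruby
# Hypothetical cache contents; only the { os: ... } shape is taken from windows_target?.
agent_cache = {
  'web1.example.com' => { os: 'windows' },
  'db1.example.com'  => { os: 'linux' }
}

windows = ->(identity) { agent_cache[identity]&.dig(:os) == 'windows' }

puts windows.call('web1.example.com') # prints true
puts windows.call('db1.example.com')  # prints false
puts windows.call('new.example.com')  # prints false: cache miss, nil&.dig returns nil
```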