Module: Pgbus::CLI

Defined in:
lib/pgbus/cli.rb

Class Method Summary collapse

Class Method Details

.apply_capsule_filter(name) ⇒ Object



107
108
109
110
111
112
113
114
115
116
117
118
# File 'lib/pgbus/cli.rb', line 107

# Narrows the configured workers down to the single capsule called +name+.
#
# @param name capsule name, handed to Pgbus.configuration.capsule_named
# @return [Array] the one-element workers list that was assigned
# @raise [ArgumentError] when capsule_named returns nothing for +name+; the
#   message lists the names found in the currently configured workers
def apply_capsule_filter(name)
  config = Pgbus.configuration
  selected = config.capsule_named(name)

  if selected
    # Assign via the public workers= setter (not an ivar poke) so that any
    # normalization/validation it performs also covers this CLI override.
    config.workers = [selected]
  else
    # Worker entries may be keyed by symbol or by string; accept either.
    known = (config.workers || []).filter_map { |entry| entry[:name] || entry["name"] }
    raise ArgumentError,
          "no capsule named #{name.inspect} (available: #{known.join(", ")})"
  end
end

.apply_role_filter(options) ⇒ Object

Translates --workers-only / --scheduler-only / --dispatcher-only into the corresponding Pgbus.configuration.roles array. The flags are mutually exclusive — passing more than one of the three raises ArgumentError.



94
95
96
97
98
99
100
101
102
103
104
105
# File 'lib/pgbus/cli.rb', line 94

# Maps the role flags found in +options+ onto Pgbus.configuration.roles.
#
# @param options [Hash] parsed CLI options; only keys listed in
#   ROLE_FLAG_TO_ROLE with a non-nil value are considered
# @return [Array, nil] the single-role list that was assigned, or nil when
#   no role flag was given (configuration is left untouched)
# @raise [ArgumentError] when more than one role flag is present
def apply_role_filter(options)
  chosen = options.slice(*ROLE_FLAG_TO_ROLE.keys).compact.keys

  case chosen.size
  when 0
    nil
  when 1
    Pgbus.configuration.roles = [ROLE_FLAG_TO_ROLE.fetch(chosen.first)]
  else
    # Turn option keys back into their CLI spelling (:workers_only -> --workers-only).
    given = chosen.map { |key| "--#{key.to_s.tr("_", "-")}" }
    raise ArgumentError,
          "--workers-only, --scheduler-only, and --dispatcher-only are mutually exclusive " \
          "(got: #{given.join(", ")})"
  end
end

.apply_start_options(args) ⇒ Object

Parses CLI flags for `pgbus start` and applies them to the global configuration before the supervisor boots. Designed to override the initializer config without requiring a redeploy.



43
44
45
46
47
48
49
50
# File 'lib/pgbus/cli.rb', line 43

# Parses the `pgbus start` flags in +args+ and pushes them onto
# Pgbus.configuration.
#
# Steps run in a fixed order: the --queues override, the execution mode,
# the --capsule filter, then the role filter. The capsule filter therefore
# runs against the configuration after any --queues override was assigned.
#
# @param args [Array<String>] raw CLI arguments following the `start` command
# @return whatever apply_role_filter returns
def apply_start_options(args)
  parsed = parse_start_options(args)
  queues, mode, capsule = parsed.values_at(:queues, :execution_mode, :capsule)

  Pgbus.configuration.workers = queues if queues
  Pgbus.configuration.execution_mode = mode.to_sym if mode
  apply_capsule_filter(capsule) if capsule
  apply_role_filter(parsed)
end

.list_queues ⇒ Object



144
145
146
147
148
149
150
151
152
153
154
155
156
# File 'lib/pgbus/cli.rb', line 144

# Prints one table row per queue metric reported by Pgbus.client.
#
# Columns: queue name, depth, visible depth, age of the oldest message in
# seconds ("-" when that value is nil/false) and total message count.
#
# @return [Array] the metrics that were printed
def list_queues
  client = Pgbus.client
  # NOTE(review): the result of list_queues is discarded; presumably the call
  # is kept for a side effect (or is leftover) — confirm before removing.
  client.list_queues
  metrics = client.metrics

  puts "QUEUE                                    DEPTH      VISIBLE    OLDEST (s)      TOTAL          "
  puts "-" * 95

  Array(metrics).each do |row|
    cells = [
      row.queue_name,
      row.queue_length,
      row.queue_visible_length,
      row.oldest_msg_age_sec || "-",
      row.total_messages
    ]
    puts format("%-40s %-10s %-10s %-15s %-15s", *cells)
  end
end

.parse_start_options(args) ⇒ Object



52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
# File 'lib/pgbus/cli.rb', line 52

# Parses the flags accepted by `pgbus start`.
#
# Works on a copy of +args+, so the caller's array is never mutated.
#
# @param args [Array<String>] raw CLI arguments following the `start` command
# @return [Hash{Symbol=>Object}] one entry per flag that was present:
#   :queues, :capsule and :execution_mode carry the given string;
#   :workers_only, :scheduler_only and :dispatcher_only are set to true
# @raise [OptionParser::ParseError] on unknown flags or missing arguments
def parse_start_options(args)
  parsed = {}

  parser = OptionParser.new
  parser.banner = "Usage: pgbus start [options]"

  parser.on("--queues STRING", "Override worker capsules (e.g. \"critical: 5; default: 10\")") { |value| parsed[:queues] = value }
  parser.on("--capsule NAME", "Run only the named capsule from the configured workers") { |value| parsed[:capsule] = value }

  # Role switches take no argument; their presence alone records `true`.
  parser.on("--workers-only", "Run only the worker processes (no scheduler/dispatcher/consumers/outbox)") { parsed[:workers_only] = true }
  parser.on("--scheduler-only", "Run only the recurring scheduler (the cron pod pattern)") { parsed[:scheduler_only] = true }
  parser.on("--dispatcher-only", "Run only the dispatcher (the maintenance pod pattern)") { parsed[:dispatcher_only] = true }

  parser.on("--execution-mode MODE", "Execution mode: threads (default) or async") { |value| parsed[:execution_mode] = value }

  parser.parse!(args.dup)
  parsed
end


158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
# File 'lib/pgbus/cli.rb', line 158

# Writes the CLI usage text to stdout: the command list, the flags accepted
# by `start`, and the environment variables read by `mcp`.
#
# @return [nil]
def print_help
  usage = <<~HELP
    Usage: pgbus <command> [options]

    Commands:
      start    Start the Pgbus supervisor (workers + dispatcher)
      status   Show running Pgbus processes
      queues   List queues with metrics
      mcp      Start the read-only MCP diagnostic server over stdio
      version  Show version
      help     Show this help

    Options for `start`:
      --queues STRING    Override worker capsules from the CLI
                         (e.g. "critical: 5; default: 10")
      --capsule NAME     Run only the named capsule from the configured
                         workers (useful for one-capsule-per-process
                         deployments)
      --workers-only     Run only worker processes (no scheduler/
                         dispatcher/consumers/outbox — for worker-only
                         containers)
      --scheduler-only   Run only the recurring scheduler (the cron pod
                         pattern — exactly one of these per deployment)
      --dispatcher-only  Run only the dispatcher (the maintenance pod
                         pattern)
      --execution-mode   Execution mode: threads (default) or async
                         (fiber-based, lower connection usage)

    Environment for `mcp`:
      PGBUS_MCP_TOKEN          If set, PGBUS_MCP_AUTH_TOKEN must match for
                               the server to start (optional token gate).
      PGBUS_MCP_ALLOW_PAYLOADS Truthy to let tools return raw message
                               bodies/headers (off by default; redacted).
  HELP

  puts usage
end

.show_status ⇒ Object



120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
# File 'lib/pgbus/cli.rb', line 120

# Prints a table of the Pgbus processes found in ProcessEntry, ordered by
# kind and creation time, or a one-line notice when there are none.
#
# Columns: kind, hostname, pid, last heartbeat timestamp and metadata.
#
# @return [Array, nil] the printed entries, or nil when nothing is running
def show_status
  # Materialize once so the emptiness check and the printed rows come from
  # the same snapshot (and the collection is only evaluated a single time).
  entries = ProcessEntry.order(:kind, :created_at)
                        .select(:kind, :hostname, :pid, :metadata, :last_heartbeat_at)
                        .to_a

  if entries.empty?
    puts "No Pgbus processes running."
    return
  end

  # Header and rows share one format string so the columns cannot drift apart.
  row_format = "%-12s %-20s %-8s %-30s %s"
  puts format(row_format, "KIND", "HOST", "PID", "HEARTBEAT", "METADATA")
  puts "-" * 100
  entries.each do |entry|
    puts format(row_format,
                entry.kind, entry.hostname, entry.pid, entry.last_heartbeat_at, entry.metadata)
  end
end

.start(args) ⇒ Object



9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
# File 'lib/pgbus/cli.rb', line 9

# CLI entry point: dispatches on the first element of +args+.
#
# With no arguments the help text is shown. An unrecognized command prints
# a notice plus the help text and exits the process with status 1.
#
# @param args [Array<String>] command name followed by its own arguments
# @return whatever the selected command handler returns
def start(args)
  subcommand = args.first || "help"

  case subcommand
  when "start"   then start_supervisor(args.drop(1)) # everything after the command name
  when "status"  then show_status
  when "queues"  then list_queues
  when "mcp"     then start_mcp_server
  when "version" then puts "pgbus #{Pgbus::VERSION}"
  when "help", "--help", "-h" then print_help
  else
    puts "Unknown command: #{subcommand}"
    print_help
    exit 1
  end
end

.start_mcp_server ⇒ Object

Boots the read-only MCP diagnostic server over stdio. Loaded lazily so the optional `mcp` gem is only required when this command is invoked.



139
140
141
142
# File 'lib/pgbus/cli.rb', line 139

# Handler for the `mcp` command: loads the MCP integration, then runs it.
#
# @return whatever Pgbus::MCP::Runner.run returns
def start_mcp_server
  # Must come first. NOTE(review): presumably load! requires the optional
  # `mcp` gem and defines Pgbus::MCP::Runner — confirm against Pgbus::MCP.
  Pgbus::MCP.load!
  Pgbus::MCP::Runner.run
end

.start_supervisor(args = []) ⇒ Object



32
33
34
35
36
37
38
# File 'lib/pgbus/cli.rb', line 32

# Applies the `start` CLI flags to the configuration, logs a startup line
# and runs the supervisor.
#
# @param args [Array<String>] flags following the `start` command
# @return whatever Process::Supervisor#run returns
def start_supervisor(args = [])
  # Configuration overrides must land before the supervisor is constructed.
  apply_start_options(args)

  # Block form keeps the message lazy: it is only built if the logger asks.
  Pgbus.logger.info { "[Pgbus] Starting Pgbus #{Pgbus::VERSION}..." }

  Process::Supervisor.new.run
end