Module: Pgbus::CLI

Defined in:
lib/pgbus/cli.rb

Class Method Summary collapse

Class Method Details

.apply_capsule_filter(name) ⇒ Object



105
106
107
108
109
110
111
112
113
114
115
116
# File 'lib/pgbus/cli.rb', line 105

# Restricts the configured workers to the single capsule called +name+.
#
# @param name [Object] capsule name to look up via
#   Pgbus.configuration.capsule_named
# @return [Array] the one-element workers list that was assigned
# @raise [ArgumentError] when the lookup finds nothing; the message lists
#   the names found on the currently configured worker entries
def apply_capsule_filter(name)
  match = Pgbus.configuration.capsule_named(name)

  if match
    # Assign through the public workers= writer so that any normalization or
    # validation it performs also covers this CLI override path.
    Pgbus.configuration.workers = [match]
  else
    configured = Pgbus.configuration.workers || []
    # An entry may expose its name under a symbol key or a string key.
    known_names = configured.filter_map { |entry| entry[:name] || entry["name"] }
    raise ArgumentError,
          "no capsule named #{name.inspect} (available: #{known_names.join(", ")})"
  end
end

.apply_role_filter(options) ⇒ Object

Translates --workers-only / --scheduler-only / --dispatcher-only into the corresponding Pgbus.configuration.roles array. The three flags are mutually exclusive — passing more than one of them raises ArgumentError.



92
93
94
95
96
97
98
99
100
101
102
103
# File 'lib/pgbus/cli.rb', line 92

# Maps the role-restricting CLI flags (the keys of ROLE_FLAG_TO_ROLE) onto
# Pgbus.configuration.roles.
#
# @param options [Hash] parsed CLI options; entries whose value is nil are
#   treated as absent
# @return [Array, nil] the one-element roles array that was assigned, or nil
#   when none of the role flags was supplied (configuration left untouched)
# @raise [ArgumentError] when more than one role flag was supplied
def apply_role_filter(options)
  chosen = options.slice(*ROLE_FLAG_TO_ROLE.keys).compact.keys

  case chosen.size
  when 0
    nil
  when 1
    Pgbus.configuration.roles = [ROLE_FLAG_TO_ROLE.fetch(chosen.first)]
  else
    # Turn option keys back into their flag spelling (:workers_only -> --workers-only).
    given = chosen.map { |key| "--#{key.to_s.tr("_", "-")}" }
    raise ArgumentError,
          "--workers-only, --scheduler-only, and --dispatcher-only are mutually exclusive " \
          "(got: #{given.join(", ")})"
  end
end

.apply_start_options(args) ⇒ Object

Parses CLI flags for `pgbus start` and applies them to the global configuration before the supervisor boots. Designed to override the initializer config without requiring a redeploy.



41
42
43
44
45
46
47
48
# File 'lib/pgbus/cli.rb', line 41

# Parses the `pgbus start` flags in +args+ and pushes each supplied value
# into Pgbus.configuration.
#
# @param args [Array] raw arguments, handed to parse_start_options
# @return [Object] whatever apply_role_filter returns
def apply_start_options(args)
  parsed = parse_start_options(args)
  queues, mode, capsule = parsed.values_at(:queues, :execution_mode, :capsule)

  Pgbus.configuration.workers = queues if queues
  Pgbus.configuration.execution_mode = mode.to_sym if mode
  # Runs after the --queues override, so the capsule lookup sees the
  # overridden workers when both flags are given.
  apply_capsule_filter(capsule) if capsule
  apply_role_filter(parsed)
end

.list_queues ⇒ Object



135
136
137
138
139
140
141
142
143
144
145
146
147
# File 'lib/pgbus/cli.rb', line 135

# Prints one table row per entry of Pgbus.client.metrics to stdout, under a
# fixed header: queue name, depth, visible depth, oldest message age in
# seconds ("-" when that value is nil/false) and total messages.
#
# @return [Array] the metrics collection that was iterated
def list_queues
  # NOTE(review): the result of this call is not used below; it is kept
  # because any side effects it may have are not visible from here.
  Pgbus.client.list_queues
  queue_metrics = Pgbus.client.metrics
  row_format = "%-40s %-10s %-10s %-15s %-15s"

  puts "QUEUE                                    DEPTH      VISIBLE    OLDEST (s)      TOTAL          ",
       "-" * 95

  Array(queue_metrics).each do |metric|
    columns = [
      metric.queue_name,
      metric.queue_length,
      metric.queue_visible_length,
      metric.oldest_msg_age_sec || "-",
      metric.total_messages
    ]
    puts format(row_format, *columns)
  end
end

.parse_start_options(args) ⇒ Object



50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
# File 'lib/pgbus/cli.rb', line 50

# Parses the flags accepted by `pgbus start` into a plain options hash.
# Only flags that were actually passed appear as keys; value-taking flags
# store the raw string, the three *-only switches store true.
#
# @param args [Array<String>] raw CLI arguments; parsing works on a copy,
#   so the caller's array is not modified
# @return [Hash{Symbol => String, true}] the parsed options
def parse_start_options(args)
  parsed = {}

  parser = OptionParser.new
  parser.banner = "Usage: pgbus start [options]"

  parser.on("--queues STRING", "Override worker capsules (e.g. \"critical: 5; default: 10\")") { |value| parsed[:queues] = value }
  parser.on("--capsule NAME", "Run only the named capsule from the configured workers") { |value| parsed[:capsule] = value }
  parser.on("--workers-only", "Run only the worker processes (no scheduler/dispatcher/consumers/outbox)") { parsed[:workers_only] = true }
  parser.on("--scheduler-only", "Run only the recurring scheduler (the cron pod pattern)") { parsed[:scheduler_only] = true }
  parser.on("--dispatcher-only", "Run only the dispatcher (the maintenance pod pattern)") { parsed[:dispatcher_only] = true }
  parser.on("--execution-mode MODE", "Execution mode: threads (default) or async") { |value| parsed[:execution_mode] = value }

  # parse! consumes the array it is given, hence the copy.
  parser.parse!(args.dup)
  parsed
end


149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
# File 'lib/pgbus/cli.rb', line 149

# Writes the top-level `pgbus` usage text (available commands plus the
# options accepted by `start`) to stdout.
#
# @return [nil]
def print_help
  usage = <<~HELP
    Usage: pgbus <command> [options]

    Commands:
      start    Start the Pgbus supervisor (workers + dispatcher)
      status   Show running Pgbus processes
      queues   List queues with metrics
      version  Show version
      help     Show this help

    Options for `start`:
      --queues STRING    Override worker capsules from the CLI
                         (e.g. "critical: 5; default: 10")
      --capsule NAME     Run only the named capsule from the configured
                         workers (useful for one-capsule-per-process
                         deployments)
      --workers-only     Run only worker processes (no scheduler/
                         dispatcher/consumers/outbox — for worker-only
                         containers)
      --scheduler-only   Run only the recurring scheduler (the cron pod
                         pattern — exactly one of these per deployment)
      --dispatcher-only  Run only the dispatcher (the maintenance pod
                         pattern)
      --execution-mode   Execution mode: threads (default) or async
                         (fiber-based, lower connection usage)
  HELP

  puts usage
end

.show_status ⇒ Object



118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
# File 'lib/pgbus/cli.rb', line 118

# Prints a table of the recorded Pgbus processes to stdout, ordered by kind
# and then creation time: kind, host, pid, last heartbeat and metadata.
# Prints a single notice line instead when there are no records.
#
# @return [Object, nil] nil when nothing is recorded; otherwise the result
#   of iterating the process collection
def show_status
  processes = ProcessEntry.order(:kind, :created_at)
                          .select(:kind, :hostname, :pid, :metadata, :last_heartbeat_at)

  if processes.none?
    puts "No Pgbus processes running."
    return
  end

  puts "KIND         HOST                 PID      HEARTBEAT                      METADATA"
  puts "-" * 100
  # The block parameter is deliberately not called `p`, which would shadow Kernel#p.
  processes.each do |entry|
    # The last column is the metadata attribute, matching both the select
    # list above and the METADATA header column.
    puts format("%-12s %-20s %-8s %-30s %s",
                entry.kind, entry.hostname, entry.pid, entry.last_heartbeat_at, entry.metadata)
  end
end

.start(args) ⇒ Object



9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
# File 'lib/pgbus/cli.rb', line 9

# CLI entry point: dispatches on the first element of +args+ (the command
# name, defaulting to "help") and forwards the remaining elements to the
# `start` command.
#
# @param args [Array<String>] command-line arguments, command name first
# @return [Object] the result of the selected command's handler
# @raise [SystemExit] with status 1 for an unrecognized command, after
#   printing the notice and the help text
def start(args)
  requested, *remaining = args
  requested ||= "help"

  case requested
  when "start"   then start_supervisor(remaining)
  when "status"  then show_status
  when "queues"  then list_queues
  when "version" then puts "pgbus #{Pgbus::VERSION}"
  when "help", "--help", "-h" then print_help
  else
    puts "Unknown command: #{requested}"
    print_help
    exit 1
  end
end

.start_supervisor(args = []) ⇒ Object



30
31
32
33
34
35
36
# File 'lib/pgbus/cli.rb', line 30

# Applies the `start` flags to the configuration, logs a startup line and
# then runs a new Process::Supervisor.
#
# @param args [Array<String>] flags for `pgbus start` (default: none)
# @return [Object] whatever Process::Supervisor#run returns
def start_supervisor(args = [])
  # The flags must reach the configuration before the supervisor is built.
  apply_start_options(args)
  Pgbus.logger.info { "[Pgbus] Starting Pgbus #{Pgbus::VERSION}..." }

  Process::Supervisor.new.run
end