Class: ProcessBot::ControlSocket

Inherits:
Object
  • Object
show all
Defined in:
lib/process_bot/control_socket.rb

Instance Attribute Summary collapse

Instance Method Summary collapse

Constructor Details

#initialize(options:, process:) ⇒ ControlSocket

Returns a new instance of ControlSocket.



8
9
10
11
12
13
14
# File 'lib/process_bot/control_socket.rb', line 8

# Builds a control socket for the given configuration and supervised process.
#
# @param options [#fetch] configuration; `options.fetch(:port)` is the first
#   port to try, coerced with `to_i`
# @param process [Object] the object control commands are dispatched to
def initialize(options:, process:)
  @options = options
  @process = process
  @port = options.fetch(:port).to_i

  # The client list is read and written from several threads (accept loop,
  # per-client handlers, log broadcasting), so every access is synchronized.
  @clients_mutex = Mutex.new
  @clients = []
end

Instance Attribute Details

#clientsObject (readonly)

Returns the value of attribute clients.



6
7
8
# File 'lib/process_bot/control_socket.rb', line 6

# @return [Array] currently connected control clients; guard access with clients_mutex
def clients; @clients; end

#clients_mutexObject (readonly)

Returns the value of attribute clients_mutex.



6
7
8
# File 'lib/process_bot/control_socket.rb', line 6

# @return [Mutex] lock serializing every read/write of the clients list
def clients_mutex; @clients_mutex; end

#optionsObject (readonly)

Returns the value of attribute options.



6
7
8
# File 'lib/process_bot/control_socket.rb', line 6

# @return [Object] configuration object this socket was created with
def options; @options; end

#portObject (readonly)

Returns the value of attribute port.



6
7
8
# File 'lib/process_bot/control_socket.rb', line 6

# @return [Integer] port the control socket uses; start_tcp_server may move it
#   above the originally requested value
def port; @port; end

#processObject (readonly)

Returns the value of attribute process.



6
7
8
# File 'lib/process_bot/control_socket.rb', line 6

# @return [Object] the supervised process that control commands are sent to
def process; @process; end

#serverObject (readonly)

Returns the value of attribute server.



6
7
8
# File 'lib/process_bot/control_socket.rb', line 6

# @return [TCPServer, nil] listening socket; nil until start_tcp_server succeeds
def server; @server; end

Instance Method Details

#actually_start_tcp_server(host, port) ⇒ Object



99
100
101
# File 'lib/process_bot/control_socket.rb', line 99

# Binds a listening TCP socket. Kept as a separate method so the bind step can
# be isolated from the port-selection logic in start_tcp_server.
#
# @param host [String] interface to bind
# @param port [Integer] port to bind
# @return [TCPServer]
# @raise [Errno::EADDRINUSE, Errno::EADDRNOTAVAIL] when the address can't be bound
def actually_start_tcp_server(host, port)
  # Without a block, IO.open is a synonym for .new.
  TCPServer.open(host, port)
end

#add_client(client) ⇒ Object



177
178
179
180
181
# File 'lib/process_bot/control_socket.rb', line 177

# Registers a connected client so it receives log broadcasts.
#
# @param client [IO] connected control socket
def add_client(client)
  clients_mutex.synchronize { clients.push(client) }
end

#broadcast_log(_event_name, output:, type:) ⇒ Object



166
167
168
169
170
171
172
173
174
175
# File 'lib/process_bot/control_socket.rb', line 166

# Sends one line of process output to every connected client as a JSON
# `{"type":"log","stream":...,"output":...}` message.
#
# Clients whose socket fails with IOError, EPIPE or ECONNRESET are dropped
# from the client list instead of aborting the broadcast.
#
# @param _event_name [Symbol] event name (unused)
# @param output [Object] the output chunk; normalized to UTF-8 before encoding
# @param type [#to_s] stream name, sent as the "stream" field
def broadcast_log(_event_name, output:, type:)
  line = JSON.generate(
    type: "log",
    stream: type.to_s,
    output: normalize_output(output)
  )

  clients_snapshot.each do |recipient|
    recipient.puts(line)
  rescue IOError, Errno::EPIPE, Errno::ECONNRESET
    # The peer went away; stop broadcasting to it.
    remove_client(recipient)
  end
end

#clients_snapshotObject



189
190
191
192
193
# File 'lib/process_bot/control_socket.rb', line 189

# Returns a copy of the client list taken under the lock, so callers can
# iterate it while other threads add or remove clients.
#
# @return [Array]
def clients_snapshot
  clients_mutex.synchronize { clients.dup }
end

#duplicate_id_error_message(id, duplicates) ⇒ Object



81
82
83
84
85
86
87
88
89
90
91
# File 'lib/process_bot/control_socket.rb', line 81

# Builds the message raised by ensure_no_duplicate_id!. It lists every
# conflicting process and suggests a `process_bot --command stop` invocation
# aimed at the first conflict's port.
#
# @param id [Object] the requested id (shown via #inspect and #to_s)
# @param duplicates [Array<Hash>] conflicting `{pid:, port:}` entries; must be
#   non-empty because the first entry's port is used in the suggestion
# @return [String]
def duplicate_id_error_message(id, duplicates)
  conflict_list = duplicates.map { |conflict| "PID #{conflict[:pid]} on port #{conflict[:port]}" }
  stop_port = duplicates.first[:port]
  handler_name = options.fetch(:handler, "custom")
  path = options.fetch(:release_path, "/")
  stop_command = "process_bot --command stop --port #{stop_port} --id #{id} " \
                 "--handler #{handler_name} --release-path #{path}"

  "Another process_bot with id=#{id.inspect} is already running for this application " \
    "(#{conflict_list.join(", ")}). Stop it (e.g. `#{stop_command}`) " \
    "or kill that PID before starting a new instance."
end

#ensure_no_duplicate_id!Object

Prevent a second process_bot with the same `--id` under the same application from starting while the first is still alive. The `start_tcp_server` loop silently drifts to a free port when the requested one is in use. That drift is intentional when unrelated process_bots share a host. It becomes a bug when a Capistrano deploy's stop failed to clean up the previous release's process_bot and the new release's start drifts around the zombie. The match is scoped by `application_basename` (derived from `release_path`) so that two unrelated apps on the same host can reuse a generic id like `sidekiq-main` without falsely blocking each other.



65
66
67
68
69
70
71
72
73
# File 'lib/process_bot/control_socket.rb', line 65

# Refuses to start when another running process_bot already uses the same
# `--id` for the same application.
#
# A blank or missing id disables the check.
#
# @return [nil]
# @raise [RuntimeError] with duplicate_id_error_message when conflicts exist
def ensure_no_duplicate_id!
  requested_id = options[:id]
  # nil.to_s is "", so this also covers a missing id.
  return if requested_id.to_s.strip.empty?

  conflicts = find_duplicate_id_entries(requested_id.to_s, safe_application_basename)
  raise duplicate_id_error_message(requested_id, conflicts) unless conflicts.empty?
end

#find_duplicate_id_entries(id, basename) ⇒ Object



75
76
77
78
79
# File 'lib/process_bot/control_socket.rb', line 75

# Selects running process_bot entries whose id AND application basename both
# match, so identical ids under different applications don't conflict.
#
# @param id [String]
# @param basename [String, nil]
# @return [Array<Hash>]
def find_duplicate_id_entries(id, basename)
  wanted = [id, basename]
  running_process_bot_entries.select { |entry| entry.values_at(:id, :application_basename) == wanted }
end

#handle_client(client) ⇒ Object

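Reads newline-delimited JSON commands from a connected control client and dispatches them until the client disconnects; the client is always unregistered and closed on exit. (The source line carries `# rubocop:disable Metrics/AbcSize`.)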



123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
# File 'lib/process_bot/control_socket.rb', line 123

# Serves one control-socket connection until the peer disconnects.
#
# The client is registered for log broadcasts. Each line it sends is then
# parsed as a JSON object with a "command" key and an optional "options"
# object. The commands graceful, graceful_no_wait, restart and stop are run
# via run_command inside process.with_control_command. Any other input
# (unknown command, invalid JSON, non-object JSON, missing "command") gets a
# JSON `{"type":"error",...}` reply and the loop keeps reading. The client is
# always unregistered and closed on exit.
#
# @param client [IO] connected socket (needs gets/puts/closed?/close)
# @raise [StandardError] re-raised after replying when a supported command fails
def handle_client(client) # rubocop:disable Metrics/AbcSize
  add_client(client)

  loop do
    data = client.gets
    break if data.nil? # Client disconnected

    command = parse_client_command(data)

    # Without this guard a malformed payload would raise out of this thread and
    # drop the connection with no reply; answer it like an unknown command.
    unless command
      client.puts(JSON.generate(type: "error", message: "Invalid command payload", backtrace: Thread.current.backtrace))
      next
    end

    command_type = command.fetch("command")

    if %w[graceful graceful_no_wait restart stop].include?(command_type)
      begin
        unless process.accept_control_commands?
          client.puts(JSON.generate(type: "error", message: "ProcessBot is shutting down", backtrace: Thread.current.backtrace))
          break
        end

        command_options = if command["options"]
          symbolize_keys(command.fetch("options"))
        else
          {}
        end

        process.with_control_command do
          run_command(command_type, command_options, client)
        end
      rescue => e # rubocop:disable Style/RescueStandardError
        logger.error e.message
        logger.error e.backtrace

        client.puts(JSON.generate(type: "error", message: e.message, backtrace: e.backtrace))

        raise e
      end
    else
      client.puts(JSON.generate(type: "error", message: "Unknown command: #{command_type}", backtrace: Thread.current.backtrace))
    end
  end
ensure
  remove_client(client)
  client.close unless client.closed?
end

# Parses one line received from a control client.
# Returns the command Hash, or nil when the line is not valid JSON, is not a
# JSON object, or lacks a "command" key.
def parse_client_command(data)
  command = JSON.parse(data)
  command if command.is_a?(Hash) && command.key?("command")
rescue JSON::ParserError
  nil
end

#loggerObject



16
17
18
# File 'lib/process_bot/control_socket.rb', line 16

# Lazily builds, and then reuses, the ProcessBot logger for this socket.
#
# @return [ProcessBot::Logger]
def logger
  return @logger if @logger

  @logger = ProcessBot::Logger.new(options: options)
end

#normalize_output(output) ⇒ Object



195
196
197
# File 'lib/process_bot/control_socket.rb', line 195

# Coerces a chunk of process output into a UTF-8 string suitable for
# JSON.generate. Invalid or unconvertible bytes are replaced with "?".
#
# @param output [Object] anything responding to to_s (nil becomes "")
# @return [String]
def normalize_output(output)
  text = output.to_s
  text.encode(Encoding::UTF_8, invalid: :replace, undef: :replace, replace: "?")
end

#remove_client(client) ⇒ Object



183
184
185
186
187
# File 'lib/process_bot/control_socket.rb', line 183

# Unregisters a client so it no longer receives log broadcasts.
#
# @param client [IO]
# @return [IO, nil] the removed client, or nil if it was not registered
def remove_client(client)
  clients_mutex.synchronize { clients.delete(client) }
end

#run_client_loopObject



107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
# File 'lib/process_bot/control_socket.rb', line 107

# Starts the accept loop on a background thread and returns that thread.
#
# Each accepted connection is served by handle_client on its own thread.
# The loop exits when server.accept raises IOError or Errno::EBADF (e.g.
# after stop has closed the server).
#
# @return [Thread]
def run_client_loop
  Thread.new do
    loop do
      connection =
        begin
          server.accept
        rescue IOError, Errno::EBADF
          break
        end

      Thread.new { handle_client(connection) }
    end
  end
end

#run_command(command_type, command_options, client) ⇒ Object



210
211
212
213
214
215
# File 'lib/process_bot/control_socket.rb', line 210

# Logs and executes a whitelisted control command on the supervised process,
# then acknowledges it to the client with `{"type":"success"}`.
#
# @param command_type [String] method name invoked on process via __send__
# @param command_options [Hash{Symbol=>Object}] forwarded as keyword arguments
# @param client [IO] receives the success acknowledgement
def run_command(command_type, command_options, client)
  logger.logs("Command #{command_type} with options #{command_options}")

  process.__send__(command_type, **command_options)

  success_reply = JSON.generate(type: "success")
  client.puts(success_reply)
end

#running_process_bot_entriesObject

Parsed `{ application_basename:, id:, pid:, port: }` entries for every running process_bot visible to `ps`, extracted from each instance's JSON process title. Processes whose title has no parseable JSON payload are skipped.



224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
# File 'lib/process_bot/control_socket.rb', line 224

# Collects metadata for every running process_bot whose command line carries
# a JSON payload after "ProcessBot ".
#
# Processes are listed via Knj::Unix_proc (filtered on "ProcessBot"). Command
# lines without a `{...}` payload, or whose payload isn't valid JSON, are
# skipped.
#
# @return [Array<Hash>] entries of the form
#   `{application_basename:, id: (String|nil), pid:, port: (Integer|nil)}`
def running_process_bot_entries
  found = []

  Knj::Unix_proc.list("grep" => "ProcessBot") do |proc_info|
    title_json = proc_info.data.fetch("cmd")[/ProcessBot (\{.+\})/, 1]
    next if title_json.nil?

    metadata =
      begin
        JSON.parse(title_json)
      rescue JSON::ParserError
        next
      end

    found << {
      application_basename: metadata["application_basename"],
      id: metadata["id"]&.to_s,
      pid: proc_info.data["pid"] || proc_info.pid,
      port: metadata["port"]&.to_i
    }
  end

  found
end

#safe_application_basenameObject



93
94
95
96
97
# File 'lib/process_bot/control_socket.rb', line 93

# Returns options.application_basename, or nil when computing it raises
# KeyError (for example when a required option is missing).
# Any other error still propagates.
#
# @return [String, nil]
def safe_application_basename
  begin
    options.application_basename
  rescue KeyError
    nil
  end
end

#startObject



20
21
22
23
24
25
26
27
28
# File 'lib/process_bot/control_socket.rb', line 20

# Opens the control socket and hooks it into the event system.
#
# Binds the TCP server (possibly on a port above the requested one) and
# starts the accept loop. It then announces the final port through the
# :on_socket_opened event and subscribes to :on_log so process output is
# relayed to connected clients.
def start
  start_tcp_server
  run_client_loop
  logger.logs("TCPServer started")

  options.events.call(:on_socket_opened, port: @port)
  options.events.connect(:on_log) { |name, output:, type:| broadcast_log(name, output: output, type: type) }
end

#start_tcp_serverObject



30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
# File 'lib/process_bot/control_socket.rb', line 30

# Binds the control socket on localhost, starting from the requested port.
#
# It first refuses to run if another process_bot for the same app already
# uses this id. Ports advertised by other running process_bots are skipped
# without counting as attempts. A port that fails with EADDRINUSE or
# EADDRNOTAVAIL moves the search up by one. After 101 failed bind attempts
# the last error is re-raised. The chosen port is left in @port and the
# server in @server.
#
# @raise [Errno::EADDRINUSE, Errno::EADDRNOTAVAIL] when no port could be bound
def start_tcp_server
  ensure_no_duplicate_id!

  taken_ports = used_process_bot_ports
  bind_attempts = 0

  loop do
    @port += 1 while taken_ports.include?(@port)

    bind_attempts += 1

    begin
      @server = actually_start_tcp_server("localhost", @port)
      break
    rescue Errno::EADDRINUSE, Errno::EADDRNOTAVAIL
      raise if bind_attempts > 100

      @port += 1
    end
  end
end

#stopObject



103
104
105
# File 'lib/process_bot/control_socket.rb', line 103

# Closes the listening server. This is a no-op when start_tcp_server never
# created one.
def stop
  server.close unless server.nil?
end

#symbolize_keys(hash) ⇒ Object



199
200
201
202
203
204
205
206
207
208
# File 'lib/process_bot/control_socket.rb', line 199

# Turns client-supplied command options into keyword arguments. Keys become
# symbols, and the "port" key is dropped.
# NOTE(review): "port" presumably only addresses the control socket and is
# not a command argument — confirm.
#
# @param hash [Hash{String=>Object}]
# @return [Hash{Symbol=>Object}]
def symbolize_keys(hash)
  hash.each_with_object({}) do |(key, value), symbolized|
    symbolized[key.to_sym] = value unless key == "port"
  end
end

#used_process_bot_portsObject



217
218
219
# File 'lib/process_bot/control_socket.rb', line 217

# Distinct ports advertised by running process_bots. Entries without a port
# are ignored.
#
# @return [Array<Integer>]
def used_process_bot_ports
  ports = running_process_bot_entries.filter_map { |bot| bot[:port] }
  ports.uniq
end