Module: Rubyists::Leopard::MetricsServer

Included in:
NatsApiServer::ClassMethods
Defined in:
lib/leopard/metrics_server.rb

Overview

Adds a minimal Prometheus HTTP endpoint for Leopard worker metrics.

Instance Method Details

#accumulate_worker_metrics(worker, worker_index, subject_metrics) ⇒ void (private)

This method returns an undefined value.

Appends one worker’s per-subject slot metrics to the subject_metrics array.

Parameters:

  • worker (Object)

    A Leopard worker instance.

  • worker_index (Integer)

    Position of this worker in the workers array.

  • subject_metrics (Array<Hash>)

    Accumulator for per-worker-per-subject metric rows.



# File 'lib/leopard/metrics_server.rb', line 108

def accumulate_worker_metrics(worker, worker_index, subject_metrics)
  service = worker.instance_variable_get(:@service)
  return unless service

  service.endpoints.each do |endpoint|
    row = endpoint_subject_metrics(endpoint, worker_index)
    subject_metrics << row if row
  end
end

#close_client(client) ⇒ void (private)

This method returns an undefined value.

Closes a metrics client socket, ignoring cleanup failures.

Parameters:

  • client (TCPSocket)

    The connected HTTP client.



# File 'lib/leopard/metrics_server.rb', line 49

def close_client(client)
  client.close
rescue StandardError
  nil
end

#collect_prometheus_metrics(workers) ⇒ Hash{Symbol => Object} (private)

Aggregates per-worker, per-subject saturation metrics and per-worker executor metrics.

Parameters:

  • workers (Array<Object>)

    Active Leopard worker instances to observe.

Returns:

  • (Hash{Symbol => Object})

    Metric hashes for the Prometheus template.



# File 'lib/leopard/metrics_server.rb', line 88

def collect_prometheus_metrics(workers)
  subject_metrics = []
  executors       = []

  workers.each_with_index do |w, i|
    accumulate_worker_metrics(w, i, subject_metrics)
    ex = w.instance_variable_get(:@client)&.subscription_executor
    executors << { worker: i, executor: ex } if ex
  end

  { subject_metrics:, executors: }
end

#endpoint_subject_metrics(endpoint, worker_index) ⇒ Hash? (private)

Builds a per-worker-per-subject metric row from a single endpoint, or returns nil if the endpoint has no handler yet (that is, it is not yet active).

Parameters:

  • endpoint (Object)

    A NATS service endpoint.

  • worker_index (Integer)

    Position of the owning worker in the workers array.

Returns:

  • (Hash, nil)


# File 'lib/leopard/metrics_server.rb', line 124

def endpoint_subject_metrics(endpoint, worker_index)
  # TODO: use endpoint.handler once nats-pure.rb adds attr_reader :handler to NATS::Service::Endpoint
  sub = endpoint.instance_variable_get(:@handler)
  return unless sub

  concurrency = sub.instance_variable_get(:@processing_concurrency).to_i
  {
    worker: worker_index,
    subject: endpoint.subject.to_s,
    busy_slots: concurrency - sub.concurrency_semaphore.available_permits,
    capacity_slots: concurrency,
    pending: sub.pending_queue&.size.to_i,
  }
end
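
The slot arithmetic in the listing above can be sketched in isolation. This is a minimal illustration, not the library's code: FakeSemaphore and slot_row are hypothetical names standing in for the NATS subscription's semaphore and for endpoint_subject_metrics, and only the available_permits reader is modeled.

```ruby
# Hypothetical stand-in for the subscription's concurrency semaphore.
# Only the available_permits reader used by the listing is modeled.
FakeSemaphore = Struct.new(:available_permits)

# Hypothetical helper mirroring the row built by endpoint_subject_metrics:
# busy slots are the configured capacity minus the permits still free.
def slot_row(worker_index, subject, concurrency, semaphore, pending_queue)
  {
    worker: worker_index,
    subject: subject.to_s,
    busy_slots: concurrency - semaphore.available_permits,
    capacity_slots: concurrency,
    # A missing queue counts as zero pending messages (nil.to_i is 0).
    pending: pending_queue&.size.to_i,
  }
end

# Capacity of 4 with 1 permit free means 3 handlers are currently running.
row = slot_row(0, 'orders.create', 4, FakeSemaphore.new(1), %i[a b])
puts row.inspect
```

A row with busy_slots equal to capacity_slots and a growing pending count would suggest that the subject is saturated on that worker.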

#handle_metrics_client(client, workers) ⇒ void (private)

This method returns an undefined value.

Handles an individual metrics HTTP client connection.

Parameters:

  • client (TCPSocket)

    The connected HTTP client.

  • workers (Array<Object>)

    Active Leopard worker instances to observe.



# File 'lib/leopard/metrics_server.rb', line 34

def handle_metrics_client(client, workers)
  request_line = client.gets
  loop { break if (client.gets || '').chomp.empty? }
  write_metrics_response(client, request_line, workers)
rescue StandardError => e
  logger.warn "Metrics request error: #{e.message}"
ensure
  close_client(client)
end
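
The request parsing above reads the request line, then discards headers until the blank line that ends them. A small sketch of that loop, using a StringIO in place of the TCPSocket (the raw request text is made up for illustration):

```ruby
require 'stringio'

# Hypothetical raw request; a StringIO stands in for the client socket.
raw = "GET /metrics HTTP/1.1\r\nHost: localhost:9394\r\nAccept: */*\r\n\r\nleftover"
client = StringIO.new(raw)

# The first line carries the method and path.
request_line = client.gets

# Drain headers: stop at the empty line, or at EOF (gets returns nil).
loop { break if (client.gets || '').chomp.empty? }

# Whatever follows the blank line is left unread by the header loop.
remaining = client.read

puts request_line.chomp
puts remaining
```

The `|| ''` guard matters when the peer disconnects early: without it, calling chomp on nil would raise instead of ending the loop.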

#metrics_template_path ⇒ String (private)

Returns the absolute path to the Prometheus metrics template.

Returns:

  • (String)

    The metrics template path.



# File 'lib/leopard/metrics_server.rb', line 151

def metrics_template_path
  File.expand_path('templates/prometheus_metrics.erb', __dir__)
end

#prometheus_metrics(workers) ⇒ String (private)

Builds the Prometheus metrics payload for the current worker state.

Parameters:

  • workers (Array<Object>)

    Active Leopard worker instances to observe.

Returns:

  • (String)

    Rendered Prometheus text exposition output.



# File 'lib/leopard/metrics_server.rb', line 78

def prometheus_metrics(workers)
  metrics = collect_prometheus_metrics(workers)
  render_metrics_template(metrics)
end

#render_metrics_template(metrics) ⇒ String (private)

Renders the metrics ERB template with aggregated metric data.

Parameters:

  • metrics (Hash{Symbol => Object})

    Aggregated metric data for template rendering.

Returns:

  • (String)

    The rendered Prometheus payload.



# File 'lib/leopard/metrics_server.rb', line 144

def render_metrics_template(metrics)
  ERB.new(File.read(metrics_template_path), trim_mode: '-').result_with_hash(metrics)
end
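
To illustrate how result_with_hash exposes the metrics hash to a template, here is a minimal sketch. The inline template and the demo_busy_slots metric name are assumptions for illustration; the contents of the real templates/prometheus_metrics.erb are not shown in this page.

```ruby
require 'erb'

# Hypothetical template; the real one is templates/prometheus_metrics.erb.
# With trim_mode '-', lines wrapped in <%- ... -%> emit no blank output lines.
TEMPLATE = <<~'TPL'
  # TYPE demo_busy_slots gauge
  <%- subject_metrics.each do |m| -%>
  demo_busy_slots{worker="<%= m[:worker] %>",subject="<%= m[:subject] %>"} <%= m[:busy_slots] %>
  <%- end -%>
TPL

# Same top-level keys as the hash returned by collect_prometheus_metrics.
metrics = {
  subject_metrics: [{ worker: 0, subject: 'orders.create', busy_slots: 3 }],
  executors: [],
}

# Each key of the hash becomes a local variable inside the template.
output = ERB.new(TEMPLATE, trim_mode: '-').result_with_hash(metrics)
puts output
```

Because the template is read from disk on every call in the listing above, edits to the template file would take effect on the next scrape without a restart.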

#start_metrics_server(workers) ⇒ Thread (private)

Starts a lightweight HTTP server that exposes Leopard Prometheus metrics.

Parameters:

  • workers (Array<Object>)

    Active Leopard worker instances to observe.

Returns:

  • (Thread)

    The server thread.



# File 'lib/leopard/metrics_server.rb', line 17

def start_metrics_server(workers)
  port = ENV.fetch('LEOPARD_METRICS_PORT', '9394').to_i
  Thread.new do
    server = TCPServer.new(port)
    logger.info "Metrics server listening on :#{port}"
    loop { Thread.new(server.accept) { |client| handle_metrics_client(client, workers) } }
  rescue StandardError => e
    logger.error "Metrics server error: #{e.message}"
  end
end
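
The thread-per-connection accept loop above can be exercised end to end with a small stand-alone sketch. It is not the module itself: it binds to an ephemeral port on 127.0.0.1 rather than LEOPARD_METRICS_PORT (default 9394), serves a fixed, made-up body (demo_up 1), and uses Net::HTTP as the scraper.

```ruby
require 'socket'
require 'net/http'

# Port 0 asks the OS for any free port; the real server uses LEOPARD_METRICS_PORT.
server = TCPServer.new('127.0.0.1', 0)
port = server.addr[1]

acceptor = Thread.new do
  loop do
    # One short-lived thread per connection, as in start_metrics_server.
    Thread.new(server.accept) do |client|
      client.gets                                          # request line
      loop { break if (client.gets || '').chomp.empty? }   # drain headers
      body = "demo_up 1\n"                                 # hypothetical payload
      client.write "HTTP/1.1 200 OK\r\n" \
                   "Content-Type: text/plain; version=0.0.4\r\n" \
                   "Content-Length: #{body.bytesize}\r\n\r\n#{body}"
    ensure
      client.close
    end
  end
end

# Scrape the endpoint the way a Prometheus server or curl would.
response = Net::HTTP.get_response(URI("http://127.0.0.1:#{port}/metrics"))
puts response.code
puts response.body

acceptor.kill
server.close
```

Against a running Leopard process, the same Net::HTTP call pointed at the configured port should return the rendered template instead of the fixed body used here.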

#write_metrics_response(client, request_line, workers) ⇒ void (private)

This method returns an undefined value.

Writes the HTTP response for a metrics request.

Parameters:

  • client (TCPSocket)

    The connected HTTP client.

  • request_line (String, nil)

    The first line of the HTTP request.

  • workers (Array<Object>)

    Active Leopard worker instances to observe.



# File 'lib/leopard/metrics_server.rb', line 62

def write_metrics_response(client, request_line, workers)
  if request_line&.start_with?('GET /metrics')
    body = prometheus_metrics(workers)
    client.write "HTTP/1.1 200 OK\r\n" \
                 "Content-Type: text/plain; version=0.0.4\r\n" \
                 "Content-Length: #{body.bytesize}\r\n\r\n#{body}"
  else
    client.write "HTTP/1.1 404 Not Found\r\nContent-Length: 0\r\n\r\n"
  end
end
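
The two response branches above can be checked without a socket. In this sketch, write_response is a hypothetical copy of the method that takes the body directly instead of rendering it from workers, and a StringIO captures what would be written to the client.

```ruby
require 'stringio'

# Hypothetical variant of write_metrics_response: the body is passed in
# directly so the HTTP framing can be inspected on its own.
def write_response(io, request_line, body)
  if request_line&.start_with?('GET /metrics')
    # Content-Length is counted in bytes, not characters.
    io.write "HTTP/1.1 200 OK\r\n" \
             "Content-Type: text/plain; version=0.0.4\r\n" \
             "Content-Length: #{body.bytesize}\r\n\r\n#{body}"
  else
    io.write "HTTP/1.1 404 Not Found\r\nContent-Length: 0\r\n\r\n"
  end
end

ok = StringIO.new
write_response(ok, "GET /metrics HTTP/1.1\r\n", "up 1\n")
puts ok.string

# A nil request line (client closed before sending anything) falls through to 404.
missing = StringIO.new
write_response(missing, nil, "up 1\n")
puts missing.string
```

Note that start_with? is a prefix match, so a request line such as `GET /metricsfoo` would also take the 200 branch; any other method or path receives the empty 404.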