Module: Rubyists::Leopard::MetricsServer

Included in:
NatsApiServer::ClassMethods
Defined in:
lib/leopard/metrics_server.rb

Overview

Adds a minimal Prometheus HTTP endpoint for Leopard worker metrics.

Instance Method Details

#accumulate_worker_metrics(worker, busy, pending) ⇒ void (private)

This method returns an undefined value.

Adds one worker’s endpoint saturation metrics to the aggregate hashes.

Parameters:

  • worker (Object)

    A Leopard worker instance.

  • busy (Hash{String => Integer})

    Subject-to-busy-worker counts.

  • pending (Hash{String => Integer})

    Subject-to-pending-message counts.



# File 'lib/leopard/metrics_server.rb', line 107

def accumulate_worker_metrics(worker, busy, pending)
  service = worker.instance_variable_get(:@service)
  return unless service

  service.endpoints.each do |ep|
    # TODO: use ep.handler once nats-pure.rb adds attr_reader :handler to NATS::Service::Endpoint
    sub = ep.instance_variable_get(:@handler)
    next unless sub

    subj = ep.subject.to_s
    busy[subj]    += sub.concurrency_semaphore.available_permits.zero? ? 1 : 0
    pending[subj] += sub.pending_queue&.size.to_i
  end
end
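
The accumulation above can be tried without a NATS connection. The following is a minimal sketch, assuming hypothetical stand-in objects (FakeSemaphore, FakeHandler, FakeEndpoint, FakeService, FakeWorker) that expose only what the method reads; the method body is copied from the listing so the sketch runs on its own.

```ruby
# Hypothetical stand-ins for the objects the method touches; the real ones
# come from Leopard and nats-pure and are not reproduced here.
FakeSemaphore = Struct.new(:available_permits)
FakeHandler   = Struct.new(:concurrency_semaphore, :pending_queue)
FakeService   = Struct.new(:endpoints)

class FakeEndpoint
  attr_reader :subject

  def initialize(subject, handler)
    @subject = subject
    @handler = handler # read back via instance_variable_get, as in the source
  end
end

class FakeWorker
  def initialize(service)
    @service = service
  end
end

# Same logic as the documented method, copied so the sketch is standalone.
def accumulate_worker_metrics(worker, busy, pending)
  service = worker.instance_variable_get(:@service)
  return unless service

  service.endpoints.each do |ep|
    sub = ep.instance_variable_get(:@handler)
    next unless sub

    subj = ep.subject.to_s
    busy[subj]    += sub.concurrency_semaphore.available_permits.zero? ? 1 : 0
    pending[subj] += sub.pending_queue&.size.to_i
  end
end

saturated = FakeHandler.new(FakeSemaphore.new(0), %i[a b c]) # no free permits, 3 queued
idle      = FakeHandler.new(FakeSemaphore.new(2), nil)       # free permits, no queue

workers = [
  FakeWorker.new(FakeService.new([FakeEndpoint.new('orders.create', saturated)])),
  FakeWorker.new(FakeService.new([FakeEndpoint.new('orders.create', idle)])),
  FakeWorker.new(nil) # no service assigned: skipped by the early return
]

busy    = Hash.new(0)
pending = Hash.new(0)
workers.each { |w| accumulate_worker_metrics(w, busy, pending) }

p busy
p pending
```

A worker counts as busy for a subject only when its semaphore has zero available permits, so the busy value is a count of saturated workers rather than of in-flight messages. A nil pending_queue contributes 0 because `&.` skips only the `size` call and `nil.to_i` is 0.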

#close_client(client) ⇒ void (private)

This method returns an undefined value.

Closes a metrics client socket, ignoring cleanup failures.

Parameters:

  • client (TCPSocket)

    The connected HTTP client.



# File 'lib/leopard/metrics_server.rb', line 49

def close_client(client)
  client.close
rescue StandardError
  nil
end

#collect_prometheus_metrics(workers) ⇒ Hash{Symbol => Object} (private)

Aggregates per-subject worker utilization metrics.

Parameters:

  • workers (Array<Object>)

    Active Leopard worker instances to observe.

Returns:

  • (Hash{Symbol => Object})

    Metric hashes for the Prometheus template.



# File 'lib/leopard/metrics_server.rb', line 88

def collect_prometheus_metrics(workers)
  busy    = Hash.new(0)
  pending = Hash.new(0)
  workers.each { |w| accumulate_worker_metrics(w, busy, pending) }
  {
    busy:,
    pending:,
    subjects: (busy.keys | pending.keys).sort,
    total: workers.size,
  }
end
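
The shape of the returned hash can be illustrated with hand-filled counters. This is a sketch with made-up subjects and counts, not output from a real worker pool; it spells out `busy: busy` instead of the `busy:` shorthand used in the listing so that it also runs on Ruby versions older than 3.1.

```ruby
# Hash.new(0) lets callers use += on subjects that have not been seen yet.
busy    = Hash.new(0)
pending = Hash.new(0)

# Illustrative values only.
busy['orders.create']     += 1
pending['orders.create']  += 3
pending['billing.charge'] += 2

metrics = {
  busy: busy,       # the listing writes this as `busy:` (Ruby 3.1+ shorthand)
  pending: pending,
  subjects: (busy.keys | pending.keys).sort, # union of keys, de-duplicated and sorted
  total: 2          # the listing uses workers.size here
}

p metrics[:subjects]

# Reading a missing subject returns the default without adding a key.
p metrics[:busy]['billing.charge']
p metrics[:busy].key?('billing.charge')
```

Taking the union of both key sets means a subject appears in `subjects` even if it was recorded in only one of the two hashes, and a template can still look it up in the other hash and get 0.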

#handle_metrics_client(client, workers) ⇒ void (private)

This method returns an undefined value.

Handles an individual metrics HTTP client connection.

Parameters:

  • client (TCPSocket)

    The connected HTTP client.

  • workers (Array<Object>)

    Active Leopard worker instances to observe.



# File 'lib/leopard/metrics_server.rb', line 34

def handle_metrics_client(client, workers)
  request_line = client.gets
  loop { break if (client.gets || '').chomp.empty? }
  write_metrics_response(client, request_line, workers)
rescue StandardError => e
  logger.warn "Metrics request error: #{e.message}"
ensure
  close_client(client)
end
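
The request parsing above reads the request line and then discards headers up to the first blank line. A minimal sketch of that loop, assuming a StringIO as a stand-in for the TCPSocket (both respond to gets):

```ruby
require 'stringio'

# A hand-written request; the trailing "leftover" marks where the headers end.
raw    = "GET /metrics HTTP/1.1\r\nHost: localhost:9394\r\nAccept: */*\r\n\r\nleftover"
client = StringIO.new(raw)

request_line = client.gets
# Drain header lines. A blank line ("\r\n") chomps to "" and ends the loop;
# so does nil from gets at end of input, thanks to the `|| ''` fallback.
loop { break if (client.gets || '').chomp.empty? }

p request_line
p client.read # whatever follows the header block is left unread by the loop

# A client that disconnects without sending anything yields a nil request line.
empty_client = StringIO.new('')
empty_line   = empty_client.gets
loop { break if (empty_client.gets || '').chomp.empty? }
p empty_line
```

The nil case is why write_metrics_response accepts `String, nil` for request_line.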

#metrics_template_path ⇒ String (private)

Returns the absolute path to the Prometheus metrics template.

Returns:

  • (String)

    The metrics template path.



# File 'lib/leopard/metrics_server.rb', line 134

def metrics_template_path
  File.expand_path('templates/prometheus_metrics.erb', __dir__)
end

#prometheus_metrics(workers) ⇒ String (private)

Builds the Prometheus metrics payload for the current worker state.

Parameters:

  • workers (Array<Object>)

    Active Leopard worker instances to observe.

Returns:

  • (String)

    Rendered Prometheus text exposition output.



# File 'lib/leopard/metrics_server.rb', line 78

def prometheus_metrics(workers)
  metrics = collect_prometheus_metrics(workers)
  render_metrics_template(metrics)
end

#render_metrics_template(metrics) ⇒ String (private)

Renders the metrics ERB template with aggregated metric data.

Parameters:

  • metrics (Hash{Symbol => Object})

    Aggregated metric data for template rendering.

Returns:

  • (String)

    The rendered Prometheus payload.



# File 'lib/leopard/metrics_server.rb', line 127

def render_metrics_template(metrics)
  ERB.new(File.read(metrics_template_path), trim_mode: '-').result_with_hash(metrics)
end
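
The template file itself is not shown on this page. The sketch below uses a hypothetical inline template with made-up metric names to show how `trim_mode: '-'` and `result_with_hash` behave together; it is not the contents of prometheus_metrics.erb.

```ruby
require 'erb'

# Hypothetical template. With trim_mode '-', a tag opened with <%- drops the
# indentation before it and a tag closed with -%> drops the newline after it,
# so control-flow lines leave no blank lines in the output.
template = <<~'TEXT'
  # TYPE example_workers_total gauge
  example_workers_total <%= total %>
  <%- subjects.each do |s| -%>
  example_busy_workers{subject="<%= s %>"} <%= busy[s] %>
  <%- end -%>
TEXT

# Keys of the hash become local variables inside the template.
metrics = {
  busy: { 'billing.charge' => 0, 'orders.create' => 1 },
  subjects: ['billing.charge', 'orders.create'],
  total: 2
}

output = ERB.new(template, trim_mode: '-').result_with_hash(metrics)
puts output
```

Because `result_with_hash` exposes only the given keys, the template cannot reach other state of the including class, which keeps rendering independent of the server.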

#start_metrics_server(workers) ⇒ Thread (private)

Starts a lightweight HTTP server that exposes Leopard Prometheus metrics.

Parameters:

  • workers (Array<Object>)

    Active Leopard worker instances to observe.

Returns:

  • (Thread)

    The server thread.



# File 'lib/leopard/metrics_server.rb', line 17

def start_metrics_server(workers)
  port = ENV.fetch('LEOPARD_METRICS_PORT', '9394').to_i
  Thread.new do
    server = TCPServer.new(port)
    logger.info "Metrics server listening on :#{port}"
    loop { Thread.new(server.accept) { |client| handle_metrics_client(client, workers) } }
  rescue StandardError => e
    logger.error "Metrics server error: #{e.message}"
  end
end
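
The port lookup above is worth a closer look, since `String#to_i` never raises. The sketch below uses a hypothetical helper, metrics_port, that takes a plain Hash instead of ENV so it has no side effects; strict_metrics_port is an illustrative alternative, not something the source does.

```ruby
# Hypothetical helper mirroring the port lookup in start_metrics_server.
# Hash#fetch with a default behaves like ENV.fetch for this purpose.
def metrics_port(env)
  env.fetch('LEOPARD_METRICS_PORT', '9394').to_i
end

default_port = metrics_port({})
custom_port  = metrics_port({ 'LEOPARD_METRICS_PORT' => '9500' })
invalid_port = metrics_port({ 'LEOPARD_METRICS_PORT' => 'not-a-port' })

p default_port
p custom_port
p invalid_port # String#to_i yields 0 for non-numeric input

# Stricter variant (an assumption for illustration): Integer() raises
# ArgumentError on malformed input instead of returning 0.
def strict_metrics_port(env)
  Integer(env.fetch('LEOPARD_METRICS_PORT', '9394'), 10)
end

begin
  strict_metrics_port({ 'LEOPARD_METRICS_PORT' => 'not-a-port' })
rescue ArgumentError => e
  puts "rejected: #{e.message}"
end
```

With the lookup as written, a malformed LEOPARD_METRICS_PORT becomes port 0, which asks the operating system for any free port, while the log line would still report `:0`.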

#write_metrics_response(client, request_line, workers) ⇒ void (private)

This method returns an undefined value.

Writes the HTTP response for a metrics request.

Parameters:

  • client (TCPSocket)

    The connected HTTP client.

  • request_line (String, nil)

    The first line of the HTTP request.

  • workers (Array<Object>)

    Active Leopard worker instances to observe.



# File 'lib/leopard/metrics_server.rb', line 62

def write_metrics_response(client, request_line, workers)
  if request_line&.start_with?('GET /metrics')
    body = prometheus_metrics(workers)
    client.write "HTTP/1.1 200 OK\r\n" \
                 "Content-Type: text/plain; version=0.0.4\r\n" \
                 "Content-Length: #{body.bytesize}\r\n\r\n#{body}"
  else
    client.write "HTTP/1.1 404 Not Found\r\nContent-Length: 0\r\n\r\n"
  end
end
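
The two response shapes can be inspected by writing into a StringIO instead of a socket. This is a sketch under two assumptions: the method body is copied from the listing, and prometheus_metrics is replaced by a stub that returns a fixed, made-up payload.

```ruby
require 'stringio'

# Stub standing in for the documented prometheus_metrics, which renders a template.
def prometheus_metrics(_workers)
  "example_workers_total 2\n"
end

# Same logic as the documented method, copied so the sketch is standalone.
def write_metrics_response(client, request_line, workers)
  if request_line&.start_with?('GET /metrics')
    body = prometheus_metrics(workers)
    client.write "HTTP/1.1 200 OK\r\n" \
                 "Content-Type: text/plain; version=0.0.4\r\n" \
                 "Content-Length: #{body.bytesize}\r\n\r\n#{body}"
  else
    client.write "HTTP/1.1 404 Not Found\r\nContent-Length: 0\r\n\r\n"
  end
end

ok       = StringIO.new
missing  = StringIO.new
no_line  = StringIO.new
prefixed = StringIO.new

write_metrics_response(ok,       "GET /metrics HTTP/1.1\r\n",      [])
write_metrics_response(missing,  "GET /health HTTP/1.1\r\n",       [])
write_metrics_response(no_line,  nil,                              [])
write_metrics_response(prefixed, "GET /metrics-extra HTTP/1.1\r\n", [])

puts ok.string
p missing.string
p no_line.string == missing.string       # a nil request line also gets the 404
p prefixed.string.start_with?('HTTP/1.1 200') # prefix match, not an exact path match
```

Two details follow from the listing: Content-Length is computed with bytesize, so multibyte label values are counted correctly, and the route check is a prefix match, so any path beginning with /metrics receives the metrics payload.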