Class: ActiveJob::Temporal::MetricsServer

Inherits:
Object
  • Object
show all
Includes:
HttpLineReader
Defined in:
lib/activejob/temporal/metrics_server.rb

Constant Summary collapse

# Loopback address used when the constructor is not given +bind_address+.
DEFAULT_BIND_ADDRESS = "127.0.0.1"
# Not referenced by the code visible here; presumably the per-connection read
# timeout in seconds — confirm against the request-reading code.
READ_TIMEOUT_SECONDS = 1
# Same value as the Prometheus text exposition format (0.0.4) content type;
# presumably sent with metrics responses — confirm against the response writer.
CONTENT_TYPE = "text/plain; version=0.0.4"
# Passed as +size:+ to the ConnectionWorkerPool built in #start.
CONNECTION_WORKERS = 4
# Passed as +queue_size:+ to the ConnectionWorkerPool built in #start.
CONNECTION_QUEUE_SIZE = 16

Instance Attribute Summary collapse

Instance Method Summary collapse

Constructor Details

#initialize(port:, provider:, bind_address: DEFAULT_BIND_ADDRESS, allow_public_bind: false) ⇒ MetricsServer

Returns a new instance of MetricsServer.



23
24
25
26
27
28
29
30
# File 'lib/activejob/temporal/metrics_server.rb', line 23

# Builds a stopped metrics server. No socket is opened here; that happens in
# #start.
#
# @param port [Integer, String] requested listen port, coerced with Kernel#Integer
# @param provider [Object] stored as-is; not used by the code visible here
#   (presumably the metrics source — confirm against the request handler)
# @param bind_address [String] address later handed to TCPServer.new by #start
# @param allow_public_bind [Boolean] later forwarded to BindPolicy.validate! by #start
# @raise [ArgumentError, TypeError] when +port+ cannot be coerced by Integer()
def initialize(port:, provider:, bind_address: DEFAULT_BIND_ADDRESS, allow_public_bind: false)
  # Coerce first so an invalid port fails before any other state is set.
  @requested_port = Integer(port)
  @bind_address, @allow_public_bind, @provider = bind_address, allow_public_bind, provider
  @mutex = Mutex.new
  @running = false
end

Instance Attribute Details

#bind_address ⇒ Object (readonly)

Returns the value of attribute bind_address.



21
22
23
# File 'lib/activejob/temporal/metrics_server.rb', line 21

# Reader for the bind address stored by the constructor. #start passes this
# value to BindPolicy.validate! and to TCPServer.new.
#
# @return [Object] the +bind_address+ given at construction (a String by default)
def bind_address
  @bind_address
end

#port ⇒ Object (readonly)

Returns the value of attribute port.



21
22
23
# File 'lib/activejob/temporal/metrics_server.rb', line 21

# Reader for the port the listening socket is actually bound to. The value is
# assigned in #start from the bound socket's address, not from the requested
# port, and the constructor does not set it.
#
# @return [Integer, nil] the bound port; nil if #start has never bound a socket
def port
  @port
end

Instance Method Details

#running? ⇒ Boolean

Returns:

  • (Boolean)


75
76
77
# File 'lib/activejob/temporal/metrics_server.rb', line 75

# Reports the running flag, read under the same mutex that #start and #stop
# hold while changing it.
#
# @return [Boolean] the current value of the running flag
def running?
  @mutex.synchronize do
    @running
  end
end

#start ⇒ Object



32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
# File 'lib/activejob/temporal/metrics_server.rb', line 32

# Validates the bind address, binds the listening socket, starts the connection
# worker pool and spawns the background thread that executes +run+. Calling it
# while the server is already running is a no-op.
#
# If anything raises after the socket has been bound, the socket is closed, any
# pool that was assigned is stopped and the instance is put back into its
# stopped state before the error is re-raised. A failed start therefore does
# not leave the port bound, and a later retry can bind it again.
#
# @return [MetricsServer] self
# @raise [StandardError] whatever BindPolicy.validate!, TCPServer.new, the
#   worker pool or Thread.new raise
def start
  @mutex.synchronize do
    return self if @running

    BindPolicy.validate!(
      endpoint: "metrics",
      bind_address: bind_address,
      allow_public_bind: @allow_public_bind
    )
    @server = TCPServer.new(bind_address, @requested_port)

    begin
      # Read the port back from the socket so that a requested port of 0
      # resolves to the port the OS actually assigned.
      @port = @server.addr[1]
      @connection_pool = ConnectionWorkerPool.new(
        size: CONNECTION_WORKERS,
        queue_size: CONNECTION_QUEUE_SIZE,
        name: "activejob-temporal-metrics"
      ) { |client| serve_client(client) }.start
      @running = true
      @thread = Thread.new { run }
    rescue StandardError
      # Roll back the partial start: release the socket first so the port is
      # freed even if stopping the pool fails.
      @running = false
      @server.close
      @server = nil
      @port = nil
      @connection_pool&.stop(timeout: 2)
      @connection_pool = nil
      raise
    end
  end

  self
end

#stop ⇒ Object



55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
# File 'lib/activejob/temporal/metrics_server.rb', line 55

# Shuts the server down. The socket, thread and pool references are detached
# and the running flag is cleared while holding the mutex; the actual teardown
# then runs outside the lock. Safe to call when nothing was started, because
# every teardown step is skipped for nil references.
#
# @return [Object, nil] the result of joining the background thread (with a
#   2-second limit), or nil when there was no thread
def stop
  server, thread, connection_pool = @mutex.synchronize do
    detached = [@server, @thread, @connection_pool]
    @server = @thread = @connection_pool = nil
    @running = false
    detached
  end

  # Teardown order: close the listener, stop the workers, then wait for the
  # background thread.
  server&.close
  connection_pool&.stop(timeout: 2)
  thread&.join(2)
end