Class: Pgbus::Web::Streamer::StreamCounter

Inherits:
Object
  • Object
show all
Defined in:
lib/pgbus/web/streamer/stream_counter.rb

Instance Method Summary collapse

Constructor Details

#initializeStreamCounter

Returns a new instance of StreamCounter.



9
10
11
# File 'lib/pgbus/web/streamer/stream_counter.rb', line 9

def initialize
  @streams = Concurrent::Map.new
end

Instance Method Details

#active_connections(stream_name) ⇒ Object



34
35
36
37
38
39
# File 'lib/pgbus/web/streamer/stream_counter.rb', line 34

def active_connections(stream_name)
  entry = @streams[stream_name]
  return 0 unless entry

  [entry[:active_connections].value, 0].max
end

#broadcasts(stream_name) ⇒ Object



29
30
31
32
# File 'lib/pgbus/web/streamer/stream_counter.rb', line 29

def broadcasts(stream_name)
  entry = @streams[stream_name]
  entry ? entry[:broadcasts].value : 0
end

#decrement_connections(stream_name) ⇒ Object



21
22
23
# File 'lib/pgbus/web/streamer/stream_counter.rb', line 21

def decrement_connections(stream_name)
  counters_for(stream_name)[:active_connections].decrement
end

#increment_broadcasts(stream_name) ⇒ Object



13
14
15
# File 'lib/pgbus/web/streamer/stream_counter.rb', line 13

def increment_broadcasts(stream_name)
  counters_for(stream_name)[:broadcasts].increment
end

#increment_connections(stream_name) ⇒ Object



17
18
19
# File 'lib/pgbus/web/streamer/stream_counter.rb', line 17

def increment_connections(stream_name)
  counters_for(stream_name)[:active_connections].increment
end

#increment_total_connections(stream_name) ⇒ Object



25
26
27
# File 'lib/pgbus/web/streamer/stream_counter.rb', line 25

def increment_total_connections(stream_name)
  counters_for(stream_name)[:total_connections].increment
end

#snapshotObject



46
47
48
49
50
51
52
53
54
55
56
# File 'lib/pgbus/web/streamer/stream_counter.rb', line 46

def snapshot
  result = {}
  @streams.each_pair do |name, counters|
    result[name] = {
      broadcasts: counters[:broadcasts].value,
      active_connections: [counters[:active_connections].value, 0].max,
      total_connections: counters[:total_connections].value
    }
  end
  result
end

#total_connections(stream_name) ⇒ Object



41
42
43
44
# File 'lib/pgbus/web/streamer/stream_counter.rb', line 41

def total_connections(stream_name)
  entry = @streams[stream_name]
  entry ? entry[:total_connections].value : 0
end

#totalsObject



58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
# File 'lib/pgbus/web/streamer/stream_counter.rb', line 58

def totals
  total_broadcasts = 0
  total_active = 0
  total_conns = 0
  stream_count = 0

  @streams.each_pair do |_name, counters|
    total_broadcasts += counters[:broadcasts].value
    total_active += [counters[:active_connections].value, 0].max
    total_conns += counters[:total_connections].value
    stream_count += 1
  end

  {
    broadcasts: total_broadcasts,
    active_connections: total_active,
    total_connections: total_conns,
    streams: stream_count
  }
end