Class: Pgbus::Web::Streamer::Registry

Inherits:
Object
  • Object
show all
Defined in:
lib/pgbus/web/streamer/registry.rb

Overview

Worker-local in-memory registry of SSE connections indexed by stream name and connection id. Thread-safe via a single mutex. Reads return snapshots so iterators never hold the lock.

Registry operations are O(1) under contention (mutex-protected hash lookups), and iteration is O(n) over a snapshot. The data structure is deliberately boring — the interesting parts of the streamer (LISTEN multiplexing, write scheduling, replay race handling) live elsewhere.

Instance Method Summary collapse

Constructor Details

#initializeRegistry

Returns a new instance of Registry.



15
16
17
18
19
# File 'lib/pgbus/web/streamer/registry.rb', line 15

def initialize
  @mutex = Mutex.new
  @by_stream = {}
  @by_id = {}
end

Instance Method Details

#connections_for(stream_name) ⇒ Object

Returns a snapshot Array of connections on the given stream. Mutating the result has no effect on the registry.



59
60
61
62
63
64
# File 'lib/pgbus/web/streamer/registry.rb', line 59

def connections_for(stream_name)
  @mutex.synchronize do
    set = @by_stream[stream_name]
    set ? set.to_a : []
  end
end

#each_connectionObject

Yields every registered connection across all streams. The iteration walks a snapshot so the block may safely call back into register/ unregister without risk of deadlock or skipped items.



85
86
87
88
# File 'lib/pgbus/web/streamer/registry.rb', line 85

def each_connection(&)
  snapshot = @mutex.synchronize { @by_id.values.dup }
  snapshot.each(&)
end

#empty?(stream_name) ⇒ Boolean

Returns:

  • (Boolean)


71
72
73
74
75
76
# File 'lib/pgbus/web/streamer/registry.rb', line 71

def empty?(stream_name)
  @mutex.synchronize do
    set = @by_stream[stream_name]
    set.nil? || set.empty?
  end
end

#lookup(id) ⇒ Object



53
54
55
# File 'lib/pgbus/web/streamer/registry.rb', line 53

def lookup(id)
  @mutex.synchronize { @by_id[id] }
end

#register(connection) ⇒ Object



21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
# File 'lib/pgbus/web/streamer/registry.rb', line 21

def register(connection)
  @mutex.synchronize do
    existing = @by_id[connection.id]
    return if existing.equal?(connection)

    # Registration of a new object under an existing id is a
    # defensive path — SecureRandom.hex(8) collisions are
    # astronomical, but the Registry's invariant is "@by_stream
    # only contains objects that are also in @by_id", so we
    # must scrub the old entry from its stream index before
    # overwriting. Otherwise connections_for(stream) would
    # return orphaned objects and writes would go nowhere.
    evict_from_stream(existing) if existing

    @by_id[connection.id] = connection
    (@by_stream[connection.stream_name] ||= Set.new).add(connection)
  end
end

#sizeObject



78
79
80
# File 'lib/pgbus/web/streamer/registry.rb', line 78

def size
  @mutex.synchronize { @by_id.size }
end

#streamsObject

Snapshot list of stream names with at least one registered connection.



67
68
69
# File 'lib/pgbus/web/streamer/registry.rb', line 67

def streams
  @mutex.synchronize { @by_stream.keys.dup }
end

#unregister(connection) ⇒ Object



40
41
42
43
44
45
46
47
48
49
50
51
# File 'lib/pgbus/web/streamer/registry.rb', line 40

def unregister(connection)
  @mutex.synchronize do
    existing = @by_id.delete(connection.id)
    return unless existing

    set = @by_stream[existing.stream_name]
    next unless set

    set.delete(existing)
    @by_stream.delete(existing.stream_name) if set.empty?
  end
end