Class: Pgbus::Web::Streamer::Connection

Inherits:
Object
Defined in:
lib/pgbus/web/streamer/connection.rb

Overview

Wraps a single hijacked SSE client socket with its own cursor state, per-io mutex, and liveness flag. Owns no threads: the Dispatcher and Heartbeat threads call #enqueue / #write_comment on Connection instances directly, and IoWriter takes the per-io mutex to serialise concurrent writes.

Cursor semantics: `last_msg_id_sent` is strictly monotonic. `enqueue` skips any envelope whose `msg_id` is not greater than `last_msg_id_sent` and advances the cursor only for envelopes that were actually written successfully. This is the client-side leg of the replay-race fix (§6.5 of the design doc).
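
The cursor rule can be illustrated with a minimal, self-contained sketch. `CursorSketch`, `SketchEnvelope`, and the block-based writer are hypothetical stand-ins, not part of Pgbus; only the skip-then-advance logic mirrors `enqueue`, and the liveness flag is left out for brevity.

```ruby
# Hypothetical stand-in for an envelope; the real class is not shown here.
SketchEnvelope = Struct.new(:msg_id, :payload)

# Minimal cursor model: skips ids at or below the cursor and advances only
# after a successful write. The block plays the role of the writer and must
# return :ok on success (any other value counts as a failure).
class CursorSketch
  attr_reader :last_msg_id_sent

  def initialize(since_id, &write)
    @last_msg_id_sent = since_id.to_i
    @write = write
  end

  def enqueue(envelopes)
    written = []
    envelopes.each do |envelope|
      next if envelope.msg_id <= @last_msg_id_sent # already delivered

      break unless @write.call(envelope) == :ok    # stop at first failure

      @last_msg_id_sent = envelope.msg_id
      written << envelope
    end
    written
  end
end

# A writer that fails on msg_id 4, so the cursor stops at 3.
cursor = CursorSketch.new("1") { |e| e.msg_id == 4 ? :timeout : :ok }
batch = [1, 2, 3, 4, 5].map { |id| SketchEnvelope.new(id, "payload-#{id}") }

written_ids = cursor.enqueue(batch).map(&:msg_id)
written_ids             # => [2, 3]
cursor.last_msg_id_sent # => 3
```

Because the cursor never moves past a failed write, a later replay starting from `last_msg_id_sent` would pick up envelope 4 again rather than skipping it.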

Constructor Details

#initialize(id:, stream_name:, io:, since_id:, writer:, write_deadline_ms:, context: nil) ⇒ Connection

Returns a new instance of Connection.



# File 'lib/pgbus/web/streamer/connection.rb', line 18

def initialize(id:, stream_name:, io:, since_id:, writer:, write_deadline_ms:, context: nil)
  @id = id
  @stream_name = stream_name
  @io = io
  @last_msg_id_sent = since_id.to_i
  @writer = writer
  @write_deadline_ms = write_deadline_ms
  @mutex = Mutex.new
  @dead = false
  @closed = false
  @created_at = monotonic
  @last_write_at = @created_at
  # Context is whatever the StreamApp's authorize hook returned
  # (a truthy non-boolean value). Typically a user model or a
  # session hash. The Dispatcher passes it to the Filters
  # registry when evaluating visible_to predicates. Defaults to
  # nil for tests that don't need audience filtering.
  @context = context
end
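
Because the constructor calls `since_id.to_i`, the initial cursor tolerates nil and String inputs. The sketch below only demonstrates Ruby's standard `to_i` coercion; `initial_cursor` is a hypothetical helper name used for illustration.

```ruby
# Hypothetical helper mirroring `@last_msg_id_sent = since_id.to_i`.
def initial_cursor(since_id)
  since_id.to_i
end

initial_cursor(nil)   # => 0  (every positive msg_id passes the filter)
initial_cursor("42")  # => 42 (numeric string)
initial_cursor(42)    # => 42
initial_cursor("abc") # => 0  (non-numeric strings silently become 0)
```

A caller that wants to reject malformed cursors, rather than treat them as 0, would need to validate `since_id` before constructing the Connection.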

Instance Attribute Details

#context ⇒ Object (readonly)

Returns the value of attribute context.



# File 'lib/pgbus/web/streamer/connection.rb', line 16

def context
  @context
end

#id ⇒ Object (readonly)

Returns the value of attribute id.



# File 'lib/pgbus/web/streamer/connection.rb', line 16

def id
  @id
end

#io ⇒ Object (readonly)

Returns the value of attribute io.



# File 'lib/pgbus/web/streamer/connection.rb', line 16

def io
  @io
end

#last_msg_id_sent ⇒ Object (readonly)

Returns the value of attribute last_msg_id_sent.



# File 'lib/pgbus/web/streamer/connection.rb', line 16

def last_msg_id_sent
  @last_msg_id_sent
end

#mutex ⇒ Object (readonly)

Returns the value of attribute mutex.



# File 'lib/pgbus/web/streamer/connection.rb', line 16

def mutex
  @mutex
end

#stream_name ⇒ Object (readonly)

Returns the value of attribute stream_name.



# File 'lib/pgbus/web/streamer/connection.rb', line 16

def stream_name
  @stream_name
end

Instance Method Details

#close ⇒ Object

Idempotent socket close for use by Instance#shutdown! and the heartbeat idle reaper. Wraps the respond_to? / closed? dance so callers don’t need to know about StringIO-in-tests vs real Socket-in-prod or about the mark_dead! ordering.

Takes the same mutex as IoWriter.write so it can’t fire mid-write. Otherwise the write loop could hit a half-closed socket and corrupt the `last_msg_id_sent` cursor by marking the connection dead between successful writes. The rescue narrows to IO-related exceptions; unrelated errors (bugs in the fake IO used by tests, nil-dereferences, etc.) should still propagate so the test suite catches them.



# File 'lib/pgbus/web/streamer/connection.rb', line 101

def close
  @mutex.synchronize do
    return if @closed

    @closed = true
    mark_dead!
    return unless @io.respond_to?(:close)

    @io.close unless @io.respond_to?(:closed?) && @io.closed?
  end
rescue IOError, SystemCallError => e
  Pgbus.logger&.debug { "[Pgbus::Streamer::Connection] close failed: #{e.class}: #{e.message}" }
end
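
The idempotency and the narrowed rescue can be exercised in isolation. The sketch below is a stripped-down stand-in (the `ClosableSketch` and `BrokenPipeIo` names are hypothetical, and logging is omitted); it is not the Pgbus class itself.

```ruby
require "stringio"

# Stand-in with the same close logic as above, minus logging.
class ClosableSketch
  def initialize(io)
    @io = io
    @mutex = Mutex.new
    @closed = false
    @dead = false
  end

  def dead?
    @dead
  end

  def close
    @mutex.synchronize do
      return if @closed # second and later calls are no-ops

      @closed = true
      @dead = true
      return unless @io.respond_to?(:close)

      @io.close unless @io.respond_to?(:closed?) && @io.closed?
    end
  rescue IOError, SystemCallError
    nil # IO-level failures are swallowed; anything else propagates
  end
end

# Fake IO whose close fails the way a reset socket might.
class BrokenPipeIo
  def close
    raise Errno::EPIPE
  end
end

io = StringIO.new
conn = ClosableSketch.new(io)
conn.close
conn.close   # safe to call twice
io.closed?   # => true
conn.dead?   # => true

broken = ClosableSketch.new(BrokenPipeIo.new)
broken.close # Errno::EPIPE is a SystemCallError, so it is rescued
broken.dead? # => true
```

A fake IO that raised, say, NoMethodError from `close` would not be rescued here, which matches the stated intent of letting test-double bugs surface.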

#dead? ⇒ Boolean

Returns:

  • (Boolean)


# File 'lib/pgbus/web/streamer/connection.rb', line 81

def dead?
  @dead
end

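#enqueue(envelopes) ⇒ Object

Writes each envelope whose msg_id is greater than the cursor, in order, and returns the envelopes that were written. On the first write that does not return :ok, marks the connection dead and stops.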



# File 'lib/pgbus/web/streamer/connection.rb', line 38

def enqueue(envelopes)
  written = []
  envelopes.each do |envelope|
    next if envelope.msg_id <= @last_msg_id_sent

    bytes = Pgbus::Streams::Envelope.message(
      id: envelope.msg_id,
      event: "turbo-stream",
      data: envelope.payload
    )

    result = @writer.write(self, bytes, deadline_ms: @write_deadline_ms)
    if result == :ok
      @last_msg_id_sent = envelope.msg_id
      @last_write_at = monotonic
      written << envelope
    else
      mark_dead!
      break
    end
  end
  written
end
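
`Pgbus::Streams::Envelope.message` is not shown on this page. Assuming it emits standard Server-Sent Events framing, the bytes handed to the writer would look roughly like the output of the hypothetical `sse_message` helper below; the real implementation may differ.

```ruby
# Hypothetical SSE framing helper (an assumption, not the Pgbus source).
# Each field goes on its own line, multi-line data is split into one
# "data:" line per line of payload, and a blank line ends the event.
def sse_message(id:, event:, data:)
  lines = ["id: #{id}", "event: #{event}"]
  data.to_s.lines(chomp: true).each { |line| lines << "data: #{line}" }
  lines.join("\n") + "\n\n"
end

sse_message(id: 7, event: "turbo-stream", data: "<a>\n<b>")
# => "id: 7\nevent: turbo-stream\ndata: <a>\ndata: <b>\n\n"
```

Under this framing the `id:` field is what a browser's EventSource would echo back as `Last-Event-ID` on reconnect, which is how a cursor like `since_id` could be fed from the client.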

#idle_for ⇒ Object



# File 'lib/pgbus/web/streamer/connection.rb', line 77

def idle_for
  monotonic - @last_write_at
end
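
The `monotonic` helper is not shown on this page. A plausible sketch, assuming it reads the process monotonic clock in seconds, is given below; `IdleClockSketch` and `touch!` are hypothetical names.

```ruby
# Sketch of idle tracking on a monotonic clock (assumed implementation).
class IdleClockSketch
  def initialize
    @last_write_at = monotonic
  end

  # Stands in for the `@last_write_at = monotonic` done after a good write.
  def touch!
    @last_write_at = monotonic
  end

  # Seconds elapsed since the last successful write.
  def idle_for
    monotonic - @last_write_at
  end

  private

  # CLOCK_MONOTONIC is unaffected by wall-clock adjustments (NTP, DST),
  # so the difference of two readings is never negative.
  def monotonic
    Process.clock_gettime(Process::CLOCK_MONOTONIC)
  end
end

clock = IdleClockSketch.new
sleep 0.1
idle_before = clock.idle_for # roughly 0.1
clock.touch!
idle_after = clock.idle_for  # close to 0.0
```

An idle reaper could then compare `idle_for` against a threshold and call `close` on connections that exceed it; the threshold and its unit would have to match whatever `monotonic` actually returns.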

#mark_dead! ⇒ Object



# File 'lib/pgbus/web/streamer/connection.rb', line 85

def mark_dead!
  @dead = true
end

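#write_comment(text) ⇒ Object

Writes text as a comment built by Pgbus::Streams::Envelope.comment and returns the writer’s result. Refreshes the idle timer on :ok and marks the connection dead otherwise.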



# File 'lib/pgbus/web/streamer/connection.rb', line 62

def write_comment(text)
  bytes = Pgbus::Streams::Envelope.comment(text)
  result = @writer.write(self, bytes, deadline_ms: @write_deadline_ms)
  if result == :ok
    @last_write_at = monotonic
  else
    mark_dead!
  end
  result
end
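
`Pgbus::Streams::Envelope.comment` is likewise not shown here. Assuming it produces a standard SSE comment, which clients ignore and which therefore suits heartbeats, the framing might resemble this hypothetical `sse_comment` helper.

```ruby
# Hypothetical SSE comment framing (an assumption, not the Pgbus source).
# Lines that start with ":" are ignored by SSE clients; the trailing blank
# line terminates the block without dispatching an event.
def sse_comment(text)
  lines = text.to_s.lines(chomp: true)
  lines = [""] if lines.empty? # still emit one comment line for empty text
  lines.map { |line| ": #{line}" }.join("\n") + "\n\n"
end

sse_comment("ping") # => ": ping\n\n"
```

Since a comment carries no `id:` field, writing one leaves `last_msg_id_sent` untouched, which is consistent with `write_comment` updating only `@last_write_at`.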

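#write_sentinel(bytes) ⇒ Object

Writes raw bytes through the writer and returns its result. Unlike #write_comment, it neither refreshes the idle timer nor marks the connection dead on failure.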



# File 'lib/pgbus/web/streamer/connection.rb', line 73

def write_sentinel(bytes)
  @writer.write(self, bytes, deadline_ms: @write_deadline_ms)
end