Class: Pgbus::Web::Streamer::Connection
- Inherits:
-
Object
- Object
- Pgbus::Web::Streamer::Connection
- Defined in:
- lib/pgbus/web/streamer/connection.rb
Overview
Wraps a single hijacked SSE client socket with its own cursor state, per-io mutex, and liveness flag. Owns no threads — the Dispatcher and Heartbeat threads call #enqueue / #write_comment on Connection instances directly, and the per-io mutex in IoWriter serialises concurrent writes.
Cursor semantics: ‘last_msg_id_sent` is strictly monotonic. `enqueue` filters envelopes with `msg_id > last_msg_id_sent` and advances the cursor only for envelopes that actually wrote successfully. This is the client-side leg of the replay-race fix (§6.5 of the design doc).
Instance Attribute Summary collapse
-
#context ⇒ Object
readonly
Returns the value of attribute context.
-
#id ⇒ Object
readonly
Returns the value of attribute id.
-
#io ⇒ Object
readonly
Returns the value of attribute io.
-
#last_msg_id_sent ⇒ Object
readonly
Returns the value of attribute last_msg_id_sent.
-
#mutex ⇒ Object
readonly
Returns the value of attribute mutex.
-
#stream_name ⇒ Object
readonly
Returns the value of attribute stream_name.
Instance Method Summary collapse
-
#close ⇒ Object
Idempotent socket close for use by Instance#shutdown! and the heartbeat idle reaper.
- #dead? ⇒ Boolean
- #enqueue(envelopes) ⇒ Object
- #idle_for ⇒ Object
-
#initialize(id:, stream_name:, io:, since_id:, writer:, write_deadline_ms:, context: nil) ⇒ Connection
constructor
A new instance of Connection.
- #mark_dead! ⇒ Object
- #write_comment(text) ⇒ Object
- #write_sentinel(bytes) ⇒ Object
Constructor Details
#initialize(id:, stream_name:, io:, since_id:, writer:, write_deadline_ms:, context: nil) ⇒ Connection
Returns a new instance of Connection.
18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 |
# File 'lib/pgbus/web/streamer/connection.rb', line 18 def initialize(id:, stream_name:, io:, since_id:, writer:, write_deadline_ms:, context: nil) @id = id @stream_name = stream_name @io = io @last_msg_id_sent = since_id.to_i @writer = writer @write_deadline_ms = write_deadline_ms @mutex = Mutex.new @dead = false @closed = false @created_at = monotonic @last_write_at = @created_at # Context is whatever the StreamApp's authorize hook returned # (a truthy non-boolean value). Typically a user model or a # session hash. The Dispatcher passes it to the Filters # registry when evaluating visible_to predicates. Defaults to # nil for tests that don't need audience filtering. @context = context end |
Instance Attribute Details
#context ⇒ Object (readonly)
Returns the value of attribute context.
16 17 18 |
# File 'lib/pgbus/web/streamer/connection.rb', line 16 def context @context end |
#id ⇒ Object (readonly)
Returns the value of attribute id.
16 17 18 |
# File 'lib/pgbus/web/streamer/connection.rb', line 16 def id @id end |
#io ⇒ Object (readonly)
Returns the value of attribute io.
16 17 18 |
# File 'lib/pgbus/web/streamer/connection.rb', line 16 def io @io end |
#last_msg_id_sent ⇒ Object (readonly)
Returns the value of attribute last_msg_id_sent.
16 17 18 |
# File 'lib/pgbus/web/streamer/connection.rb', line 16 def last_msg_id_sent @last_msg_id_sent end |
#mutex ⇒ Object (readonly)
Returns the value of attribute mutex.
16 17 18 |
# File 'lib/pgbus/web/streamer/connection.rb', line 16 def mutex @mutex end |
#stream_name ⇒ Object (readonly)
Returns the value of attribute stream_name.
16 17 18 |
# File 'lib/pgbus/web/streamer/connection.rb', line 16 def stream_name @stream_name end |
Instance Method Details
#close ⇒ Object
Idempotent socket close for use by Instance#shutdown! and the heartbeat idle reaper. Wraps the respond_to? / closed? dance so callers don’t need to know about StringIO-in-tests vs real Socket-in-prod or about the mark_dead! ordering.
Takes the same mutex as IoWriter.write so it can’t fire mid-write — otherwise the write loop could hit a half-closed socket and corrupt the ‘last_msg_id_sent` cursor by marking the connection dead between successful writes. The rescue narrows to IO-related exceptions; unrelated errors (bugs in the fake IO used by tests, nil-dereferences, etc.) should still propagate so the test suite catches them.
101 102 103 104 105 106 107 108 109 110 111 112 113 |
# File 'lib/pgbus/web/streamer/connection.rb', line 101 def close @mutex.synchronize do return if @closed @closed = true mark_dead! return unless @io.respond_to?(:close) @io.close unless @io.respond_to?(:closed?) && @io.closed? end rescue IOError, SystemCallError => e Pgbus.logger&.debug { "[Pgbus::Streamer::Connection] close failed: #{e.class}: #{e.}" } end |
#dead? ⇒ Boolean
81 82 83 |
# File 'lib/pgbus/web/streamer/connection.rb', line 81 def dead? @dead end |
#enqueue(envelopes) ⇒ Object
38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 |
# File 'lib/pgbus/web/streamer/connection.rb', line 38 def enqueue(envelopes) written = [] envelopes.each do |envelope| next if envelope.msg_id <= @last_msg_id_sent bytes = Pgbus::Streams::Envelope.( id: envelope.msg_id, event: "turbo-stream", data: envelope.payload ) result = @writer.write(self, bytes, deadline_ms: @write_deadline_ms) if result == :ok @last_msg_id_sent = envelope.msg_id @last_write_at = monotonic written << envelope else mark_dead! break end end written end |
#idle_for ⇒ Object
77 78 79 |
# File 'lib/pgbus/web/streamer/connection.rb', line 77 def idle_for monotonic - @last_write_at end |
#mark_dead! ⇒ Object
85 86 87 |
# File 'lib/pgbus/web/streamer/connection.rb', line 85 def mark_dead! @dead = true end |
#write_comment(text) ⇒ Object
62 63 64 65 66 67 68 69 70 71 |
# File 'lib/pgbus/web/streamer/connection.rb', line 62 def write_comment(text) bytes = Pgbus::Streams::Envelope.comment(text) result = @writer.write(self, bytes, deadline_ms: @write_deadline_ms) if result == :ok @last_write_at = monotonic else mark_dead! end result end |
#write_sentinel(bytes) ⇒ Object
73 74 75 |
# File 'lib/pgbus/web/streamer/connection.rb', line 73 def write_sentinel(bytes) @writer.write(self, bytes, deadline_ms: @write_deadline_ms) end |