Class: Pgbus::Web::Streamer::FalconConnection
- Inherits: Object
  - Object
  - Pgbus::Web::Streamer::FalconConnection
- Defined in: lib/pgbus/web/streamer/falcon_connection.rb
Overview
Connection adapter for Falcon’s native streaming body path. Wraps a Protocol::HTTP::Body::Writable instead of a raw IO socket.
Satisfies the same duck-type interface as Connection, so the Dispatcher, Heartbeat, and Instance can use either type interchangeably. The key difference is that there is no IoWriter: writes go directly to body.write, which is backed by Thread::Queue and is fiber-safe under Falcon’s scheduler.
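To make the queue-backed write path concrete, here is a minimal sketch. `QueueBody` is a hypothetical stand-in written for this example; it is not Protocol::HTTP::Body::Writable, and the real class may behave differently. It only shows how a producer can push chunks through a Thread::Queue while a consumer drains them until the write side is closed.

```ruby
# Hypothetical stand-in for a writable streaming body.
# NOT the real Protocol::HTTP::Body::Writable; illustration only.
class QueueBody
  def initialize
    @queue = Thread::Queue.new
    @closed = false
  end

  # Producer side: hand a chunk to the consumer.
  def write(chunk)
    raise IOError, "body is closed" if @closed

    @queue << chunk
  end

  # Signal that no more chunks will be written.
  def close_write
    @closed = true
    @queue.close
  end

  def closed?
    @closed
  end

  # Consumer side: next chunk, or nil once the queue is closed and drained.
  def read
    @queue.pop
  end
end

body = QueueBody.new

producer = Thread.new do
  body.write("data: one\n\n")
  body.write("data: two\n\n")
  body.close_write
end

chunks = []
while (chunk = body.read)
  chunks << chunk
end
producer.join
```

Because the queue does the hand-off, the writer needs no separate writer thread or socket-level buffering of its own, which is the role IoWriter plays for the raw-socket Connection according to the overview above.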
Instance Attribute Summary
- #context ⇒ Object (readonly): Returns the value of attribute context.
- #id ⇒ Object (readonly): Returns the value of attribute id.
- #io ⇒ Object (readonly): Returns the value of attribute io.
- #last_msg_id_sent ⇒ Object (readonly): Returns the value of attribute last_msg_id_sent.
- #mutex ⇒ Object (readonly): Returns the value of attribute mutex.
- #stream_name ⇒ Object (readonly): Returns the value of attribute stream_name.
Instance Method Summary
- #close ⇒ Object
- #dead? ⇒ Boolean
- #enqueue(envelopes) ⇒ Object
- #idle_for ⇒ Object
- #initialize(id:, stream_name:, body:, since_id:, write_deadline_ms:, context: nil) ⇒ FalconConnection (constructor): A new instance of FalconConnection.
- #mark_dead! ⇒ Object
- #write_comment(text) ⇒ Object
- #write_sentinel(bytes) ⇒ Object
Constructor Details
#initialize(id:, stream_name:, body:, since_id:, write_deadline_ms:, context: nil) ⇒ FalconConnection
Returns a new instance of FalconConnection.
    # File 'lib/pgbus/web/streamer/falcon_connection.rb', line 17
    def initialize(id:, stream_name:, body:, since_id:, write_deadline_ms:, context: nil)
      @id = id
      @stream_name = stream_name
      @body = body
      @io = body
      @last_msg_id_sent = since_id.to_i
      @write_deadline_ms = write_deadline_ms
      @mutex = Mutex.new
      @dead = false
      @closed = false
      @created_at = monotonic
      @last_write_at = @created_at
      @context = context
    end
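The constructor coerces `since_id` with `to_i`, so the starting cursor is always an Integer regardless of what the caller passes. The sketch below shows what that coercion does for a few input shapes; `cursor_from` is a hypothetical helper introduced only for this illustration and is not part of the class.

```ruby
# Hypothetical helper mirroring `@last_msg_id_sent = since_id.to_i`.
def cursor_from(since_id)
  since_id.to_i
end

from_nil     = cursor_from(nil)    # no cursor supplied
from_string  = cursor_from("42")   # e.g. a value that arrived as text
from_integer = cursor_from(7)      # already an Integer
from_garbage = cursor_from("abc")  # non-numeric text
```

Since `nil.to_i` and non-numeric strings both yield 0, a missing or malformed cursor behaves like "start from the beginning" rather than raising.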
Instance Attribute Details
#context ⇒ Object (readonly)
Returns the value of attribute context.
    # File 'lib/pgbus/web/streamer/falcon_connection.rb', line 15
    def context
      @context
    end
#id ⇒ Object (readonly)
Returns the value of attribute id.
    # File 'lib/pgbus/web/streamer/falcon_connection.rb', line 15
    def id
      @id
    end
#io ⇒ Object (readonly)
Returns the value of attribute io.
    # File 'lib/pgbus/web/streamer/falcon_connection.rb', line 15
    def io
      @io
    end
#last_msg_id_sent ⇒ Object (readonly)
Returns the value of attribute last_msg_id_sent.
    # File 'lib/pgbus/web/streamer/falcon_connection.rb', line 15
    def last_msg_id_sent
      @last_msg_id_sent
    end
#mutex ⇒ Object (readonly)
Returns the value of attribute mutex.
    # File 'lib/pgbus/web/streamer/falcon_connection.rb', line 15
    def mutex
      @mutex
    end
#stream_name ⇒ Object (readonly)
Returns the value of attribute stream_name.
    # File 'lib/pgbus/web/streamer/falcon_connection.rb', line 15
    def stream_name
      @stream_name
    end
Instance Method Details
#close ⇒ Object
    # File 'lib/pgbus/web/streamer/falcon_connection.rb', line 71
    def close
      @mutex.synchronize do
        return if @closed

        @closed = true
        mark_dead!
        @body.close_write
      end
    rescue StandardError => e
      Pgbus.logger&.debug { "[Pgbus::Streamer::FalconConnection] close failed: #{e.class}: #{e.message}" }
    end
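The `@closed` guard inside `@mutex.synchronize` makes `close` idempotent: only the first caller runs the teardown, and later or concurrent callers return early. A minimal sketch of that pattern, using a hypothetical `ClosableOnce` class with a counter in place of the real body teardown:

```ruby
# Hypothetical class illustrating the close-once guard; the counter
# stands in for the real teardown work (e.g. closing the body).
class ClosableOnce
  attr_reader :teardown_count

  def initialize
    @mutex = Mutex.new
    @closed = false
    @teardown_count = 0
  end

  def close
    @mutex.synchronize do
      # `return` leaves the method; the mutex is still released.
      return if @closed

      @closed = true
      @teardown_count += 1
    end
  end
end

closer = ClosableOnce.new
threads = 4.times.map { Thread.new { closer.close } }
threads.each(&:join)
closer.close # a later call is also a no-op
```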
#dead? ⇒ Boolean
    # File 'lib/pgbus/web/streamer/falcon_connection.rb', line 87
    def dead?
      @dead || @body.closed?
    end
#enqueue(envelopes) ⇒ Object
    # File 'lib/pgbus/web/streamer/falcon_connection.rb', line 32
    def enqueue(envelopes)
      written = []
      envelopes.each do |envelope|
        next if envelope.msg_id <= @last_msg_id_sent

        bytes = Pgbus::Streams::Envelope.(
          id: envelope.msg_id,
          event: "turbo-stream",
          data: envelope.payload
        )
        result = write_to_body(bytes)
        if result == :ok
          @last_msg_id_sent = envelope.msg_id
          @last_write_at = monotonic
          written << envelope
        else
          mark_dead!
          break
        end
      end
      written
    end
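The cursor logic in `enqueue` can be hard to follow from the listing alone: envelopes at or below the cursor are skipped, the cursor advances only after a successful write, and the first failed write marks the connection dead and abandons the rest of the batch. The sketch below replays that logic in isolation. `FakeEnvelope`, `deliver`, and the `:failed` result are hypothetical names for this example; the source only shows that any result other than `:ok` counts as a failure.

```ruby
# Hypothetical envelope with the two fields the listing reads.
FakeEnvelope = Struct.new(:msg_id, :payload)

# Replays the skip / advance / stop-on-failure flow with a pluggable writer.
# Returns [written envelopes, final cursor, dead flag].
def deliver(envelopes, last_sent, writer)
  written = []
  dead = false

  envelopes.each do |envelope|
    # Already delivered (or replayed): skip without writing.
    next if envelope.msg_id <= last_sent

    if writer.call(envelope) == :ok
      last_sent = envelope.msg_id
      written << envelope
    else
      dead = true
      break
    end
  end

  [written, last_sent, dead]
end

batch = [
  FakeEnvelope.new(3, "a"),
  FakeEnvelope.new(5, "b"),
  FakeEnvelope.new(6, "c"),
  FakeEnvelope.new(7, "d")
]

# Writer that fails on msg_id 6 to simulate a broken connection.
flaky_writer = ->(envelope) { envelope.msg_id == 6 ? :failed : :ok }

written, cursor, dead = deliver(batch, 4, flaky_writer)
```

With a starting cursor of 4, message 3 is skipped, message 5 is written, message 6 fails, and message 7 is never attempted. The returned array lets the caller see exactly which envelopes reached the client.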
#idle_for ⇒ Object
    # File 'lib/pgbus/web/streamer/falcon_connection.rb', line 83
    def idle_for
      monotonic - @last_write_at
    end
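The `monotonic` helper is not shown on this page. A common way to implement such a helper in Ruby is `Process.clock_gettime(Process::CLOCK_MONOTONIC)`, which is unaffected by wall-clock adjustments; the sketch below assumes that implementation and a unit of seconds, neither of which is confirmed by the source. `IdleTracker` is a hypothetical class for illustration.

```ruby
# Hypothetical tracker showing idle-time measurement on a monotonic clock.
class IdleTracker
  def initialize
    @last_write_at = monotonic
  end

  # Call after each successful write.
  def touch
    @last_write_at = monotonic
  end

  # Time elapsed since the last write (seconds, under the assumption below).
  def idle_for
    monotonic - @last_write_at
  end

  private

  # Assumed implementation; the real helper is not shown in this page.
  def monotonic
    Process.clock_gettime(Process::CLOCK_MONOTONIC)
  end
end

tracker = IdleTracker.new
sleep 0.05
idle_before = tracker.idle_for
tracker.touch
idle_after = tracker.idle_for
```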
#mark_dead! ⇒ Object
    # File 'lib/pgbus/web/streamer/falcon_connection.rb', line 91
    def mark_dead!
      @dead = true
    end
#write_comment(text) ⇒ Object
    # File 'lib/pgbus/web/streamer/falcon_connection.rb', line 56
    def write_comment(text)
      bytes = Pgbus::Streams::Envelope.comment(text)
      result = write_to_body(bytes)
      if result == :ok
        @last_write_at = monotonic
      else
        mark_dead!
      end
      result
    end
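As a rough illustration of how a heartbeat might combine `idle_for`, `write_comment`, and `dead?`, consider the sketch below. Everything in it is an assumption made for the example: `FakeConnection`, `heartbeat`, the `:failed` result, and `sse_comment` are hypothetical, and the actual Heartbeat class and `Envelope.comment` output are not shown on this page. The comment framing used here (a line starting with a colon, followed by a blank line) is the usual Server-Sent Events convention.

```ruby
# Assumed framing: an SSE comment is a line that starts with ":".
def sse_comment(text)
  ": #{text}\n\n"
end

# Hypothetical connection exposing the duck-type methods used below.
class FakeConnection
  attr_reader :sent, :idle_for

  def initialize(idle_for:, writable: true)
    @idle_for = idle_for
    @writable = writable
    @dead = false
    @sent = []
  end

  def write_comment(text)
    if @writable
      @sent << sse_comment(text)
      @idle_for = 0
      :ok
    else
      @dead = true
      :failed
    end
  end

  def dead?
    @dead
  end
end

# Ping connections that have been quiet for at least `interval`;
# return the ones that are still alive afterwards.
def heartbeat(connections, interval)
  connections.each do |conn|
    next if conn.dead? || conn.idle_for < interval

    conn.write_comment("heartbeat")
  end
  connections.reject(&:dead?)
end

quiet  = FakeConnection.new(idle_for: 30)
busy   = FakeConnection.new(idle_for: 1)
broken = FakeConnection.new(idle_for: 30, writable: false)

alive = heartbeat([quiet, busy, broken], 15)
```

Because a failed comment write marks the connection dead, a periodic pass like this doubles as a liveness probe for clients that disconnected silently.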
#write_sentinel(bytes) ⇒ Object
    # File 'lib/pgbus/web/streamer/falcon_connection.rb', line 67
    def write_sentinel(bytes)
      write_to_body(bytes)
    end