Class: Pgbus::Web::Streamer::FalconConnection
- Inherits:
-
Object
- Object
- Pgbus::Web::Streamer::FalconConnection
- Defined in:
- lib/pgbus/web/streamer/falcon_connection.rb
Overview
Connection adapter for Falcon’s native streaming body path. Wraps a Protocol::HTTP::Body::Writable instead of a raw IO socket.
Satisfies the same duck-type interface as Connection so the Dispatcher, Heartbeat, and Instance can use either type interchangeably. Key difference: no IoWriter — writes go directly to body.write which is backed by Thread::Queue and is fiber-safe under Falcon’s scheduler.
Instance Attribute Summary collapse
-
#context ⇒ Object
readonly
Returns the value of attribute context.
-
#id ⇒ Object
readonly
Returns the value of attribute id.
-
#io ⇒ Object
readonly
Returns the value of attribute io.
-
#last_msg_id_sent ⇒ Object
readonly
Returns the value of attribute last_msg_id_sent.
-
#mutex ⇒ Object
readonly
Returns the value of attribute mutex.
-
#presence_member ⇒ Object
Returns the value of attribute presence_member.
-
#stream_name ⇒ Object
readonly
Returns the value of attribute stream_name.
Instance Method Summary collapse
- #close ⇒ Object
- #dead? ⇒ Boolean
- #enqueue(envelopes) ⇒ Object
- #idle_for ⇒ Object
-
#initialize(id:, stream_name:, body:, since_id:, write_deadline_ms:, context: nil) ⇒ FalconConnection
constructor
A new instance of FalconConnection.
- #mark_dead! ⇒ Object
- #write_comment(text) ⇒ Object
- #write_sentinel(bytes) ⇒ Object
Constructor Details
#initialize(id:, stream_name:, body:, since_id:, write_deadline_ms:, context: nil) ⇒ FalconConnection
Returns a new instance of FalconConnection.
18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 |
# File 'lib/pgbus/web/streamer/falcon_connection.rb', line 18 def initialize(id:, stream_name:, body:, since_id:, write_deadline_ms:, context: nil) @id = id @stream_name = stream_name @body = body @io = body @last_msg_id_sent = since_id.to_i @write_deadline_ms = write_deadline_ms @mutex = Mutex.new @dead = false @closed = false @presence_member = nil @created_at = monotonic @last_write_at = @created_at @context = context end |
Instance Attribute Details
#context ⇒ Object (readonly)
Returns the value of attribute context.
15 16 17 |
# File 'lib/pgbus/web/streamer/falcon_connection.rb', line 15 def context @context end |
#id ⇒ Object (readonly)
Returns the value of attribute id.
15 16 17 |
# File 'lib/pgbus/web/streamer/falcon_connection.rb', line 15 def id @id end |
#io ⇒ Object (readonly)
Returns the value of attribute io.
15 16 17 |
# File 'lib/pgbus/web/streamer/falcon_connection.rb', line 15 def io @io end |
#last_msg_id_sent ⇒ Object (readonly)
Returns the value of attribute last_msg_id_sent.
15 16 17 |
# File 'lib/pgbus/web/streamer/falcon_connection.rb', line 15 def last_msg_id_sent @last_msg_id_sent end |
#mutex ⇒ Object (readonly)
Returns the value of attribute mutex.
15 16 17 |
# File 'lib/pgbus/web/streamer/falcon_connection.rb', line 15 def mutex @mutex end |
#presence_member ⇒ Object
Returns the value of attribute presence_member.
16 17 18 |
# File 'lib/pgbus/web/streamer/falcon_connection.rb', line 16 def presence_member @presence_member end |
#stream_name ⇒ Object (readonly)
Returns the value of attribute stream_name.
15 16 17 |
# File 'lib/pgbus/web/streamer/falcon_connection.rb', line 15 def stream_name @stream_name end |
Instance Method Details
#close ⇒ Object
73 74 75 76 77 78 79 80 81 82 83 |
# File 'lib/pgbus/web/streamer/falcon_connection.rb', line 73 def close @mutex.synchronize do return if @closed @closed = true mark_dead! @body.close_write end rescue StandardError => e Pgbus.logger&.debug { "[Pgbus::Streamer::FalconConnection] close failed: #{e.class}: #{e.}" } end |
#dead? ⇒ Boolean
89 90 91 |
# File 'lib/pgbus/web/streamer/falcon_connection.rb', line 89 def dead? @dead || @body.closed? end |
#enqueue(envelopes) ⇒ Object
34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 |
# File 'lib/pgbus/web/streamer/falcon_connection.rb', line 34 def enqueue(envelopes) written = [] envelopes.each do |envelope| next if envelope.msg_id <= @last_msg_id_sent bytes = Pgbus::Streams::Envelope.( id: envelope.msg_id, event: sse_event_for(envelope), data: envelope.payload ) result = write_to_body(bytes) if result == :ok @last_msg_id_sent = envelope.msg_id @last_write_at = monotonic written << envelope else mark_dead! break end end written end |
#idle_for ⇒ Object
85 86 87 |
# File 'lib/pgbus/web/streamer/falcon_connection.rb', line 85 def idle_for monotonic - @last_write_at end |
#mark_dead! ⇒ Object
93 94 95 |
# File 'lib/pgbus/web/streamer/falcon_connection.rb', line 93 def mark_dead! @dead = true end |
#write_comment(text) ⇒ Object
58 59 60 61 62 63 64 65 66 67 |
# File 'lib/pgbus/web/streamer/falcon_connection.rb', line 58 def write_comment(text) bytes = Pgbus::Streams::Envelope.comment(text) result = write_to_body(bytes) if result == :ok @last_write_at = monotonic else mark_dead! end result end |
#write_sentinel(bytes) ⇒ Object
69 70 71 |
# File 'lib/pgbus/web/streamer/falcon_connection.rb', line 69 def write_sentinel(bytes) write_to_body(bytes) end |