Class: Pgbus::Web::Streamer::FalconConnection

Inherits:
Object
  • Object
show all
Defined in:
lib/pgbus/web/streamer/falcon_connection.rb

Overview

Connection adapter for Falcon’s native streaming body path. Wraps a Protocol::HTTP::Body::Writable instead of a raw IO socket.

Satisfies the same duck-type interface as Connection so the Dispatcher, Heartbeat, and Instance can use either type interchangeably. Key difference: no IoWriter — writes go directly to body.write which is backed by Thread::Queue and is fiber-safe under Falcon’s scheduler.

Instance Attribute Summary collapse

Instance Method Summary collapse

Constructor Details

#initialize(id:, stream_name:, body:, since_id:, write_deadline_ms:, context: nil) ⇒ FalconConnection

Returns a new instance of FalconConnection.



17
18
19
20
21
22
23
24
25
26
27
28
29
30
# File 'lib/pgbus/web/streamer/falcon_connection.rb', line 17

def initialize(id:, stream_name:, body:, since_id:, write_deadline_ms:, context: nil)
  @id = id
  @stream_name = stream_name
  @body = body
  @io = body
  @last_msg_id_sent = since_id.to_i
  @write_deadline_ms = write_deadline_ms
  @mutex = Mutex.new
  @dead = false
  @closed = false
  @created_at = monotonic
  @last_write_at = @created_at
  @context = context
end

Instance Attribute Details

#contextObject (readonly)

Returns the value of attribute context.



15
16
17
# File 'lib/pgbus/web/streamer/falcon_connection.rb', line 15

def context
  @context
end

#idObject (readonly)

Returns the value of attribute id.



15
16
17
# File 'lib/pgbus/web/streamer/falcon_connection.rb', line 15

def id
  @id
end

#ioObject (readonly)

Returns the value of attribute io.



15
16
17
# File 'lib/pgbus/web/streamer/falcon_connection.rb', line 15

def io
  @io
end

#last_msg_id_sentObject (readonly)

Returns the value of attribute last_msg_id_sent.



15
16
17
# File 'lib/pgbus/web/streamer/falcon_connection.rb', line 15

def last_msg_id_sent
  @last_msg_id_sent
end

#mutexObject (readonly)

Returns the value of attribute mutex.



15
16
17
# File 'lib/pgbus/web/streamer/falcon_connection.rb', line 15

def mutex
  @mutex
end

#stream_nameObject (readonly)

Returns the value of attribute stream_name.



15
16
17
# File 'lib/pgbus/web/streamer/falcon_connection.rb', line 15

def stream_name
  @stream_name
end

Instance Method Details

#closeObject



71
72
73
74
75
76
77
78
79
80
81
# File 'lib/pgbus/web/streamer/falcon_connection.rb', line 71

def close
  @mutex.synchronize do
    return if @closed

    @closed = true
    mark_dead!
    @body.close_write
  end
rescue StandardError => e
  Pgbus.logger&.debug { "[Pgbus::Streamer::FalconConnection] close failed: #{e.class}: #{e.message}" }
end

#dead?Boolean

Returns:

  • (Boolean)


87
88
89
# File 'lib/pgbus/web/streamer/falcon_connection.rb', line 87

def dead?
  @dead || @body.closed?
end

#enqueue(envelopes) ⇒ Object



32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
# File 'lib/pgbus/web/streamer/falcon_connection.rb', line 32

def enqueue(envelopes)
  written = []
  envelopes.each do |envelope|
    next if envelope.msg_id <= @last_msg_id_sent

    bytes = Pgbus::Streams::Envelope.message(
      id: envelope.msg_id,
      event: "turbo-stream",
      data: envelope.payload
    )

    result = write_to_body(bytes)
    if result == :ok
      @last_msg_id_sent = envelope.msg_id
      @last_write_at = monotonic
      written << envelope
    else
      mark_dead!
      break
    end
  end
  written
end

#idle_forObject



83
84
85
# File 'lib/pgbus/web/streamer/falcon_connection.rb', line 83

def idle_for
  monotonic - @last_write_at
end

#mark_dead!Object



91
92
93
# File 'lib/pgbus/web/streamer/falcon_connection.rb', line 91

def mark_dead!
  @dead = true
end

#write_comment(text) ⇒ Object



56
57
58
59
60
61
62
63
64
65
# File 'lib/pgbus/web/streamer/falcon_connection.rb', line 56

def write_comment(text)
  bytes = Pgbus::Streams::Envelope.comment(text)
  result = write_to_body(bytes)
  if result == :ok
    @last_write_at = monotonic
  else
    mark_dead!
  end
  result
end

#write_sentinel(bytes) ⇒ Object



67
68
69
# File 'lib/pgbus/web/streamer/falcon_connection.rb', line 67

def write_sentinel(bytes)
  write_to_body(bytes)
end