Class: Pgbus::Web::Streamer::FalconConnection

Inherits:
Object
  • Object
show all
Defined in:
lib/pgbus/web/streamer/falcon_connection.rb

Overview

Connection adapter for Falcon’s native streaming body path. Wraps a Protocol::HTTP::Body::Writable instead of a raw IO socket.

Satisfies the same duck-type interface as Connection so the Dispatcher, Heartbeat, and Instance can use either type interchangeably. Key difference: no IoWriter — writes go directly to body.write which is backed by Thread::Queue and is fiber-safe under Falcon’s scheduler.

Instance Attribute Summary collapse

Instance Method Summary collapse

Constructor Details

#initialize(id:, stream_name:, body:, since_id:, write_deadline_ms:, context: nil) ⇒ FalconConnection

Returns a new instance of FalconConnection.



18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
# File 'lib/pgbus/web/streamer/falcon_connection.rb', line 18

def initialize(id:, stream_name:, body:, since_id:, write_deadline_ms:, context: nil)
  @id = id
  @stream_name = stream_name
  @body = body
  @io = body
  @last_msg_id_sent = since_id.to_i
  @write_deadline_ms = write_deadline_ms
  @mutex = Mutex.new
  @dead = false
  @closed = false
  @presence_member = nil
  @created_at = monotonic
  @last_write_at = @created_at
  @context = context
end

Instance Attribute Details

#contextObject (readonly)

Returns the value of attribute context.



15
16
17
# File 'lib/pgbus/web/streamer/falcon_connection.rb', line 15

def context
  @context
end

#idObject (readonly)

Returns the value of attribute id.



15
16
17
# File 'lib/pgbus/web/streamer/falcon_connection.rb', line 15

def id
  @id
end

#ioObject (readonly)

Returns the value of attribute io.



15
16
17
# File 'lib/pgbus/web/streamer/falcon_connection.rb', line 15

def io
  @io
end

#last_msg_id_sentObject (readonly)

Returns the value of attribute last_msg_id_sent.



15
16
17
# File 'lib/pgbus/web/streamer/falcon_connection.rb', line 15

def last_msg_id_sent
  @last_msg_id_sent
end

#mutexObject (readonly)

Returns the value of attribute mutex.



15
16
17
# File 'lib/pgbus/web/streamer/falcon_connection.rb', line 15

def mutex
  @mutex
end

#presence_memberObject

Returns the value of attribute presence_member.



16
17
18
# File 'lib/pgbus/web/streamer/falcon_connection.rb', line 16

def presence_member
  @presence_member
end

#stream_nameObject (readonly)

Returns the value of attribute stream_name.



15
16
17
# File 'lib/pgbus/web/streamer/falcon_connection.rb', line 15

def stream_name
  @stream_name
end

Instance Method Details

#closeObject



73
74
75
76
77
78
79
80
81
82
83
# File 'lib/pgbus/web/streamer/falcon_connection.rb', line 73

def close
  @mutex.synchronize do
    return if @closed

    @closed = true
    mark_dead!
    @body.close_write
  end
rescue StandardError => e
  Pgbus.logger&.debug { "[Pgbus::Streamer::FalconConnection] close failed: #{e.class}: #{e.message}" }
end

#dead?Boolean

Returns:

  • (Boolean)


89
90
91
# File 'lib/pgbus/web/streamer/falcon_connection.rb', line 89

def dead?
  @dead || @body.closed?
end

#enqueue(envelopes) ⇒ Object



34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
# File 'lib/pgbus/web/streamer/falcon_connection.rb', line 34

def enqueue(envelopes)
  written = []
  envelopes.each do |envelope|
    next if envelope.msg_id <= @last_msg_id_sent

    bytes = Pgbus::Streams::Envelope.message(
      id: envelope.msg_id,
      event: sse_event_for(envelope),
      data: envelope.payload
    )

    result = write_to_body(bytes)
    if result == :ok
      @last_msg_id_sent = envelope.msg_id
      @last_write_at = monotonic
      written << envelope
    else
      mark_dead!
      break
    end
  end
  written
end

#idle_forObject



85
86
87
# File 'lib/pgbus/web/streamer/falcon_connection.rb', line 85

def idle_for
  monotonic - @last_write_at
end

#mark_dead!Object



93
94
95
# File 'lib/pgbus/web/streamer/falcon_connection.rb', line 93

def mark_dead!
  @dead = true
end

#write_comment(text) ⇒ Object



58
59
60
61
62
63
64
65
66
67
# File 'lib/pgbus/web/streamer/falcon_connection.rb', line 58

def write_comment(text)
  bytes = Pgbus::Streams::Envelope.comment(text)
  result = write_to_body(bytes)
  if result == :ok
    @last_write_at = monotonic
  else
    mark_dead!
  end
  result
end

#write_sentinel(bytes) ⇒ Object



69
70
71
# File 'lib/pgbus/web/streamer/falcon_connection.rb', line 69

def write_sentinel(bytes)
  write_to_body(bytes)
end