Class: Pgbus::Web::StreamApp

Inherits:
Object
  • Object
show all
Defined in:
lib/pgbus/web/stream_app.rb

Overview

Rack app mounted at /pgbus/streams. Not a Rails controller — this bypasses the entire Rails middleware stack for the streaming path, which avoids several classes of bug (ActionDispatch::Cookies leaking into long-lived requests, ActiveRecord::QueryCache holding the AR connection open, flash modifications after hijack, etc.). It also removes the temptation to use ActionController::Live, which has a well-documented tendency to tie up Puma threads (see puma#1009, puma#938, puma#569).

Call flow (happy path, Puma 6.1+ hijack):

1. Parse the signed name from PATH_INFO
2. Verify it via Pgbus::Streams::SignedName (raises → 404)
3. Run the authorize hook (raises → 403)
4. Parse the cursor from ?since= or Last-Event-ID
5. Check streams_max_connections per worker (over → 503)
6. Check rack.hijack? (missing → 501)
7. Hijack, write the HTTP response line + SSE headers + opening
   comment directly to the socket
8. Build a Connection, hand to Streamer.current.register(...)
9. Return [-1, {}, []] (Puma's async protocol)

On errors, returns a normal [status, headers, body] response so Rack / the reverse proxy can log and retry. The whole thing is designed so a reviewer can read call/2 top-to-bottom and see the full request lifecycle.

Instance Method Summary collapse

Constructor Details

#initialize(streamer: nil, config: nil, logger: nil, authorize: nil) ⇒ StreamApp

Returns a new instance of StreamApp.



36
37
38
39
40
41
# File 'lib/pgbus/web/stream_app.rb', line 36

def initialize(streamer: nil, config: nil, logger: nil, authorize: nil)
  @streamer_override = streamer
  @config_override   = config
  @logger_override   = logger
  @authorize         = authorize || ->(_env, _stream_name) { true }
end

Instance Method Details

#call(env) ⇒ Object



43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
# File 'lib/pgbus/web/stream_app.rb', line 43

def call(env)
  request = Rack::Request.new(env)
  return not_found("only GET is supported") unless request.get?

  signed_name = extract_signed_name(env)
  return not_found("missing signed stream name") if signed_name.nil?

  stream_name = verify!(signed_name)
  return not_found("invalid signed stream name") if stream_name.nil?

  authorize_result = @authorize.call(env, stream_name)
  return forbidden unless authorize_result

  # If authorize returned a non-boolean value (e.g. a User model),
  # treat it as the connection context for audience filtering.
  # A bare `true` means "authorized but no context" — filters that
  # depend on a context will fail-closed on these connections.
  context = authorize_result unless authorize_result == true

  cursor = parse_cursor(env, request)
  return bad_request("invalid cursor: #{cursor}") if cursor.is_a?(String)

  return test_mode_stub if config.streams_test_mode

  return over_capacity if streamer.registry.size >= config.streams_max_connections

  if config.streams_falcon_streaming_body
    streaming_body_response(stream_name: stream_name, since_id: cursor, context: context)
  elsif env["rack.hijack?"]
    hijack_and_register(env, stream_name: stream_name, since_id: cursor, context: context)
  else
    unsupported_server
  end
rescue StandardError => e
  logger.error { "[Pgbus::StreamApp] #{e.class}: #{e.message}" }
  server_error
end