Class: Pgbus::Web::StreamApp
- Inherits: Object
  - Object
  - Pgbus::Web::StreamApp
- Defined in: lib/pgbus/web/stream_app.rb
Overview
Rack app mounted at /pgbus/streams. Not a Rails controller — this bypasses the entire Rails middleware stack for the streaming path, which avoids several classes of bug (ActionDispatch::Cookies leaking into long-lived requests, ActiveRecord::QueryCache holding the AR connection open, flash modifications after hijack, etc.). It also removes the temptation to use ActionController::Live, which has a well-documented tendency to tie up Puma threads (see puma#1009, puma#938, puma#569).
Call flow (happy path, Puma 6.1+ hijack):
1. Parse the signed name from PATH_INFO
2. Verify it via Pgbus::Streams::SignedName (raises → 404)
3. Run the authorize hook (raises → 403)
4. Parse the cursor from ?since= or Last-Event-ID
5. Check streams_max_connections per worker (over → 503)
6. Check rack.hijack? (missing → 501)
7. Hijack, write the HTTP response line + SSE headers + opening comment directly to the socket
8. Build a Connection, hand to Streamer.current.register(...)
9. Return [-1, {}, []] (Puma's async protocol)
On errors, it returns a normal [status, headers, body] response so Rack / the reverse proxy can log and retry. The whole thing is designed so a reviewer can read #call top-to-bottom and see the full request lifecycle.
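Steps 6, 7 and 9 of the call flow above can be sketched in isolation. This is a minimal illustration of Rack full hijack, not the pgbus implementation: the method name `hijack_sketch`, the two constants, the exact header set and the text of the opening comment are all assumptions made for the example.

```ruby
require "stringio"

# Hypothetical response preamble; the real header set may differ.
SSE_HEAD = "HTTP/1.1 200 OK\r\n" \
           "Content-Type: text/event-stream\r\n" \
           "Cache-Control: no-cache\r\n" \
           "\r\n"
# An SSE comment line starts with ":" and is ignored by clients.
SSE_OPENING_COMMENT = ": connected\n\n"

def hijack_sketch(env)
  # Step 6: without full-hijack support there is no socket to take over.
  unless env["rack.hijack?"]
    return [501, { "content-type" => "text/plain" }, ["rack.hijack is not supported"]]
  end

  # Step 7: take over the socket and write the response by hand.
  io = env["rack.hijack"].call || env["rack.hijack_io"]
  io.write(SSE_HEAD)
  io.write(SSE_OPENING_COMMENT)
  io.flush

  # Step 8 (registering the connection with the streamer) is omitted here.

  # Step 9: signal that the response has already been handled.
  [-1, {}, []]
end

# Usage with a StringIO standing in for the client socket.
socket = StringIO.new
env = { "rack.hijack?" => true, "rack.hijack" => -> { socket } }
status, _headers, _body = hijack_sketch(env)
```

Writing the status line by hand is what lets the app skip the server's normal response path; the trade-off is that any header the client or proxy needs must be written explicitly.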
Instance Method Summary
- #call(env) ⇒ Object
- #initialize(streamer: nil, config: nil, logger: nil, authorize: nil) ⇒ StreamApp (constructor): A new instance of StreamApp.
Constructor Details
#initialize(streamer: nil, config: nil, logger: nil, authorize: nil) ⇒ StreamApp
Returns a new instance of StreamApp.
    # File 'lib/pgbus/web/stream_app.rb', line 36
    def initialize(streamer: nil, config: nil, logger: nil, authorize: nil)
      @streamer_override = streamer
      @config_override = config
      @logger_override = logger
      # Default hook: allow every request, with no connection context.
      @authorize = authorize || ->(_env, _stream_name) { true }
    end
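The `authorize:` hook is any callable taking `(env, stream_name)`. The sketch below shows the three kinds of return value that `#call` distinguishes: a falsy value, a bare `true`, and any other object (used as the connection context). The `User` struct, the `example.current_user` env key and the `interpret` helper are hypothetical names introduced only for this illustration.

```ruby
# Hypothetical user record; any non-boolean object can serve as a context.
User = Struct.new(:id, :admin)

# Same behavior as the default hook: authorized, no context.
allow_all = ->(_env, _stream_name) { true }

# Returns the user stored under a (hypothetical) env key, or false to deny.
user_hook = lambda do |env, _stream_name|
  env["example.current_user"] || false
end

# Mirrors how #call reads the hook result (an assumption-labeled helper,
# not part of the library).
def interpret(result)
  return [:forbidden, nil] unless result

  # A bare `true` leaves context as nil; anything else becomes the context.
  context = result unless result == true
  [:authorized, context]
end

user = User.new(1, false)
interpret(allow_all.call({}, "chat"))                                  # authorized, nil context
interpret(user_hook.call({ "example.current_user" => user }, "chat"))  # authorized, user as context
interpret(user_hook.call({}, "chat"))                                  # forbidden
```

A hook like `user_hook` would be passed through the documented keyword, for example `Pgbus::Web::StreamApp.new(authorize: user_hook)`.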
Instance Method Details
#call(env) ⇒ Object
    # File 'lib/pgbus/web/stream_app.rb', line 43
    def call(env)
      request = Rack::Request.new(env)
      return not_found("only GET is supported") unless request.get?

      signed_name = extract_signed_name(env)
      return not_found("missing signed stream name") if signed_name.nil?

      stream_name = verify!(signed_name)
      return not_found("invalid signed stream name") if stream_name.nil?

      # NOTE: the local variable name is missing from this listing;
      # `auth_result` is a stand-in.
      auth_result = @authorize.call(env, stream_name)
      return forbidden unless auth_result

      # If authorize returned a non-boolean value (e.g. a User model),
      # treat it as the connection context for audience filtering.
      # A bare `true` means "authorized but no context"; filters that
      # depend on a context will fail closed on these connections.
      context = auth_result unless auth_result == true

      cursor = parse_cursor(env, request)
      return bad_request("invalid cursor: #{cursor}") if cursor.is_a?(String)

      return test_mode_stub if config.streams_test_mode
      return over_capacity if streamer.registry.size >= config.streams_max_connections

      if config.streams_falcon_streaming_body
        streaming_body_response(stream_name: stream_name, since_id: cursor, context: context)
      elsif env["rack.hijack?"]
        hijack_and_register(env, stream_name: stream_name, since_id: cursor, context: context)
      else
        unsupported_server
      end
    rescue StandardError => e
      logger.error { "[Pgbus::StreamApp] #{e.class}: #{e.message}" }
      server_error
    end
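The `cursor.is_a?(String)` check in `#call` implies that `parse_cursor` hands back the offending String when the cursor is invalid. A minimal sketch of that convention follows. The body of `parse_cursor` is not shown in this page, so everything here is an assumption: the name `parse_cursor_sketch`, the precedence of `?since=` over `Last-Event-ID`, the digits-only format, and returning nil when no cursor is given.

```ruby
# Hypothetical re-creation of the return convention, not the library code.
# query_since   - value of the ?since= parameter, or nil
# last_event_id - value of the Last-Event-ID header
#                 (env["HTTP_LAST_EVENT_ID"] in a Rack env), or nil
def parse_cursor_sketch(query_since, last_event_id)
  raw = query_since || last_event_id

  # No cursor supplied (assumed to mean "no replay").
  return nil if raw.nil? || raw.empty?

  # Invalid input: return the raw String so the caller can report it.
  return raw unless raw.match?(/\A\d+\z/)

  raw.to_i
end

parse_cursor_sketch("42", nil)   # Integer cursor from ?since=
parse_cursor_sketch(nil, "7")    # Integer cursor from Last-Event-ID
parse_cursor_sketch("abc", nil)  # String, which the caller treats as invalid
```

Returning the bad value instead of raising keeps the error path in `#call` a plain early return with the value available for the error message.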