Module: Pgbus::Web::Streamer::IoWriter

Defined in:
lib/pgbus/web/streamer/io_writer.rb

Overview

Non-blocking IO writer with a per-call deadline, serialised through the connection’s own mutex. This is the bug-fix for puma/puma#576: a naive ‘io.write(bytes)` on a dead or slow SSE client deadlocks the dispatcher thread until the OS closes the socket (which can take minutes under a TCP keepalive). The message_bus gem hit this in production; we copy the pattern.

The write loop uses write_nonblock + IO.select so a slow client at most stalls *its own* mutex-protected write for ‘deadline_ms`, never the dispatcher or heartbeat thread. When the deadline expires with bytes still pending, we return :blocked; the caller (Connection#enqueue or Connection#write_comment) translates that into mark_dead!, and the heartbeat sweep eventually unregisters the connection.

Returns:

:ok       — all bytes written
:closed   — peer gone (EPIPE / ECONNRESET / IOError on closed IO)
:blocked  — deadline hit before all bytes could be written

Class Method Summary collapse

Class Method Details

.write(connection, bytes, deadline_ms:) ⇒ Object



25
26
27
28
29
# File 'lib/pgbus/web/streamer/io_writer.rb', line 25

def self.write(connection, bytes, deadline_ms:)
  connection.mutex.synchronize do
    write_all(connection.io, bytes, deadline_ms)
  end
end