Class: Cloudflare::SSEOut
- Inherits: Object
  - Object
    - Cloudflare::SSEOut
- Defined in: lib/cloudflare_workers/stream.rb
Overview
Writer side of the SSE pipe. Passed to the user block as `out`. Every write goes through a `TextEncoder` so the bytes hit the client as valid UTF-8 regardless of the Opal String’s internal representation.
Writes are fire-and-forget: the WritableStream internally queues them in order, so a sequence of `out << a; out << b` always lands on the wire as “ab” without the caller having to manually `.__await__` each write. `close` waits on the tail promise in the chain so it doesn’t close the writer mid-queue (which would truncate bytes the client had not yet drained).
Memory is O(1): each write chains onto a single tail promise instead of being pushed into an ever-growing array. A long-lived or high-frequency SSE endpoint therefore doesn’t leak write-ack promises that have already flushed.
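The ordering and close-after-drain behaviour described above can be sketched in plain Ruby. This is only an illustration under stated assumptions: `TailChainedSink` is a hypothetical stand-in, not part of the library, and `Thread#join` plays the role that `.then` plays on the JavaScript promise chain.

```ruby
# Hypothetical plain-Ruby stand-in for the tail-chaining idea.
# Thread#join substitutes for Promise#then; nothing here touches Opal or Workers.
class TailChainedSink
  attr_reader :chunks

  def initialize
    @chunks = []
    @tail = nil        # only the latest pending write is referenced
    @closed = false
  end

  def write(data)
    return self if @closed      # writes after close are no-ops

    prev = @tail
    chunk = data.to_s
    @tail = Thread.new do
      prev&.join                # wait for the previous write, preserving order
      prev = nil                # drop the reference once it has finished
      @chunks << chunk
    end
    self
  end
  alias_method :<<, :write

  def close
    return self if @closed

    @closed = true
    @tail&.join                 # drain the tail before closing
    self
  end
end

sink = TailChainedSink.new
sink << 'a' << 'b'
sink.close
sink << 'c'                     # ignored: the sink is already closed
puts sink.chunks.join           # prints "ab"
```

Because each write holds a reference only to its immediate predecessor, and releases it once that predecessor has finished, the number of live links does not grow with the number of completed writes.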
Instance Method Summary
- #close ⇒ Object
  Close the writable side.
- #closed? ⇒ Boolean
- #comment(text) ⇒ Object
  `: keep-alive` comments are the SSE-standard keep-alive mechanism.
- #each ⇒ Object
  Rack-body compatibility so the SSEOut itself is also iterable by `build_js_response` when used as a body (unusual but supported).
- #event(data, event: nil, id: nil, retry_ms: nil) ⇒ Object
  Helper: emit a well-formed SSE event.
- #initialize(writer) ⇒ SSEOut (constructor)
  A new instance of SSEOut.
- #sleep(seconds) ⇒ Object
  Suspend the task for `seconds` seconds (Float allowed).
- #write(data) ⇒ Object (also: #<<)
  Write a raw string chunk to the stream.
Constructor Details
#initialize(writer) ⇒ SSEOut
Returns a new instance of SSEOut.
# File 'lib/cloudflare_workers/stream.rb', line 149

def initialize(writer)
  @writer = writer
  @encoder = `new TextEncoder()`
  # `@tail` tracks only the *latest* pending writer.write()
  # promise. Each write() chains onto it via .then(), so the
  # chain length stays 1 regardless of how many writes have
  # already flushed.
  @tail = `Promise.resolve()`
  @closed = false
end
Instance Method Details
#close ⇒ Object
Close the writable side. Waits for the tail promise in the write chain to drain so the client receives the final bytes before `done: true`. After this, subsequent writes become no-ops so a racing producer doesn’t crash on a closed writer.
# File 'lib/cloudflare_workers/stream.rb', line 207

def close
  return self if @closed
  @closed = true
  w = @writer
  tail = @tail
  # writer.close() itself can reject if the consumer bailed out.
  # Swallow the rejection: the Workers runtime already surfaces the
  # underlying error to the client via the HTTP layer. Single-line
  # x-string so Opal emits it as an expression (see
  # Multipart#to_uint8_array for the same gotcha).
  `(async function(t, wr){ try { await t; } catch(e) {} try { await wr.close(); } catch(e) {} })(#{tail}, #{w})`.__await__
  self
end
#closed? ⇒ Boolean
# File 'lib/cloudflare_workers/stream.rb', line 221

def closed?
  @closed
end
#comment(text) ⇒ Object
`: keep-alive` comments are the SSE-standard keep-alive mechanism. Clients ignore comment lines, but the traffic keeps the connection from looking idle, so browsers and proxies are less likely to time it out.
# File 'lib/cloudflare_workers/stream.rb', line 190

def comment(text)
  write(": #{text}\n\n")
end
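As a rough illustration of what a keep-alive comment looks like on the wire, and why it is harmless to consumers, here is a plain-Ruby sketch. Both `sse_comment` and `data_payloads` are hypothetical helpers written for this example; the reader is deliberately simplified and is not a full SSE parser.

```ruby
# Hypothetical helper mirroring the frame text that #comment writes.
def sse_comment(text)
  ": #{text}\n\n"
end

# Simplified reader (an assumption for illustration, not a complete parser):
# lines starting with ":" are comments and are skipped, and only the
# payload of `data:` lines is kept.
def data_payloads(stream_text)
  stream_text.each_line(chomp: true)
             .reject { |line| line.start_with?(':') }
             .select { |line| line.start_with?('data: ') }
             .map { |line| line.delete_prefix('data: ') }
end

wire = sse_comment('keep-alive') + "data: tick\n\n" + sse_comment('keep-alive')
p wire                 # => ": keep-alive\n\ndata: tick\n\n: keep-alive\n\n"
p data_payloads(wire)  # => ["tick"]
```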
#each ⇒ Object
Rack-body compatibility so the SSEOut itself is also iterable by `build_js_response` when used as a body (unusual but supported). The method body is empty, so iterating yields no chunks.
# File 'lib/cloudflare_workers/stream.rb', line 227

def each; end
#event(data, event: nil, id: nil, retry_ms: nil) ⇒ Object
Helper: emit a well-formed SSE event. `data` is split on LF and each line prefixed with `data:` per the SSE spec.
# File 'lib/cloudflare_workers/stream.rb', line 178

def event(data, event: nil, id: nil, retry_ms: nil)
  buf = ''
  buf += "event: #{event}\n" if event
  buf += "id: #{id}\n" if id
  buf += "retry: #{retry_ms.to_i}\n" if retry_ms
  data.to_s.split("\n", -1).each { |line| buf += "data: #{line}\n" }
  buf += "\n"
  write(buf)
end
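To see the frames this framing logic produces without a stream attached, the same string building can be lifted into a standalone function. `sse_frame` is a hypothetical name used only for this sketch, and it runs under plain Ruby rather than Opal.

```ruby
# Hypothetical standalone version of the framing logic in #event.
# It returns the frame text instead of writing it to a stream.
def sse_frame(data, event: nil, id: nil, retry_ms: nil)
  buf = +''
  buf << "event: #{event}\n" if event
  buf << "id: #{id}\n" if id
  buf << "retry: #{retry_ms.to_i}\n" if retry_ms
  # The -1 limit keeps trailing empty fields, so a trailing LF in the
  # payload becomes an empty `data:` line instead of being dropped.
  data.to_s.split("\n", -1).each { |line| buf << "data: #{line}\n" }
  buf << "\n" # the blank line terminates the event
end

p sse_frame('hello')
# => "data: hello\n\n"
p sse_frame("a\nb", event: 'tick', id: 7, retry_ms: 1500)
# => "event: tick\nid: 7\nretry: 1500\ndata: a\ndata: b\n\n"
```

Multi-line payloads therefore reach the client as one event whose data lines the client rejoins with LF.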
#sleep(seconds) ⇒ Object
Suspend the task for `seconds` seconds (Float allowed). Uses setTimeout under the hood so the Workers CPU budget is not charged for wall-clock waiting.
# File 'lib/cloudflare_workers/stream.rb', line 197

def sleep(seconds)
  ms = (seconds.to_f * 1000).to_i
  `(new Promise(function(r){ setTimeout(r, #{ms}); }))`.__await__
  self
end
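The seconds-to-milliseconds conversion truncates rather than rounds, which matters for very short waits. A minimal sketch of just that arithmetic, with `to_timeout_ms` as a hypothetical name for the first line of the method:

```ruby
# Hypothetical helper isolating the conversion used before setTimeout.
def to_timeout_ms(seconds)
  (seconds.to_f * 1000).to_i
end

p to_timeout_ms(2)       # => 2000
p to_timeout_ms(0.25)    # => 250
p to_timeout_ms('1.5')   # => 1500 (strings are coerced via to_f)
p to_timeout_ms(0.0004)  # => 0 (sub-millisecond waits truncate to zero)
```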
#write(data) ⇒ Object Also known as: <<
Write a raw string chunk to the stream. The caller is responsible for SSE framing (e.g. `"data: foo\n\n"`). Returns self.
# File 'lib/cloudflare_workers/stream.rb', line 162

def write(data)
  return self if @closed
  s = data.to_s
  w = @writer
  enc = @encoder
  # Chain the write onto the current tail so ordering is still
  # enforced but the promise graph stays flat (Copilot review #3:
  # prior implementation pushed every write into an unbounded
  # array which would leak memory on long-running streams).
  @tail = `#{@tail}.then(function() { return #{w}.write(#{enc}.encode(#{s})); })`
  self
end
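The coercion and encoding step can be approximated in plain Ruby to show what ends up on the wire. `wire_bytes` is a hypothetical helper for this sketch; it uses `String#encode` where the real method uses the JavaScript `TextEncoder`.

```ruby
# Hypothetical plain-Ruby approximation of the `data.to_s` + UTF-8 encode
# step in #write. The real method hands the bytes to a JS writer instead.
def wire_bytes(data)
  data.to_s.encode('UTF-8').bytes
end

frame = "data: é\n\n"            # the caller supplies the SSE framing
p frame.length                   # => 9 (characters)
p wire_bytes(frame).length       # => 10 ("é" takes two bytes in UTF-8)
p wire_bytes(42)                 # => [52, 50] (non-strings go through to_s)
```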