Class: Cloudflare::SSEOut

Inherits:
Object
  • Object
show all
Defined in:
lib/cloudflare_workers/stream.rb

Overview

Writer side of the SSE pipe. Passed to the user block as ‘out`. Every write goes through a `TextEncoder` so the bytes hit the client as valid UTF-8 regardless of the Opal String’s internal representation.

Writes are fire-and-forget: the WritableStream internally queues them in order, so a sequence of ‘out << a; out << b` always lands on the wire as “ab” without the caller having to manually `.__await__` each write. `close` waits on the tail promise in the chain so it doesn’t close the writer mid-queue (which would truncate bytes the client had not yet drained).

Memory is O(1) — we chain promises instead of accumulating them in an ever-growing array. A long-lived / high-frequency SSE endpoint therefore doesn’t leak once-flushed write-ack promises.

Instance Method Summary collapse

Constructor Details

#initialize(writer) ⇒ SSEOut

Returns a new instance of SSEOut.



149
150
151
152
153
154
155
156
157
158
# File 'lib/cloudflare_workers/stream.rb', line 149

def initialize(writer)
  @writer = writer
  @encoder = `new TextEncoder()`
  # `@tail` tracks only the *latest* pending writer.write()
  # promise. Each write() chains onto it via .then(), so the
  # chain length stays 1 regardless of how many writes have
  # already flushed.
  @tail = `Promise.resolve()`
  @closed = false
end

Instance Method Details

#closeObject

Close the writable side. Waits for the tail promise in the write chain to drain so the client receives the final bytes before ‘done: true`. After this, subsequent writes become no-ops so a racing producer doesn’t crash on a closed writer.



207
208
209
210
211
212
213
214
215
216
217
218
219
# File 'lib/cloudflare_workers/stream.rb', line 207

def close
  return self if @closed
  @closed = true
  w = @writer
  tail = @tail
  # writer.close() itself can reject if the consumer bailed out.
  # Swallow — the Workers runtime already surfaces the underlying
  # error to the client via the HTTP layer. Single-line x-string
  # so Opal emits it as an expression (see Multipart#to_uint8_array
  # for the same gotcha).
  `(async function(t, wr){ try { await t; } catch(e) {} try { await wr.close(); } catch(e) {} })(#{tail}, #{w})`.__await__
  self
end

#closed?Boolean

Returns:

  • (Boolean)


221
222
223
# File 'lib/cloudflare_workers/stream.rb', line 221

def closed?
  @closed
end

#comment(text) ⇒ Object

‘: keep-alive` comments are the SSE-standard keep-alive mechanism. Browsers / proxies won’t drop the connection while they see one.



190
191
192
# File 'lib/cloudflare_workers/stream.rb', line 190

def comment(text)
  write(": #{text}\n\n")
end

#eachObject

Rack-body compatibility so the SSEOut itself is also iterable by ‘build_js_response` when used as a body (unusual but supported).



227
# File 'lib/cloudflare_workers/stream.rb', line 227

def each; end

#event(data, event: nil, id: nil, retry_ms: nil) ⇒ Object

Helper: emit a well-formed SSE event. ‘data` is split on LF and each line prefixed with `data:` per the SSE spec.



178
179
180
181
182
183
184
185
186
# File 'lib/cloudflare_workers/stream.rb', line 178

def event(data, event: nil, id: nil, retry_ms: nil)
  buf = ''
  buf += "event: #{event}\n" if event
  buf += "id: #{id}\n" if id
  buf += "retry: #{retry_ms.to_i}\n" if retry_ms
  data.to_s.split("\n", -1).each { |line| buf += "data: #{line}\n" }
  buf += "\n"
  write(buf)
end

#sleep(seconds) ⇒ Object

Suspend the task for ‘seconds` seconds (Float allowed). Uses setTimeout under the hood so the Workers CPU budget is not charged for wall-clock waiting.



197
198
199
200
201
# File 'lib/cloudflare_workers/stream.rb', line 197

def sleep(seconds)
  ms = (seconds.to_f * 1000).to_i
  `(new Promise(function(r){ setTimeout(r, #{ms}); }))`.__await__
  self
end

#write(data) ⇒ Object Also known as: <<

Write a raw string chunk to the stream. The caller is responsible for SSE framing (e.g. ‘“data: foonn”`). Returns self.



162
163
164
165
166
167
168
169
170
171
172
173
# File 'lib/cloudflare_workers/stream.rb', line 162

def write(data)
  return self if @closed
  s = data.to_s
  w = @writer
  enc = @encoder
  # Chain the write onto the current tail so ordering is still
  # enforced but the promise graph stays flat (Copilot review #3 —
  # prior implementation pushed every write into an unbounded
  # array which would leak memory on long-running streams).
  @tail = `#{@tail}.then(function() { return #{w}.write(#{enc}.encode(#{s})); })`
  self
end