Class: Cloudflare::SSEOut

Inherits:
Object
Defined in:
lib/homura/runtime/stream.rb

Overview

Writer side of the SSE pipe. Passed to the user block as `out`. Every write goes through a `TextEncoder` so the bytes hit the client as valid UTF-8 regardless of the Opal String’s internal representation.

Writes are fire-and-forget: the WritableStream internally queues them in order, so a sequence of `out << a; out << b` always lands on the wire as “ab” without the caller having to manually `.__await__` each write. `close` waits on the tail promise in the chain so it doesn’t close the writer mid-queue (which would truncate bytes the client had not yet drained).

Memory is O(1) — we chain promises instead of accumulating them in an ever-growing array. A long-lived / high-frequency SSE endpoint therefore doesn’t leak once-flushed write-ack promises.

Constructor Details

#initialize(writer) ⇒ SSEOut

Returns a new instance of SSEOut.



# File 'lib/homura/runtime/stream.rb', line 151

def initialize(writer)
  @writer = writer
  @encoder = `new TextEncoder()`
  # `@tail` tracks only the *latest* pending writer.write()
  # promise. Each write() chains onto it via .then(), so the
  # chain length stays 1 regardless of how many writes have
  # already flushed.
  @tail = `Promise.resolve()`
  @closed = false
end

Instance Method Details

#close ⇒ Object

Close the writable side. Waits for the tail promise in the write chain to drain so the client receives the final bytes before `done: true`. After this, subsequent writes become no-ops so a racing producer doesn’t crash on a closed writer.



# File 'lib/homura/runtime/stream.rb', line 210

def close
  return self if @closed
  @closed = true
  w = @writer
  tail = @tail
  # writer.close() itself can reject if the consumer bailed out.
  # Swallow — the Workers runtime already surfaces the underlying
  # error to the client via the HTTP layer. Single-line x-string
  # so Opal emits it as an expression (see Multipart#to_uint8_array
  # for the same gotcha).
  `(async function(t, wr){ try { await t; } catch(e) {} try { await wr.close(); } catch(e) {} })(#{tail}, #{w})`.__await__
  self
end

#closed? ⇒ Boolean

Returns:

  • (Boolean)


# File 'lib/homura/runtime/stream.rb', line 224

def closed?
  @closed
end

#comment(text) ⇒ Object

Emit an SSE comment line. `: keep-alive` comments are the SSE-standard keep-alive mechanism: browsers and proxies won’t drop the connection while they keep seeing them.



# File 'lib/homura/runtime/stream.rb', line 193

def comment(text)
  write(": #{text}\n\n")
end
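
As a rough illustration of what `comment` puts on the wire, the framing can be reproduced in plain Ruby without a writer. `comment_frame` is a hypothetical name used only for this sketch; it returns the string instead of calling `write`.

```ruby
# Hypothetical stand-alone version of the framing done by #comment.
def comment_frame(text)
  ": #{text}\n\n"
end

frame = comment_frame("keep-alive")

# SSE parsers ignore any line that begins with a colon, so this frame
# carries no event data; it only keeps bytes flowing on the connection.
lines = frame.split("\n")
puts lines.inspect
```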

#each ⇒ Object

Rack-body compatibility so the SSEOut itself is also iterable by `build_js_response` when used as a body (unusual but supported). The method body is empty, so iteration yields no chunks.



# File 'lib/homura/runtime/stream.rb', line 230

def each
end

#event(data, event: nil, id: nil, retry_ms: nil) ⇒ Object

Helper: emit a well-formed SSE event. `data` is split on LF and each line prefixed with `data:` per the SSE spec.



# File 'lib/homura/runtime/stream.rb', line 181

def event(data, event: nil, id: nil, retry_ms: nil)
  buf = ""
  buf += "event: #{event}\n" if event
  buf += "id: #{id}\n" if id
  buf += "retry: #{retry_ms.to_i}\n" if retry_ms
  data.to_s.split("\n", -1).each { |line| buf += "data: #{line}\n" }
  buf += "\n"
  write(buf)
end
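
To see what `event` produces for a multi-line payload, the same string building can be run in plain Ruby. This is a stand-alone sketch, not part of the library: `sse_frame` is a hypothetical helper name, and it returns the frame instead of passing it to `write`.

```ruby
# Hypothetical stand-alone helper mirroring the string building in #event.
# It returns the frame instead of writing it to a stream.
def sse_frame(data, event: nil, id: nil, retry_ms: nil)
  buf = +""
  buf << "event: #{event}\n" if event
  buf << "id: #{id}\n" if id
  buf << "retry: #{retry_ms.to_i}\n" if retry_ms
  # A limit of -1 keeps trailing empty fields, so a trailing LF in the
  # payload produces an extra empty "data: " line.
  data.to_s.split("\n", -1).each { |line| buf << "data: #{line}\n" }
  buf << "\n" # the blank line terminates the event
end

frame = sse_frame("line one\nline two", event: "tick", id: 7, retry_ms: 1500)
puts frame
```

Each payload line gets its own `data:` field, which is how a client reassembles the original text with the line breaks intact.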

#sleep(seconds) ⇒ Object

Suspend the task for `seconds` seconds (Float allowed). Uses setTimeout under the hood so the Workers CPU budget is not charged for wall-clock waiting.



# File 'lib/homura/runtime/stream.rb', line 200

def sleep(seconds)
  ms = (seconds.to_f * 1000).to_i
  `(new Promise(function(r){ setTimeout(r, #{ms}); }))`.__await__
  self
end
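
The seconds-to-milliseconds conversion truncates rather than rounds, which matters for very short waits. The sketch below isolates that arithmetic in plain Ruby; `to_millis` is a hypothetical name, and the `setTimeout` call itself is left out.

```ruby
# Hypothetical helper isolating the conversion used by #sleep.
def to_millis(seconds)
  (seconds.to_f * 1000).to_i
end

puts to_millis(2)      # Integer seconds
puts to_millis(0.25)   # fractional seconds
puts to_millis(0.0004) # sub-millisecond waits truncate to 0
```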

#write(data) ⇒ Object Also known as: <<

Write a raw string chunk to the stream. The caller is responsible for SSE framing (e.g. `"data: foo\n\n"`). Returns self.



# File 'lib/homura/runtime/stream.rb', line 164

def write(data)
  return self if @closed
  s = data.to_s
  w = @writer
  enc = @encoder
  # Chain the write onto the current tail so ordering is still
  # enforced but the promise graph stays flat (Copilot review #3 —
  # prior implementation pushed every write into an unbounded
  # array which would leak memory on long-running streams).
  @tail =
    `#{@tail}.then(function() { return #{w}.write(#{enc}.encode(#{s})); })`
  self
end
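
The caller-visible contract of `write`, `<<`, and `close` (ordered output, chaining through the returned `self`, and silent no-ops after close) can be modelled with a synchronous stand-in. `RecordingOut` below is hypothetical and for illustration only: it appends to a String, whereas the real class queues each write on a JavaScript promise chain.

```ruby
# Synchronous stand-in for SSEOut, for illustration only. The real class
# queues writes on a JavaScript promise chain; this one appends to a String.
class RecordingOut
  attr_reader :wire

  def initialize
    @wire = +""
    @closed = false
  end

  def write(data)
    return self if @closed # writes after close are silently dropped
    @wire << data.to_s
    self
  end
  alias << write

  def close
    @closed = true
    self
  end

  def closed?
    @closed
  end
end

out = RecordingOut.new
out << "data: a\n\n" << "data: b\n\n" # returning self allows chaining
out.close
out << "data: late\n\n"               # no-op: the stream is closed
puts out.wire.inspect
```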