Class: Igniter::Store::HTTPAdapter::SseBody

Inherits:
Object
Defined in:
lib/igniter/store/http_adapter.rb

Overview

Streaming body for SSE responses.

Emits retained catch-up events from replay_events, then blocks on a live subscription to the ChangefeedBuffer until #close is called.

SSE frame format per event:

id: <sequence>
event: fact_committed
data: <ChangeEvent#to_h JSON>
(blank line)
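
The `sse_frame` helper that builds this frame is not shown on this page. The following is a minimal sketch of what such a helper might look like, not the adapter's actual code. `DemoEvent` and `demo_sse_frame` are hypothetical names, and the sketch assumes the event exposes its sequence number through a `sequence` reader.

```ruby
require "json"

# Hypothetical stand-in for ChangeEvent; only #sequence and #to_h are assumed.
DemoEvent = Struct.new(:sequence, :store, :key)

# Builds one SSE frame in the format described above. The trailing empty
# line ("\n\n") is what terminates the event on the wire.
def demo_sse_frame(event)
  "id: #{event.sequence}\n" \
    "event: fact_committed\n" \
    "data: #{JSON.generate(event.to_h)}\n\n"
end

puts demo_sse_frame(DemoEvent.new(7, "orders", "k1"))
```

Because the payload is serialized as single-line JSON, one `data:` field per frame is enough; a payload containing raw newlines would need one `data:` line per line of text.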

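#close is safe to call from any thread: it pushes a sentinel onto the live delivery queue, which unblocks the loop in #each so the subscription handle is released cleanly. The queue is only created once #each has finished replay and entered the live phase, so a #close issued before that point is a no-op.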
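
The sentinel technique can be illustrated in isolation with a plain `Queue` and a consumer thread. This is a sketch of the general pattern only; `DEMO_SENTINEL` and `drain_until_sentinel` are hypothetical names that do not exist in the adapter.

```ruby
# Hypothetical sentinel for this sketch; the class itself uses SSE_SENTINEL.
DEMO_SENTINEL = :__demo_close

# Pops items until the sentinel arrives, then returns everything seen so far.
# Queue#pop blocks while the queue is empty, so the sentinel is the only way
# another thread can wake this loop and make it exit.
def drain_until_sentinel(queue)
  received = []
  loop do
    item = queue.pop
    break if item.equal?(DEMO_SENTINEL)
    received << item
  end
  received
end

queue    = Queue.new
consumer = Thread.new { drain_until_sentinel(queue) }

queue << "event-1"
queue << "event-2"
queue << DEMO_SENTINEL # the equivalent of calling #close from another thread

p consumer.value
```

Pushing onto a `Queue` is thread-safe in Ruby, which is why the closing thread needs no extra locking.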

Constant Summary

SSE_SENTINEL = :__sse_close

Instance Method Summary

  • #close ⇒ Object
  • #each ⇒ Object
  • #initialize(buf, replay_events, sub_stores) ⇒ SseBody (constructor)

Constructor Details

#initialize(buf, replay_events, sub_stores) ⇒ SseBody

Returns a new instance of SseBody.



# File 'lib/igniter/store/http_adapter.rb', line 156

def initialize(buf, replay_events, sub_stores)
  @buf           = buf
  @replay_events = replay_events
  @sub_stores    = sub_stores
  @queue         = nil
end

Instance Method Details

#close ⇒ Object



# File 'lib/igniter/store/http_adapter.rb', line 182

def close
  @queue&.push(SSE_SENTINEL)
end

#each ⇒ Object



# File 'lib/igniter/store/http_adapter.rb', line 163

def each
  @replay_events.each { |e| yield sse_frame(e) }

  @queue  = Queue.new
  handle  = @buf.subscribe(stores: @sub_stores) { |e| @queue << e }

  begin
    loop do
      event = @queue.pop
      break if event.equal?(SSE_SENTINEL)
      yield sse_frame(event)
    end
  rescue IOError, Errno::EPIPE
    nil
  ensure
    handle&.close
  end
end
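
To show how the replay phase, the live phase, and #close interact, here is a self-contained sketch that runs without the rest of Igniter. Everything in it is a stand-in: `DemoBuffer` is a hypothetical in-memory replacement for ChangefeedBuffer that models only `subscribe(stores:)` and a handle responding to `close`, `DemoSseBody` is a condensed copy of the methods listed above with a guessed `sse_frame`, and events are plain hashes rather than ChangeEvent objects.

```ruby
require "json"

# Hypothetical stand-in for ChangefeedBuffer (assumed interface only).
class DemoBuffer
  Handle = Struct.new(:buffer, :callback) do
    def close
      buffer.subscribers.delete(callback)
    end
  end

  attr_reader :subscribers

  def initialize
    @subscribers = []
  end

  # The stores filter is accepted but ignored in this sketch.
  def subscribe(stores:, &callback)
    @subscribers << callback
    Handle.new(self, callback)
  end

  def publish(event)
    @subscribers.each { |cb| cb.call(event) }
  end
end

# Condensed copy of the class documented above, for demonstration only.
class DemoSseBody
  SENTINEL = :__sse_close

  def initialize(buf, replay_events, sub_stores)
    @buf, @replay_events, @sub_stores = buf, replay_events, sub_stores
    @queue = nil
  end

  def each
    @replay_events.each { |e| yield sse_frame(e) }
    @queue = Queue.new
    handle = @buf.subscribe(stores: @sub_stores) { |e| @queue << e }
    begin
      loop do
        event = @queue.pop
        break if event.equal?(SENTINEL)
        yield sse_frame(event)
      end
    rescue IOError, Errno::EPIPE
      nil
    ensure
      handle&.close
    end
  end

  def close
    @queue&.push(SENTINEL)
  end

  private

  # Guessed frame builder; events here are hashes with a :sequence key.
  def sse_frame(event)
    "id: #{event[:sequence]}\nevent: fact_committed\ndata: #{JSON.generate(event.to_h)}\n\n"
  end
end

def demo_stream
  buf    = DemoBuffer.new
  body   = DemoSseBody.new(buf, [{ sequence: 1 }], ["orders"])
  frames = Queue.new
  writer = Thread.new { body.each { |frame| frames << frame } }

  replayed = frames.pop                      # catch-up frame
  sleep 0.01 until buf.subscribers.any?      # wait for the live phase
  buf.publish({ sequence: 2 })
  live = frames.pop                          # live frame
  body.close                                 # unblock the loop from this thread
  writer.join
  [replayed, live, buf.subscribers.empty?]
end

replayed, live, released = demo_stream
puts replayed, live, released
```

The sketch waits for the subscription before calling `close` because, as in the original, the queue does not exist until the live phase begins.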