Class: Igniter::Store::HTTPAdapter::SseBody
- Inherits: Object
  - Object
    - Igniter::Store::HTTPAdapter::SseBody
- Defined in: lib/igniter/store/http_adapter.rb
Overview
Streaming body for SSE responses.
Emits retained catch-up events from replay_events, then blocks on a live subscription to the ChangefeedBuffer until #close is called.
SSE frame format per event:
id: <sequence>
event: fact_committed
data: <ChangeEvent#to_h JSON>
(blank line)
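The frame layout above can be sketched as a small formatter. This is only an illustration: the sse_frame helper called from #each is not shown on this page, so the body below is an assumption, as are the DemoEvent struct and the idea that an event exposes #sequence alongside #to_h.

```ruby
require "json"

# Hypothetical stand-in for ChangeEvent. Only #sequence and #to_h are assumed;
# Struct already provides #to_h with symbol keys.
DemoEvent = Struct.new(:sequence, :store, :key)

# Hypothetical formatter named after the sse_frame call in #each.
# Each field sits on its own line and a blank line terminates the frame.
def sse_frame(event)
  "id: #{event.sequence}\n" \
    "event: fact_committed\n" \
    "data: #{JSON.generate(event.to_h)}\n\n"
end

frame = sse_frame(DemoEvent.new(42, "orders", "o-1"))
puts frame
# id: 42
# event: fact_committed
# data: {"sequence":42,"store":"orders","key":"o-1"}
```

The trailing blank line matters: an SSE client dispatches an event only when it sees the empty line that ends the frame.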
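#close is safe to call from any thread: it pushes a sentinel onto the internal queue, which unblocks the live delivery loop so that the subscription handle is released cleanly. The queue is created only when #each enters its live phase, so calling #close before that point is a no-op.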
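The sentinel technique described above can be shown in isolation with Ruby's built-in Queue. This is a minimal sketch, not the adapter's code: the SENTINEL constant, the consumer thread, and the string events are illustrative names chosen here.

```ruby
# Same symbol value as SSE_SENTINEL; the constant name here is illustrative.
SENTINEL = :__sse_close

queue = Queue.new
delivered = []

# Consumer: blocks on pop until an item arrives, and exits on the sentinel.
consumer = Thread.new do
  loop do
    item = queue.pop
    break if item.equal?(SENTINEL)
    delivered << item
  end
  :released # stands in for releasing the subscription handle
end

queue << "event-1"
queue << "event-2"

# "close" issued from a different thread: Queue#push is thread-safe.
Thread.new { queue.push(SENTINEL) }.join

result = consumer.value # joins the consumer and returns its last expression
puts delivered.inspect  # ["event-1", "event-2"]
puts result.inspect     # :released
```

Because the sentinel travels through the same FIFO queue as the events, everything pushed before the close request is still delivered before the loop exits.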
Constant Summary
- SSE_SENTINEL = :__sse_close
Instance Method Summary
- #close ⇒ Object
- #each ⇒ Object
- #initialize(buf, replay_events, sub_stores) ⇒ SseBody (constructor): A new instance of SseBody.
Constructor Details
#initialize(buf, replay_events, sub_stores) ⇒ SseBody
Returns a new instance of SseBody.
# File 'lib/igniter/store/http_adapter.rb', line 156
def initialize(buf, replay_events, sub_stores)
  @buf = buf
  @replay_events = replay_events
  @sub_stores = sub_stores
  @queue = nil
end
Instance Method Details
#close ⇒ Object
# File 'lib/igniter/store/http_adapter.rb', line 182
def close
  @queue&.push(SSE_SENTINEL)
end
#each ⇒ Object
# File 'lib/igniter/store/http_adapter.rb', line 163
def each
  @replay_events.each { |e| yield sse_frame(e) }
  @queue = Queue.new
  handle = @buf.subscribe(stores: @sub_stores) { |e| @queue << e }
  begin
    loop do
      event = @queue.pop
      break if event.equal?(SSE_SENTINEL)
      yield sse_frame(event)
    end
  rescue IOError, Errno::EPIPE
    nil
  ensure
    handle&.close
  end
end
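To see the replay-then-live flow of #each together with #close, the following self-contained sketch drives a simplified copy of the body against a stub buffer. FakeBuffer, FakeHandle, DemoSseBody, and the "replay:"/"live:" string chunks are all hypothetical stand-ins; the real class talks to a ChangefeedBuffer and emits SSE frames instead.

```ruby
# Hypothetical subscription handle that records whether it was closed.
FakeHandle = Struct.new(:closed) do
  def close
    self.closed = true
  end
end

# Hypothetical buffer: remembers one listener and forwards published events.
class FakeBuffer
  attr_reader :handle

  def subscribe(stores:, &block)
    @listener = block
    @handle = FakeHandle.new(false)
  end

  def publish(event)
    @listener&.call(event)
  end
end

# Simplified copy of the body's control flow (no SSE framing, no rescue).
class DemoSseBody
  SENTINEL = :__sse_close

  def initialize(buf, replay_events, sub_stores)
    @buf = buf
    @replay_events = replay_events
    @sub_stores = sub_stores
    @queue = nil
  end

  def each
    @replay_events.each { |e| yield "replay:#{e}" }
    @queue = Queue.new
    handle = @buf.subscribe(stores: @sub_stores) { |e| @queue << e }
    loop do
      event = @queue.pop
      break if event.equal?(SENTINEL)
      yield "live:#{event}"
    end
  ensure
    handle&.close
  end

  def close
    @queue&.push(SENTINEL)
  end
end

buf = FakeBuffer.new
body = DemoSseBody.new(buf, [1, 2], ["orders"])
out = Queue.new # thread-safe sink for the yielded chunks

writer = Thread.new { body.each { |chunk| out << chunk } }
sleep 0.01 until buf.handle # wait until the live subscription exists

buf.publish(3) # a live event arriving after the replay
body.close     # unblocks the loop from the main thread
writer.join(5)

chunks = []
chunks << out.pop until out.empty?
puts chunks.inspect    # ["replay:1", "replay:2", "live:3"]
puts buf.handle.closed # true
```

The wait on buf.handle reflects the ordering in the source: the queue exists only once the live phase starts, so a close request sent earlier would not stop the loop.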