Class: Arcp::Runtime::EventLog
- Inherits: Object
  - Object
    - Arcp::Runtime::EventLog
- Defined in: lib/arcp/runtime/event_log.rb
Overview
In-memory ring of buffered events keyed by session_id. The runtime uses it for the resume window and for `session.ack`-driven early eviction. A SQLite-backed variant (same API) is suitable for multi-process runtimes; v1 ships the in-memory implementation, which is used by tests and by the Falcon-hosted single-process runtime.
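The lifecycle described above (append, ack-driven eviction, replay, window expiry) can be sketched as follows. This is an illustration under stated assumptions, not the library's test suite: `Envelope` and `FakeClock` are hypothetical stand-ins, and the guarded `EventLog` definition is a condensed copy of the listings on this page so the snippet runs without the gem (the `clock:` argument is always passed explicitly here).

```ruby
# Hypothetical stand-ins: any envelope responding to #event_seq and any clock
# responding to #monotonic should work the same way.
Envelope  = Struct.new(:event_seq, :payload)
FakeClock = Struct.new(:monotonic)

# Condensed copy of the documented class, defined only if the gem is absent.
unless defined?(Arcp::Runtime::EventLog)
  module Arcp
    module Runtime
      class EventLog
        def initialize(clock:, window_sec: 300)
          @window_sec = window_sec
          @clock = clock
          @sessions = Hash.new { |h, k| h[k] = [] }
          @floor = Hash.new(0)
          @mutex = Mutex.new
        end

        def append(session_id, envelope)
          @mutex.synchronize { @sessions[session_id] << [envelope, @clock.monotonic] }
          envelope
        end

        def floor(session_id) = @floor[session_id]

        def evict_up_to(session_id, seq)
          @mutex.synchronize do
            @floor[session_id] = [@floor[session_id], seq].max
            @sessions[session_id].reject! { |env, _t| env.event_seq && env.event_seq <= seq }
          end
        end

        def replay(session_id, from_event_seq: nil)
          @mutex.synchronize do
            @sessions[session_id].filter_map do |env, _t|
              next if env.event_seq.nil?
              next if from_event_seq && env.event_seq < from_event_seq
              env
            end
          end
        end

        def expire!
          now = @clock.monotonic
          @mutex.synchronize do
            @sessions.each_value { |buf| buf.reject! { |_e, t| (now - t) > @window_sec } }
          end
        end
      end
    end
  end
end

clock = FakeClock.new(0.0)
log = Arcp::Runtime::EventLog.new(clock: clock, window_sec: 300)
(1..4).each { |seq| log.append("s1", Envelope.new(seq, "event #{seq}")) }

log.evict_up_to("s1", 2)                              # ack covers seq 1 and 2
log.floor("s1")                                       # => 2
log.replay("s1").map(&:event_seq)                     # => [3, 4]
log.replay("s1", from_event_seq: 4).map(&:event_seq)  # => [4] (lower bound is inclusive)

clock.monotonic = 301.0                               # move past the 300 s window
log.expire!
log.replay("s1")                                      # => []
```

Injecting the clock keeps the window check deterministic, which is presumably why the constructor accepts one.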
Instance Method Summary
- #append(session_id, envelope) ⇒ Object
- #buffer_size(session_id) ⇒ Object (private)
- #evict_up_to(session_id, seq) ⇒ Object
- #expire! ⇒ Object
  Evict events past the resume window (advisory; consumer drives via timer).
- #floor(session_id) ⇒ Object
- #initialize(window_sec: 300, clock: Arcp::SystemClock.new) ⇒ EventLog (constructor)
  A new instance of EventLog.
- #replay(session_id, from_event_seq: nil) ⇒ Object
Constructor Details
#initialize(window_sec: 300, clock: Arcp::SystemClock.new) ⇒ EventLog
Returns a new instance of EventLog.
# File 'lib/arcp/runtime/event_log.rb', line 11
def initialize(window_sec: 300, clock: Arcp::SystemClock.new)
  @window_sec = window_sec
  @clock = clock
  # session_id => array of [envelope, monotonic append time] pairs
  @sessions = Hash.new { |h, k| h[k] = [] }
  # session_id => highest seq passed to evict_up_to (0 until then)
  @floor = Hash.new(0)
  @mutex = Mutex.new
end
Instance Method Details
#append(session_id, envelope) ⇒ Object
# File 'lib/arcp/runtime/event_log.rb', line 19
def append(session_id, envelope)
  @mutex.synchronize do
    @sessions[session_id] << [envelope, @clock.monotonic]
  end
  envelope
end
#buffer_size(session_id) ⇒ Object
This method is part of a private API. You should avoid using this method if possible, as it may be removed or be changed in the future.
# File 'lib/arcp/runtime/event_log.rb', line 59
def buffer_size(session_id) = @sessions[session_id].size
#evict_up_to(session_id, seq) ⇒ Object
# File 'lib/arcp/runtime/event_log.rb', line 28
def evict_up_to(session_id, seq)
  @mutex.synchronize do
    # The floor only moves forward; a smaller seq leaves it unchanged.
    @floor[session_id] = [@floor[session_id], seq].max
    # Envelopes without an event_seq are kept.
    @sessions[session_id].reject! do |env, _t|
      env.event_seq && env.event_seq <= seq
    end
  end
end
#expire! ⇒ Object
Evicts events older than the resume window (window_sec). Advisory: nothing calls this automatically; the consumer drives it from a timer.
# File 'lib/arcp/runtime/event_log.rb', line 49
def expire!
  now = @clock.monotonic
  @mutex.synchronize do
    @sessions.each_value do |buf|
      buf.reject! { |(_e, t)| (now - t) > @window_sec }
    end
  end
end
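Because expire! is left to the consumer, something has to call it periodically. A minimal sketch of one way to do that, assuming a plain background Thread is acceptable: `start_expiry_timer` is a hypothetical helper, not part of Arcp, and it accepts any object that responds to `expire!`. A Falcon-hosted runtime might prefer to schedule this inside its own event loop instead.

```ruby
# Hypothetical helper, not part of Arcp. Calls log.expire! roughly every
# `interval` seconds on a background thread. Returns a lambda that asks the
# loop to stop and waits for the thread to exit; one final expire! may still
# run after the stop request, once the current sleep finishes.
def start_expiry_timer(log, interval:)
  running = true
  thread = Thread.new do
    while running
      sleep interval
      log.expire!
    end
  end
  -> { running = false; thread.join }
end

# Usage (event_log would be an Arcp::Runtime::EventLog instance):
#   stop = start_expiry_timer(event_log, interval: 30)
#   ...
#   stop.call   # at shutdown
```

An exception raised by expire! would end the thread, so a production version would likely want to rescue and log inside the loop.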
#floor(session_id) ⇒ Object
# File 'lib/arcp/runtime/event_log.rb', line 26
def floor(session_id) = @floor[session_id]
#replay(session_id, from_event_seq: nil) ⇒ Object
# File 'lib/arcp/runtime/event_log.rb', line 37
def replay(session_id, from_event_seq: nil)
  @mutex.synchronize do
    @sessions[session_id].each_with_object([]) do |(env, _t), out|
      # Envelopes without an event_seq are never replayed.
      next if env.event_seq.nil?
      # from_event_seq is an inclusive lower bound.
      next if from_event_seq && env.event_seq < from_event_seq

      out << env
    end
  end
end