Class: Arcp::Runtime::EventLog

Inherits:
Object
  • Object
show all
Defined in:
lib/arcp/runtime/event_log.rb

Overview

In-memory ring of buffered events keyed by session_id. The runtime uses this for the resume window and `session.ack`-driven early eviction. A SQLite-backed variant (same API) is suitable for multi-process runtimes; for v1 we ship the in-memory implementation used by tests and the Falcon-hosted single-process runtime.

Instance Method Summary collapse

Constructor Details

#initialize(window_sec: 300, clock: Arcp::SystemClock.new) ⇒ EventLog

Returns a new instance of EventLog.



11
12
13
14
15
16
17
# File 'lib/arcp/runtime/event_log.rb', line 11

# Sets up an empty event log.
#
# @param window_sec [Numeric] resume window compared against event age in
#   #expire! (presumably seconds, per the name)
# @param clock [#monotonic] source of the timestamps recorded by #append and
#   read by #expire!
def initialize(window_sec: 300, clock: Arcp::SystemClock.new)
  # Block default: every session lazily receives its own buffer array.
  @sessions = Hash.new { |store, session_id| store[session_id] = [] }
  # Scalar default: an unknown session reads as 0 without inserting a key.
  @floor = Hash.new(0)
  @window_sec = window_sec
  @clock = clock
  @mutex = Mutex.new
end

Instance Method Details

#append(session_id, envelope) ⇒ Object



19
20
21
22
23
24
# File 'lib/arcp/runtime/event_log.rb', line 19

# Buffers +envelope+ for +session_id+, paired with the clock's monotonic
# reading taken at insertion time.
#
# @param session_id [Object] key of the session buffer
# @param envelope [Object] event to store
# @return [Object] +envelope+, unchanged
def append(session_id, envelope)
  @mutex.synchronize do
    buffer = @sessions[session_id]
    buffer.push([envelope, @clock.monotonic])
  end
  envelope
end

#buffer_size(session_id) ⇒ Object

This method is part of a private API. You should avoid using this method if possible, as it may be removed or be changed in the future.



59
# File 'lib/arcp/runtime/event_log.rb', line 59

# @api private
#
# Number of events currently buffered for +session_id+.
#
# The lookup uses +fetch+ rather than +[]+ because +@sessions+ has an
# auto-vivifying default block: a plain read would insert an empty buffer for
# every unknown session id. The read is also taken under the mutex so it
# cannot interleave with writers that iterate or mutate the table.
#
# @param session_id [Object] key of the session buffer
# @return [Integer] buffered event count; 0 for an unknown session
def buffer_size(session_id)
  @mutex.synchronize do
    buffer = @sessions.fetch(session_id, nil)
    buffer ? buffer.size : 0
  end
end

#evict_up_to(session_id, seq) ⇒ Object



28
29
30
31
32
33
34
35
# File 'lib/arcp/runtime/event_log.rb', line 28

# Raises the session's floor to +seq+ (never lowering it) and drops every
# buffered event whose +event_seq+ is at or below +seq+. Events without an
# +event_seq+ are kept.
#
# @param session_id [Object] key of the session buffer
# @param seq [Comparable] highest sequence number to discard
# @return [Array, nil] the result of +Array#reject!+: the session buffer when
#   something was removed, +nil+ when nothing matched
def evict_up_to(session_id, seq)
  @mutex.synchronize do
    known_floor = @floor[session_id]
    @floor[session_id] = [known_floor, seq].max
    @sessions[session_id].reject! do |(envelope, _stamp)|
      event_seq = envelope.event_seq
      next false unless event_seq

      event_seq <= seq
    end
  end
end

#expire! ⇒ Object

Evict events past the resume window (advisory; consumer drives via timer).



49
50
51
52
53
54
55
56
# File 'lib/arcp/runtime/event_log.rb', line 49

# Evicts events older than the resume window and removes any session buffer
# left empty, so sessions that have gone quiet stop occupying an entry in the
# session table. (Advisory; the consumer drives this via a timer.)
#
# An event is dropped when its age (+now - timestamp+) is strictly greater
# than +@window_sec+; an event exactly at the window edge is kept.
#
# @return [Hash] the session table, as returned by +Hash#delete_if+
def expire!
  now = @clock.monotonic
  @mutex.synchronize do
    @sessions.delete_if do |_session_id, buffer|
      buffer.reject! { |(_envelope, stamped_at)| (now - stamped_at) > @window_sec }
      # Pruning here is safe: #append recreates the buffer on demand.
      buffer.empty?
    end
  end
end

#floor(session_id) ⇒ Object



26
# File 'lib/arcp/runtime/event_log.rb', line 26

# Current eviction floor for +session_id+: the largest +seq+ passed to
# #evict_up_to for that session so far.
#
# @param session_id [Object] key of the session
# @return [Object] the stored floor, or the table's default (0) when the
#   session has never been evicted
def floor(session_id)
  @floor[session_id]
end

#replay(session_id, from_event_seq: nil) ⇒ Object



37
38
39
40
41
42
43
44
45
46
# File 'lib/arcp/runtime/event_log.rb', line 37

# Returns the buffered envelopes for +session_id+ in insertion order.
# Envelopes whose +event_seq+ is +nil+ are skipped, as are those below
# +from_event_seq+ when it is given.
#
# The buffer is read with +fetch+ so that replaying an unknown session id
# does not trigger the table's auto-vivifying default and leave an empty
# buffer behind.
#
# @param session_id [Object] key of the session buffer
# @param from_event_seq [Comparable, nil] lowest +event_seq+ to include;
#   +nil+ means no lower bound
# @return [Array] a new array of matching envelopes (empty for an unknown
#   session)
def replay(session_id, from_event_seq: nil)
  @mutex.synchronize do
    @sessions.fetch(session_id, []).filter_map do |(envelope, _stamp)|
      event_seq = envelope.event_seq
      next if event_seq.nil?
      next if from_event_seq && event_seq < from_event_seq

      envelope
    end
  end
end