Class: Rubino::Run::EventStore

Inherits:
Object
Defined in:
lib/rubino/run/event_store.rb

Overview

Persists per-run events for SSE replay (Last-Event-ID) and audit.

seq is monotonic per session_id, computed under a transaction as max(seq) + 1, so a single Session can stream across multiple Runs without seq collisions. SSE handlers send seq as the event id, and clients resume with after_seq.
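The per-session numbering can be illustrated without a database. The following is a minimal sketch, assuming a hypothetical InMemoryEventLog class in which an Array stands in for the events table and a Mutex stands in for the transaction; it is not the EventStore implementation.

```ruby
# Hypothetical in-memory stand-in for the events table. The Mutex plays the
# role of the database transaction around "read max(seq), then insert".
class InMemoryEventLog
  def initialize
    @rows = []
    @lock = Mutex.new
  end

  # Allocates max(seq) + 1 within the session, regardless of run_id.
  def append(session_id:, run_id:, type:)
    @lock.synchronize do
      last = @rows.select { |r| r[:session_id] == session_id }
                  .map { |r| r[:seq] }
                  .max || 0
      row = { session_id: session_id, run_id: run_id, type: type.to_s, seq: last + 1 }
      @rows << row
      row
    end
  end

  # Events of one run in seq order, optionally only those after after_seq.
  def for_run(run_id, after_seq: nil)
    rows = @rows.select { |r| r[:run_id] == run_id }.sort_by { |r| r[:seq] }
    rows = rows.select { |r| r[:seq] > after_seq } if after_seq
    rows
  end
end

log = InMemoryEventLog.new
log.append(session_id: "s1", run_id: "run-a", type: :started)
log.append(session_id: "s1", run_id: "run-a", type: :finished)
log.append(session_id: "s1", run_id: "run-b", type: :started) # continues at 3
log.append(session_id: "s2", run_id: "run-c", type: :started) # new session, starts at 1
```

Because the counter is scoped to the session, the second run in "s1" continues the numbering instead of restarting it, which is what lets a client resume a stream that spans several runs.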

Reads order primarily by seq; #for_run inherits that ordering. When two inserts land in the same wall-clock second, the (created_at, rowid) tuple is the implicit tiebreaker for any consumer scanning by timestamp (Repository#last_for_session uses the same trick).
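The timestamp tiebreaker can be sketched as a plain tuple sort. The rows below are made-up sample data (the rowid values and event types are illustrative only); the point is that two rows sharing a created_at second are ordered by rowid.

```ruby
# Sample rows, deliberately out of order. Two of them share the same
# second-resolution ISO 8601 timestamp.
rows = [
  { rowid: 3, created_at: "2024-05-01T12:00:01Z", type: "finished" },
  { rowid: 1, created_at: "2024-05-01T12:00:00Z", type: "started" },
  { rowid: 2, created_at: "2024-05-01T12:00:00Z", type: "tool_call" }
]

# Sorting by the (created_at, rowid) tuple: UTC ISO 8601 strings in the same
# format compare chronologically, and rowid breaks ties within one second.
ordered = rows.sort_by { |r| [r[:created_at], r[:rowid]] }
```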

Constructor Details

#initialize(db: nil) ⇒ EventStore

Returns a new instance of EventStore.



# File 'lib/rubino/run/event_store.rb', line 21

def initialize(db: nil)
  @db = db || Rubino.database.db
end

Instance Method Details

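#append(session_id:, run_id:, type:, payload:) ⇒ Object

Appends one event for the given session and run. The next seq is allocated and the row is inserted inside a single transaction; the inserted row is returned as a Hash.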



# File 'lib/rubino/run/event_store.rb', line 25

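# SecureRandom, JSON and Time#iso8601 come from the securerandom, json and
# time standard libraries, which must be loaded before this method runs.
def append(session_id:, run_id:, type:, payload:)
  # Read max(seq) and insert in the same transaction.
  @db.transaction do
    # seq is scoped to the session, not the run; the first event gets 1.
    next_seq = (@db[:events].where(session_id: session_id).max(:seq) || 0) + 1
    row = {
      id: SecureRandom.uuid,
      session_id: session_id,
      run_id: run_id,
      type: type.to_s,
      # Scrub invalid UTF-8 first so JSON.generate does not raise here.
      payload_json: JSON.generate(scrub_for_json(payload)),
      seq: next_seq,
      created_at: Time.now.utc.iso8601
    }
    @db[:events].insert(row)
    row
  end
end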

#for_run(run_id, after_seq: nil) ⇒ Object

Parameters:

  • after_seq (Integer, nil) (defaults to: nil)

    when given, returns only events with seq > after_seq (used to honour SSE Last-Event-ID on reconnect).



# File 'lib/rubino/run/event_store.rb', line 63

def for_run(run_id, after_seq: nil)
  ds = @db[:events].where(run_id: run_id).order(:seq)
  ds = ds.where { seq > after_seq } if after_seq
  ds.all
end
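A reconnect handler might turn the Last-Event-ID header into after_seq and render the replayed rows as SSE frames. This is a rough sketch under stated assumptions: after_seq_from and sse_frame are hypothetical helpers that do not appear in the source, and the rows are sample data shaped like the hashes that #append builds.

```ruby
# Hypothetical helper: parse the Last-Event-ID header value into an Integer,
# or nil when the header is missing or not a plain non-negative number.
def after_seq_from(last_event_id)
  id = last_event_id.to_s.strip
  id.match?(/\A\d+\z/) ? id.to_i : nil
end

# Hypothetical helper: render one stored row as a Server-Sent Events frame,
# using seq as the event id so the client can resume from it later.
def sse_frame(row)
  "id: #{row[:seq]}\nevent: #{row[:type]}\ndata: #{row[:payload_json]}\n\n"
end

# Sample rows standing in for the result of for_run(run_id, after_seq: ...).
stored = [
  { seq: 1, type: "started",     payload_json: "{}" },
  { seq: 2, type: "tool_call",   payload_json: '{"name":"read"}' },
  { seq: 3, type: "tool_result", payload_json: '{"ok":true}' }
]

after_seq = after_seq_from("2") # the client last saw event id 2
replayed  = stored.select { |r| after_seq.nil? || r[:seq] > after_seq }
frames    = replayed.map { |r| sse_frame(r) }
```

A missing or malformed header yields nil, which matches the default of after_seq and replays the run from its first event.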

#last_seq_for_session(session_id) ⇒ Object

Returns the highest seq recorded for the session, or 0 when the session has no events yet.



# File 'lib/rubino/run/event_store.rb', line 69

def last_seq_for_session(session_id)
  @db[:events].where(session_id: session_id).max(:seq) || 0
end

#scrub_for_json(value) ⇒ Object

Recursively replaces invalid UTF-8 bytes with "?" in Strings, Hash values and Array elements, so that JSON.generate never raises JSON::GeneratorError at the event boundary. A tool that returns binary data (e.g. ReadTool on a misdetected PDF) would otherwise blow up here, propagate out of emit_finished, and kill the entire run: the model would never receive a tool error result and could not recover.



# File 'lib/rubino/run/event_store.rb', line 47

def scrub_for_json(value)
  case value
  when String
    if value.encoding == Encoding::UTF_8
      value.valid_encoding? ? value : value.scrub("?")
    else
      value.dup.force_encoding(Encoding::UTF_8).scrub("?")
    end
  when Hash  then value.transform_values { |v| scrub_for_json(v) }
  when Array then value.map { |v| scrub_for_json(v) }
  else            value
  end
end
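The effect on a payload can be seen in a standalone snippet. The method body is copied from the listing above so that the example runs on its own; the payload is invented sample data imitating a tool that returned raw bytes.

```ruby
require "json"

# Copied from the listing above so this snippet is self-contained.
def scrub_for_json(value)
  case value
  when String
    if value.encoding == Encoding::UTF_8
      value.valid_encoding? ? value : value.scrub("?")
    else
      value.dup.force_encoding(Encoding::UTF_8).scrub("?")
    end
  when Hash  then value.transform_values { |v| scrub_for_json(v) }
  when Array then value.map { |v| scrub_for_json(v) }
  else            value
  end
end

# Sample payload: a binary (ASCII-8BIT) string with a stray 0xFF byte, and a
# UTF-8 string containing a lone Latin-1 byte (0xE9).
payload = { output: "%PDF-1.7 \xFF".b, lines: ["ok", "caf\xE9"] }

clean = scrub_for_json(payload)
json  = JSON.generate(clean)
```

Each invalid byte becomes a single "?", and valid strings pass through unchanged, so the serialized event stays readable even when a tool emits binary output.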