Class: Legate::SessionService::ActiveRecord

Inherits:
Base
  • Object
show all
Includes:
EventBroadcast
Defined in:
lib/legate/session_service/active_record.rb

Overview

Durable session store backed by ActiveRecord (subsumes R2 persistence).

Semantics mirror InMemory — repeated get_session within a process returns the same Session object so a run’s mutations accumulate — but every mutation is written through to the database, so a restart or another process re-hydrates the committed history and state from rows.

The host application owns the AR connection (Rails does this for you; standalone users call ActiveRecord::Base.establish_connection). Tables are created by the generated migration, or ad hoc via ActiveRecord.create_tables!.

Defined Under Namespace

Classes: EventRecord, Record, ScopedStateRecord, SessionRecord

Constant Summary collapse

DEFAULT_MAX_CACHED_SESSIONS =

Cap on cached Session objects. The cache gives a run a stable Session identity and avoids re-hydrating on every get_session; because every mutation is written through, an evicted session re-hydrates correctly from the database. LRU eviction only ever reclaims idle sessions — an active run keeps touching (and so keeps) its own session.

1_000

Class Method Summary collapse

Instance Method Summary collapse

Methods included from EventBroadcast

#broadcast_event, #subscribe, #unsubscribe

Constructor Details

#initialize(max_cached_sessions: DEFAULT_MAX_CACHED_SESSIONS) ⇒ ActiveRecord

Returns a new instance of ActiveRecord.

Parameters:

  • max_cached_sessions (Integer) (defaults to: DEFAULT_MAX_CACHED_SESSIONS)

    LRU bound for the in-process cache



107
108
109
110
111
112
113
# File 'lib/legate/session_service/active_record.rb', line 107

# Sets up the service with an empty in-process session cache.
#
# @param max_cached_sessions [Integer] LRU bound for the in-process cache
def initialize(max_cached_sessions: DEFAULT_MAX_CACHED_SESSIONS)
  super()
  @max_cached_sessions = max_cached_sessions
  @cache_mutex = Mutex.new
  @cache = {} # insertion order = LRU order; guarded by @cache_mutex
  Legate.logger.info('ActiveRecord session service initialized.')
end

Class Method Details

.create_tables!(connection: Record.connection) ⇒ Object

Creates the three Legate tables if absent. Convenience for tests and standalone (non-migration) setups; Rails apps use the generated migration.

Parameters:

  • connection (ActiveRecord::ConnectionAdapters::AbstractAdapter) (defaults to: Record.connection)


63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
# File 'lib/legate/session_service/active_record.rb', line 63

# Creates whichever of the three Legate tables (sessions, events, scoped
# states) do not exist yet; tables that are already present are left alone.
#
# @param connection [ActiveRecord::ConnectionAdapters::AbstractAdapter]
#   connection used for the existence checks and the DDL
# @return [Object, nil] the result of the final add_index call when the
#   scoped-state table had to be created, otherwise nil
def self.create_tables!(connection: Record.connection)
  absent = ->(table) { !connection.table_exists?(table) }

  if absent.call(:legate_sessions)
    # Session ids are strings, so the primary key is a string column.
    connection.create_table(:legate_sessions, id: :string) do |t|
      t.string :app_name
      t.string :user_id
      t.text :state
      t.timestamps
    end
  end

  if absent.call(:legate_events)
    connection.create_table(:legate_events) do |t|
      t.string :legate_session_id, null: false, index: true
      t.integer :position, null: false, default: 0
      t.string :role
      t.text :content
      t.string :tool_name
      t.text :state_delta
      t.string :event_timestamp
      t.string :event_id
      t.timestamps
    end
  end

  if absent.call(:legate_scoped_states)
    connection.create_table(:legate_scoped_states) do |t|
      t.string :scope, null: false
      t.string :state_key, null: false
      t.text :value
      t.timestamps
    end
    # One row per (scope, key) pair.
    connection.add_index(
      :legate_scoped_states, %i[scope state_key],
      unique: true, name: 'index_legate_scoped_states_on_scope_and_key'
    )
  end
end

Instance Method Details

#append_event(session_id:, event:) ⇒ Object



147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
# File 'lib/legate/session_service/active_record.rb', line 147

# Appends an event to a session: applies it to the cached Session, writes
# the event row and the merged session state in one transaction, then
# notifies subscribers.
#
# @param session_id [String] id of the session receiving the event
# @param event [Object] the event handed to Session#add_event
# @return [Boolean] true once the event is stored and broadcast; false when
#   the session is unknown or an ActiveRecord error occurred
def append_event(session_id:, event:)
  target = get_session(session_id: session_id)
  if target
    target.add_event(event) # merges state_delta into the cached Session
    # History row and session state are committed as one unit so a crash
    # can never leave one without the other.
    SessionRecord.transaction do
      insert_event(session_id, event)
      write_session_state(session_id, target)
    end
    broadcast_event(session_id, event) # notify any streaming subscribers (R3)
    true
  else
    false
  end
rescue ::ActiveRecord::ActiveRecordError => error
  Legate.logger.error("ActiveRecord session service: append_event failed for '#{session_id}': #{error.message}")
  # The cached Session already contains the event that failed to persist;
  # evict it so the next get_session re-hydrates from committed rows.
  cache_evict(session_id)
  false
end

#clear_scoped_state(scope, key) ⇒ Object



199
200
201
202
203
# File 'lib/legate/session_service/active_record.rb', line 199

# Deletes persisted scoped state.
#
# Both arguments are normalised with #to_s before use, so Symbols and
# Strings are interchangeable — including the '*' wildcard, which removes
# every key stored under the scope.
#
# @param scope [#to_s] scope whose state should be cleared
# @param key [#to_s] a single state key, or '*' (or :*) for all keys in scope
# @return [Integer] whatever the relation's delete_all returns (the number of
#   deleted rows in ActiveRecord)
def clear_scoped_state(scope, key)
  state_key = key.to_s
  relation = ScopedStateRecord.where(scope: scope.to_s)
  # Compare the normalised key: a Symbol wildcard must not be mistaken for a
  # literal key named "*".
  relation = relation.where(state_key: state_key) unless state_key == '*'
  relation.delete_all
end

#create_session(app_name:, user_id:, session_id: nil, initial_state: {}) ⇒ Object



119
120
121
122
123
124
125
126
127
128
129
130
# File 'lib/legate/session_service/active_record.rb', line 119

# Builds a new Session, persists its row, and caches the object.
#
# @param app_name [String] application the session belongs to
# @param user_id [String] owner of the session
# @param session_id [String, nil] explicit id, forwarded to Session.new as
#   id: (presumably generated by Legate::Session when nil — confirm there)
# @param initial_state [Hash] starting state; passed through symbolize first
# @return [Legate::Session] the newly created, cached session
def create_session(app_name:, user_id:, session_id: nil, initial_state: {})
  starting_state = symbolize(initial_state)
  session = Legate::Session.new(
    app_name: app_name,
    user_id: user_id,
    id: session_id,
    initial_state: starting_state,
    session_service: self
  )
  new_id = session.id

  # Persist first; the session is cached only after the row was created.
  SessionRecord.create!(id: new_id, app_name: app_name, user_id: user_id, state: session.state_to_h)
  cache_put(new_id, session)

  Legate.logger.info("Created persistent session: #{new_id} for app:#{app_name}, user:#{user_id}")
  session
end

#delete_session(session_id:) ⇒ Object



169
170
171
172
173
174
175
176
177
178
179
180
# File 'lib/legate/session_service/active_record.rb', line 169

# Removes a session and all of its events, both from the in-process cache
# and from the database.
#
# @param session_id [String] id of the session to delete
# @return [Boolean] true when a session row was deleted, false when no such
#   session existed
def delete_session(session_id:)
  cache_evict(session_id)
  # Event rows and the session row must disappear together, or not at all —
  # otherwise a failure between the two deletes leaves a session whose
  # history has vanished.
  deleted = SessionRecord.transaction do
    EventRecord.where(legate_session_id: session_id).delete_all
    SessionRecord.where(id: session_id).delete_all
  end
  if deleted.positive?
    Legate.logger.info("Deleted persistent session: #{session_id}")
    true
  else
    Legate.logger.warn("Attempted to delete non-existent session: #{session_id}")
    false
  end
end

#get_session(session_id:) ⇒ Object



132
133
134
135
136
137
138
139
140
141
142
143
144
145
# File 'lib/legate/session_service/active_record.rb', line 132

# Looks a session up, preferring the in-process cache and falling back to
# the database row (which is hydrated and then cached).
#
# @param session_id [String] id of the session to fetch
# @return [Legate::Session, nil] the session, or nil when no row exists
def get_session(session_id:)
  hit = cache_get(session_id)
  return hit if hit

  row = SessionRecord.find_by(id: session_id)
  if row
    # Cache the freshly hydrated object so later lookups return the same one.
    hydrate(row).tap { |fresh| cache_put(session_id, fresh) }
  else
    Legate.logger.warn("Session not found: #{session_id}")
    nil
  end
end

#get_state(session_id:, key:) ⇒ Object



222
223
224
225
226
227
228
# File 'lib/legate/session_service/active_record.rb', line 222

# Reads one state value from a session.
#
# @param session_id [String] id of the session to read from
# @param key [Object] state key, forwarded to Session#get_state
# @return [Object, nil] the value reported by the session, or nil (with a
#   warning logged) when the session does not exist
def get_state(session_id:, key:)
  session = get_session(session_id: session_id)
  if session
    session.get_state(key)
  else
    Legate.logger.warn("ActiveRecord session service: Session not found '#{session_id}' when getting state for '#{key}'.")
    nil
  end
end

#list_sessions(app_name: nil, user_id: nil) ⇒ Object



182
183
184
185
186
187
# File 'lib/legate/session_service/active_record.rb', line 182

# Lists sessions, optionally narrowed by application and/or user.
#
# @param app_name [String, nil] restrict to this application when given
# @param user_id [String, nil] restrict to this user when given
# @return [Array<Legate::Session>] sessions resolved through get_session;
#   ids that no longer resolve are skipped
def list_sessions(app_name: nil, user_id: nil)
  # Only filters that were actually supplied become WHERE clauses.
  filters = { app_name: app_name, user_id: user_id }.select { |_column, wanted| wanted }
  matching = filters.reduce(SessionRecord.all) do |relation, (column, wanted)|
    relation.where(column => wanted)
  end
  matching.pluck(:id).map { |id| get_session(session_id: id) }.select { |session| session }
end

#load_scoped_state(scope, key) ⇒ Object



195
196
197
# File 'lib/legate/session_service/active_record.rb', line 195

# Fetches the stored value for one (scope, key) pair.
#
# @param scope [#to_s] scope name; normalised with #to_s
# @param key [#to_s] state key; normalised with #to_s
# @return [Object, nil] the row's value, or nil when no row exists
def load_scoped_state(scope, key)
  row = ScopedStateRecord.find_by(scope: scope.to_s, state_key: key.to_s)
  return nil if row.nil?

  row.value
end

#persistent? ⇒ Boolean

Returns:

  • (Boolean)


115
116
117
# File 'lib/legate/session_service/active_record.rb', line 115

# Reports whether this store is durable. This implementation writes every
# mutation through to the database, so the answer is unconditionally yes.
#
# @return [Boolean] always true
def persistent?
  true
end

#save_scoped_state(scope, key, value) ⇒ Object



189
190
191
192
193
# File 'lib/legate/session_service/active_record.rb', line 189

# Stores a value for one (scope, key) pair, creating the row when it does
# not exist yet and overwriting the value otherwise.
#
# @param scope [#to_s] scope name; normalised with #to_s
# @param key [#to_s] state key; normalised with #to_s
# @param value [Object] value assigned to the row
# @return [Object] the result of save! on the row
def save_scoped_state(scope, key, value)
  ScopedStateRecord
    .find_or_initialize_by(scope: scope.to_s, state_key: key.to_s)
    .tap { |row| row.value = value }
    .save!
end

#set_state(session_id:, key:, value:) ⇒ Object



205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
# File 'lib/legate/session_service/active_record.rb', line 205

# Sets one state value on a session and writes the session state through to
# the database.
#
# @param session_id [String] id of the session to update
# @param key [Object] state key, forwarded to Session#set_state
# @param value [Object] value to store
# @return [nil] always nil; a missing session or a serialization problem is
#   logged rather than raised
# @raise [::ActiveRecord::ActiveRecordError] when the write-through fails;
#   the cached Session is evicted before the error is re-raised
def set_state(session_id:, key:, value:)
  session = get_session(session_id: session_id)
  unless session
    Legate.logger.warn("ActiveRecord session service: Session not found '#{session_id}' when setting state for '#{key}'.")
    return nil
  end

  session.set_state(key, value)
  # Scoped keys are already persisted via save_scoped_state; write through
  # the plain-key state so it survives a restart.
  write_session_state(session_id, session)
  nil
rescue Legate::SerializationError => e
  Legate.logger.error("ActiveRecord session service: Error setting state for '#{session_id}', key '#{key}': #{e.message}")
  nil
rescue ::ActiveRecord::ActiveRecordError => e
  Legate.logger.error("ActiveRecord session service: set_state failed for '#{session_id}', key '#{key}': #{e.message}")
  # The cached Session may now hold a value the database never accepted;
  # drop it so the next get_session re-hydrates the committed truth, then
  # let the caller see the failure exactly as before.
  cache_evict(session_id)
  raise
end