Class: Upkeep::Subscriptions::ActiveRecordStore

Inherits:
Object
  • Object
show all
Defined in:
lib/upkeep/subscriptions/active_record_store.rb

Defined Under Namespace

Classes: DeferredIndexWrite, IndexEntryRecord, ShapeIndexEntryRecord, SubscriptionRecord

Constant Summary collapse

LOOKUP_NOTIFICATION =
LayeredReverseIndex::LOOKUP_NOTIFICATION
REGISTER_NOTIFICATION =
"register_subscription_store.upkeep"
ACTIVATE_NOTIFICATION =
"activate_subscription_store.upkeep"
PERSIST_NOTIFICATION =
ActiveRecordSubscriptionPersistence::PERSIST_NOTIFICATION
DURABILITY_MODE =
"async_subscription_row_index_on_subscribe"
INDEX_DURABILITY =
"on_subscribe"
REQUIRED_SCHEMA =
{
  "upkeep_subscriptions" => {
    "id" => :string,
    "subscriber_id" => :string,
    "recorder_snapshot" => :json,
    "metadata" => :json,
    "subscription_shape_key" => :string,
    "created_at" => :datetime,
    "updated_at" => :datetime
  },
  "upkeep_subscription_index_entries" => {
    "subscription_id" => :string,
    "lookup_key_digest" => :string,
    "dependency_source" => :string,
    "lookup_table" => :string,
    "lookup_record_id_snapshot" => :json,
    "lookup_attribute" => :string,
    "dependency_table" => :string,
    "dependency_predicate_digest" => :string,
    "dependency_metadata_snapshot" => :json,
    "owner_ids_snapshot" => :json,
    "created_at" => :datetime,
    "updated_at" => :datetime
  },
  "upkeep_subscription_shape_index_entries" => {
    "subscription_shape_key" => :string,
    "lookup_key_digest" => :string,
    "dependency_source" => :string,
    "lookup_table" => :string,
    "lookup_record_id_snapshot" => :json,
    "lookup_attribute" => :string,
    "dependency_table" => :string,
    "dependency_predicate_digest" => :string,
    "dependency_metadata_snapshot" => :json,
    "owner_ids_snapshot" => :json,
    "created_at" => :datetime,
    "updated_at" => :datetime
  }
}.freeze

Instance Attribute Summary collapse

Class Method Summary collapse

Instance Method Summary collapse

Constructor Details

#initialize(subscription_record: SubscriptionRecord, index_record: IndexEntryRecord, shape_index_record: ShapeIndexEntryRecord) ⇒ ActiveRecordStore

Returns a new instance of ActiveRecordStore.



90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
# File 'lib/upkeep/subscriptions/active_record_store.rb', line 90

def initialize(subscription_record: SubscriptionRecord, index_record: IndexEntryRecord, shape_index_record: ShapeIndexEntryRecord)
  @subscription_record = subscription_record
  @index_record = index_record
  @shape_index_record = shape_index_record
  @index_builder = ReverseIndex.new
  @pending_registry = ActiveRegistry.new
  @active_registry = ActiveRegistry.new
  @deferred_index_writes = {}
  @deferred_index_mutex = Mutex.new
  @persistence = ActiveRecordSubscriptionPersistence.new(
    subscription_record: subscription_record,
    index_record: index_record,
    shape_index_record: shape_index_record,
    index_builder: index_builder
  )
  persistent_index = PersistentReverseIndex.new(
    reverse_index: index_builder,
    index_record: index_record,
    shape_index_record: shape_index_record,
    subscription_record: subscription_record
  )
  @reverse_index = LayeredReverseIndex.new(
    active_index: active_registry,
    persistent_index: persistent_index,
    persistent_count: -> { persistence.count },
    store: "active_record",
    pending_index: pending_registry
  )
  @durable_writer = AsyncDurableWriter.new { |jobs| persistence.persist_jobs(jobs) }
end

Instance Attribute Details

#reverse_indexObject (readonly)

Returns the value of attribute reverse_index.



86
87
88
# File 'lib/upkeep/subscriptions/active_record_store.rb', line 86

def reverse_index
  @reverse_index
end

Class Method Details

.available?(connect: false) ⇒ Boolean

Returns:

  • (Boolean)


121
122
123
# File 'lib/upkeep/subscriptions/active_record_store.rb', line 121

def self.available?(connect: false)
  schema_errors(connect: connect).empty?
end

.schema_errors(connect: false) ⇒ Object



125
126
127
128
129
130
131
132
133
134
# File 'lib/upkeep/subscriptions/active_record_store.rb', line 125

def self.schema_errors(connect: false)
  return ["Active Record is not connected"] unless ActiveRecord::Base.connected? || connect

  connection = ActiveRecord::Base.connection
  REQUIRED_SCHEMA.flat_map { |table, columns| schema_errors_for_table(connection, table, columns) }
rescue ActiveRecord::ConnectionNotEstablished, ActiveRecord::NoDatabaseError => error
  [error.message]
rescue ActiveRecord::StatementInvalid => error
  ["database schema could not be inspected: #{error.message}"]
end

Instance Method Details

#activate(id) ⇒ Object



190
191
192
193
194
195
196
197
198
199
# File 'lib/upkeep/subscriptions/active_record_store.rb', line 190

def activate(id)
  if ActiveSupport::Notifications.notifier.listening?(ACTIVATE_NOTIFICATION)
    payload = { store: "active_record", subscription_id: id }
    ActiveSupport::Notifications.instrument(ACTIVATE_NOTIFICATION, payload) do
      activate_subscription(id, payload: payload)
    end
  else
    activate_subscription(id)
  end
end

#drainObject



183
# File 'lib/upkeep/subscriptions/active_record_store.rb', line 183

def drain = durable_writer.drain

#fetch(id) ⇒ Object



228
229
230
231
232
# File 'lib/upkeep/subscriptions/active_record_store.rb', line 228

def fetch(id)
  active_registry.fetch(id) || pending_registry.fetch(id) || persistence.fetch(id)
rescue ActiveRecord::RecordNotFound
  raise NotFound, id
end

#prune_stale!(older_than:) ⇒ Object



221
222
223
224
225
226
# File 'lib/upkeep/subscriptions/active_record_store.rb', line 221

def prune_stale!(older_than:)
  durable_writer.drain
  stale_ids = persistence.prune_stale!(older_than: older_than)
  active_registry.unregister(stale_ids)
  stale_ids.size
end

#register(subscriber_id:, recorder:, metadata: {}, entries: nil) ⇒ Object



172
173
174
175
176
177
178
179
180
181
# File 'lib/upkeep/subscriptions/active_record_store.rb', line 172

def register(subscriber_id:, recorder:, metadata: {}, entries: nil)
  if ActiveSupport::Notifications.notifier.listening?(REGISTER_NOTIFICATION)
    payload = { store: "active_record" }
    ActiveSupport::Notifications.instrument(REGISTER_NOTIFICATION, payload) do
      register_subscription(subscriber_id, recorder, , entries: entries, payload: payload)
    end
  else
    register_subscription(subscriber_id, recorder, , entries: entries)
  end
end

#resetObject



249
250
251
252
253
254
255
# File 'lib/upkeep/subscriptions/active_record_store.rb', line 249

def reset
  clear_deferred_index_writes
  durable_writer.drain
  pending_registry.reset
  active_registry.reset
  persistence.reset
end

#shutdownObject



185
186
187
188
# File 'lib/upkeep/subscriptions/active_record_store.rb', line 185

def shutdown
  clear_deferred_index_writes
  durable_writer.shutdown
end

#subscriptionsObject



234
235
236
237
238
239
240
241
242
243
244
245
246
247
# File 'lib/upkeep/subscriptions/active_record_store.rb', line 234

def subscriptions
  persistent_count = persistence.count
  in_memory_subscriptions = (active_registry.subscriptions + pending_registry.subscriptions).to_h do |subscription|
    [subscription.id, subscription]
  end
  return in_memory_subscriptions.values if in_memory_subscriptions.size >= persistent_count

  seen_ids = {}
  persisted = persistence.subscriptions.map do |subscription|
    seen_ids[subscription.id] = true
    in_memory_subscriptions.fetch(subscription.id, subscription)
  end
  persisted + in_memory_subscriptions.values.reject { |subscription| seen_ids[subscription.id] }
end

#summaryObject



257
258
259
260
261
262
263
264
265
266
267
268
269
# File 'lib/upkeep/subscriptions/active_record_store.rb', line 257

def summary
  persistent_count = persistence.count
  pending_count = pending_registry.count
  active_count = active_registry.count
  {
    subscriptions: [persistent_count, active_count + pending_count].max,
    persistent_subscriptions: persistent_count,
    pending_subscriptions: pending_count,
    active_subscriptions: active_count,
    deferred_index_subscriptions: deferred_index_count,
    reverse_index: reverse_index.summary
  }
end

#touch(id, now: Time.now) ⇒ Object



201
202
203
204
205
206
207
208
209
# File 'lib/upkeep/subscriptions/active_record_store.rb', line 201

def touch(id, now: Time.now)
  fetch(id)
   = { "last_seen_at" => now.utc.iso8601 }
  pending_registry.touch(id, metadata: )
  active_registry.touch(id, metadata: )
  activate(id)
  durable_writer.drain
  persistence.touch(id, metadata: , now: now)
end

#unregister(ids) ⇒ Object



211
212
213
214
215
216
217
218
219
# File 'lib/upkeep/subscriptions/active_record_store.rb', line 211

def unregister(ids)
  ids = Array(ids)
  pending_registry.unregister(ids)
  active_registry.unregister(ids)
  delete_deferred_index_writes(ids)
  persisted_ids = durable_writer.cancel(ids)
  persistence.delete(persisted_ids)
  ids.size
end