Class: Upkeep::Subscriptions::ActiveRecordStore

Inherits:
Object
  • Object
show all
Defined in:
lib/upkeep/subscriptions/active_record_store.rb

Defined Under Namespace

Classes: DeferredIndexWrite, IndexEntryRecord, SubscriptionRecord

Constant Summary collapse

# Alias of LayeredReverseIndex::LOOKUP_NOTIFICATION.
LOOKUP_NOTIFICATION =
LayeredReverseIndex::LOOKUP_NOTIFICATION
# ActiveSupport::Notifications event name that #register instruments under
# when a subscriber is listening for it.
REGISTER_NOTIFICATION =
"register_subscription_store.upkeep"
# ActiveSupport::Notifications event name that #activate instruments under
# when a subscriber is listening for it.
ACTIVATE_NOTIFICATION =
"activate_subscription_store.upkeep"
# Alias of ActiveRecordSubscriptionPersistence::PERSIST_NOTIFICATION.
PERSIST_NOTIFICATION =
ActiveRecordSubscriptionPersistence::PERSIST_NOTIFICATION
# NOTE(review): not referenced by the methods shown on this page; presumably a
# label describing how this store persists subscriptions — confirm usage.
DURABILITY_MODE =
"async_subscription_row_index_on_subscribe"
# NOTE(review): not referenced by the methods shown on this page; presumably a
# label describing when index rows become durable — confirm usage.
INDEX_DURABILITY =
"on_subscribe"
# Table name => { column name => type symbol } map. .schema_errors walks this
# hash and hands each table/columns pair to schema_errors_for_table.
# Frozen so the definition cannot be mutated at runtime.
REQUIRED_SCHEMA =
{
  # One row per subscription.
  "upkeep_subscriptions" => {
    "id" => :string,
    "subscriber_id" => :string,
    "recorder_snapshot" => :json,
    "metadata" => :json,
    "created_at" => :datetime,
    "updated_at" => :datetime
  },
  # Index entry rows; each carries a subscription_id column.
  "upkeep_subscription_index_entries" => {
    "subscription_id" => :string,
    "lookup_key_digest" => :string,
    "dependency_source" => :string,
    "lookup_table" => :string,
    "lookup_record_id_snapshot" => :json,
    "lookup_attribute" => :string,
    "dependency_table" => :string,
    "dependency_predicate_digest" => :string,
    "dependency_metadata_snapshot" => :json,
    "owner_ids_snapshot" => :json,
    "created_at" => :datetime,
    "updated_at" => :datetime
  }
}.freeze

Instance Attribute Summary collapse

Class Method Summary collapse

Instance Method Summary collapse

Constructor Details

#initialize(subscription_record: SubscriptionRecord, index_record: IndexEntryRecord) ⇒ ActiveRecordStore

Returns a new instance of ActiveRecordStore.



71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
# File 'lib/upkeep/subscriptions/active_record_store.rb', line 71

# Builds the store's collaborators: two in-memory registries (pending and
# active), the persistence layer, a layered reverse index combining them, and
# an asynchronous durable writer.
#
# @param subscription_record [Class] record class handed to the persistence layer
# @param index_record [Class] record class handed to the persistence layer and
#   to the persistent reverse index
def initialize(subscription_record: SubscriptionRecord, index_record: IndexEntryRecord)
  # Injected record classes.
  @subscription_record = subscription_record
  @index_record = index_record

  # In-memory state.
  @index_builder = ReverseIndex.new
  @pending_registry = ActiveRegistry.new
  @active_registry = ActiveRegistry.new
  @deferred_index_writes = {}
  @deferred_index_mutex = Mutex.new

  # Persistence layer; shares the index builder with the persistent index below.
  @persistence = ActiveRecordSubscriptionPersistence.new(
    subscription_record: subscription_record,
    index_record: index_record,
    index_builder: index_builder
  )

  durable_index = PersistentReverseIndex.new(
    reverse_index: index_builder,
    index_record: index_record
  )

  # persistent_count is a lambda, so the count is read each time it is called
  # rather than once at construction.
  @reverse_index = LayeredReverseIndex.new(
    active_index: active_registry,
    persistent_index: durable_index,
    persistent_count: lambda { persistence.count },
    store: "active_record",
    pending_index: pending_registry
  )

  # The writer hands batches of jobs back to the persistence layer.
  @durable_writer = AsyncDurableWriter.new do |jobs|
    persistence.persist_jobs(jobs)
  end
end

Instance Attribute Details

#reverse_index ⇒ Object (readonly)

Returns the value of attribute reverse_index.



67
68
69
# File 'lib/upkeep/subscriptions/active_record_store.rb', line 67

# Read-only accessor for the reverse index.
#
# @return [Object] the value held in @reverse_index
def reverse_index = @reverse_index

Class Method Details

.available?(connect: false) ⇒ Boolean

Returns:

  • (Boolean)


98
99
100
# File 'lib/upkeep/subscriptions/active_record_store.rb', line 98

# Reports whether .schema_errors finds nothing wrong.
#
# @param connect [Boolean] forwarded unchanged to .schema_errors
# @return [Boolean] true when .schema_errors returns an empty collection
def self.available?(connect: false)
  problems = schema_errors(connect: connect)
  problems.empty?
end

.schema_errors(connect: false) ⇒ Object



102
103
104
105
106
107
108
109
110
111
# File 'lib/upkeep/subscriptions/active_record_store.rb', line 102

# Collects schema problems for every table listed in REQUIRED_SCHEMA.
#
# @param connect [Boolean] when false, a store that is not already connected
#   is reported as an error instead of being connected to
# @return [Array] error messages; empty when nothing was reported
def self.schema_errors(connect: false)
  if !ActiveRecord::Base.connected? && !connect
    return ["Active Record is not connected"]
  end

  db = ActiveRecord::Base.connection
  REQUIRED_SCHEMA.flat_map do |table_name, expected_columns|
    schema_errors_for_table(db, table_name, expected_columns)
  end
# NOTE(review): keep this clause first — in Active Record, NoDatabaseError is
# believed to subclass StatementInvalid, so swapping the clauses would change
# the message returned for a missing database. Confirm against the Rails version in use.
rescue ActiveRecord::ConnectionNotEstablished, ActiveRecord::NoDatabaseError => connection_error
  [connection_error.message]
rescue ActiveRecord::StatementInvalid => statement_error
  ["database schema could not be inspected: #{statement_error.message}"]
end

Instance Method Details

#activate(id) ⇒ Object



167
168
169
170
171
172
173
174
175
176
# File 'lib/upkeep/subscriptions/active_record_store.rb', line 167

# Activates the subscription with the given id via activate_subscription.
# The call is wrapped in an ACTIVATE_NOTIFICATION instrumentation event only
# when something is listening for that event; otherwise it runs bare.
#
# @param id [Object] subscription id
# @return [Object] activate_subscription's result on the uninstrumented path;
#   on the instrumented path, whatever ActiveSupport::Notifications.instrument
#   returns (presumably the block's value — confirm)
def activate(id)
  notifications = ActiveSupport::Notifications
  return activate_subscription(id) unless notifications.notifier.listening?(ACTIVATE_NOTIFICATION)

  # The same hash is given to the event and to activate_subscription.
  event = { store: "active_record", subscription_id: id }
  notifications.instrument(ACTIVATE_NOTIFICATION, event) { activate_subscription(id, payload: event) }
end

#drain ⇒ Object



160
# File 'lib/upkeep/subscriptions/active_record_store.rb', line 160

# Delegates to the durable writer's #drain.
# NOTE(review): presumably waits for queued persistence jobs to be written —
# confirm against AsyncDurableWriter.
#
# @return [Object] whatever durable_writer.drain returns
def drain
  durable_writer.drain
end

#fetch(id) ⇒ Object



204
205
206
207
208
# File 'lib/upkeep/subscriptions/active_record_store.rb', line 204

# Looks a subscription up in the active registry, then the pending registry,
# and finally the persistence layer, returning the first truthy hit.
#
# @param id [Object] subscription id
# @return [Object] the subscription found
# @raise [NotFound] when ActiveRecord::RecordNotFound is raised during lookup
def fetch(id)
  cached = active_registry.fetch(id)
  cached ||= pending_registry.fetch(id)
  cached || persistence.fetch(id)
rescue ActiveRecord::RecordNotFound
  raise NotFound, id
end

#prune_stale!(older_than:) ⇒ Object



197
198
199
200
201
202
# File 'lib/upkeep/subscriptions/active_record_store.rb', line 197

# Drains the durable writer, asks the persistence layer to prune stale
# subscriptions, and drops the pruned ids from the active registry.
# NOTE(review): the pending registry is not touched here — confirm that is intentional.
#
# @param older_than [Object] cutoff forwarded to persistence.prune_stale!
# @return [Integer] size of the id collection returned by the persistence layer
def prune_stale!(older_than:)
  durable_writer.drain
  persistence
    .prune_stale!(older_than: older_than)
    .tap { |pruned_ids| active_registry.unregister(pruned_ids) }
    .size
end

#register(subscriber_id:, recorder:, metadata: {}, entries: nil) ⇒ Object



149
150
151
152
153
154
155
156
157
158
# File 'lib/upkeep/subscriptions/active_record_store.rb', line 149

# Registers a subscription via register_subscription. The call is wrapped in
# a REGISTER_NOTIFICATION instrumentation event only when something is
# listening for that event; otherwise it runs bare.
#
# @param subscriber_id [Object] forwarded as the first positional argument
# @param recorder [Object] forwarded as the second positional argument
# @param metadata [Hash] forwarded as the third positional argument
# @param entries [Object, nil] forwarded as the entries: keyword
# @return [Object] register_subscription's result on the uninstrumented path;
#   on the instrumented path, whatever ActiveSupport::Notifications.instrument
#   returns (presumably the block's value — confirm)
def register(subscriber_id:, recorder:, metadata: {}, entries: nil)
  if ActiveSupport::Notifications.notifier.listening?(REGISTER_NOTIFICATION)
    # The same hash is given to the event and to register_subscription.
    payload = { store: "active_record" }
    ActiveSupport::Notifications.instrument(REGISTER_NOTIFICATION, payload) do
      register_subscription(subscriber_id, recorder, metadata, entries: entries, payload: payload)
    end
  else
    register_subscription(subscriber_id, recorder, metadata, entries: entries)
  end
end

#reset ⇒ Object



225
226
227
228
229
230
231
# File 'lib/upkeep/subscriptions/active_record_store.rb', line 225

# Clears deferred index writes, drains the durable writer, then resets the
# pending registry, the active registry, and the persistence layer, in that order.
#
# @return [Object] whatever persistence.reset returns
def reset
  clear_deferred_index_writes
  durable_writer.drain
  [pending_registry, active_registry].each(&:reset)
  persistence.reset
end

#shutdown ⇒ Object



162
163
164
165
# File 'lib/upkeep/subscriptions/active_record_store.rb', line 162

# Clears deferred index writes, then shuts the durable writer down.
#
# @return [Object] whatever durable_writer.shutdown returns
def shutdown
  clear_deferred_index_writes
  writer = durable_writer
  writer.shutdown
end

#subscriptions ⇒ Object



210
211
212
213
214
215
216
217
218
219
220
221
222
223
# File 'lib/upkeep/subscriptions/active_record_store.rb', line 210

# Lists subscriptions, merging the in-memory registries with persisted ones.
#
# In-memory subscriptions (active first, then pending) are keyed by id; a
# later entry with the same id replaces the earlier one. When there are at
# least as many in-memory subscriptions as persistence.count reports, only the
# in-memory ones are returned and persisted rows are not loaded. Otherwise the
# persisted list is returned with in-memory copies substituted where ids
# match, followed by in-memory subscriptions that were not among the persisted ones.
#
# @return [Array] subscriptions
def subscriptions
  durable_total = persistence.count

  by_id = {}
  (active_registry.subscriptions + pending_registry.subscriptions).each do |held|
    by_id[held.id] = held
  end
  return by_id.values if by_id.size >= durable_total

  durable_ids = {}
  merged = persistence.subscriptions.map do |stored|
    durable_ids[stored.id] = true
    by_id.fetch(stored.id, stored)
  end
  merged.concat(by_id.each_value.reject { |held| durable_ids.key?(held.id) })
end

#summary ⇒ Object



233
234
235
236
237
238
239
240
241
242
243
244
245
# File 'lib/upkeep/subscriptions/active_record_store.rb', line 233

# Builds a hash of counts describing the store.
#
# :subscriptions is the larger of the persisted count and the combined
# in-memory (active + pending) count.
#
# @return [Hash] keys :subscriptions, :persistent_subscriptions,
#   :pending_subscriptions, :active_subscriptions,
#   :deferred_index_subscriptions and :reverse_index
def summary
  durable_total = persistence.count
  pending_total = pending_registry.count
  active_total = active_registry.count
  in_memory_total = active_total + pending_total

  {
    subscriptions: in_memory_total > durable_total ? in_memory_total : durable_total,
    persistent_subscriptions: durable_total,
    pending_subscriptions: pending_total,
    active_subscriptions: active_total,
    deferred_index_subscriptions: deferred_index_count,
    reverse_index: reverse_index.summary
  }
end

#touch(id, now: Time.now) ⇒ Object



178
179
180
181
182
183
184
185
# File 'lib/upkeep/subscriptions/active_record_store.rb', line 178

# Records that a subscription was seen: stamps "last_seen_at" metadata on the
# pending and active registries, activates the subscription, drains the
# durable writer, and then touches the persisted record.
#
# @param id [Object] subscription id
# @param now [Time] timestamp to record; defaults to the current time
# @return [Object] whatever persistence.touch returns
def touch(id, now: Time.now)
  metadata = { "last_seen_at" => now.utc.iso8601 }
  pending_registry.touch(id, metadata: metadata)
  active_registry.touch(id, metadata: metadata)
  activate(id)
  # Drain before the durable touch.
  # NOTE(review): presumably so a queued write for this subscription is
  # persisted before the row is updated — confirm.
  durable_writer.drain
  persistence.touch(id, metadata: metadata, now: now)
end

#unregister(ids) ⇒ Object



187
188
189
190
191
192
193
194
195
# File 'lib/upkeep/subscriptions/active_record_store.rb', line 187

# Removes subscriptions from the pending and active registries and from the
# deferred index writes, cancels their jobs on the durable writer, and deletes
# from persistence the ids that durable_writer.cancel returns.
#
# @param ids [Object, Array] one id or a collection of ids (coerced with Array())
# @return [Integer] number of ids requested, not the number actually removed
def unregister(ids)
  id_list = Array(ids)
  [pending_registry, active_registry].each { |registry| registry.unregister(id_list) }
  delete_deferred_index_writes(id_list)
  persistence.delete(durable_writer.cancel(id_list))
  id_list.size
end