Class: Upkeep::Subscriptions::ActiveRecordStore
- Inherits:
-
Object
- Object
- Upkeep::Subscriptions::ActiveRecordStore
- Defined in:
- lib/upkeep/subscriptions/active_record_store.rb
Defined Under Namespace
Classes: DeferredIndexWrite, IndexEntryRecord, ShapeIndexEntryRecord, SubscriptionRecord
Constant Summary collapse
- LOOKUP_NOTIFICATION =
LayeredReverseIndex::LOOKUP_NOTIFICATION
- REGISTER_NOTIFICATION =
"register_subscription_store.upkeep"
- ACTIVATE_NOTIFICATION =
"activate_subscription_store.upkeep"
- PERSIST_NOTIFICATION =
ActiveRecordSubscriptionPersistence::PERSIST_NOTIFICATION
- DURABILITY_MODE =
"async_subscription_row_index_on_subscribe"
- INDEX_DURABILITY =
"on_subscribe"
- REQUIRED_SCHEMA =
{ "upkeep_subscriptions" => { "id" => :string, "subscriber_id" => :string, "recorder_snapshot" => :json, "metadata" => :json, "subscription_shape_key" => :string, "created_at" => :datetime, "updated_at" => :datetime }, "upkeep_subscription_index_entries" => { "subscription_id" => :string, "lookup_key_digest" => :string, "dependency_source" => :string, "lookup_table" => :string, "lookup_record_id_snapshot" => :json, "lookup_attribute" => :string, "dependency_table" => :string, "dependency_predicate_digest" => :string, "dependency_metadata_snapshot" => :json, "owner_ids_snapshot" => :json, "created_at" => :datetime, "updated_at" => :datetime }, "upkeep_subscription_shape_index_entries" => { "subscription_shape_key" => :string, "lookup_key_digest" => :string, "dependency_source" => :string, "lookup_table" => :string, "lookup_record_id_snapshot" => :json, "lookup_attribute" => :string, "dependency_table" => :string, "dependency_predicate_digest" => :string, "dependency_metadata_snapshot" => :json, "owner_ids_snapshot" => :json, "created_at" => :datetime, "updated_at" => :datetime } }.freeze
Instance Attribute Summary collapse
-
#reverse_index ⇒ Object
readonly
Returns the value of attribute reverse_index.
Class Method Summary collapse
Instance Method Summary collapse
- #activate(id) ⇒ Object
- #drain ⇒ Object
- #fetch(id) ⇒ Object
-
#initialize(subscription_record: SubscriptionRecord, index_record: IndexEntryRecord, shape_index_record: ShapeIndexEntryRecord) ⇒ ActiveRecordStore
constructor
A new instance of ActiveRecordStore.
- #prune_stale!(older_than:) ⇒ Object
- #register(subscriber_id:, recorder:, metadata: {}, entries: nil) ⇒ Object
- #reset ⇒ Object
- #shutdown ⇒ Object
- #subscriptions ⇒ Object
- #summary ⇒ Object
- #touch(id, now: Time.now) ⇒ Object
- #unregister(ids) ⇒ Object
Constructor Details
#initialize(subscription_record: SubscriptionRecord, index_record: IndexEntryRecord, shape_index_record: ShapeIndexEntryRecord) ⇒ ActiveRecordStore
Returns a new instance of ActiveRecordStore.
90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 |
# File 'lib/upkeep/subscriptions/active_record_store.rb', line 90
# Wires the in-memory registries, the persistence layer, the layered reverse
# index and the asynchronous durable writer together.
#
# @param subscription_record [Class] record class used for subscriptions
# @param index_record [Class] record class used for index entries
# @param shape_index_record [Class] record class used for shape index entries
def initialize(
  subscription_record: SubscriptionRecord,
  index_record: IndexEntryRecord,
  shape_index_record: ShapeIndexEntryRecord
)
  @subscription_record = subscription_record
  @index_record = index_record
  @shape_index_record = shape_index_record
  @index_builder = ReverseIndex.new
  @pending_registry = ActiveRegistry.new
  @active_registry = ActiveRegistry.new
  @deferred_index_writes = {}
  @deferred_index_mutex = Mutex.new

  # The same three record classes are handed to both persistence collaborators.
  record_classes = {
    subscription_record: subscription_record,
    index_record: index_record,
    shape_index_record: shape_index_record
  }
  @persistence = ActiveRecordSubscriptionPersistence.new(**record_classes, index_builder: index_builder)
  durable_index = PersistentReverseIndex.new(**record_classes, reverse_index: index_builder)

  @reverse_index = LayeredReverseIndex.new(
    active_index: active_registry,
    persistent_index: durable_index,
    # Evaluated lazily, each time the layered index asks for the count.
    persistent_count: -> { persistence.count },
    store: "active_record",
    pending_index: pending_registry
  )
  # Batches handed to the writer are persisted through the persistence layer.
  @durable_writer = AsyncDurableWriter.new { |batch| persistence.persist_jobs(batch) }
end
Instance Attribute Details
#reverse_index ⇒ Object (readonly)
Returns the value of attribute reverse_index.
86 87 88 |
# File 'lib/upkeep/subscriptions/active_record_store.rb', line 86
# Read-only accessor for the reverse index assigned in the constructor.
#
# @return [Object] the value of @reverse_index
def reverse_index = @reverse_index
Class Method Details
.available?(connect: false) ⇒ Boolean
121 122 123 |
# File 'lib/upkeep/subscriptions/active_record_store.rb', line 121
# Tells whether the schema check reports no errors.
#
# @param connect [Boolean] forwarded unchanged to .schema_errors
# @return [Boolean] true when .schema_errors returns an empty collection
def self.available?(connect: false) = schema_errors(connect: connect).empty?
.schema_errors(connect: false) ⇒ Object
125 126 127 128 129 130 131 132 133 134 |
# File 'lib/upkeep/subscriptions/active_record_store.rb', line 125
# Collects the problems reported while checking REQUIRED_SCHEMA against the
# current Active Record connection.
#
# @param connect [Boolean] when false and Active Record is not connected, the
#   check short-circuits with a single error instead of asking for a connection
# @return [Array] the collected errors; empty when nothing was reported
def self.schema_errors(connect: false)
  return ["Active Record is not connected"] unless ActiveRecord::Base.connected? || connect

  connection = ActiveRecord::Base.connection
  REQUIRED_SCHEMA.flat_map { |table, columns| schema_errors_for_table(connection, table, columns) }
rescue ActiveRecord::ConnectionNotEstablished, ActiveRecord::NoDatabaseError => error
  # Connection failures are returned as an error entry rather than raised.
  [error.message]
rescue ActiveRecord::StatementInvalid => error
  ["database schema could not be inspected: #{error.message}"]
end
Instance Method Details
#activate(id) ⇒ Object
190 191 192 193 194 195 196 197 198 199 |
# File 'lib/upkeep/subscriptions/active_record_store.rb', line 190
# Activates a subscription, instrumenting the call only when something is
# listening for ACTIVATE_NOTIFICATION.
#
# @param id [Object] identifier of the subscription to activate
# @return [Object] whatever activate_subscription returns
def activate(id)
  notifications = ActiveSupport::Notifications
  return activate_subscription(id) unless notifications.notifier.listening?(ACTIVATE_NOTIFICATION)

  # The same hash is given to the instrumenter and to activate_subscription.
  event = { store: "active_record", subscription_id: id }
  notifications.instrument(ACTIVATE_NOTIFICATION, event) do
    activate_subscription(id, payload: event)
  end
end
#drain ⇒ Object
183 |
# File 'lib/upkeep/subscriptions/active_record_store.rb', line 183
# Delegates to the durable writer's drain.
# NOTE(review): presumably waits until queued durable writes are flushed;
# confirm against AsyncDurableWriter.
#
# @return [Object] whatever durable_writer.drain returns
def drain
  durable_writer.drain
end
#fetch(id) ⇒ Object
228 229 230 231 232 |
# File 'lib/upkeep/subscriptions/active_record_store.rb', line 228
# Looks a subscription up in the active registry, then the pending registry,
# and finally in persistence; the first truthy result wins.
#
# @param id [Object] identifier of the subscription
# @return [Object] the first truthy lookup result
# @raise [NotFound] when ActiveRecord::RecordNotFound is raised during lookup
def fetch(id)
  in_memory = active_registry.fetch(id) || pending_registry.fetch(id)
  in_memory || persistence.fetch(id)
rescue ActiveRecord::RecordNotFound
  # Translate the Active Record error into the store's own error, carrying the id.
  raise NotFound, id
end
#prune_stale!(older_than:) ⇒ Object
221 222 223 224 225 226 |
# File 'lib/upkeep/subscriptions/active_record_store.rb', line 221
# Drains the durable writer, prunes stale subscriptions from persistence and
# removes the pruned ids from the active registry.
#
# @param older_than [Object] cutoff forwarded to persistence.prune_stale!
# @return [Integer] size of the id collection returned by persistence
def prune_stale!(older_than:)
  durable_writer.drain
  persistence
    .prune_stale!(older_than: older_than)
    .tap { |pruned_ids| active_registry.unregister(pruned_ids) }
    .size
end
#register(subscriber_id:, recorder:, metadata: {}, entries: nil) ⇒ Object
172 173 174 175 176 177 178 179 180 181 |
# File 'lib/upkeep/subscriptions/active_record_store.rb', line 172
# Registers a subscription, instrumenting the call only when something is
# listening for REGISTER_NOTIFICATION.
#
# @param subscriber_id [Object] forwarded to register_subscription
# @param recorder [Object] forwarded to register_subscription
# @param metadata [Hash] forwarded to register_subscription (defaults to {})
# @param entries [Object, nil] forwarded as the entries: keyword
# @return [Object] whatever register_subscription returns
def register(subscriber_id:, recorder:, metadata: {}, entries: nil)
  if ActiveSupport::Notifications.notifier.listening?(REGISTER_NOTIFICATION)
    # The same hash is given to the instrumenter and to register_subscription.
    payload = { store: "active_record" }
    ActiveSupport::Notifications.instrument(REGISTER_NOTIFICATION, payload) do
      register_subscription(subscriber_id, recorder, metadata, entries: entries, payload: payload)
    end
  else
    register_subscription(subscriber_id, recorder, metadata, entries: entries)
  end
end
#reset ⇒ Object
249 250 251 252 253 254 255 |
# File 'lib/upkeep/subscriptions/active_record_store.rb', line 249
# Clears deferred index writes, drains the durable writer, then resets the
# pending registry, the active registry and persistence, in that order.
#
# @return [Object] whatever persistence.reset returns
def reset
  clear_deferred_index_writes
  durable_writer.drain
  [pending_registry, active_registry].each(&:reset)
  persistence.reset
end
#shutdown ⇒ Object
185 186 187 188 |
# File 'lib/upkeep/subscriptions/active_record_store.rb', line 185
# Discards deferred index writes and then shuts the durable writer down.
#
# @return [Object] whatever durable_writer.shutdown returns
def shutdown
  clear_deferred_index_writes # runs before the writer is stopped

  durable_writer.shutdown
end
#subscriptions ⇒ Object
234 235 236 237 238 239 240 241 242 243 244 245 246 247 |
# File 'lib/upkeep/subscriptions/active_record_store.rb', line 234
# Returns the subscriptions known to the store, preferring in-memory objects
# over their persisted counterparts.
#
# When the registries hold at least as many distinct ids as persistence
# counts, persistence is not read at all.
#
# @return [Array] subscriptions, one per id
def subscriptions
  durable_total = persistence.count

  # Index by id; a pending entry replaces an active entry with the same id.
  live = active_registry.subscriptions + pending_registry.subscriptions
  live_by_id = live.each_with_object({}) { |sub, index| index[sub.id] = sub }
  return live_by_id.values if live_by_id.size >= durable_total

  persisted_ids = {}
  merged = persistence.subscriptions.map do |sub|
    persisted_ids[sub.id] = true
    live_by_id.fetch(sub.id, sub)
  end

  # Append in-memory subscriptions that persistence did not return.
  merged + live_by_id.values.reject { |sub| persisted_ids.key?(sub.id) }
end
#summary ⇒ Object
257 258 259 260 261 262 263 264 265 266 267 268 269 |
# File 'lib/upkeep/subscriptions/active_record_store.rb', line 257
# Builds a hash of counters describing the store.
#
# The :subscriptions figure is the larger of the persisted count and the
# combined active + pending count.
#
# @return [Hash] counters plus the reverse index's own summary
def summary
  persisted = persistence.count
  pending = pending_registry.count
  active = active_registry.count
  in_memory = active + pending

  {
    subscriptions: [persisted, in_memory].max,
    persistent_subscriptions: persisted,
    pending_subscriptions: pending,
    active_subscriptions: active,
    deferred_index_subscriptions: deferred_index_count,
    reverse_index: reverse_index.summary
  }
end
#touch(id, now: Time.now) ⇒ Object
201 202 203 204 205 206 207 208 209 |
# File 'lib/upkeep/subscriptions/active_record_store.rb', line 201
# Records that a subscription was just seen: stamps "last_seen_at" in both
# registries, activates the subscription, drains the durable writer and then
# touches the persisted row.
#
# @param id [Object] identifier of the subscription
# @param now [Time] timestamp to record (defaults to Time.now)
# @return [Object] whatever persistence.touch returns
# @raise [NotFound] propagated from #fetch when the lookup fails
def touch(id, now: Time.now)
  # Called only for its failure mode; the result is discarded.
  fetch(id)
  metadata = { "last_seen_at" => now.utc.iso8601 }
  pending_registry.touch(id, metadata: metadata)
  active_registry.touch(id, metadata: metadata)
  activate(id)
  # Drain the writer before the persisted row is touched.
  durable_writer.drain
  persistence.touch(id, metadata: metadata, now: now)
end
#unregister(ids) ⇒ Object
211 212 213 214 215 216 217 218 219 |
# File 'lib/upkeep/subscriptions/active_record_store.rb', line 211
# Removes subscriptions from both registries, drops their deferred index
# writes, cancels their queued durable writes and deletes the ids the writer
# hands back from persistence.
#
# @param ids [Object, Array] one id or a list of ids (coerced with Array())
# @return [Integer] number of ids requested, not the number actually deleted
def unregister(ids)
  targets = Array(ids)
  [pending_registry, active_registry].each { |registry| registry.unregister(targets) }
  delete_deferred_index_writes(targets)

  # Only the ids returned by cancel are deleted from persistence.
  cancelled = durable_writer.cancel(targets)
  persistence.delete(cancelled)
  targets.size
end