Class: Upkeep::Subscriptions::ActiveRecordSubscriptionPersistence
- Inherits:
-
Object
- Object
- Upkeep::Subscriptions::ActiveRecordSubscriptionPersistence
- Defined in:
- lib/upkeep/subscriptions/active_record_subscription_persistence.rb
Constant Summary collapse
- PERSIST_NOTIFICATION =
"persist_subscription_store.upkeep"- INDEX_ENTRIES_SNAPSHOT_KEY =
"__upkeep_index_entries"
Instance Method Summary collapse
- #count ⇒ Object
- #delete(ids) ⇒ Object
- #fetch(id) ⇒ Object
- #fetch_with_index_entries(id) ⇒ Object
-
#initialize(subscription_record:, index_record:, index_builder:) ⇒ ActiveRecordSubscriptionPersistence
constructor
A new instance of ActiveRecordSubscriptionPersistence.
- #persist_jobs(jobs) ⇒ Object
- #prune_stale!(older_than:) ⇒ Object
- #reset ⇒ Object
- #subscriptions ⇒ Object
- #touch(id, metadata:, now:) ⇒ Object
Constructor Details
#initialize(subscription_record:, index_record:, index_builder:) ⇒ ActiveRecordSubscriptionPersistence
Returns a new instance of ActiveRecordSubscriptionPersistence.
15 16 17 18 19 20 21 |
# File 'lib/upkeep/subscriptions/active_record_subscription_persistence.rb', line 15 def initialize(subscription_record:, index_record:, index_builder:) @subscription_record = subscription_record @index_record = index_record @index_builder = index_builder @count_mutex = Mutex.new @count_cache = nil end |
Instance Method Details
#count ⇒ Object
94 95 96 97 98 |
# File 'lib/upkeep/subscriptions/active_record_subscription_persistence.rb', line 94 def count @count_mutex.synchronize do @count_cache ||= subscription_record.count end end |
#delete(ids) ⇒ Object
61 62 63 64 65 66 67 68 69 70 71 72 |
# File 'lib/upkeep/subscriptions/active_record_subscription_persistence.rb', line 61 def delete(ids) ids = Array(ids) return if ids.empty? ActiveRecord::Base.connection_pool.with_connection do ActiveRecord::Base.transaction do index_record.where(subscription_id: ids).delete_all deleted = subscription_record.where(id: ids).delete_all decrement_count_cache(deleted) end end end |
#fetch(id) ⇒ Object
74 75 76 77 |
# File 'lib/upkeep/subscriptions/active_record_subscription_persistence.rb', line 74 def fetch(id) record = subscription_record.find(id) (record) end |
#fetch_with_index_entries(id) ⇒ Object
79 80 81 82 |
# File 'lib/upkeep/subscriptions/active_record_subscription_persistence.rb', line 79 def fetch_with_index_entries(id) record = subscription_record.find(id) [(record), index_entries_from_snapshot(record.recorder_snapshot)] end |
#persist_jobs(jobs) ⇒ Object
23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 |
# File 'lib/upkeep/subscriptions/active_record_subscription_persistence.rb', line 23 def persist_jobs(jobs) if ActiveSupport::Notifications.notifier.listening?(PERSIST_NOTIFICATION) payload = { store: "active_record", jobs: jobs.size, subscriptions: jobs.count { |job| persist_subscription?(job) }, index_jobs: jobs.count { |job| persist_index?(job) }, dependency_entries: jobs.sum { |job| persist_index?(job) ? job.entries.size : 0 }, pending_index_entries: jobs.sum { |job| persist_subscription?(job) ? job.entries.size : 0 }, operations: operation_counts(jobs) } ActiveSupport::Notifications.instrument(PERSIST_NOTIFICATION, payload) do result = persist_jobs_without_instrumentation(jobs) payload[:subscription_rows] = result.fetch(:subscription_rows) payload[:index_rows] = result.fetch(:index_rows) end else persist_jobs_without_instrumentation(jobs) end end |
#prune_stale!(older_than:) ⇒ Object
53 54 55 56 57 58 59 |
# File 'lib/upkeep/subscriptions/active_record_subscription_persistence.rb', line 53 def prune_stale!(older_than:) stale_ids = subscription_record.where(subscription_record.arel_table[:updated_at].lt(older_than)).pluck(:id) return [] if stale_ids.empty? delete(stale_ids) stale_ids end |
#reset ⇒ Object
88 89 90 91 92 |
# File 'lib/upkeep/subscriptions/active_record_subscription_persistence.rb', line 88 def reset index_record.delete_all subscription_record.delete_all write_count_cache(0) end |
#subscriptions ⇒ Object
84 85 86 |
# File 'lib/upkeep/subscriptions/active_record_subscription_persistence.rb', line 84 def subscriptions subscription_record.order(:created_at, :id).map { |record| (record) } end |
#touch(id, metadata:, now:) ⇒ Object
44 45 46 47 48 49 50 51 |
# File 'lib/upkeep/subscriptions/active_record_subscription_persistence.rb', line 44 def touch(id, metadata:, now:) subscription_record.where(id: id).find_each do |record| record.update_columns( metadata: record..to_h.merge(), updated_at: now ) end end |