Class: Upkeep::Subscriptions::ActiveRecordStore

Inherits:
Object
  • Object
show all
Defined in:
lib/upkeep/subscriptions/active_record_store.rb

Defined Under Namespace

Classes: DeferredIndexWrite, IndexEntryRecord, SubscriptionRecord

Constant Summary collapse

LOOKUP_NOTIFICATION =
LayeredReverseIndex::LOOKUP_NOTIFICATION
REGISTER_NOTIFICATION =
"register_subscription_store.upkeep"
ACTIVATE_NOTIFICATION =
"activate_subscription_store.upkeep"
PERSIST_NOTIFICATION =
ActiveRecordSubscriptionPersistence::PERSIST_NOTIFICATION
DURABILITY_MODE =
"async_subscription_row_index_on_subscribe"
INDEX_DURABILITY =
"on_subscribe"

Instance Attribute Summary collapse

Class Method Summary collapse

Instance Method Summary collapse

Constructor Details

#initialize(subscription_record: SubscriptionRecord, index_record: IndexEntryRecord) ⇒ ActiveRecordStore

Returns a new instance of ActiveRecordStore.



47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
# File 'lib/upkeep/subscriptions/active_record_store.rb', line 47

# Wires together the in-memory registries, the ActiveRecord-backed
# persistence layer, the layered reverse index and the asynchronous durable
# writer.
#
# @param subscription_record [Class] record class forwarded to the persistence layer
# @param index_record [Class] record class forwarded to the persistence layer
#   and to the persistent reverse index
def initialize(subscription_record: SubscriptionRecord, index_record: IndexEntryRecord)
  @subscription_record = subscription_record
  @index_record = index_record

  # In-memory state: one shared index builder plus separate pending/active registries.
  @index_builder = ReverseIndex.new
  @pending_registry = ActiveRegistry.new
  @active_registry = ActiveRegistry.new
  @deferred_index_writes = {}
  @deferred_index_mutex = Mutex.new

  @persistence = ActiveRecordSubscriptionPersistence.new(
    subscription_record: subscription_record, index_record: index_record, index_builder: index_builder
  )

  durable_index = PersistentReverseIndex.new(reverse_index: index_builder, index_record: index_record)
  @reverse_index = LayeredReverseIndex.new(
    active_index: active_registry,
    persistent_index: durable_index,
    # Evaluated lazily, each time the layered index asks for the count.
    persistent_count: lambda { persistence.count },
    store: "active_record",
    pending_index: pending_registry
  )

  # Batches handed to the writer are persisted through the persistence layer.
  @durable_writer = AsyncDurableWriter.new do |jobs|
    persistence.persist_jobs(jobs)
  end
end

Instance Attribute Details

#reverse_index ⇒ Object (readonly)

Returns the value of attribute reverse_index.



43
44
45
# File 'lib/upkeep/subscriptions/active_record_store.rb', line 43

# Reader for the reverse index held in +@reverse_index+.
#
# @return [Object] the current value of +@reverse_index+
def reverse_index = @reverse_index

Class Method Details

.available?(connect: false) ⇒ Boolean

Returns:

  • (Boolean)


74
75
76
77
78
79
80
81
82
# File 'lib/upkeep/subscriptions/active_record_store.rb', line 74

# Reports whether both backing tables can be seen through ActiveRecord.
#
# @param connect [Boolean] when false, the check is skipped (returning false)
#   unless ActiveRecord already reports a connection; when true, a connection
#   is requested regardless
# @return [Boolean] false when not connected (and +connect+ is false), when
#   either data source is missing, or when one of the rescued ActiveRecord
#   errors is raised
def self.available?(connect: false)
  return false if !ActiveRecord::Base.connected? && !connect

  conn = ActiveRecord::Base.connection
  subscriptions_present = conn.data_source_exists?("upkeep_subscriptions")
  subscriptions_present && conn.data_source_exists?("upkeep_subscription_index_entries")
rescue ActiveRecord::ConnectionNotEstablished, ActiveRecord::NoDatabaseError, ActiveRecord::StatementInvalid
  false
end

Instance Method Details

#activate(id) ⇒ Object



102
103
104
105
106
107
108
109
110
111
# File 'lib/upkeep/subscriptions/active_record_store.rb', line 102

# Activates the subscription with the given id, wrapping the work in an
# ACTIVATE_NOTIFICATION instrumentation event only when something is
# listening for it.
#
# @param id [Object] subscription identifier
# @return [Object] whatever +activate_subscription+ returns
def activate(id)
  # Skip building a payload entirely when nobody subscribed to the event.
  return activate_subscription(id) unless ActiveSupport::Notifications.notifier.listening?(ACTIVATE_NOTIFICATION)

  event_payload = { store: "active_record", subscription_id: id }
  ActiveSupport::Notifications.instrument(ACTIVATE_NOTIFICATION, event_payload) do
    activate_subscription(id, payload: event_payload)
  end
end

#drain ⇒ Object



95
# File 'lib/upkeep/subscriptions/active_record_store.rb', line 95

# Delegates to the durable writer's +drain+.
#
# @return [Object] whatever +durable_writer.drain+ returns
def drain
  durable_writer.drain
end

#fetch(id) ⇒ Object



139
140
141
# File 'lib/upkeep/subscriptions/active_record_store.rb', line 139

# Looks a subscription up by id, consulting the active registry first, then
# the pending registry, and finally the persistence layer. Later sources are
# only queried when the earlier ones return a falsy value.
#
# @param id [Object] subscription identifier
# @return [Object, nil] the first truthy hit, otherwise the persistence result
def fetch(id)
  hit = active_registry.fetch(id)
  return hit if hit

  hit = pending_registry.fetch(id)
  return hit if hit

  persistence.fetch(id)
end

#prune_stale!(older_than:) ⇒ Object



132
133
134
135
136
137
# File 'lib/upkeep/subscriptions/active_record_store.rb', line 132

# Drains the durable writer, asks the persistence layer to prune stale
# subscriptions, and drops the pruned ids from the active registry.
#
# @param older_than [Object] cutoff forwarded to +persistence.prune_stale!+
# @return [Integer] number of ids reported as pruned by the persistence layer
def prune_stale!(older_than:)
  # Drain before pruning, as the original ordering does.
  durable_writer.drain

  persistence
    .prune_stale!(older_than: older_than)
    .tap { |pruned_ids| active_registry.unregister(pruned_ids) }
    .size
end

#register(subscriber_id:, recorder:, metadata: {}, entries: nil) ⇒ Object



84
85
86
87
88
89
90
91
92
93
# File 'lib/upkeep/subscriptions/active_record_store.rb', line 84

# Registers a subscription, wrapping the work in a REGISTER_NOTIFICATION
# instrumentation event only when something is listening for it.
#
# @param subscriber_id [Object] forwarded as the first positional argument
# @param recorder [Object] forwarded as the second positional argument
# @param metadata [Hash] forwarded as the third positional argument
# @param entries [Object, nil] forwarded as the +entries:+ keyword
# @return [Object] whatever +register_subscription+ returns
def register(subscriber_id:, recorder:, metadata: {}, entries: nil)
  if ActiveSupport::Notifications.notifier.listening?(REGISTER_NOTIFICATION)
    # The same payload hash is given to the instrumenter and to the worker.
    payload = { store: "active_record" }
    ActiveSupport::Notifications.instrument(REGISTER_NOTIFICATION, payload) do
      register_subscription(subscriber_id, recorder, metadata, entries: entries, payload: payload)
    end
  else
    register_subscription(subscriber_id, recorder, metadata, entries: entries)
  end
end

#reset ⇒ Object



158
159
160
161
162
163
164
# File 'lib/upkeep/subscriptions/active_record_store.rb', line 158

# Clears all store state: deferred index writes are cleared, the durable
# writer is drained, both in-memory registries are reset, and finally the
# persistence layer is reset.
#
# @return [Object] whatever +persistence.reset+ returns
def reset
  clear_deferred_index_writes
  durable_writer.drain

  # Pending first, then active, matching the original ordering.
  [pending_registry, active_registry].each(&:reset)
  persistence.reset
end

#shutdown ⇒ Object



97
98
99
100
# File 'lib/upkeep/subscriptions/active_record_store.rb', line 97

# Shuts the store down: clears the deferred index writes first, then shuts
# down the durable writer.
#
# @return [Object] whatever +durable_writer.shutdown+ returns
def shutdown
  clear_deferred_index_writes
  durable_writer.shutdown
end

#subscriptions ⇒ Object



143
144
145
146
147
148
149
150
151
152
153
154
155
156
# File 'lib/upkeep/subscriptions/active_record_store.rb', line 143

# Lists subscriptions, merging the in-memory registries with persisted rows.
#
# In-memory subscriptions are keyed by id; when an id appears in both
# registries the pending entry replaces the active one. If the in-memory set
# is at least as large as the persisted count, only the in-memory values are
# returned. Otherwise persisted rows come first (each swapped for its
# in-memory counterpart when one exists), followed by in-memory
# subscriptions that have no persisted row.
#
# @return [Array<Object>] merged subscription list
def subscriptions
  # Count is read before anything else, as in the original ordering.
  durable_total = persistence.count

  by_id = {}
  (active_registry.subscriptions + pending_registry.subscriptions).each do |sub|
    by_id[sub.id] = sub
  end
  return by_id.values if by_id.size >= durable_total

  merged = persistence.subscriptions.map { |row| by_id.fetch(row.id, row) }
  # Every merged element carries the id of the persisted row it stands for.
  covered_ids = merged.to_h { |sub| [sub.id, true] }
  merged + by_id.reject { |id, _sub| covered_ids.key?(id) }.values
end

#summary ⇒ Object



166
167
168
169
170
171
172
173
174
175
176
177
178
# File 'lib/upkeep/subscriptions/active_record_store.rb', line 166

# Builds a hash of counters describing the store.
#
# +:subscriptions+ is the larger of the persisted count and the combined
# active + pending count; the remaining keys report each count individually,
# the deferred index count, and the reverse index's own summary.
#
# @return [Hash{Symbol => Object}] summary keyed by symbol
def summary
  durable_total = persistence.count
  waiting_total = pending_registry.count
  live_total = active_registry.count
  overall_total = [durable_total, live_total + waiting_total].max

  {
    subscriptions: overall_total,
    persistent_subscriptions: durable_total,
    pending_subscriptions: waiting_total,
    active_subscriptions: live_total,
    deferred_index_subscriptions: deferred_index_count,
    reverse_index: reverse_index.summary
  }
end

#touch(id, now: Time.now) ⇒ Object



113
114
115
116
117
118
119
120
# File 'lib/upkeep/subscriptions/active_record_store.rb', line 113

# Marks a subscription as seen at +now+.
#
# The timestamp is applied to the pending and active registries, the
# subscription is activated, the durable writer is drained, and the
# persistence layer is touched last.
#
# @param id [Object] subscription identifier
# @param now [Time] moment to record; stored as a UTC ISO 8601 string under
#   the "last_seen_at" metadata key
# @return [Object] whatever +persistence.touch+ returns
def touch(id, now: Time.now)
  metadata = { "last_seen_at" => now.utc.iso8601 }
  pending_registry.touch(id, metadata: metadata)
  active_registry.touch(id, metadata: metadata)
  activate(id)
  # Drain before the persistence touch, as the original ordering does.
  durable_writer.drain
  persistence.touch(id, metadata: metadata, now: now)
end

#unregister(ids) ⇒ Object



122
123
124
125
126
127
128
129
130
# File 'lib/upkeep/subscriptions/active_record_store.rb', line 122

# Removes subscriptions from every layer of the store.
#
# The ids are dropped from the pending and active registries and from the
# deferred index writes; whatever ids the durable writer returns from
# +cancel+ are then deleted from the persistence layer.
#
# @param ids [Object, Array<Object>, nil] one id or a list of ids (coerced with +Array()+)
# @return [Integer] number of ids requested, after coercion
def unregister(ids)
  id_list = Array(ids)

  [pending_registry, active_registry].each { |registry| registry.unregister(id_list) }
  delete_deferred_index_writes(id_list)

  # Only the ids handed back by the writer's cancel are deleted from storage.
  persistence.delete(durable_writer.cancel(id_list))
  id_list.size
end