Class: Upkeep::Subscriptions::Store

Inherits:
Object
  • Object
show all
Defined in:
lib/upkeep/subscriptions/store.rb

Constant Summary collapse

PERSIST_NOTIFICATION =
"persist_subscription_store.upkeep"

Instance Attribute Summary collapse

Instance Method Summary collapse

Constructor Details

#initialize(reverse_index: ReverseIndex.new) ⇒ Store

Returns a new instance of Store.



187
188
189
190
191
192
193
# File 'lib/upkeep/subscriptions/store.rb', line 187

def initialize(reverse_index: ReverseIndex.new)
  @active_registry = ActiveRegistry.new(reverse_index: reverse_index)
  @pending_registry = ActiveRegistry.new
  @pending_index_entries = {}
  @reverse_index = MemoryReverseIndex.new(active_registry: active_registry, pending_registry: pending_registry)
  @next_id = 0
end

Instance Attribute Details

#reverse_indexObject (readonly)

Returns the value of attribute reverse_index.



185
186
187
# File 'lib/upkeep/subscriptions/store.rb', line 185

def reverse_index
  @reverse_index
end

Instance Method Details

#activate(id) ⇒ Object



232
233
234
235
236
237
238
239
240
241
# File 'lib/upkeep/subscriptions/store.rb', line 232

def activate(id)
  if ActiveSupport::Notifications.notifier.listening?(PERSIST_NOTIFICATION)
    payload = memory_persist_payload(operation: :persist_index)
    ActiveSupport::Notifications.instrument(PERSIST_NOTIFICATION, payload) do
      activate_subscription(id, payload: payload)
    end
  else
    activate_subscription(id)
  end
end

#drainObject



243
244
245
# File 'lib/upkeep/subscriptions/store.rb', line 243

def drain
  true
end

#explain(id) ⇒ Object



255
256
257
# File 'lib/upkeep/subscriptions/store.rb', line 255

def explain(id)
  fetch(id).explain
end

#fetch(id) ⇒ Object



251
252
253
# File 'lib/upkeep/subscriptions/store.rb', line 251

def fetch(id)
  active_registry.fetch(id) || pending_registry.fetch(id) || raise(NotFound, id)
end

#prune_stale!(older_than:) ⇒ Object



214
215
216
217
218
219
220
221
222
# File 'lib/upkeep/subscriptions/store.rb', line 214

def prune_stale!(older_than:)
  stale_ids = subscriptions.filter_map do |subscription|
    id = subscription.id
    id if last_seen_at(subscription) && last_seen_at(subscription) < older_than
  end

  unregister(stale_ids)
  stale_ids.size
end

#register(subscriber_id:, recorder:, metadata: {}, entries: nil) ⇒ Object



195
196
197
198
199
200
201
202
203
204
# File 'lib/upkeep/subscriptions/store.rb', line 195

def register(subscriber_id:, recorder:, metadata: {}, entries: nil)
  if ActiveSupport::Notifications.notifier.listening?(PERSIST_NOTIFICATION)
    payload = memory_persist_payload(operation: :persist_subscription)
    ActiveSupport::Notifications.instrument(PERSIST_NOTIFICATION, payload) do
      register_subscription(subscriber_id: subscriber_id, recorder: recorder, metadata: , entries: entries, payload: payload)
    end
  else
    register_subscription(subscriber_id: subscriber_id, recorder: recorder, metadata: , entries: entries)
  end
end

#resetObject



263
264
265
266
267
268
269
# File 'lib/upkeep/subscriptions/store.rb', line 263

def reset
  @active_registry = ActiveRegistry.new
  @pending_registry = ActiveRegistry.new
  @pending_index_entries = {}
  @reverse_index = MemoryReverseIndex.new(active_registry: active_registry, pending_registry: pending_registry)
  @next_id = 0
end

#shutdownObject



247
248
249
# File 'lib/upkeep/subscriptions/store.rb', line 247

def shutdown
  true
end

#subscriptionsObject



259
260
261
# File 'lib/upkeep/subscriptions/store.rb', line 259

def subscriptions
  active_registry.subscriptions + pending_registry.subscriptions
end

#summaryObject



271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
# File 'lib/upkeep/subscriptions/store.rb', line 271

def summary
  active = active_registry.summary
  pending = pending_registry.summary
  {
    subscriptions: subscriptions.size,
    pending_subscriptions: pending_registry.count,
    active_subscriptions: active_registry.count,
    deferred_index_subscriptions: 0,
    reverse_index: active.merge(
      mode: :active,
      active: active,
      pending: pending,
      persistent: { lookup_keys: 0, entries: 0 }
    )
  }
end

#touch(id, now: Time.now) ⇒ Object



206
207
208
209
210
211
212
# File 'lib/upkeep/subscriptions/store.rb', line 206

def touch(id, now: Time.now)
  fetch(id)
   = { "last_seen_at" => now.utc.iso8601 }
  pending_registry.touch(id, metadata: )
  active_registry.touch(id, metadata: )
  true
end

#unregister(ids) ⇒ Object



224
225
226
227
228
229
230
# File 'lib/upkeep/subscriptions/store.rb', line 224

def unregister(ids)
  ids = Array(ids)
  ids.each { |id| @pending_index_entries.delete(id) }
  pending_registry.unregister(ids)
  active_registry.unregister(ids)
  ids.size
end