Class: Upkeep::Subscriptions::Store
- Inherits:
-
Object
- Object
- Upkeep::Subscriptions::Store
- Defined in:
- lib/upkeep/subscriptions/store.rb
Constant Summary collapse
- PERSIST_NOTIFICATION =
"persist_subscription_store.upkeep"
Instance Attribute Summary collapse
-
#reverse_index ⇒ Object
readonly
Returns the value of attribute reverse_index.
Instance Method Summary collapse
- #activate(id) ⇒ Object
- #drain ⇒ Object
- #explain(id) ⇒ Object
- #fetch(id) ⇒ Object
-
#initialize(reverse_index: ReverseIndex.new) ⇒ Store
constructor
A new instance of Store.
- #prune_stale!(older_than:) ⇒ Object
- #register(subscriber_id:, recorder:, metadata: {}, entries: nil) ⇒ Object
- #reset ⇒ Object
- #shutdown ⇒ Object
- #subscriptions ⇒ Object
- #summary ⇒ Object
- #touch(id, now: Time.now) ⇒ Object
- #unregister(ids) ⇒ Object
Constructor Details
#initialize(reverse_index: ReverseIndex.new) ⇒ Store
Returns a new instance of Store.
187 188 189 190 191 192 193 |
# File 'lib/upkeep/subscriptions/store.rb', line 187 def initialize(reverse_index: ReverseIndex.new) @active_registry = ActiveRegistry.new(reverse_index: reverse_index) @pending_registry = ActiveRegistry.new @pending_index_entries = {} @reverse_index = MemoryReverseIndex.new(active_registry: active_registry, pending_registry: pending_registry) @next_id = 0 end |
Instance Attribute Details
#reverse_index ⇒ Object (readonly)
Returns the value of attribute reverse_index.
185 186 187 |
# File 'lib/upkeep/subscriptions/store.rb', line 185 def reverse_index @reverse_index end |
Instance Method Details
#activate(id) ⇒ Object
232 233 234 235 236 237 238 239 240 241 |
# File 'lib/upkeep/subscriptions/store.rb', line 232 def activate(id) if ActiveSupport::Notifications.notifier.listening?(PERSIST_NOTIFICATION) payload = memory_persist_payload(operation: :persist_index) ActiveSupport::Notifications.instrument(PERSIST_NOTIFICATION, payload) do activate_subscription(id, payload: payload) end else activate_subscription(id) end end |
#drain ⇒ Object
243 244 245 |
# File 'lib/upkeep/subscriptions/store.rb', line 243 def drain true end |
#explain(id) ⇒ Object
255 256 257 |
# File 'lib/upkeep/subscriptions/store.rb', line 255 def explain(id) fetch(id).explain end |
#fetch(id) ⇒ Object
251 252 253 |
# File 'lib/upkeep/subscriptions/store.rb', line 251 def fetch(id) active_registry.fetch(id) || pending_registry.fetch(id) || raise(NotFound, id) end |
#prune_stale!(older_than:) ⇒ Object
214 215 216 217 218 219 220 221 222 |
# File 'lib/upkeep/subscriptions/store.rb', line 214 def prune_stale!(older_than:) stale_ids = subscriptions.filter_map do |subscription| id = subscription.id id if last_seen_at(subscription) && last_seen_at(subscription) < older_than end unregister(stale_ids) stale_ids.size end |
#register(subscriber_id:, recorder:, metadata: {}, entries: nil) ⇒ Object
195 196 197 198 199 200 201 202 203 204 |
# File 'lib/upkeep/subscriptions/store.rb', line 195 def register(subscriber_id:, recorder:, metadata: {}, entries: nil) if ActiveSupport::Notifications.notifier.listening?(PERSIST_NOTIFICATION) payload = memory_persist_payload(operation: :persist_subscription) ActiveSupport::Notifications.instrument(PERSIST_NOTIFICATION, payload) do register_subscription(subscriber_id: subscriber_id, recorder: recorder, metadata: , entries: entries, payload: payload) end else register_subscription(subscriber_id: subscriber_id, recorder: recorder, metadata: , entries: entries) end end |
#reset ⇒ Object
263 264 265 266 267 268 269 |
# File 'lib/upkeep/subscriptions/store.rb', line 263 def reset @active_registry = ActiveRegistry.new @pending_registry = ActiveRegistry.new @pending_index_entries = {} @reverse_index = MemoryReverseIndex.new(active_registry: active_registry, pending_registry: pending_registry) @next_id = 0 end |
#shutdown ⇒ Object
247 248 249 |
# File 'lib/upkeep/subscriptions/store.rb', line 247 def shutdown true end |
#subscriptions ⇒ Object
259 260 261 |
# File 'lib/upkeep/subscriptions/store.rb', line 259 def subscriptions active_registry.subscriptions + pending_registry.subscriptions end |
#summary ⇒ Object
271 272 273 274 275 276 277 278 279 280 281 282 283 284 285 286 |
# File 'lib/upkeep/subscriptions/store.rb', line 271 def summary active = active_registry.summary pending = pending_registry.summary { subscriptions: subscriptions.size, pending_subscriptions: pending_registry.count, active_subscriptions: active_registry.count, deferred_index_subscriptions: 0, reverse_index: active.merge( mode: :active, active: active, pending: pending, persistent: { lookup_keys: 0, entries: 0 } ) } end |
#touch(id, now: Time.now) ⇒ Object
206 207 208 209 210 211 212 |
# File 'lib/upkeep/subscriptions/store.rb', line 206 def touch(id, now: Time.now) fetch(id) = { "last_seen_at" => now.utc.iso8601 } pending_registry.touch(id, metadata: ) active_registry.touch(id, metadata: ) true end |
#unregister(ids) ⇒ Object
224 225 226 227 228 229 230 |
# File 'lib/upkeep/subscriptions/store.rb', line 224 def unregister(ids) ids = Array(ids) ids.each { |id| @pending_index_entries.delete(id) } pending_registry.unregister(ids) active_registry.unregister(ids) ids.size end |