Class: Upkeep::Subscriptions::Store
- Inherits:
-
Object
- Object
- Upkeep::Subscriptions::Store
- Defined in:
- lib/upkeep/subscriptions/store.rb
Instance Attribute Summary collapse
-
#reverse_index ⇒ Object
readonly
Returns the value of attribute reverse_index.
Instance Method Summary collapse
- #activate(id) ⇒ Object
- #drain ⇒ Object
- #fetch(id) ⇒ Object
-
#initialize(reverse_index: ReverseIndex.new) ⇒ Store
constructor
A new instance of Store.
- #prune_stale!(older_than:) ⇒ Object
- #register(subscriber_id:, recorder:, metadata: {}, entries: nil) ⇒ Object
- #reset ⇒ Object
- #shutdown ⇒ Object
- #subscriptions ⇒ Object
- #summary ⇒ Object
- #touch(id, now: Time.now) ⇒ Object
- #unregister(ids) ⇒ Object
Constructor Details
#initialize(reverse_index: ReverseIndex.new) ⇒ Store
Returns a new instance of Store.
55 56 57 58 59 60 61 |
# File 'lib/upkeep/subscriptions/store.rb', line 55 def initialize(reverse_index: ReverseIndex.new) @active_registry = ActiveRegistry.new(reverse_index: reverse_index) @pending_registry = ActiveRegistry.new @pending_index_entries = {} @reverse_index = active_registry @next_id = 0 end |
Instance Attribute Details
#reverse_index ⇒ Object (readonly)
Returns the value of attribute reverse_index.
53 54 55 |
# File 'lib/upkeep/subscriptions/store.rb', line 53 def reverse_index @reverse_index end |
Instance Method Details
#activate(id) ⇒ Object
104 105 106 107 108 109 110 111 112 113 114 |
# File 'lib/upkeep/subscriptions/store.rb', line 104 def activate(id) return true if active_registry.fetch(id) subscription = pending_registry.fetch(id) return false unless subscription entries = @pending_index_entries.delete(id) active_registry.register(subscription, entries: entries) pending_registry.unregister(id) true end |
#drain ⇒ Object
116 117 118 |
# File 'lib/upkeep/subscriptions/store.rb', line 116 def drain true end |
#fetch(id) ⇒ Object
124 125 126 |
# File 'lib/upkeep/subscriptions/store.rb', line 124 def fetch(id) active_registry.fetch(id) || pending_registry.fetch(id) || raise(NotFound, id) end |
#prune_stale!(older_than:) ⇒ Object
86 87 88 89 90 91 92 93 94 |
# File 'lib/upkeep/subscriptions/store.rb', line 86 def prune_stale!(older_than:) stale_ids = subscriptions.filter_map do |subscription| id = subscription.id id if last_seen_at(subscription) && last_seen_at(subscription) < older_than end unregister(stale_ids) stale_ids.size end |
#register(subscriber_id:, recorder:, metadata: {}, entries: nil) ⇒ Object
63 64 65 66 67 68 69 70 71 72 73 74 75 76 |
# File 'lib/upkeep/subscriptions/store.rb', line 63 def register(subscriber_id:, recorder:, metadata: {}, entries: nil) recorder.flush_pending_dependencies if recorder.respond_to?(:flush_pending_dependencies) subscription = Subscription.new( next_subscription_id, subscriber_id, recorder, recorder.graph, ) pending_registry.register(subscription, entries: entries) @pending_index_entries[subscription.id] = entries if entries subscription end |
#reset ⇒ Object
132 133 134 135 136 137 138 |
# File 'lib/upkeep/subscriptions/store.rb', line 132 def reset @active_registry = ActiveRegistry.new @pending_registry = ActiveRegistry.new @pending_index_entries = {} @reverse_index = active_registry @next_id = 0 end |
#shutdown ⇒ Object
120 121 122 |
# File 'lib/upkeep/subscriptions/store.rb', line 120 def shutdown true end |
#subscriptions ⇒ Object
128 129 130 |
# File 'lib/upkeep/subscriptions/store.rb', line 128 def subscriptions active_registry.subscriptions + pending_registry.subscriptions end |
#summary ⇒ Object
140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 |
# File 'lib/upkeep/subscriptions/store.rb', line 140 def summary active = active_registry.summary pending = pending_registry.summary { subscriptions: subscriptions.size, pending_subscriptions: pending_registry.count, active_subscriptions: active_registry.count, deferred_index_subscriptions: 0, reverse_index: active.merge( mode: :active, active: active, pending: pending, persistent: { lookup_keys: 0, entries: 0 } ) } end |
#touch(id, now: Time.now) ⇒ Object
78 79 80 81 82 83 84 |
# File 'lib/upkeep/subscriptions/store.rb', line 78 def touch(id, now: Time.now) fetch(id) = { "last_seen_at" => now.utc.iso8601 } pending_registry.touch(id, metadata: ) active_registry.touch(id, metadata: ) true end |
#unregister(ids) ⇒ Object
96 97 98 99 100 101 102 |
# File 'lib/upkeep/subscriptions/store.rb', line 96 def unregister(ids) ids = Array(ids) ids.each { |id| @pending_index_entries.delete(id) } pending_registry.unregister(ids) active_registry.unregister(ids) ids.size end |