Class: Upkeep::Subscriptions::Store
- Inherits:
-
Object
- Object
- Upkeep::Subscriptions::Store
- Defined in:
- lib/upkeep/subscriptions/store.rb
Instance Attribute Summary collapse
-
#reverse_index ⇒ Object
readonly
Returns the value of attribute reverse_index.
Instance Method Summary collapse
- #activate(id) ⇒ Object
- #drain ⇒ Object
- #fetch(id) ⇒ Object
-
#initialize(reverse_index: ReverseIndex.new) ⇒ Store
constructor
A new instance of Store.
- #prune_stale!(older_than:) ⇒ Object
- #register(subscriber_id:, recorder:, metadata: {}, entries: nil) ⇒ Object
- #reset ⇒ Object
- #shutdown ⇒ Object
- #subscriptions ⇒ Object
- #summary ⇒ Object
- #touch(id, now: Time.now) ⇒ Object
- #unregister(ids) ⇒ Object
Constructor Details
#initialize(reverse_index: ReverseIndex.new) ⇒ Store
Returns a new instance of Store.
54 55 56 57 58 |
# File 'lib/upkeep/subscriptions/store.rb', line 54 def initialize(reverse_index: ReverseIndex.new) @reverse_index = reverse_index @subscriptions = {} @next_id = 0 end |
Instance Attribute Details
#reverse_index ⇒ Object (readonly)
Returns the value of attribute reverse_index.
52 53 54 |
# File 'lib/upkeep/subscriptions/store.rb', line 52 def reverse_index @reverse_index end |
Instance Method Details
#activate(id) ⇒ Object
110 111 112 113 |
# File 'lib/upkeep/subscriptions/store.rb', line 110 def activate(id) @subscriptions.fetch(id) true end |
#drain ⇒ Object
115 116 117 |
# File 'lib/upkeep/subscriptions/store.rb', line 115 def drain true end |
#fetch(id) ⇒ Object
123 124 125 126 127 |
# File 'lib/upkeep/subscriptions/store.rb', line 123 def fetch(id) @subscriptions.fetch(id) rescue KeyError raise NotFound, id end |
#prune_stale!(older_than:) ⇒ Object
91 92 93 94 95 96 97 98 |
# File 'lib/upkeep/subscriptions/store.rb', line 91 def prune_stale!(older_than:) stale_ids = @subscriptions.filter_map do |id, subscription| id if last_seen_at(subscription) && last_seen_at(subscription) < older_than end unregister(stale_ids) stale_ids.size end |
#register(subscriber_id:, recorder:, metadata: {}, entries: nil) ⇒ Object
60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 |
# File 'lib/upkeep/subscriptions/store.rb', line 60 def register(subscriber_id:, recorder:, metadata: {}, entries: nil) recorder.flush_pending_dependencies if recorder.respond_to?(:flush_pending_dependencies) subscription = Subscription.new( next_subscription_id, subscriber_id, recorder, recorder.graph, ) @subscriptions[subscription.id] = subscription touch(subscription.id) if entries reverse_index.index_entries(entries, subscription: subscription) else reverse_index.index(subscription) end subscription end |
#reset ⇒ Object
133 134 135 136 137 |
# File 'lib/upkeep/subscriptions/store.rb', line 133 def reset @subscriptions = {} @reverse_index = ReverseIndex.new @next_id = 0 end |
#shutdown ⇒ Object
119 120 121 |
# File 'lib/upkeep/subscriptions/store.rb', line 119 def shutdown true end |
#subscriptions ⇒ Object
129 130 131 |
# File 'lib/upkeep/subscriptions/store.rb', line 129 def subscriptions @subscriptions.values end |
#summary ⇒ Object
139 140 141 142 143 144 |
# File 'lib/upkeep/subscriptions/store.rb', line 139 def summary { subscriptions: subscriptions.size, reverse_index: reverse_index.summary } end |
#touch(id, now: Time.now) ⇒ Object
80 81 82 83 84 85 86 87 88 89 |
# File 'lib/upkeep/subscriptions/store.rb', line 80 def touch(id, now: Time.now) subscription = @subscriptions.fetch(id) @subscriptions[id] = Subscription.new( subscription.id, subscription.subscriber_id, subscription.recorder, subscription.graph, subscription..merge("last_seen_at" => now.utc.iso8601) ) end |
#unregister(ids) ⇒ Object
100 101 102 103 104 105 106 107 108 |
# File 'lib/upkeep/subscriptions/store.rb', line 100 def unregister(ids) ids = Array(ids) ids.each do |id| next unless @subscriptions.delete(id) reverse_index.delete_subscription(id) end ids.size end |