Class: Upkeep::Subscriptions::Store
- Inherits:
-
Object
- Object
- Upkeep::Subscriptions::Store
- Defined in:
- lib/upkeep/subscriptions/store.rb
Instance Attribute Summary collapse
-
#reverse_index ⇒ Object
readonly
Returns the value of attribute reverse_index.
Instance Method Summary collapse
- #activate(id) ⇒ Object
- #drain ⇒ Object
- #fetch(id) ⇒ Object
-
#initialize(reverse_index: ReverseIndex.new) ⇒ Store
constructor
A new instance of Store.
- #prune_stale!(older_than:) ⇒ Object
- #register(subscriber_id:, recorder:, metadata: {}, entries: nil) ⇒ Object
- #reset ⇒ Object
- #shutdown ⇒ Object
- #subscriptions ⇒ Object
- #summary ⇒ Object
- #touch(id, now: Time.now) ⇒ Object
- #unregister(ids) ⇒ Object
Constructor Details
#initialize(reverse_index: ReverseIndex.new) ⇒ Store
Returns a new instance of Store.
52 53 54 55 56 |
# File 'lib/upkeep/subscriptions/store.rb', line 52 def initialize(reverse_index: ReverseIndex.new) @reverse_index = reverse_index @subscriptions = {} @next_id = 0 end |
Instance Attribute Details
#reverse_index ⇒ Object (readonly)
Returns the value of attribute reverse_index.
50 51 52 |
# File 'lib/upkeep/subscriptions/store.rb', line 50 def reverse_index @reverse_index end |
Instance Method Details
#activate(id) ⇒ Object
108 109 110 111 |
# File 'lib/upkeep/subscriptions/store.rb', line 108 def activate(id) @subscriptions.fetch(id) true end |
#drain ⇒ Object
113 114 115 |
# File 'lib/upkeep/subscriptions/store.rb', line 113 def drain true end |
#fetch(id) ⇒ Object
121 122 123 |
# File 'lib/upkeep/subscriptions/store.rb', line 121 def fetch(id) @subscriptions.fetch(id) end |
#prune_stale!(older_than:) ⇒ Object
89 90 91 92 93 94 95 96 |
# File 'lib/upkeep/subscriptions/store.rb', line 89 def prune_stale!(older_than:) stale_ids = @subscriptions.filter_map do |id, subscription| id if last_seen_at(subscription) && last_seen_at(subscription) < older_than end unregister(stale_ids) stale_ids.size end |
#register(subscriber_id:, recorder:, metadata: {}, entries: nil) ⇒ Object
58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 |
# File 'lib/upkeep/subscriptions/store.rb', line 58 def register(subscriber_id:, recorder:, metadata: {}, entries: nil) recorder.flush_pending_dependencies if recorder.respond_to?(:flush_pending_dependencies) subscription = Subscription.new( next_subscription_id, subscriber_id, recorder, recorder.graph, ) @subscriptions[subscription.id] = subscription touch(subscription.id) if entries reverse_index.index_entries(entries, subscription: subscription) else reverse_index.index(subscription) end subscription end |
#reset ⇒ Object
129 130 131 132 133 |
# File 'lib/upkeep/subscriptions/store.rb', line 129 def reset @subscriptions = {} @reverse_index = ReverseIndex.new @next_id = 0 end |
#shutdown ⇒ Object
117 118 119 |
# File 'lib/upkeep/subscriptions/store.rb', line 117 def shutdown true end |
#subscriptions ⇒ Object
125 126 127 |
# File 'lib/upkeep/subscriptions/store.rb', line 125 def subscriptions @subscriptions.values end |
#summary ⇒ Object
135 136 137 138 139 140 |
# File 'lib/upkeep/subscriptions/store.rb', line 135 def summary { subscriptions: subscriptions.size, reverse_index: reverse_index.summary } end |
#touch(id, now: Time.now) ⇒ Object
78 79 80 81 82 83 84 85 86 87 |
# File 'lib/upkeep/subscriptions/store.rb', line 78 def touch(id, now: Time.now) subscription = @subscriptions.fetch(id) @subscriptions[id] = Subscription.new( subscription.id, subscription.subscriber_id, subscription.recorder, subscription.graph, subscription..merge("last_seen_at" => now.utc.iso8601) ) end |
#unregister(ids) ⇒ Object
98 99 100 101 102 103 104 105 106 |
# File 'lib/upkeep/subscriptions/store.rb', line 98 def unregister(ids) ids = Array(ids) ids.each do |id| next unless @subscriptions.delete(id) reverse_index.delete_subscription(id) end ids.size end |