Class: Upkeep::Subscriptions::Store

Inherits:
Object
  • Object
show all
Defined in:
lib/upkeep/subscriptions/store.rb

Instance Attribute Summary collapse

Instance Method Summary collapse

Constructor Details

#initialize(reverse_index: ReverseIndex.new) ⇒ Store

Returns a new instance of Store.



55
56
57
58
59
60
61
# File 'lib/upkeep/subscriptions/store.rb', line 55

# Builds a store with an active registry, a pending registry and empty
# pending-entry bookkeeping.
#
# @param reverse_index [ReverseIndex] index handed to the active registry
def initialize(reverse_index: ReverseIndex.new)
  # Plain bookkeeping first: entries held per pending subscription id.
  @pending_index_entries = {}

  # Only the active registry receives the injected index.
  @active_registry = ActiveRegistry.new(reverse_index: reverse_index)
  @pending_registry = ActiveRegistry.new

  # The public reverse_index reader exposes whatever active_registry returns.
  @reverse_index = active_registry

  # Counter starts at zero for every new store.
  @next_id = 0
end

Instance Attribute Details

#reverse_index ⇒ Object (readonly)

Returns the value of attribute reverse_index.



53
54
55
# File 'lib/upkeep/subscriptions/store.rb', line 53

# Reader for the +@reverse_index+ instance variable.
#
# @return [Object] the current value of +@reverse_index+
def reverse_index
  @reverse_index
end

Instance Method Details

#activate(id) ⇒ Object



104
105
106
107
108
109
110
111
112
113
114
# File 'lib/upkeep/subscriptions/store.rb', line 104

# Promotes a pending subscription into the active registry.
#
# @param id [Object] subscription identifier
# @return [Boolean] +true+ when the id is already active or was just promoted,
#   +false+ when neither registry knows the id
def activate(id)
  already_active = active_registry.fetch(id)
  return true if already_active

  if (pending = pending_registry.fetch(id))
    # Hand over any index entries stored for this id at registration time.
    stored_entries = @pending_index_entries.delete(id)
    active_registry.register(pending, entries: stored_entries)
    pending_registry.unregister(id)
    true
  else
    false
  end
end

#drain ⇒ Object



116
117
118
# File 'lib/upkeep/subscriptions/store.rb', line 116

# Always returns +true+; the body does no other work.
#
# NOTE(review): presumably kept so this store answers the same lifecycle
# calls as other stores — confirm against callers.
#
# @return [true]
def drain
  true
end

#fetch(id) ⇒ Object



124
125
126
# File 'lib/upkeep/subscriptions/store.rb', line 124

# Looks a subscription up in the active registry first, then the pending one.
#
# @param id [Object] subscription identifier
# @return [Object] the first truthy result from either registry
# @raise [NotFound] when neither registry returns a truthy value for +id+
def fetch(id)
  found = active_registry.fetch(id)
  found ||= pending_registry.fetch(id)
  return found if found

  raise NotFound, id
end

#prune_stale!(older_than:) ⇒ Object



86
87
88
89
90
91
92
93
94
# File 'lib/upkeep/subscriptions/store.rb', line 86

# Unregisters every subscription whose last-seen value is older than the cutoff.
#
# Subscriptions for which last_seen_at returns nil/false are never treated
# as stale.
#
# @param older_than [Comparable] cutoff; a subscription is stale when its
#   last-seen value is strictly less than this
# @return [Integer] number of stale subscription ids passed to #unregister
def prune_stale!(older_than:)
  stale_ids = subscriptions.filter_map do |subscription|
    # Look the value up once per subscription instead of twice.
    seen_at = last_seen_at(subscription)
    subscription.id if seen_at && seen_at < older_than
  end

  unregister(stale_ids)
  stale_ids.size
end

#register(subscriber_id:, recorder:, metadata: {}, entries: nil) ⇒ Object



63
64
65
66
67
68
69
70
71
72
73
74
75
76
# File 'lib/upkeep/subscriptions/store.rb', line 63

# Creates a subscription and records it in the pending registry.
#
# @param subscriber_id [Object] identifier of the subscriber
# @param recorder [#graph] recorder whose graph is stored on the subscription;
#   if it responds to +flush_pending_dependencies+, that is called first
# @param metadata [Hash] caller-supplied metadata stored on the subscription
# @param entries [Object, nil] optional index entries; when given they are
#   also remembered under the new subscription id in @pending_index_entries
# @return [Subscription] the newly created subscription
def register(subscriber_id:, recorder:, metadata: {}, entries: nil)
  recorder.flush_pending_dependencies if recorder.respond_to?(:flush_pending_dependencies)

  # The metadata argument had been dropped from this call, leaving the
  # keyword unused.
  # NOTE(review): assumes Subscription.new takes metadata as its fifth
  # positional argument — confirm against the Subscription definition.
  subscription = Subscription.new(
    next_subscription_id,
    subscriber_id,
    recorder,
    recorder.graph,
    metadata
  )

  pending_registry.register(subscription, entries: entries)
  @pending_index_entries[subscription.id] = entries if entries
  subscription
end

#reset ⇒ Object



132
133
134
135
136
137
138
# File 'lib/upkeep/subscriptions/store.rb', line 132

# Replaces both registries with new ActiveRegistry instances and clears the
# pending-entry bookkeeping and the id counter.
#
# @return [Integer] 0, the value assigned to the id counter
def reset
  # Drop entries remembered for pending subscriptions.
  @pending_index_entries = {}

  # Both registries are built without arguments here.
  @active_registry = ActiveRegistry.new
  @pending_registry = ActiveRegistry.new

  # Keep the public reverse_index reader pointing at the new active registry.
  @reverse_index = active_registry

  @next_id = 0
end

#shutdown ⇒ Object



120
121
122
# File 'lib/upkeep/subscriptions/store.rb', line 120

# Always returns +true+; the body does no other work.
#
# NOTE(review): presumably kept so this store answers the same lifecycle
# calls as other stores — confirm against callers.
#
# @return [true]
def shutdown
  true
end

#subscriptions ⇒ Object



128
129
130
# File 'lib/upkeep/subscriptions/store.rb', line 128

# Lists every known subscription: active ones first, then pending ones.
#
# @return [Object] result of combining both registries' subscriptions with +
def subscriptions
  live = active_registry.subscriptions
  waiting = pending_registry.subscriptions
  live + waiting
end

#summary ⇒ Object



140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
# File 'lib/upkeep/subscriptions/store.rb', line 140

# Reports counts for the store together with a reverse-index breakdown.
#
# The :reverse_index value is the active registry's summary with :mode,
# :active, :pending and :persistent merged on top of it.
#
# @return [Hash] keys :subscriptions, :pending_subscriptions,
#   :active_subscriptions, :deferred_index_subscriptions and :reverse_index
def summary
  active_stats = active_registry.summary
  pending_stats = pending_registry.summary

  # Overlay merged onto the active summary; :persistent is reported as zeros.
  index_overlay = {
    mode: :active,
    active: active_stats,
    pending: pending_stats,
    persistent: { lookup_keys: 0, entries: 0 }
  }

  {
    subscriptions: subscriptions.size,
    pending_subscriptions: pending_registry.count,
    active_subscriptions: active_registry.count,
    deferred_index_subscriptions: 0,
    reverse_index: active_stats.merge(index_overlay)
  }
end

#touch(id, now: Time.now) ⇒ Object



78
79
80
81
82
83
84
# File 'lib/upkeep/subscriptions/store.rb', line 78

# Records a new "last_seen_at" value for a subscription in both registries.
#
# fetch(id) runs first, so an unknown id raises before either registry is
# touched.
#
# @param id [Object] subscription identifier
# @param now [Time] moment to record; converted to UTC and formatted with
#   Time#iso8601
# @return [true]
# @raise [NotFound] propagated from #fetch when the id is unknown
def touch(id, now: Time.now)
  fetch(id)

  # The local's name had been lost, leaving an invalid " = {...}" statement
  # and empty metadata: arguments; it is restored and passed explicitly.
  metadata = { "last_seen_at" => now.utc.iso8601 }
  pending_registry.touch(id, metadata: metadata)
  active_registry.touch(id, metadata: metadata)
  true
end

#unregister(ids) ⇒ Object



96
97
98
99
100
101
102
# File 'lib/upkeep/subscriptions/store.rb', line 96

# Removes one or more subscriptions from the pending bookkeeping and from
# both registries.
#
# @param ids [Object, Array<Object>, nil] a single id or a list of ids;
#   coerced with Array()
# @return [Integer] how many ids were supplied after coercion
def unregister(ids)
  targets = Array(ids)

  # Forget any index entries still waiting on these ids.
  targets.each do |target|
    @pending_index_entries.delete(target)
  end

  # Pending registry first, then the active one; both get the full list.
  pending_registry.unregister(targets)
  active_registry.unregister(targets)

  targets.size
end