Class: Upkeep::Subscriptions::Store

Inherits:
Object
  • Object
show all
Defined in:
lib/upkeep/subscriptions/store.rb

Instance Attribute Summary collapse

Instance Method Summary collapse

Constructor Details

#initialize(reverse_index: ReverseIndex.new) ⇒ Store

Returns a new instance of Store.



54
55
56
57
58
# File 'lib/upkeep/subscriptions/store.rb', line 54

def initialize(reverse_index: ReverseIndex.new)
  @reverse_index = reverse_index
  @subscriptions = {}
  @next_id = 0
end

Instance Attribute Details

#reverse_indexObject (readonly)

Returns the value of attribute reverse_index.



52
53
54
# File 'lib/upkeep/subscriptions/store.rb', line 52

def reverse_index
  @reverse_index
end

Instance Method Details

#activate(id) ⇒ Object



110
111
112
113
# File 'lib/upkeep/subscriptions/store.rb', line 110

def activate(id)
  @subscriptions.fetch(id)
  true
end

#drainObject



115
116
117
# File 'lib/upkeep/subscriptions/store.rb', line 115

def drain
  true
end

#fetch(id) ⇒ Object



123
124
125
126
127
# File 'lib/upkeep/subscriptions/store.rb', line 123

def fetch(id)
  @subscriptions.fetch(id)
rescue KeyError
  raise NotFound, id
end

#prune_stale!(older_than:) ⇒ Object



91
92
93
94
95
96
97
98
# File 'lib/upkeep/subscriptions/store.rb', line 91

def prune_stale!(older_than:)
  stale_ids = @subscriptions.filter_map do |id, subscription|
    id if last_seen_at(subscription) && last_seen_at(subscription) < older_than
  end

  unregister(stale_ids)
  stale_ids.size
end

#register(subscriber_id:, recorder:, metadata: {}, entries: nil) ⇒ Object



60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
# File 'lib/upkeep/subscriptions/store.rb', line 60

def register(subscriber_id:, recorder:, metadata: {}, entries: nil)
  recorder.flush_pending_dependencies if recorder.respond_to?(:flush_pending_dependencies)
  subscription = Subscription.new(
    next_subscription_id,
    subscriber_id,
    recorder,
    recorder.graph,
    
  )

  @subscriptions[subscription.id] = subscription
  touch(subscription.id)
  if entries
    reverse_index.index_entries(entries, subscription: subscription)
  else
    reverse_index.index(subscription)
  end
  subscription
end

#resetObject



133
134
135
136
137
# File 'lib/upkeep/subscriptions/store.rb', line 133

def reset
  @subscriptions = {}
  @reverse_index = ReverseIndex.new
  @next_id = 0
end

#shutdownObject



119
120
121
# File 'lib/upkeep/subscriptions/store.rb', line 119

def shutdown
  true
end

#subscriptionsObject



129
130
131
# File 'lib/upkeep/subscriptions/store.rb', line 129

def subscriptions
  @subscriptions.values
end

#summaryObject



139
140
141
142
143
144
# File 'lib/upkeep/subscriptions/store.rb', line 139

def summary
  {
    subscriptions: subscriptions.size,
    reverse_index: reverse_index.summary
  }
end

#touch(id, now: Time.now) ⇒ Object



80
81
82
83
84
85
86
87
88
89
# File 'lib/upkeep/subscriptions/store.rb', line 80

def touch(id, now: Time.now)
  subscription = @subscriptions.fetch(id)
  @subscriptions[id] = Subscription.new(
    subscription.id,
    subscription.subscriber_id,
    subscription.recorder,
    subscription.graph,
    subscription..merge("last_seen_at" => now.utc.iso8601)
  )
end

#unregister(ids) ⇒ Object



100
101
102
103
104
105
106
107
108
# File 'lib/upkeep/subscriptions/store.rb', line 100

def unregister(ids)
  ids = Array(ids)
  ids.each do |id|
    next unless @subscriptions.delete(id)

    reverse_index.delete_subscription(id)
  end
  ids.size
end