Class: Upkeep::Subscriptions::Store

Inherits:
Object
  • Object
show all
Defined in:
lib/upkeep/subscriptions/store.rb

Instance Attribute Summary collapse

Instance Method Summary collapse

Constructor Details

#initialize(reverse_index: ReverseIndex.new) ⇒ Store

Returns a new instance of Store.



52
53
54
55
56
# File 'lib/upkeep/subscriptions/store.rb', line 52

# Builds an empty store with no subscriptions and the id counter at zero.
#
# @param reverse_index [ReverseIndex] index this store writes to when
#   subscriptions are registered or removed; a fresh one is built by default
def initialize(reverse_index: ReverseIndex.new)
  @subscriptions = {}
  @reverse_index = reverse_index
  @next_id = 0
end

Instance Attribute Details

#reverse_index ⇒ Object (readonly)

Returns the value of attribute reverse_index.



50
51
52
# File 'lib/upkeep/subscriptions/store.rb', line 50

# Reader for the reverse index this store is currently using.
#
# @return [Object] the value held in @reverse_index
def reverse_index
  @reverse_index
end

Instance Method Details

#activate(id) ⇒ Object



108
109
110
111
# File 'lib/upkeep/subscriptions/store.rb', line 108

# Confirms that a subscription is stored under +id+.
#
# @param id [Object] subscription id to look up
# @return [true] always, when the id is known
# @raise [KeyError] when no subscription is stored under +id+
def activate(id)
  # The fetched value is discarded; fetch is used only for its KeyError.
  @subscriptions.fetch(id)
  true
end

#drain ⇒ Object



113
114
115
# File 'lib/upkeep/subscriptions/store.rb', line 113

# Does no work in this store and reports success.
#
# @return [true]
def drain
  true
end

#fetch(id) ⇒ Object



121
122
123
# File 'lib/upkeep/subscriptions/store.rb', line 121

# Looks up the subscription stored under +id+.
#
# @param id [Object] subscription id
# @return [Object] the stored subscription
# @raise [KeyError] when no subscription is stored under +id+
def fetch(id)
  @subscriptions.fetch(id)
end

#prune_stale!(older_than:) ⇒ Object



89
90
91
92
93
94
95
96
# File 'lib/upkeep/subscriptions/store.rb', line 89

# Unregisters every subscription whose last-seen value is earlier than
# +older_than+. Subscriptions for which +last_seen_at+ returns nil (or false)
# are kept.
#
# @param older_than [Object] cutoff compared with +<+ against each
#   subscription's last-seen value
# @return [Integer] number of subscription ids selected as stale
def prune_stale!(older_than:)
  stale_ids = @subscriptions.filter_map do |id, subscription|
    # Look the timestamp up once per subscription instead of twice.
    seen_at = last_seen_at(subscription)
    id if seen_at && seen_at < older_than
  end

  unregister(stale_ids)
  stale_ids.size
end

#register(subscriber_id:, recorder:, metadata: {}, entries: nil) ⇒ Object



58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
# File 'lib/upkeep/subscriptions/store.rb', line 58

# Creates a subscription for +subscriber_id+, stores it, and adds it to the
# reverse index.
#
# @param subscriber_id [Object] identifier of the subscriber
# @param recorder [#graph] recorder whose graph is captured on the
#   subscription; when it responds to +flush_pending_dependencies+ that is
#   called before the graph is read
# @param metadata [Hash] caller-supplied metadata stored on the subscription
#   (assumes Subscription takes metadata as its fifth argument, as the
#   five-argument construction in +touch+ suggests — confirm)
# @param entries [Object, nil] when given, only these entries are indexed via
#   +index_entries+; otherwise the whole subscription is indexed
# @return [Subscription] the subscription built by this call
def register(subscriber_id:, recorder:, metadata: {}, entries: nil)
  recorder.flush_pending_dependencies if recorder.respond_to?(:flush_pending_dependencies)
  subscription = Subscription.new(
    next_subscription_id,
    subscriber_id,
    recorder,
    recorder.graph,
    metadata # previously omitted, which silently dropped caller metadata
  )

  @subscriptions[subscription.id] = subscription
  # NOTE(review): touch stores a replacement Subscription under the same id,
  # so the instance indexed and returned below is the one built above, not
  # the stored one — confirm this is intended.
  touch(subscription.id)
  if entries
    reverse_index.index_entries(entries, subscription: subscription)
  else
    reverse_index.index(subscription)
  end
  subscription
end

#reset ⇒ Object



129
130
131
132
133
# File 'lib/upkeep/subscriptions/store.rb', line 129

# Returns the store to its initial state: a brand-new ReverseIndex, no
# subscriptions, and the id counter back at zero. Any reverse index that was
# passed to the constructor is replaced as well.
#
# @return [Integer] 0 (the value of the final assignment)
def reset
  @reverse_index = ReverseIndex.new
  @subscriptions = {}
  @next_id = 0
end

#shutdown ⇒ Object



117
118
119
# File 'lib/upkeep/subscriptions/store.rb', line 117

# Does no work in this store and reports success.
#
# @return [true]
def shutdown
  true
end

#subscriptions ⇒ Object



125
126
127
# File 'lib/upkeep/subscriptions/store.rb', line 125

# Lists every stored subscription, in insertion order.
#
# @return [Array] a new array; mutating it does not affect the store
def subscriptions
  @subscriptions.each_value.to_a
end

#summary ⇒ Object



135
136
137
138
139
140
# File 'lib/upkeep/subscriptions/store.rb', line 135

# Reports the size of the store alongside the reverse index's own summary.
#
# @return [Hash] +:subscriptions+ holds the number of stored subscriptions,
#   +:reverse_index+ holds whatever +reverse_index.summary+ returns
def summary
  subscription_count = subscriptions.size
  index_summary = reverse_index.summary
  { subscriptions: subscription_count, reverse_index: index_summary }
end

#touch(id, now: Time.now) ⇒ Object



78
79
80
81
82
83
84
85
86
87
# File 'lib/upkeep/subscriptions/store.rb', line 78

# Records that the subscription stored under +id+ was seen at +now+ by
# replacing it with a copy whose metadata carries an ISO 8601 UTC timestamp
# under the "last_seen_at" key.
#
# @param id [Object] subscription id
# @param now [Time] moment to record; it is not modified
# @return [Subscription] the replacement subscription now stored under +id+
# @raise [KeyError] when no subscription is stored under +id+
def touch(id, now: Time.now)
  subscription = @subscriptions.fetch(id)
  @subscriptions[id] = Subscription.new(
    subscription.id,
    subscription.subscriber_id,
    subscription.recorder,
    subscription.graph,
    # `|| {}` tolerates subscriptions built without metadata. `getutc` is used
    # instead of `utc` because `utc` converts the caller's Time in place.
    (subscription.metadata || {}).merge("last_seen_at" => now.getutc.iso8601)
  )
end

#unregister(ids) ⇒ Object



98
99
100
101
102
103
104
105
106
# File 'lib/upkeep/subscriptions/store.rb', line 98

# Removes the given subscriptions from the store and from the reverse index.
# Ids that are not currently stored are skipped without touching the index.
#
# @param ids [Object, Array, nil] one id or a list of ids (coerced with Array())
# @return [Integer] number of ids supplied, including any that were not stored
def unregister(ids)
  requested = Array(ids)
  requested.each do |id|
    removed = @subscriptions.delete(id)
    reverse_index.delete_subscription(id) if removed
  end
  requested.size
end