Class: Ration::Hub

Inherits:
Object
  • Object
show all
Defined in:
lib/ration/hub.rb

Instance Method Summary collapse

Constructor Details

#initialize(backend:, logger:) ⇒ Hub

Returns a new instance of Hub.



5
6
7
8
9
10
11
# File 'lib/ration/hub.rb', line 5

# Builds a hub in the not-started state with an empty subscription registry.
#
# @param backend [Object] collaborator that receives +publish(event)+ and
#   +stop+ calls from this hub
# @param logger [Object] handed to every Subscription created by the hub
def initialize(backend:, logger:)
  # Collaborators supplied by the caller.
  @backend = backend
  @logger  = logger

  # Lifecycle state; +stop+ reads and resets +@started+ while holding
  # +@start_mutex+.
  @started     = false
  @start_mutex = Mutex.new

  # Registry of live subscriptions, keyed by subscription id.
  @subscriptions = Concurrent::Map.new
end

Instance Method Details

#publish(event) ⇒ Object



13
14
15
16
# File 'lib/ration/hub.rb', line 13

# Hands +event+ to the backend, calling +ensure_started+ first.
#
# @param event [Object] passed unchanged to +@backend.publish+
# @return [Object] whatever +@backend.publish+ returns
def publish(event)
  # NOTE(review): ensure_started is defined outside this view; presumably it
  # starts the backend on first use -- confirm.
  ensure_started
  @backend.publish(event)
end

#stop ⇒ Object



43
44
45
46
47
48
49
50
# File 'lib/ration/hub.rb', line 43

# Closes every registered subscription, empties the registry and, when the
# hub is marked as started, stops the backend. All of this runs while
# holding +@start_mutex+.
#
# @return [false] the value of the final +@started = false+ assignment
def stop
  @start_mutex.synchronize do
    @subscriptions.each_value { |subscription| subscription.close }
    @subscriptions.clear

    # The backend is only stopped when the hub is flagged as started.
    if @started
      @backend.stop
    end

    @started = false
  end
end

#subscribe(max:, filter: nil, on_overflow: :close) ⇒ Object



18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
# File 'lib/ration/hub.rb', line 18

# Creates a Subscription, registers it under its id and either returns it or
# lends it to the given block.
#
# Without a block the subscription is returned and stays registered until
# the caller unsubscribes it. With a block the subscription is yielded and
# then passed to +unsubscribe+ when the block exits, whether normally or by
# raising.
#
# @param max [Object] forwarded to +Subscription.new+ as +max:+
#   (presumably a buffer capacity -- confirm against Subscription)
# @param filter [Object, nil] forwarded to +Subscription.new+ as +filter:+
# @param on_overflow [Symbol] forwarded to +Subscription.new+ as
#   +on_overflow:+; defaults to +:close+
# @yieldparam subscription [Subscription] the newly registered subscription
# @return [Object] the subscription when no block is given, otherwise the
#   block's result
def subscribe(max:, filter: nil, on_overflow: :close)
  ensure_started

  subscription = Subscription.new(max: max, filter: filter, on_overflow: on_overflow, logger: @logger)
  @subscriptions[subscription.id] = subscription

  if block_given?
    begin
      yield subscription
    ensure
      # Always deregister a block-scoped subscription, even if the block raised.
      unsubscribe(subscription)
    end
  else
    subscription
  end
end

#subscription_count ⇒ Object



52
53
54
# File 'lib/ration/hub.rb', line 52

# Reports how many subscriptions are currently held in the registry.
#
# @return [Object] the value of +@subscriptions.size+
def subscription_count
  @subscriptions.size
end

#unsubscribe(sub) ⇒ Object



38
39
40
41
# File 'lib/ration/hub.rb', line 38

# Removes +sub+ from the registry (looked up by its id) and then closes it.
# The subscription is closed even when its id is not in the registry.
#
# @param sub [#id, #close] the subscription to drop
# @return [Object] whatever +sub.close+ returns
def unsubscribe(sub)
  subscription_id = sub.id
  @subscriptions.delete(subscription_id)

  sub.close
end