Class: ZZQ::Subscription

Inherits:
Object
Defined in:
lib/zzq/subscription.rb

Overview

Handle returned by Client#subscribe and Broker#subscribe. A thin wrapper around an Async::LimitedQueue that also owns the UNSUBSCRIBE lifecycle.

Callers consume with #pop or #each; #close sends UNSUBSCRIBE and wakes any blocked consumer with nil.

Created internally; users should never call .new directly.
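The consume-and-close lifecycle described above can be sketched without a running client. The class below is a hypothetical stand-in, not ZZQ::Subscription itself: it assumes the stdlib Thread::SizedQueue in place of Async::LimitedQueue and a thread in place of a fiber, and it is constructed directly only because this is an illustration.

```ruby
# Hypothetical stand-in for illustration only. It mirrors the documented
# pop/each/close behaviour with stdlib Thread::SizedQueue instead of
# Async::LimitedQueue.
class SketchSubscription
  def initialize(hwm:, on_close: nil)
    @queue    = Thread::SizedQueue.new(hwm)
    @on_close = on_close
    @closed   = false
  end

  def deliver(message)
    return if @closed
    @queue.push(message)
  end

  def pop
    return nil if @closed && @queue.empty?
    @queue.pop
  end

  def each
    return enum_for(:each) unless block_given?
    while (msg = pop)
      yield msg
    end
  end

  def close
    return if @closed
    @closed = true
    @on_close&.call(self)  # the real class would send UNSUBSCRIBE here
    @queue.push(nil)       # sentinel that wakes a blocked consumer
  end
end

unsubscribes = 0
sub = SketchSubscription.new(hwm: 8, on_close: ->(_s) { unsubscribes += 1 })

# The consumer blocks in pop until messages arrive or the handle is closed.
consumer = Thread.new { sub.each.to_a }

sub.deliver("temp=21")
sub.deliver("temp=22")
sub.close
sub.close  # second call is a no-op, so on_close runs only once

received = consumer.value
puts received.inspect
puts unsubscribes
```

Messages buffered before close are still handed to the consumer; only then does the loop end.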

Constructor Details

#initialize(filters:, hwm:, subscription_id: nil, on_close: nil) ⇒ Subscription

Returns a new instance of Subscription.



# File 'lib/zzq/subscription.rb', line 23

def initialize(filters:, hwm:, subscription_id: nil, on_close: nil)
  @filters         = Array(filters).freeze
  @subscription_id = subscription_id
  @queue           = Async::LimitedQueue.new(hwm)
  @on_close        = on_close
  @closed          = false
end
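The constructor normalizes filters with Array(...).freeze. A short sketch of what that idiom does in plain Ruby, using made-up topic strings:

```ruby
# Kernel#Array wraps a scalar, passes an Array through, and maps nil to [].
single = Array("sensors/+/temp").freeze
none   = Array(nil).freeze

# For an Array argument, Kernel#Array returns the very same object,
# so freezing the result also freezes the caller's array.
callers_list = ["a/#", "b/#"]
many = Array(callers_list).freeze

p single
p none
p many.equal?(callers_list)

begin
  single << "extra"
rescue FrozenError => e
  puts "rejected: #{e.class}"
end
```

One consequence worth knowing: a caller who passes an Array and later tries to mutate it will hit FrozenError, because no copy is made.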

Instance Attribute Details

#filters ⇒ Array<String> (readonly)

Returns topic filters this subscription matches.

Returns:

  • (Array<String>)

    topic filters this subscription matches.



# File 'lib/zzq/subscription.rb', line 17

def filters
  @filters
end

#subscription_id ⇒ Integer? (readonly)

Returns subscription identifier (v5) or nil (v3).

Returns:

  • (Integer, nil)

    subscription identifier (v5) or nil (v3).



# File 'lib/zzq/subscription.rb', line 20

def subscription_id
  @subscription_id
end

Instance Method Details

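#close ⇒ Object

Sends UNSUBSCRIBE (through the on_close callback, when one was supplied) and wakes a blocked consumer by enqueuing nil. Idempotent: calls after the first return immediately.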



# File 'lib/zzq/subscription.rb', line 50

def close
  return if @closed
  @closed = true
  @on_close&.call(self)
  @queue.enqueue(nil)
end

#closed? ⇒ Boolean

Returns true once #close has been called.

Returns:

  • (Boolean)


# File 'lib/zzq/subscription.rb', line 58

def closed?
  @closed
end

#deliver(message) ⇒ Object

Internal: deliver a message to the subscriber’s queue. Blocks the caller while the queue is at its high-water mark (hwm). Messages delivered after #close are silently dropped.



# File 'lib/zzq/subscription.rb', line 65

def deliver(message)
  return if @closed
  @queue.enqueue(message)
end
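The backpressure that #deliver relies on can be observed with a bounded queue from the standard library. This sketch assumes Thread::SizedQueue as a stand-in for Async::LimitedQueue; the limit of 2 and the message symbols are arbitrary.

```ruby
hwm   = 2
queue = Thread::SizedQueue.new(hwm)  # stdlib stand-in for Async::LimitedQueue

queue.push(:m1)
queue.push(:m2)

# The queue is now at its high-water mark. A blocking push would suspend
# the producer, so probe with non_block = true to observe that instead.
full = begin
  queue.push(:m3, true)
  false
rescue ThreadError
  true
end

queue.pop              # the consumer takes one message ...
queue.push(:m3, true)  # ... and the producer can proceed again

puts full
puts queue.size
```

A slow consumer therefore slows the delivering side down rather than letting the buffer grow without bound.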

#each ⇒ Object

Yields each message until the subscription is closed. Returns an Enumerator when called without a block.



# File 'lib/zzq/subscription.rb', line 41

def each
  return enum_for(:each) unless block_given?
  while (msg = pop)
    yield msg
  end
end
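The enum_for guard and the nil-terminated loop in #each are plain Ruby idioms. The sketch below reproduces them over a hypothetical array-backed source (NilTerminated is not part of ZZQ) to show that the returned Enumerator consumes messages destructively.

```ruby
# Hypothetical source whose pop returns nil once it is exhausted.
class NilTerminated
  def initialize(items)
    @items = items.dup
  end

  def pop
    @items.shift
  end

  def each
    return enum_for(:each) unless block_given?
    while (msg = pop)
      yield msg
    end
  end
end

enum = NilTerminated.new(%w[a b c]).each

first_two = enum.first(2)  # pops only as many messages as requested
rest      = enum.to_a      # continues from where the source was left

p first_two
p rest
```

Because the loop condition tests truthiness, a message that is literally false would end iteration just as nil does.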

#pop ⇒ Object

Blocks the calling fiber until a message arrives. Messages still buffered when the subscription is closed are returned first; after that, returns nil.



# File 'lib/zzq/subscription.rb', line 34

def pop
  return nil if @closed && @queue.empty?
  @queue.dequeue
end