Class: ZZQ::Subscription
- Inherits: Object
- Defined in: lib/zzq/subscription.rb
Overview
Handle returned by Client#subscribe and Broker#subscribe. A thin wrapper around an Async::LimitedQueue that also owns the UNSUBSCRIBE lifecycle.
Callers consume with #pop or #each; #close sends UNSUBSCRIBE and wakes any blocked consumer with nil.
Created internally; callers should never invoke .new directly.
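The consume-then-close flow described above can be sketched with a small stand-in class. This is an illustration under assumptions, not the library's implementation: `SketchSubscription` is a hypothetical name, and it uses the stdlib `Thread::SizedQueue` and a thread in place of `Async::LimitedQueue` and a fiber so that it runs without the async gem.

```ruby
# Hypothetical stand-in mirroring the documented methods.
# Assumption: Thread::SizedQueue replaces Async::LimitedQueue.
class SketchSubscription
  def initialize(hwm:)
    @queue  = Thread::SizedQueue.new(hwm)
    @closed = false
  end

  # Producer side: dropped silently once the handle is closed.
  def deliver(message)
    return if @closed
    @queue.push(message)
  end

  # Consumer side: blocks until something is queued.
  def pop
    return nil if @closed && @queue.empty?
    @queue.pop
  end

  def each
    return enum_for(:each) unless block_given?
    while (msg = pop)
      yield msg
    end
  end

  def close
    return if @closed
    @closed = true
    @queue.push(nil) # sentinel that wakes a blocked consumer
  end
end

sub = SketchSubscription.new(hwm: 4)
received = []

# The consumer loops until #pop returns nil.
consumer = Thread.new { sub.each { |msg| received << msg } }

sub.deliver("a")
sub.deliver("b")
sub.close
consumer.join
```

In real use the handle would come from Client#subscribe or Broker#subscribe rather than from a constructor call.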
Instance Attribute Summary
- #filters ⇒ Array<String> (readonly)
  Topic filters this subscription matches.
- #subscription_id ⇒ Integer (readonly)
  Subscription identifier (v5) or nil (v3).
Instance Method Summary
- #close ⇒ Object
  Sends UNSUBSCRIBE and wakes the consumer.
- #closed? ⇒ Boolean
- #deliver(message) ⇒ Object
  Internal: deliver a message to the subscriber’s queue.
- #each ⇒ Object
  Yields each message until the subscription is closed.
- #initialize(filters:, hwm:, subscription_id: nil, on_close: nil) ⇒ Subscription (constructor)
  A new instance of Subscription.
- #pop ⇒ Object
  Blocks the fiber until a message arrives.
Constructor Details
#initialize(filters:, hwm:, subscription_id: nil, on_close: nil) ⇒ Subscription
Returns a new instance of Subscription.
    # File 'lib/zzq/subscription.rb', line 23
    def initialize(filters:, hwm:, subscription_id: nil, on_close: nil)
      @filters = Array(filters).freeze
      @subscription_id = subscription_id
      @queue = Async::LimitedQueue.new(hwm)
      @on_close = on_close
      @closed = false
    end
Instance Attribute Details
#filters ⇒ Array<String> (readonly)
Returns topic filters this subscription matches.
    # File 'lib/zzq/subscription.rb', line 17
    def filters
      @filters
    end
#subscription_id ⇒ Integer (readonly)
Returns subscription identifier (v5) or nil (v3).
    # File 'lib/zzq/subscription.rb', line 20
    def subscription_id
      @subscription_id
    end
Instance Method Details
#close ⇒ Object
Sends UNSUBSCRIBE and wakes the consumer. Idempotent.
    # File 'lib/zzq/subscription.rb', line 50
    def close
      return if @closed
      @closed = true
      @on_close&.call(self)
      @queue.enqueue(nil)
    end
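The idempotency guard in #close can be illustrated in isolation. The sketch below is a reduced stand-in (`SketchHandle` is a hypothetical name) that keeps only the `@closed` flag and the `on_close` callback. It assumes the owner passes a callback that performs the UNSUBSCRIBE; here the callback only records the filters so the effect is observable.

```ruby
# Hypothetical reduced stand-in: just the close guard and the callback.
class SketchHandle
  attr_reader :filters

  def initialize(filters:, on_close: nil)
    @filters  = Array(filters).freeze
    @on_close = on_close
    @closed   = false
  end

  def close
    return if @closed       # second and later calls are no-ops
    @closed = true
    @on_close&.call(self)   # assumed to be where UNSUBSCRIBE is sent
  end

  def closed?
    @closed
  end
end

unsubscribed = []
handle = SketchHandle.new(
  filters: "sensors/#",
  on_close: ->(sub) { unsubscribed.concat(sub.filters) }
)

handle.close
handle.close # ignored: the callback has already run once
```

Because the flag is set before the callback runs, a callback that itself calls #close returns immediately instead of recursing.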
#closed? ⇒ Boolean
Returns true once #close has been called.
    # File 'lib/zzq/subscription.rb', line 58
    def closed?
      @closed
    end
#deliver(message) ⇒ Object
Internal: deliver a message to the subscriber’s queue. Blocks the caller if the queue is at high-water-mark.
    # File 'lib/zzq/subscription.rb', line 65
    def deliver(message)
      return if @closed
      @queue.enqueue(message)
    end
#each ⇒ Object
Yields each message until the subscription is closed.
    # File 'lib/zzq/subscription.rb', line 41
    def each
      return enum_for(:each) unless block_given?
      while (msg = pop)
        yield msg
      end
    end
#pop ⇒ Object
Blocks the fiber until a message arrives. Returns nil once the subscription is closed and no buffered messages remain.
    # File 'lib/zzq/subscription.rb', line 34
    def pop
      return nil if @closed && @queue.empty?
      @queue.dequeue
    end
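One consequence of the guard in #pop is that messages queued before #close are still handed out afterwards, and nil is returned only once the queue is empty. The following single-threaded sketch shows that ordering. `SketchBuffer` is a hypothetical stand-in, and `Thread::SizedQueue` is assumed here in place of `Async::LimitedQueue`.

```ruby
# Hypothetical stand-in showing only the queue and the closed flag.
# Assumption: Thread::SizedQueue replaces Async::LimitedQueue.
class SketchBuffer
  def initialize(hwm)
    @queue  = Thread::SizedQueue.new(hwm)
    @closed = false
  end

  def deliver(message)
    return if @closed
    @queue.push(message)
  end

  def close
    return if @closed
    @closed = true
    @queue.push(nil) # sentinel placed behind any buffered messages
  end

  def pop
    return nil if @closed && @queue.empty? # nothing left to drain
    @queue.pop
  end
end

buf = SketchBuffer.new(4)
buf.deliver("m1")
buf.deliver("m2")
buf.close

# Buffered messages come out first, then the sentinel, then the guard.
drained = [buf.pop, buf.pop, buf.pop, buf.pop]
# => ["m1", "m2", nil, nil]
```

The third nil is the sentinel pushed by close; the fourth comes from the guard, which is what keeps a late #pop from blocking forever.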