Class: NatsAsync::JetStream::PullSubscription
- Inherits: Object
- Defined in: lib/nats_async/jetstream.rb
Instance Method Summary
- #fetch(batch: 1, timeout: 1) ⇒ Object
- #initialize(client:, stream:, consumer:) ⇒ PullSubscription (constructor): A new instance of PullSubscription.
- #unsubscribe ⇒ Object
Constructor Details
#initialize(client:, stream:, consumer:) ⇒ PullSubscription
Returns a new instance of PullSubscription.
# File 'lib/nats_async/jetstream.rb', line 26

def initialize(client:, stream:, consumer:)
  @client = client
  @stream = stream
  @consumer = consumer
  @inbox = "_INBOX.#{rand(1 << 30)}.#{object_id.abs}"
  @sid = @client.subscribe(@inbox) { || receive() }
end
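The constructor subscribes to a per-instance reply subject before any pull request is made. As a minimal sketch of what that subject looks like, the snippet below reuses the same interpolation outside the class. The helper name `inbox_subject` is hypothetical and not part of the library.

```ruby
# Hypothetical helper: builds a reply subject with the same expression
# the constructor uses for @inbox.
def inbox_subject(owner)
  "_INBOX.#{rand(1 << 30)}.#{owner.object_id.abs}"
end

owner = Object.new
subject = inbox_subject(owner)

# The subject has three dot-separated tokens: the _INBOX prefix,
# a random integer below 2**30, and the owner's object id.
puts subject.match?(/\A_INBOX\.\d+\.\d+\z/)
```

Because the subject combines a random number with the object id, two subscriptions in the same process are very unlikely to share an inbox, although the expression does not strictly guarantee uniqueness.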
Instance Method Details
#fetch(batch: 1, timeout: 1) ⇒ Object
# File 'lib/nats_async/jetstream.rb', line 34

def fetch(batch: 1, timeout: 1)
  @messages = []
  @done = false
  @batch = batch
  @condition = Async::Condition.new
  @client.publish(next_subject, JSON.generate({batch: batch, expires: seconds_to_nanoseconds(timeout)}), reply: @inbox)

  (timeout)
  @messages
ensure
  @messages = nil
  @condition = nil
  @batch = nil
  @done = false
end
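The body that #fetch publishes is a small JSON document carrying the batch size and an expiry in nanoseconds. The sketch below reproduces only that payload so its shape is easy to inspect. `seconds_to_nanoseconds` is defined here as an assumed stand-in, since its real definition is not shown on this page, and `pull_request_payload` is a hypothetical name.

```ruby
require "json"

# Assumed stand-in for the helper #fetch calls; the library's own
# definition is not shown here and may differ (for example in rounding).
def seconds_to_nanoseconds(seconds)
  (seconds * 1_000_000_000).to_i
end

# Hypothetical helper: builds the same JSON body that #fetch passes
# to @client.publish.
def pull_request_payload(batch:, timeout:)
  JSON.generate({batch: batch, expires: seconds_to_nanoseconds(timeout)})
end

puts pull_request_payload(batch: 5, timeout: 1)
# prints {"batch":5,"expires":1000000000}
```

With the defaults (`batch: 1, timeout: 1`), the request asks for at most one message and expires after one second.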
#unsubscribe ⇒ Object
# File 'lib/nats_async/jetstream.rb', line 50

def unsubscribe
  return true unless @sid

  @client.unsubscribe(@sid)
  @sid = nil
  true
end
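Because #unsubscribe clears `@sid` after the first call, calling it again is a no-op that still returns `true`. The sketch below illustrates this with stand-in classes: `FakeClient` and `SketchSubscription` are hypothetical and only mirror the constructor and #unsubscribe bodies shown above, not the real client API.

```ruby
# Hypothetical stand-in for the NATS client; it only records calls.
class FakeClient
  attr_reader :unsubscribed

  def initialize
    @unsubscribed = []
    @next_sid = 0
  end

  # Returns an incrementing subscription id, ignoring the block.
  def subscribe(_subject, &_block)
    @next_sid += 1
  end

  def unsubscribe(sid)
    @unsubscribed << sid
  end
end

# Hypothetical class mirroring the guard logic of #unsubscribe.
class SketchSubscription
  def initialize(client:)
    @client = client
    @sid = @client.subscribe("_INBOX.sketch") { }
  end

  def unsubscribe
    return true unless @sid

    @client.unsubscribe(@sid)
    @sid = nil
    true
  end
end

client = FakeClient.new
sub = SketchSubscription.new(client: client)
first = sub.unsubscribe
second = sub.unsubscribe

# Only the first call reaches the client.
puts client.unsubscribed.length
# prints 1
```

This makes it safe to call #unsubscribe from cleanup code such as an `ensure` block without tracking whether it has already run.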