Class: NatsAsync::JetStream::PullSubscription
- Inherits: Object
  - Object
  - NatsAsync::JetStream::PullSubscription
- Defined in: lib/nats_async/jetstream.rb
Instance Method Summary
- #fetch(batch: 1, timeout: 1) ⇒ Object
- #initialize(client:, stream:, consumer:) ⇒ PullSubscription (constructor): A new instance of PullSubscription.
- #unsubscribe ⇒ Object
Constructor Details
#initialize(client:, stream:, consumer:) ⇒ PullSubscription
Returns a new instance of PullSubscription.
    # File 'lib/nats_async/jetstream.rb', line 26
    def initialize(client:, stream:, consumer:)
      @client = client
      @stream = stream
      @consumer = consumer
      @inbox = "_INBOX.#{rand(1 << 30)}.#{object_id.abs}"
      @sid = @client.subscribe(@inbox) { || receive() }
    end
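The constructor derives a per-instance reply subject from a random number and the object's id. The sketch below reproduces only that string-building step so its shape is easier to see. `build_inbox` is a hypothetical helper name used for illustration and is not part of the library.

```ruby
# Builds a reply subject in the same shape as the constructor's @inbox.
# `build_inbox` is a hypothetical name used only for this sketch.
def build_inbox(owner)
  # rand(1 << 30) yields an integer in 0...2**30; object_id is an Integer.
  "_INBOX.#{rand(1 << 30)}.#{owner.object_id.abs}"
end

inbox = build_inbox(Object.new)
puts inbox
```

The resulting subject has three dot-separated tokens: the `_INBOX` prefix followed by two integers.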
Instance Method Details
#fetch(batch: 1, timeout: 1) ⇒ Object
    # File 'lib/nats_async/jetstream.rb', line 34
    def fetch(batch: 1, timeout: 1)
      @messages = []
      @done = false
      @error = nil
      @batch = batch
      @condition = Async::Condition.new
      @client.publish(next_subject, JSON.generate({batch: batch, expires: seconds_to_nanoseconds(timeout)}), reply: @inbox)
      (timeout)
      raise @error if @error
      @messages
    ensure
      @messages = nil
      @error = nil
      @condition = nil
      @batch = nil
      @done = false
    end
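As a rough illustration of the request that #fetch publishes, the sketch below builds the same JSON payload, with `batch` and an `expires` value in nanoseconds. The bodies of `seconds_to_nanoseconds` and `next_subject` are not shown on this page, so both definitions here are assumptions. The subject layout follows the JetStream convention for requesting the next message from a consumer, and `pull_request_payload` is a hypothetical name.

```ruby
require "json"

# Assumed implementation of the private helper called in #fetch;
# its real body is not shown on this page.
def seconds_to_nanoseconds(seconds)
  (seconds * 1_000_000_000).to_i
end

# Assumed subject layout (JetStream "next message" request convention).
def next_subject(stream, consumer)
  "$JS.API.CONSUMER.MSG.NEXT.#{stream}.#{consumer}"
end

# Hypothetical wrapper that builds the payload passed to publish in #fetch.
def pull_request_payload(batch:, timeout:)
  JSON.generate({batch: batch, expires: seconds_to_nanoseconds(timeout)})
end

subject = next_subject("ORDERS", "worker")
payload = pull_request_payload(batch: 5, timeout: 1.5)
puts subject
puts payload
```

Under these assumptions, a `timeout` of 1.5 seconds becomes an `expires` value of 1500000000 nanoseconds, so the server-side request expires on the same schedule as the local wait.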
#unsubscribe ⇒ Object
    # File 'lib/nats_async/jetstream.rb', line 54
    def unsubscribe
      return true unless @sid
      @client.unsubscribe(@sid)
      @sid = nil
      true
    end
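Because #unsubscribe returns early when `@sid` is already nil, calling it more than once should reach the client only once. The sketch below mirrors that guard in isolation. `FakeClient` and `SubscriptionHandle` are hypothetical stand-ins written for this example and are not classes from the library.

```ruby
# Hypothetical stand-in for the client; it only records unsubscribe calls.
class FakeClient
  attr_reader :unsubscribed

  def initialize
    @unsubscribed = []
  end

  def unsubscribe(sid)
    @unsubscribed << sid
  end
end

# Minimal holder that mirrors the guard logic of #unsubscribe.
class SubscriptionHandle
  def initialize(client, sid)
    @client = client
    @sid = sid
  end

  def unsubscribe
    return true unless @sid # already unsubscribed: nothing to do

    @client.unsubscribe(@sid)
    @sid = nil
    true
  end
end

client = FakeClient.new
handle = SubscriptionHandle.new(client, 7)
first = handle.unsubscribe
second = handle.unsubscribe
puts client.unsubscribed.inspect
```

Both calls return `true`, which makes the method safe to call from an `ensure` block without first checking whether the subscription is still active.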