Class: NatsAsync::JetStream::PullSubscription

Inherits:
Object
  • Object
show all
Defined in:
lib/nats_async/jetstream.rb

Instance Method Summary collapse

Constructor Details

#initialize(client:, stream:, consumer:) ⇒ PullSubscription

Returns a new instance of PullSubscription.



26
27
28
29
30
31
32
# File 'lib/nats_async/jetstream.rb', line 26

def initialize(client:, stream:, consumer:)
  @client = client
  @stream = stream
  @consumer = consumer
  @inbox = "_INBOX.#{rand(1 << 30)}.#{object_id.abs}"
  @sid = @client.subscribe(@inbox) { |message| receive(message) }
end

Instance Method Details

#fetch(batch: 1, timeout: 1) ⇒ Object



34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
# File 'lib/nats_async/jetstream.rb', line 34

def fetch(batch: 1, timeout: 1)
  @messages = []
  @done = false
  @batch = batch
  @condition = Async::Condition.new

  @client.publish(next_subject, JSON.generate({batch: batch, expires: seconds_to_nanoseconds(timeout)}), reply: @inbox)
  wait_for_messages(timeout)
  @messages
ensure
  @messages = nil
  @condition = nil
  @batch = nil
  @done = false
end

#unsubscribeObject



50
51
52
53
54
55
56
# File 'lib/nats_async/jetstream.rb', line 50

def unsubscribe
  return true unless @sid

  @client.unsubscribe(@sid)
  @sid = nil
  true
end