Class: BPS::Subscriber::STAN

Inherits:
Abstract
  • Object
Defined in:
lib/bps/subscriber/stan.rb

Instance Method Summary

Constructor Details

#initialize(cluster_id, client_id, **opts) ⇒ STAN

Returns a new instance of STAN.

Parameters:

  • cluster_id (String)

    the cluster ID.

  • client_id (String)

    the client ID.

  • opts (Hash)

    options passed through to BPS::STAN.connect.


# File 'lib/bps/subscriber/stan.rb', line 10

def initialize(cluster_id, client_id, **opts)
  super()

  @client = ::BPS::STAN.connect(cluster_id, client_id, **opts)
end

Instance Method Details

#close ⇒ Object

Close the subscriber.



# File 'lib/bps/subscriber/stan.rb', line 28

def close
  super

  # NATS/STAN does not survive multi-closes, so close only once:
  @client&.close
  @client = nil
end
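
The close-once guard above can be tried in isolation. The following is a minimal sketch, not the library's code: FragileClient and OnceCloser are hypothetical stand-ins, and FragileClient only imitates a client that fails when closed twice.

```ruby
# Hypothetical stand-in for a client that fails on a second close.
class FragileClient
  attr_reader :close_calls

  def initialize
    @close_calls = 0
  end

  def close
    @close_calls += 1
    raise 'already closed' if @close_calls > 1
  end
end

# Hypothetical holder that mirrors the guard used in #close above.
class OnceCloser
  def initialize(client)
    @client = client
  end

  def close
    @client&.close # safe navigation: skipped once @client is nil
    @client = nil
  end
end

client = FragileClient.new
closer = OnceCloser.new(client)
closer.close
closer.close # safe: the underlying client is not closed again
puts client.close_calls
```

Dropping the reference after the first close is what makes repeated calls harmless; the safe-navigation operator then turns every later call into a no-op.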

#subscribe(topic, **opts) ⇒ Object

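Subscribe to a topic. Each message's payload (msg.data) is yielded to the given block.

Parameters:

  • topic (String)

    the topic name.

  • opts (Hash)

    options passed through to the client's subscribe call, e.g. queue or durable_name.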



# File 'lib/bps/subscriber/stan.rb', line 18

def subscribe(topic, **opts)
  # important opts:
  # - queue: 'queue-name'          # https://docs.nats.io/developing-with-nats-streaming/queues
  # - durable_name: 'durable-name' # https://docs.nats.io/developing-with-nats-streaming/durables
  @client.subscribe(topic, **opts) do |msg|
    yield msg.data # TODO: maybe yielding just bytes is not too flexible? But IMO can wait till (much) later
  end
end
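
To illustrate the calling convention of #subscribe without a running server, here is a minimal sketch under stated assumptions. FakeMsg, FakeClient and SketchSubscriber are hypothetical stand-ins, and the fake delivers messages synchronously, which a real client may not do. Only the `queue:` and `durable_name:` option names and the `msg.data` accessor come from the source above; the topic and option values are made up.

```ruby
# Hypothetical message type; only #data is taken from the source above.
FakeMsg = Struct.new(:data)

# Hypothetical stand-in client that delivers canned payloads synchronously.
class FakeClient
  attr_reader :last_opts

  def initialize(payloads)
    @payloads = payloads
  end

  def subscribe(_topic, **opts)
    @last_opts = opts
    @payloads.each { |payload| yield FakeMsg.new(payload) }
  end
end

# Mirrors the shape of #subscribe: forward opts, yield only the payload.
class SketchSubscriber
  def initialize(client)
    @client = client
  end

  def subscribe(topic, **opts)
    @client.subscribe(topic, **opts) do |msg|
      yield msg.data
    end
  end
end

fake = FakeClient.new(%w[a b])
received = []
subscriber = SketchSubscriber.new(fake)
subscriber.subscribe('events', queue: 'workers', durable_name: 'events-worker') do |data|
  received << data
end
```

The block receives the raw payload rather than the message object, so any message metadata is not visible to the caller in this design.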