Class: BPS::Publisher::NATS

Inherits:
Abstract
  • Object
show all
Defined in:
lib/bps/publisher/nats.rb

Defined Under Namespace

Classes: Topic

Constant Summary collapse

FLUSH_TIMEOUT =
5
CLIENT_OPTS =
{
  servers: [:string],
  dont_randomize_servers: :bool,
  reconnect_time_wait: :float,
  max_reconnect_attempts: :int,
  connect_timeout: :float,
  tls_ca_file: :string,
  # TODO: review, list all of them: https://github.com/nats-io/nats-pure.rb
}.freeze

Class Method Summary collapse

Instance Method Summary collapse

Constructor Details

#initialize(**opts) ⇒ NATS

Returns a new instance of NATS.

Parameters:

  • options. (Hash)


53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
# File 'lib/bps/publisher/nats.rb', line 53

def initialize(**opts)
  super()

  # handle TLS if CA file is provided:
  if !opts[:tls] && opts[:tls_ca_file]
    ctx = OpenSSL::SSL::SSLContext.new
    ctx.set_params
    ctx.ca_file = opts.delete(:tls_ca_file)
    opts[:tls] = ctx
  end

  @topics = {}
  @client = ::NATS::IO::Client.new
  @client.connect(**opts.slice(*CLIENT_OPTS.keys))
end

Class Method Details

.coercerBPS::Coercer

Returns the options coercer.

Returns:

  • (BPS::Coercer)

    the options coercer.



48
49
50
# File 'lib/bps/publisher/nats.rb', line 48

def self.coercer
  @coercer ||= BPS::Coercer.new(CLIENT_OPTS).freeze
end

.parse_url(url) ⇒ Object



36
37
38
39
40
41
42
43
44
45
# File 'lib/bps/publisher/nats.rb', line 36

def self.parse_url(url)
  port = url.port&.to_s || '4222'
  servers = CGI.unescape(url.host).split(',').map do |host|
    addr = "nats://#{host}"
    addr << ':' << port unless /:\d+$/.match?(addr)
    addr
  end
  opts = CGI.parse(url.query || '').transform_values {|v| v.size == 1 ? v[0] : v }
  opts.merge(servers: servers)
end

Instance Method Details

#closeObject



73
74
75
76
77
# File 'lib/bps/publisher/nats.rb', line 73

def close
  super

  @client.close
end

#topic(name) ⇒ Object



69
70
71
# File 'lib/bps/publisher/nats.rb', line 69

def topic(name)
  @topics[name] ||= self.class::Topic.new(@client, name)
end