Module: BPS::STAN

Defined in:
lib/bps/stan.rb

Constant Summary collapse

CLIENT_OPTS =
{
  nats: {
    servers: [:string],
    dont_randomize_servers: :bool,
    reconnect_time_wait: :float,
    max_reconnect_attempts: :int,
    connect_timeout: :float,
    tls_ca_file: :string,
    # TODO: review, list all of them: https://github.com/nats-io/nats.rb
  },
}.freeze

Class Method Summary collapse

Class Method Details

.coercerBPS::Coercer

Returns the options coercer.

Returns:

  • (BPS::Coercer)

    the options coercer



54
55
56
# File 'lib/bps/stan.rb', line 54

def self.coercer
  @coercer ||= BPS::Coercer.new(CLIENT_OPTS).freeze
end

.connect(cluster_id, client_id, nats: {}, **opts) ⇒ STAN::Client

Returns connected STAN client.

Parameters:

  • cluster (String)

    ID

  • client (String)

    ID

  • options (Hash)

Returns:

  • (STAN::Client)

    connected STAN client



39
40
41
42
43
44
45
46
47
48
49
50
51
# File 'lib/bps/stan.rb', line 39

def self.connect(cluster_id, client_id, nats: {}, **opts)
  # handle TLS if CA file is provided:
  if !nats[:tls] && nats[:tls_ca_file]
    ctx = OpenSSL::SSL::SSLContext.new
    ctx.set_params
    ctx.ca_file = nats.delete(:tls_ca_file)
    nats[:tls] = ctx
  end

  client = ::STAN::Client.new
  client.connect(cluster_id, client_id, nats: nats, **opts.slice(*CLIENT_OPTS.keys))
  client
end

.parse_url(url) ⇒ Array

Returns arguments for connecting to STAN.

Returns:

  • (Array)

    arguments for connecting to STAN



59
60
61
62
63
64
65
66
67
68
69
70
# File 'lib/bps/stan.rb', line 59

def self.parse_url(url)
  port = url.port&.to_s || '4222'
  servers = CGI.unescape(url.host).split(',').map do |host|
    addr = "nats://#{host}"
    addr << ':' << port unless /:\d+$/.match?(addr)
    addr
  end
  opts = CGI.parse(url.query || '').transform_values {|v| v.size == 1 ? v[0] : v }
  cluster_id = opts.delete('cluster_id')
  client_id = opts.delete('client_id')
  [cluster_id, client_id, { nats: { servers: servers } }]
end