Class: BPS::Publisher::NATS
- Inherits:
-
Abstract
- Object
- Abstract
- BPS::Publisher::NATS
- Defined in:
- lib/bps/publisher/nats.rb
Defined Under Namespace
Classes: Topic
Constant Summary collapse
- FLUSH_TIMEOUT =
5
- CLIENT_OPTS =
{ servers: [:string], dont_randomize_servers: :bool, reconnect_time_wait: :float, max_reconnect_attempts: :int, connect_timeout: :float, tls_ca_file: :string, # TODO: review, list all of them: https://github.com/nats-io/nats-pure.rb }.freeze
Class Method Summary collapse
-
.coercer ⇒ BPS::Coercer
The options coercer.
- .parse_url(url) ⇒ Object
Instance Method Summary collapse
- #close ⇒ Object
-
#initialize(**opts) ⇒ NATS
constructor
A new instance of NATS.
- #topic(name) ⇒ Object
Constructor Details
#initialize(**opts) ⇒ NATS
Returns a new instance of NATS.
53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 |
# File 'lib/bps/publisher/nats.rb', line 53 def initialize(**opts) super() # handle TLS if CA file is provided: if !opts[:tls] && opts[:tls_ca_file] ctx = OpenSSL::SSL::SSLContext.new ctx.set_params ctx.ca_file = opts.delete(:tls_ca_file) opts[:tls] = ctx end @topics = {} @client = ::NATS::IO::Client.new @client.connect(**opts.slice(*CLIENT_OPTS.keys)) end |
Class Method Details
.coercer ⇒ BPS::Coercer
Returns the options coercer.
48 49 50 |
# File 'lib/bps/publisher/nats.rb', line 48 def self.coercer @coercer ||= BPS::Coercer.new(CLIENT_OPTS).freeze end |
.parse_url(url) ⇒ Object
36 37 38 39 40 41 42 43 44 45 |
# File 'lib/bps/publisher/nats.rb', line 36 def self.parse_url(url) port = url.port&.to_s || '4222' servers = CGI.unescape(url.host).split(',').map do |host| addr = "nats://#{host}" addr << ':' << port unless /:\d+$/.match?(addr) addr end opts = CGI.parse(url.query || '').transform_values {|v| v.size == 1 ? v[0] : v } opts.merge(servers: servers) end |
Instance Method Details
#close ⇒ Object
73 74 75 76 77 |
# File 'lib/bps/publisher/nats.rb', line 73 def close super @client.close end |
#topic(name) ⇒ Object
69 70 71 |
# File 'lib/bps/publisher/nats.rb', line 69 def topic(name) @topics[name] ||= self.class::Topic.new(@client, name) end |