Class: Turbocable::NatsConnection Private
- Inherits: Object
  - Object
  - Turbocable::NatsConnection
- Defined in: lib/turbocable/nats_connection.rb
Overview
This class is part of a private API. You should avoid using this class if possible, as it may be removed or be changed in the future.
Manages the process-wide NATS connection and JetStream context.
Design invariants
- *One connection per process.* nats-pure is thread-safe; multiple Ruby threads can publish concurrently without additional serialization.
- *Lazy open.* The connection is not established until the first #publish call. This means apps that configure Turbocable but never broadcast (e.g. consumer-only processes) don’t open a NATS socket.
- *Fork-safe.* On Process.fork (Puma / Unicorn worker boot), child processes detect the PID change and reopen their own connection instead of sharing the parent’s file descriptors.
- *Clean shutdown.* An at_exit hook flushes pending acks and closes the connection gracefully when the process exits.
- *No stream management.* The gem never creates or alters the TURBOCABLE JetStream stream; that is turbocable-server’s responsibility. If the stream is absent, NATS returns a “no stream matches subject” error that is surfaced as PublishError with an actionable message.
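The lazy-open and fork-safe invariants above can be sketched with a small PID-aware holder. This is a minimal illustration, not the gem's implementation: `FakeClient` and `LazyConnection` are hypothetical names, and the fake client only records whether it was closed.

```ruby
# Hypothetical stand-in for a real NATS client; it only tracks open/closed state.
class FakeClient
  attr_reader :closed

  def initialize
    @closed = false
  end

  def close
    @closed = true
  end
end

# Sketch of a lazily opened, PID-aware connection holder.
class LazyConnection
  attr_reader :opens

  def initialize(&factory)
    @factory = factory
    @mutex = Mutex.new
    @client = nil
    @pid = nil
    @opens = 0
  end

  # Returns the client, opening it on first use or when the PID has changed.
  def client
    @mutex.synchronize do
      if @client.nil? || @pid != Process.pid
        # A handle inherited across a fork belongs to the parent, so this
        # sketch drops the reference and opens a fresh client instead.
        @client = @factory.call
        @pid = Process.pid
        @opens += 1
      end
      @client
    end
  end

  # Idempotent close: later calls find nothing left to close.
  def close
    @mutex.synchronize do
      @client&.close
      @client = nil
      @pid = nil
    end
  end
end

conn = LazyConnection.new { FakeClient.new } # nothing is opened yet
conn.client                                  # first use opens the client
conn.client                                  # later uses reuse it
conn.close
conn.close                                   # second close is a no-op
```

Recording the PID at open time is what lets a forked worker notice that the handle it holds was created by another process.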
Constant Summary
- KV_BUCKET_MISSING_ON_LOOKUP =
This constant is part of a private API. You should avoid using this constant if possible, as it may be removed or be changed in the future.
Exception types raised by nats-pure when a JetStream KV bucket does not exist yet (#key_value must then call create_key_value).

    [
      NATS::JetStream::Error::NotFound,
      NATS::JetStream::Error::StreamNotFound,
      *(defined?(NATS::KeyValue::BucketNotFoundError) ? [NATS::KeyValue::BucketNotFoundError] : [])
    ].freeze
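The `defined?` guard in the constant lets the list include an exception class only when the installed library version provides it. A minimal sketch of that idiom follows; `DemoLib`, `MISSING_ERRORS`, and `lookup` are hypothetical names, and `DemoLib::OptionalError` is deliberately left undefined.

```ruby
# Hypothetical library namespace; OptionalError is intentionally not defined.
module DemoLib
  class NotFound < StandardError; end
end

MISSING_ERRORS = [
  DemoLib::NotFound,
  # The splat contributes zero elements when the constant is absent.
  *(defined?(DemoLib::OptionalError) ? [DemoLib::OptionalError] : [])
].freeze

# Rescues any class in the list via a splatted rescue clause.
def lookup
  yield
rescue *MISSING_ERRORS
  :created
end

lookup { raise DemoLib::NotFound } # falls through to the rescue branch
lookup { :found }                  # returns the block's value unchanged
```

Because `defined?` returns nil for a missing constant instead of raising, the file loads on library versions both with and without the optional class.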
Instance Method Summary
- #close ⇒ void (private)
  Closes the NATS connection and resets internal state.
- #initialize(config) ⇒ NatsConnection (constructor, private)
  A new instance of NatsConnection.
- #key_value(bucket, history: 1) ⇒ NATS::JetStream::KeyValue (private)
  Returns a NATS KV store handle for bucket, creating the bucket with sensible defaults if it does not yet exist.
- #ping(timeout: 2.0) ⇒ Boolean (private)
  Returns true if NATS is reachable via a flush round-trip.
- #publish(subject, bytes, timeout:) ⇒ NATS::JetStream::PubAck (private)
  Publishes bytes to subject via JetStream and returns the ack.
Constructor Details
#initialize(config) ⇒ NatsConnection
This method is part of a private API. You should avoid using this method if possible, as it may be removed or be changed in the future.
Returns a new instance of NatsConnection.
    # File 'lib/turbocable/nats_connection.rb', line 36
    def initialize(config)
      @config = config
      @mutex = Mutex.new
      @nc = nil # NATS::IO::Client
      @js = nil # JetStream context
      @pid = nil # PID at connection open time
      at_exit { close_quietly }
    end
Instance Method Details
#close ⇒ void
This method is part of a private API. You should avoid using this method if possible, as it may be removed or be changed in the future.
This method returns an undefined value.
Closes the NATS connection and resets internal state. Safe to call multiple times — subsequent calls are no-ops.
    # File 'lib/turbocable/nats_connection.rb', line 99
    def close
      @mutex.synchronize do
        @nc&.close
        @nc = nil
        @js = nil
        @pid = nil
      end
    end
#key_value(bucket, history: 1) ⇒ NATS::JetStream::KeyValue
This method is part of a private API. You should avoid using this method if possible, as it may be removed or be changed in the future.
Returns a NATS KV store handle for bucket, creating the bucket with sensible defaults if it does not yet exist.
Used by Turbocable::Auth.publish_public_key!. The server watches the bucket but does not create it; the gem is the source of truth for the bucket’s lifecycle.
    # File 'lib/turbocable/nats_connection.rb', line 86
    def key_value(bucket, history: 1)
      ensure_connected!
      begin
        @js.key_value(bucket)
      rescue *KV_BUCKET_MISSING_ON_LOOKUP
        @js.create_key_value(bucket: bucket, history: history)
      end
    end
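The look-up-then-create flow can be exercised without a NATS server by using an in-memory stand-in. The sketch below is only an illustration of the pattern: `FakeJetStream`, its `BucketNotFound` error, `fetch_or_create`, and the bucket name are all hypothetical.

```ruby
# Hypothetical in-memory stand-in for a JetStream context.
class FakeJetStream
  class BucketNotFound < StandardError; end

  attr_reader :created

  def initialize
    @buckets = {}
    @created = []
  end

  # Raises when the bucket has not been created yet.
  def key_value(bucket)
    @buckets.fetch(bucket) { raise BucketNotFound, bucket }
  end

  # Records the creation and returns a simple handle.
  def create_key_value(bucket:, history:)
    @created << bucket
    @buckets[bucket] = { bucket: bucket, history: history }
  end
end

# Tries the lookup first and creates the bucket only when it is missing.
def fetch_or_create(js, bucket, history: 1)
  js.key_value(bucket)
rescue FakeJetStream::BucketNotFound
  js.create_key_value(bucket: bucket, history: history)
end

js = FakeJetStream.new
fetch_or_create(js, "demo_keys") # the miss triggers creation
fetch_or_create(js, "demo_keys") # the hit returns the existing handle
```

Note that the `history` argument only takes effect on the call that actually creates the bucket; later calls return whatever already exists.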
#ping(timeout: 2.0) ⇒ Boolean
This method is part of a private API. You should avoid using this method if possible, as it may be removed or be changed in the future.
Returns true if NATS is reachable via a flush round-trip.
    # File 'lib/turbocable/nats_connection.rb', line 67
    def ping(timeout: 2.0)
      ensure_connected!
      @nc.flush(timeout)
      true
    rescue
      false
    end
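The boolean-probe shape of #ping, where any failure during the round-trip becomes false, might be sketched as follows. `ProbeClient` and `reachable?` are hypothetical names; the fake `flush` simply raises to simulate a missing reply.

```ruby
# Hypothetical client whose flush either returns or raises.
class ProbeClient
  def initialize(healthy:)
    @healthy = healthy
  end

  def flush(timeout)
    raise IOError, "no reply within #{timeout}s" unless @healthy
    nil
  end
end

# Converts any StandardError raised by the round-trip into false.
def reachable?(client, timeout: 2.0)
  client.flush(timeout)
  true
rescue StandardError
  false
end

reachable?(ProbeClient.new(healthy: true))
reachable?(ProbeClient.new(healthy: false), timeout: 0.1)
```

A probe of this shape suits health-check endpoints, where the caller wants a yes/no answer instead of an exception.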
#publish(subject, bytes, timeout:) ⇒ NATS::JetStream::PubAck
This method is part of a private API. You should avoid using this method if possible, as it may be removed or be changed in the future.
Publishes bytes to subject via JetStream and returns the ack.
Opens the connection on first call (lazy) and reopens it in a forked child process (fork-safe).
    # File 'lib/turbocable/nats_connection.rb', line 56
    def publish(subject, bytes, timeout:)
      ensure_connected!
      @js.publish(subject, bytes, timeout: timeout)
    rescue NATS::IO::NoRespondersError, NATS::JetStream::Error => e
      handle_nats_error(e, subject)
    end
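The body of handle_nats_error is not shown on this page. As a rough sketch of how a transport error could be turned into a PublishError with an actionable message, assuming the error text contains the “no stream matches subject” phrase, consider the following. `TransportError`, `translate_publish_error`, and `publish_with_translation` are hypothetical names, and the `PublishError` defined here is a local stand-in, not the gem's class.

```ruby
# Hypothetical error classes standing in for the library and gem errors.
class TransportError < StandardError; end
class PublishError < StandardError; end

# Builds a message with a hint and re-raises as PublishError.
def translate_publish_error(error, subject)
  hint =
    if error.message.include?("no stream matches subject")
      "no JetStream stream covers #{subject}; check that the server has created it"
    else
      "publish to #{subject} failed"
    end
  raise PublishError, "#{hint} (#{error.class}: #{error.message})"
end

# Runs the block and translates transport failures for the caller.
def publish_with_translation(subject)
  yield
rescue TransportError => e
  translate_publish_error(e, subject)
end

begin
  publish_with_translation("demo.subject") do
    raise TransportError, "no stream matches subject"
  end
rescue PublishError => e
  puts e.message
end
```

Keeping the original class and message in the translated error preserves the detail needed for debugging while giving callers a single error type to rescue.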