Class: Turbocable::Client
- Inherits: Object
- Defined in: lib/turbocable/client.rb
Overview
Entry point for publishing messages to the TurboCable fan-out pipeline.
Prefer the top-level convenience method:
    Turbocable.broadcast("chat_room_42", text: "hello")
Use the client directly when you need per-call codec overrides or access to the returned JetStream ack:
    client = Turbocable::Client.new(Turbocable.config)
    ack = client.broadcast("chat_room_42", {text: "hi"}, codec: :json)
    puts ack.stream # => "TURBOCABLE"
    puts ack.seq # => 1
Data flow
- Validate the stream name against the server’s subject-token charset.
- Look up the codec (default from config.default_codec, or per-call).
- Serialize the payload to bytes.
- Enforce config.max_payload_bytes, failing fast before touching NATS.
- Publish to NATS JetStream via NatsConnection#publish.
- On transient failures (Timeout, JetStream::Error), retry with exponential backoff up to config.max_retries times.
- Return the JetStream ack on success or raise PublishError on final failure.
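The retry step in the data flow can be sketched as follows. This is a minimal illustration, not the gem's implementation: TransientError, with_retries, backoff_delay, and the sleeper parameter are hypothetical names, and the jitter formula is an assumption. Only the constant values are taken from the Constant Summary on this page.

```ruby
# Hypothetical stand-ins; the real error classes live in the gem and in the NATS client.
class TransientError < StandardError; end
class PublishError < StandardError; end

BASE_DELAY = 0.05     # seconds (50 ms), value from the Constant Summary
BACKOFF_FACTOR = 2
JITTER_FACTOR = 0.20  # +/- 20%

# Delay before retry number `attempt` (0-based).
# Assumed formula: nominal delay grows geometrically, then is jittered by +/- JITTER_FACTOR.
def backoff_delay(attempt, rng: Random.new)
  nominal = BASE_DELAY * (BACKOFF_FACTOR**attempt)
  nominal + nominal * JITTER_FACTOR * (rng.rand * 2 - 1)
end

# Runs the block, retrying on TransientError up to max_retries times.
# `sleeper` is injectable so the loop can be exercised without actually waiting.
def with_retries(max_retries:, sleeper: ->(seconds) { sleep(seconds) })
  attempt = 0
  begin
    yield
  rescue TransientError => e
    raise PublishError, "gave up after #{attempt} retries: #{e.message}" if attempt >= max_retries
    sleeper.call(backoff_delay(attempt))
    attempt += 1
    retry
  end
end
```

Injecting the sleeper keeps the sketch testable; a production version would more likely rely on the clock passed to the constructor, though how the gem uses that argument is not shown here.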
On delivery guarantees
A successful ack means NATS JetStream has persisted the message. If the server operator has set TURBOCABLE_STREAM_RATE_LIMIT_RPS, messages that exceed the stream’s rate limit may be dropped by turbocable-server after a successful NATS ack. A green broadcast is therefore not an end-to-end delivery guarantee — it is a persistence guarantee.
Constant Summary
- STREAM_NAME_PATTERN = /\A[A-Za-z0-9_:-]+\z/
  Conservative charset matching what the server’s NATS glob authorizer accepts as a subject token. Characters that break NATS subject parsing (., *, >, whitespace) are excluded.
- BASE_DELAY = 0.05
  Exponential backoff parameters. Base delay of 50 ms.
- BACKOFF_FACTOR = 2
- JITTER_FACTOR = 0.20
  ±20%
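To make the stream-name charset concrete, the sketch below applies the documented pattern to a few names. The helper valid_stream_name? is a hypothetical name for illustration; the gem's own check is the private validate_stream_name! method, whose body is not shown on this page.

```ruby
# Pattern copied from the Constant Summary.
STREAM_NAME_PATTERN = /\A[A-Za-z0-9_:-]+\z/

# Hypothetical helper: true when the name is a String made only of allowed characters.
def valid_stream_name?(name)
  name.is_a?(String) && STREAM_NAME_PATTERN.match?(name)
end

valid_stream_name?("chat_room_42")  # => true
valid_stream_name?("chat:room-42")  # => true
valid_stream_name?("chat.room")     # => false ("." separates NATS subject tokens)
valid_stream_name?("rooms.>")       # => false (">" is a NATS wildcard)
valid_stream_name?("chat room")     # => false (whitespace)
valid_stream_name?("")              # => false (at least one character is required)
```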
Instance Method Summary
- #broadcast(stream_name, payload, codec: nil) ⇒ NATS::JetStream::PubAck
  Publishes payload to the stream_name subject.
- #healthy? ⇒ Boolean
  Returns true if the underlying adapter is reachable, false otherwise.
- #initialize(config, connection: nil, clock: nil) ⇒ Client (constructor)
  A new instance of Client.
Constructor Details
#initialize(config, connection: nil, clock: nil) ⇒ Client
Returns a new instance of Client.
    # File 'lib/turbocable/client.rb', line 52
    def initialize(config, connection: nil, clock: nil)
      @config = config
      @connection = connection
      @clock = clock
    end
Instance Method Details
#broadcast(stream_name, payload, codec: nil) ⇒ NATS::JetStream::PubAck
Publishes payload to the stream_name subject.
    # File 'lib/turbocable/client.rb', line 93
    def broadcast(stream_name, payload, codec: nil)
      validate_stream_name!(stream_name)

      codec_module = Codecs.fetch(codec || @config.default_codec)
      bytes = codec_module.encode(payload)

      enforce_payload_size!(bytes)

      subject = "#{@config.subject_prefix}.#{stream_name}"
      publish_with_retries(subject, bytes)
    end
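The size check and subject construction in #broadcast might look roughly like the sketch below. The bodies are assumptions, since the private helpers are not shown on this page: PayloadTooLargeError is a hypothetical error name, build_subject is a hypothetical helper, and "example_prefix" is a placeholder rather than the gem's actual subject prefix.

```ruby
require "json"

# Hypothetical error class; the gem's actual error for oversized payloads is not shown here.
class PayloadTooLargeError < StandardError; end

# Fails fast when the encoded payload exceeds the limit.
# Uses bytesize (bytes), not length (characters), because the limit is in bytes.
def enforce_payload_size!(bytes, max_bytes)
  return bytes if bytes.bytesize <= max_bytes

  raise PayloadTooLargeError, "payload is #{bytes.bytesize} bytes, limit is #{max_bytes}"
end

# Joins the configured prefix and the stream name into one NATS subject.
def build_subject(prefix, stream_name)
  "#{prefix}.#{stream_name}"
end

encoded = JSON.generate({text: "hi"})          # the JSON codec is assumed to behave like this
enforce_payload_size!(encoded, 1024)           # passes: well under the limit
build_subject("example_prefix", "chat_room_42") # => "example_prefix.chat_room_42"
```

Checking the size before publishing means an oversized payload never reaches the network, which matches the "fail fast before touching NATS" step in the data flow.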
#healthy? ⇒ Boolean
Returns true if the underlying adapter is reachable, false otherwise.
For the :nats adapter this issues a NATS flush (PING/PONG round-trip) and, if the stream name is known, fetches JetStream stream info to confirm JetStream connectivity. For the :null adapter this always returns true.
The method never raises on network errors; callers that need strict semantics should use Turbocable.healthcheck! instead.
    # File 'lib/turbocable/client.rb', line 69
    def healthy?
      connection.ping(timeout: @config.publish_timeout)
    rescue ConfigurationError
      raise
    rescue => e
      @config.logger.warn do
        "[Turbocable] Health check failed: #{e.class} — #{e.message}"
      end
      false
    end
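The rescue ordering in #healthy? is what separates misconfiguration from network trouble. The standalone sketch below reproduces that ordering with stand-ins: the ConfigurationError class defined here and the probe method are hypothetical and only mimic the shape of the method above.

```ruby
# Stand-in for the gem's ConfigurationError.
class ConfigurationError < StandardError; end

# Hypothetical probe mirroring the rescue ordering of #healthy?.
def probe
  yield
  true
rescue ConfigurationError
  raise   # misconfiguration is surfaced to the caller
rescue StandardError
  false   # any other failure (e.g. a network error) becomes a false result
end

probe { :pong }                    # => true
probe { raise IOError, "down" }    # => false
# probe { raise ConfigurationError, "bad adapter" } re-raises ConfigurationError
```

Because the more specific rescue clause comes first, a ConfigurationError never reaches the catch-all branch.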