Class: Parse::LiveQuery::Client
- Inherits:
-
Object
- Object
- Parse::LiveQuery::Client
- Defined in:
- lib/parse/live_query/client.rb
Overview
WebSocket client for Parse LiveQuery server. Manages WebSocket connection, authentication, and subscription lifecycle.
Features:
- Automatic ping/pong keep-alive with stale connection detection
- Circuit breaker for intelligent failure handling
- Event queue with backpressure protection
- Automatic reconnection with exponential backoff and jitter
Constant Summary collapse
- OPCODE_CONTINUATION =
WebSocket operation codes
0x0- OPCODE_TEXT =
0x1- OPCODE_BINARY =
0x2- OPCODE_CLOSE =
0x8- OPCODE_PING =
0x9- OPCODE_PONG =
0xA- DEFAULT_MAX_MESSAGE_SIZE =
Default maximum message size (1MB) - prevents memory exhaustion attacks
1_048_576- DEFAULT_FRAME_READ_TIMEOUT =
Default frame read timeout in seconds - prevents indefinite blocking
30
Instance Attribute Summary collapse
-
#application_id ⇒ String
readonly
Parse application ID.
-
#circuit_breaker ⇒ CircuitBreaker
readonly
Connection circuit breaker.
-
#client_key ⇒ String?
readonly
Parse client key (REST API key).
-
#event_queue ⇒ EventQueue
readonly
Event processing queue.
-
#frame_read_timeout ⇒ Integer
readonly
Frame read timeout in seconds.
-
#health_monitor ⇒ HealthMonitor
readonly
Connection health monitor.
-
#master_key ⇒ String?
readonly
Parse master key.
-
#max_message_size ⇒ Integer
readonly
Maximum allowed message size in bytes.
-
#state ⇒ Symbol
readonly
Connection state (:disconnected, :connecting, :connected, :closed).
-
#subscriptions ⇒ Hash<Integer, Subscription>
readonly
Active subscriptions by request ID.
-
#url ⇒ String
readonly
WebSocket URL.
-
#use_master_key ⇒ Boolean
readonly
Whether this is an admin connection that sends the master key on the connect frame.
Instance Method Summary collapse
-
#admin_connection? ⇒ Boolean
True when this connection will actually send a master key on the connect frame: the admin opt-in (
use_master_key: true) is set AND a usable (non-empty String) master key is present. -
#close(code: 1000, reason: "Client closing") ⇒ Object
Disconnect from the LiveQuery server.
-
#closed? ⇒ Boolean
True if closed.
-
#connect ⇒ Boolean
Connect to the LiveQuery server.
-
#connected? ⇒ Boolean
True if connected.
-
#connecting? ⇒ Boolean
True if connecting.
-
#health_info ⇒ Hash
Get comprehensive health information.
-
#healthy? ⇒ Boolean
Check if connection is healthy.
-
#initialize(url: nil, application_id: nil, client_key: nil, master_key: NOT_PROVIDED, use_master_key: NOT_PROVIDED, auto_connect: nil, auto_reconnect: nil) ⇒ Client
constructor
Create a new LiveQuery client.
-
#inspect ⇒ String
Redacting inspect — the default
inspectdumps every instance variable, which would expose@master_key,@client_key, and the per-subscription session tokens in any log line, backtrace, Rails error page, or APM/error-reporter (Sentry / Honeybadger / Rollbar / Bugsnag) that renders the object. -
#on(event) { ... } ⇒ Object
Register callback for connection events.
-
#on_close(&block) ⇒ Object
Callback for connection closed.
-
#on_error(&block) ⇒ Object
Callback for errors.
-
#on_open(&block) ⇒ Object
Callback for connection opened.
-
#shutdown(timeout: 5.0)
Graceful shutdown with timeout.
-
#subscribe(class_name, where: {}, fields: nil, session_token: nil, use_master_key: false) {|subscription| ... } ⇒ Subscription
Subscribe to a Parse class with optional query constraints.
-
#unsubscribe(subscription) ⇒ Object
Unsubscribe from a subscription.
Constructor Details
#initialize(url: nil, application_id: nil, client_key: nil, master_key: NOT_PROVIDED, use_master_key: NOT_PROVIDED, auto_connect: nil, auto_reconnect: nil) ⇒ Client
Create a new LiveQuery client
157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 202 203 204 205 206 207 208 209 210 211 212 213 214 215 216 217 218 219 220 221 |
# File 'lib/parse/live_query/client.rb', line 157 def initialize(url: nil, application_id: nil, client_key: nil, master_key: NOT_PROVIDED, use_master_key: NOT_PROVIDED, auto_connect: nil, auto_reconnect: nil) cfg = config # Use provided values or fall back to configuration/environment @url = url || cfg.url || derive_websocket_url @application_id = application_id || cfg.application_id || parse_client_value(:application_id) @client_key = client_key || cfg.client_key || parse_client_value(:api_key) @master_key = if master_key.equal?(NOT_PROVIDED) cfg.master_key || parse_client_value(:master_key) else master_key end # Admin-connection opt-in. Defaults to false (ACL-scoped). Only # an explicit `use_master_key: true` here — or `config.use_master_key # = true` — sends the master key on the connect frame. This is the # v5.1.0 security fix: prior versions sent the master key on the # connect frame whenever one was merely present, silently # elevating every subscription on the socket past ACL/CLP. @use_master_key = if use_master_key.equal?(NOT_PROVIDED) cfg.respond_to?(:use_master_key) ? !!cfg.use_master_key : false else use_master_key == true end @auto_connect = auto_connect.nil? ? cfg.auto_connect : auto_connect @auto_reconnect = auto_reconnect.nil? ? cfg.auto_reconnect : auto_reconnect @max_message_size = cfg. || DEFAULT_MAX_MESSAGE_SIZE @frame_read_timeout = cfg.frame_read_timeout || DEFAULT_FRAME_READ_TIMEOUT @state = :disconnected @subscriptions = {} @monitor = Monitor.new @socket = nil @reader_thread = nil @reconnect_thread = nil @reconnect_interval = cfg.initial_reconnect_interval @callbacks = Hash.new { |h, k| h[k] = [] } @client_id = nil # Initialize production components @health_monitor = HealthMonitor.new( client: self, ping_interval: cfg.ping_interval, pong_timeout: cfg.pong_timeout, ) @circuit_breaker = CircuitBreaker.new( failure_threshold: cfg.circuit_failure_threshold, reset_timeout: cfg.circuit_reset_timeout, on_state_change: method(:on_circuit_state_change), ) @event_queue = EventQueue.new( max_size: cfg.event_queue_size, strategy: cfg.backpressure_strategy, on_drop: method(:on_event_dropped), ) Logging.info("LiveQuery client initialized", url: @url, application_id: @application_id) connect if @auto_connect && @url end |
Instance Attribute Details
#application_id ⇒ String (readonly)
Returns Parse application ID.
61 62 63 |
# File 'lib/parse/live_query/client.rb', line 61 def application_id @application_id end |
#circuit_breaker ⇒ CircuitBreaker (readonly)
Returns connection circuit breaker.
112 113 114 |
# File 'lib/parse/live_query/client.rb', line 112 def circuit_breaker @circuit_breaker end |
#client_key ⇒ String? (readonly)
Returns Parse client key (REST API key).
64 65 66 |
# File 'lib/parse/live_query/client.rb', line 64 def client_key @client_key end |
#event_queue ⇒ EventQueue (readonly)
Returns event processing queue.
115 116 117 |
# File 'lib/parse/live_query/client.rb', line 115 def event_queue @event_queue end |
#frame_read_timeout ⇒ Integer (readonly)
Returns frame read timeout in seconds.
121 122 123 |
# File 'lib/parse/live_query/client.rb', line 121 def frame_read_timeout @frame_read_timeout end |
#health_monitor ⇒ HealthMonitor (readonly)
Returns connection health monitor.
109 110 111 |
# File 'lib/parse/live_query/client.rb', line 109 def health_monitor @health_monitor end |
#master_key ⇒ String? (readonly)
Returns Parse master key.
67 68 69 |
# File 'lib/parse/live_query/client.rb', line 67 def master_key @master_key end |
#max_message_size ⇒ Integer (readonly)
Returns maximum allowed message size in bytes.
118 119 120 |
# File 'lib/parse/live_query/client.rb', line 118 def @max_message_size end |
#state ⇒ Symbol (readonly)
Returns connection state (:disconnected, :connecting, :connected, :closed).
103 104 105 |
# File 'lib/parse/live_query/client.rb', line 103 def state @state end |
#subscriptions ⇒ Hash<Integer, Subscription> (readonly)
Returns active subscriptions by request ID.
106 107 108 |
# File 'lib/parse/live_query/client.rb', line 106 def subscriptions @subscriptions end |
#url ⇒ String (readonly)
Returns WebSocket URL.
58 59 60 |
# File 'lib/parse/live_query/client.rb', line 58 def url @url end |
#use_master_key ⇒ Boolean (readonly)
Returns whether this is an admin connection that sends the master key on the connect frame. When true, EVERY subscription on this connection bypasses ACL/CLP enforcement (Parse Server resolves master-key authorization per-connection at connect time, never per-subscription). Defaults to false.
74 75 76 |
# File 'lib/parse/live_query/client.rb', line 74 def use_master_key @use_master_key end |
Instance Method Details
#admin_connection? ⇒ Boolean
Returns true when this connection will actually send a
master key on the connect frame: the admin opt-in
(use_master_key: true) is set AND a usable (non-empty String)
master key is present. The single source of truth for "will
this socket bypass ACL/CLP for every subscription." A bare
use_master_key: true with no key is NOT an admin connection.
82 83 84 |
# File 'lib/parse/live_query/client.rb', line 82 def admin_connection? @use_master_key && @master_key.is_a?(String) && !@master_key.empty? end |
#close(code: 1000, reason: "Client closing") ⇒ Object
Disconnect from the LiveQuery server
262 263 264 265 266 267 268 269 270 271 272 273 |
# File 'lib/parse/live_query/client.rb', line 262 def close(code: 1000, reason: "Client closing") @auto_reconnect = false @monitor.synchronize do return if @state == :closed Logging.info("Closing connection", code: code, reason: reason) send_close_frame(code, reason) if @socket cleanup_connection @state = :closed end emit(:close) end |
#closed? ⇒ Boolean
Returns true if closed.
318 319 320 |
# File 'lib/parse/live_query/client.rb', line 318 def closed? @state == :closed end |
#connect ⇒ Boolean
Connect to the LiveQuery server
225 226 227 228 229 230 231 232 233 234 235 236 237 238 239 240 241 242 243 244 245 246 247 248 249 250 251 252 253 254 255 256 257 |
# File 'lib/parse/live_query/client.rb', line 225 def connect return true if connected? || connecting? # Check circuit breaker before attempting connection unless @circuit_breaker.allow_request? time_remaining = @circuit_breaker.time_until_half_open Logging.warn("Connection blocked by circuit breaker", state: @circuit_breaker.state, time_until_retry: time_remaining) emit(:circuit_open, time_remaining) schedule_reconnect if @auto_reconnect return false end @monitor.synchronize do @state = :connecting end begin Logging.info("Connecting to LiveQuery server", url: @url) establish_connection start_reader_thread true rescue => e @circuit_breaker.record_failure @state = :disconnected Logging.error("Failed to connect", error: e) emit(:error, ConnectionError.new("Failed to connect: #{e.}")) schedule_reconnect if @auto_reconnect false end end |
#connected? ⇒ Boolean
Returns true if connected.
308 309 310 |
# File 'lib/parse/live_query/client.rb', line 308 def connected? @state == :connected end |
#connecting? ⇒ Boolean
Returns true if connecting.
313 314 315 |
# File 'lib/parse/live_query/client.rb', line 313 def connecting? @state == :connecting end |
#health_info ⇒ Hash
Get comprehensive health information
330 331 332 333 334 335 336 337 338 339 340 341 342 |
# File 'lib/parse/live_query/client.rb', line 330 def health_info { state: @state, connected: connected?, healthy: healthy?, client_id: @client_id, subscription_count: @subscriptions.size, max_message_size: @max_message_size, health_monitor: @health_monitor.health_info, circuit_breaker: @circuit_breaker.info, event_queue: @event_queue.stats, } end |
#healthy? ⇒ Boolean
Check if connection is healthy
324 325 326 |
# File 'lib/parse/live_query/client.rb', line 324 def healthy? connected? && @health_monitor.healthy? end |
#inspect ⇒ String
Redacting inspect — the default inspect dumps every instance
variable, which would expose @master_key, @client_key, and
the per-subscription session tokens in any log line, backtrace,
Rails error page, or APM/error-reporter (Sentry / Honeybadger /
Rollbar / Bugsnag) that renders the object. Configuration#to_h
already redacts these; this keeps the live Client object
consistent with that contract.
94 95 96 97 98 99 100 |
# File 'lib/parse/live_query/client.rb', line 94 def inspect "#<#{self.class.name} url=#{@url.inspect} " \ "application_id=#{@application_id.inspect} state=#{@state.inspect} " \ "admin_connection=#{admin_connection?} subscriptions=#{@subscriptions.size} " \ "client_key=#{redacted_secret(@client_key)} " \ "master_key=#{redacted_secret(@master_key)}>" end |
#on(event) { ... } ⇒ Object
Register callback for connection events
459 460 461 462 463 464 |
# File 'lib/parse/live_query/client.rb', line 459 def on(event, &block) @monitor.synchronize do @callbacks[event] << block if block_given? end self end |
#on_close(&block) ⇒ Object
Callback for connection closed
472 473 474 |
# File 'lib/parse/live_query/client.rb', line 472 def on_close(&block) on(:close, &block) end |
#on_error(&block) ⇒ Object
Callback for errors
477 478 479 |
# File 'lib/parse/live_query/client.rb', line 477 def on_error(&block) on(:error, &block) end |
#on_open(&block) ⇒ Object
Callback for connection opened
467 468 469 |
# File 'lib/parse/live_query/client.rb', line 467 def on_open(&block) on(:open, &block) end |
#shutdown(timeout: 5.0)
This method returns an undefined value.
Graceful shutdown with timeout
278 279 280 281 282 283 284 285 286 287 288 289 290 291 292 293 294 295 296 297 298 299 300 301 302 303 304 305 |
# File 'lib/parse/live_query/client.rb', line 278 def shutdown(timeout: 5.0) Logging.info("Shutting down LiveQuery client", timeout: timeout) @auto_reconnect = false # Cancel any pending reconnect thread cancel_reconnect_thread # Stop health monitor @health_monitor.stop # Stop event queue and drain remaining events @event_queue.stop(drain: true, timeout: timeout / 2) # Close connection close(code: 1000, reason: "Shutdown") # Wait for reader thread to finish @reader_thread&.join(timeout / 2) # Force kill if still running @reader_thread&.kill @reader_thread = nil Logging.info("Shutdown complete", events_processed: @event_queue.processed_count, events_dropped: @event_queue.dropped_count) end |
#subscribe(class_name, where: {}, fields: nil, session_token: nil, use_master_key: false) {|subscription| ... } ⇒ Subscription
Subscribe to a Parse class with optional query constraints
366 367 368 369 370 371 372 373 374 375 376 377 378 379 380 381 382 383 384 385 386 387 388 389 390 391 392 393 394 395 396 397 398 399 400 401 402 403 404 405 406 407 408 409 410 411 412 413 414 415 416 417 418 419 420 421 422 423 424 425 426 427 428 429 430 431 432 433 434 435 436 437 438 439 440 441 442 443 |
# File 'lib/parse/live_query/client.rb', line 366 def subscribe(class_name, where: {}, fields: nil, session_token: nil, use_master_key: false, &block) # Handle Parse::Object subclass if class_name.is_a?(Class) && class_name < Parse::Object class_name = class_name.parse_class end # Handle Parse::Query object if class_name.is_a?(Parse::Query) query = class_name class_name = query.table where = query.compile_where end # Refuse server-side-JS / data-mutating operators in the `where` # filter at any nesting depth. LiveQuery subscriptions are a # persistent server-evaluated channel; without this gate, a # caller could plant `$where`/`$function`/`$accumulator` (or # data-mutating stages nested inside) and have them re-evaluated # on every matching event for the lifetime of the subscription. # Permissive mode (recursive denylist only) mirrors the # `Parse::MongoDB`/`Parse::AtlasSearch` filter posture so the # SDK enforces one consistent set of refusals on every # user-influenced filter path. Parse::PipelineSecurity.validate_filter!(where) if where.is_a?(Hash) && !where.empty? warn_subscription_scope_mismatch(use_master_key, session_token) subscription = Subscription.new( client: self, class_name: class_name, query: where, fields: fields, session_token: session_token, use_master_key: use_master_key, ) @monitor.synchronize do @subscriptions[subscription.request_id] = subscription end Logging.debug("Subscription created", request_id: subscription.request_id, class_name: class_name) # Yield the subscription BEFORE the subscribe frame goes out so # caller-registered callbacks are wired before any server event # can arrive on this request_id. Order matters: subscribe-frame- # then-yield would race a fast server response against the # callback registration on a hot socket. # # If the caller's block raises, ROLL BACK the registry insert # before re-raising. Without this, the failed-block # subscription stays in `@subscriptions` and the next # `resubscribe_all` (triggered by a reconnect) wire-sends it # to the server — a ghost subscription the caller thought # they had aborted. if block_given? begin yield(subscription) rescue @monitor.synchronize do @subscriptions.delete(subscription.request_id) end raise end end # Send subscribe message if connected if connected? (subscription.) else # Queue subscription for when connection is established connect unless connecting? end subscription end |
#unsubscribe(subscription) ⇒ Object
Unsubscribe from a subscription
447 448 449 450 451 452 453 454 |
# File 'lib/parse/live_query/client.rb', line 447 def unsubscribe(subscription) Logging.debug("Unsubscribing", request_id: subscription.request_id) (subscription.) if connected? @monitor.synchronize do @subscriptions.delete(subscription.request_id) end end |