Class: Parse::LiveQuery::Client

Inherits:
Object
  • Object
show all
Defined in:
lib/parse/live_query/client.rb

Overview

WebSocket client for Parse LiveQuery server. Manages WebSocket connection, authentication, and subscription lifecycle.

Features:

  • Automatic ping/pong keep-alive with stale connection detection
  • Circuit breaker for intelligent failure handling
  • Event queue with backpressure protection
  • Automatic reconnection with exponential backoff and jitter

Examples:

Basic usage

client = Parse::LiveQuery::Client.new(
  url: "wss://your-parse-server.com",
  application_id: "your_app_id",
  client_key: "your_client_key"
)

subscription = client.subscribe("Song", where: { artist: "Beatles" })
subscription.on(:create) { |song| puts "New song!" }

client.shutdown(timeout: 5)

Constant Summary collapse

OPCODE_CONTINUATION =

WebSocket operation codes

0x0
OPCODE_TEXT =
0x1
OPCODE_BINARY =
0x2
OPCODE_CLOSE =
0x8
OPCODE_PING =
0x9
OPCODE_PONG =
0xA
DEFAULT_MAX_MESSAGE_SIZE =

Default maximum message size (1MB) - prevents memory exhaustion attacks

1_048_576
DEFAULT_FRAME_READ_TIMEOUT =

Default frame read timeout in seconds - prevents indefinite blocking

30

Instance Attribute Summary collapse

Instance Method Summary collapse

Constructor Details

#initialize(url: nil, application_id: nil, client_key: nil, master_key: NOT_PROVIDED, use_master_key: NOT_PROVIDED, auto_connect: nil, auto_reconnect: nil) ⇒ Client

Create a new LiveQuery client

Parameters:

  • url (String) (defaults to: nil)

    WebSocket URL (wss://...)

  • application_id (String) (defaults to: nil)

    Parse application ID

  • client_key (String) (defaults to: nil)

    Parse REST API key

  • master_key (String, nil) (defaults to: NOT_PROVIDED)

    Parse master key. Pass nil explicitly to force client-mode (no master key on the subscription handshake) even when the parent Parse client has one. Omit the argument to fall back to the LiveQuery config or the parent Parse client. NOTE: holding a master key does NOT by itself elevate the connection — see use_master_key:.

  • use_master_key (Boolean) (defaults to: NOT_PROVIDED)

    build an ADMIN connection: send the master key on the connect frame so the LiveQuery server skips ACL/CLP enforcement for ALL subscriptions on this socket. Defaults to false — connections are ACL-scoped by their per-subscription session tokens unless you explicitly opt in. Parse Server resolves master-key authorization per-CONNECTION at connect time (_handleConnectclient.hasMasterKey); there is no per-subscription master key. For a process that needs both scoped and admin streams, build two clients. Requires a master key to be present (otherwise the flag is a no-op and a warning is emitted).

  • auto_connect (Boolean) (defaults to: nil)

    connect immediately (default: true)

  • auto_reconnect (Boolean) (defaults to: nil)

    automatically reconnect on disconnect (default: true)



157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
# File 'lib/parse/live_query/client.rb', line 157

def initialize(url: nil, application_id: nil, client_key: nil, master_key: NOT_PROVIDED,
               use_master_key: NOT_PROVIDED, auto_connect: nil, auto_reconnect: nil)
  cfg = config

  # Use provided values or fall back to configuration/environment
  @url = url || cfg.url || derive_websocket_url
  @application_id = application_id || cfg.application_id ||
                    parse_client_value(:application_id)
  @client_key = client_key || cfg.client_key ||
                parse_client_value(:api_key)
  @master_key = if master_key.equal?(NOT_PROVIDED)
      cfg.master_key || parse_client_value(:master_key)
    else
      master_key
    end
  # Admin-connection opt-in. Defaults to false (ACL-scoped). Only
  # an explicit `use_master_key: true` here — or `config.use_master_key
  # = true` — sends the master key on the connect frame. This is the
  # v5.1.0 security fix: prior versions sent the master key on the
  # connect frame whenever one was merely present, silently
  # elevating every subscription on the socket past ACL/CLP.
  @use_master_key = if use_master_key.equal?(NOT_PROVIDED)
      cfg.respond_to?(:use_master_key) ? !!cfg.use_master_key : false
    else
      use_master_key == true
    end

  @auto_connect = auto_connect.nil? ? cfg.auto_connect : auto_connect
  @auto_reconnect = auto_reconnect.nil? ? cfg.auto_reconnect : auto_reconnect
  @max_message_size = cfg.max_message_size || DEFAULT_MAX_MESSAGE_SIZE
  @frame_read_timeout = cfg.frame_read_timeout || DEFAULT_FRAME_READ_TIMEOUT

  @state = :disconnected
  @subscriptions = {}
  @monitor = Monitor.new
  @socket = nil
  @reader_thread = nil
  @reconnect_thread = nil
  @reconnect_interval = cfg.initial_reconnect_interval
  @callbacks = Hash.new { |h, k| h[k] = [] }
  @client_id = nil

  # Initialize production components
  @health_monitor = HealthMonitor.new(
    client: self,
    ping_interval: cfg.ping_interval,
    pong_timeout: cfg.pong_timeout,
  )

  @circuit_breaker = CircuitBreaker.new(
    failure_threshold: cfg.circuit_failure_threshold,
    reset_timeout: cfg.circuit_reset_timeout,
    on_state_change: method(:on_circuit_state_change),
  )

  @event_queue = EventQueue.new(
    max_size: cfg.event_queue_size,
    strategy: cfg.backpressure_strategy,
    on_drop: method(:on_event_dropped),
  )

  Logging.info("LiveQuery client initialized", url: @url, application_id: @application_id)

  connect if @auto_connect && @url
end

Instance Attribute Details

#application_idString (readonly)

Returns Parse application ID.

Returns:

  • (String)

    Parse application ID



61
62
63
# File 'lib/parse/live_query/client.rb', line 61

def application_id
  @application_id
end

#circuit_breakerCircuitBreaker (readonly)

Returns connection circuit breaker.

Returns:



112
113
114
# File 'lib/parse/live_query/client.rb', line 112

def circuit_breaker
  @circuit_breaker
end

#client_keyString? (readonly)

Returns Parse client key (REST API key).

Returns:

  • (String, nil)

    Parse client key (REST API key)



64
65
66
# File 'lib/parse/live_query/client.rb', line 64

def client_key
  @client_key
end

#event_queueEventQueue (readonly)

Returns event processing queue.

Returns:



115
116
117
# File 'lib/parse/live_query/client.rb', line 115

def event_queue
  @event_queue
end

#frame_read_timeoutInteger (readonly)

Returns frame read timeout in seconds.

Returns:

  • (Integer)

    frame read timeout in seconds



121
122
123
# File 'lib/parse/live_query/client.rb', line 121

def frame_read_timeout
  @frame_read_timeout
end

#health_monitorHealthMonitor (readonly)

Returns connection health monitor.

Returns:



109
110
111
# File 'lib/parse/live_query/client.rb', line 109

def health_monitor
  @health_monitor
end

#master_keyString? (readonly)

Returns Parse master key.

Returns:

  • (String, nil)

    Parse master key



67
68
69
# File 'lib/parse/live_query/client.rb', line 67

def master_key
  @master_key
end

#max_message_sizeInteger (readonly)

Returns maximum allowed message size in bytes.

Returns:

  • (Integer)

    maximum allowed message size in bytes



118
119
120
# File 'lib/parse/live_query/client.rb', line 118

def max_message_size
  @max_message_size
end

#stateSymbol (readonly)

Returns connection state (:disconnected, :connecting, :connected, :closed).

Returns:

  • (Symbol)

    connection state (:disconnected, :connecting, :connected, :closed)



103
104
105
# File 'lib/parse/live_query/client.rb', line 103

def state
  @state
end

#subscriptionsHash<Integer, Subscription> (readonly)

Returns active subscriptions by request ID.

Returns:



106
107
108
# File 'lib/parse/live_query/client.rb', line 106

def subscriptions
  @subscriptions
end

#urlString (readonly)

Returns WebSocket URL.

Returns:



58
59
60
# File 'lib/parse/live_query/client.rb', line 58

def url
  @url
end

#use_master_keyBoolean (readonly)

Returns whether this is an admin connection that sends the master key on the connect frame. When true, EVERY subscription on this connection bypasses ACL/CLP enforcement (Parse Server resolves master-key authorization per-connection at connect time, never per-subscription). Defaults to false.

Returns:

  • (Boolean)

    whether this is an admin connection that sends the master key on the connect frame. When true, EVERY subscription on this connection bypasses ACL/CLP enforcement (Parse Server resolves master-key authorization per-connection at connect time, never per-subscription). Defaults to false.



74
75
76
# File 'lib/parse/live_query/client.rb', line 74

def use_master_key
  @use_master_key
end

Instance Method Details

#admin_connection?Boolean

Returns true when this connection will actually send a master key on the connect frame: the admin opt-in (use_master_key: true) is set AND a usable (non-empty String) master key is present. The single source of truth for "will this socket bypass ACL/CLP for every subscription." A bare use_master_key: true with no key is NOT an admin connection.

Returns:

  • (Boolean)

    true when this connection will actually send a master key on the connect frame: the admin opt-in (use_master_key: true) is set AND a usable (non-empty String) master key is present. The single source of truth for "will this socket bypass ACL/CLP for every subscription." A bare use_master_key: true with no key is NOT an admin connection.



82
83
84
# File 'lib/parse/live_query/client.rb', line 82

def admin_connection?
  @use_master_key && @master_key.is_a?(String) && !@master_key.empty?
end

#close(code: 1000, reason: "Client closing") ⇒ Object

Disconnect from the LiveQuery server

Parameters:

  • code (Integer) (defaults to: 1000)

    WebSocket close code

  • reason (String) (defaults to: "Client closing")

    close reason



262
263
264
265
266
267
268
269
270
271
272
273
# File 'lib/parse/live_query/client.rb', line 262

def close(code: 1000, reason: "Client closing")
  @auto_reconnect = false
  @monitor.synchronize do
    return if @state == :closed

    Logging.info("Closing connection", code: code, reason: reason)
    send_close_frame(code, reason) if @socket
    cleanup_connection
    @state = :closed
  end
  emit(:close)
end

#closed?Boolean

Returns true if closed.

Returns:

  • (Boolean)

    true if closed



318
319
320
# File 'lib/parse/live_query/client.rb', line 318

def closed?
  @state == :closed
end

#connectBoolean

Connect to the LiveQuery server

Returns:

  • (Boolean)

    true if connection initiated



225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
# File 'lib/parse/live_query/client.rb', line 225

def connect
  return true if connected? || connecting?

  # Check circuit breaker before attempting connection
  unless @circuit_breaker.allow_request?
    time_remaining = @circuit_breaker.time_until_half_open
    Logging.warn("Connection blocked by circuit breaker",
                 state: @circuit_breaker.state,
                 time_until_retry: time_remaining)
    emit(:circuit_open, time_remaining)
    schedule_reconnect if @auto_reconnect
    return false
  end

  @monitor.synchronize do
    @state = :connecting
  end

  begin
    Logging.info("Connecting to LiveQuery server", url: @url)
    establish_connection
    start_reader_thread
    send_connect_message
    true
  rescue => e
    @circuit_breaker.record_failure
    @state = :disconnected
    Logging.error("Failed to connect", error: e)
    emit(:error, ConnectionError.new("Failed to connect: #{e.message}"))
    schedule_reconnect if @auto_reconnect
    false
  end
end

#connected?Boolean

Returns true if connected.

Returns:

  • (Boolean)

    true if connected



308
309
310
# File 'lib/parse/live_query/client.rb', line 308

def connected?
  @state == :connected
end

#connecting?Boolean

Returns true if connecting.

Returns:

  • (Boolean)

    true if connecting



313
314
315
# File 'lib/parse/live_query/client.rb', line 313

def connecting?
  @state == :connecting
end

#health_infoHash

Get comprehensive health information

Returns:



330
331
332
333
334
335
336
337
338
339
340
341
342
# File 'lib/parse/live_query/client.rb', line 330

def health_info
  {
    state: @state,
    connected: connected?,
    healthy: healthy?,
    client_id: @client_id,
    subscription_count: @subscriptions.size,
    max_message_size: @max_message_size,
    health_monitor: @health_monitor.health_info,
    circuit_breaker: @circuit_breaker.info,
    event_queue: @event_queue.stats,
  }
end

#healthy?Boolean

Check if connection is healthy

Returns:

  • (Boolean)


324
325
326
# File 'lib/parse/live_query/client.rb', line 324

def healthy?
  connected? && @health_monitor.healthy?
end

#inspectString

Redacting inspect — the default inspect dumps every instance variable, which would expose @master_key, @client_key, and the per-subscription session tokens in any log line, backtrace, Rails error page, or APM/error-reporter (Sentry / Honeybadger / Rollbar / Bugsnag) that renders the object. Configuration#to_h already redacts these; this keeps the live Client object consistent with that contract.

Returns:



94
95
96
97
98
99
100
# File 'lib/parse/live_query/client.rb', line 94

def inspect
  "#<#{self.class.name} url=#{@url.inspect} " \
  "application_id=#{@application_id.inspect} state=#{@state.inspect} " \
  "admin_connection=#{admin_connection?} subscriptions=#{@subscriptions.size} " \
  "client_key=#{redacted_secret(@client_key)} " \
  "master_key=#{redacted_secret(@master_key)}>"
end

#on(event) { ... } ⇒ Object

Register callback for connection events

Parameters:

  • event (Symbol)

    :open, :close, :error, :circuit_open, :circuit_closed

Yields:

  • callback block



459
460
461
462
463
464
# File 'lib/parse/live_query/client.rb', line 459

def on(event, &block)
  @monitor.synchronize do
    @callbacks[event] << block if block_given?
  end
  self
end

#on_close(&block) ⇒ Object

Callback for connection closed



472
473
474
# File 'lib/parse/live_query/client.rb', line 472

def on_close(&block)
  on(:close, &block)
end

#on_error(&block) ⇒ Object

Callback for errors



477
478
479
# File 'lib/parse/live_query/client.rb', line 477

def on_error(&block)
  on(:error, &block)
end

#on_open(&block) ⇒ Object

Callback for connection opened



467
468
469
# File 'lib/parse/live_query/client.rb', line 467

def on_open(&block)
  on(:open, &block)
end

#shutdown(timeout: 5.0)

This method returns an undefined value.

Graceful shutdown with timeout

Parameters:

  • timeout (Float) (defaults to: 5.0)

    seconds to wait for graceful shutdown



278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
# File 'lib/parse/live_query/client.rb', line 278

def shutdown(timeout: 5.0)
  Logging.info("Shutting down LiveQuery client", timeout: timeout)

  @auto_reconnect = false

  # Cancel any pending reconnect thread
  cancel_reconnect_thread

  # Stop health monitor
  @health_monitor.stop

  # Stop event queue and drain remaining events
  @event_queue.stop(drain: true, timeout: timeout / 2)

  # Close connection
  close(code: 1000, reason: "Shutdown")

  # Wait for reader thread to finish
  @reader_thread&.join(timeout / 2)

  # Force kill if still running
  @reader_thread&.kill
  @reader_thread = nil

  Logging.info("Shutdown complete",
               events_processed: @event_queue.processed_count,
               events_dropped: @event_queue.dropped_count)
end

#subscribe(class_name, where: {}, fields: nil, session_token: nil, use_master_key: false) {|subscription| ... } ⇒ Subscription

Subscribe to a Parse class with optional query constraints

Parameters:

  • class_name (String, Class)

    Parse class name or model class

  • where (Hash) (defaults to: {})

    query constraints

  • fields (Array<String>) (defaults to: nil)

    specific fields to watch

  • session_token (String) (defaults to: nil)

    session token for ACL-aware subscriptions

  • use_master_key (Boolean) (defaults to: false)

    assert that this subscription needs master-key (ACL-bypassing) scope. Parse Server has NO per-subscription master key — authorization is fixed per connection at connect time — so this flag does NOT elevate a single subscription on a scoped socket. It is honored only when this client is an admin connection (built with use_master_key: true), in which case the whole connection is already elevated. On a non-admin connection, passing use_master_key: true emits a warning and the subscription stays ACL-scoped. See Subscription#initialize.

Yields:

  • (subscription)

    runs the block with the freshly- constructed Subscription BEFORE the subscribe frame is sent to the server, so callbacks registered inside the block (sub.on(:create) { … }, etc.) are wired before any server events can arrive. Optional — callers may still capture the returned subscription and register callbacks later.

Returns:



366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
# File 'lib/parse/live_query/client.rb', line 366

def subscribe(class_name, where: {}, fields: nil, session_token: nil,
              use_master_key: false, &block)
  # Handle Parse::Object subclass
  if class_name.is_a?(Class) && class_name < Parse::Object
    class_name = class_name.parse_class
  end

  # Handle Parse::Query object
  if class_name.is_a?(Parse::Query)
    query = class_name
    class_name = query.table
    where = query.compile_where
  end

  # Refuse server-side-JS / data-mutating operators in the `where`
  # filter at any nesting depth. LiveQuery subscriptions are a
  # persistent server-evaluated channel; without this gate, a
  # caller could plant `$where`/`$function`/`$accumulator` (or
  # data-mutating stages nested inside) and have them re-evaluated
  # on every matching event for the lifetime of the subscription.
  # Permissive mode (recursive denylist only) mirrors the
  # `Parse::MongoDB`/`Parse::AtlasSearch` filter posture so the
  # SDK enforces one consistent set of refusals on every
  # user-influenced filter path.
  Parse::PipelineSecurity.validate_filter!(where) if where.is_a?(Hash) && !where.empty?

  warn_subscription_scope_mismatch(use_master_key, session_token)

  subscription = Subscription.new(
    client: self,
    class_name: class_name,
    query: where,
    fields: fields,
    session_token: session_token,
    use_master_key: use_master_key,
  )

  @monitor.synchronize do
    @subscriptions[subscription.request_id] = subscription
  end

  Logging.debug("Subscription created",
                request_id: subscription.request_id,
                class_name: class_name)

  # Yield the subscription BEFORE the subscribe frame goes out so
  # caller-registered callbacks are wired before any server event
  # can arrive on this request_id. Order matters: subscribe-frame-
  # then-yield would race a fast server response against the
  # callback registration on a hot socket.
  #
  # If the caller's block raises, ROLL BACK the registry insert
  # before re-raising. Without this, the failed-block
  # subscription stays in `@subscriptions` and the next
  # `resubscribe_all` (triggered by a reconnect) wire-sends it
  # to the server — a ghost subscription the caller thought
  # they had aborted.
  if block_given?
    begin
      yield(subscription)
    rescue
      @monitor.synchronize do
        @subscriptions.delete(subscription.request_id)
      end
      raise
    end
  end

  # Send subscribe message if connected
  if connected?
    send_message(subscription.to_subscribe_message)
  else
    # Queue subscription for when connection is established
    connect unless connecting?
  end

  subscription
end

#unsubscribe(subscription) ⇒ Object

Unsubscribe from a subscription

Parameters:



447
448
449
450
451
452
453
454
# File 'lib/parse/live_query/client.rb', line 447

def unsubscribe(subscription)
  Logging.debug("Unsubscribing", request_id: subscription.request_id)
  send_message(subscription.to_unsubscribe_message) if connected?

  @monitor.synchronize do
    @subscriptions.delete(subscription.request_id)
  end
end