Class: Parse::LiveQuery::Client

Inherits:
Object
  • Object
show all
Defined in:
lib/parse/live_query/client.rb

Overview

WebSocket client for Parse LiveQuery server. Manages WebSocket connection, authentication, and subscription lifecycle.

Features:

  • Automatic ping/pong keep-alive with stale connection detection

  • Circuit breaker for intelligent failure handling

  • Event queue with backpressure protection

  • Automatic reconnection with exponential backoff and jitter

Examples:

Basic usage

client = Parse::LiveQuery::Client.new(
  url: "wss://your-parse-server.com",
  application_id: "your_app_id",
  client_key: "your_client_key"
)

subscription = client.subscribe("Song", where: { artist: "Beatles" })
subscription.on(:create) { |song| puts "New song!" }

client.shutdown(timeout: 5)

Constant Summary collapse

OPCODE_CONTINUATION =

WebSocket operation codes

0x0
OPCODE_TEXT =
0x1
OPCODE_BINARY =
0x2
OPCODE_CLOSE =
0x8
OPCODE_PING =
0x9
OPCODE_PONG =
0xA
DEFAULT_MAX_MESSAGE_SIZE =

Default maximum message size (1MB) - prevents memory exhaustion attacks

1_048_576
DEFAULT_FRAME_READ_TIMEOUT =

Default frame read timeout in seconds - prevents indefinite blocking

30

Instance Attribute Summary collapse

Instance Method Summary collapse

Constructor Details

#initialize(url: nil, application_id: nil, client_key: nil, master_key: nil, auto_connect: nil, auto_reconnect: nil) ⇒ Client

Create a new LiveQuery client

Parameters:

  • url (String) (defaults to: nil)

    WebSocket URL (wss://…)

  • application_id (String) (defaults to: nil)

    Parse application ID

  • client_key (String) (defaults to: nil)

    Parse REST API key

  • master_key (String, nil) (defaults to: nil)

    Parse master key (optional)

  • auto_connect (Boolean) (defaults to: nil)

    connect immediately (default: true)

  • auto_reconnect (Boolean) (defaults to: nil)

    automatically reconnect on disconnect (default: true)



97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
# File 'lib/parse/live_query/client.rb', line 97

# Builds a LiveQuery client and, when auto-connect is enabled and a URL is
# available, immediately starts connecting.
#
# For the URL and each key, an explicit argument wins over the value read
# from `config`, which in turn wins over a derived fallback
# (`derive_websocket_url` / `parse_client_value`).
#
# @param url [String, nil] WebSocket URL
# @param application_id [String, nil] Parse application ID
# @param client_key [String, nil] Parse client key
# @param master_key [String, nil] Parse master key
# @param auto_connect [Boolean, nil] nil defers to the configured value
# @param auto_reconnect [Boolean, nil] nil defers to the configured value
def initialize(url: nil, application_id: nil, client_key: nil, master_key: nil,
               auto_connect: nil, auto_reconnect: nil)
  cfg = config

  # Explicit argument first, then configuration, then a derived fallback.
  @url = url || cfg.url || derive_websocket_url
  @application_id = application_id || cfg.application_id ||
                    parse_client_value(:application_id)
  @client_key = client_key || cfg.client_key ||
                parse_client_value(:api_key)
  @master_key = master_key || cfg.master_key ||
                parse_client_value(:master_key)

  # nil means "not specified"; an explicit `false` must override the
  # configuration, so `||` cannot be used for these two flags.
  @auto_connect = auto_connect.nil? ? cfg.auto_connect : auto_connect
  @auto_reconnect = auto_reconnect.nil? ? cfg.auto_reconnect : auto_reconnect
  @max_message_size = cfg.max_message_size || DEFAULT_MAX_MESSAGE_SIZE
  @frame_read_timeout = cfg.frame_read_timeout || DEFAULT_FRAME_READ_TIMEOUT

  # Connection state; @monitor guards state and subscription updates.
  @state = :disconnected
  @subscriptions = {}
  @monitor = Monitor.new
  @socket = nil
  @reader_thread = nil
  @reconnect_thread = nil
  @reconnect_interval = cfg.initial_reconnect_interval
  # Each event name lazily gets its own callback list.
  @callbacks = Hash.new { |h, k| h[k] = [] }
  @client_id = nil

  # Collaborators are built after the state above because they receive
  # `self` or bound methods of this instance.
  @health_monitor = HealthMonitor.new(
    client: self,
    ping_interval: cfg.ping_interval,
    pong_timeout: cfg.pong_timeout,
  )

  @circuit_breaker = CircuitBreaker.new(
    failure_threshold: cfg.circuit_failure_threshold,
    reset_timeout: cfg.circuit_reset_timeout,
    on_state_change: method(:on_circuit_state_change),
  )

  @event_queue = EventQueue.new(
    max_size: cfg.event_queue_size,
    strategy: cfg.backpressure_strategy,
    on_drop: method(:on_event_dropped),
  )

  Logging.info("LiveQuery client initialized", url: @url, application_id: @application_id)

  # Must stay last: connect relies on everything initialized above.
  connect if @auto_connect && @url
end

Instance Attribute Details

#application_id ⇒ String (readonly)

Returns Parse application ID.

Returns:

  • (String)

    Parse application ID



61
62
63
# File 'lib/parse/live_query/client.rb', line 61

# @return [String] Parse application ID
def application_id
  @application_id
end

#circuit_breaker ⇒ CircuitBreaker (readonly)

Returns connection circuit breaker.

Returns:

  • (CircuitBreaker)

    connection circuit breaker



79
80
81
# File 'lib/parse/live_query/client.rb', line 79

# @return [CircuitBreaker] connection circuit breaker
def circuit_breaker
  @circuit_breaker
end

#client_key ⇒ String? (readonly)

Returns Parse client key (REST API key).

Returns:

  • (String, nil)

    Parse client key (REST API key)



64
65
66
# File 'lib/parse/live_query/client.rb', line 64

# @return [String, nil] Parse client key (REST API key)
def client_key
  @client_key
end

#event_queue ⇒ EventQueue (readonly)

Returns event processing queue.

Returns:

  • (EventQueue)

    event processing queue



82
83
84
# File 'lib/parse/live_query/client.rb', line 82

# @return [EventQueue] event processing queue
def event_queue
  @event_queue
end

#frame_read_timeout ⇒ Integer (readonly)

Returns frame read timeout in seconds.

Returns:

  • (Integer)

    frame read timeout in seconds



88
89
90
# File 'lib/parse/live_query/client.rb', line 88

# @return [Integer] frame read timeout in seconds
def frame_read_timeout
  @frame_read_timeout
end

#health_monitor ⇒ HealthMonitor (readonly)

Returns connection health monitor.

Returns:

  • (HealthMonitor)

    connection health monitor



76
77
78
# File 'lib/parse/live_query/client.rb', line 76

# @return [HealthMonitor] connection health monitor
def health_monitor
  @health_monitor
end

#master_key ⇒ String? (readonly)

Returns Parse master key.

Returns:

  • (String, nil)

    Parse master key



67
68
69
# File 'lib/parse/live_query/client.rb', line 67

# @return [String, nil] Parse master key
def master_key
  @master_key
end

#max_message_size ⇒ Integer (readonly)

Returns maximum allowed message size in bytes.

Returns:

  • (Integer)

    maximum allowed message size in bytes



85
86
87
# File 'lib/parse/live_query/client.rb', line 85

# @return [Integer] maximum allowed message size in bytes
def max_message_size
  @max_message_size
end

#state ⇒ Symbol (readonly)

Returns connection state (:disconnected, :connecting, :connected, :closed).

Returns:

  • (Symbol)

    connection state (:disconnected, :connecting, :connected, :closed)



70
71
72
# File 'lib/parse/live_query/client.rb', line 70

# @return [Symbol] connection state (:disconnected, :connecting,
#   :connected, :closed)
def state
  @state
end

#subscriptions ⇒ Hash<Integer, Subscription> (readonly)

Returns active subscriptions by request ID.

Returns:

  • (Hash<Integer, Subscription>)

    active subscriptions by request ID



73
74
75
# File 'lib/parse/live_query/client.rb', line 73

# Returns the internal subscription table itself (not a copy).
#
# @return [Hash<Integer, Subscription>] active subscriptions by request ID
def subscriptions
  @subscriptions
end

#url ⇒ String (readonly)

Returns WebSocket URL.

Returns:

  • (String)

    WebSocket URL



58
59
60
# File 'lib/parse/live_query/client.rb', line 58

# @return [String] WebSocket URL
def url
  @url
end

Instance Method Details

#close(code: 1000, reason: "Client closing") ⇒ Object

Disconnect from the LiveQuery server

Parameters:

  • code (Integer) (defaults to: 1000)

    WebSocket close code

  • reason (String) (defaults to: "Client closing")

    close reason



188
189
190
191
192
193
194
195
196
197
198
199
# File 'lib/parse/live_query/client.rb', line 188

# Disconnect from the LiveQuery server and mark the client as closed.
#
# Disables auto-reconnect, sends a WebSocket close frame when a socket is
# present, tears the connection down and emits `:close`. Calling it again
# once the state is `:closed` is a no-op (no second `:close` event).
#
# Sending the close frame is best-effort: if it raises (for example because
# the socket is already dead), the failure is logged and the connection is
# still cleaned up and marked `:closed`, so the client never gets stuck in a
# half-closed state.
#
# @param code [Integer] WebSocket close code
# @param reason [String] close reason
# @return [Object, nil] nil when already closed, otherwise the result of
#   emitting `:close`
def close(code: 1000, reason: "Client closing")
  @auto_reconnect = false
  @monitor.synchronize do
    return if @state == :closed

    Logging.info("Closing connection", code: code, reason: reason)
    begin
      send_close_frame(code, reason) if @socket
    rescue StandardError => e
      # A failed close frame must not prevent local teardown.
      Logging.warn("Failed to send close frame", error: e)
    end
    cleanup_connection
    @state = :closed
  end
  # Emitted outside the monitor so callbacks do not run while it is held.
  emit(:close)
end

#closed? ⇒ Boolean

Returns true if closed.

Returns:

  • (Boolean)

    true if closed



244
245
246
# File 'lib/parse/live_query/client.rb', line 244

# @return [Boolean] true when the connection state is :closed
def closed?
  @state == :closed
end

#connect ⇒ Boolean

Connect to the LiveQuery server

Returns:

  • (Boolean)

    true if connection initiated



151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
# File 'lib/parse/live_query/client.rb', line 151

# Connect to the LiveQuery server.
#
# Returns early when a connection already exists or is in progress. When the
# circuit breaker refuses the request, `:circuit_open` is emitted with the
# remaining wait time and (if auto-reconnect is on) a reconnect is scheduled.
#
# On failure the breaker records the failure, the state goes back to
# `:disconnected`, an `:error` event carrying a ConnectionError is emitted
# and (if auto-reconnect is on) a reconnect is scheduled.
#
# @return [Boolean] true if a connection attempt was started or one is
#   already connected/in progress; false if the circuit breaker blocked the
#   attempt or the attempt raised
def connect
  return true if connected? || connecting?

  # Check circuit breaker before attempting connection
  unless @circuit_breaker.allow_request?
    time_remaining = @circuit_breaker.time_until_half_open
    Logging.warn("Connection blocked by circuit breaker",
                 state: @circuit_breaker.state,
                 time_until_retry: time_remaining)
    emit(:circuit_open, time_remaining)
    schedule_reconnect if @auto_reconnect
    return false
  end

  # Claim the :connecting state atomically. The unsynchronized check at the
  # top is only a fast path; without re-checking under the monitor, two
  # threads could both pass it and each open a connection.
  claimed = @monitor.synchronize do
    if connected? || connecting?
      false
    else
      @state = :connecting
      true
    end
  end
  return true unless claimed

  begin
    Logging.info("Connecting to LiveQuery server", url: @url)
    establish_connection
    start_reader_thread
    send_connect_message
    true
  rescue => e
    @circuit_breaker.record_failure
    # State writes go through the monitor, like the transition above.
    @monitor.synchronize { @state = :disconnected }
    # NOTE(review): a socket/reader thread created before the failure is not
    # torn down here — confirm whether cleanup_connection is safe to call.
    Logging.error("Failed to connect", error: e)
    emit(:error, ConnectionError.new("Failed to connect: #{e.message}"))
    schedule_reconnect if @auto_reconnect
    false
  end
end

#connected? ⇒ Boolean

Returns true if connected.

Returns:

  • (Boolean)

    true if connected



234
235
236
# File 'lib/parse/live_query/client.rb', line 234

# @return [Boolean] true when the connection state is :connected
def connected?
  @state == :connected
end

#connecting? ⇒ Boolean

Returns true if connecting.

Returns:

  • (Boolean)

    true if connecting



239
240
241
# File 'lib/parse/live_query/client.rb', line 239

# @return [Boolean] true when the connection state is :connecting
def connecting?
  @state == :connecting
end

#health_info ⇒ Hash

Get comprehensive health information

Returns:

  • (Hash)

    comprehensive health information



256
257
258
259
260
261
262
263
264
265
266
267
268
# File 'lib/parse/live_query/client.rb', line 256

# Snapshot of the client's health: connection-level facts first, followed by
# the reports of the health monitor, circuit breaker and event queue.
#
# @return [Hash] keys :state, :connected, :healthy, :client_id,
#   :subscription_count, :max_message_size, :health_monitor,
#   :circuit_breaker, :event_queue (in that order)
def health_info
  connection_facts = {
    state: @state,
    connected: connected?,
    healthy: healthy?,
    client_id: @client_id,
    subscription_count: @subscriptions.size,
    max_message_size: @max_message_size,
  }

  component_reports = {
    health_monitor: @health_monitor.health_info,
    circuit_breaker: @circuit_breaker.info,
    event_queue: @event_queue.stats,
  }

  connection_facts.merge(component_reports)
end

#healthy? ⇒ Boolean

Check if connection is healthy

Returns:

  • (Boolean)


250
251
252
# File 'lib/parse/live_query/client.rb', line 250

# Check if connection is healthy.
#
# @return [Boolean] true only when connected and the health monitor also
#   reports healthy
def healthy?
  connected? && @health_monitor.healthy?
end

#on(event) { ... } ⇒ Object

Register callback for connection events

Parameters:

  • event (Symbol)

    :open, :close, :error, :circuit_open, :circuit_closed

Yields:

  • callback block



342
343
344
345
346
347
# File 'lib/parse/live_query/client.rb', line 342

# Register a callback for a connection event. Without a block nothing is
# registered. Returns the client so registrations can be chained.
#
# @param event [Symbol] :open, :close, :error, :circuit_open, :circuit_closed
# @yield callback block
# @return [self]
def on(event, &block)
  @monitor.synchronize do
    next unless block

    @callbacks[event].push(block)
  end
  self
end

#on_close(&block) ⇒ Object

Callback for connection closed



355
356
357
# File 'lib/parse/live_query/client.rb', line 355

# Callback for connection closed; shorthand for `on(:close)`.
#
# @yield callback block, forwarded unchanged to #on
# @return [Object] the result of #on
def on_close(&block)
  on(:close, &block)
end

#on_error(&block) ⇒ Object

Callback for errors



360
361
362
# File 'lib/parse/live_query/client.rb', line 360

# Callback for errors; shorthand for `on(:error)`.
#
# @yield callback block, forwarded unchanged to #on
# @return [Object] the result of #on
def on_error(&block)
  on(:error, &block)
end

#on_open(&block) ⇒ Object

Callback for connection opened



350
351
352
# File 'lib/parse/live_query/client.rb', line 350

# Callback for connection opened; shorthand for `on(:open)`.
#
# @yield callback block, forwarded unchanged to #on
# @return [Object] the result of #on
def on_open(&block)
  on(:open, &block)
end

#shutdown(timeout: 5.0) ⇒ void

This method returns an undefined value.

Graceful shutdown with timeout

Parameters:

  • timeout (Float) (defaults to: 5.0)

    seconds to wait for graceful shutdown



204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
# File 'lib/parse/live_query/client.rb', line 204

# Graceful shutdown with timeout.
#
# Disables auto-reconnect, cancels any pending reconnect, stops the health
# monitor, drains the event queue, closes the connection and finally waits
# for (then force-kills) the reader thread.
#
# The timeout is split evenly between draining the event queue and waiting
# for the reader thread. The split uses float division so an Integer
# timeout is not truncated (`5 / 2 == 2`, `1 / 2 == 0`).
#
# @param timeout [Numeric] seconds to wait for graceful shutdown
# @return [void]
def shutdown(timeout: 5.0)
  Logging.info("Shutting down LiveQuery client", timeout: timeout)

  @auto_reconnect = false
  half_timeout = timeout / 2.0

  # Cancel any pending reconnect thread
  cancel_reconnect_thread

  # Stop health monitor
  @health_monitor.stop

  # Stop event queue and drain remaining events
  @event_queue.stop(drain: true, timeout: half_timeout)

  # Close connection
  close(code: 1000, reason: "Shutdown")

  # Wait for reader thread to finish
  @reader_thread&.join(half_timeout)

  # Force kill if still running
  @reader_thread&.kill
  @reader_thread = nil

  Logging.info("Shutdown complete",
               events_processed: @event_queue.processed_count,
               events_dropped: @event_queue.dropped_count)
end

#subscribe(class_name, where: {}, fields: nil, session_token: nil) ⇒ Subscription

Subscribe to a Parse class with optional query constraints

Parameters:

  • class_name (String, Class)

    Parse class name or model class

  • where (Hash) (defaults to: {})

    query constraints

  • fields (Array<String>) (defaults to: nil)

    specific fields to watch

  • session_token (String) (defaults to: nil)

    session token for ACL-aware subscriptions

Returns:

  • (Subscription)



276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
# File 'lib/parse/live_query/client.rb', line 276

# Subscribe to a Parse class with optional query constraints.
#
# `class_name` may be a class-name String, a Parse::Object subclass (its
# `parse_class` is used), or a Parse::Query (its `table` and compiled
# `where` replace the `class_name`/`where:` arguments).
#
# The subscription is registered right away. The subscribe message is sent
# only if the client is currently connected; otherwise a connection attempt
# is started unless one is already in progress.
#
# @param class_name [String, Class, Parse::Query] Parse class name, model
#   class or query
# @param where [Hash] query constraints
# @param fields [Array<String>, nil] specific fields to watch
# @param session_token [String, nil] session token for ACL-aware subscriptions
# @return [Subscription] the newly registered subscription
def subscribe(class_name, where: {}, fields: nil, session_token: nil)
  # A model class stands in for its Parse class name.
  class_name = class_name.parse_class if class_name.is_a?(Class) && class_name < Parse::Object

  # A query object supplies both the class name and the constraints.
  if class_name.is_a?(Parse::Query)
    class_name, where = class_name.table, class_name.compile_where
  end

  # Security gate: a LiveQuery subscription is a persistent channel that the
  # server re-evaluates on every matching event, so server-side-JS and
  # data-mutating operators (`$where`, `$function`, `$accumulator`, or
  # mutating stages nested inside) are refused in `where` at any nesting
  # depth. Permissive mode (recursive denylist only) matches the posture of
  # the `Parse::MongoDB`/`Parse::AtlasSearch` filters, so the SDK applies one
  # consistent set of refusals on every user-influenced filter path.
  filter_present = where.is_a?(Hash) && !where.empty?
  Parse::PipelineSecurity.validate_filter!(where) if filter_present

  subscription = Subscription.new(
    client: self,
    class_name: class_name,
    query: where,
    fields: fields,
    session_token: session_token,
  )
  request_id = subscription.request_id

  @monitor.synchronize { @subscriptions[request_id] = subscription }

  Logging.debug("Subscription created",
                request_id: request_id,
                class_name: class_name)

  if connected?
    # Live connection: announce the subscription immediately.
    send_message(subscription.to_subscribe_message)
  elsif !connecting?
    # Not connected and no attempt in flight: start one. The subscription
    # stays registered in the meantime.
    connect
  end

  subscription
end

#unsubscribe(subscription) ⇒ Object

Unsubscribe from a subscription

Parameters:

  • subscription (Subscription)

    the subscription to cancel



330
331
332
333
334
335
336
337
# File 'lib/parse/live_query/client.rb', line 330

# Unsubscribe from a subscription.
#
# The unsubscribe message is sent only while connected; the subscription is
# removed from the local table either way.
#
# @param subscription [Subscription] subscription to cancel
# @return [Subscription, nil] the entry removed from the table, or nil if it
#   was not registered
def unsubscribe(subscription)
  request_id = subscription.request_id
  Logging.debug("Unsubscribing", request_id: request_id)

  send_message(subscription.to_unsubscribe_message) if connected?

  @monitor.synchronize { @subscriptions.delete(request_id) }
end