Class: Parse::LiveQuery::Client
- Inherits:
-
Object
- Object
- Parse::LiveQuery::Client
- Defined in:
- lib/parse/live_query/client.rb
Overview
WebSocket client for Parse LiveQuery server. Manages WebSocket connection, authentication, and subscription lifecycle.
Features:
-
Automatic ping/pong keep-alive with stale connection detection
-
Circuit breaker for intelligent failure handling
-
Event queue with backpressure protection
-
Automatic reconnection with exponential backoff and jitter
Constant Summary collapse
- OPCODE_CONTINUATION =
WebSocket operation codes
0x0- OPCODE_TEXT =
0x1- OPCODE_BINARY =
0x2- OPCODE_CLOSE =
0x8- OPCODE_PING =
0x9- OPCODE_PONG =
0xA- DEFAULT_MAX_MESSAGE_SIZE =
Default maximum message size (1MB) - prevents memory exhaustion attacks
1_048_576- DEFAULT_FRAME_READ_TIMEOUT =
Default frame read timeout in seconds - prevents indefinite blocking
30
Instance Attribute Summary collapse
-
#application_id ⇒ String
readonly
Parse application ID.
-
#circuit_breaker ⇒ CircuitBreaker
readonly
Connection circuit breaker.
-
#client_key ⇒ String?
readonly
Parse client key (REST API key).
-
#event_queue ⇒ EventQueue
readonly
Event processing queue.
-
#frame_read_timeout ⇒ Integer
readonly
Frame read timeout in seconds.
-
#health_monitor ⇒ HealthMonitor
readonly
Connection health monitor.
-
#master_key ⇒ String?
readonly
Parse master key.
-
#max_message_size ⇒ Integer
readonly
Maximum allowed message size in bytes.
-
#state ⇒ Symbol
readonly
Connection state (:disconnected, :connecting, :connected, :closed).
-
#subscriptions ⇒ Hash<Integer, Subscription>
readonly
Active subscriptions by request ID.
-
#url ⇒ String
readonly
WebSocket URL.
Instance Method Summary collapse
-
#close(code: 1000, reason: "Client closing") ⇒ Object
Disconnect from the LiveQuery server.
-
#closed? ⇒ Boolean
True if closed.
-
#connect ⇒ Boolean
Connect to the LiveQuery server.
-
#connected? ⇒ Boolean
True if connected.
-
#connecting? ⇒ Boolean
True if connecting.
-
#health_info ⇒ Hash
Get comprehensive health information.
-
#healthy? ⇒ Boolean
Check if connection is healthy.
-
#initialize(url: nil, application_id: nil, client_key: nil, master_key: NOT_PROVIDED, auto_connect: nil, auto_reconnect: nil) ⇒ Client
constructor
Create a new LiveQuery client.
-
#on(event) { ... } ⇒ Object
Register callback for connection events.
-
#on_close(&block) ⇒ Object
Callback for connection closed.
-
#on_error(&block) ⇒ Object
Callback for errors.
-
#on_open(&block) ⇒ Object
Callback for connection opened.
-
#shutdown(timeout: 5.0) ⇒ void
Graceful shutdown with timeout.
-
#subscribe(class_name, where: {}, fields: nil, session_token: nil) ⇒ Subscription
Subscribe to a Parse class with optional query constraints.
-
#unsubscribe(subscription) ⇒ Object
Unsubscribe from a subscription.
Constructor Details
#initialize(url: nil, application_id: nil, client_key: nil, master_key: NOT_PROVIDED, auto_connect: nil, auto_reconnect: nil) ⇒ Client
Create a new LiveQuery client
112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 |
# File 'lib/parse/live_query/client.rb', line 112 def initialize(url: nil, application_id: nil, client_key: nil, master_key: NOT_PROVIDED, auto_connect: nil, auto_reconnect: nil) cfg = config # Use provided values or fall back to configuration/environment @url = url || cfg.url || derive_websocket_url @application_id = application_id || cfg.application_id || parse_client_value(:application_id) @client_key = client_key || cfg.client_key || parse_client_value(:api_key) @master_key = if master_key.equal?(NOT_PROVIDED) cfg.master_key || parse_client_value(:master_key) else master_key end @auto_connect = auto_connect.nil? ? cfg.auto_connect : auto_connect @auto_reconnect = auto_reconnect.nil? ? cfg.auto_reconnect : auto_reconnect @max_message_size = cfg. || DEFAULT_MAX_MESSAGE_SIZE @frame_read_timeout = cfg.frame_read_timeout || DEFAULT_FRAME_READ_TIMEOUT @state = :disconnected @subscriptions = {} @monitor = Monitor.new @socket = nil @reader_thread = nil @reconnect_thread = nil @reconnect_interval = cfg.initial_reconnect_interval @callbacks = Hash.new { |h, k| h[k] = [] } @client_id = nil # Initialize production components @health_monitor = HealthMonitor.new( client: self, ping_interval: cfg.ping_interval, pong_timeout: cfg.pong_timeout, ) @circuit_breaker = CircuitBreaker.new( failure_threshold: cfg.circuit_failure_threshold, reset_timeout: cfg.circuit_reset_timeout, on_state_change: method(:on_circuit_state_change), ) @event_queue = EventQueue.new( max_size: cfg.event_queue_size, strategy: cfg.backpressure_strategy, on_drop: method(:on_event_dropped), ) Logging.info("LiveQuery client initialized", url: @url, application_id: @application_id) connect if @auto_connect && @url end |
Instance Attribute Details
#application_id ⇒ String (readonly)
Returns Parse application ID.
61 62 63 |
# File 'lib/parse/live_query/client.rb', line 61 def application_id @application_id end |
#circuit_breaker ⇒ CircuitBreaker (readonly)
Returns connection circuit breaker.
79 80 81 |
# File 'lib/parse/live_query/client.rb', line 79 def circuit_breaker @circuit_breaker end |
#client_key ⇒ String? (readonly)
Returns Parse client key (REST API key).
64 65 66 |
# File 'lib/parse/live_query/client.rb', line 64 def client_key @client_key end |
#event_queue ⇒ EventQueue (readonly)
Returns event processing queue.
82 83 84 |
# File 'lib/parse/live_query/client.rb', line 82 def event_queue @event_queue end |
#frame_read_timeout ⇒ Integer (readonly)
Returns frame read timeout in seconds.
88 89 90 |
# File 'lib/parse/live_query/client.rb', line 88 def frame_read_timeout @frame_read_timeout end |
#health_monitor ⇒ HealthMonitor (readonly)
Returns connection health monitor.
76 77 78 |
# File 'lib/parse/live_query/client.rb', line 76 def health_monitor @health_monitor end |
#master_key ⇒ String? (readonly)
Returns Parse master key.
67 68 69 |
# File 'lib/parse/live_query/client.rb', line 67 def master_key @master_key end |
#max_message_size ⇒ Integer (readonly)
Returns maximum allowed message size in bytes.
85 86 87 |
# File 'lib/parse/live_query/client.rb', line 85 def @max_message_size end |
#state ⇒ Symbol (readonly)
Returns connection state (:disconnected, :connecting, :connected, :closed).
70 71 72 |
# File 'lib/parse/live_query/client.rb', line 70 def state @state end |
#subscriptions ⇒ Hash<Integer, Subscription> (readonly)
Returns active subscriptions by request ID.
73 74 75 |
# File 'lib/parse/live_query/client.rb', line 73 def subscriptions @subscriptions end |
#url ⇒ String (readonly)
Returns WebSocket URL.
58 59 60 |
# File 'lib/parse/live_query/client.rb', line 58 def url @url end |
Instance Method Details
#close(code: 1000, reason: "Client closing") ⇒ Object
Disconnect from the LiveQuery server
206 207 208 209 210 211 212 213 214 215 216 217 |
# File 'lib/parse/live_query/client.rb', line 206 def close(code: 1000, reason: "Client closing") @auto_reconnect = false @monitor.synchronize do return if @state == :closed Logging.info("Closing connection", code: code, reason: reason) send_close_frame(code, reason) if @socket cleanup_connection @state = :closed end emit(:close) end |
#closed? ⇒ Boolean
Returns true if closed.
262 263 264 |
# File 'lib/parse/live_query/client.rb', line 262 def closed? @state == :closed end |
#connect ⇒ Boolean
Connect to the LiveQuery server
169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 |
# File 'lib/parse/live_query/client.rb', line 169 def connect return true if connected? || connecting? # Check circuit breaker before attempting connection unless @circuit_breaker.allow_request? time_remaining = @circuit_breaker.time_until_half_open Logging.warn("Connection blocked by circuit breaker", state: @circuit_breaker.state, time_until_retry: time_remaining) emit(:circuit_open, time_remaining) schedule_reconnect if @auto_reconnect return false end @monitor.synchronize do @state = :connecting end begin Logging.info("Connecting to LiveQuery server", url: @url) establish_connection start_reader_thread true rescue => e @circuit_breaker.record_failure @state = :disconnected Logging.error("Failed to connect", error: e) emit(:error, ConnectionError.new("Failed to connect: #{e.}")) schedule_reconnect if @auto_reconnect false end end |
#connected? ⇒ Boolean
Returns true if connected.
252 253 254 |
# File 'lib/parse/live_query/client.rb', line 252 def connected? @state == :connected end |
#connecting? ⇒ Boolean
Returns true if connecting.
257 258 259 |
# File 'lib/parse/live_query/client.rb', line 257 def connecting? @state == :connecting end |
#health_info ⇒ Hash
Get comprehensive health information
274 275 276 277 278 279 280 281 282 283 284 285 286 |
# File 'lib/parse/live_query/client.rb', line 274 def health_info { state: @state, connected: connected?, healthy: healthy?, client_id: @client_id, subscription_count: @subscriptions.size, max_message_size: @max_message_size, health_monitor: @health_monitor.health_info, circuit_breaker: @circuit_breaker.info, event_queue: @event_queue.stats, } end |
#healthy? ⇒ Boolean
Check if connection is healthy
268 269 270 |
# File 'lib/parse/live_query/client.rb', line 268 def healthy? connected? && @health_monitor.healthy? end |
#on(event) { ... } ⇒ Object
Register callback for connection events
360 361 362 363 364 365 |
# File 'lib/parse/live_query/client.rb', line 360 def on(event, &block) @monitor.synchronize do @callbacks[event] << block if block_given? end self end |
#on_close(&block) ⇒ Object
Callback for connection closed
373 374 375 |
# File 'lib/parse/live_query/client.rb', line 373 def on_close(&block) on(:close, &block) end |
#on_error(&block) ⇒ Object
Callback for errors
378 379 380 |
# File 'lib/parse/live_query/client.rb', line 378 def on_error(&block) on(:error, &block) end |
#on_open(&block) ⇒ Object
Callback for connection opened
368 369 370 |
# File 'lib/parse/live_query/client.rb', line 368 def on_open(&block) on(:open, &block) end |
#shutdown(timeout: 5.0) ⇒ void
This method returns an undefined value.
Graceful shutdown with timeout
222 223 224 225 226 227 228 229 230 231 232 233 234 235 236 237 238 239 240 241 242 243 244 245 246 247 248 249 |
# File 'lib/parse/live_query/client.rb', line 222 def shutdown(timeout: 5.0) Logging.info("Shutting down LiveQuery client", timeout: timeout) @auto_reconnect = false # Cancel any pending reconnect thread cancel_reconnect_thread # Stop health monitor @health_monitor.stop # Stop event queue and drain remaining events @event_queue.stop(drain: true, timeout: timeout / 2) # Close connection close(code: 1000, reason: "Shutdown") # Wait for reader thread to finish @reader_thread&.join(timeout / 2) # Force kill if still running @reader_thread&.kill @reader_thread = nil Logging.info("Shutdown complete", events_processed: @event_queue.processed_count, events_dropped: @event_queue.dropped_count) end |
#subscribe(class_name, where: {}, fields: nil, session_token: nil) ⇒ Subscription
Subscribe to a Parse class with optional query constraints
294 295 296 297 298 299 300 301 302 303 304 305 306 307 308 309 310 311 312 313 314 315 316 317 318 319 320 321 322 323 324 325 326 327 328 329 330 331 332 333 334 335 336 337 338 339 340 341 342 343 344 |
# File 'lib/parse/live_query/client.rb', line 294 def subscribe(class_name, where: {}, fields: nil, session_token: nil) # Handle Parse::Object subclass if class_name.is_a?(Class) && class_name < Parse::Object class_name = class_name.parse_class end # Handle Parse::Query object if class_name.is_a?(Parse::Query) query = class_name class_name = query.table where = query.compile_where end # Refuse server-side-JS / data-mutating operators in the `where` # filter at any nesting depth. LiveQuery subscriptions are a # persistent server-evaluated channel; without this gate, a # caller could plant `$where`/`$function`/`$accumulator` (or # data-mutating stages nested inside) and have them re-evaluated # on every matching event for the lifetime of the subscription. # Permissive mode (recursive denylist only) mirrors the # `Parse::MongoDB`/`Parse::AtlasSearch` filter posture so the # SDK enforces one consistent set of refusals on every # user-influenced filter path. Parse::PipelineSecurity.validate_filter!(where) if where.is_a?(Hash) && !where.empty? subscription = Subscription.new( client: self, class_name: class_name, query: where, fields: fields, session_token: session_token, ) @monitor.synchronize do @subscriptions[subscription.request_id] = subscription end Logging.debug("Subscription created", request_id: subscription.request_id, class_name: class_name) # Send subscribe message if connected if connected? (subscription.) else # Queue subscription for when connection is established connect unless connecting? end subscription end |
#unsubscribe(subscription) ⇒ Object
Unsubscribe from a subscription
348 349 350 351 352 353 354 355 |
# File 'lib/parse/live_query/client.rb', line 348 def unsubscribe(subscription) Logging.debug("Unsubscribing", request_id: subscription.request_id) (subscription.) if connected? @monitor.synchronize do @subscriptions.delete(subscription.request_id) end end |