Class: Parse::LiveQuery::Client
- Inherits: Object
  - Object
    - Parse::LiveQuery::Client
- Defined in: lib/parse/live_query/client.rb
Overview
WebSocket client for Parse LiveQuery server. Manages WebSocket connection, authentication, and subscription lifecycle.
Features:
- Automatic ping/pong keep-alive with stale connection detection
- Circuit breaker for intelligent failure handling
- Event queue with backpressure protection
- Automatic reconnection with exponential backoff and jitter
Constant Summary collapse
- OPCODE_CONTINUATION = 0x0
  WebSocket operation codes
- OPCODE_TEXT = 0x1
- OPCODE_BINARY = 0x2
- OPCODE_CLOSE = 0x8
- OPCODE_PING = 0x9
- OPCODE_PONG = 0xA
- DEFAULT_MAX_MESSAGE_SIZE = 1_048_576
  Default maximum message size (1MB) - prevents memory exhaustion attacks
- DEFAULT_FRAME_READ_TIMEOUT = 30
  Default frame read timeout in seconds - prevents indefinite blocking
Instance Attribute Summary collapse
-
#application_id ⇒ String
readonly
Parse application ID.
-
#circuit_breaker ⇒ CircuitBreaker
readonly
Connection circuit breaker.
-
#client_key ⇒ String?
readonly
Parse client key (REST API key).
-
#event_queue ⇒ EventQueue
readonly
Event processing queue.
-
#frame_read_timeout ⇒ Integer
readonly
Frame read timeout in seconds.
-
#health_monitor ⇒ HealthMonitor
readonly
Connection health monitor.
-
#master_key ⇒ String?
readonly
Parse master key.
-
#max_message_size ⇒ Integer
readonly
Maximum allowed message size in bytes.
-
#state ⇒ Symbol
readonly
Connection state (:disconnected, :connecting, :connected, :closed).
-
#subscriptions ⇒ Hash<Integer, Subscription>
readonly
Active subscriptions by request ID.
-
#url ⇒ String
readonly
WebSocket URL.
Instance Method Summary collapse
-
#close(code: 1000, reason: "Client closing") ⇒ Object
Disconnect from the LiveQuery server.
-
#closed? ⇒ Boolean
True if closed.
-
#connect ⇒ Boolean
Connect to the LiveQuery server.
-
#connected? ⇒ Boolean
True if connected.
-
#connecting? ⇒ Boolean
True if connecting.
-
#health_info ⇒ Hash
Get comprehensive health information.
-
#healthy? ⇒ Boolean
Check if connection is healthy.
-
#initialize(url: nil, application_id: nil, client_key: nil, master_key: nil, auto_connect: nil, auto_reconnect: nil) ⇒ Client
constructor
Create a new LiveQuery client.
-
#on(event) { ... } ⇒ Object
Register callback for connection events.
-
#on_close(&block) ⇒ Object
Callback for connection closed.
-
#on_error(&block) ⇒ Object
Callback for errors.
-
#on_open(&block) ⇒ Object
Callback for connection opened.
-
#shutdown(timeout: 5.0) ⇒ void
Graceful shutdown with timeout.
-
#subscribe(class_name, where: {}, fields: nil, session_token: nil) ⇒ Subscription
Subscribe to a Parse class with optional query constraints.
-
#unsubscribe(subscription) ⇒ Object
Unsubscribe from a subscription.
Constructor Details
#initialize(url: nil, application_id: nil, client_key: nil, master_key: nil, auto_connect: nil, auto_reconnect: nil) ⇒ Client
Create a new LiveQuery client
97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 |
# File 'lib/parse/live_query/client.rb', line 97

# Create a new LiveQuery client.
#
# Every keyword is optional. A nil value falls back to the object returned
# by `config`, and for the URL and keys to a further derived value
# (`derive_websocket_url` / `parse_client_value`).
#
# @param url [String, nil] WebSocket URL
# @param application_id [String, nil] Parse application ID
# @param client_key [String, nil] Parse client key (REST API key)
# @param master_key [String, nil] Parse master key
# @param auto_connect [Boolean, nil] connect at the end of construction;
#   nil uses the configured default
# @param auto_reconnect [Boolean, nil] schedule reconnects after failures;
#   nil uses the configured default
def initialize(url: nil, application_id: nil, client_key: nil, master_key: nil,
               auto_connect: nil, auto_reconnect: nil)
  cfg = config

  # Use provided values or fall back to configuration/environment
  @url = url || cfg.url || derive_websocket_url
  @application_id = application_id || cfg.application_id || parse_client_value(:application_id)
  @client_key = client_key || cfg.client_key || parse_client_value(:api_key)
  @master_key = master_key || cfg.master_key || parse_client_value(:master_key)

  # An explicit false must win over the configured default, hence nil? and not ||.
  @auto_connect = auto_connect.nil? ? cfg.auto_connect : auto_connect
  @auto_reconnect = auto_reconnect.nil? ? cfg.auto_reconnect : auto_reconnect

  # NOTE(review): the accessor name was missing here (`cfg. ||`); presumably
  # cfg.max_message_size, mirroring frame_read_timeout below — confirm.
  @max_message_size = cfg.max_message_size || DEFAULT_MAX_MESSAGE_SIZE
  @frame_read_timeout = cfg.frame_read_timeout || DEFAULT_FRAME_READ_TIMEOUT

  @state = :disconnected
  @subscriptions = {}
  @monitor = Monitor.new
  @socket = nil
  @reader_thread = nil
  @reconnect_thread = nil
  @reconnect_interval = cfg.initial_reconnect_interval
  @callbacks = Hash.new { |h, k| h[k] = [] }
  @client_id = nil

  # Initialize production components
  @health_monitor = HealthMonitor.new(
    client: self,
    ping_interval: cfg.ping_interval,
    pong_timeout: cfg.pong_timeout,
  )

  @circuit_breaker = CircuitBreaker.new(
    failure_threshold: cfg.circuit_failure_threshold,
    reset_timeout: cfg.circuit_reset_timeout,
    on_state_change: method(:on_circuit_state_change),
  )

  @event_queue = EventQueue.new(
    max_size: cfg.event_queue_size,
    strategy: cfg.backpressure_strategy,
    on_drop: method(:on_event_dropped),
  )

  Logging.info("LiveQuery client initialized", url: @url, application_id: @application_id)

  # Only auto-connect when a URL could actually be resolved.
  connect if @auto_connect && @url
end
Instance Attribute Details
#application_id ⇒ String (readonly)
Returns Parse application ID.
61 62 63 |
# File 'lib/parse/live_query/client.rb', line 61

# @return [String] Parse application ID.
def application_id
  @application_id
end
#circuit_breaker ⇒ CircuitBreaker (readonly)
Returns connection circuit breaker.
79 80 81 |
# File 'lib/parse/live_query/client.rb', line 79

# @return [CircuitBreaker] connection circuit breaker.
def circuit_breaker
  @circuit_breaker
end
#client_key ⇒ String? (readonly)
Returns Parse client key (REST API key).
64 65 66 |
# File 'lib/parse/live_query/client.rb', line 64

# @return [String, nil] Parse client key (REST API key).
def client_key
  @client_key
end
#event_queue ⇒ EventQueue (readonly)
Returns event processing queue.
82 83 84 |
# File 'lib/parse/live_query/client.rb', line 82

# @return [EventQueue] event processing queue.
def event_queue
  @event_queue
end
#frame_read_timeout ⇒ Integer (readonly)
Returns frame read timeout in seconds.
88 89 90 |
# File 'lib/parse/live_query/client.rb', line 88

# @return [Integer] frame read timeout in seconds.
def frame_read_timeout
  @frame_read_timeout
end
#health_monitor ⇒ HealthMonitor (readonly)
Returns connection health monitor.
76 77 78 |
# File 'lib/parse/live_query/client.rb', line 76

# @return [HealthMonitor] connection health monitor.
def health_monitor
  @health_monitor
end
#master_key ⇒ String? (readonly)
Returns Parse master key.
67 68 69 |
# File 'lib/parse/live_query/client.rb', line 67

# @return [String, nil] Parse master key.
def master_key
  @master_key
end
#max_message_size ⇒ Integer (readonly)
Returns maximum allowed message size in bytes.
85 86 87 |
# File 'lib/parse/live_query/client.rb', line 85

# @return [Integer] maximum allowed message size in bytes.
def max_message_size
  @max_message_size
end
#state ⇒ Symbol (readonly)
Returns connection state (:disconnected, :connecting, :connected, :closed).
70 71 72 |
# File 'lib/parse/live_query/client.rb', line 70

# @return [Symbol] connection state
#   (:disconnected, :connecting, :connected, :closed).
def state
  @state
end
#subscriptions ⇒ Hash<Integer, Subscription> (readonly)
Returns active subscriptions by request ID.
73 74 75 |
# File 'lib/parse/live_query/client.rb', line 73

# @return [Hash<Integer, Subscription>] active subscriptions by request ID.
def subscriptions
  @subscriptions
end
#url ⇒ String (readonly)
Returns WebSocket URL.
58 59 60 |
# File 'lib/parse/live_query/client.rb', line 58

# @return [String] WebSocket URL.
def url
  @url
end
Instance Method Details
#close(code: 1000, reason: "Client closing") ⇒ Object
Disconnect from the LiveQuery server
188 189 190 191 192 193 194 195 196 197 198 199 |
# File 'lib/parse/live_query/client.rb', line 188

# Disconnect from the LiveQuery server.
#
# Disables auto-reconnect, sends a close frame when a socket exists, cleans
# up the connection and moves the client to the :closed state. The :close
# event is emitted after the lock is released. Calling this on an already
# closed client returns early without emitting.
#
# @param code [Integer] close code passed to the close frame
# @param reason [String] close reason passed to the close frame
def close(code: 1000, reason: "Client closing")
  @auto_reconnect = false

  @monitor.synchronize do
    # `return` here leaves the whole method, so no :close event is emitted twice.
    return if @state == :closed

    Logging.info("Closing connection", code: code, reason: reason)

    send_close_frame(code, reason) if @socket
    cleanup_connection
    @state = :closed
  end

  emit(:close)
end
#closed? ⇒ Boolean
Returns true if closed.
244 245 246 |
# File 'lib/parse/live_query/client.rb', line 244

# @return [Boolean] true if the connection state is :closed.
def closed?
  @state == :closed
end
#connect ⇒ Boolean
Connect to the LiveQuery server
151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 |
# File 'lib/parse/live_query/client.rb', line 151

# Connect to the LiveQuery server.
#
# Does nothing when already connected or connecting. When the circuit
# breaker refuses the attempt, a :circuit_open event is emitted with the
# time remaining. Any StandardError raised while establishing the
# connection is recorded on the circuit breaker and emitted as an :error
# event wrapped in ConnectionError. In both failure cases a reconnect is
# scheduled if auto-reconnect is enabled.
#
# @return [Boolean] true if already connected/connecting or the connection
#   was established; false if blocked by the circuit breaker or on failure
def connect
  return true if connected? || connecting?

  # Check circuit breaker before attempting connection
  unless @circuit_breaker.allow_request?
    time_remaining = @circuit_breaker.time_until_half_open
    Logging.warn("Connection blocked by circuit breaker",
                 state: @circuit_breaker.state,
                 time_until_retry: time_remaining)
    emit(:circuit_open, time_remaining)
    schedule_reconnect if @auto_reconnect
    return false
  end

  @monitor.synchronize do
    @state = :connecting
  end

  begin
    Logging.info("Connecting to LiveQuery server", url: @url)
    establish_connection
    start_reader_thread
    true
  rescue => e
    @circuit_breaker.record_failure
    @state = :disconnected
    Logging.error("Failed to connect", error: e)
    emit(:error, ConnectionError.new("Failed to connect: #{e.message}"))
    schedule_reconnect if @auto_reconnect
    false
  end
end
#connected? ⇒ Boolean
Returns true if connected.
234 235 236 |
# File 'lib/parse/live_query/client.rb', line 234

# @return [Boolean] true if the connection state is :connected.
def connected?
  @state == :connected
end
#connecting? ⇒ Boolean
Returns true if connecting.
239 240 241 |
# File 'lib/parse/live_query/client.rb', line 239

# @return [Boolean] true if the connection state is :connecting.
def connecting?
  @state == :connecting
end
#health_info ⇒ Hash
Get comprehensive health information
256 257 258 259 260 261 262 263 264 265 266 267 268 |
# File 'lib/parse/live_query/client.rb', line 256

# Get comprehensive health information.
#
# Combines the client's own state with the reports of the health monitor,
# circuit breaker and event queue.
#
# @return [Hash] keys :state, :connected, :healthy, :client_id,
#   :subscription_count, :max_message_size, :health_monitor,
#   :circuit_breaker and :event_queue
def health_info
  {
    state: @state,
    connected: connected?,
    healthy: healthy?,
    client_id: @client_id,
    subscription_count: @subscriptions.size,
    max_message_size: @max_message_size,
    health_monitor: @health_monitor.health_info,
    circuit_breaker: @circuit_breaker.info,
    event_queue: @event_queue.stats,
  }
end
#healthy? ⇒ Boolean
Check if connection is healthy
250 251 252 |
# File 'lib/parse/live_query/client.rb', line 250

# Check if connection is healthy.
#
# @return [Boolean] true only when connected and the health monitor
#   reports healthy
def healthy?
  connected? && @health_monitor.healthy?
end
#on(event) { ... } ⇒ Object
Register callback for connection events
342 343 344 345 346 347 |
# File 'lib/parse/live_query/client.rb', line 342

# Register callback for connection events.
#
# The block is appended to the callbacks for `event` while holding the
# client's monitor. Without a block nothing is registered.
#
# @param event [Symbol] event name (e.g. :open, :close, :error)
# @yield callback stored for the event
# @return [self] the client, so registrations can be chained
def on(event, &block)
  @monitor.synchronize do
    @callbacks[event] << block if block_given?
  end
  self
end
#on_close(&block) ⇒ Object
Callback for connection closed
355 356 357 |
# File 'lib/parse/live_query/client.rb', line 355

# Callback for connection closed. Shorthand for `on(:close, &block)`.
#
# @return [Object] whatever `on` returns
def on_close(&block)
  on(:close, &block)
end
#on_error(&block) ⇒ Object
Callback for errors
360 361 362 |
# File 'lib/parse/live_query/client.rb', line 360

# Callback for errors. Shorthand for `on(:error, &block)`.
#
# @return [Object] whatever `on` returns
def on_error(&block)
  on(:error, &block)
end
#on_open(&block) ⇒ Object
Callback for connection opened
350 351 352 |
# File 'lib/parse/live_query/client.rb', line 350

# Callback for connection opened. Shorthand for `on(:open, &block)`.
#
# @return [Object] whatever `on` returns
def on_open(&block)
  on(:open, &block)
end
#shutdown(timeout: 5.0) ⇒ void
This method returns an undefined value.
Graceful shutdown with timeout
204 205 206 207 208 209 210 211 212 213 214 215 216 217 218 219 220 221 222 223 224 225 226 227 228 229 230 231 |
# File 'lib/parse/live_query/client.rb', line 204

# Graceful shutdown with timeout.
#
# Disables auto-reconnect, cancels a pending reconnect, stops the health
# monitor, drains the event queue, closes the connection and finally waits
# for (then kills) the reader thread. The timeout is split in half between
# draining the event queue and joining the reader thread.
#
# @param timeout [Numeric] total time budget in seconds
# @return [void]
def shutdown(timeout: 5.0)
  Logging.info("Shutting down LiveQuery client", timeout: timeout)

  @auto_reconnect = false

  # Float division: with an Integer timeout, `timeout / 2` truncates
  # (1 / 2 == 0), which would skip the drain and join waits entirely.
  half_timeout = timeout / 2.0

  # Cancel any pending reconnect thread
  cancel_reconnect_thread

  # Stop health monitor
  @health_monitor.stop

  # Stop event queue and drain remaining events
  @event_queue.stop(drain: true, timeout: half_timeout)

  # Close connection
  close(code: 1000, reason: "Shutdown")

  # Wait for reader thread to finish
  @reader_thread&.join(half_timeout)

  # Force kill if still running
  @reader_thread&.kill
  @reader_thread = nil

  Logging.info("Shutdown complete",
               events_processed: @event_queue.processed_count,
               events_dropped: @event_queue.dropped_count)
end
#subscribe(class_name, where: {}, fields: nil, session_token: nil) ⇒ Subscription
Subscribe to a Parse class with optional query constraints
276 277 278 279 280 281 282 283 284 285 286 287 288 289 290 291 292 293 294 295 296 297 298 299 300 301 302 303 304 305 306 307 308 309 310 311 312 313 314 315 316 317 318 319 320 321 322 323 324 325 326 |
# File 'lib/parse/live_query/client.rb', line 276

# Subscribe to a Parse class with optional query constraints.
#
# `class_name` may be a class name, a Parse::Object subclass (its
# `parse_class` is used) or a Parse::Query (its table and compiled where
# clause replace `class_name` and `where`). The subscription is registered
# under its request ID. If the client is not connected, a connection
# attempt is started unless one is already in progress.
#
# @param class_name [String, Class, Parse::Query] what to subscribe to
# @param where [Hash] query constraints
# @param fields [Object, nil] passed through to the Subscription
# @param session_token [String, nil] passed through to the Subscription
# @return [Subscription] the newly created subscription
def subscribe(class_name, where: {}, fields: nil, session_token: nil)
  # Handle Parse::Object subclass
  if class_name.is_a?(Class) && class_name < Parse::Object
    class_name = class_name.parse_class
  end

  # Handle Parse::Query object
  if class_name.is_a?(Parse::Query)
    query = class_name
    class_name = query.table
    where = query.compile_where
  end

  # Refuse server-side-JS / data-mutating operators in the `where`
  # filter at any nesting depth. LiveQuery subscriptions are a
  # persistent server-evaluated channel; without this gate, a
  # caller could plant `$where`/`$function`/`$accumulator` (or
  # data-mutating stages nested inside) and have them re-evaluated
  # on every matching event for the lifetime of the subscription.
  # Permissive mode (recursive denylist only) mirrors the
  # `Parse::MongoDB`/`Parse::AtlasSearch` filter posture so the
  # SDK enforces one consistent set of refusals on every
  # user-influenced filter path.
  Parse::PipelineSecurity.validate_filter!(where) if where.is_a?(Hash) && !where.empty?

  subscription = Subscription.new(
    client: self,
    class_name: class_name,
    query: where,
    fields: fields,
    session_token: session_token,
  )

  @monitor.synchronize do
    @subscriptions[subscription.request_id] = subscription
  end

  Logging.debug("Subscription created",
                request_id: subscription.request_id,
                class_name: class_name)

  # Send subscribe message if connected
  if connected?
    # NOTE(review): both identifiers were missing from this call
    # (`(subscription.)`); reconstructed as
    # send_message / to_subscribe_message — confirm against the real file.
    send_message(subscription.to_subscribe_message)
  else
    # Queue subscription for when connection is established
    connect unless connecting?
  end

  subscription
end
#unsubscribe(subscription) ⇒ Object
Unsubscribe from a subscription
330 331 332 333 334 335 336 337 |
# File 'lib/parse/live_query/client.rb', line 330

# Unsubscribe from a subscription.
#
# Notifies the server only while connected, then removes the subscription
# from the registry under the client's monitor.
#
# @param subscription [Subscription] subscription to remove
# @return [Subscription, nil] the removed entry, or nil if it was not
#   registered (the result of Hash#delete)
def unsubscribe(subscription)
  Logging.debug("Unsubscribing", request_id: subscription.request_id)

  # NOTE(review): both identifiers were missing from this call
  # (`(subscription.)`); reconstructed as
  # send_message / to_unsubscribe_message — confirm against the real file.
  send_message(subscription.to_unsubscribe_message) if connected?

  @monitor.synchronize do
    @subscriptions.delete(subscription.request_id)
  end
end