Class: Parse::LiveQuery::EventQueue
- Inherits: Object
- Inheritance chain: Object → Parse::LiveQuery::EventQueue
- Defined in:
- lib/parse/live_query/event_queue.rb
Overview
Bounded event queue with configurable backpressure strategies.
Provides a buffer between the WebSocket reader thread and callback execution, preventing high-frequency events from overwhelming the system.
Backpressure Strategies:
- :block - Block enqueue until space is available (can cause the reader thread to block)
- :drop_oldest - Drop the oldest events when full (default)
- :drop_newest - Drop incoming events when full
Constant Summary collapse
- STRATEGIES =
Valid backpressure strategies
[:block, :drop_oldest, :drop_newest].freeze
- DEFAULT_MAX_SIZE =
Default maximum queue size
1000
- DEFAULT_STRATEGY =
Default backpressure strategy
:drop_oldest
Instance Attribute Summary collapse
-
#dropped_count ⇒ Integer
readonly
Number of dropped events.
-
#enqueued_count ⇒ Integer
readonly
Total events enqueued.
-
#max_size ⇒ Integer
readonly
Maximum queue size.
-
#processed_count ⇒ Integer
readonly
Total events processed.
-
#strategy ⇒ Symbol
readonly
Backpressure strategy.
Instance Method Summary collapse
-
#clear ⇒ Integer
Clear the queue.
-
#empty? ⇒ Boolean
Check if queue is empty.
-
#enqueue(event) ⇒ Boolean
Add an event to the queue.
-
#full? ⇒ Boolean
Check if queue is full.
-
#initialize(max_size: DEFAULT_MAX_SIZE, strategy: DEFAULT_STRATEGY, on_drop: nil) ⇒ EventQueue
constructor
Create a new event queue.
-
#running? ⇒ Boolean
Check if queue is running.
-
#size ⇒ Integer
Current queue size.
-
#start {|event| ... } ⇒ void
Start the event processor thread.
-
#stats ⇒ Hash
Get queue statistics.
-
#stop(drain: true, timeout: 5.0) ⇒ void
Stop the event processor.
Constructor Details
#initialize(max_size: DEFAULT_MAX_SIZE, strategy: DEFAULT_STRATEGY, on_drop: nil) ⇒ EventQueue
Create a new event queue
60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 |
# File 'lib/parse/live_query/event_queue.rb', line 60
#
# Create a new event queue. The queue starts out stopped and empty, with all
# counters at zero.
#
# @param max_size [Integer] maximum queue size
# @param strategy [Symbol] backpressure strategy; must be one of STRATEGIES
# @param on_drop [Object, nil] stored as-is in @on_drop (presumably a callable
#   invoked when an event is dropped; its use is not visible here)
# @raise [ArgumentError] if +strategy+ is not listed in STRATEGIES
def initialize(max_size: DEFAULT_MAX_SIZE, strategy: DEFAULT_STRATEGY, on_drop: nil)
  raise ArgumentError, "Invalid strategy: #{strategy}. Must be one of #{STRATEGIES.inspect}" unless STRATEGIES.include?(strategy)

  # Configuration supplied by the caller.
  @max_size = max_size
  @strategy = strategy
  @on_drop = on_drop

  # Buffer plus the monitor/condition pair that guards it.
  @queue = []
  @monitor = Monitor.new
  @condition = @monitor.new_cond

  # Lifecycle state.
  @running = false
  @processor_thread = nil

  # Statistics counters.
  @dropped_count = 0
  @enqueued_count = 0
  @processed_count = 0
end
Instance Attribute Details
#dropped_count ⇒ Integer (readonly)
Returns number of dropped events.
48 49 50 |
# File 'lib/parse/live_query/event_queue.rb', line 48
#
# @return [Integer] number of dropped events
def dropped_count
  @dropped_count
end
#enqueued_count ⇒ Integer (readonly)
Returns total events enqueued.
51 52 53 |
# File 'lib/parse/live_query/event_queue.rb', line 51
#
# @return [Integer] total events enqueued
def enqueued_count
  @enqueued_count
end
#max_size ⇒ Integer (readonly)
Returns maximum queue size.
42 43 44 |
# File 'lib/parse/live_query/event_queue.rb', line 42
#
# @return [Integer] maximum queue size
def max_size
  @max_size
end
#processed_count ⇒ Integer (readonly)
Returns total events processed.
54 55 56 |
# File 'lib/parse/live_query/event_queue.rb', line 54
#
# @return [Integer] total events processed
def processed_count
  @processed_count
end
#strategy ⇒ Symbol (readonly)
Returns backpressure strategy.
45 46 47 |
# File 'lib/parse/live_query/event_queue.rb', line 45
#
# @return [Symbol] backpressure strategy
def strategy
  @strategy
end
Instance Method Details
#clear ⇒ Integer
Clear the queue
181 182 183 184 185 186 187 |
# File 'lib/parse/live_query/event_queue.rb', line 181
#
# Clear the queue while holding the monitor.
#
# @return [Integer] how many events were in the queue before it was emptied
def clear
  @monitor.synchronize do
    # Capture the size first, then empty the buffer; tap hands the size back.
    @queue.size.tap { @queue.clear }
  end
end
#empty? ⇒ Boolean
Check if queue is empty
152 153 154 |
# File 'lib/parse/live_query/event_queue.rb', line 152
#
# Check if queue is empty (read under the monitor).
#
# @return [Boolean] true when no events are buffered
def empty?
  @monitor.synchronize do
    @queue.length.zero?
  end
end
#enqueue(event) ⇒ Boolean
Add an event to the queue
123 124 125 126 127 128 129 130 131 132 133 134 135 136 |
# File 'lib/parse/live_query/event_queue.rb', line 123
#
# Add an event to the queue.
#
# All work happens under the monitor. A stopped queue rejects the event; a
# full queue hands it to handle_backpressure and returns whatever that
# returns; otherwise the event is appended, the enqueue counter is bumped and
# one thread waiting on the condition is signalled.
#
# @param event [Object] the event to buffer
# @return [Boolean] false when the queue is not running, true when the event
#   was appended; on a full queue, the result of handle_backpressure
def enqueue(event)
  @monitor.synchronize do
    if !@running
      false
    elsif @queue.size >= @max_size
      handle_backpressure(event)
    else
      @queue.push(event)
      @enqueued_count += 1
      @condition.signal
      true
    end
  end
end
#full? ⇒ Boolean
Check if queue is full
146 147 148 |
# File 'lib/parse/live_query/event_queue.rb', line 146
#
# Check if queue is full (read under the monitor).
#
# @return [Boolean] true when the buffered count has reached @max_size
def full?
  @monitor.synchronize do
    @queue.length >= @max_size
  end
end
#running? ⇒ Boolean
Check if queue is running
158 159 160 |
# File 'lib/parse/live_query/event_queue.rb', line 158
#
# Check if queue is running. Returns the @running flag, read under the monitor.
#
# @return [Boolean] current value of the running flag
def running?
  @monitor.synchronize do
    @running
  end
end
#size ⇒ Integer
Current queue size
140 141 142 |
# File 'lib/parse/live_query/event_queue.rb', line 140
#
# Current queue size (read under the monitor).
#
# @return [Integer] number of events currently buffered
def size
  @monitor.synchronize do
    @queue.length
  end
end
#start {|event| ... } ⇒ void
This method returns an undefined value.
Start the event processor thread
83 84 85 86 87 88 89 90 91 92 93 94 95 |
# File 'lib/parse/live_query/event_queue.rb', line 83
#
# Start the event processor thread.
#
# Does nothing when the queue is already running. Otherwise it marks the
# queue as running, spawns a thread that runs process_loop with the given
# block, and logs the start.
#
# @yield [event] block handed to process_loop as the processor
# @raise [ArgumentError] if no block is given
# @return [void]
def start(&processor)
  raise ArgumentError, "Processor block required" if processor.nil?

  @monitor.synchronize do
    unless @running
      @running = true
      @processor_thread = Thread.new { process_loop(&processor) }
      # An exception raised inside the worker must not abort the whole process.
      @processor_thread.abort_on_exception = false

      Logging.debug("Event queue started", max_size: @max_size, strategy: @strategy)
    end
  end
end
#stats ⇒ Hash
Get queue statistics
164 165 166 167 168 169 170 171 172 173 174 175 176 177 |
# File 'lib/parse/live_query/event_queue.rb', line 164
#
# Get queue statistics as a snapshot taken under the monitor.
#
# @return [Hash] with keys :size, :max_size, :strategy, :running,
#   :enqueued_count, :processed_count, :dropped_count and :utilization.
#   :utilization is always a Float: the fill level as a percentage rounded to
#   one decimal, or 0.0 when max_size is not positive.
def stats
  @monitor.synchronize do
    current_size = @queue.size

    # Keep the type stable: the non-positive max_size fallback is 0.0, not
    # Integer 0, so callers always receive a Float.
    utilization =
      if @max_size > 0
        (current_size.to_f / @max_size * 100).round(1)
      else
        0.0
      end

    {
      size: current_size,
      max_size: @max_size,
      strategy: @strategy,
      running: @running,
      enqueued_count: @enqueued_count,
      processed_count: @processed_count,
      dropped_count: @dropped_count,
      utilization: utilization,
    }
  end
end
#stop(drain: true, timeout: 5.0) ⇒ void
This method returns an undefined value.
Stop the event processor
101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 |
# File 'lib/parse/live_query/event_queue.rb', line 101
#
# Stop the event processor.
#
# Clears the running flag and wakes every thread waiting on the condition.
# The worker thread is then optionally joined (drain) and killed if it is
# still alive. A final debug line reports how many events remain.
#
# Safe to call from inside the processor thread itself (for example from the
# processing block). In that case the thread is neither joined nor killed,
# because joining the current thread raises ThreadError and killing it would
# abort the caller mid-method. The worker is presumed to leave its loop once
# @running is false; process_loop is not visible here, so confirm that.
#
# @param drain [Boolean] when true, wait up to +timeout+ for the worker to
#   finish before killing it
# @param timeout [Numeric] maximum seconds to wait while draining
# @return [void]
def stop(drain: true, timeout: 5.0)
  worker = @monitor.synchronize do
    return unless @running

    @running = false
    @condition.broadcast

    # Detach the worker while holding the lock, so a concurrent #start cannot
    # have its newly created thread killed or its reference cleared by us.
    detached = @processor_thread
    @processor_thread = nil
    detached
  end

  if worker && !worker.equal?(Thread.current)
    worker.join(timeout) if drain
    # Still alive means the drain timed out, or no drain was requested.
    worker.kill if worker.alive?
  end

  # Read both figures in one critical section for a consistent log line.
  remaining, dropped = @monitor.synchronize { [@queue.size, @dropped_count] }
  Logging.debug("Event queue stopped", remaining: remaining, dropped: dropped)
end