Class: Parse::LiveQuery::EventQueue

Inherits:
Object
  • Object
show all
Defined in:
lib/parse/live_query/event_queue.rb

Overview

Bounded event queue with configurable backpressure strategies.

Provides a buffer between the WebSocket reader thread and callback execution, preventing high-frequency events from overwhelming the system.

Backpressure Strategies:

  • :block - Block enqueue until space available (can cause reader thread to block)

  • :drop_oldest - Drop oldest events when full (default)

  • :drop_newest - Drop incoming events when full

Examples:

queue = EventQueue.new(max_size: 1000, strategy: :drop_oldest)
queue.start { |event| process_event(event) }
queue.enqueue(event)
queue.stop(drain: true)

Constant Summary collapse

STRATEGIES =

Valid backpressure strategies

[:block, :drop_oldest, :drop_newest].freeze
DEFAULT_MAX_SIZE =

Default maximum queue size

1000
DEFAULT_STRATEGY =

Default backpressure strategy

:drop_oldest

Instance Attribute Summary collapse

Instance Method Summary collapse

Constructor Details

#initialize(max_size: DEFAULT_MAX_SIZE, strategy: DEFAULT_STRATEGY, on_drop: nil) ⇒ EventQueue

Create a new event queue

Parameters:

  • max_size (Integer) (defaults to: DEFAULT_MAX_SIZE)

    maximum queue size

  • strategy (Symbol) (defaults to: DEFAULT_STRATEGY)

    backpressure strategy (:block, :drop_oldest, :drop_newest)

  • on_drop (Proc, nil) (defaults to: nil)

    callback when events are dropped (receives event, reason)



60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
# File 'lib/parse/live_query/event_queue.rb', line 60

def initialize(max_size: DEFAULT_MAX_SIZE, strategy: DEFAULT_STRATEGY, on_drop: nil)
  unless STRATEGIES.include?(strategy)
    raise ArgumentError, "Invalid strategy: #{strategy}. Must be one of #{STRATEGIES.inspect}"
  end

  @max_size = max_size
  @strategy = strategy
  @on_drop = on_drop

  @queue = []
  @monitor = Monitor.new
  @condition = @monitor.new_cond
  @running = false
  @processor_thread = nil

  @dropped_count = 0
  @enqueued_count = 0
  @processed_count = 0
end

Instance Attribute Details

#dropped_countInteger (readonly)

Returns number of dropped events.

Returns:

  • (Integer)

    number of dropped events



48
49
50
# File 'lib/parse/live_query/event_queue.rb', line 48

def dropped_count
  @dropped_count
end

#enqueued_countInteger (readonly)

Returns total events enqueued.

Returns:

  • (Integer)

    total events enqueued



51
52
53
# File 'lib/parse/live_query/event_queue.rb', line 51

def enqueued_count
  @enqueued_count
end

#max_sizeInteger (readonly)

Returns maximum queue size.

Returns:

  • (Integer)

    maximum queue size



42
43
44
# File 'lib/parse/live_query/event_queue.rb', line 42

def max_size
  @max_size
end

#processed_countInteger (readonly)

Returns total events processed.

Returns:

  • (Integer)

    total events processed



54
55
56
# File 'lib/parse/live_query/event_queue.rb', line 54

def processed_count
  @processed_count
end

#strategySymbol (readonly)

Returns backpressure strategy.

Returns:

  • (Symbol)

    backpressure strategy



45
46
47
# File 'lib/parse/live_query/event_queue.rb', line 45

def strategy
  @strategy
end

Instance Method Details

#clearInteger

Clear the queue

Returns:

  • (Integer)

    number of events cleared



181
182
183
184
185
186
187
# File 'lib/parse/live_query/event_queue.rb', line 181

def clear
  @monitor.synchronize do
    count = @queue.size
    @queue.clear
    count
  end
end

#empty?Boolean

Check if queue is empty

Returns:

  • (Boolean)


152
153
154
# File 'lib/parse/live_query/event_queue.rb', line 152

def empty?
  @monitor.synchronize { @queue.empty? }
end

#enqueue(event) ⇒ Boolean

Add an event to the queue

Parameters:

  • event (Object)

    the event to enqueue

Returns:

  • (Boolean)

    true if enqueued, false if dropped



123
124
125
126
127
128
129
130
131
132
133
134
135
136
# File 'lib/parse/live_query/event_queue.rb', line 123

def enqueue(event)
  @monitor.synchronize do
    return false unless @running

    if @queue.size >= @max_size
      handle_backpressure(event)
    else
      @queue << event
      @enqueued_count += 1
      @condition.signal
      true
    end
  end
end

#full?Boolean

Check if queue is full

Returns:

  • (Boolean)


146
147
148
# File 'lib/parse/live_query/event_queue.rb', line 146

def full?
  @monitor.synchronize { @queue.size >= @max_size }
end

#running?Boolean

Check if queue is running

Returns:

  • (Boolean)


158
159
160
# File 'lib/parse/live_query/event_queue.rb', line 158

def running?
  @monitor.synchronize { @running }
end

#sizeInteger

Current queue size

Returns:

  • (Integer)


140
141
142
# File 'lib/parse/live_query/event_queue.rb', line 140

def size
  @monitor.synchronize { @queue.size }
end

#start {|event| ... } ⇒ void

This method returns an undefined value.

Start the event processor thread

Yields:

  • (event)

    Block to process each event

Raises:

  • (ArgumentError)


83
84
85
86
87
88
89
90
91
92
93
94
95
# File 'lib/parse/live_query/event_queue.rb', line 83

def start(&processor)
  raise ArgumentError, "Processor block required" unless block_given?

  @monitor.synchronize do
    return if @running

    @running = true
    @processor_thread = Thread.new { process_loop(&processor) }
    @processor_thread.abort_on_exception = false

    Logging.debug("Event queue started", max_size: @max_size, strategy: @strategy)
  end
end

#statsHash

Get queue statistics

Returns:



164
165
166
167
168
169
170
171
172
173
174
175
176
177
# File 'lib/parse/live_query/event_queue.rb', line 164

def stats
  @monitor.synchronize do
    {
      size: @queue.size,
      max_size: @max_size,
      strategy: @strategy,
      running: @running,
      enqueued_count: @enqueued_count,
      processed_count: @processed_count,
      dropped_count: @dropped_count,
      utilization: @max_size > 0 ? (@queue.size.to_f / @max_size * 100).round(1) : 0,
    }
  end
end

#stop(drain: true, timeout: 5.0) ⇒ void

This method returns an undefined value.

Stop the event processor

Parameters:

  • drain (Boolean) (defaults to: true)

    process remaining events before stopping

  • timeout (Float) (defaults to: 5.0)

    seconds to wait for drain



101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
# File 'lib/parse/live_query/event_queue.rb', line 101

def stop(drain: true, timeout: 5.0)
  @monitor.synchronize do
    return unless @running

    @running = false
    @condition.broadcast
  end

  if drain && @processor_thread
    @processor_thread.join(timeout)
  end

  @processor_thread&.kill
  @processor_thread = nil

  remaining = @monitor.synchronize { @queue.size }
  Logging.debug("Event queue stopped", remaining: remaining, dropped: @dropped_count)
end