Class: BatchEventsQueue

Inherits:
Object
  • Object
show all
Defined in:
lib/wingify/services/batch_event_queue.rb

Class Method Summary collapse

Instance Method Summary collapse

Constructor Details

#initialize(batch_config) ⇒ BatchEventsQueue

Initializes a new batch events queue with the specified configuration

Parameters:

  • batch_config

    Configuration object containing:

    • request_time_interval: Time interval between batch requests (in seconds)

    • events_per_request: Maximum number of events to include in a single request

    • flush_callback: Callback function to execute after flushing events

    • dispatcher: Function to handle sending the batched events



38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
# File 'lib/wingify/services/batch_event_queue.rb', line 38

# Sets up an empty queue, resolves the batching limits from batch_config and
# starts the first batch timer.
#
# request_time_interval is taken from batch_config when it is a number >= 1;
# otherwise Constants::DEFAULT_REQUEST_TIME_INTERVAL is used and logged.
# events_per_request is taken from batch_config when it is a number greater
# than 0 and at most Constants::MAX_EVENTS_PER_REQUEST; a larger number is
# capped to that maximum, and anything else falls back to
# Constants::DEFAULT_EVENTS_PER_REQUEST. Both fallbacks are logged.
#
# @param batch_config hash-like configuration read with the keys
#   :request_time_interval, :events_per_request, :flush_callback and
#   :dispatcher
def initialize(batch_config)
  @queue = []
  @batch_config = batch_config
  @network_client = NetworkManager.instance.get_client

  interval = batch_config[:request_time_interval]
  if DataTypeUtil.is_number(interval) && interval >= 1
    @request_time_interval = interval
  else
    @request_time_interval = Constants::DEFAULT_REQUEST_TIME_INTERVAL
    LoggerService.log(LogLevelEnum::INFO, "EVENT_BATCH_DEFAULTS", {
      parameter: 'request_time_interval',
      minLimit: 0,
      defaultValue: "#{@request_time_interval} seconds"
    })
  end

  batch_size = batch_config[:events_per_request]
  size_is_number = DataTypeUtil.is_number(batch_size)
  if size_is_number && batch_size > Constants::MAX_EVENTS_PER_REQUEST
    # Oversized batches are capped rather than rejected.
    @events_per_request = Constants::MAX_EVENTS_PER_REQUEST
    LoggerService.log(LogLevelEnum::INFO, "EVENT_BATCH_MAX_LIMIT", {
      parameter: 'events_per_request',
      maxLimit: Constants::MAX_EVENTS_PER_REQUEST.to_s
    })
  elsif size_is_number && batch_size > 0
    @events_per_request = batch_size
  else
    @events_per_request = Constants::DEFAULT_EVENTS_PER_REQUEST
    LoggerService.log(LogLevelEnum::INFO, "EVENT_BATCH_DEFAULTS", {
      parameter: 'events_per_request',
      minLimit: 0,
      defaultValue: @events_per_request.to_s
    })
  end

  # Only keep the callback when it can actually be invoked.
  callback = batch_config[:flush_callback]
  @flush_callback = callback if callback.respond_to?(:call)

  @dispatcher = batch_config[:dispatcher]
  @batch_lock = Mutex.new
  @timer = nil
  create_new_batch_timer
end

Class Method Details

.configure(batch_config) ⇒ Object



28
29
30
# File 'lib/wingify/services/batch_event_queue.rb', line 28

# Builds a new queue from batch_config and stores it as the shared instance,
# replacing any instance stored earlier.
#
# @param batch_config configuration forwarded unchanged to the constructor
# @return the newly created queue
def configure(batch_config)
  queue = new(batch_config)
  @instance = queue
end

.instance ⇒ Object



24
25
26
# File 'lib/wingify/services/batch_event_queue.rb', line 24

# Returns the stored shared instance, or nil when none has been stored yet.
#
# @return the stored instance, or nil
def instance
  return @instance if @instance

  @instance = nil
end

Instance Method Details

#create_new_batch_timer ⇒ Object

Creates a new timer thread that automatically flushes events after request_time_interval. The timer is only created if one doesn’t already exist.



84
85
86
87
88
89
# File 'lib/wingify/services/batch_event_queue.rb', line 84

# Arms the batch timer unless one is already set.
#
# Stores the deadline (now + @request_time_interval) in @timer and starts a
# thread running flush_when_request_times_up, kept in @thread.
#
# @return [Thread, nil] the started thread, or nil when a timer already exists
def create_new_batch_timer
  unless @timer
    @timer = Time.now + @request_time_interval
    @thread = Thread.new do
      flush_when_request_times_up
    end
  end
end

#enqueue(event) ⇒ Object

Adds a new event to the queue and manages batch processing. If the queue reaches the events_per_request limit, it triggers an immediate flush.

Parameters:

  • event

    The event to be added to the queue



94
95
96
97
98
99
100
101
102
103
104
105
106
# File 'lib/wingify/services/batch_event_queue.rb', line 94

# Adds an event to the batch queue and flushes once the batch is full.
#
# The push and the size check run under @batch_lock, the same mutex #flush
# holds while it copies and resets @queue. Without it, an event pushed between
# flush's copy and reset would land in the discarded array and be lost.
# #flush is called only after the lock is released because it acquires
# @batch_lock itself and Mutex is not reentrant.
#
# NOTE(review): assumes nothing that runs while #flush holds @batch_lock calls
# #enqueue (e.g. a re-queue inside handle_flush_response) -- confirm.
#
# @param event the event to queue; it is serialized with to_json for the log
# @return the value returned by #flush when a flush was triggered, otherwise nil
def enqueue(event)
  batch_full = @batch_lock.synchronize do
    @queue.push(event)
    @queue.length >= @events_per_request
  end

  LoggerService.log(LogLevelEnum::INFO, "EVENT_QUEUE", {
    queueType: 'batch',
    event: event.to_json
  })

  # Flush as soon as the queue holds at least events_per_request events.
  flush if batch_full
end

#flush(manual = false) ⇒ Object

Processes and sends all queued events. The queue is emptied before the events are handed off for sending.

Parameters:

  • manual (defaults to: false)

    Boolean indicating if flush was triggered manually



118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
# File 'lib/wingify/services/batch_event_queue.rb', line 118

# Drains the queue under @batch_lock and hands the drained events to
# handle_flush_response, then re-arms the batch timer.
#
# A manual flush runs handle_flush_response through a Concurrent::Future on the
# network client's thread pool and waits for its value, which becomes the
# result. An automatic flush only posts the work to the pool and does not wait.
# When the queue is empty, a success response with no events is produced.
#
# NOTE(review): on the automatic path @response is not reassigned, so the value
# returned is whatever an earlier call left there (nil if none) -- confirm this
# is intended.
#
# @param manual [Boolean] true when the flush was requested explicitly
# @return the current value of @response
def flush(manual = false)
  @batch_lock.synchronize do
    if @queue.any?
      how = manual ? 'manually' : ''
      timer_note = manual ? 'Timer will be cleared and registered again' : ''
      LoggerService.log(LogLevelEnum::DEBUG, "EVENT_BATCH_BEFORE_FLUSHING", {
        manually: how,
        length: @queue.length,
        accountId: @batch_config[:account_id],
        timer: timer_note
      })

      # Work on a copy so the live queue can start collecting the next batch.
      pending_events = @queue.dup
      @queue = []
      pool = @network_client.get_thread_pool

      if manual
        task = Concurrent::Future.new(executor: pool) do
          handle_flush_response(pending_events, manual)
        end
        task.execute
        @response = task.value
      else
        pool.post do
          handle_flush_response(pending_events, manual)
        end
      end
    else
      LoggerService.log(LogLevelEnum::DEBUG, "BATCH_QUEUE_EMPTY")
      @response = {status: "success", events: []}
    end

    # Restart the timer cycle; the old thread is only killed on automatic flushes.
    kill_old_thread if @thread && !manual
    clear_request_timer
    create_new_batch_timer
    @response
  end
end

#flush_when_request_times_up ⇒ Object

Background thread function that monitors the timer. When the timer expires, it flushes the queue and cleans up.



110
111
112
113
# File 'lib/wingify/services/batch_event_queue.rb', line 110

# Waits, polling once per second, until the deadline in @timer has passed or
# @timer has been cleared, then flushes the queue.
#
# @return the value returned by #flush
def flush_when_request_times_up
  until !@timer || Time.now >= @timer
    sleep(1)
  end
  flush
end