Module: Legion::API::Routes::Events

Defined in:
lib/legion/api/events.rb

Constant Summary collapse

BUFFER_SIZE =
100
SSE_STOP =
Object.new.freeze

Class Method Summary collapse

Class Method Details

.buffer_mutex ⇒ Object



15
16
17
# File 'lib/legion/api/events.rb', line 15

# Lazily creates and memoizes the mutex that guards the shared event buffer.
#
# @return [Mutex] the same mutex instance on every call
def buffer_mutex
  return @buffer_mutex if @buffer_mutex

  @buffer_mutex = Mutex.new
end

.event_buffer ⇒ Object



11
12
13
# File 'lib/legion/api/events.rb', line 11

# Lazily creates and memoizes the array used as the recent-event buffer.
#
# @return [Array] the same array instance on every call
def event_buffer
  @event_buffer = [] unless @event_buffer
  @event_buffer
end

.install_listener ⇒ Object



32
33
34
35
36
37
38
39
40
# File 'lib/legion/api/events.rb', line 32

# Subscribes once to every Legion event ('*') and copies each event, with its
# keys converted to strings, into the recent-event buffer via push_event.
#
# Does nothing when a listener was already installed or when Legion::Events
# is not loaded.
#
# @return [true, nil] true when the listener was installed by this call,
#   nil otherwise
def install_listener
  return if @listener_installed

  if defined?(Legion::Events)
    Legion::Events.on('*') { |raw| push_event(raw.transform_keys(&:to_s)) }
    @listener_installed = true
  end
end

.push_event(event) ⇒ Object



19
20
21
22
23
24
# File 'lib/legion/api/events.rb', line 19

# Appends an event to the shared buffer while holding the buffer mutex, and
# evicts the oldest entry once the buffer grows past BUFFER_SIZE.
#
# @param event [Object] the event to remember
# @return [Object, nil] the evicted entry when one was dropped, otherwise nil
def push_event(event)
  buffer_mutex.synchronize do
    buffer = event_buffer
    buffer << event
    # Only one entry is added per call, so dropping one keeps the cap.
    buffer.shift if buffer.size > BUFFER_SIZE
  end
end

.recent_events(count = 25) ⇒ Object



26
27
28
29
30
# File 'lib/legion/api/events.rb', line 26

# Returns the newest buffered events, read while holding the buffer mutex.
#
# @param count [Integer] maximum number of events to return (default 25)
# @return [Array] a new array holding up to +count+ of the most recent
#   events, oldest first
def recent_events(count = 25)
  buffer_mutex.synchronize { event_buffer.last(count) }
end

.registered(app) ⇒ Object



78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
# File 'lib/legion/api/events.rb', line 78

# Extension hook that wires the event routes into +app+.
#
# Installs the buffer listener (when Legion::Events is loaded) and defines:
#
# * GET /api/events        - a text/event-stream response fed from a
#   per-request queue that receives every Legion event.
# * GET /api/events/recent - the most recent buffered events as JSON; the
#   optional +count+ parameter defaults to 25 and is limited to
#   0..BUFFER_SIZE.
#
# @param app [Object] the application to define routes on; must respond to
#   +get+ (presumably a Sinatra application class - confirm against caller)
# @return [Object] whatever the last +app.get+ call returns
def registered(app)
  install_listener if defined?(Legion::Events)

  app.get '/api/events' do
    content_type 'text/event-stream'
    # Disable caching and proxy buffering so events reach the client promptly.
    headers 'Cache-Control'     => 'no-cache',
            'Connection'        => 'keep-alive',
            'X-Accel-Buffering' => 'no'

    # NOTE(review): unlike the other call sites in this module, this one is
    # not guarded by defined?(Legion::Events) - confirm it is always loaded
    # when this route is reachable.
    queue = Queue.new
    listener = Legion::Events.on('*') do |event|
      queue.push(event)
    end

    stream do |out|
      # NOTE(review): stream_queue returns as soon as its worker thread is
      # started - confirm the streaming helper keeps the connection open
      # after this block returns.
      Routes::Events.stream_queue(out: out, queue: queue, listener: listener)
    end
  end

  app.get '/api/events/recent' do
    # Clamp on both sides: a negative count would otherwise reach
    # Array#last and raise ArgumentError, turning a bad query string into a
    # server error.
    count = (params[:count] || 25).to_i.clamp(0, BUFFER_SIZE)
    json_response(Events.recent_events(count))
  end
end

.stop_queue_stream(queue:, worker:, listener:) ⇒ Object



47
48
49
50
51
52
53
54
55
# File 'lib/legion/api/events.rb', line 47

# Tears down one SSE stream: unsubscribes the listener (when Legion::Events
# is loaded) and, if the worker thread is still alive, pushes the SSE_STOP
# sentinel onto its queue and waits up to 0.1 seconds for it to finish.
#
# ThreadError, IOError and Errno::EPIPE raised during cleanup are logged at
# debug level (when Legion::Logging is loaded) instead of propagating.
#
# @param queue [Queue] the queue the worker is popping from
# @param worker [Thread, nil] the thread draining +queue+
# @param listener [Object] the handle passed to Legion::Events.off
# @return [Object, nil] nil when there was no live worker to stop
def stop_queue_stream(queue:, worker:, listener:)
  Legion::Events.off('*', listener) if defined?(Legion::Events)

  if worker&.alive?
    queue.push(SSE_STOP)
    worker.join(0.1)
  end
rescue ThreadError, IOError, Errno::EPIPE => e
  return unless defined?(Legion::Logging)

  Legion::Logging.debug("Events SSE cleanup failed: #{e.message}")
end

.stream_queue(out:, queue:, listener:) ⇒ Object



57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
# File 'lib/legion/api/events.rb', line 57

# Starts a background thread that pops events from +queue+ and writes each
# one to +out+ with write_sse_event. The thread stops when it pops the
# SSE_STOP sentinel or when a write raises IOError / Errno::EPIPE, and it
# unsubscribes +listener+ on the way out (when Legion::Events is loaded).
#
# The same cleanup proc, which calls stop_queue_stream, is registered as both
# the +callback+ and the +errback+ of +out+.
#
# @param out [Object] the stream to write to; must respond to +callback+ and
#   +errback+
# @param queue [Queue] source of events for this stream
# @param listener [Object] the handle passed to Legion::Events.off
# @return [Thread] the worker thread; this method does not wait for it
def stream_queue(out:, queue:, listener:)
  pump = Thread.new do
    loop do
      event = queue.pop
      break if event.equal?(SSE_STOP)

      write_sse_event(out, event)
    rescue IOError, Errno::EPIPE => e
      # Treat a failed write as the end of the stream: log it and stop.
      if defined?(Legion::Logging)
        Legion::Logging.debug("Events SSE stream broken for #{event[:event]}: #{e.message}")
      end
      break
    end
  ensure
    Legion::Events.off('*', listener) if defined?(Legion::Events)
  end

  teardown = proc { stop_queue_stream(queue: queue, worker: pump, listener: listener) }
  out.callback(&teardown)
  out.errback(&teardown)
  pump
end

.write_sse_event(out, event) ⇒ Object



42
43
44
45
# File 'lib/legion/api/events.rb', line 42

# Writes one event to +out+ as a Server-Sent Events frame: an "event:" line
# holding the payload's 'event' value, a "data:" line holding the whole
# payload serialized with Legion::JSON.dump, and a terminating blank line.
#
# @param out [#<<] the stream to append the frame to
# @param event [Hash] the event; its keys are converted to strings first
# @return [Object] whatever +out <<+ returns
def write_sse_event(out, event)
  fields = event.transform_keys(&:to_s)
  name = fields['event']
  body = Legion::JSON.dump(fields)
  out << "event: #{name}\ndata: #{body}\n\n"
end