Class: E2B::Services::LiveEventStream

Inherits:
Object
  • Object
show all
Defined in:
lib/e2b/services/command_handle.rb

Overview

Thread-safe buffer for live command events.

Allows a producer thread to push envd events while the command handle drains them on demand from CommandHandle#each or CommandHandle#wait.

Instance Method Summary collapse

Constructor Details

#initialize ⇒ LiveEventStream

Returns a new instance of LiveEventStream.



12
13
14
15
16
17
# File 'lib/e2b/services/command_handle.rb', line 12

# Sets up an empty, open stream: no buffered items, not yet closed, plus the
# mutex / condition-variable pair the other methods use to coordinate access
# to the buffer.
def initialize
  @closed = false
  @items = []
  @condition = ConditionVariable.new
  @mutex = Mutex.new
end

Instance Method Details

#close(discard_pending: false) ⇒ Object



37
38
39
40
41
42
43
44
45
# File 'lib/e2b/services/command_handle.rb', line 37

# Marks the stream as closed and wakes every thread waiting on the condition
# variable.
#
# Only the first call has any effect; once the stream is closed, later calls
# return nil without touching the buffer (even with +discard_pending: true+).
#
# @param discard_pending [Boolean] when true, drop all buffered items as part
#   of closing
def close(discard_pending: false)
  @mutex.synchronize do
    next if @closed

    @items.clear if discard_pending
    @closed = true
    # broadcast (not signal): every waiter must observe the closed flag
    @condition.broadcast
  end
end

#each ⇒ Object



47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
# File 'lib/e2b/services/command_handle.rb', line 47

# Yields buffered payloads one at a time, in the order they were buffered,
# blocking while the buffer is empty and the stream is still open.
#
# Iteration ends once the buffer is empty and the stream is closed. If the
# next buffered entry is tagged +:error+, its payload is raised instead of
# yielded.
#
# Without a block, returns an Enumerator over this method.
def each
  return to_enum(:each) unless block_given?

  loop do
    entry = @mutex.synchronize do
      # Sleep until there is something to hand out or the stream is closed.
      @condition.wait(@mutex) until @closed || !@items.empty?
      @items.shift
    end

    # shift returned nil: nothing buffered and the stream is closed.
    break unless entry

    kind, value = entry
    raise value if kind == :error

    yield value
  end
end

#fail(error) ⇒ Object



28
29
30
31
32
33
34
35
# File 'lib/e2b/services/command_handle.rb', line 28

# Buffers +error+ as an +:error+ entry and signals one waiting thread.
# Does nothing (returns nil) once the stream is closed.
#
# @param error [Object] value stored alongside the +:error+ tag
def fail(error)
  @mutex.synchronize do
    unless @closed
      @items.push([:error, error])
      @condition.signal
    end
  end
end

#push(event) ⇒ Object



19
20
21
22
23
24
25
26
# File 'lib/e2b/services/command_handle.rb', line 19

# Buffers +event+ as an +:event+ entry and signals one waiting thread.
# Does nothing (returns nil) once the stream is closed.
#
# @param event [Object] value stored alongside the +:event+ tag
def push(event)
  @mutex.synchronize do
    next if @closed

    @items.push([:event, event])
    @condition.signal
  end
end