Class: E2B::Services::LiveEventStream
- Inherits: Object
  - Object
    - E2B::Services::LiveEventStream
- Defined in: lib/e2b/services/command_handle.rb
Overview
Thread-safe buffer for live command events.
Allows a producer thread to push envd events while the command handle drains them on demand from CommandHandle#each or CommandHandle#wait.
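The producer/consumer hand-off described above can be sketched as follows. This is a minimal illustration, not the gem's own usage: `LiveEventStreamSketch` is a stand-in class copied from the method listings on this page so the example runs without the gem, the helper `drain_with_producer` is hypothetical, and the string payloads are placeholders for real envd events.

```ruby
# Stand-in mirroring the methods listed on this page (assumption: the real
# class behaves as its source listing shows).
class LiveEventStreamSketch
  def initialize
    @mutex = Mutex.new
    @condition = ConditionVariable.new
    @items = []
    @closed = false
  end

  def push(event)
    @mutex.synchronize do
      return if @closed
      @items << [:event, event]
      @condition.signal
    end
  end

  def close(discard_pending: false)
    @mutex.synchronize do
      return if @closed
      @closed = true
      @items.clear if discard_pending
      @condition.broadcast
    end
  end

  def each
    return enum_for(:each) unless block_given?
    loop do
      item = @mutex.synchronize do
        @condition.wait(@mutex) while @items.empty? && !@closed
        @items.shift
      end
      break unless item
      type, payload = item
      raise payload if type == :error
      yield payload
    end
  end
end

# Hypothetical helper: one thread produces, the calling thread consumes.
def drain_with_producer
  stream = LiveEventStreamSketch.new
  producer = Thread.new do
    # Placeholder payloads; real events would come from envd.
    %w[stdout:hello stdout:world exit:0].each { |event| stream.push(event) }
    stream.close # lets the consumer's #each return once the buffer is drained
  end
  received = []
  stream.each { |event| received << event } # blocks while empty and open
  producer.join
  received
end

p drain_with_producer
# => ["stdout:hello", "stdout:world", "exit:0"]
```

The consumer never polls: it sleeps on the condition variable until the producer pushes an item or closes the stream.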
Instance Method Summary
- #close(discard_pending: false) ⇒ Object
- #each ⇒ Object
- #fail(error) ⇒ Object
- #initialize ⇒ LiveEventStream (constructor): A new instance of LiveEventStream.
- #push(event) ⇒ Object
Constructor Details
#initialize ⇒ LiveEventStream
Returns a new instance of LiveEventStream.
    # File 'lib/e2b/services/command_handle.rb', line 12
    def initialize
      @mutex = Mutex.new
      @condition = ConditionVariable.new
      @items = []
      @closed = false
    end
Instance Method Details
#close(discard_pending: false) ⇒ Object
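Marks the stream as closed and wakes every waiting consumer. Items already queued remain available to #each unless discard_pending is true, in which case they are dropped. Calling it on an already closed stream does nothing.

    # File 'lib/e2b/services/command_handle.rb', line 37
    def close(discard_pending: false)
      @mutex.synchronize do
        return if @closed
        @closed = true
        @items.clear if discard_pending
        @condition.broadcast
      end
    end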
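The effect of `discard_pending` may be easier to see side by side. The sketch below assumes the behavior shown in the listings on this page. `LiveEventStreamSketch` is a stand-in copied from those listings, and `drained_after_close` is a hypothetical helper.

```ruby
# Stand-in with only the methods this example needs (copied from this page).
class LiveEventStreamSketch
  def initialize
    @mutex = Mutex.new
    @condition = ConditionVariable.new
    @items = []
    @closed = false
  end

  def push(event)
    @mutex.synchronize do
      return if @closed
      @items << [:event, event]
      @condition.signal
    end
  end

  def close(discard_pending: false)
    @mutex.synchronize do
      return if @closed
      @closed = true
      @items.clear if discard_pending
      @condition.broadcast
    end
  end

  def each
    return enum_for(:each) unless block_given?
    loop do
      item = @mutex.synchronize do
        @condition.wait(@mutex) while @items.empty? && !@closed
        @items.shift
      end
      break unless item
      type, payload = item
      raise payload if type == :error
      yield payload
    end
  end
end

# Hypothetical helper: queue two events, close, then collect what #each yields.
def drained_after_close(discard_pending:)
  stream = LiveEventStreamSketch.new
  stream.push("a")
  stream.push("b")
  stream.close(discard_pending: discard_pending)
  stream.push("late") # ignored: the stream is already closed
  stream.each.to_a    # block-less #each returns an Enumerator
end

p drained_after_close(discard_pending: false) # => ["a", "b"]
p drained_after_close(discard_pending: true)  # => []
```

In both cases `#each` returns promptly because the stream is closed. The flag only decides whether buffered events are still delivered.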
#each ⇒ Object
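Yields each queued event payload in order, blocking while the buffer is empty and the stream is still open. Returns once the stream is closed and drained, and raises the queued exception when it reaches an item added by #fail. Without a block, returns an Enumerator.

    # File 'lib/e2b/services/command_handle.rb', line 47
    def each
      return enum_for(:each) unless block_given?

      loop do
        item = @mutex.synchronize do
          @condition.wait(@mutex) while @items.empty? && !@closed
          @items.shift
        end

        break unless item

        type, payload = item
        raise payload if type == :error

        yield payload
      end
    end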
#fail(error) ⇒ Object
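Queues error so that #each raises it after yielding any events pushed earlier. Does not close the stream, and is ignored if the stream is already closed.

    # File 'lib/e2b/services/command_handle.rb', line 28
    def fail(error)
      @mutex.synchronize do
        return if @closed
        @items << [:error, error]
        @condition.signal
      end
    end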
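Because errors travel through the same queue as events, a consumer sees them in order. The sketch below illustrates that under the behavior shown in the listings on this page. `LiveEventStreamSketch` is a stand-in copied from those listings, `consume_until_failure` is a hypothetical helper, and `IOError` is an arbitrary choice of exception.

```ruby
# Stand-in with only the methods this example needs (copied from this page).
class LiveEventStreamSketch
  def initialize
    @mutex = Mutex.new
    @condition = ConditionVariable.new
    @items = []
    @closed = false
  end

  def push(event)
    @mutex.synchronize do
      return if @closed
      @items << [:event, event]
      @condition.signal
    end
  end

  def fail(error)
    @mutex.synchronize do
      return if @closed
      @items << [:error, error]
      @condition.signal
    end
  end

  def each
    return enum_for(:each) unless block_given?
    loop do
      item = @mutex.synchronize do
        @condition.wait(@mutex) while @items.empty? && !@closed
        @items.shift
      end
      break unless item
      type, payload = item
      raise payload if type == :error
      yield payload
    end
  end
end

# Hypothetical helper: records what the consumer observes, in order.
def consume_until_failure
  stream = LiveEventStreamSketch.new
  stream.push("first")
  stream.fail(IOError.new("connection lost"))
  stream.push("after-error") # still queued: #fail does not close the stream
  seen = []
  begin
    stream.each { |event| seen << event }
  rescue IOError => e
    seen << "raised: #{e.message}"
  end
  seen
end

p consume_until_failure
# => ["first", "raised: connection lost"]
```

The event pushed before the failure is delivered first, and iteration stops at the error, so the later event is never yielded by that call.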
#push(event) ⇒ Object
Appends event to the buffer and wakes one waiting consumer. Ignored if the stream is already closed.

    # File 'lib/e2b/services/command_handle.rb', line 19
    def push(event)
      @mutex.synchronize do
        return if @closed
        @items << [:event, event]
        @condition.signal
      end
    end