Class: Pulsar::Internal::ThreadRuntime

Inherits:
Object
  • Object
show all
Defined in:
lib/pulsar/internal/thread_runtime.rb

Overview

Tracks background threads and queues for coordinated shutdown.

Instance Method Summary collapse

Constructor Details

#initializeThreadRuntime

Returns a new instance of ThreadRuntime.



7
8
9
10
11
12
# File 'lib/pulsar/internal/thread_runtime.rb', line 7

def initialize
  @mutex = Mutex.new
  @threads = []
  @queues = []
  @shutdown = false
end

Instance Method Details

#promiseObject



14
15
16
# File 'lib/pulsar/internal/thread_runtime.rb', line 14

def promise
  Promise.new
end

#queue(capacity:) ⇒ Object

Raises:



18
19
20
21
22
23
24
# File 'lib/pulsar/internal/thread_runtime.rb', line 18

def queue(capacity:)
  raise ClosedError, 'runtime is shut down' if shutdown?

  BoundedQueue.new(capacity: capacity).tap do |queue|
    @mutex.synchronize { @queues << queue }
  end
end

#shutdownObject



34
35
36
37
38
39
40
41
42
43
44
45
46
47
# File 'lib/pulsar/internal/thread_runtime.rb', line 34

def shutdown
  threads, queues = @mutex.synchronize do
    @shutdown = true
    [@threads.dup, @queues.dup].tap do
      @threads.clear
      @queues.clear
    end
  end

  queues.each(&:close)
  threads.each(&:kill)
  threads.each { |thread| thread.join(0.1) }
  nil
end

#shutdown?Boolean

Returns:

  • (Boolean)


49
50
51
# File 'lib/pulsar/internal/thread_runtime.rb', line 49

def shutdown?
  @mutex.synchronize { @shutdown }
end

#spawn(&block) ⇒ Object

Raises:



26
27
28
29
30
31
32
# File 'lib/pulsar/internal/thread_runtime.rb', line 26

def spawn(&block)
  raise ClosedError, 'runtime is shut down' if shutdown?

  thread = Thread.new(&block)
  @mutex.synchronize { @threads << thread }
  thread
end