Class: Takagi::EventBus::AsyncExecutor::ThreadExecutor

Inherits:
Object
  • Object
show all
Defined in:
lib/takagi/event_bus/async_executor.rb

Overview

Thread-based executor (default fallback)

Instance Attribute Summary collapse

Instance Method Summary collapse

Constructor Details

#initialize(size:) ⇒ ThreadExecutor

Returns a new instance of ThreadExecutor.



12
13
14
15
16
17
18
# File 'lib/takagi/event_bus/async_executor.rb', line 12

# Builds the executor state and launches its workers.
#
# @param size [Numeric] requested worker count; any non-positive value is
#   replaced by 1
def initialize(size:)
  # Resolve the effective pool size first so an invalid argument fails
  # before any other state is created.
  @size =
    if size.positive?
      size
    else
      1
    end
  @shutdown = false
  @threads = []
  @queue = Queue.new
  start_workers
end

Instance Attribute Details

#size ⇒ Object (readonly)

Returns the value of attribute size.



10
11
12
# File 'lib/takagi/event_bus/async_executor.rb', line 10

# Reader for the worker count held in @size.
#
# @return [Object] the value stored in @size (the constructor stores the
#   requested size, or 1 when the request is not positive)
def size
  @size
end

Instance Method Details

#post(handler, message) ⇒ Object



20
21
22
23
24
# File 'lib/takagi/event_bus/async_executor.rb', line 20

# Enqueues a handler/message pair on the internal queue.
#
# @param handler [Object] the handler to pair with the message
# @param message [Object] the message to pair with the handler
# @return [Queue] the internal queue the pair was appended to
# @raise [RuntimeError] if the executor has already been shut down
def post(handler, message)
  if @shutdown
    raise 'Executor is shutdown'
  end

  job = [handler, message]
  @queue.push(job)
end

#register_handler(_handler) ⇒ Object



26
# File 'lib/takagi/event_bus/async_executor.rb', line 26

# Intentionally does nothing; the argument is ignored.
# NOTE(review): presumably present so this executor matches the interface
# of other executor implementations - confirm against callers.
#
# @param _handler [Object] ignored
# @return [nil]
def register_handler(_handler)
  nil
end

#running? ⇒ Boolean

Returns:

  • (Boolean)


39
40
41
# File 'lib/takagi/event_bus/async_executor.rb', line 39

# Reports whether the executor has not been shut down yet.
#
# @return [Boolean] false once the shutdown flag is set, true otherwise
def running?
  return false if @shutdown

  true
end

#shutdown ⇒ Object



30
31
32
33
34
35
36
37
# File 'lib/takagi/event_bus/async_executor.rb', line 30

# Stops the executor: marks it as shut down, enqueues one nil per worker,
# waits for the worker threads to finish and forgets them.
#
# Calling it again after the first call is a no-op.
#
# The nil entries are presumably the stop signal consumed by the worker
# loop (the worker code is not shown here - confirm).
#
# @return [Array, nil] the emptied thread list, or nil when the executor was
#   already shut down
def shutdown
  return if @shutdown

  @shutdown = true
  @size.times { @queue << nil }

  # Thread#join raises ThreadError when the target is the calling thread,
  # so skip it when shutdown is triggered from inside one of the pool's own
  # threads (e.g. a handler stopping the executor).
  caller_thread = Thread.current
  @threads.each { |worker| worker.join unless worker.equal?(caller_thread) }
  @threads.clear
end

#stats ⇒ Object



43
44
45
# File 'lib/takagi/event_bus/async_executor.rb', line 43

# Describes this executor as a small hash.
#
# @return [Hash{Symbol => Object}] :mode is always :threads and :size is the
#   value held in @size
def stats
  summary = { mode: :threads }
  summary[:size] = @size
  summary
end

#unregister_handler(_handler) ⇒ Object



28
# File 'lib/takagi/event_bus/async_executor.rb', line 28

# Intentionally does nothing; the argument is ignored.
# NOTE(review): presumably present so this executor matches the interface
# of other executor implementations - confirm against callers.
#
# @param _handler [Object] ignored
# @return [nil]
def unregister_handler(_handler)
  nil
end