Class: Takagi::EventBus::AsyncExecutor::ProcessExecutor

Inherits:
Object
  • Object
show all
Defined in:
lib/takagi/event_bus/async_executor.rb

Overview

Process-based executor for multi-reactor workloads

Defined Under Namespace

Classes: Job

Instance Method Summary collapse

Constructor Details

#initialize(processes:, threads:) ⇒ ProcessExecutor

Returns a new instance of ProcessExecutor.



73
74
75
76
77
78
79
80
# File 'lib/takagi/event_bus/async_executor.rb', line 73

# Sets up the executor's initial bookkeeping state.
#
# A non-positive +processes+ value is stored as 0; +threads+ is stored
# unchanged. The job list starts empty, the index counter at 0 and the
# restart flag cleared.
#
# @param processes [Integer] requested process count (must respond to +positive?+)
# @param threads [Object] stored as-is in +@threads+ (presumably a per-process
#   thread count -- confirm against the worker code)
def initialize(processes:, threads:)
  process_count = if processes.positive?
                    processes
                  else
                    0
                  end

  @processes = process_count
  @threads = threads

  @jobs = []
  @next_index = 0

  @mutex = Mutex.new
  @needs_restart = false
end

Instance Method Details

#post(handler, message) ⇒ Object



82
83
84
85
# File 'lib/takagi/event_bus/async_executor.rb', line 82

# Hands one handler/message pair to the executor.
#
# Calls +ensure_running+ first, then forwards both arguments unchanged to
# +dispatch+; the value returned by +dispatch+ is this method's return value.
#
# @param handler [Object] forwarded as-is to +dispatch+
# @param message [Object] forwarded as-is to +dispatch+
# @return [Object] whatever +dispatch+ returns
def post(handler, message)
  ensure_running # presumably starts (or restarts) the workers when needed -- confirm in helper
  dispatch(handler, message)
end

#register_handler(_handler) ⇒ Object

Mark for restart so new handlers are visible in workers



88
89
90
# File 'lib/takagi/event_bus/async_executor.rb', line 88

# Mark the executor for restart so new handlers are visible in workers.
#
# The handler itself is not inspected or stored here; the call only
# delegates to +mark_restart_needed+.
#
# @param _handler [Object] ignored
# @return [Object] whatever +mark_restart_needed+ returns
def register_handler(_handler)
  mark_restart_needed
end

#shutdown ⇒ Object



96
97
98
# File 'lib/takagi/event_bus/async_executor.rb', line 96

# Stops the workers while holding the executor's mutex.
#
# +shutdown_workers+ runs inside +@mutex.synchronize+, so it is serialized
# with any other code that takes the same lock.
#
# @return [Object] whatever +shutdown_workers+ returns
def shutdown
  @mutex.synchronize do
    shutdown_workers
  end
end

#stats ⇒ Object



100
101
102
# File 'lib/takagi/event_bus/async_executor.rb', line 100

# Reports a small snapshot of the executor.
#
# +@jobs+ is read without taking +@mutex+, so the size is whatever the
# list holds at the moment of the call.
#
# @return [Hash] +:mode+ is always +:processes+; +:size+ is the current
#   number of entries in +@jobs+
def stats
  job_count = @jobs.size
  { mode: :processes, size: job_count }
end

#unregister_handler(_handler) ⇒ Object



92
93
94
# File 'lib/takagi/event_bus/async_executor.rb', line 92

# Flags the executor as needing a restart after a handler is removed.
#
# The handler itself is not inspected here; the call only delegates to
# +mark_restart_needed+ (presumably so workers stop seeing the removed
# handler -- confirm against the worker restart logic).
#
# @param _handler [Object] ignored
# @return [Object] whatever +mark_restart_needed+ returns
def unregister_handler(_handler)
  mark_restart_needed
end