Class: Takagi::Observable::Registry
- Inherits:
- Object (hierarchy: Object → Takagi::Observable::Registry)
- Extended by:
- Registry::Base
- Defined in:
- lib/takagi/observable/registry.rb
Overview
Registry for reactor instances
Manages the reactor lifecycle and provides a consistent API for reactor management. Uses Registry::Base for thread-safe storage and standard operations.
Class Method Summary
- .allocate_resources(total_threads: 10) ⇒ Hash
  Allocate thread resources across reactors.
- .reactors ⇒ Array<Reactor>
  Get all registered reactors.
- .register(name, reactor, **metadata) ⇒ void
  Register a reactor instance.
- .start_all ⇒ void
  Start all registered reactors.
- .stats ⇒ Hash
  Get statistics for all reactors.
- .stop_all ⇒ void
  Stop all registered reactors.
Methods included from Registry::Base
[], clear!, count, each, empty?, entries, extended, get, keys, metadata_for, register, registered?, unregister
Class Method Details
.allocate_resources(total_threads: 10) ⇒ Hash
Allocate thread resources across reactors
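Distributes the available threads evenly among registered reactors, with a minimum of one thread per reactor. Although the summary describes the split as weight-based, the implementation shown here applies no per-reactor weights.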
# File 'lib/takagi/observable/registry.rb', line 85
def allocate_resources(total_threads: 10)
  reactor_list = reactors
  return {} if reactor_list.empty?

  threads_per_reactor = total_threads / reactor_list.size
  threads_per_reactor = [threads_per_reactor, 1].max

  allocations = {}
  reactor_list.each do |reactor|
    name = keys.find { |k| self[k] == reactor }
    reactor.thread_pool.resize(threads_per_reactor) if reactor.thread_pool.respond_to?(:resize)
    allocations[name] = threads_per_reactor
  end

  Takagi.logger.debug "Allocated #{threads_per_reactor} threads per reactor"
  allocations
end
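The arithmetic in this method can be checked in isolation. The following is a minimal sketch, not part of Takagi: `threads_per_reactor` is a hypothetical helper that reproduces only the integer division and the floor of one thread visible in the source.

```ruby
# Hypothetical helper (not part of Takagi): mirrors the per-reactor
# calculation from allocate_resources.
def threads_per_reactor(total_threads, reactor_count)
  return 0 if reactor_count.zero? # the real method returns {} in this case

  # Integer division drops the remainder; each reactor still gets at least 1.
  [total_threads / reactor_count, 1].max
end

threads_per_reactor(10, 3) # => 3 (one of the 10 threads is left unassigned)
threads_per_reactor(10, 4) # => 2
threads_per_reactor(2, 5)  # => 1 (5 threads handed out, more than requested)
```

Two consequences follow from the source: the remainder of the division is not redistributed, and when there are more reactors than threads, the total handed out exceeds `total_threads`.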
.reactors ⇒ Array<Reactor>
Get all registered reactors
# File 'lib/takagi/observable/registry.rb', line 40
def reactors
  @mutex.synchronize { registry.values }
end
.register(name, reactor, **metadata) ⇒ void
This method returns an undefined value.
Register a reactor instance
# File 'lib/takagi/observable/registry.rb', line 33
def register(name, reactor, **)
  super(name.to_sym, reactor, **)
end
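Because the name is passed through `to_sym` before storage, a String name and the equivalent Symbol refer to the same entry. The sketch below illustrates only that normalization, using a plain Hash as a stand-in for the storage in Registry::Base. How the real `register` treats a duplicate name is not shown on this page, so the overwrite here is simply Hash behavior.

```ruby
# Stand-in store; NOT Takagi's Registry::Base. Only the to_sym
# normalization is taken from the source.
store = {}
normalize = ->(name) { name.to_sym }

store[normalize.call("coap")] = :reactor_a
store[normalize.call(:coap)]  = :reactor_b # same key as "coap"

store.keys # => [:coap]
store.size # => 1
```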
.start_all ⇒ void
This method returns an undefined value.
Start all registered reactors
# File 'lib/takagi/observable/registry.rb', line 47
def start_all
  reactor_list = reactors
  reactor_list.each do |reactor|
    reactor.start unless reactor.running?
  end
  Takagi.logger.info "Started #{reactor_list.size} reactor(s)"
end
.stats ⇒ Hash
Get statistics for all reactors
# File 'lib/takagi/observable/registry.rb', line 106
def stats
  result = {}
  each do |name, reactor|
    result[name] = {
      running: reactor.running?,
      observables: reactor.observables.size,
      observers: reactor.observers.size,
      threads: reactor.config[:threads],
      thread_pool_stats: reactor.thread_pool.current_stats
    }
  end
  result
end
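The returned hash is keyed by reactor name, with one inner hash per reactor. A sketch of consuming it follows, using made-up sample data: the names `:default` and `:io` and all numbers are illustrative, and `thread_pool_stats` is left empty because its shape is not documented on this page.

```ruby
# Illustrative data shaped like the return value of Registry.stats.
sample_stats = {
  default: { running: true,  observables: 3, observers: 7, threads: 4, thread_pool_stats: {} },
  io:      { running: false, observables: 0, observers: 0, threads: 2, thread_pool_stats: {} }
}

# Names of reactors that report as running.
running = sample_stats.select { |_name, s| s[:running] }.keys

# Total observers across all reactors.
total_observers = sample_stats.sum { |_name, s| s[:observers] }

running         # => [:default]
total_observers # => 7
```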
.stop_all ⇒ void
This method returns an undefined value.
Stop all registered reactors
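Intended to be safe to call from signal traps: it first tries to read the registry under the mutex and, if that raises ThreadError (as locking a Mutex does in trap context), falls back to reading the registry without the lock.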
# File 'lib/takagi/observable/registry.rb', line 60
def stop_all
  # Get reactors list without holding mutex (avoid trap context issues)
  reactor_list = begin
    @mutex.synchronize { registry.values.dup }
  rescue ThreadError
    # If called from trap context, access without lock
    registry.values.dup
  end

  reactor_list.each do |reactor|
    reactor.stop if reactor.running?
  end
  Takagi.logger.info "Stopped #{reactor_list.size} reactor(s)"
end
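The lock-then-fall-back step can be exercised on its own. In the sketch below, `snapshot_values` is a hypothetical stand-in for the first part of `stop_all`, not Takagi code. Ruby raises ThreadError when a Mutex is locked inside a `Signal.trap` handler; the example triggers the same exception class without a signal by re-locking a mutex the current thread already holds.

```ruby
# Stand-in for the snapshot step of stop_all; not Takagi code.
def snapshot_values(mutex, registry)
  mutex.synchronize { registry.values.dup }
rescue ThreadError
  # Lock could not be taken (trap context, or re-entrant locking):
  # fall back to an unsynchronized read.
  registry.values.dup
end

mutex = Mutex.new
registry = { a: 1, b: 2 }

# Normal path: the lock is free.
normal = snapshot_values(mutex, registry) # => [1, 2]

# Force the rescue path without a signal: Mutex is not re-entrant, so
# locking it again from the same thread raises ThreadError.
fallback = mutex.synchronize { snapshot_values(mutex, registry) } # => [1, 2]
```

The fallback read is unsynchronized, so it can in principle observe the registry while another thread is modifying it. The source accepts that risk in exchange for being able to stop reactors from a signal handler.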