Class: Takagi::Observable::Registry

Inherits:
Object
  • Object
show all
Extended by:
Registry::Base
Defined in:
lib/takagi/observable/registry.rb

Overview

Registry for reactor instances

Manages reactor lifecycle and provides consistent API for reactor management. Uses Registry::Base for thread-safe storage and standard operations.

Examples:

Register reactors

Observable::Registry.register(:iot, IotReactor.new)
Observable::Registry.register(:sensors, SensorReactor.new)

Start all reactors

Observable::Registry.start_all

Access specific reactor

reactor = Observable::Registry.get(:iot)
reactor.notify('/temp', 25.5)

Class Method Summary collapse

Methods included from Registry::Base

[], clear!, count, each, empty?, entries, extended, get, keys, metadata_for, register, registered?, unregister

Class Method Details

.allocate_resources(total_threads: 10) ⇒ Hash

Allocate thread resources across reactors

Distributes available threads among registered reactors based on weights.

Examples:

Registry.allocate_resources(total_threads: 20)
# => { iot: 10, sensors: 6, alerts: 4 }

Parameters:

  • total_threads (Integer) (defaults to: 10)

    Total threads to allocate

Returns:

  • (Hash)

    Map of reactor name => allocated threads



85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
# File 'lib/takagi/observable/registry.rb', line 85

def allocate_resources(total_threads: 10)
  reactor_list = reactors
  return {} if reactor_list.empty?

  threads_per_reactor = total_threads / reactor_list.size
  threads_per_reactor = [threads_per_reactor, 1].max

  allocations = {}
  reactor_list.each do |reactor|
    name = keys.find { |k| self[k] == reactor }
    reactor.thread_pool.resize(threads_per_reactor) if reactor.thread_pool.respond_to?(:resize)
    allocations[name] = threads_per_reactor
  end

  Takagi.logger.debug "Allocated #{threads_per_reactor} threads per reactor"
  allocations
end

.reactorsArray<Reactor>

Get all registered reactors

Returns:

  • (Array<Reactor>)

    List of reactor instances



40
41
42
# File 'lib/takagi/observable/registry.rb', line 40

def reactors
  @mutex.synchronize { registry.values }
end

.register(name, reactor, **metadata) ⇒ void

This method returns an undefined value.

Register a reactor instance

Examples:

Registry.register(:sensors, SensorReactor.new, description: 'IoT sensors')

Parameters:

  • name (Symbol)

    Unique identifier for the reactor

  • reactor (Reactor)

    Reactor instance

  • metadata (Hash)

    Optional metadata



33
34
35
# File 'lib/takagi/observable/registry.rb', line 33

def register(name, reactor, **)
  super(name.to_sym, reactor, **)
end

.start_allvoid

This method returns an undefined value.

Start all registered reactors



47
48
49
50
51
52
53
# File 'lib/takagi/observable/registry.rb', line 47

def start_all
  reactor_list = reactors
  reactor_list.each do |reactor|
    reactor.start unless reactor.running?
  end
  Takagi.logger.info "Started #{reactor_list.size} reactor(s)"
end

.statsHash

Get statistics for all reactors

Returns:

  • (Hash)

    Map of reactor name => stats



106
107
108
109
110
111
112
113
114
115
116
117
118
# File 'lib/takagi/observable/registry.rb', line 106

def stats
  result = {}
  each do |name, reactor|
    result[name] = {
      running: reactor.running?,
      observables: reactor.observables.size,
      observers: reactor.observers.size,
      threads: reactor.config[:threads],
      thread_pool_stats: reactor.thread_pool.current_stats
    }
  end
  result
end

.stop_allvoid

This method returns an undefined value.

Stop all registered reactors

Safe to call from signal traps (doesn’t use mutex directly)



60
61
62
63
64
65
66
67
68
69
70
71
72
73
# File 'lib/takagi/observable/registry.rb', line 60

def stop_all
  # Get reactors list without holding mutex (avoid trap context issues)
  reactor_list = begin
                   @mutex.synchronize { registry.values.dup }
                 rescue ThreadError
                   # If called from trap context, access without lock
                   registry.values.dup
                 end

  reactor_list.each do |reactor|
    reactor.stop if reactor.running?
  end
  Takagi.logger.info "Stopped #{reactor_list.size} reactor(s)"
end