Class: Takagi::Observable::Reactor

Inherits:
Object
Defined in:
lib/takagi/observable/reactor.rb

Overview

Coordinator for observables (resources that others watch) and observers (resources that we watch).

Supports both interval-based polling and event-driven push notifications. Uses thread pools for parallel execution and resource control.

Like controllers, a reactor is configured through a declarative class-level API.

Examples:

Inline in controller

class SensorController < Takagi::Controller
  reactor do
    observable '/temp' do |emitter|
      emitter.on_event('sensor.temp.changed')
    end
  end
end

Standalone reactor file (declarative)

class IotReactor < Takagi::Observable::Reactor
  configure do
    threads 8
    name 'iot-reactor'
    interval 0.5
  end

  observable '/sensors/temp' do |emitter|
    emitter.on_event('sensor.temp.changed')
  end

  observe 'coap://gateway:5683/commands' do |data|
    handle_command(data)
  end

  def handle_command(data)
    # Your logic here
  end
end

Defined Under Namespace

Classes: ConfigContext

Constructor Details

#initialize(threads: nil, name: nil, interval: nil) ⇒ Reactor

Returns a new instance of Reactor.



# File 'lib/takagi/observable/reactor.rb', line 126

def initialize(threads: nil, name: nil, interval: nil)
  # Auto-detect controller by naming convention
  auto_inherit_from_controller unless self.class.config[:from_controller]

  # Merge class config with instance overrides
  class_config = self.class.config
  controller = class_config[:from_controller]

  Takagi.logger.debug "Initializing reactor: #{self.class.name}"
  Takagi.logger.debug "  Associated controller: #{controller&.name || 'none'}"
  Takagi.logger.debug "  Explicit threads param: #{threads.inspect}"
  Takagi.logger.debug "  Class config threads: #{class_config[:threads].inspect}"

  # Determine if we should share controller's thread pool
  # Share if: controller exists AND no explicit thread configuration
  should_share_pool = controller && threads.nil? && class_config[:threads].nil?

  if should_share_pool
    Takagi.logger.debug "  Decision: Share thread pool with controller"

    # Share controller's thread pool (lazy initialization)
    @thread_pool = controller.thread_pool
    @shared_pool = true

    @config = {
      threads: @thread_pool.size,
      name: name || class_config[:name] || default_reactor_name,
      interval: interval || class_config[:interval] || 1.0
    }

    Takagi.logger.info "Reactor '#{@config[:name]}' sharing thread pool with #{controller.name} (#{@thread_pool.size} threads)"
    Takagi.logger.debug "  Shared pool object_id: #{@thread_pool.object_id}"
  else
    Takagi.logger.debug "  Decision: Create independent thread pool"

    # Create independent thread pool
    @shared_pool = false

    @config = {
      threads: threads || class_config[:threads] || 1,
      name: name || class_config[:name] || default_reactor_name,
      interval: interval || class_config[:interval] || 1.0
    }

    @thread_pool = Controller::ThreadPool.new(
      size: @config[:threads],
      name: @config[:name]
    )

    reason = if !controller
               "no associated controller"
             elsif threads
               "explicit threads parameter (#{threads})"
             elsif class_config[:threads]
               "explicit class configuration (#{class_config[:threads]} threads)"
             else
               "unknown reason"
             end

    Takagi.logger.info "Reactor '#{@config[:name]}' created independent thread pool (#{@config[:threads]} threads) - #{reason}"
    Takagi.logger.debug "  Independent pool object_id: #{@thread_pool.object_id}"
  end

  @observables = {}
  @observers = {}
  @running = false
  @watcher = Observer::Watcher.new(interval: @config[:interval])

  Takagi.logger.debug "Reactor initialization complete: #{@config[:name]}"

  # Register class-level observables and observers
  setup_class_definitions
end
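
The pool-selection rule in the constructor can be isolated as a small pure function. The sketch below is for illustration only: pool_mode and its return symbols are hypothetical names, not part of Takagi.

```ruby
# Hypothetical helper mirroring the constructor's sharing rule:
# share only when a controller exists and no thread count was given,
# either as a constructor argument or in the class-level config.
def pool_mode(controller:, threads: nil, class_threads: nil)
  return :shared if controller && threads.nil? && class_threads.nil?

  :independent
end

pool_mode(controller: Object)                    # => :shared
pool_mode(controller: Object, threads: 4)        # => :independent
pool_mode(controller: Object, class_threads: 8)  # => :independent
pool_mode(controller: nil)                       # => :independent
```

Any explicit thread count, even one equal to the controller's pool size, results in an independent pool.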

Instance Attribute Details

#config ⇒ Object (readonly)

Returns the value of attribute config.



# File 'lib/takagi/observable/reactor.rb', line 44

def config
  @config
end

#observables ⇒ Object (readonly)

Returns the value of attribute observables.



# File 'lib/takagi/observable/reactor.rb', line 44

def observables
  @observables
end

#observers ⇒ Object (readonly)

Returns the value of attribute observers.



# File 'lib/takagi/observable/reactor.rb', line 44

def observers
  @observers
end

#thread_pool ⇒ Object (readonly)

Returns the value of attribute thread_pool.



# File 'lib/takagi/observable/reactor.rb', line 44

def thread_pool
  @thread_pool
end

Class Method Details

.config ⇒ Hash

Get or create the reactor’s configuration

Returns:

  • (Hash)

    The reactor’s configuration hash



# File 'lib/takagi/observable/reactor.rb', line 50

def config
  @config ||= {
    threads: nil,  # Will be set to 1 if no controller found
    name: nil,
    interval: 1.0,
    from_controller: nil
  }
end

.configure { ... } ⇒ Object

Configure the reactor (class-level DSL)

Examples:

configure do
  threads 8
  name 'iot-reactor'
  interval 0.5
end

Yields:

  • Block for configuration DSL



# File 'lib/takagi/observable/reactor.rb', line 69

def configure(&block)
  ConfigContext.new(self).instance_eval(&block) if block
end
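
ConfigContext itself is not shown on this page. The following is a minimal sketch of how an instance_eval-based configuration context might work, assuming each DSL method simply writes into the class-level config hash. DemoConfigContext and DemoReactor are hypothetical stand-ins, not Takagi classes.

```ruby
# Hypothetical stand-in for ConfigContext: each DSL method stores its
# argument in the target class's config hash.
class DemoConfigContext
  def initialize(target)
    @target = target
  end

  def threads(count)
    @target.config[:threads] = count
  end

  def name(value)
    @target.config[:name] = value
  end

  def interval(seconds)
    @target.config[:interval] = seconds
  end
end

class DemoReactor
  def self.config
    @config ||= { threads: nil, name: nil, interval: 1.0 }
  end

  # Evaluate the block with the context as self, so bare calls such as
  # `threads 8` resolve to the context's methods.
  def self.configure(&block)
    DemoConfigContext.new(self).instance_eval(&block) if block
  end

  configure do
    threads 8
    name 'iot-reactor'
    interval 0.5
  end
end

DemoReactor.config  # => {threads: 8, name: "iot-reactor", interval: 0.5}
```

Because the block runs under instance_eval, methods and instance variables of the enclosing object are not reachable inside it; local variables still are.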

.inherit_from(controller_class) ⇒ Object

Inherit settings from a controller

Associates this reactor with a controller for thread pool sharing. Deliberately leaves threads unset in the config so that the reactor can share the controller's pool automatically.

Examples:

class TelemetryReactor < Takagi::Observable::Reactor
  inherit_from TelemetryController
end

Parameters:

  • controller_class (Class)

    Controller class to inherit from



# File 'lib/takagi/observable/reactor.rb', line 84

def inherit_from(controller_class)
  config[:from_controller] = controller_class
  # Only set name, NOT threads (to allow pool sharing)
  config[:name] ||= "#{controller_class.name.split('::').last.downcase.gsub('controller', '')}-reactor"
end
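
The default name is derived from the controller's class name. The sketch below extracts the same string transformation into a standalone method so its effect is easy to see; derived_reactor_name is a hypothetical helper name used only for this illustration.

```ruby
# Same string transformation as inherit_from, extracted for illustration.
# `derived_reactor_name` is a hypothetical helper, not a Takagi method.
def derived_reactor_name(controller_class_name)
  base = controller_class_name.split('::').last.downcase.gsub('controller', '')
  "#{base}-reactor"
end

derived_reactor_name('TelemetryController')    # => "telemetry-reactor"
derived_reactor_name('Iot::SensorController')  # => "sensor-reactor"
```

Since the assignment uses ||=, a name that is already present in the class config is kept rather than replaced by the derived one.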

.observable(path) { ... } ⇒ Object

Define an observable at class level

Parameters:

  • path (String)

    The resource path

Yields:

  • Block for observable definition



# File 'lib/takagi/observable/reactor.rb', line 94

def observable(path, &block)
  observables[path] = { type: :event_driven, block: block }
end

.observable_polling(path, interval: nil) { ... } ⇒ Object

Define a polling observable at class level

Parameters:

  • path (String)

    The resource path

  • interval (Float) (defaults to: nil)

    Polling interval

Yields:

  • Block for observable definition



# File 'lib/takagi/observable/reactor.rb', line 103

def observable_polling(path, interval: nil, &block)
  observables[path] = { type: :polling, interval: interval, block: block }
end

.observables ⇒ Object

Get class-level observables



# File 'lib/takagi/observable/reactor.rb', line 116

def observables
  @observables ||= {}
end
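
Because .observables memoizes into a class-level instance variable, each reactor class gets its own registry. The sketch below demonstrates that Ruby behaviour with hypothetical stand-in classes (DemoBase, TempReactor, AlertReactor); it does not load Takagi.

```ruby
# Stand-in base class using the same `@observables ||= {}` pattern.
class DemoBase
  def self.observables
    @observables ||= {}
  end

  def self.observable(path, &block)
    observables[path] = { type: :event_driven, block: block }
  end
end

class TempReactor < DemoBase
  observable('/temp') { |emitter| emitter }
end

class AlertReactor < DemoBase
  observable('/alerts') { |emitter| emitter }
end

TempReactor.observables.keys   # => ["/temp"]
AlertReactor.observables.keys  # => ["/alerts"]
DemoBase.observables.keys      # => []
```

With this pattern alone, definitions made on a parent class are not visible from its subclasses; whether Takagi adds inheritance on top is not shown on this page.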

.observe(uri) { ... } ⇒ Object

Define a remote observation at class level

Parameters:

  • uri (String)

    Remote CoAP URI

Yields:

  • Block for handling notifications



# File 'lib/takagi/observable/reactor.rb', line 111

def observe(uri, &block)
  observers[uri] = block
end

.observers ⇒ Object

Get class-level observers



# File 'lib/takagi/observable/reactor.rb', line 121

def observers
  @observers ||= {}
end

Instance Method Details

#notify(path, value) ⇒ void

This method returns an undefined value.

Manually trigger a notification for an observable

Useful for triggering updates from request handlers or other code.

Examples:

In a controller action

post '/data' do
  process_data(request)
  reactor.notify('/summary', calculate_summary)
end

Parameters:

  • path (String)

    The observable path

  • value (Object)

    The value to send



# File 'lib/takagi/observable/reactor.rb', line 348

def notify(path, value)
  @thread_pool.schedule do
    Observer::Registry.notify(path, value)
  end
end
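
Since #notify only schedules the delivery, the caller returns before subscribers are notified. The sketch below illustrates that hand-off with a deliberately tiny queue-backed pool. TinyPool and demo_notify are hypothetical; Takagi's Controller::ThreadPool is not shown on this page and may work differently.

```ruby
# Hypothetical minimal pool: workers pull blocks from a shared queue.
class TinyPool
  def initialize(size:)
    @jobs = Queue.new
    @workers = Array.new(size) do
      Thread.new do
        # Queue#pop blocks until a job arrives; :stop ends the worker.
        while (job = @jobs.pop) != :stop
          job.call
        end
      end
    end
  end

  def schedule(&block)
    @jobs << block
  end

  def shutdown
    @workers.size.times { @jobs << :stop }
    @workers.each(&:join)
  end
end

def demo_notify
  delivered = Queue.new
  pool = TinyPool.new(size: 2)

  # Analogue of reactor.notify: returns at once, delivery runs on a worker.
  pool.schedule { delivered << ['/summary', 42] }
  pool.shutdown
  delivered.pop
end

demo_notify  # => ["/summary", 42]
```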

#observable(path) {|emitter| ... } ⇒ void

This method returns an undefined value.

Define an event-driven observable (push-based)

The block receives an emitter that can push updates immediately when data changes, without relying on polling.

Examples:

EventBus-driven

observable '/alerts' do |emitter|
  emitter.on_event('alert.critical')
end

Custom events

observable '/temp' do |emitter|
  TempSensor.on_change { |val| emitter.notify(val) }
end

Parameters:

  • path (String)

    The resource path

Yields:

  • (emitter)

    Block that sets up event listeners



# File 'lib/takagi/observable/reactor.rb', line 283

def observable(path, &block)
  emitter = Emitter.new(path)
  @observables[path] = {
    type: :event_driven,
    emitter: emitter,
    block: block
  }

  # Register with router so it can be requested
  Takagi::Base.router.observable(path, &block)

  # Initialize the observable in thread pool
  @thread_pool.schedule do
    block.call(emitter)
  end

  Takagi.logger.debug "Event-driven observable registered: #{path}"
end
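
The setup block runs once and only wires listeners; values are pushed later, whenever the source changes. The sketch below illustrates that flow with hypothetical stand-ins (DemoEmitter, DemoSensor, demo_push). Takagi's Emitter and the TempSensor from the example above are not defined on this page.

```ruby
# Hypothetical emitter that records pushed values; a real emitter
# would forward them to subscribers.
class DemoEmitter
  attr_reader :path, :sent

  def initialize(path)
    @path = path
    @sent = []
  end

  def notify(value)
    @sent << value
  end
end

# Hypothetical event source with change callbacks.
class DemoSensor
  def initialize
    @callbacks = []
  end

  def on_change(&block)
    @callbacks << block
  end

  def update(value)
    @callbacks.each { |callback| callback.call(value) }
  end
end

def demo_push
  sensor = DemoSensor.new
  emitter = DemoEmitter.new('/temp')

  # Equivalent of the block passed to `observable`: register a listener.
  setup = ->(e) { sensor.on_change { |val| e.notify(val) } }
  setup.call(emitter)

  # Later changes are pushed through the emitter without any polling.
  sensor.update(21.5)
  sensor.update(22.0)
  emitter.sent
end

demo_push  # => [21.5, 22.0]
```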

#observable_polling(path, interval: @config[:interval]) { ... } ⇒ void

This method returns an undefined value.

Define a polling observable (interval-based)

The block is called periodically at the specified interval. Use this for checking remote resources or when an event-driven approach isn’t possible.

Examples:

Polling remote resource

observable_polling '/external/status', interval: 5.0 do
  check_external_api
end

Parameters:

  • path (String)

    The resource path

  • interval (Float) (defaults to: @config[:interval])

    Seconds between polls (default: reactor’s interval)

Yields:

  • Block that returns the current value



# File 'lib/takagi/observable/reactor.rb', line 316

def observable_polling(path, interval: @config[:interval], &block)
  @observables[path] = {
    type: :polling,
    interval: interval,
    block: block,
    last_run: nil
  }

  # Register with router
  Takagi::Base.router.observable(path, &block)

  # Start polling loop in thread pool
  @thread_pool.schedule do
    poll_observable(path, interval, &block)
  end

  Takagi.logger.debug "Polling observable registered: #{path} (interval: #{interval}s)"
end
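
The private poll_observable method is not shown on this page. The sketch below is one plausible shape for an interval-based loop, under the assumption that it calls the block, hands the value on, and sleeps. poll_loop, demo_poll and the keep_running and on_value parameters are hypothetical names.

```ruby
# Hypothetical polling loop: calls the block every `interval` seconds
# while `keep_running` returns true, passing each value to `on_value`.
def poll_loop(interval:, keep_running:, on_value:)
  while keep_running.call
    on_value.call(yield)
    sleep interval
  end
end

def demo_poll
  readings = [10, 11, 12].each
  seen = []
  poll_loop(interval: 0.01,
            keep_running: -> { seen.size < 3 },
            on_value: ->(value) { seen << value }) { readings.next }
  seen
end

demo_poll  # => [10, 11, 12]
```

If the real loop blocks in the same way, each polling observable would occupy one pool worker for as long as it runs, which is worth considering when choosing a thread count.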

#observe(uri) {|payload, inbound| ... } ⇒ void

This method returns an undefined value.

Observe a remote resource

Subscribe to changes on a remote CoAP server.

Examples:

observe 'coap://sensor:5683/temp' do |data|
  process_temperature(data)
end

Parameters:

  • uri (String)

    CoAP URI to observe

Yields:

  • (payload, inbound)

    Block called when notification received



# File 'lib/takagi/observable/reactor.rb', line 366

def observe(uri, &block)
  Takagi.logger.info("Observing remote resource: #{uri}")
  @observers[uri] = { uri: uri, handler: block }

  parsed = URI.parse(uri)
  path = parsed.path

  # Register local subscription
  Observer::Registry.subscribe(
    path,
    address: parsed.host,
    port: parsed.port || 5683,
    token: SecureRandom.hex(2),
    handler: block
  )

  # Start observation in thread pool
  @thread_pool.schedule do
    client = Observer::Client.new(uri)
    client.on_notify(&block)
    client.subscribe
  end
end
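
The sketch below shows how a CoAP URI breaks down into the values #observe passes to the registry, using only Ruby's standard library. subscription_params is a hypothetical helper name introduced for this illustration.

```ruby
require 'uri'
require 'securerandom'

# Break a CoAP URI into the pieces used for a subscription.
def subscription_params(uri)
  parsed = URI.parse(uri)
  {
    path: parsed.path,
    address: parsed.host,
    # Ruby's standard library registers no coap scheme, so port is nil
    # unless the URI states one; 5683 is the fallback used by #observe.
    port: parsed.port || 5683,
    # Two random bytes rendered as four hex characters.
    token: SecureRandom.hex(2)
  }
end

subscription_params('coap://sensor:5683/temp')
subscription_params('coap://gateway/commands')
```

A two-byte token has 65,536 possible values, so collisions become possible when many subscriptions are active at once.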

#running? ⇒ Boolean

Check if reactor is running

Returns:

  • (Boolean)


# File 'lib/takagi/observable/reactor.rb', line 447

def running?
  @running
end

#shared_pool? ⇒ Boolean

Check if reactor is sharing a thread pool with its controller

Returns:

  • (Boolean)


# File 'lib/takagi/observable/reactor.rb', line 454

def shared_pool?
  @shared_pool || false
end

#start ⇒ void

This method returns an undefined value.

Start the reactor

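Marks the reactor as running and starts the background watcher if any polling observables are registered. Does nothing if the reactor is already running.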



# File 'lib/takagi/observable/reactor.rb', line 406

def start
  return if @running

  @running = true
  @watcher.start if has_polling_observables?

  Takagi.logger.info "Reactor '#{@config[:name]}' started with #{@config[:threads]} threads"
  Takagi.logger.debug "  Event-driven observables: #{event_driven_count}"
  Takagi.logger.debug "  Polling observables: #{polling_count}"
  Takagi.logger.debug "  Remote observers: #{@observers.size}"
end

#stop ⇒ void

This method returns an undefined value.

Stop the reactor

Stops the watcher and, if this reactor owns its thread pool, shuts the pool down. A pool shared with a controller is left running.



# File 'lib/takagi/observable/reactor.rb', line 424

def stop
  return unless @running

  Takagi.logger.debug "Stopping reactor: #{@config[:name]}"
  Takagi.logger.debug "  Shared pool: #{@shared_pool}"

  @running = false
  @watcher.stop if @watcher

  # Only shutdown thread pool if we own it (not shared)
  if @shared_pool
    Takagi.logger.debug "  Skipping thread pool shutdown (shared with controller)"
    Takagi.logger.info "Reactor '#{@config[:name]}' stopped (thread pool remains active for controller)"
  else
    Takagi.logger.debug "  Shutting down independent thread pool"
    @thread_pool.shutdown
    Takagi.logger.info "Reactor '#{@config[:name]}' stopped (thread pool shutdown)"
  end
end
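
The ownership rule in #stop can be seen in isolation with the sketch below. DemoPool, DemoLifecycle and demo_stop are hypothetical stand-ins that keep only the guard clauses and the shared-pool check; logging and the watcher are left out.

```ruby
# Hypothetical pool that only records whether it was shut down.
class DemoPool
  attr_reader :shut_down

  def initialize
    @shut_down = false
  end

  def shutdown
    @shut_down = true
  end
end

# Hypothetical reactor lifecycle reduced to start/stop.
class DemoLifecycle
  def initialize(pool, shared:)
    @pool = pool
    @shared = shared
    @running = false
  end

  def start
    return if @running

    @running = true
  end

  def stop
    return unless @running

    @running = false
    # Leave a borrowed pool alone; its owner decides when to shut it down.
    @pool.shutdown unless @shared
  end
end

def demo_stop(shared:)
  pool = DemoPool.new
  reactor = DemoLifecycle.new(pool, shared: shared)
  reactor.start
  reactor.stop
  pool.shut_down
end

demo_stop(shared: true)   # => false
demo_stop(shared: false)  # => true
```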

#trigger_observe(uri, value) ⇒ void

This method returns an undefined value.

Trigger an observe notification manually

Parameters:

  • uri (String)

    The URI to trigger

  • value (Object)

    The value



# File 'lib/takagi/observable/reactor.rb', line 395

def trigger_observe(uri, value)
  path = URI.parse(uri).path
  Takagi.logger.debug "Trigger observe for path: #{path} with value: #{value}"
  Observer::Registry.notify(path, value)
end