Class: Takagi::Observable::Reactor
- Inherits:
-
Object
- Object
- Takagi::Observable::Reactor
- Defined in:
- lib/takagi/observable/reactor.rb
Overview
Coordinator for observables (resources others watch) and observers (resources we watch)
Supports both interval-based polling and event-driven push notifications. Uses thread pools for parallel execution and resource control.
Similar to Controllers, provides a declarative configuration API for better DX.
Defined Under Namespace
Classes: ConfigContext
Instance Attribute Summary collapse
-
#config ⇒ Object
readonly
Returns the value of attribute config.
-
#observables ⇒ Object
readonly
Returns the value of attribute observables.
-
#observers ⇒ Object
readonly
Returns the value of attribute observers.
-
#thread_pool ⇒ Object
readonly
Returns the value of attribute thread_pool.
Class Method Summary collapse
-
.config ⇒ Hash
Get or create the reactor’s configuration.
-
.configure { ... } ⇒ Object
Configure the reactor (class-level DSL).
-
.inherit_from(controller_class) ⇒ Object
Inherit settings from a controller.
-
.observable(path) { ... } ⇒ Object
Define an observable at class level.
-
.observable_polling(path, interval: nil) { ... } ⇒ Object
Define a polling observable at class level.
-
.observables ⇒ Object
Get class-level observables.
-
.observe(uri) { ... } ⇒ Object
Define a remote observation at class level.
-
.observers ⇒ Object
Get class-level observers.
Instance Method Summary collapse
-
#initialize(threads: nil, name: nil, interval: nil) ⇒ Reactor
constructor
A new instance of Reactor.
-
#notify(path, value) ⇒ void
Manually trigger a notification for an observable.
-
#observable(path) {|emitter| ... } ⇒ void
Define an event-driven observable (push-based).
-
#observable_polling(path, interval: ) { ... } ⇒ void
Define a polling observable (interval-based).
-
#observe(uri) {|payload, inbound| ... } ⇒ void
Observe a remote resource.
-
#running? ⇒ Boolean
Check if reactor is running.
-
#shared_pool? ⇒ Boolean
Check if reactor is sharing a thread pool with its controller.
-
#start ⇒ void
Start the reactor.
-
#stop ⇒ void
Stop the reactor.
-
#trigger_observe(uri, value) ⇒ void
Trigger an observe notification manually.
Constructor Details
#initialize(threads: nil, name: nil, interval: nil) ⇒ Reactor
Returns a new instance of Reactor.
126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 |
# File 'lib/takagi/observable/reactor.rb', line 126 def initialize(threads: nil, name: nil, interval: nil) # Auto-detect controller by naming convention auto_inherit_from_controller unless self.class.config[:from_controller] # Merge class config with instance overrides class_config = self.class.config controller = class_config[:from_controller] Takagi.logger.debug "Initializing reactor: #{self.class.name}" Takagi.logger.debug " Associated controller: #{controller&.name || 'none'}" Takagi.logger.debug " Explicit threads param: #{threads.inspect}" Takagi.logger.debug " Class config threads: #{class_config[:threads].inspect}" # Determine if we should share controller's thread pool # Share if: controller exists AND no explicit thread configuration should_share_pool = controller && threads.nil? && class_config[:threads].nil? if should_share_pool Takagi.logger.debug " Decision: Share thread pool with controller" # Share controller's thread pool (lazy initialization) @thread_pool = controller.thread_pool @shared_pool = true @config = { threads: @thread_pool.size, name: name || class_config[:name] || default_reactor_name, interval: interval || class_config[:interval] || 1.0 } Takagi.logger.info "Reactor '#{@config[:name]}' sharing thread pool with #{controller.name} (#{@thread_pool.size} threads)" Takagi.logger.debug " Shared pool object_id: #{@thread_pool.object_id}" else Takagi.logger.debug " Decision: Create independent thread pool" # Create independent thread pool @shared_pool = false @config = { threads: threads || class_config[:threads] || 1, name: name || class_config[:name] || default_reactor_name, interval: interval || class_config[:interval] || 1.0 } @thread_pool = Controller::ThreadPool.new( size: @config[:threads], name: @config[:name] ) reason = if !controller "no associated controller" elsif threads "explicit threads parameter (#{threads})" elsif class_config[:threads] "explicit class configuration (#{class_config[:threads]} threads)" else "unknown reason" end Takagi.logger.info "Reactor '#{@config[:name]}' created independent thread pool (#{@config[:threads]} threads) - #{reason}" Takagi.logger.debug " Independent pool object_id: #{@thread_pool.object_id}" end @observables = {} @observers = {} @running = false @watcher = Observer::Watcher.new(interval: @config[:interval]) Takagi.logger.debug "Reactor initialization complete: #{@config[:name]}" # Register class-level observables and observers setup_class_definitions end |
Instance Attribute Details
#config ⇒ Object (readonly)
Returns the value of attribute config.
44 45 46 |
# File 'lib/takagi/observable/reactor.rb', line 44 def config @config end |
#observables ⇒ Object (readonly)
Returns the value of attribute observables.
44 45 46 |
# File 'lib/takagi/observable/reactor.rb', line 44 def observables @observables end |
#observers ⇒ Object (readonly)
Returns the value of attribute observers.
44 45 46 |
# File 'lib/takagi/observable/reactor.rb', line 44 def observers @observers end |
#thread_pool ⇒ Object (readonly)
Returns the value of attribute thread_pool.
44 45 46 |
# File 'lib/takagi/observable/reactor.rb', line 44 def thread_pool @thread_pool end |
Class Method Details
.config ⇒ Hash
Get or create the reactor’s configuration
50 51 52 53 54 55 56 57 |
# File 'lib/takagi/observable/reactor.rb', line 50 def config @config ||= { threads: nil, # Will be set to 1 if no controller found name: nil, interval: 1.0, from_controller: nil } end |
.configure { ... } ⇒ Object
Configure the reactor (class-level DSL)
69 70 71 |
# File 'lib/takagi/observable/reactor.rb', line 69 def configure(&block) ConfigContext.new(self).instance_eval(&block) if block end |
.inherit_from(controller_class) ⇒ Object
Inherit settings from a controller
Associates this reactor with a controller for thread pool sharing. Does NOT set threads in config to allow automatic pool sharing.
84 85 86 87 88 |
# File 'lib/takagi/observable/reactor.rb', line 84 def inherit_from(controller_class) config[:from_controller] = controller_class # Only set name, NOT threads (to allow pool sharing) config[:name] ||= "#{controller_class.name.split('::').last.downcase.gsub('controller', '')}-reactor" end |
.observable(path) { ... } ⇒ Object
Define an observable at class level
94 95 96 |
# File 'lib/takagi/observable/reactor.rb', line 94 def observable(path, &block) observables[path] = { type: :event_driven, block: block } end |
.observable_polling(path, interval: nil) { ... } ⇒ Object
Define a polling observable at class level
103 104 105 |
# File 'lib/takagi/observable/reactor.rb', line 103 def observable_polling(path, interval: nil, &block) observables[path] = { type: :polling, interval: interval, block: block } end |
.observables ⇒ Object
Get class-level observables
116 117 118 |
# File 'lib/takagi/observable/reactor.rb', line 116 def observables @observables ||= {} end |
.observe(uri) { ... } ⇒ Object
Define a remote observation at class level
111 112 113 |
# File 'lib/takagi/observable/reactor.rb', line 111 def observe(uri, &block) observers[uri] = block end |
.observers ⇒ Object
Get class-level observers
121 122 123 |
# File 'lib/takagi/observable/reactor.rb', line 121 def observers @observers ||= {} end |
Instance Method Details
#notify(path, value) ⇒ void
This method returns an undefined value.
Manually trigger a notification for an observable
Useful for triggering updates from request handlers or other code.
348 349 350 351 352 |
# File 'lib/takagi/observable/reactor.rb', line 348 def notify(path, value) @thread_pool.schedule do Observer::Registry.notify(path, value) end end |
#observable(path) {|emitter| ... } ⇒ void
This method returns an undefined value.
Define an event-driven observable (push-based)
The block receives an emitter that can push updates immediately when data changes, without relying on polling.
283 284 285 286 287 288 289 290 291 292 293 294 295 296 297 298 299 300 |
# File 'lib/takagi/observable/reactor.rb', line 283 def observable(path, &block) emitter = Emitter.new(path) @observables[path] = { type: :event_driven, emitter: emitter, block: block } # Register with router so it can be requested Takagi::Base.router.observable(path, &block) # Initialize the observable in thread pool @thread_pool.schedule do block.call(emitter) end Takagi.logger.debug "Event-driven observable registered: #{path}" end |
#observable_polling(path, interval: ) { ... } ⇒ void
This method returns an undefined value.
Define a polling observable (interval-based)
The block is called periodically at the specified interval. Use this for checking remote resources or when event-driven isn’t possible.
316 317 318 319 320 321 322 323 324 325 326 327 328 329 330 331 332 333 |
# File 'lib/takagi/observable/reactor.rb', line 316 def observable_polling(path, interval: @config[:interval], &block) @observables[path] = { type: :polling, interval: interval, block: block, last_run: nil } # Register with router Takagi::Base.router.observable(path, &block) # Start polling loop in thread pool @thread_pool.schedule do poll_observable(path, interval, &block) end Takagi.logger.debug "Polling observable registered: #{path} (interval: #{interval}s)" end |
#observe(uri) {|payload, inbound| ... } ⇒ void
This method returns an undefined value.
Observe a remote resource
Subscribe to changes on a remote CoAP server.
366 367 368 369 370 371 372 373 374 375 376 377 378 379 380 381 382 383 384 385 386 387 388 |
# File 'lib/takagi/observable/reactor.rb', line 366 def observe(uri, &block) Takagi.logger.info("Observing remote resource: #{uri}") @observers[uri] = { uri: uri, handler: block } parsed = URI.parse(uri) path = parsed.path # Register local subscription Observer::Registry.subscribe( path, address: parsed.host, port: parsed.port || 5683, token: SecureRandom.hex(2), handler: block ) # Start observation in thread pool @thread_pool.schedule do client = Observer::Client.new(uri) client.on_notify(&block) client.subscribe end end |
#running? ⇒ Boolean
Check if reactor is running
447 448 449 |
# File 'lib/takagi/observable/reactor.rb', line 447 def running? @running end |
#shared_pool? ⇒ Boolean
Check if reactor is sharing a thread pool with its controller
454 455 456 |
# File 'lib/takagi/observable/reactor.rb', line 454 def shared_pool? @shared_pool || false end |
#start ⇒ void
This method returns an undefined value.
Start the reactor
Starts the thread pool and any background watchers.
406 407 408 409 410 411 412 413 414 415 416 |
# File 'lib/takagi/observable/reactor.rb', line 406 def start return if @running @running = true @watcher.start if has_polling_observables? Takagi.logger.info "Reactor '#{@config[:name]}' started with #{@config[:threads]} threads" Takagi.logger.debug " Event-driven observables: #{event_driven_count}" Takagi.logger.debug " Polling observables: #{polling_count}" Takagi.logger.debug " Remote observers: #{@observers.size}" end |
#stop ⇒ void
This method returns an undefined value.
Stop the reactor
Gracefully shuts down the thread pool and watchers. Only shuts down thread pool if we own it (not shared).
424 425 426 427 428 429 430 431 432 433 434 435 436 437 438 439 440 441 442 |
# File 'lib/takagi/observable/reactor.rb', line 424 def stop return unless @running Takagi.logger.debug "Stopping reactor: #{@config[:name]}" Takagi.logger.debug " Shared pool: #{@shared_pool}" @running = false @watcher.stop if @watcher # Only shutdown thread pool if we own it (not shared) if @shared_pool Takagi.logger.debug " Skipping thread pool shutdown (shared with controller)" Takagi.logger.info "Reactor '#{@config[:name]}' stopped (thread pool remains active for controller)" else Takagi.logger.debug " Shutting down independent thread pool" @thread_pool.shutdown Takagi.logger.info "Reactor '#{@config[:name]}' stopped (thread pool shutdown)" end end |
#trigger_observe(uri, value) ⇒ void
This method returns an undefined value.
Trigger an observe notification manually
395 396 397 398 399 |
# File 'lib/takagi/observable/reactor.rb', line 395 def trigger_observe(uri, value) path = URI.parse(uri).path Takagi.logger.debug "Trigger observe for path: #{path} with value: #{value}" Observer::Registry.notify(path, value) end |