Class: Takagi::Observable::Emitter

Inherits:
Object
  • Object
show all
Defined in:
lib/takagi/observable/emitter.rb

Overview

Emitter for push-based observable updates

Allows observables to push notifications immediately when data changes, rather than relying on polling intervals.

Examples:

Event-driven observable

observable '/sensor/temp' do |emitter|
  TempSensor.on_change do |reading|
    emitter.notify({ temp: reading, unit: 'C' })
  end
end

EventBus integration

observable '/alerts' do |emitter|
  emitter.on_event('alert.critical') do |event|
    { level: event.severity, message: event.message }
  end
end

Instance Attribute Summary collapse

Instance Method Summary collapse

Constructor Details

#initialize(path) ⇒ Emitter

Returns a new instance of Emitter.



26
27
28
# File 'lib/takagi/observable/emitter.rb', line 26

def initialize(path)
  @path = path
end

Instance Attribute Details

#pathObject (readonly)

Returns the value of attribute path.



24
25
26
# File 'lib/takagi/observable/emitter.rb', line 24

def path
  @path
end

Instance Method Details

#notify(value) ⇒ void

This method returns an undefined value.

Push a notification to all observers of this path

Examples:

emitter.notify({ temp: 25.5, unit: 'C' })

Parameters:

  • value (Object)

    The value to send to observers



37
38
39
# File 'lib/takagi/observable/emitter.rb', line 37

def notify(value)
  Takagi::Observer::Registry.notify(@path, value)
end

#on_event(address) {|event| ... } ⇒ void

This method returns an undefined value.

Subscribe to EventBus events and forward to observers

Examples:

Direct forwarding

emitter.on_event('sensor.temp.changed')

With transformation

emitter.on_event('sensor.temp.changed') do |event|
  { temp: event.data[:celsius], timestamp: event.timestamp }
end

Parameters:

  • address (String)

    EventBus address to subscribe to

Yields:

  • (event)

    Optional transform block



54
55
56
57
58
59
# File 'lib/takagi/observable/emitter.rb', line 54

def on_event(address, &transform)
  Takagi::EventBus.subscribe(address) do |event|
    value = transform ? transform.call(event) : event.data
    notify(value)
  end
end