Class: Takagi::EventBus::CoAPBridge

Inherits:
Object
Defined in:
lib/takagi/event_bus/coap_bridge.rb

Overview

CoAP Observe integration that maps events to CoAP resources. Resource registration is thread-safe and guarded by a Mutex.

Maps EventBus addresses to CoAP observable resources:

  • “sensor.temperature.room1” -> “/events/sensor/temperature/room1”

Examples:

CoAPBridge.register_observable_resource('sensor.temp.room1', app)
CoAPBridge.publish_to_observers('sensor.temp.room1', message)

Class Method Details

.address_to_path(address) ⇒ String

Convert event address to CoAP path

Examples:

CoAPBridge.address_to_path('sensor.temperature.room1')
# => "/events/sensor/temperature/room1"

Parameters:

  • address (String)

    Event address (e.g., “sensor.temperature.room1”)

Returns:

  • (String)

    CoAP path (e.g., “/events/sensor/temperature/room1”)



# File 'lib/takagi/event_bus/coap_bridge.rb', line 29

def address_to_path(address)
  "/events/#{address.gsub('.', '/')}"
end

.clear ⇒ Object

Clear all registrations (for testing)



# File 'lib/takagi/event_bus/coap_bridge.rb', line 200

def clear
  @mutex.synchronize do
    @registered_resources.clear
  end
end

.path_to_address(path) ⇒ String

Convert CoAP path to event address

Examples:

CoAPBridge.path_to_address('/events/sensor/temperature/room1')
# => "sensor.temperature.room1"

Parameters:

  • path (String)

    CoAP path (e.g., “/events/sensor/temperature/room1”)

Returns:

  • (String)

    Event address (e.g., “sensor.temperature.room1”)



# File 'lib/takagi/event_bus/coap_bridge.rb', line 40

def path_to_address(path)
  path.sub(%r{^/events/}, '').gsub('/', '.')
end
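
The two conversions are meant to be inverses of each other. The standalone sketch below copies both one-line bodies into a hypothetical `PathMapping` module (not part of Takagi) to show the round trip, plus one caveat: a path that lacks the `/events/` prefix is converted rather than rejected, so its leading slash turns into a leading dot.

```ruby
# Hypothetical stand-in module; the method bodies mirror the source above.
module PathMapping
  module_function

  def address_to_path(address)
    "/events/#{address.gsub('.', '/')}"
  end

  def path_to_address(path)
    path.sub(%r{^/events/}, '').gsub('/', '.')
  end
end

address = 'sensor.temperature.room1'
path = PathMapping.address_to_path(address)
round_trip = PathMapping.path_to_address(path)

puts path        # /events/sensor/temperature/room1
puts round_trip  # sensor.temperature.room1

# A path outside /events/ is converted, not rejected.
puts PathMapping.path_to_address('/other/resource') # .other.resource
```

Callers that accept paths from outside sources may want to check the prefix before converting.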

.publish_to_observers(address, message) ⇒ Object

Publish an event to all CoAP observers. Notifies every observer subscribed via CoAP Observe.

Examples:

message = EventBus::Message.new('sensor.temp.room1', { value: 25.5 })
CoAPBridge.publish_to_observers('sensor.temp.room1', message)

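Parameters:

  • address (String)

    Event address

  • message (EventBus::Message)

    Message to publish; its body, headers, and timestamp are sent to observers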



# File 'lib/takagi/event_bus/coap_bridge.rb', line 103

def publish_to_observers(address, message)
  path = address_to_path(address)

  # Build notification payload
  state = {
    address: address,
    body: message.body,
    headers: message.headers,
    timestamp: message.timestamp.to_i
  }

  # Notify via ObserveRegistry
  # ObserveRegistry will send CoAP notifications to all observers
  if defined?(Takagi::ObserveRegistry)
    Takagi::ObserveRegistry.notify(path, state)
  else
    Takagi.logger.warn 'ObserveRegistry not available, cannot publish to observers'
  end
rescue StandardError => e
  Takagi.logger.error "Error publishing to observers for #{address}: #{e.message}"

  # Store failed message for retry if message buffering is enabled
  # This allows recovery when remote nodes reconnect
  EventBus.instance_variable_get(:@message_store)&.store_failed(
    address,
    message,
    "coap_observers:#{path}"
  )
end
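
To make the notification payload concrete, here is a minimal sketch of the state hash the method builds and the call it makes. `FakeMessage`, `FakeRegistry`, and `build_state` are hypothetical stand-ins introduced for illustration; the real `EventBus::Message` and `Takagi::ObserveRegistry` may behave differently.

```ruby
# Hypothetical stand-ins: FakeMessage exposes the three readers the method
# calls on a message; FakeRegistry records what would be sent to observers.
FakeMessage = Struct.new(:body, :headers, :timestamp)

class FakeRegistry
  attr_reader :notifications

  def initialize
    @notifications = []
  end

  def notify(path, state)
    @notifications << [path, state]
  end
end

# Same hash shape as the state built in publish_to_observers.
def build_state(address, message)
  {
    address: address,
    body: message.body,
    headers: message.headers,
    timestamp: message.timestamp.to_i # whole seconds since the epoch
  }
end

registry = FakeRegistry.new
message = FakeMessage.new({ value: 25.5 }, {}, Time.at(1_700_000_000))
address = 'sensor.temp.room1'
path = "/events/#{address.gsub('.', '/')}"

registry.notify(path, build_state(address, message))

notified_path, state = registry.notifications.first
puts notified_path      # /events/sensor/temp/room1
puts state[:timestamp]  # 1700000000
```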

.register_observable_resource(address, app) ⇒ Boolean

Auto-register an observable CoAP resource (thread-safe). Creates a CoAP observable endpoint that returns the current state.

Uses the AddressPrefix registry to determine whether the address should be distributed; addresses that are not distributed are skipped.

Examples:

CoAPBridge.register_observable_resource('sensor.temp.room1', MyApp)

Parameters:

  • address (String)

    Event address

  • app (Class)

    Application class (must respond to #observable)

Returns:

  • (Boolean)

    True if the resource was registered; false if the address is not distributed, is already registered, or registration fails



# File 'lib/takagi/event_bus/coap_bridge.rb', line 55

def register_observable_resource(address, app) # rubocop:disable Metrics/MethodLength
  # Only register distributed addresses (uses AddressPrefix registry)
  return false unless AddressPrefix.distributed?(address)

  # Thread-safe check-and-register
  @mutex.synchronize do
    # Already registered?
    return false if @registered_resources.include?(address)

    # Mark as registered before creating resource
    @registered_resources << address

    path = address_to_path(address)

    begin
      # Create observable CoAP endpoint
      # This endpoint returns current state when polled
      app.observable path do |_req|
        # Return current state from EventBus
        current_state = EventBus.current_state(address)

        current_state || {
          address: address,
          status: 'observable',
          timestamp: Time.now.to_i
        }
      end

      Takagi.logger.info "Observable resource created: #{path} (#{address})"
      true
    rescue StandardError => e
      # If registration fails, remove from set
      @registered_resources.delete(address)
      Takagi.logger.error "Failed to register observable resource #{address}: #{e.message}"
      false
    end
  end
end
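
The check-and-register pattern used above can be tried in isolation. The sketch below is a reduced, hypothetical `Registry` class in which a block stands in for the `app.observable` call. It assumes nothing about Takagi itself and only shows that concurrent callers register an address once and that a failed registration is rolled back.

```ruby
require 'set'

# Hypothetical reduced registry: the block stands in for `app.observable`.
class Registry
  def initialize
    @mutex = Mutex.new
    @registered = Set.new
  end

  # Returns true only for the caller that actually created the resource.
  def register(address)
    @mutex.synchronize do
      return false if @registered.include?(address)

      @registered << address
      begin
        yield address
        true
      rescue StandardError
        @registered.delete(address) # roll back so a later attempt can retry
        false
      end
    end
  end

  def registered?(address)
    @mutex.synchronize { @registered.include?(address) }
  end
end

registry = Registry.new

# Ten threads race to register the same address.
threads = Array.new(10) do
  Thread.new { registry.register('sensor.temp.room1') { |_addr| :created } }
end
results = threads.map(&:value)

puts results.count(true) # 1

failed = registry.register('sensor.temp.room2') { raise 'endpoint setup failed' }
puts failed                                    # false
puts registry.registered?('sensor.temp.room2') # false
```

Adding the address to the set before creating the endpoint, then deleting it on failure, keeps the whole operation inside one critical section.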

.registered?(address) ⇒ Boolean

Check if resource is registered

Parameters:

  • address (String)

    Event address

Returns:

  • (Boolean)


# File 'lib/takagi/event_bus/coap_bridge.rb', line 172

def registered?(address)
  @mutex.synchronize { @registered_resources.include?(address) }
end

.registered_addresses ⇒ Array<String>

Get all registered resource addresses

Returns:

  • (Array<String>)


# File 'lib/takagi/event_bus/coap_bridge.rb', line 178

def registered_addresses
  @mutex.synchronize { @registered_resources.to_a }
end
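
Because the method returns `to_a` from inside the lock, callers get a snapshot copy rather than the live collection. A minimal sketch, assuming (as the `delete?` call in `unregister` suggests) that the registry is a `Set`:

```ruby
require 'set'

registered = Set.new(['sensor.temp.room1'])
mutex = Mutex.new

# Copy the contents while holding the lock, as registered_addresses does.
snapshot = mutex.synchronize { registered.to_a }
snapshot << 'sensor.temp.room2' # changes only the copy

puts registered.size # 1
puts snapshot.size   # 2
```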

.registered_count ⇒ Integer

Get count of registered resources

Returns:

  • (Integer)


# File 'lib/takagi/event_bus/coap_bridge.rb', line 184

def registered_count
  @mutex.synchronize { @registered_resources.size }
end

.subscribe_remote(address, node_url) {|message| ... } ⇒ String

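Subscribe to a remote event via CoAP Observe. The current implementation is a placeholder: it logs a warning, generates a subscription ID, and returns it without contacting the remote node or invoking the block.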

Examples:

id = CoAPBridge.subscribe_remote('sensor.temp.buildingA', 'coap://building-a:5683') do |msg|
  puts "Remote temp: #{msg.body[:value]}"
end

Parameters:

  • address (String)

    Event address

  • node_url (String)

    Remote node URL (e.g., ‘coap://building-a:5683’)

Yields:

  • (message)

    Block called when a remote notification is received (not invoked by the current placeholder implementation)

Returns:

  • (String)

    Subscription ID



# File 'lib/takagi/event_bus/coap_bridge.rb', line 143

def subscribe_remote(address, node_url)
  path = address_to_path(address)
  full_url = "#{node_url}#{path}"

  # TODO: Implement using Takagi::Observer::Client or similar
  # This requires:
  # 1. CoAP client that supports OBSERVE requests
  # 2. Registration of observer with remote server
  # 3. Handling of notification messages
  # 4. Conversion of CoAP notifications back to EventBus messages
  #
  # Placeholder implementation:
  subscription_id = SecureRandom.uuid

  Takagi.logger.warn "Remote subscription not yet fully implemented: #{full_url}"
  Takagi.logger.info "Subscription ID: #{subscription_id} for #{address} at #{node_url}"

  # When implemented, should:
  # 1. Create CoAP observer client
  # 2. Send OBSERVE request to full_url
  # 3. Register callback that converts CoAP notifications to EventBus messages
  # 4. Call the provided block with the converted message

  subscription_id
end
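
Steps 3 and 4 of the TODO (turning a CoAP notification back into a message and handing it to the block) might look roughly like the sketch below. Everything here is an assumption made for illustration: `RemoteMessage` and `handle_notification` are hypothetical names, and the payload is assumed to be the JSON form of the state hash that `publish_to_observers` builds. No CoAP client is involved.

```ruby
require 'json'
require 'securerandom'

# Hypothetical message type; the real EventBus::Message may differ.
RemoteMessage = Struct.new(:address, :body, :headers, :timestamp)

# Assumption: the notification payload is JSON with the keys
# address, body, headers, and timestamp.
def handle_notification(payload)
  data = JSON.parse(payload, symbolize_names: true)
  message = RemoteMessage.new(
    data[:address],
    data[:body],
    data[:headers] || {},
    Time.at(data[:timestamp])
  )
  yield message
end

# URL composition and ID generation, as in the placeholder above.
node_url = 'coap://building-a:5683'
path = '/events/sensor/temp/buildingA'
full_url = "#{node_url}#{path}"
subscription_id = SecureRandom.uuid

# Simulate one incoming notification.
payload = JSON.generate(
  { address: 'sensor.temp.buildingA', body: { value: 21.0 }, headers: {}, timestamp: 1_700_000_000 }
)

received = []
handle_notification(payload) { |msg| received << msg.body[:value] }

puts full_url         # coap://building-a:5683/events/sensor/temp/buildingA
puts received.inspect # [21.0]
```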

.unregister(address) ⇒ Boolean

Unregister a resource (for testing)

Parameters:

  • address (String)

    Event address

Returns:

  • (Boolean)

    True if was registered, false otherwise



# File 'lib/takagi/event_bus/coap_bridge.rb', line 191

def unregister(address)
  @mutex.synchronize do
    # Set#delete? returns the set if element was present, nil if not
    # Convert to boolean: true if was present, false if not
    !!@registered_resources.delete?(address)
  end
end
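
The double negation matters because `Set#delete?` returns the set itself or `nil`, not a Boolean. A short standalone check of that behavior:

```ruby
require 'set'

registered = Set.new(['sensor.temp.room1'])

first = !!registered.delete?('sensor.temp.room1')  # element was present
second = !!registered.delete?('sensor.temp.room1') # already removed

puts first  # true
puts second # false
```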