Class: Takagi::EventBus::CoAPBridge
- Inherits: Object
  - Object
    - Takagi::EventBus::CoAPBridge
- Defined in: lib/takagi/event_bus/coap_bridge.rb
Overview
CoAP Observe integration: maps EventBus events to CoAP resources. Resource registration is thread-safe, guarded by a Mutex.
Maps EventBus addresses to CoAP observable resources:
- "sensor.temperature.room1" -> "/events/sensor/temperature/room1"
Class Method Summary
- .address_to_path(address) ⇒ String
  Convert event address to CoAP path.
- .clear ⇒ Object
  Clear all registrations (for testing).
- .path_to_address(path) ⇒ String
  Convert CoAP path to event address.
- .publish_to_observers(address, message) ⇒ Object
  Publish an event to all CoAP observers. Notifies every observer subscribed via CoAP Observe.
- .register_observable_resource(address, app) ⇒ Boolean
  Auto-register an observable CoAP resource (thread-safe). Creates a CoAP observable endpoint that returns the current state.
- .registered?(address) ⇒ Boolean
  Check if resource is registered.
- .registered_addresses ⇒ Array<String>
  Get all registered resource addresses.
- .registered_count ⇒ Integer
  Get count of registered resources.
- .subscribe_remote(address, node_url) {|message| ... } ⇒ String
  Subscribe to remote event via CoAP Observe.
- .unregister(address) ⇒ Boolean
  Unregister a resource (for testing).
Class Method Details
.address_to_path(address) ⇒ String
Convert event address to CoAP path
    # File 'lib/takagi/event_bus/coap_bridge.rb', line 29
    def address_to_path(address)
      "/events/#{address.gsub('.', '/')}"
    end
.clear ⇒ Object
Clear all registrations (for testing)
    # File 'lib/takagi/event_bus/coap_bridge.rb', line 200
    def clear
      @mutex.synchronize do
        @registered_resources.clear
      end
    end
.path_to_address(path) ⇒ String
Convert CoAP path to event address
    # File 'lib/takagi/event_bus/coap_bridge.rb', line 40
    def path_to_address(path)
      path.sub(%r{^/events/}, '').gsub('/', '.')
    end
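To illustrate the two conversions, here is a minimal standalone sketch. It copies the method bodies shown above into a hypothetical `PathMapping` module (not part of Takagi) so it runs without the gem. It also shows one consequence of the scheme as written: a path that lacks the `/events/` prefix is converted as-is, so its leading slash becomes a leading dot.

```ruby
# Hypothetical standalone module; the bodies mirror the documented methods.
module PathMapping
  module_function

  def address_to_path(address)
    "/events/#{address.gsub('.', '/')}"
  end

  def path_to_address(path)
    path.sub(%r{^/events/}, '').gsub('/', '.')
  end
end

address = 'sensor.temperature.room1'
path = PathMapping.address_to_path(address)
round_trip = PathMapping.path_to_address(path)

# No /events/ prefix: nothing is stripped, every slash becomes a dot.
unprefixed = PathMapping.path_to_address('/other/x')

puts path        # /events/sensor/temperature/room1
puts round_trip  # sensor.temperature.room1
puts unprefixed  # .other.x
```

Because both directions are plain character substitutions, the round trip holds only for addresses whose segments contain no `/` and paths whose segments contain no `.`.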
.publish_to_observers(address, message) ⇒ Object
Publish an event to all CoAP observers. Notifies every observer subscribed via CoAP Observe.
    # File 'lib/takagi/event_bus/coap_bridge.rb', line 103
    def publish_to_observers(address, message)
      path = address_to_path(address)

      # Build notification payload
      state = {
        address: address,
        body: message.body,
        headers: message.headers,
        timestamp: message.timestamp.to_i
      }

      # Notify via ObserveRegistry
      # ObserveRegistry will send CoAP notifications to all observers
      if defined?(Takagi::ObserveRegistry)
        Takagi::ObserveRegistry.notify(path, state)
      else
        Takagi.logger.warn 'ObserveRegistry not available, cannot publish to observers'
      end
    rescue StandardError => e
      Takagi.logger.error "Error publishing to observers for #{address}: #{e.message}"

      # Store failed message for retry if message buffering is enabled
      # This allows recovery when remote nodes reconnect
      EventBus.instance_variable_get(:@message_store)&.store_failed(
        address,
        message,
        "coap_observers:#{path}"
      )
    end
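The shape of the notification payload can be sketched without Takagi. The example below is an illustration under stated assumptions: `FakeMessage` and `RecordingRegistry` are hypothetical stand-ins for the EventBus message and for `Takagi::ObserveRegistry`, and `build_state` is a helper introduced here only to isolate the hash construction.

```ruby
# Hypothetical stand-in for an EventBus message; only the three readers
# used when building the payload are modeled.
FakeMessage = Struct.new(:body, :headers, :timestamp)

# Hypothetical stand-in for the registry: records what would be notified.
class RecordingRegistry
  attr_reader :notifications

  def initialize
    @notifications = []
  end

  def notify(path, state)
    @notifications << [path, state]
  end
end

# Builds the same four-key hash as the documented method.
def build_state(address, message)
  {
    address: address,
    body: message.body,
    headers: message.headers,
    timestamp: message.timestamp.to_i
  }
end

registry = RecordingRegistry.new
message = FakeMessage.new({ value: 21.5 }, { 'unit' => 'C' }, Time.at(1_700_000_000))
address = 'sensor.temperature.room1'
path = "/events/#{address.gsub('.', '/')}"

registry.notify(path, build_state(address, message))
notified_path, state = registry.notifications.first

puts notified_path
puts state.inspect
```

The timestamp is reduced to integer seconds, so sub-second precision does not reach observers.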
.register_observable_resource(address, app) ⇒ Boolean
Auto-register an observable CoAP resource (thread-safe). Creates a CoAP observable endpoint that returns the current state.
Uses the AddressPrefix registry to determine whether the address should be distributed. Returns false when the address is not distributed, is already registered, or registration raises an error.
    # File 'lib/takagi/event_bus/coap_bridge.rb', line 55
    def register_observable_resource(address, app) # rubocop:disable Metrics/MethodLength
      # Only register distributed addresses (uses AddressPrefix registry)
      return false unless AddressPrefix.distributed?(address)

      # Thread-safe check-and-register
      @mutex.synchronize do
        # Already registered?
        return false if @registered_resources.include?(address)

        # Mark as registered before creating resource
        @registered_resources << address

        path = address_to_path(address)

        begin
          # Create observable CoAP endpoint
          # This endpoint returns current state when polled
          app.observable path do |_req|
            # Return current state from EventBus
            current_state = EventBus.current_state(address)
            current_state || {
              address: address,
              status: 'observable',
              timestamp: Time.now.to_i
            }
          end

          Takagi.logger.info "Observable resource created: #{path} (#{address})"
          true
        rescue StandardError => e
          # If registration fails, remove from set
          @registered_resources.delete(address)
          Takagi.logger.error "Failed to register observable resource #{address}: #{e.message}"
          false
        end
      end
    end
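The check-and-register pattern used here can be sketched in isolation. The following is a simplified illustration, not Takagi's implementation: `TinyRegistry` is a hypothetical class, and the block passed to `register` stands in for the `app.observable` call. It shows that only one of several concurrent callers registers a given address, and that a failed registration is rolled back.

```ruby
require 'set'

# Hypothetical minimal registry illustrating check-and-register under a Mutex.
class TinyRegistry
  def initialize
    @mutex = Mutex.new
    @registered = Set.new
  end

  # Returns true only for the caller that actually registers the address.
  def register(address)
    @mutex.synchronize do
      return false if @registered.include?(address)

      # Mark as registered first, then try to create the resource.
      @registered << address
      begin
        yield address # stands in for app.observable
        true
      rescue StandardError
        @registered.delete(address) # roll back on failure
        false
      end
    end
  end

  def registered?(address)
    @mutex.synchronize { @registered.include?(address) }
  end
end

registry = TinyRegistry.new

# Eight threads race to register the same address.
results = Array.new(8) do
  Thread.new { registry.register('sensor.temperature.room1') { |_a| :ok } }
end.map(&:value)

# A registration whose block raises is rolled back.
failed = registry.register('sensor.humidity.room1') { |_a| raise 'boom' }

puts results.count(true)
puts failed
puts registry.registered?('sensor.humidity.room1')
```

Holding the lock across both the membership check and the insertion is what prevents two threads from both passing the check before either records the address.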
.registered?(address) ⇒ Boolean
Check if resource is registered
    # File 'lib/takagi/event_bus/coap_bridge.rb', line 172
    def registered?(address)
      @mutex.synchronize { @registered_resources.include?(address) }
    end
.registered_addresses ⇒ Array<String>
Get all registered resource addresses
    # File 'lib/takagi/event_bus/coap_bridge.rb', line 178
    def registered_addresses
      @mutex.synchronize { @registered_resources.to_a }
    end
.registered_count ⇒ Integer
Get count of registered resources
    # File 'lib/takagi/event_bus/coap_bridge.rb', line 184
    def registered_count
      @mutex.synchronize { @registered_resources.size }
    end
.subscribe_remote(address, node_url) {|message| ... } ⇒ String
Subscribe to remote event via CoAP Observe
    # File 'lib/takagi/event_bus/coap_bridge.rb', line 143
    def subscribe_remote(address, node_url)
      path = address_to_path(address)
      full_url = "#{node_url}#{path}"

      # TODO: Implement using Takagi::Observer::Client or similar
      # This requires:
      # 1. CoAP client that supports OBSERVE requests
      # 2. Registration of observer with remote server
      # 3. Handling of notification messages
      # 4. Conversion of CoAP notifications back to EventBus messages
      #
      # Placeholder implementation:
      subscription_id = SecureRandom.uuid

      Takagi.logger.warn "Remote subscription not yet fully implemented: #{full_url}"
      Takagi.logger.info "Subscription ID: #{subscription_id} for #{address} at #{node_url}"

      # When implemented, should:
      # 1. Create CoAP observer client
      # 2. Send OBSERVE request to full_url
      # 3. Register callback that converts CoAP notifications to EventBus messages
      # 4. Call the provided block with the converted message

      subscription_id
    end
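Since the placeholder only builds a URL and returns an identifier, its observable behavior can be sketched with the standard library alone. The node URL below is a made-up example, and the snippet reproduces only the string concatenation and UUID generation from the listing, not any CoAP traffic.

```ruby
require 'securerandom'

address = 'sensor.temperature.room1'
path = "/events/#{address.gsub('.', '/')}"

# Hypothetical remote node; any CoAP base URL without a trailing slash works.
node_url = 'coap://10.0.0.5:5683'
full_url = "#{node_url}#{path}"

# Plain concatenation: a trailing slash on the node URL yields a double slash.
with_trailing_slash = "#{node_url}/#{path}"

# The placeholder returns a random UUID as the subscription ID.
subscription_id = SecureRandom.uuid

puts full_url
puts with_trailing_slash
puts subscription_id
```

Until the OBSERVE client is implemented, the returned ID is not tied to any live subscription and the block passed to `subscribe_remote` is never called.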
.unregister(address) ⇒ Boolean
Unregister a resource (for testing)
    # File 'lib/takagi/event_bus/coap_bridge.rb', line 191
    def unregister(address)
      @mutex.synchronize do
        # Set#delete? returns the set if element was present, nil if not
        # Convert to boolean: true if was present, false if not
        !!@registered_resources.delete?(address)
      end
    end
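The comment about `Set#delete?` can be checked with a short standalone snippet. It uses only Ruby's standard `Set` and contrasts `delete?` with `delete`, which always returns the set and so cannot signal whether anything was removed.

```ruby
require 'set'

resources = Set['sensor.temperature.room1']

# Element present: delete? removes it and returns the set itself (truthy).
first = resources.delete?('sensor.temperature.room1')

# Element already gone: delete? returns nil (falsy).
second = resources.delete?('sensor.temperature.room1')

# Set#delete returns the set whether or not the element existed.
plain = resources.delete('missing')

puts !!first   # true
puts !!second  # false
puts plain.equal?(resources) # true
```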