Class: Takagi::EventBus

Inherits:
Object
Defined in:
lib/takagi/event_bus.rb,
lib/takagi/event_bus/scope.rb,
lib/takagi/event_bus/future.rb,
lib/takagi/event_bus/lru_cache.rb,
lib/takagi/event_bus/coap_bridge.rb,
lib/takagi/event_bus/address_prefix.rb,
lib/takagi/event_bus/async_executor.rb,
lib/takagi/event_bus/message_buffer.rb,
lib/takagi/event_bus/observer_cleanup.rb

Overview

High-level event distribution with asynchronous delivery on threads or processes. Built on top of ObserveRegistry, with zero runtime dependencies.

Supports both CoAP Observe-style and Pub/Sub-style APIs:

Examples:

CoAP Observe style

EventBus.observe('sensor.temperature.room1') { |msg| puts msg.body }
EventBus.notify('sensor.temperature.room1', { value: 25.5 })

Pub/Sub style (aliases)

EventBus.consumer('sensor.temperature.room1') { |msg| puts msg.body }
EventBus.publish('sensor.temperature.room1', { value: 25.5 })

Request-Reply pattern

reply = EventBus.send_sync('cache.query', { key: 'user:123' }, timeout: 1.0)

Defined Under Namespace

Modules: AsyncExecutor, Scope

Classes: AddressPrefix, CoAPBridge, Error, Future, Handler, LRUCache, Message, MessageBuffer, ObserverCleanup

Class Attribute Details

.message_store ⇒ MessageBuffer, ... (readonly)

Get current message store

Returns:



# File 'lib/takagi/event_bus.rb', line 400

def message_store
  @message_store
end

Class Method Details

.addresses ⇒ Array<String>

List all registered addresses

Returns:

  • (Array<String>)

    Event addresses



# File 'lib/takagi/event_bus.rb', line 471

def addresses
  @handlers.keys
end

.config_value(config_key, env_key, default) ⇒ Object

Helper method to get configuration with fallback to ENV



# File 'lib/takagi/event_bus.rb', line 95

def self.config_value(config_key, env_key, default)
  # Priority: Takagi.config > ENV > default
  if defined?(Takagi.config) && Takagi.config.event_bus.respond_to?(config_key)
    Takagi.config.event_bus.public_send(config_key)
  elsif ENV[env_key]
    ENV[env_key].to_i
  else
    default
  end
end
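
The lookup order used by config_value (explicit configuration, then environment variable, then default) can be tried out in isolation. The sketch below is standalone: resolve_setting, the Settings struct, and the EVENT_BUS_POOL_SIZE variable name are hypothetical stand-ins and not part of Takagi.

```ruby
# Standalone sketch of a "config > ENV > default" lookup.
# Settings, resolve_setting and EVENT_BUS_POOL_SIZE are hypothetical names.
Settings = Struct.new(:pool_size)

def resolve_setting(settings, config_key, env_key, default, env: ENV)
  if settings.respond_to?(config_key)
    settings.public_send(config_key) # 1. explicit configuration wins
  elsif env[env_key]
    env[env_key].to_i                # 2. ENV values are strings, so coerce
  else
    default                          # 3. fall back to the built-in default
  end
end

configured   = Settings.new(8)
unconfigured = Object.new # does not respond to :pool_size

from_config  = resolve_setting(configured, :pool_size, 'EVENT_BUS_POOL_SIZE', 4, env: {})
from_env     = resolve_setting(unconfigured, :pool_size, 'EVENT_BUS_POOL_SIZE', 4,
                               env: { 'EVENT_BUS_POOL_SIZE' => '16' })
from_default = resolve_setting(unconfigured, :pool_size, 'EVENT_BUS_POOL_SIZE', 4, env: {})
```

Note that String#to_i returns 0 for a non-numeric string, so a mistyped environment value silently becomes 0 rather than falling through to the default.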

.configure_message_store(store) ⇒ MessageBuffer, ...

Configure message store (for buffering/replay)

Examples:

Enable default buffering

EventBus.enable_message_buffering

Custom configuration

EventBus.configure_message_store(
  MessageBuffer.new(max_messages: 200, ttl: 600)
)

Custom plugin store

EventBus.configure_message_store(RedisMessageStore.new)

Parameters:

  • store (MessageBuffer, Object)

    Message store instance (nil to disable)

Returns:



# File 'lib/takagi/event_bus.rb', line 380

def configure_message_store(store)
  @message_store = store
end
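
The listings on this page call four methods on a message store: store(address, message) on publish, replay(address, since:) on replay, and, when the store responds to them, stats and shutdown. A custom store would therefore presumably need only that interface. The class below is a hypothetical in-memory sketch of it (ArrayMessageStore is not part of Takagi), and it records arrival times itself because nothing here assumes a timestamp on the message.

```ruby
# Hypothetical in-memory store exposing the methods EventBus calls on a
# message store in the listings on this page: store, replay, stats, shutdown.
class ArrayMessageStore
  def initialize(max_messages: 100)
    @max_messages = max_messages
    @entries = Hash.new { |hash, address| hash[address] = [] }
    @mutex = Mutex.new
  end

  # Called on every publish. Keeps at most max_messages per address.
  def store(address, message)
    @mutex.synchronize do
      list = @entries[address]
      list << [Time.now, message]
      list.shift while list.size > @max_messages # drop the oldest entries
    end
    message
  end

  # Returns buffered messages, optionally only those stored at or after `since`.
  def replay(address, since: nil)
    @mutex.synchronize do
      list = @entries.fetch(address, [])
      list = list.select { |stored_at, _| stored_at >= since } if since
      list.map { |_, message| message }
    end
  end

  def stats
    @mutex.synchronize do
      { addresses: @entries.size, messages: @entries.values.sum(&:size) }
    end
  end

  def shutdown
    @mutex.synchronize { @entries.clear }
  end
end

store = ArrayMessageStore.new(max_messages: 2)
%w[a b c].each { |body| store.store('sensor.temperature.room1', body) }
replayed = store.replay('sensor.temperature.room1')
```

With max_messages: 2, the oldest of the three stored bodies is dropped and replay returns the remaining two in arrival order.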

.consumer(address, options = {}) {|message| ... } ⇒ String

Also known as: observe, on

Register consumer for address (point-to-point or pub/sub)

Examples:

id = EventBus.consumer('sensor.temperature.room1') do |message|
  puts "Temp: #{message.body[:value]}"
end
EventBus.unregister(id)

Parameters:

  • address (String)

    Event address or pattern

  • options (Hash) (defaults to: {})

    Handler options

Options Hash (options):

  • :local_only (Boolean)

    Only receive local messages

Yields:

  • (message)

    Block called when message received

Returns:

  • (String)

    Consumer ID (for unregistering)

Raises:

  • (ArgumentError)

    If no block is given


# File 'lib/takagi/event_bus.rb', line 292

def consumer(address, options = {}, &block)
  raise ArgumentError, 'Block required' unless block

  handler = Handler.new(address, options, &block)
  consumer_id = SecureRandom.uuid

  @mutex.synchronize do
    @handlers[address] << handler
    @consumers[consumer_id] = handler
    @handler_store[handler.pool_id] = handler
  end

  @executor.register_handler(handler) if @executor.respond_to?(:register_handler)

  # Auto-create CoAP observable resource if distributed and not local_only
  if distributed?(address) && !options[:local_only] && defined?(Takagi::Base)
    CoAPBridge.register_observable_resource(address, Takagi::Base)
  end

  log_debug "Consumer registered: #{address} (#{consumer_id})"
  consumer_id
end

.current_state(address) ⇒ Object?

Get current state for address

Parameters:

  • address (String)

    Event address

Returns:

  • (Object, nil)

    Current state or nil



# File 'lib/takagi/event_bus.rb', line 465

def current_state(address)
  @current_states.get(address)
end

.disable_message_buffering ⇒ Object

Disable message buffering



# File 'lib/takagi/event_bus.rb', line 393

def disable_message_buffering
  @message_store&.shutdown if @message_store.respond_to?(:shutdown)
  @message_store = nil
end

.distributed?(address) ⇒ Boolean

Check if address is distributed via CoAP. Uses the AddressPrefix registry for extensibility.

Parameters:

  • address (String)

    Event address

Returns:

  • (Boolean)


# File 'lib/takagi/event_bus.rb', line 448

def distributed?(address)
  return false if address.to_s.start_with?('hooks.')

  AddressPrefix.distributed?(address)
end

.enable_message_buffering(max_messages: 100, ttl: 300) ⇒ MessageBuffer

Enable default message buffering

Parameters:

  • max_messages (Integer) (defaults to: 100)

    Max messages per address

  • ttl (Integer) (defaults to: 300)

    Time-to-live in seconds

Returns:



# File 'lib/takagi/event_bus.rb', line 388

def enable_message_buffering(max_messages: 100, ttl: 300)
  @message_store = MessageBuffer.new(max_messages: max_messages, ttl: ttl)
end

.handler_count(address) ⇒ Integer

Get handler count for address

Parameters:

  • address (String)

    Event address

Returns:

  • (Integer)

    Number of handlers



# File 'lib/takagi/event_bus.rb', line 478

def handler_count(address)
  @handlers[address]&.size || 0
end

.local_only?(address) ⇒ Boolean

Check if address is local-only. Uses the AddressPrefix registry for extensibility.

Parameters:

  • address (String)

    Event address

Returns:

  • (Boolean)


# File 'lib/takagi/event_bus.rb', line 458

def local_only?(address)
  AddressPrefix.local?(address)
end
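
Both distributed? and local_only? delegate to a prefix registry whose API is not shown on this page. As a rough illustration of how prefix-based classification could work, here is a standalone sketch. PrefixRegistry, its register method, the longest-prefix rule, and the prefixes registered at the bottom are all assumptions made for the example. Only the 'hooks.' guard is taken from the distributed? listing above.

```ruby
# Hypothetical prefix registry. The real AddressPrefix API is not shown on
# this page, and the prefixes registered below are made up for illustration.
class PrefixRegistry
  def initialize
    @prefixes = {} # prefix string => :distributed or :local
  end

  def register(prefix, kind)
    @prefixes[prefix] = kind
  end

  def distributed?(address)
    # Same guard as the distributed? listing: hook addresses never leave the node.
    return false if address.to_s.start_with?('hooks.')

    kind_for(address) == :distributed
  end

  def local?(address)
    kind_for(address) == :local
  end

  private

  # Assumption: the longest matching prefix wins, so a more specific
  # registration overrides a general one.
  def kind_for(address)
    match = @prefixes.keys
                     .select { |prefix| address.to_s.start_with?(prefix) }
                     .max_by(&:length)
    match && @prefixes[match]
  end
end

registry = PrefixRegistry.new
registry.register('sensor.', :distributed)
registry.register('sensor.debug.', :local)
```

An address that matches no registered prefix is neither distributed nor local in this sketch. How the real registry treats unknown prefixes is not stated here.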

.publish(address, body = nil, headers: {}, scope: Scope::DEFAULT, freeze_body: true) ⇒ Message

Also known as: notify, emit

Publish message to all subscribers (pub/sub pattern)

Examples:

Local event (default)

EventBus.publish('system.startup', { version: '1.0' })

Cluster-wide event

EventBus.publish('cache.invalidate', { key: 'user:123' }, scope: :cluster)

Global event (cluster + external)

EventBus.publish('sensor.temperature.room1', { value: 25.5 }, scope: :global)

Parameters:

  • address (String)

    Event address (e.g., “sensor.temperature.room1”)

  • body (Object) (defaults to: nil)

    Message body (must be shareable for Ractors)

  • headers (Hash) (defaults to: {})

    Optional message headers

  • scope (Symbol) (defaults to: Scope::DEFAULT)

    Message scope (:local, :cluster, :global) - defaults to :local

  • freeze_body (Boolean) (defaults to: true)

    Forwarded unchanged to Message.new

Returns:



# File 'lib/takagi/event_bus.rb', line 160

def publish(address, body = nil, headers: {}, scope: Scope::DEFAULT, freeze_body: true)
  message = Message.new(address, body, headers: headers, scope: scope, freeze_body: freeze_body)

  # Hook: Store message if buffering enabled
  @message_store&.store(address, message)

  # ALWAYS deliver locally (fast in-memory)
  @mutex.synchronize do
    handlers_for(address).each do |handler|
      deliver_async(handler, message)
    end

    # Wildcard handlers
    wildcard_handlers(address).each do |handler|
      deliver_async(handler, message)
    end
  end

  # Scope-aware distribution
  case message.scope
  when Scope::CLUSTER, Scope::GLOBAL
    # Cluster distribution via CoAP OBSERVE
    # ClusterBridge.publish_to_cluster(address, message)
    log_debug "Cluster distribution not yet implemented for scope: #{message.scope}"
  end

  # Global scope: external CoAP subscribers (via /.well-known/core)
  if message.scope == Scope::GLOBAL
    @current_states.set(address, message.body)
    CoAPBridge.publish_to_observers(address, message)
  end

  # Legacy distributed? check (backward compatibility)
  if distributed?(address) && message.scope == Scope::DEFAULT
    @current_states.set(address, message.body)
    CoAPBridge.publish_to_observers(address, message)
  end

  message
end
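
The branching in the publish listing can be summarised as a small decision function. This is only a sketch of the routing logic as it reads above: delivery_paths and the path names are hypothetical, and it assumes the Scope constants are the symbols :local, :cluster and :global with Scope::DEFAULT equal to :local, as the parameter documentation suggests.

```ruby
# Sketch of the routing branches in the publish listing above.
# Assumption: Scope::DEFAULT is :local and the other scopes are plain symbols.
DEFAULT_SCOPE = :local

def delivery_paths(scope:, distributed:)
  paths = [:local_handlers] # local delivery always happens

  # Cluster distribution is only logged for now ("not yet implemented").
  paths << :cluster_log_only if %i[cluster global].include?(scope)

  # Global scope also reaches external CoAP observers.
  paths << :coap_observers if scope == :global

  # Legacy path: a distributed address published with the default scope.
  paths << :coap_observers if distributed && scope == DEFAULT_SCOPE

  paths
end

local_plain       = delivery_paths(scope: :local, distributed: false)
local_distributed = delivery_paths(scope: :local, distributed: true)
cluster           = delivery_paths(scope: :cluster, distributed: false)
global            = delivery_paths(scope: :global, distributed: true)
```

Because the legacy branch only fires for the default scope, a distributed address published with scope: :global is sent to CoAP observers once, not twice.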

.replay(address, since: nil) ⇒ Array<Message>

Replay buffered messages for an address

Examples:

Replay all buffered messages

EventBus.replay('sensor.temperature.room1')

Replay last 60 seconds

EventBus.replay('sensor.temperature.room1', since: Time.now - 60)

Parameters:

  • address (String)

    Event address

  • since (Time, nil) (defaults to: nil)

    Return messages since this time (nil = all)

Returns:

  • (Array<Message>)

    Buffered messages



# File 'lib/takagi/event_bus.rb', line 412

def replay(address, since: nil)
  return [] unless @message_store

  @message_store.replay(address, since: since)
end

.replay_to(address, since: nil) {|message| ... } ⇒ Object

Replay buffered messages to a consumer. Useful for late joiners or reconnecting nodes. Returns the number of messages replayed.

Examples:

Replay to new subscriber

EventBus.consumer('sensor.temperature.room1') do |msg|
  puts "Temp: #{msg.body[:value]}"
end
# Catch up on last 5 minutes
EventBus.replay_to('sensor.temperature.room1', since: Time.now - 300) do |msg|
  puts "Missed: #{msg.body[:value]}"
end

Parameters:

  • address (String)

    Event address

  • since (Time, nil) (defaults to: nil)

    Replay messages since this time

Yields:

  • (message)

    Block called for each buffered message

Raises:

  • (ArgumentError)

    If no block is given


# File 'lib/takagi/event_bus.rb', line 432

def replay_to(address, since: nil, &block)
  raise ArgumentError, 'Block required' unless block

  messages = replay(address, since: since)
  messages.each(&block)
  messages.size
end

.send(address, body = nil, headers: {}) {|reply_message| ... } ⇒ Message

Send message to a single consumer (point-to-point pattern). Uses round-robin if multiple consumers are registered.

Examples:

EventBus.send('cache.query', { key: 'user:123' }) do |reply|
  puts "Cache value: #{reply.body[:value]}"
end

Parameters:

  • address (String)

    Event address

  • body (Object) (defaults to: nil)

    Message body

  • headers (Hash) (defaults to: {})

    Optional headers

Yields:

  • (reply_message)

    Optional reply handler (request-reply pattern)

Returns:



# File 'lib/takagi/event_bus.rb', line 213

def send(address, body = nil, headers: {}, &reply_handler)
  reply_address = reply_handler ? generate_reply_address : nil

  # Register temporary reply handler
  if reply_handler
    # Keep the consumer ID: unregister expects an ID, not an address
    reply_consumer_id = consumer(reply_address, local_only: true, &reply_handler)

    # Auto-unregister after timeout
    Thread.new do
      sleep 30 # Reply timeout
      unregister(reply_consumer_id)
    end
  end

  message = Message.new(address, body, headers: headers, reply_address: reply_address)

  @mutex.synchronize do
    handler = next_handler_for(address)
    deliver_async(handler, message) if handler
  end

  message
end
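
The round-robin selection mentioned above happens in next_handler_for, which is not shown on this page. The sketch below is one way such a picker could work, not the library's implementation. The RoundRobin class and the worker names are hypothetical.

```ruby
# Hypothetical round-robin picker. next_handler_for itself is not shown on
# this page, so this is only one way such a selection could work.
class RoundRobin
  def initialize
    @positions = Hash.new(0) # address => index of the next handler to use
    @mutex = Mutex.new
  end

  # Returns the next handler for the address, cycling through the list.
  def next_for(address, handlers)
    return nil if handlers.empty?

    @mutex.synchronize do
      index = @positions[address] % handlers.size
      @positions[address] = index + 1
      handlers[index]
    end
  end
end

picker  = RoundRobin.new
workers = %w[worker-a worker-b worker-c]
picked  = Array.new(4) { picker.next_for('cache.query', workers) }
```

Taking the index modulo the current list size keeps the picker valid when handlers are added or removed between calls.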

.send_async(address, body = nil, headers: {}) ⇒ Future

Asynchronous send with Future (non-blocking)

Examples:

future = EventBus.send_async('cache.query', { key: 'user:123' })
# ... do other work ...
reply = future.value(timeout: 1.0)

Parameters:

  • address (String)

    Event address

  • body (Object) (defaults to: nil)

    Message body

  • headers (Hash) (defaults to: {})

    Optional headers

Returns:

  • (Future)

    Future that will contain reply



# File 'lib/takagi/event_bus.rb', line 270

def send_async(address, body = nil, headers: {})
  future = Future.new

  send(address, body, headers: headers) do |reply_message|
    future.set_value(reply_message)
  end

  future
end
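
The listings on this page use two Future methods: set_value and value(timeout:). To illustrate the blocking-with-timeout behaviour they imply, here is a minimal standalone future built on Mutex and ConditionVariable. SketchFuture is a hypothetical stand-in, so the real Takagi::EventBus::Future may behave differently, for example when a value is set twice.

```ruby
require 'timeout' # for Timeout::Error

# Hypothetical Future with the two methods used in the listings on this page.
class SketchFuture
  def initialize
    @mutex = Mutex.new
    @condition = ConditionVariable.new
    @resolved = false
    @value = nil
  end

  # Stores the value and wakes up any waiting thread. The first value wins.
  def set_value(value)
    @mutex.synchronize do
      return if @resolved

      @value = value
      @resolved = true
      @condition.broadcast
    end
  end

  # Blocks until a value is set; raises Timeout::Error once the timeout expires.
  def value(timeout: nil)
    deadline = timeout && (Process.clock_gettime(Process::CLOCK_MONOTONIC) + timeout)
    @mutex.synchronize do
      until @resolved
        remaining = deadline && (deadline - Process.clock_gettime(Process::CLOCK_MONOTONIC))
        raise Timeout::Error, 'no value within timeout' if remaining && remaining <= 0

        @condition.wait(@mutex, remaining) # nil waits without a limit
      end
      @value
    end
  end
end

future = SketchFuture.new
Thread.new do
  sleep 0.05
  future.set_value(:pong)
end
reply = future.value(timeout: 1.0)

timed_out = begin
  SketchFuture.new.value(timeout: 0.05)
  false
rescue Timeout::Error
  true
end
```

The wait sits inside an until loop because ConditionVariable#wait can return before the value is set. The loop rechecks the state and the remaining time each time it wakes.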

.send_sync(address, body = nil, headers: {}, timeout: 1.0) ⇒ Message

Synchronous send with timeout (blocking)

Examples:

reply = EventBus.send_sync('cache.query', { key: 'user:123' }, timeout: 1.0)
puts "Cache hit: #{reply.body[:hit]}"

Parameters:

  • address (String)

    Event address

  • body (Object) (defaults to: nil)

    Message body

  • headers (Hash) (defaults to: {})

    Optional headers

  • timeout (Float) (defaults to: 1.0)

    Timeout in seconds (default: 1.0)

Returns:

Raises:

  • (Error)

    If no reply is received within the timeout (the method rescues Timeout::Error and re-raises it as Error)



# File 'lib/takagi/event_bus.rb', line 248

def send_sync(address, body = nil, headers: {}, timeout: 1.0)
  future = Future.new

  send(address, body, headers: headers) do |reply_message|
    future.set_value(reply_message)
  end

  future.value(timeout: timeout)
rescue Timeout::Error
  raise Error, "No reply received for #{address} within #{timeout}s"
end

.shutdown ⇒ Object

Shutdown EventBus (cleanup resources)



# File 'lib/takagi/event_bus.rb', line 516

def shutdown
  stop_cleanup
  @executor.shutdown if defined?(@executor)
  @current_states.clear
  @message_store&.shutdown if @message_store.respond_to?(:shutdown)
  @mutex.synchronize do
    @handlers.clear
    @consumers.clear
    @handler_store.clear
  end
end

.start_cleanup ⇒ Object

Start background cleanup



# File 'lib/takagi/event_bus.rb', line 506

def start_cleanup
  @cleanup.start
end

.stats ⇒ Hash

Get EventBus statistics

Returns:

  • (Hash)

    Statistics



# File 'lib/takagi/event_bus.rb', line 484

def stats
  executor_stats = if defined?(@executor) && @executor.respond_to?(:stats)
                     @executor.stats
                   else
                     {}
                   end
  base_stats = {
    consumers: @consumers.size,
    addresses: @handlers.keys.size,
    async_executor: executor_stats,
    state_cache_size: @current_states.size,
    distributed_addresses: addresses.select { |a| distributed?(a) }.size,
    local_addresses: addresses.select { |a| local_only?(a) }.size
  }

  # Add message buffer stats if enabled
  base_stats[:message_buffer] = @message_store.stats if @message_store.respond_to?(:stats)

  base_stats
end

.stop_cleanup ⇒ Object

Stop background cleanup



# File 'lib/takagi/event_bus.rb', line 511

def stop_cleanup
  @cleanup.stop
end

.subscribe_remote(address, node_url) {|message| ... } ⇒ String

Also known as: subscribe

Subscribe to remote CoAP observable

Examples:

id = EventBus.subscribe_remote('sensor.temp.buildingA', 'coap://building-a:5683') do |msg|
  puts "Remote temp: #{msg.body[:value]}"
end

Parameters:

  • address (String)

    Event address

  • node_url (String)

    Remote node URL (e.g., ‘coap://building-a:5683’)

Yields:

  • (message)

    Block called when remote notification received

Returns:

  • (String)

    Subscription ID



# File 'lib/takagi/event_bus.rb', line 346

def subscribe_remote(address, node_url, &block)
  CoAPBridge.subscribe_remote(address, node_url, &block)
end

.unregister(consumer_id) ⇒ Object

Also known as: cancel, unsubscribe

Unregister a consumer

Parameters:

  • consumer_id (String)

    Consumer ID returned from consumer()



# File 'lib/takagi/event_bus.rb', line 317

def unregister(consumer_id)
  handler = nil
  @mutex.synchronize do
    handler = @consumers.delete(consumer_id)
    if handler
      list = @handlers[handler.address]
      list&.delete(handler)
      @handlers.delete(handler.address) if list&.empty?
      @handler_store.delete(handler.pool_id)
    end
  end

  return unless handler

  @executor.unregister_handler(handler) if @executor.respond_to?(:unregister_handler)

  log_debug "Unregistered consumer: #{consumer_id}"
end
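
The bookkeeping shared by consumer and unregister (handlers grouped by address, plus an index from consumer ID to handler) can be reproduced in a few lines. MiniRegistry below is a hypothetical, standalone sketch of that pattern. It omits the executor, the handler pool and the CoAP bridge that the real methods also touch.

```ruby
require 'securerandom'

# Hypothetical mini-registry mirroring the bookkeeping in the consumer and
# unregister listings: handlers grouped by address and indexed by ID.
class MiniRegistry
  Handler = Struct.new(:address, :callback)

  def initialize
    @handlers = Hash.new { |hash, address| hash[address] = [] }
    @consumers = {}
    @mutex = Mutex.new
  end

  # Registers a block for the address and returns an opaque consumer ID.
  def register(address, &block)
    raise ArgumentError, 'Block required' unless block

    handler = Handler.new(address, block)
    id = SecureRandom.uuid
    @mutex.synchronize do
      @handlers[address] << handler
      @consumers[id] = handler
    end
    id
  end

  # Removes the consumer with the given ID. Returns false for unknown IDs.
  def unregister(id)
    @mutex.synchronize do
      handler = @consumers.delete(id)
      return false unless handler

      list = @handlers[handler.address]
      list.delete_if { |candidate| candidate.equal?(handler) }
      @handlers.delete(handler.address) if list.empty? # drop empty addresses
      true
    end
  end

  def addresses
    @mutex.synchronize { @handlers.keys }
  end
end

registry = MiniRegistry.new
id       = registry.register('sensor.temperature.room1') { |msg| msg }
before   = registry.addresses
unknown  = registry.unregister('sensor.temperature.room1') # an address is not an ID
removed  = registry.unregister(id)
after    = registry.addresses
```

Passing an address instead of the returned ID leaves the registration in place, which is why the ID returned at registration time has to be kept by the caller.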