Class: Ignis::Shared::EventBus

Inherits:
Object
Defined in:
lib/nnw/shared/event_bus.rb

Overview

EventBus — Typed, bounded, synchronous-first pub/sub bus.

All cross-layer communication goes through EventBus. Zero global state except the singleton instance.

Event types are frozen constants — do not add more at runtime. All operations are thread-safe via Monitor (reentrant mutex).
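The "reentrant" property mentioned above can be illustrated in isolation. The sketch below does not use EventBus; the helper names `nested_monitor_lock` and `nested_mutex_lock` are made up for this illustration. It shows that a thread may re-acquire a Monitor it already holds, while the same nesting on a plain Mutex raises ThreadError.

```ruby
require 'monitor'

# Re-enter a Monitor from the thread that already holds it.
def nested_monitor_lock
  monitor = Monitor.new
  monitor.synchronize do
    monitor.synchronize { :reentered }
  end
end

# Try the same nesting with a plain Mutex, which is not reentrant.
# Returns the class of the exception raised by the inner lock attempt.
def nested_mutex_lock
  mutex = Mutex.new
  mutex.synchronize do
    mutex.synchronize { :reentered }
  end
rescue ThreadError => e
  e.class
end
```

Reentrancy matters whenever a method that already holds the bus lock calls another locked method on the same bus from the same thread.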

Constant Summary

MAX_HISTORY =

Maximum history entries per event type (ring buffer).

100
VALID_EVENT_TYPES =

Frozen set of valid event types.

%i[
  data_ready
  compute_done
  all_reduce_done
  checkpoint_ready
  gpu_failed
  gpu_recovered
  backpressure_on
  backpressure_off
  topology_changed
  nova_flush_done
  health_alert
].freeze
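The listings on this page call `validate_event_type!`, but its body is not shown. The following is only a guess at what such a check might look like, assuming it tests membership in the frozen list and raises ArgumentError (the error documented for `.subscribe`). `SKETCH_EVENT_TYPES`, `sketch_validate_event_type!`, and `try_runtime_addition` are hypothetical names, and the list is abbreviated.

```ruby
# Abbreviated, hypothetical copy of the event type list.
SKETCH_EVENT_TYPES = %i[data_ready compute_done gpu_failed].freeze

# Hypothetical stand-in for validate_event_type!; the real body is not shown
# in this documentation.
def sketch_validate_event_type!(event_type)
  return event_type if SKETCH_EVENT_TYPES.include?(event_type)

  raise ArgumentError, "Unknown event type: #{event_type.inspect}"
end

# Appending to a frozen array raises FrozenError, which is what keeps
# new event types from being added at runtime.
def try_runtime_addition
  SKETCH_EVENT_TYPES << :custom_event
rescue FrozenError => e
  e.class
end
```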

Constructor Details

#initialize ⇒ EventBus

Instance methods



# File 'lib/nnw/shared/event_bus.rb', line 101

def initialize
  @monitor = Monitor.new
  @subscribers = {}
  @history = {}
  @metrics = {}
  @errors = []

  VALID_EVENT_TYPES.each do |et|
    @subscribers[et] = []
    @history[et] = []
    @metrics[et] = { publish_count: 0, last_publish_at: nil, error_count: 0 }
  end
end

Class Method Details

.handlers_for(event_type) ⇒ Array<String, Symbol>

Get all registered handler IDs for an event type.

Parameters:

  • event_type (Symbol)

    one of VALID_EVENT_TYPES

Returns:

  • (Array<String, Symbol>)


# File 'lib/nnw/shared/event_bus.rb', line 88

def self.handlers_for(event_type)
  instance.handlers_for(event_type)
end

.history(event_type, last_n: 10) ⇒ Array<Hash>

Get recent event history for an event type.

Parameters:

  • event_type (Symbol)

    one of VALID_EVENT_TYPES

  • last_n (Integer) (defaults to: 10)

    maximum number of entries to return

Returns:

  • (Array<Hash>)

    array of hashes with keys event_type:, payload:, timestamp:, subscriber_count:, and errors:, oldest first



# File 'lib/nnw/shared/event_bus.rb', line 80

def self.history(event_type, last_n: 10)
  instance.history(event_type, last_n: last_n)
end

.instance ⇒ EventBus

Returns singleton instance.

Returns:

  • (EventBus)


# File 'lib/nnw/shared/event_bus.rb', line 32

def self.instance
  @instance ||= new
end
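Note that `@instance ||= new` is a check followed by an assignment, and the listing does not guard it with a lock, so two threads racing on the very first call could in principle each build an instance. If that matters in a given deployment, one possible guard is sketched below. `SketchBus` and `INSTANCE_LOCK` are hypothetical names, not part of EventBus.

```ruby
# Hypothetical class showing a lock around lazy singleton creation.
class SketchBus
  INSTANCE_LOCK = Mutex.new

  # Create the instance at most once, even under concurrent first calls.
  def self.instance
    INSTANCE_LOCK.synchronize { @instance ||= new }
  end

  # Replace the instance (for tests); returns nil to mirror a void method.
  def self.reset!
    INSTANCE_LOCK.synchronize { @instance = new }
    nil
  end
end
```

Eagerly creating the instance at load time would be another way to avoid the race, at the cost of doing the work even when the bus is never used.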

.metrics ⇒ Hash

Get metrics summary.

Returns:

  • (Hash)

    metrics per event type



# File 'lib/nnw/shared/event_bus.rb', line 95

def self.metrics
  instance.metrics
end

.publish(event_type, payload: {}) ⇒ Integer

Publish an event to all subscribers.

Calls all subscribers synchronously in subscription order. Catches subscriber exceptions and records them for #recent_errors instead of raising them to the caller. Records the publish timestamp in metrics.

Parameters:

  • event_type (Symbol)

    one of VALID_EVENT_TYPES

  • payload (Hash) (defaults to: {})

    event payload

Returns:

  • (Integer)

    number of subscribers invoked, including any whose handler raised



# File 'lib/nnw/shared/event_bus.rb', line 71

def self.publish(event_type, payload: {})
  instance.publish(event_type, payload: payload)
end

.reset! ⇒ void

This method returns an undefined value.

Reset the singleton instance (for testing only).



# File 'lib/nnw/shared/event_bus.rb', line 38

def self.reset!
  @instance = new
end

.subscribe(event_type, handler_id:) {|Hash| ... } ⇒ void

This method returns an undefined value.

Subscribe a handler to an event type.

Parameters:

  • event_type (Symbol)

    one of VALID_EVENT_TYPES

  • handler_id (String, Symbol)

    unique identifier for this handler; subscribing again with the same ID replaces the earlier handler

Yields:

  • (Hash)

    block called with event payload when event is published

Raises:

  • (ArgumentError)

    if event_type is invalid or no block given



# File 'lib/nnw/shared/event_bus.rb', line 49

def self.subscribe(event_type, handler_id:, &block)
  instance.subscribe(event_type, handler_id: handler_id, &block)
end

.unsubscribe(event_type, handler_id:) ⇒ Boolean

Unsubscribe a handler from an event type.

Parameters:

  • event_type (Symbol)

    one of VALID_EVENT_TYPES

  • handler_id (String, Symbol)

    the handler identifier to remove

Returns:

  • (Boolean)

    true if handler was found and removed



# File 'lib/nnw/shared/event_bus.rb', line 58

def self.unsubscribe(event_type, handler_id:)
  instance.unsubscribe(event_type, handler_id: handler_id)
end

Instance Method Details

#handlers_for(event_type) ⇒ Object

See Also:

  • .handlers_for



# File 'lib/nnw/shared/event_bus.rb', line 203

def handlers_for(event_type)
  validate_event_type!(event_type)

  @monitor.synchronize do
    @subscribers[event_type].map { |h| h[:handler_id] }
  end
end

#history(event_type, last_n: 10) ⇒ Object

See Also:

  • .history



# File 'lib/nnw/shared/event_bus.rb', line 192

def history(event_type, last_n: 10)
  validate_event_type!(event_type)

  @monitor.synchronize do
    ring = @history[event_type]
    n = [last_n, ring.size].min
    ring.last(n)
  end
end
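The bounded history behaves like a simple ring buffer built on Array: append, then drop from the front while over the limit. The sketch below isolates that idea from the listings on this page; `SKETCH_MAX_HISTORY`, `push_bounded`, and `last_entries` are hypothetical names.

```ruby
SKETCH_MAX_HISTORY = 100

# Append an entry and drop the oldest ones once the buffer exceeds max.
def push_bounded(ring, entry, max = SKETCH_MAX_HISTORY)
  ring << entry
  ring.shift while ring.size > max
  ring
end

# Return up to last_n of the newest entries, oldest first.
def last_entries(ring, last_n)
  ring.last([last_n, ring.size].min)
end
```

Array#last already returns the whole array when asked for more elements than it holds, so the explicit `min` is a readability choice rather than a requirement.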

#metrics ⇒ Object

See Also:

  • .metrics



# File 'lib/nnw/shared/event_bus.rb', line 212

def metrics
  @monitor.synchronize do
    @metrics.transform_values(&:dup)
  end
end
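`transform_values(&:dup)` hands the caller a copy of each per-event hash, so editing the returned counters does not touch the internal state of the bus. A small sketch of that behavior, using a hypothetical `snapshot_metrics` helper and made-up sample data:

```ruby
# Copy the outer hash and each per-event hash one level deep.
def snapshot_metrics(metrics)
  metrics.transform_values(&:dup)
end

# Mutate the snapshot and report what the internal hash still holds.
def metrics_isolation_demo
  internal = {
    data_ready: { publish_count: 1, last_publish_at: nil, error_count: 0 }
  }
  copy = snapshot_metrics(internal)
  copy[:data_ready][:publish_count] = 99
  internal[:data_ready][:publish_count]
end
```

The copy is shallow: objects stored as values inside each per-event hash are shared between the snapshot and the original.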

#publish(event_type, payload: {}) ⇒ Object

See Also:

  • .publish



# File 'lib/nnw/shared/event_bus.rb', line 139

def publish(event_type, payload: {})
  validate_event_type!(event_type)

  timestamp = Time.now
  handlers_snapshot = nil
  
  @monitor.synchronize do
    handlers_snapshot = @subscribers[event_type].dup
  end

  notified = 0
  errors_in_publish = []

  handlers_snapshot.each do |handler|
    begin
      handler[:block].call(payload)
      notified += 1
    rescue => e
      notified += 1
      errors_in_publish << {
        handler_id: handler[:handler_id],
        error: e,
        timestamp: timestamp
      }
    end
  end

  # Record history and metrics under lock
  @monitor.synchronize do
    entry = {
      event_type: event_type,
      payload: payload,
      timestamp: timestamp,
      subscriber_count: notified,
      errors: errors_in_publish.map { |err| { handler_id: err[:handler_id], message: err[:error].message } }
    }

    ring = @history[event_type]
    ring << entry
    ring.shift while ring.size > MAX_HISTORY

    @metrics[event_type][:publish_count] += 1
    @metrics[event_type][:last_publish_at] = timestamp
    @metrics[event_type][:error_count] += errors_in_publish.size

    @errors.concat(errors_in_publish)
    @errors.shift while @errors.size > MAX_HISTORY
  end

  notified
end
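The listing above copies the subscriber list under the lock, calls the handlers outside it, and isolates each handler's failure. To show the observable effect without loading the library, the sketch below uses `MiniBus`, a hypothetical stand-in condensed from the listings on this page (one event type, no history or metrics). It is not the EventBus implementation.

```ruby
require 'monitor'

# Hypothetical, condensed stand-in for EventBus with a single event type.
class MiniBus
  def initialize
    @monitor = Monitor.new
    @handlers = []
    @errors = []
  end

  def subscribe(handler_id:, &block)
    raise ArgumentError, 'Block required for subscribe' unless block

    @monitor.synchronize do
      @handlers.reject! { |h| h[:handler_id] == handler_id }
      @handlers << { handler_id: handler_id, block: block }
    end
  end

  # Snapshot the handlers under the lock, call them outside it, and record
  # failures instead of raising. Returns the number of handlers invoked.
  def publish(payload: {})
    snapshot = @monitor.synchronize { @handlers.dup }
    failures = []
    snapshot.each do |handler|
      handler[:block].call(payload)
    rescue StandardError => e
      failures << { handler_id: handler[:handler_id], error: e }
    end
    @monitor.synchronize { @errors.concat(failures) }
    snapshot.size
  end

  def recent_errors
    @monitor.synchronize { @errors.dup }
  end
end

# One failing handler between two healthy ones.
def mini_bus_demo
  bus = MiniBus.new
  seen = []
  bus.subscribe(handler_id: :first) { |p| seen << [:first, p[:step]] }
  bus.subscribe(handler_id: :broken) { |_p| raise 'boom' }
  bus.subscribe(handler_id: :last) { |p| seen << [:last, p[:step]] }
  notified = bus.publish(payload: { step: 7 })
  { notified: notified, seen: seen,
    error_ids: bus.recent_errors.map { |e| e[:handler_id] } }
end
```

In this sketch the failing handler still counts toward the return value, matching the `notified += 1` in the rescue branch of the listing, and the handler subscribed after it still runs.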

#recent_errors(last_n: 20) ⇒ Array<Hash>

Get recent errors across all event types.

Parameters:

  • last_n (Integer) (defaults to: 20)

    max entries

Returns:

  • (Array<Hash>)

    array of hashes with keys handler_id:, error:, and timestamp:, oldest first


# File 'lib/nnw/shared/event_bus.rb', line 221

def recent_errors(last_n: 20)
  @monitor.synchronize do
    @errors.last([last_n, @errors.size].min)
  end
end

#subscribe(event_type, handler_id:, &block) ⇒ Object

Raises:

  • (ArgumentError)

See Also:

  • .subscribe



# File 'lib/nnw/shared/event_bus.rb', line 116

def subscribe(event_type, handler_id:, &block)
  validate_event_type!(event_type)
  raise ArgumentError, 'Block required for subscribe' unless block_given?

  @monitor.synchronize do
    # Remove existing handler with same ID to prevent duplicates
    @subscribers[event_type].reject! { |h| h[:handler_id] == handler_id }
    @subscribers[event_type] << { handler_id: handler_id, block: block }
  end
end
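Because the listing removes any existing entry with the same ID before appending, subscribing again replaces the earlier block and moves that handler to the end of the call order. The sketch below isolates this on a plain array; `register` is a hypothetical helper name.

```ruby
# Replace-or-append registration keyed by handler_id.
def register(handlers, handler_id, &block)
  raise ArgumentError, 'Block required for subscribe' unless block

  handlers.reject! { |h| h[:handler_id] == handler_id }
  handlers << { handler_id: handler_id, block: block }
end

# Register :a, then :b, then :a again with a different block.
def register_demo
  handlers = []
  register(handlers, :a) { 1 }
  register(handlers, :b) { 2 }
  register(handlers, :a) { 3 }
  { ids: handlers.map { |h| h[:handler_id] },
    last_result: handlers.last[:block].call }
end
```

IDs are compared with `==`, so the String "a" and the Symbol :a count as different handlers.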

#unsubscribe(event_type, handler_id:) ⇒ Object

See Also:

  • .unsubscribe



# File 'lib/nnw/shared/event_bus.rb', line 128

def unsubscribe(event_type, handler_id:)
  validate_event_type!(event_type)

  @monitor.synchronize do
    initial_size = @subscribers[event_type].size
    @subscribers[event_type].reject! { |h| h[:handler_id] == handler_id }
    @subscribers[event_type].size < initial_size
  end
end