Class: TCB::EventBus

Inherits:
Object
  • Object
show all
Defined in:
lib/tcb/event_bus.rb,
lib/tcb/event_bus/running_strategy.rb,
lib/tcb/event_bus/shutdown_strategy.rb,
lib/tcb/event_bus/subscriber_registry.rb,
lib/tcb/event_bus/queue_pressure_monitor.rb,
lib/tcb/event_bus/termination_signal_handler.rb

Defined Under Namespace

Classes: NullQueuePressureMonitor, QueuePressureMonitor, RunningStrategy, ShutdownError, ShutdownStrategy, SubscriberRegistry, TerminationSignalHandler

Instance Attribute Summary collapse

Instance Method Summary collapse

Constructor Details

#initialize(handle_signals: false, shutdown_timeout: 30.0, shutdown_signals: [:TERM, :INT], on_signal: nil, max_queue_size: nil, high_water_mark: nil, sync: false) ⇒ EventBus

Returns a new instance of EventBus.



16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
# File 'lib/tcb/event_bus.rb', line 16

def initialize(
  handle_signals: false,
  shutdown_timeout: 30.0,
  shutdown_signals: [:TERM, :INT],
  on_signal: nil,
  max_queue_size: nil,
  high_water_mark: nil,
  sync: false
)
  @sync = sync
  @queue = max_queue_size ? SizedQueue.new(max_queue_size) : Queue.new
  @max_queue_size = max_queue_size
  @pressure_monitor = QueuePressureMonitor.for(max_queue_size:, high_water_mark:)
  @registry = SubscriberRegistry.new
  @mutex = Mutex.new
  @active_dispatches = 0
  @events_processed_during_shutdown = 0
  @execution_strategy = RunningStrategy.new(self, sync: @sync)
  @execution_strategy.start
  install_signal_handlers(shutdown_timeout:, shutdown_signals:, on_signal:) if handle_signals
end

Instance Attribute Details

#active_dispatchesObject (readonly)

Returns the value of attribute active_dispatches.



13
14
15
# File 'lib/tcb/event_bus.rb', line 13

def active_dispatches
  @active_dispatches
end

#dispatcherObject

Returns the value of attribute dispatcher.



14
15
16
# File 'lib/tcb/event_bus.rb', line 14

def dispatcher
  @dispatcher
end

#events_processed_during_shutdownObject (readonly)

Returns the value of attribute events_processed_during_shutdown.



13
14
15
# File 'lib/tcb/event_bus.rb', line 13

def events_processed_during_shutdown
  @events_processed_during_shutdown
end

#max_queue_sizeObject (readonly)

Returns the value of attribute max_queue_size.



13
14
15
# File 'lib/tcb/event_bus.rb', line 13

def max_queue_size
  @max_queue_size
end

#mutexObject (readonly)

Returns the value of attribute mutex.



13
14
15
# File 'lib/tcb/event_bus.rb', line 13

def mutex
  @mutex
end

#queueObject (readonly)

Returns the value of attribute queue.



13
14
15
# File 'lib/tcb/event_bus.rb', line 13

def queue
  @queue
end

#registryObject (readonly)

Returns the value of attribute registry.



13
14
15
# File 'lib/tcb/event_bus.rb', line 13

def registry
  @registry
end

Instance Method Details

#dispatch(event_or_envelope) ⇒ Object

Public for strategy access



74
75
76
77
78
79
80
81
82
83
84
85
86
# File 'lib/tcb/event_bus.rb', line 74

def dispatch(event_or_envelope)
  @mutex.synchronize { @active_dispatches += 1 }
  envelope = TCB::Envelope.coerce(event_or_envelope)

  @events_processed_during_shutdown += 1 if shutdown?
  handlers = @registry.handlers_for(envelope.event.class)

  handlers.each do |handler|
    execute_handler(handler, envelope)
  end
ensure
  @mutex.synchronize { @active_dispatches -= 1 }
end

#force_shutdownObject

Force shutdown - immediate, no draining



64
65
66
# File 'lib/tcb/event_bus.rb', line 64

def force_shutdown
  shutdown(drain: false, timeout: 0)
end

#high_water_mark_reached?Boolean

Returns:

  • (Boolean)


88
# File 'lib/tcb/event_bus.rb', line 88

def high_water_mark_reached? = @pressure_monitor.check?(@queue.size)

#publish(event) ⇒ Object

Publish an event instance



49
50
51
# File 'lib/tcb/event_bus.rb', line 49

def publish(event)
  @execution_strategy.publish(event)
end

#shutdown(drain: true, timeout: 5.0) ⇒ Object

Graceful shutdown - drains queue with timeout



54
55
56
57
58
59
60
61
# File 'lib/tcb/event_bus.rb', line 54

def shutdown(drain: true, timeout: 5.0)
  @execution_strategy = ShutdownStrategy.new(
    event_bus: self,
    drain: drain,
    timeout: timeout
  )
  @execution_strategy.execute
end

#shutdown?Boolean

Check if bus is shut down

Returns:

  • (Boolean)


69
70
71
# File 'lib/tcb/event_bus.rb', line 69

def shutdown?
  @execution_strategy.shutdown?
end

#subscribe(event_class, &block) ⇒ Object

Subscribe to a specific event class



39
40
41
# File 'lib/tcb/event_bus.rb', line 39

def subscribe(event_class, &block)
  @execution_strategy.subscribe(event_class, &block)
end

#unsubscribe(subscription) ⇒ Object

Unsubscribe using a subscription token



44
45
46
# File 'lib/tcb/event_bus.rb', line 44

def unsubscribe(subscription)
  @registry.remove(subscription)
end