Class: TCB::EventBus

Inherits:
Object
  • Object
show all
Defined in:
lib/tcb/event_bus.rb,
lib/tcb/event_bus/running_strategy.rb,
lib/tcb/event_bus/shutdown_strategy.rb,
lib/tcb/event_bus/subscriber_registry.rb,
lib/tcb/event_bus/termination_signal_handler.rb

Defined Under Namespace

Classes: RunningStrategy, ShutdownError, ShutdownStrategy, SubscriberRegistry, TerminationSignalHandler

Instance Attribute Summary collapse

Instance Method Summary collapse

Constructor Details

#initialize(handle_signals: false, shutdown_timeout: 30.0, shutdown_signals: [:TERM, :INT], on_signal: nil) ⇒ EventBus

Returns a new instance of EventBus.



16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
# File 'lib/tcb/event_bus.rb', line 16

# Builds the bus, starts the background dispatcher thread and, when
# requested, installs a termination signal handler.
#
# @param handle_signals [Boolean] when truthy, a TerminationSignalHandler is
#   created for this bus and installed
# @param shutdown_timeout [Numeric] forwarded to the signal handler as
#   +shutdown_timeout+
# @param shutdown_signals [Array<Symbol>] forwarded to the signal handler as
#   +signals+
# @param on_signal [Object, nil] forwarded to the signal handler as +on_signal+
def initialize(
  handle_signals: false,
  shutdown_timeout: 30.0,
  shutdown_signals: [:TERM, :INT],
  on_signal: nil
)
  @queue = Queue.new
  @registry = SubscriberRegistry.new
  @mutex = Mutex.new
  @active_dispatches = 0
  @events_processed_during_shutdown = 0
  @execution_strategy = RunningStrategy.new(self)

  # Background consumer: blocks on the queue and dispatches each event until
  # the :shutdown_sentinel marker is popped.
  @dispatcher = Thread.new do
    loop do
      event = @queue.pop
      # Identity check rather than `event == ...`: `==` would invoke the
      # published event's own (possibly overridden) equality with a Symbol
      # argument, and an implementation that assumes a same-type operand
      # would raise and kill this thread.
      break if event.equal?(:shutdown_sentinel)

      dispatch(event)
    end
  end

  if handle_signals
    @termination_signal_handler = TerminationSignalHandler.new(
      event_bus: self,
      shutdown_timeout: shutdown_timeout,
      signals: shutdown_signals,
      on_signal: on_signal
    )
    @termination_signal_handler.install
  end
end

Instance Attribute Details

#active_dispatches ⇒ Object (readonly)

Returns the value of attribute active_dispatches.



13
14
15
# File 'lib/tcb/event_bus.rb', line 13

# Current value of the in-flight dispatch counter. #dispatch increments it on
# entry and decrements it in its ensure clause, both under @mutex; this reader
# itself does not take the mutex.
def active_dispatches
  @active_dispatches
end

#dispatcher ⇒ Object (readonly)

Returns the value of attribute dispatcher.



13
14
15
# File 'lib/tcb/event_bus.rb', line 13

# The background Thread started in #initialize. It pops events from the queue
# and passes each to #dispatch until it pops :shutdown_sentinel.
def dispatcher
  @dispatcher
end

#events_processed_during_shutdown ⇒ Object (readonly)

Returns the value of attribute events_processed_during_shutdown.



13
14
15
# File 'lib/tcb/event_bus.rb', line 13

# Counter that starts at 0 and is incremented by #dispatch each time it runs
# while #shutdown? is true.
def events_processed_during_shutdown
  @events_processed_during_shutdown
end

#mutex ⇒ Object (readonly)

Returns the value of attribute mutex.



13
14
15
# File 'lib/tcb/event_bus.rb', line 13

# The Mutex created in #initialize. #dispatch uses it to guard updates to
# @active_dispatches.
def mutex
  @mutex
end

#queue ⇒ Object (readonly)

Returns the value of attribute queue.



13
14
15
# File 'lib/tcb/event_bus.rb', line 13

# The Queue created in #initialize; the dispatcher thread pops events from it.
def queue
  @queue
end

#registry ⇒ Object (readonly)

Returns the value of attribute registry.



13
14
15
# File 'lib/tcb/event_bus.rb', line 13

# The SubscriberRegistry created in #initialize. #dispatch asks it for the
# handlers of an event's class and #unsubscribe removes subscriptions from it.
def registry
  @registry
end

Instance Method Details

#dispatch(event) ⇒ Object

Dispatches an event to the handlers registered for its class. Public so that strategy objects can call it.



85
86
87
88
89
90
91
92
93
94
95
96
# File 'lib/tcb/event_bus.rb', line 85

# Delivers +event+ to every handler the registry returns for the event's
# class, in the order the registry yields them.
#
# Public so that strategy objects can call it. While running, the call is
# counted in @active_dispatches; the count is restored in the ensure clause
# even if a handler raises.
#
# @param event [Object] event instance; handlers are looked up by +event.class+
def dispatch(event)
  @mutex.synchronize { @active_dispatches += 1 }

  # `+=` is a read-modify-write, so guard it like the other counter.
  # shutdown? is evaluated before the lock is taken, so the strategy is never
  # invoked while @mutex is held.
  @mutex.synchronize { @events_processed_during_shutdown += 1 } if shutdown?
  handlers = @registry.handlers_for(event.class)

  handlers.each do |handler|
    execute_handler(handler, event)
  end
ensure
  @mutex.synchronize { @active_dispatches -= 1 }
end

#force_shutdown ⇒ Object

Force shutdown - immediate, no draining



75
76
77
# File 'lib/tcb/event_bus.rb', line 75

# Shuts the bus down without draining: calls #shutdown with +drain: false+
# and a zero timeout, and returns whatever #shutdown returns.
def force_shutdown
  shutdown(drain: false, timeout: 0)
end

#publish(event) ⇒ Object

Publish an event instance



60
61
62
# File 'lib/tcb/event_bus.rb', line 60

# Publishes an event by delegating to the current execution strategy.
# The strategy is a RunningStrategy after construction and is replaced by a
# ShutdownStrategy once #shutdown is called, so the effect of publishing
# depends on which one is active.
#
# @param event [Object] the event instance to publish
# @return whatever the active strategy's +publish+ returns
def publish(event)
  @execution_strategy.publish(event)
end

#shutdown(drain: true, timeout: 5.0) ⇒ Object

Graceful shutdown - drains queue with timeout



65
66
67
68
69
70
71
72
# File 'lib/tcb/event_bus.rb', line 65

# Switches the bus to a ShutdownStrategy and runs it.
#
# The new strategy replaces the current execution strategy, so later
# #publish, #subscribe and #shutdown? calls are answered by it.
#
# @param drain [Boolean] passed to the strategy as +drain+
# @param timeout [Numeric] passed to the strategy as +timeout+
# @return whatever the strategy's +execute+ returns
def shutdown(drain: true, timeout: 5.0)
  strategy_options = { event_bus: self, drain: drain, timeout: timeout }

  @execution_strategy = ShutdownStrategy.new(**strategy_options)
  @execution_strategy.execute
end

#shutdown? ⇒ Boolean

Check if bus is shut down

Returns:

  • (Boolean)


80
81
82
# File 'lib/tcb/event_bus.rb', line 80

# Reports whether the bus is shut down, as answered by the current execution
# strategy (a RunningStrategy until #shutdown installs a ShutdownStrategy).
#
# @return [Boolean] the strategy's +shutdown?+ result
def shutdown?
  @execution_strategy.shutdown?
end

#subscribe(event_class, &block) ⇒ Object

Subscribe to a specific event class



50
51
52
# File 'lib/tcb/event_bus.rb', line 50

# Subscribes a block to an event class by delegating to the current execution
# strategy, so the outcome depends on whether the bus is running or shutting
# down.
#
# @param event_class [Class] the event class to subscribe to
# @param block [Proc] handler forwarded to the strategy
# @return whatever the active strategy's +subscribe+ returns
#   (presumably the token accepted by #unsubscribe; confirm against the strategy)
def subscribe(event_class, &block)
  @execution_strategy.subscribe(event_class, &block)
end

#unsubscribe(subscription) ⇒ Object

Unsubscribe using a subscription token



55
56
57
# File 'lib/tcb/event_bus.rb', line 55

# Removes a subscription from the registry. Unlike #subscribe, this goes
# straight to the registry rather than through the execution strategy.
#
# @param subscription [Object] subscription token
#   (presumably the value obtained from #subscribe; confirm against SubscriberRegistry)
# @return whatever the registry's +remove+ returns
def unsubscribe(subscription)
  @registry.remove(subscription)
end