Class: TCB::EventBus
- Inherits:
- Object
- Inheritance chain: Object → TCB::EventBus
- Defined in:
- lib/tcb/event_bus.rb,
lib/tcb/event_bus/running_strategy.rb,
lib/tcb/event_bus/shutdown_strategy.rb,
lib/tcb/event_bus/subscriber_registry.rb,
lib/tcb/event_bus/termination_signal_handler.rb
Defined Under Namespace
Classes: RunningStrategy, ShutdownError, ShutdownStrategy, SubscriberRegistry, TerminationSignalHandler
Instance Attribute Summary collapse
-
#active_dispatches ⇒ Object
readonly
Returns the value of attribute active_dispatches.
-
#dispatcher ⇒ Object
readonly
Returns the value of attribute dispatcher.
-
#events_processed_during_shutdown ⇒ Object
readonly
Returns the value of attribute events_processed_during_shutdown.
-
#mutex ⇒ Object
readonly
Returns the value of attribute mutex.
-
#queue ⇒ Object
readonly
Returns the value of attribute queue.
-
#registry ⇒ Object
readonly
Returns the value of attribute registry.
Instance Method Summary collapse
-
#dispatch(event) ⇒ Object
Dispatches an event to every handler registered for its class. Public only so the strategy objects can call it.
-
#force_shutdown ⇒ Object
Force shutdown - immediate, no draining.
-
#initialize(handle_signals: false, shutdown_timeout: 30.0, shutdown_signals: [:TERM, :INT], on_signal: nil) ⇒ EventBus
constructor
A new instance of EventBus.
-
#publish(event) ⇒ Object
Publish an event instance.
-
#shutdown(drain: true, timeout: 5.0) ⇒ Object
Graceful shutdown - drains queue with timeout.
-
#shutdown? ⇒ Boolean
Check if bus is shut down.
-
#subscribe(event_class, &block) ⇒ Object
Subscribe to a specific event class.
-
#unsubscribe(subscription) ⇒ Object
Unsubscribe using a subscription token.
Constructor Details
#initialize(handle_signals: false, shutdown_timeout: 30.0, shutdown_signals: [:TERM, :INT], on_signal: nil) ⇒ EventBus
Returns a new instance of EventBus.
16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 |
# File 'lib/tcb/event_bus.rb', line 16
#
# Sets up the bus state and starts the background dispatcher thread, which
# pops events off the queue and hands each one to #dispatch until it pops the
# :shutdown_sentinel symbol.
#
# @param handle_signals [Boolean] when truthy, a TerminationSignalHandler is
#   built for this bus and installed
# @param shutdown_timeout [Float] passed to the signal handler as
#   +shutdown_timeout+
# @param shutdown_signals [Array<Symbol>] passed to the signal handler as
#   +signals+
# @param on_signal [Object, nil] passed to the signal handler as +on_signal+
#   (presumably a callable — confirm against TerminationSignalHandler)
def initialize(
  handle_signals: false,
  shutdown_timeout: 30.0,
  shutdown_signals: [:TERM, :INT],
  on_signal: nil
)
  @queue = Queue.new
  @registry = SubscriberRegistry.new
  @mutex = Mutex.new
  @active_dispatches = 0
  @events_processed_during_shutdown = 0
  # Must be assigned before the thread starts: #dispatch consults the strategy.
  @execution_strategy = RunningStrategy.new(self)

  @dispatcher = Thread.new do
    loop do
      event = @queue.pop
      # Identity check on the sentinel symbol: `event == ...` would invoke the
      # published event's own #==, which may be overridden and could raise or
      # answer true for a non-sentinel, killing or stopping this thread.
      break if :shutdown_sentinel.equal?(event)

      dispatch(event)
    end
  end

  if handle_signals
    @termination_signal_handler = TerminationSignalHandler.new(
      event_bus: self,
      shutdown_timeout: shutdown_timeout,
      signals: shutdown_signals,
      on_signal: on_signal
    )
    @termination_signal_handler.install
  end
end
Instance Attribute Details
#active_dispatches ⇒ Object (readonly)
Returns the value of attribute active_dispatches.
13 14 15 |
# File 'lib/tcb/event_bus.rb', line 13
#
# Reader for +@active_dispatches+, the counter that #dispatch increments on
# entry and decrements in its +ensure+ clause (initialised to 0).
#
# @return [Object] the current value of +@active_dispatches+
def active_dispatches
  @active_dispatches
end
#dispatcher ⇒ Object (readonly)
Returns the value of attribute dispatcher.
13 14 15 |
# File 'lib/tcb/event_bus.rb', line 13
#
# Reader for +@dispatcher+, the Thread created in #initialize that pops events
# from the queue and dispatches them.
#
# @return [Object] the current value of +@dispatcher+
def dispatcher
  @dispatcher
end
#events_processed_during_shutdown ⇒ Object (readonly)
Returns the value of attribute events_processed_during_shutdown.
13 14 15 |
# File 'lib/tcb/event_bus.rb', line 13
#
# Reader for +@events_processed_during_shutdown+, the counter that #dispatch
# bumps whenever it runs while #shutdown? is true (initialised to 0).
#
# @return [Object] the current value of +@events_processed_during_shutdown+
def events_processed_during_shutdown
  @events_processed_during_shutdown
end
#mutex ⇒ Object (readonly)
Returns the value of attribute mutex.
13 14 15 |
# File 'lib/tcb/event_bus.rb', line 13
#
# Reader for +@mutex+, the Mutex created in #initialize and used by #dispatch
# to guard its counter updates.
#
# @return [Object] the current value of +@mutex+
def mutex
  @mutex
end
#queue ⇒ Object (readonly)
Returns the value of attribute queue.
13 14 15 |
# File 'lib/tcb/event_bus.rb', line 13
#
# Reader for +@queue+, the Queue created in #initialize that the dispatcher
# thread pops events from.
#
# @return [Object] the current value of +@queue+
def queue
  @queue
end
#registry ⇒ Object (readonly)
Returns the value of attribute registry.
13 14 15 |
# File 'lib/tcb/event_bus.rb', line 13
#
# Reader for +@registry+, the SubscriberRegistry created in #initialize that
# #dispatch queries for handlers and #unsubscribe removes subscriptions from.
#
# @return [Object] the current value of +@registry+
def registry
  @registry
end
Instance Method Details
#dispatch(event) ⇒ Object
Dispatches an event to every handler registered for its class. Public only so the strategy objects can call it.
85 86 87 88 89 90 91 92 93 94 95 96 |
# File 'lib/tcb/event_bus.rb', line 85
#
# Runs every handler registered for the event's class, tracking the call in
# +@active_dispatches+ for its whole duration. Public for strategy access.
#
# @param event [Object] the event instance; handlers are looked up by
#   +event.class+
# @return [Object] whatever +each+ on the handler collection returns
def dispatch(event)
  @mutex.synchronize { @active_dispatches += 1 }

  begin
    # shutdown? is evaluated outside the lock (Mutex is not re-entrant and the
    # strategy's internals are not visible here); only the counter update is
    # guarded, matching how @active_dispatches is protected.
    @mutex.synchronize { @events_processed_during_shutdown += 1 } if shutdown?

    handlers = @registry.handlers_for(event.class)
    handlers.each do |handler|
      execute_handler(handler, event)
    end
  ensure
    # Pairs with the increment above; reached even when a handler raises.
    @mutex.synchronize { @active_dispatches -= 1 }
  end
end
#force_shutdown ⇒ Object
Force shutdown - immediate, no draining
75 76 77 |
# File 'lib/tcb/event_bus.rb', line 75
#
# Force shutdown - immediate, no draining. Shorthand for calling #shutdown
# with +drain: false+ and +timeout: 0+.
#
# @return [Object] whatever #shutdown returns
def force_shutdown
  shutdown(drain: false, timeout: 0)
end
#publish(event) ⇒ Object
Publish an event instance
60 61 62 |
# File 'lib/tcb/event_bus.rb', line 60
#
# Publish an event instance by handing it to the current execution strategy
# (a RunningStrategy after #initialize, a ShutdownStrategy once #shutdown has
# been called).
#
# @param event [Object] the event instance to publish
# @return [Object] whatever the strategy's +publish+ returns
def publish(event)
  @execution_strategy.publish(event)
end
#shutdown(drain: true, timeout: 5.0) ⇒ Object
Graceful shutdown - drains queue with timeout
65 66 67 68 69 70 71 72 |
# File 'lib/tcb/event_bus.rb', line 65
#
# Graceful shutdown - drains queue with timeout. Replaces the current
# execution strategy with a new ShutdownStrategy and executes it; the drain
# and timeout handling itself lives in ShutdownStrategy.
#
# @param drain [Boolean] passed to ShutdownStrategy as +drain+
# @param timeout [Numeric] passed to ShutdownStrategy as +timeout+
#   (presumably seconds — confirm against ShutdownStrategy)
# @return [Object] whatever ShutdownStrategy#execute returns
def shutdown(drain: true, timeout: 5.0)
  @execution_strategy = ShutdownStrategy.new(
    event_bus: self,
    drain: drain,
    timeout: timeout
  )
  @execution_strategy.execute
end
#shutdown? ⇒ Boolean
Check if bus is shut down
80 81 82 |
# File 'lib/tcb/event_bus.rb', line 80
#
# Check if bus is shut down. The answer comes from the current execution
# strategy's own +shutdown?+.
#
# @return [Boolean] whatever the strategy's +shutdown?+ returns
def shutdown?
  @execution_strategy.shutdown?
end
#subscribe(event_class, &block) ⇒ Object
Subscribe to a specific event class
50 51 52 |
# File 'lib/tcb/event_bus.rb', line 50
#
# Subscribe to a specific event class. The class and the block are forwarded
# unchanged to the current execution strategy's +subscribe+.
#
# @param event_class [Class] the event class to subscribe to
# @yield the handler block, forwarded to the strategy
# @return [Object] whatever the strategy's +subscribe+ returns
def subscribe(event_class, &block)
  @execution_strategy.subscribe(event_class, &block)
end
#unsubscribe(subscription) ⇒ Object
Unsubscribe using a subscription token
55 56 57 |
# File 'lib/tcb/event_bus.rb', line 55
#
# Unsubscribe using a subscription token. Goes straight to the registry's
# +remove+ rather than through the execution strategy.
#
# @param subscription [Object] the subscription token to remove
# @return [Object] whatever the registry's +remove+ returns
def unsubscribe(subscription)
  @registry.remove(subscription)
end