Class: TCB::EventBus
- Inherits:
-
Object
- Object
- TCB::EventBus
- Defined in:
- lib/tcb/event_bus.rb,
lib/tcb/event_bus/running_strategy.rb,
lib/tcb/event_bus/shutdown_strategy.rb,
lib/tcb/event_bus/subscriber_registry.rb,
lib/tcb/event_bus/queue_pressure_monitor.rb,
lib/tcb/event_bus/termination_signal_handler.rb
Defined Under Namespace
Classes: NullQueuePressureMonitor, QueuePressureMonitor, RunningStrategy, ShutdownError, ShutdownStrategy, SubscriberRegistry, TerminationSignalHandler
Instance Attribute Summary collapse
-
#active_dispatches ⇒ Object
readonly
Returns the value of attribute active_dispatches.
-
#dispatcher ⇒ Object
Returns the value of attribute dispatcher.
-
#events_processed_during_shutdown ⇒ Object
readonly
Returns the value of attribute events_processed_during_shutdown.
-
#max_queue_size ⇒ Object
readonly
Returns the value of attribute max_queue_size.
-
#mutex ⇒ Object
readonly
Returns the value of attribute mutex.
-
#queue ⇒ Object
readonly
Returns the value of attribute queue.
-
#registry ⇒ Object
readonly
Returns the value of attribute registry.
Instance Method Summary collapse
-
#dispatch(event_or_envelope) ⇒ Object
Public for strategy access.
-
#force_shutdown ⇒ Object
Force shutdown - immediate, no draining.
- #high_water_mark_reached? ⇒ Boolean
-
#initialize(handle_signals: false, shutdown_timeout: 30.0, shutdown_signals: [:TERM, :INT], on_signal: nil, max_queue_size: nil, high_water_mark: nil, sync: false) ⇒ EventBus
constructor
A new instance of EventBus.
-
#publish(event) ⇒ Object
Publish an event instance.
-
#shutdown(drain: true, timeout: 5.0) ⇒ Object
Graceful shutdown - drains queue with timeout.
-
#shutdown? ⇒ Boolean
Check if bus is shut down.
-
#subscribe(event_class, &block) ⇒ Object
Subscribe to a specific event class.
-
#unsubscribe(subscription) ⇒ Object
Unsubscribe using a subscription token.
Constructor Details
#initialize(handle_signals: false, shutdown_timeout: 30.0, shutdown_signals: [:TERM, :INT], on_signal: nil, max_queue_size: nil, high_water_mark: nil, sync: false) ⇒ EventBus
Returns a new instance of EventBus.
16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 |
# File 'lib/tcb/event_bus.rb', line 16 def initialize( handle_signals: false, shutdown_timeout: 30.0, shutdown_signals: [:TERM, :INT], on_signal: nil, max_queue_size: nil, high_water_mark: nil, sync: false ) @sync = sync @queue = max_queue_size ? SizedQueue.new(max_queue_size) : Queue.new @max_queue_size = max_queue_size @pressure_monitor = QueuePressureMonitor.for(max_queue_size:, high_water_mark:) @registry = SubscriberRegistry.new @mutex = Mutex.new @active_dispatches = 0 @events_processed_during_shutdown = 0 @execution_strategy = RunningStrategy.new(self, sync: @sync) @execution_strategy.start install_signal_handlers(shutdown_timeout:, shutdown_signals:, on_signal:) if handle_signals end |
Instance Attribute Details
#active_dispatches ⇒ Object (readonly)
Returns the value of attribute active_dispatches.
13 14 15 |
# File 'lib/tcb/event_bus.rb', line 13 def active_dispatches @active_dispatches end |
#dispatcher ⇒ Object
Returns the value of attribute dispatcher.
14 15 16 |
# File 'lib/tcb/event_bus.rb', line 14 def dispatcher @dispatcher end |
#events_processed_during_shutdown ⇒ Object (readonly)
Returns the value of attribute events_processed_during_shutdown.
13 14 15 |
# File 'lib/tcb/event_bus.rb', line 13 def events_processed_during_shutdown @events_processed_during_shutdown end |
#max_queue_size ⇒ Object (readonly)
Returns the value of attribute max_queue_size.
13 14 15 |
# File 'lib/tcb/event_bus.rb', line 13 def max_queue_size @max_queue_size end |
#mutex ⇒ Object (readonly)
Returns the value of attribute mutex.
13 14 15 |
# File 'lib/tcb/event_bus.rb', line 13 def mutex @mutex end |
#queue ⇒ Object (readonly)
Returns the value of attribute queue.
13 14 15 |
# File 'lib/tcb/event_bus.rb', line 13 def queue @queue end |
#registry ⇒ Object (readonly)
Returns the value of attribute registry.
13 14 15 |
# File 'lib/tcb/event_bus.rb', line 13 def registry @registry end |
Instance Method Details
#dispatch(event_or_envelope) ⇒ Object
Public for strategy access
74 75 76 77 78 79 80 81 82 83 84 85 86 |
# File 'lib/tcb/event_bus.rb', line 74 def dispatch(event_or_envelope) @mutex.synchronize { @active_dispatches += 1 } envelope = TCB::Envelope.coerce(event_or_envelope) @events_processed_during_shutdown += 1 if shutdown? handlers = @registry.handlers_for(envelope.event.class) handlers.each do |handler| execute_handler(handler, envelope) end ensure @mutex.synchronize { @active_dispatches -= 1 } end |
#force_shutdown ⇒ Object
Force shutdown - immediate, no draining
64 65 66 |
# File 'lib/tcb/event_bus.rb', line 64 def force_shutdown shutdown(drain: false, timeout: 0) end |
#high_water_mark_reached? ⇒ Boolean
88 |
# File 'lib/tcb/event_bus.rb', line 88 def high_water_mark_reached? = @pressure_monitor.check?(@queue.size) |
#publish(event) ⇒ Object
Publish an event instance
49 50 51 |
# File 'lib/tcb/event_bus.rb', line 49 def publish(event) @execution_strategy.publish(event) end |
#shutdown(drain: true, timeout: 5.0) ⇒ Object
Graceful shutdown - drains queue with timeout
54 55 56 57 58 59 60 61 |
# File 'lib/tcb/event_bus.rb', line 54 def shutdown(drain: true, timeout: 5.0) @execution_strategy = ShutdownStrategy.new( event_bus: self, drain: drain, timeout: timeout ) @execution_strategy.execute end |
#shutdown? ⇒ Boolean
Check if bus is shut down
69 70 71 |
# File 'lib/tcb/event_bus.rb', line 69 def shutdown? @execution_strategy.shutdown? end |
#subscribe(event_class, &block) ⇒ Object
Subscribe to a specific event class
39 40 41 |
# File 'lib/tcb/event_bus.rb', line 39 def subscribe(event_class, &block) @execution_strategy.subscribe(event_class, &block) end |
#unsubscribe(subscription) ⇒ Object
Unsubscribe using a subscription token
44 45 46 |
# File 'lib/tcb/event_bus.rb', line 44 def unsubscribe(subscription) @registry.remove(subscription) end |