Class: Takagi::EventBus::ObserverCleanup
- Inherits: Object
  - Object
    - Takagi::EventBus::ObserverCleanup
- Defined in: lib/takagi/event_bus/observer_cleanup.rb
Overview
Background thread for stale observer cleanup
Periodically cleans up stale observers from the ObserveRegistry. Observers are considered stale if they haven’t received notifications for longer than max_age seconds.
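The staleness rule above can be illustrated with a minimal sketch. The `Observer` struct, its `last_notified_at` field, and the `stale_observers` helper are hypothetical stand-ins, since this page does not show how the ObserveRegistry stores its entries; only the `max_age` comparison is taken from the description.

```ruby
# Hypothetical stand-in for a registry entry (not the real ObserveRegistry type).
Observer = Struct.new(:token, :last_notified_at)

# Select observers whose last notification is older than max_age seconds.
def stale_observers(observers, max_age:, now: Time.now)
  observers.select { |o| now - o.last_notified_at > max_age }
end

now = Time.now
observers = [
  Observer.new('a', now - 30),  # notified 30 seconds ago: still fresh
  Observer.new('b', now - 900)  # notified 15 minutes ago: stale for max_age 600
]

stale = stale_observers(observers, max_age: 600, now: now)
puts stale.map(&:token).inspect # prints ["b"]
```

With the default `max_age` of 600, only the observer that has gone 900 seconds without a notification is selected.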
Instance Attribute Summary
- #interval ⇒ Object (readonly)
  Returns the value of attribute interval.
- #max_age ⇒ Object (readonly)
  Returns the value of attribute max_age.
Instance Method Summary
- #cleanup_now ⇒ Object
  Force a cleanup run (for testing).
- #initialize(interval: 60, max_age: 600) ⇒ ObserverCleanup (constructor)
  Initialize observer cleanup.
- #running? ⇒ Boolean
  Check if cleanup is running.
- #start ⇒ Object
  Start the cleanup thread.
- #stats ⇒ Hash
  Get cleanup statistics.
- #stop ⇒ Object
  Stop the cleanup thread.
Constructor Details
#initialize(interval: 60, max_age: 600) ⇒ ObserverCleanup
Initialize observer cleanup. Both interval and max_age are given in seconds (defaults: 60 and 600).
# File 'lib/takagi/event_bus/observer_cleanup.rb', line 22
def initialize(interval: 60, max_age: 600)
  @interval = interval
  @max_age = max_age
  @running = false
  @thread = nil
  @mutex = Mutex.new
  @stats = { runs: 0, cleaned: 0, errors: 0 }
end
Instance Attribute Details
#interval ⇒ Object (readonly)
Returns the value of attribute interval.
# File 'lib/takagi/event_bus/observer_cleanup.rb', line 17
def interval
  @interval
end
#max_age ⇒ Object (readonly)
Returns the value of attribute max_age.
# File 'lib/takagi/event_bus/observer_cleanup.rb', line 17
def max_age
  @max_age
end
Instance Method Details
#cleanup_now ⇒ Object
Force a cleanup run (for testing)
# File 'lib/takagi/event_bus/observer_cleanup.rb', line 71
def cleanup_now
  cleanup_stale_observers
end
#running? ⇒ Boolean
Check if cleanup is running
# File 'lib/takagi/event_bus/observer_cleanup.rb', line 60
def running?
  @mutex.synchronize { @running }
end
#start ⇒ Object
Start the cleanup thread
# File 'lib/takagi/event_bus/observer_cleanup.rb', line 32
def start
  @mutex.synchronize do
    return if @running

    @running = true
    @thread = Thread.new { run_cleanup_loop }
    @thread.name = 'ObserverCleanup'
  end

  Takagi.logger.info "Observer cleanup started (interval: #{@interval}s, max_age: #{@max_age}s)"
end
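The guard in #start makes repeated calls harmless: a second call returns before a second thread is created. The following is a minimal sketch of that pattern, not the library class itself. `MiniWorker` and its `starts` counter are hypothetical, and a plain `sleep` stands in for `run_cleanup_loop`, whose body is not shown on this page.

```ruby
# Hypothetical miniature of the start/stop guard (not Takagi's class).
class MiniWorker
  attr_reader :starts

  def initialize
    @mutex = Mutex.new
    @running = false
    @thread = nil
    @starts = 0 # counts how many threads were actually spawned
  end

  def start
    @mutex.synchronize do
      return if @running # already started: do nothing

      @running = true
      @starts += 1
      @thread = Thread.new { sleep } # placeholder for the cleanup loop
    end
  end

  def stop
    @mutex.synchronize do
      return unless @running

      @running = false
      @thread&.kill
      @thread&.join(5)
      @thread = nil
    end
  end
end

worker = MiniWorker.new
worker.start
worker.start # second call hits the guard and returns early
puts worker.starts # prints 1
worker.stop
```

Note that the early `return` leaves the whole method, so in the original #start the log line is also skipped when the cleanup thread is already running.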
#stats ⇒ Hash
Get cleanup statistics
# File 'lib/takagi/event_bus/observer_cleanup.rb', line 66
def stats
  @mutex.synchronize { @stats.dup }
end
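Because #stats returns `@stats.dup`, callers get a snapshot rather than the live hash. A small sketch of what that implies, using standalone local variables in place of the instance state:

```ruby
mutex = Mutex.new
stats = { runs: 0, cleaned: 0, errors: 0 } # same keys as the constructor sets up

# Take a snapshot the same way #stats does.
snapshot = mutex.synchronize { stats.dup }

# Mutating the snapshot leaves the original counters untouched.
snapshot[:runs] = 99
puts stats[:runs]    # prints 0
puts snapshot[:runs] # prints 99
```

`dup` makes a shallow copy, which is sufficient here because the values are integers.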
#stop ⇒ Object
Stop the cleanup thread
# File 'lib/takagi/event_bus/observer_cleanup.rb', line 45
def stop
  @mutex.synchronize do
    return unless @running

    @running = false
    @thread&.kill
    @thread&.join(5) # Wait up to 5 seconds
    @thread = nil
  end

  Takagi.logger.info 'Observer cleanup stopped'
end
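The kill-then-join sequence in #stop relies on two pieces of standard Ruby behavior: `Thread#kill` asks the thread to terminate, and `Thread#join(limit)` waits up to `limit` seconds, returning the thread if it finished and `nil` on timeout. A standalone sketch, with a sleeping loop as a stand-in for the cleanup loop:

```ruby
# Stand-in for the background cleanup thread.
thread = Thread.new { loop { sleep 0.1 } }

thread.kill
result = thread.join(5) # returns the thread once it has terminated, nil on timeout

puts result.equal?(thread) # prints true
puts thread.alive?         # prints false
```

Since #stop discards the return value of `join`, a timeout there would go unnoticed; checking for `nil` is one way a caller could detect it.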