Class: Takagi::EventBus::ObserverCleanup

Inherits:
Object
  • Object
show all
Defined in:
lib/takagi/event_bus/observer_cleanup.rb

Overview

Background thread for stale observer cleanup

Periodically cleans up stale observers from the ObserveRegistry. Observers are considered stale if they haven’t received notifications for longer than max_age seconds.

Examples:

cleanup = ObserverCleanup.new(interval: 60, max_age: 600)
cleanup.start
# ... cleanup runs in background ...
cleanup.stop

Instance Attribute Summary collapse

Instance Method Summary collapse

Constructor Details

#initialize(interval: 60, max_age: 600) ⇒ ObserverCleanup

Initialize observer cleanup

Parameters:

  • interval (Integer) (defaults to: 60)

    Cleanup interval in seconds (default: 60)

  • max_age (Integer) (defaults to: 600)

    Max observer age in seconds (default: 600)



22
23
24
25
26
27
28
29
# File 'lib/takagi/event_bus/observer_cleanup.rb', line 22

def initialize(interval: 60, max_age: 600)
  @interval = interval
  @max_age = max_age
  @running = false
  @thread = nil
  @mutex = Mutex.new
  @stats = { runs: 0, cleaned: 0, errors: 0 }
end

Instance Attribute Details

#intervalObject (readonly)

Returns the value of attribute interval.



17
18
19
# File 'lib/takagi/event_bus/observer_cleanup.rb', line 17

def interval
  @interval
end

#max_ageObject (readonly)

Returns the value of attribute max_age.



17
18
19
# File 'lib/takagi/event_bus/observer_cleanup.rb', line 17

def max_age
  @max_age
end

Instance Method Details

#cleanup_nowObject

Force a cleanup run (for testing)



71
72
73
# File 'lib/takagi/event_bus/observer_cleanup.rb', line 71

def cleanup_now
  cleanup_stale_observers
end

#running?Boolean

Check if cleanup is running

Returns:

  • (Boolean)


60
61
62
# File 'lib/takagi/event_bus/observer_cleanup.rb', line 60

def running?
  @mutex.synchronize { @running }
end

#startObject

Start the cleanup thread



32
33
34
35
36
37
38
39
40
41
42
# File 'lib/takagi/event_bus/observer_cleanup.rb', line 32

def start
  @mutex.synchronize do
    return if @running

    @running = true
    @thread = Thread.new { run_cleanup_loop }
    @thread.name = 'ObserverCleanup'
  end

  Takagi.logger.info "Observer cleanup started (interval: #{@interval}s, max_age: #{@max_age}s)"
end

#statsHash

Get cleanup statistics

Returns:

  • (Hash)

    Statistics



66
67
68
# File 'lib/takagi/event_bus/observer_cleanup.rb', line 66

def stats
  @mutex.synchronize { @stats.dup }
end

#stopObject

Stop the cleanup thread



45
46
47
48
49
50
51
52
53
54
55
56
# File 'lib/takagi/event_bus/observer_cleanup.rb', line 45

def stop
  @mutex.synchronize do
    return unless @running

    @running = false
    @thread&.kill
    @thread&.join(5) # Wait up to 5 seconds
    @thread = nil
  end

  Takagi.logger.info 'Observer cleanup stopped'
end