Class: Pgbus::Process::Lifecycle

Inherits:
Object
  • Object
show all
Defined in:
lib/pgbus/process/lifecycle.rb

Overview

Thread-safe worker lifecycle state machine inspired by LavinMQ’s QueueState.

States:

:starting  → initial state, setting up
:running   → actively processing messages
:paused    → temporarily stopped (manual or circuit breaker)
:draining  → finishing in-flight work before stopping
:stopped   → terminal state

Transitions:

starting  → running | stopped
running   → paused | draining | stopped
paused    → running | draining | stopped
draining  → stopped

Constant Summary collapse

# Every state a lifecycle can be in.
STATES =
%i[starting running paused draining stopped].freeze
# Allowed transitions, keyed by current state; each value lists the states
# reachable from it. The per-state arrays are frozen as well as the hash:
# freezing only the outer hash would still let a caller mutate
# TRANSITIONS[:some_state] in place and silently alter the state machine.
TRANSITIONS =
{
  starting: %i[running stopped].freeze,
  running: %i[paused draining stopped].freeze,
  paused: %i[running draining stopped].freeze,
  draining: %i[stopped].freeze,
  stopped: [].freeze
}.freeze

Instance Attribute Summary collapse

Instance Method Summary collapse

Constructor Details

#initialize ⇒ Lifecycle

Returns a new instance of Lifecycle.



34
35
36
37
38
# File 'lib/pgbus/process/lifecycle.rb', line 34

# Builds a lifecycle that begins in the :starting state with no callbacks
# registered.
def initialize
  # Lock taken by transition_to! around each state change.
  @mutex = Mutex.new
  @state = :starting
  # Block-form default: every event key gets its own fresh array on first
  # access instead of sharing one default object.
  @callbacks = Hash.new { |registry, event| registry[event] = [] }
end

Instance Attribute Details

#state ⇒ Object (readonly)

Returns the value of attribute state.



32
33
34
# File 'lib/pgbus/process/lifecycle.rb', line 32

# Current lifecycle state, read directly from @state without taking @mutex.
#
# @return [Symbol] :starting after construction, afterwards whatever value
#   the last successful transition_to! stored (presumably one of STATES —
#   the check lives in validate_transition!, which is not shown here)
def state
  @state
end

Instance Method Details

#active? ⇒ Boolean

Returns:

  • (Boolean)


80
81
82
# File 'lib/pgbus/process/lifecycle.rb', line 80

# Whether the lifecycle is in one of its two "live" states.
#
# @return [Boolean] true when the state is :running or :paused
def active?
  # Take a single snapshot of @state. Checking running? and then paused?
  # reads the state twice, and a concurrent paused -> running transition
  # landing between the two reads would make both checks fail even though
  # the lifecycle was active the whole time.
  current = @state
  current == :running || current == :paused
end

#can_process? ⇒ Boolean

Returns:

  • (Boolean)


84
85
86
# File 'lib/pgbus/process/lifecycle.rb', line 84

# Whether work may be processed right now. Delegates to running?, so this
# is true only in the :running state (not while :paused or :draining).
#
# @return [Boolean]
def can_process?
  running?
end

#draining? ⇒ Boolean

Returns:

  • (Boolean)


72
73
74
# File 'lib/pgbus/process/lifecycle.rb', line 72

# @return [Boolean] true when the current state is :draining
def draining?
  # Symbols are interned, so an identity check matches the same cases as ==.
  @state.equal?(:draining)
end

#on(event, &block) ⇒ Object



56
57
58
# File 'lib/pgbus/process/lifecycle.rb', line 56

# Registers a callback under +event+.
#
# @param event [Object] key the callback is stored under (presumably a state
#   name — how keys are looked up is decided by fire_callbacks, which is not
#   shown here)
# @yield the callback to store; its arguments are whatever fire_callbacks
#   passes when it invokes it
# @return [Array<Proc>] the list of callbacks currently registered for +event+
# @raise [ArgumentError] if no block is given
def on(event, &block)
  # Without this guard a block-less call would append nil to the list,
  # deferring the failure from registration time to whenever the list is
  # next walked.
  raise ArgumentError, "block required" unless block

  @callbacks[event] << block
end

#paused? ⇒ Boolean

Returns:

  • (Boolean)


68
69
70
# File 'lib/pgbus/process/lifecycle.rb', line 68

# @return [Boolean] true when the current state is :paused
def paused?
  # Symbols are interned, so an identity check matches the same cases as ==.
  @state.equal?(:paused)
end

#running? ⇒ Boolean

Returns:

  • (Boolean)


64
65
66
# File 'lib/pgbus/process/lifecycle.rb', line 64

# @return [Boolean] true when the current state is :running
def running?
  # Symbols are interned, so an identity check matches the same cases as ==.
  @state.equal?(:running)
end

#starting? ⇒ Boolean

Returns:

  • (Boolean)


60
61
62
# File 'lib/pgbus/process/lifecycle.rb', line 60

# @return [Boolean] true when the current state is :starting
def starting?
  # Symbols are interned, so an identity check matches the same cases as ==.
  @state.equal?(:starting)
end

#stopped? ⇒ Boolean

Returns:

  • (Boolean)


76
77
78
# File 'lib/pgbus/process/lifecycle.rb', line 76

# @return [Boolean] true when the current state is :stopped
def stopped?
  # Symbols are interned, so an identity check matches the same cases as ==.
  @state.equal?(:stopped)
end

#terminal? ⇒ Boolean

Returns:

  • (Boolean)


88
89
90
# File 'lib/pgbus/process/lifecycle.rb', line 88

# Whether the lifecycle has reached its end state. Delegates to stopped?;
# :stopped is the only state whose TRANSITIONS entry is empty.
#
# @return [Boolean]
def terminal?
  stopped?
end

#transition_to(new_state) ⇒ Object



50
51
52
53
54
# File 'lib/pgbus/process/lifecycle.rb', line 50

# Non-raising variant of transition_to!.
#
# @param new_state [Symbol] the state to move to
# @return [Object, false] whatever transition_to! returns on success, or
#   false when it raises InvalidTransition; any other exception propagates
def transition_to(new_state)
  outcome = transition_to!(new_state)
rescue InvalidTransition
  false
else
  outcome
end

#transition_to!(new_state) ⇒ Object



40
41
42
43
44
45
46
47
48
# File 'lib/pgbus/process/lifecycle.rb', line 40

# Moves the lifecycle to +new_state+ while holding @mutex, then fires the
# callbacks with the old and new state.
#
# @param new_state [Symbol] the state to move to
# @return [Symbol] +new_state+
# @raise whatever validate_transition! raises for a disallowed move
#   (presumably InvalidTransition, which transition_to rescues — the
#   validator is not shown here); @state is left untouched in that case
#
# NOTE(review): fire_callbacks runs while @mutex is still held, and Ruby's
# Mutex is not reentrant — a callback that calls transition_to! on the same
# lifecycle from the same thread would hit a recursive-locking ThreadError.
# Confirm callbacks never re-enter.
def transition_to!(new_state)
  @mutex.synchronize do
    validate_transition!(new_state)
    previous = @state
    @state = new_state
    # tap runs the callbacks, then hands new_state back as the block result.
    new_state.tap { fire_callbacks(previous, new_state) }
  end
end