Class: Pgbus::Process::Lifecycle
- Inherits:
-
Object
- Object
- Pgbus::Process::Lifecycle
- Defined in:
- lib/pgbus/process/lifecycle.rb
Overview
Thread-safe worker lifecycle state machine inspired by LavinMQ’s QueueState.
States:
:starting → initial state, setting up
:running → actively processing messages
:paused → temporarily stopped (manual or circuit breaker)
:draining → finishing in-flight work before stopping
:stopped → terminal state
Transitions:
starting → running
running → paused | draining | stopped
paused → running | draining | stopped
draining → stopped
Constant Summary collapse
- STATES =
%i[starting running paused draining stopped].freeze
- TRANSITIONS =
{ starting: %i[running stopped], running: %i[paused draining stopped], paused: %i[running draining stopped], draining: %i[stopped], stopped: [] }.freeze
Instance Attribute Summary collapse
-
#state ⇒ Object
readonly
Returns the value of attribute state.
Instance Method Summary collapse
- #active? ⇒ Boolean
- #can_process? ⇒ Boolean
- #draining? ⇒ Boolean
-
#initialize ⇒ Lifecycle
constructor
A new instance of Lifecycle.
- #on(event, &block) ⇒ Object
- #paused? ⇒ Boolean
- #running? ⇒ Boolean
- #starting? ⇒ Boolean
- #stopped? ⇒ Boolean
- #terminal? ⇒ Boolean
- #transition_to(new_state) ⇒ Object
- #transition_to!(new_state) ⇒ Object
Constructor Details
#initialize ⇒ Lifecycle
Returns a new instance of Lifecycle.
34 35 36 37 38 |
# File 'lib/pgbus/process/lifecycle.rb', line 34 def initialize @state = :starting @mutex = Mutex.new @callbacks = Hash.new { |h, k| h[k] = [] } end |
Instance Attribute Details
#state ⇒ Object (readonly)
Returns the value of attribute state.
32 33 34 |
# File 'lib/pgbus/process/lifecycle.rb', line 32 def state @state end |
Instance Method Details
#active? ⇒ Boolean
80 81 82 |
# File 'lib/pgbus/process/lifecycle.rb', line 80 def active? running? || paused? end |
#can_process? ⇒ Boolean
84 85 86 |
# File 'lib/pgbus/process/lifecycle.rb', line 84 def can_process? running? end |
#draining? ⇒ Boolean
72 73 74 |
# File 'lib/pgbus/process/lifecycle.rb', line 72 def draining? @state == :draining end |
#on(event, &block) ⇒ Object
56 57 58 |
# File 'lib/pgbus/process/lifecycle.rb', line 56 def on(event, &block) @callbacks[event] << block end |
#paused? ⇒ Boolean
68 69 70 |
# File 'lib/pgbus/process/lifecycle.rb', line 68 def paused? @state == :paused end |
#running? ⇒ Boolean
64 65 66 |
# File 'lib/pgbus/process/lifecycle.rb', line 64 def running? @state == :running end |
#starting? ⇒ Boolean
60 61 62 |
# File 'lib/pgbus/process/lifecycle.rb', line 60 def starting? @state == :starting end |
#stopped? ⇒ Boolean
76 77 78 |
# File 'lib/pgbus/process/lifecycle.rb', line 76 def stopped? @state == :stopped end |
#terminal? ⇒ Boolean
88 89 90 |
# File 'lib/pgbus/process/lifecycle.rb', line 88 def terminal? stopped? end |
#transition_to(new_state) ⇒ Object
50 51 52 53 54 |
# File 'lib/pgbus/process/lifecycle.rb', line 50 def transition_to(new_state) transition_to!(new_state) rescue InvalidTransition false end |
#transition_to!(new_state) ⇒ Object
40 41 42 43 44 45 46 47 48 |
# File 'lib/pgbus/process/lifecycle.rb', line 40 def transition_to!(new_state) @mutex.synchronize do validate_transition!(new_state) old_state = @state @state = new_state fire_callbacks(old_state, new_state) new_state end end |