Class: Karafka::Swarm::Node
- Inherits:
 - 
      Object
      
        
- Object
 - Karafka::Swarm::Node
 
 
- Defined in:
 - lib/karafka/swarm/node.rb
 
Overview
Some of this APIs are for parent process only
Keep in mind this can be used in both forks and supervisor and has a slightly different role in each. In case of the supervisor it is used to get information about the child and make certain requests to it. In case of child, it is used to provide zombie-fencing and report liveness
Represents a single forked process node in a swarm Provides simple API to control forks and check their status
Instance Attribute Summary collapse
- 
  
    
      #id  ⇒ Integer 
    
    
  
  
  
  
    
      readonly
    
    
  
  
  
  
  
  
    
Id of the node.
 - 
  
    
      #pid  ⇒ Integer 
    
    
  
  
  
  
    
      readonly
    
    
  
  
  
  
  
  
    
Pid of the node.
 
Instance Method Summary collapse
- 
  
    
      #alive?  ⇒ Boolean 
    
    
  
  
  
  
  
  
  
  
  
    
True if node is alive or false if died.
 - 
  
    
      #cleanup  ⇒ Object 
    
    
  
  
  
  
  
  
  
  
  
    
Removes the dead process from the processes table.
 - 
  
    
      #healthy  ⇒ Object 
    
    
  
  
  
  
  
  
  
  
  
    
Indicates that this node is doing well.
 - 
  
    
      #initialize(id, parent_pid)  ⇒ Node 
    
    
  
  
  
    constructor
  
  
  
  
  
  
  
    
A new instance of Node.
 - 
  
    
      #orphaned?  ⇒ Boolean 
    
    
  
  
  
  
  
  
  
  
  
    
True if node is orphaned or false otherwise.
 - 
  
    
      #quiet  ⇒ Object 
    
    
  
  
  
  
  
  
  
  
  
    
Sends sigtstp to the node.
 - 
  
    
      #signal(signal)  ⇒ Object 
    
    
  
  
  
  
  
  
  
  
  
    
Sends provided signal to the node.
 - 
  
    
      #start  ⇒ Object 
    
    
  
  
  
  
  
  
  
  
  
    
Starts a new fork and: - stores pid and parent reference - makes sure reader pipe is closed - sets up liveness listener - recreates producer and web producer.
 - 
  
    
      #status  ⇒ Integer 
    
    
  
  
  
  
  
  
  
  
  
    
This returns following status code depending on the data: - -1 if node did not report anything new - 0 if all good, - positive number if there was a problem (indicates error code).
 - 
  
    
      #stop  ⇒ Object 
    
    
  
  
  
  
  
  
  
  
  
    
Sends sigterm to the node.
 - 
  
    
      #terminate  ⇒ Object 
    
    
  
  
  
  
  
  
  
  
  
    
Terminates node.
 - 
  
    
      #unhealthy(reason_code = '1')  ⇒ Object 
    
    
  
  
  
  
  
  
  
  
  
    
Indicates, that this node has failed.
 
Constructor Details
Instance Attribute Details
#id ⇒ Integer (readonly)
Returns id of the node. Useful for client.group.id assignment.
      25 26 27  | 
    
      # File 'lib/karafka/swarm/node.rb', line 25 def id @id end  | 
  
#pid ⇒ Integer (readonly)
Returns pid of the node.
      28 29 30  | 
    
      # File 'lib/karafka/swarm/node.rb', line 28 def pid @pid end  | 
  
Instance Method Details
#alive? ⇒ Boolean
Parent API
Keep in mind that the fact that process is alive does not mean it is healthy
Returns true if node is alive or false if died.
      118 119 120  | 
    
      # File 'lib/karafka/swarm/node.rb', line 118 def alive? @pidfd.alive? end  | 
  
#cleanup ⇒ Object
Removes the dead process from the processes table
      153 154 155  | 
    
      # File 'lib/karafka/swarm/node.rb', line 153 def cleanup @pidfd.cleanup end  | 
  
#healthy ⇒ Object
Child API
Indicates that this node is doing well
      85 86 87  | 
    
      # File 'lib/karafka/swarm/node.rb', line 85 def healthy write('0') end  | 
  
#orphaned? ⇒ Boolean
Child API
Returns true if node is orphaned or false otherwise. Used for orphans detection.
      124 125 126  | 
    
      # File 'lib/karafka/swarm/node.rb', line 124 def orphaned? !@parent_pidfd.alive? end  | 
  
#quiet ⇒ Object
Parent API
Sends sigtstp to the node
      136 137 138  | 
    
      # File 'lib/karafka/swarm/node.rb', line 136 def quiet signal('TSTP') end  | 
  
#signal(signal) ⇒ Object
Sends provided signal to the node
      148 149 150  | 
    
      # File 'lib/karafka/swarm/node.rb', line 148 def signal(signal) @pidfd.signal(signal) end  | 
  
#start ⇒ Object
Parent API
Starts a new fork and:
- stores pid and parent reference
- makes sure reader pipe is closed
- sets up liveness listener
- recreates producer and web producer
  
      45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81  | 
    
      # File 'lib/karafka/swarm/node.rb', line 45 def start @reader, @writer = IO.pipe # :nocov: @pid = ::Process.fork do # Close the old producer so it is not a subject to GC # While it was not opened in the parent, without explicit closing, there still could be # an attempt to close it when finalized, meaning it would be kept in memory. config.producer.close # Supervisor producer is closed, hence we need a new one here config.producer = ::WaterDrop::Producer.new do |p_config| p_config.kafka = Setup::AttributesMap.producer(kafka.dup) p_config.logger = config.logger end @pid = ::Process.pid @reader.close # Indicate we are alive right after start healthy swarm.node = self monitor.subscribe(liveness_listener) monitor.instrument('swarm.node.after_fork', caller: self) Karafka::Process..add(:execution_mode, 'mode:swarm') Server.execution_mode = :swarm Server.run @writer.close end # :nocov: @writer.close @pidfd = Pidfd.new(@pid) end  | 
  
#status ⇒ Integer
Parent API
If there were few issues reported, it will pick the one with highest number
Returns This returns following status code depending on the data:
- 
-1 if node did not report anything new
 - 
0 if all good,
 - 
positive number if there was a problem (indicates error code).
 
      106 107 108 109 110 111 112 113  | 
    
      # File 'lib/karafka/swarm/node.rb', line 106 def status result = read return -1 if result.nil? return -1 if result == false result.split("\n").map(&:to_i).max end  | 
  
#stop ⇒ Object
Parent API
Sends sigterm to the node
      130 131 132  | 
    
      # File 'lib/karafka/swarm/node.rb', line 130 def stop signal('TERM') end  | 
  
#terminate ⇒ Object
Parent API
Terminates node
      142 143 144  | 
    
      # File 'lib/karafka/swarm/node.rb', line 142 def terminate signal('KILL') end  | 
  
#unhealthy(reason_code = '1') ⇒ Object
Child API
We convert this to string to normalize the API
Indicates, that this node has failed
      95 96 97  | 
    
      # File 'lib/karafka/swarm/node.rb', line 95 def unhealthy(reason_code = '1') write(reason_code.to_s) end  |