Class: Karafka::Swarm::Node

Inherits:
Object
Defined in:
lib/karafka/swarm/node.rb

Overview

Note:

Some of these APIs are for the parent process only.

Note:

Keep in mind that this class is used in both the forks and the supervisor, and it has a slightly different role in each. In the supervisor, it is used to get information about the child and to make certain requests to it. In the child, it is used to provide zombie fencing and to report liveness.

Represents a single forked process node in a swarm. Provides a simple API to control forks and check their status.

Constructor Details

#initialize(id, parent_pid) ⇒ Node

Returns a new instance of Node.

Parameters:

  • id (Integer)

    number of the fork. Used to make group client ids and other settings unique wherever we need a reference that distinguishes this fork from the others.

  • parent_pid (Integer)

    parent pid for zombie fencing



# File 'lib/karafka/swarm/node.rb', line 34

def initialize(id, parent_pid)
  @id = id
  @parent_pidfd = Pidfd.new(parent_pid)
end

Instance Attribute Details

#id ⇒ Integer (readonly)

Returns id of the node. Useful for client.group.id assignment.

Returns:

  • (Integer)

    id of the node. Useful for client.group.id assignment



# File 'lib/karafka/swarm/node.rb', line 25

def id
  @id
end

#pid ⇒ Integer (readonly)

Returns pid of the node.

Returns:

  • (Integer)

    pid of the node



# File 'lib/karafka/swarm/node.rb', line 28

def pid
  @pid
end

Instance Method Details

#alive? ⇒ Boolean

Note:

Parent API

Note:

Keep in mind that a process being alive does not mean it is healthy.

Returns true if the node is alive or false if it has died.

Returns:

  • (Boolean)

    true if the node is alive or false if it has died



# File 'lib/karafka/swarm/node.rb', line 116

def alive?
  @pidfd.alive?
end

#cleanup ⇒ Object

Removes the dead process from the process table.



# File 'lib/karafka/swarm/node.rb', line 151

def cleanup
  @pidfd.cleanup
end
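Removing a dead process from the process table is known as reaping: until its parent waits on it, an exited child stays behind as a zombie. The sketch below shows the idea with core Ruby's `Process.wait2`; it does not reproduce Karafka's `Pidfd#cleanup`, and the helper names `fork_exiting_child` and `reap` are hypothetical.

```ruby
# Minimal sketch: reap a finished child with Process.wait2.
# Core Ruby only; this is not Karafka's Pidfd#cleanup.

# Hypothetical helper: forks a child that exits immediately with the given code.
def fork_exiting_child(code)
  Process.fork do
    # exit! skips at_exit hooks inherited from the parent process
    exit!(code)
  end
end

# Hypothetical helper: blocks until the child has exited, removes it from
# the process table and returns its Process::Status.
def reap(pid)
  _reaped_pid, status = Process.wait2(pid)
  status
end

pid = fork_exiting_child(3)
status = reap(pid)
puts status.exitstatus # prints 3
```

Once a child has been reaped, waiting on it again raises `Errno::ECHILD`, because the kernel no longer tracks it.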

#healthy ⇒ Object

Note:

Child API

Indicates that this node is doing well by reporting status code 0 to the parent.



# File 'lib/karafka/swarm/node.rb', line 83

def healthy
  write('0')
end

#orphaned? ⇒ Boolean

Note:

Child API

Returns true if the node is orphaned (its parent process is no longer alive) or false otherwise. Used for orphan detection.

Returns:

  • (Boolean)

    true if the node is orphaned (its parent process is no longer alive) or false otherwise. Used for orphan detection.



# File 'lib/karafka/swarm/node.rb', line 122

def orphaned?
  !@parent_pidfd.alive?
end
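`#orphaned?` relies on a pidfd for the parent process. The sketch below approximates the same check with a plain signal-0 probe (`Process.kill(0, pid)`), which is a different technique: it is portable, but unlike a pidfd it can be fooled if the parent's PID is reused by an unrelated process, and it still reports an exited but unreaped (zombie) process as present. The `process_alive?` and `orphaned?` helpers here are hypothetical stand-ins, not Karafka's API.

```ruby
# Minimal sketch: signal 0 performs the existence and permission checks
# without delivering any signal. This is a stand-in for the pidfd-based
# check used by Node, not a reproduction of it.
def process_alive?(pid)
  Process.kill(0, pid)
  true
rescue Errno::ESRCH
  # No such process
  false
rescue Errno::EPERM
  # The process exists but belongs to another user
  true
end

# Hypothetical child-side check mirroring Node#orphaned?
def orphaned?(parent_pid)
  !process_alive?(parent_pid)
end

puts orphaned?(Process.pid) # prints false
```

A child would typically run such a check periodically and shut itself down once it returns true, so it does not keep consuming from Kafka after its supervisor is gone.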

#quiet ⇒ Object

Note:

Parent API

Sends SIGTSTP to the node.



# File 'lib/karafka/swarm/node.rb', line 134

def quiet
  signal('TSTP')
end

#signal(signal) ⇒ Object

Sends the provided signal to the node.

Parameters:

  • signal (String)

    name of the signal to send, for example 'TERM'


# File 'lib/karafka/swarm/node.rb', line 146

def signal(signal)
  @pidfd.signal(signal)
end

#start ⇒ Object

Note:

Parent API

Starts a new fork and:

- stores pid and parent reference
- makes sure reader pipe is closed
- sets up liveness listener
- recreates producer and web producer


# File 'lib/karafka/swarm/node.rb', line 45

def start
  @reader, @writer = IO.pipe

  # :nocov:
  @pid = ::Process.fork do
    # Close the old producer so it is not left to the GC.
    # Even though it was not opened in the parent, without an explicit close there could still be
    # an attempt to close it when it is finalized, which would keep it in memory.
    config.producer.close

    # Supervisor producer is closed, hence we need a new one here
    config.producer = ::WaterDrop::Producer.new do |p_config|
      p_config.kafka = Setup::AttributesMap.producer(kafka.dup)
      p_config.logger = config.logger
    end

    @pid = ::Process.pid
    @reader.close

    # Indicate we are alive right after start
    healthy

    swarm.node = self
    monitor.subscribe(liveness_listener)
    monitor.instrument('swarm.node.after_fork', caller: self)

    Server.run

    @writer.close
  end
  # :nocov:

  @writer.close
  @pidfd = Pidfd.new(@pid)
end
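The pipe handling in `#start` follows a common fork pattern: the parent keeps the reading end, the child keeps the writing end, and each side closes the end it does not use. If the parent kept its own copy of the writer open, it could never observe end-of-file on the reader. The sketch below reproduces only that layout under the assumption that the child reports one line such as `"0\n"`; producer recreation, `Server.run` and the rest of the Karafka setup are omitted, and `start_child` is a hypothetical name.

```ruby
# Minimal sketch of the pipe/fork layout used by Node#start.
def start_child
  reader, writer = IO.pipe

  pid = Process.fork do
    reader.close        # the child never reads liveness data
    writer.write("0\n") # report "healthy" right after start
    writer.close
    exit!(0)            # skip at_exit hooks inherited from the parent
  end

  writer.close          # the parent never writes liveness data
  [pid, reader]
end

pid, reader = start_child
Process.wait(pid)
# All writing ends are closed now, so read returns the data and then hits EOF
report = reader.read
reader.close
puts report.inspect # prints "0\n"
```

In the real method the child runs `Server.run` between reporting health and closing the writer, so the parent keeps reading reports for as long as the child is working.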

#status ⇒ Integer

Note:

Parent API

Note:

If several issues were reported, the one with the highest number is returned.

Returns one of the following status codes, depending on the data:

  • -1 if the node did not report anything new,

  • 0 if all is good,

  • a positive number if there was a problem (indicates the error code).

Returns:

  • (Integer)

    one of the following status codes, depending on the data:

    • -1 if the node did not report anything new,

    • 0 if all is good,

    • a positive number if there was a problem (indicates the error code).



# File 'lib/karafka/swarm/node.rb', line 104

def status
  result = read

  return -1 if result.nil?
  return -1 if result == false

  result.split("\n").map(&:to_i).max
end
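The aggregation rule above can be sketched as follows. The sketch assumes that the child writes one numeric code per line and that the private `read` returns `false` when nothing new is buffered; `read_available` and `status_from` are hypothetical helpers, not Karafka's private API.

```ruby
# Minimal sketch of the status aggregation performed by Node#status.

# Hypothetical non-blocking read: returns whatever is buffered in the pipe,
# or false when nothing is available or the writing end has been closed.
def read_available(reader)
  reader.read_nonblock(1024)
rescue IO::WaitReadable, EOFError
  false
end

# Mirrors the logic of Node#status for a raw read result.
def status_from(result)
  return -1 if result.nil? || result == false

  # Several reports may have accumulated; the highest code wins
  result.split("\n").map(&:to_i).max
end

reader, writer = IO.pipe
puts status_from(read_available(reader)) # prints -1 (nothing written yet)
writer.write("0\n3\n1\n")
puts status_from(read_available(reader)) # prints 3
```

Taking the maximum means a single failure report is never masked by healthy reports that arrived in the same read.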

#stop ⇒ Object

Note:

Parent API

Sends SIGTERM to the node.



# File 'lib/karafka/swarm/node.rb', line 128

def stop
  signal('TERM')
end

#terminate ⇒ Object

Note:

Parent API

Terminates the node by sending SIGKILL.



# File 'lib/karafka/swarm/node.rb', line 140

def terminate
  signal('KILL')
end
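The parent-side control methods differ only in the signal they send: `#quiet` sends TSTP, `#stop` sends TERM and `#terminate` sends KILL. The sketch below demonstrates that mapping with plain `Process.kill` instead of a pidfd. The child's TERM handler is hypothetical and merely stands in for Karafka's graceful shutdown; `spawn_sleeper` and `send_action` are illustrative names.

```ruby
# Minimal sketch: action-to-signal mapping taken from Node#quiet, #stop and
# #terminate, delivered with Process.kill rather than a pidfd.
NODE_SIGNALS = { quiet: 'TSTP', stop: 'TERM', terminate: 'KILL' }.freeze

# Forks a child that exits cleanly on TERM and otherwise sleeps forever.
# It reports readiness over a pipe so no signal arrives before the trap exists.
def spawn_sleeper
  reader, writer = IO.pipe
  pid = Process.fork do
    reader.close
    Signal.trap('TERM') { exit!(0) } # hypothetical graceful shutdown
    writer.write('ready')
    writer.close
    sleep
  end
  writer.close
  reader.read # returns once the child has closed its writing end
  reader.close
  pid
end

def send_action(pid, action)
  Process.kill(NODE_SIGNALS.fetch(action), pid)
end

graceful = spawn_sleeper
send_action(graceful, :stop)
_, stop_status = Process.wait2(graceful)
puts stop_status.exitstatus # prints 0 (the TERM handler ran)

forced = spawn_sleeper
send_action(forced, :terminate)
_, kill_status = Process.wait2(forced)
puts kill_status.termsig == Signal.list.fetch('KILL') # prints true
```

TERM can be trapped, which is what allows a graceful shutdown; KILL cannot be trapped, so `#terminate` is the last resort when a node does not stop in time.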

#unhealthy(reason_code = '1') ⇒ Object

Note:

Child API

Note:

The reason code is converted to a string to normalize the API.

Indicates that this node has failed.

Parameters:

  • reason_code (Integer, String) (defaults to: '1')

    numeric code used to indicate that the node is not healthy. Anything greater than 0 is considered unhealthy. Useful if we want complex health checking with reporting.



# File 'lib/karafka/swarm/node.rb', line 93

def unhealthy(reason_code = '1')
  write(reason_code.to_s)
end
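On the child side, `#healthy` and `#unhealthy` only differ in the code they write. The sketch below assumes each report is written as a single line of text, which is what the newline splitting in `#status` suggests; Karafka's private `write` helper is not shown here and may differ, and `LivenessReporter` is a hypothetical class.

```ruby
# Minimal sketch of the child-side reporting API.
class LivenessReporter
  def initialize(writer)
    @writer = writer
  end

  # Mirrors Node#healthy: status code 0 means "all good".
  def healthy
    write('0')
  end

  # Mirrors Node#unhealthy: any code above 0 signals a problem. Converting
  # with to_s lets callers pass either 2 or '2'.
  def unhealthy(reason_code = '1')
    write(reason_code.to_s)
  end

  private

  # Assumed wire format: one code per line
  def write(content)
    @writer.write("#{content}\n")
  end
end

reader, writer = IO.pipe
reporter = LivenessReporter.new(writer)
reporter.healthy
reporter.unhealthy(2)
reporter.unhealthy('2')
writer.close
reports = reader.read
puts reports.inspect # prints "0\n2\n2\n"
```

Because integers and strings produce identical bytes, the parent only ever has to parse one format.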