Class: Karafka::Swarm::Node

Inherits:
Object
Defined in:
lib/karafka/swarm/node.rb

Overview

Note:

Some of these APIs are for the parent process only.

Note:

Keep in mind that this class is used in both the forks and the supervisor, and has a slightly different role in each. In the supervisor, it is used to get information about the child and to make certain requests to it. In the child, it is used to provide zombie fencing and to report liveness.

Represents a single forked process node in a swarm. Provides a simple API to control forks and check their status.

Constructor Details

#initialize(id, parent_pid) ⇒ Node

Returns a new instance of Node.

Parameters:

  • id (Integer)

    number of the fork. Used to set up uniqueness for group client ids and anywhere else we need a unique reference to this fork relative to the rest of them.

  • parent_pid (Integer)

    parent pid for zombie fencing



# File 'lib/karafka/swarm/node.rb', line 34

def initialize(id, parent_pid)
  @id = id
  @parent_pid = parent_pid
  @mutex = Mutex.new
  @alive = nil
end

Instance Attribute Details

#id ⇒ Integer (readonly)

Returns id of the node. Useful for client.group.id assignment.

Returns:

  • (Integer)

    id of the node. Useful for client.group.id assignment



# File 'lib/karafka/swarm/node.rb', line 25

def id
  @id
end

#pid ⇒ Integer (readonly)

Returns pid of the node.

Returns:

  • (Integer)

    pid of the node



# File 'lib/karafka/swarm/node.rb', line 28

def pid
  @pid
end

Instance Method Details

#alive? ⇒ Boolean

Note:

Parent API

Note:

Keep in mind that a process being alive does not mean it is healthy.

Returns true if the node is alive or false if it has died.

Returns:

  • (Boolean)

    true if the node is alive or false if it has died



# File 'lib/karafka/swarm/node.rb', line 124

def alive?
  # Don't try to waitpid on ourselves - just check if process exists
  return true if @pid == ::Process.pid

  @mutex.synchronize do
    # Return cached result if we've already determined the process is dead
    return false if @alive == false

    begin
      # Try to reap the process without blocking. If it returns the pid,
      # the process has exited (zombie). If it returns nil, still running.
      result = ::Process.waitpid(@pid, ::Process::WNOHANG)

      if result
        # Process has exited and we've reaped it
        @alive = false
        false
      else
        # Process is still running
        true
      end
    rescue Errno::ECHILD
      # Process doesn't exist or already reaped
      @alive = false
      false
    rescue Errno::ESRCH
      # Process doesn't exist
      @alive = false
      false
    end
  end
end
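
The non-blocking reap that `#alive?` relies on can be tried in isolation. The following is a minimal sketch using only the Ruby standard library; `child_running?` is a hypothetical helper written for this illustration and is not part of Karafka. It also shows why caching the dead state is useful: once a child has been reaped, a second `waitpid` on the same pid raises `Errno::ECHILD`.

```ruby
# Hypothetical helper (not part of Karafka): mirrors the non-blocking check.
def child_running?(pid)
  # With WNOHANG, waitpid returns nil while the child is still running
  # and the pid once the child has exited and been reaped.
  ::Process.waitpid(pid, ::Process::WNOHANG).nil?
rescue Errno::ECHILD
  # The pid is not our child or it has already been reaped.
  false
end

reader, writer = IO.pipe

pid = fork do
  writer.close
  # Block until the parent closes its write end, then exit.
  reader.read
  exit!(0)
end

reader.close

# The child is blocked on the pipe, so it must still be running.
running_before = child_running?(pid)

# Closing the write end lets the child finish.
writer.close
# Blocking wait, used here only to make the demo deterministic.
::Process.waitpid(pid)

# The child was reaped above, so waitpid now raises ECHILD internally.
running_after = child_running?(pid)
```

In this sketch `running_before` is `true` and `running_after` is `false`.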

#cleanup ⇒ Boolean

Removes the dead process from the process table by reaping it.

Returns:

  • (Boolean)

    true if process was reaped, false if still running or already reaped



# File 'lib/karafka/swarm/node.rb', line 194

def cleanup
  @mutex.synchronize do
    # If we've already marked it as dead (reaped in alive?), nothing to do
    return false if @alive == false

    begin
      # WNOHANG means don't block if process hasn't exited yet
      result = ::Process.waitpid(@pid, ::Process::WNOHANG)

      if result
        # Process exited and was reaped
        @alive = false
        true
      else
        # Process is still running
        false
      end
    rescue Errno::ECHILD
      # Process already reaped or doesn't exist, which is fine
      @alive = false
      false
    end
  end
end

#healthy ⇒ Object

Note:

Child API

Indicates that this node is doing well



# File 'lib/karafka/swarm/node.rb', line 91

def healthy
  write("0")
end

#orphaned? ⇒ Boolean

Note:

Child API

Returns true if the node is orphaned or false otherwise. Used for orphan detection.

Returns:

  • (Boolean)

    true if the node is orphaned or false otherwise. Used for orphan detection.



# File 'lib/karafka/swarm/node.rb', line 159

def orphaned?
  ::Process.ppid != @parent_pid
end
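
The parent pid comparison can be illustrated outside of Karafka. This is a minimal sketch using only the standard library; the `orphaned?` helper below is a hypothetical stand-alone copy of the check that takes the expected parent pid as an argument. The child reports two results through a pipe: one against its real parent and one against a pid that is deliberately not its parent.

```ruby
# Hypothetical stand-alone version of the check (not the Karafka method).
def orphaned?(expected_parent_pid)
  ::Process.ppid != expected_parent_pid
end

parent_pid = ::Process.pid
reader, writer = IO.pipe

pid = fork do
  reader.close
  # First check uses the real parent pid, the second one a pid that
  # cannot be our parent, which simulates having been re-parented.
  writer.write("#{orphaned?(parent_pid)},#{orphaned?(parent_pid + 1)}")
  writer.close
  exit!(0)
end

writer.close
# read returns once the child has closed its write end
report = reader.read
reader.close
::Process.waitpid(pid)
```

Here `report` is `"false,true"`: the child is not orphaned while its recorded parent is still its actual parent.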

#quiet ⇒ Object

Note:

Parent API

Sends SIGTSTP to the node



# File 'lib/karafka/swarm/node.rb', line 171

def quiet
  signal("TSTP")
end

#signal(signal) ⇒ Boolean

Sends provided signal to the node

Parameters:

  • signal (String)

Returns:

  • (Boolean)

    true if signal was sent, false if process doesn’t exist



# File 'lib/karafka/swarm/node.rb', line 184

def signal(signal)
  ::Process.kill(signal, @pid)
  true
rescue Errno::ESRCH
  # Process doesn't exist
  false
end
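
The `true`/`false` contract of `#signal` can be observed with a throwaway child process. This is a minimal sketch using only the standard library; `send_signal` is a hypothetical stand-alone copy of the method that takes the pid explicitly.

```ruby
# Hypothetical stand-alone version (not the Karafka method).
def send_signal(pid, name)
  ::Process.kill(name, pid)
  true
rescue Errno::ESRCH
  # No process with this pid exists
  false
end

# A child that does nothing until it is signalled.
pid = fork do
  sleep
end

# The child exists, so the signal is delivered.
sent_to_live = send_signal(pid, "TERM")

# Reap the child; after this the pid no longer refers to a process.
::Process.waitpid(pid)

sent_to_reaped = send_signal(pid, "TERM")
```

In this sketch `sent_to_live` is `true` and `sent_to_reaped` is `false`.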

#start ⇒ Object

Note:

Parent API

Starts a new fork and:

- stores pid and parent reference
- makes sure reader pipe is closed
- sets up liveness listener
- recreates producer and web producer


# File 'lib/karafka/swarm/node.rb', line 47

def start
  @reader, @writer = IO.pipe
  # Reset alive status when starting/restarting a node
  # nil means unknown status - will check with waitpid
  @mutex.synchronize { @alive = nil }

  # :nocov:
  @pid = ::Process.fork do
    # Close the old producer so it is not a subject to GC
    # While it was not opened in the parent, without explicit closing, there still could be
    # an attempt to close it when finalized, meaning it would be kept in memory.
    config.producer.close

    old_producer = config.producer
    config.producer = ProducerReplacer.new.call(old_producer, kafka, config.logger)

    @pid = ::Process.pid
    @reader.close

    # Certain features need to be reconfigured / reinitialized after fork in Pro
    Pro::Loader.post_fork(config, old_producer) if Karafka.pro?

    # Indicate we are alive right after start
    healthy

    swarm.node = self
    monitor.subscribe(liveness_listener)
    monitor.instrument("swarm.node.after_fork", caller: self)

    Karafka::Process.tags.add(:execution_mode, "mode:swarm")
    Karafka::Process.tags.add(:swarm_nodeid, "node:#{@id}")

    Server.execution_mode.swarm!
    Server.run

    @writer.close
  end
  # :nocov:

  @writer.close
end
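
The pipe ownership used by `#start` (the child closes the reader, the parent closes the writer) can be shown without any Karafka components. This is a minimal sketch using only the standard library; the message format is made up for the illustration.

```ruby
reader, writer = IO.pipe

pid = ::Process.fork do
  # The child only writes, so it closes its copy of the read end.
  reader.close
  writer.write("child #{::Process.pid} started\n")
  writer.close
  exit!(0)
end

# The parent only reads, so it closes its copy of the write end.
# Without this, the read below would never see end-of-file.
writer.close

# Returns once the child has closed its write end.
message = reader.read
reader.close
::Process.waitpid(pid)
```

The pid returned by `fork` in the parent matches `::Process.pid` inside the child, so `message` equals `"child #{pid} started\n"`.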

#status ⇒ Integer

Note:

Parent API

Note:

If several issues were reported, it will pick the one with the highest number.

Returns a status code depending on the reported data:

  • -1 if the node did not report anything new

  • 0 if all is good

  • a positive number if there was a problem (indicates the error code)

Returns:

  • (Integer)

    a status code depending on the reported data:

    • -1 if the node did not report anything new

    • 0 if all is good

    • a positive number if there was a problem (indicates the error code)



# File 'lib/karafka/swarm/node.rb', line 112

def status
  result = read

  return -1 if result.nil?
  return -1 if result == false

  result.split("\n").map(&:to_i).max
end
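
The reduction step can be checked on its own. This is a minimal sketch; `reduce_status` is a hypothetical stand-alone function, and the `payload` argument stands in for whatever the private `read` helper returns (its implementation is not shown on this page). It assumes reports are newline-separated, as the `split("\n")` call suggests.

```ruby
# Hypothetical stand-alone version of the reduction (not the Karafka method).
def reduce_status(payload)
  # nil or false is treated as "nothing new was reported"
  return -1 if payload.nil? || payload == false

  # Several reports may be waiting; the highest code wins.
  payload.split("\n").map(&:to_i).max
end

nothing_new = reduce_status(nil)
all_good    = reduce_status("0\n")
worst_code  = reduce_status("0\n3\n1\n")
```

Here `nothing_new` is `-1`, `all_good` is `0`, and `worst_code` is `3`.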

#stop ⇒ Object

Note:

Parent API

Sends SIGTERM to the node



# File 'lib/karafka/swarm/node.rb', line 165

def stop
  signal("TERM")
end

#terminate ⇒ Object

Note:

Parent API

Terminates the node by sending SIGKILL



# File 'lib/karafka/swarm/node.rb', line 177

def terminate
  signal("KILL")
end

#unhealthy(reason_code = "1") ⇒ Object

Note:

Child API

Note:

We convert this to a string to normalize the API

Indicates that this node has failed

Parameters:

  • reason_code (Integer, String) (defaults to: "1")

    numeric code we want to use to indicate that we are not healthy. Anything bigger than 0 will be considered not healthy. Useful if we want to have complex health-checking with reporting.



# File 'lib/karafka/swarm/node.rb', line 101

def unhealthy(reason_code = "1")
  write(reason_code.to_s)
end
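
The effect of the `to_s` normalization can be seen with a plain pipe. This is a minimal sketch; the `report` lambda is a hypothetical stand-in for `#unhealthy`, and the newline it appends is an assumption about the private `write` helper, whose implementation is not shown on this page.

```ruby
reader, writer = IO.pipe

# Hypothetical reporter: accepts Integer or String codes and always
# writes a string. The trailing newline is an assumption.
report = ->(reason_code = "1") { writer.write(reason_code.to_s + "\n") }

report.call       # default code "1"
report.call(3)    # Integer code
report.call("2")  # String code
writer.close

# All three reports arrive as strings regardless of the input type.
reported = reader.read.split("\n")
reader.close

highest = reported.map(&:to_i).max
```

Here `reported` is `["1", "3", "2"]` and `highest` is `3`, which is the value a reader that takes the maximum code would act on.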