Class: Kubernetes::Watch

Inherits:
Object
  • Object
show all
Defined in:
lib/kubernetes/watch.rb

Overview

The Watch class provides the ability to watch a specific resource for updates.

Constant Summary collapse

MAX_RECONNECT_ATTEMPTS =
3
RECONNECT_BACKOFF_SECONDS =
1.0

Instance Method Summary collapse

Constructor Details

#initialize(client) ⇒ Watch

Returns a new instance of Watch.



26
27
28
# File 'lib/kubernetes/watch.rb', line 26

def initialize(client)
  @client = client
end

Instance Method Details

#connect(path, resource_version = nil, &_block) ⇒ Object



39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
# File 'lib/kubernetes/watch.rb', line 39

def connect(path, resource_version = nil, &_block)
  current_resource_version = resource_version
  reconnect_attempts = 0

  loop do
    reconnect_requested = false
    opts = { auth_names: ['BearerToken'] }
    url = make_url(path, current_resource_version)
    request = @client.build_request('GET', url, opts)
    last = ''

    process_event = lambda do |part|
      return if part.nil? || part.strip.empty?

      event = parse_event(part)
      return unless event

      if watch_reset_event?(event)
        current_resource_version = nil
        reconnect_requested = true
        next
      end

      current_resource_version = extract_resource_version(event) || current_resource_version
      yield event
    end

    request.on_body do |chunk|
      last, pieces = split_lines(last, chunk)
      pieces.each { |part| process_event.call(part) }
    end

    request.on_complete do |_response|
      process_event.call(last)
      last = ''
    end

    response = request.run
    reconnect_reason = reconnect_reason(response, reconnect_requested)
    return response unless reconnect_reason
    raise_terminal_watch_error(response, reconnect_reason) if reconnect_attempts >= MAX_RECONNECT_ATTEMPTS

    reconnect_attempts += 1
    sleep reconnect_backoff_seconds(reconnect_attempts - 1) if reconnect_reason == :transport
  end
end

#make_url(path, resource_version) ⇒ Object



30
31
32
33
34
35
36
37
# File 'lib/kubernetes/watch.rb', line 30

def make_url(path, resource_version)
  uri = URI.parse(path)
  query = URI.decode_www_form(uri.query || '').to_h
  query['watch'] = 'true'
  query['resourceVersion'] = resource_version if resource_version
  query_string = query.map { |k, v| "#{URI.encode_www_form_component(k).gsub('+', '%20')}=#{URI.encode_www_form_component(v).gsub('+', '%20')}" }.join('&')
  "#{uri.path}?#{query_string}"
end

#split_lines(last, chunk) ⇒ Object



86
87
88
89
90
91
92
93
94
95
96
# File 'lib/kubernetes/watch.rb', line 86

def split_lines(last, chunk)
  data = chunk
  data = last + '' + data

  ix = data.rindex("\n")
  return [data, []] unless ix

  complete = data[0..ix]
  last = data[(ix + 1)..data.length]
  [last, complete.split(/\n/)]
end