Class: Kubernetes::Watch
- Inherits:
-
Object
- Object
- Kubernetes::Watch
- Defined in:
- lib/kubernetes/watch.rb
Overview
The Watch class provides the ability to watch a specific resource for updates.
Constant Summary collapse
- MAX_RECONNECT_ATTEMPTS =
3
- RECONNECT_BACKOFF_SECONDS =
1.0
Instance Method Summary collapse
- #connect(path, resource_version = nil, &_block) ⇒ Object
-
#initialize(client) ⇒ Watch
constructor
A new instance of Watch.
- #make_url(path, resource_version) ⇒ Object
- #split_lines(last, chunk) ⇒ Object
Constructor Details
#initialize(client) ⇒ Watch
Returns a new instance of Watch.
26 27 28 |
# File 'lib/kubernetes/watch.rb', line 26
# Builds a watcher around an API client.
#
# @param client [Object] stored in @client; #connect calls
#   +build_request+ on it, so it must respond to that method
def initialize(client)
  @client = client
end
Instance Method Details
#connect(path, resource_version = nil, &_block) ⇒ Object
39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 |
# File 'lib/kubernetes/watch.rb', line 39
# Opens a watch request for +path+ and yields every event parsed from the
# streamed response body, re-issuing the request for as long as
# +reconnect_reason+ reports a reason to do so.
#
# @param path [String] handed to #make_url together with the resource
#   version currently being tracked
# @param resource_version [String, nil] version used for the first request;
#   afterwards it is replaced by whatever +extract_resource_version+ returns
#   for an event (kept unchanged when that is nil/false) and cleared when
#   +watch_reset_event?+ is true for an event
# @yield [event] each truthy +parse_event+ result that is not a reset event
# @return [Object] the value of +request.run+ for the first request for which
#   +reconnect_reason+ returns nil/false
# @raise whatever +raise_terminal_watch_error+ raises once
#   MAX_RECONNECT_ATTEMPTS reconnects have already been made
#   (NOTE(review): presumably that helper always raises; confirm)
def connect(path, resource_version = nil, &_block)
  latest_version = resource_version
  attempts = 0

  loop do
    reset_seen = false
    options = { auth_names: ['BearerToken'] }
    request = @client.build_request('GET', make_url(path, latest_version), options)
    buffer = ''

    # Handles one line of the stream: blank lines and unparseable events are
    # skipped, a reset event flags a reconnect, anything else is yielded.
    handle_line = lambda do |line|
      next if line.nil? || line.strip.empty?

      event = parse_event(line)
      next unless event

      if watch_reset_event?(event)
        # Drop the tracked version so the next request is built without one.
        latest_version = nil
        reset_seen = true
      else
        latest_version = extract_resource_version(event) || latest_version
        yield event
      end
    end

    request.on_body do |chunk|
      # Keep the unterminated tail in +buffer+ until a later chunk completes it.
      buffer, lines = split_lines(buffer, chunk)
      lines.each { |line| handle_line.call(line) }
    end

    request.on_complete do |_response|
      # Flush whatever is left in the buffer as a final line.
      handle_line.call(buffer)
      buffer = ''
    end

    response = request.run
    reason = reconnect_reason(response, reset_seen)
    return response unless reason

    raise_terminal_watch_error(response, reason) if attempts >= MAX_RECONNECT_ATTEMPTS

    # Only :transport reconnects wait; the backoff helper receives the number
    # of reconnects made before this one.
    sleep reconnect_backoff_seconds(attempts) if reason == :transport
    attempts += 1
  end
end
#make_url(path, resource_version) ⇒ Object
30 31 32 33 34 35 36 37 |
# File 'lib/kubernetes/watch.rb', line 30
# Builds the request target for a watch: the path of +path+ followed by its
# original query parameters with +watch=true+ forced and, when given,
# +resourceVersion+ set.
#
# @param path [String] a URI or bare path, optionally with a query string
# @param resource_version [String, nil] added as +resourceVersion+ when truthy
# @return [String] "<path>?<query>" with keys and values percent-encoded
def make_url(path, resource_version)
  parsed = URI.parse(path)
  params = URI.decode_www_form(parsed.query || '').to_h.merge('watch' => 'true')
  params['resourceVersion'] = resource_version if resource_version

  # Form encoding writes spaces as '+'; emit them as %20 instead.
  escape = ->(component) { URI.encode_www_form_component(component).gsub('+', '%20') }
  pairs = params.map { |key, value| "#{escape.call(key)}=#{escape.call(value)}" }

  "#{parsed.path}?#{pairs.join('&')}"
end
#split_lines(last, chunk) ⇒ Object
86 87 88 89 90 91 92 93 94 95 96 |
# File 'lib/kubernetes/watch.rb', line 86
# Joins the leftover from the previous chunk with the new chunk and separates
# the newline-terminated lines from the unterminated tail.
#
# @param last [String] unterminated remainder of the previous call
# @param chunk [String] newly received data
# @return [Array(String, Array<String>)] the new remainder followed by the
#   complete lines; when no newline is present the whole buffer is the
#   remainder and the list of lines is empty
def split_lines(last, chunk)
  buffer = last + chunk
  head, newline, tail = buffer.rpartition("\n")
  # An empty separator means the buffer holds no newline at all.
  return [buffer, []] if newline.empty?

  [tail, (head + newline).split(/\n/)]
end