Class: Kube::Station::WatchChannel

Inherits:
ActionCable::Channel::Base
  • Object
show all
Defined in:
app/channels/kube/station/watch_channel.rb

Instance Method Summary collapse

Instance Method Details

#subscribed ⇒ Object



8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
# File 'app/channels/kube/station/watch_channel.rb', line 8

# Starts streaming Kubernetes watch events to the subscriber.
#
# Looks up the cluster given by +params[:cluster_id]+, writes its kubeconfig
# to a temporary file, starts `kubectl proxy` on a port chosen by kubectl
# (`--port=0`), and then runs the kwatch binary against that proxy for the
# requested resources (default: "/v1/pods"). Each tab-separated line kwatch
# prints is turned into one transmitted message.
#
# The pids, pipes and temp file are kept in instance variables so that
# #unsubscribed can release them.
#
# @raise [ArgumentError] if a requested resource looks like a command-line option
# @raise [RuntimeError] if `kubectl proxy` does not announce a listening port
def subscribed
  Console.info(self, "subscribed called", cluster_id: params[:cluster_id])

  cluster = Cluster.find(params[:cluster_id])
  Console.info(self, "cluster found", name: cluster.name)

  # Resources come from the client and end up in kwatch's argv, so validate
  # them before any process is started: coerce to strings (spawn rejects
  # anything else) and refuse values that could be parsed as options.
  resources = Array(params[:resources] || ["/v1/pods"]).map(&:to_s)
  suspicious = resources.find { |resource| resource.start_with?("-") }
  raise ArgumentError, "invalid resource: #{suspicious.inspect}" if suspicious

  @kubeconfig_file = Tempfile.new(["kubeconfig", ".yaml"])
  @kubeconfig_file.write(cluster.config.data)
  @kubeconfig_file.close

  @proxy_rd, proxy_wr = IO.pipe
  begin
    @proxy_pid = Process.spawn(
      "kubectl", "proxy", "--port=0",
      "--kubeconfig=#{@kubeconfig_file.path}",
      out: proxy_wr, err: "/dev/null"
    )
  ensure
    # The parent never writes to this pipe; close our copy even if spawn fails.
    proxy_wr.close
  end

  # The first stdout line is expected to contain the address kubectl bound to.
  banner = @proxy_rd.gets
  @proxy_rd.close
  port = banner && banner[/127\.0\.0\.1:(\d+)/, 1]
  raise "kubectl proxy did not report a listening port (output: #{banner.inspect})" unless port

  @proxy_port = port.to_i
  Console.info(self, "proxy ready", port: @proxy_port, pid: @proxy_pid)

  Console.info(self, "resources", resources: resources)

  @kwatch_rd, kwatch_wr = IO.pipe
  kwatch_err_rd, kwatch_err_wr = IO.pipe
  begin
    @kwatch_pid = Process.spawn(
      kwatch_binary_path,
      "-proxy", "http://127.0.0.1:#{@proxy_port}",
      *resources,
      out: kwatch_wr, err: kwatch_err_wr
    )
  rescue SystemCallError
    # No child exists to produce stderr, so nothing will ever read this end.
    kwatch_err_rd.close
    raise
  ensure
    kwatch_wr.close
    kwatch_err_wr.close
  end
  Console.info(self, "kwatch spawned", pid: @kwatch_pid)

  @internet = Async::HTTP::Internet.new

  # Forward kwatch's stderr to the log; release the pipe once it reaches EOF.
  Async do
    begin
      kwatch_err_rd.each_line do |line|
        Console.info(self, "kwatch stderr", line: line.chomp)
      end
    ensure
      kwatch_err_rd.close
    end
  end

  @reader_task = Async do
    Console.info(self, "reader task started")
    @kwatch_rd.each_line do |line|
      # Line format: type \t gvr \t namespace \t name \t resource-version.
      type, gvr, ns, name, _rv = line.chomp.split("\t")

      # Blank or malformed lines carry no event type; ignore them.
      next if type.nil? || type.empty?

      if type == "DELETED"
        # Deletions are sent with identifying fields only; no object is fetched.
        transmit({
          "event" => "DELETED",
          "gvr" => gvr,
          "namespace" => ns,
          "name" => name
        })
      else
        obj = fetch_object(gvr, ns, name)
        next unless obj

        transmit({
          "event" => type,
          "gvr" => gvr,
          "node" => trim(obj)
        })
      end
    end
    Console.info(self, "reader task ended")
  end
end

#unsubscribed ⇒ Object



81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
# File 'app/channels/kube/station/watch_channel.rb', line 81

# Releases everything the subscription holds: stops the reader task, closes
# the HTTP client, terminates and reaps the kwatch and kubectl proxy child
# processes, closes the pipe read ends, and removes the kubeconfig temp file.
#
# Process, pipe and file cleanup runs in +ensure+, so it still happens when
# stopping the reader task or closing the HTTP client raises; that exception
# is then propagated to the caller. Individual cleanup failures are logged
# and do not prevent the remaining steps from running.
def unsubscribed
  Console.info(self, "unsubscribed called")
  @reader_task&.stop
  @internet&.close
ensure
  # kwatch first, then the proxy it talks to.
  [@kwatch_pid, @proxy_pid].compact.each do |pid|
    begin
      Process.kill("TERM", pid)
    rescue SystemCallError => error
      Console.warn(self, "could not signal child process", pid: pid, error: error.message)
    end

    begin
      # Reap the child so it does not linger as a zombie.
      Process.wait(pid)
    rescue SystemCallError => error
      Console.warn(self, "could not reap child process", pid: pid, error: error.message)
    end
  end

  # @proxy_rd is normally closed already; closing again is harmless, and this
  # covers the case where setup was interrupted before that point.
  [@kwatch_rd, @proxy_rd].compact.each do |io|
    begin
      io.close
    rescue IOError => error
      Console.warn(self, "could not close pipe", error: error.message)
    end
  end

  begin
    @kubeconfig_file&.unlink
  rescue SystemCallError => error
    Console.warn(self, "could not remove kubeconfig file", error: error.message)
  end
end