8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
|
# File 'app/channels/kube/station/watch_channel.rb', line 8
# Starts streaming Kubernetes watch events for one cluster to this subscriber.
#
# Steps, in order:
# 1. Validates the requested resources (params[:resources], defaulting to
#    ["/v1/pods"]) before any process is started.
# 2. Looks up the cluster by params[:cluster_id] and writes its kubeconfig
#    to a temp file.
# 3. Spawns `kubectl proxy --port=0` and reads the listening port from the
#    first line it prints on stdout.
# 4. Spawns the kwatch binary pointed at that proxy.
# 5. Starts two Async tasks: one logs kwatch's stderr, the other parses
#    kwatch's stdout and transmits one message per event.
#
# kwatch stdout is read as tab-separated lines; the first four fields are
# used as event type, gvr, namespace and name. "DELETED" events are
# transmitted from those fields alone; any other event type triggers
# fetch_object and is transmitted with the trimmed object as "node".
#
# Instance state set here: @kubeconfig_file, @proxy_rd (closed before
# returning), @proxy_pid, @proxy_port, @kwatch_rd, @kwatch_pid, @internet,
# @reader_task.
#
# NOTE(review): this method never terminates the spawned processes or
# removes the temp file; presumably `unsubscribed` does that (not visible
# here). That includes the case where this method raises after kubectl has
# been spawned -- confirm teardown still runs then.
#
# @raise [ArgumentError] if a requested resource starts with "-"
# @raise [RuntimeError] if kubectl proxy does not announce a port
def subscribed
  Console.info(self, "subscribed called", cluster_id: params[:cluster_id])

  # The resource list comes from the client and is appended verbatim to the
  # kwatch command line, so refuse anything kwatch could parse as a flag.
  # Checked first so a bad request fails before anything is spawned.
  resources = Array(params[:resources] || ["/v1/pods"]).map(&:to_s)
  flag_like = resources.find { |resource| resource.start_with?("-") }
  raise ArgumentError, "invalid resource #{flag_like.inspect}: must not start with '-'" if flag_like

  cluster = Cluster.find(params[:cluster_id])
  Console.info(self, "cluster found", name: cluster.name)

  @kubeconfig_file = Tempfile.new(["kubeconfig", ".yaml"])
  @kubeconfig_file.write(cluster.config.data)
  @kubeconfig_file.close

  @proxy_rd, proxy_wr = IO.pipe
  begin
    @proxy_pid = Process.spawn(
      "kubectl", "proxy", "--port=0",
      "--kubeconfig=#{@kubeconfig_file.path}",
      out: proxy_wr, err: "/dev/null"
    )
  ensure
    # The parent must drop its copy of the write end (even when spawn
    # fails), otherwise the read below can never see EOF.
    proxy_wr.close
  end

  # Only the first stdout line is needed; gets returns nil if kubectl
  # exited without printing anything.
  banner = @proxy_rd.gets
  @proxy_rd.close
  port = banner && banner[/127\.0\.0\.1:(\d+)/, 1]
  raise "kubectl proxy did not report a listening port (banner: #{banner.inspect})" unless port

  @proxy_port = port.to_i
  Console.info(self, "proxy ready", port: @proxy_port, pid: @proxy_pid)
  Console.info(self, "resources", resources: resources)

  @kwatch_rd, kwatch_wr = IO.pipe
  kwatch_err_rd, kwatch_err_wr = IO.pipe
  begin
    @kwatch_pid = Process.spawn(
      kwatch_binary_path,
      "-proxy", "http://127.0.0.1:#{@proxy_port}",
      *resources,
      out: kwatch_wr, err: kwatch_err_wr
    )
  ensure
    kwatch_wr.close
    kwatch_err_wr.close
  end
  Console.info(self, "kwatch spawned", pid: @kwatch_pid)

  # Not used in this method; presumably consumed by fetch_object.
  @internet = Async::HTTP::Internet.new

  # Forward kwatch diagnostics to the log until its stderr reaches EOF,
  # then release the descriptor.
  Async do
    kwatch_err_rd.each_line do |line|
      Console.info(self, "kwatch stderr", line: line.chomp)
    end
  ensure
    kwatch_err_rd.close
  end

  @reader_task = Async do
    Console.info(self, "reader task started")
    @kwatch_rd.each_line do |line|
      # Fields after the fourth are ignored.
      type, gvr, ns, name = line.chomp.split("\t")

      # A blank or short line cannot identify an object; skip it instead of
      # calling fetch_object with nil arguments.
      unless type && gvr && name
        Console.warn(self, "skipping malformed kwatch line", line: line.chomp)
        next
      end

      if type == "DELETED"
        transmit({
          "event" => "DELETED",
          "gvr" => gvr,
          "namespace" => ns,
          "name" => name
        })
      else
        obj = fetch_object(gvr, ns, name)
        next unless obj

        transmit({
          "event" => type,
          "gvr" => gvr,
          "node" => trim(obj)
        })
      end
    end
    Console.info(self, "reader task ended")
  end
end
|