Class: Presto::Metrics::Client

Inherits:
Object
  • Object
show all
Defined in:
lib/presto/metrics/client.rb

Constant Summary collapse

@@MBEAN_ALIAS =
{
  'memory' => 'java.lang:type=Memory',
  'gc_cms' => 'java.lang:type=GarbageCollector,name=ConcurrentMarkSweep',
  'gc_parnew' => 'java.lang:type=GarbageCollector,name=ParNew',
  'os' => 'java.lang:type=OperatingSystem',
  'query_manager' => 'presto.execution:name=QueryManager',
  'query_execution' => 'presto.execution:name=QueryExecution',
  'split_scheduler_stats' => 'presto.execution.scheduler:name=SplitSchedulerStats',
  'task_executor' => 'presto.execution.executor:name=TaskExecutor',
  'task_manager' => 'presto.execution:name=TaskManager',
  'memory_pool_general' => 'presto.memory:type=MemoryPool,name=general',
  'memory_pool_reserved' => 'presto.memory:type=MemoryPool,name=reserved',
  'cluster_memory_manager' => 'presto.memory:name=ClusterMemoryManager',
  'cluster_memory_pool_general' => 'presto.memory:type=ClusterMemoryPool,name=general',
  'cluster_memory_pool_reserved' => 'presto.memory:type=ClusterMemoryPool,name=reserved',
  'discovery_node_manager' => 'presto.metadata:name=DiscoveryNodeManager',
  'pause_meter' => 'io.airlift.stats:name=PauseMeter',
}

Instance Method Summary collapse

Constructor Details

#initialize(opts = {}) ⇒ Client

Returns a new instance of Client.



4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
# File 'lib/presto/metrics/client.rb', line 4

def initialize(opts = {})
  require 'httparty'
  require 'json'
  require 'set'
  require 'time'
  require 'uri'

  @host = opts[:host] || 'localhost'
  @port = opts[:port] || '8080'
  @endpoint = opts[:endpoint] || "http://#{@host}:#{@port}"
  @mbean_path = opts[:mbean_path] || '/v1/jmx/mbean'
  @query_path = opts[:query_path] || '/v1/query'
  @node_path = opts[:node_path] || '/v1/node'
  @caml_case = opts[:caml_case] || false
  @headers = opts[:headers] || {}
end

Instance Method Details

#cluster_memory_manager_metrics(target_attr = []) ⇒ Object



231
232
233
# File 'lib/presto/metrics/client.rb', line 231

def cluster_memory_manager_metrics(target_attr = [])
  get_metrics("presto.memory:name=ClusterMemoryManager", target_attr)
end

#cluster_memory_pool_metrics(target_attr = []) ⇒ Object



225
226
227
228
229
# File 'lib/presto/metrics/client.rb', line 225

def cluster_memory_pool_metrics(target_attr = [])
  ['general', 'reserved'].each_with_object({}) { |type, hash|
    hash[type] = get_metrics("presto.memory:type=ClusterMemoryPool,name=#{type}", target_attr)
  }
end

#extract_path(json_obj, path, depth) ⇒ Object



56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
# File 'lib/presto/metrics/client.rb', line 56

def extract_path(json_obj, path, depth)
  return nil if json_obj.nil?
  if depth >= path.length
    json_obj
  else
    if json_obj.kind_of?(Array)
      # Handle key, value pairs of GC information
      value = json_obj.find { |e|
        e.is_a?(Hash) && e['key'] == path[depth]
      }
      extract_path(value['value'], path, depth + 1)
    else
      extract_path(json_obj[path[depth]], path, depth + 1)
    end
  end
end

#extract_value(v) ⇒ Object



149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
# File 'lib/presto/metrics/client.rb', line 149

def extract_value(v)
  if v.is_a?(Hash)
    if v.has_key?('key') && v.has_key?('value')
      {sanitize_key(v['key']) => extract_value(v['value'])}
    else
      v.each_with_object({}) { |(k1, v1), h2|
        h2[sanitize_key(k1)] = extract_value(v1)
      }
    end
  elsif v.is_a?(Array) && v.all? { |e| e.is_a?(Hash) }
    v.each_with_object({}) { |e, h| h.merge!(extract_value(e)) }
  else
    v
  end
end

#gc_cms_metrics(target_attr = []) ⇒ Object



181
182
183
# File 'lib/presto/metrics/client.rb', line 181

def gc_cms_metrics(target_attr = [])
  get_gc_metrics('java.lang:type=GarbageCollector,name=ConcurrentMarkSweep', target_attr)
end

#gc_g1_metrics(target_attr = []) ⇒ Object



189
190
191
192
193
# File 'lib/presto/metrics/client.rb', line 189

def gc_g1_metrics(target_attr = [])
  ['G1 Old Generation', 'G1 Young Generation'].each_with_object({}) { |gen, h|
    h[sanitize_key(gen)] = get_gc_metrics("java.lang:type=GarbageCollector,name=#{gen}", target_attr)
  }
end

#gc_parnew_metrics(target_attr = []) ⇒ Object



185
186
187
# File 'lib/presto/metrics/client.rb', line 185

def gc_parnew_metrics(target_attr = [])
  get_gc_metrics('java.lang:type=GarbageCollector,name=ParNew', target_attr)
end

#get(path, default = '{}') ⇒ Object



81
82
83
84
85
86
87
88
89
# File 'lib/presto/metrics/client.rb', line 81

def get(path, default = '{}')
  parser = URI::Parser.new
  resp = HTTParty.get(parser.escape("#{@endpoint}#{path}"), headers: @headers)
  if resp.code == 200
    resp.body
  else
    default
  end
end

#get_attribute(mbean, attr_name) ⇒ Object



108
109
110
# File 'lib/presto/metrics/client.rb', line 108

def get_attribute(mbean, attr_name)
  get_attributes(mbean).find { |obj| obj['name'] == attr_name } || {}
end

#get_attributes(mbean) ⇒ Object



103
104
105
106
# File 'lib/presto/metrics/client.rb', line 103

def get_attributes(mbean)
  json = get_mbean(mbean)
  json['attributes'] || []
end

#get_gc_metrics(mbean, target_attr = []) ⇒ Object



165
166
167
168
169
170
171
172
173
174
175
# File 'lib/presto/metrics/client.rb', line 165

def get_gc_metrics(mbean, target_attr = [])
  h = {}
  get_attributes(mbean)
    .reject { |attr| attr['name'].nil? || attr['value'].nil? }
    .each { |attr|
    key = sanitize_key(attr['name'])
    v = attr['value']
    h[key] = extract_value(v)
  }
  h
end

#get_mbean(mbean) ⇒ Object



77
78
79
# File 'lib/presto/metrics/client.rb', line 77

def get_mbean(mbean)
  JSON.parse(get_mbean_json(mbean))
end

#get_mbean_json(mbean) ⇒ Object



91
92
93
# File 'lib/presto/metrics/client.rb', line 91

def get_mbean_json(mbean)
  get("#{@mbean_path}/#{mbean}")
end

#get_metrics(mbean, target_attr = []) ⇒ Object



129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
# File 'lib/presto/metrics/client.rb', line 129

def get_metrics(mbean, target_attr = [])
  kv = Hash.new
  arr = target_attr.kind_of?(Array) ? target_attr : [target_attr]
  c_target_attr = map_to_canonical_name(arr).to_set
  get_attributes(mbean)
    .reject { |attr| attr['name'].nil? || attr['value'].nil? }
    .each { |attr|
    c_name = to_canonical_name(attr['name'])
    if c_target_attr.empty? || c_target_attr.include?(c_name)
      key = @caml_case ? attr['name'] : underscore(attr['name'])
      kv[key] = attr['value']
    end
  }
  kv
end

#get_node_jsonObject



99
100
101
# File 'lib/presto/metrics/client.rb', line 99

def get_node_json
  get(@node_path)
end

#get_query_json(path = '', default = '[]') ⇒ Object



95
96
97
# File 'lib/presto/metrics/client.rb', line 95

def get_query_json(path = '', default = '[]')
  get("#{@query_path}/#{path}", default)
end

#memory_pool_metrics(target_attr = []) ⇒ Object



219
220
221
222
223
# File 'lib/presto/metrics/client.rb', line 219

def memory_pool_metrics(target_attr = [])
  ['general', 'reserved'].each_with_object({}) { |type, hash|
    hash[type] = get_metrics("presto.memory:type=MemoryPool,name=#{type}", target_attr)
  }
end

#memory_usage_metrics(target_attr = []) ⇒ Object



177
178
179
# File 'lib/presto/metrics/client.rb', line 177

def memory_usage_metrics(target_attr = [])
  get_metrics('java.lang:type=Memory', target_attr)
end

#node_metrics(target_attr = []) ⇒ Object



235
236
237
238
239
240
241
242
243
244
245
246
247
248
# File 'lib/presto/metrics/client.rb', line 235

def node_metrics(target_attr = [])
  p = URI::Parser.new
  node_state = JSON.parse(get_node_json)
  node_state.map { |n|
    uri = n['uri'] || ''
    m = {}
    m['host'] = p.parse(uri).host
    n.each { |k, v|
      key = @caml_case ? k : underscore(k)
      m[key] = v
    }
    m
  }
end

#node_scheduler_metrics(target_attr = []) ⇒ Object



207
208
209
# File 'lib/presto/metrics/client.rb', line 207

def node_scheduler_metrics(target_attr = [])
  get_metrics('presto.execution:name=NodeScheduler', target_attr)
end

#os_metrics(target_attr = []) ⇒ Object



195
196
197
# File 'lib/presto/metrics/client.rb', line 195

def os_metrics(target_attr = [])
  get_metrics('java.lang:type=OperatingSystem', target_attr)
end

#path(path) ⇒ Object



40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
# File 'lib/presto/metrics/client.rb', line 40

def path(path)
  c = path.split(/:/)
  target = c[0]
  mbean = @@MBEAN_ALIAS[target] || target
  json_obj = get_metrics(mbean)
  return json_obj if c.size <= 1
  query_list = (c[1] || '').split(/,/)
  result = {}
  query_list.each { |q|
    path_elems = q.split('/')
    target_elem = extract_path(json_obj, path_elems, 0)
    result[q] = target_elem unless target_elem.nil?
  }
  result
end

#queryObject



73
74
75
# File 'lib/presto/metrics/client.rb', line 73

def query
  Query.new(self)
end

#query_execution_metrics(target_attr = []) ⇒ Object



203
204
205
# File 'lib/presto/metrics/client.rb', line 203

def query_execution_metrics(target_attr = [])
  get_metrics('presto.execution:name=QueryExecution', target_attr)
end

#query_manager_metrics(target_attr = []) ⇒ Object



199
200
201
# File 'lib/presto/metrics/client.rb', line 199

def query_manager_metrics(target_attr = [])
  get_metrics('presto.execution:name=QueryManager', target_attr)
end

#sanitize_key(k) ⇒ Object



145
146
147
# File 'lib/presto/metrics/client.rb', line 145

def sanitize_key(k)
  @caml_case ? k : underscore(k)
end

#task_executor_metrics(target_attr = []) ⇒ Object



211
212
213
# File 'lib/presto/metrics/client.rb', line 211

def task_executor_metrics(target_attr = [])
  get_metrics('presto.execution:name=TaskExecutor', target_attr)
end

#task_manager_metrics(target_attr = []) ⇒ Object



215
216
217
# File 'lib/presto/metrics/client.rb', line 215

def task_manager_metrics(target_attr = [])
  get_metrics('presto.execution:name=TaskManager', target_attr)
end