001 /**
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements. See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership. The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License. You may obtain a copy of the License at
009 *
010 * http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018
019 package org.apache.hadoop.lib.service.instrumentation;
020
021 import org.apache.hadoop.classification.InterfaceAudience;
022 import org.apache.hadoop.lib.server.BaseService;
023 import org.apache.hadoop.lib.server.ServiceException;
024 import org.apache.hadoop.lib.service.Instrumentation;
025 import org.apache.hadoop.lib.service.Scheduler;
026 import org.apache.hadoop.util.Time;
027 import org.json.simple.JSONAware;
028 import org.json.simple.JSONObject;
029 import org.json.simple.JSONStreamAware;
030
031 import java.io.IOException;
032 import java.io.Writer;
033 import java.util.ArrayList;
034 import java.util.LinkedHashMap;
035 import java.util.List;
036 import java.util.Map;
037 import java.util.concurrent.ConcurrentHashMap;
038 import java.util.concurrent.TimeUnit;
039 import java.util.concurrent.atomic.AtomicLong;
040 import java.util.concurrent.locks.Lock;
041 import java.util.concurrent.locks.ReentrantLock;
042
043 @InterfaceAudience.Private
044 public class InstrumentationService extends BaseService implements Instrumentation {
045 public static final String PREFIX = "instrumentation";
046 public static final String CONF_TIMERS_SIZE = "timers.size";
047
048 private int timersSize;
049 private Lock counterLock;
050 private Lock timerLock;
051 private Lock variableLock;
052 private Lock samplerLock;
053 private Map<String, Map<String, AtomicLong>> counters;
054 private Map<String, Map<String, Timer>> timers;
055 private Map<String, Map<String, VariableHolder>> variables;
056 private Map<String, Map<String, Sampler>> samplers;
057 private List<Sampler> samplersList;
058 private Map<String, Map<String, ?>> all;
059
/**
 * Creates the service, passing {@link #PREFIX} to the {@link BaseService} constructor.
 * No other state is set up here; the fields are assigned in {@code init()}.
 */
public InstrumentationService() {
  super(PREFIX);
}
063
064 @Override
065 @SuppressWarnings("unchecked")
066 public void init() throws ServiceException {
067 timersSize = getServiceConfig().getInt(CONF_TIMERS_SIZE, 10);
068 counterLock = new ReentrantLock();
069 timerLock = new ReentrantLock();
070 variableLock = new ReentrantLock();
071 samplerLock = new ReentrantLock();
072 Map<String, VariableHolder> jvmVariables = new ConcurrentHashMap<String, VariableHolder>();
073 counters = new ConcurrentHashMap<String, Map<String, AtomicLong>>();
074 timers = new ConcurrentHashMap<String, Map<String, Timer>>();
075 variables = new ConcurrentHashMap<String, Map<String, VariableHolder>>();
076 samplers = new ConcurrentHashMap<String, Map<String, Sampler>>();
077 samplersList = new ArrayList<Sampler>();
078 all = new LinkedHashMap<String, Map<String, ?>>();
079 all.put("os-env", System.getenv());
080 all.put("sys-props", (Map<String, ?>) (Map) System.getProperties());
081 all.put("jvm", jvmVariables);
082 all.put("counters", (Map) counters);
083 all.put("timers", (Map) timers);
084 all.put("variables", (Map) variables);
085 all.put("samplers", (Map) samplers);
086
087 jvmVariables.put("free.memory", new VariableHolder<Long>(new Instrumentation.Variable<Long>() {
088 @Override
089 public Long getValue() {
090 return Runtime.getRuntime().freeMemory();
091 }
092 }));
093 jvmVariables.put("max.memory", new VariableHolder<Long>(new Instrumentation.Variable<Long>() {
094 @Override
095 public Long getValue() {
096 return Runtime.getRuntime().maxMemory();
097 }
098 }));
099 jvmVariables.put("total.memory", new VariableHolder<Long>(new Instrumentation.Variable<Long>() {
100 @Override
101 public Long getValue() {
102 return Runtime.getRuntime().totalMemory();
103 }
104 }));
105 }
106
107 @Override
108 public void postInit() throws ServiceException {
109 Scheduler scheduler = getServer().get(Scheduler.class);
110 if (scheduler != null) {
111 scheduler.schedule(new SamplersRunnable(), 0, 1, TimeUnit.SECONDS);
112 }
113 }
114
115 @Override
116 public Class getInterface() {
117 return Instrumentation.class;
118 }
119
120 @SuppressWarnings("unchecked")
121 private <T> T getToAdd(String group, String name, Class<T> klass, Lock lock, Map<String, Map<String, T>> map) {
122 boolean locked = false;
123 try {
124 Map<String, T> groupMap = map.get(group);
125 if (groupMap == null) {
126 lock.lock();
127 locked = true;
128 groupMap = map.get(group);
129 if (groupMap == null) {
130 groupMap = new ConcurrentHashMap<String, T>();
131 map.put(group, groupMap);
132 }
133 }
134 T element = groupMap.get(name);
135 if (element == null) {
136 if (!locked) {
137 lock.lock();
138 locked = true;
139 }
140 element = groupMap.get(name);
141 if (element == null) {
142 try {
143 if (klass == Timer.class) {
144 element = (T) new Timer(timersSize);
145 } else {
146 element = klass.newInstance();
147 }
148 } catch (Exception ex) {
149 throw new RuntimeException(ex);
150 }
151 groupMap.put(name, element);
152 }
153 }
154 return element;
155 } finally {
156 if (locked) {
157 lock.unlock();
158 }
159 }
160 }
161
162 static class Cron implements Instrumentation.Cron {
163 long start;
164 long lapStart;
165 long own;
166 long total;
167
168 @Override
169 public Cron start() {
170 if (total != 0) {
171 throw new IllegalStateException("Cron already used");
172 }
173 if (start == 0) {
174 start = Time.now();
175 lapStart = start;
176 } else if (lapStart == 0) {
177 lapStart = Time.now();
178 }
179 return this;
180 }
181
182 @Override
183 public Cron stop() {
184 if (total != 0) {
185 throw new IllegalStateException("Cron already used");
186 }
187 if (lapStart > 0) {
188 own += Time.now() - lapStart;
189 lapStart = 0;
190 }
191 return this;
192 }
193
194 void end() {
195 stop();
196 total = Time.now() - start;
197 }
198
199 }
200
201 static class Timer implements JSONAware, JSONStreamAware {
202 static final int LAST_TOTAL = 0;
203 static final int LAST_OWN = 1;
204 static final int AVG_TOTAL = 2;
205 static final int AVG_OWN = 3;
206
207 Lock lock = new ReentrantLock();
208 private long[] own;
209 private long[] total;
210 private int last;
211 private boolean full;
212 private int size;
213
214 public Timer(int size) {
215 this.size = size;
216 own = new long[size];
217 total = new long[size];
218 for (int i = 0; i < size; i++) {
219 own[i] = -1;
220 total[i] = -1;
221 }
222 last = -1;
223 }
224
225 long[] getValues() {
226 lock.lock();
227 try {
228 long[] values = new long[4];
229 values[LAST_TOTAL] = total[last];
230 values[LAST_OWN] = own[last];
231 int limit = (full) ? size : (last + 1);
232 for (int i = 0; i < limit; i++) {
233 values[AVG_TOTAL] += total[i];
234 values[AVG_OWN] += own[i];
235 }
236 values[AVG_TOTAL] = values[AVG_TOTAL] / limit;
237 values[AVG_OWN] = values[AVG_OWN] / limit;
238 return values;
239 } finally {
240 lock.unlock();
241 }
242 }
243
244 void addCron(Cron cron) {
245 cron.end();
246 lock.lock();
247 try {
248 last = (last + 1) % size;
249 full = full || last == (size - 1);
250 total[last] = cron.total;
251 own[last] = cron.own;
252 } finally {
253 lock.unlock();
254 }
255 }
256
257 @SuppressWarnings("unchecked")
258 private JSONObject getJSON() {
259 long[] values = getValues();
260 JSONObject json = new JSONObject();
261 json.put("lastTotal", values[0]);
262 json.put("lastOwn", values[1]);
263 json.put("avgTotal", values[2]);
264 json.put("avgOwn", values[3]);
265 return json;
266 }
267
268 @Override
269 public String toJSONString() {
270 return getJSON().toJSONString();
271 }
272
273 @Override
274 public void writeJSONString(Writer out) throws IOException {
275 getJSON().writeJSONString(out);
276 }
277
278 }
279
280 @Override
281 public Cron createCron() {
282 return new Cron();
283 }
284
285 @Override
286 public void incr(String group, String name, long count) {
287 AtomicLong counter = getToAdd(group, name, AtomicLong.class, counterLock, counters);
288 counter.addAndGet(count);
289 }
290
291 @Override
292 public void addCron(String group, String name, Instrumentation.Cron cron) {
293 Timer timer = getToAdd(group, name, Timer.class, timerLock, timers);
294 timer.addCron((Cron) cron);
295 }
296
297 static class VariableHolder<E> implements JSONAware, JSONStreamAware {
298 Variable<E> var;
299
300 public VariableHolder() {
301 }
302
303 public VariableHolder(Variable<E> var) {
304 this.var = var;
305 }
306
307 @SuppressWarnings("unchecked")
308 private JSONObject getJSON() {
309 JSONObject json = new JSONObject();
310 json.put("value", var.getValue());
311 return json;
312 }
313
314 @Override
315 public String toJSONString() {
316 return getJSON().toJSONString();
317 }
318
319 @Override
320 public void writeJSONString(Writer out) throws IOException {
321 out.write(toJSONString());
322 }
323
324 }
325
326 @Override
327 public void addVariable(String group, String name, Variable<?> variable) {
328 VariableHolder holder = getToAdd(group, name, VariableHolder.class, variableLock, variables);
329 holder.var = variable;
330 }
331
332 static class Sampler implements JSONAware, JSONStreamAware {
333 Variable<Long> variable;
334 long[] values;
335 private AtomicLong sum;
336 private int last;
337 private boolean full;
338
339 void init(int size, Variable<Long> variable) {
340 this.variable = variable;
341 values = new long[size];
342 sum = new AtomicLong();
343 last = 0;
344 }
345
346 void sample() {
347 int index = last;
348 long valueGoingOut = values[last];
349 full = full || last == (values.length - 1);
350 last = (last + 1) % values.length;
351 values[index] = variable.getValue();
352 sum.addAndGet(-valueGoingOut + values[index]);
353 }
354
355 double getRate() {
356 return ((double) sum.get()) / ((full) ? values.length : ((last == 0) ? 1 : last));
357 }
358
359 @SuppressWarnings("unchecked")
360 private JSONObject getJSON() {
361 JSONObject json = new JSONObject();
362 json.put("sampler", getRate());
363 json.put("size", (full) ? values.length : last);
364 return json;
365 }
366
367 @Override
368 public String toJSONString() {
369 return getJSON().toJSONString();
370 }
371
372 @Override
373 public void writeJSONString(Writer out) throws IOException {
374 out.write(toJSONString());
375 }
376 }
377
378 @Override
379 public void addSampler(String group, String name, int samplingSize, Variable<Long> variable) {
380 Sampler sampler = getToAdd(group, name, Sampler.class, samplerLock, samplers);
381 samplerLock.lock();
382 try {
383 sampler.init(samplingSize, variable);
384 samplersList.add(sampler);
385 } finally {
386 samplerLock.unlock();
387 }
388 }
389
390 class SamplersRunnable implements Runnable {
391
392 @Override
393 public void run() {
394 samplerLock.lock();
395 try {
396 for (Sampler sampler : samplersList) {
397 sampler.sample();
398 }
399 } finally {
400 samplerLock.unlock();
401 }
402 }
403 }
404
405 @Override
406 public Map<String, Map<String, ?>> getSnapshot() {
407 return all;
408 }
409
410
411 }