001 /**
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements. See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership. The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License. You may obtain a copy of the License at
009 *
010 * http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018
019 package org.apache.hadoop.metrics2.lib;
020
021 import static org.apache.hadoop.metrics2.lib.Interns.info;
022
023 import java.util.Map;
024 import java.util.concurrent.Executors;
025 import java.util.concurrent.ScheduledExecutorService;
026 import java.util.concurrent.TimeUnit;
027
028 import org.apache.commons.lang.StringUtils;
029 import org.apache.hadoop.classification.InterfaceAudience;
030 import org.apache.hadoop.classification.InterfaceStability;
031 import org.apache.hadoop.metrics2.MetricsInfo;
032 import org.apache.hadoop.metrics2.MetricsRecordBuilder;
033 import org.apache.hadoop.metrics2.util.Quantile;
034 import org.apache.hadoop.metrics2.util.SampleQuantiles;
035
036 import com.google.common.annotations.VisibleForTesting;
037 import com.google.common.util.concurrent.ThreadFactoryBuilder;
038
039 /**
040 * Watches a stream of long values, maintaining online estimates of specific
041 * quantiles with provably low error bounds. This is particularly useful for
042 * accurate high-percentile (e.g. 95th, 99th) latency metrics.
043 */
044 @InterfaceAudience.Public
045 @InterfaceStability.Evolving
046 public class MutableQuantiles extends MutableMetric {
047
048 @VisibleForTesting
049 public static final Quantile[] quantiles = { new Quantile(0.50, 0.050),
050 new Quantile(0.75, 0.025), new Quantile(0.90, 0.010),
051 new Quantile(0.95, 0.005), new Quantile(0.99, 0.001) };
052
053 private final MetricsInfo numInfo;
054 private final MetricsInfo[] quantileInfos;
055 private final int interval;
056
057 private SampleQuantiles estimator;
058 private long previousCount = 0;
059
060 @VisibleForTesting
061 protected Map<Quantile, Long> previousSnapshot = null;
062
063 private static final ScheduledExecutorService scheduler = Executors
064 .newScheduledThreadPool(1, new ThreadFactoryBuilder().setDaemon(true)
065 .setNameFormat("MutableQuantiles-%d").build());
066
067 /**
068 * Instantiates a new {@link MutableQuantiles} for a metric that rolls itself
069 * over on the specified time interval.
070 *
071 * @param name
072 * of the metric
073 * @param description
074 * long-form textual description of the metric
075 * @param sampleName
076 * type of items in the stream (e.g., "Ops")
077 * @param valueName
078 * type of the values
079 * @param interval
080 * rollover interval (in seconds) of the estimator
081 */
082 public MutableQuantiles(String name, String description, String sampleName,
083 String valueName, int interval) {
084 String ucName = StringUtils.capitalize(name);
085 String usName = StringUtils.capitalize(sampleName);
086 String uvName = StringUtils.capitalize(valueName);
087 String desc = StringUtils.uncapitalize(description);
088 String lsName = StringUtils.uncapitalize(sampleName);
089 String lvName = StringUtils.uncapitalize(valueName);
090
091 numInfo = info(ucName + "Num" + usName, String.format(
092 "Number of %s for %s with %ds interval", lsName, desc, interval));
093 // Construct the MetricsInfos for the quantiles, converting to percentiles
094 quantileInfos = new MetricsInfo[quantiles.length];
095 String nameTemplate = ucName + "%dthPercentile" + uvName;
096 String descTemplate = "%d percentile " + lvName + " with " + interval
097 + " second interval for " + desc;
098 for (int i = 0; i < quantiles.length; i++) {
099 int percentile = (int) (100 * quantiles[i].quantile);
100 quantileInfos[i] = info(String.format(nameTemplate, percentile),
101 String.format(descTemplate, percentile));
102 }
103
104 estimator = new SampleQuantiles(quantiles);
105
106 this.interval = interval;
107 scheduler.scheduleAtFixedRate(new RolloverSample(this), interval, interval,
108 TimeUnit.SECONDS);
109 }
110
111 @Override
112 public synchronized void snapshot(MetricsRecordBuilder builder, boolean all) {
113 if (all || changed()) {
114 builder.addGauge(numInfo, previousCount);
115 for (int i = 0; i < quantiles.length; i++) {
116 long newValue = 0;
117 // If snapshot is null, we failed to update since the window was empty
118 if (previousSnapshot != null) {
119 newValue = previousSnapshot.get(quantiles[i]);
120 }
121 builder.addGauge(quantileInfos[i], newValue);
122 }
123 if (changed()) {
124 clearChanged();
125 }
126 }
127 }
128
129 public synchronized void add(long value) {
130 estimator.insert(value);
131 }
132
133 public int getInterval() {
134 return interval;
135 }
136
137 /**
138 * Runnable used to periodically roll over the internal
139 * {@link SampleQuantiles} every interval.
140 */
141 private static class RolloverSample implements Runnable {
142
143 MutableQuantiles parent;
144
145 public RolloverSample(MutableQuantiles parent) {
146 this.parent = parent;
147 }
148
149 @Override
150 public void run() {
151 synchronized (parent) {
152 parent.previousCount = parent.estimator.getCount();
153 parent.previousSnapshot = parent.estimator.snapshot();
154 parent.estimator.clear();
155 }
156 parent.setChanged();
157 }
158
159 }
160 }