001 /**
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements. See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership. The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License. You may obtain a copy of the License at
009 *
010 * http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018
019 package org.apache.hadoop.mapreduce.lib.aggregate;
020
021 import java.util.ArrayList;
022 import java.util.Iterator;
023 import java.util.TreeMap;
024 import java.util.Map.Entry;
025 import java.util.Arrays;
026
027 import org.apache.hadoop.classification.InterfaceAudience;
028 import org.apache.hadoop.classification.InterfaceStability;
029
030
031 /**
032 * This class implements a value aggregator that computes the
033 * histogram of a sequence of strings.
034 *
035 */
036 @InterfaceAudience.Public
037 @InterfaceStability.Stable
038 public class ValueHistogram implements ValueAggregator<String> {
039
040 TreeMap<Object, Object> items = null;
041
042 public ValueHistogram() {
043 items = new TreeMap<Object, Object>();
044 }
045
046 /**
047 * add the given val to the aggregator.
048 *
049 * @param val the value to be added. It is expected to be a string
050 * in the form of xxxx\tnum, meaning xxxx has num occurrences.
051 */
052 public void addNextValue(Object val) {
053 String valCountStr = val.toString();
054 int pos = valCountStr.lastIndexOf("\t");
055 String valStr = valCountStr;
056 String countStr = "1";
057 if (pos >= 0) {
058 valStr = valCountStr.substring(0, pos);
059 countStr = valCountStr.substring(pos + 1);
060 }
061
062 Long count = (Long) this.items.get(valStr);
063 long inc = Long.parseLong(countStr);
064
065 if (count == null) {
066 count = inc;
067 } else {
068 count = count.longValue() + inc;
069 }
070 items.put(valStr, count);
071 }
072
073 /**
074 * @return the string representation of this aggregator.
075 * It includes the following basic statistics of the histogram:
076 * the number of unique values
077 * the minimum value
078 * the media value
079 * the maximum value
080 * the average value
081 * the standard deviation
082 */
083 public String getReport() {
084 long[] counts = new long[items.size()];
085
086 StringBuffer sb = new StringBuffer();
087 Iterator<Object> iter = items.values().iterator();
088 int i = 0;
089 while (iter.hasNext()) {
090 Long count = (Long) iter.next();
091 counts[i] = count.longValue();
092 i += 1;
093 }
094 Arrays.sort(counts);
095 sb.append(counts.length);
096 i = 0;
097 long acc = 0;
098 while (i < counts.length) {
099 long nextVal = counts[i];
100 int j = i + 1;
101 while (j < counts.length && counts[j] == nextVal) {
102 j++;
103 }
104 acc += nextVal * (j - i);
105 i = j;
106 }
107 double average = 0.0;
108 double sd = 0.0;
109 if (counts.length > 0) {
110 sb.append("\t").append(counts[0]);
111 sb.append("\t").append(counts[counts.length / 2]);
112 sb.append("\t").append(counts[counts.length - 1]);
113
114 average = acc * 1.0 / counts.length;
115 sb.append("\t").append(average);
116
117 i = 0;
118 while (i < counts.length) {
119 double nextDiff = counts[i] - average;
120 sd += nextDiff * nextDiff;
121 i += 1;
122 }
123 sd = Math.sqrt(sd / counts.length);
124 sb.append("\t").append(sd);
125
126 }
127 return sb.toString();
128 }
129
130 /**
131 *
132 * @return a string representation of the list of value/frequence pairs of
133 * the histogram
134 */
135 public String getReportDetails() {
136 StringBuffer sb = new StringBuffer();
137 Iterator<Entry<Object,Object>> iter = items.entrySet().iterator();
138 while (iter.hasNext()) {
139 Entry<Object,Object> en = iter.next();
140 Object val = en.getKey();
141 Long count = (Long) en.getValue();
142 sb.append("\t").append(val.toString()).append("\t").
143 append(count.longValue()).append("\n");
144 }
145 return sb.toString();
146 }
147
148 /**
149 * @return a list value/frequence pairs.
150 * The return value is expected to be used by the reducer.
151 */
152 public ArrayList<String> getCombinerOutput() {
153 ArrayList<String> retv = new ArrayList<String>();
154 Iterator<Entry<Object,Object>> iter = items.entrySet().iterator();
155
156 while (iter.hasNext()) {
157 Entry<Object,Object> en = iter.next();
158 Object val = en.getKey();
159 Long count = (Long) en.getValue();
160 retv.add(val.toString() + "\t" + count.longValue());
161 }
162 return retv;
163 }
164
165 /**
166 *
167 * @return a TreeMap representation of the histogram
168 */
169 public TreeMap<Object,Object> getReportItems() {
170 return items;
171 }
172
173 /**
174 * reset the aggregator
175 */
176 public void reset() {
177 items = new TreeMap<Object, Object>();
178 }
179
180 }