001 /**
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements. See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership. The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License. You may obtain a copy of the License at
009 *
010 * http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018
019 package org.apache.hadoop.mapred.lib.aggregate;
020
021 import java.io.IOException;
022 import java.util.ArrayList;
023
024 import org.apache.hadoop.classification.InterfaceAudience;
025 import org.apache.hadoop.classification.InterfaceStability;
026 import org.apache.hadoop.io.Text;
027 import org.apache.hadoop.io.Writable;
028 import org.apache.hadoop.io.WritableComparable;
029 import org.apache.hadoop.mapred.JobConf;
030 import org.apache.hadoop.mapred.Mapper;
031 import org.apache.hadoop.mapred.Reducer;
032
033 /**
034 * This abstract class implements some common functionalities of the
035 * the generic mapper, reducer and combiner classes of Aggregate.
036 */
037 @InterfaceAudience.Public
038 @InterfaceStability.Stable
039 public abstract class ValueAggregatorJobBase<K1 extends WritableComparable,
040 V1 extends Writable>
041 implements Mapper<K1, V1, Text, Text>, Reducer<Text, Text, Text, Text> {
042
043 protected ArrayList<ValueAggregatorDescriptor> aggregatorDescriptorList = null;
044
045 public void configure(JobConf job) {
046 this.initializeMySpec(job);
047 this.logSpec();
048 }
049
050 private static ValueAggregatorDescriptor getValueAggregatorDescriptor(
051 String spec, JobConf job) {
052 if (spec == null)
053 return null;
054 String[] segments = spec.split(",", -1);
055 String type = segments[0];
056 if (type.compareToIgnoreCase("UserDefined") == 0) {
057 String className = segments[1];
058 return new UserDefinedValueAggregatorDescriptor(className, job);
059 }
060 return null;
061 }
062
063 private static ArrayList<ValueAggregatorDescriptor> getAggregatorDescriptors(JobConf job) {
064 String advn = "aggregator.descriptor";
065 int num = job.getInt(advn + ".num", 0);
066 ArrayList<ValueAggregatorDescriptor> retv = new ArrayList<ValueAggregatorDescriptor>(num);
067 for (int i = 0; i < num; i++) {
068 String spec = job.get(advn + "." + i);
069 ValueAggregatorDescriptor ad = getValueAggregatorDescriptor(spec, job);
070 if (ad != null) {
071 retv.add(ad);
072 }
073 }
074 return retv;
075 }
076
077 private void initializeMySpec(JobConf job) {
078 this.aggregatorDescriptorList = getAggregatorDescriptors(job);
079 if (this.aggregatorDescriptorList.size() == 0) {
080 this.aggregatorDescriptorList
081 .add(new UserDefinedValueAggregatorDescriptor(
082 ValueAggregatorBaseDescriptor.class.getCanonicalName(), job));
083 }
084 }
085
086 protected void logSpec() {
087
088 }
089
090 public void close() throws IOException {
091 }
092 }