001 /**
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements. See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership. The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License. You may obtain a copy of the License at
009 *
010 * http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018
019 package org.apache.hadoop.mapreduce.lib.output;
020
021 import java.io.IOException;
022
023 import org.apache.hadoop.classification.InterfaceAudience;
024 import org.apache.hadoop.classification.InterfaceStability;
025 import org.apache.hadoop.mapreduce.JobContext;
026 import org.apache.hadoop.mapreduce.OutputCommitter;
027 import org.apache.hadoop.mapreduce.OutputFormat;
028 import org.apache.hadoop.mapreduce.RecordWriter;
029 import org.apache.hadoop.mapreduce.TaskAttemptContext;
030
031 /**
032 * FilterOutputFormat is a convenience class that wraps OutputFormat.
033 */
034 @InterfaceAudience.Public
035 @InterfaceStability.Stable
036 public class FilterOutputFormat <K,V> extends OutputFormat<K, V> {
037
038 protected OutputFormat<K,V> baseOut;
039
040 public FilterOutputFormat() {
041 this.baseOut = null;
042 }
043
044 /**
045 * Create a FilterOutputFormat based on the underlying output format.
046 * @param baseOut the underlying OutputFormat
047 */
048 public FilterOutputFormat(OutputFormat<K,V> baseOut) {
049 this.baseOut = baseOut;
050 }
051
052 @Override
053 public RecordWriter<K, V> getRecordWriter(TaskAttemptContext context)
054 throws IOException, InterruptedException {
055 return getBaseOut().getRecordWriter(context);
056 }
057
058 @Override
059 public void checkOutputSpecs(JobContext context)
060 throws IOException, InterruptedException {
061 getBaseOut().checkOutputSpecs(context);
062 }
063
064 @Override
065 public OutputCommitter getOutputCommitter(TaskAttemptContext context)
066 throws IOException, InterruptedException {
067 return getBaseOut().getOutputCommitter(context);
068 }
069
070 private OutputFormat<K,V> getBaseOut() throws IOException {
071 if (baseOut == null) {
072 throw new IOException("OutputFormat not set for FilterOutputFormat");
073 }
074 return baseOut;
075 }
076 /**
077 * <code>FilterRecordWriter</code> is a convenience wrapper
078 * class that extends the {@link RecordWriter}.
079 */
080
081 public static class FilterRecordWriter<K,V> extends RecordWriter<K,V> {
082
083 protected RecordWriter<K,V> rawWriter = null;
084
085 public FilterRecordWriter() {
086 rawWriter = null;
087 }
088
089 public FilterRecordWriter(RecordWriter<K,V> rwriter) {
090 this.rawWriter = rwriter;
091 }
092
093 @Override
094 public void write(K key, V value) throws IOException, InterruptedException {
095 getRawWriter().write(key, value);
096 }
097
098 @Override
099 public void close(TaskAttemptContext context)
100 throws IOException, InterruptedException {
101 getRawWriter().close(context);
102 }
103
104 private RecordWriter<K,V> getRawWriter() throws IOException {
105 if (rawWriter == null) {
106 throw new IOException("Record Writer not set for FilterRecordWriter");
107 }
108 return rawWriter;
109 }
110 }
111 }