001 /**
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements. See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership. The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License. You may obtain a copy of the License at
009 *
010 * http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018
019 package org.apache.hadoop.mapreduce.lib.input;
020
021 import java.io.IOException;
022
023
024 import org.apache.hadoop.classification.InterfaceAudience;
025 import org.apache.hadoop.classification.InterfaceStability;
026 import org.apache.hadoop.conf.Configuration;
027 import org.apache.hadoop.fs.FileSystem;
028 import org.apache.hadoop.fs.Path;
029 import org.apache.hadoop.io.*;
030 import org.apache.hadoop.mapreduce.InputSplit;
031 import org.apache.hadoop.mapreduce.RecordReader;
032 import org.apache.hadoop.mapreduce.TaskAttemptContext;
033
034 /** An {@link RecordReader} for {@link SequenceFile}s. */
035 @InterfaceAudience.Public
036 @InterfaceStability.Stable
037 public class SequenceFileRecordReader<K, V> extends RecordReader<K, V> {
038 private SequenceFile.Reader in;
039 private long start;
040 private long end;
041 private boolean more = true;
042 private K key = null;
043 private V value = null;
044 protected Configuration conf;
045
046 @Override
047 public void initialize(InputSplit split,
048 TaskAttemptContext context
049 ) throws IOException, InterruptedException {
050 FileSplit fileSplit = (FileSplit) split;
051 conf = context.getConfiguration();
052 Path path = fileSplit.getPath();
053 FileSystem fs = path.getFileSystem(conf);
054 this.in = new SequenceFile.Reader(fs, path, conf);
055 this.end = fileSplit.getStart() + fileSplit.getLength();
056
057 if (fileSplit.getStart() > in.getPosition()) {
058 in.sync(fileSplit.getStart()); // sync to start
059 }
060
061 this.start = in.getPosition();
062 more = start < end;
063 }
064
065 @Override
066 @SuppressWarnings("unchecked")
067 public boolean nextKeyValue() throws IOException, InterruptedException {
068 if (!more) {
069 return false;
070 }
071 long pos = in.getPosition();
072 key = (K) in.next(key);
073 if (key == null || (pos >= end && in.syncSeen())) {
074 more = false;
075 key = null;
076 value = null;
077 } else {
078 value = (V) in.getCurrentValue(value);
079 }
080 return more;
081 }
082
083 @Override
084 public K getCurrentKey() {
085 return key;
086 }
087
088 @Override
089 public V getCurrentValue() {
090 return value;
091 }
092
093 /**
094 * Return the progress within the input split
095 * @return 0.0 to 1.0 of the input byte range
096 */
097 public float getProgress() throws IOException {
098 if (end == start) {
099 return 0.0f;
100 } else {
101 return Math.min(1.0f, (in.getPosition() - start) / (float)(end - start));
102 }
103 }
104
105 public synchronized void close() throws IOException { in.close(); }
106
107 }
108