001 /**
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements. See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership. The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License. You may obtain a copy of the License at
009 *
010 * http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018
019 package org.apache.hadoop.mapreduce.lib.input;
020
021 import java.io.IOException;
022
023 import org.apache.hadoop.classification.InterfaceAudience;
024 import org.apache.hadoop.classification.InterfaceStability;
025 import org.apache.hadoop.conf.Configuration;
026 import org.apache.hadoop.io.Text;
027 import org.apache.hadoop.mapreduce.InputSplit;
028 import org.apache.hadoop.mapreduce.RecordReader;
029 import org.apache.hadoop.mapreduce.TaskAttemptContext;
030
031 /**
032 * This class treats a line in the input as a key/value pair separated by a
033 * separator character. The separator can be specified in config file
034 * under the attribute name mapreduce.input.keyvaluelinerecordreader.key.value.separator. The default
035 * separator is the tab character ('\t').
036 */
037 @InterfaceAudience.Public
038 @InterfaceStability.Stable
039 public class KeyValueLineRecordReader extends RecordReader<Text, Text> {
040 public static final String KEY_VALUE_SEPERATOR =
041 "mapreduce.input.keyvaluelinerecordreader.key.value.separator";
042
043 private final LineRecordReader lineRecordReader;
044
045 private byte separator = (byte) '\t';
046
047 private Text innerValue;
048
049 private Text key;
050
051 private Text value;
052
053 public Class getKeyClass() { return Text.class; }
054
055 public KeyValueLineRecordReader(Configuration conf)
056 throws IOException {
057
058 lineRecordReader = new LineRecordReader();
059 String sepStr = conf.get(KEY_VALUE_SEPERATOR, "\t");
060 this.separator = (byte) sepStr.charAt(0);
061 }
062
063 public void initialize(InputSplit genericSplit,
064 TaskAttemptContext context) throws IOException {
065 lineRecordReader.initialize(genericSplit, context);
066 }
067
068 public static int findSeparator(byte[] utf, int start, int length,
069 byte sep) {
070 for (int i = start; i < (start + length); i++) {
071 if (utf[i] == sep) {
072 return i;
073 }
074 }
075 return -1;
076 }
077
078 public static void setKeyValue(Text key, Text value, byte[] line,
079 int lineLen, int pos) {
080 if (pos == -1) {
081 key.set(line, 0, lineLen);
082 value.set("");
083 } else {
084 key.set(line, 0, pos);
085 value.set(line, pos + 1, lineLen - pos - 1);
086 }
087 }
088 /** Read key/value pair in a line. */
089 public synchronized boolean nextKeyValue()
090 throws IOException {
091 byte[] line = null;
092 int lineLen = -1;
093 if (lineRecordReader.nextKeyValue()) {
094 innerValue = lineRecordReader.getCurrentValue();
095 line = innerValue.getBytes();
096 lineLen = innerValue.getLength();
097 } else {
098 return false;
099 }
100 if (line == null)
101 return false;
102 if (key == null) {
103 key = new Text();
104 }
105 if (value == null) {
106 value = new Text();
107 }
108 int pos = findSeparator(line, 0, lineLen, this.separator);
109 setKeyValue(key, value, line, lineLen, pos);
110 return true;
111 }
112
113 public Text getCurrentKey() {
114 return key;
115 }
116
117 public Text getCurrentValue() {
118 return value;
119 }
120
121 public float getProgress() throws IOException {
122 return lineRecordReader.getProgress();
123 }
124
125 public synchronized void close() throws IOException {
126 lineRecordReader.close();
127 }
128 }