001 /**
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements. See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership. The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License. You may obtain a copy of the License at
009 *
010 * http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018
019 package org.apache.hadoop.mapreduce.lib.join;
020
021 import java.io.IOException;
022
023 import org.apache.hadoop.classification.InterfaceAudience;
024 import org.apache.hadoop.classification.InterfaceStability;
025 import org.apache.hadoop.conf.Configuration;
026 import org.apache.hadoop.io.NullWritable;
027 import org.apache.hadoop.io.Writable;
028 import org.apache.hadoop.io.WritableComparable;
029 import org.apache.hadoop.io.WritableComparator;
030 import org.apache.hadoop.mapreduce.InputSplit;
031 import org.apache.hadoop.mapreduce.RecordReader;
032 import org.apache.hadoop.mapreduce.TaskAttemptContext;
033 import org.apache.hadoop.util.ReflectionUtils;
034
035 /**
036 * Proxy class for a RecordReader participating in the join framework.
037 *
038 * This class keeps track of the "head" key-value pair for the
039 * provided RecordReader and keeps a store of values matching a key when
040 * this source is participating in a join.
041 */
042 @InterfaceAudience.Public
043 @InterfaceStability.Stable
044 public class WrappedRecordReader<K extends WritableComparable<?>,
045 U extends Writable> extends ComposableRecordReader<K,U> {
046
047 protected boolean empty = false;
048 private RecordReader<K,U> rr;
049 private int id; // index at which values will be inserted in collector
050
051 protected WritableComparator cmp = null;
052 private K key; // key at the top of this RR
053 private U value; // value assoc with key
054 private ResetableIterator<U> vjoin;
055 private Configuration conf = new Configuration();
056 @SuppressWarnings("unchecked")
057 private Class<? extends WritableComparable> keyclass = null;
058 private Class<? extends Writable> valueclass = null;
059
060 protected WrappedRecordReader(int id) {
061 this.id = id;
062 vjoin = new StreamBackedIterator<U>();
063 }
064
065 /**
066 * For a given RecordReader rr, occupy position id in collector.
067 */
068 WrappedRecordReader(int id, RecordReader<K,U> rr,
069 Class<? extends WritableComparator> cmpcl)
070 throws IOException, InterruptedException {
071 this.id = id;
072 this.rr = rr;
073 if (cmpcl != null) {
074 try {
075 this.cmp = cmpcl.newInstance();
076 } catch (InstantiationException e) {
077 throw new IOException(e);
078 } catch (IllegalAccessException e) {
079 throw new IOException(e);
080 }
081 }
082 vjoin = new StreamBackedIterator<U>();
083 }
084
085 public void initialize(InputSplit split,
086 TaskAttemptContext context)
087 throws IOException, InterruptedException {
088 rr.initialize(split, context);
089 conf = context.getConfiguration();
090 nextKeyValue();
091 if (!empty) {
092 keyclass = key.getClass().asSubclass(WritableComparable.class);
093 valueclass = value.getClass();
094 if (cmp == null) {
095 cmp = WritableComparator.get(keyclass, conf);
096 }
097 }
098 }
099
100 /**
101 * Request new key from proxied RR.
102 */
103 @SuppressWarnings("unchecked")
104 public K createKey() {
105 if (keyclass != null) {
106 return (K) ReflectionUtils.newInstance(keyclass, conf);
107 }
108 return (K) NullWritable.get();
109 }
110
111 @SuppressWarnings("unchecked")
112 public U createValue() {
113 if (valueclass != null) {
114 return (U) ReflectionUtils.newInstance(valueclass, conf);
115 }
116 return (U) NullWritable.get();
117 }
118
119 /** {@inheritDoc} */
120 public int id() {
121 return id;
122 }
123
124 /**
125 * Return the key at the head of this RR.
126 */
127 public K key() {
128 return key;
129 }
130
131 /**
132 * Clone the key at the head of this RR into the object supplied.
133 */
134 public void key(K qkey) throws IOException {
135 ReflectionUtils.copy(conf, key, qkey);
136 }
137
138 /**
139 * Return true if the RR- including the k,v pair stored in this object-
140 * is exhausted.
141 */
142 public boolean hasNext() {
143 return !empty;
144 }
145
146 /**
147 * Skip key-value pairs with keys less than or equal to the key provided.
148 */
149 public void skip(K key) throws IOException, InterruptedException {
150 if (hasNext()) {
151 while (cmp.compare(key(), key) <= 0 && next());
152 }
153 }
154
155 /**
156 * Add an iterator to the collector at the position occupied by this
157 * RecordReader over the values in this stream paired with the key
158 * provided (ie register a stream of values from this source matching K
159 * with a collector).
160 */
161 @SuppressWarnings("unchecked")
162 public void accept(CompositeRecordReader.JoinCollector i, K key)
163 throws IOException, InterruptedException {
164 vjoin.clear();
165 if (key() != null && 0 == cmp.compare(key, key())) {
166 do {
167 vjoin.add(value);
168 } while (next() && 0 == cmp.compare(key, key()));
169 }
170 i.add(id, vjoin);
171 }
172
173 /**
174 * Read the next k,v pair into the head of this object; return true iff
175 * the RR and this are exhausted.
176 */
177 public boolean nextKeyValue() throws IOException, InterruptedException {
178 if (hasNext()) {
179 next();
180 return true;
181 }
182 return false;
183 }
184
185 /**
186 * Read the next k,v pair into the head of this object; return true iff
187 * the RR and this are exhausted.
188 */
189 private boolean next() throws IOException, InterruptedException {
190 empty = !rr.nextKeyValue();
191 key = rr.getCurrentKey();
192 value = rr.getCurrentValue();
193 return !empty;
194 }
195
196 /**
197 * Get current key
198 */
199 public K getCurrentKey() throws IOException, InterruptedException {
200 return rr.getCurrentKey();
201 }
202
203 /**
204 * Get current value
205 */
206 public U getCurrentValue() throws IOException, InterruptedException {
207 return rr.getCurrentValue();
208 }
209
210 /**
211 * Request progress from proxied RR.
212 */
213 public float getProgress() throws IOException, InterruptedException {
214 return rr.getProgress();
215 }
216
217 /**
218 * Forward close request to proxied RR.
219 */
220 public void close() throws IOException {
221 rr.close();
222 }
223
224 /**
225 * Implement Comparable contract (compare key at head of proxied RR
226 * with that of another).
227 */
228 public int compareTo(ComposableRecordReader<K,?> other) {
229 return cmp.compare(key(), other.key());
230 }
231
232 /**
233 * Return true iff compareTo(other) retn true.
234 */
235 @SuppressWarnings("unchecked") // Explicit type check prior to cast
236 public boolean equals(Object other) {
237 return other instanceof ComposableRecordReader
238 && 0 == compareTo((ComposableRecordReader)other);
239 }
240
241 public int hashCode() {
242 assert false : "hashCode not designed";
243 return 42;
244 }
245 }