001 /**
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements. See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership. The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License. You may obtain a copy of the License at
009 *
010 * http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018
019 package org.apache.hadoop.mapreduce.lib.join;
020
021 import java.io.IOException;
022 import java.util.ArrayList;
023 import java.util.Comparator;
024 import java.util.PriorityQueue;
025
026 import org.apache.hadoop.classification.InterfaceAudience;
027 import org.apache.hadoop.classification.InterfaceStability;
028 import org.apache.hadoop.conf.Configurable;
029 import org.apache.hadoop.conf.Configuration;
030 import org.apache.hadoop.io.NullWritable;
031 import org.apache.hadoop.io.Writable;
032 import org.apache.hadoop.io.WritableComparable;
033 import org.apache.hadoop.io.WritableComparator;
034 import org.apache.hadoop.mapreduce.InputSplit;
035 import org.apache.hadoop.mapreduce.RecordReader;
036 import org.apache.hadoop.mapreduce.TaskAttemptContext;
037 import org.apache.hadoop.util.ReflectionUtils;
038
039 /**
040 * A RecordReader that can effect joins of RecordReaders sharing a common key
041 * type and partitioning.
042 */
043 @InterfaceAudience.Public
044 @InterfaceStability.Stable
045 public abstract class CompositeRecordReader<
046 K extends WritableComparable<?>, // key type
047 V extends Writable, // accepts RecordReader<K,V> as children
048 X extends Writable> // emits Writables of this type
049 extends ComposableRecordReader<K, X>
050 implements Configurable {
051
/** Slot this reader occupies in its parent's collector; reported by id(). */
private int id;
/** Configuration supplied through setConf(); used when building comparators and keys. */
protected Configuration conf;
/**
 * Shared placeholder iterator. JoinCollector slots start out pointing at it,
 * and accept() registers it when this reader has nothing for the offered key.
 */
private final ResetableIterator<X> EMPTY = new ResetableIterator.EMPTY<X>();

/**
 * Comparator used to order child keys. Instantiated from the class handed to
 * the constructor, or derived from keyclass in initialize(); null until one
 * of those has happened.
 */
private WritableComparator cmp;
/** Key class shared by the children; set in initialize() from the first child that has a key. */
@SuppressWarnings("unchecked")
protected Class<? extends WritableComparable> keyclass = null;
/**
 * Child readers that still have input, ordered by their current key through
 * cmp. Stays null when no comparator class was given to the constructor and
 * initialize() found no child with a key.
 */
private PriorityQueue<ComposableRecordReader<K,?>> q;

/** Collector gathering the per-child value iterators for the key being joined. */
protected final JoinCollector jc;
/** Child readers, indexed by the value each child returns from id(). */
protected final ComposableRecordReader<K,? extends V>[] kids;

/**
 * Decide whether a populated tuple should be emitted.
 * JoinCollector.flush() calls this with the child reader array and the tuple
 * it has just filled, and reports success only when this returns true.
 * NOTE(review): the exact join semantics are defined by subclasses and are
 * not visible here.
 */
protected abstract boolean combine(Object[] srcs, TupleWritable value);

/** Key returned by getCurrentKey(); not assigned in this class, presumably set by subclasses. */
protected K key;
/** Value returned by getCurrentValue(); not assigned in this class, presumably set by subclasses. */
protected X value;
068
/**
 * Build a composite reader with room for <tt>capacity</tt> children that
 * itself sits at position <tt>id</tt> of its parent reader.
 * A root composite conventionally uses id -1, but callers should not depend
 * on that.
 *
 * @param id position of this reader in the parent's collector
 * @param capacity number of child slots to allocate; asserted to be positive
 * @param cmpcl comparator class used to order child keys, or null to have the
 *        comparator chosen later in initialize()
 * @throws IOException declared for callers; presumably reserved for
 *         subclasses, as nothing visible here raises it directly
 */
@SuppressWarnings("unchecked") // Generic array assignment
public CompositeRecordReader(int id, int capacity,
    Class<? extends WritableComparator> cmpcl)
    throws IOException {
  assert capacity > 0 : "Invalid capacity";
  this.id = id;
  if (cmpcl != null) {
    this.cmp = ReflectionUtils.newInstance(cmpcl, null);
    // Children are ordered by their current key using the comparator above.
    final Comparator<ComposableRecordReader<K,?>> byKey =
        new Comparator<ComposableRecordReader<K,?>>() {
          public int compare(ComposableRecordReader<K,?> left,
                             ComposableRecordReader<K,?> right) {
            return cmp.compare(left.key(), right.key());
          }
        };
    this.q = new PriorityQueue<ComposableRecordReader<K,?>>(3, byKey);
  }
  this.jc = new JoinCollector(capacity);
  this.kids = new ComposableRecordReader[capacity];
}
094
095 @SuppressWarnings("unchecked")
096 public void initialize(InputSplit split, TaskAttemptContext context)
097 throws IOException, InterruptedException {
098 if (kids != null) {
099 for (int i = 0; i < kids.length; ++i) {
100 kids[i].initialize(((CompositeInputSplit)split).get(i), context);
101 if (kids[i].key() == null) {
102 continue;
103 }
104
105 // get keyclass
106 if (keyclass == null) {
107 keyclass = kids[i].createKey().getClass().
108 asSubclass(WritableComparable.class);
109 }
110 // create priority queue
111 if (null == q) {
112 cmp = WritableComparator.get(keyclass, conf);
113 q = new PriorityQueue<ComposableRecordReader<K,?>>(3,
114 new Comparator<ComposableRecordReader<K,?>>() {
115 public int compare(ComposableRecordReader<K,?> o1,
116 ComposableRecordReader<K,?> o2) {
117 return cmp.compare(o1.key(), o2.key());
118 }
119 });
120 }
121 // Explicit check for key class agreement
122 if (!keyclass.equals(kids[i].key().getClass())) {
123 throw new ClassCastException("Child key classes fail to agree");
124 }
125
126 // add the kid to priority queue if it has any elements
127 if (kids[i].hasNext()) {
128 q.add(kids[i]);
129 }
130 }
131 }
132 }
133
134 /**
135 * Return the position in the collector this class occupies.
136 */
137 public int id() {
138 return id;
139 }
140
141 /**
142 * {@inheritDoc}
143 */
144 public void setConf(Configuration conf) {
145 this.conf = conf;
146 }
147
148 /**
149 * {@inheritDoc}
150 */
151 public Configuration getConf() {
152 return conf;
153 }
154
155 /**
156 * Return sorted list of RecordReaders for this composite.
157 */
158 protected PriorityQueue<ComposableRecordReader<K,?>> getRecordReaderQueue() {
159 return q;
160 }
161
162 /**
163 * Return comparator defining the ordering for RecordReaders in this
164 * composite.
165 */
166 protected WritableComparator getComparator() {
167 return cmp;
168 }
169
170 /**
171 * Add a RecordReader to this collection.
172 * The id() of a RecordReader determines where in the Tuple its
173 * entry will appear. Adding RecordReaders with the same id has
174 * undefined behavior.
175 */
176 public void add(ComposableRecordReader<K,? extends V> rr)
177 throws IOException, InterruptedException {
178 kids[rr.id()] = rr;
179 }
180
181 /**
182 * Collector for join values.
183 * This accumulates values for a given key from the child RecordReaders. If
184 * one or more child RR contain duplicate keys, this will emit the cross
185 * product of the associated values until exhausted.
186 */
187 public class JoinCollector {
188 private K key;
189 private ResetableIterator<X>[] iters;
190 private int pos = -1;
191 private boolean first = true;
192
193 /**
194 * Construct a collector capable of handling the specified number of
195 * children.
196 */
197 @SuppressWarnings("unchecked") // Generic array assignment
198 public JoinCollector(int card) {
199 iters = new ResetableIterator[card];
200 for (int i = 0; i < iters.length; ++i) {
201 iters[i] = EMPTY;
202 }
203 }
204
205 /**
206 * Register a given iterator at position id.
207 */
208 public void add(int id, ResetableIterator<X> i)
209 throws IOException {
210 iters[id] = i;
211 }
212
213 /**
214 * Return the key associated with this collection.
215 */
216 public K key() {
217 return key;
218 }
219
220 /**
221 * Codify the contents of the collector to be iterated over.
222 * When this is called, all RecordReaders registered for this
223 * key should have added ResetableIterators.
224 */
225 public void reset(K key) {
226 this.key = key;
227 first = true;
228 pos = iters.length - 1;
229 for (int i = 0; i < iters.length; ++i) {
230 iters[i].reset();
231 }
232 }
233
234 /**
235 * Clear all state information.
236 */
237 public void clear() {
238 key = null;
239 pos = -1;
240 for (int i = 0; i < iters.length; ++i) {
241 iters[i].clear();
242 iters[i] = EMPTY;
243 }
244 }
245
246 /**
247 * Returns false if exhausted or if reset(K) has not been called.
248 */
249 public boolean hasNext() {
250 return !(pos < 0);
251 }
252
253 /**
254 * Populate Tuple from iterators.
255 * It should be the case that, given iterators i_1...i_n over values from
256 * sources s_1...s_n sharing key k, repeated calls to next should yield
257 * I x I.
258 */
259 @SuppressWarnings("unchecked") // No static type info on Tuples
260 protected boolean next(TupleWritable val) throws IOException {
261 if (first) {
262 int i = -1;
263 for (pos = 0; pos < iters.length; ++pos) {
264 if (iters[pos].hasNext() && iters[pos].next((X)val.get(pos))) {
265 i = pos;
266 val.setWritten(i);
267 }
268 }
269 pos = i;
270 first = false;
271 if (pos < 0) {
272 clear();
273 return false;
274 }
275 return true;
276 }
277 while (0 <= pos && !(iters[pos].hasNext() &&
278 iters[pos].next((X)val.get(pos)))) {
279 --pos;
280 }
281 if (pos < 0) {
282 clear();
283 return false;
284 }
285 val.setWritten(pos);
286 for (int i = 0; i < pos; ++i) {
287 if (iters[i].replay((X)val.get(i))) {
288 val.setWritten(i);
289 }
290 }
291 while (pos + 1 < iters.length) {
292 ++pos;
293 iters[pos].reset();
294 if (iters[pos].hasNext() && iters[pos].next((X)val.get(pos))) {
295 val.setWritten(pos);
296 }
297 }
298 return true;
299 }
300
301 /**
302 * Replay the last Tuple emitted.
303 */
304 @SuppressWarnings("unchecked") // No static typeinfo on Tuples
305 public boolean replay(TupleWritable val) throws IOException {
306 // The last emitted tuple might have drawn on an empty source;
307 // it can't be cleared prematurely, b/c there may be more duplicate
308 // keys in iterator positions < pos
309 assert !first;
310 boolean ret = false;
311 for (int i = 0; i < iters.length; ++i) {
312 if (iters[i].replay((X)val.get(i))) {
313 val.setWritten(i);
314 ret = true;
315 }
316 }
317 return ret;
318 }
319
320 /**
321 * Close all child iterators.
322 */
323 public void close() throws IOException {
324 for (int i = 0; i < iters.length; ++i) {
325 iters[i].close();
326 }
327 }
328
329 /**
330 * Write the next value into key, value as accepted by the operation
331 * associated with this set of RecordReaders.
332 */
333 public boolean flush(TupleWritable value) throws IOException {
334 while (hasNext()) {
335 value.clearWritten();
336 if (next(value) && combine(kids, value)) {
337 return true;
338 }
339 }
340 return false;
341 }
342 }
343
344 /**
345 * Return the key for the current join or the value at the top of the
346 * RecordReader heap.
347 */
348 public K key() {
349 if (jc.hasNext()) {
350 return jc.key();
351 }
352 if (!q.isEmpty()) {
353 return q.peek().key();
354 }
355 return null;
356 }
357
358 /**
359 * Clone the key at the top of this RR into the given object.
360 */
361 public void key(K key) throws IOException {
362 ReflectionUtils.copy(conf, key(), key);
363 }
364
365 public K getCurrentKey() {
366 return key;
367 }
368
369 /**
370 * Return true if it is possible that this could emit more values.
371 */
372 public boolean hasNext() {
373 return jc.hasNext() || !q.isEmpty();
374 }
375
376 /**
377 * Pass skip key to child RRs.
378 */
379 public void skip(K key) throws IOException, InterruptedException {
380 ArrayList<ComposableRecordReader<K,?>> tmp =
381 new ArrayList<ComposableRecordReader<K,?>>();
382 while (!q.isEmpty() && cmp.compare(q.peek().key(), key) <= 0) {
383 tmp.add(q.poll());
384 }
385 for (ComposableRecordReader<K,?> rr : tmp) {
386 rr.skip(key);
387 if (rr.hasNext()) {
388 q.add(rr);
389 }
390 }
391 }
392
/**
 * Obtain an iterator over the child RRs apropos of the value type
 * ultimately emitted from this join.
 * accept() registers the returned iterator with the caller's JoinCollector,
 * at this reader's id, after the internal collector has been filled.
 *
 * @return iterator over the values this composite can emit
 */
protected abstract ResetableIterator<X> getDelegate();
398
399 /**
400 * If key provided matches that of this Composite, give JoinCollector
401 * iterator over values it may emit.
402 */
403 @SuppressWarnings("unchecked") // No values from static EMPTY class
404 @Override
405 public void accept(CompositeRecordReader.JoinCollector jc, K key)
406 throws IOException, InterruptedException {
407 if (hasNext() && 0 == cmp.compare(key, key())) {
408 fillJoinCollector(createKey());
409 jc.add(id, getDelegate());
410 return;
411 }
412 jc.add(id, EMPTY);
413 }
414
415 /**
416 * For all child RRs offering the key provided, obtain an iterator
417 * at that position in the JoinCollector.
418 */
419 protected void fillJoinCollector(K iterkey)
420 throws IOException, InterruptedException {
421 if (!q.isEmpty()) {
422 q.peek().key(iterkey);
423 while (0 == cmp.compare(q.peek().key(), iterkey)) {
424 ComposableRecordReader<K,?> t = q.poll();
425 t.accept(jc, iterkey);
426 if (t.hasNext()) {
427 q.add(t);
428 } else if (q.isEmpty()) {
429 return;
430 }
431 }
432 }
433 }
434
435 /**
436 * Implement Comparable contract (compare key of join or head of heap
437 * with that of another).
438 */
439 public int compareTo(ComposableRecordReader<K,?> other) {
440 return cmp.compare(key(), other.key());
441 }
442
443 /**
444 * Create a new key common to all child RRs.
445 * @throws ClassCastException if key classes differ.
446 */
447 @SuppressWarnings("unchecked")
448 protected K createKey() {
449 if (keyclass == null || keyclass.equals(NullWritable.class)) {
450 return (K) NullWritable.get();
451 }
452 return (K) ReflectionUtils.newInstance(keyclass, getConf());
453 }
454
455 /**
456 * Create a value to be used internally for joins.
457 */
458 protected TupleWritable createTupleWritable() {
459 Writable[] vals = new Writable[kids.length];
460 for (int i = 0; i < vals.length; ++i) {
461 vals[i] = kids[i].createValue();
462 }
463 return new TupleWritable(vals);
464 }
465
466 /** {@inheritDoc} */
467 public X getCurrentValue()
468 throws IOException, InterruptedException {
469 return value;
470 }
471
472 /**
473 * Close all child RRs.
474 */
475 public void close() throws IOException {
476 if (kids != null) {
477 for (RecordReader<K,? extends Writable> rr : kids) {
478 rr.close();
479 }
480 }
481 if (jc != null) {
482 jc.close();
483 }
484 }
485
486 /**
487 * Report progress as the minimum of all child RR progress.
488 */
489 public float getProgress() throws IOException, InterruptedException {
490 float ret = 1.0f;
491 for (RecordReader<K,? extends Writable> rr : kids) {
492 ret = Math.min(ret, rr.getProgress());
493 }
494 return ret;
495 }
496
497 }