001 /**
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements. See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership. The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License. You may obtain a copy of the License at
009 *
010 * http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018
019 package org.apache.hadoop.mapreduce.lib.join;
020
021 import java.io.IOException;
022 import java.util.PriorityQueue;
023
024 import org.apache.hadoop.classification.InterfaceAudience;
025 import org.apache.hadoop.classification.InterfaceStability;
026 import org.apache.hadoop.conf.Configuration;
027 import org.apache.hadoop.io.Writable;
028 import org.apache.hadoop.io.WritableComparable;
029 import org.apache.hadoop.io.WritableComparator;
030 import org.apache.hadoop.util.ReflectionUtils;
031
032 /**
033 * Base class for Composite joins returning Tuples of arbitrary Writables.
034 */
035 @InterfaceAudience.Public
036 @InterfaceStability.Stable
037 public abstract class JoinRecordReader<K extends WritableComparable<?>>
038 extends CompositeRecordReader<K,Writable,TupleWritable> {
039
040 public JoinRecordReader(int id, Configuration conf, int capacity,
041 Class<? extends WritableComparator> cmpcl) throws IOException {
042 super(id, capacity, cmpcl);
043 setConf(conf);
044 }
045
046 /**
047 * Emit the next set of key, value pairs as defined by the child
048 * RecordReaders and operation associated with this composite RR.
049 */
050 public boolean nextKeyValue()
051 throws IOException, InterruptedException {
052 if (key == null) {
053 key = createKey();
054 }
055 if (jc.flush(value)) {
056 ReflectionUtils.copy(conf, jc.key(), key);
057 return true;
058 }
059 jc.clear();
060 if (value == null) {
061 value = createValue();
062 }
063 final PriorityQueue<ComposableRecordReader<K,?>> q =
064 getRecordReaderQueue();
065 K iterkey = createKey();
066 while (q != null && !q.isEmpty()) {
067 fillJoinCollector(iterkey);
068 jc.reset(iterkey);
069 if (jc.flush(value)) {
070 ReflectionUtils.copy(conf, jc.key(), key);
071 return true;
072 }
073 jc.clear();
074 }
075 return false;
076 }
077
078 public TupleWritable createValue() {
079 return createTupleWritable();
080 }
081
082 /**
083 * Return an iterator wrapping the JoinCollector.
084 */
085 protected ResetableIterator<TupleWritable> getDelegate() {
086 return new JoinDelegationIterator();
087 }
088
089 /**
090 * Since the JoinCollector is effecting our operation, we need only
091 * provide an iterator proxy wrapping its operation.
092 */
093 protected class JoinDelegationIterator
094 implements ResetableIterator<TupleWritable> {
095
096 public boolean hasNext() {
097 return jc.hasNext();
098 }
099
100 public boolean next(TupleWritable val) throws IOException {
101 return jc.flush(val);
102 }
103
104 public boolean replay(TupleWritable val) throws IOException {
105 return jc.replay(val);
106 }
107
108 public void reset() {
109 jc.reset(jc.key());
110 }
111
112 public void add(TupleWritable item) throws IOException {
113 throw new UnsupportedOperationException();
114 }
115
116 public void close() throws IOException {
117 jc.close();
118 }
119
120 public void clear() {
121 jc.clear();
122 }
123 }
124 }