001 /**
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements. See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership. The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License. You may obtain a copy of the License at
009 *
010 * http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018
019 package org.apache.hadoop.mapred.join;
020
021 import java.io.IOException;
022 import java.util.PriorityQueue;
023
024 import org.apache.hadoop.classification.InterfaceAudience;
025 import org.apache.hadoop.classification.InterfaceStability;
026 import org.apache.hadoop.io.Writable;
027 import org.apache.hadoop.io.WritableComparable;
028 import org.apache.hadoop.io.WritableComparator;
029 import org.apache.hadoop.io.WritableUtils;
030 import org.apache.hadoop.util.ReflectionUtils;
031 import org.apache.hadoop.mapred.JobConf;
032 import org.apache.hadoop.mapred.RecordReader;
033
034 /**
035 * Base class for Composite join returning values derived from multiple
036 * sources, but generally not tuples.
037 */
038 @InterfaceAudience.Public
039 @InterfaceStability.Stable
040 public abstract class MultiFilterRecordReader<K extends WritableComparable,
041 V extends Writable>
042 extends CompositeRecordReader<K,V,V>
043 implements ComposableRecordReader<K,V> {
044
045 private Class<? extends Writable> valueclass;
046 private TupleWritable ivalue;
047
048 public MultiFilterRecordReader(int id, JobConf conf, int capacity,
049 Class<? extends WritableComparator> cmpcl) throws IOException {
050 super(id, capacity, cmpcl);
051 setConf(conf);
052 }
053
054 /**
055 * For each tuple emitted, return a value (typically one of the values
056 * in the tuple).
057 * Modifying the Writables in the tuple is permitted and unlikely to affect
058 * join behavior in most cases, but it is not recommended. It's safer to
059 * clone first.
060 */
061 protected abstract V emit(TupleWritable dst) throws IOException;
062
063 /**
064 * Default implementation offers {@link #emit} every Tuple from the
065 * collector (the outer join of child RRs).
066 */
067 protected boolean combine(Object[] srcs, TupleWritable dst) {
068 return true;
069 }
070
071 /** {@inheritDoc} */
072 public boolean next(K key, V value) throws IOException {
073 if (jc.flush(ivalue)) {
074 WritableUtils.cloneInto(key, jc.key());
075 WritableUtils.cloneInto(value, emit(ivalue));
076 return true;
077 }
078 jc.clear();
079 K iterkey = createKey();
080 final PriorityQueue<ComposableRecordReader<K,?>> q = getRecordReaderQueue();
081 while (!q.isEmpty()) {
082 fillJoinCollector(iterkey);
083 jc.reset(iterkey);
084 if (jc.flush(ivalue)) {
085 WritableUtils.cloneInto(key, jc.key());
086 WritableUtils.cloneInto(value, emit(ivalue));
087 return true;
088 }
089 jc.clear();
090 }
091 return false;
092 }
093
094 /** {@inheritDoc} */
095 @SuppressWarnings("unchecked") // Explicit check for value class agreement
096 public V createValue() {
097 if (null == valueclass) {
098 final Class<?> cls = kids[0].createValue().getClass();
099 for (RecordReader<K,? extends V> rr : kids) {
100 if (!cls.equals(rr.createValue().getClass())) {
101 throw new ClassCastException("Child value classes fail to agree");
102 }
103 }
104 valueclass = cls.asSubclass(Writable.class);
105 ivalue = createInternalValue();
106 }
107 return (V) ReflectionUtils.newInstance(valueclass, null);
108 }
109
110 /**
111 * Return an iterator returning a single value from the tuple.
112 * @see MultiFilterDelegationIterator
113 */
114 protected ResetableIterator<V> getDelegate() {
115 return new MultiFilterDelegationIterator();
116 }
117
118 /**
119 * Proxy the JoinCollector, but include callback to emit.
120 */
121 protected class MultiFilterDelegationIterator
122 implements ResetableIterator<V> {
123
124 public boolean hasNext() {
125 return jc.hasNext();
126 }
127
128 public boolean next(V val) throws IOException {
129 boolean ret;
130 if (ret = jc.flush(ivalue)) {
131 WritableUtils.cloneInto(val, emit(ivalue));
132 }
133 return ret;
134 }
135
136 public boolean replay(V val) throws IOException {
137 WritableUtils.cloneInto(val, emit(ivalue));
138 return true;
139 }
140
141 public void reset() {
142 jc.reset(jc.key());
143 }
144
145 public void add(V item) throws IOException {
146 throw new UnsupportedOperationException();
147 }
148
149 public void close() throws IOException {
150 jc.close();
151 }
152
153 public void clear() {
154 jc.clear();
155 }
156 }
157
158 }