001 /**
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements. See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership. The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License. You may obtain a copy of the License at
009 *
010 * http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018
019 package org.apache.hadoop.mapreduce.lib.join;
020
021 import java.io.DataOutput;
022 import java.io.DataInput;
023 import java.io.IOException;
024 import java.util.BitSet;
025 import java.util.Iterator;
026 import java.util.NoSuchElementException;
027
028 import org.apache.hadoop.classification.InterfaceAudience;
029 import org.apache.hadoop.classification.InterfaceStability;
030 import org.apache.hadoop.io.NullWritable;
031 import org.apache.hadoop.io.Text;
032 import org.apache.hadoop.io.Writable;
033 import org.apache.hadoop.io.WritableUtils;
034
035 /**
036 * Writable type storing multiple {@link org.apache.hadoop.io.Writable}s.
037 *
038 * This is *not* a general-purpose tuple type. In almost all cases, users are
039 * encouraged to implement their own serializable types, which can perform
040 * better validation and provide more efficient encodings than this class is
041 * capable. TupleWritable relies on the join framework for type safety and
042 * assumes its instances will rarely be persisted, assumptions not only
043 * incompatible with, but contrary to the general case.
044 *
045 * @see org.apache.hadoop.io.Writable
046 */
047 @InterfaceAudience.Public
048 @InterfaceStability.Stable
049 public class TupleWritable implements Writable, Iterable<Writable> {
050
051 protected BitSet written;
052 private Writable[] values;
053
054 /**
055 * Create an empty tuple with no allocated storage for writables.
056 */
057 public TupleWritable() {
058 written = new BitSet(0);
059 }
060
061 /**
062 * Initialize tuple with storage; unknown whether any of them contain
063 * "written" values.
064 */
065 public TupleWritable(Writable[] vals) {
066 written = new BitSet(vals.length);
067 values = vals;
068 }
069
070 /**
071 * Return true if tuple has an element at the position provided.
072 */
073 public boolean has(int i) {
074 return written.get(i);
075 }
076
077 /**
078 * Get ith Writable from Tuple.
079 */
080 public Writable get(int i) {
081 return values[i];
082 }
083
084 /**
085 * The number of children in this Tuple.
086 */
087 public int size() {
088 return values.length;
089 }
090
091 /**
092 * {@inheritDoc}
093 */
094 public boolean equals(Object other) {
095 if (other instanceof TupleWritable) {
096 TupleWritable that = (TupleWritable)other;
097 if (!this.written.equals(that.written)) {
098 return false;
099 }
100 for (int i = 0; i < values.length; ++i) {
101 if (!has(i)) continue;
102 if (!values[i].equals(that.get(i))) {
103 return false;
104 }
105 }
106 return true;
107 }
108 return false;
109 }
110
111 public int hashCode() {
112 assert false : "hashCode not designed";
113 return written.hashCode();
114 }
115
116 /**
117 * Return an iterator over the elements in this tuple.
118 * Note that this doesn't flatten the tuple; one may receive tuples
119 * from this iterator.
120 */
121 public Iterator<Writable> iterator() {
122 final TupleWritable t = this;
123 return new Iterator<Writable>() {
124 int bitIndex = written.nextSetBit(0);
125 public boolean hasNext() {
126 return bitIndex >= 0;
127 }
128 public Writable next() {
129 int returnIndex = bitIndex;
130 if (returnIndex < 0)
131 throw new NoSuchElementException();
132 bitIndex = written.nextSetBit(bitIndex+1);
133 return t.get(returnIndex);
134 }
135 public void remove() {
136 if (!written.get(bitIndex)) {
137 throw new IllegalStateException(
138 "Attempt to remove non-existent val");
139 }
140 written.clear(bitIndex);
141 }
142 };
143 }
144
145 /**
146 * Convert Tuple to String as in the following.
147 * <tt>[<child1>,<child2>,...,<childn>]</tt>
148 */
149 public String toString() {
150 StringBuffer buf = new StringBuffer("[");
151 for (int i = 0; i < values.length; ++i) {
152 buf.append(has(i) ? values[i].toString() : "");
153 buf.append(",");
154 }
155 if (values.length != 0)
156 buf.setCharAt(buf.length() - 1, ']');
157 else
158 buf.append(']');
159 return buf.toString();
160 }
161
162 // Writable
163
164 /** Writes each Writable to <code>out</code>.
165 * TupleWritable format:
166 * {@code
167 * <count><type1><type2>...<typen><obj1><obj2>...<objn>
168 * }
169 */
170 public void write(DataOutput out) throws IOException {
171 WritableUtils.writeVInt(out, values.length);
172 writeBitSet(out, values.length, written);
173 for (int i = 0; i < values.length; ++i) {
174 Text.writeString(out, values[i].getClass().getName());
175 }
176 for (int i = 0; i < values.length; ++i) {
177 if (has(i)) {
178 values[i].write(out);
179 }
180 }
181 }
182
183 /**
184 * {@inheritDoc}
185 */
186 @SuppressWarnings("unchecked") // No static typeinfo on Tuples
187 public void readFields(DataInput in) throws IOException {
188 int card = WritableUtils.readVInt(in);
189 values = new Writable[card];
190 readBitSet(in, card, written);
191 Class<? extends Writable>[] cls = new Class[card];
192 try {
193 for (int i = 0; i < card; ++i) {
194 cls[i] = Class.forName(Text.readString(in)).asSubclass(Writable.class);
195 }
196 for (int i = 0; i < card; ++i) {
197 if (cls[i].equals(NullWritable.class)) {
198 values[i] = NullWritable.get();
199 } else {
200 values[i] = cls[i].newInstance();
201 }
202 if (has(i)) {
203 values[i].readFields(in);
204 }
205 }
206 } catch (ClassNotFoundException e) {
207 throw new IOException("Failed tuple init", e);
208 } catch (IllegalAccessException e) {
209 throw new IOException("Failed tuple init", e);
210 } catch (InstantiationException e) {
211 throw new IOException("Failed tuple init", e);
212 }
213 }
214
215 /**
216 * Record that the tuple contains an element at the position provided.
217 */
218 void setWritten(int i) {
219 written.set(i);
220 }
221
222 /**
223 * Record that the tuple does not contain an element at the position
224 * provided.
225 */
226 void clearWritten(int i) {
227 written.clear(i);
228 }
229
230 /**
231 * Clear any record of which writables have been written to, without
232 * releasing storage.
233 */
234 void clearWritten() {
235 written.clear();
236 }
237
238 /**
239 * Writes the bit set to the stream. The first 64 bit-positions of the bit
240 * set are written as a VLong for backwards-compatibility with older
241 * versions of TupleWritable. All bit-positions >= 64 are encoded as a byte
242 * for every 8 bit-positions.
243 */
244 private static final void writeBitSet(DataOutput stream, int nbits,
245 BitSet bitSet) throws IOException {
246 long bits = 0L;
247
248 int bitSetIndex = bitSet.nextSetBit(0);
249 for (;bitSetIndex >= 0 && bitSetIndex < Long.SIZE;
250 bitSetIndex=bitSet.nextSetBit(bitSetIndex+1)) {
251 bits |= 1L << bitSetIndex;
252 }
253 WritableUtils.writeVLong(stream,bits);
254
255 if (nbits > Long.SIZE) {
256 bits = 0L;
257 for (int lastWordWritten = 0; bitSetIndex >= 0 && bitSetIndex < nbits;
258 bitSetIndex = bitSet.nextSetBit(bitSetIndex+1)) {
259 int bitsIndex = bitSetIndex % Byte.SIZE;
260 int word = (bitSetIndex-Long.SIZE) / Byte.SIZE;
261 if (word > lastWordWritten) {
262 stream.writeByte((byte)bits);
263 bits = 0L;
264 for (lastWordWritten++;lastWordWritten<word;lastWordWritten++) {
265 stream.writeByte((byte)bits);
266 }
267 }
268 bits |= 1L << bitsIndex;
269 }
270 stream.writeByte((byte)bits);
271 }
272 }
273
274 /**
275 * Reads a bitset from the stream that has been written with
276 * {@link #writeBitSet(DataOutput, int, BitSet)}.
277 */
278 private static final void readBitSet(DataInput stream, int nbits,
279 BitSet bitSet) throws IOException {
280 bitSet.clear();
281 long initialBits = WritableUtils.readVLong(stream);
282 long last = 0L;
283 while (0L != initialBits) {
284 last = Long.lowestOneBit(initialBits);
285 initialBits ^= last;
286 bitSet.set(Long.numberOfTrailingZeros(last));
287 }
288
289 for (int offset=Long.SIZE; offset < nbits; offset+=Byte.SIZE) {
290 byte bits = stream.readByte();
291 while (0 != bits) {
292 last = Long.lowestOneBit(bits);
293 bits ^= last;
294 bitSet.set(Long.numberOfTrailingZeros(last) + offset);
295 }
296 }
297 }
298 }