001 /**
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements. See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership. The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License. You may obtain a copy of the License at
009 *
010 * http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018
019 package org.apache.hadoop.io;
020
021 import java.io.*;
022 import java.util.*;
023 import java.rmi.server.UID;
024 import java.security.MessageDigest;
025 import org.apache.commons.logging.*;
026 import org.apache.hadoop.util.Options;
027 import org.apache.hadoop.fs.*;
028 import org.apache.hadoop.fs.Options.CreateOpts;
029 import org.apache.hadoop.io.compress.CodecPool;
030 import org.apache.hadoop.io.compress.CompressionCodec;
031 import org.apache.hadoop.io.compress.CompressionInputStream;
032 import org.apache.hadoop.io.compress.CompressionOutputStream;
033 import org.apache.hadoop.io.compress.Compressor;
034 import org.apache.hadoop.io.compress.Decompressor;
035 import org.apache.hadoop.io.compress.DefaultCodec;
036 import org.apache.hadoop.io.compress.GzipCodec;
037 import org.apache.hadoop.io.compress.zlib.ZlibFactory;
038 import org.apache.hadoop.io.serializer.Deserializer;
039 import org.apache.hadoop.io.serializer.Serializer;
040 import org.apache.hadoop.io.serializer.SerializationFactory;
041 import org.apache.hadoop.classification.InterfaceAudience;
042 import org.apache.hadoop.classification.InterfaceStability;
043 import org.apache.hadoop.conf.*;
044 import org.apache.hadoop.util.Progressable;
045 import org.apache.hadoop.util.Progress;
046 import org.apache.hadoop.util.ReflectionUtils;
047 import org.apache.hadoop.util.NativeCodeLoader;
048 import org.apache.hadoop.util.MergeSort;
049 import org.apache.hadoop.util.PriorityQueue;
050 import org.apache.hadoop.util.Time;
051
052 /**
053 * <code>SequenceFile</code>s are flat files consisting of binary key/value
054 * pairs.
055 *
056 * <p><code>SequenceFile</code> provides {@link SequenceFile.Writer},
057 * {@link SequenceFile.Reader} and {@link Sorter} classes for writing,
058 * reading and sorting respectively.</p>
059 *
060 * There are three <code>SequenceFile</code> <code>Writer</code>s based on the
061 * {@link CompressionType} used to compress key/value pairs:
062 * <ol>
063 * <li>
064 * <code>Writer</code> : Uncompressed records.
065 * </li>
066 * <li>
067 * <code>RecordCompressWriter</code> : Record-compressed files, only compress
068 * values.
069 * </li>
070 * <li>
071 * <code>BlockCompressWriter</code> : Block-compressed files, both keys &
072 * values are collected in 'blocks'
073 * separately and compressed. The size of
074 * the 'block' is configurable.
075 * </ol>
076 *
077 * <p>The actual compression algorithm used to compress key and/or values can be
078 * specified by using the appropriate {@link CompressionCodec}.</p>
079 *
080 * <p>The recommended way is to use the static <tt>createWriter</tt> methods
081 * provided by the <code>SequenceFile</code> to chose the preferred format.</p>
082 *
083 * <p>The {@link SequenceFile.Reader} acts as the bridge and can read any of the
084 * above <code>SequenceFile</code> formats.</p>
085 *
086 * <h4 id="Formats">SequenceFile Formats</h4>
087 *
088 * <p>Essentially there are 3 different formats for <code>SequenceFile</code>s
089 * depending on the <code>CompressionType</code> specified. All of them share a
090 * <a href="#Header">common header</a> described below.
091 *
092 * <h5 id="Header">SequenceFile Header</h5>
093 * <ul>
094 * <li>
095 * version - 3 bytes of magic header <b>SEQ</b>, followed by 1 byte of actual
096 * version number (e.g. SEQ4 or SEQ6)
097 * </li>
098 * <li>
099 * keyClassName -key class
100 * </li>
101 * <li>
102 * valueClassName - value class
103 * </li>
104 * <li>
105 * compression - A boolean which specifies if compression is turned on for
106 * keys/values in this file.
107 * </li>
108 * <li>
109 * blockCompression - A boolean which specifies if block-compression is
110 * turned on for keys/values in this file.
111 * </li>
112 * <li>
113 * compression codec - <code>CompressionCodec</code> class which is used for
114 * compression of keys and/or values (if compression is
115 * enabled).
116 * </li>
117 * <li>
118 * metadata - {@link Metadata} for this file.
119 * </li>
120 * <li>
121 * sync - A sync marker to denote end of the header.
122 * </li>
123 * </ul>
124 *
125 * <h5 id="#UncompressedFormat">Uncompressed SequenceFile Format</h5>
126 * <ul>
127 * <li>
128 * <a href="#Header">Header</a>
129 * </li>
130 * <li>
131 * Record
132 * <ul>
133 * <li>Record length</li>
134 * <li>Key length</li>
135 * <li>Key</li>
136 * <li>Value</li>
137 * </ul>
138 * </li>
139 * <li>
140 * A sync-marker every few <code>100</code> bytes or so.
141 * </li>
142 * </ul>
143 *
144 * <h5 id="#RecordCompressedFormat">Record-Compressed SequenceFile Format</h5>
145 * <ul>
146 * <li>
147 * <a href="#Header">Header</a>
148 * </li>
149 * <li>
150 * Record
151 * <ul>
152 * <li>Record length</li>
153 * <li>Key length</li>
154 * <li>Key</li>
155 * <li><i>Compressed</i> Value</li>
156 * </ul>
157 * </li>
158 * <li>
159 * A sync-marker every few <code>100</code> bytes or so.
160 * </li>
161 * </ul>
162 *
163 * <h5 id="#BlockCompressedFormat">Block-Compressed SequenceFile Format</h5>
164 * <ul>
165 * <li>
166 * <a href="#Header">Header</a>
167 * </li>
168 * <li>
169 * Record <i>Block</i>
170 * <ul>
171 * <li>Uncompressed number of records in the block</li>
172 * <li>Compressed key-lengths block-size</li>
173 * <li>Compressed key-lengths block</li>
174 * <li>Compressed keys block-size</li>
175 * <li>Compressed keys block</li>
176 * <li>Compressed value-lengths block-size</li>
177 * <li>Compressed value-lengths block</li>
178 * <li>Compressed values block-size</li>
179 * <li>Compressed values block</li>
180 * </ul>
181 * </li>
182 * <li>
183 * A sync-marker every block.
184 * </li>
185 * </ul>
186 *
187 * <p>The compressed blocks of key lengths and value lengths consist of the
188 * actual lengths of individual keys/values encoded in ZeroCompressedInteger
189 * format.</p>
190 *
191 * @see CompressionCodec
192 */
193 @InterfaceAudience.Public
194 @InterfaceStability.Stable
195 public class SequenceFile {
196 private static final Log LOG = LogFactory.getLog(SequenceFile.class);
197
198 private SequenceFile() {} // no public ctor
199
200 private static final byte BLOCK_COMPRESS_VERSION = (byte)4;
201 private static final byte CUSTOM_COMPRESS_VERSION = (byte)5;
202 private static final byte VERSION_WITH_METADATA = (byte)6;
203 private static byte[] VERSION = new byte[] {
204 (byte)'S', (byte)'E', (byte)'Q', VERSION_WITH_METADATA
205 };
206
207 private static final int SYNC_ESCAPE = -1; // "length" of sync entries
208 private static final int SYNC_HASH_SIZE = 16; // number of bytes in hash
209 private static final int SYNC_SIZE = 4+SYNC_HASH_SIZE; // escape + hash
210
211 /** The number of bytes between sync points.*/
212 public static final int SYNC_INTERVAL = 100*SYNC_SIZE;
213
214 /**
215 * The compression type used to compress key/value pairs in the
216 * {@link SequenceFile}.
217 *
218 * @see SequenceFile.Writer
219 */
220 public static enum CompressionType {
221 /** Do not compress records. */
222 NONE,
223 /** Compress values only, each separately. */
224 RECORD,
225 /** Compress sequences of records together in blocks. */
226 BLOCK
227 }
228
229 /**
230 * Get the compression type for the reduce outputs
231 * @param job the job config to look in
232 * @return the kind of compression to use
233 */
234 static public CompressionType getDefaultCompressionType(Configuration job) {
235 String name = job.get("io.seqfile.compression.type");
236 return name == null ? CompressionType.RECORD :
237 CompressionType.valueOf(name);
238 }
239
240 /**
241 * Set the default compression type for sequence files.
242 * @param job the configuration to modify
243 * @param val the new compression type (none, block, record)
244 */
245 static public void setDefaultCompressionType(Configuration job,
246 CompressionType val) {
247 job.set("io.seqfile.compression.type", val.toString());
248 }
249
250 /**
251 * Create a new Writer with the given options.
252 * @param conf the configuration to use
253 * @param opts the options to create the file with
254 * @return a new Writer
255 * @throws IOException
256 */
257 public static Writer createWriter(Configuration conf, Writer.Option... opts
258 ) throws IOException {
259 Writer.CompressionOption compressionOption =
260 Options.getOption(Writer.CompressionOption.class, opts);
261 CompressionType kind;
262 if (compressionOption != null) {
263 kind = compressionOption.getValue();
264 } else {
265 kind = getDefaultCompressionType(conf);
266 opts = Options.prependOptions(opts, Writer.compression(kind));
267 }
268 switch (kind) {
269 default:
270 case NONE:
271 return new Writer(conf, opts);
272 case RECORD:
273 return new RecordCompressWriter(conf, opts);
274 case BLOCK:
275 return new BlockCompressWriter(conf, opts);
276 }
277 }
278
279 /**
280 * Construct the preferred type of SequenceFile Writer.
281 * @param fs The configured filesystem.
282 * @param conf The configuration.
283 * @param name The name of the file.
284 * @param keyClass The 'key' type.
285 * @param valClass The 'value' type.
286 * @return Returns the handle to the constructed SequenceFile Writer.
287 * @throws IOException
288 * @deprecated Use {@link #createWriter(Configuration, Writer.Option...)}
289 * instead.
290 */
291 @Deprecated
292 public static Writer
293 createWriter(FileSystem fs, Configuration conf, Path name,
294 Class keyClass, Class valClass) throws IOException {
295 return createWriter(conf, Writer.filesystem(fs),
296 Writer.file(name), Writer.keyClass(keyClass),
297 Writer.valueClass(valClass));
298 }
299
300 /**
301 * Construct the preferred type of SequenceFile Writer.
302 * @param fs The configured filesystem.
303 * @param conf The configuration.
304 * @param name The name of the file.
305 * @param keyClass The 'key' type.
306 * @param valClass The 'value' type.
307 * @param compressionType The compression type.
308 * @return Returns the handle to the constructed SequenceFile Writer.
309 * @throws IOException
310 * @deprecated Use {@link #createWriter(Configuration, Writer.Option...)}
311 * instead.
312 */
313 @Deprecated
314 public static Writer
315 createWriter(FileSystem fs, Configuration conf, Path name,
316 Class keyClass, Class valClass,
317 CompressionType compressionType) throws IOException {
318 return createWriter(conf, Writer.filesystem(fs),
319 Writer.file(name), Writer.keyClass(keyClass),
320 Writer.valueClass(valClass),
321 Writer.compression(compressionType));
322 }
323
324 /**
325 * Construct the preferred type of SequenceFile Writer.
326 * @param fs The configured filesystem.
327 * @param conf The configuration.
328 * @param name The name of the file.
329 * @param keyClass The 'key' type.
330 * @param valClass The 'value' type.
331 * @param compressionType The compression type.
332 * @param progress The Progressable object to track progress.
333 * @return Returns the handle to the constructed SequenceFile Writer.
334 * @throws IOException
335 * @deprecated Use {@link #createWriter(Configuration, Writer.Option...)}
336 * instead.
337 */
338 @Deprecated
339 public static Writer
340 createWriter(FileSystem fs, Configuration conf, Path name,
341 Class keyClass, Class valClass, CompressionType compressionType,
342 Progressable progress) throws IOException {
343 return createWriter(conf, Writer.file(name),
344 Writer.filesystem(fs),
345 Writer.keyClass(keyClass),
346 Writer.valueClass(valClass),
347 Writer.compression(compressionType),
348 Writer.progressable(progress));
349 }
350
351 /**
352 * Construct the preferred type of SequenceFile Writer.
353 * @param fs The configured filesystem.
354 * @param conf The configuration.
355 * @param name The name of the file.
356 * @param keyClass The 'key' type.
357 * @param valClass The 'value' type.
358 * @param compressionType The compression type.
359 * @param codec The compression codec.
360 * @return Returns the handle to the constructed SequenceFile Writer.
361 * @throws IOException
362 * @deprecated Use {@link #createWriter(Configuration, Writer.Option...)}
363 * instead.
364 */
365 @Deprecated
366 public static Writer
367 createWriter(FileSystem fs, Configuration conf, Path name,
368 Class keyClass, Class valClass, CompressionType compressionType,
369 CompressionCodec codec) throws IOException {
370 return createWriter(conf, Writer.file(name),
371 Writer.filesystem(fs),
372 Writer.keyClass(keyClass),
373 Writer.valueClass(valClass),
374 Writer.compression(compressionType, codec));
375 }
376
377 /**
378 * Construct the preferred type of SequenceFile Writer.
379 * @param fs The configured filesystem.
380 * @param conf The configuration.
381 * @param name The name of the file.
382 * @param keyClass The 'key' type.
383 * @param valClass The 'value' type.
384 * @param compressionType The compression type.
385 * @param codec The compression codec.
386 * @param progress The Progressable object to track progress.
387 * @param metadata The metadata of the file.
388 * @return Returns the handle to the constructed SequenceFile Writer.
389 * @throws IOException
390 * @deprecated Use {@link #createWriter(Configuration, Writer.Option...)}
391 * instead.
392 */
393 @Deprecated
394 public static Writer
395 createWriter(FileSystem fs, Configuration conf, Path name,
396 Class keyClass, Class valClass,
397 CompressionType compressionType, CompressionCodec codec,
398 Progressable progress, Metadata metadata) throws IOException {
399 return createWriter(conf, Writer.file(name),
400 Writer.filesystem(fs),
401 Writer.keyClass(keyClass),
402 Writer.valueClass(valClass),
403 Writer.compression(compressionType, codec),
404 Writer.progressable(progress),
405 Writer.metadata(metadata));
406 }
407
408 /**
409 * Construct the preferred type of SequenceFile Writer.
410 * @param fs The configured filesystem.
411 * @param conf The configuration.
412 * @param name The name of the file.
413 * @param keyClass The 'key' type.
414 * @param valClass The 'value' type.
415 * @param bufferSize buffer size for the underlaying outputstream.
416 * @param replication replication factor for the file.
417 * @param blockSize block size for the file.
418 * @param compressionType The compression type.
419 * @param codec The compression codec.
420 * @param progress The Progressable object to track progress.
421 * @param metadata The metadata of the file.
422 * @return Returns the handle to the constructed SequenceFile Writer.
423 * @throws IOException
424 * @deprecated Use {@link #createWriter(Configuration, Writer.Option...)}
425 * instead.
426 */
427 @Deprecated
428 public static Writer
429 createWriter(FileSystem fs, Configuration conf, Path name,
430 Class keyClass, Class valClass, int bufferSize,
431 short replication, long blockSize,
432 CompressionType compressionType, CompressionCodec codec,
433 Progressable progress, Metadata metadata) throws IOException {
434 return createWriter(conf, Writer.file(name),
435 Writer.filesystem(fs),
436 Writer.keyClass(keyClass),
437 Writer.valueClass(valClass),
438 Writer.bufferSize(bufferSize),
439 Writer.replication(replication),
440 Writer.blockSize(blockSize),
441 Writer.compression(compressionType, codec),
442 Writer.progressable(progress),
443 Writer.metadata(metadata));
444 }
445
446 /**
447 * Construct the preferred type of SequenceFile Writer.
448 * @param fs The configured filesystem.
449 * @param conf The configuration.
450 * @param name The name of the file.
451 * @param keyClass The 'key' type.
452 * @param valClass The 'value' type.
453 * @param bufferSize buffer size for the underlaying outputstream.
454 * @param replication replication factor for the file.
455 * @param blockSize block size for the file.
456 * @param createParent create parent directory if non-existent
457 * @param compressionType The compression type.
458 * @param codec The compression codec.
459 * @param metadata The metadata of the file.
460 * @return Returns the handle to the constructed SequenceFile Writer.
461 * @throws IOException
462 */
463 @Deprecated
464 public static Writer
465 createWriter(FileSystem fs, Configuration conf, Path name,
466 Class keyClass, Class valClass, int bufferSize,
467 short replication, long blockSize, boolean createParent,
468 CompressionType compressionType, CompressionCodec codec,
469 Metadata metadata) throws IOException {
470 return createWriter(FileContext.getFileContext(fs.getUri(), conf),
471 conf, name, keyClass, valClass, compressionType, codec,
472 metadata, EnumSet.of(CreateFlag.CREATE,CreateFlag.OVERWRITE),
473 CreateOpts.bufferSize(bufferSize),
474 createParent ? CreateOpts.createParent()
475 : CreateOpts.donotCreateParent(),
476 CreateOpts.repFac(replication),
477 CreateOpts.blockSize(blockSize)
478 );
479 }
480
481 /**
482 * Construct the preferred type of SequenceFile Writer.
483 * @param fc The context for the specified file.
484 * @param conf The configuration.
485 * @param name The name of the file.
486 * @param keyClass The 'key' type.
487 * @param valClass The 'value' type.
488 * @param compressionType The compression type.
489 * @param codec The compression codec.
490 * @param metadata The metadata of the file.
491 * @param createFlag gives the semantics of create: overwrite, append etc.
492 * @param opts file creation options; see {@link CreateOpts}.
493 * @return Returns the handle to the constructed SequenceFile Writer.
494 * @throws IOException
495 */
496 public static Writer
497 createWriter(FileContext fc, Configuration conf, Path name,
498 Class keyClass, Class valClass,
499 CompressionType compressionType, CompressionCodec codec,
500 Metadata metadata,
501 final EnumSet<CreateFlag> createFlag, CreateOpts... opts)
502 throws IOException {
503 return createWriter(conf, fc.create(name, createFlag, opts),
504 keyClass, valClass, compressionType, codec, metadata).ownStream();
505 }
506
507 /**
508 * Construct the preferred type of SequenceFile Writer.
509 * @param fs The configured filesystem.
510 * @param conf The configuration.
511 * @param name The name of the file.
512 * @param keyClass The 'key' type.
513 * @param valClass The 'value' type.
514 * @param compressionType The compression type.
515 * @param codec The compression codec.
516 * @param progress The Progressable object to track progress.
517 * @return Returns the handle to the constructed SequenceFile Writer.
518 * @throws IOException
519 * @deprecated Use {@link #createWriter(Configuration, Writer.Option...)}
520 * instead.
521 */
522 @Deprecated
523 public static Writer
524 createWriter(FileSystem fs, Configuration conf, Path name,
525 Class keyClass, Class valClass,
526 CompressionType compressionType, CompressionCodec codec,
527 Progressable progress) throws IOException {
528 return createWriter(conf, Writer.file(name),
529 Writer.filesystem(fs),
530 Writer.keyClass(keyClass),
531 Writer.valueClass(valClass),
532 Writer.compression(compressionType, codec),
533 Writer.progressable(progress));
534 }
535
536 /**
537 * Construct the preferred type of 'raw' SequenceFile Writer.
538 * @param conf The configuration.
539 * @param out The stream on top which the writer is to be constructed.
540 * @param keyClass The 'key' type.
541 * @param valClass The 'value' type.
542 * @param compressionType The compression type.
543 * @param codec The compression codec.
544 * @param metadata The metadata of the file.
545 * @return Returns the handle to the constructed SequenceFile Writer.
546 * @throws IOException
547 * @deprecated Use {@link #createWriter(Configuration, Writer.Option...)}
548 * instead.
549 */
550 @Deprecated
551 public static Writer
552 createWriter(Configuration conf, FSDataOutputStream out,
553 Class keyClass, Class valClass,
554 CompressionType compressionType,
555 CompressionCodec codec, Metadata metadata) throws IOException {
556 return createWriter(conf, Writer.stream(out), Writer.keyClass(keyClass),
557 Writer.valueClass(valClass),
558 Writer.compression(compressionType, codec),
559 Writer.metadata(metadata));
560 }
561
562 /**
563 * Construct the preferred type of 'raw' SequenceFile Writer.
564 * @param conf The configuration.
565 * @param out The stream on top which the writer is to be constructed.
566 * @param keyClass The 'key' type.
567 * @param valClass The 'value' type.
568 * @param compressionType The compression type.
569 * @param codec The compression codec.
570 * @return Returns the handle to the constructed SequenceFile Writer.
571 * @throws IOException
572 * @deprecated Use {@link #createWriter(Configuration, Writer.Option...)}
573 * instead.
574 */
575 @Deprecated
576 public static Writer
577 createWriter(Configuration conf, FSDataOutputStream out,
578 Class keyClass, Class valClass, CompressionType compressionType,
579 CompressionCodec codec) throws IOException {
580 return createWriter(conf, Writer.stream(out), Writer.keyClass(keyClass),
581 Writer.valueClass(valClass),
582 Writer.compression(compressionType, codec));
583 }
584
585
586 /** The interface to 'raw' values of SequenceFiles. */
587 public static interface ValueBytes {
588
589 /** Writes the uncompressed bytes to the outStream.
590 * @param outStream : Stream to write uncompressed bytes into.
591 * @throws IOException
592 */
593 public void writeUncompressedBytes(DataOutputStream outStream)
594 throws IOException;
595
596 /** Write compressed bytes to outStream.
597 * Note: that it will NOT compress the bytes if they are not compressed.
598 * @param outStream : Stream to write compressed bytes into.
599 */
600 public void writeCompressedBytes(DataOutputStream outStream)
601 throws IllegalArgumentException, IOException;
602
603 /**
604 * Size of stored data.
605 */
606 public int getSize();
607 }
608
609 private static class UncompressedBytes implements ValueBytes {
610 private int dataSize;
611 private byte[] data;
612
613 private UncompressedBytes() {
614 data = null;
615 dataSize = 0;
616 }
617
618 private void reset(DataInputStream in, int length) throws IOException {
619 if (data == null) {
620 data = new byte[length];
621 } else if (length > data.length) {
622 data = new byte[Math.max(length, data.length * 2)];
623 }
624 dataSize = -1;
625 in.readFully(data, 0, length);
626 dataSize = length;
627 }
628
629 @Override
630 public int getSize() {
631 return dataSize;
632 }
633
634 @Override
635 public void writeUncompressedBytes(DataOutputStream outStream)
636 throws IOException {
637 outStream.write(data, 0, dataSize);
638 }
639
640 @Override
641 public void writeCompressedBytes(DataOutputStream outStream)
642 throws IllegalArgumentException, IOException {
643 throw
644 new IllegalArgumentException("UncompressedBytes cannot be compressed!");
645 }
646
647 } // UncompressedBytes
648
649 private static class CompressedBytes implements ValueBytes {
650 private int dataSize;
651 private byte[] data;
652 DataInputBuffer rawData = null;
653 CompressionCodec codec = null;
654 CompressionInputStream decompressedStream = null;
655
656 private CompressedBytes(CompressionCodec codec) {
657 data = null;
658 dataSize = 0;
659 this.codec = codec;
660 }
661
662 private void reset(DataInputStream in, int length) throws IOException {
663 if (data == null) {
664 data = new byte[length];
665 } else if (length > data.length) {
666 data = new byte[Math.max(length, data.length * 2)];
667 }
668 dataSize = -1;
669 in.readFully(data, 0, length);
670 dataSize = length;
671 }
672
673 @Override
674 public int getSize() {
675 return dataSize;
676 }
677
678 @Override
679 public void writeUncompressedBytes(DataOutputStream outStream)
680 throws IOException {
681 if (decompressedStream == null) {
682 rawData = new DataInputBuffer();
683 decompressedStream = codec.createInputStream(rawData);
684 } else {
685 decompressedStream.resetState();
686 }
687 rawData.reset(data, 0, dataSize);
688
689 byte[] buffer = new byte[8192];
690 int bytesRead = 0;
691 while ((bytesRead = decompressedStream.read(buffer, 0, 8192)) != -1) {
692 outStream.write(buffer, 0, bytesRead);
693 }
694 }
695
696 @Override
697 public void writeCompressedBytes(DataOutputStream outStream)
698 throws IllegalArgumentException, IOException {
699 outStream.write(data, 0, dataSize);
700 }
701
702 } // CompressedBytes
703
704 /**
705 * The class encapsulating with the metadata of a file.
706 * The metadata of a file is a list of attribute name/value
707 * pairs of Text type.
708 *
709 */
710 public static class Metadata implements Writable {
711
712 private TreeMap<Text, Text> theMetadata;
713
714 public Metadata() {
715 this(new TreeMap<Text, Text>());
716 }
717
718 public Metadata(TreeMap<Text, Text> arg) {
719 if (arg == null) {
720 this.theMetadata = new TreeMap<Text, Text>();
721 } else {
722 this.theMetadata = arg;
723 }
724 }
725
726 public Text get(Text name) {
727 return this.theMetadata.get(name);
728 }
729
730 public void set(Text name, Text value) {
731 this.theMetadata.put(name, value);
732 }
733
734 public TreeMap<Text, Text> getMetadata() {
735 return new TreeMap<Text, Text>(this.theMetadata);
736 }
737
738 @Override
739 public void write(DataOutput out) throws IOException {
740 out.writeInt(this.theMetadata.size());
741 Iterator<Map.Entry<Text, Text>> iter =
742 this.theMetadata.entrySet().iterator();
743 while (iter.hasNext()) {
744 Map.Entry<Text, Text> en = iter.next();
745 en.getKey().write(out);
746 en.getValue().write(out);
747 }
748 }
749
750 @Override
751 public void readFields(DataInput in) throws IOException {
752 int sz = in.readInt();
753 if (sz < 0) throw new IOException("Invalid size: " + sz + " for file metadata object");
754 this.theMetadata = new TreeMap<Text, Text>();
755 for (int i = 0; i < sz; i++) {
756 Text key = new Text();
757 Text val = new Text();
758 key.readFields(in);
759 val.readFields(in);
760 this.theMetadata.put(key, val);
761 }
762 }
763
764 @Override
765 public boolean equals(Object other) {
766 if (other == null) {
767 return false;
768 }
769 if (other.getClass() != this.getClass()) {
770 return false;
771 } else {
772 return equals((Metadata)other);
773 }
774 }
775
776 public boolean equals(Metadata other) {
777 if (other == null) return false;
778 if (this.theMetadata.size() != other.theMetadata.size()) {
779 return false;
780 }
781 Iterator<Map.Entry<Text, Text>> iter1 =
782 this.theMetadata.entrySet().iterator();
783 Iterator<Map.Entry<Text, Text>> iter2 =
784 other.theMetadata.entrySet().iterator();
785 while (iter1.hasNext() && iter2.hasNext()) {
786 Map.Entry<Text, Text> en1 = iter1.next();
787 Map.Entry<Text, Text> en2 = iter2.next();
788 if (!en1.getKey().equals(en2.getKey())) {
789 return false;
790 }
791 if (!en1.getValue().equals(en2.getValue())) {
792 return false;
793 }
794 }
795 if (iter1.hasNext() || iter2.hasNext()) {
796 return false;
797 }
798 return true;
799 }
800
801 @Override
802 public int hashCode() {
803 assert false : "hashCode not designed";
804 return 42; // any arbitrary constant will do
805 }
806
807 @Override
808 public String toString() {
809 StringBuilder sb = new StringBuilder();
810 sb.append("size: ").append(this.theMetadata.size()).append("\n");
811 Iterator<Map.Entry<Text, Text>> iter =
812 this.theMetadata.entrySet().iterator();
813 while (iter.hasNext()) {
814 Map.Entry<Text, Text> en = iter.next();
815 sb.append("\t").append(en.getKey().toString()).append("\t").append(en.getValue().toString());
816 sb.append("\n");
817 }
818 return sb.toString();
819 }
820 }
821
822 /** Write key/value pairs to a sequence-format file. */
823 public static class Writer implements java.io.Closeable, Syncable {
824 private Configuration conf;
825 FSDataOutputStream out;
826 boolean ownOutputStream = true;
827 DataOutputBuffer buffer = new DataOutputBuffer();
828
829 Class keyClass;
830 Class valClass;
831
832 private final CompressionType compress;
833 CompressionCodec codec = null;
834 CompressionOutputStream deflateFilter = null;
835 DataOutputStream deflateOut = null;
836 Metadata metadata = null;
837 Compressor compressor = null;
838
839 protected Serializer keySerializer;
840 protected Serializer uncompressedValSerializer;
841 protected Serializer compressedValSerializer;
842
843 // Insert a globally unique 16-byte value every few entries, so that one
844 // can seek into the middle of a file and then synchronize with record
845 // starts and ends by scanning for this value.
846 long lastSyncPos; // position of last sync
847 byte[] sync; // 16 random bytes
848 {
849 try {
850 MessageDigest digester = MessageDigest.getInstance("MD5");
851 long time = Time.now();
852 digester.update((new UID()+"@"+time).getBytes());
853 sync = digester.digest();
854 } catch (Exception e) {
855 throw new RuntimeException(e);
856 }
857 }
858
859 public static interface Option {}
860
861 static class FileOption extends Options.PathOption
862 implements Option {
863 FileOption(Path path) {
864 super(path);
865 }
866 }
867
868 /**
869 * @deprecated only used for backwards-compatibility in the createWriter methods
870 * that take FileSystem.
871 */
872 @Deprecated
873 private static class FileSystemOption implements Option {
874 private final FileSystem value;
875 protected FileSystemOption(FileSystem value) {
876 this.value = value;
877 }
878 public FileSystem getValue() {
879 return value;
880 }
881 }
882
883 static class StreamOption extends Options.FSDataOutputStreamOption
884 implements Option {
885 StreamOption(FSDataOutputStream stream) {
886 super(stream);
887 }
888 }
889
890 static class BufferSizeOption extends Options.IntegerOption
891 implements Option {
892 BufferSizeOption(int value) {
893 super(value);
894 }
895 }
896
897 static class BlockSizeOption extends Options.LongOption implements Option {
898 BlockSizeOption(long value) {
899 super(value);
900 }
901 }
902
903 static class ReplicationOption extends Options.IntegerOption
904 implements Option {
905 ReplicationOption(int value) {
906 super(value);
907 }
908 }
909
910 static class KeyClassOption extends Options.ClassOption implements Option {
911 KeyClassOption(Class<?> value) {
912 super(value);
913 }
914 }
915
916 static class ValueClassOption extends Options.ClassOption
917 implements Option {
918 ValueClassOption(Class<?> value) {
919 super(value);
920 }
921 }
922
923 static class MetadataOption implements Option {
924 private final Metadata value;
925 MetadataOption(Metadata value) {
926 this.value = value;
927 }
928 Metadata getValue() {
929 return value;
930 }
931 }
932
933 static class ProgressableOption extends Options.ProgressableOption
934 implements Option {
935 ProgressableOption(Progressable value) {
936 super(value);
937 }
938 }
939
940 private static class CompressionOption implements Option {
941 private final CompressionType value;
942 private final CompressionCodec codec;
943 CompressionOption(CompressionType value) {
944 this(value, null);
945 }
946 CompressionOption(CompressionType value, CompressionCodec codec) {
947 this.value = value;
948 this.codec = (CompressionType.NONE != value && null == codec)
949 ? new DefaultCodec()
950 : codec;
951 }
952 CompressionType getValue() {
953 return value;
954 }
955 CompressionCodec getCodec() {
956 return codec;
957 }
958 }
959
960 public static Option file(Path value) {
961 return new FileOption(value);
962 }
963
964 /**
965 * @deprecated only used for backwards-compatibility in the createWriter methods
966 * that take FileSystem.
967 */
968 @Deprecated
969 private static Option filesystem(FileSystem fs) {
970 return new SequenceFile.Writer.FileSystemOption(fs);
971 }
972
973 public static Option bufferSize(int value) {
974 return new BufferSizeOption(value);
975 }
976
977 public static Option stream(FSDataOutputStream value) {
978 return new StreamOption(value);
979 }
980
981 public static Option replication(short value) {
982 return new ReplicationOption(value);
983 }
984
985 public static Option blockSize(long value) {
986 return new BlockSizeOption(value);
987 }
988
989 public static Option progressable(Progressable value) {
990 return new ProgressableOption(value);
991 }
992
993 public static Option keyClass(Class<?> value) {
994 return new KeyClassOption(value);
995 }
996
997 public static Option valueClass(Class<?> value) {
998 return new ValueClassOption(value);
999 }
1000
1001 public static Option metadata(Metadata value) {
1002 return new MetadataOption(value);
1003 }
1004
1005 public static Option compression(CompressionType value) {
1006 return new CompressionOption(value);
1007 }
1008
1009 public static Option compression(CompressionType value,
1010 CompressionCodec codec) {
1011 return new CompressionOption(value, codec);
1012 }
1013
1014 /**
1015 * Construct a uncompressed writer from a set of options.
1016 * @param conf the configuration to use
1017 * @param options the options used when creating the writer
1018 * @throws IOException if it fails
1019 */
1020 Writer(Configuration conf,
1021 Option... opts) throws IOException {
1022 BlockSizeOption blockSizeOption =
1023 Options.getOption(BlockSizeOption.class, opts);
1024 BufferSizeOption bufferSizeOption =
1025 Options.getOption(BufferSizeOption.class, opts);
1026 ReplicationOption replicationOption =
1027 Options.getOption(ReplicationOption.class, opts);
1028 ProgressableOption progressOption =
1029 Options.getOption(ProgressableOption.class, opts);
1030 FileOption fileOption = Options.getOption(FileOption.class, opts);
1031 FileSystemOption fsOption = Options.getOption(FileSystemOption.class, opts);
1032 StreamOption streamOption = Options.getOption(StreamOption.class, opts);
1033 KeyClassOption keyClassOption =
1034 Options.getOption(KeyClassOption.class, opts);
1035 ValueClassOption valueClassOption =
1036 Options.getOption(ValueClassOption.class, opts);
1037 MetadataOption metadataOption =
1038 Options.getOption(MetadataOption.class, opts);
1039 CompressionOption compressionTypeOption =
1040 Options.getOption(CompressionOption.class, opts);
1041 // check consistency of options
1042 if ((fileOption == null) == (streamOption == null)) {
1043 throw new IllegalArgumentException("file or stream must be specified");
1044 }
1045 if (fileOption == null && (blockSizeOption != null ||
1046 bufferSizeOption != null ||
1047 replicationOption != null ||
1048 progressOption != null)) {
1049 throw new IllegalArgumentException("file modifier options not " +
1050 "compatible with stream");
1051 }
1052
1053 FSDataOutputStream out;
1054 boolean ownStream = fileOption != null;
1055 if (ownStream) {
1056 Path p = fileOption.getValue();
1057 FileSystem fs;
1058 if (fsOption != null) {
1059 fs = fsOption.getValue();
1060 } else {
1061 fs = p.getFileSystem(conf);
1062 }
1063 int bufferSize = bufferSizeOption == null ? getBufferSize(conf) :
1064 bufferSizeOption.getValue();
1065 short replication = replicationOption == null ?
1066 fs.getDefaultReplication(p) :
1067 (short) replicationOption.getValue();
1068 long blockSize = blockSizeOption == null ? fs.getDefaultBlockSize(p) :
1069 blockSizeOption.getValue();
1070 Progressable progress = progressOption == null ? null :
1071 progressOption.getValue();
1072 out = fs.create(p, true, bufferSize, replication, blockSize, progress);
1073 } else {
1074 out = streamOption.getValue();
1075 }
1076 Class<?> keyClass = keyClassOption == null ?
1077 Object.class : keyClassOption.getValue();
1078 Class<?> valueClass = valueClassOption == null ?
1079 Object.class : valueClassOption.getValue();
1080 Metadata metadata = metadataOption == null ?
1081 new Metadata() : metadataOption.getValue();
1082 this.compress = compressionTypeOption.getValue();
1083 final CompressionCodec codec = compressionTypeOption.getCodec();
1084 if (codec != null &&
1085 (codec instanceof GzipCodec) &&
1086 !NativeCodeLoader.isNativeCodeLoaded() &&
1087 !ZlibFactory.isNativeZlibLoaded(conf)) {
1088 throw new IllegalArgumentException("SequenceFile doesn't work with " +
1089 "GzipCodec without native-hadoop " +
1090 "code!");
1091 }
1092 init(conf, out, ownStream, keyClass, valueClass, codec, metadata);
1093 }
1094
1095 /** Create the named file.
1096 * @deprecated Use
1097 * {@link SequenceFile#createWriter(Configuration, Writer.Option...)}
1098 * instead.
1099 */
1100 @Deprecated
1101 public Writer(FileSystem fs, Configuration conf, Path name,
1102 Class keyClass, Class valClass) throws IOException {
1103 this.compress = CompressionType.NONE;
1104 init(conf, fs.create(name), true, keyClass, valClass, null,
1105 new Metadata());
1106 }
1107
1108 /** Create the named file with write-progress reporter.
1109 * @deprecated Use
1110 * {@link SequenceFile#createWriter(Configuration, Writer.Option...)}
1111 * instead.
1112 */
1113 @Deprecated
1114 public Writer(FileSystem fs, Configuration conf, Path name,
1115 Class keyClass, Class valClass,
1116 Progressable progress, Metadata metadata) throws IOException {
1117 this.compress = CompressionType.NONE;
1118 init(conf, fs.create(name, progress), true, keyClass, valClass,
1119 null, metadata);
1120 }
1121
1122 /** Create the named file with write-progress reporter.
1123 * @deprecated Use
1124 * {@link SequenceFile#createWriter(Configuration, Writer.Option...)}
1125 * instead.
1126 */
1127 @Deprecated
1128 public Writer(FileSystem fs, Configuration conf, Path name,
1129 Class keyClass, Class valClass,
1130 int bufferSize, short replication, long blockSize,
1131 Progressable progress, Metadata metadata) throws IOException {
1132 this.compress = CompressionType.NONE;
1133 init(conf,
1134 fs.create(name, true, bufferSize, replication, blockSize, progress),
1135 true, keyClass, valClass, null, metadata);
1136 }
1137
1138 boolean isCompressed() { return compress != CompressionType.NONE; }
1139 boolean isBlockCompressed() { return compress == CompressionType.BLOCK; }
1140
1141 Writer ownStream() { this.ownOutputStream = true; return this; }
1142
1143 /** Write and flush the file header. */
1144 private void writeFileHeader()
1145 throws IOException {
1146 out.write(VERSION);
1147 Text.writeString(out, keyClass.getName());
1148 Text.writeString(out, valClass.getName());
1149
1150 out.writeBoolean(this.isCompressed());
1151 out.writeBoolean(this.isBlockCompressed());
1152
1153 if (this.isCompressed()) {
1154 Text.writeString(out, (codec.getClass()).getName());
1155 }
1156 this.metadata.write(out);
1157 out.write(sync); // write the sync bytes
1158 out.flush(); // flush header
1159 }
1160
1161 /** Initialize. */
1162 @SuppressWarnings("unchecked")
1163 void init(Configuration conf, FSDataOutputStream out, boolean ownStream,
1164 Class keyClass, Class valClass,
1165 CompressionCodec codec, Metadata metadata)
1166 throws IOException {
1167 this.conf = conf;
1168 this.out = out;
1169 this.ownOutputStream = ownStream;
1170 this.keyClass = keyClass;
1171 this.valClass = valClass;
1172 this.codec = codec;
1173 this.metadata = metadata;
1174 SerializationFactory serializationFactory = new SerializationFactory(conf);
1175 this.keySerializer = serializationFactory.getSerializer(keyClass);
1176 if (this.keySerializer == null) {
1177 throw new IOException(
1178 "Could not find a serializer for the Key class: '"
1179 + keyClass.getCanonicalName() + "'. "
1180 + "Please ensure that the configuration '" +
1181 CommonConfigurationKeys.IO_SERIALIZATIONS_KEY + "' is "
1182 + "properly configured, if you're using"
1183 + "custom serialization.");
1184 }
1185 this.keySerializer.open(buffer);
1186 this.uncompressedValSerializer = serializationFactory.getSerializer(valClass);
1187 if (this.uncompressedValSerializer == null) {
1188 throw new IOException(
1189 "Could not find a serializer for the Value class: '"
1190 + valClass.getCanonicalName() + "'. "
1191 + "Please ensure that the configuration '" +
1192 CommonConfigurationKeys.IO_SERIALIZATIONS_KEY + "' is "
1193 + "properly configured, if you're using"
1194 + "custom serialization.");
1195 }
1196 this.uncompressedValSerializer.open(buffer);
1197 if (this.codec != null) {
1198 ReflectionUtils.setConf(this.codec, this.conf);
1199 this.compressor = CodecPool.getCompressor(this.codec);
1200 this.deflateFilter = this.codec.createOutputStream(buffer, compressor);
1201 this.deflateOut =
1202 new DataOutputStream(new BufferedOutputStream(deflateFilter));
1203 this.compressedValSerializer = serializationFactory.getSerializer(valClass);
1204 if (this.compressedValSerializer == null) {
1205 throw new IOException(
1206 "Could not find a serializer for the Value class: '"
1207 + valClass.getCanonicalName() + "'. "
1208 + "Please ensure that the configuration '" +
1209 CommonConfigurationKeys.IO_SERIALIZATIONS_KEY + "' is "
1210 + "properly configured, if you're using"
1211 + "custom serialization.");
1212 }
1213 this.compressedValSerializer.open(deflateOut);
1214 }
1215 writeFileHeader();
1216 }
1217
1218 /** Returns the class of keys in this file. */
1219 public Class getKeyClass() { return keyClass; }
1220
1221 /** Returns the class of values in this file. */
1222 public Class getValueClass() { return valClass; }
1223
1224 /** Returns the compression codec of data in this file. */
1225 public CompressionCodec getCompressionCodec() { return codec; }
1226
1227 /** create a sync point */
1228 public void sync() throws IOException {
1229 if (sync != null && lastSyncPos != out.getPos()) {
1230 out.writeInt(SYNC_ESCAPE); // mark the start of the sync
1231 out.write(sync); // write sync
1232 lastSyncPos = out.getPos(); // update lastSyncPos
1233 }
1234 }
1235
1236 /**
1237 * flush all currently written data to the file system
1238 * @deprecated Use {@link #hsync()} or {@link #hflush()} instead
1239 */
1240 @Deprecated
1241 public void syncFs() throws IOException {
1242 if (out != null) {
1243 out.sync(); // flush contents to file system
1244 }
1245 }
1246
1247 @Override
1248 public void hsync() throws IOException {
1249 if (out != null) {
1250 out.hsync();
1251 }
1252 }
1253
1254 @Override
1255 public void hflush() throws IOException {
1256 if (out != null) {
1257 out.hflush();
1258 }
1259 }
1260
1261 /** Returns the configuration of this file. */
1262 Configuration getConf() { return conf; }
1263
1264 /** Close the file. */
1265 @Override
1266 public synchronized void close() throws IOException {
1267 keySerializer.close();
1268 uncompressedValSerializer.close();
1269 if (compressedValSerializer != null) {
1270 compressedValSerializer.close();
1271 }
1272
1273 CodecPool.returnCompressor(compressor);
1274 compressor = null;
1275
1276 if (out != null) {
1277
1278 // Close the underlying stream iff we own it...
1279 if (ownOutputStream) {
1280 out.close();
1281 } else {
1282 out.flush();
1283 }
1284 out = null;
1285 }
1286 }
1287
1288 synchronized void checkAndWriteSync() throws IOException {
1289 if (sync != null &&
1290 out.getPos() >= lastSyncPos+SYNC_INTERVAL) { // time to emit sync
1291 sync();
1292 }
1293 }
1294
1295 /** Append a key/value pair. */
1296 public void append(Writable key, Writable val)
1297 throws IOException {
1298 append((Object) key, (Object) val);
1299 }
1300
1301 /** Append a key/value pair. */
1302 @SuppressWarnings("unchecked")
1303 public synchronized void append(Object key, Object val)
1304 throws IOException {
1305 if (key.getClass() != keyClass)
1306 throw new IOException("wrong key class: "+key.getClass().getName()
1307 +" is not "+keyClass);
1308 if (val.getClass() != valClass)
1309 throw new IOException("wrong value class: "+val.getClass().getName()
1310 +" is not "+valClass);
1311
1312 buffer.reset();
1313
1314 // Append the 'key'
1315 keySerializer.serialize(key);
1316 int keyLength = buffer.getLength();
1317 if (keyLength < 0)
1318 throw new IOException("negative length keys not allowed: " + key);
1319
1320 // Append the 'value'
1321 if (compress == CompressionType.RECORD) {
1322 deflateFilter.resetState();
1323 compressedValSerializer.serialize(val);
1324 deflateOut.flush();
1325 deflateFilter.finish();
1326 } else {
1327 uncompressedValSerializer.serialize(val);
1328 }
1329
1330 // Write the record out
1331 checkAndWriteSync(); // sync
1332 out.writeInt(buffer.getLength()); // total record length
1333 out.writeInt(keyLength); // key portion length
1334 out.write(buffer.getData(), 0, buffer.getLength()); // data
1335 }
1336
1337 public synchronized void appendRaw(byte[] keyData, int keyOffset,
1338 int keyLength, ValueBytes val) throws IOException {
1339 if (keyLength < 0)
1340 throw new IOException("negative length keys not allowed: " + keyLength);
1341
1342 int valLength = val.getSize();
1343
1344 checkAndWriteSync();
1345
1346 out.writeInt(keyLength+valLength); // total record length
1347 out.writeInt(keyLength); // key portion length
1348 out.write(keyData, keyOffset, keyLength); // key
1349 val.writeUncompressedBytes(out); // value
1350 }
1351
1352 /** Returns the current length of the output file.
1353 *
1354 * <p>This always returns a synchronized position. In other words,
1355 * immediately after calling {@link SequenceFile.Reader#seek(long)} with a position
1356 * returned by this method, {@link SequenceFile.Reader#next(Writable)} may be called. However
1357 * the key may be earlier in the file than key last written when this
1358 * method was called (e.g., with block-compression, it may be the first key
1359 * in the block that was being written when this method was called).
1360 */
1361 public synchronized long getLength() throws IOException {
1362 return out.getPos();
1363 }
1364
1365 } // class Writer
1366
1367 /** Write key/compressed-value pairs to a sequence-format file. */
1368 static class RecordCompressWriter extends Writer {
1369
1370 RecordCompressWriter(Configuration conf,
1371 Option... options) throws IOException {
1372 super(conf, options);
1373 }
1374
1375 /** Append a key/value pair. */
1376 @Override
1377 @SuppressWarnings("unchecked")
1378 public synchronized void append(Object key, Object val)
1379 throws IOException {
1380 if (key.getClass() != keyClass)
1381 throw new IOException("wrong key class: "+key.getClass().getName()
1382 +" is not "+keyClass);
1383 if (val.getClass() != valClass)
1384 throw new IOException("wrong value class: "+val.getClass().getName()
1385 +" is not "+valClass);
1386
1387 buffer.reset();
1388
1389 // Append the 'key'
1390 keySerializer.serialize(key);
1391 int keyLength = buffer.getLength();
1392 if (keyLength < 0)
1393 throw new IOException("negative length keys not allowed: " + key);
1394
1395 // Compress 'value' and append it
1396 deflateFilter.resetState();
1397 compressedValSerializer.serialize(val);
1398 deflateOut.flush();
1399 deflateFilter.finish();
1400
1401 // Write the record out
1402 checkAndWriteSync(); // sync
1403 out.writeInt(buffer.getLength()); // total record length
1404 out.writeInt(keyLength); // key portion length
1405 out.write(buffer.getData(), 0, buffer.getLength()); // data
1406 }
1407
1408 /** Append a key/value pair. */
1409 @Override
1410 public synchronized void appendRaw(byte[] keyData, int keyOffset,
1411 int keyLength, ValueBytes val) throws IOException {
1412
1413 if (keyLength < 0)
1414 throw new IOException("negative length keys not allowed: " + keyLength);
1415
1416 int valLength = val.getSize();
1417
1418 checkAndWriteSync(); // sync
1419 out.writeInt(keyLength+valLength); // total record length
1420 out.writeInt(keyLength); // key portion length
1421 out.write(keyData, keyOffset, keyLength); // 'key' data
1422 val.writeCompressedBytes(out); // 'value' data
1423 }
1424
1425 } // RecordCompressionWriter
1426
1427 /** Write compressed key/value blocks to a sequence-format file. */
1428 static class BlockCompressWriter extends Writer {
1429
1430 private int noBufferedRecords = 0;
1431
1432 private DataOutputBuffer keyLenBuffer = new DataOutputBuffer();
1433 private DataOutputBuffer keyBuffer = new DataOutputBuffer();
1434
1435 private DataOutputBuffer valLenBuffer = new DataOutputBuffer();
1436 private DataOutputBuffer valBuffer = new DataOutputBuffer();
1437
1438 private final int compressionBlockSize;
1439
1440 BlockCompressWriter(Configuration conf,
1441 Option... options) throws IOException {
1442 super(conf, options);
1443 compressionBlockSize =
1444 conf.getInt("io.seqfile.compress.blocksize", 1000000);
1445 keySerializer.close();
1446 keySerializer.open(keyBuffer);
1447 uncompressedValSerializer.close();
1448 uncompressedValSerializer.open(valBuffer);
1449 }
1450
1451 /** Workhorse to check and write out compressed data/lengths */
1452 private synchronized
1453 void writeBuffer(DataOutputBuffer uncompressedDataBuffer)
1454 throws IOException {
1455 deflateFilter.resetState();
1456 buffer.reset();
1457 deflateOut.write(uncompressedDataBuffer.getData(), 0,
1458 uncompressedDataBuffer.getLength());
1459 deflateOut.flush();
1460 deflateFilter.finish();
1461
1462 WritableUtils.writeVInt(out, buffer.getLength());
1463 out.write(buffer.getData(), 0, buffer.getLength());
1464 }
1465
1466 /** Compress and flush contents to dfs */
1467 @Override
1468 public synchronized void sync() throws IOException {
1469 if (noBufferedRecords > 0) {
1470 super.sync();
1471
1472 // No. of records
1473 WritableUtils.writeVInt(out, noBufferedRecords);
1474
1475 // Write 'keys' and lengths
1476 writeBuffer(keyLenBuffer);
1477 writeBuffer(keyBuffer);
1478
1479 // Write 'values' and lengths
1480 writeBuffer(valLenBuffer);
1481 writeBuffer(valBuffer);
1482
1483 // Flush the file-stream
1484 out.flush();
1485
1486 // Reset internal states
1487 keyLenBuffer.reset();
1488 keyBuffer.reset();
1489 valLenBuffer.reset();
1490 valBuffer.reset();
1491 noBufferedRecords = 0;
1492 }
1493
1494 }
1495
1496 /** Close the file. */
1497 @Override
1498 public synchronized void close() throws IOException {
1499 if (out != null) {
1500 sync();
1501 }
1502 super.close();
1503 }
1504
1505 /** Append a key/value pair. */
1506 @Override
1507 @SuppressWarnings("unchecked")
1508 public synchronized void append(Object key, Object val)
1509 throws IOException {
1510 if (key.getClass() != keyClass)
1511 throw new IOException("wrong key class: "+key+" is not "+keyClass);
1512 if (val.getClass() != valClass)
1513 throw new IOException("wrong value class: "+val+" is not "+valClass);
1514
1515 // Save key/value into respective buffers
1516 int oldKeyLength = keyBuffer.getLength();
1517 keySerializer.serialize(key);
1518 int keyLength = keyBuffer.getLength() - oldKeyLength;
1519 if (keyLength < 0)
1520 throw new IOException("negative length keys not allowed: " + key);
1521 WritableUtils.writeVInt(keyLenBuffer, keyLength);
1522
1523 int oldValLength = valBuffer.getLength();
1524 uncompressedValSerializer.serialize(val);
1525 int valLength = valBuffer.getLength() - oldValLength;
1526 WritableUtils.writeVInt(valLenBuffer, valLength);
1527
1528 // Added another key/value pair
1529 ++noBufferedRecords;
1530
1531 // Compress and flush?
1532 int currentBlockSize = keyBuffer.getLength() + valBuffer.getLength();
1533 if (currentBlockSize >= compressionBlockSize) {
1534 sync();
1535 }
1536 }
1537
1538 /** Append a key/value pair. */
1539 @Override
1540 public synchronized void appendRaw(byte[] keyData, int keyOffset,
1541 int keyLength, ValueBytes val) throws IOException {
1542
1543 if (keyLength < 0)
1544 throw new IOException("negative length keys not allowed");
1545
1546 int valLength = val.getSize();
1547
1548 // Save key/value data in relevant buffers
1549 WritableUtils.writeVInt(keyLenBuffer, keyLength);
1550 keyBuffer.write(keyData, keyOffset, keyLength);
1551 WritableUtils.writeVInt(valLenBuffer, valLength);
1552 val.writeUncompressedBytes(valBuffer);
1553
1554 // Added another key/value pair
1555 ++noBufferedRecords;
1556
1557 // Compress and flush?
1558 int currentBlockSize = keyBuffer.getLength() + valBuffer.getLength();
1559 if (currentBlockSize >= compressionBlockSize) {
1560 sync();
1561 }
1562 }
1563
1564 } // BlockCompressionWriter
1565
1566 /** Get the configured buffer size */
1567 private static int getBufferSize(Configuration conf) {
1568 return conf.getInt("io.file.buffer.size", 4096);
1569 }
1570
1571 /** Reads key/value pairs from a sequence-format file. */
1572 public static class Reader implements java.io.Closeable {
1573 private String filename;
1574 private FSDataInputStream in;
1575 private DataOutputBuffer outBuf = new DataOutputBuffer();
1576
1577 private byte version;
1578
1579 private String keyClassName;
1580 private String valClassName;
1581 private Class keyClass;
1582 private Class valClass;
1583
1584 private CompressionCodec codec = null;
1585 private Metadata metadata = null;
1586
1587 private byte[] sync = new byte[SYNC_HASH_SIZE];
1588 private byte[] syncCheck = new byte[SYNC_HASH_SIZE];
1589 private boolean syncSeen;
1590
1591 private long headerEnd;
1592 private long end;
1593 private int keyLength;
1594 private int recordLength;
1595
1596 private boolean decompress;
1597 private boolean blockCompressed;
1598
1599 private Configuration conf;
1600
1601 private int noBufferedRecords = 0;
1602 private boolean lazyDecompress = true;
1603 private boolean valuesDecompressed = true;
1604
1605 private int noBufferedKeys = 0;
1606 private int noBufferedValues = 0;
1607
1608 private DataInputBuffer keyLenBuffer = null;
1609 private CompressionInputStream keyLenInFilter = null;
1610 private DataInputStream keyLenIn = null;
1611 private Decompressor keyLenDecompressor = null;
1612 private DataInputBuffer keyBuffer = null;
1613 private CompressionInputStream keyInFilter = null;
1614 private DataInputStream keyIn = null;
1615 private Decompressor keyDecompressor = null;
1616
1617 private DataInputBuffer valLenBuffer = null;
1618 private CompressionInputStream valLenInFilter = null;
1619 private DataInputStream valLenIn = null;
1620 private Decompressor valLenDecompressor = null;
1621 private DataInputBuffer valBuffer = null;
1622 private CompressionInputStream valInFilter = null;
1623 private DataInputStream valIn = null;
1624 private Decompressor valDecompressor = null;
1625
1626 private Deserializer keyDeserializer;
1627 private Deserializer valDeserializer;
1628
1629 /**
1630 * A tag interface for all of the Reader options
1631 */
1632 public static interface Option {}
1633
1634 /**
1635 * Create an option to specify the path name of the sequence file.
1636 * @param value the path to read
1637 * @return a new option
1638 */
1639 public static Option file(Path value) {
1640 return new FileOption(value);
1641 }
1642
1643 /**
1644 * Create an option to specify the stream with the sequence file.
1645 * @param value the stream to read.
1646 * @return a new option
1647 */
1648 public static Option stream(FSDataInputStream value) {
1649 return new InputStreamOption(value);
1650 }
1651
1652 /**
1653 * Create an option to specify the starting byte to read.
1654 * @param value the number of bytes to skip over
1655 * @return a new option
1656 */
1657 public static Option start(long value) {
1658 return new StartOption(value);
1659 }
1660
1661 /**
1662 * Create an option to specify the number of bytes to read.
1663 * @param value the number of bytes to read
1664 * @return a new option
1665 */
1666 public static Option length(long value) {
1667 return new LengthOption(value);
1668 }
1669
1670 /**
1671 * Create an option with the buffer size for reading the given pathname.
1672 * @param value the number of bytes to buffer
1673 * @return a new option
1674 */
1675 public static Option bufferSize(int value) {
1676 return new BufferSizeOption(value);
1677 }
1678
1679 private static class FileOption extends Options.PathOption
1680 implements Option {
1681 private FileOption(Path value) {
1682 super(value);
1683 }
1684 }
1685
1686 private static class InputStreamOption
1687 extends Options.FSDataInputStreamOption
1688 implements Option {
1689 private InputStreamOption(FSDataInputStream value) {
1690 super(value);
1691 }
1692 }
1693
1694 private static class StartOption extends Options.LongOption
1695 implements Option {
1696 private StartOption(long value) {
1697 super(value);
1698 }
1699 }
1700
1701 private static class LengthOption extends Options.LongOption
1702 implements Option {
1703 private LengthOption(long value) {
1704 super(value);
1705 }
1706 }
1707
1708 private static class BufferSizeOption extends Options.IntegerOption
1709 implements Option {
1710 private BufferSizeOption(int value) {
1711 super(value);
1712 }
1713 }
1714
1715 // only used directly
1716 private static class OnlyHeaderOption extends Options.BooleanOption
1717 implements Option {
1718 private OnlyHeaderOption() {
1719 super(true);
1720 }
1721 }
1722
1723 public Reader(Configuration conf, Option... opts) throws IOException {
1724 // Look up the options, these are null if not set
1725 FileOption fileOpt = Options.getOption(FileOption.class, opts);
1726 InputStreamOption streamOpt =
1727 Options.getOption(InputStreamOption.class, opts);
1728 StartOption startOpt = Options.getOption(StartOption.class, opts);
1729 LengthOption lenOpt = Options.getOption(LengthOption.class, opts);
1730 BufferSizeOption bufOpt = Options.getOption(BufferSizeOption.class,opts);
1731 OnlyHeaderOption headerOnly =
1732 Options.getOption(OnlyHeaderOption.class, opts);
1733 // check for consistency
1734 if ((fileOpt == null) == (streamOpt == null)) {
1735 throw new
1736 IllegalArgumentException("File or stream option must be specified");
1737 }
1738 if (fileOpt == null && bufOpt != null) {
1739 throw new IllegalArgumentException("buffer size can only be set when" +
1740 " a file is specified.");
1741 }
1742 // figure out the real values
1743 Path filename = null;
1744 FSDataInputStream file;
1745 final long len;
1746 if (fileOpt != null) {
1747 filename = fileOpt.getValue();
1748 FileSystem fs = filename.getFileSystem(conf);
1749 int bufSize = bufOpt == null ? getBufferSize(conf): bufOpt.getValue();
1750 len = null == lenOpt
1751 ? fs.getFileStatus(filename).getLen()
1752 : lenOpt.getValue();
1753 file = openFile(fs, filename, bufSize, len);
1754 } else {
1755 len = null == lenOpt ? Long.MAX_VALUE : lenOpt.getValue();
1756 file = streamOpt.getValue();
1757 }
1758 long start = startOpt == null ? 0 : startOpt.getValue();
1759 // really set up
1760 initialize(filename, file, start, len, conf, headerOnly != null);
1761 }
1762
1763 /**
1764 * Construct a reader by opening a file from the given file system.
1765 * @param fs The file system used to open the file.
1766 * @param file The file being read.
1767 * @param conf Configuration
1768 * @throws IOException
1769 * @deprecated Use Reader(Configuration, Option...) instead.
1770 */
1771 @Deprecated
1772 public Reader(FileSystem fs, Path file,
1773 Configuration conf) throws IOException {
1774 this(conf, file(file.makeQualified(fs)));
1775 }
1776
1777 /**
1778 * Construct a reader by the given input stream.
1779 * @param in An input stream.
1780 * @param buffersize unused
1781 * @param start The starting position.
1782 * @param length The length being read.
1783 * @param conf Configuration
1784 * @throws IOException
1785 * @deprecated Use Reader(Configuration, Reader.Option...) instead.
1786 */
1787 @Deprecated
1788 public Reader(FSDataInputStream in, int buffersize,
1789 long start, long length, Configuration conf) throws IOException {
1790 this(conf, stream(in), start(start), length(length));
1791 }
1792
1793 /** Common work of the constructors. */
1794 private void initialize(Path filename, FSDataInputStream in,
1795 long start, long length, Configuration conf,
1796 boolean tempReader) throws IOException {
1797 if (in == null) {
1798 throw new IllegalArgumentException("in == null");
1799 }
1800 this.filename = filename == null ? "<unknown>" : filename.toString();
1801 this.in = in;
1802 this.conf = conf;
1803 boolean succeeded = false;
1804 try {
1805 seek(start);
1806 this.end = this.in.getPos() + length;
1807 // if it wrapped around, use the max
1808 if (end < length) {
1809 end = Long.MAX_VALUE;
1810 }
1811 init(tempReader);
1812 succeeded = true;
1813 } finally {
1814 if (!succeeded) {
1815 IOUtils.cleanup(LOG, this.in);
1816 }
1817 }
1818 }
1819
1820 /**
1821 * Override this method to specialize the type of
1822 * {@link FSDataInputStream} returned.
1823 * @param fs The file system used to open the file.
1824 * @param file The file being read.
1825 * @param bufferSize The buffer size used to read the file.
1826 * @param length The length being read if it is >= 0. Otherwise,
1827 * the length is not available.
1828 * @return The opened stream.
1829 * @throws IOException
1830 */
1831 protected FSDataInputStream openFile(FileSystem fs, Path file,
1832 int bufferSize, long length) throws IOException {
1833 return fs.open(file, bufferSize);
1834 }
1835
1836 /**
1837 * Initialize the {@link Reader}
1838 * @param tmpReader <code>true</code> if we are constructing a temporary
1839 * reader {@link SequenceFile.Sorter.cloneFileAttributes},
1840 * and hence do not initialize every component;
1841 * <code>false</code> otherwise.
1842 * @throws IOException
1843 */
1844 private void init(boolean tempReader) throws IOException {
1845 byte[] versionBlock = new byte[VERSION.length];
1846 in.readFully(versionBlock);
1847
1848 if ((versionBlock[0] != VERSION[0]) ||
1849 (versionBlock[1] != VERSION[1]) ||
1850 (versionBlock[2] != VERSION[2]))
1851 throw new IOException(this + " not a SequenceFile");
1852
1853 // Set 'version'
1854 version = versionBlock[3];
1855 if (version > VERSION[3])
1856 throw new VersionMismatchException(VERSION[3], version);
1857
1858 if (version < BLOCK_COMPRESS_VERSION) {
1859 UTF8 className = new UTF8();
1860
1861 className.readFields(in);
1862 keyClassName = className.toStringChecked(); // key class name
1863
1864 className.readFields(in);
1865 valClassName = className.toStringChecked(); // val class name
1866 } else {
1867 keyClassName = Text.readString(in);
1868 valClassName = Text.readString(in);
1869 }
1870
1871 if (version > 2) { // if version > 2
1872 this.decompress = in.readBoolean(); // is compressed?
1873 } else {
1874 decompress = false;
1875 }
1876
1877 if (version >= BLOCK_COMPRESS_VERSION) { // if version >= 4
1878 this.blockCompressed = in.readBoolean(); // is block-compressed?
1879 } else {
1880 blockCompressed = false;
1881 }
1882
1883 // if version >= 5
1884 // setup the compression codec
1885 if (decompress) {
1886 if (version >= CUSTOM_COMPRESS_VERSION) {
1887 String codecClassname = Text.readString(in);
1888 try {
1889 Class<? extends CompressionCodec> codecClass
1890 = conf.getClassByName(codecClassname).asSubclass(CompressionCodec.class);
1891 this.codec = ReflectionUtils.newInstance(codecClass, conf);
1892 } catch (ClassNotFoundException cnfe) {
1893 throw new IllegalArgumentException("Unknown codec: " +
1894 codecClassname, cnfe);
1895 }
1896 } else {
1897 codec = new DefaultCodec();
1898 ((Configurable)codec).setConf(conf);
1899 }
1900 }
1901
1902 this.metadata = new Metadata();
1903 if (version >= VERSION_WITH_METADATA) { // if version >= 6
1904 this.metadata.readFields(in);
1905 }
1906
1907 if (version > 1) { // if version > 1
1908 in.readFully(sync); // read sync bytes
1909 headerEnd = in.getPos(); // record end of header
1910 }
1911
1912 // Initialize... *not* if this we are constructing a temporary Reader
1913 if (!tempReader) {
1914 valBuffer = new DataInputBuffer();
1915 if (decompress) {
1916 valDecompressor = CodecPool.getDecompressor(codec);
1917 valInFilter = codec.createInputStream(valBuffer, valDecompressor);
1918 valIn = new DataInputStream(valInFilter);
1919 } else {
1920 valIn = valBuffer;
1921 }
1922
1923 if (blockCompressed) {
1924 keyLenBuffer = new DataInputBuffer();
1925 keyBuffer = new DataInputBuffer();
1926 valLenBuffer = new DataInputBuffer();
1927
1928 keyLenDecompressor = CodecPool.getDecompressor(codec);
1929 keyLenInFilter = codec.createInputStream(keyLenBuffer,
1930 keyLenDecompressor);
1931 keyLenIn = new DataInputStream(keyLenInFilter);
1932
1933 keyDecompressor = CodecPool.getDecompressor(codec);
1934 keyInFilter = codec.createInputStream(keyBuffer, keyDecompressor);
1935 keyIn = new DataInputStream(keyInFilter);
1936
1937 valLenDecompressor = CodecPool.getDecompressor(codec);
1938 valLenInFilter = codec.createInputStream(valLenBuffer,
1939 valLenDecompressor);
1940 valLenIn = new DataInputStream(valLenInFilter);
1941 }
1942
1943 SerializationFactory serializationFactory =
1944 new SerializationFactory(conf);
1945 this.keyDeserializer =
1946 getDeserializer(serializationFactory, getKeyClass());
1947 if (this.keyDeserializer == null) {
1948 throw new IOException(
1949 "Could not find a deserializer for the Key class: '"
1950 + getKeyClass().getCanonicalName() + "'. "
1951 + "Please ensure that the configuration '" +
1952 CommonConfigurationKeys.IO_SERIALIZATIONS_KEY + "' is "
1953 + "properly configured, if you're using "
1954 + "custom serialization.");
1955 }
1956 if (!blockCompressed) {
1957 this.keyDeserializer.open(valBuffer);
1958 } else {
1959 this.keyDeserializer.open(keyIn);
1960 }
1961 this.valDeserializer =
1962 getDeserializer(serializationFactory, getValueClass());
1963 if (this.valDeserializer == null) {
1964 throw new IOException(
1965 "Could not find a deserializer for the Value class: '"
1966 + getValueClass().getCanonicalName() + "'. "
1967 + "Please ensure that the configuration '" +
1968 CommonConfigurationKeys.IO_SERIALIZATIONS_KEY + "' is "
1969 + "properly configured, if you're using "
1970 + "custom serialization.");
1971 }
1972 this.valDeserializer.open(valIn);
1973 }
1974 }
1975
1976 @SuppressWarnings("unchecked")
1977 private Deserializer getDeserializer(SerializationFactory sf, Class c) {
1978 return sf.getDeserializer(c);
1979 }
1980
1981 /** Close the file. */
1982 @Override
1983 public synchronized void close() throws IOException {
1984 // Return the decompressors to the pool
1985 CodecPool.returnDecompressor(keyLenDecompressor);
1986 CodecPool.returnDecompressor(keyDecompressor);
1987 CodecPool.returnDecompressor(valLenDecompressor);
1988 CodecPool.returnDecompressor(valDecompressor);
1989 keyLenDecompressor = keyDecompressor = null;
1990 valLenDecompressor = valDecompressor = null;
1991
1992 if (keyDeserializer != null) {
1993 keyDeserializer.close();
1994 }
1995 if (valDeserializer != null) {
1996 valDeserializer.close();
1997 }
1998
1999 // Close the input-stream
2000 in.close();
2001 }
2002
2003 /** Returns the name of the key class. */
2004 public String getKeyClassName() {
2005 return keyClassName;
2006 }
2007
2008 /** Returns the class of keys in this file. */
2009 public synchronized Class<?> getKeyClass() {
2010 if (null == keyClass) {
2011 try {
2012 keyClass = WritableName.getClass(getKeyClassName(), conf);
2013 } catch (IOException e) {
2014 throw new RuntimeException(e);
2015 }
2016 }
2017 return keyClass;
2018 }
2019
2020 /** Returns the name of the value class. */
2021 public String getValueClassName() {
2022 return valClassName;
2023 }
2024
2025 /** Returns the class of values in this file. */
2026 public synchronized Class<?> getValueClass() {
2027 if (null == valClass) {
2028 try {
2029 valClass = WritableName.getClass(getValueClassName(), conf);
2030 } catch (IOException e) {
2031 throw new RuntimeException(e);
2032 }
2033 }
2034 return valClass;
2035 }
2036
2037 /** Returns true if values are compressed. */
2038 public boolean isCompressed() { return decompress; }
2039
2040 /** Returns true if records are block-compressed. */
2041 public boolean isBlockCompressed() { return blockCompressed; }
2042
2043 /** Returns the compression codec of data in this file. */
2044 public CompressionCodec getCompressionCodec() { return codec; }
2045
2046 /**
2047 * Get the compression type for this file.
2048 * @return the compression type
2049 */
2050 public CompressionType getCompressionType() {
2051 if (decompress) {
2052 return blockCompressed ? CompressionType.BLOCK : CompressionType.RECORD;
2053 } else {
2054 return CompressionType.NONE;
2055 }
2056 }
2057
2058 /** Returns the metadata object of the file */
2059 public Metadata getMetadata() {
2060 return this.metadata;
2061 }
2062
2063 /** Returns the configuration used for this file. */
2064 Configuration getConf() { return conf; }
2065
2066 /** Read a compressed buffer */
2067 private synchronized void readBuffer(DataInputBuffer buffer,
2068 CompressionInputStream filter) throws IOException {
2069 // Read data into a temporary buffer
2070 DataOutputBuffer dataBuffer = new DataOutputBuffer();
2071
2072 try {
2073 int dataBufferLength = WritableUtils.readVInt(in);
2074 dataBuffer.write(in, dataBufferLength);
2075
2076 // Set up 'buffer' connected to the input-stream
2077 buffer.reset(dataBuffer.getData(), 0, dataBuffer.getLength());
2078 } finally {
2079 dataBuffer.close();
2080 }
2081
2082 // Reset the codec
2083 filter.resetState();
2084 }
2085
2086 /** Read the next 'compressed' block */
2087 private synchronized void readBlock() throws IOException {
2088 // Check if we need to throw away a whole block of
2089 // 'values' due to 'lazy decompression'
2090 if (lazyDecompress && !valuesDecompressed) {
2091 in.seek(WritableUtils.readVInt(in)+in.getPos());
2092 in.seek(WritableUtils.readVInt(in)+in.getPos());
2093 }
2094
2095 // Reset internal states
2096 noBufferedKeys = 0; noBufferedValues = 0; noBufferedRecords = 0;
2097 valuesDecompressed = false;
2098
2099 //Process sync
2100 if (sync != null) {
2101 in.readInt();
2102 in.readFully(syncCheck); // read syncCheck
2103 if (!Arrays.equals(sync, syncCheck)) // check it
2104 throw new IOException("File is corrupt!");
2105 }
2106 syncSeen = true;
2107
2108 // Read number of records in this block
2109 noBufferedRecords = WritableUtils.readVInt(in);
2110
2111 // Read key lengths and keys
2112 readBuffer(keyLenBuffer, keyLenInFilter);
2113 readBuffer(keyBuffer, keyInFilter);
2114 noBufferedKeys = noBufferedRecords;
2115
2116 // Read value lengths and values
2117 if (!lazyDecompress) {
2118 readBuffer(valLenBuffer, valLenInFilter);
2119 readBuffer(valBuffer, valInFilter);
2120 noBufferedValues = noBufferedRecords;
2121 valuesDecompressed = true;
2122 }
2123 }
2124
2125 /**
2126 * Position valLenIn/valIn to the 'value'
2127 * corresponding to the 'current' key
2128 */
2129 private synchronized void seekToCurrentValue() throws IOException {
2130 if (!blockCompressed) {
2131 if (decompress) {
2132 valInFilter.resetState();
2133 }
2134 valBuffer.reset();
2135 } else {
2136 // Check if this is the first value in the 'block' to be read
2137 if (lazyDecompress && !valuesDecompressed) {
2138 // Read the value lengths and values
2139 readBuffer(valLenBuffer, valLenInFilter);
2140 readBuffer(valBuffer, valInFilter);
2141 noBufferedValues = noBufferedRecords;
2142 valuesDecompressed = true;
2143 }
2144
2145 // Calculate the no. of bytes to skip
2146 // Note: 'current' key has already been read!
2147 int skipValBytes = 0;
2148 int currentKey = noBufferedKeys + 1;
2149 for (int i=noBufferedValues; i > currentKey; --i) {
2150 skipValBytes += WritableUtils.readVInt(valLenIn);
2151 --noBufferedValues;
2152 }
2153
2154 // Skip to the 'val' corresponding to 'current' key
2155 if (skipValBytes > 0) {
2156 if (valIn.skipBytes(skipValBytes) != skipValBytes) {
2157 throw new IOException("Failed to seek to " + currentKey +
2158 "(th) value!");
2159 }
2160 }
2161 }
2162 }
2163
2164 /**
2165 * Get the 'value' corresponding to the last read 'key'.
2166 * @param val : The 'value' to be read.
2167 * @throws IOException
2168 */
2169 public synchronized void getCurrentValue(Writable val)
2170 throws IOException {
2171 if (val instanceof Configurable) {
2172 ((Configurable) val).setConf(this.conf);
2173 }
2174
2175 // Position stream to 'current' value
2176 seekToCurrentValue();
2177
2178 if (!blockCompressed) {
2179 val.readFields(valIn);
2180
2181 if (valIn.read() > 0) {
2182 LOG.info("available bytes: " + valIn.available());
2183 throw new IOException(val+" read "+(valBuffer.getPosition()-keyLength)
2184 + " bytes, should read " +
2185 (valBuffer.getLength()-keyLength));
2186 }
2187 } else {
2188 // Get the value
2189 int valLength = WritableUtils.readVInt(valLenIn);
2190 val.readFields(valIn);
2191
2192 // Read another compressed 'value'
2193 --noBufferedValues;
2194
2195 // Sanity check
2196 if ((valLength < 0) && LOG.isDebugEnabled()) {
2197 LOG.debug(val + " is a zero-length value");
2198 }
2199 }
2200
2201 }
2202
2203 /**
2204 * Get the 'value' corresponding to the last read 'key'.
2205 * @param val : The 'value' to be read.
2206 * @throws IOException
2207 */
2208 public synchronized Object getCurrentValue(Object val)
2209 throws IOException {
2210 if (val instanceof Configurable) {
2211 ((Configurable) val).setConf(this.conf);
2212 }
2213
2214 // Position stream to 'current' value
2215 seekToCurrentValue();
2216
2217 if (!blockCompressed) {
2218 val = deserializeValue(val);
2219
2220 if (valIn.read() > 0) {
2221 LOG.info("available bytes: " + valIn.available());
2222 throw new IOException(val+" read "+(valBuffer.getPosition()-keyLength)
2223 + " bytes, should read " +
2224 (valBuffer.getLength()-keyLength));
2225 }
2226 } else {
2227 // Get the value
2228 int valLength = WritableUtils.readVInt(valLenIn);
2229 val = deserializeValue(val);
2230
2231 // Read another compressed 'value'
2232 --noBufferedValues;
2233
2234 // Sanity check
2235 if ((valLength < 0) && LOG.isDebugEnabled()) {
2236 LOG.debug(val + " is a zero-length value");
2237 }
2238 }
2239 return val;
2240
2241 }
2242
2243 @SuppressWarnings("unchecked")
2244 private Object deserializeValue(Object val) throws IOException {
2245 return valDeserializer.deserialize(val);
2246 }
2247
2248 /** Read the next key in the file into <code>key</code>, skipping its
2249 * value. True if another entry exists, and false at end of file. */
2250 public synchronized boolean next(Writable key) throws IOException {
2251 if (key.getClass() != getKeyClass())
2252 throw new IOException("wrong key class: "+key.getClass().getName()
2253 +" is not "+keyClass);
2254
2255 if (!blockCompressed) {
2256 outBuf.reset();
2257
2258 keyLength = next(outBuf);
2259 if (keyLength < 0)
2260 return false;
2261
2262 valBuffer.reset(outBuf.getData(), outBuf.getLength());
2263
2264 key.readFields(valBuffer);
2265 valBuffer.mark(0);
2266 if (valBuffer.getPosition() != keyLength)
2267 throw new IOException(key + " read " + valBuffer.getPosition()
2268 + " bytes, should read " + keyLength);
2269 } else {
2270 //Reset syncSeen
2271 syncSeen = false;
2272
2273 if (noBufferedKeys == 0) {
2274 try {
2275 readBlock();
2276 } catch (EOFException eof) {
2277 return false;
2278 }
2279 }
2280
2281 int keyLength = WritableUtils.readVInt(keyLenIn);
2282
2283 // Sanity check
2284 if (keyLength < 0) {
2285 return false;
2286 }
2287
2288 //Read another compressed 'key'
2289 key.readFields(keyIn);
2290 --noBufferedKeys;
2291 }
2292
2293 return true;
2294 }
2295
2296 /** Read the next key/value pair in the file into <code>key</code> and
2297 * <code>val</code>. Returns true if such a pair exists and false when at
2298 * end of file */
2299 public synchronized boolean next(Writable key, Writable val)
2300 throws IOException {
2301 if (val.getClass() != getValueClass())
2302 throw new IOException("wrong value class: "+val+" is not "+valClass);
2303
2304 boolean more = next(key);
2305
2306 if (more) {
2307 getCurrentValue(val);
2308 }
2309
2310 return more;
2311 }
2312
2313 /**
2314 * Read and return the next record length, potentially skipping over
2315 * a sync block.
2316 * @return the length of the next record or -1 if there is no next record
2317 * @throws IOException
2318 */
2319 private synchronized int readRecordLength() throws IOException {
2320 if (in.getPos() >= end) {
2321 return -1;
2322 }
2323 int length = in.readInt();
2324 if (version > 1 && sync != null &&
2325 length == SYNC_ESCAPE) { // process a sync entry
2326 in.readFully(syncCheck); // read syncCheck
2327 if (!Arrays.equals(sync, syncCheck)) // check it
2328 throw new IOException("File is corrupt!");
2329 syncSeen = true;
2330 if (in.getPos() >= end) {
2331 return -1;
2332 }
2333 length = in.readInt(); // re-read length
2334 } else {
2335 syncSeen = false;
2336 }
2337
2338 return length;
2339 }
2340
2341 /** Read the next key/value pair in the file into <code>buffer</code>.
2342 * Returns the length of the key read, or -1 if at end of file. The length
2343 * of the value may be computed by calling buffer.getLength() before and
2344 * after calls to this method. */
2345 /** @deprecated Call {@link #nextRaw(DataOutputBuffer,SequenceFile.ValueBytes)}. */
2346 @Deprecated
2347 synchronized int next(DataOutputBuffer buffer) throws IOException {
2348 // Unsupported for block-compressed sequence files
2349 if (blockCompressed) {
2350 throw new IOException("Unsupported call for block-compressed" +
2351 " SequenceFiles - use SequenceFile.Reader.next(DataOutputStream, ValueBytes)");
2352 }
2353 try {
2354 int length = readRecordLength();
2355 if (length == -1) {
2356 return -1;
2357 }
2358 int keyLength = in.readInt();
2359 buffer.write(in, length);
2360 return keyLength;
2361 } catch (ChecksumException e) { // checksum failure
2362 handleChecksumException(e);
2363 return next(buffer);
2364 }
2365 }
2366
2367 public ValueBytes createValueBytes() {
2368 ValueBytes val = null;
2369 if (!decompress || blockCompressed) {
2370 val = new UncompressedBytes();
2371 } else {
2372 val = new CompressedBytes(codec);
2373 }
2374 return val;
2375 }
2376
2377 /**
2378 * Read 'raw' records.
2379 * @param key - The buffer into which the key is read
2380 * @param val - The 'raw' value
2381 * @return Returns the total record length or -1 for end of file
2382 * @throws IOException
2383 */
2384 public synchronized int nextRaw(DataOutputBuffer key, ValueBytes val)
2385 throws IOException {
2386 if (!blockCompressed) {
2387 int length = readRecordLength();
2388 if (length == -1) {
2389 return -1;
2390 }
2391 int keyLength = in.readInt();
2392 int valLength = length - keyLength;
2393 key.write(in, keyLength);
2394 if (decompress) {
2395 CompressedBytes value = (CompressedBytes)val;
2396 value.reset(in, valLength);
2397 } else {
2398 UncompressedBytes value = (UncompressedBytes)val;
2399 value.reset(in, valLength);
2400 }
2401
2402 return length;
2403 } else {
2404 //Reset syncSeen
2405 syncSeen = false;
2406
2407 // Read 'key'
2408 if (noBufferedKeys == 0) {
2409 if (in.getPos() >= end)
2410 return -1;
2411
2412 try {
2413 readBlock();
2414 } catch (EOFException eof) {
2415 return -1;
2416 }
2417 }
2418 int keyLength = WritableUtils.readVInt(keyLenIn);
2419 if (keyLength < 0) {
2420 throw new IOException("zero length key found!");
2421 }
2422 key.write(keyIn, keyLength);
2423 --noBufferedKeys;
2424
2425 // Read raw 'value'
2426 seekToCurrentValue();
2427 int valLength = WritableUtils.readVInt(valLenIn);
2428 UncompressedBytes rawValue = (UncompressedBytes)val;
2429 rawValue.reset(valIn, valLength);
2430 --noBufferedValues;
2431
2432 return (keyLength+valLength);
2433 }
2434
2435 }
2436
2437 /**
2438 * Read 'raw' keys.
2439 * @param key - The buffer into which the key is read
2440 * @return Returns the key length or -1 for end of file
2441 * @throws IOException
2442 */
2443 public synchronized int nextRawKey(DataOutputBuffer key)
2444 throws IOException {
2445 if (!blockCompressed) {
2446 recordLength = readRecordLength();
2447 if (recordLength == -1) {
2448 return -1;
2449 }
2450 keyLength = in.readInt();
2451 key.write(in, keyLength);
2452 return keyLength;
2453 } else {
2454 //Reset syncSeen
2455 syncSeen = false;
2456
2457 // Read 'key'
2458 if (noBufferedKeys == 0) {
2459 if (in.getPos() >= end)
2460 return -1;
2461
2462 try {
2463 readBlock();
2464 } catch (EOFException eof) {
2465 return -1;
2466 }
2467 }
2468 int keyLength = WritableUtils.readVInt(keyLenIn);
2469 if (keyLength < 0) {
2470 throw new IOException("zero length key found!");
2471 }
2472 key.write(keyIn, keyLength);
2473 --noBufferedKeys;
2474
2475 return keyLength;
2476 }
2477
2478 }
2479
2480 /** Read the next key in the file, skipping its
2481 * value. Return null at end of file. */
2482 public synchronized Object next(Object key) throws IOException {
2483 if (key != null && key.getClass() != getKeyClass()) {
2484 throw new IOException("wrong key class: "+key.getClass().getName()
2485 +" is not "+keyClass);
2486 }
2487
2488 if (!blockCompressed) {
2489 outBuf.reset();
2490
2491 keyLength = next(outBuf);
2492 if (keyLength < 0)
2493 return null;
2494
2495 valBuffer.reset(outBuf.getData(), outBuf.getLength());
2496
2497 key = deserializeKey(key);
2498 valBuffer.mark(0);
2499 if (valBuffer.getPosition() != keyLength)
2500 throw new IOException(key + " read " + valBuffer.getPosition()
2501 + " bytes, should read " + keyLength);
2502 } else {
2503 //Reset syncSeen
2504 syncSeen = false;
2505
2506 if (noBufferedKeys == 0) {
2507 try {
2508 readBlock();
2509 } catch (EOFException eof) {
2510 return null;
2511 }
2512 }
2513
2514 int keyLength = WritableUtils.readVInt(keyLenIn);
2515
2516 // Sanity check
2517 if (keyLength < 0) {
2518 return null;
2519 }
2520
2521 //Read another compressed 'key'
2522 key = deserializeKey(key);
2523 --noBufferedKeys;
2524 }
2525
2526 return key;
2527 }
2528
2529 @SuppressWarnings("unchecked")
2530 private Object deserializeKey(Object key) throws IOException {
2531 return keyDeserializer.deserialize(key);
2532 }
2533
2534 /**
2535 * Read 'raw' values.
2536 * @param val - The 'raw' value
2537 * @return Returns the value length
2538 * @throws IOException
2539 */
2540 public synchronized int nextRawValue(ValueBytes val)
2541 throws IOException {
2542
2543 // Position stream to current value
2544 seekToCurrentValue();
2545
2546 if (!blockCompressed) {
2547 int valLength = recordLength - keyLength;
2548 if (decompress) {
2549 CompressedBytes value = (CompressedBytes)val;
2550 value.reset(in, valLength);
2551 } else {
2552 UncompressedBytes value = (UncompressedBytes)val;
2553 value.reset(in, valLength);
2554 }
2555
2556 return valLength;
2557 } else {
2558 int valLength = WritableUtils.readVInt(valLenIn);
2559 UncompressedBytes rawValue = (UncompressedBytes)val;
2560 rawValue.reset(valIn, valLength);
2561 --noBufferedValues;
2562 return valLength;
2563 }
2564
2565 }
2566
2567 private void handleChecksumException(ChecksumException e)
2568 throws IOException {
2569 if (this.conf.getBoolean("io.skip.checksum.errors", false)) {
2570 LOG.warn("Bad checksum at "+getPosition()+". Skipping entries.");
2571 sync(getPosition()+this.conf.getInt("io.bytes.per.checksum", 512));
2572 } else {
2573 throw e;
2574 }
2575 }
2576
2577 /** disables sync. often invoked for tmp files */
2578 synchronized void ignoreSync() {
2579 sync = null;
2580 }
2581
2582 /** Set the current byte position in the input file.
2583 *
2584 * <p>The position passed must be a position returned by {@link
2585 * SequenceFile.Writer#getLength()} when writing this file. To seek to an arbitrary
2586 * position, use {@link SequenceFile.Reader#sync(long)}.
2587 */
2588 public synchronized void seek(long position) throws IOException {
2589 in.seek(position);
2590 if (blockCompressed) { // trigger block read
2591 noBufferedKeys = 0;
2592 valuesDecompressed = true;
2593 }
2594 }
2595
2596 /** Seek to the next sync mark past a given position.*/
2597 public synchronized void sync(long position) throws IOException {
2598 if (position+SYNC_SIZE >= end) {
2599 seek(end);
2600 return;
2601 }
2602
2603 if (position < headerEnd) {
2604 // seek directly to first record
2605 in.seek(headerEnd);
2606 // note the sync marker "seen" in the header
2607 syncSeen = true;
2608 return;
2609 }
2610
2611 try {
2612 seek(position+4); // skip escape
2613 in.readFully(syncCheck);
2614 int syncLen = sync.length;
2615 for (int i = 0; in.getPos() < end; i++) {
2616 int j = 0;
2617 for (; j < syncLen; j++) {
2618 if (sync[j] != syncCheck[(i+j)%syncLen])
2619 break;
2620 }
2621 if (j == syncLen) {
2622 in.seek(in.getPos() - SYNC_SIZE); // position before sync
2623 return;
2624 }
2625 syncCheck[i%syncLen] = in.readByte();
2626 }
2627 } catch (ChecksumException e) { // checksum failure
2628 handleChecksumException(e);
2629 }
2630 }
2631
2632 /** Returns true iff the previous call to next passed a sync mark.*/
2633 public synchronized boolean syncSeen() { return syncSeen; }
2634
2635 /** Return the current byte position in the input file. */
2636 public synchronized long getPosition() throws IOException {
2637 return in.getPos();
2638 }
2639
2640 /** Returns the name of the file. */
2641 @Override
2642 public String toString() {
2643 return filename;
2644 }
2645
2646 }
2647
2648 /** Sorts key/value pairs in a sequence-format file.
2649 *
2650 * <p>For best performance, applications should make sure that the {@link
2651 * Writable#readFields(DataInput)} implementation of their keys is
2652 * very efficient. In particular, it should avoid allocating memory.
2653 */
2654 public static class Sorter {
2655
2656 private RawComparator comparator;
2657
2658 private MergeSort mergeSort; //the implementation of merge sort
2659
2660 private Path[] inFiles; // when merging or sorting
2661
2662 private Path outFile;
2663
2664 private int memory; // bytes
2665 private int factor; // merged per pass
2666
2667 private FileSystem fs = null;
2668
2669 private Class keyClass;
2670 private Class valClass;
2671
2672 private Configuration conf;
2673 private Metadata metadata;
2674
2675 private Progressable progressable = null;
2676
2677 /** Sort and merge files containing the named classes. */
2678 public Sorter(FileSystem fs, Class<? extends WritableComparable> keyClass,
2679 Class valClass, Configuration conf) {
2680 this(fs, WritableComparator.get(keyClass, conf), keyClass, valClass, conf);
2681 }
2682
2683 /** Sort and merge using an arbitrary {@link RawComparator}. */
2684 public Sorter(FileSystem fs, RawComparator comparator, Class keyClass,
2685 Class valClass, Configuration conf) {
2686 this(fs, comparator, keyClass, valClass, conf, new Metadata());
2687 }
2688
2689 /** Sort and merge using an arbitrary {@link RawComparator}. */
2690 public Sorter(FileSystem fs, RawComparator comparator, Class keyClass,
2691 Class valClass, Configuration conf, Metadata metadata) {
2692 this.fs = fs;
2693 this.comparator = comparator;
2694 this.keyClass = keyClass;
2695 this.valClass = valClass;
2696 this.memory = conf.getInt("io.sort.mb", 100) * 1024 * 1024;
2697 this.factor = conf.getInt("io.sort.factor", 100);
2698 this.conf = conf;
2699 this.metadata = metadata;
2700 }
2701
2702 /** Set the number of streams to merge at once.*/
2703 public void setFactor(int factor) { this.factor = factor; }
2704
2705 /** Get the number of streams to merge at once.*/
2706 public int getFactor() { return factor; }
2707
2708 /** Set the total amount of buffer memory, in bytes.*/
2709 public void setMemory(int memory) { this.memory = memory; }
2710
2711 /** Get the total amount of buffer memory, in bytes.*/
2712 public int getMemory() { return memory; }
2713
2714 /** Set the progressable object in order to report progress. */
2715 public void setProgressable(Progressable progressable) {
2716 this.progressable = progressable;
2717 }
2718
2719 /**
2720 * Perform a file sort from a set of input files into an output file.
2721 * @param inFiles the files to be sorted
2722 * @param outFile the sorted output file
2723 * @param deleteInput should the input files be deleted as they are read?
2724 */
2725 public void sort(Path[] inFiles, Path outFile,
2726 boolean deleteInput) throws IOException {
2727 if (fs.exists(outFile)) {
2728 throw new IOException("already exists: " + outFile);
2729 }
2730
2731 this.inFiles = inFiles;
2732 this.outFile = outFile;
2733
2734 int segments = sortPass(deleteInput);
2735 if (segments > 1) {
2736 mergePass(outFile.getParent());
2737 }
2738 }
2739
2740 /**
2741 * Perform a file sort from a set of input files and return an iterator.
2742 * @param inFiles the files to be sorted
2743 * @param tempDir the directory where temp files are created during sort
2744 * @param deleteInput should the input files be deleted as they are read?
2745 * @return iterator the RawKeyValueIterator
2746 */
2747 public RawKeyValueIterator sortAndIterate(Path[] inFiles, Path tempDir,
2748 boolean deleteInput) throws IOException {
2749 Path outFile = new Path(tempDir + Path.SEPARATOR + "all.2");
2750 if (fs.exists(outFile)) {
2751 throw new IOException("already exists: " + outFile);
2752 }
2753 this.inFiles = inFiles;
2754 //outFile will basically be used as prefix for temp files in the cases
2755 //where sort outputs multiple sorted segments. For the single segment
2756 //case, the outputFile itself will contain the sorted data for that
2757 //segment
2758 this.outFile = outFile;
2759
2760 int segments = sortPass(deleteInput);
2761 if (segments > 1)
2762 return merge(outFile.suffix(".0"), outFile.suffix(".0.index"),
2763 tempDir);
2764 else if (segments == 1)
2765 return merge(new Path[]{outFile}, true, tempDir);
2766 else return null;
2767 }
2768
2769 /**
2770 * The backwards compatible interface to sort.
2771 * @param inFile the input file to sort
2772 * @param outFile the sorted output file
2773 */
2774 public void sort(Path inFile, Path outFile) throws IOException {
2775 sort(new Path[]{inFile}, outFile, false);
2776 }
2777
2778 private int sortPass(boolean deleteInput) throws IOException {
2779 if(LOG.isDebugEnabled()) {
2780 LOG.debug("running sort pass");
2781 }
2782 SortPass sortPass = new SortPass(); // make the SortPass
2783 sortPass.setProgressable(progressable);
2784 mergeSort = new MergeSort(sortPass.new SeqFileComparator());
2785 try {
2786 return sortPass.run(deleteInput); // run it
2787 } finally {
2788 sortPass.close(); // close it
2789 }
2790 }
2791
2792 private class SortPass {
2793 private int memoryLimit = memory/4;
2794 private int recordLimit = 1000000;
2795
2796 private DataOutputBuffer rawKeys = new DataOutputBuffer();
2797 private byte[] rawBuffer;
2798
2799 private int[] keyOffsets = new int[1024];
2800 private int[] pointers = new int[keyOffsets.length];
2801 private int[] pointersCopy = new int[keyOffsets.length];
2802 private int[] keyLengths = new int[keyOffsets.length];
2803 private ValueBytes[] rawValues = new ValueBytes[keyOffsets.length];
2804
2805 private ArrayList segmentLengths = new ArrayList();
2806
2807 private Reader in = null;
2808 private FSDataOutputStream out = null;
2809 private FSDataOutputStream indexOut = null;
2810 private Path outName;
2811
2812 private Progressable progressable = null;
2813
2814 public int run(boolean deleteInput) throws IOException {
2815 int segments = 0;
2816 int currentFile = 0;
2817 boolean atEof = (currentFile >= inFiles.length);
2818 CompressionType compressionType;
2819 CompressionCodec codec = null;
2820 segmentLengths.clear();
2821 if (atEof) {
2822 return 0;
2823 }
2824
2825 // Initialize
2826 in = new Reader(fs, inFiles[currentFile], conf);
2827 compressionType = in.getCompressionType();
2828 codec = in.getCompressionCodec();
2829
2830 for (int i=0; i < rawValues.length; ++i) {
2831 rawValues[i] = null;
2832 }
2833
2834 while (!atEof) {
2835 int count = 0;
2836 int bytesProcessed = 0;
2837 rawKeys.reset();
2838 while (!atEof &&
2839 bytesProcessed < memoryLimit && count < recordLimit) {
2840
2841 // Read a record into buffer
2842 // Note: Attempt to re-use 'rawValue' as far as possible
2843 int keyOffset = rawKeys.getLength();
2844 ValueBytes rawValue =
2845 (count == keyOffsets.length || rawValues[count] == null) ?
2846 in.createValueBytes() :
2847 rawValues[count];
2848 int recordLength = in.nextRaw(rawKeys, rawValue);
2849 if (recordLength == -1) {
2850 in.close();
2851 if (deleteInput) {
2852 fs.delete(inFiles[currentFile], true);
2853 }
2854 currentFile += 1;
2855 atEof = currentFile >= inFiles.length;
2856 if (!atEof) {
2857 in = new Reader(fs, inFiles[currentFile], conf);
2858 } else {
2859 in = null;
2860 }
2861 continue;
2862 }
2863
2864 int keyLength = rawKeys.getLength() - keyOffset;
2865
2866 if (count == keyOffsets.length)
2867 grow();
2868
2869 keyOffsets[count] = keyOffset; // update pointers
2870 pointers[count] = count;
2871 keyLengths[count] = keyLength;
2872 rawValues[count] = rawValue;
2873
2874 bytesProcessed += recordLength;
2875 count++;
2876 }
2877
2878 // buffer is full -- sort & flush it
2879 if(LOG.isDebugEnabled()) {
2880 LOG.debug("flushing segment " + segments);
2881 }
2882 rawBuffer = rawKeys.getData();
2883 sort(count);
2884 // indicate we're making progress
2885 if (progressable != null) {
2886 progressable.progress();
2887 }
2888 flush(count, bytesProcessed, compressionType, codec,
2889 segments==0 && atEof);
2890 segments++;
2891 }
2892 return segments;
2893 }
2894
2895 public void close() throws IOException {
2896 if (in != null) {
2897 in.close();
2898 }
2899 if (out != null) {
2900 out.close();
2901 }
2902 if (indexOut != null) {
2903 indexOut.close();
2904 }
2905 }
2906
2907 private void grow() {
2908 int newLength = keyOffsets.length * 3 / 2;
2909 keyOffsets = grow(keyOffsets, newLength);
2910 pointers = grow(pointers, newLength);
2911 pointersCopy = new int[newLength];
2912 keyLengths = grow(keyLengths, newLength);
2913 rawValues = grow(rawValues, newLength);
2914 }
2915
2916 private int[] grow(int[] old, int newLength) {
2917 int[] result = new int[newLength];
2918 System.arraycopy(old, 0, result, 0, old.length);
2919 return result;
2920 }
2921
2922 private ValueBytes[] grow(ValueBytes[] old, int newLength) {
2923 ValueBytes[] result = new ValueBytes[newLength];
2924 System.arraycopy(old, 0, result, 0, old.length);
2925 for (int i=old.length; i < newLength; ++i) {
2926 result[i] = null;
2927 }
2928 return result;
2929 }
2930
2931 private void flush(int count, int bytesProcessed,
2932 CompressionType compressionType,
2933 CompressionCodec codec,
2934 boolean done) throws IOException {
2935 if (out == null) {
2936 outName = done ? outFile : outFile.suffix(".0");
2937 out = fs.create(outName);
2938 if (!done) {
2939 indexOut = fs.create(outName.suffix(".index"));
2940 }
2941 }
2942
2943 long segmentStart = out.getPos();
2944 Writer writer = createWriter(conf, Writer.stream(out),
2945 Writer.keyClass(keyClass), Writer.valueClass(valClass),
2946 Writer.compression(compressionType, codec),
2947 Writer.metadata(done ? metadata : new Metadata()));
2948
2949 if (!done) {
2950 writer.sync = null; // disable sync on temp files
2951 }
2952
2953 for (int i = 0; i < count; i++) { // write in sorted order
2954 int p = pointers[i];
2955 writer.appendRaw(rawBuffer, keyOffsets[p], keyLengths[p], rawValues[p]);
2956 }
2957 writer.close();
2958
2959 if (!done) {
2960 // Save the segment length
2961 WritableUtils.writeVLong(indexOut, segmentStart);
2962 WritableUtils.writeVLong(indexOut, (out.getPos()-segmentStart));
2963 indexOut.flush();
2964 }
2965 }
2966
2967 private void sort(int count) {
2968 System.arraycopy(pointers, 0, pointersCopy, 0, count);
2969 mergeSort.mergeSort(pointersCopy, pointers, 0, count);
2970 }
2971 class SeqFileComparator implements Comparator<IntWritable> {
2972 @Override
2973 public int compare(IntWritable I, IntWritable J) {
2974 return comparator.compare(rawBuffer, keyOffsets[I.get()],
2975 keyLengths[I.get()], rawBuffer,
2976 keyOffsets[J.get()], keyLengths[J.get()]);
2977 }
2978 }
2979
2980 /** set the progressable object in order to report progress */
2981 public void setProgressable(Progressable progressable)
2982 {
2983 this.progressable = progressable;
2984 }
2985
2986 } // SequenceFile.Sorter.SortPass
2987
2988 /** The interface to iterate over raw keys/values of SequenceFiles. */
2989 public static interface RawKeyValueIterator {
2990 /** Gets the current raw key
2991 * @return DataOutputBuffer
2992 * @throws IOException
2993 */
2994 DataOutputBuffer getKey() throws IOException;
2995 /** Gets the current raw value
2996 * @return ValueBytes
2997 * @throws IOException
2998 */
2999 ValueBytes getValue() throws IOException;
3000 /** Sets up the current key and value (for getKey and getValue)
3001 * @return true if there exists a key/value, false otherwise
3002 * @throws IOException
3003 */
3004 boolean next() throws IOException;
3005 /** closes the iterator so that the underlying streams can be closed
3006 * @throws IOException
3007 */
3008 void close() throws IOException;
3009 /** Gets the Progress object; this has a float (0.0 - 1.0)
3010 * indicating the bytes processed by the iterator so far
3011 */
3012 Progress getProgress();
3013 }
3014
3015 /**
3016 * Merges the list of segments of type <code>SegmentDescriptor</code>
3017 * @param segments the list of SegmentDescriptors
3018 * @param tmpDir the directory to write temporary files into
3019 * @return RawKeyValueIterator
3020 * @throws IOException
3021 */
3022 public RawKeyValueIterator merge(List <SegmentDescriptor> segments,
3023 Path tmpDir)
3024 throws IOException {
3025 // pass in object to report progress, if present
3026 MergeQueue mQueue = new MergeQueue(segments, tmpDir, progressable);
3027 return mQueue.merge();
3028 }
3029
3030 /**
3031 * Merges the contents of files passed in Path[] using a max factor value
3032 * that is already set
3033 * @param inNames the array of path names
3034 * @param deleteInputs true if the input files should be deleted when
3035 * unnecessary
3036 * @param tmpDir the directory to write temporary files into
3037 * @return RawKeyValueIteratorMergeQueue
3038 * @throws IOException
3039 */
3040 public RawKeyValueIterator merge(Path [] inNames, boolean deleteInputs,
3041 Path tmpDir)
3042 throws IOException {
3043 return merge(inNames, deleteInputs,
3044 (inNames.length < factor) ? inNames.length : factor,
3045 tmpDir);
3046 }
3047
3048 /**
3049 * Merges the contents of files passed in Path[]
3050 * @param inNames the array of path names
3051 * @param deleteInputs true if the input files should be deleted when
3052 * unnecessary
3053 * @param factor the factor that will be used as the maximum merge fan-in
3054 * @param tmpDir the directory to write temporary files into
3055 * @return RawKeyValueIteratorMergeQueue
3056 * @throws IOException
3057 */
3058 public RawKeyValueIterator merge(Path [] inNames, boolean deleteInputs,
3059 int factor, Path tmpDir)
3060 throws IOException {
3061 //get the segments from inNames
3062 ArrayList <SegmentDescriptor> a = new ArrayList <SegmentDescriptor>();
3063 for (int i = 0; i < inNames.length; i++) {
3064 SegmentDescriptor s = new SegmentDescriptor(0,
3065 fs.getFileStatus(inNames[i]).getLen(), inNames[i]);
3066 s.preserveInput(!deleteInputs);
3067 s.doSync();
3068 a.add(s);
3069 }
3070 this.factor = factor;
3071 MergeQueue mQueue = new MergeQueue(a, tmpDir, progressable);
3072 return mQueue.merge();
3073 }
3074
3075 /**
3076 * Merges the contents of files passed in Path[]
3077 * @param inNames the array of path names
3078 * @param tempDir the directory for creating temp files during merge
3079 * @param deleteInputs true if the input files should be deleted when
3080 * unnecessary
3081 * @return RawKeyValueIteratorMergeQueue
3082 * @throws IOException
3083 */
3084 public RawKeyValueIterator merge(Path [] inNames, Path tempDir,
3085 boolean deleteInputs)
3086 throws IOException {
3087 //outFile will basically be used as prefix for temp files for the
3088 //intermediate merge outputs
3089 this.outFile = new Path(tempDir + Path.SEPARATOR + "merged");
3090 //get the segments from inNames
3091 ArrayList <SegmentDescriptor> a = new ArrayList <SegmentDescriptor>();
3092 for (int i = 0; i < inNames.length; i++) {
3093 SegmentDescriptor s = new SegmentDescriptor(0,
3094 fs.getFileStatus(inNames[i]).getLen(), inNames[i]);
3095 s.preserveInput(!deleteInputs);
3096 s.doSync();
3097 a.add(s);
3098 }
3099 factor = (inNames.length < factor) ? inNames.length : factor;
3100 // pass in object to report progress, if present
3101 MergeQueue mQueue = new MergeQueue(a, tempDir, progressable);
3102 return mQueue.merge();
3103 }
3104
3105 /**
3106 * Clones the attributes (like compression of the input file and creates a
3107 * corresponding Writer
3108 * @param inputFile the path of the input file whose attributes should be
3109 * cloned
3110 * @param outputFile the path of the output file
3111 * @param prog the Progressable to report status during the file write
3112 * @return Writer
3113 * @throws IOException
3114 */
3115 public Writer cloneFileAttributes(Path inputFile, Path outputFile,
3116 Progressable prog) throws IOException {
3117 Reader reader = new Reader(conf,
3118 Reader.file(inputFile),
3119 new Reader.OnlyHeaderOption());
3120 CompressionType compress = reader.getCompressionType();
3121 CompressionCodec codec = reader.getCompressionCodec();
3122 reader.close();
3123
3124 Writer writer = createWriter(conf,
3125 Writer.file(outputFile),
3126 Writer.keyClass(keyClass),
3127 Writer.valueClass(valClass),
3128 Writer.compression(compress, codec),
3129 Writer.progressable(prog));
3130 return writer;
3131 }
3132
3133 /**
3134 * Writes records from RawKeyValueIterator into a file represented by the
3135 * passed writer
3136 * @param records the RawKeyValueIterator
3137 * @param writer the Writer created earlier
3138 * @throws IOException
3139 */
3140 public void writeFile(RawKeyValueIterator records, Writer writer)
3141 throws IOException {
3142 while(records.next()) {
3143 writer.appendRaw(records.getKey().getData(), 0,
3144 records.getKey().getLength(), records.getValue());
3145 }
3146 writer.sync();
3147 }
3148
3149 /** Merge the provided files.
3150 * @param inFiles the array of input path names
3151 * @param outFile the final output file
3152 * @throws IOException
3153 */
3154 public void merge(Path[] inFiles, Path outFile) throws IOException {
3155 if (fs.exists(outFile)) {
3156 throw new IOException("already exists: " + outFile);
3157 }
3158 RawKeyValueIterator r = merge(inFiles, false, outFile.getParent());
3159 Writer writer = cloneFileAttributes(inFiles[0], outFile, null);
3160
3161 writeFile(r, writer);
3162
3163 writer.close();
3164 }
3165
3166 /** sort calls this to generate the final merged output */
3167 private int mergePass(Path tmpDir) throws IOException {
3168 if(LOG.isDebugEnabled()) {
3169 LOG.debug("running merge pass");
3170 }
3171 Writer writer = cloneFileAttributes(
3172 outFile.suffix(".0"), outFile, null);
3173 RawKeyValueIterator r = merge(outFile.suffix(".0"),
3174 outFile.suffix(".0.index"), tmpDir);
3175 writeFile(r, writer);
3176
3177 writer.close();
3178 return 0;
3179 }
3180
3181 /** Used by mergePass to merge the output of the sort
3182 * @param inName the name of the input file containing sorted segments
3183 * @param indexIn the offsets of the sorted segments
3184 * @param tmpDir the relative directory to store intermediate results in
3185 * @return RawKeyValueIterator
3186 * @throws IOException
3187 */
3188 private RawKeyValueIterator merge(Path inName, Path indexIn, Path tmpDir)
3189 throws IOException {
3190 //get the segments from indexIn
3191 //we create a SegmentContainer so that we can track segments belonging to
3192 //inName and delete inName as soon as we see that we have looked at all
3193 //the contained segments during the merge process & hence don't need
3194 //them anymore
3195 SegmentContainer container = new SegmentContainer(inName, indexIn);
3196 MergeQueue mQueue = new MergeQueue(container.getSegmentList(), tmpDir, progressable);
3197 return mQueue.merge();
3198 }
3199
3200 /** This class implements the core of the merge logic */
3201 private class MergeQueue extends PriorityQueue
3202 implements RawKeyValueIterator {
3203 private boolean compress;
3204 private boolean blockCompress;
3205 private DataOutputBuffer rawKey = new DataOutputBuffer();
3206 private ValueBytes rawValue;
3207 private long totalBytesProcessed;
3208 private float progPerByte;
3209 private Progress mergeProgress = new Progress();
3210 private Path tmpDir;
3211 private Progressable progress = null; //handle to the progress reporting object
3212 private SegmentDescriptor minSegment;
3213
3214 //a TreeMap used to store the segments sorted by size (segment offset and
3215 //segment path name is used to break ties between segments of same sizes)
3216 private Map<SegmentDescriptor, Void> sortedSegmentSizes =
3217 new TreeMap<SegmentDescriptor, Void>();
3218
3219 @SuppressWarnings("unchecked")
3220 public void put(SegmentDescriptor stream) throws IOException {
3221 if (size() == 0) {
3222 compress = stream.in.isCompressed();
3223 blockCompress = stream.in.isBlockCompressed();
3224 } else if (compress != stream.in.isCompressed() ||
3225 blockCompress != stream.in.isBlockCompressed()) {
3226 throw new IOException("All merged files must be compressed or not.");
3227 }
3228 super.put(stream);
3229 }
3230
3231 /**
3232 * A queue of file segments to merge
3233 * @param segments the file segments to merge
3234 * @param tmpDir a relative local directory to save intermediate files in
3235 * @param progress the reference to the Progressable object
3236 */
3237 public MergeQueue(List <SegmentDescriptor> segments,
3238 Path tmpDir, Progressable progress) {
3239 int size = segments.size();
3240 for (int i = 0; i < size; i++) {
3241 sortedSegmentSizes.put(segments.get(i), null);
3242 }
3243 this.tmpDir = tmpDir;
3244 this.progress = progress;
3245 }
3246 @Override
3247 protected boolean lessThan(Object a, Object b) {
3248 // indicate we're making progress
3249 if (progress != null) {
3250 progress.progress();
3251 }
3252 SegmentDescriptor msa = (SegmentDescriptor)a;
3253 SegmentDescriptor msb = (SegmentDescriptor)b;
3254 return comparator.compare(msa.getKey().getData(), 0,
3255 msa.getKey().getLength(), msb.getKey().getData(), 0,
3256 msb.getKey().getLength()) < 0;
3257 }
3258 @Override
3259 public void close() throws IOException {
3260 SegmentDescriptor ms; // close inputs
3261 while ((ms = (SegmentDescriptor)pop()) != null) {
3262 ms.cleanup();
3263 }
3264 minSegment = null;
3265 }
3266 @Override
3267 public DataOutputBuffer getKey() throws IOException {
3268 return rawKey;
3269 }
3270 @Override
3271 public ValueBytes getValue() throws IOException {
3272 return rawValue;
3273 }
3274 @Override
3275 public boolean next() throws IOException {
3276 if (size() == 0)
3277 return false;
3278 if (minSegment != null) {
3279 //minSegment is non-null for all invocations of next except the first
3280 //one. For the first invocation, the priority queue is ready for use
3281 //but for the subsequent invocations, first adjust the queue
3282 adjustPriorityQueue(minSegment);
3283 if (size() == 0) {
3284 minSegment = null;
3285 return false;
3286 }
3287 }
3288 minSegment = (SegmentDescriptor)top();
3289 long startPos = minSegment.in.getPosition(); // Current position in stream
3290 //save the raw key reference
3291 rawKey = minSegment.getKey();
3292 //load the raw value. Re-use the existing rawValue buffer
3293 if (rawValue == null) {
3294 rawValue = minSegment.in.createValueBytes();
3295 }
3296 minSegment.nextRawValue(rawValue);
3297 long endPos = minSegment.in.getPosition(); // End position after reading value
3298 updateProgress(endPos - startPos);
3299 return true;
3300 }
3301
3302 @Override
3303 public Progress getProgress() {
3304 return mergeProgress;
3305 }
3306
3307 private void adjustPriorityQueue(SegmentDescriptor ms) throws IOException{
3308 long startPos = ms.in.getPosition(); // Current position in stream
3309 boolean hasNext = ms.nextRawKey();
3310 long endPos = ms.in.getPosition(); // End position after reading key
3311 updateProgress(endPos - startPos);
3312 if (hasNext) {
3313 adjustTop();
3314 } else {
3315 pop();
3316 ms.cleanup();
3317 }
3318 }
3319
3320 private void updateProgress(long bytesProcessed) {
3321 totalBytesProcessed += bytesProcessed;
3322 if (progPerByte > 0) {
3323 mergeProgress.set(totalBytesProcessed * progPerByte);
3324 }
3325 }
3326
3327 /** This is the single level merge that is called multiple times
3328 * depending on the factor size and the number of segments
3329 * @return RawKeyValueIterator
3330 * @throws IOException
3331 */
3332 public RawKeyValueIterator merge() throws IOException {
3333 //create the MergeStreams from the sorted map created in the constructor
3334 //and dump the final output to a file
3335 int numSegments = sortedSegmentSizes.size();
3336 int origFactor = factor;
3337 int passNo = 1;
3338 LocalDirAllocator lDirAlloc = new LocalDirAllocator("io.seqfile.local.dir");
3339 do {
3340 //get the factor for this pass of merge
3341 factor = getPassFactor(passNo, numSegments);
3342 List<SegmentDescriptor> segmentsToMerge =
3343 new ArrayList<SegmentDescriptor>();
3344 int segmentsConsidered = 0;
3345 int numSegmentsToConsider = factor;
3346 while (true) {
3347 //extract the smallest 'factor' number of segment pointers from the
3348 //TreeMap. Call cleanup on the empty segments (no key/value data)
3349 SegmentDescriptor[] mStream =
3350 getSegmentDescriptors(numSegmentsToConsider);
3351 for (int i = 0; i < mStream.length; i++) {
3352 if (mStream[i].nextRawKey()) {
3353 segmentsToMerge.add(mStream[i]);
3354 segmentsConsidered++;
3355 // Count the fact that we read some bytes in calling nextRawKey()
3356 updateProgress(mStream[i].in.getPosition());
3357 }
3358 else {
3359 mStream[i].cleanup();
3360 numSegments--; //we ignore this segment for the merge
3361 }
3362 }
3363 //if we have the desired number of segments
3364 //or looked at all available segments, we break
3365 if (segmentsConsidered == factor ||
3366 sortedSegmentSizes.size() == 0) {
3367 break;
3368 }
3369
3370 numSegmentsToConsider = factor - segmentsConsidered;
3371 }
3372 //feed the streams to the priority queue
3373 initialize(segmentsToMerge.size()); clear();
3374 for (int i = 0; i < segmentsToMerge.size(); i++) {
3375 put(segmentsToMerge.get(i));
3376 }
3377 //if we have lesser number of segments remaining, then just return the
3378 //iterator, else do another single level merge
3379 if (numSegments <= factor) {
3380 //calculate the length of the remaining segments. Required for
3381 //calculating the merge progress
3382 long totalBytes = 0;
3383 for (int i = 0; i < segmentsToMerge.size(); i++) {
3384 totalBytes += segmentsToMerge.get(i).segmentLength;
3385 }
3386 if (totalBytes != 0) //being paranoid
3387 progPerByte = 1.0f / (float)totalBytes;
3388 //reset factor to what it originally was
3389 factor = origFactor;
3390 return this;
3391 } else {
3392 //we want to spread the creation of temp files on multiple disks if
3393 //available under the space constraints
3394 long approxOutputSize = 0;
3395 for (SegmentDescriptor s : segmentsToMerge) {
3396 approxOutputSize += s.segmentLength +
3397 ChecksumFileSystem.getApproxChkSumLength(
3398 s.segmentLength);
3399 }
3400 Path tmpFilename =
3401 new Path(tmpDir, "intermediate").suffix("." + passNo);
3402
3403 Path outputFile = lDirAlloc.getLocalPathForWrite(
3404 tmpFilename.toString(),
3405 approxOutputSize, conf);
3406 if(LOG.isDebugEnabled()) {
3407 LOG.debug("writing intermediate results to " + outputFile);
3408 }
3409 Writer writer = cloneFileAttributes(
3410 fs.makeQualified(segmentsToMerge.get(0).segmentPathName),
3411 fs.makeQualified(outputFile), null);
3412 writer.sync = null; //disable sync for temp files
3413 writeFile(this, writer);
3414 writer.close();
3415
3416 //we finished one single level merge; now clean up the priority
3417 //queue
3418 this.close();
3419
3420 SegmentDescriptor tempSegment =
3421 new SegmentDescriptor(0,
3422 fs.getFileStatus(outputFile).getLen(), outputFile);
3423 //put the segment back in the TreeMap
3424 sortedSegmentSizes.put(tempSegment, null);
3425 numSegments = sortedSegmentSizes.size();
3426 passNo++;
3427 }
3428 //we are worried about only the first pass merge factor. So reset the
3429 //factor to what it originally was
3430 factor = origFactor;
3431 } while(true);
3432 }
3433
3434 //Hadoop-591
3435 public int getPassFactor(int passNo, int numSegments) {
3436 if (passNo > 1 || numSegments <= factor || factor == 1)
3437 return factor;
3438 int mod = (numSegments - 1) % (factor - 1);
3439 if (mod == 0)
3440 return factor;
3441 return mod + 1;
3442 }
3443
3444 /** Return (& remove) the requested number of segment descriptors from the
3445 * sorted map.
3446 */
3447 public SegmentDescriptor[] getSegmentDescriptors(int numDescriptors) {
3448 if (numDescriptors > sortedSegmentSizes.size())
3449 numDescriptors = sortedSegmentSizes.size();
3450 SegmentDescriptor[] SegmentDescriptors =
3451 new SegmentDescriptor[numDescriptors];
3452 Iterator iter = sortedSegmentSizes.keySet().iterator();
3453 int i = 0;
3454 while (i < numDescriptors) {
3455 SegmentDescriptors[i++] = (SegmentDescriptor)iter.next();
3456 iter.remove();
3457 }
3458 return SegmentDescriptors;
3459 }
3460 } // SequenceFile.Sorter.MergeQueue
3461
3462 /** This class defines a merge segment. This class can be subclassed to
3463 * provide a customized cleanup method implementation. In this
3464 * implementation, cleanup closes the file handle and deletes the file
3465 */
3466 public class SegmentDescriptor implements Comparable {
3467
3468 long segmentOffset; //the start of the segment in the file
3469 long segmentLength; //the length of the segment
3470 Path segmentPathName; //the path name of the file containing the segment
3471 boolean ignoreSync = true; //set to true for temp files
3472 private Reader in = null;
3473 private DataOutputBuffer rawKey = null; //this will hold the current key
3474 private boolean preserveInput = false; //delete input segment files?
3475
3476 /** Constructs a segment
3477 * @param segmentOffset the offset of the segment in the file
3478 * @param segmentLength the length of the segment
3479 * @param segmentPathName the path name of the file containing the segment
3480 */
3481 public SegmentDescriptor (long segmentOffset, long segmentLength,
3482 Path segmentPathName) {
3483 this.segmentOffset = segmentOffset;
3484 this.segmentLength = segmentLength;
3485 this.segmentPathName = segmentPathName;
3486 }
3487
3488 /** Do the sync checks */
3489 public void doSync() {ignoreSync = false;}
3490
3491 /** Whether to delete the files when no longer needed */
3492 public void preserveInput(boolean preserve) {
3493 preserveInput = preserve;
3494 }
3495
3496 public boolean shouldPreserveInput() {
3497 return preserveInput;
3498 }
3499
3500 @Override
3501 public int compareTo(Object o) {
3502 SegmentDescriptor that = (SegmentDescriptor)o;
3503 if (this.segmentLength != that.segmentLength) {
3504 return (this.segmentLength < that.segmentLength ? -1 : 1);
3505 }
3506 if (this.segmentOffset != that.segmentOffset) {
3507 return (this.segmentOffset < that.segmentOffset ? -1 : 1);
3508 }
3509 return (this.segmentPathName.toString()).
3510 compareTo(that.segmentPathName.toString());
3511 }
3512
3513 @Override
3514 public boolean equals(Object o) {
3515 if (!(o instanceof SegmentDescriptor)) {
3516 return false;
3517 }
3518 SegmentDescriptor that = (SegmentDescriptor)o;
3519 if (this.segmentLength == that.segmentLength &&
3520 this.segmentOffset == that.segmentOffset &&
3521 this.segmentPathName.toString().equals(
3522 that.segmentPathName.toString())) {
3523 return true;
3524 }
3525 return false;
3526 }
3527
3528 @Override
3529 public int hashCode() {
3530 return 37 * 17 + (int) (segmentOffset^(segmentOffset>>>32));
3531 }
3532
3533 /** Fills up the rawKey object with the key returned by the Reader
3534 * @return true if there is a key returned; false, otherwise
3535 * @throws IOException
3536 */
3537 public boolean nextRawKey() throws IOException {
3538 if (in == null) {
3539 int bufferSize = getBufferSize(conf);
3540 Reader reader = new Reader(conf,
3541 Reader.file(segmentPathName),
3542 Reader.bufferSize(bufferSize),
3543 Reader.start(segmentOffset),
3544 Reader.length(segmentLength));
3545
3546 //sometimes we ignore syncs especially for temp merge files
3547 if (ignoreSync) reader.ignoreSync();
3548
3549 if (reader.getKeyClass() != keyClass)
3550 throw new IOException("wrong key class: " + reader.getKeyClass() +
3551 " is not " + keyClass);
3552 if (reader.getValueClass() != valClass)
3553 throw new IOException("wrong value class: "+reader.getValueClass()+
3554 " is not " + valClass);
3555 this.in = reader;
3556 rawKey = new DataOutputBuffer();
3557 }
3558 rawKey.reset();
3559 int keyLength =
3560 in.nextRawKey(rawKey);
3561 return (keyLength >= 0);
3562 }
3563
3564 /** Fills up the passed rawValue with the value corresponding to the key
3565 * read earlier
3566 * @param rawValue
3567 * @return the length of the value
3568 * @throws IOException
3569 */
3570 public int nextRawValue(ValueBytes rawValue) throws IOException {
3571 int valLength = in.nextRawValue(rawValue);
3572 return valLength;
3573 }
3574
3575 /** Returns the stored rawKey */
3576 public DataOutputBuffer getKey() {
3577 return rawKey;
3578 }
3579
3580 /** closes the underlying reader */
3581 private void close() throws IOException {
3582 this.in.close();
3583 this.in = null;
3584 }
3585
3586 /** The default cleanup. Subclasses can override this with a custom
3587 * cleanup
3588 */
3589 public void cleanup() throws IOException {
3590 close();
3591 if (!preserveInput) {
3592 fs.delete(segmentPathName, true);
3593 }
3594 }
3595 } // SequenceFile.Sorter.SegmentDescriptor
3596
3597 /** This class provisions multiple segments contained within a single
3598 * file
3599 */
3600 private class LinkedSegmentsDescriptor extends SegmentDescriptor {
3601
3602 SegmentContainer parentContainer = null;
3603
3604 /** Constructs a segment
3605 * @param segmentOffset the offset of the segment in the file
3606 * @param segmentLength the length of the segment
3607 * @param segmentPathName the path name of the file containing the segment
3608 * @param parent the parent SegmentContainer that holds the segment
3609 */
3610 public LinkedSegmentsDescriptor (long segmentOffset, long segmentLength,
3611 Path segmentPathName, SegmentContainer parent) {
3612 super(segmentOffset, segmentLength, segmentPathName);
3613 this.parentContainer = parent;
3614 }
3615 /** The default cleanup. Subclasses can override this with a custom
3616 * cleanup
3617 */
3618 @Override
3619 public void cleanup() throws IOException {
3620 super.close();
3621 if (super.shouldPreserveInput()) return;
3622 parentContainer.cleanup();
3623 }
3624
3625 @Override
3626 public boolean equals(Object o) {
3627 if (!(o instanceof LinkedSegmentsDescriptor)) {
3628 return false;
3629 }
3630 return super.equals(o);
3631 }
3632 } //SequenceFile.Sorter.LinkedSegmentsDescriptor
3633
3634 /** The class that defines a container for segments to be merged. Primarily
3635 * required to delete temp files as soon as all the contained segments
3636 * have been looked at */
3637 private class SegmentContainer {
3638 private int numSegmentsCleanedUp = 0; //track the no. of segment cleanups
3639 private int numSegmentsContained; //# of segments contained
3640 private Path inName; //input file from where segments are created
3641
3642 //the list of segments read from the file
3643 private ArrayList <SegmentDescriptor> segments =
3644 new ArrayList <SegmentDescriptor>();
3645 /** This constructor is there primarily to serve the sort routine that
3646 * generates a single output file with an associated index file */
3647 public SegmentContainer(Path inName, Path indexIn) throws IOException {
3648 //get the segments from indexIn
3649 FSDataInputStream fsIndexIn = fs.open(indexIn);
3650 long end = fs.getFileStatus(indexIn).getLen();
3651 while (fsIndexIn.getPos() < end) {
3652 long segmentOffset = WritableUtils.readVLong(fsIndexIn);
3653 long segmentLength = WritableUtils.readVLong(fsIndexIn);
3654 Path segmentName = inName;
3655 segments.add(new LinkedSegmentsDescriptor(segmentOffset,
3656 segmentLength, segmentName, this));
3657 }
3658 fsIndexIn.close();
3659 fs.delete(indexIn, true);
3660 numSegmentsContained = segments.size();
3661 this.inName = inName;
3662 }
3663
3664 public List <SegmentDescriptor> getSegmentList() {
3665 return segments;
3666 }
3667 public void cleanup() throws IOException {
3668 numSegmentsCleanedUp++;
3669 if (numSegmentsCleanedUp == numSegmentsContained) {
3670 fs.delete(inName, true);
3671 }
3672 }
3673 } //SequenceFile.Sorter.SegmentContainer
3674
3675 } // SequenceFile.Sorter
3676
3677 } // SequenceFile