001 /**
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements. See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership. The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License. You may obtain a copy of the License at
009 *
010 * http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018
019 package org.apache.hadoop.io;
020
021 import java.io.EOFException;
022 import java.io.IOException;
023 import java.util.ArrayList;
024 import java.util.Arrays;
025
026 import org.apache.commons.logging.Log;
027 import org.apache.commons.logging.LogFactory;
028 import org.apache.hadoop.classification.InterfaceAudience;
029 import org.apache.hadoop.classification.InterfaceStability;
030 import org.apache.hadoop.conf.Configuration;
031 import org.apache.hadoop.fs.FileSystem;
032 import org.apache.hadoop.fs.Path;
033 import org.apache.hadoop.io.IOUtils;
034 import org.apache.hadoop.io.SequenceFile.CompressionType;
035 import org.apache.hadoop.io.compress.CompressionCodec;
036 import org.apache.hadoop.util.Options;
037 import org.apache.hadoop.util.Progressable;
038 import org.apache.hadoop.util.ReflectionUtils;
039
040 /** A file-based map from keys to values.
041 *
042 * <p>A map is a directory containing two files, the <code>data</code> file,
043 * containing all keys and values in the map, and a smaller <code>index</code>
044 * file, containing a fraction of the keys. The fraction is determined by
045 * {@link Writer#getIndexInterval()}.
046 *
047 * <p>The index file is read entirely into memory. Thus key implementations
048 * should try to keep themselves small.
049 *
050 * <p>Map files are created by adding entries in-order. To maintain a large
051 * database, perform updates by copying the previous version of a database and
052 * merging in a sorted change list, to create a new version of the database in
053 * a new file. Sorting large change lists can be done with {@link
054 * SequenceFile.Sorter}.
055 */
056 @InterfaceAudience.Public
057 @InterfaceStability.Stable
058 public class MapFile {
/** Class-wide logger. */
private static final Log LOG = LogFactory.getLog(MapFile.class);

/** The name, within the map directory, of the data file. */
public static final String DATA_FILE_NAME = "data";

/** The name, within the map directory, of the index file. */
public static final String INDEX_FILE_NAME = "index";

/** Not publicly constructible; visible to subclasses only. */
protected MapFile() {
}
068
069 /** Writes a new map. */
070 public static class Writer implements java.io.Closeable {
071 private SequenceFile.Writer data;
072 private SequenceFile.Writer index;
073
074 final private static String INDEX_INTERVAL = "io.map.index.interval";
075 private int indexInterval = 128;
076
077 private long size;
078 private LongWritable position = new LongWritable();
079
080 // the following fields are used only for checking key order
081 private WritableComparator comparator;
082 private DataInputBuffer inBuf = new DataInputBuffer();
083 private DataOutputBuffer outBuf = new DataOutputBuffer();
084 private WritableComparable lastKey;
085
086 /** What's the position (in bytes) we wrote when we got the last index */
087 private long lastIndexPos = -1;
088
089 /**
090 * What was size when we last wrote an index. Set to MIN_VALUE to ensure that
091 * we have an index at position zero -- midKey will throw an exception if this
092 * is not the case
093 */
094 private long lastIndexKeyCount = Long.MIN_VALUE;
095
096
/**
 * Create the named map for keys of the named class. The <code>fs</code>
 * argument is not used by this constructor.
 * @deprecated Use Writer(Configuration, Path, Option...) instead.
 */
@Deprecated
public Writer(Configuration conf, FileSystem fs, String dirName,
              Class<? extends WritableComparable> keyClass,
              Class valClass) throws IOException {
  this(conf,
       new Path(dirName),
       new KeyClassOption(keyClass),
       SequenceFile.Writer.valueClass(valClass));
}
106
/**
 * Create the named map for keys of the named class, with the given
 * compression type and progress callback. The <code>fs</code> argument is
 * not used by this constructor.
 * @deprecated Use Writer(Configuration, Path, Option...) instead.
 */
@Deprecated
public Writer(Configuration conf, FileSystem fs, String dirName,
              Class<? extends WritableComparable> keyClass, Class valClass,
              CompressionType compress,
              Progressable progress) throws IOException {
  this(conf,
       new Path(dirName),
       new KeyClassOption(keyClass),
       SequenceFile.Writer.valueClass(valClass),
       SequenceFile.Writer.compression(compress),
       SequenceFile.Writer.progressable(progress));
}
118
/**
 * Create the named map for keys of the named class, with the given
 * compression type, codec and progress callback. The <code>fs</code>
 * argument is not used by this constructor.
 * @deprecated Use Writer(Configuration, Path, Option...) instead.
 */
@Deprecated
public Writer(Configuration conf, FileSystem fs, String dirName,
              Class<? extends WritableComparable> keyClass, Class valClass,
              CompressionType compress, CompressionCodec codec,
              Progressable progress) throws IOException {
  this(conf,
       new Path(dirName),
       new KeyClassOption(keyClass),
       SequenceFile.Writer.valueClass(valClass),
       SequenceFile.Writer.compression(compress, codec),
       SequenceFile.Writer.progressable(progress));
}
130
/**
 * Create the named map for keys of the named class, with the given
 * compression type. The <code>fs</code> argument is not used by this
 * constructor.
 * @deprecated Use Writer(Configuration, Path, Option...) instead.
 */
@Deprecated
public Writer(Configuration conf, FileSystem fs, String dirName,
              Class<? extends WritableComparable> keyClass, Class valClass,
              CompressionType compress) throws IOException {
  this(conf,
       new Path(dirName),
       new KeyClassOption(keyClass),
       SequenceFile.Writer.valueClass(valClass),
       SequenceFile.Writer.compression(compress));
}
141
/**
 * Create the named map using the named key comparator. The
 * <code>fs</code> argument is not used by this constructor.
 * @deprecated Use Writer(Configuration, Path, Option...) instead.
 */
@Deprecated
public Writer(Configuration conf, FileSystem fs, String dirName,
              WritableComparator comparator, Class valClass
              ) throws IOException {
  this(conf,
       new Path(dirName),
       new ComparatorOption(comparator),
       SequenceFile.Writer.valueClass(valClass));
}
152
/**
 * Create the named map using the named key comparator and the given
 * compression type. The <code>fs</code> argument is not used by this
 * constructor.
 * @deprecated Use Writer(Configuration, Path, Option...) instead.
 */
@Deprecated
public Writer(Configuration conf, FileSystem fs, String dirName,
              WritableComparator comparator, Class valClass,
              SequenceFile.CompressionType compress) throws IOException {
  this(conf,
       new Path(dirName),
       new ComparatorOption(comparator),
       SequenceFile.Writer.valueClass(valClass),
       SequenceFile.Writer.compression(compress));
}
163
/**
 * Create the named map using the named key comparator, with the given
 * compression type and progress callback. The <code>fs</code> argument is
 * not used by this constructor.
 * @deprecated Use Writer(Configuration, Path, Option...) instead.
 */
@Deprecated
public Writer(Configuration conf, FileSystem fs, String dirName,
              WritableComparator comparator, Class valClass,
              SequenceFile.CompressionType compress,
              Progressable progress) throws IOException {
  this(conf,
       new Path(dirName),
       new ComparatorOption(comparator),
       SequenceFile.Writer.valueClass(valClass),
       SequenceFile.Writer.compression(compress),
       SequenceFile.Writer.progressable(progress));
}
176
/**
 * Create the named map using the named key comparator, with the given
 * compression type, codec and progress callback. The <code>fs</code>
 * argument is not used by this constructor.
 * @deprecated Use Writer(Configuration, Path, Option...) instead.
 */
@Deprecated
public Writer(Configuration conf, FileSystem fs, String dirName,
              WritableComparator comparator, Class valClass,
              SequenceFile.CompressionType compress, CompressionCodec codec,
              Progressable progress) throws IOException {
  this(conf,
       new Path(dirName),
       new ComparatorOption(comparator),
       SequenceFile.Writer.valueClass(valClass),
       SequenceFile.Writer.compression(compress, codec),
       SequenceFile.Writer.progressable(progress));
}
189
190 // our options are a superset of sequence file writer options
191 public static interface Option extends SequenceFile.Writer.Option { }
192
193 private static class KeyClassOption extends Options.ClassOption
194 implements Option {
195 KeyClassOption(Class<?> value) {
196 super(value);
197 }
198 }
199
200 private static class ComparatorOption implements Option {
201 private final WritableComparator value;
202 ComparatorOption(WritableComparator value) {
203 this.value = value;
204 }
205 WritableComparator getValue() {
206 return value;
207 }
208 }
209
210 public static Option keyClass(Class<? extends WritableComparable> value) {
211 return new KeyClassOption(value);
212 }
213
214 public static Option comparator(WritableComparator value) {
215 return new ComparatorOption(value);
216 }
217
218 public static SequenceFile.Writer.Option valueClass(Class<?> value) {
219 return SequenceFile.Writer.valueClass(value);
220 }
221
222 public static
223 SequenceFile.Writer.Option compression(CompressionType type) {
224 return SequenceFile.Writer.compression(type);
225 }
226
227 public static
228 SequenceFile.Writer.Option compression(CompressionType type,
229 CompressionCodec codec) {
230 return SequenceFile.Writer.compression(type, codec);
231 }
232
233 public static SequenceFile.Writer.Option progressable(Progressable value) {
234 return SequenceFile.Writer.progressable(value);
235 }
236
237 @SuppressWarnings("unchecked")
238 public Writer(Configuration conf,
239 Path dirName,
240 SequenceFile.Writer.Option... opts
241 ) throws IOException {
242 KeyClassOption keyClassOption =
243 Options.getOption(KeyClassOption.class, opts);
244 ComparatorOption comparatorOption =
245 Options.getOption(ComparatorOption.class, opts);
246 if ((keyClassOption == null) == (comparatorOption == null)) {
247 throw new IllegalArgumentException("key class or comparator option "
248 + "must be set");
249 }
250 this.indexInterval = conf.getInt(INDEX_INTERVAL, this.indexInterval);
251
252 Class<? extends WritableComparable> keyClass;
253 if (keyClassOption == null) {
254 this.comparator = comparatorOption.getValue();
255 keyClass = comparator.getKeyClass();
256 } else {
257 keyClass=
258 (Class<? extends WritableComparable>) keyClassOption.getValue();
259 this.comparator = WritableComparator.get(keyClass, conf);
260 }
261 this.lastKey = comparator.newKey();
262 FileSystem fs = dirName.getFileSystem(conf);
263
264 if (!fs.mkdirs(dirName)) {
265 throw new IOException("Mkdirs failed to create directory " + dirName);
266 }
267 Path dataFile = new Path(dirName, DATA_FILE_NAME);
268 Path indexFile = new Path(dirName, INDEX_FILE_NAME);
269
270 SequenceFile.Writer.Option[] dataOptions =
271 Options.prependOptions(opts,
272 SequenceFile.Writer.file(dataFile),
273 SequenceFile.Writer.keyClass(keyClass));
274 this.data = SequenceFile.createWriter(conf, dataOptions);
275
276 SequenceFile.Writer.Option[] indexOptions =
277 Options.prependOptions(opts, SequenceFile.Writer.file(indexFile),
278 SequenceFile.Writer.keyClass(keyClass),
279 SequenceFile.Writer.valueClass(LongWritable.class),
280 SequenceFile.Writer.compression(CompressionType.BLOCK));
281 this.index = SequenceFile.createWriter(conf, indexOptions);
282 }
283
284 /** The number of entries that are added before an index entry is added.*/
285 public int getIndexInterval() { return indexInterval; }
286
287 /** Sets the index interval.
288 * @see #getIndexInterval()
289 */
290 public void setIndexInterval(int interval) { indexInterval = interval; }
291
292 /** Sets the index interval and stores it in conf
293 * @see #getIndexInterval()
294 */
295 public static void setIndexInterval(Configuration conf, int interval) {
296 conf.setInt(INDEX_INTERVAL, interval);
297 }
298
299 /** Close the map. */
300 @Override
301 public synchronized void close() throws IOException {
302 data.close();
303 index.close();
304 }
305
306 /** Append a key/value pair to the map. The key must be greater or equal
307 * to the previous key added to the map. */
308 public synchronized void append(WritableComparable key, Writable val)
309 throws IOException {
310
311 checkKey(key);
312
313 long pos = data.getLength();
314 // Only write an index if we've changed positions. In a block compressed
315 // file, this means we write an entry at the start of each block
316 if (size >= lastIndexKeyCount + indexInterval && pos > lastIndexPos) {
317 position.set(pos); // point to current eof
318 index.append(key, position);
319 lastIndexPos = pos;
320 lastIndexKeyCount = size;
321 }
322
323 data.append(key, val); // append key/value to data
324 size++;
325 }
326
327 private void checkKey(WritableComparable key) throws IOException {
328 // check that keys are well-ordered
329 if (size != 0 && comparator.compare(lastKey, key) > 0)
330 throw new IOException("key out of order: "+key+" after "+lastKey);
331
332 // update lastKey with a copy of key by writing and reading
333 outBuf.reset();
334 key.write(outBuf); // write new key
335
336 inBuf.reset(outBuf.getData(), outBuf.getLength());
337 lastKey.readFields(inBuf); // read into lastKey
338 }
339
340 }
341
342 /** Provide access to an existing map. */
343 public static class Reader implements java.io.Closeable {
344
345 /** Number of index entries to skip between each entry. Zero by default.
346 * Setting this to values larger than zero can facilitate opening large map
347 * files using less memory. */
348 private int INDEX_SKIP = 0;
349
350 private WritableComparator comparator;
351
352 private WritableComparable nextKey;
353 private long seekPosition = -1;
354 private int seekIndex = -1;
355 private long firstPosition;
356
357 // the data, on disk
358 private SequenceFile.Reader data;
359 private SequenceFile.Reader index;
360
361 // whether the index Reader was closed
362 private boolean indexClosed = false;
363
364 // the index, in memory
365 private int count = -1;
366 private WritableComparable[] keys;
367 private long[] positions;
368
369 /** Returns the class of keys in this file. */
370 public Class<?> getKeyClass() { return data.getKeyClass(); }
371
372 /** Returns the class of values in this file. */
373 public Class<?> getValueClass() { return data.getValueClass(); }
374
375 public static interface Option extends SequenceFile.Reader.Option {}
376
377 public static Option comparator(WritableComparator value) {
378 return new ComparatorOption(value);
379 }
380
381 static class ComparatorOption implements Option {
382 private final WritableComparator value;
383 ComparatorOption(WritableComparator value) {
384 this.value = value;
385 }
386 WritableComparator getValue() {
387 return value;
388 }
389 }
390
391 public Reader(Path dir, Configuration conf,
392 SequenceFile.Reader.Option... opts) throws IOException {
393 ComparatorOption comparatorOption =
394 Options.getOption(ComparatorOption.class, opts);
395 WritableComparator comparator =
396 comparatorOption == null ? null : comparatorOption.getValue();
397 INDEX_SKIP = conf.getInt("io.map.index.skip", 0);
398 open(dir, comparator, conf, opts);
399 }
400
/**
 * Construct a map reader for the named map. The <code>fs</code> argument
 * is not used by this constructor.
 * @deprecated
 */
@Deprecated
public Reader(FileSystem fs, String dirName,
              Configuration conf) throws IOException {
  this(new Path(dirName), conf, new SequenceFile.Reader.Option[0]);
}
409
/**
 * Construct a map reader for the named map using the named comparator.
 * The <code>fs</code> argument is not used by this constructor.
 * @deprecated
 */
@Deprecated
public Reader(FileSystem fs, String dirName, WritableComparator comparator,
              Configuration conf) throws IOException {
  this(new Path(dirName), conf, new ComparatorOption(comparator));
}
418
419 protected synchronized void open(Path dir,
420 WritableComparator comparator,
421 Configuration conf,
422 SequenceFile.Reader.Option... options
423 ) throws IOException {
424 Path dataFile = new Path(dir, DATA_FILE_NAME);
425 Path indexFile = new Path(dir, INDEX_FILE_NAME);
426
427 // open the data
428 this.data = createDataFileReader(dataFile, conf, options);
429 this.firstPosition = data.getPosition();
430
431 if (comparator == null) {
432 Class<? extends WritableComparable> cls;
433 cls = data.getKeyClass().asSubclass(WritableComparable.class);
434 this.comparator = WritableComparator.get(cls, conf);
435 } else {
436 this.comparator = comparator;
437 }
438
439 // open the index
440 SequenceFile.Reader.Option[] indexOptions =
441 Options.prependOptions(options, SequenceFile.Reader.file(indexFile));
442 this.index = new SequenceFile.Reader(conf, indexOptions);
443 }
444
445 /**
446 * Override this method to specialize the type of
447 * {@link SequenceFile.Reader} returned.
448 */
449 protected SequenceFile.Reader
450 createDataFileReader(Path dataFile, Configuration conf,
451 SequenceFile.Reader.Option... options
452 ) throws IOException {
453 SequenceFile.Reader.Option[] newOptions =
454 Options.prependOptions(options, SequenceFile.Reader.file(dataFile));
455 return new SequenceFile.Reader(conf, newOptions);
456 }
457
458 private void readIndex() throws IOException {
459 // read the index entirely into memory
460 if (this.keys != null)
461 return;
462 this.count = 0;
463 this.positions = new long[1024];
464
465 try {
466 int skip = INDEX_SKIP;
467 LongWritable position = new LongWritable();
468 WritableComparable lastKey = null;
469 long lastIndex = -1;
470 ArrayList<WritableComparable> keyBuilder = new ArrayList<WritableComparable>(1024);
471 while (true) {
472 WritableComparable k = comparator.newKey();
473
474 if (!index.next(k, position))
475 break;
476
477 // check order to make sure comparator is compatible
478 if (lastKey != null && comparator.compare(lastKey, k) > 0)
479 throw new IOException("key out of order: "+k+" after "+lastKey);
480 lastKey = k;
481 if (skip > 0) {
482 skip--;
483 continue; // skip this entry
484 } else {
485 skip = INDEX_SKIP; // reset skip
486 }
487
488 // don't read an index that is the same as the previous one. Block
489 // compressed map files used to do this (multiple entries would point
490 // at the same block)
491 if (position.get() == lastIndex)
492 continue;
493
494 if (count == positions.length) {
495 positions = Arrays.copyOf(positions, positions.length * 2);
496 }
497
498 keyBuilder.add(k);
499 positions[count] = position.get();
500 count++;
501 }
502
503 this.keys = keyBuilder.toArray(new WritableComparable[count]);
504 positions = Arrays.copyOf(positions, count);
505 } catch (EOFException e) {
506 LOG.warn("Unexpected EOF reading " + index +
507 " at entry #" + count + ". Ignoring.");
508 } finally {
509 indexClosed = true;
510 index.close();
511 }
512 }
513
514 /** Re-positions the reader before its first key. */
515 public synchronized void reset() throws IOException {
516 data.seek(firstPosition);
517 }
518
519 /** Get the key at approximately the middle of the file. Or null if the
520 * file is empty.
521 */
522 public synchronized WritableComparable midKey() throws IOException {
523
524 readIndex();
525 if (count == 0) {
526 return null;
527 }
528
529 return keys[(count - 1) / 2];
530 }
531
532 /** Reads the final key from the file.
533 *
534 * @param key key to read into
535 */
536 public synchronized void finalKey(WritableComparable key)
537 throws IOException {
538
539 long originalPosition = data.getPosition(); // save position
540 try {
541 readIndex(); // make sure index is valid
542 if (count > 0) {
543 data.seek(positions[count-1]); // skip to last indexed entry
544 } else {
545 reset(); // start at the beginning
546 }
547 while (data.next(key)) {} // scan to eof
548
549 } finally {
550 data.seek(originalPosition); // restore position
551 }
552 }
553
554 /** Positions the reader at the named key, or if none such exists, at the
555 * first entry after the named key. Returns true iff the named key exists
556 * in this map.
557 */
558 public synchronized boolean seek(WritableComparable key) throws IOException {
559 return seekInternal(key) == 0;
560 }
561
562 /**
563 * Positions the reader at the named key, or if none such exists, at the
564 * first entry after the named key.
565 *
566 * @return 0 - exact match found
567 * < 0 - positioned at next record
568 * 1 - no more records in file
569 */
570 private synchronized int seekInternal(WritableComparable key)
571 throws IOException {
572 return seekInternal(key, false);
573 }
574
575 /**
576 * Positions the reader at the named key, or if none such exists, at the
577 * key that falls just before or just after dependent on how the
578 * <code>before</code> parameter is set.
579 *
580 * @param before - IF true, and <code>key</code> does not exist, position
581 * file at entry that falls just before <code>key</code>. Otherwise,
582 * position file at record that sorts just after.
583 * @return 0 - exact match found
584 * < 0 - positioned at next record
585 * 1 - no more records in file
586 */
587 private synchronized int seekInternal(WritableComparable key,
588 final boolean before)
589 throws IOException {
590 readIndex(); // make sure index is read
591
592 if (seekIndex != -1 // seeked before
593 && seekIndex+1 < count
594 && comparator.compare(key, keys[seekIndex+1])<0 // before next indexed
595 && comparator.compare(key, nextKey)
596 >= 0) { // but after last seeked
597 // do nothing
598 } else {
599 seekIndex = binarySearch(key);
600 if (seekIndex < 0) // decode insertion point
601 seekIndex = -seekIndex-2;
602
603 if (seekIndex == -1) // belongs before first entry
604 seekPosition = firstPosition; // use beginning of file
605 else
606 seekPosition = positions[seekIndex]; // else use index
607 }
608 data.seek(seekPosition);
609
610 if (nextKey == null)
611 nextKey = comparator.newKey();
612
613 // If we're looking for the key before, we need to keep track
614 // of the position we got the current key as well as the position
615 // of the key before it.
616 long prevPosition = -1;
617 long curPosition = seekPosition;
618
619 while (data.next(nextKey)) {
620 int c = comparator.compare(key, nextKey);
621 if (c <= 0) { // at or beyond desired
622 if (before && c != 0) {
623 if (prevPosition == -1) {
624 // We're on the first record of this index block
625 // and we've already passed the search key. Therefore
626 // we must be at the beginning of the file, so seek
627 // to the beginning of this block and return c
628 data.seek(curPosition);
629 } else {
630 // We have a previous record to back up to
631 data.seek(prevPosition);
632 data.next(nextKey);
633 // now that we've rewound, the search key must be greater than this key
634 return 1;
635 }
636 }
637 return c;
638 }
639 if (before) {
640 prevPosition = curPosition;
641 curPosition = data.getPosition();
642 }
643 }
644
645 return 1;
646 }
647
648 private int binarySearch(WritableComparable key) {
649 int low = 0;
650 int high = count-1;
651
652 while (low <= high) {
653 int mid = (low + high) >>> 1;
654 WritableComparable midVal = keys[mid];
655 int cmp = comparator.compare(midVal, key);
656
657 if (cmp < 0)
658 low = mid + 1;
659 else if (cmp > 0)
660 high = mid - 1;
661 else
662 return mid; // key found
663 }
664 return -(low + 1); // key not found.
665 }
666
667 /** Read the next key/value pair in the map into <code>key</code> and
668 * <code>val</code>. Returns true if such a pair exists and false when at
669 * the end of the map */
670 public synchronized boolean next(WritableComparable key, Writable val)
671 throws IOException {
672 return data.next(key, val);
673 }
674
675 /** Return the value for the named key, or null if none exists. */
676 public synchronized Writable get(WritableComparable key, Writable val)
677 throws IOException {
678 if (seek(key)) {
679 data.getCurrentValue(val);
680 return val;
681 } else
682 return null;
683 }
684
685 /**
686 * Finds the record that is the closest match to the specified key.
687 * Returns <code>key</code> or if it does not exist, at the first entry
688 * after the named key.
689 *
690 - * @param key - key that we're trying to find
691 - * @param val - data value if key is found
692 - * @return - the key that was the closest match or null if eof.
693 */
694 public synchronized WritableComparable getClosest(WritableComparable key,
695 Writable val)
696 throws IOException {
697 return getClosest(key, val, false);
698 }
699
700 /**
701 * Finds the record that is the closest match to the specified key.
702 *
703 * @param key - key that we're trying to find
704 * @param val - data value if key is found
705 * @param before - IF true, and <code>key</code> does not exist, return
706 * the first entry that falls just before the <code>key</code>. Otherwise,
707 * return the record that sorts just after.
708 * @return - the key that was the closest match or null if eof.
709 */
710 public synchronized WritableComparable getClosest(WritableComparable key,
711 Writable val, final boolean before)
712 throws IOException {
713
714 int c = seekInternal(key, before);
715
716 // If we didn't get an exact match, and we ended up in the wrong
717 // direction relative to the query key, return null since we
718 // must be at the beginning or end of the file.
719 if ((!before && c > 0) ||
720 (before && c < 0)) {
721 return null;
722 }
723
724 data.getCurrentValue(val);
725 return nextKey;
726 }
727
728 /** Close the map. */
729 @Override
730 public synchronized void close() throws IOException {
731 if (!indexClosed) {
732 index.close();
733 }
734 data.close();
735 }
736
737 }
738
739 /** Renames an existing map directory. */
740 public static void rename(FileSystem fs, String oldName, String newName)
741 throws IOException {
742 Path oldDir = new Path(oldName);
743 Path newDir = new Path(newName);
744 if (!fs.rename(oldDir, newDir)) {
745 throw new IOException("Could not rename " + oldDir + " to " + newDir);
746 }
747 }
748
749 /** Deletes the named map file. */
750 public static void delete(FileSystem fs, String name) throws IOException {
751 Path dir = new Path(name);
752 Path data = new Path(dir, DATA_FILE_NAME);
753 Path index = new Path(dir, INDEX_FILE_NAME);
754
755 fs.delete(data, true);
756 fs.delete(index, true);
757 fs.delete(dir, true);
758 }
759
760 /**
761 * This method attempts to fix a corrupt MapFile by re-creating its index.
762 * @param fs filesystem
763 * @param dir directory containing the MapFile data and index
764 * @param keyClass key class (has to be a subclass of Writable)
765 * @param valueClass value class (has to be a subclass of Writable)
766 * @param dryrun do not perform any changes, just report what needs to be done
767 * @return number of valid entries in this MapFile, or -1 if no fixing was needed
768 * @throws Exception
769 */
770 public static long fix(FileSystem fs, Path dir,
771 Class<? extends Writable> keyClass,
772 Class<? extends Writable> valueClass, boolean dryrun,
773 Configuration conf) throws Exception {
774 String dr = (dryrun ? "[DRY RUN ] " : "");
775 Path data = new Path(dir, DATA_FILE_NAME);
776 Path index = new Path(dir, INDEX_FILE_NAME);
777 int indexInterval = conf.getInt(Writer.INDEX_INTERVAL, 128);
778 if (!fs.exists(data)) {
779 // there's nothing we can do to fix this!
780 throw new Exception(dr + "Missing data file in " + dir + ", impossible to fix this.");
781 }
782 if (fs.exists(index)) {
783 // no fixing needed
784 return -1;
785 }
786 SequenceFile.Reader dataReader =
787 new SequenceFile.Reader(conf, SequenceFile.Reader.file(data));
788 if (!dataReader.getKeyClass().equals(keyClass)) {
789 throw new Exception(dr + "Wrong key class in " + dir + ", expected" + keyClass.getName() +
790 ", got " + dataReader.getKeyClass().getName());
791 }
792 if (!dataReader.getValueClass().equals(valueClass)) {
793 throw new Exception(dr + "Wrong value class in " + dir + ", expected" + valueClass.getName() +
794 ", got " + dataReader.getValueClass().getName());
795 }
796 long cnt = 0L;
797 Writable key = ReflectionUtils.newInstance(keyClass, conf);
798 Writable value = ReflectionUtils.newInstance(valueClass, conf);
799 SequenceFile.Writer indexWriter = null;
800 if (!dryrun) {
801 indexWriter =
802 SequenceFile.createWriter(conf,
803 SequenceFile.Writer.file(index),
804 SequenceFile.Writer.keyClass(keyClass),
805 SequenceFile.Writer.valueClass
806 (LongWritable.class));
807 }
808 try {
809 long pos = 0L;
810 LongWritable position = new LongWritable();
811 while(dataReader.next(key, value)) {
812 cnt++;
813 if (cnt % indexInterval == 0) {
814 position.set(pos);
815 if (!dryrun) indexWriter.append(key, position);
816 }
817 pos = dataReader.getPosition();
818 }
819 } catch(Throwable t) {
820 // truncated data file. swallow it.
821 }
822 dataReader.close();
823 if (!dryrun) indexWriter.close();
824 return cnt;
825 }
826
827
828 public static void main(String[] args) throws Exception {
829 String usage = "Usage: MapFile inFile outFile";
830
831 if (args.length != 2) {
832 System.err.println(usage);
833 System.exit(-1);
834 }
835
836 String in = args[0];
837 String out = args[1];
838
839 Configuration conf = new Configuration();
840 FileSystem fs = FileSystem.getLocal(conf);
841 MapFile.Reader reader = null;
842 MapFile.Writer writer = null;
843 try {
844 reader = new MapFile.Reader(fs, in, conf);
845 writer =
846 new MapFile.Writer(conf, fs, out,
847 reader.getKeyClass().asSubclass(WritableComparable.class),
848 reader.getValueClass());
849
850 WritableComparable key = ReflectionUtils.newInstance(reader.getKeyClass()
851 .asSubclass(WritableComparable.class), conf);
852 Writable value = ReflectionUtils.newInstance(reader.getValueClass()
853 .asSubclass(Writable.class), conf);
854
855 while (reader.next(key, value)) // copy all entries
856 writer.append(key, value);
857 } finally {
858 IOUtils.cleanup(LOG, writer, reader);
859 }
860 }
861 }