/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements. See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership. The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License. You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.hadoop.mapreduce.lib.input;

import java.io.IOException;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.LinkedHashSet;
import java.util.HashSet;
import java.util.List;
import java.util.HashMap;
import java.util.Set;
import java.util.Iterator;
import java.util.Map;
import java.util.Map.Entry;

import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.classification.InterfaceStability;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.LocatedFileStatus;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.BlockLocation;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.PathFilter;
import org.apache.hadoop.io.compress.CompressionCodec;
import org.apache.hadoop.io.compress.CompressionCodecFactory;
import org.apache.hadoop.io.compress.SplittableCompressionCodec;
import org.apache.hadoop.mapreduce.InputFormat;
import org.apache.hadoop.mapreduce.InputSplit;
import org.apache.hadoop.mapreduce.JobContext;
import org.apache.hadoop.mapreduce.RecordReader;
import org.apache.hadoop.mapreduce.TaskAttemptContext;
import org.apache.hadoop.net.NodeBase;
import org.apache.hadoop.net.NetworkTopology;

import com.google.common.annotations.VisibleForTesting;
import com.google.common.collect.HashMultiset;
import com.google.common.collect.Multiset;
/**
 * An abstract {@link InputFormat} that returns {@link CombineFileSplit}s from
 * its {@link InputFormat#getSplits(JobContext)} method.
 *
 * Splits are constructed from the files under the input paths.
 * A split cannot have files from different pools.
 * Each split returned may contain blocks from different files.
 * If a maxSplitSize is specified, then blocks on the same node are
 * combined to form a single split. Blocks that are left over are
 * then combined with other blocks in the same rack.
 * If maxSplitSize is not specified, then blocks from the same rack
 * are combined in a single split; no attempt is made to create
 * node-local splits.
 * If the maxSplitSize is equal to the block size, then this class
 * is similar to the default splitting behavior in Hadoop: each
 * block is a locally processed split.
 * Subclasses implement
 * {@link InputFormat#createRecordReader(InputSplit, TaskAttemptContext)}
 * to construct <code>RecordReader</code>s for
 * <code>CombineFileSplit</code>s.
 *
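 * <p>As an illustration only (the subclass and reader names below are
 * hypothetical, not part of this API), a minimal subclass might look like:
 * <pre>{@code
 * public class MyCombineFormat
 *     extends CombineFileInputFormat<LongWritable, Text> {
 *   public MyCombineFormat() {
 *     setMaxSplitSize(128 * 1024 * 1024); // combine blocks into ~128 MB splits
 *   }
 *   public RecordReader<LongWritable, Text> createRecordReader(
 *       InputSplit split, TaskAttemptContext context) throws IOException {
 *     return new CombineFileRecordReader<LongWritable, Text>(
 *         (CombineFileSplit) split, context, MyReader.class);
 *   }
 * }
 * }</pre>
 *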
 * @see CombineFileSplit
 */
@InterfaceAudience.Public
@InterfaceStability.Stable
public abstract class CombineFileInputFormat<K, V>
    extends FileInputFormat<K, V> {

  private static final Log LOG = LogFactory.getLog(CombineFileInputFormat.class);

  public static final String SPLIT_MINSIZE_PERNODE =
      "mapreduce.input.fileinputformat.split.minsize.per.node";
  public static final String SPLIT_MINSIZE_PERRACK =
      "mapreduce.input.fileinputformat.split.minsize.per.rack";
  // ability to limit the size of a single split
  private long maxSplitSize = 0;
  private long minSplitSizeNode = 0;
  private long minSplitSizeRack = 0;

  // A pool of input path filters. A split cannot have blocks from files
  // across multiple pools.
  private ArrayList<MultiPathFilter> pools = new ArrayList<MultiPathFilter>();

  // mapping from a rack name to the set of nodes in the rack
  private HashMap<String, Set<String>> rackToNodes =
      new HashMap<String, Set<String>>();

  /**
   * Specify the maximum size (in bytes) of each split. Each split is
   * approximately equal to the specified size.
   */
  protected void setMaxSplitSize(long maxSplitSize) {
    this.maxSplitSize = maxSplitSize;
  }

  /**
   * Specify the minimum size (in bytes) of each split per node.
   * This applies to data that is left over after combining data on a single
   * node into splits that are of maximum size specified by maxSplitSize.
   * This leftover data will be combined into its own split if its size
   * exceeds minSplitSizeNode.
   */
  protected void setMinSplitSizeNode(long minSplitSizeNode) {
    this.minSplitSizeNode = minSplitSizeNode;
  }

  /**
   * Specify the minimum size (in bytes) of each split per rack.
   * This applies to data that is left over after combining data on a single
   * rack into splits that are of maximum size specified by maxSplitSize.
   * This leftover data will be combined into its own split if its size
   * exceeds minSplitSizeRack.
   */
  protected void setMinSplitSizeRack(long minSplitSizeRack) {
    this.minSplitSizeRack = minSplitSizeRack;
  }
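
  // The same limits can also be supplied through the job configuration
  // instead of the setters above. A minimal sketch (values are illustrative;
  // the maxsize property name is the one read in getSplits() below):
  //
  //   Configuration conf = job.getConfiguration();
  //   conf.setLong("mapreduce.input.fileinputformat.split.maxsize", 256L << 20);
  //   conf.setLong(CombineFileInputFormat.SPLIT_MINSIZE_PERNODE, 64L << 20);
  //   conf.setLong(CombineFileInputFormat.SPLIT_MINSIZE_PERRACK, 128L << 20);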

  /**
   * Create a new pool and add the filters to it.
   * A split cannot have files from different pools.
   */
  protected void createPool(List<PathFilter> filters) {
    pools.add(new MultiPathFilter(filters));
  }

  /**
   * Create a new pool and add the filters to it.
   * A pathname can satisfy any one of the specified filters.
   * A split cannot have files from different pools.
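   *
   * <p>A minimal, illustrative sketch of pool creation from a subclass
   * constructor (the suffix filter below is made up for the example):
   * <pre>{@code
   * createPool(new PathFilter() {
   *   public boolean accept(Path path) {
   *     return path.getName().endsWith(".log");
   *   }
   * });
   * }</pre>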
   */
  protected void createPool(PathFilter... filters) {
    MultiPathFilter multi = new MultiPathFilter();
    for (PathFilter f : filters) {
      multi.add(f);
    }
    pools.add(multi);
  }

  @Override
  protected boolean isSplitable(JobContext context, Path file) {
    final CompressionCodec codec =
        new CompressionCodecFactory(context.getConfiguration()).getCodec(file);
    if (null == codec) {
      return true;
    }
    return codec instanceof SplittableCompressionCodec;
  }

  /**
   * Default constructor.
   */
  public CombineFileInputFormat() {
  }

  @Override
  public List<InputSplit> getSplits(JobContext job)
      throws IOException {
    long minSizeNode = 0;
    long minSizeRack = 0;
    long maxSize = 0;
    Configuration conf = job.getConfiguration();

    // the values specified by setXxxSplitSize() take precedence over the
    // values that might have been specified in the config
    if (minSplitSizeNode != 0) {
      minSizeNode = minSplitSizeNode;
    } else {
      minSizeNode = conf.getLong(SPLIT_MINSIZE_PERNODE, 0);
    }
    if (minSplitSizeRack != 0) {
      minSizeRack = minSplitSizeRack;
    } else {
      minSizeRack = conf.getLong(SPLIT_MINSIZE_PERRACK, 0);
    }
    if (maxSplitSize != 0) {
      maxSize = maxSplitSize;
    } else {
      maxSize = conf.getLong("mapreduce.input.fileinputformat.split.maxsize", 0);
      // If maxSize is not configured, a single split will be generated per
      // node.
    }
    if (minSizeNode != 0 && maxSize != 0 && minSizeNode > maxSize) {
      throw new IOException("Minimum split size per node " + minSizeNode +
                            " cannot be larger than maximum split size " +
                            maxSize);
    }
    if (minSizeRack != 0 && maxSize != 0 && minSizeRack > maxSize) {
      throw new IOException("Minimum split size per rack " + minSizeRack +
                            " cannot be larger than maximum split size " +
                            maxSize);
    }
    if (minSizeRack != 0 && minSizeNode > minSizeRack) {
      throw new IOException("Minimum split size per node " + minSizeNode +
                            " cannot be larger than minimum split " +
                            "size per rack " + minSizeRack);
    }

    // all the files in input set
    List<FileStatus> stats = listStatus(job);
    List<InputSplit> splits = new ArrayList<InputSplit>();
    if (stats.size() == 0) {
      return splits;
    }

    // In one single iteration, process all the paths in a single pool.
    // Processing one pool at a time ensures that a split contains paths
    // from a single pool only.
    for (MultiPathFilter onepool : pools) {
      ArrayList<FileStatus> myPaths = new ArrayList<FileStatus>();

      // pick one input path. If it matches any filter in this pool,
      // add it to this pool's set.
      for (Iterator<FileStatus> iter = stats.iterator(); iter.hasNext();) {
        FileStatus p = iter.next();
        if (onepool.accept(p.getPath())) {
          myPaths.add(p); // add it to my output set
          iter.remove();
        }
      }
      // create splits for all files in this pool.
      getMoreSplits(job, myPaths, maxSize, minSizeNode, minSizeRack, splits);
    }

    // create splits for all files that are not in any pool.
    getMoreSplits(job, stats, maxSize, minSizeNode, minSizeRack, splits);

    // free up rackToNodes map
    rackToNodes.clear();
    return splits;
  }

  /**
   * Return all the splits in the specified set of paths.
   */
  private void getMoreSplits(JobContext job, List<FileStatus> stats,
      long maxSize, long minSizeNode, long minSizeRack,
      List<InputSplit> splits) throws IOException {
    Configuration conf = job.getConfiguration();

    // all blocks for all the files in input set
    OneFileInfo[] files;

    // mapping from a rack name to the list of blocks it has
    HashMap<String, List<OneBlockInfo>> rackToBlocks =
        new HashMap<String, List<OneBlockInfo>>();

    // mapping from a block to the nodes on which it has replicas
    HashMap<OneBlockInfo, String[]> blockToNodes =
        new HashMap<OneBlockInfo, String[]>();

    // mapping from a node to the set of blocks that it contains
    HashMap<String, Set<OneBlockInfo>> nodeToBlocks =
        new HashMap<String, Set<OneBlockInfo>>();

    files = new OneFileInfo[stats.size()];
    if (stats.size() == 0) {
      return;
    }

    // populate all the blocks for all files
    long totLength = 0;
    int i = 0;
    for (FileStatus stat : stats) {
      files[i] = new OneFileInfo(stat, conf, isSplitable(job, stat.getPath()),
                                 rackToBlocks, blockToNodes, nodeToBlocks,
                                 rackToNodes, maxSize);
      totLength += files[i].getLength();
      i++; // advance so each file gets its own OneFileInfo slot
    }
    createSplits(nodeToBlocks, blockToNodes, rackToBlocks, totLength,
                 maxSize, minSizeNode, minSizeRack, splits);
  }

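  // createSplits() packs the collected blocks in three passes: node-local
  // splits first (capped at maxSize), then rack-level splits for the
  // leftovers (subject to minSizeRack), and finally an overflow pass that
  // combines whatever remains regardless of locality. As a rough,
  // illustrative walk-through (numbers made up): with maxSize = 256 MB, a
  // node holding 700 MB of blocks yields two 256 MB node-local splits; the
  // remaining ~188 MB goes back into the pool and is later combined with
  // other leftovers on the same rack, or swept up by the overflow pass.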
  @VisibleForTesting
  void createSplits(Map<String, Set<OneBlockInfo>> nodeToBlocks,
                    Map<OneBlockInfo, String[]> blockToNodes,
                    Map<String, List<OneBlockInfo>> rackToBlocks,
                    long totLength,
                    long maxSize,
                    long minSizeNode,
                    long minSizeRack,
                    List<InputSplit> splits) {
    ArrayList<OneBlockInfo> validBlocks = new ArrayList<OneBlockInfo>();
    long curSplitSize = 0;

    int totalNodes = nodeToBlocks.size();
    long totalLength = totLength;

    Multiset<String> splitsPerNode = HashMultiset.create();
    Set<String> completedNodes = new HashSet<String>();

    while (true) {
      // maxSize is allowed to be 0; in that case no size cap is applied and
      // load smoothing across nodes is effectively disabled.

      // Process all nodes and create splits that are local to a node. Generate
      // one split per node iteration, and walk over nodes multiple times to
      // distribute the splits across nodes.
      for (Iterator<Map.Entry<String, Set<OneBlockInfo>>> iter = nodeToBlocks
          .entrySet().iterator(); iter.hasNext();) {
        Map.Entry<String, Set<OneBlockInfo>> one = iter.next();

        String node = one.getKey();

        // Skip the node if it has previously been marked as completed.
        if (completedNodes.contains(node)) {
          continue;
        }

        Set<OneBlockInfo> blocksInCurrentNode = one.getValue();

        // for each block, copy it into validBlocks. Delete it from
        // blockToNodes so that the same block does not appear in
        // two different splits.
        Iterator<OneBlockInfo> oneBlockIter = blocksInCurrentNode.iterator();
        while (oneBlockIter.hasNext()) {
          OneBlockInfo oneblock = oneBlockIter.next();

          // Remove all blocks which may already have been assigned to other
          // splits.
          if (!blockToNodes.containsKey(oneblock)) {
            oneBlockIter.remove();
            continue;
          }

          validBlocks.add(oneblock);
          blockToNodes.remove(oneblock);
          curSplitSize += oneblock.length;

          // if the accumulated split size exceeds the maximum, then
          // create this split.
          if (maxSize != 0 && curSplitSize >= maxSize) {
            // create an input split and add it to the splits array
            addCreatedSplit(splits, Collections.singleton(node), validBlocks);
            totalLength -= curSplitSize;
            curSplitSize = 0;

            splitsPerNode.add(node);

            // Remove entries from blocksInCurrentNode so that we don't walk
            // these again.
            blocksInCurrentNode.removeAll(validBlocks);
            validBlocks.clear();

            // Done creating a single split for this node. Move on to the next
            // node so that splits are distributed across nodes.
            break;
          }

        }
        if (validBlocks.size() != 0) {
          // This implies that the last few blocks (or all in case maxSize=0)
          // were not part of a split. The node is complete.

          // If there were any blocks left over and their combined size is
          // larger than minSizeNode, then combine them into one split.
          // Otherwise add them back to the unprocessed pool. It is likely
          // that they will be combined with other blocks from the
          // same rack later on.
          // This condition also kicks in when max split size is not set. All
          // blocks on a node will be grouped together into a single split.
          if (minSizeNode != 0 && curSplitSize >= minSizeNode
              && splitsPerNode.count(node) == 0) {
            // No split has been created on this node yet, so it is ok to add
            // a smaller one for parallelism. Otherwise leave the blocks for
            // the rack pass so that split sizes stay balanced.
            addCreatedSplit(splits, Collections.singleton(node), validBlocks);
            totalLength -= curSplitSize;
            splitsPerNode.add(node);
            // Remove entries from blocksInCurrentNode so that we don't walk
            // this again.
            blocksInCurrentNode.removeAll(validBlocks);
            // The node is done. This was the last set of blocks for this node.
          } else {
            // Put the unplaced blocks back into the pool for later
            // rack-allocation.
            for (OneBlockInfo oneblock : validBlocks) {
              blockToNodes.put(oneblock, oneblock.hosts);
            }
          }
          validBlocks.clear();
          curSplitSize = 0;
          completedNodes.add(node);
        } else { // No in-flight blocks.
          if (blocksInCurrentNode.size() == 0) {
            // Node is done. All blocks were fit into node-local splits.
            completedNodes.add(node);
          } // else Run through the node again.
        }
      }

      // Check if node-local assignments are complete.
      if (completedNodes.size() == totalNodes || totalLength == 0) {
        // All nodes have been walked over and marked as completed or all
        // blocks have been assigned. The rest should be handled via
        // rack-local assignment.
        LOG.info("DEBUG: Terminated node allocation with: CompletedNodes: "
            + completedNodes.size() + ", size left: " + totalLength);
        break;
      }
    }

    // if blocks in a rack are below the specified minimum size, then keep them
    // in 'overflow'. After the processing of all racks is complete, these
    // overflow blocks will be combined into splits.
    ArrayList<OneBlockInfo> overflowBlocks = new ArrayList<OneBlockInfo>();
    Set<String> racks = new HashSet<String>();

    // Process all racks over and over again until there is no more work to do.
    while (blockToNodes.size() > 0) {

      // Create one split for this rack before moving over to the next rack.
      // Come back to this rack after creating a single split for each of the
      // remaining racks.
      // Process one rack location at a time: combine all possible blocks that
      // reside on this rack into one split (constrained by the minimum and
      // maximum split sizes).

      // iterate over all racks
      for (Iterator<Map.Entry<String, List<OneBlockInfo>>> iter =
           rackToBlocks.entrySet().iterator(); iter.hasNext();) {

        Map.Entry<String, List<OneBlockInfo>> one = iter.next();
        racks.add(one.getKey());
        List<OneBlockInfo> blocks = one.getValue();

        // for each block, copy it into validBlocks. Delete it from
        // blockToNodes so that the same block does not appear in
        // two different splits.
        boolean createdSplit = false;
        for (OneBlockInfo oneblock : blocks) {
          if (blockToNodes.containsKey(oneblock)) {
            validBlocks.add(oneblock);
            blockToNodes.remove(oneblock);
            curSplitSize += oneblock.length;

            // if the accumulated split size exceeds the maximum, then
            // create this split.
            if (maxSize != 0 && curSplitSize >= maxSize) {
              // create an input split and add it to the splits array
              addCreatedSplit(splits, getHosts(racks), validBlocks);
              createdSplit = true;
              break;
            }
          }
        }

        // if we created a split, then just go to the next rack
        if (createdSplit) {
          curSplitSize = 0;
          validBlocks.clear();
          racks.clear();
          continue;
        }

        if (!validBlocks.isEmpty()) {
          if (minSizeRack != 0 && curSplitSize >= minSizeRack) {
            // if there is a minimum size specified, then create a single split
            // otherwise, store these blocks into overflow data structure
            addCreatedSplit(splits, getHosts(racks), validBlocks);
          } else {
            // There were a few blocks in this rack that
            // remained to be processed. Keep them in 'overflow' block list.
            // These will be combined later.
            overflowBlocks.addAll(validBlocks);
          }
        }
        curSplitSize = 0;
        validBlocks.clear();
        racks.clear();
      }
    }

    assert blockToNodes.isEmpty();
    assert curSplitSize == 0;
    assert validBlocks.isEmpty();
    assert racks.isEmpty();

    // Process all overflow blocks
    for (OneBlockInfo oneblock : overflowBlocks) {
      validBlocks.add(oneblock);
      curSplitSize += oneblock.length;

      // This might cause an existing rack location to be re-added,
      // but it should be ok.
      for (int i = 0; i < oneblock.racks.length; i++) {
        racks.add(oneblock.racks[i]);
      }

      // if the accumulated split size exceeds the maximum, then
      // create this split.
      if (maxSize != 0 && curSplitSize >= maxSize) {
        // create an input split and add it to the splits array
        addCreatedSplit(splits, getHosts(racks), validBlocks);
        curSplitSize = 0;
        validBlocks.clear();
        racks.clear();
      }
    }

    // Process any remaining blocks.
    if (!validBlocks.isEmpty()) {
      addCreatedSplit(splits, getHosts(racks), validBlocks);
    }
  }

  /**
   * Create a single split from the list of blocks specified in validBlocks.
   * Add this new split into splitList.
   */
  private void addCreatedSplit(List<InputSplit> splitList,
                               Collection<String> locations,
                               ArrayList<OneBlockInfo> validBlocks) {
    // create an input split
    Path[] fl = new Path[validBlocks.size()];
    long[] offset = new long[validBlocks.size()];
    long[] length = new long[validBlocks.size()];
    for (int i = 0; i < validBlocks.size(); i++) {
      fl[i] = validBlocks.get(i).onepath;
      offset[i] = validBlocks.get(i).offset;
      length[i] = validBlocks.get(i).length;
    }
    // add this split to the list that is returned
    CombineFileSplit thissplit = new CombineFileSplit(fl, offset,
        length, locations.toArray(new String[0]));
    splitList.add(thissplit);
  }

  /**
   * Subclasses must implement this to construct a {@link RecordReader}
   * for a {@link CombineFileSplit}.
   */
  public abstract RecordReader<K, V> createRecordReader(InputSplit split,
      TaskAttemptContext context) throws IOException;

  /**
   * Information about one file from the file system.
   */
  @VisibleForTesting
  static class OneFileInfo {
    private long fileSize;            // size of the file
    private OneBlockInfo[] blocks;    // all blocks in this file

    OneFileInfo(FileStatus stat, Configuration conf,
                boolean isSplitable,
                HashMap<String, List<OneBlockInfo>> rackToBlocks,
                HashMap<OneBlockInfo, String[]> blockToNodes,
                HashMap<String, Set<OneBlockInfo>> nodeToBlocks,
                HashMap<String, Set<String>> rackToNodes,
                long maxSize) throws IOException {
      this.fileSize = 0;

      // get block locations from file system
      BlockLocation[] locations;
      if (stat instanceof LocatedFileStatus) {
        locations = ((LocatedFileStatus) stat).getBlockLocations();
      } else {
        FileSystem fs = stat.getPath().getFileSystem(conf);
        locations = fs.getFileBlockLocations(stat, 0, stat.getLen());
      }
      // create a list of all blocks and their locations
      if (locations == null) {
        blocks = new OneBlockInfo[0];
      } else {

        if (locations.length == 0 && !stat.isDirectory()) {
          locations = new BlockLocation[] { new BlockLocation() };
        }

        if (!isSplitable) {
          // if the file is not splitable, just create the one block with
          // full file length
          blocks = new OneBlockInfo[1];
          fileSize = stat.getLen();
          blocks[0] = new OneBlockInfo(stat.getPath(), 0, fileSize,
              locations[0].getHosts(), locations[0].getTopologyPaths());
        } else {
          ArrayList<OneBlockInfo> blocksList = new ArrayList<OneBlockInfo>(
              locations.length);
          for (int i = 0; i < locations.length; i++) {
            fileSize += locations[i].getLength();

            // each split can be a maximum of maxSize
            long left = locations[i].getLength();
            long myOffset = locations[i].getOffset();
            long myLength = 0;
            do {
              if (maxSize == 0) {
                myLength = left;
              } else {
                if (left > maxSize && left < 2 * maxSize) {
                  // if the remainder is between max and 2*max, then instead
                  // of creating splits of size max and (left - max), we
                  // create splits of size left/2 and left/2. This is a
                  // heuristic to avoid creating very small splits.
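                  // For example (illustrative numbers): with maxSize = 100
                  // and left = 150, this produces two chunks of 75 rather
                  // than one of 100 followed by one of 50.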
                  myLength = left / 2;
                } else {
                  myLength = Math.min(maxSize, left);
                }
              }
              OneBlockInfo oneblock = new OneBlockInfo(stat.getPath(),
                  myOffset, myLength, locations[i].getHosts(),
                  locations[i].getTopologyPaths());
              left -= myLength;
              myOffset += myLength;

              blocksList.add(oneblock);
            } while (left > 0);
          }
          blocks = blocksList.toArray(new OneBlockInfo[blocksList.size()]);
        }

        populateBlockInfo(blocks, rackToBlocks, blockToNodes,
            nodeToBlocks, rackToNodes);
      }
    }

    @VisibleForTesting
    static void populateBlockInfo(OneBlockInfo[] blocks,
                          Map<String, List<OneBlockInfo>> rackToBlocks,
                          Map<OneBlockInfo, String[]> blockToNodes,
                          Map<String, Set<OneBlockInfo>> nodeToBlocks,
                          Map<String, Set<String>> rackToNodes) {
      for (OneBlockInfo oneblock : blocks) {
        // add this block to the block --> node locations map
        blockToNodes.put(oneblock, oneblock.hosts);

        // For blocks that do not have host/rack information,
        // assign to default rack.
        String[] racks = null;
        if (oneblock.hosts.length == 0) {
          racks = new String[]{NetworkTopology.DEFAULT_RACK};
        } else {
          racks = oneblock.racks;
        }

        // add this block to the rack --> block map
        for (int j = 0; j < racks.length; j++) {
          String rack = racks[j];
          List<OneBlockInfo> blklist = rackToBlocks.get(rack);
          if (blklist == null) {
            blklist = new ArrayList<OneBlockInfo>();
            rackToBlocks.put(rack, blklist);
          }
          blklist.add(oneblock);
          if (!racks[j].equals(NetworkTopology.DEFAULT_RACK)) {
            // Add this host to rackToNodes map
            addHostToRack(rackToNodes, racks[j], oneblock.hosts[j]);
          }
        }

        // add this block to the node --> block map
        for (int j = 0; j < oneblock.hosts.length; j++) {
          String node = oneblock.hosts[j];
          Set<OneBlockInfo> blklist = nodeToBlocks.get(node);
          if (blklist == null) {
            blklist = new LinkedHashSet<OneBlockInfo>();
            nodeToBlocks.put(node, blklist);
          }
          blklist.add(oneblock);
        }
      }
    }

    long getLength() {
      return fileSize;
    }

    OneBlockInfo[] getBlocks() {
      return blocks;
    }
  }

  /**
   * Information about one block from the file system.
   */
  @VisibleForTesting
  static class OneBlockInfo {
    Path onepath;        // name of this file
    long offset;         // offset in file
    long length;         // length of this block
    String[] hosts;      // nodes on which this block resides
    String[] racks;      // network topology of hosts

    OneBlockInfo(Path path, long offset, long len,
                 String[] hosts, String[] topologyPaths) {
      this.onepath = path;
      this.offset = offset;
      this.hosts = hosts;
      this.length = len;
      assert (hosts.length == topologyPaths.length ||
              topologyPaths.length == 0);

      // if the file system does not have any rack information, then
      // use dummy rack location.
      if (topologyPaths.length == 0) {
        topologyPaths = new String[hosts.length];
        for (int i = 0; i < topologyPaths.length; i++) {
          topologyPaths[i] = (new NodeBase(hosts[i],
              NetworkTopology.DEFAULT_RACK)).toString();
        }
      }

      // The topology paths have the host name included as the last
      // component. Strip it.
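      // For example, a topology path of "/rack1/host1" (illustrative values)
      // reduces to the network location "/rack1".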
      this.racks = new String[topologyPaths.length];
      for (int i = 0; i < topologyPaths.length; i++) {
        this.racks[i] = (new NodeBase(topologyPaths[i])).getNetworkLocation();
      }
    }
  }

  protected BlockLocation[] getFileBlockLocations(
      FileSystem fs, FileStatus stat) throws IOException {
    if (stat instanceof LocatedFileStatus) {
      return ((LocatedFileStatus) stat).getBlockLocations();
    }
    return fs.getFileBlockLocations(stat, 0, stat.getLen());
  }

  private static void addHostToRack(Map<String, Set<String>> rackToNodes,
                                    String rack, String host) {
    Set<String> hosts = rackToNodes.get(rack);
    if (hosts == null) {
      hosts = new HashSet<String>();
      rackToNodes.put(rack, hosts);
    }
    hosts.add(host);
  }

  private Set<String> getHosts(Set<String> racks) {
    Set<String> hosts = new HashSet<String>();
    for (String rack : racks) {
      if (rackToNodes.containsKey(rack)) {
        hosts.addAll(rackToNodes.get(rack));
      }
    }
    return hosts;
  }

  /**
   * Accept a path only if any one of the filters given in the
   * constructor does.
   */
  private static class MultiPathFilter implements PathFilter {
    private List<PathFilter> filters;

    public MultiPathFilter() {
      this.filters = new ArrayList<PathFilter>();
    }

    public MultiPathFilter(List<PathFilter> filters) {
      this.filters = filters;
    }

    public void add(PathFilter one) {
      filters.add(one);
    }

    public boolean accept(Path path) {
      for (PathFilter filter : filters) {
        if (filter.accept(path)) {
          return true;
        }
      }
      return false;
    }

    public String toString() {
      StringBuffer buf = new StringBuffer();
      buf.append("[");
      for (PathFilter f : filters) {
        buf.append(f);
        buf.append(",");
      }
      buf.append("]");
      return buf.toString();
    }
  }
}