/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements. See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership. The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License. You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.hadoop.mapred;

import java.io.IOException;
import java.util.ArrayList;
import java.util.Collections;
import java.util.Comparator;
import java.util.HashSet;
import java.util.IdentityHashMap;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.TimeUnit;

import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.classification.InterfaceStability;
import org.apache.hadoop.fs.BlockLocation;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.LocatedFileStatus;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.PathFilter;
import org.apache.hadoop.fs.RemoteIterator;
import org.apache.hadoop.mapreduce.security.TokenCache;
import org.apache.hadoop.net.NetworkTopology;
import org.apache.hadoop.net.Node;
import org.apache.hadoop.net.NodeBase;
import org.apache.hadoop.util.ReflectionUtils;
import org.apache.hadoop.util.StopWatch;
import org.apache.hadoop.util.StringUtils;

import com.google.common.collect.Iterables;

/**
 * A base class for file-based {@link InputFormat}.
 *
 * <p><code>FileInputFormat</code> is the base class for all file-based
 * <code>InputFormat</code>s. This provides a generic implementation of
 * {@link #getSplits(JobConf, int)}.
 * Subclasses of <code>FileInputFormat</code> can also override the
 * {@link #isSplitable(FileSystem, Path)} method to ensure input-files are
 * not split-up and are processed as a whole by {@link Mapper}s.
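 *
 * <p>A minimal driver sketch (hedged: the driver class, the input path and the
 * choice of <code>TextInputFormat</code> below are illustrative assumptions,
 * not requirements of this class):</p>
 * <pre>
 *   JobConf conf = new JobConf(MyJob.class);        // MyJob is a placeholder
 *   conf.setInputFormat(TextInputFormat.class);     // any FileInputFormat subclass
 *   FileInputFormat.setInputPaths(conf, new Path("/data/in"));
 * </pre>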
 */
@InterfaceAudience.Public
@InterfaceStability.Stable
public abstract class FileInputFormat<K, V> implements InputFormat<K, V> {

  public static final Log LOG =
    LogFactory.getLog(FileInputFormat.class);

  @Deprecated
  public static enum Counter {
    BYTES_READ
  }

  public static final String NUM_INPUT_FILES =
    org.apache.hadoop.mapreduce.lib.input.FileInputFormat.NUM_INPUT_FILES;

  public static final String INPUT_DIR_RECURSIVE =
    org.apache.hadoop.mapreduce.lib.input.FileInputFormat.INPUT_DIR_RECURSIVE;

  private static final double SPLIT_SLOP = 1.1;   // 10% slop

  private long minSplitSize = 1;
  private static final PathFilter hiddenFileFilter = new PathFilter() {
    public boolean accept(Path p) {
      String name = p.getName();
      return !name.startsWith("_") && !name.startsWith(".");
    }
  };

  protected void setMinSplitSize(long minSplitSize) {
    this.minSplitSize = minSplitSize;
  }

  /**
   * Proxy PathFilter that accepts a path only if all filters given in the
   * constructor do. Used by listStatus() to apply the built-in
   * hiddenFileFilter together with a user-provided one (if any).
   */
  private static class MultiPathFilter implements PathFilter {
    private List<PathFilter> filters;

    public MultiPathFilter(List<PathFilter> filters) {
      this.filters = filters;
    }

    public boolean accept(Path path) {
      for (PathFilter filter : filters) {
        if (!filter.accept(path)) {
          return false;
        }
      }
      return true;
    }
  }

  /**
   * Is the given filename splittable? Usually true, but if the file is
   * stream compressed, it will not be.
   *
   * <code>FileInputFormat</code> implementations can override this and return
   * <code>false</code> to ensure that individual input files are never split up,
   * so that {@link Mapper}s process entire files.
   *
   * @param fs the file system that the file is on
   * @param filename the file name to check
   * @return is this file splittable?
   */
  protected boolean isSplitable(FileSystem fs, Path filename) {
    return true;
  }

  public abstract RecordReader<K, V> getRecordReader(InputSplit split,
                                                     JobConf job,
                                                     Reporter reporter)
      throws IOException;

  /**
   * Set a PathFilter to be applied to the input paths for the map-reduce job.
   *
   * @param filter the PathFilter class to use for filtering the input paths.
   */
  public static void setInputPathFilter(JobConf conf,
                                        Class<? extends PathFilter> filter) {
    conf.setClass(org.apache.hadoop.mapreduce.lib.input.
        FileInputFormat.PATHFILTER_CLASS, filter, PathFilter.class);
  }

  /**
   * Get a PathFilter instance of the filter set for the input paths.
   *
   * @return the PathFilter instance set for the job, null if none has been set.
   */
  public static PathFilter getInputPathFilter(JobConf conf) {
    Class<? extends PathFilter> filterClass = conf.getClass(
        org.apache.hadoop.mapreduce.lib.input.FileInputFormat.PATHFILTER_CLASS,
        null, PathFilter.class);
    return (filterClass != null) ?
        ReflectionUtils.newInstance(filterClass, conf) : null;
  }
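
  // A hedged sketch of a user-supplied filter: it is ANDed with the built-in
  // hiddenFileFilter by MultiPathFilter, so it only needs to express the extra
  // restriction. "IgnoreTmpFilter" is a hypothetical name, not part of Hadoop.
  //
  //   public static class IgnoreTmpFilter implements PathFilter {
  //     public boolean accept(Path p) {
  //       return !p.getName().endsWith(".tmp");
  //     }
  //   }
  //   FileInputFormat.setInputPathFilter(conf, IgnoreTmpFilter.class);
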
  /**
   * Add files in the input path recursively into the results.
   * @param result
   *          The List to store all files.
   * @param fs
   *          The FileSystem.
   * @param path
   *          The input path.
   * @param inputFilter
   *          The input filter that can be used to filter files/dirs.
   * @throws IOException
   */
  protected void addInputPathRecursively(List<FileStatus> result,
      FileSystem fs, Path path, PathFilter inputFilter)
      throws IOException {
    RemoteIterator<LocatedFileStatus> iter = fs.listLocatedStatus(path);
    while (iter.hasNext()) {
      LocatedFileStatus stat = iter.next();
      if (inputFilter.accept(stat.getPath())) {
        if (stat.isDirectory()) {
          addInputPathRecursively(result, fs, stat.getPath(), inputFilter);
        } else {
          result.add(stat);
        }
      }
    }
  }

  /** List input directories.
   * Subclasses may override to, e.g., select only files matching a regular
   * expression.
   *
   * @param job the job to list input paths for
   * @return array of FileStatus objects
   * @throws IOException if no input paths are specified or an input path is invalid.
   */
  protected FileStatus[] listStatus(JobConf job) throws IOException {
    Path[] dirs = getInputPaths(job);
    if (dirs.length == 0) {
      throw new IOException("No input paths specified in job");
    }

    // get tokens for all the required FileSystems
    TokenCache.obtainTokensForNamenodes(job.getCredentials(), dirs, job);

    // Whether we need to look recursively into the directory structure
    boolean recursive = job.getBoolean(INPUT_DIR_RECURSIVE, false);

    // creates a MultiPathFilter with the hiddenFileFilter and the
    // user-provided one (if any).
    List<PathFilter> filters = new ArrayList<PathFilter>();
    filters.add(hiddenFileFilter);
    PathFilter jobFilter = getInputPathFilter(job);
    if (jobFilter != null) {
      filters.add(jobFilter);
    }
    PathFilter inputFilter = new MultiPathFilter(filters);

    FileStatus[] result;
    int numThreads = job
        .getInt(
            org.apache.hadoop.mapreduce.lib.input.FileInputFormat.LIST_STATUS_NUM_THREADS,
            org.apache.hadoop.mapreduce.lib.input.FileInputFormat.DEFAULT_LIST_STATUS_NUM_THREADS);

    StopWatch sw = new StopWatch().start();
    if (numThreads == 1) {
      List<FileStatus> locatedFiles =
          singleThreadedListStatus(job, dirs, inputFilter, recursive);
      result = locatedFiles.toArray(new FileStatus[locatedFiles.size()]);
    } else {
      Iterable<FileStatus> locatedFiles = null;
      try {
        LocatedFileStatusFetcher locatedFileStatusFetcher = new LocatedFileStatusFetcher(
            job, dirs, recursive, inputFilter, false);
        locatedFiles = locatedFileStatusFetcher.getFileStatuses();
      } catch (InterruptedException e) {
        throw new IOException("Interrupted while getting file statuses");
      }
      result = Iterables.toArray(locatedFiles, FileStatus.class);
    }

    sw.stop();
    if (LOG.isDebugEnabled()) {
      LOG.debug("Time taken to get FileStatuses: "
          + sw.now(TimeUnit.MILLISECONDS));
    }
    LOG.info("Total input paths to process : " + result.length);
    return result;
  }
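
  // listStatus() can list input paths with multiple threads; a hedged
  // configuration sketch (the thread count 4 is only an example value):
  //
  //   conf.setInt(org.apache.hadoop.mapreduce.lib.input.FileInputFormat
  //       .LIST_STATUS_NUM_THREADS, 4);
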
  private List<FileStatus> singleThreadedListStatus(JobConf job, Path[] dirs,
      PathFilter inputFilter, boolean recursive) throws IOException {
    List<FileStatus> result = new ArrayList<FileStatus>();
    List<IOException> errors = new ArrayList<IOException>();
    for (Path p: dirs) {
      FileSystem fs = p.getFileSystem(job);
      FileStatus[] matches = fs.globStatus(p, inputFilter);
      if (matches == null) {
        errors.add(new IOException("Input path does not exist: " + p));
      } else if (matches.length == 0) {
        errors.add(new IOException("Input Pattern " + p + " matches 0 files"));
      } else {
        for (FileStatus globStat: matches) {
          if (globStat.isDirectory()) {
            RemoteIterator<LocatedFileStatus> iter =
                fs.listLocatedStatus(globStat.getPath());
            while (iter.hasNext()) {
              LocatedFileStatus stat = iter.next();
              if (inputFilter.accept(stat.getPath())) {
                if (recursive && stat.isDirectory()) {
                  addInputPathRecursively(result, fs, stat.getPath(),
                      inputFilter);
                } else {
                  result.add(stat);
                }
              }
            }
          } else {
            result.add(globStat);
          }
        }
      }
    }
    if (!errors.isEmpty()) {
      throw new InvalidInputException(errors);
    }
    return result;
  }

  /**
   * A factory that makes the split for this class. It can be overridden
   * by sub-classes to make sub-types.
   */
  protected FileSplit makeSplit(Path file, long start, long length,
                                String[] hosts) {
    return new FileSplit(file, start, length, hosts);
  }

  /**
   * A factory that makes the split for this class. It can be overridden
   * by sub-classes to make sub-types.
   */
  protected FileSplit makeSplit(Path file, long start, long length,
                                String[] hosts, String[] inMemoryHosts) {
    return new FileSplit(file, start, length, hosts, inMemoryHosts);
  }
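
  // A hedged override sketch: subclasses may return a FileSplit subtype from
  // makeSplit(); "TaggedFileSplit" below is hypothetical, not part of Hadoop.
  //
  //   @Override
  //   protected FileSplit makeSplit(Path file, long start, long length,
  //                                 String[] hosts) {
  //     return new TaggedFileSplit(file, start, length, hosts);
  //   }
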
  /** Splits files returned by {@link #listStatus(JobConf)} when
   * they're too big. */
  public InputSplit[] getSplits(JobConf job, int numSplits)
      throws IOException {
    StopWatch sw = new StopWatch().start();
    FileStatus[] files = listStatus(job);

    // Save the number of input files for metrics/loadgen
    job.setLong(NUM_INPUT_FILES, files.length);
    long totalSize = 0;                           // compute total size
    for (FileStatus file: files) {                // check we have valid files
      if (file.isDirectory()) {
        throw new IOException("Not a file: " + file.getPath());
      }
      totalSize += file.getLen();
    }

    long goalSize = totalSize / (numSplits == 0 ? 1 : numSplits);
    long minSize = Math.max(job.getLong(org.apache.hadoop.mapreduce.lib.input.
        FileInputFormat.SPLIT_MINSIZE, 1), minSplitSize);

    // generate splits
    ArrayList<FileSplit> splits = new ArrayList<FileSplit>(numSplits);
    NetworkTopology clusterMap = new NetworkTopology();
    for (FileStatus file: files) {
      Path path = file.getPath();
      long length = file.getLen();
      if (length != 0) {
        FileSystem fs = path.getFileSystem(job);
        BlockLocation[] blkLocations;
        if (file instanceof LocatedFileStatus) {
          blkLocations = ((LocatedFileStatus) file).getBlockLocations();
        } else {
          blkLocations = fs.getFileBlockLocations(file, 0, length);
        }
        if (isSplitable(fs, path)) {
          long blockSize = file.getBlockSize();
          long splitSize = computeSplitSize(goalSize, minSize, blockSize);

          long bytesRemaining = length;
          while (((double) bytesRemaining) / splitSize > SPLIT_SLOP) {
            String[][] splitHosts = getSplitHostsAndCachedHosts(blkLocations,
                length - bytesRemaining, splitSize, clusterMap);
            splits.add(makeSplit(path, length - bytesRemaining, splitSize,
                splitHosts[0], splitHosts[1]));
            bytesRemaining -= splitSize;
          }

          if (bytesRemaining != 0) {
            String[][] splitHosts = getSplitHostsAndCachedHosts(blkLocations,
                length - bytesRemaining, bytesRemaining, clusterMap);
            splits.add(makeSplit(path, length - bytesRemaining, bytesRemaining,
                splitHosts[0], splitHosts[1]));
          }
        } else {
          if (LOG.isDebugEnabled()) {
            // Log only if the file is big enough to be split
            if (length > Math.min(file.getBlockSize(), minSize)) {
              LOG.debug("File is not splittable so no parallelization "
                  + "is possible: " + file.getPath());
            }
          }
          String[][] splitHosts =
              getSplitHostsAndCachedHosts(blkLocations, 0, length, clusterMap);
          splits.add(makeSplit(path, 0, length, splitHosts[0], splitHosts[1]));
        }
      } else {
        // Create an empty hosts array for zero-length files
        splits.add(makeSplit(path, 0, length, new String[0]));
      }
    }
    sw.stop();
    if (LOG.isDebugEnabled()) {
      LOG.debug("Total # of splits generated by getSplits: " + splits.size()
          + ", TimeTaken: " + sw.now(TimeUnit.MILLISECONDS));
    }
    return splits.toArray(new FileSplit[splits.size()]);
  }

  protected long computeSplitSize(long goalSize, long minSize,
                                  long blockSize) {
    return Math.max(minSize, Math.min(goalSize, blockSize));
  }

  protected int getBlockIndex(BlockLocation[] blkLocations,
                              long offset) {
    for (int i = 0; i < blkLocations.length; i++) {
      // is the offset inside this block?
      if ((blkLocations[i].getOffset() <= offset) &&
          (offset < blkLocations[i].getOffset() + blkLocations[i].getLength())) {
        return i;
      }
    }
    BlockLocation last = blkLocations[blkLocations.length - 1];
    long fileLength = last.getOffset() + last.getLength() - 1;
    throw new IllegalArgumentException("Offset " + offset +
                                       " is outside of file (0.." +
                                       fileLength + ")");
  }

  /**
   * Sets the given comma separated paths as the list of inputs
   * for the map-reduce job.
   *
   * @param conf Configuration of the job
   * @param commaSeparatedPaths Comma separated paths to be set as
   *        the list of inputs for the map-reduce job.
   */
  public static void setInputPaths(JobConf conf, String commaSeparatedPaths) {
    setInputPaths(conf, StringUtils.stringToPath(
                        getPathStrings(commaSeparatedPaths)));
  }
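
  // Commas inside curly-brace globs are not treated as separators by
  // getPathStrings(), so the call below (paths are examples) adds two inputs,
  // not three:
  //
  //   FileInputFormat.setInputPaths(conf, "/logs/2024/{01,02}/*,/logs/extra");
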
  /**
   * Add the given comma separated paths to the list of inputs for
   * the map-reduce job.
   *
   * @param conf The configuration of the job
   * @param commaSeparatedPaths Comma separated paths to be added to
   *        the list of inputs for the map-reduce job.
   */
  public static void addInputPaths(JobConf conf, String commaSeparatedPaths) {
    for (String str : getPathStrings(commaSeparatedPaths)) {
      addInputPath(conf, new Path(str));
    }
  }

  /**
   * Set the array of {@link Path}s as the list of inputs
   * for the map-reduce job.
   *
   * @param conf Configuration of the job.
   * @param inputPaths the {@link Path}s of the input directories/files
   *        for the map-reduce job.
   */
  public static void setInputPaths(JobConf conf, Path... inputPaths) {
    Path path = new Path(conf.getWorkingDirectory(), inputPaths[0]);
    StringBuffer str = new StringBuffer(StringUtils.escapeString(path.toString()));
    for (int i = 1; i < inputPaths.length; i++) {
      str.append(StringUtils.COMMA_STR);
      path = new Path(conf.getWorkingDirectory(), inputPaths[i]);
      str.append(StringUtils.escapeString(path.toString()));
    }
    conf.set(org.apache.hadoop.mapreduce.lib.input.
        FileInputFormat.INPUT_DIR, str.toString());
  }

  /**
   * Add a {@link Path} to the list of inputs for the map-reduce job.
   *
   * @param conf The configuration of the job
   * @param path {@link Path} to be added to the list of inputs for
   *            the map-reduce job.
   */
  public static void addInputPath(JobConf conf, Path path) {
    path = new Path(conf.getWorkingDirectory(), path);
    String dirStr = StringUtils.escapeString(path.toString());
    String dirs = conf.get(org.apache.hadoop.mapreduce.lib.input.
        FileInputFormat.INPUT_DIR);
    conf.set(org.apache.hadoop.mapreduce.lib.input.
        FileInputFormat.INPUT_DIR, dirs == null ? dirStr :
        dirs + StringUtils.COMMA_STR + dirStr);
  }

  // Splits the comma separated paths, keeping commas that appear inside
  // curly-brace glob patterns as part of the path.
  private static String[] getPathStrings(String commaSeparatedPaths) {
    int length = commaSeparatedPaths.length();
    int curlyOpen = 0;
    int pathStart = 0;
    boolean globPattern = false;
    List<String> pathStrings = new ArrayList<String>();

    for (int i = 0; i < length; i++) {
      char ch = commaSeparatedPaths.charAt(i);
      switch (ch) {
        case '{': {
          curlyOpen++;
          if (!globPattern) {
            globPattern = true;
          }
          break;
        }
        case '}': {
          curlyOpen--;
          if (curlyOpen == 0 && globPattern) {
            globPattern = false;
          }
          break;
        }
        case ',': {
          if (!globPattern) {
            pathStrings.add(commaSeparatedPaths.substring(pathStart, i));
            pathStart = i + 1;
          }
          break;
        }
        default:
          continue; // nothing special to do for this character
      }
    }
    pathStrings.add(commaSeparatedPaths.substring(pathStart, length));

    return pathStrings.toArray(new String[0]);
  }
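
  // INPUT_DIR holds the inputs as a comma-separated, comma-escaped string
  // (StringUtils.escapeString on write, StringUtils.unEscapeString on read),
  // so a Path whose name contains a comma survives an
  // addInputPath()/getInputPaths() round trip.
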
  /**
   * Get the list of input {@link Path}s for the map-reduce job.
   *
   * @param conf The configuration of the job
   * @return the list of input {@link Path}s for the map-reduce job.
   */
  public static Path[] getInputPaths(JobConf conf) {
    String dirs = conf.get(org.apache.hadoop.mapreduce.lib.input.
        FileInputFormat.INPUT_DIR, "");
    String[] list = StringUtils.split(dirs);
    Path[] result = new Path[list.length];
    for (int i = 0; i < list.length; i++) {
      result[i] = new Path(StringUtils.unEscapeString(list[i]));
    }
    return result;
  }

  private void sortInDescendingOrder(List<NodeInfo> mylist) {
    Collections.sort(mylist, new Comparator<NodeInfo>() {
      public int compare(NodeInfo obj1, NodeInfo obj2) {

        if (obj1 == null || obj2 == null)
          return -1;

        if (obj1.getValue() == obj2.getValue()) {
          return 0;
        } else {
          return ((obj1.getValue() < obj2.getValue()) ? 1 : -1);
        }
      }
    });
  }

  /**
   * This function identifies and returns the hosts that contribute
   * most for a given split. For calculating the contribution, rack
   * locality is treated on par with host locality, so hosts from racks
   * that contribute the most are preferred over hosts on racks that
   * contribute less.
   * @param blkLocations The list of block locations
   * @param offset the start offset of the split in the file
   * @param splitSize the size of the split in bytes
   * @return an array of hosts that contribute most to this split
   * @throws IOException
   */
  protected String[] getSplitHosts(BlockLocation[] blkLocations,
      long offset, long splitSize, NetworkTopology clusterMap) throws IOException {
    return getSplitHostsAndCachedHosts(blkLocations, offset, splitSize,
        clusterMap)[0];
  }
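
  // A worked example with assumed numbers: a 256MB split covering two 128MB
  // blocks, where block A is replicated on hosts {h1, h2} and block B on
  // {h2, h3}, all under one rack. h2 contributes 256MB while h1 and h3
  // contribute 128MB each, so h2 is ranked first and h1/h3 tie behind it.
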
  /**
   * This function identifies and returns the hosts that contribute
   * most for a given split. For calculating the contribution, rack
   * locality is treated on par with host locality, so hosts from racks
   * that contribute the most are preferred over hosts on racks that
   * contribute less.
   * @param blkLocations The list of block locations
   * @param offset the start offset of the split in the file
   * @param splitSize the size of the split in bytes
   * @return two arrays - one of hosts that contribute most to this split, and
   *    one of hosts that contribute most to this split that have the data
   *    cached on them
   * @throws IOException
   */
  private String[][] getSplitHostsAndCachedHosts(BlockLocation[] blkLocations,
      long offset, long splitSize, NetworkTopology clusterMap)
      throws IOException {

    int startIndex = getBlockIndex(blkLocations, offset);

    long bytesInThisBlock = blkLocations[startIndex].getOffset() +
                            blkLocations[startIndex].getLength() - offset;

    // If this is the only block, just return
    if (bytesInThisBlock >= splitSize) {
      return new String[][] { blkLocations[startIndex].getHosts(),
          blkLocations[startIndex].getCachedHosts() };
    }

    long bytesInFirstBlock = bytesInThisBlock;
    int index = startIndex + 1;
    splitSize -= bytesInThisBlock;

    while (splitSize > 0) {
      bytesInThisBlock =
          Math.min(splitSize, blkLocations[index++].getLength());
      splitSize -= bytesInThisBlock;
    }

    long bytesInLastBlock = bytesInThisBlock;
    int endIndex = index - 1;

    Map<Node, NodeInfo> hostsMap = new IdentityHashMap<Node, NodeInfo>();
    Map<Node, NodeInfo> racksMap = new IdentityHashMap<Node, NodeInfo>();
    String[] allTopos = new String[0];

    // Build the hierarchy and aggregate the contribution of
    // bytes at each level. See TestGetSplitHosts.java

    for (index = startIndex; index <= endIndex; index++) {

      // Establish the bytes in this block
      if (index == startIndex) {
        bytesInThisBlock = bytesInFirstBlock;
      } else if (index == endIndex) {
        bytesInThisBlock = bytesInLastBlock;
      } else {
        bytesInThisBlock = blkLocations[index].getLength();
      }

      allTopos = blkLocations[index].getTopologyPaths();

      // If no topology information is available, just
      // prefix a fakeRack
      if (allTopos.length == 0) {
        allTopos = fakeRacks(blkLocations, index);
      }

      // NOTE: This code currently works only for one level of
      // hierarchy (rack/host). However, it is relatively easy
      // to extend this to support aggregation at different
      // levels

      for (String topo : allTopos) {

        Node node, parentNode;
        NodeInfo nodeInfo, parentNodeInfo;

        node = clusterMap.getNode(topo);

        if (node == null) {
          node = new NodeBase(topo);
          clusterMap.add(node);
        }

        nodeInfo = hostsMap.get(node);

        if (nodeInfo == null) {
          nodeInfo = new NodeInfo(node);
          hostsMap.put(node, nodeInfo);
          parentNode = node.getParent();
          parentNodeInfo = racksMap.get(parentNode);
          if (parentNodeInfo == null) {
            parentNodeInfo = new NodeInfo(parentNode);
            racksMap.put(parentNode, parentNodeInfo);
          }
          parentNodeInfo.addLeaf(nodeInfo);
        } else {
          nodeInfo = hostsMap.get(node);
          parentNode = node.getParent();
          parentNodeInfo = racksMap.get(parentNode);
        }

        nodeInfo.addValue(index, bytesInThisBlock);
        parentNodeInfo.addValue(index, bytesInThisBlock);

      } // for all topos

    } // for all indices

    // We don't yet support cached hosts when bytesInThisBlock > splitSize
    return new String[][] { identifyHosts(allTopos.length, racksMap),
        new String[0] };
  }

  private String[] identifyHosts(int replicationFactor,
                                 Map<Node, NodeInfo> racksMap) {

    String[] retVal = new String[replicationFactor];

    List<NodeInfo> rackList = new LinkedList<NodeInfo>();

    rackList.addAll(racksMap.values());

    // Sort the racks based on their contribution to this split
    sortInDescendingOrder(rackList);

    boolean done = false;
    int index = 0;

    // Get the host list for all our aggregated items, sort
    // them and return the top entries
    for (NodeInfo ni : rackList) {

      Set<NodeInfo> hostSet = ni.getLeaves();

      List<NodeInfo> hostList = new LinkedList<NodeInfo>();
      hostList.addAll(hostSet);

      // Sort the hosts in this rack based on their contribution
      sortInDescendingOrder(hostList);

      for (NodeInfo host : hostList) {
        // Strip out the port number from the host name
        retVal[index++] = host.node.getName().split(":")[0];
        if (index == replicationFactor) {
          done = true;
          break;
        }
      }

      if (done) {
        break;
      }
    }
    return retVal;
  }

  private String[] fakeRacks(BlockLocation[] blkLocations, int index)
      throws IOException {
    String[] allHosts = blkLocations[index].getHosts();
    String[] allTopos = new String[allHosts.length];
    for (int i = 0; i < allHosts.length; i++) {
      allTopos[i] = NetworkTopology.DEFAULT_RACK + "/" + allHosts[i];
    }
    return allTopos;
  }
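
  // When a block reports no topology, fakeRacks() yields paths such as
  // "/default-rack/host1" (the host name is an example), so all such hosts are
  // aggregated under the single synthetic NetworkTopology.DEFAULT_RACK node.
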
  private static class NodeInfo {
    final Node node;
    final Set<Integer> blockIds;
    final Set<NodeInfo> leaves;

    private long value;

    NodeInfo(Node node) {
      this.node = node;
      blockIds = new HashSet<Integer>();
      leaves = new HashSet<NodeInfo>();
    }

    long getValue() { return value; }

    void addValue(int blockIndex, long value) {
      if (blockIds.add(blockIndex)) {
        this.value += value;
      }
    }

    Set<NodeInfo> getLeaves() { return leaves; }

    void addLeaf(NodeInfo nodeInfo) {
      leaves.add(nodeInfo);
    }
  }
}