/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements. See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership. The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License. You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.hadoop.mapreduce.lib.input;

import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.TimeUnit;

import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.classification.InterfaceStability;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.BlockLocation;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.LocatedFileStatus;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.PathFilter;
import org.apache.hadoop.fs.RemoteIterator;
import org.apache.hadoop.mapred.LocatedFileStatusFetcher;
import org.apache.hadoop.mapred.SplitLocationInfo;
import org.apache.hadoop.mapreduce.InputFormat;
import org.apache.hadoop.mapreduce.InputSplit;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.JobContext;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.security.TokenCache;
import org.apache.hadoop.util.ReflectionUtils;
import org.apache.hadoop.util.StopWatch;
import org.apache.hadoop.util.StringUtils;

import com.google.common.collect.Lists;

/**
 * A base class for file-based {@link InputFormat}s.
 *
 * <p><code>FileInputFormat</code> is the base class for all file-based
 * <code>InputFormat</code>s. This provides a generic implementation of
 * {@link #getSplits(JobContext)}.
 * Subclasses of <code>FileInputFormat</code> can also override the
 * {@link #isSplitable(JobContext, Path)} method to ensure input-files are
 * not split-up and are processed as a whole by {@link Mapper}s.
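 *
 * <p>A typical driver configures input like the following (a minimal
 * sketch; the path and split size are illustrative only):</p>
 * <pre>
 *   Job job = Job.getInstance(new Configuration(), "my job");
 *   job.setInputFormatClass(TextInputFormat.class);
 *   FileInputFormat.setInputPaths(job, new Path("/data/in"));
 *   FileInputFormat.setMaxInputSplitSize(job, 256L * 1024 * 1024);
 * </pre>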
 */
@InterfaceAudience.Public
@InterfaceStability.Stable
public abstract class FileInputFormat<K, V> extends InputFormat<K, V> {
  public static final String INPUT_DIR =
      "mapreduce.input.fileinputformat.inputdir";
  public static final String SPLIT_MAXSIZE =
      "mapreduce.input.fileinputformat.split.maxsize";
  public static final String SPLIT_MINSIZE =
      "mapreduce.input.fileinputformat.split.minsize";
  public static final String PATHFILTER_CLASS =
      "mapreduce.input.pathFilter.class";
  public static final String NUM_INPUT_FILES =
      "mapreduce.input.fileinputformat.numinputfiles";
  public static final String INPUT_DIR_RECURSIVE =
      "mapreduce.input.fileinputformat.input.dir.recursive";
  public static final String LIST_STATUS_NUM_THREADS =
      "mapreduce.input.fileinputformat.list-status.num-threads";
  public static final int DEFAULT_LIST_STATUS_NUM_THREADS = 1;

  private static final Log LOG = LogFactory.getLog(FileInputFormat.class);

  private static final double SPLIT_SLOP = 1.1;   // 10% slop

  @Deprecated
  public static enum Counter {
    BYTES_READ
  }

  private static final PathFilter hiddenFileFilter = new PathFilter() {
    public boolean accept(Path p) {
      String name = p.getName();
      return !name.startsWith("_") && !name.startsWith(".");
    }
  };

  /**
   * Proxy PathFilter that accepts a path only if all filters given in the
   * constructor do. Used by listStatus() to apply the built-in
   * hiddenFileFilter together with a user provided one (if any).
   */
  private static class MultiPathFilter implements PathFilter {
    private List<PathFilter> filters;

    public MultiPathFilter(List<PathFilter> filters) {
      this.filters = filters;
    }

    public boolean accept(Path path) {
      for (PathFilter filter : filters) {
        if (!filter.accept(path)) {
          return false;
        }
      }
      return true;
    }
  }

  /**
   * @param job
   *          the job to modify
   * @param inputDirRecursive
   *          whether input directories should be scanned recursively
   */
  public static void setInputDirRecursive(Job job,
                                          boolean inputDirRecursive) {
    job.getConfiguration().setBoolean(INPUT_DIR_RECURSIVE,
        inputDirRecursive);
  }

  /**
   * @param job
   *          the job to look at.
   * @return whether the files should be read recursively
   */
  public static boolean getInputDirRecursive(JobContext job) {
    return job.getConfiguration().getBoolean(INPUT_DIR_RECURSIVE,
        false);
  }

  /**
   * Get the lower bound on split size imposed by the format.
   * @return the number of bytes of the minimal split for this format
   */
  protected long getFormatMinSplitSize() {
    return 1;
  }

  /**
   * Is the given file splittable? Usually true, but if the file is
   * stream compressed, it will not be.
   *
   * <code>FileInputFormat</code> implementations can override this and
   * return <code>false</code> to ensure that individual input files are
   * never split-up so that {@link Mapper}s process entire files.
   *
   * @param context the job context
   * @param filename the file name to check
   * @return whether this file is splittable
   */
  protected boolean isSplitable(JobContext context, Path filename) {
    return true;
  }
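
  /*
   * For example, a format whose records cannot be decoded from an arbitrary
   * offset can disable splitting entirely (a sketch; the subclass name is
   * hypothetical):
   *
   *   public class WholeFileTextInputFormat extends TextInputFormat {
   *     @Override
   *     protected boolean isSplitable(JobContext context, Path file) {
   *       return false; // one mapper consumes each file end-to-end
   *     }
   *   }
   */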

  /**
   * Set a PathFilter to be applied to the input paths for the map-reduce
   * job.
   * @param job the job to modify
   * @param filter the PathFilter class to use for filtering the input paths.
   */
  public static void setInputPathFilter(Job job,
                                        Class<? extends PathFilter> filter) {
    job.getConfiguration().setClass(PATHFILTER_CLASS, filter,
        PathFilter.class);
  }

  /**
   * Set the minimum input split size.
   * @param job the job to modify
   * @param size the minimum size
   */
  public static void setMinInputSplitSize(Job job,
                                          long size) {
    job.getConfiguration().setLong(SPLIT_MINSIZE, size);
  }

  /**
   * Get the minimum split size.
   * @param job the job
   * @return the minimum number of bytes that can be in a split
   */
  public static long getMinSplitSize(JobContext job) {
    return job.getConfiguration().getLong(SPLIT_MINSIZE, 1L);
  }

  /**
   * Set the maximum split size.
   * @param job the job to modify
   * @param size the maximum split size
   */
  public static void setMaxInputSplitSize(Job job,
                                          long size) {
    job.getConfiguration().setLong(SPLIT_MAXSIZE, size);
  }

  /**
   * Get the maximum split size.
   * @param context the job to look at.
   * @return the maximum number of bytes a split can include
   */
  public static long getMaxSplitSize(JobContext context) {
    return context.getConfiguration().getLong(SPLIT_MAXSIZE,
        Long.MAX_VALUE);
  }

  /**
   * Get a PathFilter instance of the filter set for the input paths.
   *
   * @param context the job context to look at
   * @return the PathFilter instance set for the job, null if none has been
   *         set.
   */
  public static PathFilter getInputPathFilter(JobContext context) {
    Configuration conf = context.getConfiguration();
    Class<?> filterClass = conf.getClass(PATHFILTER_CLASS, null,
        PathFilter.class);
    return (filterClass != null) ?
        (PathFilter) ReflectionUtils.newInstance(filterClass, conf) : null;
  }
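
  /*
   * A user supplied filter is applied on top of the built-in
   * hiddenFileFilter, e.g. (a sketch; the filter class and suffix are
   * illustrative only):
   *
   *   public static class TsvOnlyFilter implements PathFilter {
   *     public boolean accept(Path path) {
   *       // Note: listStatus() applies the filter to directories as well,
   *       // so a pure suffix test would also exclude subdirectories.
   *       return path.getName().endsWith(".tsv");
   *     }
   *   }
   *   FileInputFormat.setInputPathFilter(job, TsvOnlyFilter.class);
   */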

  /**
   * List input directories.
   * Subclasses may override to, e.g., select only files matching a regular
   * expression.
   *
   * @param job the job to list input paths for
   * @return list of FileStatus objects
   * @throws IOException if no input paths are specified
   */
  protected List<FileStatus> listStatus(JobContext job
                                        ) throws IOException {
    Path[] dirs = getInputPaths(job);
    if (dirs.length == 0) {
      throw new IOException("No input paths specified in job");
    }

    // get tokens for all the required FileSystems..
    TokenCache.obtainTokensForNamenodes(job.getCredentials(), dirs,
        job.getConfiguration());

    // Whether we need to recursively look into the directory structure
    boolean recursive = getInputDirRecursive(job);

    // Creates a MultiPathFilter with the hiddenFileFilter and the
    // user provided one (if any).
    List<PathFilter> filters = new ArrayList<PathFilter>();
    filters.add(hiddenFileFilter);
    PathFilter jobFilter = getInputPathFilter(job);
    if (jobFilter != null) {
      filters.add(jobFilter);
    }
    PathFilter inputFilter = new MultiPathFilter(filters);

    List<FileStatus> result = null;

    int numThreads = job.getConfiguration().getInt(LIST_STATUS_NUM_THREADS,
        DEFAULT_LIST_STATUS_NUM_THREADS);
    StopWatch sw = new StopWatch().start();
    if (numThreads == 1) {
      result = singleThreadedListStatus(job, dirs, inputFilter, recursive);
    } else {
      Iterable<FileStatus> locatedFiles = null;
      try {
        LocatedFileStatusFetcher locatedFileStatusFetcher =
            new LocatedFileStatusFetcher(
                job.getConfiguration(), dirs, recursive, inputFilter, true);
        locatedFiles = locatedFileStatusFetcher.getFileStatuses();
      } catch (InterruptedException e) {
        throw new IOException("Interrupted while getting file statuses", e);
      }
      result = Lists.newArrayList(locatedFiles);
    }

    sw.stop();
    if (LOG.isDebugEnabled()) {
      LOG.debug("Time taken to get FileStatuses: "
          + sw.now(TimeUnit.MILLISECONDS));
    }
    LOG.info("Total input paths to process : " + result.size());
    return result;
  }
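
  /*
   * Listing a very large number of input directories can be parallelized by
   * raising the thread count (a sketch; 8 is an arbitrary choice):
   *
   *   job.getConfiguration().setInt(
   *       FileInputFormat.LIST_STATUS_NUM_THREADS, 8);
   */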

  private List<FileStatus> singleThreadedListStatus(JobContext job,
      Path[] dirs, PathFilter inputFilter, boolean recursive)
      throws IOException {
    List<FileStatus> result = new ArrayList<FileStatus>();
    List<IOException> errors = new ArrayList<IOException>();
    for (int i = 0; i < dirs.length; ++i) {
      Path p = dirs[i];
      FileSystem fs = p.getFileSystem(job.getConfiguration());
      FileStatus[] matches = fs.globStatus(p, inputFilter);
      if (matches == null) {
        errors.add(new IOException("Input path does not exist: " + p));
      } else if (matches.length == 0) {
        errors.add(new IOException("Input Pattern " + p + " matches 0 files"));
      } else {
        for (FileStatus globStat : matches) {
          if (globStat.isDirectory()) {
            RemoteIterator<LocatedFileStatus> iter =
                fs.listLocatedStatus(globStat.getPath());
            while (iter.hasNext()) {
              LocatedFileStatus stat = iter.next();
              if (inputFilter.accept(stat.getPath())) {
                if (recursive && stat.isDirectory()) {
                  addInputPathRecursively(result, fs, stat.getPath(),
                      inputFilter);
                } else {
                  result.add(stat);
                }
              }
            }
          } else {
            result.add(globStat);
          }
        }
      }
    }

    if (!errors.isEmpty()) {
      throw new InvalidInputException(errors);
    }
    return result;
  }

  /**
   * Add files in the input path recursively into the results.
   * @param result
   *          the List to store all files.
   * @param fs
   *          the FileSystem.
   * @param path
   *          the input path.
   * @param inputFilter
   *          the input filter that can be used to filter files/dirs.
   * @throws IOException on filesystem errors while listing
   */
  protected void addInputPathRecursively(List<FileStatus> result,
      FileSystem fs, Path path, PathFilter inputFilter)
      throws IOException {
    RemoteIterator<LocatedFileStatus> iter = fs.listLocatedStatus(path);
    while (iter.hasNext()) {
      LocatedFileStatus stat = iter.next();
      if (inputFilter.accept(stat.getPath())) {
        if (stat.isDirectory()) {
          addInputPathRecursively(result, fs, stat.getPath(), inputFilter);
        } else {
          result.add(stat);
        }
      }
    }
  }

  /**
   * A factory that makes the split for this class. It can be overridden
   * by sub-classes to make sub-types.
   */
  protected FileSplit makeSplit(Path file, long start, long length,
                                String[] hosts) {
    return new FileSplit(file, start, length, hosts);
  }

  /**
   * A factory that makes the split for this class. It can be overridden
   * by sub-classes to make sub-types.
   */
  protected FileSplit makeSplit(Path file, long start, long length,
                                String[] hosts, String[] inMemoryHosts) {
    return new FileSplit(file, start, length, hosts, inMemoryHosts);
  }

  /**
   * Generate the list of files and make them into FileSplits.
   * @param job the job context
   * @return the list of input splits for the job
   * @throws IOException if listing the input paths fails
   */
  public List<InputSplit> getSplits(JobContext job) throws IOException {
    StopWatch sw = new StopWatch().start();
    long minSize = Math.max(getFormatMinSplitSize(), getMinSplitSize(job));
    long maxSize = getMaxSplitSize(job);

    // generate splits
    List<InputSplit> splits = new ArrayList<InputSplit>();
    List<FileStatus> files = listStatus(job);
    for (FileStatus file : files) {
      Path path = file.getPath();
      long length = file.getLen();
      if (length != 0) {
        BlockLocation[] blkLocations;
        if (file instanceof LocatedFileStatus) {
          blkLocations = ((LocatedFileStatus) file).getBlockLocations();
        } else {
          FileSystem fs = path.getFileSystem(job.getConfiguration());
          blkLocations = fs.getFileBlockLocations(file, 0, length);
        }
        if (isSplitable(job, path)) {
          long blockSize = file.getBlockSize();
          long splitSize = computeSplitSize(blockSize, minSize, maxSize);

          long bytesRemaining = length;
          while (((double) bytesRemaining) / splitSize > SPLIT_SLOP) {
            int blkIndex = getBlockIndex(blkLocations, length - bytesRemaining);
            splits.add(makeSplit(path, length - bytesRemaining, splitSize,
                blkLocations[blkIndex].getHosts(),
                blkLocations[blkIndex].getCachedHosts()));
            bytesRemaining -= splitSize;
          }

          if (bytesRemaining != 0) {
            int blkIndex = getBlockIndex(blkLocations, length - bytesRemaining);
            splits.add(makeSplit(path, length - bytesRemaining, bytesRemaining,
                blkLocations[blkIndex].getHosts(),
                blkLocations[blkIndex].getCachedHosts()));
          }
        } else { // not splitable
          if (LOG.isDebugEnabled()) {
            // Log only if the file is big enough to be split
            if (length > Math.min(file.getBlockSize(), minSize)) {
              LOG.debug("File is not splittable so no parallelization "
                  + "is possible: " + file.getPath());
            }
          }
          splits.add(makeSplit(path, 0, length, blkLocations[0].getHosts(),
              blkLocations[0].getCachedHosts()));
        }
      } else {
        // Create empty hosts array for zero length files
        splits.add(makeSplit(path, 0, length, new String[0]));
      }
    }
    // Save the number of input files for metrics/loadgen
    job.getConfiguration().setLong(NUM_INPUT_FILES, files.size());
    sw.stop();
    if (LOG.isDebugEnabled()) {
      LOG.debug("Total # of splits generated by getSplits: " + splits.size()
          + ", TimeTaken: " + sw.now(TimeUnit.MILLISECONDS));
    }
    return splits;
  }
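
  /*
   * Worked example (assumed figures): with a 128 MB block size and the
   * default minSize = 1 and maxSize = Long.MAX_VALUE, computeSplitSize
   * returns max(1, min(Long.MAX_VALUE, 128 MB)) = 128 MB. A 300 MB file
   * then yields splits of 128 MB, 128 MB and 44 MB, while a 140 MB file
   * yields a single 140 MB split, because 140/128 = 1.09 does not exceed
   * SPLIT_SLOP (1.1) and the remainder is folded into the last split.
   */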
  protected long computeSplitSize(long blockSize, long minSize,
                                  long maxSize) {
    return Math.max(minSize, Math.min(maxSize, blockSize));
  }

  protected int getBlockIndex(BlockLocation[] blkLocations,
                              long offset) {
    for (int i = 0; i < blkLocations.length; i++) {
      // is the offset inside this block?
      if ((blkLocations[i].getOffset() <= offset) &&
          (offset < blkLocations[i].getOffset()
              + blkLocations[i].getLength())) {
        return i;
      }
    }
    BlockLocation last = blkLocations[blkLocations.length - 1];
    long fileLength = last.getOffset() + last.getLength() - 1;
    throw new IllegalArgumentException("Offset " + offset +
                                       " is outside of file (0.." +
                                       fileLength + ")");
  }

  /**
   * Sets the given comma separated paths as the list of inputs
   * for the map-reduce job.
   *
   * @param job the job
   * @param commaSeparatedPaths Comma separated paths to be set as
   *        the list of inputs for the map-reduce job.
   */
  public static void setInputPaths(Job job,
                                   String commaSeparatedPaths
                                   ) throws IOException {
    setInputPaths(job, StringUtils.stringToPath(
        getPathStrings(commaSeparatedPaths)));
  }

  /**
   * Add the given comma separated paths to the list of inputs for
   * the map-reduce job.
   *
   * @param job The job to modify
   * @param commaSeparatedPaths Comma separated paths to be added to
   *        the list of inputs for the map-reduce job.
   */
  public static void addInputPaths(Job job,
                                   String commaSeparatedPaths
                                   ) throws IOException {
    for (String str : getPathStrings(commaSeparatedPaths)) {
      addInputPath(job, new Path(str));
    }
  }

  /**
   * Set the array of {@link Path}s as the list of inputs
   * for the map-reduce job.
   *
   * @param job The job to modify
   * @param inputPaths the {@link Path}s of the input directories/files
   *        for the map-reduce job.
   */
  public static void setInputPaths(Job job,
                                   Path... inputPaths) throws IOException {
    Configuration conf = job.getConfiguration();
    Path path = inputPaths[0].getFileSystem(conf).makeQualified(inputPaths[0]);
    StringBuffer str =
        new StringBuffer(StringUtils.escapeString(path.toString()));
    for (int i = 1; i < inputPaths.length; i++) {
      str.append(StringUtils.COMMA_STR);
      path = inputPaths[i].getFileSystem(conf).makeQualified(inputPaths[i]);
      str.append(StringUtils.escapeString(path.toString()));
    }
    conf.set(INPUT_DIR, str.toString());
  }

  /**
   * Add a {@link Path} to the list of inputs for the map-reduce job.
   *
   * @param job The {@link Job} to modify
   * @param path {@link Path} to be added to the list of inputs for
   *        the map-reduce job.
   */
  public static void addInputPath(Job job,
                                  Path path) throws IOException {
    Configuration conf = job.getConfiguration();
    path = path.getFileSystem(conf).makeQualified(path);
    String dirStr = StringUtils.escapeString(path.toString());
    String dirs = conf.get(INPUT_DIR);
    conf.set(INPUT_DIR, dirs == null ? dirStr : dirs + "," + dirStr);
  }

  // Splits the given comma separated path list on commas, treating commas
  // inside a curly-brace glob pattern as part of the pattern rather than
  // as separators.
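  // For example (illustrative input only):
  //   "/logs/{2014,2015},/archive" -> { "/logs/{2014,2015}", "/archive" }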
  private static String[] getPathStrings(String commaSeparatedPaths) {
    int length = commaSeparatedPaths.length();
    int curlyOpen = 0;
    int pathStart = 0;
    boolean globPattern = false;
    List<String> pathStrings = new ArrayList<String>();

    for (int i = 0; i < length; i++) {
      char ch = commaSeparatedPaths.charAt(i);
      switch (ch) {
        case '{': {
          curlyOpen++;
          if (!globPattern) {
            globPattern = true;
          }
          break;
        }
        case '}': {
          curlyOpen--;
          if (curlyOpen == 0 && globPattern) {
            globPattern = false;
          }
          break;
        }
        case ',': {
          if (!globPattern) {
            pathStrings.add(commaSeparatedPaths.substring(pathStart, i));
            pathStart = i + 1;
          }
          break;
        }
        default: {
          continue; // nothing special to do for this character
        }
      }
    }
    pathStrings.add(commaSeparatedPaths.substring(pathStart, length));

    return pathStrings.toArray(new String[0]);
  }

  /**
   * Get the list of input {@link Path}s for the map-reduce job.
   *
   * @param context The job
   * @return the list of input {@link Path}s for the map-reduce job.
   */
  public static Path[] getInputPaths(JobContext context) {
    String dirs = context.getConfiguration().get(INPUT_DIR, "");
    String[] list = StringUtils.split(dirs);
    Path[] result = new Path[list.length];
    for (int i = 0; i < list.length; i++) {
      result[i] = new Path(StringUtils.unEscapeString(list[i]));
    }
    return result;
  }

}