/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.mapreduce.lib.output;

import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.classification.InterfaceStability;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.*;
import org.apache.hadoop.mapreduce.Reducer.Context;
import org.apache.hadoop.mapreduce.lib.output.MultipleOutputs;
import org.apache.hadoop.mapreduce.task.TaskAttemptContextImpl;
import org.apache.hadoop.util.ReflectionUtils;

import java.io.IOException;
import java.util.*;

/**
 * The MultipleOutputs class simplifies writing output data
 * to multiple outputs.
 *
 * <p>
 * Case one: writing to additional outputs other than the job default output.
 *
 * Each additional output, or named output, may be configured with its own
 * <code>OutputFormat</code>, with its own key class and with its own value
 * class.
 * </p>
 *
 * <p>
 * Case two: writing data to different files provided by the user.
 * </p>
 *
 * <p>
 * MultipleOutputs supports counters; by default they are disabled. The
 * counters group is the {@link MultipleOutputs} class name. The names of the
 * counters are the same as the output name. These count the number of records
 * written to each output name.
 * </p>
 *
 * Usage pattern for job submission:
 * <pre>
 *
 * Job job = new Job();
 *
 * FileInputFormat.setInputPath(job, inDir);
 * FileOutputFormat.setOutputPath(job, outDir);
 *
 * job.setMapperClass(MOMap.class);
 * job.setReducerClass(MOReduce.class);
 * ...
 *
 * // Defines additional single text based output 'text' for the job
 * MultipleOutputs.addNamedOutput(job, "text", TextOutputFormat.class,
 * LongWritable.class, Text.class);
 *
 * // Defines additional sequence-file based output 'seq' for the job
 * MultipleOutputs.addNamedOutput(job, "seq",
 *   SequenceFileOutputFormat.class,
 *   LongWritable.class, Text.class);
 * ...
 *
 * job.waitForCompletion(true);
 * ...
 * </pre>
 * <p>
 * Usage in Reducer:
 * <pre>
 * &lt;K, V&gt; String generateFileName(K k, V v) {
 *   return k.toString() + "_" + v.toString();
 * }
 *
 * public class MOReduce extends
 *   Reducer&lt;WritableComparable, Writable, WritableComparable, Writable&gt; {
 * private MultipleOutputs mos;
 * public void setup(Context context) {
 * ...
 * mos = new MultipleOutputs(context);
 * }
 *
 * public void reduce(WritableComparable key, Iterable&lt;Writable&gt; values,
 * Context context)
 * throws IOException, InterruptedException {
 * ...
 * mos.write("text", key, new Text("Hello"));
 * mos.write("seq", new LongWritable(1), new Text("Bye"), "seq_a");
 * mos.write("seq", new LongWritable(2), new Text("Chau"), "seq_b");
 * mos.write(key, new Text("value"), generateFileName(key, new Text("value")));
 * ...
 * }
 *
 * public void cleanup(Context context) throws IOException, InterruptedException {
 * mos.close();
 * ...
 * }
 *
 * }
 * </pre>
 *
 * <p>
 * When used in conjunction with org.apache.hadoop.mapreduce.lib.output.LazyOutputFormat,
 * MultipleOutputs can mimic the behaviour of MultipleTextOutputFormat and MultipleSequenceFileOutputFormat
 * from the old Hadoop API - i.e., output can be written from the Reducer to more than one location.
 * </p>
 *
 * <p>
 * Use <code>MultipleOutputs.write(KEYOUT key, VALUEOUT value, String baseOutputPath)</code> to write key and
 * value to a path specified by <code>baseOutputPath</code>, with no need to specify a named output.
 * <b>Warning</b>: when the baseOutputPath passed to MultipleOutputs.write
 * is a path that resolves outside of the final job output directory, the
 * directory is created immediately and then persists through subsequent
 * task retries, breaking the concept of output committing:
 * </p>
 *
 * <pre>
 * private MultipleOutputs&lt;Text, Text&gt; out;
 *
 * public void setup(Context context) {
 *   out = new MultipleOutputs&lt;Text, Text&gt;(context);
 *   ...
 * }
 *
 * public void reduce(Text key, Iterable&lt;Text&gt; values, Context context) throws IOException, InterruptedException {
 * for (Text t : values) {
 *   out.write(key, t, generateFileName(&lt;<i>parameter list...</i>&gt;));
 *   }
 * }
 *
 * protected void cleanup(Context context) throws IOException, InterruptedException {
 *   out.close();
 * }
 * </pre>
 *
 * <p>
 * Use your own code in <code>generateFileName()</code> to create a custom path to your results.
 * '/' characters in <code>baseOutputPath</code> will be translated into directory levels in your file system.
 * Also, append your custom-generated path with "part" or similar, otherwise your output will be -00000, -00001 etc.
 * No call to <code>context.write()</code> is necessary. See example <code>generateFileName()</code> code below.
 * </p>
 *
 * <pre>
 * private String generateFileName(Text k) {
 *   // expect Text k in format "Surname|Forename"
 *   String[] kStr = k.toString().split("\\|");
 *
 *   String sName = kStr[0];
 *   String fName = kStr[1];
 *
 *   // example for k = Smith|John
 *   // output written to /user/hadoop/path/to/output/Smith/John-r-00000 (etc)
 *   return sName + "/" + fName;
 * }
 * </pre>
 *
 * <p>
 * Using MultipleOutputs in this way will still create zero-sized default output, e.g. part-00000.
 * To prevent this use <code>LazyOutputFormat.setOutputFormatClass(job, TextOutputFormat.class);</code>
 * instead of <code>job.setOutputFormatClass(TextOutputFormat.class);</code> in your Hadoop job configuration.
 * </p>
 *
 */
@InterfaceAudience.Public
@InterfaceStability.Stable
public class MultipleOutputs<KEYOUT, VALUEOUT> {

  /** Configuration key holding the space-separated list of named outputs. */
  private static final String MULTIPLE_OUTPUTS = "mapreduce.multipleoutputs";

  /** Prefix of the per-named-output configuration keys. */
  private static final String MO_PREFIX =
    "mapreduce.multipleoutputs.namedOutput.";

  /** Key suffixes appended to {@code MO_PREFIX + namedOutput}. */
  private static final String FORMAT = ".format";
  private static final String KEY = ".key";
  private static final String VALUE = ".value";

  /** Configuration key of the boolean flag that enables the counters. */
  private static final String COUNTERS_ENABLED =
    "mapreduce.multipleoutputs.counters";

  /**
   * Counters group used by the counters of MultipleOutputs.
   */
  private static final String COUNTERS_GROUP = MultipleOutputs.class.getName();

  /**
   * Cache for the taskContexts, keyed by named output name.
   * Accessed only from {@link #getContext(String)}, which holds this
   * instance's monitor.
   */
  private final Map<String, TaskAttemptContext> taskContexts =
    new HashMap<String, TaskAttemptContext>();

  /**
   * Cached TaskAttemptContext which uses the job's configured settings.
   * Created lazily by {@link #getJobOutputFormatContext()}.
   */
  private TaskAttemptContext jobOutputFormatContext;

  /**
   * Checks if a named output name is valid token.
   *
   * A valid token is non-empty and consists only of the ASCII characters
   * {@code A-Z}, {@code a-z} and {@code 0-9}.
   *
   * @param namedOutput named output Name
   * @throws IllegalArgumentException if the output name is not valid.
   */
  private static void checkTokenName(String namedOutput) {
    if (namedOutput == null || namedOutput.length() == 0) {
      throw new IllegalArgumentException(
        "Name cannot be NULL or emtpy");
    }
    for (char ch : namedOutput.toCharArray()) {
      if ((ch >= 'A') && (ch <= 'Z')) {
        continue;
      }
      if ((ch >= 'a') && (ch <= 'z')) {
        continue;
      }
      if ((ch >= '0') && (ch <= '9')) {
        continue;
      }
      throw new IllegalArgumentException(
        "Name cannot be have a '" + ch + "' char");
    }
  }

  /**
   * Checks if output name is valid.
   *
   * The name cannot be the name used for the default output
   * ({@code FileOutputFormat.PART}).
   *
   * @param outputPath base output Name; must not be {@code null}
   * @throws IllegalArgumentException if the output name is not valid.
   */
  private static void checkBaseOutputPath(String outputPath) {
    if (outputPath.equals(FileOutputFormat.PART)) {
      throw new IllegalArgumentException("output name cannot be 'part'");
    }
  }

  /**
   * Checks if a named output name is valid.
   *
   * The name must pass {@link #checkTokenName(String)} and
   * {@link #checkBaseOutputPath(String)}; in addition its presence in the
   * job configuration is checked against the caller's expectation.
   *
   * @param job job whose configuration lists the named outputs
   * @param namedOutput named output Name
   * @param alreadyDefined despite its name, this selects which condition is
   *   an error: when <code>true</code> (the name is about to be added) the
   *   name must NOT be defined yet; when <code>false</code> (the name is
   *   about to be used) the name MUST be defined.
   * @throws IllegalArgumentException if the output name is not valid.
   */
  private static void checkNamedOutputName(JobContext job,
      String namedOutput, boolean alreadyDefined) {
    checkTokenName(namedOutput);
    checkBaseOutputPath(namedOutput);
    List<String> definedChannels = getNamedOutputsList(job);
    if (alreadyDefined && definedChannels.contains(namedOutput)) {
      throw new IllegalArgumentException("Named output '" + namedOutput +
        "' already alreadyDefined");
    } else if (!alreadyDefined && !definedChannels.contains(namedOutput)) {
      throw new IllegalArgumentException("Named output '" + namedOutput +
        "' not defined");
    }
  }

  // Returns list of channel names, parsed from the space-separated
  // MULTIPLE_OUTPUTS configuration value (empty list if the key is unset).
  private static List<String> getNamedOutputsList(JobContext job) {
    List<String> names = new ArrayList<String>();
    StringTokenizer st = new StringTokenizer(
      job.getConfiguration().get(MULTIPLE_OUTPUTS, ""), " ");
    while (st.hasMoreTokens()) {
      names.add(st.nextToken());
    }
    return names;
  }

  // Returns the named output OutputFormat, or null if none is configured.
  @SuppressWarnings("unchecked")
  private static Class<? extends OutputFormat<?, ?>> getNamedOutputFormatClass(
    JobContext job, String namedOutput) {
    return (Class<? extends OutputFormat<?, ?>>)
      job.getConfiguration().getClass(MO_PREFIX + namedOutput + FORMAT, null,
      OutputFormat.class);
  }

  // Returns the key class for a named output, or null if none is configured.
  private static Class<?> getNamedOutputKeyClass(JobContext job,
                                                String namedOutput) {
    return job.getConfiguration().getClass(MO_PREFIX + namedOutput + KEY, null,
      Object.class);
  }

  // Returns the value class for a named output, or null if none is configured.
  private static Class<?> getNamedOutputValueClass(
      JobContext job, String namedOutput) {
    return job.getConfiguration().getClass(MO_PREFIX + namedOutput + VALUE,
      null, Object.class);
  }

  /**
   * Adds a named output for the job.
   *
   * The name is appended to the job's list of named outputs and the output
   * format, key class and value class are stored under keys derived from
   * the name.
   *
   * @param job               job to add the named output
   * @param namedOutput       named output name, it has to be a word, letters
   *                          and numbers only, cannot be the word 'part' as
   *                          that is reserved for the default output.
   * @param outputFormatClass OutputFormat class.
   * @param keyClass          key class
   * @param valueClass        value class
   * @throws IllegalArgumentException if the name is not valid or a named
   *                          output with that name is already defined.
   */
  @SuppressWarnings("unchecked")
  public static void addNamedOutput(Job job, String namedOutput,
      Class<? extends OutputFormat> outputFormatClass,
      Class<?> keyClass, Class<?> valueClass) {
    checkNamedOutputName(job, namedOutput, true);
    Configuration conf = job.getConfiguration();
    conf.set(MULTIPLE_OUTPUTS,
      conf.get(MULTIPLE_OUTPUTS, "") + " " + namedOutput);
    conf.setClass(MO_PREFIX + namedOutput + FORMAT, outputFormatClass,
      OutputFormat.class);
    conf.setClass(MO_PREFIX + namedOutput + KEY, keyClass, Object.class);
    conf.setClass(MO_PREFIX + namedOutput + VALUE, valueClass, Object.class);
  }

  /**
   * Enables or disables counters for the named outputs.
   *
   * The counters group is the {@link MultipleOutputs} class name.
   * The names of the counters are the same as the named outputs. These
   * counters count the number of records written to each output name.
   * By default these counters are disabled.
   *
   * @param job    job  to enable counters
   * @param enabled indicates if the counters will be enabled or not.
   */
  public static void setCountersEnabled(Job job, boolean enabled) {
    job.getConfiguration().setBoolean(COUNTERS_ENABLED, enabled);
  }

  /**
   * Returns if the counters for the named outputs are enabled or not.
   * By default these counters are disabled.
   *
   * @param job    the job
   * @return TRUE if the counters are enabled, FALSE if they are disabled.
   */
  public static boolean getCountersEnabled(JobContext job) {
    return job.getConfiguration().getBoolean(COUNTERS_ENABLED, false);
  }

  /**
   * Wraps RecordWriter to increment counters.
   *
   * Each call to {@link #write(Object, Object)} increments the counter
   * named <code>counterName</code> in the MultipleOutputs counters group
   * and then delegates to the wrapped writer.
   */
  @SuppressWarnings("unchecked")
  private static class RecordWriterWithCounter extends RecordWriter {
    private final RecordWriter writer;
    private final String counterName;
    private final TaskInputOutputContext context;

    public RecordWriterWithCounter(RecordWriter writer, String counterName,
                                   TaskInputOutputContext context) {
      this.writer = writer;
      this.counterName = counterName;
      this.context = context;
    }

    @Override
    @SuppressWarnings({"unchecked"})
    public void write(Object key, Object value)
        throws IOException, InterruptedException {
      // The counter is incremented before the delegate write is attempted.
      context.getCounter(COUNTERS_GROUP, counterName).increment(1);
      writer.write(key, value);
    }

    @Override
    public void close(TaskAttemptContext context)
        throws IOException, InterruptedException {
      writer.close(context);
    }
  }

  // instance code, to be used from Mapper/Reducer code

  /** Context of the task that owns this instance. */
  private final TaskInputOutputContext<?, ?, KEYOUT, VALUEOUT> context;
  /** Named outputs read from the configuration at construction time. */
  private final Set<String> namedOutputs;
  /** Record writers opened so far, keyed by base output path. */
  private final Map<String, RecordWriter<?, ?>> recordWriters;
  /** Whether counters were enabled in the configuration at construction. */
  private final boolean countersEnabled;

  /**
   * Creates and initializes multiple outputs support,
   * it should be instantiated in the Mapper/Reducer setup method.
   *
   * @param context the TaskInputOutputContext object
   */
  public MultipleOutputs(
      TaskInputOutputContext<?, ?, KEYOUT, VALUEOUT> context) {
    this.context = context;
    namedOutputs = Collections.unmodifiableSet(
      new HashSet<String>(MultipleOutputs.getNamedOutputsList(context)));
    recordWriters = new HashMap<String, RecordWriter<?, ?>>();
    countersEnabled = getCountersEnabled(context);
  }

  /**
   * Write key and value to the namedOutput.
   *
   * Output path is a unique file generated for the namedOutput.
   * For example, {namedOutput}-(m|r)-{part-number}
   *
   * @param namedOutput the named output name
   * @param key         the key
   * @param value       the value
   * @throws IllegalArgumentException if the named output is not valid or
   *                    not defined.
   */
  @SuppressWarnings("unchecked")
  public <K, V> void write(String namedOutput, K key, V value)
      throws IOException, InterruptedException {
    write(namedOutput, key, value, namedOutput);
  }

  /**
   * Write key and value to baseOutputPath using the namedOutput.
   *
   * @param namedOutput    the named output name
   * @param key            the key
   * @param value          the value
   * @param baseOutputPath base-output path to write the record to.
   * Note: Framework will generate unique filename for the baseOutputPath
   * <b>Warning</b>: when the baseOutputPath is a path that resolves
   * outside of the final job output directory, the directory is created
   * immediately and then persists through subsequent task retries, breaking
   * the concept of output committing.
   * @throws IllegalArgumentException if the named output is not valid or
   *                       not defined, or if baseOutputPath is 'part'.
   */
  @SuppressWarnings("unchecked")
  public <K, V> void write(String namedOutput, K key, V value,
      String baseOutputPath) throws IOException, InterruptedException {
    checkNamedOutputName(context, namedOutput, false);
    checkBaseOutputPath(baseOutputPath);
    // Second check against the snapshot of names taken at construction time.
    if (!namedOutputs.contains(namedOutput)) {
      throw new IllegalArgumentException("Undefined named output '" +
        namedOutput + "'");
    }
    TaskAttemptContext taskContext = getContext(namedOutput);
    getRecordWriter(taskContext, baseOutputPath).write(key, value);
  }

  /**
   * Write key value to an output file name.
   *
   * Gets the record writer from job's output format.
   * Job's output format should be a FileOutputFormat.
   *
   * @param key       the key
   * @param value     the value
   * @param baseOutputPath base-output path to write the record to.
   * Note: Framework will generate unique filename for the baseOutputPath
   * <b>Warning</b>: when the baseOutputPath is a path that resolves
   * outside of the final job output directory, the directory is created
   * immediately and then persists through subsequent task retries, breaking
   * the concept of output committing.
   * @throws IllegalArgumentException if baseOutputPath is 'part'.
   */
  @SuppressWarnings("unchecked")
  public void write(KEYOUT key, VALUEOUT value, String baseOutputPath)
      throws IOException, InterruptedException {
    checkBaseOutputPath(baseOutputPath);
    getRecordWriter(getJobOutputFormatContext(), baseOutputPath)
      .write(key, value);
  }

  // Returns the cached context that uses the job's own configuration,
  // creating it on first use. Synchronized for the same reason as
  // getRecordWriter: the lazily-initialized field is shared by all threads
  // that write through this instance.
  private synchronized TaskAttemptContext getJobOutputFormatContext() {
    if (jobOutputFormatContext == null) {
      jobOutputFormatContext =
        new TaskAttemptContextImpl(context.getConfiguration(),
                                   context.getTaskAttemptID(),
                                   new WrappedStatusReporter(context));
    }
    return jobOutputFormatContext;
  }

  // by being synchronized MultipleOutputTask can be use with a
  // MultithreadedMapper.
  //
  // Returns the record writer cached for baseFileName, creating it from the
  // output format of taskContext on first use.
  // NOTE(review): the cache key is the base file name alone, so a named
  // output and the job default output (or two named outputs) that use the
  // same base path share one writer - confirm this is intended.
  @SuppressWarnings("unchecked")
  private synchronized RecordWriter getRecordWriter(
      TaskAttemptContext taskContext, String baseFileName)
      throws IOException, InterruptedException {

    // look for record-writer in the cache
    RecordWriter writer = recordWriters.get(baseFileName);

    // If not in cache, create a new one
    if (writer == null) {
      // get the record writer from context output format
      FileOutputFormat.setOutputName(taskContext, baseFileName);
      try {
        writer = ((OutputFormat) ReflectionUtils.newInstance(
          taskContext.getOutputFormatClass(), taskContext.getConfiguration()))
          .getRecordWriter(taskContext);
      } catch (ClassNotFoundException e) {
        throw new IOException(e);
      }

      // if counters are enabled, wrap the writer with context
      // to increment counters
      if (countersEnabled) {
        writer = new RecordWriterWithCounter(writer, baseFileName, context);
      }

      // add the record-writer to the cache
      recordWriters.put(baseFileName, writer);
    }
    return writer;
  }

  // Create a taskAttemptContext for the named output with
  // output format and output key/value types put in the context.
  //
  // Synchronized because it reads and updates the shared taskContexts map;
  // getRecordWriter is already synchronized so that this class can be used
  // from a MultithreadedMapper, and this method runs on the same write path.
  private synchronized TaskAttemptContext getContext(String nameOutput)
      throws IOException {

    TaskAttemptContext taskContext = taskContexts.get(nameOutput);

    if (taskContext != null) {
      return taskContext;
    }

    // The following trick leverages the instantiation of a record writer via
    // the job thus supporting arbitrary output formats.
    Job job = new Job(context.getConfiguration());
    job.setOutputFormatClass(getNamedOutputFormatClass(context, nameOutput));
    job.setOutputKeyClass(getNamedOutputKeyClass(context, nameOutput));
    job.setOutputValueClass(getNamedOutputValueClass(context, nameOutput));
    taskContext = new TaskAttemptContextImpl(job.getConfiguration(), context
        .getTaskAttemptID(), new WrappedStatusReporter(context));

    taskContexts.put(nameOutput, taskContext);

    return taskContext;
  }

  /**
   * StatusReporter that forwards every call to a wrapped
   * TaskAttemptContext.
   */
  private static class WrappedStatusReporter extends StatusReporter {

    final TaskAttemptContext context;

    public WrappedStatusReporter(TaskAttemptContext context) {
      this.context = context;
    }

    @Override
    public Counter getCounter(Enum<?> name) {
      return context.getCounter(name);
    }

    @Override
    public Counter getCounter(String group, String name) {
      return context.getCounter(group, name);
    }

    @Override
    public void progress() {
      context.progress();
    }

    @Override
    public float getProgress() {
      return context.getProgress();
    }

    @Override
    public void setStatus(String status) {
      context.setStatus(status);
    }
  }

  /**
   * Closes all the opened outputs.
   *
   * This should be called from cleanup method of map/reduce task.
   * If overridden subclasses must invoke <code>super.close()</code> at the
   * end of their <code>close()</code>
   *
   * An <code>IOException</code> thrown while closing one writer does not
   * prevent the remaining writers from being closed; the first such
   * exception is rethrown once every writer has been attempted. An
   * <code>InterruptedException</code> or an unchecked exception still
   * aborts the loop immediately.
   *
   * @throws IOException the first I/O failure reported by a writer's close
   * @throws InterruptedException if closing a writer is interrupted
   */
  @SuppressWarnings("unchecked")
  public void close() throws IOException, InterruptedException {
    IOException firstFailure = null;
    for (RecordWriter writer : recordWriters.values()) {
      try {
        writer.close(context);
      } catch (IOException e) {
        // Keep going so one bad writer does not leave the others open;
        // only the first failure is reported to the caller.
        if (firstFailure == null) {
          firstFailure = e;
        }
      }
    }
    if (firstFailure != null) {
      throw firstFailure;
    }
  }
}