/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.mapreduce.lib.output;

import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.classification.InterfaceStability;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.*;
import org.apache.hadoop.mapreduce.Reducer.Context;
import org.apache.hadoop.mapreduce.task.TaskAttemptContextImpl;
import org.apache.hadoop.util.ReflectionUtils;

import java.io.IOException;
import java.util.*;

/**
 * The MultipleOutputs class simplifies writing output data
 * to multiple outputs.
 *
 * <p>
 * Case one: writing to additional outputs other than the job default output.
 * Each additional output, or named output, may be configured with its own
 * <code>OutputFormat</code>, its own key class and its own value class.
 * </p>
 *
 * <p>
 * Case two: writing data to different files, with the file names provided
 * by the user at write time.
 * </p>
 *
 * <p>
 * MultipleOutputs supports counters; by default they are disabled. The
 * counters group is the {@link MultipleOutputs} class name. The names of the
 * counters are the same as the output names. These count the number of
 * records written to each output name.
 * </p>
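 *
 * <p>
 * For example, to turn these per-output counters on at job submission time:
 * </p>
 * <pre>
 * MultipleOutputs.setCountersEnabled(job, true);
 * </pre>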
 *
 * Usage pattern for job submission:
 * <pre>
 *
 * Job job = Job.getInstance();
 *
 * FileInputFormat.setInputPath(job, inDir);
 * FileOutputFormat.setOutputPath(job, outDir);
 *
 * job.setMapperClass(MOMap.class);
 * job.setReducerClass(MOReduce.class);
 * ...
 *
 * // Defines additional single text based output 'text' for the job
 * MultipleOutputs.addNamedOutput(job, "text", TextOutputFormat.class,
 *   LongWritable.class, Text.class);
 *
 * // Defines additional sequence-file based output 'seq' for the job
 * MultipleOutputs.addNamedOutput(job, "seq",
 *   SequenceFileOutputFormat.class,
 *   LongWritable.class, Text.class);
 * ...
 *
 * job.waitForCompletion(true);
 * ...
 * </pre>
 * <p>
 * Usage in Reducer:
 * <pre>
 * &lt;K, V&gt; String generateFileName(K k, V v) {
 *   return k.toString() + "_" + v.toString();
 * }
 *
 * public class MOReduce extends
 *     Reducer&lt;WritableComparable, Writable, WritableComparable, Writable&gt; {
 *   private MultipleOutputs mos;
 *
 *   public void setup(Context context) {
 *     ...
 *     mos = new MultipleOutputs(context);
 *   }
 *
 *   public void reduce(WritableComparable key, Iterable&lt;Writable&gt; values,
 *       Context context)
 *       throws IOException, InterruptedException {
 *     ...
 *     mos.write("text", key, new Text("Hello"));
 *     mos.write("seq", new LongWritable(1), new Text("Bye"), "seq_a");
 *     mos.write("seq", new LongWritable(2), new Text("Chau"), "seq_b");
 *     mos.write(key, new Text("value"), generateFileName(key, new Text("value")));
 *     ...
 *   }
 *
 *   public void cleanup(Context context)
 *       throws IOException, InterruptedException {
 *     mos.close();
 *     ...
 *   }
 * }
 * </pre>
 *
 * <p>
 * When used in conjunction with
 * org.apache.hadoop.mapreduce.lib.output.LazyOutputFormat, MultipleOutputs
 * can mimic the behaviour of MultipleTextOutputFormat and
 * MultipleSequenceFileOutputFormat from the old Hadoop API, i.e. output can
 * be written from the Reducer to more than one location.
 * </p>
 *
 * <p>
 * Use <code>MultipleOutputs.write(KEYOUT key, VALUEOUT value, String
 * baseOutputPath)</code> to write key and value to a path specified by
 * <code>baseOutputPath</code>, with no need to specify a named output.
 * <b>Warning</b>: when the baseOutputPath passed to MultipleOutputs.write
 * is a path that resolves outside of the final job output directory, the
 * directory is created immediately and then persists through subsequent
 * task retries, breaking the concept of output committing:
 * </p>
 *
 * <pre>
 * private MultipleOutputs&lt;Text, Text&gt; out;
 *
 * public void setup(Context context) {
 *   out = new MultipleOutputs&lt;Text, Text&gt;(context);
 *   ...
 * }
 *
 * public void reduce(Text key, Iterable&lt;Text&gt; values, Context context)
 *     throws IOException, InterruptedException {
 *   for (Text t : values) {
 *     out.write(key, t, generateFileName(&lt;<i>parameter list...</i>&gt;));
 *   }
 * }
 *
 * protected void cleanup(Context context)
 *     throws IOException, InterruptedException {
 *   out.close();
 * }
 * </pre>
 *
 * <p>
 * Use your own code in <code>generateFileName()</code> to create a custom
 * path to your results. '/' characters in <code>baseOutputPath</code> will
 * be translated into directory levels in your file system. Also, append
 * your custom-generated path with "part" or similar, otherwise your output
 * files will be named like -r-00000, -r-00001 and so on. No call to
 * <code>context.write()</code> is necessary. See the example
 * <code>generateFileName()</code> code below.
 * </p>
 *
 * <pre>
 * private String generateFileName(Text k) {
 *   // expect Text k in format "Surname|Forename"
 *   String[] kStr = k.toString().split("\\|");
 *
 *   String sName = kStr[0];
 *   String fName = kStr[1];
 *
 *   // example for k = Smith|John
 *   // output written to /user/hadoop/path/to/output/Smith/John-r-00000 (etc)
 *   return sName + "/" + fName;
 * }
 * </pre>
 *
 * <p>
 * Using MultipleOutputs in this way will still create zero-sized default
 * output, e.g. part-00000. To prevent this, use
 * <code>LazyOutputFormat.setOutputFormatClass(job, TextOutputFormat.class);</code>
 * instead of <code>job.setOutputFormatClass(TextOutputFormat.class);</code>
 * in your Hadoop job configuration.
 * </p>
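 *
 * <p>
 * For example, a minimal driver sketch (assuming a text-based job; substitute
 * your own output format and key/value classes):
 * </p>
 * <pre>
 * Job job = Job.getInstance();
 * FileOutputFormat.setOutputPath(job, outDir);
 * // used instead of job.setOutputFormatClass(TextOutputFormat.class)
 * LazyOutputFormat.setOutputFormatClass(job, TextOutputFormat.class);
 * </pre>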
 *
 */
@InterfaceAudience.Public
@InterfaceStability.Stable
public class MultipleOutputs<KEYOUT, VALUEOUT> {

  private static final String MULTIPLE_OUTPUTS = "mapreduce.multipleoutputs";

  private static final String MO_PREFIX =
    "mapreduce.multipleoutputs.namedOutput.";

  private static final String FORMAT = ".format";
  private static final String KEY = ".key";
  private static final String VALUE = ".value";
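
  // For a named output "text", the constants above expand to configuration
  // keys such as:
  //   mapreduce.multipleoutputs.namedOutput.text.format
  //   mapreduce.multipleoutputs.namedOutput.text.key
  //   mapreduce.multipleoutputs.namedOutput.text.value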
  private static final String COUNTERS_ENABLED =
    "mapreduce.multipleoutputs.counters";

  /**
   * Counters group used by the counters of MultipleOutputs.
   */
  private static final String COUNTERS_GROUP = MultipleOutputs.class.getName();

  /**
   * Cache for the taskContexts.
   */
  private Map<String, TaskAttemptContext> taskContexts =
      new HashMap<String, TaskAttemptContext>();

  /**
   * Cached TaskAttemptContext which uses the job's configured settings.
   */
  private TaskAttemptContext jobOutputFormatContext;

  /**
   * Checks if a named output name is a valid token.
   *
   * @param namedOutput named output name
   * @throws IllegalArgumentException if the output name is not valid.
   */
  private static void checkTokenName(String namedOutput) {
    if (namedOutput == null || namedOutput.length() == 0) {
      throw new IllegalArgumentException(
        "Name cannot be NULL or empty");
    }
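    // only ASCII letters and digits are allowed in a named output name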
    for (char ch : namedOutput.toCharArray()) {
      if ((ch >= 'A') && (ch <= 'Z')) {
        continue;
      }
      if ((ch >= 'a') && (ch <= 'z')) {
        continue;
      }
      if ((ch >= '0') && (ch <= '9')) {
        continue;
      }
      throw new IllegalArgumentException(
        "Name cannot have a '" + ch + "' char");
    }
  }

  /**
   * Checks if the output name is valid.
   *
   * The name cannot be the name used for the default output.
   * @param outputPath base output path
   * @throws IllegalArgumentException if the output name is not valid.
   */
  private static void checkBaseOutputPath(String outputPath) {
    if (outputPath.equals(FileOutputFormat.PART)) {
      throw new IllegalArgumentException("output name cannot be 'part'");
    }
  }

  /**
   * Checks if a named output name is valid.
   *
   * @param namedOutput named output name
   * @throws IllegalArgumentException if the output name is not valid.
   */
  private static void checkNamedOutputName(JobContext job,
      String namedOutput, boolean alreadyDefined) {
    checkTokenName(namedOutput);
    checkBaseOutputPath(namedOutput);
    List<String> definedChannels = getNamedOutputsList(job);
    if (alreadyDefined && definedChannels.contains(namedOutput)) {
      throw new IllegalArgumentException("Named output '" + namedOutput +
        "' already defined");
    } else if (!alreadyDefined && !definedChannels.contains(namedOutput)) {
      throw new IllegalArgumentException("Named output '" + namedOutput +
        "' not defined");
    }
  }

  // Returns list of channel names.
  private static List<String> getNamedOutputsList(JobContext job) {
    List<String> names = new ArrayList<String>();
    StringTokenizer st = new StringTokenizer(
      job.getConfiguration().get(MULTIPLE_OUTPUTS, ""), " ");
    while (st.hasMoreTokens()) {
      names.add(st.nextToken());
    }
    return names;
  }

  // Returns the named output OutputFormat.
  @SuppressWarnings("unchecked")
  private static Class<? extends OutputFormat<?, ?>> getNamedOutputFormatClass(
    JobContext job, String namedOutput) {
    return (Class<? extends OutputFormat<?, ?>>)
      job.getConfiguration().getClass(MO_PREFIX + namedOutput + FORMAT, null,
      OutputFormat.class);
  }

  // Returns the key class for a named output.
  private static Class<?> getNamedOutputKeyClass(JobContext job,
                                                 String namedOutput) {
    return job.getConfiguration().getClass(MO_PREFIX + namedOutput + KEY, null,
      Object.class);
  }

  // Returns the value class for a named output.
  private static Class<?> getNamedOutputValueClass(
      JobContext job, String namedOutput) {
    return job.getConfiguration().getClass(MO_PREFIX + namedOutput + VALUE,
      null, Object.class);
  }

  /**
   * Adds a named output for the job.
   *
   * @param job               job to add the named output
   * @param namedOutput       named output name, it has to be a word, letters
   *                          and numbers only, cannot be the word 'part' as
   *                          that is reserved for the default output.
   * @param outputFormatClass OutputFormat class.
   * @param keyClass          key class
   * @param valueClass        value class
   */
  @SuppressWarnings("unchecked")
  public static void addNamedOutput(Job job, String namedOutput,
      Class<? extends OutputFormat> outputFormatClass,
      Class<?> keyClass, Class<?> valueClass) {
    checkNamedOutputName(job, namedOutput, true);
    Configuration conf = job.getConfiguration();
    conf.set(MULTIPLE_OUTPUTS,
      conf.get(MULTIPLE_OUTPUTS, "") + " " + namedOutput);
    conf.setClass(MO_PREFIX + namedOutput + FORMAT, outputFormatClass,
      OutputFormat.class);
    conf.setClass(MO_PREFIX + namedOutput + KEY, keyClass, Object.class);
    conf.setClass(MO_PREFIX + namedOutput + VALUE, valueClass, Object.class);
  }

  /**
   * Enables or disables counters for the named outputs.
   *
   * The counters group is the {@link MultipleOutputs} class name.
   * The names of the counters are the same as the named outputs. These
   * counters count the number of records written to each output name.
   * By default these counters are disabled.
   *
   * @param job     job to enable counters for
   * @param enabled indicates if the counters will be enabled or not.
   */
  public static void setCountersEnabled(Job job, boolean enabled) {
    job.getConfiguration().setBoolean(COUNTERS_ENABLED, enabled);
  }

  /**
   * Returns whether the counters for the named outputs are enabled or not.
   * By default these counters are disabled.
   *
   * @param job the job
   * @return TRUE if the counters are enabled, FALSE if they are disabled.
   */
  public static boolean getCountersEnabled(JobContext job) {
    return job.getConfiguration().getBoolean(COUNTERS_ENABLED, false);
  }

  /**
   * Wraps RecordWriter to increment counters.
   */
  @SuppressWarnings("unchecked")
  private static class RecordWriterWithCounter extends RecordWriter {
    private RecordWriter writer;
    private String counterName;
    private TaskInputOutputContext context;

    public RecordWriterWithCounter(RecordWriter writer, String counterName,
                                   TaskInputOutputContext context) {
      this.writer = writer;
      this.counterName = counterName;
      this.context = context;
    }

    @SuppressWarnings({"unchecked"})
    public void write(Object key, Object value)
        throws IOException, InterruptedException {
      context.getCounter(COUNTERS_GROUP, counterName).increment(1);
      writer.write(key, value);
    }

    public void close(TaskAttemptContext context)
        throws IOException, InterruptedException {
      writer.close(context);
    }
  }

  // instance code, to be used from Mapper/Reducer code

  private TaskInputOutputContext<?, ?, KEYOUT, VALUEOUT> context;
  private Set<String> namedOutputs;
  private Map<String, RecordWriter<?, ?>> recordWriters;
  private boolean countersEnabled;

  /**
   * Creates and initializes multiple outputs support;
   * it should be instantiated in the Mapper/Reducer setup method.
   *
   * @param context the TaskInputOutputContext object
   */
  public MultipleOutputs(
      TaskInputOutputContext<?, ?, KEYOUT, VALUEOUT> context) {
    this.context = context;
    namedOutputs = Collections.unmodifiableSet(
      new HashSet<String>(MultipleOutputs.getNamedOutputsList(context)));
    recordWriters = new HashMap<String, RecordWriter<?, ?>>();
    countersEnabled = getCountersEnabled(context);
  }

  /**
   * Write key and value to the namedOutput.
   *
   * The output path is a unique file generated for the namedOutput,
   * of the form {namedOutput}-(m|r)-{part-number}.
   *
   * @param namedOutput the named output name
   * @param key         the key
   * @param value       the value
   */
  @SuppressWarnings("unchecked")
  public <K, V> void write(String namedOutput, K key, V value)
      throws IOException, InterruptedException {
    write(namedOutput, key, value, namedOutput);
  }

  /**
   * Write key and value to baseOutputPath using the namedOutput.
   *
   * @param namedOutput    the named output name
   * @param key            the key
   * @param value          the value
   * @param baseOutputPath base-output path to write the record to.
   * Note: the framework will generate a unique filename for the
   * baseOutputPath.
   * <b>Warning</b>: when the baseOutputPath is a path that resolves
   * outside of the final job output directory, the directory is created
   * immediately and then persists through subsequent task retries, breaking
   * the concept of output committing.
   */
  @SuppressWarnings("unchecked")
  public <K, V> void write(String namedOutput, K key, V value,
      String baseOutputPath) throws IOException, InterruptedException {
    checkNamedOutputName(context, namedOutput, false);
    checkBaseOutputPath(baseOutputPath);
    if (!namedOutputs.contains(namedOutput)) {
      throw new IllegalArgumentException("Undefined named output '" +
        namedOutput + "'");
    }
    TaskAttemptContext taskContext = getContext(namedOutput);
    getRecordWriter(taskContext, baseOutputPath).write(key, value);
  }

  /**
   * Write key and value to an output file name.
   *
   * Gets the record writer from the job's output format.
   * The job's output format should be a FileOutputFormat.
   *
   * @param key            the key
   * @param value          the value
   * @param baseOutputPath base-output path to write the record to.
   * Note: the framework will generate a unique filename for the
   * baseOutputPath.
   * <b>Warning</b>: when the baseOutputPath is a path that resolves
   * outside of the final job output directory, the directory is created
   * immediately and then persists through subsequent task retries, breaking
   * the concept of output committing.
   */
  @SuppressWarnings("unchecked")
  public void write(KEYOUT key, VALUEOUT value, String baseOutputPath)
      throws IOException, InterruptedException {
    checkBaseOutputPath(baseOutputPath);
    if (jobOutputFormatContext == null) {
      jobOutputFormatContext =
        new TaskAttemptContextImpl(context.getConfiguration(),
                                   context.getTaskAttemptID(),
                                   new WrappedStatusReporter(context));
    }
    getRecordWriter(jobOutputFormatContext, baseOutputPath).write(key, value);
  }

  // By being synchronized, MultipleOutputs can be used with a
  // MultithreadedMapper.
  @SuppressWarnings("unchecked")
  private synchronized RecordWriter getRecordWriter(
      TaskAttemptContext taskContext, String baseFileName)
      throws IOException, InterruptedException {

    // look for the record-writer in the cache
    RecordWriter writer = recordWriters.get(baseFileName);

    // if not in cache, create a new one
    if (writer == null) {
      // get the record writer from the context's output format
      FileOutputFormat.setOutputName(taskContext, baseFileName);
      try {
        writer = ((OutputFormat) ReflectionUtils.newInstance(
          taskContext.getOutputFormatClass(), taskContext.getConfiguration()))
          .getRecordWriter(taskContext);
      } catch (ClassNotFoundException e) {
        throw new IOException(e);
      }

      // if counters are enabled, wrap the writer with context
      // to increment counters
      if (countersEnabled) {
        writer = new RecordWriterWithCounter(writer, baseFileName, context);
      }

      // add the record-writer to the cache
      recordWriters.put(baseFileName, writer);
    }
    return writer;
  }

  // Create a taskAttemptContext for the named output with
  // the output format and output key/value types put in the context.
  private TaskAttemptContext getContext(String nameOutput) throws IOException {

    TaskAttemptContext taskContext = taskContexts.get(nameOutput);

    if (taskContext != null) {
      return taskContext;
    }

    // The following trick leverages the instantiation of a record writer via
    // the job, thus supporting arbitrary output formats.
    Job job = Job.getInstance(context.getConfiguration());
    job.setOutputFormatClass(getNamedOutputFormatClass(context, nameOutput));
    job.setOutputKeyClass(getNamedOutputKeyClass(context, nameOutput));
    job.setOutputValueClass(getNamedOutputValueClass(context, nameOutput));
    taskContext = new TaskAttemptContextImpl(job.getConfiguration(),
      context.getTaskAttemptID(), new WrappedStatusReporter(context));

    taskContexts.put(nameOutput, taskContext);

    return taskContext;
  }

  private static class WrappedStatusReporter extends StatusReporter {

    TaskAttemptContext context;

    public WrappedStatusReporter(TaskAttemptContext context) {
      this.context = context;
    }

    @Override
    public Counter getCounter(Enum<?> name) {
      return context.getCounter(name);
    }

    @Override
    public Counter getCounter(String group, String name) {
      return context.getCounter(group, name);
    }

    @Override
    public void progress() {
      context.progress();
    }

    @Override
    public float getProgress() {
      return context.getProgress();
    }

    @Override
    public void setStatus(String status) {
      context.setStatus(status);
    }
  }

  /**
   * Closes all the opened outputs.
   *
   * This should be called from the cleanup method of a map/reduce task.
   * If overridden, subclasses must invoke <code>super.close()</code> at the
   * end of their <code>close()</code>.
   */
  @SuppressWarnings("unchecked")
  public void close() throws IOException, InterruptedException {
    for (RecordWriter writer : recordWriters.values()) {
      writer.close(context);
    }
  }
}