001/**
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018
019package org.apache.hadoop.io;
020
021import java.io.*;
022import java.util.*;
023import java.rmi.server.UID;
024import java.security.MessageDigest;
025import org.apache.commons.logging.*;
026import org.apache.hadoop.util.Options;
027import org.apache.hadoop.fs.*;
028import org.apache.hadoop.fs.Options.CreateOpts;
029import org.apache.hadoop.io.compress.CodecPool;
030import org.apache.hadoop.io.compress.CompressionCodec;
031import org.apache.hadoop.io.compress.CompressionInputStream;
032import org.apache.hadoop.io.compress.CompressionOutputStream;
033import org.apache.hadoop.io.compress.Compressor;
034import org.apache.hadoop.io.compress.Decompressor;
035import org.apache.hadoop.io.compress.DefaultCodec;
036import org.apache.hadoop.io.compress.GzipCodec;
037import org.apache.hadoop.io.compress.zlib.ZlibFactory;
038import org.apache.hadoop.io.serializer.Deserializer;
039import org.apache.hadoop.io.serializer.Serializer;
040import org.apache.hadoop.io.serializer.SerializationFactory;
041import org.apache.hadoop.classification.InterfaceAudience;
042import org.apache.hadoop.classification.InterfaceStability;
043import org.apache.hadoop.conf.*;
044import org.apache.hadoop.util.Progressable;
045import org.apache.hadoop.util.Progress;
046import org.apache.hadoop.util.ReflectionUtils;
047import org.apache.hadoop.util.NativeCodeLoader;
048import org.apache.hadoop.util.MergeSort;
049import org.apache.hadoop.util.PriorityQueue;
050import org.apache.hadoop.util.Time;
051
052/** 
053 * <code>SequenceFile</code>s are flat files consisting of binary key/value 
054 * pairs.
055 * 
056 * <p><code>SequenceFile</code> provides {@link SequenceFile.Writer},
057 * {@link SequenceFile.Reader} and {@link Sorter} classes for writing,
058 * reading and sorting respectively.</p>
059 * 
060 * There are three <code>SequenceFile</code> <code>Writer</code>s based on the 
061 * {@link CompressionType} used to compress key/value pairs:
062 * <ol>
063 *   <li>
064 *   <code>Writer</code> : Uncompressed records.
065 *   </li>
066 *   <li>
067 *   <code>RecordCompressWriter</code> : Record-compressed files, only compress 
068 *                                       values.
069 *   </li>
070 *   <li>
071 *   <code>BlockCompressWriter</code> : Block-compressed files, both keys & 
072 *                                      values are collected in 'blocks' 
073 *                                      separately and compressed. The size of 
074 *                                      the 'block' is configurable.
075 * </ol>
076 * 
077 * <p>The actual compression algorithm used to compress key and/or values can be
078 * specified by using the appropriate {@link CompressionCodec}.</p>
079 * 
080 * <p>The recommended way is to use the static <tt>createWriter</tt> methods
081 * provided by the <code>SequenceFile</code> to chose the preferred format.</p>
082 *
083 * <p>The {@link SequenceFile.Reader} acts as the bridge and can read any of the
084 * above <code>SequenceFile</code> formats.</p>
085 *
086 * <h4 id="Formats">SequenceFile Formats</h4>
087 * 
088 * <p>Essentially there are 3 different formats for <code>SequenceFile</code>s
089 * depending on the <code>CompressionType</code> specified. All of them share a
090 * <a href="#Header">common header</a> described below.
091 * 
092 * <h5 id="Header">SequenceFile Header</h5>
093 * <ul>
094 *   <li>
095 *   version - 3 bytes of magic header <b>SEQ</b>, followed by 1 byte of actual 
096 *             version number (e.g. SEQ4 or SEQ6)
097 *   </li>
098 *   <li>
099 *   keyClassName -key class
100 *   </li>
101 *   <li>
102 *   valueClassName - value class
103 *   </li>
104 *   <li>
105 *   compression - A boolean which specifies if compression is turned on for 
106 *                 keys/values in this file.
107 *   </li>
108 *   <li>
109 *   blockCompression - A boolean which specifies if block-compression is 
110 *                      turned on for keys/values in this file.
111 *   </li>
112 *   <li>
113 *   compression codec - <code>CompressionCodec</code> class which is used for  
114 *                       compression of keys and/or values (if compression is 
115 *                       enabled).
116 *   </li>
117 *   <li>
118 *   metadata - {@link Metadata} for this file.
119 *   </li>
120 *   <li>
121 *   sync - A sync marker to denote end of the header.
122 *   </li>
123 * </ul>
124 * 
125 * <h5 id="#UncompressedFormat">Uncompressed SequenceFile Format</h5>
126 * <ul>
127 * <li>
128 * <a href="#Header">Header</a>
129 * </li>
130 * <li>
131 * Record
132 *   <ul>
133 *     <li>Record length</li>
134 *     <li>Key length</li>
135 *     <li>Key</li>
136 *     <li>Value</li>
137 *   </ul>
138 * </li>
139 * <li>
140 * A sync-marker every few <code>100</code> bytes or so.
141 * </li>
142 * </ul>
143 *
144 * <h5 id="#RecordCompressedFormat">Record-Compressed SequenceFile Format</h5>
145 * <ul>
146 * <li>
147 * <a href="#Header">Header</a>
148 * </li>
149 * <li>
150 * Record
151 *   <ul>
152 *     <li>Record length</li>
153 *     <li>Key length</li>
154 *     <li>Key</li>
155 *     <li><i>Compressed</i> Value</li>
156 *   </ul>
157 * </li>
158 * <li>
159 * A sync-marker every few <code>100</code> bytes or so.
160 * </li>
161 * </ul>
162 * 
163 * <h5 id="#BlockCompressedFormat">Block-Compressed SequenceFile Format</h5>
164 * <ul>
165 * <li>
166 * <a href="#Header">Header</a>
167 * </li>
168 * <li>
169 * Record <i>Block</i>
170 *   <ul>
171 *     <li>Uncompressed number of records in the block</li>
172 *     <li>Compressed key-lengths block-size</li>
173 *     <li>Compressed key-lengths block</li>
174 *     <li>Compressed keys block-size</li>
175 *     <li>Compressed keys block</li>
176 *     <li>Compressed value-lengths block-size</li>
177 *     <li>Compressed value-lengths block</li>
178 *     <li>Compressed values block-size</li>
179 *     <li>Compressed values block</li>
180 *   </ul>
181 * </li>
182 * <li>
183 * A sync-marker every block.
184 * </li>
185 * </ul>
186 * 
187 * <p>The compressed blocks of key lengths and value lengths consist of the 
188 * actual lengths of individual keys/values encoded in ZeroCompressedInteger 
189 * format.</p>
190 * 
191 * @see CompressionCodec
192 */
193@InterfaceAudience.Public
194@InterfaceStability.Stable
195public class SequenceFile {
196  private static final Log LOG = LogFactory.getLog(SequenceFile.class);
197
198  private SequenceFile() {}                         // no public ctor
199
200  private static final byte BLOCK_COMPRESS_VERSION = (byte)4;
201  private static final byte CUSTOM_COMPRESS_VERSION = (byte)5;
202  private static final byte VERSION_WITH_METADATA = (byte)6;
203  private static byte[] VERSION = new byte[] {
204    (byte)'S', (byte)'E', (byte)'Q', VERSION_WITH_METADATA
205  };
206
207  private static final int SYNC_ESCAPE = -1;      // "length" of sync entries
208  private static final int SYNC_HASH_SIZE = 16;   // number of bytes in hash 
209  private static final int SYNC_SIZE = 4+SYNC_HASH_SIZE; // escape + hash
210
211  /** The number of bytes between sync points.*/
212  public static final int SYNC_INTERVAL = 100*SYNC_SIZE; 
213
214  /** 
215   * The compression type used to compress key/value pairs in the 
216   * {@link SequenceFile}.
217   * 
218   * @see SequenceFile.Writer
219   */
220  public static enum CompressionType {
221    /** Do not compress records. */
222    NONE, 
223    /** Compress values only, each separately. */
224    RECORD,
225    /** Compress sequences of records together in blocks. */
226    BLOCK
227  }
228
229  /**
230   * Get the compression type for the reduce outputs
231   * @param job the job config to look in
232   * @return the kind of compression to use
233   */
234  static public CompressionType getDefaultCompressionType(Configuration job) {
235    String name = job.get("io.seqfile.compression.type");
236    return name == null ? CompressionType.RECORD : 
237      CompressionType.valueOf(name);
238  }
239  
240  /**
241   * Set the default compression type for sequence files.
242   * @param job the configuration to modify
243   * @param val the new compression type (none, block, record)
244   */
245  static public void setDefaultCompressionType(Configuration job, 
246                                               CompressionType val) {
247    job.set("io.seqfile.compression.type", val.toString());
248  }
249
250  /**
251   * Create a new Writer with the given options.
252   * @param conf the configuration to use
253   * @param opts the options to create the file with
254   * @return a new Writer
255   * @throws IOException
256   */
257  public static Writer createWriter(Configuration conf, Writer.Option... opts
258                                    ) throws IOException {
259    Writer.CompressionOption compressionOption = 
260      Options.getOption(Writer.CompressionOption.class, opts);
261    CompressionType kind;
262    if (compressionOption != null) {
263      kind = compressionOption.getValue();
264    } else {
265      kind = getDefaultCompressionType(conf);
266      opts = Options.prependOptions(opts, Writer.compression(kind));
267    }
268    switch (kind) {
269      default:
270      case NONE:
271        return new Writer(conf, opts);
272      case RECORD:
273        return new RecordCompressWriter(conf, opts);
274      case BLOCK:
275        return new BlockCompressWriter(conf, opts);
276    }
277  }
278
279  /**
280   * Construct the preferred type of SequenceFile Writer.
281   * @param fs The configured filesystem. 
282   * @param conf The configuration.
283   * @param name The name of the file. 
284   * @param keyClass The 'key' type.
285   * @param valClass The 'value' type.
286   * @return Returns the handle to the constructed SequenceFile Writer.
287   * @throws IOException
288   * @deprecated Use {@link #createWriter(Configuration, Writer.Option...)}
289   *     instead.
290   */
291  @Deprecated
292  public static Writer 
293    createWriter(FileSystem fs, Configuration conf, Path name, 
294                 Class keyClass, Class valClass) throws IOException {
295    return createWriter(conf, Writer.filesystem(fs),
296                        Writer.file(name), Writer.keyClass(keyClass),
297                        Writer.valueClass(valClass));
298  }
299  
300  /**
301   * Construct the preferred type of SequenceFile Writer.
302   * @param fs The configured filesystem. 
303   * @param conf The configuration.
304   * @param name The name of the file. 
305   * @param keyClass The 'key' type.
306   * @param valClass The 'value' type.
307   * @param compressionType The compression type.
308   * @return Returns the handle to the constructed SequenceFile Writer.
309   * @throws IOException
310   * @deprecated Use {@link #createWriter(Configuration, Writer.Option...)}
311   *     instead.
312   */
313  @Deprecated
314  public static Writer 
315    createWriter(FileSystem fs, Configuration conf, Path name, 
316                 Class keyClass, Class valClass, 
317                 CompressionType compressionType) throws IOException {
318    return createWriter(conf, Writer.filesystem(fs),
319                        Writer.file(name), Writer.keyClass(keyClass),
320                        Writer.valueClass(valClass), 
321                        Writer.compression(compressionType));
322  }
323  
324  /**
325   * Construct the preferred type of SequenceFile Writer.
326   * @param fs The configured filesystem. 
327   * @param conf The configuration.
328   * @param name The name of the file. 
329   * @param keyClass The 'key' type.
330   * @param valClass The 'value' type.
331   * @param compressionType The compression type.
332   * @param progress The Progressable object to track progress.
333   * @return Returns the handle to the constructed SequenceFile Writer.
334   * @throws IOException
335   * @deprecated Use {@link #createWriter(Configuration, Writer.Option...)}
336   *     instead.
337   */
338  @Deprecated
339  public static Writer
340    createWriter(FileSystem fs, Configuration conf, Path name, 
341                 Class keyClass, Class valClass, CompressionType compressionType,
342                 Progressable progress) throws IOException {
343    return createWriter(conf, Writer.file(name),
344                        Writer.filesystem(fs),
345                        Writer.keyClass(keyClass),
346                        Writer.valueClass(valClass), 
347                        Writer.compression(compressionType),
348                        Writer.progressable(progress));
349  }
350
351  /**
352   * Construct the preferred type of SequenceFile Writer.
353   * @param fs The configured filesystem. 
354   * @param conf The configuration.
355   * @param name The name of the file. 
356   * @param keyClass The 'key' type.
357   * @param valClass The 'value' type.
358   * @param compressionType The compression type.
359   * @param codec The compression codec.
360   * @return Returns the handle to the constructed SequenceFile Writer.
361   * @throws IOException
362   * @deprecated Use {@link #createWriter(Configuration, Writer.Option...)}
363   *     instead.
364   */
365  @Deprecated
366  public static Writer 
367    createWriter(FileSystem fs, Configuration conf, Path name, 
368                 Class keyClass, Class valClass, CompressionType compressionType, 
369                 CompressionCodec codec) throws IOException {
370    return createWriter(conf, Writer.file(name),
371                        Writer.filesystem(fs),
372                        Writer.keyClass(keyClass),
373                        Writer.valueClass(valClass), 
374                        Writer.compression(compressionType, codec));
375  }
376  
377  /**
378   * Construct the preferred type of SequenceFile Writer.
379   * @param fs The configured filesystem. 
380   * @param conf The configuration.
381   * @param name The name of the file. 
382   * @param keyClass The 'key' type.
383   * @param valClass The 'value' type.
384   * @param compressionType The compression type.
385   * @param codec The compression codec.
386   * @param progress The Progressable object to track progress.
387   * @param metadata The metadata of the file.
388   * @return Returns the handle to the constructed SequenceFile Writer.
389   * @throws IOException
390   * @deprecated Use {@link #createWriter(Configuration, Writer.Option...)}
391   *     instead.
392   */
393  @Deprecated
394  public static Writer
395    createWriter(FileSystem fs, Configuration conf, Path name, 
396                 Class keyClass, Class valClass, 
397                 CompressionType compressionType, CompressionCodec codec,
398                 Progressable progress, Metadata metadata) throws IOException {
399    return createWriter(conf, Writer.file(name),
400                        Writer.filesystem(fs),
401                        Writer.keyClass(keyClass),
402                        Writer.valueClass(valClass),
403                        Writer.compression(compressionType, codec),
404                        Writer.progressable(progress),
405                        Writer.metadata(metadata));
406  }
407
408  /**
409   * Construct the preferred type of SequenceFile Writer.
410   * @param fs The configured filesystem.
411   * @param conf The configuration.
412   * @param name The name of the file.
413   * @param keyClass The 'key' type.
414   * @param valClass The 'value' type.
415   * @param bufferSize buffer size for the underlaying outputstream.
416   * @param replication replication factor for the file.
417   * @param blockSize block size for the file.
418   * @param compressionType The compression type.
419   * @param codec The compression codec.
420   * @param progress The Progressable object to track progress.
421   * @param metadata The metadata of the file.
422   * @return Returns the handle to the constructed SequenceFile Writer.
423   * @throws IOException
424   * @deprecated Use {@link #createWriter(Configuration, Writer.Option...)}
425   *     instead.
426   */
427  @Deprecated
428  public static Writer
429    createWriter(FileSystem fs, Configuration conf, Path name,
430                 Class keyClass, Class valClass, int bufferSize,
431                 short replication, long blockSize,
432                 CompressionType compressionType, CompressionCodec codec,
433                 Progressable progress, Metadata metadata) throws IOException {
434    return createWriter(conf, Writer.file(name),
435                        Writer.filesystem(fs),
436                        Writer.keyClass(keyClass),
437                        Writer.valueClass(valClass), 
438                        Writer.bufferSize(bufferSize), 
439                        Writer.replication(replication),
440                        Writer.blockSize(blockSize),
441                        Writer.compression(compressionType, codec),
442                        Writer.progressable(progress),
443                        Writer.metadata(metadata));
444  }
445
446  /**
447   * Construct the preferred type of SequenceFile Writer.
448   * @param fs The configured filesystem.
449   * @param conf The configuration.
450   * @param name The name of the file.
451   * @param keyClass The 'key' type.
452   * @param valClass The 'value' type.
453   * @param bufferSize buffer size for the underlaying outputstream.
454   * @param replication replication factor for the file.
455   * @param blockSize block size for the file.
456   * @param createParent create parent directory if non-existent
457   * @param compressionType The compression type.
458   * @param codec The compression codec.
459   * @param metadata The metadata of the file.
460   * @return Returns the handle to the constructed SequenceFile Writer.
461   * @throws IOException
462   */
463  @Deprecated
464  public static Writer
465  createWriter(FileSystem fs, Configuration conf, Path name,
466               Class keyClass, Class valClass, int bufferSize,
467               short replication, long blockSize, boolean createParent,
468               CompressionType compressionType, CompressionCodec codec,
469               Metadata metadata) throws IOException {
470    return createWriter(FileContext.getFileContext(fs.getUri(), conf),
471        conf, name, keyClass, valClass, compressionType, codec,
472        metadata, EnumSet.of(CreateFlag.CREATE,CreateFlag.OVERWRITE),
473        CreateOpts.bufferSize(bufferSize),
474        createParent ? CreateOpts.createParent()
475                     : CreateOpts.donotCreateParent(),
476        CreateOpts.repFac(replication),
477        CreateOpts.blockSize(blockSize)
478      );
479  }
480
481  /**
482   * Construct the preferred type of SequenceFile Writer.
483   * @param fc The context for the specified file.
484   * @param conf The configuration.
485   * @param name The name of the file.
486   * @param keyClass The 'key' type.
487   * @param valClass The 'value' type.
488   * @param compressionType The compression type.
489   * @param codec The compression codec.
490   * @param metadata The metadata of the file.
491   * @param createFlag gives the semantics of create: overwrite, append etc.
492   * @param opts file creation options; see {@link CreateOpts}.
493   * @return Returns the handle to the constructed SequenceFile Writer.
494   * @throws IOException
495   */
496  public static Writer
497  createWriter(FileContext fc, Configuration conf, Path name,
498               Class keyClass, Class valClass,
499               CompressionType compressionType, CompressionCodec codec,
500               Metadata metadata,
501               final EnumSet<CreateFlag> createFlag, CreateOpts... opts)
502               throws IOException {
503    return createWriter(conf, fc.create(name, createFlag, opts),
504          keyClass, valClass, compressionType, codec, metadata).ownStream();
505  }
506
507  /**
508   * Construct the preferred type of SequenceFile Writer.
509   * @param fs The configured filesystem. 
510   * @param conf The configuration.
511   * @param name The name of the file. 
512   * @param keyClass The 'key' type.
513   * @param valClass The 'value' type.
514   * @param compressionType The compression type.
515   * @param codec The compression codec.
516   * @param progress The Progressable object to track progress.
517   * @return Returns the handle to the constructed SequenceFile Writer.
518   * @throws IOException
519   * @deprecated Use {@link #createWriter(Configuration, Writer.Option...)}
520   *     instead.
521   */
522  @Deprecated
523  public static Writer
524    createWriter(FileSystem fs, Configuration conf, Path name, 
525                 Class keyClass, Class valClass, 
526                 CompressionType compressionType, CompressionCodec codec,
527                 Progressable progress) throws IOException {
528    return createWriter(conf, Writer.file(name),
529                        Writer.filesystem(fs),
530                        Writer.keyClass(keyClass),
531                        Writer.valueClass(valClass),
532                        Writer.compression(compressionType, codec),
533                        Writer.progressable(progress));
534  }
535
536  /**
537   * Construct the preferred type of 'raw' SequenceFile Writer.
538   * @param conf The configuration.
539   * @param out The stream on top which the writer is to be constructed.
540   * @param keyClass The 'key' type.
541   * @param valClass The 'value' type.
542   * @param compressionType The compression type.
543   * @param codec The compression codec.
544   * @param metadata The metadata of the file.
545   * @return Returns the handle to the constructed SequenceFile Writer.
546   * @throws IOException
547   * @deprecated Use {@link #createWriter(Configuration, Writer.Option...)}
548   *     instead.
549   */
550  @Deprecated
551  public static Writer
552    createWriter(Configuration conf, FSDataOutputStream out, 
553                 Class keyClass, Class valClass,
554                 CompressionType compressionType,
555                 CompressionCodec codec, Metadata metadata) throws IOException {
556    return createWriter(conf, Writer.stream(out), Writer.keyClass(keyClass),
557                        Writer.valueClass(valClass), 
558                        Writer.compression(compressionType, codec),
559                        Writer.metadata(metadata));
560  }
561  
562  /**
563   * Construct the preferred type of 'raw' SequenceFile Writer.
564   * @param conf The configuration.
565   * @param out The stream on top which the writer is to be constructed.
566   * @param keyClass The 'key' type.
567   * @param valClass The 'value' type.
568   * @param compressionType The compression type.
569   * @param codec The compression codec.
570   * @return Returns the handle to the constructed SequenceFile Writer.
571   * @throws IOException
572   * @deprecated Use {@link #createWriter(Configuration, Writer.Option...)}
573   *     instead.
574   */
575  @Deprecated
576  public static Writer
577    createWriter(Configuration conf, FSDataOutputStream out, 
578                 Class keyClass, Class valClass, CompressionType compressionType,
579                 CompressionCodec codec) throws IOException {
580    return createWriter(conf, Writer.stream(out), Writer.keyClass(keyClass),
581                        Writer.valueClass(valClass),
582                        Writer.compression(compressionType, codec));
583  }
584  
585
586  /** The interface to 'raw' values of SequenceFiles. */
587  public static interface ValueBytes {
588
589    /** Writes the uncompressed bytes to the outStream.
590     * @param outStream : Stream to write uncompressed bytes into.
591     * @throws IOException
592     */
593    public void writeUncompressedBytes(DataOutputStream outStream)
594      throws IOException;
595
596    /** Write compressed bytes to outStream. 
597     * Note: that it will NOT compress the bytes if they are not compressed.
598     * @param outStream : Stream to write compressed bytes into.
599     */
600    public void writeCompressedBytes(DataOutputStream outStream) 
601      throws IllegalArgumentException, IOException;
602
603    /**
604     * Size of stored data.
605     */
606    public int getSize();
607  }
608  
609  private static class UncompressedBytes implements ValueBytes {
610    private int dataSize;
611    private byte[] data;
612    
613    private UncompressedBytes() {
614      data = null;
615      dataSize = 0;
616    }
617    
618    private void reset(DataInputStream in, int length) throws IOException {
619      if (data == null) {
620        data = new byte[length];
621      } else if (length > data.length) {
622        data = new byte[Math.max(length, data.length * 2)];
623      }
624      dataSize = -1;
625      in.readFully(data, 0, length);
626      dataSize = length;
627    }
628    
629    @Override
630    public int getSize() {
631      return dataSize;
632    }
633    
634    @Override
635    public void writeUncompressedBytes(DataOutputStream outStream)
636      throws IOException {
637      outStream.write(data, 0, dataSize);
638    }
639
640    @Override
641    public void writeCompressedBytes(DataOutputStream outStream) 
642      throws IllegalArgumentException, IOException {
643      throw 
644        new IllegalArgumentException("UncompressedBytes cannot be compressed!");
645    }
646
647  } // UncompressedBytes
648  
649  private static class CompressedBytes implements ValueBytes {
650    private int dataSize;
651    private byte[] data;
652    DataInputBuffer rawData = null;
653    CompressionCodec codec = null;
654    CompressionInputStream decompressedStream = null;
655
656    private CompressedBytes(CompressionCodec codec) {
657      data = null;
658      dataSize = 0;
659      this.codec = codec;
660    }
661
662    private void reset(DataInputStream in, int length) throws IOException {
663      if (data == null) {
664        data = new byte[length];
665      } else if (length > data.length) {
666        data = new byte[Math.max(length, data.length * 2)];
667      } 
668      dataSize = -1;
669      in.readFully(data, 0, length);
670      dataSize = length;
671    }
672    
673    @Override
674    public int getSize() {
675      return dataSize;
676    }
677    
678    @Override
679    public void writeUncompressedBytes(DataOutputStream outStream)
680      throws IOException {
681      if (decompressedStream == null) {
682        rawData = new DataInputBuffer();
683        decompressedStream = codec.createInputStream(rawData);
684      } else {
685        decompressedStream.resetState();
686      }
687      rawData.reset(data, 0, dataSize);
688
689      byte[] buffer = new byte[8192];
690      int bytesRead = 0;
691      while ((bytesRead = decompressedStream.read(buffer, 0, 8192)) != -1) {
692        outStream.write(buffer, 0, bytesRead);
693      }
694    }
695
696    @Override
697    public void writeCompressedBytes(DataOutputStream outStream) 
698      throws IllegalArgumentException, IOException {
699      outStream.write(data, 0, dataSize);
700    }
701
702  } // CompressedBytes
703  
704  /**
705   * The class encapsulating with the metadata of a file.
706   * The metadata of a file is a list of attribute name/value
707   * pairs of Text type.
708   *
709   */
710  public static class Metadata implements Writable {
711
712    private TreeMap<Text, Text> theMetadata;
713    
714    public Metadata() {
715      this(new TreeMap<Text, Text>());
716    }
717    
718    public Metadata(TreeMap<Text, Text> arg) {
719      if (arg == null) {
720        this.theMetadata = new TreeMap<Text, Text>();
721      } else {
722        this.theMetadata = arg;
723      }
724    }
725    
726    public Text get(Text name) {
727      return this.theMetadata.get(name);
728    }
729    
730    public void set(Text name, Text value) {
731      this.theMetadata.put(name, value);
732    }
733    
734    public TreeMap<Text, Text> getMetadata() {
735      return new TreeMap<Text, Text>(this.theMetadata);
736    }
737    
738    @Override
739    public void write(DataOutput out) throws IOException {
740      out.writeInt(this.theMetadata.size());
741      Iterator<Map.Entry<Text, Text>> iter =
742        this.theMetadata.entrySet().iterator();
743      while (iter.hasNext()) {
744        Map.Entry<Text, Text> en = iter.next();
745        en.getKey().write(out);
746        en.getValue().write(out);
747      }
748    }
749
750    @Override
751    public void readFields(DataInput in) throws IOException {
752      int sz = in.readInt();
753      if (sz < 0) throw new IOException("Invalid size: " + sz + " for file metadata object");
754      this.theMetadata = new TreeMap<Text, Text>();
755      for (int i = 0; i < sz; i++) {
756        Text key = new Text();
757        Text val = new Text();
758        key.readFields(in);
759        val.readFields(in);
760        this.theMetadata.put(key, val);
761      }    
762    }
763
764    @Override
765    public boolean equals(Object other) {
766      if (other == null) {
767        return false;
768      }
769      if (other.getClass() != this.getClass()) {
770        return false;
771      } else {
772        return equals((Metadata)other);
773      }
774    }
775    
776    public boolean equals(Metadata other) {
777      if (other == null) return false;
778      if (this.theMetadata.size() != other.theMetadata.size()) {
779        return false;
780      }
781      Iterator<Map.Entry<Text, Text>> iter1 =
782        this.theMetadata.entrySet().iterator();
783      Iterator<Map.Entry<Text, Text>> iter2 =
784        other.theMetadata.entrySet().iterator();
785      while (iter1.hasNext() && iter2.hasNext()) {
786        Map.Entry<Text, Text> en1 = iter1.next();
787        Map.Entry<Text, Text> en2 = iter2.next();
788        if (!en1.getKey().equals(en2.getKey())) {
789          return false;
790        }
791        if (!en1.getValue().equals(en2.getValue())) {
792          return false;
793        }
794      }
795      if (iter1.hasNext() || iter2.hasNext()) {
796        return false;
797      }
798      return true;
799    }
800
801    @Override
802    public int hashCode() {
803      assert false : "hashCode not designed";
804      return 42; // any arbitrary constant will do 
805    }
806    
807    @Override
808    public String toString() {
809      StringBuilder sb = new StringBuilder();
810      sb.append("size: ").append(this.theMetadata.size()).append("\n");
811      Iterator<Map.Entry<Text, Text>> iter =
812        this.theMetadata.entrySet().iterator();
813      while (iter.hasNext()) {
814        Map.Entry<Text, Text> en = iter.next();
815        sb.append("\t").append(en.getKey().toString()).append("\t").append(en.getValue().toString());
816        sb.append("\n");
817      }
818      return sb.toString();
819    }
820  }
821  
822  /** Write key/value pairs to a sequence-format file. */
823  public static class Writer implements java.io.Closeable, Syncable {
824    private Configuration conf;
825    FSDataOutputStream out;
826    boolean ownOutputStream = true;
827    DataOutputBuffer buffer = new DataOutputBuffer();
828
829    Class keyClass;
830    Class valClass;
831
832    private final CompressionType compress;
833    CompressionCodec codec = null;
834    CompressionOutputStream deflateFilter = null;
835    DataOutputStream deflateOut = null;
836    Metadata metadata = null;
837    Compressor compressor = null;
838
839    private boolean appendMode = false;
840
841    protected Serializer keySerializer;
842    protected Serializer uncompressedValSerializer;
843    protected Serializer compressedValSerializer;
844    
845    // Insert a globally unique 16-byte value every few entries, so that one
846    // can seek into the middle of a file and then synchronize with record
847    // starts and ends by scanning for this value.
848    long lastSyncPos;                     // position of last sync
849    byte[] sync;                          // 16 random bytes
850    {
851      try {                                       
852        MessageDigest digester = MessageDigest.getInstance("MD5");
853        long time = Time.now();
854        digester.update((new UID()+"@"+time).getBytes());
855        sync = digester.digest();
856      } catch (Exception e) {
857        throw new RuntimeException(e);
858      }
859    }
860
861    public static interface Option {}
862    
863    static class FileOption extends Options.PathOption 
864                                    implements Option {
865      FileOption(Path path) {
866        super(path);
867      }
868    }
869
870    /**
871     * @deprecated only used for backwards-compatibility in the createWriter methods
872     * that take FileSystem.
873     */
874    @Deprecated
875    private static class FileSystemOption implements Option {
876      private final FileSystem value;
877      protected FileSystemOption(FileSystem value) {
878        this.value = value;
879      }
880      public FileSystem getValue() {
881        return value;
882      }
883    }
884
885    static class StreamOption extends Options.FSDataOutputStreamOption 
886                              implements Option {
887      StreamOption(FSDataOutputStream stream) {
888        super(stream);
889      }
890    }
891
892    static class BufferSizeOption extends Options.IntegerOption
893                                  implements Option {
894      BufferSizeOption(int value) {
895        super(value);
896      }
897    }
898    
899    static class BlockSizeOption extends Options.LongOption implements Option {
900      BlockSizeOption(long value) {
901        super(value);
902      }
903    }
904
905    static class ReplicationOption extends Options.IntegerOption
906                                   implements Option {
907      ReplicationOption(int value) {
908        super(value);
909      }
910    }
911
912    static class AppendIfExistsOption extends Options.BooleanOption implements
913        Option {
914      AppendIfExistsOption(boolean value) {
915        super(value);
916      }
917    }
918
919    static class KeyClassOption extends Options.ClassOption implements Option {
920      KeyClassOption(Class<?> value) {
921        super(value);
922      }
923    }
924
925    static class ValueClassOption extends Options.ClassOption
926                                          implements Option {
927      ValueClassOption(Class<?> value) {
928        super(value);
929      }
930    }
931
932    static class MetadataOption implements Option {
933      private final Metadata value;
934      MetadataOption(Metadata value) {
935        this.value = value;
936      }
937      Metadata getValue() {
938        return value;
939      }
940    }
941
942    static class ProgressableOption extends Options.ProgressableOption
943                                    implements Option {
944      ProgressableOption(Progressable value) {
945        super(value);
946      }
947    }
948
949    private static class CompressionOption implements Option {
950      private final CompressionType value;
951      private final CompressionCodec codec;
952      CompressionOption(CompressionType value) {
953        this(value, null);
954      }
955      CompressionOption(CompressionType value, CompressionCodec codec) {
956        this.value = value;
957        this.codec = (CompressionType.NONE != value && null == codec)
958          ? new DefaultCodec()
959          : codec;
960      }
961      CompressionType getValue() {
962        return value;
963      }
964      CompressionCodec getCodec() {
965        return codec;
966      }
967    }
968
969    public static Option file(Path value) {
970      return new FileOption(value);
971    }
972
973    /**
974     * @deprecated only used for backwards-compatibility in the createWriter methods
975     * that take FileSystem.
976     */
977    @Deprecated
978    private static Option filesystem(FileSystem fs) {
979      return new SequenceFile.Writer.FileSystemOption(fs);
980    }
981    
982    public static Option bufferSize(int value) {
983      return new BufferSizeOption(value);
984    }
985    
986    public static Option stream(FSDataOutputStream value) {
987      return new StreamOption(value);
988    }
989    
990    public static Option replication(short value) {
991      return new ReplicationOption(value);
992    }
993    
994    public static Option appendIfExists(boolean value) {
995      return new AppendIfExistsOption(value);
996    }
997
998    public static Option blockSize(long value) {
999      return new BlockSizeOption(value);
1000    }
1001    
1002    public static Option progressable(Progressable value) {
1003      return new ProgressableOption(value);
1004    }
1005
1006    public static Option keyClass(Class<?> value) {
1007      return new KeyClassOption(value);
1008    }
1009    
1010    public static Option valueClass(Class<?> value) {
1011      return new ValueClassOption(value);
1012    }
1013    
1014    public static Option metadata(Metadata value) {
1015      return new MetadataOption(value);
1016    }
1017
1018    public static Option compression(CompressionType value) {
1019      return new CompressionOption(value);
1020    }
1021
1022    public static Option compression(CompressionType value,
1023        CompressionCodec codec) {
1024      return new CompressionOption(value, codec);
1025    }
1026    
1027    /**
1028     * Construct a uncompressed writer from a set of options.
1029     * @param conf the configuration to use
1030     * @param options the options used when creating the writer
1031     * @throws IOException if it fails
1032     */
1033    Writer(Configuration conf, 
1034           Option... opts) throws IOException {
1035      BlockSizeOption blockSizeOption = 
1036        Options.getOption(BlockSizeOption.class, opts);
1037      BufferSizeOption bufferSizeOption = 
1038        Options.getOption(BufferSizeOption.class, opts);
1039      ReplicationOption replicationOption = 
1040        Options.getOption(ReplicationOption.class, opts);
1041      ProgressableOption progressOption = 
1042        Options.getOption(ProgressableOption.class, opts);
1043      FileOption fileOption = Options.getOption(FileOption.class, opts);
1044      AppendIfExistsOption appendIfExistsOption = Options.getOption(
1045          AppendIfExistsOption.class, opts);
1046      FileSystemOption fsOption = Options.getOption(FileSystemOption.class, opts);
1047      StreamOption streamOption = Options.getOption(StreamOption.class, opts);
1048      KeyClassOption keyClassOption = 
1049        Options.getOption(KeyClassOption.class, opts);
1050      ValueClassOption valueClassOption = 
1051        Options.getOption(ValueClassOption.class, opts);
1052      MetadataOption metadataOption = 
1053        Options.getOption(MetadataOption.class, opts);
1054      CompressionOption compressionTypeOption =
1055        Options.getOption(CompressionOption.class, opts);
1056      // check consistency of options
1057      if ((fileOption == null) == (streamOption == null)) {
1058        throw new IllegalArgumentException("file or stream must be specified");
1059      }
1060      if (fileOption == null && (blockSizeOption != null ||
1061                                 bufferSizeOption != null ||
1062                                 replicationOption != null ||
1063                                 progressOption != null)) {
1064        throw new IllegalArgumentException("file modifier options not " +
1065                                           "compatible with stream");
1066      }
1067
1068      FSDataOutputStream out;
1069      boolean ownStream = fileOption != null;
1070      if (ownStream) {
1071        Path p = fileOption.getValue();
1072        FileSystem fs;
1073        if (fsOption != null) {
1074          fs = fsOption.getValue();
1075        } else {
1076          fs = p.getFileSystem(conf);
1077        }
1078        int bufferSize = bufferSizeOption == null ? getBufferSize(conf) :
1079          bufferSizeOption.getValue();
1080        short replication = replicationOption == null ? 
1081          fs.getDefaultReplication(p) :
1082          (short) replicationOption.getValue();
1083        long blockSize = blockSizeOption == null ? fs.getDefaultBlockSize(p) :
1084          blockSizeOption.getValue();
1085        Progressable progress = progressOption == null ? null :
1086          progressOption.getValue();
1087
1088        if (appendIfExistsOption != null && appendIfExistsOption.getValue()
1089            && fs.exists(p)) {
1090
1091          // Read the file and verify header details
1092          SequenceFile.Reader reader = new SequenceFile.Reader(conf,
1093              SequenceFile.Reader.file(p), new Reader.OnlyHeaderOption());
1094          try {
1095
1096            if (keyClassOption.getValue() != reader.getKeyClass()
1097                || valueClassOption.getValue() != reader.getValueClass()) {
1098              throw new IllegalArgumentException(
1099                  "Key/value class provided does not match the file");
1100            }
1101
1102            if (reader.getVersion() != VERSION[3]) {
1103              throw new VersionMismatchException(VERSION[3],
1104                  reader.getVersion());
1105            }
1106
1107            if (metadataOption != null) {
1108              LOG.info("MetaData Option is ignored during append");
1109            }
1110            metadataOption = (MetadataOption) SequenceFile.Writer
1111                .metadata(reader.getMetadata());
1112
1113            CompressionOption readerCompressionOption = new CompressionOption(
1114                reader.getCompressionType(), reader.getCompressionCodec());
1115
1116            // Codec comparison will be ignored if the compression is NONE
1117            if (readerCompressionOption.value != compressionTypeOption.value
1118                || (readerCompressionOption.value != CompressionType.NONE
1119                    && readerCompressionOption.codec
1120                        .getClass() != compressionTypeOption.codec
1121                            .getClass())) {
1122              throw new IllegalArgumentException(
1123                  "Compression option provided does not match the file");
1124            }
1125
1126            sync = reader.getSync();
1127
1128          } finally {
1129            reader.close();
1130          }
1131
1132          out = fs.append(p, bufferSize, progress);
1133          this.appendMode = true;
1134        } else {
1135          out = fs
1136              .create(p, true, bufferSize, replication, blockSize, progress);
1137        }
1138      } else {
1139        out = streamOption.getValue();
1140      }
1141      Class<?> keyClass = keyClassOption == null ?
1142          Object.class : keyClassOption.getValue();
1143      Class<?> valueClass = valueClassOption == null ?
1144          Object.class : valueClassOption.getValue();
1145      Metadata metadata = metadataOption == null ?
1146          new Metadata() : metadataOption.getValue();
1147      this.compress = compressionTypeOption.getValue();
1148      final CompressionCodec codec = compressionTypeOption.getCodec();
1149      if (codec != null &&
1150          (codec instanceof GzipCodec) &&
1151          !NativeCodeLoader.isNativeCodeLoaded() &&
1152          !ZlibFactory.isNativeZlibLoaded(conf)) {
1153        throw new IllegalArgumentException("SequenceFile doesn't work with " +
1154                                           "GzipCodec without native-hadoop " +
1155                                           "code!");
1156      }
1157      init(conf, out, ownStream, keyClass, valueClass, codec, metadata);
1158    }
1159
1160    /** Create the named file.
1161     * @deprecated Use 
1162     *   {@link SequenceFile#createWriter(Configuration, Writer.Option...)} 
1163     *   instead.
1164     */
1165    @Deprecated
1166    public Writer(FileSystem fs, Configuration conf, Path name, 
1167                  Class keyClass, Class valClass) throws IOException {
1168      this.compress = CompressionType.NONE;
1169      init(conf, fs.create(name), true, keyClass, valClass, null, 
1170           new Metadata());
1171    }
1172    
1173    /** Create the named file with write-progress reporter.
1174     * @deprecated Use 
1175     *   {@link SequenceFile#createWriter(Configuration, Writer.Option...)} 
1176     *   instead.
1177     */
1178    @Deprecated
1179    public Writer(FileSystem fs, Configuration conf, Path name, 
1180                  Class keyClass, Class valClass,
1181                  Progressable progress, Metadata metadata) throws IOException {
1182      this.compress = CompressionType.NONE;
1183      init(conf, fs.create(name, progress), true, keyClass, valClass,
1184           null, metadata);
1185    }
1186    
1187    /** Create the named file with write-progress reporter. 
1188     * @deprecated Use 
1189     *   {@link SequenceFile#createWriter(Configuration, Writer.Option...)} 
1190     *   instead.
1191     */
1192    @Deprecated
1193    public Writer(FileSystem fs, Configuration conf, Path name,
1194                  Class keyClass, Class valClass,
1195                  int bufferSize, short replication, long blockSize,
1196                  Progressable progress, Metadata metadata) throws IOException {
1197      this.compress = CompressionType.NONE;
1198      init(conf,
1199           fs.create(name, true, bufferSize, replication, blockSize, progress),
1200           true, keyClass, valClass, null, metadata);
1201    }
1202
1203    boolean isCompressed() { return compress != CompressionType.NONE; }
1204    boolean isBlockCompressed() { return compress == CompressionType.BLOCK; }
1205    
1206    Writer ownStream() { this.ownOutputStream = true; return this;  }
1207
1208    /** Write and flush the file header. */
1209    private void writeFileHeader() 
1210      throws IOException {
1211      out.write(VERSION);
1212      Text.writeString(out, keyClass.getName());
1213      Text.writeString(out, valClass.getName());
1214      
1215      out.writeBoolean(this.isCompressed());
1216      out.writeBoolean(this.isBlockCompressed());
1217      
1218      if (this.isCompressed()) {
1219        Text.writeString(out, (codec.getClass()).getName());
1220      }
1221      this.metadata.write(out);
1222      out.write(sync);                       // write the sync bytes
1223      out.flush();                           // flush header
1224    }
1225
1226    /** Initialize. */
1227    @SuppressWarnings("unchecked")
1228    void init(Configuration conf, FSDataOutputStream out, boolean ownStream,
1229              Class keyClass, Class valClass,
1230              CompressionCodec codec, Metadata metadata) 
1231      throws IOException {
1232      this.conf = conf;
1233      this.out = out;
1234      this.ownOutputStream = ownStream;
1235      this.keyClass = keyClass;
1236      this.valClass = valClass;
1237      this.codec = codec;
1238      this.metadata = metadata;
1239      SerializationFactory serializationFactory = new SerializationFactory(conf);
1240      this.keySerializer = serializationFactory.getSerializer(keyClass);
1241      if (this.keySerializer == null) {
1242        throw new IOException(
1243            "Could not find a serializer for the Key class: '"
1244                + keyClass.getCanonicalName() + "'. "
1245                + "Please ensure that the configuration '" +
1246                CommonConfigurationKeys.IO_SERIALIZATIONS_KEY + "' is "
1247                + "properly configured, if you're using"
1248                + "custom serialization.");
1249      }
1250      this.keySerializer.open(buffer);
1251      this.uncompressedValSerializer = serializationFactory.getSerializer(valClass);
1252      if (this.uncompressedValSerializer == null) {
1253        throw new IOException(
1254            "Could not find a serializer for the Value class: '"
1255                + valClass.getCanonicalName() + "'. "
1256                + "Please ensure that the configuration '" +
1257                CommonConfigurationKeys.IO_SERIALIZATIONS_KEY + "' is "
1258                + "properly configured, if you're using"
1259                + "custom serialization.");
1260      }
1261      this.uncompressedValSerializer.open(buffer);
1262      if (this.codec != null) {
1263        ReflectionUtils.setConf(this.codec, this.conf);
1264        this.compressor = CodecPool.getCompressor(this.codec);
1265        this.deflateFilter = this.codec.createOutputStream(buffer, compressor);
1266        this.deflateOut = 
1267          new DataOutputStream(new BufferedOutputStream(deflateFilter));
1268        this.compressedValSerializer = serializationFactory.getSerializer(valClass);
1269        if (this.compressedValSerializer == null) {
1270          throw new IOException(
1271              "Could not find a serializer for the Value class: '"
1272                  + valClass.getCanonicalName() + "'. "
1273                  + "Please ensure that the configuration '" +
1274                  CommonConfigurationKeys.IO_SERIALIZATIONS_KEY + "' is "
1275                  + "properly configured, if you're using"
1276                  + "custom serialization.");
1277        }
1278        this.compressedValSerializer.open(deflateOut);
1279      }
1280
1281      if (appendMode) {
1282        sync();
1283      } else {
1284        writeFileHeader();
1285      }
1286    }
1287    
1288    /** Returns the class of keys in this file. */
1289    public Class getKeyClass() { return keyClass; }
1290
1291    /** Returns the class of values in this file. */
1292    public Class getValueClass() { return valClass; }
1293
1294    /** Returns the compression codec of data in this file. */
1295    public CompressionCodec getCompressionCodec() { return codec; }
1296    
1297    /** create a sync point */
1298    public void sync() throws IOException {
1299      if (sync != null && lastSyncPos != out.getPos()) {
1300        out.writeInt(SYNC_ESCAPE);                // mark the start of the sync
1301        out.write(sync);                          // write sync
1302        lastSyncPos = out.getPos();               // update lastSyncPos
1303      }
1304    }
1305
1306    /**
1307     * flush all currently written data to the file system
1308     * @deprecated Use {@link #hsync()} or {@link #hflush()} instead
1309     */
1310    @Deprecated
1311    public void syncFs() throws IOException {
1312      if (out != null) {
1313        out.sync();                               // flush contents to file system
1314      }
1315    }
1316
1317    @Override
1318    public void hsync() throws IOException {
1319      if (out != null) {
1320        out.hsync();
1321      }
1322    }
1323
1324    @Override
1325    public void hflush() throws IOException {
1326      if (out != null) {
1327        out.hflush();
1328      }
1329    }
1330    
1331    /** Returns the configuration of this file. */
1332    Configuration getConf() { return conf; }
1333    
1334    /** Close the file. */
1335    @Override
1336    public synchronized void close() throws IOException {
1337      keySerializer.close();
1338      uncompressedValSerializer.close();
1339      if (compressedValSerializer != null) {
1340        compressedValSerializer.close();
1341      }
1342
1343      CodecPool.returnCompressor(compressor);
1344      compressor = null;
1345      
1346      if (out != null) {
1347        
1348        // Close the underlying stream iff we own it...
1349        if (ownOutputStream) {
1350          out.close();
1351        } else {
1352          out.flush();
1353        }
1354        out = null;
1355      }
1356    }
1357
1358    synchronized void checkAndWriteSync() throws IOException {
1359      if (sync != null &&
1360          out.getPos() >= lastSyncPos+SYNC_INTERVAL) { // time to emit sync
1361        sync();
1362      }
1363    }
1364
1365    /** Append a key/value pair. */
1366    public void append(Writable key, Writable val)
1367      throws IOException {
1368      append((Object) key, (Object) val);
1369    }
1370
1371    /** Append a key/value pair. */
1372    @SuppressWarnings("unchecked")
1373    public synchronized void append(Object key, Object val)
1374      throws IOException {
1375      if (key.getClass() != keyClass)
1376        throw new IOException("wrong key class: "+key.getClass().getName()
1377                              +" is not "+keyClass);
1378      if (val.getClass() != valClass)
1379        throw new IOException("wrong value class: "+val.getClass().getName()
1380                              +" is not "+valClass);
1381
1382      buffer.reset();
1383
1384      // Append the 'key'
1385      keySerializer.serialize(key);
1386      int keyLength = buffer.getLength();
1387      if (keyLength < 0)
1388        throw new IOException("negative length keys not allowed: " + key);
1389
1390      // Append the 'value'
1391      if (compress == CompressionType.RECORD) {
1392        deflateFilter.resetState();
1393        compressedValSerializer.serialize(val);
1394        deflateOut.flush();
1395        deflateFilter.finish();
1396      } else {
1397        uncompressedValSerializer.serialize(val);
1398      }
1399
1400      // Write the record out
1401      checkAndWriteSync();                                // sync
1402      out.writeInt(buffer.getLength());                   // total record length
1403      out.writeInt(keyLength);                            // key portion length
1404      out.write(buffer.getData(), 0, buffer.getLength()); // data
1405    }
1406
1407    public synchronized void appendRaw(byte[] keyData, int keyOffset,
1408        int keyLength, ValueBytes val) throws IOException {
1409      if (keyLength < 0)
1410        throw new IOException("negative length keys not allowed: " + keyLength);
1411
1412      int valLength = val.getSize();
1413
1414      checkAndWriteSync();
1415      
1416      out.writeInt(keyLength+valLength);          // total record length
1417      out.writeInt(keyLength);                    // key portion length
1418      out.write(keyData, keyOffset, keyLength);   // key
1419      val.writeUncompressedBytes(out);            // value
1420    }
1421
1422    /** Returns the current length of the output file.
1423     *
1424     * <p>This always returns a synchronized position.  In other words,
1425     * immediately after calling {@link SequenceFile.Reader#seek(long)} with a position
1426     * returned by this method, {@link SequenceFile.Reader#next(Writable)} may be called.  However
1427     * the key may be earlier in the file than key last written when this
1428     * method was called (e.g., with block-compression, it may be the first key
1429     * in the block that was being written when this method was called).
1430     */
1431    public synchronized long getLength() throws IOException {
1432      return out.getPos();
1433    }
1434
1435  } // class Writer
1436
1437  /** Write key/compressed-value pairs to a sequence-format file. */
1438  static class RecordCompressWriter extends Writer {
1439    
1440    RecordCompressWriter(Configuration conf, 
1441                         Option... options) throws IOException {
1442      super(conf, options);
1443    }
1444
1445    /** Append a key/value pair. */
1446    @Override
1447    @SuppressWarnings("unchecked")
1448    public synchronized void append(Object key, Object val)
1449      throws IOException {
1450      if (key.getClass() != keyClass)
1451        throw new IOException("wrong key class: "+key.getClass().getName()
1452                              +" is not "+keyClass);
1453      if (val.getClass() != valClass)
1454        throw new IOException("wrong value class: "+val.getClass().getName()
1455                              +" is not "+valClass);
1456
1457      buffer.reset();
1458
1459      // Append the 'key'
1460      keySerializer.serialize(key);
1461      int keyLength = buffer.getLength();
1462      if (keyLength < 0)
1463        throw new IOException("negative length keys not allowed: " + key);
1464
1465      // Compress 'value' and append it
1466      deflateFilter.resetState();
1467      compressedValSerializer.serialize(val);
1468      deflateOut.flush();
1469      deflateFilter.finish();
1470
1471      // Write the record out
1472      checkAndWriteSync();                                // sync
1473      out.writeInt(buffer.getLength());                   // total record length
1474      out.writeInt(keyLength);                            // key portion length
1475      out.write(buffer.getData(), 0, buffer.getLength()); // data
1476    }
1477
1478    /** Append a key/value pair. */
1479    @Override
1480    public synchronized void appendRaw(byte[] keyData, int keyOffset,
1481        int keyLength, ValueBytes val) throws IOException {
1482
1483      if (keyLength < 0)
1484        throw new IOException("negative length keys not allowed: " + keyLength);
1485
1486      int valLength = val.getSize();
1487      
1488      checkAndWriteSync();                        // sync
1489      out.writeInt(keyLength+valLength);          // total record length
1490      out.writeInt(keyLength);                    // key portion length
1491      out.write(keyData, keyOffset, keyLength);   // 'key' data
1492      val.writeCompressedBytes(out);              // 'value' data
1493    }
1494    
1495  } // RecordCompressionWriter
1496
1497  /** Write compressed key/value blocks to a sequence-format file. */
1498  static class BlockCompressWriter extends Writer {
1499    
1500    private int noBufferedRecords = 0;
1501    
1502    private DataOutputBuffer keyLenBuffer = new DataOutputBuffer();
1503    private DataOutputBuffer keyBuffer = new DataOutputBuffer();
1504
1505    private DataOutputBuffer valLenBuffer = new DataOutputBuffer();
1506    private DataOutputBuffer valBuffer = new DataOutputBuffer();
1507
1508    private final int compressionBlockSize;
1509    
1510    BlockCompressWriter(Configuration conf,
1511                        Option... options) throws IOException {
1512      super(conf, options);
1513      compressionBlockSize = 
1514        conf.getInt("io.seqfile.compress.blocksize", 1000000);
1515      keySerializer.close();
1516      keySerializer.open(keyBuffer);
1517      uncompressedValSerializer.close();
1518      uncompressedValSerializer.open(valBuffer);
1519    }
1520
1521    /** Workhorse to check and write out compressed data/lengths */
1522    private synchronized 
1523      void writeBuffer(DataOutputBuffer uncompressedDataBuffer) 
1524      throws IOException {
1525      deflateFilter.resetState();
1526      buffer.reset();
1527      deflateOut.write(uncompressedDataBuffer.getData(), 0, 
1528                       uncompressedDataBuffer.getLength());
1529      deflateOut.flush();
1530      deflateFilter.finish();
1531      
1532      WritableUtils.writeVInt(out, buffer.getLength());
1533      out.write(buffer.getData(), 0, buffer.getLength());
1534    }
1535    
1536    /** Compress and flush contents to dfs */
1537    @Override
1538    public synchronized void sync() throws IOException {
1539      if (noBufferedRecords > 0) {
1540        super.sync();
1541        
1542        // No. of records
1543        WritableUtils.writeVInt(out, noBufferedRecords);
1544        
1545        // Write 'keys' and lengths
1546        writeBuffer(keyLenBuffer);
1547        writeBuffer(keyBuffer);
1548        
1549        // Write 'values' and lengths
1550        writeBuffer(valLenBuffer);
1551        writeBuffer(valBuffer);
1552        
1553        // Flush the file-stream
1554        out.flush();
1555        
1556        // Reset internal states
1557        keyLenBuffer.reset();
1558        keyBuffer.reset();
1559        valLenBuffer.reset();
1560        valBuffer.reset();
1561        noBufferedRecords = 0;
1562      }
1563      
1564    }
1565    
1566    /** Close the file. */
1567    @Override
1568    public synchronized void close() throws IOException {
1569      if (out != null) {
1570        sync();
1571      }
1572      super.close();
1573    }
1574
1575    /** Append a key/value pair. */
1576    @Override
1577    @SuppressWarnings("unchecked")
1578    public synchronized void append(Object key, Object val)
1579      throws IOException {
1580      if (key.getClass() != keyClass)
1581        throw new IOException("wrong key class: "+key+" is not "+keyClass);
1582      if (val.getClass() != valClass)
1583        throw new IOException("wrong value class: "+val+" is not "+valClass);
1584
1585      // Save key/value into respective buffers 
1586      int oldKeyLength = keyBuffer.getLength();
1587      keySerializer.serialize(key);
1588      int keyLength = keyBuffer.getLength() - oldKeyLength;
1589      if (keyLength < 0)
1590        throw new IOException("negative length keys not allowed: " + key);
1591      WritableUtils.writeVInt(keyLenBuffer, keyLength);
1592
1593      int oldValLength = valBuffer.getLength();
1594      uncompressedValSerializer.serialize(val);
1595      int valLength = valBuffer.getLength() - oldValLength;
1596      WritableUtils.writeVInt(valLenBuffer, valLength);
1597      
1598      // Added another key/value pair
1599      ++noBufferedRecords;
1600      
1601      // Compress and flush?
1602      int currentBlockSize = keyBuffer.getLength() + valBuffer.getLength();
1603      if (currentBlockSize >= compressionBlockSize) {
1604        sync();
1605      }
1606    }
1607    
1608    /** Append a key/value pair. */
1609    @Override
1610    public synchronized void appendRaw(byte[] keyData, int keyOffset,
1611        int keyLength, ValueBytes val) throws IOException {
1612      
1613      if (keyLength < 0)
1614        throw new IOException("negative length keys not allowed");
1615
1616      int valLength = val.getSize();
1617      
1618      // Save key/value data in relevant buffers
1619      WritableUtils.writeVInt(keyLenBuffer, keyLength);
1620      keyBuffer.write(keyData, keyOffset, keyLength);
1621      WritableUtils.writeVInt(valLenBuffer, valLength);
1622      val.writeUncompressedBytes(valBuffer);
1623
1624      // Added another key/value pair
1625      ++noBufferedRecords;
1626
1627      // Compress and flush?
1628      int currentBlockSize = keyBuffer.getLength() + valBuffer.getLength(); 
1629      if (currentBlockSize >= compressionBlockSize) {
1630        sync();
1631      }
1632    }
1633  
1634  } // BlockCompressionWriter
1635
1636  /** Get the configured buffer size */
1637  private static int getBufferSize(Configuration conf) {
1638    return conf.getInt("io.file.buffer.size", 4096);
1639  }
1640
1641  /** Reads key/value pairs from a sequence-format file. */
1642  public static class Reader implements java.io.Closeable {
1643    private String filename;
1644    private FSDataInputStream in;
1645    private DataOutputBuffer outBuf = new DataOutputBuffer();
1646
1647    private byte version;
1648
1649    private String keyClassName;
1650    private String valClassName;
1651    private Class keyClass;
1652    private Class valClass;
1653
1654    private CompressionCodec codec = null;
1655    private Metadata metadata = null;
1656    
1657    private byte[] sync = new byte[SYNC_HASH_SIZE];
1658    private byte[] syncCheck = new byte[SYNC_HASH_SIZE];
1659    private boolean syncSeen;
1660
1661    private long headerEnd;
1662    private long end;
1663    private int keyLength;
1664    private int recordLength;
1665
1666    private boolean decompress;
1667    private boolean blockCompressed;
1668    
1669    private Configuration conf;
1670
1671    private int noBufferedRecords = 0;
1672    private boolean lazyDecompress = true;
1673    private boolean valuesDecompressed = true;
1674    
1675    private int noBufferedKeys = 0;
1676    private int noBufferedValues = 0;
1677    
1678    private DataInputBuffer keyLenBuffer = null;
1679    private CompressionInputStream keyLenInFilter = null;
1680    private DataInputStream keyLenIn = null;
1681    private Decompressor keyLenDecompressor = null;
1682    private DataInputBuffer keyBuffer = null;
1683    private CompressionInputStream keyInFilter = null;
1684    private DataInputStream keyIn = null;
1685    private Decompressor keyDecompressor = null;
1686
1687    private DataInputBuffer valLenBuffer = null;
1688    private CompressionInputStream valLenInFilter = null;
1689    private DataInputStream valLenIn = null;
1690    private Decompressor valLenDecompressor = null;
1691    private DataInputBuffer valBuffer = null;
1692    private CompressionInputStream valInFilter = null;
1693    private DataInputStream valIn = null;
1694    private Decompressor valDecompressor = null;
1695    
1696    private Deserializer keyDeserializer;
1697    private Deserializer valDeserializer;
1698
1699    /**
1700     * A tag interface for all of the Reader options
1701     */
1702    public static interface Option {}
1703    
1704    /**
1705     * Create an option to specify the path name of the sequence file.
1706     * @param value the path to read
1707     * @return a new option
1708     */
1709    public static Option file(Path value) {
1710      return new FileOption(value);
1711    }
1712    
1713    /**
1714     * Create an option to specify the stream with the sequence file.
1715     * @param value the stream to read.
1716     * @return a new option
1717     */
1718    public static Option stream(FSDataInputStream value) {
1719      return new InputStreamOption(value);
1720    }
1721    
1722    /**
1723     * Create an option to specify the starting byte to read.
1724     * @param value the number of bytes to skip over
1725     * @return a new option
1726     */
1727    public static Option start(long value) {
1728      return new StartOption(value);
1729    }
1730    
1731    /**
1732     * Create an option to specify the number of bytes to read.
1733     * @param value the number of bytes to read
1734     * @return a new option
1735     */
1736    public static Option length(long value) {
1737      return new LengthOption(value);
1738    }
1739    
1740    /**
1741     * Create an option with the buffer size for reading the given pathname.
1742     * @param value the number of bytes to buffer
1743     * @return a new option
1744     */
1745    public static Option bufferSize(int value) {
1746      return new BufferSizeOption(value);
1747    }
1748
1749    private static class FileOption extends Options.PathOption 
1750                                    implements Option {
1751      private FileOption(Path value) {
1752        super(value);
1753      }
1754    }
1755    
1756    private static class InputStreamOption
1757        extends Options.FSDataInputStreamOption 
1758        implements Option {
1759      private InputStreamOption(FSDataInputStream value) {
1760        super(value);
1761      }
1762    }
1763
1764    private static class StartOption extends Options.LongOption
1765                                     implements Option {
1766      private StartOption(long value) {
1767        super(value);
1768      }
1769    }
1770
1771    private static class LengthOption extends Options.LongOption
1772                                      implements Option {
1773      private LengthOption(long value) {
1774        super(value);
1775      }
1776    }
1777
1778    private static class BufferSizeOption extends Options.IntegerOption
1779                                      implements Option {
1780      private BufferSizeOption(int value) {
1781        super(value);
1782      }
1783    }
1784
1785    // only used directly
1786    private static class OnlyHeaderOption extends Options.BooleanOption 
1787                                          implements Option {
1788      private OnlyHeaderOption() {
1789        super(true);
1790      }
1791    }
1792
1793    public Reader(Configuration conf, Option... opts) throws IOException {
1794      // Look up the options, these are null if not set
1795      FileOption fileOpt = Options.getOption(FileOption.class, opts);
1796      InputStreamOption streamOpt = 
1797        Options.getOption(InputStreamOption.class, opts);
1798      StartOption startOpt = Options.getOption(StartOption.class, opts);
1799      LengthOption lenOpt = Options.getOption(LengthOption.class, opts);
1800      BufferSizeOption bufOpt = Options.getOption(BufferSizeOption.class,opts);
1801      OnlyHeaderOption headerOnly = 
1802        Options.getOption(OnlyHeaderOption.class, opts);
1803      // check for consistency
1804      if ((fileOpt == null) == (streamOpt == null)) {
1805        throw new 
1806          IllegalArgumentException("File or stream option must be specified");
1807      }
1808      if (fileOpt == null && bufOpt != null) {
1809        throw new IllegalArgumentException("buffer size can only be set when" +
1810                                           " a file is specified.");
1811      }
1812      // figure out the real values
1813      Path filename = null;
1814      FSDataInputStream file;
1815      final long len;
1816      if (fileOpt != null) {
1817        filename = fileOpt.getValue();
1818        FileSystem fs = filename.getFileSystem(conf);
1819        int bufSize = bufOpt == null ? getBufferSize(conf): bufOpt.getValue();
1820        len = null == lenOpt
1821          ? fs.getFileStatus(filename).getLen()
1822          : lenOpt.getValue();
1823        file = openFile(fs, filename, bufSize, len);
1824      } else {
1825        len = null == lenOpt ? Long.MAX_VALUE : lenOpt.getValue();
1826        file = streamOpt.getValue();
1827      }
1828      long start = startOpt == null ? 0 : startOpt.getValue();
1829      // really set up
1830      initialize(filename, file, start, len, conf, headerOnly != null);
1831    }
1832
1833    /**
1834     * Construct a reader by opening a file from the given file system.
1835     * @param fs The file system used to open the file.
1836     * @param file The file being read.
1837     * @param conf Configuration
1838     * @throws IOException
1839     * @deprecated Use Reader(Configuration, Option...) instead.
1840     */
1841    @Deprecated
1842    public Reader(FileSystem fs, Path file, 
1843                  Configuration conf) throws IOException {
1844      this(conf, file(file.makeQualified(fs)));
1845    }
1846
1847    /**
1848     * Construct a reader by the given input stream.
1849     * @param in An input stream.
1850     * @param buffersize unused
1851     * @param start The starting position.
1852     * @param length The length being read.
1853     * @param conf Configuration
1854     * @throws IOException
1855     * @deprecated Use Reader(Configuration, Reader.Option...) instead.
1856     */
1857    @Deprecated
1858    public Reader(FSDataInputStream in, int buffersize,
1859        long start, long length, Configuration conf) throws IOException {
1860      this(conf, stream(in), start(start), length(length));
1861    }
1862
1863    /** Common work of the constructors. */
1864    private void initialize(Path filename, FSDataInputStream in,
1865                            long start, long length, Configuration conf,
1866                            boolean tempReader) throws IOException {
1867      if (in == null) {
1868        throw new IllegalArgumentException("in == null");
1869      }
1870      this.filename = filename == null ? "<unknown>" : filename.toString();
1871      this.in = in;
1872      this.conf = conf;
1873      boolean succeeded = false;
1874      try {
1875        seek(start);
1876        this.end = this.in.getPos() + length;
1877        // if it wrapped around, use the max
1878        if (end < length) {
1879          end = Long.MAX_VALUE;
1880        }
1881        init(tempReader);
1882        succeeded = true;
1883      } finally {
1884        if (!succeeded) {
1885          IOUtils.cleanup(LOG, this.in);
1886        }
1887      }
1888    }
1889
1890    /**
1891     * Override this method to specialize the type of
1892     * {@link FSDataInputStream} returned.
1893     * @param fs The file system used to open the file.
1894     * @param file The file being read.
1895     * @param bufferSize The buffer size used to read the file.
1896     * @param length The length being read if it is >= 0.  Otherwise,
1897     *               the length is not available.
1898     * @return The opened stream.
1899     * @throws IOException
1900     */
1901    protected FSDataInputStream openFile(FileSystem fs, Path file,
1902        int bufferSize, long length) throws IOException {
1903      return fs.open(file, bufferSize);
1904    }
1905    
1906    /**
1907     * Initialize the {@link Reader}
1908     * @param tmpReader <code>true</code> if we are constructing a temporary
1909     *                  reader {@link SequenceFile.Sorter.cloneFileAttributes}, 
1910     *                  and hence do not initialize every component; 
1911     *                  <code>false</code> otherwise.
1912     * @throws IOException
1913     */
1914    private void init(boolean tempReader) throws IOException {
1915      byte[] versionBlock = new byte[VERSION.length];
1916      String exceptionMsg = this + " not a SequenceFile";
1917
1918      // Try to read sequence file header.
1919      try {
1920        in.readFully(versionBlock);
1921      } catch (EOFException e) {
1922        throw new EOFException(exceptionMsg);
1923      }
1924
1925      if ((versionBlock[0] != VERSION[0]) ||
1926          (versionBlock[1] != VERSION[1]) ||
1927          (versionBlock[2] != VERSION[2])) {
1928        throw new IOException(this + " not a SequenceFile");
1929      }
1930
1931      // Set 'version'
1932      version = versionBlock[3];
1933      if (version > VERSION[3]) {
1934        throw new VersionMismatchException(VERSION[3], version);
1935      }
1936
1937      if (version < BLOCK_COMPRESS_VERSION) {
1938        UTF8 className = new UTF8();
1939
1940        className.readFields(in);
1941        keyClassName = className.toStringChecked(); // key class name
1942
1943        className.readFields(in);
1944        valClassName = className.toStringChecked(); // val class name
1945      } else {
1946        keyClassName = Text.readString(in);
1947        valClassName = Text.readString(in);
1948      }
1949
1950      if (version > 2) {                          // if version > 2
1951        this.decompress = in.readBoolean();       // is compressed?
1952      } else {
1953        decompress = false;
1954      }
1955
1956      if (version >= BLOCK_COMPRESS_VERSION) {    // if version >= 4
1957        this.blockCompressed = in.readBoolean();  // is block-compressed?
1958      } else {
1959        blockCompressed = false;
1960      }
1961      
1962      // if version >= 5
1963      // setup the compression codec
1964      if (decompress) {
1965        if (version >= CUSTOM_COMPRESS_VERSION) {
1966          String codecClassname = Text.readString(in);
1967          try {
1968            Class<? extends CompressionCodec> codecClass
1969              = conf.getClassByName(codecClassname).asSubclass(CompressionCodec.class);
1970            this.codec = ReflectionUtils.newInstance(codecClass, conf);
1971          } catch (ClassNotFoundException cnfe) {
1972            throw new IllegalArgumentException("Unknown codec: " + 
1973                                               codecClassname, cnfe);
1974          }
1975        } else {
1976          codec = new DefaultCodec();
1977          ((Configurable)codec).setConf(conf);
1978        }
1979      }
1980      
1981      this.metadata = new Metadata();
1982      if (version >= VERSION_WITH_METADATA) {    // if version >= 6
1983        this.metadata.readFields(in);
1984      }
1985      
1986      if (version > 1) {                          // if version > 1
1987        in.readFully(sync);                       // read sync bytes
1988        headerEnd = in.getPos();                  // record end of header
1989      }
1990      
1991      // Initialize... *not* if this we are constructing a temporary Reader
1992      if (!tempReader) {
1993        valBuffer = new DataInputBuffer();
1994        if (decompress) {
1995          valDecompressor = CodecPool.getDecompressor(codec);
1996          valInFilter = codec.createInputStream(valBuffer, valDecompressor);
1997          valIn = new DataInputStream(valInFilter);
1998        } else {
1999          valIn = valBuffer;
2000        }
2001
2002        if (blockCompressed) {
2003          keyLenBuffer = new DataInputBuffer();
2004          keyBuffer = new DataInputBuffer();
2005          valLenBuffer = new DataInputBuffer();
2006
2007          keyLenDecompressor = CodecPool.getDecompressor(codec);
2008          keyLenInFilter = codec.createInputStream(keyLenBuffer, 
2009                                                   keyLenDecompressor);
2010          keyLenIn = new DataInputStream(keyLenInFilter);
2011
2012          keyDecompressor = CodecPool.getDecompressor(codec);
2013          keyInFilter = codec.createInputStream(keyBuffer, keyDecompressor);
2014          keyIn = new DataInputStream(keyInFilter);
2015
2016          valLenDecompressor = CodecPool.getDecompressor(codec);
2017          valLenInFilter = codec.createInputStream(valLenBuffer, 
2018                                                   valLenDecompressor);
2019          valLenIn = new DataInputStream(valLenInFilter);
2020        }
2021        
2022        SerializationFactory serializationFactory =
2023          new SerializationFactory(conf);
2024        this.keyDeserializer =
2025          getDeserializer(serializationFactory, getKeyClass());
2026        if (this.keyDeserializer == null) {
2027          throw new IOException(
2028              "Could not find a deserializer for the Key class: '"
2029                  + getKeyClass().getCanonicalName() + "'. "
2030                  + "Please ensure that the configuration '" +
2031                  CommonConfigurationKeys.IO_SERIALIZATIONS_KEY + "' is "
2032                  + "properly configured, if you're using "
2033                  + "custom serialization.");
2034        }
2035        if (!blockCompressed) {
2036          this.keyDeserializer.open(valBuffer);
2037        } else {
2038          this.keyDeserializer.open(keyIn);
2039        }
2040        this.valDeserializer =
2041          getDeserializer(serializationFactory, getValueClass());
2042        if (this.valDeserializer == null) {
2043          throw new IOException(
2044              "Could not find a deserializer for the Value class: '"
2045                  + getValueClass().getCanonicalName() + "'. "
2046                  + "Please ensure that the configuration '" +
2047                  CommonConfigurationKeys.IO_SERIALIZATIONS_KEY + "' is "
2048                  + "properly configured, if you're using "
2049                  + "custom serialization.");
2050        }
2051        this.valDeserializer.open(valIn);
2052      }
2053    }
2054    
2055    @SuppressWarnings("unchecked")
2056    private Deserializer getDeserializer(SerializationFactory sf, Class c) {
2057      return sf.getDeserializer(c);
2058    }
2059    
2060    /** Close the file. */
2061    @Override
2062    public synchronized void close() throws IOException {
2063      // Return the decompressors to the pool
2064      CodecPool.returnDecompressor(keyLenDecompressor);
2065      CodecPool.returnDecompressor(keyDecompressor);
2066      CodecPool.returnDecompressor(valLenDecompressor);
2067      CodecPool.returnDecompressor(valDecompressor);
2068      keyLenDecompressor = keyDecompressor = null;
2069      valLenDecompressor = valDecompressor = null;
2070      
2071      if (keyDeserializer != null) {
2072        keyDeserializer.close();
2073      }
2074      if (valDeserializer != null) {
2075        valDeserializer.close();
2076      }
2077      
2078      // Close the input-stream
2079      in.close();
2080    }
2081
2082    /** Returns the name of the key class. */
2083    public String getKeyClassName() {
2084      return keyClassName;
2085    }
2086
2087    /** Returns the class of keys in this file. */
2088    public synchronized Class<?> getKeyClass() {
2089      if (null == keyClass) {
2090        try {
2091          keyClass = WritableName.getClass(getKeyClassName(), conf);
2092        } catch (IOException e) {
2093          throw new RuntimeException(e);
2094        }
2095      }
2096      return keyClass;
2097    }
2098
2099    /** Returns the name of the value class. */
2100    public String getValueClassName() {
2101      return valClassName;
2102    }
2103
2104    /** Returns the class of values in this file. */
2105    public synchronized Class<?> getValueClass() {
2106      if (null == valClass) {
2107        try {
2108          valClass = WritableName.getClass(getValueClassName(), conf);
2109        } catch (IOException e) {
2110          throw new RuntimeException(e);
2111        }
2112      }
2113      return valClass;
2114    }
2115
2116    /** Returns true if values are compressed. */
2117    public boolean isCompressed() { return decompress; }
2118    
2119    /** Returns true if records are block-compressed. */
2120    public boolean isBlockCompressed() { return blockCompressed; }
2121    
2122    /** Returns the compression codec of data in this file. */
2123    public CompressionCodec getCompressionCodec() { return codec; }
2124    
2125    private byte[] getSync() {
2126      return sync;
2127    }
2128
2129    private byte getVersion() {
2130      return version;
2131    }
2132
2133    /**
2134     * Get the compression type for this file.
2135     * @return the compression type
2136     */
2137    public CompressionType getCompressionType() {
2138      if (decompress) {
2139        return blockCompressed ? CompressionType.BLOCK : CompressionType.RECORD;
2140      } else {
2141        return CompressionType.NONE;
2142      }
2143    }
2144
2145    /** Returns the metadata object of the file */
2146    public Metadata getMetadata() {
2147      return this.metadata;
2148    }
2149    
2150    /** Returns the configuration used for this file. */
2151    Configuration getConf() { return conf; }
2152    
2153    /** Read a compressed buffer */
2154    private synchronized void readBuffer(DataInputBuffer buffer, 
2155                                         CompressionInputStream filter) throws IOException {
2156      // Read data into a temporary buffer
2157      DataOutputBuffer dataBuffer = new DataOutputBuffer();
2158
2159      try {
2160        int dataBufferLength = WritableUtils.readVInt(in);
2161        dataBuffer.write(in, dataBufferLength);
2162      
2163        // Set up 'buffer' connected to the input-stream
2164        buffer.reset(dataBuffer.getData(), 0, dataBuffer.getLength());
2165      } finally {
2166        dataBuffer.close();
2167      }
2168
2169      // Reset the codec
2170      filter.resetState();
2171    }
2172    
2173    /** Read the next 'compressed' block */
2174    private synchronized void readBlock() throws IOException {
2175      // Check if we need to throw away a whole block of 
2176      // 'values' due to 'lazy decompression' 
2177      if (lazyDecompress && !valuesDecompressed) {
2178        in.seek(WritableUtils.readVInt(in)+in.getPos());
2179        in.seek(WritableUtils.readVInt(in)+in.getPos());
2180      }
2181      
2182      // Reset internal states
2183      noBufferedKeys = 0; noBufferedValues = 0; noBufferedRecords = 0;
2184      valuesDecompressed = false;
2185
2186      //Process sync
2187      if (sync != null) {
2188        in.readInt();
2189        in.readFully(syncCheck);                // read syncCheck
2190        if (!Arrays.equals(sync, syncCheck))    // check it
2191          throw new IOException("File is corrupt!");
2192      }
2193      syncSeen = true;
2194
2195      // Read number of records in this block
2196      noBufferedRecords = WritableUtils.readVInt(in);
2197      
2198      // Read key lengths and keys
2199      readBuffer(keyLenBuffer, keyLenInFilter);
2200      readBuffer(keyBuffer, keyInFilter);
2201      noBufferedKeys = noBufferedRecords;
2202      
2203      // Read value lengths and values
2204      if (!lazyDecompress) {
2205        readBuffer(valLenBuffer, valLenInFilter);
2206        readBuffer(valBuffer, valInFilter);
2207        noBufferedValues = noBufferedRecords;
2208        valuesDecompressed = true;
2209      }
2210    }
2211
2212    /** 
2213     * Position valLenIn/valIn to the 'value' 
2214     * corresponding to the 'current' key 
2215     */
2216    private synchronized void seekToCurrentValue() throws IOException {
2217      if (!blockCompressed) {
2218        if (decompress) {
2219          valInFilter.resetState();
2220        }
2221        valBuffer.reset();
2222      } else {
2223        // Check if this is the first value in the 'block' to be read
2224        if (lazyDecompress && !valuesDecompressed) {
2225          // Read the value lengths and values
2226          readBuffer(valLenBuffer, valLenInFilter);
2227          readBuffer(valBuffer, valInFilter);
2228          noBufferedValues = noBufferedRecords;
2229          valuesDecompressed = true;
2230        }
2231        
2232        // Calculate the no. of bytes to skip
2233        // Note: 'current' key has already been read!
2234        int skipValBytes = 0;
2235        int currentKey = noBufferedKeys + 1;          
2236        for (int i=noBufferedValues; i > currentKey; --i) {
2237          skipValBytes += WritableUtils.readVInt(valLenIn);
2238          --noBufferedValues;
2239        }
2240        
2241        // Skip to the 'val' corresponding to 'current' key
2242        if (skipValBytes > 0) {
2243          if (valIn.skipBytes(skipValBytes) != skipValBytes) {
2244            throw new IOException("Failed to seek to " + currentKey + 
2245                                  "(th) value!");
2246          }
2247        }
2248      }
2249    }
2250
2251    /**
2252     * Get the 'value' corresponding to the last read 'key'.
2253     * @param val : The 'value' to be read.
2254     * @throws IOException
2255     */
2256    public synchronized void getCurrentValue(Writable val) 
2257      throws IOException {
2258      if (val instanceof Configurable) {
2259        ((Configurable) val).setConf(this.conf);
2260      }
2261
2262      // Position stream to 'current' value
2263      seekToCurrentValue();
2264
2265      if (!blockCompressed) {
2266        val.readFields(valIn);
2267        
2268        if (valIn.read() > 0) {
2269          LOG.info("available bytes: " + valIn.available());
2270          throw new IOException(val+" read "+(valBuffer.getPosition()-keyLength)
2271                                + " bytes, should read " +
2272                                (valBuffer.getLength()-keyLength));
2273        }
2274      } else {
2275        // Get the value
2276        int valLength = WritableUtils.readVInt(valLenIn);
2277        val.readFields(valIn);
2278        
2279        // Read another compressed 'value'
2280        --noBufferedValues;
2281        
2282        // Sanity check
2283        if ((valLength < 0) && LOG.isDebugEnabled()) {
2284          LOG.debug(val + " is a zero-length value");
2285        }
2286      }
2287
2288    }
2289    
2290    /**
2291     * Get the 'value' corresponding to the last read 'key'.
2292     * @param val : The 'value' to be read.
2293     * @throws IOException
2294     */
2295    public synchronized Object getCurrentValue(Object val) 
2296      throws IOException {
2297      if (val instanceof Configurable) {
2298        ((Configurable) val).setConf(this.conf);
2299      }
2300
2301      // Position stream to 'current' value
2302      seekToCurrentValue();
2303
2304      if (!blockCompressed) {
2305        val = deserializeValue(val);
2306        
2307        if (valIn.read() > 0) {
2308          LOG.info("available bytes: " + valIn.available());
2309          throw new IOException(val+" read "+(valBuffer.getPosition()-keyLength)
2310                                + " bytes, should read " +
2311                                (valBuffer.getLength()-keyLength));
2312        }
2313      } else {
2314        // Get the value
2315        int valLength = WritableUtils.readVInt(valLenIn);
2316        val = deserializeValue(val);
2317        
2318        // Read another compressed 'value'
2319        --noBufferedValues;
2320        
2321        // Sanity check
2322        if ((valLength < 0) && LOG.isDebugEnabled()) {
2323          LOG.debug(val + " is a zero-length value");
2324        }
2325      }
2326      return val;
2327
2328    }
2329
2330    @SuppressWarnings("unchecked")
2331    private Object deserializeValue(Object val) throws IOException {
2332      return valDeserializer.deserialize(val);
2333    }
2334    
2335    /** Read the next key in the file into <code>key</code>, skipping its
2336     * value.  True if another entry exists, and false at end of file. */
2337    public synchronized boolean next(Writable key) throws IOException {
2338      if (key.getClass() != getKeyClass())
2339        throw new IOException("wrong key class: "+key.getClass().getName()
2340                              +" is not "+keyClass);
2341
2342      if (!blockCompressed) {
2343        outBuf.reset();
2344        
2345        keyLength = next(outBuf);
2346        if (keyLength < 0)
2347          return false;
2348        
2349        valBuffer.reset(outBuf.getData(), outBuf.getLength());
2350        
2351        key.readFields(valBuffer);
2352        valBuffer.mark(0);
2353        if (valBuffer.getPosition() != keyLength)
2354          throw new IOException(key + " read " + valBuffer.getPosition()
2355                                + " bytes, should read " + keyLength);
2356      } else {
2357        //Reset syncSeen
2358        syncSeen = false;
2359        
2360        if (noBufferedKeys == 0) {
2361          try {
2362            readBlock();
2363          } catch (EOFException eof) {
2364            return false;
2365          }
2366        }
2367        
2368        int keyLength = WritableUtils.readVInt(keyLenIn);
2369        
2370        // Sanity check
2371        if (keyLength < 0) {
2372          return false;
2373        }
2374        
2375        //Read another compressed 'key'
2376        key.readFields(keyIn);
2377        --noBufferedKeys;
2378      }
2379
2380      return true;
2381    }
2382
2383    /** Read the next key/value pair in the file into <code>key</code> and
2384     * <code>val</code>.  Returns true if such a pair exists and false when at
2385     * end of file */
2386    public synchronized boolean next(Writable key, Writable val)
2387      throws IOException {
2388      if (val.getClass() != getValueClass())
2389        throw new IOException("wrong value class: "+val+" is not "+valClass);
2390
2391      boolean more = next(key);
2392      
2393      if (more) {
2394        getCurrentValue(val);
2395      }
2396
2397      return more;
2398    }
2399    
2400    /**
2401     * Read and return the next record length, potentially skipping over 
2402     * a sync block.
2403     * @return the length of the next record or -1 if there is no next record
2404     * @throws IOException
2405     */
2406    private synchronized int readRecordLength() throws IOException {
2407      if (in.getPos() >= end) {
2408        return -1;
2409      }      
2410      int length = in.readInt();
2411      if (version > 1 && sync != null &&
2412          length == SYNC_ESCAPE) {              // process a sync entry
2413        in.readFully(syncCheck);                // read syncCheck
2414        if (!Arrays.equals(sync, syncCheck))    // check it
2415          throw new IOException("File is corrupt!");
2416        syncSeen = true;
2417        if (in.getPos() >= end) {
2418          return -1;
2419        }
2420        length = in.readInt();                  // re-read length
2421      } else {
2422        syncSeen = false;
2423      }
2424      
2425      return length;
2426    }
2427    
2428    /** Read the next key/value pair in the file into <code>buffer</code>.
2429     * Returns the length of the key read, or -1 if at end of file.  The length
2430     * of the value may be computed by calling buffer.getLength() before and
2431     * after calls to this method. */
2432    /** @deprecated Call {@link #nextRaw(DataOutputBuffer,SequenceFile.ValueBytes)}. */
2433    @Deprecated
2434    synchronized int next(DataOutputBuffer buffer) throws IOException {
2435      // Unsupported for block-compressed sequence files
2436      if (blockCompressed) {
2437        throw new IOException("Unsupported call for block-compressed" +
2438                              " SequenceFiles - use SequenceFile.Reader.next(DataOutputStream, ValueBytes)");
2439      }
2440      try {
2441        int length = readRecordLength();
2442        if (length == -1) {
2443          return -1;
2444        }
2445        int keyLength = in.readInt();
2446        buffer.write(in, length);
2447        return keyLength;
2448      } catch (ChecksumException e) {             // checksum failure
2449        handleChecksumException(e);
2450        return next(buffer);
2451      }
2452    }
2453
2454    public ValueBytes createValueBytes() {
2455      ValueBytes val = null;
2456      if (!decompress || blockCompressed) {
2457        val = new UncompressedBytes();
2458      } else {
2459        val = new CompressedBytes(codec);
2460      }
2461      return val;
2462    }
2463
2464    /**
2465     * Read 'raw' records.
2466     * @param key - The buffer into which the key is read
2467     * @param val - The 'raw' value
2468     * @return Returns the total record length or -1 for end of file
2469     * @throws IOException
2470     */
2471    public synchronized int nextRaw(DataOutputBuffer key, ValueBytes val) 
2472      throws IOException {
2473      if (!blockCompressed) {
2474        int length = readRecordLength();
2475        if (length == -1) {
2476          return -1;
2477        }
2478        int keyLength = in.readInt();
2479        int valLength = length - keyLength;
2480        key.write(in, keyLength);
2481        if (decompress) {
2482          CompressedBytes value = (CompressedBytes)val;
2483          value.reset(in, valLength);
2484        } else {
2485          UncompressedBytes value = (UncompressedBytes)val;
2486          value.reset(in, valLength);
2487        }
2488        
2489        return length;
2490      } else {
2491        //Reset syncSeen
2492        syncSeen = false;
2493        
2494        // Read 'key'
2495        if (noBufferedKeys == 0) {
2496          if (in.getPos() >= end) 
2497            return -1;
2498
2499          try { 
2500            readBlock();
2501          } catch (EOFException eof) {
2502            return -1;
2503          }
2504        }
2505        int keyLength = WritableUtils.readVInt(keyLenIn);
2506        if (keyLength < 0) {
2507          throw new IOException("zero length key found!");
2508        }
2509        key.write(keyIn, keyLength);
2510        --noBufferedKeys;
2511        
2512        // Read raw 'value'
2513        seekToCurrentValue();
2514        int valLength = WritableUtils.readVInt(valLenIn);
2515        UncompressedBytes rawValue = (UncompressedBytes)val;
2516        rawValue.reset(valIn, valLength);
2517        --noBufferedValues;
2518        
2519        return (keyLength+valLength);
2520      }
2521      
2522    }
2523
2524    /**
2525     * Read 'raw' keys.
2526     * @param key - The buffer into which the key is read
2527     * @return Returns the key length or -1 for end of file
2528     * @throws IOException
2529     */
2530    public synchronized int nextRawKey(DataOutputBuffer key) 
2531      throws IOException {
2532      if (!blockCompressed) {
2533        recordLength = readRecordLength();
2534        if (recordLength == -1) {
2535          return -1;
2536        }
2537        keyLength = in.readInt();
2538        key.write(in, keyLength);
2539        return keyLength;
2540      } else {
2541        //Reset syncSeen
2542        syncSeen = false;
2543        
2544        // Read 'key'
2545        if (noBufferedKeys == 0) {
2546          if (in.getPos() >= end) 
2547            return -1;
2548
2549          try { 
2550            readBlock();
2551          } catch (EOFException eof) {
2552            return -1;
2553          }
2554        }
2555        int keyLength = WritableUtils.readVInt(keyLenIn);
2556        if (keyLength < 0) {
2557          throw new IOException("zero length key found!");
2558        }
2559        key.write(keyIn, keyLength);
2560        --noBufferedKeys;
2561        
2562        return keyLength;
2563      }
2564      
2565    }
2566
2567    /** Read the next key in the file, skipping its
2568     * value.  Return null at end of file. */
2569    public synchronized Object next(Object key) throws IOException {
2570      if (key != null && key.getClass() != getKeyClass()) {
2571        throw new IOException("wrong key class: "+key.getClass().getName()
2572                              +" is not "+keyClass);
2573      }
2574
2575      if (!blockCompressed) {
2576        outBuf.reset();
2577        
2578        keyLength = next(outBuf);
2579        if (keyLength < 0)
2580          return null;
2581        
2582        valBuffer.reset(outBuf.getData(), outBuf.getLength());
2583        
2584        key = deserializeKey(key);
2585        valBuffer.mark(0);
2586        if (valBuffer.getPosition() != keyLength)
2587          throw new IOException(key + " read " + valBuffer.getPosition()
2588                                + " bytes, should read " + keyLength);
2589      } else {
2590        //Reset syncSeen
2591        syncSeen = false;
2592        
2593        if (noBufferedKeys == 0) {
2594          try {
2595            readBlock();
2596          } catch (EOFException eof) {
2597            return null;
2598          }
2599        }
2600        
2601        int keyLength = WritableUtils.readVInt(keyLenIn);
2602        
2603        // Sanity check
2604        if (keyLength < 0) {
2605          return null;
2606        }
2607        
2608        //Read another compressed 'key'
2609        key = deserializeKey(key);
2610        --noBufferedKeys;
2611      }
2612
2613      return key;
2614    }
2615
2616    @SuppressWarnings("unchecked")
2617    private Object deserializeKey(Object key) throws IOException {
2618      return keyDeserializer.deserialize(key);
2619    }
2620
2621    /**
2622     * Read 'raw' values.
2623     * @param val - The 'raw' value
2624     * @return Returns the value length
2625     * @throws IOException
2626     */
2627    public synchronized int nextRawValue(ValueBytes val) 
2628      throws IOException {
2629      
2630      // Position stream to current value
2631      seekToCurrentValue();
2632 
2633      if (!blockCompressed) {
2634        int valLength = recordLength - keyLength;
2635        if (decompress) {
2636          CompressedBytes value = (CompressedBytes)val;
2637          value.reset(in, valLength);
2638        } else {
2639          UncompressedBytes value = (UncompressedBytes)val;
2640          value.reset(in, valLength);
2641        }
2642         
2643        return valLength;
2644      } else {
2645        int valLength = WritableUtils.readVInt(valLenIn);
2646        UncompressedBytes rawValue = (UncompressedBytes)val;
2647        rawValue.reset(valIn, valLength);
2648        --noBufferedValues;
2649        return valLength;
2650      }
2651      
2652    }
2653
2654    private void handleChecksumException(ChecksumException e)
2655      throws IOException {
2656      if (this.conf.getBoolean("io.skip.checksum.errors", false)) {
2657        LOG.warn("Bad checksum at "+getPosition()+". Skipping entries.");
2658        sync(getPosition()+this.conf.getInt("io.bytes.per.checksum", 512));
2659      } else {
2660        throw e;
2661      }
2662    }
2663
2664    /** disables sync. often invoked for tmp files */
2665    synchronized void ignoreSync() {
2666      sync = null;
2667    }
2668    
2669    /** Set the current byte position in the input file.
2670     *
2671     * <p>The position passed must be a position returned by {@link
2672     * SequenceFile.Writer#getLength()} when writing this file.  To seek to an arbitrary
2673     * position, use {@link SequenceFile.Reader#sync(long)}.
2674     */
2675    public synchronized void seek(long position) throws IOException {
2676      in.seek(position);
2677      if (blockCompressed) {                      // trigger block read
2678        noBufferedKeys = 0;
2679        valuesDecompressed = true;
2680      }
2681    }
2682
2683    /** Seek to the next sync mark past a given position.*/
2684    public synchronized void sync(long position) throws IOException {
2685      if (position+SYNC_SIZE >= end) {
2686        seek(end);
2687        return;
2688      }
2689
2690      if (position < headerEnd) {
2691        // seek directly to first record
2692        in.seek(headerEnd);
2693        // note the sync marker "seen" in the header
2694        syncSeen = true;
2695        return;
2696      }
2697
2698      try {
2699        seek(position+4);                         // skip escape
2700        in.readFully(syncCheck);
2701        int syncLen = sync.length;
2702        for (int i = 0; in.getPos() < end; i++) {
2703          int j = 0;
2704          for (; j < syncLen; j++) {
2705            if (sync[j] != syncCheck[(i+j)%syncLen])
2706              break;
2707          }
2708          if (j == syncLen) {
2709            in.seek(in.getPos() - SYNC_SIZE);     // position before sync
2710            return;
2711          }
2712          syncCheck[i%syncLen] = in.readByte();
2713        }
2714      } catch (ChecksumException e) {             // checksum failure
2715        handleChecksumException(e);
2716      }
2717    }
2718
2719    /** Returns true iff the previous call to next passed a sync mark.*/
2720    public synchronized boolean syncSeen() { return syncSeen; }
2721
2722    /** Return the current byte position in the input file. */
2723    public synchronized long getPosition() throws IOException {
2724      return in.getPos();
2725    }
2726
2727    /** Returns the name of the file. */
2728    @Override
2729    public String toString() {
2730      return filename;
2731    }
2732
2733  }
2734
2735  /** Sorts key/value pairs in a sequence-format file.
2736   *
2737   * <p>For best performance, applications should make sure that the {@link
2738   * Writable#readFields(DataInput)} implementation of their keys is
2739   * very efficient.  In particular, it should avoid allocating memory.
2740   */
2741  public static class Sorter {
2742
2743    private RawComparator comparator;
2744
2745    private MergeSort mergeSort; //the implementation of merge sort
2746    
2747    private Path[] inFiles;                     // when merging or sorting
2748
2749    private Path outFile;
2750
2751    private int memory; // bytes
2752    private int factor; // merged per pass
2753
2754    private FileSystem fs = null;
2755
2756    private Class keyClass;
2757    private Class valClass;
2758
2759    private Configuration conf;
2760    private Metadata metadata;
2761    
2762    private Progressable progressable = null;
2763
2764    /** Sort and merge files containing the named classes. */
2765    public Sorter(FileSystem fs, Class<? extends WritableComparable> keyClass,
2766                  Class valClass, Configuration conf)  {
2767      this(fs, WritableComparator.get(keyClass, conf), keyClass, valClass, conf);
2768    }
2769
2770    /** Sort and merge using an arbitrary {@link RawComparator}. */
2771    public Sorter(FileSystem fs, RawComparator comparator, Class keyClass, 
2772                  Class valClass, Configuration conf) {
2773      this(fs, comparator, keyClass, valClass, conf, new Metadata());
2774    }
2775
2776    /** Sort and merge using an arbitrary {@link RawComparator}. */
2777    public Sorter(FileSystem fs, RawComparator comparator, Class keyClass,
2778                  Class valClass, Configuration conf, Metadata metadata) {
2779      this.fs = fs;
2780      this.comparator = comparator;
2781      this.keyClass = keyClass;
2782      this.valClass = valClass;
2783      this.memory = conf.getInt("io.sort.mb", 100) * 1024 * 1024;
2784      this.factor = conf.getInt("io.sort.factor", 100);
2785      this.conf = conf;
2786      this.metadata = metadata;
2787    }
2788
2789    /** Set the number of streams to merge at once.*/
2790    public void setFactor(int factor) { this.factor = factor; }
2791
2792    /** Get the number of streams to merge at once.*/
2793    public int getFactor() { return factor; }
2794
2795    /** Set the total amount of buffer memory, in bytes.*/
2796    public void setMemory(int memory) { this.memory = memory; }
2797
2798    /** Get the total amount of buffer memory, in bytes.*/
2799    public int getMemory() { return memory; }
2800
2801    /** Set the progressable object in order to report progress. */
2802    public void setProgressable(Progressable progressable) {
2803      this.progressable = progressable;
2804    }
2805    
2806    /** 
2807     * Perform a file sort from a set of input files into an output file.
2808     * @param inFiles the files to be sorted
2809     * @param outFile the sorted output file
2810     * @param deleteInput should the input files be deleted as they are read?
2811     */
2812    public void sort(Path[] inFiles, Path outFile,
2813                     boolean deleteInput) throws IOException {
2814      if (fs.exists(outFile)) {
2815        throw new IOException("already exists: " + outFile);
2816      }
2817
2818      this.inFiles = inFiles;
2819      this.outFile = outFile;
2820
2821      int segments = sortPass(deleteInput);
2822      if (segments > 1) {
2823        mergePass(outFile.getParent());
2824      }
2825    }
2826
2827    /** 
2828     * Perform a file sort from a set of input files and return an iterator.
2829     * @param inFiles the files to be sorted
2830     * @param tempDir the directory where temp files are created during sort
2831     * @param deleteInput should the input files be deleted as they are read?
2832     * @return iterator the RawKeyValueIterator
2833     */
2834    public RawKeyValueIterator sortAndIterate(Path[] inFiles, Path tempDir, 
2835                                              boolean deleteInput) throws IOException {
2836      Path outFile = new Path(tempDir + Path.SEPARATOR + "all.2");
2837      if (fs.exists(outFile)) {
2838        throw new IOException("already exists: " + outFile);
2839      }
2840      this.inFiles = inFiles;
2841      //outFile will basically be used as prefix for temp files in the cases
2842      //where sort outputs multiple sorted segments. For the single segment
2843      //case, the outputFile itself will contain the sorted data for that
2844      //segment
2845      this.outFile = outFile;
2846
2847      int segments = sortPass(deleteInput);
2848      if (segments > 1)
2849        return merge(outFile.suffix(".0"), outFile.suffix(".0.index"), 
2850                     tempDir);
2851      else if (segments == 1)
2852        return merge(new Path[]{outFile}, true, tempDir);
2853      else return null;
2854    }
2855
2856    /**
2857     * The backwards compatible interface to sort.
2858     * @param inFile the input file to sort
2859     * @param outFile the sorted output file
2860     */
2861    public void sort(Path inFile, Path outFile) throws IOException {
2862      sort(new Path[]{inFile}, outFile, false);
2863    }
2864    
2865    private int sortPass(boolean deleteInput) throws IOException {
2866      if(LOG.isDebugEnabled()) {
2867        LOG.debug("running sort pass");
2868      }
2869      SortPass sortPass = new SortPass();         // make the SortPass
2870      sortPass.setProgressable(progressable);
2871      mergeSort = new MergeSort(sortPass.new SeqFileComparator());
2872      try {
2873        return sortPass.run(deleteInput);         // run it
2874      } finally {
2875        sortPass.close();                         // close it
2876      }
2877    }
2878
2879    private class SortPass {
2880      private int memoryLimit = memory/4;
2881      private int recordLimit = 1000000;
2882      
2883      private DataOutputBuffer rawKeys = new DataOutputBuffer();
2884      private byte[] rawBuffer;
2885
2886      private int[] keyOffsets = new int[1024];
2887      private int[] pointers = new int[keyOffsets.length];
2888      private int[] pointersCopy = new int[keyOffsets.length];
2889      private int[] keyLengths = new int[keyOffsets.length];
2890      private ValueBytes[] rawValues = new ValueBytes[keyOffsets.length];
2891      
2892      private ArrayList segmentLengths = new ArrayList();
2893      
2894      private Reader in = null;
2895      private FSDataOutputStream out = null;
2896      private FSDataOutputStream indexOut = null;
2897      private Path outName;
2898
2899      private Progressable progressable = null;
2900
2901      public int run(boolean deleteInput) throws IOException {
2902        int segments = 0;
2903        int currentFile = 0;
2904        boolean atEof = (currentFile >= inFiles.length);
2905        CompressionType compressionType;
2906        CompressionCodec codec = null;
2907        segmentLengths.clear();
2908        if (atEof) {
2909          return 0;
2910        }
2911        
2912        // Initialize
2913        in = new Reader(fs, inFiles[currentFile], conf);
2914        compressionType = in.getCompressionType();
2915        codec = in.getCompressionCodec();
2916        
2917        for (int i=0; i < rawValues.length; ++i) {
2918          rawValues[i] = null;
2919        }
2920        
2921        while (!atEof) {
2922          int count = 0;
2923          int bytesProcessed = 0;
2924          rawKeys.reset();
2925          while (!atEof && 
2926                 bytesProcessed < memoryLimit && count < recordLimit) {
2927
2928            // Read a record into buffer
2929            // Note: Attempt to re-use 'rawValue' as far as possible
2930            int keyOffset = rawKeys.getLength();       
2931            ValueBytes rawValue = 
2932              (count == keyOffsets.length || rawValues[count] == null) ? 
2933              in.createValueBytes() : 
2934              rawValues[count];
2935            int recordLength = in.nextRaw(rawKeys, rawValue);
2936            if (recordLength == -1) {
2937              in.close();
2938              if (deleteInput) {
2939                fs.delete(inFiles[currentFile], true);
2940              }
2941              currentFile += 1;
2942              atEof = currentFile >= inFiles.length;
2943              if (!atEof) {
2944                in = new Reader(fs, inFiles[currentFile], conf);
2945              } else {
2946                in = null;
2947              }
2948              continue;
2949            }
2950
2951            int keyLength = rawKeys.getLength() - keyOffset;
2952
2953            if (count == keyOffsets.length)
2954              grow();
2955
2956            keyOffsets[count] = keyOffset;                // update pointers
2957            pointers[count] = count;
2958            keyLengths[count] = keyLength;
2959            rawValues[count] = rawValue;
2960
2961            bytesProcessed += recordLength; 
2962            count++;
2963          }
2964
2965          // buffer is full -- sort & flush it
2966          if(LOG.isDebugEnabled()) {
2967            LOG.debug("flushing segment " + segments);
2968          }
2969          rawBuffer = rawKeys.getData();
2970          sort(count);
2971          // indicate we're making progress
2972          if (progressable != null) {
2973            progressable.progress();
2974          }
2975          flush(count, bytesProcessed, compressionType, codec, 
2976                segments==0 && atEof);
2977          segments++;
2978        }
2979        return segments;
2980      }
2981
2982      public void close() throws IOException {
2983        if (in != null) {
2984          in.close();
2985        }
2986        if (out != null) {
2987          out.close();
2988        }
2989        if (indexOut != null) {
2990          indexOut.close();
2991        }
2992      }
2993
2994      private void grow() {
2995        int newLength = keyOffsets.length * 3 / 2;
2996        keyOffsets = grow(keyOffsets, newLength);
2997        pointers = grow(pointers, newLength);
2998        pointersCopy = new int[newLength];
2999        keyLengths = grow(keyLengths, newLength);
3000        rawValues = grow(rawValues, newLength);
3001      }
3002
3003      private int[] grow(int[] old, int newLength) {
3004        int[] result = new int[newLength];
3005        System.arraycopy(old, 0, result, 0, old.length);
3006        return result;
3007      }
3008      
3009      private ValueBytes[] grow(ValueBytes[] old, int newLength) {
3010        ValueBytes[] result = new ValueBytes[newLength];
3011        System.arraycopy(old, 0, result, 0, old.length);
3012        for (int i=old.length; i < newLength; ++i) {
3013          result[i] = null;
3014        }
3015        return result;
3016      }
3017
3018      private void flush(int count, int bytesProcessed, 
3019                         CompressionType compressionType, 
3020                         CompressionCodec codec, 
3021                         boolean done) throws IOException {
3022        if (out == null) {
3023          outName = done ? outFile : outFile.suffix(".0");
3024          out = fs.create(outName);
3025          if (!done) {
3026            indexOut = fs.create(outName.suffix(".index"));
3027          }
3028        }
3029
3030        long segmentStart = out.getPos();
3031        Writer writer = createWriter(conf, Writer.stream(out), 
3032            Writer.keyClass(keyClass), Writer.valueClass(valClass),
3033            Writer.compression(compressionType, codec),
3034            Writer.metadata(done ? metadata : new Metadata()));
3035        
3036        if (!done) {
3037          writer.sync = null;                     // disable sync on temp files
3038        }
3039
3040        for (int i = 0; i < count; i++) {         // write in sorted order
3041          int p = pointers[i];
3042          writer.appendRaw(rawBuffer, keyOffsets[p], keyLengths[p], rawValues[p]);
3043        }
3044        writer.close();
3045        
3046        if (!done) {
3047          // Save the segment length
3048          WritableUtils.writeVLong(indexOut, segmentStart);
3049          WritableUtils.writeVLong(indexOut, (out.getPos()-segmentStart));
3050          indexOut.flush();
3051        }
3052      }
3053
3054      private void sort(int count) {
3055        System.arraycopy(pointers, 0, pointersCopy, 0, count);
3056        mergeSort.mergeSort(pointersCopy, pointers, 0, count);
3057      }
3058      class SeqFileComparator implements Comparator<IntWritable> {
3059        @Override
3060        public int compare(IntWritable I, IntWritable J) {
3061          return comparator.compare(rawBuffer, keyOffsets[I.get()], 
3062                                    keyLengths[I.get()], rawBuffer, 
3063                                    keyOffsets[J.get()], keyLengths[J.get()]);
3064        }
3065      }
3066      
3067      /** set the progressable object in order to report progress */
3068      public void setProgressable(Progressable progressable)
3069      {
3070        this.progressable = progressable;
3071      }
3072      
3073    } // SequenceFile.Sorter.SortPass
3074
3075    /** The interface to iterate over raw keys/values of SequenceFiles. */
3076    public static interface RawKeyValueIterator {
3077      /** Gets the current raw key
3078       * @return DataOutputBuffer
3079       * @throws IOException
3080       */
3081      DataOutputBuffer getKey() throws IOException; 
3082      /** Gets the current raw value
3083       * @return ValueBytes 
3084       * @throws IOException
3085       */
3086      ValueBytes getValue() throws IOException; 
3087      /** Sets up the current key and value (for getKey and getValue)
3088       * @return true if there exists a key/value, false otherwise 
3089       * @throws IOException
3090       */
3091      boolean next() throws IOException;
3092      /** closes the iterator so that the underlying streams can be closed
3093       * @throws IOException
3094       */
3095      void close() throws IOException;
3096      /** Gets the Progress object; this has a float (0.0 - 1.0) 
3097       * indicating the bytes processed by the iterator so far
3098       */
3099      Progress getProgress();
3100    }    
3101    
3102    /**
3103     * Merges the list of segments of type <code>SegmentDescriptor</code>
3104     * @param segments the list of SegmentDescriptors
3105     * @param tmpDir the directory to write temporary files into
3106     * @return RawKeyValueIterator
3107     * @throws IOException
3108     */
3109    public RawKeyValueIterator merge(List <SegmentDescriptor> segments, 
3110                                     Path tmpDir) 
3111      throws IOException {
3112      // pass in object to report progress, if present
3113      MergeQueue mQueue = new MergeQueue(segments, tmpDir, progressable);
3114      return mQueue.merge();
3115    }
3116
3117    /**
3118     * Merges the contents of files passed in Path[] using a max factor value
3119     * that is already set
3120     * @param inNames the array of path names
3121     * @param deleteInputs true if the input files should be deleted when 
3122     * unnecessary
3123     * @param tmpDir the directory to write temporary files into
3124     * @return RawKeyValueIteratorMergeQueue
3125     * @throws IOException
3126     */
3127    public RawKeyValueIterator merge(Path [] inNames, boolean deleteInputs,
3128                                     Path tmpDir) 
3129      throws IOException {
3130      return merge(inNames, deleteInputs, 
3131                   (inNames.length < factor) ? inNames.length : factor,
3132                   tmpDir);
3133    }
3134
3135    /**
3136     * Merges the contents of files passed in Path[]
3137     * @param inNames the array of path names
3138     * @param deleteInputs true if the input files should be deleted when 
3139     * unnecessary
3140     * @param factor the factor that will be used as the maximum merge fan-in
3141     * @param tmpDir the directory to write temporary files into
3142     * @return RawKeyValueIteratorMergeQueue
3143     * @throws IOException
3144     */
3145    public RawKeyValueIterator merge(Path [] inNames, boolean deleteInputs,
3146                                     int factor, Path tmpDir) 
3147      throws IOException {
3148      //get the segments from inNames
3149      ArrayList <SegmentDescriptor> a = new ArrayList <SegmentDescriptor>();
3150      for (int i = 0; i < inNames.length; i++) {
3151        SegmentDescriptor s = new SegmentDescriptor(0,
3152            fs.getFileStatus(inNames[i]).getLen(), inNames[i]);
3153        s.preserveInput(!deleteInputs);
3154        s.doSync();
3155        a.add(s);
3156      }
3157      this.factor = factor;
3158      MergeQueue mQueue = new MergeQueue(a, tmpDir, progressable);
3159      return mQueue.merge();
3160    }
3161
3162    /**
3163     * Merges the contents of files passed in Path[]
3164     * @param inNames the array of path names
3165     * @param tempDir the directory for creating temp files during merge
3166     * @param deleteInputs true if the input files should be deleted when 
3167     * unnecessary
3168     * @return RawKeyValueIteratorMergeQueue
3169     * @throws IOException
3170     */
3171    public RawKeyValueIterator merge(Path [] inNames, Path tempDir, 
3172                                     boolean deleteInputs) 
3173      throws IOException {
3174      //outFile will basically be used as prefix for temp files for the
3175      //intermediate merge outputs           
3176      this.outFile = new Path(tempDir + Path.SEPARATOR + "merged");
3177      //get the segments from inNames
3178      ArrayList <SegmentDescriptor> a = new ArrayList <SegmentDescriptor>();
3179      for (int i = 0; i < inNames.length; i++) {
3180        SegmentDescriptor s = new SegmentDescriptor(0,
3181            fs.getFileStatus(inNames[i]).getLen(), inNames[i]);
3182        s.preserveInput(!deleteInputs);
3183        s.doSync();
3184        a.add(s);
3185      }
3186      factor = (inNames.length < factor) ? inNames.length : factor;
3187      // pass in object to report progress, if present
3188      MergeQueue mQueue = new MergeQueue(a, tempDir, progressable);
3189      return mQueue.merge();
3190    }
3191
3192    /**
3193     * Clones the attributes (like compression of the input file and creates a 
3194     * corresponding Writer
3195     * @param inputFile the path of the input file whose attributes should be 
3196     * cloned
3197     * @param outputFile the path of the output file 
3198     * @param prog the Progressable to report status during the file write
3199     * @return Writer
3200     * @throws IOException
3201     */
3202    public Writer cloneFileAttributes(Path inputFile, Path outputFile, 
3203                                      Progressable prog) throws IOException {
3204      Reader reader = new Reader(conf,
3205                                 Reader.file(inputFile),
3206                                 new Reader.OnlyHeaderOption());
3207      CompressionType compress = reader.getCompressionType();
3208      CompressionCodec codec = reader.getCompressionCodec();
3209      reader.close();
3210
3211      Writer writer = createWriter(conf, 
3212                                   Writer.file(outputFile), 
3213                                   Writer.keyClass(keyClass), 
3214                                   Writer.valueClass(valClass), 
3215                                   Writer.compression(compress, codec), 
3216                                   Writer.progressable(prog));
3217      return writer;
3218    }
3219
3220    /**
3221     * Writes records from RawKeyValueIterator into a file represented by the 
3222     * passed writer
3223     * @param records the RawKeyValueIterator
3224     * @param writer the Writer created earlier 
3225     * @throws IOException
3226     */
3227    public void writeFile(RawKeyValueIterator records, Writer writer) 
3228      throws IOException {
3229      while(records.next()) {
3230        writer.appendRaw(records.getKey().getData(), 0, 
3231                         records.getKey().getLength(), records.getValue());
3232      }
3233      writer.sync();
3234    }
3235        
3236    /** Merge the provided files.
3237     * @param inFiles the array of input path names
3238     * @param outFile the final output file
3239     * @throws IOException
3240     */
3241    public void merge(Path[] inFiles, Path outFile) throws IOException {
3242      if (fs.exists(outFile)) {
3243        throw new IOException("already exists: " + outFile);
3244      }
3245      RawKeyValueIterator r = merge(inFiles, false, outFile.getParent());
3246      Writer writer = cloneFileAttributes(inFiles[0], outFile, null);
3247      
3248      writeFile(r, writer);
3249
3250      writer.close();
3251    }
3252
3253    /** sort calls this to generate the final merged output */
3254    private int mergePass(Path tmpDir) throws IOException {
3255      if(LOG.isDebugEnabled()) {
3256        LOG.debug("running merge pass");
3257      }
3258      Writer writer = cloneFileAttributes(
3259                                          outFile.suffix(".0"), outFile, null);
3260      RawKeyValueIterator r = merge(outFile.suffix(".0"), 
3261                                    outFile.suffix(".0.index"), tmpDir);
3262      writeFile(r, writer);
3263
3264      writer.close();
3265      return 0;
3266    }
3267
3268    /** Used by mergePass to merge the output of the sort
3269     * @param inName the name of the input file containing sorted segments
3270     * @param indexIn the offsets of the sorted segments
3271     * @param tmpDir the relative directory to store intermediate results in
3272     * @return RawKeyValueIterator
3273     * @throws IOException
3274     */
3275    private RawKeyValueIterator merge(Path inName, Path indexIn, Path tmpDir) 
3276      throws IOException {
3277      //get the segments from indexIn
3278      //we create a SegmentContainer so that we can track segments belonging to
3279      //inName and delete inName as soon as we see that we have looked at all
3280      //the contained segments during the merge process & hence don't need 
3281      //them anymore
3282      SegmentContainer container = new SegmentContainer(inName, indexIn);
3283      MergeQueue mQueue = new MergeQueue(container.getSegmentList(), tmpDir, progressable);
3284      return mQueue.merge();
3285    }
3286    
3287    /** This class implements the core of the merge logic */
3288    private class MergeQueue extends PriorityQueue 
3289      implements RawKeyValueIterator {
3290      private boolean compress;
3291      private boolean blockCompress;
3292      private DataOutputBuffer rawKey = new DataOutputBuffer();
3293      private ValueBytes rawValue;
3294      private long totalBytesProcessed;
3295      private float progPerByte;
3296      private Progress mergeProgress = new Progress();
3297      private Path tmpDir;
3298      private Progressable progress = null; //handle to the progress reporting object
3299      private SegmentDescriptor minSegment;
3300      
3301      //a TreeMap used to store the segments sorted by size (segment offset and
3302      //segment path name is used to break ties between segments of same sizes)
3303      private Map<SegmentDescriptor, Void> sortedSegmentSizes =
3304        new TreeMap<SegmentDescriptor, Void>();
3305            
3306      @SuppressWarnings("unchecked")
3307      public void put(SegmentDescriptor stream) throws IOException {
3308        if (size() == 0) {
3309          compress = stream.in.isCompressed();
3310          blockCompress = stream.in.isBlockCompressed();
3311        } else if (compress != stream.in.isCompressed() || 
3312                   blockCompress != stream.in.isBlockCompressed()) {
3313          throw new IOException("All merged files must be compressed or not.");
3314        } 
3315        super.put(stream);
3316      }
3317      
3318      /**
3319       * A queue of file segments to merge
3320       * @param segments the file segments to merge
3321       * @param tmpDir a relative local directory to save intermediate files in
3322       * @param progress the reference to the Progressable object
3323       */
3324      public MergeQueue(List <SegmentDescriptor> segments,
3325          Path tmpDir, Progressable progress) {
3326        int size = segments.size();
3327        for (int i = 0; i < size; i++) {
3328          sortedSegmentSizes.put(segments.get(i), null);
3329        }
3330        this.tmpDir = tmpDir;
3331        this.progress = progress;
3332      }
3333      @Override
3334      protected boolean lessThan(Object a, Object b) {
3335        // indicate we're making progress
3336        if (progress != null) {
3337          progress.progress();
3338        }
3339        SegmentDescriptor msa = (SegmentDescriptor)a;
3340        SegmentDescriptor msb = (SegmentDescriptor)b;
3341        return comparator.compare(msa.getKey().getData(), 0, 
3342                                  msa.getKey().getLength(), msb.getKey().getData(), 0, 
3343                                  msb.getKey().getLength()) < 0;
3344      }
3345      @Override
3346      public void close() throws IOException {
3347        SegmentDescriptor ms;                           // close inputs
3348        while ((ms = (SegmentDescriptor)pop()) != null) {
3349          ms.cleanup();
3350        }
3351        minSegment = null;
3352      }
3353      @Override
3354      public DataOutputBuffer getKey() throws IOException {
3355        return rawKey;
3356      }
3357      @Override
3358      public ValueBytes getValue() throws IOException {
3359        return rawValue;
3360      }
3361      @Override
3362      public boolean next() throws IOException {
3363        if (size() == 0)
3364          return false;
3365        if (minSegment != null) {
3366          //minSegment is non-null for all invocations of next except the first
3367          //one. For the first invocation, the priority queue is ready for use
3368          //but for the subsequent invocations, first adjust the queue 
3369          adjustPriorityQueue(minSegment);
3370          if (size() == 0) {
3371            minSegment = null;
3372            return false;
3373          }
3374        }
3375        minSegment = (SegmentDescriptor)top();
3376        long startPos = minSegment.in.getPosition(); // Current position in stream
3377        //save the raw key reference
3378        rawKey = minSegment.getKey();
3379        //load the raw value. Re-use the existing rawValue buffer
3380        if (rawValue == null) {
3381          rawValue = minSegment.in.createValueBytes();
3382        }
3383        minSegment.nextRawValue(rawValue);
3384        long endPos = minSegment.in.getPosition(); // End position after reading value
3385        updateProgress(endPos - startPos);
3386        return true;
3387      }
3388      
3389      @Override
3390      public Progress getProgress() {
3391        return mergeProgress; 
3392      }
3393
3394      private void adjustPriorityQueue(SegmentDescriptor ms) throws IOException{
3395        long startPos = ms.in.getPosition(); // Current position in stream
3396        boolean hasNext = ms.nextRawKey();
3397        long endPos = ms.in.getPosition(); // End position after reading key
3398        updateProgress(endPos - startPos);
3399        if (hasNext) {
3400          adjustTop();
3401        } else {
3402          pop();
3403          ms.cleanup();
3404        }
3405      }
3406
3407      private void updateProgress(long bytesProcessed) {
3408        totalBytesProcessed += bytesProcessed;
3409        if (progPerByte > 0) {
3410          mergeProgress.set(totalBytesProcessed * progPerByte);
3411        }
3412      }
3413      
3414      /** This is the single level merge that is called multiple times 
3415       * depending on the factor size and the number of segments
3416       * @return RawKeyValueIterator
3417       * @throws IOException
3418       */
3419      public RawKeyValueIterator merge() throws IOException {
3420        //create the MergeStreams from the sorted map created in the constructor
3421        //and dump the final output to a file
3422        int numSegments = sortedSegmentSizes.size();
3423        int origFactor = factor;
3424        int passNo = 1;
3425        LocalDirAllocator lDirAlloc = new LocalDirAllocator("io.seqfile.local.dir");
3426        do {
3427          //get the factor for this pass of merge
3428          factor = getPassFactor(passNo, numSegments);
3429          List<SegmentDescriptor> segmentsToMerge =
3430            new ArrayList<SegmentDescriptor>();
3431          int segmentsConsidered = 0;
3432          int numSegmentsToConsider = factor;
3433          while (true) {
3434            //extract the smallest 'factor' number of segment pointers from the 
3435            //TreeMap. Call cleanup on the empty segments (no key/value data)
3436            SegmentDescriptor[] mStream = 
3437              getSegmentDescriptors(numSegmentsToConsider);
3438            for (int i = 0; i < mStream.length; i++) {
3439              if (mStream[i].nextRawKey()) {
3440                segmentsToMerge.add(mStream[i]);
3441                segmentsConsidered++;
3442                // Count the fact that we read some bytes in calling nextRawKey()
3443                updateProgress(mStream[i].in.getPosition());
3444              }
3445              else {
3446                mStream[i].cleanup();
3447                numSegments--; //we ignore this segment for the merge
3448              }
3449            }
3450            //if we have the desired number of segments
3451            //or looked at all available segments, we break
3452            if (segmentsConsidered == factor || 
3453                sortedSegmentSizes.size() == 0) {
3454              break;
3455            }
3456              
3457            numSegmentsToConsider = factor - segmentsConsidered;
3458          }
3459          //feed the streams to the priority queue
3460          initialize(segmentsToMerge.size()); clear();
3461          for (int i = 0; i < segmentsToMerge.size(); i++) {
3462            put(segmentsToMerge.get(i));
3463          }
3464          //if we have lesser number of segments remaining, then just return the
3465          //iterator, else do another single level merge
3466          if (numSegments <= factor) {
3467            //calculate the length of the remaining segments. Required for 
3468            //calculating the merge progress
3469            long totalBytes = 0;
3470            for (int i = 0; i < segmentsToMerge.size(); i++) {
3471              totalBytes += segmentsToMerge.get(i).segmentLength;
3472            }
3473            if (totalBytes != 0) //being paranoid
3474              progPerByte = 1.0f / (float)totalBytes;
3475            //reset factor to what it originally was
3476            factor = origFactor;
3477            return this;
3478          } else {
3479            //we want to spread the creation of temp files on multiple disks if 
3480            //available under the space constraints
3481            long approxOutputSize = 0; 
3482            for (SegmentDescriptor s : segmentsToMerge) {
3483              approxOutputSize += s.segmentLength + 
3484                                  ChecksumFileSystem.getApproxChkSumLength(
3485                                  s.segmentLength);
3486            }
3487            Path tmpFilename = 
3488              new Path(tmpDir, "intermediate").suffix("." + passNo);
3489
3490            Path outputFile =  lDirAlloc.getLocalPathForWrite(
3491                                                tmpFilename.toString(),
3492                                                approxOutputSize, conf);
3493            if(LOG.isDebugEnabled()) { 
3494              LOG.debug("writing intermediate results to " + outputFile);
3495            }
3496            Writer writer = cloneFileAttributes(
3497                                                fs.makeQualified(segmentsToMerge.get(0).segmentPathName), 
3498                                                fs.makeQualified(outputFile), null);
3499            writer.sync = null; //disable sync for temp files
3500            writeFile(this, writer);
3501            writer.close();
3502            
3503            //we finished one single level merge; now clean up the priority 
3504            //queue
3505            this.close();
3506            
3507            SegmentDescriptor tempSegment = 
3508              new SegmentDescriptor(0,
3509                  fs.getFileStatus(outputFile).getLen(), outputFile);
3510            //put the segment back in the TreeMap
3511            sortedSegmentSizes.put(tempSegment, null);
3512            numSegments = sortedSegmentSizes.size();
3513            passNo++;
3514          }
3515          //we are worried about only the first pass merge factor. So reset the 
3516          //factor to what it originally was
3517          factor = origFactor;
3518        } while(true);
3519      }
3520  
3521      //Hadoop-591
3522      public int getPassFactor(int passNo, int numSegments) {
3523        if (passNo > 1 || numSegments <= factor || factor == 1) 
3524          return factor;
3525        int mod = (numSegments - 1) % (factor - 1);
3526        if (mod == 0)
3527          return factor;
3528        return mod + 1;
3529      }
3530      
3531      /** Return (& remove) the requested number of segment descriptors from the
3532       * sorted map.
3533       */
3534      public SegmentDescriptor[] getSegmentDescriptors(int numDescriptors) {
3535        if (numDescriptors > sortedSegmentSizes.size())
3536          numDescriptors = sortedSegmentSizes.size();
3537        SegmentDescriptor[] SegmentDescriptors = 
3538          new SegmentDescriptor[numDescriptors];
3539        Iterator iter = sortedSegmentSizes.keySet().iterator();
3540        int i = 0;
3541        while (i < numDescriptors) {
3542          SegmentDescriptors[i++] = (SegmentDescriptor)iter.next();
3543          iter.remove();
3544        }
3545        return SegmentDescriptors;
3546      }
3547    } // SequenceFile.Sorter.MergeQueue
3548
3549    /** This class defines a merge segment. This class can be subclassed to 
3550     * provide a customized cleanup method implementation. In this 
3551     * implementation, cleanup closes the file handle and deletes the file 
3552     */
3553    public class SegmentDescriptor implements Comparable {
3554      
3555      long segmentOffset; //the start of the segment in the file
3556      long segmentLength; //the length of the segment
3557      Path segmentPathName; //the path name of the file containing the segment
3558      boolean ignoreSync = true; //set to true for temp files
3559      private Reader in = null; 
3560      private DataOutputBuffer rawKey = null; //this will hold the current key
3561      private boolean preserveInput = false; //delete input segment files?
3562      
3563      /** Constructs a segment
3564       * @param segmentOffset the offset of the segment in the file
3565       * @param segmentLength the length of the segment
3566       * @param segmentPathName the path name of the file containing the segment
3567       */
3568      public SegmentDescriptor (long segmentOffset, long segmentLength, 
3569                                Path segmentPathName) {
3570        this.segmentOffset = segmentOffset;
3571        this.segmentLength = segmentLength;
3572        this.segmentPathName = segmentPathName;
3573      }
3574      
3575      /** Do the sync checks */
3576      public void doSync() {ignoreSync = false;}
3577      
3578      /** Whether to delete the files when no longer needed */
3579      public void preserveInput(boolean preserve) {
3580        preserveInput = preserve;
3581      }
3582
3583      public boolean shouldPreserveInput() {
3584        return preserveInput;
3585      }
3586      
3587      @Override
3588      public int compareTo(Object o) {
3589        SegmentDescriptor that = (SegmentDescriptor)o;
3590        if (this.segmentLength != that.segmentLength) {
3591          return (this.segmentLength < that.segmentLength ? -1 : 1);
3592        }
3593        if (this.segmentOffset != that.segmentOffset) {
3594          return (this.segmentOffset < that.segmentOffset ? -1 : 1);
3595        }
3596        return (this.segmentPathName.toString()).
3597          compareTo(that.segmentPathName.toString());
3598      }
3599
3600      @Override
3601      public boolean equals(Object o) {
3602        if (!(o instanceof SegmentDescriptor)) {
3603          return false;
3604        }
3605        SegmentDescriptor that = (SegmentDescriptor)o;
3606        if (this.segmentLength == that.segmentLength &&
3607            this.segmentOffset == that.segmentOffset &&
3608            this.segmentPathName.toString().equals(
3609              that.segmentPathName.toString())) {
3610          return true;
3611        }
3612        return false;
3613      }
3614
3615      @Override
3616      public int hashCode() {
3617        return 37 * 17 + (int) (segmentOffset^(segmentOffset>>>32));
3618      }
3619
3620      /** Fills up the rawKey object with the key returned by the Reader
3621       * @return true if there is a key returned; false, otherwise
3622       * @throws IOException
3623       */
3624      public boolean nextRawKey() throws IOException {
3625        if (in == null) {
3626          int bufferSize = getBufferSize(conf); 
3627          Reader reader = new Reader(conf,
3628                                     Reader.file(segmentPathName), 
3629                                     Reader.bufferSize(bufferSize),
3630                                     Reader.start(segmentOffset), 
3631                                     Reader.length(segmentLength));
3632        
3633          //sometimes we ignore syncs especially for temp merge files
3634          if (ignoreSync) reader.ignoreSync();
3635
3636          if (reader.getKeyClass() != keyClass)
3637            throw new IOException("wrong key class: " + reader.getKeyClass() +
3638                                  " is not " + keyClass);
3639          if (reader.getValueClass() != valClass)
3640            throw new IOException("wrong value class: "+reader.getValueClass()+
3641                                  " is not " + valClass);
3642          this.in = reader;
3643          rawKey = new DataOutputBuffer();
3644        }
3645        rawKey.reset();
3646        int keyLength = 
3647          in.nextRawKey(rawKey);
3648        return (keyLength >= 0);
3649      }
3650
3651      /** Fills up the passed rawValue with the value corresponding to the key
3652       * read earlier
3653       * @param rawValue
3654       * @return the length of the value
3655       * @throws IOException
3656       */
3657      public int nextRawValue(ValueBytes rawValue) throws IOException {
3658        int valLength = in.nextRawValue(rawValue);
3659        return valLength;
3660      }
3661      
3662      /** Returns the stored rawKey */
3663      public DataOutputBuffer getKey() {
3664        return rawKey;
3665      }
3666      
3667      /** closes the underlying reader */
3668      private void close() throws IOException {
3669        this.in.close();
3670        this.in = null;
3671      }
3672
3673      /** The default cleanup. Subclasses can override this with a custom 
3674       * cleanup 
3675       */
3676      public void cleanup() throws IOException {
3677        close();
3678        if (!preserveInput) {
3679          fs.delete(segmentPathName, true);
3680        }
3681      }
3682    } // SequenceFile.Sorter.SegmentDescriptor
3683    
3684    /** This class provisions multiple segments contained within a single
3685     *  file
3686     */
3687    private class LinkedSegmentsDescriptor extends SegmentDescriptor {
3688
3689      SegmentContainer parentContainer = null;
3690
3691      /** Constructs a segment
3692       * @param segmentOffset the offset of the segment in the file
3693       * @param segmentLength the length of the segment
3694       * @param segmentPathName the path name of the file containing the segment
3695       * @param parent the parent SegmentContainer that holds the segment
3696       */
3697      public LinkedSegmentsDescriptor (long segmentOffset, long segmentLength, 
3698                                       Path segmentPathName, SegmentContainer parent) {
3699        super(segmentOffset, segmentLength, segmentPathName);
3700        this.parentContainer = parent;
3701      }
3702      /** The default cleanup. Subclasses can override this with a custom 
3703       * cleanup 
3704       */
3705      @Override
3706      public void cleanup() throws IOException {
3707        super.close();
3708        if (super.shouldPreserveInput()) return;
3709        parentContainer.cleanup();
3710      }
3711      
3712      @Override
3713      public boolean equals(Object o) {
3714        if (!(o instanceof LinkedSegmentsDescriptor)) {
3715          return false;
3716        }
3717        return super.equals(o);
3718      }
3719    } //SequenceFile.Sorter.LinkedSegmentsDescriptor
3720
3721    /** The class that defines a container for segments to be merged. Primarily
3722     * required to delete temp files as soon as all the contained segments
3723     * have been looked at */
3724    private class SegmentContainer {
3725      private int numSegmentsCleanedUp = 0; //track the no. of segment cleanups
3726      private int numSegmentsContained; //# of segments contained
3727      private Path inName; //input file from where segments are created
3728      
3729      //the list of segments read from the file
3730      private ArrayList <SegmentDescriptor> segments = 
3731        new ArrayList <SegmentDescriptor>();
3732      /** This constructor is there primarily to serve the sort routine that 
3733       * generates a single output file with an associated index file */
3734      public SegmentContainer(Path inName, Path indexIn) throws IOException {
3735        //get the segments from indexIn
3736        FSDataInputStream fsIndexIn = fs.open(indexIn);
3737        long end = fs.getFileStatus(indexIn).getLen();
3738        while (fsIndexIn.getPos() < end) {
3739          long segmentOffset = WritableUtils.readVLong(fsIndexIn);
3740          long segmentLength = WritableUtils.readVLong(fsIndexIn);
3741          Path segmentName = inName;
3742          segments.add(new LinkedSegmentsDescriptor(segmentOffset, 
3743                                                    segmentLength, segmentName, this));
3744        }
3745        fsIndexIn.close();
3746        fs.delete(indexIn, true);
3747        numSegmentsContained = segments.size();
3748        this.inName = inName;
3749      }
3750
3751      public List <SegmentDescriptor> getSegmentList() {
3752        return segments;
3753      }
3754      public void cleanup() throws IOException {
3755        numSegmentsCleanedUp++;
3756        if (numSegmentsCleanedUp == numSegmentsContained) {
3757          fs.delete(inName, true);
3758        }
3759      }
3760    } //SequenceFile.Sorter.SegmentContainer
3761
3762  } // SequenceFile.Sorter
3763
3764} // SequenceFile