/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.hadoop.io;

import java.io.*;
import java.util.*;
import java.rmi.server.UID;
import java.security.MessageDigest;
import org.apache.commons.logging.*;
import org.apache.hadoop.util.Options;
import org.apache.hadoop.fs.*;
import org.apache.hadoop.fs.Options.CreateOpts;
import org.apache.hadoop.io.compress.CodecPool;
import org.apache.hadoop.io.compress.CompressionCodec;
import org.apache.hadoop.io.compress.CompressionInputStream;
import org.apache.hadoop.io.compress.CompressionOutputStream;
import org.apache.hadoop.io.compress.Compressor;
import org.apache.hadoop.io.compress.Decompressor;
import org.apache.hadoop.io.compress.DefaultCodec;
import org.apache.hadoop.io.compress.GzipCodec;
import org.apache.hadoop.io.compress.zlib.ZlibFactory;
import org.apache.hadoop.io.serializer.Deserializer;
import org.apache.hadoop.io.serializer.Serializer;
import org.apache.hadoop.io.serializer.SerializationFactory;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.classification.InterfaceStability;
import org.apache.hadoop.conf.*;
import org.apache.hadoop.util.Progressable;
import org.apache.hadoop.util.Progress;
import org.apache.hadoop.util.ReflectionUtils;
import org.apache.hadoop.util.NativeCodeLoader;
import org.apache.hadoop.util.MergeSort;
import org.apache.hadoop.util.PriorityQueue;
import org.apache.hadoop.util.Time;

/**
 * <code>SequenceFile</code>s are flat files consisting of binary key/value
 * pairs.
 *
 * <p><code>SequenceFile</code> provides {@link SequenceFile.Writer},
 * {@link SequenceFile.Reader} and {@link Sorter} classes for writing,
 * reading and sorting respectively.</p>
 *
 * There are three <code>SequenceFile</code> <code>Writer</code>s based on the
 * {@link CompressionType} used to compress key/value pairs:
 * <ol>
 *   <li>
 *   <code>Writer</code> : Uncompressed records.
 *   </li>
 *   <li>
 *   <code>RecordCompressWriter</code> : Record-compressed files, only compress
 *                                       values.
 *   </li>
 *   <li>
 *   <code>BlockCompressWriter</code> : Block-compressed files, both keys &amp;
 *                                      values are collected in 'blocks'
 *                                      separately and compressed. The size of
 *                                      the 'block' is configurable.
 *   </li>
 * </ol>
 *
 * <p>The actual compression algorithm used to compress key and/or values can
 * be specified by using the appropriate {@link CompressionCodec}.</p>
 *
 * <p>The recommended way is to use the static <code>createWriter</code>
 * methods provided by <code>SequenceFile</code> to choose the preferred
 * format.</p>
 *
 * <p>The {@link SequenceFile.Reader} acts as the bridge and can read any of
 * the above <code>SequenceFile</code> formats.</p>
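 *
 * <p>For example, a writer and a reader might be set up as follows (a
 * minimal sketch; the path and the key/value types are illustrative):</p>
 * <pre>
 *   Configuration conf = new Configuration();
 *   Path path = new Path("/tmp/example.seq");   // hypothetical location
 *   SequenceFile.Writer writer = SequenceFile.createWriter(conf,
 *       SequenceFile.Writer.file(path),
 *       SequenceFile.Writer.keyClass(IntWritable.class),
 *       SequenceFile.Writer.valueClass(Text.class),
 *       SequenceFile.Writer.compression(CompressionType.BLOCK));
 *   try {
 *     writer.append(new IntWritable(1), new Text("one"));
 *   } finally {
 *     writer.close();
 *   }
 *
 *   SequenceFile.Reader reader =
 *       new SequenceFile.Reader(conf, SequenceFile.Reader.file(path));
 *   try {
 *     IntWritable key = new IntWritable();
 *     Text value = new Text();
 *     while (reader.next(key, value)) {
 *       // process the current key/value pair
 *     }
 *   } finally {
 *     reader.close();
 *   }
 * </pre>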
 *
 * <h4 id="Formats">SequenceFile Formats</h4>
 *
 * <p>Essentially there are three different formats for
 * <code>SequenceFile</code>s depending on the <code>CompressionType</code>
 * specified. All of them share a <a href="#Header">common header</a>
 * described below.
 *
 * <h5 id="Header">SequenceFile Header</h5>
 * <ul>
 *   <li>
 *   version - 3 bytes of magic header <b>SEQ</b>, followed by 1 byte of
 *             actual version number (e.g. SEQ4 or SEQ6)
 *   </li>
 *   <li>
 *   keyClassName - key class
 *   </li>
 *   <li>
 *   valueClassName - value class
 *   </li>
 *   <li>
 *   compression - A boolean which specifies if compression is turned on for
 *                 keys/values in this file.
 *   </li>
 *   <li>
 *   blockCompression - A boolean which specifies if block-compression is
 *                      turned on for keys/values in this file.
 *   </li>
 *   <li>
 *   compression codec - <code>CompressionCodec</code> class which is used for
 *                       compression of keys and/or values (if compression is
 *                       enabled).
 *   </li>
 *   <li>
 *   metadata - {@link Metadata} for this file.
 *   </li>
 *   <li>
 *   sync - A sync marker to denote end of the header.
 *   </li>
 * </ul>
 *
 * <h5 id="UncompressedFormat">Uncompressed SequenceFile Format</h5>
 * <ul>
 *   <li>
 *   <a href="#Header">Header</a>
 *   </li>
 *   <li>
 *   Record
 *   <ul>
 *     <li>Record length</li>
 *     <li>Key length</li>
 *     <li>Key</li>
 *     <li>Value</li>
 *   </ul>
 *   </li>
 *   <li>
 *   A sync-marker approximately every {@link #SYNC_INTERVAL} bytes,
 *   emitted at record boundaries.
 *   </li>
 * </ul>
 *
 * <h5 id="RecordCompressedFormat">Record-Compressed SequenceFile Format</h5>
 * <ul>
 *   <li>
 *   <a href="#Header">Header</a>
 *   </li>
 *   <li>
 *   Record
 *   <ul>
 *     <li>Record length</li>
 *     <li>Key length</li>
 *     <li>Key</li>
 *     <li><i>Compressed</i> Value</li>
 *   </ul>
 *   </li>
 *   <li>
 *   A sync-marker approximately every {@link #SYNC_INTERVAL} bytes,
 *   emitted at record boundaries.
 *   </li>
 * </ul>
 *
 * <h5 id="BlockCompressedFormat">Block-Compressed SequenceFile Format</h5>
 * <ul>
 *   <li>
 *   <a href="#Header">Header</a>
 *   </li>
 *   <li>
 *   Record <i>Block</i>
 *   <ul>
 *     <li>Uncompressed number of records in the block</li>
 *     <li>Compressed key-lengths block-size</li>
 *     <li>Compressed key-lengths block</li>
 *     <li>Compressed keys block-size</li>
 *     <li>Compressed keys block</li>
 *     <li>Compressed value-lengths block-size</li>
 *     <li>Compressed value-lengths block</li>
 *     <li>Compressed values block-size</li>
 *     <li>Compressed values block</li>
 *   </ul>
 *   </li>
 *   <li>
 *   A sync-marker every block.
 *   </li>
 * </ul>
 *
 * <p>The compressed blocks of key lengths and value lengths consist of the
 * actual lengths of individual keys/values encoded in ZeroCompressedInteger
 * format.</p>
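 *
 * <p>(For orientation: the zero-compressed integer encoding referred to here
 * is the variable-length format of {@link WritableUtils#writeVInt}, in which
 * small lengths occupy a single byte and an <code>int</code> never takes more
 * than five bytes.)</p>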
 *
 * @see CompressionCodec
 */
@InterfaceAudience.Public
@InterfaceStability.Stable
public class SequenceFile {
  private static final Log LOG = LogFactory.getLog(SequenceFile.class);

  private SequenceFile() {}                         // no public ctor

  private static final byte BLOCK_COMPRESS_VERSION = (byte)4;
  private static final byte CUSTOM_COMPRESS_VERSION = (byte)5;
  private static final byte VERSION_WITH_METADATA = (byte)6;
  private static byte[] VERSION = new byte[] {
    (byte)'S', (byte)'E', (byte)'Q', VERSION_WITH_METADATA
  };

  private static final int SYNC_ESCAPE = -1;      // "length" of sync entries
  private static final int SYNC_HASH_SIZE = 16;   // number of bytes in hash
  private static final int SYNC_SIZE = 4+SYNC_HASH_SIZE; // escape + hash

  /** The number of bytes between sync points. */
  public static final int SYNC_INTERVAL = 100*SYNC_SIZE;

  /**
   * The compression type used to compress key/value pairs in the
   * {@link SequenceFile}.
   *
   * @see SequenceFile.Writer
   */
  public static enum CompressionType {
    /** Do not compress records. */
    NONE,
    /** Compress values only, each separately. */
    RECORD,
    /** Compress sequences of records together in blocks. */
    BLOCK
  }

  /**
   * Get the default compression type for sequence files, as configured by
   * <code>io.seqfile.compression.type</code>.
   * @param job the job config to look in
   * @return the kind of compression to use
   */
  static public CompressionType getDefaultCompressionType(Configuration job) {
    String name = job.get("io.seqfile.compression.type");
    return name == null ? CompressionType.RECORD :
      CompressionType.valueOf(name);
  }

  /**
   * Set the default compression type for sequence files.
   * @param job the configuration to modify
   * @param val the new compression type (none, block, record)
   */
  static public void setDefaultCompressionType(Configuration job,
                                               CompressionType val) {
    job.set("io.seqfile.compression.type", val.toString());
  }

  /**
   * Create a new Writer with the given options.
   * @param conf the configuration to use
   * @param opts the options to create the file with
   * @return a new Writer
   * @throws IOException
   */
  public static Writer createWriter(Configuration conf, Writer.Option... opts
                                    ) throws IOException {
    Writer.CompressionOption compressionOption =
      Options.getOption(Writer.CompressionOption.class, opts);
    CompressionType kind;
    if (compressionOption != null) {
      kind = compressionOption.getValue();
    } else {
      kind = getDefaultCompressionType(conf);
      opts = Options.prependOptions(opts, Writer.compression(kind));
    }
    switch (kind) {
      default:
      case NONE:
        return new Writer(conf, opts);
      case RECORD:
        return new RecordCompressWriter(conf, opts);
      case BLOCK:
        return new BlockCompressWriter(conf, opts);
    }
  }

  /**
   * Construct the preferred type of SequenceFile Writer.
   * @param fs The configured filesystem.
   * @param conf The configuration.
   * @param name The name of the file.
   * @param keyClass The 'key' type.
   * @param valClass The 'value' type.
   * @return Returns the handle to the constructed SequenceFile Writer.
   * @throws IOException
   * @deprecated Use {@link #createWriter(Configuration, Writer.Option...)}
   *     instead.
   */
  @Deprecated
  public static Writer
    createWriter(FileSystem fs, Configuration conf, Path name,
                 Class keyClass, Class valClass) throws IOException {
    return createWriter(conf, Writer.filesystem(fs),
                        Writer.file(name), Writer.keyClass(keyClass),
                        Writer.valueClass(valClass));
  }

  /**
   * Construct the preferred type of SequenceFile Writer.
   * @param fs The configured filesystem.
   * @param conf The configuration.
   * @param name The name of the file.
   * @param keyClass The 'key' type.
   * @param valClass The 'value' type.
   * @param compressionType The compression type.
   * @return Returns the handle to the constructed SequenceFile Writer.
   * @throws IOException
   * @deprecated Use {@link #createWriter(Configuration, Writer.Option...)}
   *     instead.
   */
  @Deprecated
  public static Writer
    createWriter(FileSystem fs, Configuration conf, Path name,
                 Class keyClass, Class valClass,
                 CompressionType compressionType) throws IOException {
    return createWriter(conf, Writer.filesystem(fs),
                        Writer.file(name), Writer.keyClass(keyClass),
                        Writer.valueClass(valClass),
                        Writer.compression(compressionType));
  }

  /**
   * Construct the preferred type of SequenceFile Writer.
   * @param fs The configured filesystem.
   * @param conf The configuration.
   * @param name The name of the file.
   * @param keyClass The 'key' type.
   * @param valClass The 'value' type.
   * @param compressionType The compression type.
   * @param progress The Progressable object to track progress.
   * @return Returns the handle to the constructed SequenceFile Writer.
   * @throws IOException
   * @deprecated Use {@link #createWriter(Configuration, Writer.Option...)}
   *     instead.
   */
  @Deprecated
  public static Writer
    createWriter(FileSystem fs, Configuration conf, Path name,
                 Class keyClass, Class valClass, CompressionType compressionType,
                 Progressable progress) throws IOException {
    return createWriter(conf, Writer.file(name),
                        Writer.filesystem(fs),
                        Writer.keyClass(keyClass),
                        Writer.valueClass(valClass),
                        Writer.compression(compressionType),
                        Writer.progressable(progress));
  }

  /**
   * Construct the preferred type of SequenceFile Writer.
   * @param fs The configured filesystem.
   * @param conf The configuration.
   * @param name The name of the file.
   * @param keyClass The 'key' type.
   * @param valClass The 'value' type.
   * @param compressionType The compression type.
   * @param codec The compression codec.
   * @return Returns the handle to the constructed SequenceFile Writer.
   * @throws IOException
   * @deprecated Use {@link #createWriter(Configuration, Writer.Option...)}
   *     instead.
   */
  @Deprecated
  public static Writer
    createWriter(FileSystem fs, Configuration conf, Path name,
                 Class keyClass, Class valClass, CompressionType compressionType,
                 CompressionCodec codec) throws IOException {
    return createWriter(conf, Writer.file(name),
                        Writer.filesystem(fs),
                        Writer.keyClass(keyClass),
                        Writer.valueClass(valClass),
                        Writer.compression(compressionType, codec));
  }

  /**
   * Construct the preferred type of SequenceFile Writer.
   * @param fs The configured filesystem.
   * @param conf The configuration.
   * @param name The name of the file.
   * @param keyClass The 'key' type.
   * @param valClass The 'value' type.
   * @param compressionType The compression type.
   * @param codec The compression codec.
   * @param progress The Progressable object to track progress.
   * @param metadata The metadata of the file.
   * @return Returns the handle to the constructed SequenceFile Writer.
   * @throws IOException
   * @deprecated Use {@link #createWriter(Configuration, Writer.Option...)}
   *     instead.
   */
  @Deprecated
  public static Writer
    createWriter(FileSystem fs, Configuration conf, Path name,
                 Class keyClass, Class valClass,
                 CompressionType compressionType, CompressionCodec codec,
                 Progressable progress, Metadata metadata) throws IOException {
    return createWriter(conf, Writer.file(name),
                        Writer.filesystem(fs),
                        Writer.keyClass(keyClass),
                        Writer.valueClass(valClass),
                        Writer.compression(compressionType, codec),
                        Writer.progressable(progress),
                        Writer.metadata(metadata));
  }

  /**
   * Construct the preferred type of SequenceFile Writer.
   * @param fs The configured filesystem.
   * @param conf The configuration.
   * @param name The name of the file.
   * @param keyClass The 'key' type.
   * @param valClass The 'value' type.
   * @param bufferSize buffer size for the underlying output stream.
   * @param replication replication factor for the file.
   * @param blockSize block size for the file.
   * @param compressionType The compression type.
   * @param codec The compression codec.
   * @param progress The Progressable object to track progress.
   * @param metadata The metadata of the file.
   * @return Returns the handle to the constructed SequenceFile Writer.
   * @throws IOException
   * @deprecated Use {@link #createWriter(Configuration, Writer.Option...)}
   *     instead.
   */
  @Deprecated
  public static Writer
    createWriter(FileSystem fs, Configuration conf, Path name,
                 Class keyClass, Class valClass, int bufferSize,
                 short replication, long blockSize,
                 CompressionType compressionType, CompressionCodec codec,
                 Progressable progress, Metadata metadata) throws IOException {
    return createWriter(conf, Writer.file(name),
                        Writer.filesystem(fs),
                        Writer.keyClass(keyClass),
                        Writer.valueClass(valClass),
                        Writer.bufferSize(bufferSize),
                        Writer.replication(replication),
                        Writer.blockSize(blockSize),
                        Writer.compression(compressionType, codec),
                        Writer.progressable(progress),
                        Writer.metadata(metadata));
  }

  /**
   * Construct the preferred type of SequenceFile Writer.
   * @param fs The configured filesystem.
   * @param conf The configuration.
   * @param name The name of the file.
   * @param keyClass The 'key' type.
   * @param valClass The 'value' type.
   * @param bufferSize buffer size for the underlying output stream.
   * @param replication replication factor for the file.
   * @param blockSize block size for the file.
   * @param createParent create parent directory if non-existent
   * @param compressionType The compression type.
   * @param codec The compression codec.
   * @param metadata The metadata of the file.
   * @return Returns the handle to the constructed SequenceFile Writer.
   * @throws IOException
   */
  @Deprecated
  public static Writer
    createWriter(FileSystem fs, Configuration conf, Path name,
                 Class keyClass, Class valClass, int bufferSize,
                 short replication, long blockSize, boolean createParent,
                 CompressionType compressionType, CompressionCodec codec,
                 Metadata metadata) throws IOException {
    return createWriter(FileContext.getFileContext(fs.getUri(), conf),
        conf, name, keyClass, valClass, compressionType, codec,
        metadata, EnumSet.of(CreateFlag.CREATE,CreateFlag.OVERWRITE),
        CreateOpts.bufferSize(bufferSize),
        createParent ? CreateOpts.createParent()
                     : CreateOpts.donotCreateParent(),
        CreateOpts.repFac(replication),
        CreateOpts.blockSize(blockSize)
      );
  }

  /**
   * Construct the preferred type of SequenceFile Writer.
   * @param fc The context for the specified file.
   * @param conf The configuration.
   * @param name The name of the file.
   * @param keyClass The 'key' type.
   * @param valClass The 'value' type.
   * @param compressionType The compression type.
   * @param codec The compression codec.
   * @param metadata The metadata of the file.
   * @param createFlag gives the semantics of create: overwrite, append etc.
   * @param opts file creation options; see {@link CreateOpts}.
   * @return Returns the handle to the constructed SequenceFile Writer.
   * @throws IOException
   */
  public static Writer
    createWriter(FileContext fc, Configuration conf, Path name,
                 Class keyClass, Class valClass,
                 CompressionType compressionType, CompressionCodec codec,
                 Metadata metadata,
                 final EnumSet<CreateFlag> createFlag, CreateOpts... opts)
    throws IOException {
    return createWriter(conf, fc.create(name, createFlag, opts),
        keyClass, valClass, compressionType, codec, metadata).ownStream();
  }

  /**
   * Construct the preferred type of SequenceFile Writer.
   * @param fs The configured filesystem.
   * @param conf The configuration.
   * @param name The name of the file.
   * @param keyClass The 'key' type.
   * @param valClass The 'value' type.
   * @param compressionType The compression type.
   * @param codec The compression codec.
   * @param progress The Progressable object to track progress.
   * @return Returns the handle to the constructed SequenceFile Writer.
   * @throws IOException
   * @deprecated Use {@link #createWriter(Configuration, Writer.Option...)}
   *     instead.
   */
  @Deprecated
  public static Writer
    createWriter(FileSystem fs, Configuration conf, Path name,
                 Class keyClass, Class valClass,
                 CompressionType compressionType, CompressionCodec codec,
                 Progressable progress) throws IOException {
    return createWriter(conf, Writer.file(name),
                        Writer.filesystem(fs),
                        Writer.keyClass(keyClass),
                        Writer.valueClass(valClass),
                        Writer.compression(compressionType, codec),
                        Writer.progressable(progress));
  }

  /**
   * Construct the preferred type of 'raw' SequenceFile Writer.
   * @param conf The configuration.
   * @param out The stream on top of which the writer is to be constructed.
   * @param keyClass The 'key' type.
   * @param valClass The 'value' type.
   * @param compressionType The compression type.
   * @param codec The compression codec.
   * @param metadata The metadata of the file.
   * @return Returns the handle to the constructed SequenceFile Writer.
   * @throws IOException
   * @deprecated Use {@link #createWriter(Configuration, Writer.Option...)}
   *     instead.
   */
  @Deprecated
  public static Writer
    createWriter(Configuration conf, FSDataOutputStream out,
                 Class keyClass, Class valClass,
                 CompressionType compressionType,
                 CompressionCodec codec, Metadata metadata) throws IOException {
    return createWriter(conf, Writer.stream(out), Writer.keyClass(keyClass),
                        Writer.valueClass(valClass),
                        Writer.compression(compressionType, codec),
                        Writer.metadata(metadata));
  }

  /**
   * Construct the preferred type of 'raw' SequenceFile Writer.
   * @param conf The configuration.
   * @param out The stream on top of which the writer is to be constructed.
   * @param keyClass The 'key' type.
   * @param valClass The 'value' type.
   * @param compressionType The compression type.
   * @param codec The compression codec.
   * @return Returns the handle to the constructed SequenceFile Writer.
   * @throws IOException
   * @deprecated Use {@link #createWriter(Configuration, Writer.Option...)}
   *     instead.
   */
  @Deprecated
  public static Writer
    createWriter(Configuration conf, FSDataOutputStream out,
                 Class keyClass, Class valClass, CompressionType compressionType,
                 CompressionCodec codec) throws IOException {
    return createWriter(conf, Writer.stream(out), Writer.keyClass(keyClass),
                        Writer.valueClass(valClass),
                        Writer.compression(compressionType, codec));
  }


  /** The interface to 'raw' values of SequenceFiles. */
  public static interface ValueBytes {

    /** Writes the uncompressed bytes to the outStream.
     * @param outStream : Stream to write uncompressed bytes into.
     * @throws IOException
     */
    public void writeUncompressedBytes(DataOutputStream outStream)
      throws IOException;

    /** Write compressed bytes to outStream.
     * Note that it will NOT compress the bytes if they are not already
     * compressed.
     * @param outStream : Stream to write compressed bytes into.
     */
    public void writeCompressedBytes(DataOutputStream outStream)
      throws IllegalArgumentException, IOException;

    /**
     * Size of stored data.
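     * @return the number of bytes of stored data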
     */
    public int getSize();
  }

  private static class UncompressedBytes implements ValueBytes {
    private int dataSize;
    private byte[] data;

    private UncompressedBytes() {
      data = null;
      dataSize = 0;
    }

    private void reset(DataInputStream in, int length) throws IOException {
      if (data == null) {
        data = new byte[length];
      } else if (length > data.length) {
        data = new byte[Math.max(length, data.length * 2)];
      }
      dataSize = -1;
      in.readFully(data, 0, length);
      dataSize = length;
    }

    @Override
    public int getSize() {
      return dataSize;
    }

    @Override
    public void writeUncompressedBytes(DataOutputStream outStream)
      throws IOException {
      outStream.write(data, 0, dataSize);
    }

    @Override
    public void writeCompressedBytes(DataOutputStream outStream)
      throws IllegalArgumentException, IOException {
      throw
        new IllegalArgumentException("UncompressedBytes cannot be compressed!");
    }

  } // UncompressedBytes

  private static class CompressedBytes implements ValueBytes {
    private int dataSize;
    private byte[] data;
    DataInputBuffer rawData = null;
    CompressionCodec codec = null;
    CompressionInputStream decompressedStream = null;

    private CompressedBytes(CompressionCodec codec) {
      data = null;
      dataSize = 0;
      this.codec = codec;
    }

    private void reset(DataInputStream in, int length) throws IOException {
      if (data == null) {
        data = new byte[length];
      } else if (length > data.length) {
        data = new byte[Math.max(length, data.length * 2)];
      }
      dataSize = -1;
      in.readFully(data, 0, length);
      dataSize = length;
    }

    @Override
    public int getSize() {
      return dataSize;
    }

    @Override
    public void writeUncompressedBytes(DataOutputStream outStream)
      throws IOException {
      if (decompressedStream == null) {
        rawData = new DataInputBuffer();
        decompressedStream = codec.createInputStream(rawData);
      } else {
        decompressedStream.resetState();
      }
      rawData.reset(data, 0, dataSize);

      byte[] buffer = new byte[8192];
      int bytesRead = 0;
      while ((bytesRead = decompressedStream.read(buffer, 0, 8192)) != -1) {
        outStream.write(buffer, 0, bytesRead);
      }
    }

    @Override
    public void writeCompressedBytes(DataOutputStream outStream)
      throws IllegalArgumentException, IOException {
      outStream.write(data, 0, dataSize);
    }

  } // CompressedBytes

  /**
   * The class encapsulating the metadata of a file.
   * The metadata of a file is a list of attribute name/value
   * pairs of Text type.
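   *
   * <p>A brief, illustrative sketch of attaching metadata at write time and
   * reading it back (the attribute name, path and types here are
   * hypothetical):</p>
   * <pre>
   *   SequenceFile.Metadata meta = new SequenceFile.Metadata();
   *   meta.set(new Text("owner"), new Text("alice"));
   *   SequenceFile.Writer w = SequenceFile.createWriter(conf,
   *       SequenceFile.Writer.file(path),
   *       SequenceFile.Writer.keyClass(Text.class),
   *       SequenceFile.Writer.valueClass(Text.class),
   *       SequenceFile.Writer.metadata(meta));
   *   ...
   *   SequenceFile.Reader r =
   *       new SequenceFile.Reader(conf, SequenceFile.Reader.file(path));
   *   Text owner = r.getMetadata().get(new Text("owner"));
   * </pre>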
   *
   */
  public static class Metadata implements Writable {

    private TreeMap<Text, Text> theMetadata;

    public Metadata() {
      this(new TreeMap<Text, Text>());
    }

    public Metadata(TreeMap<Text, Text> arg) {
      if (arg == null) {
        this.theMetadata = new TreeMap<Text, Text>();
      } else {
        this.theMetadata = arg;
      }
    }

    public Text get(Text name) {
      return this.theMetadata.get(name);
    }

    public void set(Text name, Text value) {
      this.theMetadata.put(name, value);
    }

    public TreeMap<Text, Text> getMetadata() {
      return new TreeMap<Text, Text>(this.theMetadata);
    }

    @Override
    public void write(DataOutput out) throws IOException {
      out.writeInt(this.theMetadata.size());
      Iterator<Map.Entry<Text, Text>> iter =
        this.theMetadata.entrySet().iterator();
      while (iter.hasNext()) {
        Map.Entry<Text, Text> en = iter.next();
        en.getKey().write(out);
        en.getValue().write(out);
      }
    }

    @Override
    public void readFields(DataInput in) throws IOException {
      int sz = in.readInt();
      if (sz < 0)
        throw new IOException("Invalid size: " + sz + " for file metadata object");
      this.theMetadata = new TreeMap<Text, Text>();
      for (int i = 0; i < sz; i++) {
        Text key = new Text();
        Text val = new Text();
        key.readFields(in);
        val.readFields(in);
        this.theMetadata.put(key, val);
      }
    }

    @Override
    public boolean equals(Object other) {
      if (other == null) {
        return false;
      }
      if (other.getClass() != this.getClass()) {
        return false;
      } else {
        return equals((Metadata)other);
      }
    }

    public boolean equals(Metadata other) {
      if (other == null) return false;
      if (this.theMetadata.size() != other.theMetadata.size()) {
        return false;
      }
      Iterator<Map.Entry<Text, Text>> iter1 =
        this.theMetadata.entrySet().iterator();
      Iterator<Map.Entry<Text, Text>> iter2 =
        other.theMetadata.entrySet().iterator();
      while (iter1.hasNext() && iter2.hasNext()) {
        Map.Entry<Text, Text> en1 = iter1.next();
        Map.Entry<Text, Text> en2 = iter2.next();
        if (!en1.getKey().equals(en2.getKey())) {
          return false;
        }
        if (!en1.getValue().equals(en2.getValue())) {
          return false;
        }
      }
      if (iter1.hasNext() || iter2.hasNext()) {
        return false;
      }
      return true;
    }

    @Override
    public int hashCode() {
      assert false : "hashCode not designed";
      return 42; // any arbitrary constant will do
    }

    @Override
    public String toString() {
      StringBuilder sb = new StringBuilder();
      sb.append("size: ").append(this.theMetadata.size()).append("\n");
      Iterator<Map.Entry<Text, Text>> iter =
        this.theMetadata.entrySet().iterator();
      while (iter.hasNext()) {
        Map.Entry<Text, Text> en = iter.next();
        sb.append("\t").append(en.getKey().toString())
          .append("\t").append(en.getValue().toString());
        sb.append("\n");
      }
      return sb.toString();
    }
  }

  /** Write key/value pairs to a sequence-format file.
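   *
   * <p>Instances should normally be obtained through
   * {@link SequenceFile#createWriter(Configuration, Writer.Option...)},
   * which picks the concrete writer for the requested compression type. An
   * existing file may also be appended to, if the filesystem supports append,
   * as in this sketch (the path is illustrative):</p>
   * <pre>
   *   Writer w = SequenceFile.createWriter(conf,
   *       Writer.file(path),
   *       Writer.keyClass(Text.class),
   *       Writer.valueClass(Text.class),
   *       Writer.appendIfExists(true));
   * </pre>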
   */
  public static class Writer implements java.io.Closeable, Syncable {
    private Configuration conf;
    FSDataOutputStream out;
    boolean ownOutputStream = true;
    DataOutputBuffer buffer = new DataOutputBuffer();

    Class keyClass;
    Class valClass;

    private final CompressionType compress;
    CompressionCodec codec = null;
    CompressionOutputStream deflateFilter = null;
    DataOutputStream deflateOut = null;
    Metadata metadata = null;
    Compressor compressor = null;

    private boolean appendMode = false;

    protected Serializer keySerializer;
    protected Serializer uncompressedValSerializer;
    protected Serializer compressedValSerializer;

    // Insert a globally unique 16-byte value every few entries, so that one
    // can seek into the middle of a file and then synchronize with record
    // starts and ends by scanning for this value.
    long lastSyncPos;                     // position of last sync
    byte[] sync;                          // 16 random bytes
    {
      try {
        MessageDigest digester = MessageDigest.getInstance("MD5");
        long time = Time.now();
        digester.update((new UID()+"@"+time).getBytes());
        sync = digester.digest();
      } catch (Exception e) {
        throw new RuntimeException(e);
      }
    }

    public static interface Option {}

    static class FileOption extends Options.PathOption
                            implements Option {
      FileOption(Path path) {
        super(path);
      }
    }

    /**
     * @deprecated only used for backwards-compatibility in the createWriter
     * methods that take FileSystem.
     */
    @Deprecated
    private static class FileSystemOption implements Option {
      private final FileSystem value;
      protected FileSystemOption(FileSystem value) {
        this.value = value;
      }
      public FileSystem getValue() {
        return value;
      }
    }

    static class StreamOption extends Options.FSDataOutputStreamOption
                              implements Option {
      StreamOption(FSDataOutputStream stream) {
        super(stream);
      }
    }

    static class BufferSizeOption extends Options.IntegerOption
                                  implements Option {
      BufferSizeOption(int value) {
        super(value);
      }
    }

    static class BlockSizeOption extends Options.LongOption implements Option {
      BlockSizeOption(long value) {
        super(value);
      }
    }

    static class ReplicationOption extends Options.IntegerOption
                                   implements Option {
      ReplicationOption(int value) {
        super(value);
      }
    }

    static class AppendIfExistsOption extends Options.BooleanOption
                                      implements Option {
      AppendIfExistsOption(boolean value) {
        super(value);
      }
    }

    static class KeyClassOption extends Options.ClassOption implements Option {
      KeyClassOption(Class<?> value) {
        super(value);
      }
    }

    static class ValueClassOption extends Options.ClassOption
                                  implements Option {
      ValueClassOption(Class<?> value) {
        super(value);
      }
    }

    static class MetadataOption implements Option {
      private final Metadata value;
      MetadataOption(Metadata value) {
        this.value = value;
      }
      Metadata getValue() {
        return value;
      }
    }

    static class ProgressableOption extends Options.ProgressableOption
                                    implements Option {
      ProgressableOption(Progressable value) {
        super(value);
      }
    }

    private static class CompressionOption implements Option {
      private final CompressionType value;
      private final CompressionCodec codec;
      CompressionOption(CompressionType value) {
        this(value, null);
      }
      CompressionOption(CompressionType value, CompressionCodec codec) {
        this.value = value;
        this.codec = (CompressionType.NONE != value && null == codec)
          ? new DefaultCodec()
          : codec;
      }
      CompressionType getValue() {
        return value;
      }
      CompressionCodec getCodec() {
        return codec;
      }
    }

    public static Option file(Path value) {
      return new FileOption(value);
    }

    /**
     * @deprecated only used for backwards-compatibility in the createWriter
     * methods that take FileSystem.
     */
    @Deprecated
    private static Option filesystem(FileSystem fs) {
      return new SequenceFile.Writer.FileSystemOption(fs);
    }

    public static Option bufferSize(int value) {
      return new BufferSizeOption(value);
    }

    public static Option stream(FSDataOutputStream value) {
      return new StreamOption(value);
    }

    public static Option replication(short value) {
      return new ReplicationOption(value);
    }

    public static Option appendIfExists(boolean value) {
      return new AppendIfExistsOption(value);
    }

    public static Option blockSize(long value) {
      return new BlockSizeOption(value);
    }

    public static Option progressable(Progressable value) {
      return new ProgressableOption(value);
    }

    public static Option keyClass(Class<?> value) {
      return new KeyClassOption(value);
    }

    public static Option valueClass(Class<?> value) {
      return new ValueClassOption(value);
    }

    public static Option metadata(Metadata value) {
      return new MetadataOption(value);
    }

    public static Option compression(CompressionType value) {
      return new CompressionOption(value);
    }

    public static Option compression(CompressionType value,
        CompressionCodec codec) {
      return new CompressionOption(value, codec);
    }

    /**
     * Construct an uncompressed writer from a set of options.
     * @param conf the configuration to use
     * @param opts the options used when creating the writer
     * @throws IOException if it fails
     */
    Writer(Configuration conf,
           Option... opts) throws IOException {
      BlockSizeOption blockSizeOption =
        Options.getOption(BlockSizeOption.class, opts);
      BufferSizeOption bufferSizeOption =
        Options.getOption(BufferSizeOption.class, opts);
      ReplicationOption replicationOption =
        Options.getOption(ReplicationOption.class, opts);
      ProgressableOption progressOption =
        Options.getOption(ProgressableOption.class, opts);
      FileOption fileOption = Options.getOption(FileOption.class, opts);
      AppendIfExistsOption appendIfExistsOption = Options.getOption(
        AppendIfExistsOption.class, opts);
      FileSystemOption fsOption = Options.getOption(FileSystemOption.class, opts);
      StreamOption streamOption = Options.getOption(StreamOption.class, opts);
      KeyClassOption keyClassOption =
        Options.getOption(KeyClassOption.class, opts);
      ValueClassOption valueClassOption =
        Options.getOption(ValueClassOption.class, opts);
      MetadataOption metadataOption =
        Options.getOption(MetadataOption.class, opts);
      CompressionOption compressionTypeOption =
        Options.getOption(CompressionOption.class, opts);
      // check consistency of options
      if ((fileOption == null) == (streamOption == null)) {
        throw new IllegalArgumentException(
            "exactly one of file or stream must be specified");
      }
      if (fileOption == null && (blockSizeOption != null ||
                                 bufferSizeOption != null ||
                                 replicationOption != null ||
                                 progressOption != null)) {
        throw new IllegalArgumentException("file modifier options not " +
                                           "compatible with stream");
      }

      FSDataOutputStream out;
      boolean ownStream = fileOption != null;
      if (ownStream) {
        Path p = fileOption.getValue();
        FileSystem fs;
        if (fsOption != null) {
          fs = fsOption.getValue();
        } else {
          fs = p.getFileSystem(conf);
        }
        int bufferSize = bufferSizeOption == null ? getBufferSize(conf) :
          bufferSizeOption.getValue();
        short replication = replicationOption == null ?
          fs.getDefaultReplication(p) :
          (short) replicationOption.getValue();
        long blockSize = blockSizeOption == null ? fs.getDefaultBlockSize(p) :
          blockSizeOption.getValue();
        Progressable progress = progressOption == null ? null :
          progressOption.getValue();

        if (appendIfExistsOption != null && appendIfExistsOption.getValue()
            && fs.exists(p)) {

          // Read the file and verify header details
          SequenceFile.Reader reader = new SequenceFile.Reader(conf,
              SequenceFile.Reader.file(p), new Reader.OnlyHeaderOption());
          try {

            if (keyClassOption.getValue() != reader.getKeyClass()
                || valueClassOption.getValue() != reader.getValueClass()) {
              throw new IllegalArgumentException(
                  "Key/value class provided does not match the file");
            }

            if (reader.getVersion() != VERSION[3]) {
              throw new VersionMismatchException(VERSION[3],
                  reader.getVersion());
            }

            if (metadataOption != null) {
              LOG.info("MetaData Option is ignored during append");
            }
            metadataOption = (MetadataOption) SequenceFile.Writer
                .metadata(reader.getMetadata());

            CompressionOption readerCompressionOption = new CompressionOption(
                reader.getCompressionType(), reader.getCompressionCodec());

            // Codec comparison will be ignored if the compression is NONE
            if (readerCompressionOption.value != compressionTypeOption.value
                || (readerCompressionOption.value != CompressionType.NONE
                    && readerCompressionOption.codec
                        .getClass() != compressionTypeOption.codec
                        .getClass())) {
              throw new IllegalArgumentException(
                  "Compression option provided does not match the file");
            }

            sync = reader.getSync();

          } finally {
            reader.close();
          }

          out = fs.append(p, bufferSize, progress);
          this.appendMode = true;
        } else {
          out = fs
              .create(p, true, bufferSize, replication, blockSize, progress);
        }
      } else {
        out = streamOption.getValue();
      }
      Class<?> keyClass = keyClassOption == null ?
          Object.class : keyClassOption.getValue();
      Class<?> valueClass = valueClassOption == null ?
          Object.class : valueClassOption.getValue();
      Metadata metadata = metadataOption == null ?
          new Metadata() : metadataOption.getValue();
      this.compress = compressionTypeOption.getValue();
      final CompressionCodec codec = compressionTypeOption.getCodec();
      if (codec != null &&
          (codec instanceof GzipCodec) &&
          !NativeCodeLoader.isNativeCodeLoaded() &&
          !ZlibFactory.isNativeZlibLoaded(conf)) {
        throw new IllegalArgumentException("SequenceFile doesn't work with " +
                                           "GzipCodec without native-hadoop " +
                                           "code!");
      }
      init(conf, out, ownStream, keyClass, valueClass, codec, metadata);
    }

    /** Create the named file.
     * @deprecated Use
     *   {@link SequenceFile#createWriter(Configuration, Writer.Option...)}
     *   instead.
     */
    @Deprecated
    public Writer(FileSystem fs, Configuration conf, Path name,
                  Class keyClass, Class valClass) throws IOException {
      this.compress = CompressionType.NONE;
      init(conf, fs.create(name), true, keyClass, valClass, null,
           new Metadata());
    }

    /** Create the named file with write-progress reporter.
     * @deprecated Use
     *   {@link SequenceFile#createWriter(Configuration, Writer.Option...)}
     *   instead.
     */
    @Deprecated
    public Writer(FileSystem fs, Configuration conf, Path name,
                  Class keyClass, Class valClass,
                  Progressable progress, Metadata metadata) throws IOException {
      this.compress = CompressionType.NONE;
      init(conf, fs.create(name, progress), true, keyClass, valClass,
           null, metadata);
    }

    /** Create the named file with write-progress reporter.
     * @deprecated Use
     *   {@link SequenceFile#createWriter(Configuration, Writer.Option...)}
     *   instead.
     */
    @Deprecated
    public Writer(FileSystem fs, Configuration conf, Path name,
                  Class keyClass, Class valClass,
                  int bufferSize, short replication, long blockSize,
                  Progressable progress, Metadata metadata) throws IOException {
      this.compress = CompressionType.NONE;
      init(conf,
           fs.create(name, true, bufferSize, replication, blockSize, progress),
           true, keyClass, valClass, null, metadata);
    }

    boolean isCompressed() { return compress != CompressionType.NONE; }
    boolean isBlockCompressed() { return compress == CompressionType.BLOCK; }

    Writer ownStream() { this.ownOutputStream = true; return this; }

    /** Write and flush the file header. */
    private void writeFileHeader()
      throws IOException {
      out.write(VERSION);
      Text.writeString(out, keyClass.getName());
      Text.writeString(out, valClass.getName());

      out.writeBoolean(this.isCompressed());
      out.writeBoolean(this.isBlockCompressed());

      if (this.isCompressed()) {
        Text.writeString(out, (codec.getClass()).getName());
      }
      this.metadata.write(out);
      out.write(sync);                       // write the sync bytes
      out.flush();                           // flush header
    }

    /** Initialize. */
    @SuppressWarnings("unchecked")
    void init(Configuration conf, FSDataOutputStream out, boolean ownStream,
              Class keyClass, Class valClass,
              CompressionCodec codec, Metadata metadata)
      throws IOException {
      this.conf = conf;
      this.out = out;
      this.ownOutputStream = ownStream;
      this.keyClass = keyClass;
      this.valClass = valClass;
      this.codec = codec;
      this.metadata = metadata;
      SerializationFactory serializationFactory = new SerializationFactory(conf);
      this.keySerializer = serializationFactory.getSerializer(keyClass);
      if (this.keySerializer == null) {
        throw new IOException(
            "Could not find a serializer for the Key class: '"
                + keyClass.getCanonicalName() + "'. "
                + "Please ensure that the configuration '" +
                CommonConfigurationKeys.IO_SERIALIZATIONS_KEY + "' is "
                + "properly configured, if you're using "
                + "custom serialization.");
      }
      this.keySerializer.open(buffer);
      this.uncompressedValSerializer = serializationFactory.getSerializer(valClass);
      if (this.uncompressedValSerializer == null) {
        throw new IOException(
            "Could not find a serializer for the Value class: '"
                + valClass.getCanonicalName() + "'. "
                + "Please ensure that the configuration '" +
                CommonConfigurationKeys.IO_SERIALIZATIONS_KEY + "' is "
                + "properly configured, if you're using "
                + "custom serialization.");
      }
      this.uncompressedValSerializer.open(buffer);
      if (this.codec != null) {
        ReflectionUtils.setConf(this.codec, this.conf);
        this.compressor = CodecPool.getCompressor(this.codec);
        this.deflateFilter = this.codec.createOutputStream(buffer, compressor);
        this.deflateOut =
          new DataOutputStream(new BufferedOutputStream(deflateFilter));
        this.compressedValSerializer = serializationFactory.getSerializer(valClass);
        if (this.compressedValSerializer == null) {
          throw new IOException(
              "Could not find a serializer for the Value class: '"
                  + valClass.getCanonicalName() + "'. "
                  + "Please ensure that the configuration '" +
                  CommonConfigurationKeys.IO_SERIALIZATIONS_KEY + "' is "
                  + "properly configured, if you're using "
                  + "custom serialization.");
        }
        this.compressedValSerializer.open(deflateOut);
      }

      if (appendMode) {
        sync();
      } else {
        writeFileHeader();
      }
    }

    /** Returns the class of keys in this file. */
    public Class getKeyClass() { return keyClass; }

    /** Returns the class of values in this file. */
    public Class getValueClass() { return valClass; }

    /** Returns the compression codec of data in this file. */
    public CompressionCodec getCompressionCodec() { return codec; }

    /** Create a sync point. */
    public void sync() throws IOException {
      if (sync != null && lastSyncPos != out.getPos()) {
        out.writeInt(SYNC_ESCAPE);                // mark the start of the sync
        out.write(sync);                          // write sync
        lastSyncPos = out.getPos();               // update lastSyncPos
      }
    }

    /**
     * Flush all currently written data to the file system.
     * @deprecated Use {@link #hsync()} or {@link #hflush()} instead
     */
    @Deprecated
    public void syncFs() throws IOException {
      if (out != null) {
        out.sync();                               // flush contents to file system
      }
    }

    @Override
    public void hsync() throws IOException {
      if (out != null) {
        out.hsync();
      }
    }

    @Override
    public void hflush() throws IOException {
      if (out != null) {
        out.hflush();
      }
    }

    /** Returns the configuration of this file. */
    Configuration getConf() { return conf; }

    /** Close the file. */
    @Override
    public synchronized void close() throws IOException {
      keySerializer.close();
      uncompressedValSerializer.close();
      if (compressedValSerializer != null) {
        compressedValSerializer.close();
      }

      CodecPool.returnCompressor(compressor);
      compressor = null;

      if (out != null) {

        // Close the underlying stream iff we own it...
        if (ownOutputStream) {
          out.close();
        } else {
          out.flush();
        }
        out = null;
      }
    }

    synchronized void checkAndWriteSync() throws IOException {
      if (sync != null &&
          out.getPos() >= lastSyncPos+SYNC_INTERVAL) { // time to emit sync
        sync();
      }
    }

    /** Append a key/value pair. */
    public void append(Writable key, Writable val)
      throws IOException {
      append((Object) key, (Object) val);
    }

    /** Append a key/value pair. */
    @SuppressWarnings("unchecked")
    public synchronized void append(Object key, Object val)
      throws IOException {
      if (key.getClass() != keyClass)
        throw new IOException("wrong key class: "+key.getClass().getName()
                              +" is not "+keyClass);
      if (val.getClass() != valClass)
        throw new IOException("wrong value class: "+val.getClass().getName()
                              +" is not "+valClass);

      buffer.reset();

      // Append the 'key'
      keySerializer.serialize(key);
      int keyLength = buffer.getLength();
      if (keyLength < 0)
        throw new IOException("negative length keys not allowed: " + key);

      // Append the 'value'
      if (compress == CompressionType.RECORD) {
        deflateFilter.resetState();
        compressedValSerializer.serialize(val);
        deflateOut.flush();
        deflateFilter.finish();
      } else {
        uncompressedValSerializer.serialize(val);
      }

      // Write the record out
      checkAndWriteSync();                                // sync
      out.writeInt(buffer.getLength());                   // total record length
      out.writeInt(keyLength);                            // key portion length
      out.write(buffer.getData(), 0, buffer.getLength()); // data
    }

    public synchronized void appendRaw(byte[] keyData, int keyOffset,
        int keyLength, ValueBytes val) throws IOException {
      if (keyLength < 0)
        throw new IOException("negative length keys not allowed: " + keyLength);

      int valLength = val.getSize();

      checkAndWriteSync();

      out.writeInt(keyLength+valLength);          // total record length
      out.writeInt(keyLength);                    // key portion length
      out.write(keyData, keyOffset, keyLength);   // key
      val.writeUncompressedBytes(out);            // value
    }

    /** Returns the current length of the output file.
     *
     * <p>This always returns a synchronized position.  In other words,
     * immediately after calling {@link SequenceFile.Reader#seek(long)} with
     * a position returned by this method,
     * {@link SequenceFile.Reader#next(Writable)} may be called.  However
     * the key may be earlier in the file than the key last written when this
     * method was called (e.g., with block-compression, it may be the first
     * key in the block that was being written when this method was called).
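     *
     * <p>A sketch of the intended use, with the reader construction elided
     * (the variable names are illustrative):</p>
     * <pre>
     *   long pos = writer.getLength();  // remember a synchronized position
     *   ...
     *   reader.seek(pos);               // later, jump straight to it
     *   while (reader.next(key, value)) { ... }
     * </pre>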
     */
    public synchronized long getLength() throws IOException {
      return out.getPos();
    }

  } // class Writer

  /** Write key/compressed-value pairs to a sequence-format file. */
  static class RecordCompressWriter extends Writer {

    RecordCompressWriter(Configuration conf,
                         Option... options) throws IOException {
      super(conf, options);
    }

    /** Append a key/value pair. */
    @Override
    @SuppressWarnings("unchecked")
    public synchronized void append(Object key, Object val)
      throws IOException {
      if (key.getClass() != keyClass)
        throw new IOException("wrong key class: "+key.getClass().getName()
                              +" is not "+keyClass);
      if (val.getClass() != valClass)
        throw new IOException("wrong value class: "+val.getClass().getName()
                              +" is not "+valClass);

      buffer.reset();

      // Append the 'key'
      keySerializer.serialize(key);
      int keyLength = buffer.getLength();
      if (keyLength < 0)
        throw new IOException("negative length keys not allowed: " + key);

      // Compress 'value' and append it
      deflateFilter.resetState();
      compressedValSerializer.serialize(val);
      deflateOut.flush();
      deflateFilter.finish();

      // Write the record out
      checkAndWriteSync();                                // sync
      out.writeInt(buffer.getLength());                   // total record length
      out.writeInt(keyLength);                            // key portion length
      out.write(buffer.getData(), 0, buffer.getLength()); // data
    }

    /** Append a key/value pair. */
    @Override
    public synchronized void appendRaw(byte[] keyData, int keyOffset,
        int keyLength, ValueBytes val) throws IOException {

      if (keyLength < 0)
        throw new IOException("negative length keys not allowed: " + keyLength);

      int valLength = val.getSize();

      checkAndWriteSync();                        // sync
      out.writeInt(keyLength+valLength);          // total record length
      out.writeInt(keyLength);                    // key portion length
      out.write(keyData, keyOffset, keyLength);   // 'key' data
      val.writeCompressedBytes(out);              // 'value' data
    }

  } // RecordCompressionWriter

  /** Write compressed key/value blocks to a sequence-format file. */
  static class BlockCompressWriter extends Writer {

    private int noBufferedRecords = 0;

    private DataOutputBuffer keyLenBuffer = new DataOutputBuffer();
    private DataOutputBuffer keyBuffer = new DataOutputBuffer();

    private DataOutputBuffer valLenBuffer = new DataOutputBuffer();
    private DataOutputBuffer valBuffer = new DataOutputBuffer();

    private final int compressionBlockSize;

    BlockCompressWriter(Configuration conf,
                        Option... options) throws IOException {
      super(conf, options);
      compressionBlockSize =
        conf.getInt("io.seqfile.compress.blocksize", 1000000);
      keySerializer.close();
      keySerializer.open(keyBuffer);
      uncompressedValSerializer.close();
      uncompressedValSerializer.open(valBuffer);
    }

    /** Workhorse to check and write out compressed data/lengths */
    private synchronized
      void writeBuffer(DataOutputBuffer uncompressedDataBuffer)
      throws IOException {
      deflateFilter.resetState();
      buffer.reset();
      deflateOut.write(uncompressedDataBuffer.getData(), 0,
                       uncompressedDataBuffer.getLength());
      deflateOut.flush();
      deflateFilter.finish();

      WritableUtils.writeVInt(out, buffer.getLength());
      out.write(buffer.getData(), 0, buffer.getLength());
    }

    /** Compress and flush contents to dfs */
    @Override
    public synchronized void sync() throws IOException {
      if (noBufferedRecords > 0) {
        super.sync();

        // No. of records
        WritableUtils.writeVInt(out, noBufferedRecords);

        // Write 'keys' and lengths
        writeBuffer(keyLenBuffer);
        writeBuffer(keyBuffer);

        // Write 'values' and lengths
        writeBuffer(valLenBuffer);
        writeBuffer(valBuffer);

        // Flush the file-stream
        out.flush();

        // Reset internal states
        keyLenBuffer.reset();
        keyBuffer.reset();
        valLenBuffer.reset();
        valBuffer.reset();
        noBufferedRecords = 0;
      }

    }

    /** Close the file. */
    @Override
    public synchronized void close() throws IOException {
      if (out != null) {
        sync();
      }
      super.close();
    }

    /** Append a key/value pair. */
    @Override
    @SuppressWarnings("unchecked")
    public synchronized void append(Object key, Object val)
      throws IOException {
      if (key.getClass() != keyClass)
        throw new IOException("wrong key class: "+key+" is not "+keyClass);
      if (val.getClass() != valClass)
        throw new IOException("wrong value class: "+val+" is not "+valClass);

      // Save key/value into respective buffers
      int oldKeyLength = keyBuffer.getLength();
      keySerializer.serialize(key);
      int keyLength = keyBuffer.getLength() - oldKeyLength;
      if (keyLength < 0)
        throw new IOException("negative length keys not allowed: " + key);
      WritableUtils.writeVInt(keyLenBuffer, keyLength);

      int oldValLength = valBuffer.getLength();
      uncompressedValSerializer.serialize(val);
      int valLength = valBuffer.getLength() - oldValLength;
      WritableUtils.writeVInt(valLenBuffer, valLength);

      // Added another key/value pair
      ++noBufferedRecords;

      // Compress and flush?
      int currentBlockSize = keyBuffer.getLength() + valBuffer.getLength();
      if (currentBlockSize >= compressionBlockSize) {
        sync();
      }
    }

    /** Append a key/value pair. */
    @Override
    public synchronized void appendRaw(byte[] keyData, int keyOffset,
        int keyLength, ValueBytes val) throws IOException {

      if (keyLength < 0)
        throw new IOException("negative length keys not allowed");

      int valLength = val.getSize();

      // Save key/value data in relevant buffers
      WritableUtils.writeVInt(keyLenBuffer, keyLength);
      keyBuffer.write(keyData, keyOffset, keyLength);
      WritableUtils.writeVInt(valLenBuffer, valLength);
      val.writeUncompressedBytes(valBuffer);

      // Added another key/value pair
      ++noBufferedRecords;

      // Compress and flush?
      int currentBlockSize = keyBuffer.getLength() + valBuffer.getLength();
      if (currentBlockSize >= compressionBlockSize) {
        sync();
      }
    }

  } // BlockCompressionWriter

  /** Get the configured buffer size */
  private static int getBufferSize(Configuration conf) {
    return conf.getInt("io.file.buffer.size", 4096);
  }

  /** Reads key/value pairs from a sequence-format file.
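   *
   * <p>A reader can also be confined to a byte range of the file, which is
   * how split-based consumers typically drive it; a brief sketch (the split
   * bounds are illustrative):</p>
   * <pre>
   *   SequenceFile.Reader reader = new SequenceFile.Reader(conf,
   *       SequenceFile.Reader.file(path),
   *       SequenceFile.Reader.start(splitStart),
   *       SequenceFile.Reader.length(splitLength));
   * </pre>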
*/ 1642 public static class Reader implements java.io.Closeable { 1643 private String filename; 1644 private FSDataInputStream in; 1645 private DataOutputBuffer outBuf = new DataOutputBuffer(); 1646 1647 private byte version; 1648 1649 private String keyClassName; 1650 private String valClassName; 1651 private Class keyClass; 1652 private Class valClass; 1653 1654 private CompressionCodec codec = null; 1655 private Metadata metadata = null; 1656 1657 private byte[] sync = new byte[SYNC_HASH_SIZE]; 1658 private byte[] syncCheck = new byte[SYNC_HASH_SIZE]; 1659 private boolean syncSeen; 1660 1661 private long headerEnd; 1662 private long end; 1663 private int keyLength; 1664 private int recordLength; 1665 1666 private boolean decompress; 1667 private boolean blockCompressed; 1668 1669 private Configuration conf; 1670 1671 private int noBufferedRecords = 0; 1672 private boolean lazyDecompress = true; 1673 private boolean valuesDecompressed = true; 1674 1675 private int noBufferedKeys = 0; 1676 private int noBufferedValues = 0; 1677 1678 private DataInputBuffer keyLenBuffer = null; 1679 private CompressionInputStream keyLenInFilter = null; 1680 private DataInputStream keyLenIn = null; 1681 private Decompressor keyLenDecompressor = null; 1682 private DataInputBuffer keyBuffer = null; 1683 private CompressionInputStream keyInFilter = null; 1684 private DataInputStream keyIn = null; 1685 private Decompressor keyDecompressor = null; 1686 1687 private DataInputBuffer valLenBuffer = null; 1688 private CompressionInputStream valLenInFilter = null; 1689 private DataInputStream valLenIn = null; 1690 private Decompressor valLenDecompressor = null; 1691 private DataInputBuffer valBuffer = null; 1692 private CompressionInputStream valInFilter = null; 1693 private DataInputStream valIn = null; 1694 private Decompressor valDecompressor = null; 1695 1696 private Deserializer keyDeserializer; 1697 private Deserializer valDeserializer; 1698 1699 /** 1700 * A tag interface for all of the Reader options 1701 */ 1702 public static interface Option {} 1703 1704 /** 1705 * Create an option to specify the path name of the sequence file. 1706 * @param value the path to read 1707 * @return a new option 1708 */ 1709 public static Option file(Path value) { 1710 return new FileOption(value); 1711 } 1712 1713 /** 1714 * Create an option to specify the stream with the sequence file. 1715 * @param value the stream to read. 1716 * @return a new option 1717 */ 1718 public static Option stream(FSDataInputStream value) { 1719 return new InputStreamOption(value); 1720 } 1721 1722 /** 1723 * Create an option to specify the starting byte to read. 1724 * @param value the number of bytes to skip over 1725 * @return a new option 1726 */ 1727 public static Option start(long value) { 1728 return new StartOption(value); 1729 } 1730 1731 /** 1732 * Create an option to specify the number of bytes to read. 1733 * @param value the number of bytes to read 1734 * @return a new option 1735 */ 1736 public static Option length(long value) { 1737 return new LengthOption(value); 1738 } 1739 1740 /** 1741 * Create an option with the buffer size for reading the given pathname. 
    /**
     * Create an option with the buffer size for reading the given pathname.
     * @param value the number of bytes to buffer
     * @return a new option
     */
    public static Option bufferSize(int value) {
      return new BufferSizeOption(value);
    }

    private static class FileOption extends Options.PathOption
        implements Option {
      private FileOption(Path value) {
        super(value);
      }
    }

    private static class InputStreamOption
        extends Options.FSDataInputStreamOption
        implements Option {
      private InputStreamOption(FSDataInputStream value) {
        super(value);
      }
    }

    private static class StartOption extends Options.LongOption
        implements Option {
      private StartOption(long value) {
        super(value);
      }
    }

    private static class LengthOption extends Options.LongOption
        implements Option {
      private LengthOption(long value) {
        super(value);
      }
    }

    private static class BufferSizeOption extends Options.IntegerOption
        implements Option {
      private BufferSizeOption(int value) {
        super(value);
      }
    }

    // only used directly
    private static class OnlyHeaderOption extends Options.BooleanOption
        implements Option {
      private OnlyHeaderOption() {
        super(true);
      }
    }

    public Reader(Configuration conf, Option... opts) throws IOException {
      // Look up the options, these are null if not set
      FileOption fileOpt = Options.getOption(FileOption.class, opts);
      InputStreamOption streamOpt =
        Options.getOption(InputStreamOption.class, opts);
      StartOption startOpt = Options.getOption(StartOption.class, opts);
      LengthOption lenOpt = Options.getOption(LengthOption.class, opts);
      BufferSizeOption bufOpt = Options.getOption(BufferSizeOption.class, opts);
      OnlyHeaderOption headerOnly =
        Options.getOption(OnlyHeaderOption.class, opts);
      // check for consistency
      if ((fileOpt == null) == (streamOpt == null)) {
        throw new
          IllegalArgumentException("Exactly one of the file or stream" +
                                   " options must be specified");
      }
      if (fileOpt == null && bufOpt != null) {
        throw new IllegalArgumentException("buffer size can only be set when" +
                                           " a file is specified.");
      }
      // figure out the real values
      Path filename = null;
      FSDataInputStream file;
      final long len;
      if (fileOpt != null) {
        filename = fileOpt.getValue();
        FileSystem fs = filename.getFileSystem(conf);
        int bufSize = bufOpt == null ? getBufferSize(conf): bufOpt.getValue();
        len = null == lenOpt
          ? fs.getFileStatus(filename).getLen()
          : lenOpt.getValue();
        file = openFile(fs, filename, bufSize, len);
      } else {
        len = null == lenOpt ? Long.MAX_VALUE : lenOpt.getValue();
        file = streamOpt.getValue();
      }
      long start = startOpt == null ? 0 : startOpt.getValue();
      // really set up
      initialize(filename, file, start, len, conf, headerOnly != null);
    }

    /**
     * Construct a reader by opening a file from the given file system.
     * @param fs The file system used to open the file.
     * @param file The file being read.
     * @param conf Configuration
     * @throws IOException
     * @deprecated Use Reader(Configuration, Option...) instead.
     */
    @Deprecated
    public Reader(FileSystem fs, Path file,
                  Configuration conf) throws IOException {
      this(conf, file(file.makeQualified(fs)));
    }
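    /*
     * A minimal read-side sketch of the option-based constructor, assuming
     * "part-00000" is an existing SequenceFile on the default FileSystem
     * (the path is illustrative):
     *
     *   Configuration conf = new Configuration();
     *   SequenceFile.Reader reader = new SequenceFile.Reader(conf,
     *       SequenceFile.Reader.file(new Path("part-00000")),
     *       SequenceFile.Reader.start(0L));
     *   try {
     *     // ... iterate with next(...) / getCurrentValue(...) ...
     *   } finally {
     *     reader.close();
     *   }
     */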
    /**
     * Construct a reader by the given input stream.
     * @param in An input stream.
     * @param buffersize unused
     * @param start The starting position.
     * @param length The length being read.
     * @param conf Configuration
     * @throws IOException
     * @deprecated Use Reader(Configuration, Reader.Option...) instead.
     */
    @Deprecated
    public Reader(FSDataInputStream in, int buffersize,
                  long start, long length, Configuration conf) throws IOException {
      this(conf, stream(in), start(start), length(length));
    }

    /** Common work of the constructors. */
    private void initialize(Path filename, FSDataInputStream in,
                            long start, long length, Configuration conf,
                            boolean tempReader) throws IOException {
      if (in == null) {
        throw new IllegalArgumentException("in == null");
      }
      this.filename = filename == null ? "<unknown>" : filename.toString();
      this.in = in;
      this.conf = conf;
      boolean succeeded = false;
      try {
        seek(start);
        this.end = this.in.getPos() + length;
        // if it wrapped around, use the max
        if (end < length) {
          end = Long.MAX_VALUE;
        }
        init(tempReader);
        succeeded = true;
      } finally {
        if (!succeeded) {
          IOUtils.cleanup(LOG, this.in);
        }
      }
    }

    /**
     * Override this method to specialize the type of
     * {@link FSDataInputStream} returned.
     * @param fs The file system used to open the file.
     * @param file The file being read.
     * @param bufferSize The buffer size used to read the file.
     * @param length The length being read if it is >= 0.  Otherwise,
     *               the length is not available.
     * @return The opened stream.
     * @throws IOException
     */
    protected FSDataInputStream openFile(FileSystem fs, Path file,
        int bufferSize, long length) throws IOException {
      return fs.open(file, bufferSize);
    }

    /**
     * Initialize the {@link Reader}
     * @param tempReader <code>true</code> if we are constructing a temporary
     *                   reader {@link SequenceFile.Sorter#cloneFileAttributes},
     *                   and hence do not initialize every component;
     *                   <code>false</code> otherwise.
     * @throws IOException
     */
    private void init(boolean tempReader) throws IOException {
      byte[] versionBlock = new byte[VERSION.length];
      String exceptionMsg = this + " not a SequenceFile";

      // Try to read sequence file header.
      try {
        in.readFully(versionBlock);
      } catch (EOFException e) {
        throw new EOFException(exceptionMsg);
      }

      if ((versionBlock[0] != VERSION[0]) ||
          (versionBlock[1] != VERSION[1]) ||
          (versionBlock[2] != VERSION[2])) {
        throw new IOException(this + " not a SequenceFile");
      }

      // Set 'version'
      version = versionBlock[3];
      if (version > VERSION[3]) {
        throw new VersionMismatchException(VERSION[3], version);
      }

      if (version < BLOCK_COMPRESS_VERSION) {
        UTF8 className = new UTF8();

        className.readFields(in);
        keyClassName = className.toStringChecked(); // key class name

        className.readFields(in);
        valClassName = className.toStringChecked(); // val class name
      } else {
        keyClassName = Text.readString(in);
        valClassName = Text.readString(in);
      }

      if (version > 2) {                          // if version > 2
        this.decompress = in.readBoolean();       // is compressed?
      } else {
        decompress = false;
      }

      if (version >= BLOCK_COMPRESS_VERSION) {    // if version >= 4
        this.blockCompressed = in.readBoolean();  // is block-compressed?
      } else {
        blockCompressed = false;
      }

      // if version >= 5
      // setup the compression codec
      if (decompress) {
        if (version >= CUSTOM_COMPRESS_VERSION) {
          String codecClassname = Text.readString(in);
          try {
            Class<? extends CompressionCodec> codecClass
              = conf.getClassByName(codecClassname).asSubclass(CompressionCodec.class);
            this.codec = ReflectionUtils.newInstance(codecClass, conf);
          } catch (ClassNotFoundException cnfe) {
            throw new IllegalArgumentException("Unknown codec: " +
                                               codecClassname, cnfe);
          }
        } else {
          codec = new DefaultCodec();
          ((Configurable)codec).setConf(conf);
        }
      }

      this.metadata = new Metadata();
      if (version >= VERSION_WITH_METADATA) {     // if version >= 6
        this.metadata.readFields(in);
      }

      if (version > 1) {                          // if version > 1
        in.readFully(sync);                       // read sync bytes
        headerEnd = in.getPos();                  // record end of header
      }

      // Initialize... *not* if we are constructing a temporary Reader
      if (!tempReader) {
        valBuffer = new DataInputBuffer();
        if (decompress) {
          valDecompressor = CodecPool.getDecompressor(codec);
          valInFilter = codec.createInputStream(valBuffer, valDecompressor);
          valIn = new DataInputStream(valInFilter);
        } else {
          valIn = valBuffer;
        }

        if (blockCompressed) {
          keyLenBuffer = new DataInputBuffer();
          keyBuffer = new DataInputBuffer();
          valLenBuffer = new DataInputBuffer();

          keyLenDecompressor = CodecPool.getDecompressor(codec);
          keyLenInFilter = codec.createInputStream(keyLenBuffer,
                                                   keyLenDecompressor);
          keyLenIn = new DataInputStream(keyLenInFilter);

          keyDecompressor = CodecPool.getDecompressor(codec);
          keyInFilter = codec.createInputStream(keyBuffer, keyDecompressor);
          keyIn = new DataInputStream(keyInFilter);

          valLenDecompressor = CodecPool.getDecompressor(codec);
          valLenInFilter = codec.createInputStream(valLenBuffer,
                                                   valLenDecompressor);
          valLenIn = new DataInputStream(valLenInFilter);
        }

        SerializationFactory serializationFactory =
          new SerializationFactory(conf);
        this.keyDeserializer =
          getDeserializer(serializationFactory, getKeyClass());
        if (this.keyDeserializer == null) {
          throw new IOException(
              "Could not find a deserializer for the Key class: '"
                  + getKeyClass().getCanonicalName() + "'. "
                  + "Please ensure that the configuration '" +
                  CommonConfigurationKeys.IO_SERIALIZATIONS_KEY + "' is "
                  + "properly configured, if you're using "
                  + "custom serialization.");
        }
        if (!blockCompressed) {
          this.keyDeserializer.open(valBuffer);
        } else {
          this.keyDeserializer.open(keyIn);
        }
        this.valDeserializer =
          getDeserializer(serializationFactory, getValueClass());
        if (this.valDeserializer == null) {
          throw new IOException(
              "Could not find a deserializer for the Value class: '"
                  + getValueClass().getCanonicalName() + "'. "
                  + "Please ensure that the configuration '" +
                  CommonConfigurationKeys.IO_SERIALIZATIONS_KEY + "' is "
                  + "properly configured, if you're using "
                  + "custom serialization.");
        }
        this.valDeserializer.open(valIn);
      }
    }

    @SuppressWarnings("unchecked")
    private Deserializer getDeserializer(SerializationFactory sf, Class c) {
      return sf.getDeserializer(c);
    }
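    /*
     * The header check in init() reads four bytes: the literal 'S','E','Q'
     * followed by a one-byte format version. A stand-alone sketch of the
     * same probe, assuming the file is readable locally (name illustrative):
     *
     *   DataInputStream din =
     *       new DataInputStream(new FileInputStream("part-00000"));
     *   byte[] magic = new byte[4];
     *   din.readFully(magic);
     *   boolean isSeq = magic[0] == 'S' && magic[1] == 'E' && magic[2] == 'Q';
     *   int fileVersion = magic[3];  // e.g. 6 for files with metadata
     *   din.close();
     */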
    /** Close the file. */
    @Override
    public synchronized void close() throws IOException {
      // Return the decompressors to the pool
      CodecPool.returnDecompressor(keyLenDecompressor);
      CodecPool.returnDecompressor(keyDecompressor);
      CodecPool.returnDecompressor(valLenDecompressor);
      CodecPool.returnDecompressor(valDecompressor);
      keyLenDecompressor = keyDecompressor = null;
      valLenDecompressor = valDecompressor = null;

      if (keyDeserializer != null) {
        keyDeserializer.close();
      }
      if (valDeserializer != null) {
        valDeserializer.close();
      }

      // Close the input-stream
      in.close();
    }

    /** Returns the name of the key class. */
    public String getKeyClassName() {
      return keyClassName;
    }

    /** Returns the class of keys in this file. */
    public synchronized Class<?> getKeyClass() {
      if (null == keyClass) {
        try {
          keyClass = WritableName.getClass(getKeyClassName(), conf);
        } catch (IOException e) {
          throw new RuntimeException(e);
        }
      }
      return keyClass;
    }

    /** Returns the name of the value class. */
    public String getValueClassName() {
      return valClassName;
    }

    /** Returns the class of values in this file. */
    public synchronized Class<?> getValueClass() {
      if (null == valClass) {
        try {
          valClass = WritableName.getClass(getValueClassName(), conf);
        } catch (IOException e) {
          throw new RuntimeException(e);
        }
      }
      return valClass;
    }

    /** Returns true if values are compressed. */
    public boolean isCompressed() { return decompress; }

    /** Returns true if records are block-compressed. */
    public boolean isBlockCompressed() { return blockCompressed; }

    /** Returns the compression codec of data in this file. */
    public CompressionCodec getCompressionCodec() { return codec; }

    private byte[] getSync() {
      return sync;
    }

    private byte getVersion() {
      return version;
    }

    /**
     * Get the compression type for this file.
     * @return the compression type
     */
    public CompressionType getCompressionType() {
      if (decompress) {
        return blockCompressed ? CompressionType.BLOCK : CompressionType.RECORD;
      } else {
        return CompressionType.NONE;
      }
    }

    /** Returns the metadata object of the file */
    public Metadata getMetadata() {
      return this.metadata;
    }
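    /*
     * These accessors make it cheap to inspect a file before deciding how
     * to process it. A minimal sketch, assuming an already-open reader:
     *
     *   if (reader.getCompressionType() == CompressionType.BLOCK) {
     *     // block-compressed: values are decompressed lazily, per block
     *   }
     *   CompressionCodec codec = reader.getCompressionCodec(); // null if NONE
     *   Metadata meta = reader.getMetadata();                  // user key/values
     */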
    /** Returns the configuration used for this file. */
    Configuration getConf() { return conf; }

    /** Read a compressed buffer */
    private synchronized void readBuffer(DataInputBuffer buffer,
        CompressionInputStream filter) throws IOException {
      // Read data into a temporary buffer
      DataOutputBuffer dataBuffer = new DataOutputBuffer();

      try {
        int dataBufferLength = WritableUtils.readVInt(in);
        dataBuffer.write(in, dataBufferLength);

        // Set up 'buffer' connected to the input-stream
        buffer.reset(dataBuffer.getData(), 0, dataBuffer.getLength());
      } finally {
        dataBuffer.close();
      }

      // Reset the codec
      filter.resetState();
    }

    /** Read the next 'compressed' block */
    private synchronized void readBlock() throws IOException {
      // Check if we need to throw away a whole block of
      // 'values' due to 'lazy decompression'
      if (lazyDecompress && !valuesDecompressed) {
        in.seek(WritableUtils.readVInt(in)+in.getPos());
        in.seek(WritableUtils.readVInt(in)+in.getPos());
      }

      // Reset internal states
      noBufferedKeys = 0; noBufferedValues = 0; noBufferedRecords = 0;
      valuesDecompressed = false;

      //Process sync
      if (sync != null) {
        in.readInt();
        in.readFully(syncCheck);                // read syncCheck
        if (!Arrays.equals(sync, syncCheck))    // check it
          throw new IOException("File is corrupt!");
      }
      syncSeen = true;

      // Read number of records in this block
      noBufferedRecords = WritableUtils.readVInt(in);

      // Read key lengths and keys
      readBuffer(keyLenBuffer, keyLenInFilter);
      readBuffer(keyBuffer, keyInFilter);
      noBufferedKeys = noBufferedRecords;

      // Read value lengths and values
      if (!lazyDecompress) {
        readBuffer(valLenBuffer, valLenInFilter);
        readBuffer(valBuffer, valInFilter);
        noBufferedValues = noBufferedRecords;
        valuesDecompressed = true;
      }
    }

    /**
     * Position valLenIn/valIn to the 'value'
     * corresponding to the 'current' key
     */
    private synchronized void seekToCurrentValue() throws IOException {
      if (!blockCompressed) {
        if (decompress) {
          valInFilter.resetState();
        }
        valBuffer.reset();
      } else {
        // Check if this is the first value in the 'block' to be read
        if (lazyDecompress && !valuesDecompressed) {
          // Read the value lengths and values
          readBuffer(valLenBuffer, valLenInFilter);
          readBuffer(valBuffer, valInFilter);
          noBufferedValues = noBufferedRecords;
          valuesDecompressed = true;
        }

        // Calculate the no. of bytes to skip
        // Note: 'current' key has already been read!
        int skipValBytes = 0;
        int currentKey = noBufferedKeys + 1;
        for (int i=noBufferedValues; i > currentKey; --i) {
          skipValBytes += WritableUtils.readVInt(valLenIn);
          --noBufferedValues;
        }

        // Skip to the 'val' corresponding to 'current' key
        if (skipValBytes > 0) {
          if (valIn.skipBytes(skipValBytes) != skipValBytes) {
            throw new IOException("Failed to seek to " + currentKey +
                                  "(th) value!");
          }
        }
      }
    }
    /**
     * Get the 'value' corresponding to the last read 'key'.
     * @param val : The 'value' to be read.
     * @throws IOException
     */
    public synchronized void getCurrentValue(Writable val)
      throws IOException {
      if (val instanceof Configurable) {
        ((Configurable) val).setConf(this.conf);
      }

      // Position stream to 'current' value
      seekToCurrentValue();

      if (!blockCompressed) {
        val.readFields(valIn);

        if (valIn.read() > 0) {
          LOG.info("available bytes: " + valIn.available());
          throw new IOException(val+" read "+(valBuffer.getPosition()-keyLength)
                                + " bytes, should read " +
                                (valBuffer.getLength()-keyLength));
        }
      } else {
        // Get the value
        int valLength = WritableUtils.readVInt(valLenIn);
        val.readFields(valIn);

        // Read another compressed 'value'
        --noBufferedValues;

        // Sanity check
        if ((valLength < 0) && LOG.isDebugEnabled()) {
          LOG.debug(val + " is a negative-length value");
        }
      }

    }

    /**
     * Get the 'value' corresponding to the last read 'key'.
     * @param val : The 'value' to be read.
     * @throws IOException
     */
    public synchronized Object getCurrentValue(Object val)
      throws IOException {
      if (val instanceof Configurable) {
        ((Configurable) val).setConf(this.conf);
      }

      // Position stream to 'current' value
      seekToCurrentValue();

      if (!blockCompressed) {
        val = deserializeValue(val);

        if (valIn.read() > 0) {
          LOG.info("available bytes: " + valIn.available());
          throw new IOException(val+" read "+(valBuffer.getPosition()-keyLength)
                                + " bytes, should read " +
                                (valBuffer.getLength()-keyLength));
        }
      } else {
        // Get the value
        int valLength = WritableUtils.readVInt(valLenIn);
        val = deserializeValue(val);

        // Read another compressed 'value'
        --noBufferedValues;

        // Sanity check
        if ((valLength < 0) && LOG.isDebugEnabled()) {
          LOG.debug(val + " is a negative-length value");
        }
      }
      return val;

    }

    @SuppressWarnings("unchecked")
    private Object deserializeValue(Object val) throws IOException {
      return valDeserializer.deserialize(val);
    }

    /** Read the next key in the file into <code>key</code>, skipping its
     * value.  True if another entry exists, and false at end of file. */
    public synchronized boolean next(Writable key) throws IOException {
      if (key.getClass() != getKeyClass())
        throw new IOException("wrong key class: "+key.getClass().getName()
                              +" is not "+keyClass);

      if (!blockCompressed) {
        outBuf.reset();

        keyLength = next(outBuf);
        if (keyLength < 0)
          return false;

        valBuffer.reset(outBuf.getData(), outBuf.getLength());

        key.readFields(valBuffer);
        valBuffer.mark(0);
        if (valBuffer.getPosition() != keyLength)
          throw new IOException(key + " read " + valBuffer.getPosition()
                                + " bytes, should read " + keyLength);
      } else {
        //Reset syncSeen
        syncSeen = false;

        if (noBufferedKeys == 0) {
          try {
            readBlock();
          } catch (EOFException eof) {
            return false;
          }
        }

        int keyLength = WritableUtils.readVInt(keyLenIn);

        // Sanity check
        if (keyLength < 0) {
          return false;
        }

        //Read another compressed 'key'
        key.readFields(keyIn);
        --noBufferedKeys;
      }

      return true;
    }
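    /*
     * A typical scan loop over a file of Writable pairs; a sketch assuming
     * the key and value classes are Text and IntWritable (illustrative):
     *
     *   Text key = new Text();
     *   IntWritable value = new IntWritable();
     *   while (reader.next(key)) {          // advances past the key
     *     reader.getCurrentValue(value);    // pulls the matching value
     *     // ... use key/value; both objects are re-used across iterations
     *   }
     */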
    /** Read the next key/value pair in the file into <code>key</code> and
     * <code>val</code>.  Returns true if such a pair exists and false when at
     * end of file */
    public synchronized boolean next(Writable key, Writable val)
      throws IOException {
      if (val.getClass() != getValueClass())
        throw new IOException("wrong value class: "+val+" is not "+valClass);

      boolean more = next(key);

      if (more) {
        getCurrentValue(val);
      }

      return more;
    }

    /**
     * Read and return the next record length, potentially skipping over
     * a sync block.
     * @return the length of the next record or -1 if there is no next record
     * @throws IOException
     */
    private synchronized int readRecordLength() throws IOException {
      if (in.getPos() >= end) {
        return -1;
      }
      int length = in.readInt();
      if (version > 1 && sync != null &&
          length == SYNC_ESCAPE) {              // process a sync entry
        in.readFully(syncCheck);                // read syncCheck
        if (!Arrays.equals(sync, syncCheck))    // check it
          throw new IOException("File is corrupt!");
        syncSeen = true;
        if (in.getPos() >= end) {
          return -1;
        }
        length = in.readInt();                  // re-read length
      } else {
        syncSeen = false;
      }

      return length;
    }

    /** Read the next key/value pair in the file into <code>buffer</code>.
     * Returns the length of the key read, or -1 if at end of file.  The length
     * of the value may be computed by calling buffer.getLength() before and
     * after calls to this method.
     * @deprecated Call {@link #nextRaw(DataOutputBuffer,SequenceFile.ValueBytes)}. */
    @Deprecated
    synchronized int next(DataOutputBuffer buffer) throws IOException {
      // Unsupported for block-compressed sequence files
      if (blockCompressed) {
        throw new IOException("Unsupported call for block-compressed" +
                              " SequenceFiles - use SequenceFile.Reader.next(DataOutputStream, ValueBytes)");
      }
      try {
        int length = readRecordLength();
        if (length == -1) {
          return -1;
        }
        int keyLength = in.readInt();
        buffer.write(in, length);
        return keyLength;
      } catch (ChecksumException e) {           // checksum failure
        handleChecksumException(e);
        return next(buffer);
      }
    }

    public ValueBytes createValueBytes() {
      ValueBytes val = null;
      if (!decompress || blockCompressed) {
        val = new UncompressedBytes();
      } else {
        val = new CompressedBytes(codec);
      }
      return val;
    }
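    /*
     * The raw interfaces move records without deserializing them, which is
     * what Sorter uses internally. A copy-loop sketch, assuming reader and
     * writer were opened with compatible key/value classes:
     *
     *   DataOutputBuffer rawKey = new DataOutputBuffer();
     *   SequenceFile.ValueBytes rawVal = reader.createValueBytes();
     *   while (reader.nextRaw(rawKey, rawVal) != -1) {
     *     writer.appendRaw(rawKey.getData(), 0, rawKey.getLength(), rawVal);
     *     rawKey.reset();
     *   }
     */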
    /**
     * Read 'raw' records.
     * @param key - The buffer into which the key is read
     * @param val - The 'raw' value
     * @return Returns the total record length or -1 for end of file
     * @throws IOException
     */
    public synchronized int nextRaw(DataOutputBuffer key, ValueBytes val)
      throws IOException {
      if (!blockCompressed) {
        int length = readRecordLength();
        if (length == -1) {
          return -1;
        }
        int keyLength = in.readInt();
        int valLength = length - keyLength;
        key.write(in, keyLength);
        if (decompress) {
          CompressedBytes value = (CompressedBytes)val;
          value.reset(in, valLength);
        } else {
          UncompressedBytes value = (UncompressedBytes)val;
          value.reset(in, valLength);
        }

        return length;
      } else {
        //Reset syncSeen
        syncSeen = false;

        // Read 'key'
        if (noBufferedKeys == 0) {
          if (in.getPos() >= end)
            return -1;

          try {
            readBlock();
          } catch (EOFException eof) {
            return -1;
          }
        }
        int keyLength = WritableUtils.readVInt(keyLenIn);
        if (keyLength < 0) {
          throw new IOException("negative length key found!");
        }
        key.write(keyIn, keyLength);
        --noBufferedKeys;

        // Read raw 'value'
        seekToCurrentValue();
        int valLength = WritableUtils.readVInt(valLenIn);
        UncompressedBytes rawValue = (UncompressedBytes)val;
        rawValue.reset(valIn, valLength);
        --noBufferedValues;

        return (keyLength+valLength);
      }

    }

    /**
     * Read 'raw' keys.
     * @param key - The buffer into which the key is read
     * @return Returns the key length or -1 for end of file
     * @throws IOException
     */
    public synchronized int nextRawKey(DataOutputBuffer key)
      throws IOException {
      if (!blockCompressed) {
        recordLength = readRecordLength();
        if (recordLength == -1) {
          return -1;
        }
        keyLength = in.readInt();
        key.write(in, keyLength);
        return keyLength;
      } else {
        //Reset syncSeen
        syncSeen = false;

        // Read 'key'
        if (noBufferedKeys == 0) {
          if (in.getPos() >= end)
            return -1;

          try {
            readBlock();
          } catch (EOFException eof) {
            return -1;
          }
        }
        int keyLength = WritableUtils.readVInt(keyLenIn);
        if (keyLength < 0) {
          throw new IOException("negative length key found!");
        }
        key.write(keyIn, keyLength);
        --noBufferedKeys;

        return keyLength;
      }

    }
    /** Read the next key in the file, skipping its
     * value.  Return null at end of file. */
    public synchronized Object next(Object key) throws IOException {
      if (key != null && key.getClass() != getKeyClass()) {
        throw new IOException("wrong key class: "+key.getClass().getName()
                              +" is not "+keyClass);
      }

      if (!blockCompressed) {
        outBuf.reset();

        keyLength = next(outBuf);
        if (keyLength < 0)
          return null;

        valBuffer.reset(outBuf.getData(), outBuf.getLength());

        key = deserializeKey(key);
        valBuffer.mark(0);
        if (valBuffer.getPosition() != keyLength)
          throw new IOException(key + " read " + valBuffer.getPosition()
                                + " bytes, should read " + keyLength);
      } else {
        //Reset syncSeen
        syncSeen = false;

        if (noBufferedKeys == 0) {
          try {
            readBlock();
          } catch (EOFException eof) {
            return null;
          }
        }

        int keyLength = WritableUtils.readVInt(keyLenIn);

        // Sanity check
        if (keyLength < 0) {
          return null;
        }

        //Read another compressed 'key'
        key = deserializeKey(key);
        --noBufferedKeys;
      }

      return key;
    }

    @SuppressWarnings("unchecked")
    private Object deserializeKey(Object key) throws IOException {
      return keyDeserializer.deserialize(key);
    }

    /**
     * Read 'raw' values.
     * @param val - The 'raw' value
     * @return Returns the value length
     * @throws IOException
     */
    public synchronized int nextRawValue(ValueBytes val)
      throws IOException {

      // Position stream to current value
      seekToCurrentValue();

      if (!blockCompressed) {
        int valLength = recordLength - keyLength;
        if (decompress) {
          CompressedBytes value = (CompressedBytes)val;
          value.reset(in, valLength);
        } else {
          UncompressedBytes value = (UncompressedBytes)val;
          value.reset(in, valLength);
        }

        return valLength;
      } else {
        int valLength = WritableUtils.readVInt(valLenIn);
        UncompressedBytes rawValue = (UncompressedBytes)val;
        rawValue.reset(valIn, valLength);
        --noBufferedValues;
        return valLength;
      }

    }

    private void handleChecksumException(ChecksumException e)
      throws IOException {
      if (this.conf.getBoolean("io.skip.checksum.errors", false)) {
        LOG.warn("Bad checksum at "+getPosition()+". Skipping entries.");
        sync(getPosition()+this.conf.getInt("io.bytes.per.checksum", 512));
      } else {
        throw e;
      }
    }

    /** disables sync. often invoked for tmp files */
    synchronized void ignoreSync() {
      sync = null;
    }
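    /*
     * handleChecksumException() only skips forward when the job opts in;
     * otherwise the ChecksumException propagates. A sketch of opting in,
     * assuming corrupt stretches of data are acceptable to lose:
     *
     *   Configuration conf = new Configuration();
     *   conf.setBoolean("io.skip.checksum.errors", true);
     *   // A Reader built with this conf resynchronizes past bad checksums
     *   // by seeking to the next sync marker instead of failing the read.
     */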
    /** Set the current byte position in the input file.
     *
     * <p>The position passed must be a position returned by {@link
     * SequenceFile.Writer#getLength()} when writing this file.  To seek to an arbitrary
     * position, use {@link SequenceFile.Reader#sync(long)}.
     */
    public synchronized void seek(long position) throws IOException {
      in.seek(position);
      if (blockCompressed) {                    // trigger block read
        noBufferedKeys = 0;
        valuesDecompressed = true;
      }
    }

    /** Seek to the next sync mark past a given position.*/
    public synchronized void sync(long position) throws IOException {
      if (position+SYNC_SIZE >= end) {
        seek(end);
        return;
      }

      if (position < headerEnd) {
        // seek directly to first record
        in.seek(headerEnd);
        // note the sync marker "seen" in the header
        syncSeen = true;
        return;
      }

      try {
        seek(position+4);                       // skip escape
        in.readFully(syncCheck);
        int syncLen = sync.length;
        for (int i = 0; in.getPos() < end; i++) {
          int j = 0;
          for (; j < syncLen; j++) {
            if (sync[j] != syncCheck[(i+j)%syncLen])
              break;
          }
          if (j == syncLen) {
            in.seek(in.getPos() - SYNC_SIZE);   // position before sync
            return;
          }
          syncCheck[i%syncLen] = in.readByte();
        }
      } catch (ChecksumException e) {           // checksum failure
        handleChecksumException(e);
      }
    }

    /** Returns true iff the previous call to next passed a sync mark.*/
    public synchronized boolean syncSeen() { return syncSeen; }

    /** Return the current byte position in the input file. */
    public synchronized long getPosition() throws IOException {
      return in.getPos();
    }

    /** Returns the name of the file. */
    @Override
    public String toString() {
      return filename;
    }

  }

  /** Sorts key/value pairs in a sequence-format file.
   *
   * <p>For best performance, applications should make sure that the {@link
   * Writable#readFields(DataInput)} implementation of their keys is
   * very efficient.  In particular, it should avoid allocating memory.
   */
  public static class Sorter {

    private RawComparator comparator;

    private MergeSort mergeSort; //the implementation of merge sort

    private Path[] inFiles;      // when merging or sorting

    private Path outFile;

    private int memory;          // bytes
    private int factor;          // merged per pass

    private FileSystem fs = null;

    private Class keyClass;
    private Class valClass;

    private Configuration conf;
    private Metadata metadata;

    private Progressable progressable = null;

    /** Sort and merge files containing the named classes. */
    public Sorter(FileSystem fs, Class<? extends WritableComparable> keyClass,
                  Class valClass, Configuration conf)  {
      this(fs, WritableComparator.get(keyClass, conf), keyClass, valClass, conf);
    }

    /** Sort and merge using an arbitrary {@link RawComparator}. */
    public Sorter(FileSystem fs, RawComparator comparator, Class keyClass,
                  Class valClass, Configuration conf) {
      this(fs, comparator, keyClass, valClass, conf, new Metadata());
    }
    /** Sort and merge using an arbitrary {@link RawComparator}. */
    public Sorter(FileSystem fs, RawComparator comparator, Class keyClass,
                  Class valClass, Configuration conf, Metadata metadata) {
      this.fs = fs;
      this.comparator = comparator;
      this.keyClass = keyClass;
      this.valClass = valClass;
      this.memory = conf.getInt("io.sort.mb", 100) * 1024 * 1024;
      this.factor = conf.getInt("io.sort.factor", 100);
      this.conf = conf;
      this.metadata = metadata;
    }

    /** Set the number of streams to merge at once.*/
    public void setFactor(int factor) { this.factor = factor; }

    /** Get the number of streams to merge at once.*/
    public int getFactor() { return factor; }

    /** Set the total amount of buffer memory, in bytes.*/
    public void setMemory(int memory) { this.memory = memory; }

    /** Get the total amount of buffer memory, in bytes.*/
    public int getMemory() { return memory; }

    /** Set the progressable object in order to report progress. */
    public void setProgressable(Progressable progressable) {
      this.progressable = progressable;
    }

    /**
     * Perform a file sort from a set of input files into an output file.
     * @param inFiles the files to be sorted
     * @param outFile the sorted output file
     * @param deleteInput should the input files be deleted as they are read?
     */
    public void sort(Path[] inFiles, Path outFile,
                     boolean deleteInput) throws IOException {
      if (fs.exists(outFile)) {
        throw new IOException("already exists: " + outFile);
      }

      this.inFiles = inFiles;
      this.outFile = outFile;

      int segments = sortPass(deleteInput);
      if (segments > 1) {
        mergePass(outFile.getParent());
      }
    }

    /**
     * Perform a file sort from a set of input files and return an iterator.
     * @param inFiles the files to be sorted
     * @param tempDir the directory where temp files are created during sort
     * @param deleteInput should the input files be deleted as they are read?
     * @return the RawKeyValueIterator over the sorted data
     */
    public RawKeyValueIterator sortAndIterate(Path[] inFiles, Path tempDir,
                                              boolean deleteInput) throws IOException {
      Path outFile = new Path(tempDir + Path.SEPARATOR + "all.2");
      if (fs.exists(outFile)) {
        throw new IOException("already exists: " + outFile);
      }
      this.inFiles = inFiles;
      //outFile will basically be used as prefix for temp files in the cases
      //where sort outputs multiple sorted segments. For the single segment
      //case, the outputFile itself will contain the sorted data for that
      //segment
      this.outFile = outFile;

      int segments = sortPass(deleteInput);
      if (segments > 1)
        return merge(outFile.suffix(".0"), outFile.suffix(".0.index"),
                     tempDir);
      else if (segments == 1)
        return merge(new Path[]{outFile}, true, tempDir);
      else return null;
    }
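    /*
     * End-to-end, a sort is just: construct a Sorter with the file's key and
     * value classes, then point it at inputs and an output. A sketch with
     * illustrative paths, assuming Text keys and IntWritable values:
     *
     *   Configuration conf = new Configuration();
     *   FileSystem fs = FileSystem.get(conf);
     *   SequenceFile.Sorter sorter = new SequenceFile.Sorter(
     *       fs, Text.class, IntWritable.class, conf);
     *   sorter.sort(new Path[] { new Path("in-0"), new Path("in-1") },
     *               new Path("sorted"),
     *               false);   // false = keep the input files
     */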
    /**
     * The backwards compatible interface to sort.
     * @param inFile the input file to sort
     * @param outFile the sorted output file
     */
    public void sort(Path inFile, Path outFile) throws IOException {
      sort(new Path[]{inFile}, outFile, false);
    }

    private int sortPass(boolean deleteInput) throws IOException {
      if(LOG.isDebugEnabled()) {
        LOG.debug("running sort pass");
      }
      SortPass sortPass = new SortPass();       // make the SortPass
      sortPass.setProgressable(progressable);
      mergeSort = new MergeSort(sortPass.new SeqFileComparator());
      try {
        return sortPass.run(deleteInput);       // run it
      } finally {
        sortPass.close();                       // close it
      }
    }

    private class SortPass {
      private int memoryLimit = memory/4;
      private int recordLimit = 1000000;

      private DataOutputBuffer rawKeys = new DataOutputBuffer();
      private byte[] rawBuffer;

      private int[] keyOffsets = new int[1024];
      private int[] pointers = new int[keyOffsets.length];
      private int[] pointersCopy = new int[keyOffsets.length];
      private int[] keyLengths = new int[keyOffsets.length];
      private ValueBytes[] rawValues = new ValueBytes[keyOffsets.length];

      private ArrayList segmentLengths = new ArrayList();

      private Reader in = null;
      private FSDataOutputStream out = null;
      private FSDataOutputStream indexOut = null;
      private Path outName;

      private Progressable progressable = null;

      public int run(boolean deleteInput) throws IOException {
        int segments = 0;
        int currentFile = 0;
        boolean atEof = (currentFile >= inFiles.length);
        CompressionType compressionType;
        CompressionCodec codec = null;
        segmentLengths.clear();
        if (atEof) {
          return 0;
        }

        // Initialize
        in = new Reader(fs, inFiles[currentFile], conf);
        compressionType = in.getCompressionType();
        codec = in.getCompressionCodec();

        for (int i=0; i < rawValues.length; ++i) {
          rawValues[i] = null;
        }

        while (!atEof) {
          int count = 0;
          int bytesProcessed = 0;
          rawKeys.reset();
          while (!atEof &&
                 bytesProcessed < memoryLimit && count < recordLimit) {

            // Read a record into buffer
            // Note: Attempt to re-use 'rawValue' as far as possible
            int keyOffset = rawKeys.getLength();
            ValueBytes rawValue =
              (count == keyOffsets.length || rawValues[count] == null) ?
              in.createValueBytes() :
              rawValues[count];
            int recordLength = in.nextRaw(rawKeys, rawValue);
            if (recordLength == -1) {
              in.close();
              if (deleteInput) {
                fs.delete(inFiles[currentFile], true);
              }
              currentFile += 1;
              atEof = currentFile >= inFiles.length;
              if (!atEof) {
                in = new Reader(fs, inFiles[currentFile], conf);
              } else {
                in = null;
              }
              continue;
            }

            int keyLength = rawKeys.getLength() - keyOffset;

            if (count == keyOffsets.length)
              grow();

            keyOffsets[count] = keyOffset;      // update pointers
            pointers[count] = count;
            keyLengths[count] = keyLength;
            rawValues[count] = rawValue;

            bytesProcessed += recordLength;
            count++;
          }

          // buffer is full -- sort & flush it
          if(LOG.isDebugEnabled()) {
            LOG.debug("flushing segment " + segments);
          }
          rawBuffer = rawKeys.getData();
          sort(count);
          // indicate we're making progress
          if (progressable != null) {
            progressable.progress();
          }
          flush(count, bytesProcessed, compressionType, codec,
                segments==0 && atEof);
          segments++;
        }
        return segments;
      }

      public void close() throws IOException {
        if (in != null) {
          in.close();
        }
        if (out != null) {
          out.close();
        }
        if (indexOut != null) {
          indexOut.close();
        }
      }

      private void grow() {
        int newLength = keyOffsets.length * 3 / 2;
        keyOffsets = grow(keyOffsets, newLength);
        pointers = grow(pointers, newLength);
        pointersCopy = new int[newLength];
        keyLengths = grow(keyLengths, newLength);
        rawValues = grow(rawValues, newLength);
      }

      private int[] grow(int[] old, int newLength) {
        int[] result = new int[newLength];
        System.arraycopy(old, 0, result, 0, old.length);
        return result;
      }

      private ValueBytes[] grow(ValueBytes[] old, int newLength) {
        ValueBytes[] result = new ValueBytes[newLength];
        System.arraycopy(old, 0, result, 0, old.length);
        for (int i=old.length; i < newLength; ++i) {
          result[i] = null;
        }
        return result;
      }

      private void flush(int count, int bytesProcessed,
                         CompressionType compressionType,
                         CompressionCodec codec,
                         boolean done) throws IOException {
        if (out == null) {
          outName = done ? outFile : outFile.suffix(".0");
          out = fs.create(outName);
          if (!done) {
            indexOut = fs.create(outName.suffix(".index"));
          }
        }

        long segmentStart = out.getPos();
        Writer writer = createWriter(conf, Writer.stream(out),
            Writer.keyClass(keyClass), Writer.valueClass(valClass),
            Writer.compression(compressionType, codec),
            Writer.metadata(done ?
                            metadata : new Metadata()));

        if (!done) {
          writer.sync = null;                   // disable sync on temp files
        }

        for (int i = 0; i < count; i++) {       // write in sorted order
          int p = pointers[i];
          writer.appendRaw(rawBuffer, keyOffsets[p], keyLengths[p], rawValues[p]);
        }
        writer.close();

        if (!done) {
          // Save the segment length
          WritableUtils.writeVLong(indexOut, segmentStart);
          WritableUtils.writeVLong(indexOut, (out.getPos()-segmentStart));
          indexOut.flush();
        }
      }

      private void sort(int count) {
        System.arraycopy(pointers, 0, pointersCopy, 0, count);
        mergeSort.mergeSort(pointersCopy, pointers, 0, count);
      }

      class SeqFileComparator implements Comparator<IntWritable> {
        @Override
        public int compare(IntWritable I, IntWritable J) {
          return comparator.compare(rawBuffer, keyOffsets[I.get()],
                                    keyLengths[I.get()], rawBuffer,
                                    keyOffsets[J.get()], keyLengths[J.get()]);
        }
      }

      /** set the progressable object in order to report progress */
      public void setProgressable(Progressable progressable)
      {
        this.progressable = progressable;
      }

    } // SequenceFile.Sorter.SortPass

    /** The interface to iterate over raw keys/values of SequenceFiles. */
    public static interface RawKeyValueIterator {
      /** Gets the current raw key
       * @return DataOutputBuffer
       * @throws IOException
       */
      DataOutputBuffer getKey() throws IOException;
      /** Gets the current raw value
       * @return ValueBytes
       * @throws IOException
       */
      ValueBytes getValue() throws IOException;
      /** Sets up the current key and value (for getKey and getValue)
       * @return true if there exists a key/value, false otherwise
       * @throws IOException
       */
      boolean next() throws IOException;
      /** closes the iterator so that the underlying streams can be closed
       * @throws IOException
       */
      void close() throws IOException;
      /** Gets the Progress object; this has a float (0.0 - 1.0)
       * indicating the bytes processed by the iterator so far
       */
      Progress getProgress();
    }

    /**
     * Merges the list of segments of type <code>SegmentDescriptor</code>
     * @param segments the list of SegmentDescriptors
     * @param tmpDir the directory to write temporary files into
     * @return RawKeyValueIterator
     * @throws IOException
     */
    public RawKeyValueIterator merge(List <SegmentDescriptor> segments,
                                     Path tmpDir)
      throws IOException {
      // pass in object to report progress, if present
      MergeQueue mQueue = new MergeQueue(segments, tmpDir, progressable);
      return mQueue.merge();
    }
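    /*
     * A consumer-side sketch of RawKeyValueIterator, assuming "it" came from
     * one of the merge(...) overloads below:
     *
     *   while (it.next()) {
     *     DataOutputBuffer rawKey = it.getKey();        // valid until next()
     *     SequenceFile.ValueBytes rawVal = it.getValue();
     *     // ... e.g. writer.appendRaw(rawKey.getData(), 0,
     *     //                           rawKey.getLength(), rawVal);
     *   }
     *   it.close();
     */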
    /**
     * Merges the contents of files passed in Path[] using a max factor value
     * that is already set
     * @param inNames the array of path names
     * @param deleteInputs true if the input files should be deleted when
     *        unnecessary
     * @param tmpDir the directory to write temporary files into
     * @return RawKeyValueIterator
     * @throws IOException
     */
    public RawKeyValueIterator merge(Path [] inNames, boolean deleteInputs,
                                     Path tmpDir)
      throws IOException {
      return merge(inNames, deleteInputs,
                   (inNames.length < factor) ? inNames.length : factor,
                   tmpDir);
    }

    /**
     * Merges the contents of files passed in Path[]
     * @param inNames the array of path names
     * @param deleteInputs true if the input files should be deleted when
     *        unnecessary
     * @param factor the factor that will be used as the maximum merge fan-in
     * @param tmpDir the directory to write temporary files into
     * @return RawKeyValueIterator
     * @throws IOException
     */
    public RawKeyValueIterator merge(Path [] inNames, boolean deleteInputs,
                                     int factor, Path tmpDir)
      throws IOException {
      //get the segments from inNames
      ArrayList <SegmentDescriptor> a = new ArrayList <SegmentDescriptor>();
      for (int i = 0; i < inNames.length; i++) {
        SegmentDescriptor s = new SegmentDescriptor(0,
            fs.getFileStatus(inNames[i]).getLen(), inNames[i]);
        s.preserveInput(!deleteInputs);
        s.doSync();
        a.add(s);
      }
      this.factor = factor;
      MergeQueue mQueue = new MergeQueue(a, tmpDir, progressable);
      return mQueue.merge();
    }

    /**
     * Merges the contents of files passed in Path[]
     * @param inNames the array of path names
     * @param tempDir the directory for creating temp files during merge
     * @param deleteInputs true if the input files should be deleted when
     *        unnecessary
     * @return RawKeyValueIterator
     * @throws IOException
     */
    public RawKeyValueIterator merge(Path [] inNames, Path tempDir,
                                     boolean deleteInputs)
      throws IOException {
      //outFile will basically be used as prefix for temp files for the
      //intermediate merge outputs
      this.outFile = new Path(tempDir + Path.SEPARATOR + "merged");
      //get the segments from inNames
      ArrayList <SegmentDescriptor> a = new ArrayList <SegmentDescriptor>();
      for (int i = 0; i < inNames.length; i++) {
        SegmentDescriptor s = new SegmentDescriptor(0,
            fs.getFileStatus(inNames[i]).getLen(), inNames[i]);
        s.preserveInput(!deleteInputs);
        s.doSync();
        a.add(s);
      }
      factor = (inNames.length < factor) ? inNames.length : factor;
      // pass in object to report progress, if present
      MergeQueue mQueue = new MergeQueue(a, tempDir, progressable);
      return mQueue.merge();
    }

    /**
     * Clones the attributes (like compression) of the input file and creates
     * a corresponding Writer
     * @param inputFile the path of the input file whose attributes should be
     *        cloned
     * @param outputFile the path of the output file
     * @param prog the Progressable to report status during the file write
     * @return Writer
     * @throws IOException
     */
    public Writer cloneFileAttributes(Path inputFile, Path outputFile,
                                      Progressable prog) throws IOException {
      Reader reader = new Reader(conf,
                                 Reader.file(inputFile),
                                 new Reader.OnlyHeaderOption());
      CompressionType compress = reader.getCompressionType();
      CompressionCodec codec = reader.getCompressionCodec();
      reader.close();

      Writer writer = createWriter(conf,
                                   Writer.file(outputFile),
                                   Writer.keyClass(keyClass),
                                   Writer.valueClass(valClass),
                                   Writer.compression(compress, codec),
                                   Writer.progressable(prog));
      return writer;
    }

    /**
     * Writes records from RawKeyValueIterator into a file represented by the
     * passed writer
     * @param records the RawKeyValueIterator
     * @param writer the Writer created earlier
     * @throws IOException
     */
    public void writeFile(RawKeyValueIterator records, Writer writer)
      throws IOException {
      while(records.next()) {
        writer.appendRaw(records.getKey().getData(), 0,
                         records.getKey().getLength(), records.getValue());
      }
      writer.sync();
    }

    /** Merge the provided files.
     * @param inFiles the array of input path names
     * @param outFile the final output file
     * @throws IOException
     */
    public void merge(Path[] inFiles, Path outFile) throws IOException {
      if (fs.exists(outFile)) {
        throw new IOException("already exists: " + outFile);
      }
      RawKeyValueIterator r = merge(inFiles, false, outFile.getParent());
      Writer writer = cloneFileAttributes(inFiles[0], outFile, null);

      writeFile(r, writer);

      writer.close();
    }
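    /*
     * merge(Path[], Path) above strings the pieces together: build a segment
     * iterator, clone the first input's compression attributes onto a new
     * Writer, then stream raw records across. A sketch with illustrative
     * paths, assuming the inputs are already sorted:
     *
     *   sorter.merge(new Path[] { new Path("sorted-0"), new Path("sorted-1") },
     *                new Path("merged"));
     */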
    /** sort calls this to generate the final merged output */
    private int mergePass(Path tmpDir) throws IOException {
      if(LOG.isDebugEnabled()) {
        LOG.debug("running merge pass");
      }
      Writer writer = cloneFileAttributes(
          outFile.suffix(".0"), outFile, null);
      RawKeyValueIterator r = merge(outFile.suffix(".0"),
                                    outFile.suffix(".0.index"), tmpDir);
      writeFile(r, writer);

      writer.close();
      return 0;
    }

    /** Used by mergePass to merge the output of the sort
     * @param inName the name of the input file containing sorted segments
     * @param indexIn the offsets of the sorted segments
     * @param tmpDir the relative directory to store intermediate results in
     * @return RawKeyValueIterator
     * @throws IOException
     */
    private RawKeyValueIterator merge(Path inName, Path indexIn, Path tmpDir)
      throws IOException {
      //get the segments from indexIn
      //we create a SegmentContainer so that we can track segments belonging to
      //inName and delete inName as soon as we see that we have looked at all
      //the contained segments during the merge process & hence don't need
      //them anymore
      SegmentContainer container = new SegmentContainer(inName, indexIn);
      MergeQueue mQueue =
        new MergeQueue(container.getSegmentList(), tmpDir, progressable);
      return mQueue.merge();
    }

    /** This class implements the core of the merge logic */
    private class MergeQueue extends PriorityQueue
      implements RawKeyValueIterator {
      private boolean compress;
      private boolean blockCompress;
      private DataOutputBuffer rawKey = new DataOutputBuffer();
      private ValueBytes rawValue;
      private long totalBytesProcessed;
      private float progPerByte;
      private Progress mergeProgress = new Progress();
      private Path tmpDir;
      private Progressable progress = null; //handle to the progress reporting object
      private SegmentDescriptor minSegment;

      //a TreeMap used to store the segments sorted by size (segment offset and
      //segment path name are used to break ties between segments of the same size)
      private Map<SegmentDescriptor, Void> sortedSegmentSizes =
        new TreeMap<SegmentDescriptor, Void>();

      @SuppressWarnings("unchecked")
      public void put(SegmentDescriptor stream) throws IOException {
        if (size() == 0) {
          compress = stream.in.isCompressed();
          blockCompress = stream.in.isBlockCompressed();
        } else if (compress != stream.in.isCompressed() ||
                   blockCompress != stream.in.isBlockCompressed()) {
          throw new IOException("All merged files must be compressed or not.");
        }
        super.put(stream);
      }

      /**
       * A queue of file segments to merge
       * @param segments the file segments to merge
       * @param tmpDir a relative local directory to save intermediate files in
       * @param progress the reference to the Progressable object
       */
      public MergeQueue(List <SegmentDescriptor> segments,
                        Path tmpDir, Progressable progress) {
        int size = segments.size();
        for (int i = 0; i < size; i++) {
          sortedSegmentSizes.put(segments.get(i), null);
        }
        this.tmpDir = tmpDir;
        this.progress = progress;
      }

      @Override
      protected boolean lessThan(Object a, Object b) {
        // indicate we're making progress
        if (progress != null) {
          progress.progress();
        }
        SegmentDescriptor msa = (SegmentDescriptor)a;
        SegmentDescriptor msb = (SegmentDescriptor)b;
        return comparator.compare(msa.getKey().getData(), 0,
                                  msa.getKey().getLength(), msb.getKey().getData(), 0,
                                  msb.getKey().getLength()) < 0;
      }

      @Override
      public void close() throws IOException {
        SegmentDescriptor ms;                   // close inputs
        while ((ms = (SegmentDescriptor)pop()) != null) {
          ms.cleanup();
        }
        minSegment = null;
      }

      @Override
      public DataOutputBuffer getKey() throws IOException {
        return rawKey;
      }

      @Override
      public ValueBytes getValue() throws IOException {
        return rawValue;
      }

      @Override
      public boolean next() throws IOException {
        if (size() == 0)
          return false;
        if (minSegment != null) {
          //minSegment is non-null for all invocations of next except the first
          //one. For the first invocation, the priority queue is ready for use
          //but for the subsequent invocations, first adjust the queue
          adjustPriorityQueue(minSegment);
          if (size() == 0) {
            minSegment = null;
            return false;
          }
        }
        minSegment = (SegmentDescriptor)top();
        long startPos = minSegment.in.getPosition(); // Current position in stream
        //save the raw key reference
        rawKey = minSegment.getKey();
        //load the raw value. Re-use the existing rawValue buffer
        if (rawValue == null) {
          rawValue = minSegment.in.createValueBytes();
        }
        minSegment.nextRawValue(rawValue);
        long endPos = minSegment.in.getPosition(); // End position after reading value
        updateProgress(endPos - startPos);
        return true;
      }

      @Override
      public Progress getProgress() {
        return mergeProgress;
      }

      private void adjustPriorityQueue(SegmentDescriptor ms) throws IOException{
        long startPos = ms.in.getPosition();    // Current position in stream
        boolean hasNext = ms.nextRawKey();
        long endPos = ms.in.getPosition();      // End position after reading key
        updateProgress(endPos - startPos);
        if (hasNext) {
          adjustTop();
        } else {
          pop();
          ms.cleanup();
        }
      }

      private void updateProgress(long bytesProcessed) {
        totalBytesProcessed += bytesProcessed;
        if (progPerByte > 0) {
          mergeProgress.set(totalBytesProcessed * progPerByte);
        }
      }

      /** This is the single level merge that is called multiple times
       * depending on the factor size and the number of segments
       * @return RawKeyValueIterator
       * @throws IOException
       */
      public RawKeyValueIterator merge() throws IOException {
        //create the MergeStreams from the sorted map created in the constructor
        //and dump the final output to a file
        int numSegments = sortedSegmentSizes.size();
        int origFactor = factor;
        int passNo = 1;
        LocalDirAllocator lDirAlloc = new LocalDirAllocator("io.seqfile.local.dir");
        do {
          //get the factor for this pass of merge
          factor = getPassFactor(passNo, numSegments);
          List<SegmentDescriptor> segmentsToMerge =
            new ArrayList<SegmentDescriptor>();
          int segmentsConsidered = 0;
          int numSegmentsToConsider = factor;
          while (true) {
            //extract the smallest 'factor' number of segment pointers from the
            //TreeMap. Call cleanup on the empty segments (no key/value data)
            SegmentDescriptor[] mStream =
              getSegmentDescriptors(numSegmentsToConsider);
            for (int i = 0; i < mStream.length; i++) {
              if (mStream[i].nextRawKey()) {
                segmentsToMerge.add(mStream[i]);
                segmentsConsidered++;
                // Count the fact that we read some bytes in calling nextRawKey()
                updateProgress(mStream[i].in.getPosition());
              }
              else {
                mStream[i].cleanup();
                numSegments--; //we ignore this segment for the merge
              }
            }
            //if we have the desired number of segments
            //or looked at all available segments, we break
            if (segmentsConsidered == factor ||
                sortedSegmentSizes.size() == 0) {
              break;
            }

            numSegmentsToConsider = factor - segmentsConsidered;
          }
          //feed the streams to the priority queue
          initialize(segmentsToMerge.size()); clear();
          for (int i = 0; i < segmentsToMerge.size(); i++) {
            put(segmentsToMerge.get(i));
          }
          //if we have fewer segments remaining than the factor, just return
          //the iterator, else do another single level merge
          if (numSegments <= factor) {
            //calculate the length of the remaining segments. Required for
            //calculating the merge progress
            long totalBytes = 0;
            for (int i = 0; i < segmentsToMerge.size(); i++) {
              totalBytes += segmentsToMerge.get(i).segmentLength;
            }
            if (totalBytes != 0) //being paranoid
              progPerByte = 1.0f / (float)totalBytes;
            //reset factor to what it originally was
            factor = origFactor;
            return this;
          } else {
            //we want to spread the creation of temp files on multiple disks if
            //available under the space constraints
            long approxOutputSize = 0;
            for (SegmentDescriptor s : segmentsToMerge) {
              approxOutputSize += s.segmentLength +
                ChecksumFileSystem.getApproxChkSumLength(
                    s.segmentLength);
            }
            Path tmpFilename =
              new Path(tmpDir, "intermediate").suffix("." + passNo);

            Path outputFile = lDirAlloc.getLocalPathForWrite(
                tmpFilename.toString(),
                approxOutputSize, conf);
            if(LOG.isDebugEnabled()) {
              LOG.debug("writing intermediate results to " + outputFile);
            }
            Writer writer = cloneFileAttributes(
                fs.makeQualified(segmentsToMerge.get(0).segmentPathName),
                fs.makeQualified(outputFile), null);
            writer.sync = null; //disable sync for temp files
            writeFile(this, writer);
            writer.close();

            //we finished one single level merge; now clean up the priority
            //queue
            this.close();

            SegmentDescriptor tempSegment =
              new SegmentDescriptor(0,
                  fs.getFileStatus(outputFile).getLen(), outputFile);
            //put the segment back in the TreeMap
            sortedSegmentSizes.put(tempSegment, null);
            numSegments = sortedSegmentSizes.size();
            passNo++;
          }
          //we are worried about only the first pass merge factor. So reset the
          //factor to what it originally was
          factor = origFactor;
        } while(true);
      }

      //HADOOP-591: for the first pass, merge only enough segments to leave
      //(numSegments - 1) divisible by (factor - 1), so that every subsequent
      //pass can merge exactly 'factor' segments
      public int getPassFactor(int passNo, int numSegments) {
        if (passNo > 1 || numSegments <= factor || factor == 1)
          return factor;
        int mod = (numSegments - 1) % (factor - 1);
        if (mod == 0)
          return factor;
        return mod + 1;
      }

      /** Return (& remove) the requested number of segment descriptors from the
       * sorted map.
       */
      public SegmentDescriptor[] getSegmentDescriptors(int numDescriptors) {
        if (numDescriptors > sortedSegmentSizes.size())
          numDescriptors = sortedSegmentSizes.size();
        SegmentDescriptor[] SegmentDescriptors =
          new SegmentDescriptor[numDescriptors];
        Iterator iter = sortedSegmentSizes.keySet().iterator();
        int i = 0;
        while (i < numDescriptors) {
          SegmentDescriptors[i++] = (SegmentDescriptor)iter.next();
          iter.remove();
        }
        return SegmentDescriptors;
      }
    } // SequenceFile.Sorter.MergeQueue

    /** This class defines a merge segment. This class can be subclassed to
     * provide a customized cleanup method implementation. In this
     * implementation, cleanup closes the file handle and deletes the file
     */
    public class SegmentDescriptor implements Comparable {

      long segmentOffset;         //the start of the segment in the file
      long segmentLength;         //the length of the segment
      Path segmentPathName;       //the path name of the file containing the segment
      boolean ignoreSync = true;  //set to true for temp files
      private Reader in = null;
      private DataOutputBuffer rawKey = null; //this will hold the current key
      private boolean preserveInput = false;  //delete input segment files?
      /** Return (and remove) the requested number of segment descriptors
       * from the sorted map.
       */
      public SegmentDescriptor[] getSegmentDescriptors(int numDescriptors) {
        if (numDescriptors > sortedSegmentSizes.size())
          numDescriptors = sortedSegmentSizes.size();
        SegmentDescriptor[] segmentDescriptors =
          new SegmentDescriptor[numDescriptors];
        Iterator iter = sortedSegmentSizes.keySet().iterator();
        int i = 0;
        while (i < numDescriptors) {
          segmentDescriptors[i++] = (SegmentDescriptor)iter.next();
          iter.remove();
        }
        return segmentDescriptors;
      }
    } // SequenceFile.Sorter.MergeQueue

    /** This class defines a merge segment. This class can be subclassed to
     * provide a customized cleanup method implementation. In this
     * implementation, cleanup closes the file handle and deletes the file.
     */
    public class SegmentDescriptor implements Comparable {

      long segmentOffset;        // the start of the segment in the file
      long segmentLength;        // the length of the segment
      Path segmentPathName;      // the path of the file containing the segment
      boolean ignoreSync = true; // set to true for temp merge files
      private Reader in = null;
      private DataOutputBuffer rawKey = null; // holds the current key
      private boolean preserveInput = false;  // whether to keep the input
                                              // segment files after the merge

      /** Constructs a segment.
       * @param segmentOffset the offset of the segment in the file
       * @param segmentLength the length of the segment
       * @param segmentPathName the path of the file containing the segment
       */
      public SegmentDescriptor(long segmentOffset, long segmentLength,
                               Path segmentPathName) {
        this.segmentOffset = segmentOffset;
        this.segmentLength = segmentLength;
        this.segmentPathName = segmentPathName;
      }

      /** Enable the sync checks when reading this segment */
      public void doSync() { ignoreSync = false; }

      /** Whether the input files should be preserved, rather than deleted,
       * when they are no longer needed */
      public void preserveInput(boolean preserve) {
        preserveInput = preserve;
      }

      public boolean shouldPreserveInput() {
        return preserveInput;
      }

      @Override
      public int compareTo(Object o) {
        SegmentDescriptor that = (SegmentDescriptor)o;
        if (this.segmentLength != that.segmentLength) {
          return (this.segmentLength < that.segmentLength ? -1 : 1);
        }
        if (this.segmentOffset != that.segmentOffset) {
          return (this.segmentOffset < that.segmentOffset ? -1 : 1);
        }
        return (this.segmentPathName.toString())
          .compareTo(that.segmentPathName.toString());
      }

      @Override
      public boolean equals(Object o) {
        if (!(o instanceof SegmentDescriptor)) {
          return false;
        }
        SegmentDescriptor that = (SegmentDescriptor)o;
        return this.segmentLength == that.segmentLength &&
          this.segmentOffset == that.segmentOffset &&
          this.segmentPathName.toString().equals(
            that.segmentPathName.toString());
      }

      @Override
      public int hashCode() {
        // only the offset is hashed; this is consistent with equals() since
        // equal descriptors always share the same offset
        return 37 * 17 + (int) (segmentOffset^(segmentOffset>>>32));
      }

      /** Fills up the rawKey object with the key returned by the Reader.
       * @return true if a key was read; false otherwise
       * @throws IOException
       */
      public boolean nextRawKey() throws IOException {
        if (in == null) {
          int bufferSize = getBufferSize(conf);
          Reader reader = new Reader(conf,
                                     Reader.file(segmentPathName),
                                     Reader.bufferSize(bufferSize),
                                     Reader.start(segmentOffset),
                                     Reader.length(segmentLength));

          // sometimes we ignore syncs, especially for temp merge files
          if (ignoreSync) reader.ignoreSync();

          if (reader.getKeyClass() != keyClass)
            throw new IOException("wrong key class: " + reader.getKeyClass() +
                                  " is not " + keyClass);
          if (reader.getValueClass() != valClass)
            throw new IOException("wrong value class: " + reader.getValueClass() +
                                  " is not " + valClass);
          this.in = reader;
          rawKey = new DataOutputBuffer();
        }
        rawKey.reset();
        int keyLength = in.nextRawKey(rawKey);
        return (keyLength >= 0);
      }

      /** Fills up the passed rawValue with the value corresponding to the
       * key read earlier.
       * @param rawValue the buffer to fill
       * @return the length of the value
       * @throws IOException
       */
      public int nextRawValue(ValueBytes rawValue) throws IOException {
        return in.nextRawValue(rawValue);
      }

      /** Returns the stored rawKey */
      public DataOutputBuffer getKey() {
        return rawKey;
      }

      /** Closes the underlying reader */
      private void close() throws IOException {
        this.in.close();
        this.in = null;
      }
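
      /*
       * A minimal sketch of the subclassing hook described in the class
       * javadoc above (a hypothetical subclass, not part of this file;
       * LinkedSegmentsDescriptor below is the in-file example of the same
       * pattern):
       *
       *   class KeepInputDescriptor extends SegmentDescriptor {
       *     KeepInputDescriptor(long off, long len, Path path) {
       *       super(off, len, path);
       *     }
       *     @Override
       *     public void cleanup() throws IOException {
       *       preserveInput(true); // never delete the backing file
       *       super.cleanup();     // still closes the reader
       *     }
       *   }
       */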
      /** The default cleanup. Subclasses can override this with a custom
       * cleanup.
       */
      public void cleanup() throws IOException {
        close();
        if (!preserveInput) {
          fs.delete(segmentPathName, true);
        }
      }
    } // SequenceFile.Sorter.SegmentDescriptor

    /** This class provisions multiple segments contained within a single
     * file.
     */
    private class LinkedSegmentsDescriptor extends SegmentDescriptor {

      SegmentContainer parentContainer = null;

      /** Constructs a segment.
       * @param segmentOffset the offset of the segment in the file
       * @param segmentLength the length of the segment
       * @param segmentPathName the path of the file containing the segment
       * @param parent the parent SegmentContainer that holds the segment
       */
      public LinkedSegmentsDescriptor(long segmentOffset, long segmentLength,
                                      Path segmentPathName,
                                      SegmentContainer parent) {
        super(segmentOffset, segmentLength, segmentPathName);
        this.parentContainer = parent;
      }

      /** The default cleanup. Subclasses can override this with a custom
       * cleanup.
       */
      @Override
      public void cleanup() throws IOException {
        super.close();
        if (super.shouldPreserveInput()) return;
        parentContainer.cleanup();
      }

      @Override
      public boolean equals(Object o) {
        if (!(o instanceof LinkedSegmentsDescriptor)) {
          return false;
        }
        return super.equals(o);
      }
    } // SequenceFile.Sorter.LinkedSegmentsDescriptor

    /** The class that defines a container for segments to be merged.
     * Primarily required to delete temp files as soon as all of the
     * contained segments have been looked at. */
    private class SegmentContainer {
      private int numSegmentsCleanedUp = 0; // number of segment cleanups so far
      private int numSegmentsContained;     // number of segments contained
      private Path inName;                  // input file from which the
                                            // segments are created

      // the list of segments read from the file
      private ArrayList<SegmentDescriptor> segments =
        new ArrayList<SegmentDescriptor>();

      /** This constructor is there primarily to serve the sort routine that
       * generates a single output file with an associated index file. */
      public SegmentContainer(Path inName, Path indexIn) throws IOException {
        // get the segments from indexIn; the index is a sequence of
        // (offset, length) VLong pairs, one pair per segment
        FSDataInputStream fsIndexIn = fs.open(indexIn);
        long end = fs.getFileStatus(indexIn).getLen();
        while (fsIndexIn.getPos() < end) {
          long segmentOffset = WritableUtils.readVLong(fsIndexIn);
          long segmentLength = WritableUtils.readVLong(fsIndexIn);
          Path segmentName = inName;
          segments.add(new LinkedSegmentsDescriptor(segmentOffset,
                                                    segmentLength,
                                                    segmentName, this));
        }
        fsIndexIn.close();
        fs.delete(indexIn, true);
        numSegmentsContained = segments.size();
        this.inName = inName;
      }

      public List<SegmentDescriptor> getSegmentList() {
        return segments;
      }

      public void cleanup() throws IOException {
        numSegmentsCleanedUp++;
        if (numSegmentsCleanedUp == numSegmentsContained) {
          fs.delete(inName, true);
        }
      }
    } // SequenceFile.Sorter.SegmentContainer

  } // SequenceFile.Sorter

} // SequenceFile
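
/*
 * A minimal usage sketch of the Sorter machinery above (hypothetical paths,
 * key/value classes, and configuration; the constructor and merge(Path[],
 * Path) overload are the ones commonly exposed by SequenceFile.Sorter):
 *
 *   Configuration conf = new Configuration();
 *   FileSystem fs = FileSystem.getLocal(conf);
 *   SequenceFile.Sorter sorter =
 *     new SequenceFile.Sorter(fs, IntWritable.class, Text.class, conf);
 *   // merging more inputs than the configured merge factor triggers the
 *   // multi-pass logic in MergeQueue.merge() above
 *   sorter.merge(new Path[] { new Path("part-0.seq"), new Path("part-1.seq") },
 *                new Path("merged.seq"));
 */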