001/** 002 * Licensed to the Apache Software Foundation (ASF) under one 003 * or more contributor license agreements. See the NOTICE file 004 * distributed with this work for additional information 005 * regarding copyright ownership. The ASF licenses this file 006 * to you under the Apache License, Version 2.0 (the 007 * "License"); you may not use this file except in compliance 008 * with the License. You may obtain a copy of the License at 009 * 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, software 013 * distributed under the License is distributed on an "AS IS" BASIS, 014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 015 * See the License for the specific language governing permissions and 016 * limitations under the License. 017 */ 018 019package org.apache.hadoop.io.compress; 020 021import java.io.BufferedInputStream; 022import java.io.IOException; 023import java.io.InputStream; 024import java.io.OutputStream; 025 026import org.apache.hadoop.conf.Configurable; 027import org.apache.hadoop.conf.Configuration; 028 029import org.apache.hadoop.classification.InterfaceAudience; 030import org.apache.hadoop.classification.InterfaceStability; 031import org.apache.hadoop.fs.Seekable; 032import org.apache.hadoop.io.compress.bzip2.BZip2Constants; 033import org.apache.hadoop.io.compress.bzip2.CBZip2InputStream; 034import org.apache.hadoop.io.compress.bzip2.CBZip2OutputStream; 035import org.apache.hadoop.io.compress.bzip2.Bzip2Factory; 036 037/** 038 * This class provides output and input streams for bzip2 compression 039 * and decompression. It uses the native bzip2 library on the system 040 * if possible, else it uses a pure-Java implementation of the bzip2 041 * algorithm. The configuration parameter 042 * io.compression.codec.bzip2.library can be used to control this 043 * behavior. 044 * 045 * In the pure-Java mode, the Compressor and Decompressor interfaces 046 * are not implemented. Therefore, in that mode, those methods of 047 * CompressionCodec which have a Compressor or Decompressor type 048 * argument, throw UnsupportedOperationException. 049 * 050 * Currently, support for splittability is available only in the 051 * pure-Java mode; therefore, if a SplitCompressionInputStream is 052 * requested, the pure-Java implementation is used, regardless of the 053 * setting of the configuration parameter mentioned above. 054 */ 055@InterfaceAudience.Public 056@InterfaceStability.Evolving 057public class BZip2Codec implements Configurable, SplittableCompressionCodec { 058 059 private static final String HEADER = "BZ"; 060 private static final int HEADER_LEN = HEADER.length(); 061 private static final String SUB_HEADER = "h9"; 062 private static final int SUB_HEADER_LEN = SUB_HEADER.length(); 063 064 private Configuration conf; 065 066 /** 067 * Set the configuration to be used by this object. 068 * 069 * @param conf the configuration object. 070 */ 071 @Override 072 public void setConf(Configuration conf) { 073 this.conf = conf; 074 } 075 076 /** 077 * Return the configuration used by this object. 078 * 079 * @return the configuration object used by this objec. 080 */ 081 @Override 082 public Configuration getConf() { 083 return conf; 084 } 085 086 /** 087 * Creates a new instance of BZip2Codec. 088 */ 089 public BZip2Codec() { } 090 091 /** 092 * Create a {@link CompressionOutputStream} that will write to the given 093 * {@link OutputStream}. 094 * 095 * @param out the location for the final output stream 096 * @return a stream the user can write uncompressed data to, to have it 097 * compressed 098 * @throws IOException 099 */ 100 @Override 101 public CompressionOutputStream createOutputStream(OutputStream out) 102 throws IOException { 103 return CompressionCodec.Util. 104 createOutputStreamWithCodecPool(this, conf, out); 105 } 106 107 /** 108 * Create a {@link CompressionOutputStream} that will write to the given 109 * {@link OutputStream} with the given {@link Compressor}. 110 * 111 * @param out the location for the final output stream 112 * @param compressor compressor to use 113 * @return a stream the user can write uncompressed data to, to have it 114 * compressed 115 * @throws IOException 116 */ 117 @Override 118 public CompressionOutputStream createOutputStream(OutputStream out, 119 Compressor compressor) throws IOException { 120 return Bzip2Factory.isNativeBzip2Loaded(conf) ? 121 new CompressorStream(out, compressor, 122 conf.getInt("io.file.buffer.size", 4*1024)) : 123 new BZip2CompressionOutputStream(out); 124 } 125 126 /** 127 * Get the type of {@link Compressor} needed by this {@link CompressionCodec}. 128 * 129 * @return the type of compressor needed by this codec. 130 */ 131 @Override 132 public Class<? extends Compressor> getCompressorType() { 133 return Bzip2Factory.getBzip2CompressorType(conf); 134 } 135 136 /** 137 * Create a new {@link Compressor} for use by this {@link CompressionCodec}. 138 * 139 * @return a new compressor for use by this codec 140 */ 141 @Override 142 public Compressor createCompressor() { 143 return Bzip2Factory.getBzip2Compressor(conf); 144 } 145 146 /** 147 * Create a {@link CompressionInputStream} that will read from the given 148 * input stream and return a stream for uncompressed data. 149 * 150 * @param in the stream to read compressed bytes from 151 * @return a stream to read uncompressed bytes from 152 * @throws IOException 153 */ 154 @Override 155 public CompressionInputStream createInputStream(InputStream in) 156 throws IOException { 157 return CompressionCodec.Util. 158 createInputStreamWithCodecPool(this, conf, in); 159 } 160 161 /** 162 * Create a {@link CompressionInputStream} that will read from the given 163 * {@link InputStream} with the given {@link Decompressor}, and return a 164 * stream for uncompressed data. 165 * 166 * @param in the stream to read compressed bytes from 167 * @param decompressor decompressor to use 168 * @return a stream to read uncompressed bytes from 169 * @throws IOException 170 */ 171 @Override 172 public CompressionInputStream createInputStream(InputStream in, 173 Decompressor decompressor) throws IOException { 174 return Bzip2Factory.isNativeBzip2Loaded(conf) ? 175 new DecompressorStream(in, decompressor, 176 conf.getInt("io.file.buffer.size", 4*1024)) : 177 new BZip2CompressionInputStream(in); 178 } 179 180 /** 181 * Creates CompressionInputStream to be used to read off uncompressed data 182 * in one of the two reading modes. i.e. Continuous or Blocked reading modes 183 * 184 * @param seekableIn The InputStream 185 * @param start The start offset into the compressed stream 186 * @param end The end offset into the compressed stream 187 * @param readMode Controls whether progress is reported continuously or 188 * only at block boundaries. 189 * 190 * @return CompressionInputStream for BZip2 aligned at block boundaries 191 */ 192 public SplitCompressionInputStream createInputStream(InputStream seekableIn, 193 Decompressor decompressor, long start, long end, READ_MODE readMode) 194 throws IOException { 195 196 if (!(seekableIn instanceof Seekable)) { 197 throw new IOException("seekableIn must be an instance of " + 198 Seekable.class.getName()); 199 } 200 201 //find the position of first BZip2 start up marker 202 ((Seekable)seekableIn).seek(0); 203 204 // BZip2 start of block markers are of 6 bytes. But the very first block 205 // also has "BZh9", making it 10 bytes. This is the common case. But at 206 // time stream might start without a leading BZ. 207 final long FIRST_BZIP2_BLOCK_MARKER_POSITION = 208 CBZip2InputStream.numberOfBytesTillNextMarker(seekableIn); 209 long adjStart = 0L; 210 if (start != 0) { 211 // Other than the first of file, the marker size is 6 bytes. 212 adjStart = Math.max(0L, start - (FIRST_BZIP2_BLOCK_MARKER_POSITION 213 - (HEADER_LEN + SUB_HEADER_LEN))); 214 } 215 216 ((Seekable)seekableIn).seek(adjStart); 217 SplitCompressionInputStream in = 218 new BZip2CompressionInputStream(seekableIn, adjStart, end, readMode); 219 220 221 // The following if clause handles the following case: 222 // Assume the following scenario in BZip2 compressed stream where 223 // . represent compressed data. 224 // .....[48 bit Block].....[48 bit Block].....[48 bit Block]... 225 // ........................[47 bits][1 bit].....[48 bit Block]... 226 // ................................^[Assume a Byte alignment here] 227 // ........................................^^[current position of stream] 228 // .....................^^[We go back 10 Bytes in stream and find a Block marker] 229 // ........................................^^[We align at wrong position!] 230 // ...........................................................^^[While this pos is correct] 231 232 if (in.getPos() < start) { 233 ((Seekable)seekableIn).seek(start); 234 in = new BZip2CompressionInputStream(seekableIn, start, end, readMode); 235 } 236 237 return in; 238 } 239 240 /** 241 * Get the type of {@link Decompressor} needed by this {@link CompressionCodec}. 242 * 243 * @return the type of decompressor needed by this codec. 244 */ 245 @Override 246 public Class<? extends Decompressor> getDecompressorType() { 247 return Bzip2Factory.getBzip2DecompressorType(conf); 248 } 249 250 /** 251 * Create a new {@link Decompressor} for use by this {@link CompressionCodec}. 252 * 253 * @return a new decompressor for use by this codec 254 */ 255 @Override 256 public Decompressor createDecompressor() { 257 return Bzip2Factory.getBzip2Decompressor(conf); 258 } 259 260 /** 261 * .bz2 is recognized as the default extension for compressed BZip2 files 262 * 263 * @return A String telling the default bzip2 file extension 264 */ 265 @Override 266 public String getDefaultExtension() { 267 return ".bz2"; 268 } 269 270 private static class BZip2CompressionOutputStream extends 271 CompressionOutputStream { 272 273 // class data starts here// 274 private CBZip2OutputStream output; 275 private boolean needsReset; 276 // class data ends here// 277 278 public BZip2CompressionOutputStream(OutputStream out) 279 throws IOException { 280 super(out); 281 needsReset = true; 282 } 283 284 private void writeStreamHeader() throws IOException { 285 if (super.out != null) { 286 // The compressed bzip2 stream should start with the 287 // identifying characters BZ. Caller of CBZip2OutputStream 288 // i.e. this class must write these characters. 289 out.write(HEADER.getBytes()); 290 } 291 } 292 293 public void finish() throws IOException { 294 if (needsReset) { 295 // In the case that nothing is written to this stream, we still need to 296 // write out the header before closing, otherwise the stream won't be 297 // recognized by BZip2CompressionInputStream. 298 internalReset(); 299 } 300 this.output.finish(); 301 needsReset = true; 302 } 303 304 private void internalReset() throws IOException { 305 if (needsReset) { 306 needsReset = false; 307 writeStreamHeader(); 308 this.output = new CBZip2OutputStream(out); 309 } 310 } 311 312 public void resetState() throws IOException { 313 // Cannot write to out at this point because out might not be ready 314 // yet, as in SequenceFile.Writer implementation. 315 needsReset = true; 316 } 317 318 public void write(int b) throws IOException { 319 if (needsReset) { 320 internalReset(); 321 } 322 this.output.write(b); 323 } 324 325 public void write(byte[] b, int off, int len) throws IOException { 326 if (needsReset) { 327 internalReset(); 328 } 329 this.output.write(b, off, len); 330 } 331 332 public void close() throws IOException { 333 if (needsReset) { 334 // In the case that nothing is written to this stream, we still need to 335 // write out the header before closing, otherwise the stream won't be 336 // recognized by BZip2CompressionInputStream. 337 internalReset(); 338 } 339 this.output.flush(); 340 this.output.close(); 341 needsReset = true; 342 } 343 344 }// end of class BZip2CompressionOutputStream 345 346 /** 347 * This class is capable to de-compress BZip2 data in two modes; 348 * CONTINOUS and BYBLOCK. BYBLOCK mode makes it possible to 349 * do decompression starting any arbitrary position in the stream. 350 * 351 * So this facility can easily be used to parallelize decompression 352 * of a large BZip2 file for performance reasons. (It is exactly 353 * done so for Hadoop framework. See LineRecordReader for an 354 * example). So one can break the file (of course logically) into 355 * chunks for parallel processing. These "splits" should be like 356 * default Hadoop splits (e.g as in FileInputFormat getSplit metod). 357 * So this code is designed and tested for FileInputFormat's way 358 * of splitting only. 359 */ 360 361 private static class BZip2CompressionInputStream extends 362 SplitCompressionInputStream { 363 364 // class data starts here// 365 private CBZip2InputStream input; 366 boolean needsReset; 367 private BufferedInputStream bufferedIn; 368 private boolean isHeaderStripped = false; 369 private boolean isSubHeaderStripped = false; 370 private READ_MODE readMode = READ_MODE.CONTINUOUS; 371 private long startingPos = 0L; 372 373 // Following state machine handles different states of compressed stream 374 // position 375 // HOLD : Don't advertise compressed stream position 376 // ADVERTISE : Read 1 more character and advertise stream position 377 // See more comments about it before updatePos method. 378 private enum POS_ADVERTISEMENT_STATE_MACHINE { 379 HOLD, ADVERTISE 380 }; 381 382 POS_ADVERTISEMENT_STATE_MACHINE posSM = POS_ADVERTISEMENT_STATE_MACHINE.HOLD; 383 long compressedStreamPosition = 0; 384 385 // class data ends here// 386 387 public BZip2CompressionInputStream(InputStream in) throws IOException { 388 this(in, 0L, Long.MAX_VALUE, READ_MODE.CONTINUOUS); 389 } 390 391 public BZip2CompressionInputStream(InputStream in, long start, long end, 392 READ_MODE readMode) throws IOException { 393 super(in, start, end); 394 needsReset = false; 395 bufferedIn = new BufferedInputStream(super.in); 396 this.startingPos = super.getPos(); 397 this.readMode = readMode; 398 if (this.startingPos == 0) { 399 // We only strip header if it is start of file 400 bufferedIn = readStreamHeader(); 401 } 402 input = new CBZip2InputStream(bufferedIn, readMode); 403 if (this.isHeaderStripped) { 404 input.updateReportedByteCount(HEADER_LEN); 405 } 406 407 if (this.isSubHeaderStripped) { 408 input.updateReportedByteCount(SUB_HEADER_LEN); 409 } 410 411 this.updatePos(false); 412 } 413 414 private BufferedInputStream readStreamHeader() throws IOException { 415 // We are flexible enough to allow the compressed stream not to 416 // start with the header of BZ. So it works fine either we have 417 // the header or not. 418 if (super.in != null) { 419 bufferedIn.mark(HEADER_LEN); 420 byte[] headerBytes = new byte[HEADER_LEN]; 421 int actualRead = bufferedIn.read(headerBytes, 0, HEADER_LEN); 422 if (actualRead != -1) { 423 String header = new String(headerBytes); 424 if (header.compareTo(HEADER) != 0) { 425 bufferedIn.reset(); 426 } else { 427 this.isHeaderStripped = true; 428 // In case of BYBLOCK mode, we also want to strip off 429 // remaining two character of the header. 430 if (this.readMode == READ_MODE.BYBLOCK) { 431 actualRead = bufferedIn.read(headerBytes, 0, 432 SUB_HEADER_LEN); 433 if (actualRead != -1) { 434 this.isSubHeaderStripped = true; 435 } 436 } 437 } 438 } 439 } 440 441 if (bufferedIn == null) { 442 throw new IOException("Failed to read bzip2 stream."); 443 } 444 445 return bufferedIn; 446 447 }// end of method 448 449 public void close() throws IOException { 450 if (!needsReset) { 451 input.close(); 452 needsReset = true; 453 } 454 } 455 456 /** 457 * This method updates compressed stream position exactly when the 458 * client of this code has read off at least one byte passed any BZip2 459 * end of block marker. 460 * 461 * This mechanism is very helpful to deal with data level record 462 * boundaries. Please see constructor and next methods of 463 * org.apache.hadoop.mapred.LineRecordReader as an example usage of this 464 * feature. We elaborate it with an example in the following: 465 * 466 * Assume two different scenarios of the BZip2 compressed stream, where 467 * [m] represent end of block, \n is line delimiter and . represent compressed 468 * data. 469 * 470 * ............[m]......\n....... 471 * 472 * ..........\n[m]......\n....... 473 * 474 * Assume that end is right after [m]. In the first case the reading 475 * will stop at \n and there is no need to read one more line. (To see the 476 * reason of reading one more line in the next() method is explained in LineRecordReader.) 477 * While in the second example LineRecordReader needs to read one more line 478 * (till the second \n). Now since BZip2Codecs only update position 479 * at least one byte passed a maker, so it is straight forward to differentiate 480 * between the two cases mentioned. 481 * 482 */ 483 484 public int read(byte[] b, int off, int len) throws IOException { 485 if (needsReset) { 486 internalReset(); 487 } 488 489 int result = 0; 490 result = this.input.read(b, off, len); 491 if (result == BZip2Constants.END_OF_BLOCK) { 492 this.posSM = POS_ADVERTISEMENT_STATE_MACHINE.ADVERTISE; 493 } 494 495 if (this.posSM == POS_ADVERTISEMENT_STATE_MACHINE.ADVERTISE) { 496 result = this.input.read(b, off, off + 1); 497 // This is the precise time to update compressed stream position 498 // to the client of this code. 499 this.updatePos(true); 500 this.posSM = POS_ADVERTISEMENT_STATE_MACHINE.HOLD; 501 } 502 503 return result; 504 505 } 506 507 public int read() throws IOException { 508 byte b[] = new byte[1]; 509 int result = this.read(b, 0, 1); 510 return (result < 0) ? result : (b[0] & 0xff); 511 } 512 513 private void internalReset() throws IOException { 514 if (needsReset) { 515 needsReset = false; 516 BufferedInputStream bufferedIn = readStreamHeader(); 517 input = new CBZip2InputStream(bufferedIn, this.readMode); 518 } 519 } 520 521 public void resetState() throws IOException { 522 // Cannot read from bufferedIn at this point because bufferedIn 523 // might not be ready 524 // yet, as in SequenceFile.Reader implementation. 525 needsReset = true; 526 } 527 528 public long getPos() { 529 return this.compressedStreamPosition; 530 } 531 532 /* 533 * As the comments before read method tell that 534 * compressed stream is advertised when at least 535 * one byte passed EOB have been read off. But 536 * there is an exception to this rule. When we 537 * construct the stream we advertise the position 538 * exactly at EOB. In the following method 539 * shouldAddOn boolean captures this exception. 540 * 541 */ 542 private void updatePos(boolean shouldAddOn) { 543 int addOn = shouldAddOn ? 1 : 0; 544 this.compressedStreamPosition = this.startingPos 545 + this.input.getProcessedByteCount() + addOn; 546 } 547 548 }// end of BZip2CompressionInputStream 549 550}