001 /*
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements. See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership. The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License. You may obtain a copy of the License at
009 *
010 * http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018
019 package org.apache.hadoop.io.compress;
020
021 import java.io.EOFException;
022 import java.io.IOException;
023 import java.io.InputStream;
024
025 import org.apache.hadoop.classification.InterfaceAudience;
026 import org.apache.hadoop.classification.InterfaceStability;
027
028 /**
029 * A {@link org.apache.hadoop.io.compress.DecompressorStream} which works
030 * with 'block-based' based compression algorithms, as opposed to
031 * 'stream-based' compression algorithms.
032 *
033 */
034 @InterfaceAudience.Public
035 @InterfaceStability.Evolving
036 public class BlockDecompressorStream extends DecompressorStream {
037 private int originalBlockSize = 0;
038 private int noUncompressedBytes = 0;
039
040 /**
041 * Create a {@link BlockDecompressorStream}.
042 *
043 * @param in input stream
044 * @param decompressor decompressor to use
045 * @param bufferSize size of buffer
046 * @throws IOException
047 */
048 public BlockDecompressorStream(InputStream in, Decompressor decompressor,
049 int bufferSize) throws IOException {
050 super(in, decompressor, bufferSize);
051 }
052
053 /**
054 * Create a {@link BlockDecompressorStream}.
055 *
056 * @param in input stream
057 * @param decompressor decompressor to use
058 * @throws IOException
059 */
060 public BlockDecompressorStream(InputStream in, Decompressor decompressor) throws IOException {
061 super(in, decompressor);
062 }
063
064 protected BlockDecompressorStream(InputStream in) throws IOException {
065 super(in);
066 }
067
068 @Override
069 protected int decompress(byte[] b, int off, int len) throws IOException {
070 // Check if we are the beginning of a block
071 if (noUncompressedBytes == originalBlockSize) {
072 // Get original data size
073 try {
074 originalBlockSize = rawReadInt();
075 } catch (IOException ioe) {
076 return -1;
077 }
078 noUncompressedBytes = 0;
079 // EOF if originalBlockSize is 0
080 // This will occur only when decompressing previous compressed empty file
081 if (originalBlockSize == 0) {
082 eof = true;
083 return -1;
084 }
085 }
086
087 int n = 0;
088 while ((n = decompressor.decompress(b, off, len)) == 0) {
089 if (decompressor.finished() || decompressor.needsDictionary()) {
090 if (noUncompressedBytes >= originalBlockSize) {
091 eof = true;
092 return -1;
093 }
094 }
095 if (decompressor.needsInput()) {
096 int m;
097 try {
098 m = getCompressedData();
099 } catch (EOFException e) {
100 eof = true;
101 return -1;
102 }
103 // Send the read data to the decompressor
104 decompressor.setInput(buffer, 0, m);
105 }
106 }
107
108 // Note the no. of decompressed bytes read from 'current' block
109 noUncompressedBytes += n;
110
111 return n;
112 }
113
114 @Override
115 protected int getCompressedData() throws IOException {
116 checkStream();
117
118 // Get the size of the compressed chunk (always non-negative)
119 int len = rawReadInt();
120
121 // Read len bytes from underlying stream
122 if (len > buffer.length) {
123 buffer = new byte[len];
124 }
125 int n = 0, off = 0;
126 while (n < len) {
127 int count = in.read(buffer, off + n, len - n);
128 if (count < 0) {
129 throw new EOFException("Unexpected end of block in input stream");
130 }
131 n += count;
132 }
133
134 return len;
135 }
136
137 @Override
138 public void resetState() throws IOException {
139 originalBlockSize = 0;
140 noUncompressedBytes = 0;
141 super.resetState();
142 }
143
144 private int rawReadInt() throws IOException {
145 int b1 = in.read();
146 int b2 = in.read();
147 int b3 = in.read();
148 int b4 = in.read();
149 if ((b1 | b2 | b3 | b4) < 0)
150 throw new EOFException();
151 return ((b1 << 24) + (b2 << 16) + (b3 << 8) + (b4 << 0));
152 }
153 }