001 /*
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements. See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership. The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License. You may obtain a copy of the License at
009 *
010 * http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018
019 package org.apache.hadoop.io.compress;
020
021 import java.io.EOFException;
022 import java.io.IOException;
023 import java.io.InputStream;
024
025 import org.apache.hadoop.classification.InterfaceAudience;
026 import org.apache.hadoop.classification.InterfaceStability;
027 import org.apache.hadoop.io.compress.Decompressor;
028
029 @InterfaceAudience.Public
030 @InterfaceStability.Evolving
031 public class DecompressorStream extends CompressionInputStream {
032 protected Decompressor decompressor = null;
033 protected byte[] buffer;
034 protected boolean eof = false;
035 protected boolean closed = false;
036 private int lastBytesSent = 0;
037
038 public DecompressorStream(InputStream in, Decompressor decompressor,
039 int bufferSize)
040 throws IOException {
041 super(in);
042
043 if (in == null || decompressor == null) {
044 throw new NullPointerException();
045 } else if (bufferSize <= 0) {
046 throw new IllegalArgumentException("Illegal bufferSize");
047 }
048
049 this.decompressor = decompressor;
050 buffer = new byte[bufferSize];
051 }
052
053 public DecompressorStream(InputStream in, Decompressor decompressor)
054 throws IOException {
055 this(in, decompressor, 512);
056 }
057
058 /**
059 * Allow derived classes to directly set the underlying stream.
060 *
061 * @param in Underlying input stream.
062 * @throws IOException
063 */
064 protected DecompressorStream(InputStream in) throws IOException {
065 super(in);
066 }
067
068 private byte[] oneByte = new byte[1];
069 @Override
070 public int read() throws IOException {
071 checkStream();
072 return (read(oneByte, 0, oneByte.length) == -1) ? -1 : (oneByte[0] & 0xff);
073 }
074
075 @Override
076 public int read(byte[] b, int off, int len) throws IOException {
077 checkStream();
078
079 if ((off | len | (off + len) | (b.length - (off + len))) < 0) {
080 throw new IndexOutOfBoundsException();
081 } else if (len == 0) {
082 return 0;
083 }
084
085 return decompress(b, off, len);
086 }
087
088 protected int decompress(byte[] b, int off, int len) throws IOException {
089 int n = 0;
090
091 while ((n = decompressor.decompress(b, off, len)) == 0) {
092 if (decompressor.needsDictionary()) {
093 eof = true;
094 return -1;
095 }
096
097 if (decompressor.finished()) {
098 // First see if there was any leftover buffered input from previous
099 // stream; if not, attempt to refill buffer. If refill -> EOF, we're
100 // all done; else reset, fix up input buffer, and get ready for next
101 // concatenated substream/"member".
102 int nRemaining = decompressor.getRemaining();
103 if (nRemaining == 0) {
104 int m = getCompressedData();
105 if (m == -1) {
106 // apparently the previous end-of-stream was also end-of-file:
107 // return success, as if we had never called getCompressedData()
108 eof = true;
109 return -1;
110 }
111 decompressor.reset();
112 decompressor.setInput(buffer, 0, m);
113 lastBytesSent = m;
114 } else {
115 // looks like it's a concatenated stream: reset low-level zlib (or
116 // other engine) and buffers, then "resend" remaining input data
117 decompressor.reset();
118 int leftoverOffset = lastBytesSent - nRemaining;
119 assert (leftoverOffset >= 0);
120 // this recopies userBuf -> direct buffer if using native libraries:
121 decompressor.setInput(buffer, leftoverOffset, nRemaining);
122 // NOTE: this is the one place we do NOT want to save the number
123 // of bytes sent (nRemaining here) into lastBytesSent: since we
124 // are resending what we've already sent before, offset is nonzero
125 // in general (only way it could be zero is if it already equals
126 // nRemaining), which would then screw up the offset calculation
127 // _next_ time around. IOW, getRemaining() is in terms of the
128 // original, zero-offset bufferload, so lastBytesSent must be as
129 // well. Cheesy ASCII art:
130 //
131 // <------------ m, lastBytesSent ----------->
132 // +===============================================+
133 // buffer: |1111111111|22222222222222222|333333333333| |
134 // +===============================================+
135 // #1: <-- off -->|<-------- nRemaining --------->
136 // #2: <----------- off ----------->|<-- nRem. -->
137 // #3: (final substream: nRemaining == 0; eof = true)
138 //
139 // If lastBytesSent is anything other than m, as shown, then "off"
140 // will be calculated incorrectly.
141 }
142 } else if (decompressor.needsInput()) {
143 int m = getCompressedData();
144 if (m == -1) {
145 throw new EOFException("Unexpected end of input stream");
146 }
147 decompressor.setInput(buffer, 0, m);
148 lastBytesSent = m;
149 }
150 }
151
152 return n;
153 }
154
155 protected int getCompressedData() throws IOException {
156 checkStream();
157
158 // note that the _caller_ is now required to call setInput() or throw
159 return in.read(buffer, 0, buffer.length);
160 }
161
162 protected void checkStream() throws IOException {
163 if (closed) {
164 throw new IOException("Stream closed");
165 }
166 }
167
168 @Override
169 public void resetState() throws IOException {
170 decompressor.reset();
171 }
172
173 private byte[] skipBytes = new byte[512];
174 @Override
175 public long skip(long n) throws IOException {
176 // Sanity checks
177 if (n < 0) {
178 throw new IllegalArgumentException("negative skip length");
179 }
180 checkStream();
181
182 // Read 'n' bytes
183 int skipped = 0;
184 while (skipped < n) {
185 int len = Math.min(((int)n - skipped), skipBytes.length);
186 len = read(skipBytes, 0, len);
187 if (len == -1) {
188 eof = true;
189 break;
190 }
191 skipped += len;
192 }
193 return skipped;
194 }
195
196 @Override
197 public int available() throws IOException {
198 checkStream();
199 return (eof) ? 0 : 1;
200 }
201
202 @Override
203 public void close() throws IOException {
204 if (!closed) {
205 in.close();
206 closed = true;
207 }
208 }
209
210 @Override
211 public boolean markSupported() {
212 return false;
213 }
214
215 @Override
216 public synchronized void mark(int readlimit) {
217 }
218
219 @Override
220 public synchronized void reset() throws IOException {
221 throw new IOException("mark/reset not supported");
222 }
223
224 }