001 /*
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements. See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership. The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License. You may obtain a copy of the License at
009 *
010 * http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018
019 package org.apache.hadoop.io.compress;
020
021 import java.io.IOException;
022 import java.io.InputStream;
023
024 import org.apache.hadoop.classification.InterfaceAudience;
025 import org.apache.hadoop.classification.InterfaceStability;
026 import org.apache.hadoop.fs.PositionedReadable;
027 import org.apache.hadoop.fs.Seekable;
028 /**
029 * A compression input stream.
030 *
031 * <p>Implementations are assumed to be buffered. This permits clients to
032 * reposition the underlying input stream then call {@link #resetState()},
033 * without having to also synchronize client buffers.
034 */
035 @InterfaceAudience.Public
036 @InterfaceStability.Evolving
037 public abstract class CompressionInputStream extends InputStream implements Seekable {
038 /**
039 * The input stream to be compressed.
040 */
041 protected final InputStream in;
042 protected long maxAvailableData = 0L;
043
044 private Decompressor trackedDecompressor;
045
046 /**
047 * Create a compression input stream that reads
048 * the decompressed bytes from the given stream.
049 *
050 * @param in The input stream to be compressed.
051 * @throws IOException
052 */
053 protected CompressionInputStream(InputStream in) throws IOException {
054 if (!(in instanceof Seekable) || !(in instanceof PositionedReadable)) {
055 this.maxAvailableData = in.available();
056 }
057 this.in = in;
058 }
059
060 @Override
061 public void close() throws IOException {
062 in.close();
063 if (trackedDecompressor != null) {
064 CodecPool.returnDecompressor(trackedDecompressor);
065 trackedDecompressor = null;
066 }
067 }
068
069 /**
070 * Read bytes from the stream.
071 * Made abstract to prevent leakage to underlying stream.
072 */
073 @Override
074 public abstract int read(byte[] b, int off, int len) throws IOException;
075
076 /**
077 * Reset the decompressor to its initial state and discard any buffered data,
078 * as the underlying stream may have been repositioned.
079 */
080 public abstract void resetState() throws IOException;
081
082 /**
083 * This method returns the current position in the stream.
084 *
085 * @return Current position in stream as a long
086 */
087 @Override
088 public long getPos() throws IOException {
089 if (!(in instanceof Seekable) || !(in instanceof PositionedReadable)){
090 //This way of getting the current position will not work for file
091 //size which can be fit in an int and hence can not be returned by
092 //available method.
093 return (this.maxAvailableData - this.in.available());
094 }
095 else{
096 return ((Seekable)this.in).getPos();
097 }
098
099 }
100
101 /**
102 * This method is current not supported.
103 *
104 * @throws UnsupportedOperationException
105 */
106
107 @Override
108 public void seek(long pos) throws UnsupportedOperationException {
109 throw new UnsupportedOperationException();
110 }
111
112 /**
113 * This method is current not supported.
114 *
115 * @throws UnsupportedOperationException
116 */
117 @Override
118 public boolean seekToNewSource(long targetPos) throws UnsupportedOperationException {
119 throw new UnsupportedOperationException();
120 }
121
122 void setTrackedDecompressor(Decompressor decompressor) {
123 trackedDecompressor = decompressor;
124 }
125 }