/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements. See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership. The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License. You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.hadoop.io.compress;

import java.io.IOException;
import java.io.OutputStream;

import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.classification.InterfaceStability;

/**
 * A {@link org.apache.hadoop.io.compress.CompressorStream} which works
 * with 'block-based' compression algorithms, as opposed to
 * 'stream-based' compression algorithms.
 *
 * It should be noted that this wrapper does not guarantee that blocks
 * will be sized for the compressor. If the
 * {@link org.apache.hadoop.io.compress.Compressor} requires buffering to
 * effect meaningful compression, it is responsible for it.
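 *
 * A minimal usage sketch (illustrative only; {@code compressor} stands in
 * for any concrete {@link org.apache.hadoop.io.compress.Compressor}, and
 * the buffer size and overhead shown are the zlib-style defaults described
 * on the two-argument constructor):
 *
 * <pre>{@code
 * OutputStream raw = new FileOutputStream("data.block");
 * BlockCompressorStream blockOut =
 *     new BlockCompressorStream(raw, compressor, 512, 18);
 * blockOut.write(data, 0, data.length);
 * blockOut.close(); // finishes the last block and closes 'raw'
 * }</pre>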
 */
@InterfaceAudience.Public
@InterfaceStability.Evolving
public class BlockCompressorStream extends CompressorStream {

  // The 'maximum' size of input data to be compressed, to account
  // for the overhead of the compression algorithm.
  private final int MAX_INPUT_SIZE;

  /**
   * Create a {@link BlockCompressorStream}.
   *
   * @param out the underlying output stream
   * @param compressor compressor to be used
   * @param bufferSize size of the buffer
   * @param compressionOverhead maximum 'overhead' of the compression
   *                            algorithm with the given bufferSize
   */
  public BlockCompressorStream(OutputStream out, Compressor compressor,
      int bufferSize, int compressionOverhead) {
    super(out, compressor, bufferSize);
    MAX_INPUT_SIZE = bufferSize - compressionOverhead;
  }

  /**
   * Create a {@link BlockCompressorStream} with the given output stream
   * and compressor, using a default bufferSize of 512 bytes and a
   * compressionOverhead of 18 bytes (1% of bufferSize + 12 bytes, as for
   * the zlib algorithm).
   *
   * @param out the underlying output stream
   * @param compressor compressor to be used
   */
  public BlockCompressorStream(OutputStream out, Compressor compressor) {
    this(out, compressor, 512, 18);
  }

  /**
   * Write the data provided to the compression codec, compressing no more
   * than the buffer size less the compression overhead specified during
   * construction for each block.
   *
   * Each block contains the uncompressed length for the block, followed by
   * one or more length-prefixed blocks of compressed data.
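   *
   * Concretely, the on-stream layout produced by this class (all lengths
   * are written by {@code rawWriteInt} as 4-byte big-endian integers) is:
   *
   * <pre>
   * block := uncompressed-length (4-byte big-endian int)
   *          { compressed-chunk-length (4-byte big-endian int)
   *            compressed-chunk-bytes }+
   * </pre>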
   */
  @Override
  public void write(byte[] b, int off, int len) throws IOException {
    // Sanity checks
    if (compressor.finished()) {
      throw new IOException("write beyond end of stream");
    }
    if (b == null) {
      throw new NullPointerException();
    } else if ((off < 0) || (off > b.length) || (len < 0) ||
               ((off + len) > b.length)) {
      throw new IndexOutOfBoundsException();
    } else if (len == 0) {
      return;
    }

    long limlen = compressor.getBytesRead();
    if (len + limlen > MAX_INPUT_SIZE && limlen > 0) {
      // Adding this segment would exceed the maximum size.
      // Flush data if we have it.
      finish();
      compressor.reset();
    }

    if (len > MAX_INPUT_SIZE) {
      // The data we're given exceeds the maximum size. Any data we
      // already had has been flushed above, so we write this chunk out
      // in segments not exceeding the maximum size until it is
      // exhausted.
      rawWriteInt(len);
      do {
        int bufLen = Math.min(len, MAX_INPUT_SIZE);

        compressor.setInput(b, off, bufLen);
        compressor.finish();
        while (!compressor.finished()) {
          compress();
        }
        compressor.reset();
        off += bufLen;
        len -= bufLen;
      } while (len > 0);
      return;
    }

    // Give data to the compressor
    compressor.setInput(b, off, len);
    if (!compressor.needsInput()) {
      // compressor buffer size might be smaller than the maximum
      // size, so we permit it to flush if required.
      rawWriteInt((int)compressor.getBytesRead());
      do {
        compress();
      } while (!compressor.needsInput());
    }
  }

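  /**
   * Write any pending data as a final block: the number of uncompressed
   * bytes consumed so far, followed by one or more length-prefixed
   * compressed chunks. Does nothing if the compressor has already
   * finished.
   */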
  @Override
  public void finish() throws IOException {
    if (!compressor.finished()) {
      rawWriteInt((int)compressor.getBytesRead());
      compressor.finish();
      while (!compressor.finished()) {
        compress();
      }
    }
  }

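  /**
   * Compress whatever input the compressor currently holds and, if any
   * output was produced, write it to the underlying stream as a single
   * length-prefixed chunk.
   */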
  @Override
  protected void compress() throws IOException {
    int len = compressor.compress(buffer, 0, buffer.length);
    if (len > 0) {
      // Write out the compressed chunk
      rawWriteInt(len);
      out.write(buffer, 0, len);
    }
  }

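  /**
   * Write an int to the underlying stream in big-endian (network) byte
   * order, most significant byte first; the same wire format as
   * {@link java.io.DataOutputStream#writeInt(int)}.
   */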
  private void rawWriteInt(int v) throws IOException {
    out.write((v >>> 24) & 0xFF);
    out.write((v >>> 16) & 0xFF);
    out.write((v >>>  8) & 0xFF);
    out.write((v >>>  0) & 0xFF);
  }

}