001 /**
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements. See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership. The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License. You may obtain a copy of the License at
009 *
010 * http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018 package org.apache.hadoop.io.compress;
019
020 import java.util.ArrayList;
021 import java.util.HashMap;
022 import java.util.List;
023 import java.util.Map;
024 import java.util.concurrent.atomic.AtomicInteger;
025
026 import org.apache.commons.logging.Log;
027 import org.apache.commons.logging.LogFactory;
028 import org.apache.hadoop.classification.InterfaceAudience;
029 import org.apache.hadoop.classification.InterfaceStability;
030 import org.apache.hadoop.conf.Configuration;
031 import org.apache.hadoop.util.ReflectionUtils;
032
033 import com.google.common.cache.CacheBuilder;
034 import com.google.common.cache.CacheLoader;
035 import com.google.common.cache.LoadingCache;
036
037 /**
038 * A global compressor/decompressor pool used to save and reuse
039 * (possibly native) compression/decompression codecs.
040 */
041 @InterfaceAudience.Public
042 @InterfaceStability.Evolving
043 public class CodecPool {
044 private static final Log LOG = LogFactory.getLog(CodecPool.class);
045
046 /**
047 * A global compressor pool used to save the expensive
048 * construction/destruction of (possibly native) decompression codecs.
049 */
050 private static final Map<Class<Compressor>, List<Compressor>> compressorPool =
051 new HashMap<Class<Compressor>, List<Compressor>>();
052
053 /**
054 * A global decompressor pool used to save the expensive
055 * construction/destruction of (possibly native) decompression codecs.
056 */
057 private static final Map<Class<Decompressor>, List<Decompressor>> decompressorPool =
058 new HashMap<Class<Decompressor>, List<Decompressor>>();
059
060 private static <T> LoadingCache<Class<T>, AtomicInteger> createCache(
061 Class<T> klass) {
062 return CacheBuilder.newBuilder().build(
063 new CacheLoader<Class<T>, AtomicInteger>() {
064 @Override
065 public AtomicInteger load(Class<T> key) throws Exception {
066 return new AtomicInteger();
067 }
068 });
069 }
070
071 /**
072 * Map to track the number of leased compressors
073 */
074 private static final LoadingCache<Class<Compressor>, AtomicInteger> compressorCounts =
075 createCache(Compressor.class);
076
077 /**
078 * Map to tracks the number of leased decompressors
079 */
080 private static final LoadingCache<Class<Decompressor>, AtomicInteger> decompressorCounts =
081 createCache(Decompressor.class);
082
083 private static <T> T borrow(Map<Class<T>, List<T>> pool,
084 Class<? extends T> codecClass) {
085 T codec = null;
086
087 // Check if an appropriate codec is available
088 List<T> codecList;
089 synchronized (pool) {
090 codecList = pool.get(codecClass);
091 }
092
093 if (codecList != null) {
094 synchronized (codecList) {
095 if (!codecList.isEmpty()) {
096 codec = codecList.remove(codecList.size() - 1);
097 }
098 }
099 }
100
101 return codec;
102 }
103
104 private static <T> void payback(Map<Class<T>, List<T>> pool, T codec) {
105 if (codec != null) {
106 Class<T> codecClass = ReflectionUtils.getClass(codec);
107 List<T> codecList;
108 synchronized (pool) {
109 codecList = pool.get(codecClass);
110 if (codecList == null) {
111 codecList = new ArrayList<T>();
112 pool.put(codecClass, codecList);
113 }
114 }
115
116 synchronized (codecList) {
117 codecList.add(codec);
118 }
119 }
120 }
121
122 @SuppressWarnings("unchecked")
123 private static <T> int getLeaseCount(
124 LoadingCache<Class<T>, AtomicInteger> usageCounts,
125 Class<? extends T> codecClass) {
126 return usageCounts.getUnchecked((Class<T>) codecClass).get();
127 }
128
129 private static <T> void updateLeaseCount(
130 LoadingCache<Class<T>, AtomicInteger> usageCounts, T codec, int delta) {
131 if (codec != null) {
132 Class<T> codecClass = ReflectionUtils.getClass(codec);
133 usageCounts.getUnchecked(codecClass).addAndGet(delta);
134 }
135 }
136
137 /**
138 * Get a {@link Compressor} for the given {@link CompressionCodec} from the
139 * pool or a new one.
140 *
141 * @param codec the <code>CompressionCodec</code> for which to get the
142 * <code>Compressor</code>
143 * @param conf the <code>Configuration</code> object which contains confs for creating or reinit the compressor
144 * @return <code>Compressor</code> for the given
145 * <code>CompressionCodec</code> from the pool or a new one
146 */
147 public static Compressor getCompressor(CompressionCodec codec, Configuration conf) {
148 Compressor compressor = borrow(compressorPool, codec.getCompressorType());
149 if (compressor == null) {
150 compressor = codec.createCompressor();
151 LOG.info("Got brand-new compressor ["+codec.getDefaultExtension()+"]");
152 } else {
153 compressor.reinit(conf);
154 if(LOG.isDebugEnabled()) {
155 LOG.debug("Got recycled compressor");
156 }
157 }
158 updateLeaseCount(compressorCounts, compressor, 1);
159 return compressor;
160 }
161
162 public static Compressor getCompressor(CompressionCodec codec) {
163 return getCompressor(codec, null);
164 }
165
166 /**
167 * Get a {@link Decompressor} for the given {@link CompressionCodec} from the
168 * pool or a new one.
169 *
170 * @param codec the <code>CompressionCodec</code> for which to get the
171 * <code>Decompressor</code>
172 * @return <code>Decompressor</code> for the given
173 * <code>CompressionCodec</code> the pool or a new one
174 */
175 public static Decompressor getDecompressor(CompressionCodec codec) {
176 Decompressor decompressor = borrow(decompressorPool, codec.getDecompressorType());
177 if (decompressor == null) {
178 decompressor = codec.createDecompressor();
179 LOG.info("Got brand-new decompressor ["+codec.getDefaultExtension()+"]");
180 } else {
181 if(LOG.isDebugEnabled()) {
182 LOG.debug("Got recycled decompressor");
183 }
184 }
185 updateLeaseCount(decompressorCounts, decompressor, 1);
186 return decompressor;
187 }
188
189 /**
190 * Return the {@link Compressor} to the pool.
191 *
192 * @param compressor the <code>Compressor</code> to be returned to the pool
193 */
194 public static void returnCompressor(Compressor compressor) {
195 if (compressor == null) {
196 return;
197 }
198 // if the compressor can't be reused, don't pool it.
199 if (compressor.getClass().isAnnotationPresent(DoNotPool.class)) {
200 return;
201 }
202 compressor.reset();
203 payback(compressorPool, compressor);
204 updateLeaseCount(compressorCounts, compressor, -1);
205 }
206
207 /**
208 * Return the {@link Decompressor} to the pool.
209 *
210 * @param decompressor the <code>Decompressor</code> to be returned to the
211 * pool
212 */
213 public static void returnDecompressor(Decompressor decompressor) {
214 if (decompressor == null) {
215 return;
216 }
217 // if the decompressor can't be reused, don't pool it.
218 if (decompressor.getClass().isAnnotationPresent(DoNotPool.class)) {
219 return;
220 }
221 decompressor.reset();
222 payback(decompressorPool, decompressor);
223 updateLeaseCount(decompressorCounts, decompressor, -1);
224 }
225
226 /**
227 * Return the number of leased {@link Compressor}s for this
228 * {@link CompressionCodec}
229 */
230 public static int getLeasedCompressorsCount(CompressionCodec codec) {
231 return (codec == null) ? 0 : getLeaseCount(compressorCounts,
232 codec.getCompressorType());
233 }
234
235 /**
236 * Return the number of leased {@link Decompressor}s for this
237 * {@link CompressionCodec}
238 */
239 public static int getLeasedDecompressorsCount(CompressionCodec codec) {
240 return (codec == null) ? 0 : getLeaseCount(decompressorCounts,
241 codec.getDecompressorType());
242 }
243 }