001 /**
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements. See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership. The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License. You may obtain a copy of the License at
009 *
010 * http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018
019 package org.apache.hadoop.io.serializer.avro;
020
021 import java.util.HashSet;
022 import java.util.Set;
023
024 import org.apache.avro.Schema;
025 import org.apache.avro.io.DatumReader;
026 import org.apache.avro.io.DatumWriter;
027 import org.apache.avro.reflect.ReflectData;
028 import org.apache.avro.reflect.ReflectDatumReader;
029 import org.apache.avro.reflect.ReflectDatumWriter;
030 import org.apache.hadoop.classification.InterfaceAudience;
031 import org.apache.hadoop.classification.InterfaceStability;
032
033 /**
034 * Serialization for Avro Reflect classes. For a class to be accepted by this
035 * serialization, it must either be in the package list configured via
036 * <code>avro.reflect.pkgs</code> or implement
037 * {@link AvroReflectSerializable} interface.
038 *
039 */
040 @SuppressWarnings("unchecked")
041 @InterfaceAudience.Public
042 @InterfaceStability.Evolving
043 public class AvroReflectSerialization extends AvroSerialization<Object>{
044
045 /**
046 * Key to configure packages that contain classes to be serialized and
047 * deserialized using this class. Multiple packages can be specified using
048 * comma-separated list.
049 */
050 @InterfaceAudience.Private
051 public static final String AVRO_REFLECT_PACKAGES = "avro.reflect.pkgs";
052
053 private Set<String> packages;
054
055 @InterfaceAudience.Private
056 @Override
057 public synchronized boolean accept(Class<?> c) {
058 if (packages == null) {
059 getPackages();
060 }
061 return AvroReflectSerializable.class.isAssignableFrom(c) ||
062 (c.getPackage() != null && packages.contains(c.getPackage().getName()));
063 }
064
065 private void getPackages() {
066 String[] pkgList = getConf().getStrings(AVRO_REFLECT_PACKAGES);
067 packages = new HashSet<String>();
068 if (pkgList != null) {
069 for (String pkg : pkgList) {
070 packages.add(pkg.trim());
071 }
072 }
073 }
074
075 @InterfaceAudience.Private
076 @Override
077 public DatumReader getReader(Class<Object> clazz) {
078 try {
079 return new ReflectDatumReader(clazz);
080 } catch (Exception e) {
081 throw new RuntimeException(e);
082 }
083 }
084
085 @InterfaceAudience.Private
086 @Override
087 public Schema getSchema(Object t) {
088 return ReflectData.get().getSchema(t.getClass());
089 }
090
091 @InterfaceAudience.Private
092 @Override
093 public DatumWriter getWriter(Class<Object> clazz) {
094 return new ReflectDatumWriter();
095 }
096
097 }