blob: a6c00ed7d16f5fc1caa85558b6fadf509f30ad9b [file] [log] [blame]
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.io.serial;
import static org.apache.hadoop.fs.CommonConfigurationKeysPublic.HADOOP_SERIALIZATIONS_KEY;
import static org.apache.hadoop.fs.CommonConfigurationKeysPublic.IO_SERIALIZATIONS_KEY;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.classification.InterfaceStability;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.io.serial.lib.CompatibilitySerialization;
import org.apache.hadoop.io.serial.lib.WritableSerialization;
import org.apache.hadoop.io.serial.lib.avro.AvroSerialization;
import org.apache.hadoop.io.serial.lib.protobuf.ProtoBufSerialization;
import org.apache.hadoop.io.serial.lib.thrift.ThriftSerialization;
import org.apache.hadoop.util.ReflectionUtils;
/**
 * A factory that finds and creates Serializations.
 *
 * There are two lookup methods. The first finds a Serialization by its name
 * (i.e. avro, writable, thrift, etc.). The second finds a TypedSerialization
 * based on the type that needs to be serialized. Both lookups hand back a
 * clone of the registered instance rather than the registered instance
 * itself.
 */
@InterfaceAudience.Public
@InterfaceStability.Stable
public class SerializationFactory {
  private static final Log LOG = LogFactory.getLog(SerializationFactory.class);

  /**
   * Factories that have already been built, keyed by the serialization
   * settings of the configuration they were built from. All access goes
   * through the synchronized {@link #getInstance(Configuration)}.
   */
  private static final Map<String, SerializationFactory> FACTORY_CACHE =
      new HashMap<String, SerializationFactory>();

  /** The typed serializations, in the order they were configured. */
  private final List<TypedSerialization<?>> typedSerializations =
      new ArrayList<TypedSerialization<?>>();

  /** Every configured serialization, indexed by its name. */
  private final Map<String, Serialization<?>> serializations =
      new HashMap<String, Serialization<?>>();

  /**
   * Build a factory from the serialization classes listed in the
   * configuration, using the built-in list of serializations as the default
   * passed to the configuration lookup.
   * @param conf the configuration to read the serialization classes from
   * @throws IllegalArgumentException if a configured class is not a
   *         Serialization or if two serializations share the same name
   */
  public SerializationFactory(Configuration conf) {
    Class<?>[] classes =
        conf.getClasses(HADOOP_SERIALIZATIONS_KEY,
                        new Class<?>[]{WritableSerialization.class,
                                       ProtoBufSerialization.class,
                                       ThriftSerialization.class,
                                       AvroSerialization.class,
                                       CompatibilitySerialization.class});
    for (Class<?> cls : classes) {
      if (!Serialization.class.isAssignableFrom(cls)) {
        throw new IllegalArgumentException("Unknown serialization class " +
                                           cls.getName());
      }
      Serialization<?> serial =
          (Serialization<?>) ReflectionUtils.newInstance(cls, conf);
      // Ask for the name once and reuse it for the check, the map and the log.
      String name = serial.getName();
      if (serializations.containsKey(name)) {
        throw new IllegalArgumentException("Two serializations have the" +
                                           " same name: " + name);
      }
      serializations.put(name, serial);
      if (serial instanceof TypedSerialization<?>) {
        typedSerializations.add((TypedSerialization<?>) serial);
      }
      if (LOG.isDebugEnabled()) {
        LOG.debug("Adding serialization " + name);
      }
    }
  }

  /**
   * Get the cached factory for the given configuration. Two configurations
   * that have the same values for the serialization settings will be
   * considered identical because we can't keep a reference to the
   * Configuration without locking it in memory.
   * @param conf the configuration
   * @return the factory for a given configuration
   */
  public static synchronized
  SerializationFactory getInstance(Configuration conf) {
    String serializerNames = conf.get(HADOOP_SERIALIZATIONS_KEY, "*default*");
    String obsoleteSerializerNames =
        conf.get(IO_SERIALIZATIONS_KEY, "*default*");
    // The obsolete setting is part of the key so that configurations that
    // differ only in it do not share a factory.
    String key = serializerNames + " " + obsoleteSerializerNames;
    SerializationFactory result = FACTORY_CACHE.get(key);
    if (result == null) {
      result = new SerializationFactory(conf);
      FACTORY_CACHE.put(key, result);
    }
    return result;
  }

  /**
   * Look up a serialization by name and return a clone of it.
   * @param name the name of the serialization, as reported by its getName()
   * @return a newly cloned serialization of the right name
   * @throws IllegalArgumentException if no serialization is registered under
   *         the given name
   */
  public Serialization<?> getSerialization(String name) {
    Serialization<?> registered = serializations.get(name);
    if (registered == null) {
      // Fail with a message that names the missing serialization instead of
      // an anonymous NullPointerException from the clone call.
      throw new IllegalArgumentException("Could not find a serialization" +
                                         " named " + name);
    }
    return registered.clone();
  }

  /**
   * Find the first acceptable serialization for a given type. Candidates are
   * tried in the order they were configured; the match is cloned and the
   * clone is told the specific type before it is returned.
   * @param cls the class that should be serialized
   * @return a serialization that should be used to serialize the class
   * @throws IllegalArgumentException if no configured serialization accepts
   *         the class
   */
  @SuppressWarnings("unchecked")
  public <T> TypedSerialization<? super T> getSerializationByType(Class<T> cls){
    for (TypedSerialization<?> serial : typedSerializations) {
      if (serial.accept(cls)) {
        TypedSerialization<? super T> result =
            (TypedSerialization<? super T>) serial.clone();
        result.setSpecificType(cls);
        return result;
      }
    }
    throw new IllegalArgumentException("Could not find a serialization to"+
                                       " accept " + cls.getName());
  }
}