blob: 80e70118a9ed0f6124fd07ef5b404d3bf5850d6b [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.giraph.writable.kryo;
import com.esotericsoftware.kryo.Kryo;
import com.esotericsoftware.kryo.KryoException;
import com.esotericsoftware.kryo.Registration;
import com.esotericsoftware.kryo.io.Input;
import com.esotericsoftware.kryo.io.Output;
import com.esotericsoftware.kryo.util.DefaultClassResolver;
import com.esotericsoftware.kryo.util.ObjectMap;
import org.apache.giraph.zk.ZooKeeperExt;
import org.apache.log4j.Logger;
import org.apache.zookeeper.CreateMode;
import org.apache.zookeeper.KeeperException;
import org.apache.zookeeper.ZooDefs;
import java.util.HashMap;
import java.util.Map;
import java.util.List;
import static com.esotericsoftware.kryo.util.Util.getWrapperClass;
/**
* In order to avoid writing class names to the stream, this class resolver
* assigns unique integers to each class name and writes/reads those integers
* to/from the stream. Reads assume that there is already a class assigned
* to the given integer. This resolver only assigns unique integers for
* classes that are not explicitly registered since those classes are already
* assigned unique integers at the time of registration. This implementation
* uses zookeeper to provide consistent class name to ID mapping across all
+ nodes.
*
*
* If resolver encounters a class name that has not been assigned to a unique
* integer yet, it creates a class node in zookeeper under a designated path
* with persistent_sequential mode - allowing the file name of the class node
* to be suffixed with an auto incremented integer. After the class node is
* created, the resolver reads back all the nodes under the designated path
* and uses the unique suffix as the class id. If there are duplicate entries
* for the same class name due to some race condition, the lowest suffix is
* used.
*/
public class GiraphClassResolver extends DefaultClassResolver {
/** Base ID to start for class name assignments.
* This number has to be high enough to not conflict with
* explicity registered class IDs.
* */
private static final int BASE_CLASS_ID = 1000;
/** Class logger */
private static final Logger LOG =
Logger.getLogger(GiraphClassResolver.class);
/** Class name to ID cache */
private static Map<String, Integer> CLASS_NAME_TO_ID = new HashMap();
/** ID to class name cache */
private static Map<Integer, String> ID_TO_CLASS_NAME = new HashMap();
/** Zookeeper */
private static ZooKeeperExt ZK;
/** Zookeeper path for automatic class registrations */
private static String KRYO_REGISTERED_CLASS_PATH;
/** Minimum class ID assigned by zookeeper sequencing */
private static int MIN_CLASS_ID = -1;
/** True if the zookeeper class registration path is already created */
private static boolean IS_CLASS_PATH_CREATED = false;
/** Memoized class id*/
private int memoizedClassId = -1;
/** Memoized class registration */
private Registration memoizedClassIdValue;
/**
* Sets zookeeper informaton.
* @param zookeeperExt ZookeeperExt
* @param kryoClassPath Zookeeper directory path where class Name-ID
* mapping is stored.
*/
public static void setZookeeperInfo(ZooKeeperExt zookeeperExt,
String kryoClassPath) {
ZK = zookeeperExt;
KRYO_REGISTERED_CLASS_PATH = kryoClassPath;
}
/**
* Return true of the zookeeper is initialized.
* @return True if the zookeeper is initialized.
*/
public static boolean isInitialized() {
return ZK != null;
}
/**
* Creates a new node for the given class name.
* Creation mode is persistent sequential, i.e.
* ZK will always create a new node . There could be
* multiple entries for the same class name but since
* the lowest index is used, this is not a problem.
* @param className Class name
*/
public static void createClassName(String className) {
try {
String path = KRYO_REGISTERED_CLASS_PATH + "/" + className;
ZK.createExt(path,
null,
ZooDefs.Ids.OPEN_ACL_UNSAFE,
CreateMode.PERSISTENT_SEQUENTIAL,
true);
} catch (KeeperException e) {
throw new IllegalStateException(
"Failed to create class " + className, e);
} catch (InterruptedException e) {
throw new IllegalStateException(
"Interrupted while creating " + className, e);
}
}
/**
* Refreshes class-ID mapping from zookeeper.
* Not thread safe.
*/
public static void refreshCache() {
if (!IS_CLASS_PATH_CREATED) {
try {
ZK.createOnceExt(KRYO_REGISTERED_CLASS_PATH,
null,
ZooDefs.Ids.OPEN_ACL_UNSAFE,
CreateMode.PERSISTENT,
true);
IS_CLASS_PATH_CREATED = true;
} catch (KeeperException e) {
throw new IllegalStateException(
"Failed to refresh kryo cache " +
KRYO_REGISTERED_CLASS_PATH, e);
} catch (InterruptedException e) {
throw new IllegalStateException(
"Interrupted while refreshing kryo cache " +
KRYO_REGISTERED_CLASS_PATH, e);
}
}
List<String> registeredList;
try {
registeredList =
ZK.getChildrenExt(KRYO_REGISTERED_CLASS_PATH,
false,
true,
false);
} catch (KeeperException e) {
throw new IllegalStateException(
"Failed to retrieve child nodes for " + KRYO_REGISTERED_CLASS_PATH, e);
} catch (InterruptedException e) {
throw new IllegalStateException(
"Interrupted while retrieving child nodes for " +
KRYO_REGISTERED_CLASS_PATH, e);
}
for (String name : registeredList) {
// Since these files are created with PERSISTENT_SEQUENTIAL mode,
// Kryo appends a sequential number to their file name.
if (LOG.isDebugEnabled()) {
LOG.debug("Registered class: " + name);
}
String className = name.substring(0,
name.length() - ZooKeeperExt.SEQUENCE_NUMBER_LENGTH);
int classId = Integer.parseInt(
name.substring(name.length() - ZooKeeperExt.SEQUENCE_NUMBER_LENGTH));
if (MIN_CLASS_ID == -1) {
MIN_CLASS_ID = classId;
}
int adjustedId = classId - MIN_CLASS_ID + BASE_CLASS_ID;
if (CLASS_NAME_TO_ID.putIfAbsent(className, adjustedId) == null) {
ID_TO_CLASS_NAME.put(adjustedId, className);
}
}
}
/**
* Gets ID for the given class name.
* @param className Class name
* @return class id Class ID
*/
public static int getClassId(String className) {
if (CLASS_NAME_TO_ID.containsKey(className)) {
return CLASS_NAME_TO_ID.get(className);
}
synchronized (GiraphClassResolver.class) {
if (CLASS_NAME_TO_ID.containsKey(className)) {
return CLASS_NAME_TO_ID.get(className);
}
refreshCache();
if (!CLASS_NAME_TO_ID.containsKey(className)) {
createClassName(className);
refreshCache();
}
}
if (!CLASS_NAME_TO_ID.containsKey(className)) {
throw new IllegalStateException("Failed to assigned id to " + className);
}
return CLASS_NAME_TO_ID.get(className);
}
/**
* Get class name for given ID.
* @param id class ID
* @return class name
*/
public static String getClassName(int id) {
if (ID_TO_CLASS_NAME.containsKey(id)) {
return ID_TO_CLASS_NAME.get(id);
}
synchronized (GiraphClassResolver.class) {
if (ID_TO_CLASS_NAME.containsKey(id)) {
return ID_TO_CLASS_NAME.get(id);
}
refreshCache();
}
if (!ID_TO_CLASS_NAME.containsKey(id)) {
throw new IllegalStateException("ID " + id + " doesn't exist");
}
return ID_TO_CLASS_NAME.get(id);
}
@Override
public Registration register(Registration registration) {
if (registration == null) {
throw new IllegalArgumentException("registration cannot be null");
}
if (registration.getId() == NAME) {
throw new IllegalArgumentException("Invalid registration ID");
}
idToRegistration.put(registration.getId(), registration);
classToRegistration.put(registration.getType(), registration);
if (registration.getType().isPrimitive()) {
classToRegistration.put(getWrapperClass(registration.getType()),
registration);
}
return registration;
}
@Override
public Registration registerImplicit(Class type) {
return register(new Registration(type, kryo.getDefaultSerializer(type),
getClassId(type.getName())));
}
@Override
public Registration writeClass(Output output, Class type) {
if (type == null) {
output.writeVarInt(Kryo.NULL, true);
return null;
}
Registration registration = kryo.getRegistration(type);
if (registration.getId() == NAME) {
throw new IllegalStateException("Invalid registration ID");
} else {
// Class ID's are incremented by 2 when writing, because 0 is used
// for null and 1 is used for non-explicitly registered classes.
output.writeVarInt(registration.getId() + 2, true);
}
return registration;
}
@Override
public Registration readClass(Input input) {
int classID = input.readVarInt(true);
if (classID == Kryo.NULL) {
return null;
} else if (classID == NAME + 2) {
throw new IllegalStateException("Invalid class ID");
}
if (classID == memoizedClassId) {
return memoizedClassIdValue;
}
Registration registration = idToRegistration.get(classID - 2);
if (registration == null) {
String className = getClassName(classID - 2);
Class type = getTypeByName(className);
if (type == null) {
try {
type = Class.forName(className, false, kryo.getClassLoader());
} catch (ClassNotFoundException ex) {
throw new KryoException("Unable to find class: " + className, ex);
}
if (nameToClass == null) {
nameToClass = new ObjectMap();
}
nameToClass.put(className, type);
}
registration = new Registration(type, kryo.getDefaultSerializer(type),
classID - 2);
register(registration);
}
memoizedClassId = classID;
memoizedClassIdValue = registration;
return registration;
}
/**
* Reset the internal state
* Reset clears two hash tables:
* 1 - Class name to ID: Every non-explicitly registered class takes the
* ID agreed by all kryo instances, and it doesn't change across
* serializations, so this reset is not required.
* 2- Reference tracking: Not required because it is disabled.
*
* Therefore, this method should not be invoked.
*
*/
public void reset() {
throw new IllegalStateException("Not implemented");
}
/**
* This method writes the class name for the first encountered
* non-explicitly registered class. Since all non-explicitly registered
* classes take the ID agreed by all kryo instances, there is no need
* to write the class name, so this method should not be invoked.
* @param output Output stream
* @param type CLass type
* @param registration Registration
*/
@Override
protected void writeName(Output output, Class type,
Registration registration) {
throw new IllegalStateException("Not implemented");
}
/**
* This method reads the class name for the first encountered
* non-explicitly registered class. Since all non-explicitly registered
* classes take the ID agreed by all kryo instances, class name is
* never written, so this method should not be invoked.
* @param input Input stream
* @return Registration
*/
@Override
protected Registration readName(Input input) {
throw new IllegalStateException("Not implemented");
}
/**
* Get type by class name.
* @param className Class name
* @return class type
*/
protected Class<?> getTypeByName(final String className) {
return nameToClass != null ? nameToClass.get(className) : null;
}
}