/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.cassandra.spark;
import java.util.Arrays;
import java.util.Collections;
import java.util.LinkedHashMap;
import java.util.LinkedHashSet;
import java.util.Map;
import java.util.Set;
import java.util.stream.Collectors;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import com.esotericsoftware.kryo.Kryo;
import com.esotericsoftware.kryo.Serializer;
import org.apache.cassandra.bridge.BigNumberConfigImpl;
import org.apache.cassandra.bridge.CassandraBridgeFactory;
import org.apache.cassandra.bridge.CassandraVersion;
import org.apache.cassandra.clients.SidecarInstanceImpl;
import org.apache.cassandra.clients.SslConfig;
import org.apache.cassandra.spark.data.CassandraDataLayer;
import org.apache.cassandra.spark.data.LocalDataLayer;
import org.apache.cassandra.spark.data.ReplicationFactor;
import org.apache.cassandra.spark.data.partitioner.CassandraInstance;
import org.apache.cassandra.spark.data.partitioner.CassandraRing;
import org.apache.cassandra.spark.data.partitioner.TokenPartitioner;
import org.apache.spark.SparkConf;
import org.apache.spark.serializer.KryoRegistrator;
import org.jetbrains.annotations.NotNull;
/**
 * Helper class to register classes for Kryo serialization.
 *
 * <p>The class keeps a static, insertion-ordered registry of {@code Class -> Serializer} pairs.
 * {@link #registerClasses(Kryo)} first delegates to the Cassandra 4.0 bridge and then registers every
 * entry of that registry with the supplied {@link Kryo} instance. {@link #setup(SparkConf)} configures
 * a {@link SparkConf} so that this registrator is listed under {@code spark.kryo.registrator}.
 */
public class KryoRegister implements KryoRegistrator
{
    private static final Logger LOGGER = LoggerFactory.getLogger(KryoRegister.class);

    private static final String SPARK_SERIALIZER = "spark.serializer";
    private static final String SPARK_REGISTRATORS = "spark.kryo.registrator";
    private static final String KRYO_SERIALIZER_CLASS = "org.apache.spark.serializer.KryoSerializer";

    // Synchronized wrapper around a LinkedHashMap: entries are iterated in the order they were first added,
    // so serializers are handed to Kryo in a stable order
    private static final Map<Class<?>, Serializer<?>> KRYO_SERIALIZERS = Collections.synchronizedMap(new LinkedHashMap<>());

    static
    {
        // Cassandra-version-agnostic Kryo serializers
        KRYO_SERIALIZERS.put(LocalDataLayer.class, new LocalDataLayer.Serializer());
        KRYO_SERIALIZERS.put(CassandraInstance.class, new CassandraInstance.Serializer());
        KRYO_SERIALIZERS.put(ReplicationFactor.class, new ReplicationFactor.Serializer());
        KRYO_SERIALIZERS.put(CassandraRing.class, new CassandraRing.Serializer());
        KRYO_SERIALIZERS.put(TokenPartitioner.class, new TokenPartitioner.Serializer());
        KRYO_SERIALIZERS.put(CassandraDataLayer.class, new CassandraDataLayer.Serializer());
        KRYO_SERIALIZERS.put(BigNumberConfigImpl.class, new BigNumberConfigImpl.Serializer());
        KRYO_SERIALIZERS.put(SidecarInstanceImpl.class, new SidecarInstanceImpl.Serializer());
        KRYO_SERIALIZERS.put(SslConfig.class, new SslConfig.Serializer());
    }

    /**
     * Adds a custom serializer to the static registry, replacing any serializer previously stored for the
     * same type. The registry is only read when {@link #registerClasses(Kryo)} runs, so the serializer is
     * picked up by Kryo instances initialized after this call.
     *
     * @param type       class the serializer handles
     * @param serializer Kryo serializer for {@code type}
     * @param <T>        serialized type
     */
    public static <T> void addSerializer(@NotNull Class<T> type, @NotNull Serializer<T> serializer)
    {
        LOGGER.info("Registering custom Kryo serializer type={}", type.getName());
        KRYO_SERIALIZERS.put(type, serializer);
    }

    /**
     * Registers the Cassandra 4.0 bridge serializers followed by every serializer in the static registry.
     *
     * @param kryo Kryo instance to register the serializers with
     */
    @Override
    public void registerClasses(@NotNull Kryo kryo)
    {
        LOGGER.info("Initializing KryoRegister");

        // TODO: Implicitly defaulting to Cassandra version 4.0 is a part of a previously published API.
        //       We might want to persist the version of Cassandra into the Spark configuration instead.
        CassandraBridgeFactory.get(CassandraVersion.FOURZERO).kryoRegister(kryo);

        KRYO_SERIALIZERS.forEach(kryo::register);
    }

    /**
     * Configures the given Spark configuration to use Kryo with this registrator.
     *
     * <p>Sets {@code spark.serializer} to the Kryo serializer, appends this class to the
     * {@code spark.kryo.registrator} list if it is not already present (existing entries keep their
     * order), and registers this class through {@link SparkConf#registerKryoClasses(Class[])}.
     *
     * @param configuration Spark configuration to update in place
     */
    public static void setup(@NotNull SparkConf configuration)
    {
        // Use KryoSerializer
        LOGGER.info("Setting up Kryo");
        configuration.set(SPARK_SERIALIZER, KRYO_SERIALIZER_CLASS);

        // Add KryoRegister to SparkConf serialization if not already there
        String registratorsString = mergeRegistrators(configuration.get(SPARK_REGISTRATORS, ""),
                                                      KryoRegister.class.getName());
        LOGGER.info("Setting kryo registrators: {}", registratorsString);
        configuration.set(SPARK_REGISTRATORS, registratorsString);

        configuration.registerKryoClasses(new Class<?>[]{KryoRegister.class});
    }

    /**
     * Merges a registrator class name into a comma-separated list of registrator class names.
     *
     * @param existing    current comma-separated list, possibly empty
     * @param registrator class name that must be present in the result
     * @return comma-separated list without blank or duplicate entries, in first-seen order,
     *         with {@code registrator} appended when it was not already listed
     */
    private static String mergeRegistrators(@NotNull String existing, @NotNull String registrator)
    {
        // Collectors.toSet() promises neither mutability nor ordering; a LinkedHashSet guarantees both,
        // so add() is safe and the pre-configured registrators keep their relative order
        Set<String> registrators = Arrays.stream(existing.split(","))
                                         // Ignore whitespace around separators so " a.B" and "a.B" are the same entry
                                         .map(String::trim)
                                         .filter(name -> !name.isEmpty())
                                         .collect(Collectors.toCollection(LinkedHashSet::new));
        registrators.add(registrator);
        return String.join(",", registrators);
    }
}