blob: 31799055a3fba899eb0db679c6004d93fc0c36df [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.tinkerpop.gremlin.structure.io.gryo.kryoshim;
import org.apache.commons.configuration2.BaseConfiguration;
import org.apache.commons.configuration2.Configuration;
import org.apache.commons.configuration2.ConfigurationUtils;
import org.apache.tinkerpop.gremlin.util.SystemUtil;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.ByteArrayOutputStream;
import java.io.InputStream;
import java.util.ArrayList;
import java.util.Comparator;
import java.util.Iterator;
import java.util.ServiceLoader;
/**
* Loads the highest-priority or user-selected {@link KryoShimService}.
*/
public class KryoShimServiceLoader {
private static volatile KryoShimService cachedShimService;
private static volatile Configuration configuration;
private static final String maskedProperties = ".+\\.(password|keyStorePassword|trustStorePassword)|spark.authenticate.secret";
private static final Logger log = LoggerFactory.getLogger(KryoShimServiceLoader.class);
/**
* Set this system property to the fully-qualified name of a {@link KryoShimService}
* package-and-classname to force it into service. Setting this property causes the
* priority-selection mechanism ({@link KryoShimService#getPriority()}) to be ignored.
*/
public static final String KRYO_SHIM_SERVICE = "gremlin.io.kryoShimService";
public static void applyConfiguration(final Configuration configuration) {
if (null == KryoShimServiceLoader.configuration ||
null == KryoShimServiceLoader.cachedShimService ||
!KryoShimServiceLoader.configuration.getKeys().hasNext()) {
KryoShimServiceLoader.configuration = configuration;
load(true);
}
}
public static void close() {
if (null != cachedShimService)
cachedShimService.close();
cachedShimService = null;
configuration = null;
}
/**
* Return a reference to the shim service. This method may return a cached shim service
* unless {@code forceReload} is true. Calls to this method need not be externally
* synchonized.
*
* @param forceReload if false, this method may use its internal service cache; if true,
* this method must ignore cache, and it must invoke {@link ServiceLoader#reload()}
* before selecting a new service to return
* @return the shim service
*/
private static KryoShimService load(final boolean forceReload) {
// if the service is loaded and doesn't need reloading, simply return in
if (null != cachedShimService && !forceReload)
return cachedShimService;
// if a service is already loaded, close it
if (null != cachedShimService)
cachedShimService.close();
// if the configuration is null, try and load the configuration from System.properties
if (null == configuration)
configuration = SystemUtil.getSystemPropertiesConfiguration("tinkerpop", true);
// get all of the shim services
final ArrayList<KryoShimService> services = new ArrayList<>();
final ServiceLoader<KryoShimService> serviceLoader = ServiceLoader.load(KryoShimService.class);
synchronized (KryoShimServiceLoader.class) {
if (forceReload) serviceLoader.reload();
for (final KryoShimService kss : serviceLoader) {
services.add(kss);
}
}
// if a shim service class is specified in the configuration, use it -- else, priority-based
if (configuration.containsKey(KRYO_SHIM_SERVICE)) {
for (final KryoShimService kss : services) {
if (kss.getClass().getCanonicalName().equals(configuration.getString(KRYO_SHIM_SERVICE))) {
log.info("Set KryoShimService to {} because of configuration {}={}",
kss.getClass().getSimpleName(),
KRYO_SHIM_SERVICE,
configuration.getString(KRYO_SHIM_SERVICE));
cachedShimService = kss;
break;
}
}
} else {
services.sort(KryoShimServiceComparator.INSTANCE);
for (final KryoShimService kss : services) {
log.debug("Found KryoShimService: {} (priority {})", kss.getClass().getCanonicalName(), kss.getPriority());
}
if (0 != services.size()) {
cachedShimService = services.get(services.size() - 1);
log.info("Set KryoShimService to {} because its priority value ({}) is the best available",
cachedShimService.getClass().getSimpleName(), cachedShimService.getPriority());
}
}
// no shim service was available
if (null == cachedShimService)
throw new IllegalStateException("Unable to load KryoShimService");
// once the shim service is defined, configure it
log.info("Configuring KryoShimService {} with the following configuration:\n#######START########\n{}\n########END#########",
cachedShimService.getClass().getCanonicalName(),
ConfigurationUtils.toString(maskedConfiguration(configuration)));
cachedShimService.applyConfiguration(configuration);
return cachedShimService;
}
private static Configuration maskedConfiguration(final Configuration configuration) {
final Configuration maskedConfiguration = new BaseConfiguration();
final Iterator keys = configuration.getKeys();
while(keys.hasNext()) {
final String key = (String)keys.next();
if (key.matches(maskedProperties))
maskedConfiguration.setProperty(key, "******");
else
maskedConfiguration.setProperty(key, configuration.getProperty(key));
}
return maskedConfiguration;
}
/**
* A loose abstraction of {@link org.apache.tinkerpop.shaded.kryo.Kryo#writeClassAndObject},
* where the {@code output} parameter is an internally-created {@link ByteArrayOutputStream}. Returns
* the byte array underlying that stream.
*
* @param object an object for which the instance and class are serialized
* @return the serialized form
*/
public static byte[] writeClassAndObjectToBytes(final Object object) {
final KryoShimService shimService = load(false);
final ByteArrayOutputStream baos = new ByteArrayOutputStream();
shimService.writeClassAndObject(object, baos);
return baos.toByteArray();
}
/**
* A loose abstraction of {@link org.apache.tinkerpop.shaded.kryo.Kryo#readClassAndObject},
* where the {@code input} parameter is {@code source}. Returns the deserialized object.
*
* @param inputStream an input stream containing data for a serialized object class and instance
* @param <T> the type to which the deserialized object is cast as it is returned
* @return the deserialized object
*/
public static <T> T readClassAndObject(final InputStream inputStream) {
final KryoShimService shimService = load(false);
return (T) shimService.readClassAndObject(inputStream);
}
/**
* Selects the service with greatest {@link KryoShimService#getPriority()}
* (not absolute value).
* <p>
* Breaks ties with lexicographical comparison of classnames where the
* name that sorts last is considered to have highest priority. Ideally
* nothing should rely on that tiebreaking behavior, but it beats random
* selection in case a user ever gets into that situation by accident and
* tries to figure out what's going on.
*/
private enum KryoShimServiceComparator implements Comparator<KryoShimService> {
INSTANCE;
@Override
public int compare(final KryoShimService a, final KryoShimService b) {
final int ap = a.getPriority();
final int bp = b.getPriority();
if (ap < bp) {
return -1;
} else if (bp < ap) {
return 1;
} else {
final int result = a.getClass().getCanonicalName().compareTo(b.getClass().getCanonicalName());
if (0 == result) {
log.warn("Found two {} implementations with the same canonical classname: {}. " +
"This may indicate a problem with the classpath/classloader such as " +
"duplicate or conflicting copies of the file " +
"META-INF/services/org.apache.tinkerpop.gremlin.structure.io.gryo.kryoshim.KryoShimService.",
a.getClass().getCanonicalName());
} else {
final String winner = 0 < result ? a.getClass().getCanonicalName() : b.getClass().getCanonicalName();
log.warn("{} implementations {} and {} are tied with priority value {}. " +
"Preferring {} to the other because it has a lexicographically greater classname. " +
"Consider setting the system property \"{}\" instead of relying on priority tie-breaking.",
KryoShimService.class.getSimpleName(), a, b, ap, winner, KRYO_SHIM_SERVICE);
}
return result;
}
}
}
}