blob: 30ed0ba36b1ca69fa200ddd56c9e86015ac93277 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.ignite.spi.deployment.local;
import java.io.InputStream;
import java.util.Collection;
import java.util.LinkedList;
import java.util.Map;
import java.util.Map.Entry;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import org.apache.ignite.IgniteLogger;
import org.apache.ignite.IgniteSystemProperties;
import org.apache.ignite.compute.ComputeTask;
import org.apache.ignite.compute.ComputeTaskName;
import org.apache.ignite.internal.util.GridAnnotationsCache;
import org.apache.ignite.internal.util.typedef.F;
import org.apache.ignite.internal.util.typedef.internal.S;
import org.apache.ignite.internal.util.typedef.internal.U;
import org.apache.ignite.resources.LoggerResource;
import org.apache.ignite.spi.IgniteSpiAdapter;
import org.apache.ignite.spi.IgniteSpiConsistencyChecked;
import org.apache.ignite.spi.IgniteSpiException;
import org.apache.ignite.spi.IgniteSpiMBeanAdapter;
import org.apache.ignite.spi.IgniteSpiMultipleInstancesSupport;
import org.apache.ignite.spi.deployment.DeploymentListener;
import org.apache.ignite.spi.deployment.DeploymentResource;
import org.apache.ignite.spi.deployment.DeploymentResourceAdapter;
import org.apache.ignite.spi.deployment.DeploymentSpi;
import org.apache.ignite.spi.deployment.IgnoreIfPeerClassLoadingDisabled;
import org.jetbrains.annotations.Nullable;
import org.jsr166.ConcurrentLinkedHashMap;
/**
* Local deployment SPI that implements only within VM deployment on local
* node via {@link #register(ClassLoader, Class)} method. This SPI requires
* no configuration.
* <p>
* Note that if peer class loading is enabled (which is default behavior,
* see {@link org.apache.ignite.configuration.IgniteConfiguration#isPeerClassLoadingEnabled()}), then it is
* enough to deploy a task only on one node and all other nodes will load
* required classes from the node that initiated task execution.
* <p>
* <h1 class="header">Configuration</h1>
* This SPI requires no configuration.
* <h2 class="header">Example</h2>
* There is no point to explicitly configure {@code GridLocalDeploymentSpi}
* with {@link org.apache.ignite.configuration.IgniteConfiguration} as it is used by default and has no
* configuration parameters.
* @see org.apache.ignite.spi.deployment.DeploymentSpi
*/
@IgniteSpiMultipleInstancesSupport(true)
@IgniteSpiConsistencyChecked(optional = false)
@IgnoreIfPeerClassLoadingDisabled
public class LocalDeploymentSpi extends IgniteSpiAdapter implements DeploymentSpi {
/** Enables additional check for resource name on resources removal. */
public static final String IGNITE_DEPLOYMENT_ADDITIONAL_CHECK = "IGNITE.DEPLOYMENT.ADDITIONAL.CHECK";
/** Value for additional check on resources removal. */
private static final boolean ENABLE_IGNITE_DEPLOYMENT_ADDITIONAL_CHECK =
IgniteSystemProperties.getBoolean(IGNITE_DEPLOYMENT_ADDITIONAL_CHECK);
/** */
@LoggerResource
private IgniteLogger log;
/** Map of all resources. */
private ConcurrentLinkedHashMap<ClassLoader, ConcurrentMap<String, String>> ldrRsrcs =
new ConcurrentLinkedHashMap<>(16, 0.75f, 64);
/** Deployment SPI listener. */
private volatile DeploymentListener lsnr;
/** {@inheritDoc} */
@Override public void spiStart(@Nullable String igniteInstanceName) throws IgniteSpiException {
// Start SPI start stopwatch.
startStopwatch();
registerMBean(igniteInstanceName, new LocalDeploymentSpiMBeanImpl(this), LocalDeploymentSpiMBean.class);
if (log.isDebugEnabled())
log.debug(startInfo());
}
/** {@inheritDoc} */
@Override public void spiStop() throws IgniteSpiException {
unregisterMBean();
for (ClassLoader ldr : ldrRsrcs.descendingKeySet())
onClassLoaderReleased(ldr);
if (log.isDebugEnabled())
log.debug(stopInfo());
}
/** {@inheritDoc} */
@Nullable @Override public DeploymentResource findResource(String rsrcName) {
assert rsrcName != null;
// Last updated class loader has highest priority in search.
for (Entry<ClassLoader, ConcurrentMap<String, String>> e : ldrRsrcs.descendingEntrySet()) {
ClassLoader ldr = e.getKey();
ConcurrentMap<String, String> rsrcs = e.getValue();
String clsName = rsrcs.get(rsrcName);
// Return class if it was found in resources map.
if (clsName != null) {
// Recalculate resource name in case if access is performed by
// class name and not the resource name.
rsrcName = getResourceName(clsName, rsrcs);
assert clsName != null;
try {
Class<?> cls = Class.forName(clsName, true, ldr);
assert cls != null;
// Return resource.
return new DeploymentResourceAdapter(rsrcName, cls, ldr);
}
catch (ClassNotFoundException ignored) {
// No-op.
}
}
}
return null;
}
/**
* Gets resource name for a given class name.
*
* @param clsName Class name.
* @param rsrcs Map of resources.
* @return Resource name.
*/
private String getResourceName(String clsName, Map<String, String> rsrcs) {
String rsrcName = clsName;
for (Entry<String, String> e : rsrcs.entrySet()) {
if (e.getValue().equals(clsName) && !e.getKey().equals(clsName)) {
rsrcName = e.getKey();
break;
}
}
return rsrcName;
}
/** {@inheritDoc} */
@Override public boolean register(ClassLoader ldr, Class<?> rsrc) throws IgniteSpiException {
assert ldr != null;
assert rsrc != null;
if (log.isDebugEnabled())
log.debug("Registering [ldrRsrcs=" + ldrRsrcs + ", ldr=" + ldr + ", rsrc=" + rsrc + ']');
ConcurrentMap<String, String> clsLdrRsrcs = ldrRsrcs.getSafe(ldr);
if (clsLdrRsrcs == null) {
ConcurrentMap<String, String> old = ldrRsrcs.putIfAbsent(ldr,
clsLdrRsrcs = new ConcurrentHashMap<>());
if (old != null)
clsLdrRsrcs = old;
}
Map<String, String> newRsrcs = addResource(ldr, clsLdrRsrcs, rsrc);
Collection<ClassLoader> rmvClsLdrs = null;
if (!F.isEmpty(newRsrcs)) {
rmvClsLdrs = new LinkedList<>();
removeResources(ldr, newRsrcs, rmvClsLdrs);
}
if (rmvClsLdrs != null) {
for (ClassLoader cldLdr : rmvClsLdrs)
onClassLoaderReleased(cldLdr);
}
return !F.isEmpty(newRsrcs);
}
/** {@inheritDoc} */
@Override public boolean unregister(String rsrcName) {
Collection<ClassLoader> rmvClsLdrs = new LinkedList<>();
Map<String, String> rsrcs = U.newHashMap(1);
rsrcs.put(rsrcName, rsrcName);
boolean rmv = removeResources(null, rsrcs, rmvClsLdrs);
for (ClassLoader cldLdr : rmvClsLdrs)
onClassLoaderReleased(cldLdr);
return rmv;
}
/**
* Add new classes in class loader resource map.
* Note that resource map may contain two entries for one added class:
* task name -> class name and class name -> class name.
*
* @param ldr Registered class loader.
* @param ldrRsrcs Class loader resources.
* @param cls Registered classes collection.
* @return Map of new resources added for registered class loader.
* @throws org.apache.ignite.spi.IgniteSpiException If resource already registered. Exception thrown
* if registered resources conflicts with rule when all task classes must be
* annotated with different task names.
*/
@Nullable private Map<String, String> addResource(ClassLoader ldr, ConcurrentMap<String, String> ldrRsrcs,
Class<?> cls) throws IgniteSpiException {
assert ldr != null;
assert ldrRsrcs != null;
assert cls != null;
// Maps resources to classes.
// Map may contain 2 entries for one class.
Map<String, String> regRsrcs = U.newHashMap(2);
// Check alias collision for added classes.
String alias = null;
if (ComputeTask.class.isAssignableFrom(cls)) {
ComputeTaskName nameAnn = GridAnnotationsCache.getAnnotation(cls, ComputeTaskName.class);
if (nameAnn != null)
alias = nameAnn.value();
}
if (alias != null)
regRsrcs.put(alias, cls.getName());
regRsrcs.put(cls.getName(), cls.getName());
if (log.isDebugEnabled())
log.debug("Resources to register: " + regRsrcs);
Map<String, String> newRsrcs = null;
// Check collisions for added classes.
for (Entry<String, String> entry : regRsrcs.entrySet()) {
String oldCls = ldrRsrcs.putIfAbsent(entry.getKey(), entry.getValue());
if (oldCls != null) {
if (!oldCls.equals(entry.getValue()))
throw new IgniteSpiException("Failed to register resources with given task name " +
"(found another class with same task name in the same class loader) " +
"[taskName=" + entry.getKey() + ", existingCls=" + oldCls +
", newCls=" + entry.getValue() + ", ldr=" + ldr + ']');
}
else {
// New resource was added.
if (newRsrcs == null)
newRsrcs = U.newHashMap(regRsrcs.size());
newRsrcs.put(entry.getKey(), entry.getValue());
}
}
// New resources to register. Add it all.
if (newRsrcs != null)
ldrRsrcs.putAll(newRsrcs);
if (log.isDebugEnabled())
log.debug("New resources: " + newRsrcs);
return newRsrcs;
}
/**
* Remove resources for all class loaders except {@code ignoreClsLdr}.
*
* @param clsLdrToIgnore Ignored class loader or {@code null} to remove for all class loaders.
* @param rsrcs Resources that should be used in search for class loader to remove.
* @param rmvClsLdrs Class loaders to remove.
* @return {@code True} if resource was removed.
*/
private boolean removeResources(@Nullable ClassLoader clsLdrToIgnore, Map<String, String> rsrcs,
Collection<ClassLoader> rmvClsLdrs) {
assert rsrcs != null;
if (log.isDebugEnabled())
log.debug("Removing resources [clsLdrToIgnore=" + clsLdrToIgnore + ", rsrcs=" + rsrcs + ']');
boolean res = false;
for (Entry<ClassLoader, ConcurrentMap<String, String>> e : ldrRsrcs.descendingEntrySet()) {
ClassLoader ldr = e.getKey();
if (clsLdrToIgnore == null || !ldr.equals(clsLdrToIgnore)) {
Map<String, String> clsLdrRsrcs = e.getValue();
boolean isRmv = false;
// Check class loader registered resources.
for (String rsrcName : rsrcs.keySet()) {
// Remove class loader if resource found.
if (clsLdrRsrcs.containsKey(rsrcName) && ldrRsrcs.remove(ldr, clsLdrRsrcs)) {
// Add class loaders in collection to notify listener outside synchronization block.
rmvClsLdrs.add(ldr);
isRmv = true;
res = true;
if (log.isDebugEnabled())
log.debug("Removed resources [ldr=" + ldr + ", rsrcs=" + clsLdrRsrcs + ']');
break;
}
}
if (isRmv)
continue;
// Check is possible to load resources with class loader.
for (Entry<String, String> entry : rsrcs.entrySet()) {
// Check classes with class loader only when classes points to classes to avoid redundant check.
// Resources map contains two entries for class with task name(alias).
if (entry.getKey().equals(entry.getValue()) && isResourceExist(ldr, entry.getKey()) &&
!U.hasParent(clsLdrToIgnore, ldr) &&
(!ENABLE_IGNITE_DEPLOYMENT_ADDITIONAL_CHECK || clsLdrRsrcs.containsKey(entry.getKey())) &&
ldrRsrcs.remove(ldr, clsLdrRsrcs)) {
// Add class loaders in collection to notify listener outside synchronization block.
rmvClsLdrs.add(ldr);
if (log.isDebugEnabled())
log.debug("Removed resources after checking existence [ldr=" + ldr +
", clsLdrRsrcs=" + clsLdrRsrcs + ", rsrcs=" + rsrcs + ']');
res = true;
break;
}
}
}
}
return res;
}
/**
* Check is class can be reached.
*
* @param ldr Class loader.
* @param clsName Class name.
* @return {@code true} if class can be loaded.
*/
private boolean isResourceExist(ClassLoader ldr, String clsName) {
assert ldr != null;
assert clsName != null;
String rsrcName = clsName.replaceAll("\\.", "/") + ".class";
InputStream in = null;
try {
in = ldr.getResourceAsStream(rsrcName);
return in != null;
}
finally {
U.closeQuiet(in);
}
}
/**
* Notifies listener about released class loader.
*
* @param clsLdr Released class loader.
*/
private void onClassLoaderReleased(ClassLoader clsLdr) {
DeploymentListener tmp = lsnr;
if (tmp != null)
tmp.onUnregistered(clsLdr);
}
/** {@inheritDoc} */
@Override public void setListener(DeploymentListener lsnr) {
this.lsnr = lsnr;
}
/** {@inheritDoc} */
@Override public LocalDeploymentSpi setName(String name) {
super.setName(name);
return this;
}
/** {@inheritDoc} */
@Override public String toString() {
return S.toString(LocalDeploymentSpi.class, this);
}
/**
* MBean implementation for LocalDeploymentSpi.
*/
private class LocalDeploymentSpiMBeanImpl extends IgniteSpiMBeanAdapter implements LocalDeploymentSpiMBean {
/** {@inheritDoc} */
LocalDeploymentSpiMBeanImpl(IgniteSpiAdapter spiAdapter) {
super(spiAdapter);
}
}
}