blob: b27cc4bd0275f311626784a9a737b73e320ea74b [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.ignite.internal.managers.deployment;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Deque;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.Map;
import java.util.Map.Entry;
import java.util.UUID;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentLinkedDeque;
import java.util.concurrent.ConcurrentMap;
import org.apache.ignite.IgniteCheckedException;
import org.apache.ignite.compute.ComputeTask;
import org.apache.ignite.compute.ComputeTaskName;
import org.apache.ignite.configuration.DeploymentMode;
import org.apache.ignite.events.DeploymentEvent;
import org.apache.ignite.internal.GridKernalContext;
import org.apache.ignite.internal.IgniteDeploymentCheckedException;
import org.apache.ignite.internal.util.GridAnnotationsCache;
import org.apache.ignite.internal.util.GridClassLoaderCache;
import org.apache.ignite.internal.util.typedef.F;
import org.apache.ignite.internal.util.typedef.internal.S;
import org.apache.ignite.internal.util.typedef.internal.U;
import org.apache.ignite.lang.IgniteUuid;
import org.apache.ignite.marshaller.AbstractMarshaller;
import org.apache.ignite.spi.IgniteSpiException;
import org.apache.ignite.spi.deployment.DeploymentListener;
import org.apache.ignite.spi.deployment.DeploymentResource;
import org.apache.ignite.spi.deployment.DeploymentSpi;
import org.jetbrains.annotations.Nullable;
import static org.apache.ignite.events.EventType.EVT_CLASS_DEPLOYED;
import static org.apache.ignite.events.EventType.EVT_CLASS_DEPLOY_FAILED;
import static org.apache.ignite.events.EventType.EVT_CLASS_UNDEPLOYED;
import static org.apache.ignite.events.EventType.EVT_TASK_DEPLOYED;
import static org.apache.ignite.events.EventType.EVT_TASK_DEPLOY_FAILED;
import static org.apache.ignite.events.EventType.EVT_TASK_UNDEPLOYED;
/**
* Storage for local deployments.
*/
class GridDeploymentLocalStore extends GridDeploymentStoreAdapter {
/** Deployment cache by class name. */
private final ConcurrentMap<String, Deque<GridDeployment>> cache = new ConcurrentHashMap<>();
/** Mutex. */
private final Object mux = new Object();
/**
* @param spi Deployment SPI.
* @param ctx Grid kernal context.
* @param comm Deployment communication.
*/
GridDeploymentLocalStore(DeploymentSpi spi, GridKernalContext ctx, GridDeploymentCommunication comm) {
super(spi, ctx, comm);
}
/** {@inheritDoc} */
@Override public void start() throws IgniteCheckedException {
spi.setListener(new LocalDeploymentListener());
if (log.isDebugEnabled())
log.debug(startInfo());
}
/** {@inheritDoc} */
@Override public void stop() {
spi.setListener(null);
Map<String, Collection<GridDeployment>> cp;
synchronized (mux) {
cp = new HashMap<String, Collection<GridDeployment>>(cache);
for (Entry<String, Collection<GridDeployment>> entry : cp.entrySet())
entry.setValue(new ArrayList<>(entry.getValue()));
}
for (Collection<GridDeployment> deps : cp.values()) {
for (GridDeployment cls : deps)
undeploy(cls.classLoader());
}
if (log.isDebugEnabled())
log.debug(stopInfo());
}
/** {@inheritDoc} */
@Override public Collection<GridDeployment> getDeployments() {
Collection<GridDeployment> deps = new ArrayList<>();
synchronized (mux) {
for (Deque<GridDeployment> depList : cache.values())
for (GridDeployment d : depList)
if (!deps.contains(d))
deps.add(d);
}
return deps;
}
/** {@inheritDoc} */
@Nullable @Override public GridDeployment getDeployment(IgniteUuid ldrId) {
synchronized (mux) {
for (Deque<GridDeployment> deps : cache.values())
for (GridDeployment dep : deps)
if (dep.classLoaderId().equals(ldrId))
return dep;
}
for (GridDeployment dep : ctx.task().getUsedDeployments())
if (dep.classLoaderId().equals(ldrId))
return dep;
return null;
}
/** {@inheritDoc} */
@Nullable @Override public GridDeployment getDeployment(GridDeploymentMetadata meta) {
if (log.isDebugEnabled())
log.debug("Deployment meta for local deployment: " + meta);
String alias = meta.alias();
// Validate metadata.
assert alias != null : "Meta is invalid: " + meta;
GridDeployment dep = deployment(alias);
if (dep != null) {
if (log.isDebugEnabled())
log.debug("Acquired deployment class from local cache: " + dep);
return dep;
}
DeploymentResource rsrc = spi.findResource(alias);
if (rsrc != null) {
dep = deploy(ctx.config().getDeploymentMode(), rsrc.getClassLoader(), rsrc.getResourceClass(), alias,
meta.record());
assert dep != null;
if (log.isDebugEnabled())
log.debug("Acquired deployment class from SPI: " + dep);
}
// Auto-deploy.
else {
ClassLoader ldr = meta.classLoader();
if (ldr == null) {
ldr = Thread.currentThread().getContextClassLoader();
// Safety.
if (ldr == null)
ldr = U.resolveClassLoader(ctx.config());
}
if (ldr instanceof GridDeploymentClassLoader) {
if (log.isDebugEnabled())
log.debug("Skipping local auto-deploy (nested execution) [ldr=" + ldr + ", meta=" + meta + ']');
return null;
}
try {
// Check that class can be loaded.
String clsName = meta.className();
Class<?> cls = Class.forName(clsName != null ? clsName : alias, true, ldr);
spi.register(ldr, cls);
rsrc = spi.findResource(cls.getName());
if (rsrc != null && rsrc.getResourceClass().equals(cls)) {
if (log.isDebugEnabled())
log.debug("Retrieved auto-loaded resource from spi: " + rsrc);
dep = deploy(ctx.config().getDeploymentMode(), ldr, cls, meta.alias(), meta.record());
assert dep != null;
}
else {
U.warn(log, "Failed to find resource from deployment SPI even after registering: " + meta);
return null;
}
}
catch (ClassNotFoundException ignored) {
if (log.isDebugEnabled())
log.debug("Failed to load class for local auto-deployment [ldr=" + ldr + ", meta=" + meta + ']');
return null;
}
catch (IgniteSpiException e) {
U.error(log, "Failed to deploy local class with meta: " + meta, e);
return null;
}
}
if (log.isDebugEnabled())
log.debug("Acquired deployment class: " + dep);
return dep;
}
/**
* @param alias Class alias.
* @return Deployment.
*/
@Nullable private GridDeployment deployment(String alias) {
Deque<GridDeployment> deps = cache.get(alias);
if (deps != null) {
GridDeployment dep = deps.peekFirst();
if (dep != null && !dep.undeployed())
return dep;
}
return null;
}
/**
* @param depMode Deployment mode.
* @param ldr Class loader to deploy.
* @param cls Class.
* @param alias Class alias.
* @param recordEvt {@code True} to record event.
* @return Deployment.
*/
private GridDeployment deploy(DeploymentMode depMode, ClassLoader ldr, Class<?> cls, String alias,
boolean recordEvt) {
GridDeployment dep = null;
synchronized (mux) {
boolean fireEvt = false;
try {
Deque<GridDeployment> cachedDeps = null;
// Find existing class loader info.
for (Deque<GridDeployment> deps : cache.values()) {
for (GridDeployment d : deps) {
if (d.classLoader() == ldr) {
// Cache class and alias.
fireEvt = d.addDeployedClass(cls, alias);
cachedDeps = deps;
dep = d;
break;
}
}
if (cachedDeps != null)
break;
}
if (cachedDeps != null) {
assert dep != null;
cache.put(alias, cachedDeps);
if (!cls.getName().equals(alias))
// Cache by class name as well.
cache.put(cls.getName(), cachedDeps);
return dep;
}
IgniteUuid ldrId = IgniteUuid.fromUuid(ctx.localNodeId());
String userVer = userVersion(ldr);
dep = new GridDeployment(depMode, ldr, ldrId, userVer, cls.getName(), true);
fireEvt = dep.addDeployedClass(cls, alias);
assert fireEvt : "Class was not added to newly created deployment [cls=" + cls +
", depMode=" + depMode + ", dep=" + dep + ']';
Deque<GridDeployment> deps = F.<String, Deque<GridDeployment>>addIfAbsent(
cache,
alias,
ConcurrentLinkedDeque::new
);
if (!deps.isEmpty()) {
for (GridDeployment d : deps) {
if (!d.undeployed()) {
U.error(log, "Found more than one active deployment for the same resource " +
"[cls=" + cls + ", depMode=" + depMode + ", dep=" + d + ']');
return null;
}
}
}
// Add at the beginning of the list for future fast access.
deps.addFirst(dep);
if (!cls.getName().equals(alias))
// Cache by class name as well.
cache.put(cls.getName(), deps);
if (log.isDebugEnabled())
log.debug("Created new deployment: " + dep);
}
finally {
if (fireEvt)
recordDeploy(cls, alias, recordEvt);
}
}
return dep;
}
/** {@inheritDoc} */
@Nullable @Override public GridDeployment explicitDeploy(Class<?> cls, ClassLoader clsLdr) throws IgniteCheckedException {
try {
// Make sure not to deploy peer loaded tasks with non-local class loader,
// if local one exists.
if (clsLdr.getClass().equals(GridDeploymentClassLoader.class))
clsLdr = clsLdr.getParent();
spi.register(clsLdr, cls);
GridDeployment dep = deployment(cls.getName());
if (dep == null) {
DeploymentResource rsrc = spi.findResource(cls.getName());
if (rsrc != null && rsrc.getClassLoader() == clsLdr)
dep = deploy(ctx.config().getDeploymentMode(), rsrc.getClassLoader(),
rsrc.getResourceClass(), rsrc.getName(), true);
}
return dep;
}
catch (IgniteSpiException e) {
recordDeployFailed(cls, clsLdr, true);
// Avoid double wrapping.
if (e.getCause() instanceof IgniteCheckedException)
throw (IgniteCheckedException)e.getCause();
throw new IgniteDeploymentCheckedException("Failed to deploy class: " + cls.getName(), e);
}
}
/** {@inheritDoc} */
@Override public void explicitUndeploy(UUID nodeId, String rsrcName) {
assert rsrcName != null;
// Simply delegate to SPI.
// Internal cache will be cleared once undeployment callback is received from SPI.
spi.unregister(rsrcName);
}
/** {@inheritDoc} */
@Override public void addParticipants(Map<UUID, IgniteUuid> allParticipants,
Map<UUID, IgniteUuid> addedParticipants) {
assert false;
}
/**
* Records deploy event.
* <p>
* This needs to be called in synchronized block.
*
* @param cls Deployed class.
* @param alias Class alias.
* @param recordEvt Flag indicating whether to record events.
*/
private void recordDeploy(Class<?> cls, String alias, boolean recordEvt) {
assert cls != null;
boolean isTask = isTask(cls);
String msg = (isTask ? "Task" : "Class") + " locally deployed: " + cls;
if (recordEvt && ctx.event().isRecordable(isTask ? EVT_TASK_DEPLOYED : EVT_CLASS_DEPLOYED)) {
DeploymentEvent evt = new DeploymentEvent();
evt.message(msg);
evt.node(ctx.discovery().localNode());
evt.type(isTask ? EVT_TASK_DEPLOYED : EVT_CLASS_DEPLOYED);
evt.alias(alias);
ctx.event().record(evt);
}
// Don't record JDK or Grid classes.
if (U.isGrid(cls) || U.isJdk(cls))
return;
if (log.isInfoEnabled())
log.info(msg);
}
/**
* Records deploy event.
*
* @param cls Deployed class.
* @param clsLdr Class loader.
* @param recordEvt Flag indicating whether to record events.
*/
@SuppressWarnings({"unchecked"})
private void recordDeployFailed(Class<?> cls, ClassLoader clsLdr, boolean recordEvt) {
assert cls != null;
assert clsLdr != null;
boolean isTask = isTask(cls);
String msg = "Failed to deploy " + (isTask ? "task" : "class") + " [cls=" + cls + ", clsLdr=" + clsLdr + ']';
if (recordEvt && ctx.event().isRecordable(isTask ? EVT_CLASS_DEPLOY_FAILED : EVT_TASK_DEPLOY_FAILED)) {
String taskName = isTask ? U.getTaskName((Class<? extends ComputeTask<?, ?>>)cls) : null;
DeploymentEvent evt = new DeploymentEvent();
evt.message(msg);
evt.node(ctx.discovery().localNode());
evt.type(isTask(cls) ? EVT_CLASS_DEPLOY_FAILED : EVT_TASK_DEPLOY_FAILED);
evt.alias(taskName);
ctx.event().record(evt);
}
if (log.isInfoEnabled())
log.info(msg);
}
/**
* Records undeploy event.
*
* @param dep Undeployed class loader.
*/
private void recordUndeploy(GridDeployment dep) {
assert dep.undeployed();
if (ctx.event().isRecordable(EVT_TASK_UNDEPLOYED) ||
ctx.event().isRecordable(EVT_CLASS_UNDEPLOYED)) {
for (Class<?> cls : dep.deployedClasses()) {
boolean isTask = isTask(cls);
String msg = isTask ? "Task locally undeployed: " + cls : "Class locally undeployed: " + cls;
if (ctx.event().isRecordable(isTask ? EVT_TASK_UNDEPLOYED : EVT_CLASS_UNDEPLOYED)) {
DeploymentEvent evt = new DeploymentEvent();
evt.message(msg);
evt.node(ctx.discovery().localNode());
evt.type(isTask ? EVT_TASK_UNDEPLOYED : EVT_CLASS_UNDEPLOYED);
evt.alias(getAlias(dep, cls));
ctx.event().record(evt);
}
if (log.isInfoEnabled())
log.info(msg);
}
}
}
/**
* Gets alias for a class.
*
* @param dep Deployment.
* @param cls Class.
* @return Alias for a class.
*/
private String getAlias(GridDeployment dep, Class<?> cls) {
String alias = cls.getName();
if (isTask(cls)) {
ComputeTaskName ann = dep.annotation(cls, ComputeTaskName.class);
if (ann != null)
alias = ann.value();
}
return alias;
}
/**
* @param ldr Class loader to undeploy.
*/
private void undeploy(ClassLoader ldr) {
Collection<GridDeployment> doomed = new HashSet<>();
synchronized (mux) {
for (Iterator<Deque<GridDeployment>> i1 = cache.values().iterator(); i1.hasNext();) {
Deque<GridDeployment> deps = i1.next();
for (Iterator<GridDeployment> i2 = deps.iterator(); i2.hasNext();) {
GridDeployment dep = i2.next();
if (dep.classLoader() == ldr) {
dep.undeploy();
i2.remove();
doomed.add(dep);
if (log.isInfoEnabled())
log.info("Removed undeployed class: " + dep);
}
}
if (deps.isEmpty())
i1.remove();
}
}
for (GridDeployment dep : doomed) {
if (dep.obsolete()) {
// Resource cleanup.
ctx.resource().onUndeployed(dep);
// Clear optimized marshaller's cache.
if (ctx.config().getMarshaller() instanceof AbstractMarshaller)
((AbstractMarshaller)ctx.config().getMarshaller()).onUndeploy(ldr);
clearSerializationCaches();
// Class loader cache should be cleared in the last order.
GridAnnotationsCache.onUndeployed(ldr);
GridClassLoaderCache.onUndeployed(ldr);
}
recordUndeploy(dep);
}
}
/** {@inheritDoc} */
@Override public String toString() {
return S.toString(GridDeploymentLocalStore.class, this);
}
/**
*
*/
private class LocalDeploymentListener implements DeploymentListener {
/** {@inheritDoc} */
@Override public void onUnregistered(ClassLoader ldr) {
if (log.isDebugEnabled())
log.debug("Received callback from SPI to unregister class loader: " + ldr);
undeploy(ldr);
}
}
}