// Source blob: 01d8604ceaffd0b7674c6d7c2c9359feaebaa322
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.ignite.internal.managers.deployment;
import java.util.Collection;
import java.util.HashMap;
import java.util.Map;
import java.util.UUID;
import org.apache.ignite.IgniteCheckedException;
import org.apache.ignite.cluster.ClusterNode;
import org.apache.ignite.compute.ComputeTask;
import org.apache.ignite.compute.ComputeTaskName;
import org.apache.ignite.configuration.DeploymentMode;
import org.apache.ignite.internal.GridKernalContext;
import org.apache.ignite.internal.IgniteInternalFuture;
import org.apache.ignite.internal.managers.GridManagerAdapter;
import org.apache.ignite.internal.managers.deployment.protocol.gg.GridProtocolHandler;
import org.apache.ignite.internal.processors.task.GridInternal;
import org.apache.ignite.internal.util.typedef.F;
import org.apache.ignite.internal.util.typedef.internal.S;
import org.apache.ignite.internal.util.typedef.internal.U;
import org.apache.ignite.lang.IgniteFuture;
import org.apache.ignite.lang.IgnitePredicate;
import org.apache.ignite.lang.IgniteUuid;
import org.apache.ignite.spi.deployment.DeploymentSpi;
import org.apache.ignite.spi.deployment.IgnoreIfPeerClassLoadingDisabled;
import org.jetbrains.annotations.Nullable;
import static org.apache.ignite.configuration.DeploymentMode.CONTINUOUS;
import static org.apache.ignite.configuration.DeploymentMode.ISOLATED;
import static org.apache.ignite.configuration.DeploymentMode.PRIVATE;
import static org.apache.ignite.configuration.DeploymentMode.SHARED;
/**
* Deployment manager.
*/
public class GridDeploymentManager extends GridManagerAdapter<DeploymentSpi> {
/** Local deployment storage (created as a {@code GridDeploymentLocalStore}). */
private GridDeploymentStore locStore;
/** Isolated mode storage (created as a {@code GridDeploymentPerLoaderStore}). */
private GridDeploymentStore ldrStore;
/** Shared mode storage (created as a {@code GridDeploymentPerVersionStore}). */
private GridDeploymentStore verStore;
/** Deployment communication; handed to every store and used to send undeploy requests. */
private GridDeploymentCommunication comm;
/**
 * Fixed local deployment. Non-{@code null} only when peer class loading is disabled and the
 * configured deployment SPI class is annotated with {@link IgnoreIfPeerClassLoadingDisabled};
 * in that case most lookups in this manager short-circuit to it.
 */
private final GridDeployment locDep;
/**
 * Creates the deployment manager on top of the configured deployment SPI.
 * <p>
 * If peer class loading is disabled and the SPI class is annotated with
 * {@link IgnoreIfPeerClassLoadingDisabled}, a single fixed local deployment is created and
 * stored in {@code locDep}; in every other case {@code locDep} stays {@code null}.
 *
 * @param ctx Grid kernal context.
 */
public GridDeploymentManager(GridKernalContext ctx) {
    super(ctx, ctx.config().getDeploymentSpi());

    GridDeployment fixedDep = null;

    if (!ctx.config().isPeerClassLoadingEnabled()) {
        DeploymentSpi spi = ctx.config().getDeploymentSpi();

        IgnoreIfPeerClassLoadingDisabled ignoreAnn =
            U.getAnnotation(spi.getClass(), IgnoreIfPeerClassLoadingDisabled.class);

        if (ignoreAnn != null) {
            // Prefer the class loader from configuration, fall back to the grid class loader.
            fixedDep = new LocalDeployment(
                ctx.config().getDeploymentMode(),
                ctx.config().getClassLoader() != null ? ctx.config().getClassLoader() : U.gridClassLoader(),
                IgniteUuid.fromUuid(ctx.localNodeId()),
                ctx.userVersion(U.gridClassLoader()),
                String.class.getName());
        }
    }

    locDep = fixedDep;
}
/**
 * {@inheritDoc}
 * <p>
 * Registers this manager with the protocol handler, validates configuration, then starts
 * the SPI, the deployment communication and the three deployment stores, in that order.
 */
@Override public void start() throws IgniteCheckedException {
    GridProtocolHandler.registerDeploymentManager(this);

    // Configuration checks come before anything is started.
    assertParameter(ctx.config().getDeploymentMode() != null, "ctx.config().getDeploymentMode() != null");

    if (ctx.config().isPeerClassLoadingEnabled()) {
        assertParameter(ctx.config().getNetworkTimeout() > 0, "networkTimeout > 0");
    }

    startSpi();

    comm = new GridDeploymentCommunication(ctx, log);
    comm.start();

    startStores();

    if (!log.isDebugEnabled()) {
        return;
    }

    log.debug("Local deployment: " + locDep);
    log.debug(startInfo());
}
/**
 * {@inheritDoc}
 * <p>
 * Shuts the current stores down and replaces them with newly created, started ones.
 */
@Override public void onDisconnected(IgniteFuture<?> reconnectFut) throws IgniteCheckedException {
    // Full teardown of the existing stores: kernal-stop callback first, then stop.
    storesOnKernalStop();
    storesStop();

    // Fresh store instances take their place.
    startStores();
}
/**
 * {@inheritDoc}
 * <p>
 * Delivers the kernal-start callback to the stores.
 *
 * @return Always {@code null}.
 */
@Override public IgniteInternalFuture<?> onReconnected(boolean clusterRestarted) throws IgniteCheckedException {
    storesOnKernalStart();

    // There is no asynchronous work to wait for.
    return null;
}
/**
 * {@inheritDoc}
 * <p>
 * Deregisters this manager from the protocol handler, then stops the stores, the deployment
 * communication (if it was created) and finally the SPI.
 */
@Override public void stop(boolean cancel) throws IgniteCheckedException {
    GridProtocolHandler.deregisterDeploymentManager();

    storesStop();

    // Communication is created in start(); it may be absent if start() did not get that far.
    if (comm != null) {
        comm.stop();
    }

    getSpi().setListener(null);

    stopSpi();

    if (log.isDebugEnabled()) {
        log.debug(stopInfo());
    }
}
/**
 * {@inheritDoc}
 * <p>
 * Forwards the kernal-start callback to all deployment stores.
 */
@Override public void onKernalStart0() throws IgniteCheckedException {
    this.storesOnKernalStart();
}
/**
 * {@inheritDoc}
 * <p>
 * Forwards the kernal-stop callback to all deployment stores.
 */
@Override public void onKernalStop0(boolean cancel) {
    this.storesOnKernalStop();
}
/**
 * {@inheritDoc}
 * <p>
 * The manager additionally reports itself as disabled whenever the fixed local deployment
 * ({@code locDep}) is in use.
 */
@Override public boolean enabled() {
    boolean superEnabled = super.enabled();

    return superEnabled && locDep == null;
}
/**
 * Collects deployed task classes accepted by the given predicates.
 * <p>
 * Classes are taken from the fixed local deployment when there is one, otherwise from every
 * deployment held by the local store.
 *
 * @param p Filtering predicate.
 * @return All deployed tasks for given predicate, keyed by the name they are deployed under.
 */
@SuppressWarnings("unchecked")
public Map<String, Class<? extends ComputeTask<?, ?>>> findAllTasks(
    @Nullable IgnitePredicate<? super Class<? extends ComputeTask<?, ?>>>... p) {
    Map<String, Class<? extends ComputeTask<?, ?>>> res = new HashMap<>();

    if (locDep == null) {
        for (GridDeployment storeDep : locStore.getDeployments()) {
            tasks(res, storeDep, p);
        }
    }
    else {
        tasks(res, locDep, p);
    }

    return res;
}
/**
 * Copies into {@code map} every class of the deployment that is a {@link ComputeTask} and
 * passes all of the given predicates.
 *
 * @param map Map (out parameter).
 * @param dep Deployment.
 * @param p Predicate.
 */
private void tasks(Map<String, Class<? extends ComputeTask<?, ?>>> map, GridDeployment dep,
    IgnitePredicate<? super Class<? extends ComputeTask<?, ?>>>[] p) {
    assert map != null;
    assert dep != null;

    for (Map.Entry<String, Class<?>> deployed : dep.deployedClassMap().entrySet()) {
        // Anything that is not a compute task is of no interest here.
        if (!ComputeTask.class.isAssignableFrom(deployed.getValue())) {
            continue;
        }

        Class<? extends ComputeTask<?, ?>> taskCls = (Class<? extends ComputeTask<?, ?>>)deployed.getValue();

        if (F.isAll(taskCls, p)) {
            map.put(deployed.getKey(), taskCls);
        }
    }
}
/**
 * Undeploys a task, optionally on this node, and asks the given remote nodes to do the same.
 * Does nothing when the fixed local deployment is in use. A failure to send the remote
 * request is logged and not propagated.
 *
 * @param taskName Task name.
 * @param locUndeploy Local undeploy flag.
 * @param rmtNodes Nodes to send request to.
 */
public void undeployTask(String taskName, boolean locUndeploy, Collection<ClusterNode> rmtNodes) {
    assert taskName != null;
    assert !rmtNodes.contains(ctx.discovery().localNode());

    if (locDep != null) {
        return;
    }

    if (locUndeploy) {
        locStore.explicitUndeploy(null, taskName);
    }

    try {
        comm.sendUndeployRequest(taskName, rmtNodes);
    }
    catch (IgniteCheckedException e) {
        U.error(log, "Failed to send undeployment request for task: " + taskName, e);
    }
}
/**
 * Handles an undeploy request for the given node ID by explicitly undeploying the task in
 * the local, per-loader and per-version stores. With the fixed local deployment in use the
 * request is unexpected and only a warning is logged.
 *
 * @param nodeId Node ID.
 * @param taskName Task name.
 */
void undeployTask(UUID nodeId, String taskName) {
    assert taskName != null;

    if (locDep == null) {
        locStore.explicitUndeploy(nodeId, taskName);
        ldrStore.explicitUndeploy(nodeId, taskName);
        verStore.explicitUndeploy(nodeId, taskName);
    }
    else {
        U.warn(log, "Received unexpected undeploy request [nodeId=" + nodeId + ", taskName=" + taskName + ']');
    }
}
/**
 * Deploys the given class, or looks up its existing deployment.
 * <ul>
 * <li>A lambda class is replaced by its enclosing class, loaded through the class loader.</li>
 * <li>For a class loader that is a {@code GridDeploymentClassLoader}, an already existing
 * deployment is looked up in the local, per-loader and per-version stores, in that order.</li>
 * <li>Otherwise the fixed local deployment is returned if there is one (registering the task
 * name of a {@link ComputeTask} annotated with {@link ComputeTaskName}), or the class is
 * explicitly deployed to the local store.</li>
 * </ul>
 *
 * @param cls Class to deploy.
 * @param clsLdr Class loader; {@code null} means the class loader of this manager's class.
 * @throws IgniteCheckedException If deployment failed.
 * @return Grid deployment.
 */
@Nullable public GridDeployment deploy(Class<?> cls, ClassLoader clsLdr) throws IgniteCheckedException {
    ClassLoader ldr = clsLdr != null ? clsLdr : getClass().getClassLoader();

    Class<?> depCls = cls;
    String depClsName = cls.getName();

    String enclosingName = U.lambdaEnclosingClassName(depClsName);

    if (enclosingName != null) {
        depClsName = enclosingName;

        // A lambda is deployed through the class that encloses it.
        try {
            depCls = Class.forName(depClsName, true, ldr);
        }
        catch (ClassNotFoundException e) {
            throw new IgniteCheckedException("Cannot deploy parent class for lambda: " + depClsName, e);
        }
    }

    if (!(ldr instanceof GridDeploymentClassLoader)) {
        if (locDep == null) {
            return locStore.explicitDeploy(depCls, ldr);
        }

        if (ComputeTask.class.isAssignableFrom(depCls)) {
            ComputeTaskName nameAnn = locDep.annotation(depCls, ComputeTaskName.class);

            if (nameAnn != null) {
                locDep.addDeployedClass(depCls, nameAnn.value());
            }
        }

        return locDep;
    }

    GridDeploymentInfo ldrInfo = (GridDeploymentInfo)ldr;

    // Expecting that peer-deploy awareness handled on upper level.
    if (ldrInfo.deployMode() == ISOLATED || ldrInfo.deployMode() == PRIVATE) {
        if (ctx.config().getDeploymentMode() == SHARED || ctx.config().getDeploymentMode() == CONTINUOUS) {
            if (!U.hasAnnotation(depCls, GridInternal.class)) {
                throw new IgniteCheckedException("Attempt to deploy class loaded in ISOLATED or PRIVATE mode on node with " +
                    "SHARED or CONTINUOUS deployment mode [cls=" + depCls + ", clsDeployMode=" + ldrInfo.deployMode() +
                    ", localDeployMode=" + ctx.config().getDeploymentMode() + ']');
            }
        }
    }

    GridDeploymentMetadata meta = new GridDeploymentMetadata();

    meta.alias(depClsName);
    meta.classLoader(ldr);

    // Check for nested execution. In that case, if task
    // is available locally by name, then we should ignore
    // class loader ID.
    GridDeployment found = locStore.getDeployment(meta);

    if (found != null) {
        return found;
    }

    found = ldrStore.getDeployment(ldrInfo.classLoaderId());

    if (found != null) {
        return found;
    }

    return verStore.getDeployment(ldrInfo.classLoaderId());
}
/**
 * Gets any deployment by loader ID.
 * <p>
 * With the fixed local deployment in use, only that deployment is considered. Otherwise the
 * local, per-loader and per-version stores are queried in that order.
 *
 * @param ldrId Loader ID.
 * @return Deployment for given ID, or {@code null} if none matched.
 */
@Nullable public GridDeployment getDeployment(IgniteUuid ldrId) {
    if (locDep != null) {
        if (locDep.classLoaderId().equals(ldrId)) {
            return locDep;
        }

        return null;
    }

    GridDeployment found = locStore.getDeployment(ldrId);

    if (found != null) {
        return found;
    }

    found = ldrStore.getDeployment(ldrId);

    if (found != null) {
        return found;
    }

    return verStore.getDeployment(ldrId);
}
/**
 * Finds a deployment for the given resource name.
 * <p>
 * The local deployment is tried first. If there is none and the thread context class loader
 * is a {@code GridDeploymentClassLoader}, its loader ID is looked up in the per-loader store
 * and then in the per-version store.
 *
 * @param rsrcName Resource to find deployment for.
 * @return Found deployment or {@code null} if one was not found.
 */
@Nullable public GridDeployment getDeployment(String rsrcName) {
    if (locDep != null) {
        return locDep;
    }

    GridDeployment found = getLocalDeployment(rsrcName);

    if (found != null) {
        return found;
    }

    ClassLoader ctxLdr = Thread.currentThread().getContextClassLoader();

    if (!(ctxLdr instanceof GridDeploymentClassLoader)) {
        return null;
    }

    GridDeploymentInfo ldrInfo = (GridDeploymentInfo)ctxLdr;

    found = ldrStore.getDeployment(ldrInfo.classLoaderId());

    if (found != null) {
        return found;
    }

    return verStore.getDeployment(ldrInfo.classLoaderId());
}
/**
 * Looks up a deployment in the local store, with this node as the sender. The resource name
 * is used as the alias; for a lambda the class name is the name of its enclosing class.
 *
 * @param rsrcName Class name.
 * @return Grid cached task.
 */
@Nullable public GridDeployment getLocalDeployment(String rsrcName) {
    if (locDep != null) {
        return locDep;
    }

    String clsName = U.lambdaEnclosingClassName(rsrcName);

    // Not a lambda: the resource name is the class name.
    if (clsName == null) {
        clsName = rsrcName;
    }

    GridDeploymentMetadata meta = new GridDeploymentMetadata();

    meta.record(true);
    meta.deploymentMode(ctx.config().getDeploymentMode());
    meta.alias(rsrcName);
    meta.className(clsName);
    meta.senderNodeId(ctx.localNodeId());

    return locStore.getDeployment(meta);
}
/**
 * Resolves a deployment for a class that may originate from another node.
 * <ul>
 * <li>With the fixed local deployment in use, that deployment is returned.</li>
 * <li>With peer class loading disabled, only the local store is consulted.</li>
 * <li>In SHARED or CONTINUOUS mode a matching local deployment is reused unless the resource
 * is excluded by configuration; a local deployment with another mode or user version yields
 * {@code null}; otherwise the per-version store is used.</li>
 * <li>In PRIVATE or ISOLATED mode the local store answers requests sent by this node, and the
 * per-loader store answers the rest.</li>
 * </ul>
 *
 * @param depMode Deployment mode.
 * @param rsrcName Resource name (could be task name).
 * @param clsName Class name.
 * @param userVer User version.
 * @param sndNodeId Sender node ID.
 * @param clsLdrId Class loader ID.
 * @param participants Node class loader participant map.
 * @param nodeFilter Node filter for class loader.
 * @return Deployment class if found.
 */
@Nullable public GridDeployment getGlobalDeployment(
    DeploymentMode depMode,
    String rsrcName,
    String clsName,
    String userVer,
    UUID sndNodeId,
    IgniteUuid clsLdrId,
    Map<UUID, IgniteUuid> participants,
    @Nullable IgnitePredicate<ClusterNode> nodeFilter) {
    if (locDep != null) {
        return locDep;
    }

    // A lambda is resolved through its enclosing class.
    String enclosingName = U.lambdaEnclosingClassName(clsName);
    String resolvedClsName = enclosingName != null ? enclosingName : clsName;

    GridDeploymentMetadata meta = new GridDeploymentMetadata();

    meta.deploymentMode(depMode);
    meta.className(resolvedClsName);
    meta.alias(rsrcName);
    meta.userVersion(userVer);
    meta.senderNodeId(sndNodeId);
    meta.classLoaderId(clsLdrId);
    meta.participants(participants);
    meta.nodeFilter(nodeFilter);

    if (!ctx.config().isPeerClassLoadingEnabled()) {
        meta.record(true);

        return locStore.getDeployment(meta);
    }

    // In shared mode, if class is locally available, we never load
    // from remote node simply because the class loader needs to be "shared".
    if (isPerVersionMode(meta.deploymentMode())) {
        meta.record(true);

        // Local exclusions only apply to requests that came from another node.
        boolean reuse = sndNodeId.equals(ctx.localNodeId()) || !isExcludedFromLocalReuse(meta);

        if (reuse) {
            GridDeployment reusable = locStore.getDeployment(meta);

            if (reusable == null && participants != null && participants.containsKey(ctx.localNodeId())) {
                reusable = locStore.getDeployment(participants.get(ctx.localNodeId()));
            }

            if (reusable != null) {
                if (!isPerVersionMode(reusable.deployMode())) {
                    U.warn(log, "Failed to deploy class in SHARED or CONTINUOUS mode (class is locally deployed " +
                        "in some other mode). Either change IgniteConfiguration.getDeploymentMode() property to " +
                        "SHARED or CONTINUOUS or remove class from local classpath and any of " +
                        "the local GAR deployments that may have it [cls=" + meta.className() + ", depMode=" +
                        reusable.deployMode() + ']');

                    return null;
                }

                if (!reusable.userVersion().equals(meta.userVersion())) {
                    U.warn(log, "Failed to deploy class in SHARED or CONTINUOUS mode for given user version " +
                        "(class is locally deployed for a different user version) [cls=" + meta.className() +
                        ", localVer=" + reusable.userVersion() + ", otherVer=" + meta.userVersion() + ']');

                    return null;
                }

                if (log.isDebugEnabled()) {
                    log.debug("Reusing local deployment for SHARED or CONTINUOUS mode: " + reusable);
                }

                return reusable;
            }
        }

        return verStore.getDeployment(meta);
    }

    // Private or Isolated mode.
    meta.record(false);

    GridDeployment locStoreDep = locStore.getDeployment(meta);

    if (sndNodeId.equals(ctx.localNodeId())) {
        if (locStoreDep == null) {
            U.warn(log, "Task got undeployed while deployment was in progress: " + meta);
        }

        // For local execution, return the same deployment as for the task.
        return locStoreDep;
    }

    if (locStoreDep != null) {
        meta.parentLoader(locStoreDep.classLoader());
    }

    meta.record(true);

    return ldrStore.getDeployment(meta);
}

/**
 * Checks whether the alias or class name in the metadata starts with one of the configured
 * local class path exclusions (a trailing {@code *} in an exclusion is dropped first).
 *
 * @param meta Deployment metadata.
 * @return {@code True} if the resource is excluded and a local deployment must not be reused.
 */
private boolean isExcludedFromLocalReuse(GridDeploymentMetadata meta) {
    String[] p2pExc = ctx.config().getPeerClassLoadingLocalClassPathExclude();

    if (p2pExc == null) {
        return false;
    }

    for (String exclusion : p2pExc) {
        // Remove star (*) at the end.
        String prefix = exclusion.endsWith("*") ? exclusion.substring(0, exclusion.length() - 1) : exclusion;

        if (meta.alias().startsWith(prefix) || meta.className().startsWith(prefix)) {
            if (log.isDebugEnabled()) {
                log.debug("Will not reuse local deployment because resource is excluded [meta=" +
                    meta + ']');
            }

            return true;
        }
    }

    return false;
}
/**
 * Adds participants to all SHARED deployments by delegating to the per-version store.
 *
 * @param allParticipants All participants.
 * @param addedParticipants Added participants.
 */
public void addCacheParticipants(Map<UUID, IgniteUuid> allParticipants, Map<UUID, IgniteUuid> addedParticipants) {
    this.verStore.addParticipants(allParticipants, addedParticipants);
}
/**
 * Tells whether the mode is one of the two modes served by the per-version store.
 *
 * @param mode Mode to check.
 * @return {@code True} if the mode is SHARED or CONTINUOUS.
 */
private boolean isPerVersionMode(DeploymentMode mode) {
    if (mode == CONTINUOUS) {
        return true;
    }

    return mode == SHARED;
}
/**
 * Extracts the class loader ID from a grid deployment class loader.
 *
 * @param ldr Class loader to get ID for.
 * @return ID for given class loader or {@code null} if given loader is not
 * grid deployment class loader.
 */
@Nullable public IgniteUuid getClassLoaderId(ClassLoader ldr) {
    assert ldr != null;

    if (ldr instanceof GridDeploymentClassLoader) {
        return ((GridDeploymentInfo)ldr).classLoaderId();
    }

    return null;
}
/**
 * Tells whether the loader is a {@code GridDeploymentClassLoader}.
 *
 * @param ldr Loader to check.
 * @return {@code True} if P2P class loader.
 */
public boolean isGlobalLoader(ClassLoader ldr) {
    boolean p2pLdr = ldr instanceof GridDeploymentClassLoader;

    return p2pLdr;
}
/**
 * Creates new local, per-loader and per-version stores and then starts them in that order.
 *
 * @throws IgniteCheckedException If failed.
 */
private void startStores() throws IgniteCheckedException {
    // All three stores are created before any of them is started.
    this.locStore = new GridDeploymentLocalStore(getSpi(), ctx, comm);
    this.ldrStore = new GridDeploymentPerLoaderStore(getSpi(), ctx, comm);
    this.verStore = new GridDeploymentPerVersionStore(getSpi(), ctx, comm);

    this.locStore.start();
    this.ldrStore.start();
    this.verStore.start();
}
/**
 * Delivers the kernal-start callback to the local, per-loader and per-version stores,
 * in that order.
 *
 * @throws IgniteCheckedException If failed.
 */
private void storesOnKernalStart() throws IgniteCheckedException {
    this.locStore.onKernalStart();

    this.ldrStore.onKernalStart();

    this.verStore.onKernalStart();
}
/**
 * Delivers the kernal-stop callback to the stores that exist, in reverse start order:
 * per-version, per-loader, local.
 */
private void storesOnKernalStop() {
    if (verStore != null) {
        verStore.onKernalStop();
    }

    if (ldrStore != null) {
        ldrStore.onKernalStop();
    }

    if (locStore != null) {
        locStore.onKernalStop();
    }
}
/**
 * Stops the stores that exist, in reverse start order: per-version, per-loader, local.
 */
private void storesStop() {
    if (verStore != null) {
        verStore.stop();
    }

    if (ldrStore != null) {
        ldrStore.stop();
    }

    if (locStore != null) {
        locStore.stop();
    }
}
/**
 * Deployment that is never undeployed: it can always be acquired, never reports itself as
 * undeployed, pending undeploy or obsolete, and has no participants.
 */
private static class LocalDeployment extends GridDeployment {
    /**
     * @param depMode Mode.
     * @param clsLdr Loader.
     * @param clsLdrId Loader ID.
     * @param userVer User version.
     * @param sampleClsName Sample class name.
     */
    private LocalDeployment(DeploymentMode depMode, ClassLoader clsLdr, IgniteUuid clsLdrId, String userVer,
        String sampleClsName) {
        super(depMode, clsLdr, clsLdrId, userVer, sampleClsName, /*local*/true);
    }

    /** {@inheritDoc} Always succeeds. */
    @Override public boolean acquire() {
        return true;
    }

    /** {@inheritDoc} Nothing to release. */
    @Override public void release() {
        // No-op.
    }

    /** {@inheritDoc} Never undeployed. */
    @Override public boolean undeployed() {
        return false;
    }

    /** {@inheritDoc} Undeploy requests are ignored. */
    @Override public void undeploy() {
        // No-op.
    }

    /** {@inheritDoc} Never pending undeploy. */
    @Override public boolean pendingUndeploy() {
        return false;
    }

    /** {@inheritDoc} Scheduling an undeploy has no effect. */
    @Override public void onUndeployScheduled() {
        // No-op.
    }

    /** {@inheritDoc} Never obsolete. */
    @Override public boolean obsolete() {
        return false;
    }

    /** {@inheritDoc} There are no participants. */
    @Nullable @Override public Map<UUID, IgniteUuid> participants() {
        return null;
    }

    /** {@inheritDoc} */
    @Override public String toString() {
        return S.toString(LocalDeployment.class, this, super.toString());
    }
}
}