blob: 9cbb51f3b105a367373232b492b87251aff8f48d [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.ignite.internal;
import java.io.Externalizable;
import java.io.File;
import java.io.IOException;
import java.io.InputStream;
import java.io.InvalidObjectException;
import java.io.ObjectInput;
import java.io.ObjectOutput;
import java.io.ObjectStreamException;
import java.io.Serializable;
import java.lang.management.ManagementFactory;
import java.lang.management.RuntimeMXBean;
import java.lang.reflect.Constructor;
import java.text.DateFormat;
import java.text.DecimalFormat;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.Date;
import java.util.HashMap;
import java.util.List;
import java.util.ListIterator;
import java.util.Map;
import java.util.Properties;
import java.util.UUID;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicReference;
import javax.management.JMException;
import javax.management.ObjectName;
import org.apache.ignite.IgniteAtomicLong;
import org.apache.ignite.IgniteAtomicReference;
import org.apache.ignite.IgniteAtomicSequence;
import org.apache.ignite.IgniteAtomicStamped;
import org.apache.ignite.IgniteBinary;
import org.apache.ignite.IgniteCache;
import org.apache.ignite.IgniteCheckedException;
import org.apache.ignite.IgniteClientDisconnectedException;
import org.apache.ignite.IgniteCompute;
import org.apache.ignite.IgniteCountDownLatch;
import org.apache.ignite.IgniteDataStreamer;
import org.apache.ignite.IgniteEvents;
import org.apache.ignite.IgniteException;
import org.apache.ignite.IgniteFileSystem;
import org.apache.ignite.IgniteLock;
import org.apache.ignite.IgniteLogger;
import org.apache.ignite.IgniteMessaging;
import org.apache.ignite.IgniteQueue;
import org.apache.ignite.IgniteScheduler;
import org.apache.ignite.IgniteSemaphore;
import org.apache.ignite.IgniteServices;
import org.apache.ignite.IgniteSet;
import org.apache.ignite.IgniteSystemProperties;
import org.apache.ignite.IgniteTransactions;
import org.apache.ignite.Ignition;
import org.apache.ignite.MemoryMetrics;
import org.apache.ignite.cache.affinity.Affinity;
import org.apache.ignite.cluster.ClusterGroup;
import org.apache.ignite.cluster.ClusterMetrics;
import org.apache.ignite.cluster.ClusterNode;
import org.apache.ignite.configuration.BinaryConfiguration;
import org.apache.ignite.configuration.CacheConfiguration;
import org.apache.ignite.configuration.CollectionConfiguration;
import org.apache.ignite.configuration.ConnectorConfiguration;
import org.apache.ignite.configuration.IgniteConfiguration;
import org.apache.ignite.configuration.MemoryConfiguration;
import org.apache.ignite.configuration.NearCacheConfiguration;
import org.apache.ignite.internal.binary.BinaryEnumCache;
import org.apache.ignite.internal.binary.BinaryMarshaller;
import org.apache.ignite.internal.binary.BinaryUtils;
import org.apache.ignite.internal.cluster.ClusterGroupAdapter;
import org.apache.ignite.internal.cluster.IgniteClusterEx;
import org.apache.ignite.internal.managers.GridManager;
import org.apache.ignite.internal.managers.checkpoint.GridCheckpointManager;
import org.apache.ignite.internal.managers.collision.GridCollisionManager;
import org.apache.ignite.internal.managers.communication.GridIoManager;
import org.apache.ignite.internal.managers.deployment.GridDeploymentManager;
import org.apache.ignite.internal.managers.discovery.GridDiscoveryManager;
import org.apache.ignite.internal.managers.eventstorage.GridEventStorageManager;
import org.apache.ignite.internal.managers.failover.GridFailoverManager;
import org.apache.ignite.internal.managers.indexing.GridIndexingManager;
import org.apache.ignite.internal.managers.loadbalancer.GridLoadBalancerManager;
import org.apache.ignite.internal.processors.GridProcessor;
import org.apache.ignite.internal.processors.affinity.GridAffinityProcessor;
import org.apache.ignite.internal.processors.cache.GridCacheAdapter;
import org.apache.ignite.internal.processors.cache.GridCacheContext;
import org.apache.ignite.internal.processors.cache.GridCacheProcessor;
import org.apache.ignite.internal.processors.cache.GridCacheUtilityKey;
import org.apache.ignite.internal.processors.cache.IgniteCacheProxy;
import org.apache.ignite.internal.processors.cache.IgniteInternalCache;
import org.apache.ignite.internal.processors.cache.binary.CacheObjectBinaryProcessorImpl;
import org.apache.ignite.internal.processors.cache.database.MemoryPolicy;
import org.apache.ignite.internal.processors.cacheobject.IgniteCacheObjectProcessor;
import org.apache.ignite.internal.processors.closure.GridClosureProcessor;
import org.apache.ignite.internal.processors.cluster.ClusterProcessor;
import org.apache.ignite.internal.processors.cluster.GridClusterStateProcessor;
import org.apache.ignite.internal.processors.continuous.GridContinuousProcessor;
import org.apache.ignite.internal.processors.datastreamer.DataStreamProcessor;
import org.apache.ignite.internal.processors.datastructures.DataStructuresProcessor;
import org.apache.ignite.internal.processors.hadoop.Hadoop;
import org.apache.ignite.internal.processors.hadoop.HadoopProcessorAdapter;
import org.apache.ignite.internal.processors.job.GridJobProcessor;
import org.apache.ignite.internal.processors.jobmetrics.GridJobMetricsProcessor;
import org.apache.ignite.internal.processors.marshaller.GridMarshallerMappingProcessor;
import org.apache.ignite.internal.processors.nodevalidation.DiscoveryNodeValidationProcessor;
import org.apache.ignite.internal.processors.nodevalidation.OsDiscoveryNodeValidationProcessor;
import org.apache.ignite.internal.processors.odbc.OdbcProcessor;
import org.apache.ignite.internal.processors.platform.PlatformNoopProcessor;
import org.apache.ignite.internal.processors.platform.PlatformProcessor;
import org.apache.ignite.internal.processors.platform.plugin.PlatformPluginProcessor;
import org.apache.ignite.internal.processors.plugin.IgnitePluginProcessor;
import org.apache.ignite.internal.processors.pool.PoolProcessor;
import org.apache.ignite.internal.processors.port.GridPortProcessor;
import org.apache.ignite.internal.processors.port.GridPortRecord;
import org.apache.ignite.internal.processors.query.GridQueryProcessor;
import org.apache.ignite.internal.processors.resource.GridResourceProcessor;
import org.apache.ignite.internal.processors.resource.GridSpringResourceContext;
import org.apache.ignite.internal.processors.rest.GridRestProcessor;
import org.apache.ignite.internal.processors.security.GridSecurityProcessor;
import org.apache.ignite.internal.processors.segmentation.GridSegmentationProcessor;
import org.apache.ignite.internal.processors.service.GridServiceProcessor;
import org.apache.ignite.internal.processors.session.GridTaskSessionProcessor;
import org.apache.ignite.internal.processors.task.GridTaskProcessor;
import org.apache.ignite.internal.processors.timeout.GridTimeoutProcessor;
import org.apache.ignite.internal.suggestions.GridPerformanceSuggestions;
import org.apache.ignite.internal.suggestions.JvmConfigurationSuggestions;
import org.apache.ignite.internal.suggestions.OsConfigurationSuggestions;
import org.apache.ignite.internal.util.StripedExecutor;
import org.apache.ignite.internal.util.future.GridCompoundFuture;
import org.apache.ignite.internal.util.future.GridFinishedFuture;
import org.apache.ignite.internal.util.future.GridFutureAdapter;
import org.apache.ignite.internal.util.future.IgniteFutureImpl;
import org.apache.ignite.internal.util.lang.GridAbsClosure;
import org.apache.ignite.internal.util.tostring.GridToStringExclude;
import org.apache.ignite.internal.util.typedef.C1;
import org.apache.ignite.internal.util.typedef.CI1;
import org.apache.ignite.internal.util.typedef.F;
import org.apache.ignite.internal.util.typedef.X;
import org.apache.ignite.internal.util.typedef.internal.A;
import org.apache.ignite.internal.util.typedef.internal.CU;
import org.apache.ignite.internal.util.typedef.internal.LT;
import org.apache.ignite.internal.util.typedef.internal.S;
import org.apache.ignite.internal.util.typedef.internal.SB;
import org.apache.ignite.internal.util.typedef.internal.U;
import org.apache.ignite.lang.IgniteFuture;
import org.apache.ignite.lang.IgnitePredicate;
import org.apache.ignite.lang.IgniteProductVersion;
import org.apache.ignite.lifecycle.LifecycleAware;
import org.apache.ignite.lifecycle.LifecycleBean;
import org.apache.ignite.lifecycle.LifecycleEventType;
import org.apache.ignite.marshaller.MarshallerExclusions;
import org.apache.ignite.internal.marshaller.optimized.OptimizedMarshaller;
import org.apache.ignite.mxbean.ClusterLocalNodeMetricsMXBean;
import org.apache.ignite.mxbean.IgniteMXBean;
import org.apache.ignite.mxbean.StripedExecutorMXBean;
import org.apache.ignite.mxbean.ThreadPoolMXBean;
import org.apache.ignite.plugin.IgnitePlugin;
import org.apache.ignite.plugin.PluginNotFoundException;
import org.apache.ignite.plugin.PluginProvider;
import org.apache.ignite.spi.IgniteSpi;
import org.apache.ignite.spi.IgniteSpiVersionCheckException;
import org.apache.ignite.spi.discovery.tcp.internal.TcpDiscoveryNode;
import org.apache.ignite.thread.IgniteStripedThreadPoolExecutor;
import org.jetbrains.annotations.Nullable;
import static org.apache.ignite.IgniteSystemProperties.IGNITE_BINARY_MARSHALLER_USE_STRING_SERIALIZATION_VER_2;
import static org.apache.ignite.IgniteSystemProperties.IGNITE_CONFIG_URL;
import static org.apache.ignite.IgniteSystemProperties.IGNITE_DAEMON;
import static org.apache.ignite.IgniteSystemProperties.IGNITE_NO_ASCII;
import static org.apache.ignite.IgniteSystemProperties.IGNITE_OPTIMIZED_MARSHALLER_USE_DEFAULT_SUID;
import static org.apache.ignite.IgniteSystemProperties.IGNITE_REST_START_ON_CLIENT;
import static org.apache.ignite.IgniteSystemProperties.IGNITE_SKIP_CONFIGURATION_CONSISTENCY_CHECK;
import static org.apache.ignite.IgniteSystemProperties.IGNITE_STARVATION_CHECK_INTERVAL;
import static org.apache.ignite.IgniteSystemProperties.IGNITE_SUCCESS_FILE;
import static org.apache.ignite.IgniteSystemProperties.getBoolean;
import static org.apache.ignite.IgniteSystemProperties.snapshot;
import static org.apache.ignite.internal.GridKernalState.DISCONNECTED;
import static org.apache.ignite.internal.GridKernalState.STARTED;
import static org.apache.ignite.internal.GridKernalState.STARTING;
import static org.apache.ignite.internal.GridKernalState.STOPPED;
import static org.apache.ignite.internal.GridKernalState.STOPPING;
import static org.apache.ignite.internal.IgniteComponentType.HADOOP_HELPER;
import static org.apache.ignite.internal.IgniteComponentType.IGFS;
import static org.apache.ignite.internal.IgniteComponentType.IGFS_HELPER;
import static org.apache.ignite.internal.IgniteComponentType.SCHEDULE;
import static org.apache.ignite.internal.IgniteNodeAttributes.ATTR_ACTIVE_ON_START;
import static org.apache.ignite.internal.IgniteNodeAttributes.ATTR_BUILD_DATE;
import static org.apache.ignite.internal.IgniteNodeAttributes.ATTR_BUILD_VER;
import static org.apache.ignite.internal.IgniteNodeAttributes.ATTR_CLIENT_MODE;
import static org.apache.ignite.internal.IgniteNodeAttributes.ATTR_CONSISTENCY_CHECK_SKIPPED;
import static org.apache.ignite.internal.IgniteNodeAttributes.ATTR_DAEMON;
import static org.apache.ignite.internal.IgniteNodeAttributes.ATTR_DEPLOYMENT_MODE;
import static org.apache.ignite.internal.IgniteNodeAttributes.ATTR_IGNITE_INSTANCE_NAME;
import static org.apache.ignite.internal.IgniteNodeAttributes.ATTR_IPS;
import static org.apache.ignite.internal.IgniteNodeAttributes.ATTR_JIT_NAME;
import static org.apache.ignite.internal.IgniteNodeAttributes.ATTR_JMX_PORT;
import static org.apache.ignite.internal.IgniteNodeAttributes.ATTR_JVM_ARGS;
import static org.apache.ignite.internal.IgniteNodeAttributes.ATTR_JVM_PID;
import static org.apache.ignite.internal.IgniteNodeAttributes.ATTR_LANG_RUNTIME;
import static org.apache.ignite.internal.IgniteNodeAttributes.ATTR_LATE_AFFINITY_ASSIGNMENT;
import static org.apache.ignite.internal.IgniteNodeAttributes.ATTR_MACS;
import static org.apache.ignite.internal.IgniteNodeAttributes.ATTR_MARSHALLER;
import static org.apache.ignite.internal.IgniteNodeAttributes.ATTR_MARSHALLER_COMPACT_FOOTER;
import static org.apache.ignite.internal.IgniteNodeAttributes.ATTR_MARSHALLER_USE_BINARY_STRING_SER_VER_2;
import static org.apache.ignite.internal.IgniteNodeAttributes.ATTR_MARSHALLER_USE_DFLT_SUID;
import static org.apache.ignite.internal.IgniteNodeAttributes.ATTR_NODE_CONSISTENT_ID;
import static org.apache.ignite.internal.IgniteNodeAttributes.ATTR_PEER_CLASSLOADING;
import static org.apache.ignite.internal.IgniteNodeAttributes.ATTR_PHY_RAM;
import static org.apache.ignite.internal.IgniteNodeAttributes.ATTR_PREFIX;
import static org.apache.ignite.internal.IgniteNodeAttributes.ATTR_RESTART_ENABLED;
import static org.apache.ignite.internal.IgniteNodeAttributes.ATTR_REST_PORT_RANGE;
import static org.apache.ignite.internal.IgniteNodeAttributes.ATTR_SPI_CLASS;
import static org.apache.ignite.internal.IgniteNodeAttributes.ATTR_USER_NAME;
import static org.apache.ignite.internal.IgniteVersionUtils.ACK_VER_STR;
import static org.apache.ignite.internal.IgniteVersionUtils.BUILD_TSTAMP_STR;
import static org.apache.ignite.internal.IgniteVersionUtils.COPYRIGHT;
import static org.apache.ignite.internal.IgniteVersionUtils.REV_HASH_STR;
import static org.apache.ignite.internal.IgniteVersionUtils.VER;
import static org.apache.ignite.internal.IgniteVersionUtils.VER_STR;
import static org.apache.ignite.lifecycle.LifecycleEventType.AFTER_NODE_START;
import static org.apache.ignite.lifecycle.LifecycleEventType.BEFORE_NODE_START;
/**
* Ignite kernal.
* <p/>
* See <a href="http://en.wikipedia.org/wiki/Kernal">http://en.wikipedia.org/wiki/Kernal</a> for information on the
* misspelling.
*/
public class IgniteKernal implements IgniteEx, IgniteMXBean, Externalizable {
/** */
private static final long serialVersionUID = 0L;
/** Ignite site that is shown in log messages. */
public static final String SITE = "ignite.apache.org";
/** System line separator. */
private static final String NL = U.nl();
/** Periodic starvation check interval. */
private static final long PERIODIC_STARVATION_CHECK_FREQ = 1000 * 30;
/** Force complete reconnect future. */
private static final Object STOP_RECONNECT = new Object();
/** */
@GridToStringExclude
private GridKernalContextImpl ctx;
/** Configuration. */
private IgniteConfiguration cfg;
/** */
@SuppressWarnings({"FieldAccessedSynchronizedAndUnsynchronized"})
@GridToStringExclude
private GridLoggerProxy log;
/** */
private String igniteInstanceName;
/** */
@GridToStringExclude
private ObjectName kernalMBean;
/** */
@GridToStringExclude
private ObjectName locNodeMBean;
/** */
@GridToStringExclude
private ObjectName pubExecSvcMBean;
/** */
@GridToStringExclude
private ObjectName sysExecSvcMBean;
/** */
@GridToStringExclude
private ObjectName mgmtExecSvcMBean;
/** */
@GridToStringExclude
private ObjectName p2PExecSvcMBean;
/** */
@GridToStringExclude
private ObjectName restExecSvcMBean;
/** */
@GridToStringExclude
private ObjectName qryExecSvcMBean;
/** */
@GridToStringExclude
private ObjectName schemaExecSvcMBean;
/** */
@GridToStringExclude
private ObjectName stripedExecSvcMBean;
/** Kernal start timestamp. */
private long startTime = U.currentTimeMillis();
/** Spring context, potentially {@code null}. */
private GridSpringResourceContext rsrcCtx;
/** */
@GridToStringExclude
private GridTimeoutProcessor.CancelableTask starveTask;
/** */
@GridToStringExclude
private GridTimeoutProcessor.CancelableTask metricsLogTask;
/** */
@GridToStringExclude
private GridTimeoutProcessor.CancelableTask longOpDumpTask;
/** Indicate error on grid stop. */
@GridToStringExclude
private boolean errOnStop;
/** Scheduler. */
@GridToStringExclude
private IgniteScheduler scheduler;
/** Kernal gateway. */
@GridToStringExclude
private final AtomicReference<GridKernalGateway> gw = new AtomicReference<>();
/** Stop guard. */
@GridToStringExclude
private final AtomicBoolean stopGuard = new AtomicBoolean();
/** */
private final ReconnectState reconnectState = new ReconnectState();
/**
* No-arg constructor is required by externalization.
*/
public IgniteKernal() {
this(null);
}
/**
* @param rsrcCtx Optional Spring application context.
*/
public IgniteKernal(@Nullable GridSpringResourceContext rsrcCtx) {
this.rsrcCtx = rsrcCtx;
}
/** {@inheritDoc} */
@Override public IgniteClusterEx cluster() {
return ctx.cluster().get();
}
/** {@inheritDoc} */
@Override public ClusterNode localNode() {
return ctx.cluster().get().localNode();
}
/** {@inheritDoc} */
@Override public IgniteCompute compute() {
return ((ClusterGroupAdapter)ctx.cluster().get().forServers()).compute();
}
/** {@inheritDoc} */
@Override public IgniteMessaging message() {
return ctx.cluster().get().message();
}
/** {@inheritDoc} */
@Override public IgniteEvents events() {
return ctx.cluster().get().events();
}
/** {@inheritDoc} */
@Override public IgniteServices services() {
checkClusterState();
return ((ClusterGroupAdapter)ctx.cluster().get().forServers()).services();
}
/** {@inheritDoc} */
@Override public ExecutorService executorService() {
return ctx.cluster().get().executorService();
}
/** {@inheritDoc} */
@Override public final IgniteCompute compute(ClusterGroup grp) {
return ((ClusterGroupAdapter)grp).compute();
}
/** {@inheritDoc} */
@Override public final IgniteMessaging message(ClusterGroup prj) {
return ((ClusterGroupAdapter)prj).message();
}
/** {@inheritDoc} */
@Override public final IgniteEvents events(ClusterGroup grp) {
return ((ClusterGroupAdapter)grp).events();
}
/** {@inheritDoc} */
@Override public IgniteServices services(ClusterGroup grp) {
checkClusterState();
return ((ClusterGroupAdapter)grp).services();
}
/** {@inheritDoc} */
@Override public ExecutorService executorService(ClusterGroup grp) {
return ((ClusterGroupAdapter)grp).executorService();
}
/** {@inheritDoc} */
@Override public String name() {
return igniteInstanceName;
}
/** {@inheritDoc} */
@Override public String getCopyright() {
return COPYRIGHT;
}
/** {@inheritDoc} */
@Override public long getStartTimestamp() {
return startTime;
}
/** {@inheritDoc} */
@Override public String getStartTimestampFormatted() {
return DateFormat.getDateTimeInstance().format(new Date(startTime));
}
/** {@inheritDoc} */
@Override public long getUpTime() {
return U.currentTimeMillis() - startTime;
}
/** {@inheritDoc} */
@Override public String getUpTimeFormatted() {
return X.timeSpan2HMSM(U.currentTimeMillis() - startTime);
}
/** {@inheritDoc} */
@Override public String getFullVersion() {
return VER_STR + '-' + BUILD_TSTAMP_STR;
}
/** {@inheritDoc} */
@Override public String getCheckpointSpiFormatted() {
assert cfg != null;
return Arrays.toString(cfg.getCheckpointSpi());
}
/** {@inheritDoc} */
@Override public String getCommunicationSpiFormatted() {
assert cfg != null;
return cfg.getCommunicationSpi().toString();
}
/** {@inheritDoc} */
@Override public String getDeploymentSpiFormatted() {
assert cfg != null;
return cfg.getDeploymentSpi().toString();
}
/** {@inheritDoc} */
@Override public String getDiscoverySpiFormatted() {
assert cfg != null;
return cfg.getDiscoverySpi().toString();
}
/** {@inheritDoc} */
@Override public String getEventStorageSpiFormatted() {
assert cfg != null;
return cfg.getEventStorageSpi().toString();
}
/** {@inheritDoc} */
@Override public String getCollisionSpiFormatted() {
assert cfg != null;
return cfg.getCollisionSpi().toString();
}
/** {@inheritDoc} */
@Override public String getFailoverSpiFormatted() {
assert cfg != null;
return Arrays.toString(cfg.getFailoverSpi());
}
/** {@inheritDoc} */
@Override public String getLoadBalancingSpiFormatted() {
assert cfg != null;
return Arrays.toString(cfg.getLoadBalancingSpi());
}
/** {@inheritDoc} */
@Override public String getOsInformation() {
return U.osString();
}
/** {@inheritDoc} */
@Override public String getJdkInformation() {
return U.jdkString();
}
/** {@inheritDoc} */
@Override public String getOsUser() {
return System.getProperty("user.name");
}
/** {@inheritDoc} */
@Override public void printLastErrors() {
ctx.exceptionRegistry().printErrors(log);
}
/** {@inheritDoc} */
@Override public String getVmName() {
return ManagementFactory.getRuntimeMXBean().getName();
}
/** {@inheritDoc} */
@Override public String getInstanceName() {
return igniteInstanceName;
}
/** {@inheritDoc} */
@Override public String getExecutorServiceFormatted() {
assert cfg != null;
return String.valueOf(cfg.getPublicThreadPoolSize());
}
/** {@inheritDoc} */
@Override public String getIgniteHome() {
assert cfg != null;
return cfg.getIgniteHome();
}
/** {@inheritDoc} */
@Override public String getGridLoggerFormatted() {
assert cfg != null;
return cfg.getGridLogger().toString();
}
/** {@inheritDoc} */
@Override public String getMBeanServerFormatted() {
assert cfg != null;
return cfg.getMBeanServer().toString();
}
/** {@inheritDoc} */
@Override public UUID getLocalNodeId() {
assert cfg != null;
return cfg.getNodeId();
}
/** {@inheritDoc} */
@SuppressWarnings("unchecked")
@Override public List<String> getUserAttributesFormatted() {
assert cfg != null;
return (List<String>)F.transform(cfg.getUserAttributes().entrySet(), new C1<Map.Entry<String, ?>, String>() {
@Override public String apply(Map.Entry<String, ?> e) {
return e.getKey() + ", " + e.getValue().toString();
}
});
}
/** {@inheritDoc} */
@Override public boolean isPeerClassLoadingEnabled() {
assert cfg != null;
return cfg.isPeerClassLoadingEnabled();
}
/** {@inheritDoc} */
@Override public List<String> getLifecycleBeansFormatted() {
LifecycleBean[] beans = cfg.getLifecycleBeans();
if (F.isEmpty(beans))
return Collections.emptyList();
else {
List<String> res = new ArrayList<>(beans.length);
for (LifecycleBean bean : beans)
res.add(String.valueOf(bean));
return res;
}
}
/**
* @param name New attribute name.
* @param val New attribute value.
* @throws IgniteCheckedException If duplicated SPI name found.
*/
private void add(String name, @Nullable Serializable val) throws IgniteCheckedException {
assert name != null;
if (ctx.addNodeAttribute(name, val) != null) {
if (name.endsWith(ATTR_SPI_CLASS))
// User defined duplicated names for the different SPIs.
throw new IgniteCheckedException("Failed to set SPI attribute. Duplicated SPI name found: " +
name.substring(0, name.length() - ATTR_SPI_CLASS.length()));
// Otherwise it's a mistake of setting up duplicated attribute.
assert false : "Duplicate attribute: " + name;
}
}
/**
* Notifies life-cycle beans of grid event.
*
* @param evt Grid event.
* @throws IgniteCheckedException If user threw exception during start.
*/
@SuppressWarnings({"CatchGenericClass"})
private void notifyLifecycleBeans(LifecycleEventType evt) throws IgniteCheckedException {
if (!cfg.isDaemon() && cfg.getLifecycleBeans() != null) {
for (LifecycleBean bean : cfg.getLifecycleBeans())
if (bean != null) {
try {
bean.onLifecycleEvent(evt);
}
catch (Exception e) {
throw new IgniteCheckedException(e);
}
}
}
}
/**
* Notifies life-cycle beans of grid event.
*
* @param evt Grid event.
*/
@SuppressWarnings({"CatchGenericClass"})
private void notifyLifecycleBeansEx(LifecycleEventType evt) {
try {
notifyLifecycleBeans(evt);
}
// Catch generic throwable to secure against user assertions.
catch (Throwable e) {
U.error(log, "Failed to notify lifecycle bean (safely ignored) [evt=" + evt +
(igniteInstanceName == null ? "" : ", igniteInstanceName=" + igniteInstanceName) + ']', e);
if (e instanceof Error)
throw (Error)e;
}
}
/**
* @param cfg Configuration to use.
* @param utilityCachePool Utility cache pool.
* @param execSvc Executor service.
* @param sysExecSvc System executor service.
* @param stripedExecSvc Striped executor.
* @param p2pExecSvc P2P executor service.
* @param mgmtExecSvc Management executor service.
* @param igfsExecSvc IGFS executor service.
* @param dataStreamExecSvc data stream executor service.
* @param restExecSvc Reset executor service.
* @param affExecSvc Affinity executor service.
* @param idxExecSvc Indexing executor service.
* @param callbackExecSvc Callback executor service.
* @param qryExecSvc Query executor service.
* @param schemaExecSvc Schema executor service.
* @param customExecSvcs Custom named executors.
* @param errHnd Error handler to use for notification about startup problems.
* @throws IgniteCheckedException Thrown in case of any errors.
*/
@SuppressWarnings({"CatchGenericClass", "unchecked"})
public void start(
final IgniteConfiguration cfg,
ExecutorService utilityCachePool,
final ExecutorService execSvc,
final ExecutorService svcExecSvc,
final ExecutorService sysExecSvc,
final StripedExecutor stripedExecSvc,
ExecutorService p2pExecSvc,
ExecutorService mgmtExecSvc,
ExecutorService igfsExecSvc,
ExecutorService dataStreamExecSvc,
ExecutorService restExecSvc,
ExecutorService affExecSvc,
@Nullable ExecutorService idxExecSvc,
IgniteStripedThreadPoolExecutor callbackExecSvc,
ExecutorService qryExecSvc,
ExecutorService schemaExecSvc,
Map<String, ? extends ExecutorService> customExecSvcs,
GridAbsClosure errHnd
)
throws IgniteCheckedException
{
gw.compareAndSet(null, new GridKernalGatewayImpl(cfg.getIgniteInstanceName()));
GridKernalGateway gw = this.gw.get();
gw.writeLock();
try {
switch (gw.getState()) {
case STARTED: {
U.warn(log, "Grid has already been started (ignored).");
return;
}
case STARTING: {
U.warn(log, "Grid is already in process of being started (ignored).");
return;
}
case STOPPING: {
throw new IgniteCheckedException("Grid is in process of being stopped");
}
case STOPPED: {
break;
}
}
gw.setState(STARTING);
}
finally {
gw.writeUnlock();
}
assert cfg != null;
// Make sure we got proper configuration.
validateCommon(cfg);
igniteInstanceName = cfg.getIgniteInstanceName();
this.cfg = cfg;
log = (GridLoggerProxy)cfg.getGridLogger().getLogger(
getClass().getName() + (igniteInstanceName != null ? '%' + igniteInstanceName : ""));
RuntimeMXBean rtBean = ManagementFactory.getRuntimeMXBean();
// Ack various information.
ackAsciiLogo();
ackConfigUrl();
ackDaemon();
ackOsInfo();
ackLanguageRuntime();
ackRemoteManagement();
ackVmArguments(rtBean);
ackClassPaths(rtBean);
ackSystemProperties();
ackEnvironmentVariables();
ackMemoryConfiguration();
ackCacheConfiguration();
ackP2pConfiguration();
ackRebalanceConfiguration();
// Run background network diagnostics.
GridDiagnostic.runBackgroundCheck(igniteInstanceName, execSvc, log);
// Ack 3-rd party licenses location.
if (log.isInfoEnabled() && cfg.getIgniteHome() != null)
log.info("3-rd party licenses can be found at: " + cfg.getIgniteHome() + File.separatorChar + "libs" +
File.separatorChar + "licenses");
// Check that user attributes are not conflicting
// with internally reserved names.
for (String name : cfg.getUserAttributes().keySet())
if (name.startsWith(ATTR_PREFIX))
throw new IgniteCheckedException("User attribute has illegal name: '" + name + "'. Note that all names " +
"starting with '" + ATTR_PREFIX + "' are reserved for internal use.");
// Ack local node user attributes.
logNodeUserAttributes();
// Ack configuration.
ackSpis();
List<PluginProvider> plugins = U.allPluginProviders();
final boolean activeOnStart = cfg.isActiveOnStart();
// Spin out SPIs & managers.
try {
ctx = new GridKernalContextImpl(log,
this,
cfg,
gw,
utilityCachePool,
execSvc,
svcExecSvc,
sysExecSvc,
stripedExecSvc,
p2pExecSvc,
mgmtExecSvc,
igfsExecSvc,
dataStreamExecSvc,
restExecSvc,
affExecSvc,
idxExecSvc,
callbackExecSvc,
qryExecSvc,
schemaExecSvc,
customExecSvcs,
plugins
);
cfg.getMarshaller().setContext(ctx.marshallerContext());
ClusterProcessor clusterProc = new ClusterProcessor(ctx);
startProcessor(clusterProc);
U.onGridStart();
// Start and configure resource processor first as it contains resources used
// by all other managers and processors.
GridResourceProcessor rsrcProc = new GridResourceProcessor(ctx);
rsrcProc.setSpringContext(rsrcCtx);
scheduler = new IgniteSchedulerImpl(ctx);
startProcessor(rsrcProc);
// Inject resources into lifecycle beans.
if (!cfg.isDaemon() && cfg.getLifecycleBeans() != null) {
for (LifecycleBean bean : cfg.getLifecycleBeans()) {
if (bean != null)
rsrcProc.inject(bean);
}
}
// Lifecycle notification.
notifyLifecycleBeans(BEFORE_NODE_START);
// Starts lifecycle aware components.
U.startLifecycleAware(lifecycleAwares(cfg));
addHelper(IGFS_HELPER.create(F.isEmpty(cfg.getFileSystemConfiguration())));
addHelper(HADOOP_HELPER.createIfInClassPath(ctx, false));
startProcessor(new IgnitePluginProcessor(ctx, cfg, plugins));
startProcessor(new PoolProcessor(ctx));
// Closure processor should be started before all others
// (except for resource processor), as many components can depend on it.
startProcessor(new GridClosureProcessor(ctx));
// Start some other processors (order & place is important).
startProcessor(new GridPortProcessor(ctx));
startProcessor(new GridJobMetricsProcessor(ctx));
// Timeout processor needs to be started before managers,
// as managers may depend on it.
startProcessor(new GridTimeoutProcessor(ctx));
// Start security processors.
startProcessor(createComponent(GridSecurityProcessor.class, ctx));
// Start SPI managers.
// NOTE: that order matters as there are dependencies between managers.
startManager(new GridIoManager(ctx));
startManager(new GridCheckpointManager(ctx));
startManager(new GridEventStorageManager(ctx));
startManager(new GridDeploymentManager(ctx));
startManager(new GridLoadBalancerManager(ctx));
startManager(new GridFailoverManager(ctx));
startManager(new GridCollisionManager(ctx));
startManager(new GridIndexingManager(ctx));
ackSecurity();
// Assign discovery manager to context before other processors start so they
// are able to register custom event listener.
GridManager discoMgr = new GridDiscoveryManager(ctx);
ctx.add(discoMgr, false);
// Start processors before discovery manager, so they will
// be able to start receiving messages once discovery completes.
try {startProcessor(createComponent(DiscoveryNodeValidationProcessor.class, ctx));
startProcessor(new GridAffinityProcessor(ctx));
startProcessor(createComponent(GridSegmentationProcessor.class, ctx));
startProcessor(createComponent(IgniteCacheObjectProcessor.class, ctx));
startProcessor(new GridCacheProcessor(ctx));startProcessor(new GridClusterStateProcessor(ctx));
startProcessor(new GridQueryProcessor(ctx));
startProcessor(new OdbcProcessor(ctx));
startProcessor(new GridServiceProcessor(ctx));
startProcessor(new GridTaskSessionProcessor(ctx));
startProcessor(new GridJobProcessor(ctx));
startProcessor(new GridTaskProcessor(ctx));
startProcessor((GridProcessor)SCHEDULE.createOptional(ctx));
startProcessor(new GridRestProcessor(ctx));
startProcessor(new DataStreamProcessor(ctx));
startProcessor((GridProcessor)IGFS.create(ctx, F.isEmpty(cfg.getFileSystemConfiguration())));
startProcessor(new GridContinuousProcessor(ctx));
startProcessor(createHadoopComponent());
startProcessor(new DataStructuresProcessor(ctx));
startProcessor(createComponent(PlatformProcessor.class, ctx));
startProcessor(new GridMarshallerMappingProcessor(ctx));
// Start plugins.
for (PluginProvider provider : ctx.plugins().allProviders()) {
ctx.add(new GridPluginComponent(provider));
provider.start(ctx.plugins().pluginContextForProvider(provider));
}
// Start platform plugins.
if (ctx.config().getPlatformConfiguration() != null)
startProcessor(new PlatformPluginProcessor(ctx));fillNodeAttributes(clusterProc.updateNotifierEnabled());}
catch (Throwable e) {
U.error(
log, "Exception during start processors, node will be stopped and close connections", e);
// Stop discovery spi to close tcp socket.
ctx.discovery().stop(true);
throw e;
}
gw.writeLock();
try {
gw.setState(STARTED);
// Start discovery manager last to make sure that grid is fully initialized.
startManager(discoMgr);
}
finally {
gw.writeUnlock();
}
// Check whether physical RAM is not exceeded.
checkPhysicalRam();
// Suggest configuration optimizations.
suggestOptimizations(cfg);
// Suggest JVM optimizations.
ctx.performance().addAll(JvmConfigurationSuggestions.getSuggestions());
// Suggest Operation System optimizations.
ctx.performance().addAll(OsConfigurationSuggestions.getSuggestions());
// Notify discovery manager the first to make sure that topology is discovered.
ctx.discovery().onKernalStart(activeOnStart);
// Notify IO manager the second so further components can send and receive messages.
ctx.io().onKernalStart(activeOnStart);
// Start plugins.
for (PluginProvider provider : ctx.plugins().allProviders())
provider.onIgniteStart();
boolean recon = false;
// Callbacks.
for (GridComponent comp : ctx) {
// Skip discovery manager.
if (comp instanceof GridDiscoveryManager)
continue;
// Skip IO manager.
if (comp instanceof GridIoManager)
continue;
if (comp instanceof GridPluginComponent)
continue;
if (!skipDaemon(comp)) {
try {
comp.onKernalStart(activeOnStart);
}
catch (IgniteNeedReconnectException e) {
assert ctx.discovery().reconnectSupported();
if (log.isDebugEnabled())
log.debug("Failed to start node components on node start, will wait for reconnect: " + e);
recon = true;
}
}
}
if (recon)
reconnectState.waitFirstReconnect();
// Register MBeans.
registerKernalMBean();
registerLocalNodeMBean();
registerExecutorMBeans(execSvc, sysExecSvc, p2pExecSvc, mgmtExecSvc, restExecSvc, qryExecSvc,
schemaExecSvc);
registerStripedExecutorMBean(stripedExecSvc);
// Lifecycle bean notifications.
notifyLifecycleBeans(AFTER_NODE_START);
}
catch (Throwable e) {
IgniteSpiVersionCheckException verCheckErr = X.cause(e, IgniteSpiVersionCheckException.class);
if (verCheckErr != null)
U.error(log, verCheckErr.getMessage());
else if (X.hasCause(e, InterruptedException.class, IgniteInterruptedCheckedException.class))
U.warn(log, "Grid startup routine has been interrupted (will rollback).");
else
U.error(log, "Got exception while starting (will rollback startup routine).", e);
errHnd.apply();
stop(true);
if (e instanceof Error)
throw e;
else if (e instanceof IgniteCheckedException)
throw (IgniteCheckedException)e;
else
throw new IgniteCheckedException(e);
}
// Mark start timestamp.
startTime = U.currentTimeMillis();
String intervalStr = IgniteSystemProperties.getString(IGNITE_STARVATION_CHECK_INTERVAL);
// Start starvation checker if enabled.
boolean starveCheck = !isDaemon() && !"0".equals(intervalStr);
if (starveCheck) {
final long interval = F.isEmpty(intervalStr) ? PERIODIC_STARVATION_CHECK_FREQ : Long.parseLong(intervalStr);
starveTask = ctx.timeout().schedule(new Runnable() {
/** Last completed task count. */
private long lastCompletedCntPub;
/** Last completed task count. */
private long lastCompletedCntSys;
@Override public void run() {
if (execSvc instanceof ThreadPoolExecutor) {
ThreadPoolExecutor exec = (ThreadPoolExecutor)execSvc;
lastCompletedCntPub = checkPoolStarvation(exec, lastCompletedCntPub, "public");
}
if (sysExecSvc instanceof ThreadPoolExecutor) {
ThreadPoolExecutor exec = (ThreadPoolExecutor)sysExecSvc;
lastCompletedCntSys = checkPoolStarvation(exec, lastCompletedCntSys, "system");
}
if (stripedExecSvc != null)
stripedExecSvc.checkStarvation();
}
/**
* @param exec Thread pool executor to check.
* @param lastCompletedCnt Last completed tasks count.
* @param pool Pool name for message.
* @return Current completed tasks count.
*/
private long checkPoolStarvation(
ThreadPoolExecutor exec,
long lastCompletedCnt,
String pool
) {
long completedCnt = exec.getCompletedTaskCount();
// If all threads are active and no task has completed since last time and there is
// at least one waiting request, then it is possible starvation.
if (exec.getPoolSize() == exec.getActiveCount() && completedCnt == lastCompletedCnt &&
!exec.getQueue().isEmpty())
LT.warn(
log,
"Possible thread pool starvation detected (no task completed in last " +
interval + "ms, is " + pool + " thread pool size large enough?)");
return completedCnt;
}
}, interval, interval);
}
long metricsLogFreq = cfg.getMetricsLogFrequency();
if (metricsLogFreq > 0) {
metricsLogTask = ctx.timeout().schedule(new Runnable() {
private final DecimalFormat dblFmt = new DecimalFormat("#.##");
@Override public void run() {
if (log.isInfoEnabled()) {
try {
ClusterMetrics m = cluster().localNode().metrics();
double cpuLoadPct = m.getCurrentCpuLoad() * 100;
double avgCpuLoadPct = m.getAverageCpuLoad() * 100;
double gcPct = m.getCurrentGcCpuLoad() * 100;
//Heap params
long heapUsed = m.getHeapMemoryUsed();
long heapMax = m.getHeapMemoryMaximum();
long heapUsedInMBytes = heapUsed / 1024 / 1024;
long heapCommInMBytes = m.getHeapMemoryCommitted() / 1024 / 1024;
double freeHeapPct = heapMax > 0 ? ((double)((heapMax - heapUsed) * 100)) / heapMax : -1;
//Non heap params
long nonHeapUsed = m.getNonHeapMemoryUsed();
long nonHeapMax = m.getNonHeapMemoryMaximum();
long nonHeapUsedInMBytes = nonHeapUsed / 1024 / 1024;
double freeNonHeapPct = nonHeapMax > 0 ? ((double)((nonHeapMax - nonHeapUsed) * 100)) / nonHeapMax : -1;
int hosts = 0;
int nodes = 0;
int cpus = 0;
try {
ClusterMetrics metrics = cluster().metrics();
Collection<ClusterNode> nodes0 = cluster().nodes();
hosts = U.neighborhood(nodes0).size();
nodes = metrics.getTotalNodes();
cpus = metrics.getTotalCpus();
}
catch (IgniteException ignore) {
// No-op.
}
int pubPoolActiveThreads = 0;
int pubPoolIdleThreads = 0;
int pubPoolQSize = 0;
if (execSvc instanceof ThreadPoolExecutor) {
ThreadPoolExecutor exec = (ThreadPoolExecutor)execSvc;
int poolSize = exec.getPoolSize();
pubPoolActiveThreads = Math.min(poolSize, exec.getActiveCount());
pubPoolIdleThreads = poolSize - pubPoolActiveThreads;
pubPoolQSize = exec.getQueue().size();
}
int sysPoolActiveThreads = 0;
int sysPoolIdleThreads = 0;
int sysPoolQSize = 0;
if (sysExecSvc instanceof ThreadPoolExecutor) {
ThreadPoolExecutor exec = (ThreadPoolExecutor)sysExecSvc;
int poolSize = exec.getPoolSize();
sysPoolActiveThreads = Math.min(poolSize, exec.getActiveCount());
sysPoolIdleThreads = poolSize - sysPoolActiveThreads;
sysPoolQSize = exec.getQueue().size();
}
int loadedPages = 0;
Collection<MemoryPolicy> policies = ctx.cache().context().database().memoryPolicies();
if (!F.isEmpty(policies)) {
for (MemoryPolicy memPlc : policies)
loadedPages += memPlc.pageMemory().loadedPages();
}
String id = U.id8(localNode().id());
String msg = NL +
"Metrics for local node (to disable set 'metricsLogFrequency' to 0)" + NL +
" ^-- Node [id=" + id + ", name=" + name() + ", uptime=" + getUpTimeFormatted() + "]" + NL +
" ^-- H/N/C [hosts=" + hosts + ", nodes=" + nodes + ", CPUs=" + cpus + "]" + NL +
" ^-- CPU [cur=" + dblFmt.format(cpuLoadPct) + "%, avg=" +
dblFmt.format(avgCpuLoadPct) + "%, GC=" + dblFmt.format(gcPct) + "%]" + NL +
" ^-- PageMemory [pages=" + loadedPages + "]" + NL +
" ^-- Heap [used=" + dblFmt.format(heapUsedInMBytes) + "MB, free=" +
dblFmt.format(freeHeapPct) + "%, comm=" + dblFmt.format(heapCommInMBytes) + "MB]" + NL +
" ^-- Non heap [used=" + dblFmt.format(nonHeapUsedInMBytes) + "MB, free=" +
dblFmt.format(freeNonHeapPct) + "%]" + NL +
" ^-- Public thread pool [active=" + pubPoolActiveThreads + ", idle=" +
pubPoolIdleThreads + ", qSize=" + pubPoolQSize + "]" + NL +
" ^-- System thread pool [active=" + sysPoolActiveThreads + ", idle=" +
sysPoolIdleThreads + ", qSize=" + sysPoolQSize + "]" + NL +
" ^-- Outbound messages queue [size=" + m.getOutboundMessagesQueueSize() + "]";
log.info(msg);
ctx.cache().context().database().dumpStatistics(log);
}
catch (IgniteClientDisconnectedException ignore) {
// No-op.
}
}
}
}, metricsLogFreq, metricsLogFreq);
}
final long longOpDumpTimeout =
IgniteSystemProperties.getLong(IgniteSystemProperties.IGNITE_LONG_OPERATIONS_DUMP_TIMEOUT, 60_000);
if (longOpDumpTimeout > 0) {
longOpDumpTask = ctx.timeout().schedule(new Runnable() {
@Override public void run() {
GridKernalContext ctx = IgniteKernal.this.ctx;
if (ctx != null)
ctx.cache().context().exchange().dumpLongRunningOperations(longOpDumpTimeout);
}
}, longOpDumpTimeout, longOpDumpTimeout);
}
ctx.performance().add("Disable assertions (remove '-ea' from JVM options)", !U.assertionsEnabled());
ctx.performance().logSuggestions(log, igniteInstanceName);
U.quietAndInfo(log, "To start Console Management & Monitoring run ignitevisorcmd.{sh|bat}");
ackStart(rtBean);
if (!isDaemon())
ctx.discovery().ackTopology(localNode().order());
}
/**
* Create Hadoop component.
*
* @return Non-null Hadoop component: workable or no-op.
* @throws IgniteCheckedException If the component is mandatory and cannot be initialized.
*/
private HadoopProcessorAdapter createHadoopComponent() throws IgniteCheckedException {
boolean mandatory = cfg.getHadoopConfiguration() != null;
if (mandatory) {
if (cfg.isPeerClassLoadingEnabled())
throw new IgniteCheckedException("Hadoop module cannot be used with peer class loading enabled " +
"(set IgniteConfiguration.peerClassLoadingEnabled to \"false\").");
HadoopProcessorAdapter res = IgniteComponentType.HADOOP.createIfInClassPath(ctx, true);
res.validateEnvironment();
return res;
}
else {
HadoopProcessorAdapter cmp = null;
if (!ctx.hadoopHelper().isNoOp() && cfg.isPeerClassLoadingEnabled()) {
U.warn(log, "Hadoop module is found in classpath, but will not be started because peer class " +
"loading is enabled (set IgniteConfiguration.peerClassLoadingEnabled to \"false\" if you want " +
"to use Hadoop module).");
}
else {
cmp = IgniteComponentType.HADOOP.createIfInClassPath(ctx, false);
try {
cmp.validateEnvironment();
}
catch (IgniteException | IgniteCheckedException e) {
U.quietAndWarn(log, "Hadoop module will not start due to exception: " + e.getMessage());
cmp = null;
}
}
if (cmp == null)
cmp = IgniteComponentType.HADOOP.create(ctx, true);
return cmp;
}
}
/**
* Validates common configuration parameters.
*
* @param cfg Configuration.
*/
private void validateCommon(IgniteConfiguration cfg) {
A.notNull(cfg.getNodeId(), "cfg.getNodeId()");
A.notNull(cfg.getMBeanServer(), "cfg.getMBeanServer()");
A.notNull(cfg.getGridLogger(), "cfg.getGridLogger()");
A.notNull(cfg.getMarshaller(), "cfg.getMarshaller()");
A.notNull(cfg.getUserAttributes(), "cfg.getUserAttributes()");
// All SPIs should be non-null.
A.notNull(cfg.getCheckpointSpi(), "cfg.getCheckpointSpi()");
A.notNull(cfg.getCommunicationSpi(), "cfg.getCommunicationSpi()");
A.notNull(cfg.getDeploymentSpi(), "cfg.getDeploymentSpi()");
A.notNull(cfg.getDiscoverySpi(), "cfg.getDiscoverySpi()");
A.notNull(cfg.getEventStorageSpi(), "cfg.getEventStorageSpi()");
A.notNull(cfg.getCollisionSpi(), "cfg.getCollisionSpi()");
A.notNull(cfg.getFailoverSpi(), "cfg.getFailoverSpi()");
A.notNull(cfg.getLoadBalancingSpi(), "cfg.getLoadBalancingSpi()");
A.notNull(cfg.getIndexingSpi(), "cfg.getIndexingSpi()");
A.ensure(cfg.getNetworkTimeout() > 0, "cfg.getNetworkTimeout() > 0");
A.ensure(cfg.getNetworkSendRetryDelay() > 0, "cfg.getNetworkSendRetryDelay() > 0");
A.ensure(cfg.getNetworkSendRetryCount() > 0, "cfg.getNetworkSendRetryCount() > 0");
}
/**
* Checks whether physical RAM is not exceeded.
*/
@SuppressWarnings("ConstantConditions")
private void checkPhysicalRam() {
long ram = ctx.discovery().localNode().attribute(ATTR_PHY_RAM);
if (ram != -1) {
String macs = ctx.discovery().localNode().attribute(ATTR_MACS);
long totalHeap = 0;
for (ClusterNode node : ctx.discovery().allNodes()) {
if (macs.equals(node.attribute(ATTR_MACS))) {
long heap = node.metrics().getHeapMemoryMaximum();
if (heap != -1)
totalHeap += heap;
}
}
if (totalHeap > ram) {
U.quietAndWarn(log, "Attempting to start more nodes than physical RAM " +
"available on current host (this can cause significant slowdown)");
}
}
}
/**
* @param cfg Configuration to check for possible performance issues.
*/
private void suggestOptimizations(IgniteConfiguration cfg) {
GridPerformanceSuggestions perf = ctx.performance();
if (ctx.collision().enabled())
perf.add("Disable collision resolution (remove 'collisionSpi' from configuration)");
if (ctx.checkpoint().enabled())
perf.add("Disable checkpoints (remove 'checkpointSpi' from configuration)");
if (cfg.isMarshalLocalJobs())
perf.add("Disable local jobs marshalling (set 'marshalLocalJobs' to false)");
if (cfg.getIncludeEventTypes() != null && cfg.getIncludeEventTypes().length != 0)
perf.add("Disable grid events (remove 'includeEventTypes' from configuration)");
if (BinaryMarshaller.available() && (cfg.getMarshaller() != null && !(cfg.getMarshaller() instanceof BinaryMarshaller)))
perf.add("Use default binary marshaller (do not set 'marshaller' explicitly)");
}
/**
* Creates attributes map and fills it in.
*
* @param notifyEnabled Update notifier flag.
* @throws IgniteCheckedException thrown if was unable to set up attribute.
*/
@SuppressWarnings({"SuspiciousMethodCalls", "unchecked", "TypeMayBeWeakened"})
private void fillNodeAttributes(boolean notifyEnabled) throws IgniteCheckedException {
final String[] incProps = cfg.getIncludeProperties();
try {
// Stick all environment settings into node attributes.
for (Map.Entry<String, String> sysEntry : System.getenv().entrySet()) {
String name = sysEntry.getKey();
if (incProps == null || U.containsStringArray(incProps, name, true) ||
U.isVisorNodeStartProperty(name) || U.isVisorRequiredProperty(name))
ctx.addNodeAttribute(name, sysEntry.getValue());
}
if (log.isDebugEnabled())
log.debug("Added environment properties to node attributes.");
}
catch (SecurityException e) {
throw new IgniteCheckedException("Failed to add environment properties to node attributes due to " +
"security violation: " + e.getMessage());
}
try {
// Stick all system properties into node's attributes overwriting any
// identical names from environment properties.
for (Map.Entry<Object, Object> e : snapshot().entrySet()) {
String key = (String)e.getKey();
if (incProps == null || U.containsStringArray(incProps, key, true) ||
U.isVisorRequiredProperty(key)) {
Object val = ctx.nodeAttribute(key);
if (val != null && !val.equals(e.getValue()))
U.warn(log, "System property will override environment variable with the same name: " + key);
ctx.addNodeAttribute(key, e.getValue());
}
}
ctx.addNodeAttribute(IgniteNodeAttributes.ATTR_UPDATE_NOTIFIER_ENABLED, notifyEnabled);
if (log.isDebugEnabled())
log.debug("Added system properties to node attributes.");
}
catch (SecurityException e) {
throw new IgniteCheckedException("Failed to add system properties to node attributes due to security " +
"violation: " + e.getMessage());
}
// Add local network IPs and MACs.
String ips = F.concat(U.allLocalIps(), ", "); // Exclude loopbacks.
String macs = F.concat(U.allLocalMACs(), ", "); // Only enabled network interfaces.
// Ack network context.
if (log.isInfoEnabled()) {
log.info("Non-loopback local IPs: " + (F.isEmpty(ips) ? "N/A" : ips));
log.info("Enabled local MACs: " + (F.isEmpty(macs) ? "N/A" : macs));
}
// Warn about loopback.
if (ips.isEmpty() && macs.isEmpty())
U.warn(log, "Ignite is starting on loopback address... Only nodes on the same physical " +
"computer can participate in topology.",
"Ignite is starting on loopback address...");
// Stick in network context into attributes.
add(ATTR_IPS, (ips.isEmpty() ? "" : ips));
add(ATTR_MACS, (macs.isEmpty() ? "" : macs));
// Stick in some system level attributes
add(ATTR_JIT_NAME, U.getCompilerMx() == null ? "" : U.getCompilerMx().getName());
add(ATTR_BUILD_VER, VER_STR);
add(ATTR_BUILD_DATE, BUILD_TSTAMP_STR);
add(ATTR_MARSHALLER, cfg.getMarshaller().getClass().getName());
add(ATTR_MARSHALLER_USE_DFLT_SUID,
getBoolean(IGNITE_OPTIMIZED_MARSHALLER_USE_DEFAULT_SUID, OptimizedMarshaller.USE_DFLT_SUID));
add(ATTR_LATE_AFFINITY_ASSIGNMENT, cfg.isLateAffinityAssignment());
add(ATTR_ACTIVE_ON_START, cfg.isActiveOnStart());
if (cfg.getMarshaller() instanceof BinaryMarshaller) {
add(ATTR_MARSHALLER_COMPACT_FOOTER, cfg.getBinaryConfiguration() == null ?
BinaryConfiguration.DFLT_COMPACT_FOOTER :
cfg.getBinaryConfiguration().isCompactFooter());
add(ATTR_MARSHALLER_USE_BINARY_STRING_SER_VER_2,
getBoolean(IGNITE_BINARY_MARSHALLER_USE_STRING_SERIALIZATION_VER_2,
BinaryUtils.USE_STR_SERIALIZATION_VER_2));
}
add(ATTR_USER_NAME, System.getProperty("user.name"));
add(ATTR_IGNITE_INSTANCE_NAME, igniteInstanceName);
add(ATTR_PEER_CLASSLOADING, cfg.isPeerClassLoadingEnabled());
add(ATTR_DEPLOYMENT_MODE, cfg.getDeploymentMode());
add(ATTR_LANG_RUNTIME, getLanguage());
add(ATTR_JVM_PID, U.jvmPid());
add(ATTR_CLIENT_MODE, cfg.isClientMode());
add(ATTR_CONSISTENCY_CHECK_SKIPPED, getBoolean(IGNITE_SKIP_CONFIGURATION_CONSISTENCY_CHECK));
if (cfg.getConsistentId() != null)
add(ATTR_NODE_CONSISTENT_ID, cfg.getConsistentId());
// Build a string from JVM arguments, because parameters with spaces are split.
SB jvmArgs = new SB(512);
for (String arg : U.jvmArgs()) {
if (arg.startsWith("-"))
jvmArgs.a("@@@");
else
jvmArgs.a(' ');
jvmArgs.a(arg);
}
// Add it to attributes.
add(ATTR_JVM_ARGS, jvmArgs.toString());
// Check daemon system property and override configuration if it's set.
if (isDaemon())
add(ATTR_DAEMON, "true");
// In case of the parsing error, JMX remote disabled or port not being set
// node attribute won't be set.
if (isJmxRemoteEnabled()) {
String portStr = System.getProperty("com.sun.management.jmxremote.port");
if (portStr != null)
try {
add(ATTR_JMX_PORT, Integer.parseInt(portStr));
}
catch (NumberFormatException ignore) {
// No-op.
}
}
// Whether restart is enabled and stick the attribute.
add(ATTR_RESTART_ENABLED, Boolean.toString(isRestartEnabled()));
// Save port range, port numbers will be stored by rest processor at runtime.
if (cfg.getConnectorConfiguration() != null)
add(ATTR_REST_PORT_RANGE, cfg.getConnectorConfiguration().getPortRange());
// Stick in SPI versions and classes attributes.
addSpiAttributes(cfg.getCollisionSpi());
addSpiAttributes(cfg.getDiscoverySpi());
addSpiAttributes(cfg.getFailoverSpi());
addSpiAttributes(cfg.getCommunicationSpi());
addSpiAttributes(cfg.getEventStorageSpi());
addSpiAttributes(cfg.getCheckpointSpi());
addSpiAttributes(cfg.getLoadBalancingSpi());
addSpiAttributes(cfg.getDeploymentSpi());
// Set user attributes for this node.
if (cfg.getUserAttributes() != null) {
for (Map.Entry<String, ?> e : cfg.getUserAttributes().entrySet()) {
if (ctx.hasNodeAttribute(e.getKey()))
U.warn(log, "User or internal attribute has the same name as environment or system " +
"property and will take precedence: " + e.getKey());
ctx.addNodeAttribute(e.getKey(), e.getValue());
}
}
}
/**
* Add SPI version and class attributes into node attributes.
*
* @param spiList Collection of SPIs to get attributes from.
* @throws IgniteCheckedException Thrown if was unable to set up attribute.
*/
private void addSpiAttributes(IgniteSpi... spiList) throws IgniteCheckedException {
for (IgniteSpi spi : spiList) {
Class<? extends IgniteSpi> spiCls = spi.getClass();
add(U.spiAttribute(spi, ATTR_SPI_CLASS), spiCls.getName());
}
}
/** @throws IgniteCheckedException If registration failed. */
private void registerKernalMBean() throws IgniteCheckedException {
try {
kernalMBean = U.registerMBean(
cfg.getMBeanServer(),
cfg.getIgniteInstanceName(),
"Kernal",
getClass().getSimpleName(),
this,
IgniteMXBean.class);
if (log.isDebugEnabled())
log.debug("Registered kernal MBean: " + kernalMBean);
}
catch (JMException e) {
kernalMBean = null;
throw new IgniteCheckedException("Failed to register kernal MBean.", e);
}
}
/** @throws IgniteCheckedException If registration failed. */
private void registerLocalNodeMBean() throws IgniteCheckedException {
ClusterLocalNodeMetricsMXBean mbean = new ClusterLocalNodeMetricsMXBeanImpl(ctx.discovery().localNode());
try {
locNodeMBean = U.registerMBean(
cfg.getMBeanServer(),
cfg.getIgniteInstanceName(),
"Kernal",
mbean.getClass().getSimpleName(),
mbean,
ClusterLocalNodeMetricsMXBean.class);
if (log.isDebugEnabled())
log.debug("Registered local node MBean: " + locNodeMBean);
}
catch (JMException e) {
locNodeMBean = null;
throw new IgniteCheckedException("Failed to register local node MBean.", e);
}
}
/**
* @param execSvc Public executor service.
* @param sysExecSvc System executor service.
* @param p2pExecSvc P2P executor service.
* @param mgmtExecSvc Management executor service.
* @param restExecSvc Query executor service.
* @param schemaExecSvc Schema executor service.
* @throws IgniteCheckedException If failed.
*/
private void registerExecutorMBeans(ExecutorService execSvc,
ExecutorService sysExecSvc,
ExecutorService p2pExecSvc,
ExecutorService mgmtExecSvc,
ExecutorService restExecSvc,
ExecutorService qryExecSvc,
ExecutorService schemaExecSvc) throws IgniteCheckedException {
pubExecSvcMBean = registerExecutorMBean(execSvc, "GridExecutionExecutor");
sysExecSvcMBean = registerExecutorMBean(sysExecSvc, "GridSystemExecutor");
mgmtExecSvcMBean = registerExecutorMBean(mgmtExecSvc, "GridManagementExecutor");
p2PExecSvcMBean = registerExecutorMBean(p2pExecSvc, "GridClassLoadingExecutor");
qryExecSvcMBean = registerExecutorMBean(qryExecSvc, "GridQueryExecutor");
schemaExecSvcMBean = registerExecutorMBean(schemaExecSvc, "GridSchemaExecutor");
ConnectorConfiguration clientCfg = cfg.getConnectorConfiguration();
if (clientCfg != null)
restExecSvcMBean = registerExecutorMBean(restExecSvc, "GridRestExecutor");
}
/**
* @param exec Executor service to register.
* @param name Property name for executor.
* @return Name for created MBean.
* @throws IgniteCheckedException If registration failed.
*/
private ObjectName registerExecutorMBean(ExecutorService exec, String name) throws IgniteCheckedException {
assert exec != null;
try {
ObjectName res = U.registerMBean(
cfg.getMBeanServer(),
cfg.getIgniteInstanceName(),
"Thread Pools",
name,
new ThreadPoolMXBeanAdapter(exec),
ThreadPoolMXBean.class);
if (log.isDebugEnabled())
log.debug("Registered executor service MBean: " + res);
return res;
}
catch (JMException e) {
throw new IgniteCheckedException("Failed to register executor service MBean [name=" + name +
", exec=" + exec + ']', e);
}
}
/**
* @param stripedExecSvc Executor service.
* @throws IgniteCheckedException If registration failed.
*/
private void registerStripedExecutorMBean(StripedExecutor stripedExecSvc) throws IgniteCheckedException {
if (stripedExecSvc != null) {
String name = "StripedExecutor";
try {
stripedExecSvcMBean = U.registerMBean(
cfg.getMBeanServer(),
cfg.getIgniteInstanceName(),
"Thread Pools",
name,
new StripedExecutorMXBeanAdapter(stripedExecSvc),
StripedExecutorMXBean.class);
if (log.isDebugEnabled())
log.debug("Registered executor service MBean: " + stripedExecSvcMBean);
} catch (JMException e) {
throw new IgniteCheckedException("Failed to register executor service MBean [name="
+ name + ", exec=" + stripedExecSvc + ']', e);
}
}
}
/**
* Unregisters given mbean.
*
* @param mbean MBean to unregister.
* @return {@code True} if successfully unregistered, {@code false} otherwise.
*/
private boolean unregisterMBean(@Nullable ObjectName mbean) {
if (mbean != null)
try {
cfg.getMBeanServer().unregisterMBean(mbean);
if (log.isDebugEnabled())
log.debug("Unregistered MBean: " + mbean);
return true;
}
catch (JMException e) {
U.error(log, "Failed to unregister MBean.", e);
return false;
}
return true;
}
/**
* @param mgr Manager to start.
* @throws IgniteCheckedException Throw in case of any errors.
*/
private void startManager(GridManager mgr) throws IgniteCheckedException {
// Add manager to registry before it starts to avoid cases when manager is started
// but registry does not have it yet.
ctx.add(mgr);
try {
if (!skipDaemon(mgr))
mgr.start(cfg.isActiveOnStart());
}
catch (IgniteCheckedException e) {
U.error(log, "Failed to start manager: " + mgr, e);
throw new IgniteCheckedException("Failed to start manager: " + mgr, e);
}
}
/**
* @param proc Processor to start.
* @throws IgniteCheckedException Thrown in case of any error.
*/
private void startProcessor(GridProcessor proc) throws IgniteCheckedException {
ctx.add(proc);
try {
if (!skipDaemon(proc))
proc.start(cfg.isActiveOnStart());
}
catch (IgniteCheckedException e) {
throw new IgniteCheckedException("Failed to start processor: " + proc, e);
}
}
/**
* Add helper.
*
* @param helper Helper.
*/
private void addHelper(Object helper) {
ctx.addHelper(helper);
}
/**
* Gets "on" or "off" string for given boolean value.
*
* @param b Boolean value to convert.
* @return Result string.
*/
private String onOff(boolean b) {
return b ? "on" : "off";
}
/**
*
* @return Whether or not REST is enabled.
*/
private boolean isRestEnabled() {
assert cfg != null;
return cfg.getConnectorConfiguration() != null &&
// By default rest processor doesn't start on client nodes.
(!isClientNode() || (isClientNode() && IgniteSystemProperties.getBoolean(IGNITE_REST_START_ON_CLIENT)));
}
/**
* @return {@code True} if node client or daemon otherwise {@code false}.
*/
private boolean isClientNode() {
return cfg.isClientMode() || cfg.isDaemon();
}
/**
* Acks remote management.
*/
private void ackRemoteManagement() {
assert log != null;
if (!log.isInfoEnabled())
return;
SB sb = new SB();
sb.a("Remote Management [");
boolean on = isJmxRemoteEnabled();
sb.a("restart: ").a(onOff(isRestartEnabled())).a(", ");
sb.a("REST: ").a(onOff(isRestEnabled())).a(", ");
sb.a("JMX (");
sb.a("remote: ").a(onOff(on));
if (on) {
sb.a(", ");
sb.a("port: ").a(System.getProperty("com.sun.management.jmxremote.port", "<n/a>")).a(", ");
sb.a("auth: ").a(onOff(Boolean.getBoolean("com.sun.management.jmxremote.authenticate"))).a(", ");
// By default SSL is enabled, that's why additional check for null is needed.
// See http://docs.oracle.com/javase/6/docs/technotes/guides/management/agent.html
sb.a("ssl: ").a(onOff(Boolean.getBoolean("com.sun.management.jmxremote.ssl") ||
System.getProperty("com.sun.management.jmxremote.ssl") == null));
}
sb.a(")");
sb.a(']');
log.info(sb.toString());
}
/**
* Acks configuration URL.
*/
private void ackConfigUrl() {
assert log != null;
if (log.isInfoEnabled())
log.info("Config URL: " + System.getProperty(IGNITE_CONFIG_URL, "n/a"));
}
/**
* Acks ASCII-logo. Thanks to http://patorjk.com/software/taag
*/
private void ackAsciiLogo() {
assert log != null;
if (System.getProperty(IGNITE_NO_ASCII) == null) {
String ver = "ver. " + ACK_VER_STR;
// Big thanks to: http://patorjk.com/software/taag
// Font name "Small Slant"
if (log.isInfoEnabled()) {
log.info(NL + NL +
">>> __________ ________________ " + NL +
">>> / _/ ___/ |/ / _/_ __/ __/ " + NL +
">>> _/ // (7 7 // / / / / _/ " + NL +
">>> /___/\\___/_/|_/___/ /_/ /___/ " + NL +
">>> " + NL +
">>> " + ver + NL +
">>> " + COPYRIGHT + NL +
">>> " + NL +
">>> Ignite documentation: " + "http://" + SITE + NL
);
}
if (log.isQuiet()) {
U.quiet(false,
" __________ ________________ ",
" / _/ ___/ |/ / _/_ __/ __/ ",
" _/ // (7 7 // / / / / _/ ",
"/___/\\___/_/|_/___/ /_/ /___/ ",
"",
ver,
COPYRIGHT,
"",
"Ignite documentation: " + "http://" + SITE,
"",
"Quiet mode.");
String fileName = log.fileName();
if (fileName != null)
U.quiet(false, " ^-- Logging to file '" + fileName + '\'');
U.quiet(false,
" ^-- To see **FULL** console log here add -DIGNITE_QUIET=false or \"-v\" to ignite.{sh|bat}",
"");
}
}
}
/**
* Prints start info.
*
* @param rtBean Java runtime bean.
*/
private void ackStart(RuntimeMXBean rtBean) {
ClusterNode locNode = localNode();
if (log.isQuiet()) {
U.quiet(false, "");
U.quiet(false, "Ignite node started OK (id=" + U.id8(locNode.id()) +
(F.isEmpty(igniteInstanceName) ? "" : ", instance name=" + igniteInstanceName) + ')');
}
if (log.isInfoEnabled()) {
log.info("");
String ack = "Ignite ver. " + VER_STR + '#' + BUILD_TSTAMP_STR + "-sha1:" + REV_HASH_STR;
String dash = U.dash(ack.length());
SB sb = new SB();
for (GridPortRecord rec : ctx.ports().records())
sb.a(rec.protocol()).a(":").a(rec.port()).a(" ");
String str =
NL + NL +
">>> " + dash + NL +
">>> " + ack + NL +
">>> " + dash + NL +
">>> OS name: " + U.osString() + NL +
">>> CPU(s): " + locNode.metrics().getTotalCpus() + NL +
">>> Heap: " + U.heapSize(locNode, 2) + "GB" + NL +
">>> VM name: " + rtBean.getName() + NL +
(igniteInstanceName == null ? "" : ">>> Ignite instance name: " + igniteInstanceName + NL) +
">>> Local node [" +
"ID=" + locNode.id().toString().toUpperCase() +
", order=" + locNode.order() + ", clientMode=" + ctx.clientNode() +
"]" + NL +
">>> Local node addresses: " + U.addressesAsString(locNode) + NL +
">>> Local ports: " + sb + NL;
log.info(str);
}
}
/**
* Logs out OS information.
*/
private void ackOsInfo() {
assert log != null;
if (log.isQuiet())
U.quiet(false, "OS: " + U.osString());
if (log.isInfoEnabled()) {
log.info("OS: " + U.osString());
log.info("OS user: " + System.getProperty("user.name"));
int jvmPid = U.jvmPid();
log.info("PID: " + (jvmPid == -1 ? "N/A" : jvmPid));
}
}
/**
* Logs out language runtime.
*/
private void ackLanguageRuntime() {
assert log != null;
if (log.isQuiet())
U.quiet(false, "VM information: " + U.jdkString());
if (log.isInfoEnabled()) {
log.info("Language runtime: " + getLanguage());
log.info("VM information: " + U.jdkString());
log.info("VM total memory: " + U.heapSize(2) + "GB");
}
}
/**
* @return Language runtime.
*/
@SuppressWarnings("ThrowableInstanceNeverThrown")
private String getLanguage() {
boolean scala = false;
boolean groovy = false;
boolean clojure = false;
for (StackTraceElement elem : Thread.currentThread().getStackTrace()) {
String s = elem.getClassName().toLowerCase();
if (s.contains("scala")) {
scala = true;
break;
}
else if (s.contains("groovy")) {
groovy = true;
break;
}
else if (s.contains("clojure")) {
clojure = true;
break;
}
}
if (scala) {
try (InputStream in = getClass().getResourceAsStream("/library.properties")) {
Properties props = new Properties();
if (in != null)
props.load(in);
return "Scala ver. " + props.getProperty("version.number", "<unknown>");
}
catch (Exception ignore) {
return "Scala ver. <unknown>";
}
}
// How to get Groovy and Clojure version at runtime?!?
return groovy ? "Groovy" : clojure ? "Clojure" : U.jdkName() + " ver. " + U.jdkVersion();
}
/**
* Stops grid instance.
*
* @param cancel Whether or not to cancel running jobs.
*/
public void stop(boolean cancel) {
// Make sure that thread stopping grid is not interrupted.
boolean interrupted = Thread.interrupted();
try {
stop0(cancel);
}
finally {
if (interrupted)
Thread.currentThread().interrupt();
}
}
/**
* @return {@code True} if node started shutdown sequence.
*/
public boolean isStopping() {
return stopGuard.get();
}
/**
* @param cancel Whether or not to cancel running jobs.
*/
private void stop0(boolean cancel) {
gw.compareAndSet(null, new GridKernalGatewayImpl(igniteInstanceName));
GridKernalGateway gw = this.gw.get();
if (stopGuard.compareAndSet(false, true)) {
// Only one thread is allowed to perform stop sequence.
boolean firstStop = false;
GridKernalState state = gw.getState();
if (state == STARTED || state == DISCONNECTED)
firstStop = true;
else if (state == STARTING)
U.warn(log, "Attempt to stop starting grid. This operation " +
"cannot be guaranteed to be successful.");
if (firstStop) {
// Notify lifecycle beans.
if (log.isDebugEnabled())
log.debug("Notifying lifecycle beans.");
notifyLifecycleBeansEx(LifecycleEventType.BEFORE_NODE_STOP);
}
List<GridComponent> comps = ctx.components();
// Callback component in reverse order while kernal is still functional
// if called in the same thread, at least.
for (ListIterator<GridComponent> it = comps.listIterator(comps.size()); it.hasPrevious(); ) {
GridComponent comp = it.previous();
try {
if (!skipDaemon(comp))
comp.onKernalStop(cancel);
}
catch (Throwable e) {
errOnStop = true;
U.error(log, "Failed to pre-stop processor: " + comp, e);
if (e instanceof Error)
throw e;
}
}
if (starveTask != null)
starveTask.close();
if (metricsLogTask != null)
metricsLogTask.close();
if (longOpDumpTask != null)
longOpDumpTask.close();
boolean interrupted = false;
while (true) {
try {
if (gw.tryWriteLock(10))
break;
}
catch (InterruptedException ignored) {
// Preserve interrupt status & ignore.
// Note that interrupted flag is cleared.
interrupted = true;
}
}
if (interrupted)
Thread.currentThread().interrupt();
try {
assert gw.getState() == STARTED || gw.getState() == STARTING || gw.getState() == DISCONNECTED;
// No more kernal calls from this point on.
gw.setState(STOPPING);
ctx.cluster().get().clearNodeMap();
if (log.isDebugEnabled())
log.debug("Grid " + (igniteInstanceName == null ? "" : '\'' + igniteInstanceName + "' ") +
"is stopping.");
}
finally {
gw.writeUnlock();
}
// Stopping cache operations.
GridCacheProcessor cache = ctx.cache();
if (cache != null)
cache.blockGateways();
// Unregister MBeans.
if (!(
unregisterMBean(pubExecSvcMBean) &
unregisterMBean(sysExecSvcMBean) &
unregisterMBean(mgmtExecSvcMBean) &
unregisterMBean(p2PExecSvcMBean) &
unregisterMBean(kernalMBean) &
unregisterMBean(locNodeMBean) &
unregisterMBean(restExecSvcMBean) &
unregisterMBean(qryExecSvcMBean) &
unregisterMBean(schemaExecSvcMBean) &
unregisterMBean(stripedExecSvcMBean)
))
errOnStop = false;
// Stop components in reverse order.
for (ListIterator<GridComponent> it = comps.listIterator(comps.size()); it.hasPrevious(); ) {
GridComponent comp = it.previous();
try {
if (!skipDaemon(comp)) {
comp.stop(cancel);
if (log.isDebugEnabled())
log.debug("Component stopped: " + comp);
}
}
catch (Throwable e) {
errOnStop = true;
U.error(log, "Failed to stop component (ignoring): " + comp, e);
if (e instanceof Error)
throw (Error)e;
}
}
// Stops lifecycle aware components.
U.stopLifecycleAware(log, lifecycleAwares(cfg));
// Lifecycle notification.
notifyLifecycleBeansEx(LifecycleEventType.AFTER_NODE_STOP);
// Clean internal class/classloader caches to avoid stopped contexts held in memory.
U.clearClassCache();
MarshallerExclusions.clearCache();
BinaryEnumCache.clear();
gw.writeLock();
try {
gw.setState(STOPPED);
}
finally {
gw.writeUnlock();
}
// Ack stop.
if (log.isQuiet()) {
String nodeName = igniteInstanceName == null ? "" : "name=" + igniteInstanceName + ", ";
if (!errOnStop)
U.quiet(false, "Ignite node stopped OK [" + nodeName + "uptime=" +
X.timeSpan2HMSM(U.currentTimeMillis() - startTime) + ']');
else
U.quiet(true, "Ignite node stopped wih ERRORS [" + nodeName + "uptime=" +
X.timeSpan2HMSM(U.currentTimeMillis() - startTime) + ']');
}
if (log.isInfoEnabled())
if (!errOnStop) {
String ack = "Ignite ver. " + VER_STR + '#' + BUILD_TSTAMP_STR + "-sha1:" + REV_HASH_STR +
" stopped OK";
String dash = U.dash(ack.length());
log.info(NL + NL +
">>> " + dash + NL +
">>> " + ack + NL +
">>> " + dash + NL +
(igniteInstanceName == null ? "" : ">>> Ignite instance name: " + igniteInstanceName + NL) +
">>> Grid uptime: " + X.timeSpan2HMSM(U.currentTimeMillis() - startTime) +
NL +
NL);
}
else {
String ack = "Ignite ver. " + VER_STR + '#' + BUILD_TSTAMP_STR + "-sha1:" + REV_HASH_STR +
" stopped with ERRORS";
String dash = U.dash(ack.length());
log.info(NL + NL +
">>> " + ack + NL +
">>> " + dash + NL +
(igniteInstanceName == null ? "" : ">>> Ignite instance name: " + igniteInstanceName + NL) +
">>> Grid uptime: " + X.timeSpan2HMSM(U.currentTimeMillis() - startTime) +
NL +
">>> See log above for detailed error message." + NL +
">>> Note that some errors during stop can prevent grid from" + NL +
">>> maintaining correct topology since this node may have" + NL +
">>> not exited grid properly." + NL +
NL);
}
try {
U.onGridStop();
}
catch (InterruptedException ignored) {
// Preserve interrupt status.
Thread.currentThread().interrupt();
}
}
else {
// Proper notification.
if (log.isDebugEnabled()) {
if (gw.getState() == STOPPED)
log.debug("Grid is already stopped. Nothing to do.");
else
log.debug("Grid is being stopped by another thread. Aborting this stop sequence " +
"allowing other thread to finish.");
}
}
}
/**
* USED ONLY FOR TESTING.
*
* @param name Cache name.
* @param <K> Key type.
* @param <V> Value type.
* @return Internal cache instance.
*/
/*@java.test.only*/
public <K, V> GridCacheAdapter<K, V> internalCache(String name) {
CU.validateCacheName(name);
checkClusterState();
return ctx.cache().internalCache(name);
}
/**
* It's intended for use by internal marshalling implementation only.
*
* @return Kernal context.
*/
@Override public GridKernalContext context() {
return ctx;
}
/**
* Prints all system properties in debug mode.
*/
private void ackSystemProperties() {
assert log != null;
if (log.isDebugEnabled())
for (Map.Entry<Object, Object> entry : snapshot().entrySet())
log.debug("System property [" + entry.getKey() + '=' + entry.getValue() + ']');
}
/**
* Prints all user attributes in info mode.
*/
private void logNodeUserAttributes() {
assert log != null;
if (log.isInfoEnabled())
for (Map.Entry<?, ?> attr : cfg.getUserAttributes().entrySet())
log.info("Local node user attribute [" + attr.getKey() + '=' + attr.getValue() + ']');
}
/**
* Prints all environment variables in debug mode.
*/
private void ackEnvironmentVariables() {
assert log != null;
if (log.isDebugEnabled())
for (Map.Entry<?, ?> envVar : System.getenv().entrySet())
log.debug("Environment variable [" + envVar.getKey() + '=' + envVar.getValue() + ']');
}
/**
* Acks daemon mode status.
*/
private void ackDaemon() {
assert log != null;
if (log.isInfoEnabled())
log.info("Daemon mode: " + (isDaemon() ? "on" : "off"));
}
/**
*
* @return {@code True} is this node is daemon.
*/
private boolean isDaemon() {
assert cfg != null;
return cfg.isDaemon() || "true".equalsIgnoreCase(System.getProperty(IGNITE_DAEMON));
}
/**
* Whether or not remote JMX management is enabled for this node. Remote JMX management is
* enabled when the following system property is set:
* <ul>
* <li>{@code com.sun.management.jmxremote}</li>
* </ul>
*
* @return {@code True} if remote JMX management is enabled - {@code false} otherwise.
*/
@Override public boolean isJmxRemoteEnabled() {
return System.getProperty("com.sun.management.jmxremote") != null;
}
/**
* Whether or not node restart is enabled. Node restart us supported when this node was started
* with {@code bin/ignite.{sh|bat}} script using {@code -r} argument. Node can be
* programmatically restarted using {@link Ignition#restart(boolean)}} method.
*
* @return {@code True} if restart mode is enabled, {@code false} otherwise.
* @see Ignition#restart(boolean)
*/
@Override public boolean isRestartEnabled() {
return System.getProperty(IGNITE_SUCCESS_FILE) != null;
}
/**
* Prints all configuration properties in info mode and SPIs in debug mode.
*/
private void ackSpis() {
assert log != null;
if (log.isDebugEnabled()) {
log.debug("+-------------+");
log.debug("START SPI LIST:");
log.debug("+-------------+");
log.debug("Grid checkpoint SPI : " + Arrays.toString(cfg.getCheckpointSpi()));
log.debug("Grid collision SPI : " + cfg.getCollisionSpi());
log.debug("Grid communication SPI : " + cfg.getCommunicationSpi());
log.debug("Grid deployment SPI : " + cfg.getDeploymentSpi());
log.debug("Grid discovery SPI : " + cfg.getDiscoverySpi());
log.debug("Grid event storage SPI : " + cfg.getEventStorageSpi());
log.debug("Grid failover SPI : " + Arrays.toString(cfg.getFailoverSpi()));
log.debug("Grid load balancing SPI : " + Arrays.toString(cfg.getLoadBalancingSpi()));
}
}
/**
*
*/
private void ackRebalanceConfiguration() throws IgniteCheckedException {
if (cfg.getSystemThreadPoolSize() <= cfg.getRebalanceThreadPoolSize())
throw new IgniteCheckedException("Rebalance thread pool size exceed or equals System thread pool size. " +
"Change IgniteConfiguration.rebalanceThreadPoolSize property before next start.");
if (cfg.getRebalanceThreadPoolSize() < 1)
throw new IgniteCheckedException("Rebalance thread pool size minimal allowed value is 1. " +
"Change IgniteConfiguration.rebalanceThreadPoolSize property before next start.");
for (CacheConfiguration ccfg : cfg.getCacheConfiguration()) {
if (ccfg.getRebalanceBatchesPrefetchCount() < 1)
throw new IgniteCheckedException("Rebalance batches prefetch count minimal allowed value is 1. " +
"Change CacheConfiguration.rebalanceBatchesPrefetchCount property before next start. " +
"[cache=" + ccfg.getName() + "]");
}
}
/**
*
*/
private void ackMemoryConfiguration() {
MemoryConfiguration memCfg = cfg.getMemoryConfiguration();
if (memCfg == null)
return;
U.log(log, "System cache's MemoryPolicy size is configured to " +
(memCfg.getSystemCacheInitialSize() / (1024 * 1024)) + " MB. " +
"Use MemoryConfiguration.systemCacheMemorySize property to change the setting.");
}
/**
*
*/
private void ackCacheConfiguration() {
CacheConfiguration[] cacheCfgs = cfg.getCacheConfiguration();
if (cacheCfgs == null || cacheCfgs.length == 0)
U.warn(log, "Cache is not configured - in-memory data grid is off.");
else {
SB sb = new SB();
HashMap<String, ArrayList<String>> memPlcNamesMapping = new HashMap<>();
for (CacheConfiguration c : cacheCfgs) {
String cacheName = U.maskName(c.getName());
String memPlcName = U.maskName(c.getMemoryPolicyName());
if (!memPlcNamesMapping.containsKey(memPlcName))
memPlcNamesMapping.put(memPlcName, new ArrayList<String>());
ArrayList<String> cacheNames = memPlcNamesMapping.get(memPlcName);
cacheNames.add(cacheName);
}
for (Map.Entry<String, ArrayList<String>> e : memPlcNamesMapping.entrySet()) {
sb.a("in '").a(e.getKey()).a("' memoryPolicy: [");
for (String s : e.getValue())
sb.a("'").a(s).a("', ");
sb.d(sb.length() - 2, sb.length()).a("]");
}
U.log(log, "Configured caches [" + sb.toString() + ']');
}
}
/**
*
*/
private void ackP2pConfiguration() {
assert cfg != null;
if (cfg.isPeerClassLoadingEnabled())
U.warn(
log,
"Peer class loading is enabled (disable it in production for performance and " +
"deployment consistency reasons)",
"Peer class loading is enabled (disable it for better performance)"
);
}
/**
* Prints security status.
*/
private void ackSecurity() {
assert log != null;
U.quietAndInfo(log, "Security status [authentication=" + onOff(ctx.security().enabled())
+ ", tls/ssl=" + onOff(ctx.config().getSslContextFactory() != null) + ']');
}
/**
* Prints out VM arguments and IGNITE_HOME in info mode.
*
* @param rtBean Java runtime bean.
*/
private void ackVmArguments(RuntimeMXBean rtBean) {
assert log != null;
// Ack IGNITE_HOME and VM arguments.
if (log.isInfoEnabled()) {
log.info("IGNITE_HOME=" + cfg.getIgniteHome());
log.info("VM arguments: " + rtBean.getInputArguments());
}
}
/**
* Prints out class paths in debug mode.
*
* @param rtBean Java runtime bean.
*/
private void ackClassPaths(RuntimeMXBean rtBean) {
assert log != null;
// Ack all class paths.
if (log.isDebugEnabled()) {
log.debug("Boot class path: " + rtBean.getBootClassPath());
log.debug("Class path: " + rtBean.getClassPath());
log.debug("Library path: " + rtBean.getLibraryPath());
}
}
/**
* @param cfg Grid configuration.
* @return Components provided in configuration which can implement {@link LifecycleAware} interface.
*/
private Iterable<Object> lifecycleAwares(IgniteConfiguration cfg) {
Collection<Object> objs = new ArrayList<>();
if (cfg.getLifecycleBeans() != null)
Collections.addAll(objs, cfg.getLifecycleBeans());
if (cfg.getSegmentationResolvers() != null)
Collections.addAll(objs, cfg.getSegmentationResolvers());
if (cfg.getConnectorConfiguration() != null) {
objs.add(cfg.getConnectorConfiguration().getMessageInterceptor());
objs.add(cfg.getConnectorConfiguration().getSslContextFactory());
}
objs.add(cfg.getMarshaller());
objs.add(cfg.getGridLogger());
objs.add(cfg.getMBeanServer());
return objs;
}
/** {@inheritDoc} */
@Override public IgniteConfiguration configuration() {
return cfg;
}
/** {@inheritDoc} */
@Override public IgniteLogger log() {
return cfg.getGridLogger();
}
/** {@inheritDoc} */
@Override public boolean removeCheckpoint(String key) {
A.notNull(key, "key");
guard();
try {
checkClusterState();
return ctx.checkpoint().removeCheckpoint(key);
}
finally {
unguard();
}
}
/** {@inheritDoc} */
@Override public boolean pingNode(String nodeId) {
A.notNull(nodeId, "nodeId");
return cluster().pingNode(UUID.fromString(nodeId));
}
/** {@inheritDoc} */
@Override public void undeployTaskFromGrid(String taskName) throws JMException {
A.notNull(taskName, "taskName");
try {
compute().undeployTask(taskName);
}
catch (IgniteException e) {
throw U.jmException(e);
}
}
/** {@inheritDoc} */
@SuppressWarnings("unchecked")
@Override public String executeTask(String taskName, String arg) throws JMException {
try {
return compute().execute(taskName, arg);
}
catch (IgniteException e) {
throw U.jmException(e);
}
}
/** {@inheritDoc} */
@Override public boolean pingNodeByAddress(String host) {
guard();
try {
for (ClusterNode n : cluster().nodes())
if (n.addresses().contains(host))
return ctx.discovery().pingNode(n.id());
return false;
}
catch (IgniteCheckedException e) {
throw U.convertException(e);
}
finally {
unguard();
}
}
/** {@inheritDoc} */
@Override public boolean eventUserRecordable(int type) {
guard();
try {
return ctx.event().isUserRecordable(type);
}
finally {
unguard();
}
}
/** {@inheritDoc} */
@Override public boolean allEventsUserRecordable(int[] types) {
A.notNull(types, "types");
guard();
try {
return ctx.event().isAllUserRecordable(types);
}
finally {
unguard();
}
}
/** {@inheritDoc} */
@Override public IgniteTransactions transactions() {
guard();
try {
checkClusterState();
return ctx.cache().transactions();
}
finally {
unguard();
}
}
/**
* @param name Cache name.
* @return Cache.
*/
public <K, V> IgniteInternalCache<K, V> getCache(String name) {
CU.validateCacheName(name);
guard();
try {
checkClusterState();
return ctx.cache().publicCache(name);
}
finally {
unguard();
}
}
/** {@inheritDoc} */
@Override public <K, V> IgniteCache<K, V> cache(String name) {
CU.validateCacheName(name);
guard();
try {
checkClusterState();
return ctx.cache().publicJCache(name, false, true);
}
catch (IgniteCheckedException e) {
throw CU.convertToCacheException(e);
}
finally {
unguard();
}
}
/** {@inheritDoc} */
@Override public <K, V> IgniteCache<K, V> createCache(CacheConfiguration<K, V> cacheCfg) {
A.notNull(cacheCfg, "cacheCfg");
CU.validateCacheName(cacheCfg.getName());
guard();
try {
checkClusterState();
ctx.cache().dynamicStartCache(cacheCfg,
cacheCfg.getName(),
null,
true,
true,
true).get();
return ctx.cache().publicJCache(cacheCfg.getName());
}
catch (IgniteCheckedException e) {
throw CU.convertToCacheException(e);
}
finally {
unguard();
}
}
/** {@inheritDoc} */
@Override public Collection<IgniteCache> createCaches(Collection<CacheConfiguration> cacheCfgs) {
A.notNull(cacheCfgs, "cacheCfgs");
CU.validateConfigurationCacheNames(cacheCfgs);
guard();
try {
ctx.cache().dynamicStartCaches(cacheCfgs,
true,
true).get();
List<IgniteCache> createdCaches = new ArrayList<>(cacheCfgs.size());
for (CacheConfiguration cacheCfg : cacheCfgs)
createdCaches.add(ctx.cache().publicJCache(cacheCfg.getName()));
return createdCaches;
}
catch (IgniteCheckedException e) {
throw CU.convertToCacheException(e);
}
finally {
unguard();
}
}
/** {@inheritDoc} */
@Override public <K, V> IgniteCache<K, V> createCache(String cacheName) {
CU.validateCacheName(cacheName);
guard();
try {
checkClusterState();
ctx.cache().createFromTemplate(cacheName).get();
return ctx.cache().publicJCache(cacheName);
}
catch (IgniteCheckedException e) {
throw CU.convertToCacheException(e);
}
finally {
unguard();
}
}
/** {@inheritDoc} */
@Override public <K, V> IgniteCache<K, V> getOrCreateCache(CacheConfiguration<K, V> cacheCfg) {
A.notNull(cacheCfg, "cacheCfg");
CU.validateCacheName(cacheCfg.getName());
guard();
try {
checkClusterState();
if (ctx.cache().cache(cacheCfg.getName()) == null) {
ctx.cache().dynamicStartCache(cacheCfg,
cacheCfg.getName(),
null,
false,
true,
true).get();
}
return ctx.cache().publicJCache(cacheCfg.getName());
}
catch (IgniteCheckedException e) {
throw CU.convertToCacheException(e);
}
finally {
unguard();
}
}
/** {@inheritDoc} */
@Override public Collection<IgniteCache> getOrCreateCaches(Collection<CacheConfiguration> cacheCfgs) {
A.notNull(cacheCfgs, "cacheCfgs");
CU.validateConfigurationCacheNames(cacheCfgs);
guard();
try {
ctx.cache().dynamicStartCaches(cacheCfgs,
false,
true).get();
List<IgniteCache> createdCaches = new ArrayList<>(cacheCfgs.size());
for (CacheConfiguration cacheCfg : cacheCfgs)
createdCaches.add(ctx.cache().publicJCache(cacheCfg.getName()));
return createdCaches;
}
catch (IgniteCheckedException e) {
throw CU.convertToCacheException(e);
}
finally {
unguard();
}
}
/** {@inheritDoc} */
@Override public <K, V> IgniteCache<K, V> createCache(
CacheConfiguration<K, V> cacheCfg,
NearCacheConfiguration<K, V> nearCfg
) {
A.notNull(cacheCfg, "cacheCfg");
CU.validateCacheName(cacheCfg.getName());
A.notNull(nearCfg, "nearCfg");
guard();
try {
checkClusterState();
ctx.cache().dynamicStartCache(cacheCfg,
cacheCfg.getName(),
nearCfg,
true,
true,
true).get();
return ctx.cache().publicJCache(cacheCfg.getName());
}
catch (IgniteCheckedException e) {
throw CU.convertToCacheException(e);
}
finally {
unguard();
}
}
/** {@inheritDoc} */
@Override public <K, V> IgniteCache<K, V> getOrCreateCache(CacheConfiguration<K, V> cacheCfg,
NearCacheConfiguration<K, V> nearCfg) {
A.notNull(cacheCfg, "cacheCfg");
CU.validateCacheName(cacheCfg.getName());
A.notNull(nearCfg, "nearCfg");
guard();
try {
checkClusterState();
IgniteInternalCache<Object, Object> cache = ctx.cache().cache(cacheCfg.getName());
if (cache == null) {
ctx.cache().dynamicStartCache(cacheCfg,
cacheCfg.getName(),
nearCfg,
false,
true,
true).get();
}
else {
if (cache.configuration().getNearConfiguration() == null) {
ctx.cache().dynamicStartCache(cacheCfg,
cacheCfg.getName(),
nearCfg,
false,
true,
true).get();
}
}
return ctx.cache().publicJCache(cacheCfg.getName());
}
catch (IgniteCheckedException e) {
throw CU.convertToCacheException(e);
}
finally {
unguard();
}
}
/** {@inheritDoc} */
@Override public <K, V> IgniteCache<K, V> createNearCache(String cacheName, NearCacheConfiguration<K, V> nearCfg) {
CU.validateCacheName(cacheName);
A.notNull(nearCfg, "nearCfg");
guard();
try {
checkClusterState();
ctx.cache().dynamicStartCache(null,
cacheName,
nearCfg,
true,
true,
true).get();
IgniteCacheProxy<K, V> cache = ctx.cache().publicJCache(cacheName);
checkNearCacheStarted(cache);
return cache;
}
catch (IgniteCheckedException e) {
throw CU.convertToCacheException(e);
}
finally {
unguard();
}
}
/** {@inheritDoc} */
@Override public <K, V> IgniteCache<K, V> getOrCreateNearCache(String cacheName,
NearCacheConfiguration<K, V> nearCfg) {
CU.validateCacheName(cacheName);
A.notNull(nearCfg, "nearCfg");
guard();
try {
checkClusterState();
IgniteInternalCache<Object, Object> internalCache = ctx.cache().cache(cacheName);
if (internalCache == null) {
ctx.cache().dynamicStartCache(null,
cacheName,
nearCfg,
false,
true,
true).get();
}
else {
if (internalCache.configuration().getNearConfiguration() == null) {
ctx.cache().dynamicStartCache(null,
cacheName,
nearCfg,
false,
true,
true).get();
}
}
IgniteCacheProxy<K, V> cache = ctx.cache().publicJCache(cacheName);
checkNearCacheStarted(cache);
return cache;
}
catch (IgniteCheckedException e) {
throw CU.convertToCacheException(e);
}
finally {
unguard();
}
}
/**
* @param cache Cache.
* @throws IgniteCheckedException If cache without near cache was already started.
*/
private void checkNearCacheStarted(IgniteCacheProxy<?, ?> cache) throws IgniteCheckedException {
if (!cache.context().isNear())
throw new IgniteCheckedException("Failed to start near cache " +
"(a cache with the same name without near cache is already started)");
}
/** {@inheritDoc} */
@Override public void destroyCache(String cacheName) {
CU.validateCacheName(cacheName);
IgniteInternalFuture stopFut = destroyCacheAsync(cacheName, true);
try {
stopFut.get();
}
catch (IgniteCheckedException e) {
throw CU.convertToCacheException(e);
}
}
/** {@inheritDoc} */
@Override public void destroyCaches(Collection<String> cacheNames) {
CU.validateCacheNames(cacheNames);
IgniteInternalFuture stopFut = destroyCachesAsync(cacheNames, true);
try {
stopFut.get();
}
catch (IgniteCheckedException e) {
throw CU.convertToCacheException(e);
}
}
/**
* @param cacheName Cache name.
* @param checkThreadTx If {@code true} checks that current thread does not have active transactions.
* @return Ignite future.
*/
public IgniteInternalFuture<?> destroyCacheAsync(String cacheName, boolean checkThreadTx) {
CU.validateCacheName(cacheName);
guard();
try {
checkClusterState();
return ctx.cache().dynamicDestroyCache(cacheName, checkThreadTx);
}
finally {
unguard();
}
}
/**
* @param cacheNames Collection of cache names.
* @param checkThreadTx If {@code true} checks that current thread does not have active transactions.
* @return Ignite future.
*/
public IgniteInternalFuture<?> destroyCachesAsync(Collection<String> cacheNames, boolean checkThreadTx) {
CU.validateCacheNames(cacheNames);
guard();
try {
return ctx.cache().dynamicDestroyCaches(cacheNames, checkThreadTx);
}
finally {
unguard();
}
}
/** {@inheritDoc} */
@Override public <K, V> IgniteCache<K, V> getOrCreateCache(String cacheName) {
CU.validateCacheName(cacheName);
guard();
try {
checkClusterState();
if (ctx.cache().cache(cacheName) == null)
ctx.cache().getOrCreateFromTemplate(cacheName, true).get();
return ctx.cache().publicJCache(cacheName);
}
catch (IgniteCheckedException e) {
throw CU.convertToCacheException(e);
}
finally {
unguard();
}
}
/**
* @param cacheName Cache name.
* @param checkThreadTx If {@code true} checks that current thread does not have active transactions.
* @return Future that will be completed when cache is deployed.
*/
public IgniteInternalFuture<?> getOrCreateCacheAsync(String cacheName, boolean checkThreadTx) {
CU.validateCacheName(cacheName);
guard();
try {
checkClusterState();
if (ctx.cache().cache(cacheName) == null)
return ctx.cache().getOrCreateFromTemplate(cacheName, checkThreadTx);
return new GridFinishedFuture<>();
}
finally {
unguard();
}
}
/** {@inheritDoc} */
@Override public <K, V> void addCacheConfiguration(CacheConfiguration<K, V> cacheCfg) {
A.notNull(cacheCfg, "cacheCfg");
CU.validateCacheName(cacheCfg.getName());
guard();
try {
checkClusterState();
ctx.cache().addCacheConfiguration(cacheCfg);
}
catch (IgniteCheckedException e) {
throw CU.convertToCacheException(e);
}
finally {
unguard();
}
}
/**
* @return Public caches.
*/
public Collection<IgniteCacheProxy<?, ?>> caches() {
guard();
try {
checkClusterState();
return ctx.cache().publicCaches();
}
finally {
unguard();
}
}
/** {@inheritDoc} */
@Override public Collection<String> cacheNames() {
guard();
try {
checkClusterState();
return ctx.cache().publicCacheNames();
}
finally {
unguard();
}
}
/** {@inheritDoc} */
@Override public <K extends GridCacheUtilityKey, V> IgniteInternalCache<K, V> utilityCache() {
guard();
try {
checkClusterState();
return ctx.cache().utilityCache();
}
finally {
unguard();
}
}
/** {@inheritDoc} */
@Override public <K, V> IgniteInternalCache<K, V> cachex(String name) {
CU.validateCacheName(name);
guard();
try {
checkClusterState();
return ctx.cache().cache(name);
}
finally {
unguard();
}
}
/** {@inheritDoc} */
@Override public Collection<IgniteInternalCache<?, ?>> cachesx(
IgnitePredicate<? super IgniteInternalCache<?, ?>>[] p) {
guard();
try {
checkClusterState();
return F.retain(ctx.cache().caches(), true, p);
}
finally {
unguard();
}
}
/** {@inheritDoc} */
@Override public <K, V> IgniteDataStreamer<K, V> dataStreamer(String cacheName) {
CU.validateCacheName(cacheName);
guard();
try {
checkClusterState();
return ctx.<K, V>dataStream().dataStreamer(cacheName);
}
finally {
unguard();
}
}
/** {@inheritDoc} */
@Override public IgniteFileSystem fileSystem(String name) {
if (name == null)
throw new IllegalArgumentException("IGFS name cannot be null");
guard();
try {
checkClusterState();
IgniteFileSystem fs = ctx.igfs().igfs(name);
if (fs == null)
throw new IllegalArgumentException("IGFS is not configured: " + name);
return fs;
}
finally {
unguard();
}
}
/** {@inheritDoc} */
@Nullable @Override public IgniteFileSystem igfsx(String name) {
if (name == null)
throw new IllegalArgumentException("IGFS name cannot be null");
guard();
try {
checkClusterState();
return ctx.igfs().igfs(name);
}
finally {
unguard();
}
}
/** {@inheritDoc} */
@Override public Collection<IgniteFileSystem> fileSystems() {
guard();
try {
checkClusterState();
return ctx.igfs().igfss();
}
finally {
unguard();
}
}
/** {@inheritDoc} */
@Override public Hadoop hadoop() {
guard();
try {
checkClusterState();
return ctx.hadoop().hadoop();
}
finally {
unguard();
}
}
/** {@inheritDoc} */
@Override public <T extends IgnitePlugin> T plugin(String name) throws PluginNotFoundException {
guard();
try {
return (T)ctx.pluginProvider(name).plugin();
}
finally {
unguard();
}
}
/** {@inheritDoc} */
@Override public IgniteBinary binary() {
checkClusterState();
IgniteCacheObjectProcessor objProc = ctx.cacheObjects();
return objProc.binary();
}
/** {@inheritDoc} */
@Override public IgniteProductVersion version() {
return VER;
}
/** {@inheritDoc} */
@Override public String latestVersion() {
ctx.gateway().readLock();
try {
return ctx.cluster().latestVersion();
}
finally {
ctx.gateway().readUnlock();
}
}
/** {@inheritDoc} */
@Override public IgniteScheduler scheduler() {
return scheduler;
}
/** {@inheritDoc} */
@Override public void close() throws IgniteException {
Ignition.stop(igniteInstanceName, true);
}
@Override public <K> Affinity<K> affinity(String cacheName) {
CU.validateCacheName(cacheName);
checkClusterState();
GridCacheAdapter<K, ?> cache = ctx.cache().internalCache(cacheName);
if (cache != null)
return cache.affinity();
return ctx.affinity().affinityProxy(cacheName);
}
/** {@inheritDoc} */
@Override public boolean active() {
guard();
try {
return context().state().active();
}
finally {
unguard();
}
}
/** {@inheritDoc} */
@Override public void active(boolean active) {
guard();
try {
context().state().changeGlobalState(active).get();
}
catch (IgniteCheckedException e) {
throw U.convertException(e);
}
finally {
unguard();
}
}
/** {@inheritDoc} */
@Override public void resetLostPartitions(Collection<String> cacheNames) {
CU.validateCacheNames(cacheNames);
guard();
try {
ctx.cache().resetCacheState(cacheNames).get();
}
catch (IgniteCheckedException e) {
throw U.convertException(e);
}
finally {
unguard();
}
}
/** {@inheritDoc} */
@Override public Collection<MemoryMetrics> memoryMetrics() {
guard();
try {
return ctx.cache().context().database().memoryMetrics();
}
finally {
unguard();
}
}
/** {@inheritDoc} */
@Nullable @Override public IgniteAtomicSequence atomicSequence(String name, long initVal, boolean create) {
guard();
try {
checkClusterState();
return ctx.dataStructures().sequence(name, initVal, create);
}
catch (IgniteCheckedException e) {
throw U.convertException(e);
}
finally {
unguard();
}
}
/** {@inheritDoc} */
@Nullable @Override public IgniteAtomicLong atomicLong(String name, long initVal, boolean create) {
guard();
try {
checkClusterState();
return ctx.dataStructures().atomicLong(name, initVal, create);
}
catch (IgniteCheckedException e) {
throw U.convertException(e);
}
finally {
unguard();
}
}
/** {@inheritDoc} */
@Nullable @Override public <T> IgniteAtomicReference<T> atomicReference(
String name,
@Nullable T initVal,
boolean create
) {
guard();
try {
checkClusterState();
return ctx.dataStructures().atomicReference(name, initVal, create);
}
catch (IgniteCheckedException e) {
throw U.convertException(e);
}
finally {
unguard();
}
}
/** {@inheritDoc} */
@Nullable @Override public <T, S> IgniteAtomicStamped<T, S> atomicStamped(String name,
@Nullable T initVal,
@Nullable S initStamp,
boolean create) {
guard();
try {
checkClusterState();
return ctx.dataStructures().atomicStamped(name, initVal, initStamp, create);
}
catch (IgniteCheckedException e) {
throw U.convertException(e);
}
finally {
unguard();
}
}
/** {@inheritDoc} */
@Nullable @Override public IgniteCountDownLatch countDownLatch(String name,
int cnt,
boolean autoDel,
boolean create) {
guard();
try {
checkClusterState();
return ctx.dataStructures().countDownLatch(name, cnt, autoDel, create);
}
catch (IgniteCheckedException e) {
throw U.convertException(e);
}
finally {
unguard();
}
}
/** {@inheritDoc} */
@Nullable @Override public IgniteSemaphore semaphore(
String name,
int cnt,
boolean failoverSafe,
boolean create
) {
guard();
try {
checkClusterState();
return ctx.dataStructures().semaphore(name, cnt, failoverSafe, create);
}
catch (IgniteCheckedException e) {
throw U.convertException(e);
}
finally {
unguard();
}
}
/** {@inheritDoc} */
@Nullable @Override public IgniteLock reentrantLock(
String name,
boolean failoverSafe,
boolean fair,
boolean create
) {
guard();
try {
checkClusterState();
return ctx.dataStructures().reentrantLock(name, failoverSafe, fair, create);
}
catch (IgniteCheckedException e) {
throw U.convertException(e);
}
finally {
unguard();
}
}
/** {@inheritDoc} */
@Nullable @Override public <T> IgniteQueue<T> queue(String name,
int cap,
CollectionConfiguration cfg) {
guard();
try {
checkClusterState();
return ctx.dataStructures().queue(name, cap, cfg);
}
catch (IgniteCheckedException e) {
throw U.convertException(e);
}
finally {
unguard();
}
}
/** {@inheritDoc} */
@Nullable @Override public <T> IgniteSet<T> set(String name,
CollectionConfiguration cfg) {
guard();
try {
checkClusterState();
return ctx.dataStructures().set(name, cfg);
}
catch (IgniteCheckedException e) {
throw U.convertException(e);
}
finally {
unguard();
}
}
/**
* <tt>ctx.gateway().readLock()</tt>
*/
private void guard() {
assert ctx != null;
ctx.gateway().readLock();
}
/**
* <tt>ctx.gateway().readUnlock()</tt>
*/
private void unguard() {
assert ctx != null;
ctx.gateway().readUnlock();
}
/**
* Validate operation on cluster. Check current cluster state.
*
* @throws IgniteException if cluster in inActive state
*/
private void checkClusterState() throws IgniteException {
if (!ctx.state().active())
throw new IgniteException("can not perform operation, because cluster inactive");
}
/**
*
*/
public void onDisconnected() {
Throwable err = null;
reconnectState.waitPreviousReconnect();
GridFutureAdapter<?> reconnectFut = ctx.gateway().onDisconnected();
if (reconnectFut == null) {
assert ctx.gateway().getState() != STARTED : ctx.gateway().getState();
return;
}
IgniteFutureImpl<?> curFut = (IgniteFutureImpl<?>)ctx.cluster().get().clientReconnectFuture();
IgniteFuture<?> userFut;
// In case of previous reconnect did not finish keep reconnect future.
if (curFut != null && curFut.internalFuture() == reconnectFut)
userFut = curFut;
else {
userFut = new IgniteFutureImpl<>(reconnectFut);
ctx.cluster().get().clientReconnectFuture(userFut);
}
ctx.disconnected(true);
List<GridComponent> comps = ctx.components();
for (ListIterator<GridComponent> it = comps.listIterator(comps.size()); it.hasPrevious(); ) {
GridComponent comp = it.previous();
try {
if (!skipDaemon(comp))
comp.onDisconnected(userFut);
}
catch (IgniteCheckedException e) {
err = e;
}
catch (Throwable e) {
err = e;
if (e instanceof Error)
throw e;
}
}
for (GridCacheContext cctx : ctx.cache().context().cacheContexts()) {
cctx.gate().writeLock();
cctx.gate().writeUnlock();
}
ctx.gateway().writeLock();
ctx.gateway().writeUnlock();
if (err != null) {
reconnectFut.onDone(err);
U.error(log, "Failed to reconnect, will stop node", err);
close();
}
}
/**
* @param clusterRestarted {@code True} if all cluster nodes restarted while client was disconnected.
*/
@SuppressWarnings("unchecked")
public void onReconnected(final boolean clusterRestarted) {
Throwable err = null;
try {
ctx.disconnected(false);
GridCompoundFuture curReconnectFut = reconnectState.curReconnectFut = new GridCompoundFuture<>();
reconnectState.reconnectDone = new GridFutureAdapter<>();
for (GridComponent comp : ctx.components()) {
IgniteInternalFuture<?> fut = comp.onReconnected(clusterRestarted);
if (fut != null)
curReconnectFut.add(fut);
}
curReconnectFut.add(ctx.cache().context().exchange().reconnectExchangeFuture());
curReconnectFut.markInitialized();
final GridFutureAdapter reconnectDone = reconnectState.reconnectDone;
curReconnectFut.listen(new CI1<IgniteInternalFuture<?>>() {
@Override public void apply(IgniteInternalFuture<?> fut) {
try {
Object res = fut.get();
if (res == STOP_RECONNECT)
return;
ctx.gateway().onReconnected();
reconnectState.firstReconnectFut.onDone();
}
catch (IgniteCheckedException e) {
if (!X.hasCause(e, IgniteNeedReconnectException.class,
IgniteClientDisconnectedCheckedException.class)) {
U.error(log, "Failed to reconnect, will stop node.", e);
reconnectState.firstReconnectFut.onDone(e);
close();
}
else {
assert ctx.discovery().reconnectSupported();
U.error(log, "Failed to finish reconnect, will retry [locNodeId=" + ctx.localNodeId() +
", err=" + e.getMessage() + ']');
}
}
finally {
reconnectDone.onDone();
}
}
});
}
catch (IgniteCheckedException e) {
err = e;
}
catch (Throwable e) {
err = e;
if (e instanceof Error)
throw e;
}
if (err != null) {
U.error(log, "Failed to reconnect, will stop node", err);
close();
}
}
/**
* Creates optional component.
*
* @param cls Component interface.
* @param ctx Kernal context.
* @return Created component.
* @throws IgniteCheckedException If failed to create component.
*/
private static <T extends GridComponent> T createComponent(Class<T> cls, GridKernalContext ctx)
throws IgniteCheckedException {
assert cls.isInterface() : cls;
T comp = ctx.plugins().createComponent(cls);
if (comp != null)
return comp;
if (cls.equals(IgniteCacheObjectProcessor.class))
return (T)new CacheObjectBinaryProcessorImpl(ctx);
if (cls.equals(DiscoveryNodeValidationProcessor.class))
return (T)new OsDiscoveryNodeValidationProcessor(ctx);
Class<T> implCls = null;
try {
String clsName;
// Handle special case for PlatformProcessor
if (cls.equals(PlatformProcessor.class))
clsName = ctx.config().getPlatformConfiguration() == null ?
PlatformNoopProcessor.class.getName() : cls.getName() + "Impl";
else
clsName = componentClassName(cls);
implCls = (Class<T>)Class.forName(clsName);
}
catch (ClassNotFoundException ignore) {
// No-op.
}
if (implCls == null)
throw new IgniteCheckedException("Failed to find component implementation: " + cls.getName());
if (!cls.isAssignableFrom(implCls))
throw new IgniteCheckedException("Component implementation does not implement component interface " +
"[component=" + cls.getName() + ", implementation=" + implCls.getName() + ']');
Constructor<T> constructor;
try {
constructor = implCls.getConstructor(GridKernalContext.class);
}
catch (NoSuchMethodException e) {
throw new IgniteCheckedException("Component does not have expected constructor: " + implCls.getName(), e);
}
try {
return constructor.newInstance(ctx);
}
catch (ReflectiveOperationException e) {
throw new IgniteCheckedException("Failed to create component [component=" + cls.getName() +
", implementation=" + implCls.getName() + ']', e);
}
}
/**
* @param cls Component interface.
* @return Name of component implementation class for open source edition.
*/
private static String componentClassName(Class<?> cls) {
return cls.getPackage().getName() + ".os." + cls.getSimpleName().replace("Grid", "GridOs");
}
/** {@inheritDoc} */
@Override public void readExternal(ObjectInput in) throws IOException, ClassNotFoundException {
igniteInstanceName = U.readString(in);
}
/** {@inheritDoc} */
@Override public void writeExternal(ObjectOutput out) throws IOException {
U.writeString(out, igniteInstanceName);
}
/**
* @return IgniteKernal instance.
*
* @throws ObjectStreamException If failed.
*/
protected Object readResolve() throws ObjectStreamException {
try {
return IgnitionEx.localIgnite();
}
catch (IllegalStateException e) {
throw U.withCause(new InvalidObjectException(e.getMessage()), e);
}
}
/**
* @param comp Grid component.
* @return {@code true} if node running in daemon mode and component marked by {@code SkipDaemon} annotation.
*/
private boolean skipDaemon(GridComponent comp) {
return ctx.isDaemon() && U.hasAnnotation(comp.getClass(), SkipDaemon.class);
}
/** {@inheritDoc} */
public void dumpDebugInfo() {
try {
GridKernalContextImpl ctx = this.ctx;
GridDiscoveryManager discoMrg = ctx != null ? ctx.discovery() : null;
ClusterNode locNode = discoMrg != null ? discoMrg.localNode() : null;
if (ctx != null && discoMrg != null && locNode != null) {
boolean client = ctx.clientNode();
UUID routerId = locNode instanceof TcpDiscoveryNode ? ((TcpDiscoveryNode)locNode).clientRouterNodeId() : null;
U.warn(log, "Dumping debug info for node [id=" + locNode.id() +
", name=" + ctx.igniteInstanceName() +
", order=" + locNode.order() +
", topVer=" + discoMrg.topologyVersion() +
", client=" + client +
(client && routerId != null ? ", routerId=" + routerId : "") + ']');
ctx.cache().context().exchange().dumpDebugInfo();
}
else
U.warn(log, "Dumping debug info for node, context is not initialized [name=" + igniteInstanceName +
']');
}
catch (Exception e) {
U.error(log, "Failed to dump debug info for node: " + e, e);
}
}
/**
* @param node Node.
* @param payload Message payload.
* @param procFromNioThread If {@code true} message is processed from NIO thread.
* @return Response future.
*/
public IgniteInternalFuture sendIoTest(ClusterNode node, byte[] payload, boolean procFromNioThread) {
return ctx.io().sendIoTest(node, payload, procFromNioThread);
}
/**
* @param nodes Nodes.
* @param payload Message payload.
* @param procFromNioThread If {@code true} message is processed from NIO thread.
* @return Response future.
*/
public IgniteInternalFuture sendIoTest(List<ClusterNode> nodes, byte[] payload, boolean procFromNioThread) {
return ctx.io().sendIoTest(nodes, payload, procFromNioThread);
}
/**
*
*/
private class ReconnectState {
/** */
private final GridFutureAdapter firstReconnectFut = new GridFutureAdapter();
/** */
private GridCompoundFuture<?, Object> curReconnectFut;
/** */
private GridFutureAdapter<?> reconnectDone;
/**
* @throws IgniteCheckedException If failed.
*/
void waitFirstReconnect() throws IgniteCheckedException {
firstReconnectFut.get();
}
/**
*
*/
void waitPreviousReconnect() {
if (curReconnectFut != null && !curReconnectFut.isDone()) {
assert reconnectDone != null;
curReconnectFut.onDone(STOP_RECONNECT);
try {
reconnectDone.get();
}
catch (IgniteCheckedException ignote) {
// No-op.
}
}
}
}
/** {@inheritDoc} */
@Override public String toString() {
return S.toString(IgniteKernal.class, this);
}
}