blob: f8fe6ce545329a71f5b5352d1c9235cf5abf9dc8 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.ignite.internal.processors.pool;
import java.lang.Thread.UncaughtExceptionHandler;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.Executor;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import org.apache.ignite.IgniteCheckedException;
import org.apache.ignite.IgniteException;
import org.apache.ignite.IgniteLogger;
import org.apache.ignite.configuration.ExecutorConfiguration;
import org.apache.ignite.configuration.IgniteConfiguration;
import org.apache.ignite.failure.FailureContext;
import org.apache.ignite.failure.FailureType;
import org.apache.ignite.internal.GridKernalContext;
import org.apache.ignite.internal.IgniteComponentType;
import org.apache.ignite.internal.managers.communication.GridIoPolicy;
import org.apache.ignite.internal.processors.GridProcessorAdapter;
import org.apache.ignite.internal.processors.plugin.IgnitePluginProcessor;
import org.apache.ignite.internal.util.StripedExecutor;
import org.apache.ignite.internal.util.tostring.GridToStringExclude;
import org.apache.ignite.internal.util.typedef.F;
import org.apache.ignite.internal.util.typedef.internal.U;
import org.apache.ignite.internal.worker.WorkersRegistry;
import org.apache.ignite.lang.IgniteInClosure;
import org.apache.ignite.plugin.extensions.communication.IoPool;
import org.apache.ignite.thread.IgniteStripedThreadPoolExecutor;
import org.apache.ignite.thread.IgniteThreadPoolExecutor;
import org.jetbrains.annotations.Nullable;
import static org.apache.ignite.configuration.IgniteConfiguration.DFLT_THREAD_KEEP_ALIVE_TIME;
import static org.apache.ignite.failure.FailureType.SYSTEM_WORKER_TERMINATION;
/**
* Processor which abstracts out thread pool management.
*/
public class PoolProcessor extends GridProcessorAdapter {
/** Executor service. */
@GridToStringExclude
private ThreadPoolExecutor execSvc;
/** Executor service for services. */
@GridToStringExclude
private ThreadPoolExecutor svcExecSvc;
/** System executor service. */
@GridToStringExclude
private ThreadPoolExecutor sysExecSvc;
/** */
@GridToStringExclude
private StripedExecutor stripedExecSvc;
/** Management executor service. */
@GridToStringExclude
private ThreadPoolExecutor mgmtExecSvc;
/** P2P executor service. */
@GridToStringExclude
private ThreadPoolExecutor p2pExecSvc;
/** Data streamer executor service. */
@GridToStringExclude
private StripedExecutor dataStreamerExecSvc;
/** REST requests executor service. */
@GridToStringExclude
private ThreadPoolExecutor restExecSvc;
/** Utility cache executor service. */
private ThreadPoolExecutor utilityCacheExecSvc;
/** Affinity executor service. */
@GridToStringExclude
private ThreadPoolExecutor affExecSvc;
/** Indexing pool. */
@GridToStringExclude
private ThreadPoolExecutor idxExecSvc;
/** Thread pool for create/rebuild indexes. */
@GridToStringExclude
private ThreadPoolExecutor buildIdxExecSvc;
/** Continuous query executor service. */
@GridToStringExclude
private IgniteStripedThreadPoolExecutor callbackExecSvc;
/** Query executor service. */
@GridToStringExclude
private ThreadPoolExecutor qryExecSvc;
/** Query executor service. */
@GridToStringExclude
private ThreadPoolExecutor schemaExecSvc;
/** Rebalance executor service. */
@GridToStringExclude
private ThreadPoolExecutor rebalanceExecSvc;
/** Rebalance striped executor service. */
@GridToStringExclude
private IgniteStripedThreadPoolExecutor rebalanceStripedExecSvc;
/** Map of {@link IoPool}-s injected by Ignite plugins. */
private final IoPool[] extPools = new IoPool[128];
/** Custom named pools. */
private Map<String, ThreadPoolExecutor> customExecs;
/**
* Constructor.
*
* @param ctx Kernal context.
*/
public PoolProcessor(GridKernalContext ctx) {
super(ctx);
IgnitePluginProcessor plugins = ctx.plugins();
if (plugins != null) {
// Process custom IO messaging pool extensions:
final IoPool[] executorExtensions = ctx.plugins().extensions(IoPool.class);
if (executorExtensions != null) {
// Store it into the map and check for duplicates:
for (IoPool ex : executorExtensions) {
final byte id = ex.id();
// 1. Check the pool id is non-negative:
if (id < 0)
throw new IgniteException("Failed to register IO executor pool because its ID is " +
"negative: " + id);
// 2. Check the pool id is in allowed range:
if (GridIoPolicy.isReservedGridIoPolicy(id))
throw new IgniteException("Failed to register IO executor pool because its ID in in the " +
"reserved range: " + id);
// 3. Check the pool for duplicates:
if (extPools[id] != null)
throw new IgniteException("Failed to register IO executor pool because its ID as " +
"already used: " + id);
extPools[id] = ex;
}
}
}
}
/** {@inheritDoc} */
@Override public void start() throws IgniteCheckedException {
super.start();
IgniteConfiguration cfg = ctx.config();
UncaughtExceptionHandler oomeHnd = ctx.uncaughtExceptionHandler();
UncaughtExceptionHandler excHnd = new UncaughtExceptionHandler() {
@Override public void uncaughtException(Thread t, Throwable e) {
ctx.failure().process(new FailureContext(FailureType.CRITICAL_ERROR, e));
}
};
validateThreadPoolSize(cfg.getPublicThreadPoolSize(), "public");
execSvc = new IgniteThreadPoolExecutor(
"pub",
cfg.getIgniteInstanceName(),
cfg.getPublicThreadPoolSize(),
cfg.getPublicThreadPoolSize(),
DFLT_THREAD_KEEP_ALIVE_TIME,
new LinkedBlockingQueue<>(),
GridIoPolicy.PUBLIC_POOL,
oomeHnd);
execSvc.allowCoreThreadTimeOut(true);
validateThreadPoolSize(cfg.getServiceThreadPoolSize(), "service");
svcExecSvc = new IgniteThreadPoolExecutor(
"svc",
cfg.getIgniteInstanceName(),
cfg.getServiceThreadPoolSize(),
cfg.getServiceThreadPoolSize(),
DFLT_THREAD_KEEP_ALIVE_TIME,
new LinkedBlockingQueue<>(),
GridIoPolicy.SERVICE_POOL,
oomeHnd);
svcExecSvc.allowCoreThreadTimeOut(true);
validateThreadPoolSize(cfg.getSystemThreadPoolSize(), "system");
sysExecSvc = new IgniteThreadPoolExecutor(
"sys",
cfg.getIgniteInstanceName(),
cfg.getSystemThreadPoolSize(),
cfg.getSystemThreadPoolSize(),
DFLT_THREAD_KEEP_ALIVE_TIME,
new LinkedBlockingQueue<>(),
GridIoPolicy.SYSTEM_POOL,
oomeHnd);
sysExecSvc.allowCoreThreadTimeOut(true);
validateThreadPoolSize(cfg.getStripedPoolSize(), "stripedPool");
WorkersRegistry workerRegistry = ctx.workersRegistry();
stripedExecSvc = new StripedExecutor(
cfg.getStripedPoolSize(),
cfg.getIgniteInstanceName(),
"sys",
log,
new IgniteInClosure<Throwable>() {
@Override public void apply(Throwable t) {
ctx.failure().process(new FailureContext(SYSTEM_WORKER_TERMINATION, t));
}
},
workerRegistry,
cfg.getFailureDetectionTimeout());
// Note that since we use 'LinkedBlockingQueue', number of
// maximum threads has no effect.
// Note, that we do not pre-start threads here as management pool may
// not be needed.
validateThreadPoolSize(cfg.getManagementThreadPoolSize(), "management");
mgmtExecSvc = new IgniteThreadPoolExecutor(
"mgmt",
cfg.getIgniteInstanceName(),
cfg.getManagementThreadPoolSize(),
cfg.getManagementThreadPoolSize(),
DFLT_THREAD_KEEP_ALIVE_TIME,
new LinkedBlockingQueue<>(),
GridIoPolicy.MANAGEMENT_POOL,
oomeHnd);
mgmtExecSvc.allowCoreThreadTimeOut(true);
// Note that since we use 'LinkedBlockingQueue', number of
// maximum threads has no effect.
// Note, that we do not pre-start threads here as class loading pool may
// not be needed.
validateThreadPoolSize(cfg.getPeerClassLoadingThreadPoolSize(), "peer class loading");
p2pExecSvc = new IgniteThreadPoolExecutor(
"p2p",
cfg.getIgniteInstanceName(),
cfg.getPeerClassLoadingThreadPoolSize(),
cfg.getPeerClassLoadingThreadPoolSize(),
DFLT_THREAD_KEEP_ALIVE_TIME,
new LinkedBlockingQueue<>(),
GridIoPolicy.P2P_POOL,
oomeHnd);
p2pExecSvc.allowCoreThreadTimeOut(true);
dataStreamerExecSvc = new StripedExecutor(
cfg.getDataStreamerThreadPoolSize(),
cfg.getIgniteInstanceName(),
"data-streamer",
log,
new IgniteInClosure<Throwable>() {
@Override public void apply(Throwable t) {
ctx.failure().process(new FailureContext(SYSTEM_WORKER_TERMINATION, t));
}
},
true,
workerRegistry,
cfg.getFailureDetectionTimeout());
// Note that we do not pre-start threads here as this pool may not be needed.
validateThreadPoolSize(cfg.getAsyncCallbackPoolSize(), "async callback");
callbackExecSvc = new IgniteStripedThreadPoolExecutor(
cfg.getAsyncCallbackPoolSize(),
cfg.getIgniteInstanceName(),
"callback",
oomeHnd,
false,
0);
if (cfg.getConnectorConfiguration() != null) {
validateThreadPoolSize(cfg.getConnectorConfiguration().getThreadPoolSize(), "connector");
restExecSvc = new IgniteThreadPoolExecutor(
"rest",
cfg.getIgniteInstanceName(),
cfg.getConnectorConfiguration().getThreadPoolSize(),
cfg.getConnectorConfiguration().getThreadPoolSize(),
DFLT_THREAD_KEEP_ALIVE_TIME,
new LinkedBlockingQueue<>(),
GridIoPolicy.UNDEFINED,
oomeHnd
);
restExecSvc.allowCoreThreadTimeOut(true);
}
validateThreadPoolSize(cfg.getUtilityCacheThreadPoolSize(), "utility cache");
utilityCacheExecSvc = new IgniteThreadPoolExecutor(
"utility",
cfg.getIgniteInstanceName(),
cfg.getUtilityCacheThreadPoolSize(),
cfg.getUtilityCacheThreadPoolSize(),
cfg.getUtilityCacheKeepAliveTime(),
new LinkedBlockingQueue<>(),
GridIoPolicy.UTILITY_CACHE_POOL,
oomeHnd);
utilityCacheExecSvc.allowCoreThreadTimeOut(true);
affExecSvc = new IgniteThreadPoolExecutor(
"aff",
cfg.getIgniteInstanceName(),
1,
1,
DFLT_THREAD_KEEP_ALIVE_TIME,
new LinkedBlockingQueue<>(),
GridIoPolicy.AFFINITY_POOL,
oomeHnd);
affExecSvc.allowCoreThreadTimeOut(true);
if (IgniteComponentType.INDEXING.inClassPath()) {
int cpus = Runtime.getRuntime().availableProcessors();
idxExecSvc = new IgniteThreadPoolExecutor(
"idx",
cfg.getIgniteInstanceName(),
cpus,
cpus * 2,
3000L,
new LinkedBlockingQueue<>(1000),
GridIoPolicy.IDX_POOL,
oomeHnd
);
int buildIdxThreadPoolSize = cfg.getBuildIndexThreadPoolSize();
validateThreadPoolSize(buildIdxThreadPoolSize, "build-idx");
buildIdxExecSvc = new IgniteThreadPoolExecutor(
"build-idx-runner",
cfg.getIgniteInstanceName(),
buildIdxThreadPoolSize,
buildIdxThreadPoolSize,
DFLT_THREAD_KEEP_ALIVE_TIME,
new LinkedBlockingQueue<>(),
GridIoPolicy.UNDEFINED,
oomeHnd
);
buildIdxExecSvc.allowCoreThreadTimeOut(true);
}
validateThreadPoolSize(cfg.getQueryThreadPoolSize(), "query");
qryExecSvc = new IgniteThreadPoolExecutor(
"query",
cfg.getIgniteInstanceName(),
cfg.getQueryThreadPoolSize(),
cfg.getQueryThreadPoolSize(),
DFLT_THREAD_KEEP_ALIVE_TIME,
new LinkedBlockingQueue<>(),
GridIoPolicy.QUERY_POOL,
oomeHnd);
qryExecSvc.allowCoreThreadTimeOut(true);
schemaExecSvc = new IgniteThreadPoolExecutor(
"schema",
cfg.getIgniteInstanceName(),
2,
2,
DFLT_THREAD_KEEP_ALIVE_TIME,
new LinkedBlockingQueue<>(),
GridIoPolicy.SCHEMA_POOL,
oomeHnd);
schemaExecSvc.allowCoreThreadTimeOut(true);
validateThreadPoolSize(cfg.getRebalanceThreadPoolSize(), "rebalance");
rebalanceExecSvc = new IgniteThreadPoolExecutor(
"rebalance",
cfg.getIgniteInstanceName(),
cfg.getRebalanceThreadPoolSize(),
cfg.getRebalanceThreadPoolSize(),
DFLT_THREAD_KEEP_ALIVE_TIME,
new LinkedBlockingQueue<>(),
GridIoPolicy.UNDEFINED,
excHnd);
rebalanceExecSvc.allowCoreThreadTimeOut(true);
rebalanceStripedExecSvc = new IgniteStripedThreadPoolExecutor(
cfg.getRebalanceThreadPoolSize(),
cfg.getIgniteInstanceName(),
"rebalance-striped",
excHnd,
true,
DFLT_THREAD_KEEP_ALIVE_TIME);
if (!F.isEmpty(cfg.getExecutorConfiguration())) {
validateCustomExecutorsConfiguration(cfg.getExecutorConfiguration());
customExecs = new HashMap<>();
for (ExecutorConfiguration execCfg : cfg.getExecutorConfiguration()) {
ThreadPoolExecutor exec = new IgniteThreadPoolExecutor(
execCfg.getName(),
cfg.getIgniteInstanceName(),
execCfg.getSize(),
execCfg.getSize(),
DFLT_THREAD_KEEP_ALIVE_TIME,
new LinkedBlockingQueue<>(),
GridIoPolicy.UNDEFINED,
oomeHnd);
customExecs.put(execCfg.getName(), exec);
}
}
}
/** {@inheritDoc} */
@Override public void stop(boolean cancel) throws IgniteCheckedException {
// Avoid external thread pools GC retention.
Arrays.fill(extPools, null);
stopExecutors(log);
}
/**
* Get executor service for policy.
*
* @param plc Policy.
* @return Executor service.
* @throws IgniteCheckedException If failed.
*/
public Executor poolForPolicy(byte plc) throws IgniteCheckedException {
switch (plc) {
case GridIoPolicy.P2P_POOL:
return getPeerClassLoadingExecutorService();
case GridIoPolicy.SYSTEM_POOL:
return getSystemExecutorService();
case GridIoPolicy.PUBLIC_POOL:
return getExecutorService();
case GridIoPolicy.MANAGEMENT_POOL:
return getManagementExecutorService();
case GridIoPolicy.AFFINITY_POOL:
return getAffinityExecutorService();
case GridIoPolicy.IDX_POOL:
assert getIndexingExecutorService() != null : "Indexing pool is not configured.";
return getIndexingExecutorService();
case GridIoPolicy.UTILITY_CACHE_POOL:
assert utilityCachePool() != null : "Utility cache pool is not configured.";
return utilityCachePool();
case GridIoPolicy.SERVICE_POOL:
assert getServiceExecutorService() != null : "Service pool is not configured.";
return getServiceExecutorService();
case GridIoPolicy.DATA_STREAMER_POOL:
assert getDataStreamerExecutorService() != null : "Data streamer pool is not configured.";
return getDataStreamerExecutorService();
case GridIoPolicy.QUERY_POOL:
assert getQueryExecutorService() != null : "Query pool is not configured.";
return getQueryExecutorService();
case GridIoPolicy.SCHEMA_POOL:
assert getSchemaExecutorService() != null : "Query pool is not configured.";
return getSchemaExecutorService();
default: {
if (plc < 0)
throw new IgniteCheckedException("Policy cannot be negative: " + plc);
if (GridIoPolicy.isReservedGridIoPolicy(plc))
throw new IgniteCheckedException("Policy is reserved for internal usage (range 0-31): " + plc);
IoPool pool = extPools[plc];
if (pool == null)
throw new IgniteCheckedException("No pool is registered for policy: " + plc);
assert plc == pool.id();
Executor res = pool.executor();
if (res == null)
throw new IgniteCheckedException("Thread pool for policy is null: " + plc);
return res;
}
}
}
/**
* Gets executor service for custom policy by executor name.
*
* @param name Executor name.
* @return Executor service.
*/
@Nullable public Executor customExecutor(String name) {
assert name != null;
Executor exec = null;
if (customExecs != null)
exec = customExecs.get(name);
return exec;
}
/**
* Gets utility cache pool.
*
* @return Utility cache pool.
*/
public ExecutorService utilityCachePool() {
return utilityCacheExecSvc;
}
/**
* Gets async callback pool.
*
* @return Async callback pool.
*/
public IgniteStripedThreadPoolExecutor asyncCallbackPool() {
return callbackExecSvc;
}
/**
* @return Thread pool implementation to be used in grid to process job execution
* requests and user messages sent to the node.
*/
public ExecutorService getExecutorService() {
return execSvc;
}
/**
* Executor service that is in charge of processing service proxy invocations.
*
* @return Thread pool implementation to be used in grid for service proxy invocations.
*/
public ExecutorService getServiceExecutorService() {
return svcExecSvc;
}
/**
* Executor service that is in charge of processing internal system messages.
*
* @return Thread pool implementation to be used in grid for internal system messages.
*/
public ExecutorService getSystemExecutorService() {
return sysExecSvc;
}
/**
* Executor service that is in charge of processing internal system messages
* in stripes (dedicated threads).
*
* @return Thread pool implementation to be used in grid for internal system messages.
*/
public StripedExecutor getStripedExecutorService() {
return stripedExecSvc;
}
/**
* Executor service that is in charge of processing internal and Visor
* {@link org.apache.ignite.compute.ComputeJob GridJobs}.
*
* @return Thread pool implementation to be used in grid for internal and Visor
* jobs processing.
*/
public ExecutorService getManagementExecutorService() {
return mgmtExecSvc;
}
/**
* @return Thread pool implementation to be used for peer class loading
* requests handling.
*/
public ExecutorService getPeerClassLoadingExecutorService() {
return p2pExecSvc;
}
/**
* Executor service that is in charge of processing data stream messages.
*
* @return Thread pool implementation to be used for data stream messages.
*/
public StripedExecutor getDataStreamerExecutorService() {
return dataStreamerExecSvc;
}
/**
* Should return an instance of fully configured thread pool to be used for
* processing of client messages (REST requests).
*
* @return Thread pool implementation to be used for processing of client
* messages.
*/
public ExecutorService getRestExecutorService() {
return restExecSvc;
}
/**
* Get affinity executor service.
*
* @return Affinity executor service.
*/
public ExecutorService getAffinityExecutorService() {
return affExecSvc;
}
/**
* Get indexing executor service.
*
* @return Indexing executor service.
*/
@Nullable public ExecutorService getIndexingExecutorService() {
return idxExecSvc;
}
/**
* Executor service that is in charge of processing query messages.
*
* @return Thread pool implementation to be used in grid for query messages.
*/
public ExecutorService getQueryExecutorService() {
return qryExecSvc;
}
/**
* Executor services that is in charge of processing user compute task.
*
* @return Map of custom thread pool executors.
*/
@Nullable public Map<String, ? extends ExecutorService> customExecutors() {
return customExecs == null ? null : Collections.unmodifiableMap(customExecs);
}
/**
* Executor service that is in charge of processing schema change messages.
*
* @return Executor service that is in charge of processing schema change messages.
*/
public ExecutorService getSchemaExecutorService() {
return schemaExecSvc;
}
/**
* Executor service that is in charge of processing rebalance messages.
*
* @return Executor service that is in charge of processing rebalance messages.
*/
public ExecutorService getRebalanceExecutorService() {
return rebalanceExecSvc;
}
/**
* Executor service that is in charge of processing unorderable rebalance messages.
*
* @return Executor service that is in charge of processing unorderable rebalance messages.
*/
public IgniteStripedThreadPoolExecutor getStripedRebalanceExecutorService() {
return rebalanceStripedExecSvc;
}
/**
* Return Thread pool for create/rebuild indexes.
*
* @return Thread pool for create/rebuild indexes.
*/
public ExecutorService buildIndexExecutorService() {
return buildIdxExecSvc;
}
/**
* Stops executor services if they has been started.
*
* @param log Grid logger.
*/
private void stopExecutors(IgniteLogger log) {
boolean interrupted = Thread.interrupted();
try {
stopExecutors0(log);
}
finally {
if (interrupted)
Thread.currentThread().interrupt();
}
}
/**
* Stops executor services if they has been started.
*
* @param log Grid logger.
*/
private void stopExecutors0(IgniteLogger log) {
assert log != null;
U.shutdownNow(getClass(), execSvc, log);
execSvc = null;
U.shutdownNow(getClass(), svcExecSvc, log);
svcExecSvc = null;
U.shutdownNow(getClass(), sysExecSvc, log);
sysExecSvc = null;
U.shutdownNow(getClass(), qryExecSvc, log);
qryExecSvc = null;
U.shutdownNow(getClass(), schemaExecSvc, log);
schemaExecSvc = null;
U.shutdownNow(getClass(), rebalanceExecSvc, log);
rebalanceExecSvc = null;
U.shutdownNow(getClass(), rebalanceStripedExecSvc, log);
rebalanceStripedExecSvc = null;
U.shutdownNow(getClass(), stripedExecSvc, log);
stripedExecSvc = null;
U.shutdownNow(getClass(), mgmtExecSvc, log);
mgmtExecSvc = null;
U.shutdownNow(getClass(), p2pExecSvc, log);
p2pExecSvc = null;
U.shutdownNow(getClass(), dataStreamerExecSvc, log);
dataStreamerExecSvc = null;
if (restExecSvc != null)
U.shutdownNow(getClass(), restExecSvc, log);
restExecSvc = null;
U.shutdownNow(getClass(), utilityCacheExecSvc, log);
utilityCacheExecSvc = null;
U.shutdownNow(getClass(), affExecSvc, log);
affExecSvc = null;
U.shutdownNow(getClass(), idxExecSvc, log);
idxExecSvc = null;
U.shutdownNow(getClass(), buildIdxExecSvc, log);
buildIdxExecSvc = null;
U.shutdownNow(getClass(), callbackExecSvc, log);
callbackExecSvc = null;
if (!F.isEmpty(customExecs)) {
for (ThreadPoolExecutor exec : customExecs.values())
U.shutdownNow(getClass(), exec, log);
customExecs = null;
}
}
/**
* @param poolSize an actual value in the configuration.
* @param poolName a name of the pool like 'management'.
* @throws IgniteCheckedException If the poolSize is wrong.
*/
private static void validateThreadPoolSize(int poolSize, String poolName)
throws IgniteCheckedException {
if (poolSize <= 0) {
throw new IgniteCheckedException("Invalid " + poolName + " thread pool size" +
" (must be greater than 0), actual value: " + poolSize);
}
}
/**
* @param cfgs Array of the executors configurations.
* @throws IgniteCheckedException If configuration is wrong.
*/
private static void validateCustomExecutorsConfiguration(ExecutorConfiguration[] cfgs)
throws IgniteCheckedException {
if (cfgs == null)
return;
Set<String> names = new HashSet<>(cfgs.length);
for (ExecutorConfiguration cfg : cfgs) {
if (F.isEmpty(cfg.getName()))
throw new IgniteCheckedException("Custom executor name cannot be null or empty.");
if (!names.add(cfg.getName()))
throw new IgniteCheckedException("Duplicate custom executor name: " + cfg.getName());
if (cfg.getSize() <= 0)
throw new IgniteCheckedException("Custom executor size must be positive [name=" + cfg.getName() +
", size=" + cfg.getSize() + ']');
}
}
}