blob: 221c7bba670deab88802c121f214bbcbb20ab2df [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.ignite.internal.processors.pool;
import java.util.Arrays;
import java.util.Map;
import java.util.concurrent.Executor;
import java.util.concurrent.ExecutorService;
import org.apache.ignite.IgniteCheckedException;
import org.apache.ignite.IgniteException;
import org.apache.ignite.internal.GridKernalContext;
import org.apache.ignite.internal.managers.communication.GridIoPolicy;
import org.apache.ignite.internal.processors.GridProcessorAdapter;
import org.apache.ignite.internal.processors.plugin.IgnitePluginProcessor;
import org.apache.ignite.plugin.extensions.communication.IoPool;
import org.jetbrains.annotations.Nullable;
/**
* Processor which abstracts out thread pool management.
*/
public class PoolProcessor extends GridProcessorAdapter {
    /**
     * {@link IoPool}-s injected by Ignite plugins, indexed by pool ID. Pool IDs are non-negative
     * {@code byte} values, so 128 slots cover every possible ID.
     */
    private final IoPool[] extPools = new IoPool[128];

    /** Custom named pools. May be {@code null}, see {@link #customExecutor(String)}. */
    private final Map<String, ? extends ExecutorService> customExecs;

    /**
     * Constructor. Collects {@link IoPool} extensions from the plugin processor (if there is one),
     * validates their IDs and remembers them for later lookup in {@link #poolForPolicy(byte)}.
     *
     * @param ctx Kernal context.
     * @throws IgniteException If an extension pool has a negative ID, an ID from the reserved range,
     *      or an ID which is already taken by another extension pool.
     */
    public PoolProcessor(GridKernalContext ctx) {
        super(ctx);

        IgnitePluginProcessor plugins = ctx.plugins();

        if (plugins != null) {
            // Process custom IO messaging pool extensions:
            final IoPool[] executorExtensions = plugins.extensions(IoPool.class);

            if (executorExtensions != null) {
                // Store them into the array and check for duplicates:
                for (IoPool ex : executorExtensions) {
                    final byte id = ex.id();

                    // 1. Check the pool id is non-negative (it is used as an array index below):
                    if (id < 0)
                        throw new IgniteException("Failed to register IO executor pool because its ID is " +
                            "negative: " + id);

                    // 2. Check the pool id is in allowed range:
                    if (GridIoPolicy.isReservedGridIoPolicy(id))
                        throw new IgniteException("Failed to register IO executor pool because its ID is in the " +
                            "reserved range: " + id);

                    // 3. Check the pool for duplicates:
                    if (extPools[id] != null)
                        throw new IgniteException("Failed to register IO executor pool because its ID is " +
                            "already used: " + id);

                    extPools[id] = ex;
                }
            }
        }

        customExecs = ctx.customExecutors();
    }

    /** {@inheritDoc} */
    @Override public void stop(boolean cancel) throws IgniteCheckedException {
        // Avoid external thread pools GC retention.
        Arrays.fill(extPools, null);
    }

    /**
     * @return P2P pool.
     */
    public Executor p2pPool() {
        return ctx.getPeerClassLoadingExecutorService();
    }

    /**
     * Get executor service for policy.
     * <p>
     * Built-in policies are resolved to the corresponding executor of the kernal context. Any other
     * policy is looked up among the {@link IoPool} extensions registered in the constructor.
     *
     * @param plc Policy.
     * @return Executor service.
     * @throws IgniteCheckedException If the policy is negative, belongs to the reserved range but is not
     *      handled here, has no registered extension pool, or the extension pool returned a {@code null}
     *      executor.
     */
    public Executor poolForPolicy(byte plc) throws IgniteCheckedException {
        switch (plc) {
            case GridIoPolicy.P2P_POOL:
                return ctx.getPeerClassLoadingExecutorService();

            case GridIoPolicy.SYSTEM_POOL:
                return ctx.getSystemExecutorService();

            case GridIoPolicy.PUBLIC_POOL:
                return ctx.getExecutorService();

            case GridIoPolicy.MANAGEMENT_POOL:
                return ctx.getManagementExecutorService();

            case GridIoPolicy.AFFINITY_POOL:
                return ctx.getAffinityExecutorService();

            case GridIoPolicy.IDX_POOL:
                assert ctx.getIndexingExecutorService() != null : "Indexing pool is not configured.";

                return ctx.getIndexingExecutorService();

            case GridIoPolicy.UTILITY_CACHE_POOL:
                assert ctx.utilityCachePool() != null : "Utility cache pool is not configured.";

                return ctx.utilityCachePool();

            case GridIoPolicy.IGFS_POOL:
                assert ctx.getIgfsExecutorService() != null : "IGFS pool is not configured.";

                return ctx.getIgfsExecutorService();

            case GridIoPolicy.SERVICE_POOL:
                assert ctx.getServiceExecutorService() != null : "Service pool is not configured.";

                return ctx.getServiceExecutorService();

            case GridIoPolicy.DATA_STREAMER_POOL:
                assert ctx.getDataStreamerExecutorService() != null : "Data streamer pool is not configured.";

                return ctx.getDataStreamerExecutorService();

            case GridIoPolicy.QUERY_POOL:
                assert ctx.getQueryExecutorService() != null : "Query pool is not configured.";

                return ctx.getQueryExecutorService();

            case GridIoPolicy.SCHEMA_POOL:
                assert ctx.getSchemaExecutorService() != null : "Schema pool is not configured.";

                return ctx.getSchemaExecutorService();

            default: {
                // The policy is used as an array index below, so it must be non-negative.
                if (plc < 0)
                    throw new IgniteCheckedException("Policy cannot be negative: " + plc);

                if (GridIoPolicy.isReservedGridIoPolicy(plc))
                    throw new IgniteCheckedException("Policy is reserved for internal usage (range 0-31): " + plc);

                IoPool pool = extPools[plc];

                if (pool == null)
                    throw new IgniteCheckedException("No pool is registered for policy: " + plc);

                // Pools are stored under their own ID in the constructor.
                assert plc == pool.id();

                Executor res = pool.executor();

                if (res == null)
                    throw new IgniteCheckedException("Thread pool for policy is null: " + plc);

                return res;
            }
        }
    }

    /**
     * Gets executor service for custom policy by executor name.
     *
     * @param name Executor name, must not be {@code null}.
     * @return Executor service, or {@code null} if there are no custom executors or none of them
     *      is registered under the given name.
     */
    @Nullable public Executor customExecutor(String name) {
        assert name != null;

        Executor exec = null;

        if (customExecs != null)
            exec = customExecs.get(name);

        return exec;
    }
}