blob: 7b9dd74e185942388f2f8421d570a8275e581525 [file] [log] [blame]
package org.apache.helix.task;
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
import java.time.Duration;
import java.util.Map;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.atomic.AtomicInteger;
import org.apache.helix.HelixException;
import org.apache.helix.HelixManager;
import org.apache.helix.SystemPropertyKeys;
import org.apache.helix.manager.zk.ZKHelixManager;
import org.apache.helix.monitoring.mbeans.ThreadPoolExecutorMonitor;
import org.apache.helix.msdcommon.exception.InvalidRoutingDataException;
import org.apache.helix.participant.statemachine.StateModelFactory;
import org.apache.helix.util.HelixUtil;
import org.apache.helix.zookeeper.api.client.HelixZkClient;
import org.apache.helix.zookeeper.api.client.RealmAwareZkClient;
import org.apache.helix.zookeeper.datamodel.serializer.ZNRecordSerializer;
import org.apache.helix.zookeeper.impl.client.FederatedZkClient;
import org.apache.helix.zookeeper.impl.factory.SharedZkClientFactory;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
* Factory class for {@link TaskStateModel}.
public class TaskStateModelFactory extends StateModelFactory<TaskStateModel> {
private static Logger LOG = LoggerFactory.getLogger(TaskStateModelFactory.class);
// Unit in minutes. Need a retry timeout to prevent zkClient from hanging infinitely.
private static final int ZKCLIENT_OPERATION_RETRY_TIMEOUT = 5;
private final HelixManager _manager;
private final Map<String, TaskFactory> _taskFactoryRegistry;
private final ScheduledExecutorService _taskExecutor;
private final ScheduledExecutorService _timerTaskExecutor;
private ThreadPoolExecutorMonitor _monitor;
public TaskStateModelFactory(HelixManager manager, Map<String, TaskFactory> taskFactoryRegistry) {
this(manager, taskFactoryRegistry, createThreadPoolExecutor(manager));
// DO NOT USE! This size of provided thread pool will not be reflected to controller
// properly, the controller may over schedule tasks to this participant. Task Framework needs to
// have full control of the thread pool unlike the state transition thread pool.
public TaskStateModelFactory(HelixManager manager, Map<String, TaskFactory> taskFactoryRegistry,
ScheduledExecutorService taskExecutor) {
_manager = manager;
_taskFactoryRegistry = taskFactoryRegistry;
_taskExecutor = taskExecutor;
// TODO: Hunter: I'm not sure why this needs to be a single thread executor. We could certainly
// use more threads for timer tasks.
_timerTaskExecutor = Executors.newSingleThreadScheduledExecutor(new ThreadFactory() {
public Thread newThread(Runnable r) {
return new Thread(r, "TaskStateModelFactory-timeTask_thread");
if (_taskExecutor instanceof ThreadPoolExecutor) {
try {
_monitor = new ThreadPoolExecutorMonitor(TaskConstants.STATE_MODEL_NAME,
(ThreadPoolExecutor) _taskExecutor);
} catch (JMException e) {
LOG.warn("Error in creating ThreadPoolExecutorMonitor for TaskStateModelFactory.", e);
public TaskStateModel createNewStateModel(String resourceName, String partitionKey) {
return new TaskStateModel(_manager, _taskFactoryRegistry, _taskExecutor, _timerTaskExecutor);
public void shutdown() {
if (_monitor != null) {
if (_monitor != null) {
void shutdownNow() {
if (_monitor != null) {
public boolean isShutdown() {
return _taskExecutor.isShutdown();
public boolean isTerminated() {
return _taskExecutor.isTerminated();
* Create a RealmAwareZkClient to get thread pool sizes
protected static RealmAwareZkClient createZkClient(HelixManager manager) {
if (!(manager instanceof ZKHelixManager)) {
// TODO: None-ZKHelixManager cannot initialize this class. After interface rework of
// HelixManager, the initialization should be allowed.
throw new UnsupportedOperationException(
"Only ZKHelixManager is supported for configurable thread pool.");
RealmAwareZkClient.RealmAwareZkClientConfig clientConfig =
new RealmAwareZkClient.RealmAwareZkClientConfig().setZkSerializer(new ZNRecordSerializer());
// Set operation retry timeout to prevent hanging infinitely
String zkAddress = manager.getMetadataStoreConnectionString();
if (Boolean.getBoolean(SystemPropertyKeys.MULTI_ZK_ENABLED) || zkAddress == null) {
RealmAwareZkClient.RealmAwareZkConnectionConfig zkConnectionConfig =
((ZKHelixManager) manager).getRealmAwareZkConnectionConfig();
// TODO: a fallback logic is created because it's possible for the ZKHelixManager to not
// have a connection config, since a connection config may be created during
// ZKHelixManager.connect(). This is the same problem as described earlier because connect()
// may happen before or after TaskStateModelFactory initialization. Clean this up after that
// problem is fixed.
if (zkConnectionConfig == null) {
String clusterName = manager.getClusterName();
String shardingKey = HelixUtil.clusterNameToShardingKey(clusterName);
zkConnectionConfig = new RealmAwareZkClient.RealmAwareZkConnectionConfig.Builder()
try {
return new FederatedZkClient(zkConnectionConfig, clientConfig);
} catch (InvalidRoutingDataException | IllegalArgumentException e) {
throw new HelixException("Failed to create FederatedZkClient!", e);
// Note: operation retry timeout doesn't take effect due to
return SharedZkClientFactory.getInstance()
.buildZkClient(new HelixZkClient.ZkConnectionConfig(zkAddress),
private static ScheduledExecutorService createThreadPoolExecutor(HelixManager manager) {
// TODO: revisit the logic here - we are creating a connection although we already have a
// manager. We cannot use the connection within manager because some users connect the manager
// after registering the state model factory (in which case we cannot use manager's connection),
// and some connect the manager before registering the state model factory (in which case we
// can use manager's connection). We need to think about the right order and determine if we
// want to enforce it, which may cause backward incompatibility.
RealmAwareZkClient zkClient = createZkClient(manager);
int targetThreadPoolSize;
// Ensure the zkClient is closed after reading the pool size;
try {
targetThreadPoolSize = TaskUtil
.getTargetThreadPoolSize(zkClient, manager.getClusterName(), manager.getInstanceName());
} finally {
"Obtained target thread pool size: {} from cluster {} for instance {}. Creating thread pool.",
targetThreadPoolSize, manager.getClusterName(), manager.getInstanceName());
return Executors.newScheduledThreadPool(targetThreadPoolSize, new ThreadFactory() {
private AtomicInteger threadId = new AtomicInteger(0);
public Thread newThread(Runnable r) {
return new Thread(r, "TaskStateModelFactory-task_thread-" + threadId.getAndIncrement());