blob: 301997a7c7c0fc63970f0eb0dcc2159173663c17 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.flink.runtime.taskexecutor;
import org.apache.flink.api.common.time.Time;
import org.apache.flink.configuration.AkkaOptions;
import org.apache.flink.configuration.CheckpointingOptions;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.ConfigurationUtils;
import org.apache.flink.configuration.CoreOptions;
import org.apache.flink.configuration.MemorySize;
import org.apache.flink.configuration.NettyShuffleEnvironmentOptions;
import org.apache.flink.configuration.TaskManagerOptions;
import org.apache.flink.configuration.TaskManagerOptionsInternal;
import org.apache.flink.runtime.clusterframework.types.ResourceID;
import org.apache.flink.runtime.entrypoint.ClusterEntrypointUtils;
import org.apache.flink.runtime.entrypoint.WorkingDirectory;
import org.apache.flink.runtime.registration.RetryingRegistrationConfiguration;
import org.apache.flink.runtime.util.ConfigurationParserUtils;
import org.apache.flink.util.FlinkUserCodeClassLoaders;
import org.apache.flink.util.NetUtils;
import org.apache.flink.util.Reference;
import javax.annotation.Nullable;
import java.io.File;
import java.net.InetAddress;
import java.util.Optional;
import static org.apache.flink.util.Preconditions.checkArgument;
import static org.apache.flink.util.Preconditions.checkNotNull;
/**
 * Configuration for the task manager services such as the memory manager, the io manager and the
 * metric registry.
 *
 * <p>All fields are assigned once in the private constructor; instances are obtained via {@link
 * #fromConfiguration(Configuration, ResourceID, String, boolean, TaskExecutorResourceSpec,
 * WorkingDirectory)}.
 */
public class TaskManagerServicesConfiguration {

    /** Prefix of the per-TaskManager sub-directory created under each configured local state root. */
    private static final String LOCAL_STATE_SUB_DIRECTORY_ROOT = "localState_";

    private final Configuration configuration;

    private final ResourceID resourceID;

    private final String externalAddress;

    private final String nodeId;

    private final InetAddress bindAddress;

    private final int externalDataPort;

    private final boolean localCommunicationOnly;

    private final String[] tmpDirPaths;

    private final Reference<File[]> localRecoveryStateDirectories;

    private final int numberOfSlots;

    @Nullable private final QueryableStateConfiguration queryableStateConfig;

    private final int pageSize;

    private final long timerServiceShutdownTimeout;

    private final boolean localRecoveryEnabled;

    private final RetryingRegistrationConfiguration retryingRegistrationConfiguration;

    private final Optional<Time> systemResourceMetricsProbingInterval;

    private final TaskExecutorResourceSpec taskExecutorResourceSpec;

    private final FlinkUserCodeClassLoaders.ResolveOrder classLoaderResolveOrder;

    private final String[] alwaysParentFirstLoaderPatterns;

    private final int numIoThreads;

    private TaskManagerServicesConfiguration(
            Configuration configuration,
            ResourceID resourceID,
            String externalAddress,
            InetAddress bindAddress,
            int externalDataPort,
            boolean localCommunicationOnly,
            String[] tmpDirPaths,
            Reference<File[]> localRecoveryStateDirectories,
            boolean localRecoveryEnabled,
            @Nullable QueryableStateConfiguration queryableStateConfig,
            int numberOfSlots,
            int pageSize,
            TaskExecutorResourceSpec taskExecutorResourceSpec,
            long timerServiceShutdownTimeout,
            RetryingRegistrationConfiguration retryingRegistrationConfiguration,
            Optional<Time> systemResourceMetricsProbingInterval,
            FlinkUserCodeClassLoaders.ResolveOrder classLoaderResolveOrder,
            String[] alwaysParentFirstLoaderPatterns,
            int numIoThreads,
            String nodeId) {
        // Validate scalar arguments up front, before any field is assigned.
        checkArgument(
                timerServiceShutdownTimeout >= 0L,
                "The timer service shutdown timeout must be greater or equal to 0.");

        this.configuration = checkNotNull(configuration);
        this.resourceID = checkNotNull(resourceID);
        this.externalAddress = checkNotNull(externalAddress);
        this.bindAddress = checkNotNull(bindAddress);
        this.externalDataPort = externalDataPort;
        this.localCommunicationOnly = localCommunicationOnly;
        this.tmpDirPaths = checkNotNull(tmpDirPaths);
        this.localRecoveryStateDirectories = checkNotNull(localRecoveryStateDirectories);
        this.localRecoveryEnabled = localRecoveryEnabled;
        this.queryableStateConfig = queryableStateConfig;
        this.numberOfSlots = numberOfSlots;
        this.pageSize = pageSize;
        // NOTE(review): left unchecked as in the original; the memory getters below dereference
        // it, so callers are presumably expected to always pass a spec — confirm before enforcing.
        this.taskExecutorResourceSpec = taskExecutorResourceSpec;
        this.classLoaderResolveOrder = checkNotNull(classLoaderResolveOrder);
        this.alwaysParentFirstLoaderPatterns = checkNotNull(alwaysParentFirstLoaderPatterns);
        this.numIoThreads = numIoThreads;
        this.timerServiceShutdownTimeout = timerServiceShutdownTimeout;
        this.retryingRegistrationConfiguration = checkNotNull(retryingRegistrationConfiguration);
        this.systemResourceMetricsProbingInterval =
                checkNotNull(systemResourceMetricsProbingInterval);
        this.nodeId = checkNotNull(nodeId);
    }

    // --------------------------------------------------------------------------------------------
    //  Getters
    // --------------------------------------------------------------------------------------------

    /** Returns the Flink configuration these service settings were derived from. */
    public Configuration getConfiguration() {
        return configuration;
    }

    /** Returns the resource ID of the task manager. */
    public ResourceID getResourceID() {
        return resourceID;
    }

    /** Returns the address under which the TaskManager is accessible. */
    String getExternalAddress() {
        return externalAddress;
    }

    /** Returns the address to bind to (the configured bind host or the wildcard address). */
    InetAddress getBindAddress() {
        return bindAddress;
    }

    /** Returns the configured shuffle data port. */
    int getExternalDataPort() {
        return externalDataPort;
    }

    /** Returns whether only local communication is possible. */
    boolean isLocalCommunicationOnly() {
        return localCommunicationOnly;
    }

    /** Returns the configured temporary directory paths (the internal array, not a copy). */
    public String[] getTmpDirPaths() {
        return tmpDirPaths;
    }

    /** Returns the directories for task-local recovery state (borrowed or owned). */
    Reference<File[]> getLocalRecoveryStateDirectories() {
        return localRecoveryStateDirectories;
    }

    /** Returns whether local recovery is enabled. */
    boolean isLocalRecoveryEnabled() {
        return localRecoveryEnabled;
    }

    /** Returns the queryable state configuration, or {@code null} if none was derived. */
    @Nullable
    QueryableStateConfiguration getQueryableStateConfig() {
        return queryableStateConfig;
    }

    /** Returns the number of task slots. */
    public int getNumberOfSlots() {
        return numberOfSlots;
    }

    /** Returns the configured memory page size. */
    public int getPageSize() {
        return pageSize;
    }

    /** Returns the resource specification of the TaskExecutor. */
    public TaskExecutorResourceSpec getTaskExecutorResourceSpec() {
        return taskExecutorResourceSpec;
    }

    /** Returns the network memory size taken from the TaskExecutor resource spec. */
    public MemorySize getNetworkMemorySize() {
        return taskExecutorResourceSpec.getNetworkMemSize();
    }

    /** Returns the managed memory size taken from the TaskExecutor resource spec. */
    public MemorySize getManagedMemorySize() {
        return taskExecutorResourceSpec.getManagedMemorySize();
    }

    /** Returns the timer service shutdown timeout in milliseconds (always {@code >= 0}). */
    long getTimerServiceShutdownTimeout() {
        return timerServiceShutdownTimeout;
    }

    /** Returns the system resource metrics probing interval, if one is configured. */
    public Optional<Time> getSystemResourceMetricsProbingInterval() {
        return systemResourceMetricsProbingInterval;
    }

    /** Returns the retry settings for registration. */
    RetryingRegistrationConfiguration getRetryingRegistrationConfiguration() {
        return retryingRegistrationConfiguration;
    }

    /** Returns the class loader resolve order for user code. */
    public FlinkUserCodeClassLoaders.ResolveOrder getClassLoaderResolveOrder() {
        return classLoaderResolveOrder;
    }

    /** Returns the patterns of classes always loaded parent-first (internal array, not a copy). */
    public String[] getAlwaysParentFirstLoaderPatterns() {
        return alwaysParentFirstLoaderPatterns;
    }

    /** Returns the size of the IO thread pool. */
    public int getNumIoThreads() {
        return numIoThreads;
    }

    /** Returns the node id (the configured node id, or the external address by default). */
    public String getNodeId() {
        return nodeId;
    }

    // --------------------------------------------------------------------------------------------
    //  Parsing of Flink configuration
    // --------------------------------------------------------------------------------------------

    /**
     * Utility method to extract TaskManager config parameters from the configuration and to sanity
     * check them.
     *
     * @param configuration The configuration.
     * @param resourceID resource ID of the task manager
     * @param externalAddress identifying the IP address under which the TaskManager will be
     *     accessible
     * @param localCommunicationOnly True if only local communication is possible. Use only in cases
     *     where only one task manager runs.
     * @param taskExecutorResourceSpec resource specification of the TaskManager to start
     * @param workingDirectory working directory of the TaskManager
     * @return configuration of task manager services used to create them
     * @throws Exception if a configured value cannot be parsed or resolved, e.g. an unresolvable
     *     bind host or an unknown class loader resolve order
     */
    public static TaskManagerServicesConfiguration fromConfiguration(
            Configuration configuration,
            ResourceID resourceID,
            String externalAddress,
            boolean localCommunicationOnly,
            TaskExecutorResourceSpec taskExecutorResourceSpec,
            WorkingDirectory workingDirectory)
            throws Exception {
        final Reference<File[]> localStateDirs =
                resolveLocalStateDirectories(configuration, resourceID, workingDirectory);

        final boolean localRecoveryMode = configuration.get(CheckpointingOptions.LOCAL_RECOVERY);

        final QueryableStateConfiguration queryableStateConfig =
                QueryableStateConfiguration.fromConfiguration(configuration);

        // The timer service shutdown timeout is derived from the Akka ask timeout.
        final long timerServiceShutdownTimeout =
                configuration.get(AkkaOptions.ASK_TIMEOUT_DURATION).toMillis();

        final RetryingRegistrationConfiguration retryingRegistrationConfiguration =
                RetryingRegistrationConfiguration.fromConfiguration(configuration);

        final int externalDataPort = configuration.get(NettyShuffleEnvironmentOptions.DATA_PORT);

        // Fall back to the wildcard address when no bind host is configured.
        final String bindAddr =
                configuration.getString(
                        TaskManagerOptions.BIND_HOST, NetUtils.getWildcardIPAddress());
        final InetAddress bindAddress = InetAddress.getByName(bindAddr);

        final String classLoaderResolveOrder =
                configuration.get(CoreOptions.CLASSLOADER_RESOLVE_ORDER);

        final String[] alwaysParentFirstLoaderPatterns =
                CoreOptions.getParentFirstLoaderPatterns(configuration);

        final int numIoThreads = ClusterEntrypointUtils.getPoolSize(configuration);

        final String[] tmpDirs = ConfigurationUtils.parseTempDirectories(configuration);

        // If TaskManagerOptionsInternal.TASK_MANAGER_NODE_ID is not set, use the external address
        // as the node id.
        final String nodeId =
                configuration
                        .getOptional(TaskManagerOptionsInternal.TASK_MANAGER_NODE_ID)
                        .orElse(externalAddress);

        return new TaskManagerServicesConfiguration(
                configuration,
                resourceID,
                externalAddress,
                bindAddress,
                externalDataPort,
                localCommunicationOnly,
                tmpDirs,
                localStateDirs,
                localRecoveryMode,
                queryableStateConfig,
                ConfigurationParserUtils.getSlot(configuration),
                ConfigurationParserUtils.getPageSize(configuration),
                taskExecutorResourceSpec,
                timerServiceShutdownTimeout,
                retryingRegistrationConfiguration,
                ConfigurationUtils.getSystemResourceMetricsProbingInterval(configuration),
                FlinkUserCodeClassLoaders.ResolveOrder.fromString(classLoaderResolveOrder),
                alwaysParentFirstLoaderPatterns,
                numIoThreads,
                nodeId);
    }

    /**
     * Resolves the directories used for task-local state.
     *
     * <p>If no local state root directories are configured, the working directory's local state
     * directory is returned as a borrowed reference. Otherwise, a sub-directory named {@code
     * localState_<resourceID>} is derived under each configured root. The resulting array is
     * returned as an owned reference.
     *
     * @param configuration configuration to read the local state root directories from
     * @param resourceID resource ID used to name the per-TaskManager sub-directory
     * @param workingDirectory working directory providing the default local state directory
     * @return borrowed or owned reference to the local state directories
     */
    private static Reference<File[]> resolveLocalStateDirectories(
            Configuration configuration, ResourceID resourceID, WorkingDirectory workingDirectory) {
        final String[] localStateRootDirs =
                ConfigurationUtils.parseLocalStateDirectories(configuration);

        if (localStateRootDirs.length == 0) {
            return Reference.borrowed(new File[] {workingDirectory.getLocalStateDirectory()});
        }

        final String localStateDirectoryName = LOCAL_STATE_SUB_DIRECTORY_ROOT + resourceID;
        final File[] createdLocalStateDirs = new File[localStateRootDirs.length];
        for (int i = 0; i < localStateRootDirs.length; i++) {
            createdLocalStateDirs[i] = new File(localStateRootDirs[i], localStateDirectoryName);
        }
        return Reference.owned(createdLocalStateDirs);
    }
}