blob: 256056c3e377fb20d5645cbe2d13987395409e13 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.flink.yarn.entrypoint;
import org.apache.flink.configuration.ConfigConstants;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.GlobalConfiguration;
import org.apache.flink.configuration.HighAvailabilityOptions;
import org.apache.flink.configuration.JobManagerOptions;
import org.apache.flink.configuration.ResourceManagerOptions;
import org.apache.flink.configuration.SecurityOptions;
import org.apache.flink.configuration.WebOptions;
import org.apache.flink.runtime.clusterframework.BootstrapTools;
import org.apache.flink.runtime.security.SecurityConfiguration;
import org.apache.flink.runtime.security.SecurityContext;
import org.apache.flink.runtime.security.SecurityUtils;
import org.apache.flink.runtime.security.modules.HadoopModule;
import org.apache.flink.util.Preconditions;
import org.apache.flink.yarn.Utils;
import org.apache.flink.yarn.YarnConfigKeys;
import org.apache.flink.yarn.cli.FlinkYarnSessionCli;
import org.apache.hadoop.fs.CommonConfigurationKeysPublic;
import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hadoop.yarn.api.ApplicationConstants;
import org.slf4j.Logger;
import java.io.File;
import java.io.IOException;
import java.util.Collections;
import java.util.Map;
/**
 * Static helpers shared by the {@link YarnSessionClusterEntrypoint} and the
 * {@link YarnJobClusterEntrypoint}.
 */
public class YarnEntrypointUtils {

    /**
     * Builds a {@link SecurityConfiguration} for the given Flink configuration, installs it
     * through {@link SecurityUtils#install(SecurityConfiguration)} and returns the context that
     * {@link SecurityUtils#getInstalledContext()} reports afterwards.
     *
     * <p>If a readable file named {@link Utils#KRB5_FILE_NAME} exists in the working directory,
     * the security configuration is created with a single {@link HadoopModule} factory that uses
     * a Hadoop configuration with Kerberos authentication and authorization switched on. This
     * path exists to support the YARN secure integration test scenario. Otherwise the default
     * security configuration for {@code configuration} is used.
     *
     * @param configuration Flink configuration the security configuration is derived from
     * @param workingDirectory directory that is probed for the Kerberos configuration file
     * @return the security context installed by this call
     * @throws Exception if installing the security configuration fails
     */
    public static SecurityContext installSecurityContext(
            Configuration configuration,
            String workingDirectory) throws Exception {

        final File krb5File = new File(workingDirectory, Utils.KRB5_FILE_NAME);
        final boolean krb5Readable = krb5File.exists() && krb5File.canRead();

        final SecurityConfiguration securityConfiguration;
        if (krb5Readable) {
            // To support Yarn Secure Integration Test Scenario
            final org.apache.hadoop.conf.Configuration kerberosHadoopConf =
                createKerberosHadoopConfiguration();
            securityConfiguration = new SecurityConfiguration(
                configuration,
                Collections.singletonList(
                    flinkSecurityConf -> new HadoopModule(flinkSecurityConf, kerberosHadoopConf)));
        } else {
            securityConfiguration = new SecurityConfiguration(configuration);
        }

        SecurityUtils.install(securityConfiguration);
        return SecurityUtils.getInstalledContext();
    }

    /**
     * Creates a fresh Hadoop configuration whose authentication method is set to
     * {@code "kerberos"} and whose authorization flag is set to {@code "true"}.
     */
    private static org.apache.hadoop.conf.Configuration createKerberosHadoopConfiguration() {
        final org.apache.hadoop.conf.Configuration hadoopConf =
            new org.apache.hadoop.conf.Configuration();
        hadoopConf.set(CommonConfigurationKeysPublic.HADOOP_SECURITY_AUTHENTICATION, "kerberos");
        hadoopConf.set(CommonConfigurationKeysPublic.HADOOP_SECURITY_AUTHORIZATION, "true");
        return hadoopConf;
    }

    /**
     * Loads the Flink configuration from the working directory and adjusts it with values taken
     * from the container environment.
     *
     * <p>The adjustments are applied in this order, so later steps may overwrite earlier ones:
     * <ol>
     *   <li>{@link JobManagerOptions#ADDRESS} is set to the value of the {@code NM_HOST}
     *       environment variable, which must be present.</li>
     *   <li>Every dynamic property parsed from {@link YarnConfigKeys#ENV_DYNAMIC_PROPERTIES}
     *       is written into the configuration.</li>
     *   <li>If {@link YarnConfigKeys#ENV_ZOOKEEPER_NAMESPACE} is set, it becomes the
     *       {@link HighAvailabilityOptions#HA_CLUSTER_ID}.</li>
     *   <li>A non-negative {@link WebOptions#PORT} is replaced by {@code 0}.</li>
     *   <li>The deprecated YARN-specific environment prefixes are substituted by their
     *       generic counterparts.</li>
     *   <li>If both {@link YarnConfigKeys#KEYTAB_PATH} and
     *       {@link YarnConfigKeys#KEYTAB_PRINCIPAL} are set, the Kerberos login keytab (the
     *       file {@link Utils#KEYTAB_FILE_NAME} inside the working directory) and principal
     *       are configured.</li>
     * </ol>
     *
     * @param workingDirectory directory to load the configuration from; also the directory the
     *     keytab path is resolved against
     * @param env environment variables of the container
     * @return the loaded and adjusted configuration
     */
    public static Configuration loadConfiguration(String workingDirectory, Map<String, String> env) {
        final Configuration flinkConfig = GlobalConfiguration.loadConfiguration(workingDirectory);

        final String principal = env.get(YarnConfigKeys.KEYTAB_PRINCIPAL);
        final String haNamespace = env.get(YarnConfigKeys.ENV_ZOOKEEPER_NAMESPACE);
        final String encodedDynamicProperties = env.get(YarnConfigKeys.ENV_DYNAMIC_PROPERTIES);
        final Map<String, String> dynamicProperties =
            FlinkYarnSessionCli.getDynamicProperties(encodedDynamicProperties);

        final String hostEnvKey = ApplicationConstants.Environment.NM_HOST.key();
        final String amHostname = env.get(hostEnvKey);
        Preconditions.checkState(
            amHostname != null,
            "ApplicationMaster hostname variable %s not set",
            ApplicationConstants.Environment.NM_HOST.key());

        flinkConfig.setString(JobManagerOptions.ADDRESS, amHostname);

        // TODO: Support port ranges for the AM, i.e. read the range configured under
        // ConfigConstants.YARN_APPLICATION_MASTER_PORT (falling back to
        // ConfigConstants.DEFAULT_YARN_JOB_MANAGER_PORT).

        // dynamic properties are written after the address, so they take precedence over it
        dynamicProperties.forEach((key, value) -> flinkConfig.setString(key, value));

        if (haNamespace != null) {
            flinkConfig.setString(HighAvailabilityOptions.HA_CLUSTER_ID, haNamespace);
        }

        // if a web monitor shall be started, set the port to random binding
        final int configuredWebPort = flinkConfig.getInteger(WebOptions.PORT, 0);
        if (configuredWebPort >= 0) {
            flinkConfig.setInteger(WebOptions.PORT, 0);
        }

        // users may still rely on the deprecated YARN-specific keys; mirror them onto the
        // generic keys here so that code running later only has to look at the generic ones
        BootstrapTools.substituteDeprecatedConfigPrefix(
            flinkConfig,
            ConfigConstants.YARN_APPLICATION_MASTER_ENV_PREFIX,
            ResourceManagerOptions.CONTAINERIZED_MASTER_ENV_PREFIX);
        BootstrapTools.substituteDeprecatedConfigPrefix(
            flinkConfig,
            ConfigConstants.YARN_TASK_MANAGER_ENV_PREFIX,
            ResourceManagerOptions.CONTAINERIZED_TASK_MANAGER_ENV_PREFIX);

        // the environment value only signals that a keytab was shipped; the path that gets
        // configured is always the keytab file inside the working directory
        final String localKeytabPath = env.get(YarnConfigKeys.KEYTAB_PATH) == null
            ? null
            : new File(workingDirectory, Utils.KEYTAB_FILE_NAME).getAbsolutePath();

        if (localKeytabPath != null && principal != null) {
            flinkConfig.setString(SecurityOptions.KERBEROS_LOGIN_KEYTAB, localKeytabPath);
            flinkConfig.setString(SecurityOptions.KERBEROS_LOGIN_PRINCIPAL, principal);
        }

        return flinkConfig;
    }

    /**
     * Logs the short name of the user the current process runs as, together with the YARN
     * client user name found in the environment.
     *
     * @param env environment variables; must contain {@link YarnConfigKeys#ENV_HADOOP_USER_NAME}
     * @param log logger that receives the info message
     * @throws IOException if the current user cannot be determined
     */
    public static void logYarnEnvironmentInformation(Map<String, String> env, Logger log) throws IOException {
        final String clientUser = env.get(YarnConfigKeys.ENV_HADOOP_USER_NAME);
        Preconditions.checkArgument(
            clientUser != null,
            "YARN client user name environment variable %s not set",
            YarnConfigKeys.ENV_HADOOP_USER_NAME);

        final String daemonUser = UserGroupInformation.getCurrentUser().getShortUserName();

        log.info("YARN daemon is running as: {} Yarn client user obtainer: {}",
            daemonUser, clientUser);
    }
}