blob: c2347acb6c40a73d19a81e4d288b0c851a9dbac2 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.flink.yarn;
import org.apache.flink.api.common.cache.DistributedCache;
import org.apache.flink.client.deployment.ClusterSpecification;
import org.apache.flink.client.program.ClusterClient;
import org.apache.flink.configuration.AkkaOptions;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.JobManagerOptions;
import org.apache.flink.configuration.MemorySize;
import org.apache.flink.configuration.TaskManagerOptions;
import org.apache.flink.runtime.jobgraph.JobGraph;
import org.apache.flink.runtime.jobgraph.JobType;
import org.apache.flink.runtime.jobmaster.JobResult;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.sink.DiscardingSink;
import org.apache.flink.yarn.configuration.YarnConfigOptions;
import org.apache.flink.yarn.testjob.YarnTestArchiveJob;
import org.apache.flink.yarn.testjob.YarnTestCacheJob;
import org.apache.flink.yarn.util.TestUtils;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.permission.FsPermission;
import org.apache.hadoop.yarn.api.records.ApplicationId;
import org.junit.BeforeClass;
import org.junit.Rule;
import org.junit.Test;
import org.junit.rules.TemporaryFolder;
import java.io.File;
import java.io.IOException;
import java.time.Duration;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import static org.apache.flink.yarn.configuration.YarnConfigOptions.CLASSPATH_INCLUDE_USER_JAR;
import static org.hamcrest.Matchers.is;
import static org.hamcrest.Matchers.notNullValue;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertThat;
import static org.junit.Assert.assertTrue;
/** Test cases for the deployment of Yarn Flink clusters. */
public class YARNITCase extends YarnTestBase {

    // Upper bound on how long we wait for the YARN application to reach a terminal
    // state before force-killing it at the end of each test.
    private static final Duration yarnAppTerminateTimeout = Duration.ofSeconds(10);

    // Polling interval (in milliseconds) used while waiting for the application to finish.
    private static final int sleepIntervalInMS = 100;

    // NOTE(review): this rule appears unused within this class — the tests below use
    // `tmp` (presumably a folder inherited from YarnTestBase) instead. Confirm against
    // YarnTestBase before removing.
    @Rule public TemporaryFolder temporaryFolder = new TemporaryFolder();

    /** Starts the YARN mini cluster once for all tests in this class. */
    @BeforeClass
    public static void setup() {
        YARN_CONFIGURATION.set(YarnTestBase.TEST_CLUSTER_NAME_KEY, "flink-yarn-tests-per-job");
        startYARNWithConfig(YARN_CONFIGURATION, true);
    }

    /**
     * Deploys a per-job cluster with the user jar placed FIRST on the system classpath
     * (see {@code yarn.per-job-cluster.include-user-jar}).
     */
    @Test
    public void testPerJobModeWithEnableSystemClassPathIncludeUserJar() throws Exception {
        runTest(
                () ->
                        deployPerJob(
                                createDefaultConfiguration(
                                        YarnConfigOptions.UserJarInclusion.FIRST),
                                getTestingJobGraph(),
                                true));
    }

    /**
     * Deploys a per-job cluster with user-jar inclusion on the system classpath DISABLED,
     * so the user jar is shipped separately rather than added to the system classpath.
     */
    @Test
    public void testPerJobModeWithDisableSystemClassPathIncludeUserJar() throws Exception {
        runTest(
                () ->
                        deployPerJob(
                                createDefaultConfiguration(
                                        YarnConfigOptions.UserJarInclusion.DISABLED),
                                getTestingJobGraph(),
                                true));
    }

    /**
     * Deploys a job that registers distributed-cache entries; deployPerJob additionally
     * asserts that every user artifact ends up on a remote filesystem.
     */
    @Test
    public void testPerJobModeWithDistributedCache() throws Exception {
        runTest(
                () ->
                        deployPerJob(
                                createDefaultConfiguration(
                                        YarnConfigOptions.UserJarInclusion.DISABLED),
                                YarnTestCacheJob.getDistributedCacheJobGraph(tmp.newFolder()),
                                true));
    }

    /**
     * Verifies the "provided lib dirs" feature: the Flink lib folder is pre-uploaded to
     * HDFS and configured via {@link YarnConfigOptions#PROVIDED_LIB_DIRS}, so deployment
     * must NOT re-upload the lib directory to the staging directory (checked later by
     * {@link #checkStagingDirectory}). Note the {@code withDist=false} argument.
     */
    @Test
    public void testPerJobWithProvidedLibDirs() throws Exception {
        // Copy the local Flink lib folder into the mini DFS cluster and make it
        // world-readable so the YARN containers can localize from it.
        final Path remoteLib =
                new Path(
                        miniDFSCluster.getFileSystem().getUri().toString() + "/flink-provided-lib");
        miniDFSCluster
                .getFileSystem()
                .copyFromLocalFile(new Path(flinkLibFolder.toURI()), remoteLib);
        miniDFSCluster.getFileSystem().setPermission(remoteLib, new FsPermission("755"));

        final Configuration flinkConfig =
                createDefaultConfiguration(YarnConfigOptions.UserJarInclusion.DISABLED);
        flinkConfig.set(
                YarnConfigOptions.PROVIDED_LIB_DIRS,
                Collections.singletonList(remoteLib.toString()));

        runTest(() -> deployPerJob(flinkConfig, getTestingJobGraph(), false));
    }

    /** Deploys a job that ships an archive artifact alongside the job graph. */
    @Test
    public void testPerJobWithArchive() throws Exception {
        final Configuration flinkConfig =
                createDefaultConfiguration(YarnConfigOptions.UserJarInclusion.DISABLED);
        final JobGraph archiveJobGraph =
                YarnTestArchiveJob.getArchiveJobGraph(tmp.newFolder(), flinkConfig);
        runTest(() -> deployPerJob(flinkConfig, archiveJobGraph, true));
    }

    /**
     * Deploys the given job graph as a per-job YARN cluster and verifies its execution.
     *
     * <p>Flow: build a cluster spec from the configured JobManager memory, attach the
     * testing jar, deploy, then assert that (1) all user artifacts were uploaded to a
     * remote filesystem, (2) the job finished without a serialized throwable, and
     * (3) the staging directory matches the provided-lib-dirs expectation. Finally waits
     * for the application to terminate, killing it after {@code yarnAppTerminateTimeout}.
     *
     * @param configuration Flink configuration used for the cluster descriptor
     * @param jobGraph job to submit (forced to STREAMING job type)
     * @param withDist whether to create the descriptor with the local lib dir
     *     ({@code false} is used by the provided-lib-dirs test)
     */
    private void deployPerJob(Configuration configuration, JobGraph jobGraph, boolean withDist)
            throws Exception {
        jobGraph.setJobType(JobType.STREAMING);

        try (final YarnClusterDescriptor yarnClusterDescriptor =
                withDist
                        ? createYarnClusterDescriptor(configuration)
                        : createYarnClusterDescriptorWithoutLibDir(configuration)) {

            // Derive the JobManager memory for the cluster spec from the same
            // configuration the descriptor was built with (set in createDefaultConfiguration).
            final int masterMemory =
                    yarnClusterDescriptor
                            .getFlinkConfiguration()
                            .get(JobManagerOptions.TOTAL_PROCESS_MEMORY)
                            .getMebiBytes();
            final ClusterSpecification clusterSpecification =
                    new ClusterSpecification.ClusterSpecificationBuilder()
                            .setMasterMemoryMB(masterMemory)
                            .setTaskManagerMemoryMB(1024)
                            .setSlotsPerTaskManager(1)
                            .createClusterSpecification();

            // Locate the flink-yarn-tests jar on disk and attach it as the user jar.
            File testingJar =
                    TestUtils.findFile("..", new TestUtils.TestJarFinder("flink-yarn-tests"));
            jobGraph.addJar(new org.apache.flink.core.fs.Path(testingJar.toURI()));

            try (ClusterClient<ApplicationId> clusterClient =
                    yarnClusterDescriptor
                            .deployJobCluster(clusterSpecification, jobGraph, false)
                            .getClusterClient()) {

                // After deployment every registered user artifact must have been
                // uploaded to (or already reside on) a remote filesystem.
                for (DistributedCache.DistributedCacheEntry entry :
                        jobGraph.getUserArtifacts().values()) {
                    assertTrue(
                            String.format(
                                    "The user artifacts(%s) should be remote or uploaded to remote filesystem.",
                                    entry.filePath),
                            Utils.isRemotePath(entry.filePath));
                }

                ApplicationId applicationId = clusterClient.getClusterId();

                // Block until the job reaches a terminal state and assert it succeeded.
                final CompletableFuture<JobResult> jobResultCompletableFuture =
                        clusterClient.requestJobResult(jobGraph.getJobID());
                final JobResult jobResult = jobResultCompletableFuture.get();
                assertThat(jobResult, is(notNullValue()));
                assertThat(jobResult.getSerializedThrowable().isPresent(), is(false));

                checkStagingDirectory(configuration, applicationId);

                waitApplicationFinishedElseKillIt(
                        applicationId,
                        yarnAppTerminateTimeout,
                        yarnClusterDescriptor,
                        sleepIntervalInMS);
            }
        }
    }

    /**
     * Asserts the staging-directory contents for the given application: when
     * {@code yarn.provided.lib.dirs} is configured, the lib folder must NOT have been
     * uploaded to the per-application staging directory; otherwise it must be there.
     *
     * @param flinkConfig configuration the cluster was deployed with
     * @param appId application whose staging directory ({@code ~/.flink/<appId>}) is checked
     * @throws IOException if the filesystem cannot be accessed
     */
    private void checkStagingDirectory(Configuration flinkConfig, ApplicationId appId)
            throws IOException {
        final List<String> providedLibDirs = flinkConfig.get(YarnConfigOptions.PROVIDED_LIB_DIRS);
        final boolean isProvidedLibDirsConfigured =
                providedLibDirs != null && !providedLibDirs.isEmpty();

        try (final FileSystem fs = FileSystem.get(YARN_CONFIGURATION)) {
            final Path stagingDirectory =
                    new Path(fs.getHomeDirectory(), ".flink/" + appId.toString());
            if (isProvidedLibDirsConfigured) {
                assertFalse(
                        "The provided lib dirs is set, so the lib directory should not be uploaded to staging directory.",
                        fs.exists(new Path(stagingDirectory, flinkLibFolder.getName())));
            } else {
                assertTrue(
                        "The lib directory should be uploaded to staging directory.",
                        fs.exists(new Path(stagingDirectory, flinkLibFolder.getName())));
            }
        }
    }

    /**
     * Builds a minimal streaming job (no-data source, shuffle, discarding sink) with
     * parallelism 2, used as the default payload for the deployment tests.
     */
    private JobGraph getTestingJobGraph() {
        final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(2);

        env.addSource(new NoDataSource()).shuffle().addSink(new DiscardingSink<>());

        return env.getStreamGraph().getJobGraph();
    }

    /**
     * Creates the base configuration shared by all tests: 768 MiB JobManager memory,
     * 1 GiB TaskManager memory, a 30 s ask timeout, and the given user-jar inclusion mode.
     *
     * @param userJarInclusion how the user jar is placed on the system classpath
     * @return a fresh Configuration with the defaults applied
     */
    private Configuration createDefaultConfiguration(
            YarnConfigOptions.UserJarInclusion userJarInclusion) {
        Configuration configuration = new Configuration();
        configuration.set(JobManagerOptions.TOTAL_PROCESS_MEMORY, MemorySize.ofMebiBytes(768));
        configuration.set(TaskManagerOptions.TOTAL_PROCESS_MEMORY, MemorySize.parse("1g"));
        configuration.set(AkkaOptions.ASK_TIMEOUT_DURATION, Duration.ofSeconds(30));
        configuration.set(CLASSPATH_INCLUDE_USER_JAR, userJarInclusion);

        return configuration;
    }
}