/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.flink.hdfstests;

import org.apache.flink.api.common.JobID;
import org.apache.flink.api.common.io.FileOutputFormat;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.ExecutionEnvironmentFactory;
import org.apache.flink.api.java.LocalEnvironment;
import org.apache.flink.api.java.io.TextOutputFormat;
import org.apache.flink.configuration.BlobServerOptions;
import org.apache.flink.configuration.ConfigConstants;
import org.apache.flink.configuration.HighAvailabilityOptions;
import org.apache.flink.core.fs.FileSystem;
import org.apache.flink.core.fs.Path;
import org.apache.flink.examples.java.wordcount.WordCount;
import org.apache.flink.runtime.blob.BlobCacheCorruptionTest;
import org.apache.flink.runtime.blob.BlobCacheRecoveryTest;
import org.apache.flink.runtime.blob.BlobServerCorruptionTest;
import org.apache.flink.runtime.blob.BlobServerRecoveryTest;
import org.apache.flink.runtime.blob.BlobStoreService;
import org.apache.flink.runtime.blob.BlobUtils;
import org.apache.flink.runtime.fs.hdfs.HadoopFileSystem;
import org.apache.flink.runtime.jobmanager.HighAvailabilityMode;
import org.apache.flink.util.FileUtils;
import org.apache.flink.util.OperatingSystem;

import org.apache.commons.io.IOUtils;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileUtil;
import org.apache.hadoop.hdfs.MiniDFSCluster;
import org.junit.After;
import org.junit.Assert;
import org.junit.Assume;
import org.junit.Before;
import org.junit.BeforeClass;
import org.junit.Rule;
import org.junit.Test;
import org.junit.rules.ExpectedException;
import org.junit.rules.TemporaryFolder;

import java.io.File;
import java.io.IOException;
import java.io.StringWriter;
import java.util.Arrays;
import java.util.UUID;

import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertTrue;

/**
 * This test should logically be located in the 'flink-runtime' tests. However, this project
 * already has all the required dependencies (flink-java-examples). In addition, the
 * parallelism-one execution environment ({@link DopOneTestEnvironment}) used by the tests is defined here.
 */
public class HDFSTest {
protected String hdfsURI;
private MiniDFSCluster hdfsCluster;
private org.apache.hadoop.fs.Path hdPath;
protected org.apache.hadoop.fs.FileSystem hdfs;
@Rule
public TemporaryFolder temporaryFolder = new TemporaryFolder();
@Rule
public final ExpectedException exception = ExpectedException.none();
@BeforeClass
public static void verifyOS() {
Assume.assumeTrue("HDFS cluster cannot be started on Windows without extensions.", !OperatingSystem.isWindows());
}
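
/**
 * Starts a local MiniDFSCluster in a fresh temporary directory and writes a small test file
 * under {@code /test} containing the line "Hello HDFS" ten times, which the tests below read.
 */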
@Before
public void createHDFS() {
try {
Configuration hdConf = new Configuration();
File baseDir = temporaryFolder.newFolder();
FileUtil.fullyDelete(baseDir);
hdConf.set(MiniDFSCluster.HDFS_MINIDFS_BASEDIR, baseDir.getAbsolutePath());
MiniDFSCluster.Builder builder = new MiniDFSCluster.Builder(hdConf);
hdfsCluster = builder.build();
hdfsURI = "hdfs://" + hdfsCluster.getURI().getHost() + ":" + hdfsCluster.getNameNodePort() + "/";
hdPath = new org.apache.hadoop.fs.Path("/test");
hdfs = hdPath.getFileSystem(hdConf);
FSDataOutputStream stream = hdfs.create(hdPath);
for (int i = 0; i < 10; i++) {
stream.write("Hello HDFS\n".getBytes(ConfigConstants.DEFAULT_CHARSET));
}
stream.close();
} catch (Throwable e) {
e.printStackTrace();
Assert.fail("Test failed " + e.getMessage());
}
}
@After
public void destroyHDFS() {
try {
hdfs.delete(hdPath, false);
hdfsCluster.shutdown();
} catch (IOException e) {
throw new RuntimeException(e);
}
}
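
/**
 * Runs the bundled WordCount example against the test file in HDFS and verifies that the
 * expected word counts are written back to HDFS.
 */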
@Test
public void testHDFS() {
Path file = new Path(hdfsURI + hdPath);
org.apache.hadoop.fs.Path result = new org.apache.hadoop.fs.Path(hdfsURI + "/result");
try {
FileSystem fs = file.getFileSystem();
assertTrue("Must be HadoopFileSystem", fs instanceof HadoopFileSystem);
DopOneTestEnvironment.setAsContext();
try {
WordCount.main(new String[]{
"--input", file.toString(),
"--output", result.toString()});
}
catch (Throwable t) {
t.printStackTrace();
Assert.fail("Test failed with " + t.getMessage());
}
finally {
DopOneTestEnvironment.unsetAsContext();
}
assertTrue("No result file present", hdfs.exists(result));
// validate output:
org.apache.hadoop.fs.FSDataInputStream inStream = hdfs.open(result);
StringWriter writer = new StringWriter();
IOUtils.copy(inStream, writer);
String resultString = writer.toString();
Assert.assertEquals("hdfs 10\n" +
"hello 10\n", resultString);
inStream.close();
} catch (IOException e) {
e.printStackTrace();
Assert.fail("Error in test: " + e.getMessage());
}
}
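
/**
 * Verifies that a {@link TextOutputFormat} opened by two parallel tasks writes one
 * distinct file per task into the output directory.
 */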
@Test
public void testChangingFileNames() {
org.apache.hadoop.fs.Path hdfsPath = new org.apache.hadoop.fs.Path(hdfsURI + "/hdfsTest");
Path path = new Path(hdfsPath.toString());
String type = "one";
TextOutputFormat<String> outputFormat = new TextOutputFormat<>(path);
outputFormat.setWriteMode(FileSystem.WriteMode.NO_OVERWRITE);
outputFormat.setOutputDirectoryMode(FileOutputFormat.OutputDirectoryMode.ALWAYS);
try {
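// simulate two parallel write tasks (task indices 0 and 1 of 2); in ALWAYS directory
// mode each task writes a separate file named after its one-based task index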
outputFormat.open(0, 2);
outputFormat.writeRecord(type);
outputFormat.close();
outputFormat.open(1, 2);
outputFormat.writeRecord(type);
outputFormat.close();
assertTrue("No result file present", hdfs.exists(hdfsPath));
FileStatus[] files = hdfs.listStatus(hdfsPath);
Assert.assertEquals(2, files.length);
for (FileStatus file : files) {
assertTrue("1".equals(file.getPath().getName()) || "2".equals(file.getPath().getName()));
}
} catch (IOException e) {
e.printStackTrace();
Assert.fail(e.getMessage());
}
}
/**
 * Tests that {@link FileUtils#deletePathIfEmpty(FileSystem, Path)} deletes the path if it is
 * empty. A path can only be empty if it is a directory that does not contain any
 * files or directories.
 */
@Test
public void testDeletePathIfEmpty() throws IOException {
final Path basePath = new Path(hdfsURI);
final Path directory = new Path(basePath, UUID.randomUUID().toString());
final Path directoryFile = new Path(directory, UUID.randomUUID().toString());
final Path singleFile = new Path(basePath, UUID.randomUUID().toString());
FileSystem fs = basePath.getFileSystem();
fs.mkdirs(directory);
byte[] data = "HDFSTest#testDeletePathIfEmpty".getBytes(ConfigConstants.DEFAULT_CHARSET);
for (Path file: Arrays.asList(singleFile, directoryFile)) {
org.apache.flink.core.fs.FSDataOutputStream outputStream = fs.create(file, FileSystem.WriteMode.OVERWRITE);
outputStream.write(data);
outputStream.close();
}
// verify that the files have been created
assertTrue(fs.exists(singleFile));
assertTrue(fs.exists(directoryFile));
// delete the single file
assertFalse(FileUtils.deletePathIfEmpty(fs, singleFile));
assertTrue(fs.exists(singleFile));
// try to delete the non-empty directory
assertFalse(FileUtils.deletePathIfEmpty(fs, directory));
assertTrue(fs.exists(directory));
// delete the file contained in the directory
assertTrue(fs.delete(directoryFile, false));
// now the deletion should work
assertTrue(FileUtils.deletePathIfEmpty(fs, directory));
assertFalse(fs.exists(directory));
}
/**
 * Tests that, with {@link HighAvailabilityMode#ZOOKEEPER}, distributed JARs are recoverable from any
 * participating BlobServer when talking to the {@link org.apache.flink.runtime.blob.BlobServer} directly.
 */
@Test
public void testBlobServerRecovery() throws Exception {
org.apache.flink.configuration.Configuration config = new org.apache.flink.configuration.Configuration();
config.setString(HighAvailabilityOptions.HA_MODE, "ZOOKEEPER");
config.setString(BlobServerOptions.STORAGE_DIRECTORY,
temporaryFolder.newFolder().getAbsolutePath());
config.setString(HighAvailabilityOptions.HA_STORAGE_PATH, hdfsURI);
BlobStoreService blobStoreService = BlobUtils.createBlobStoreFromConfig(config);
try {
BlobServerRecoveryTest.testBlobServerRecovery(config, blobStoreService);
} finally {
blobStoreService.closeAndCleanupAllData();
}
}
/**
 * Tests that, with {@link HighAvailabilityMode#ZOOKEEPER}, corrupted distributed JARs are
 * recognised during the download via a {@link org.apache.flink.runtime.blob.BlobServer}.
 */
@Test
public void testBlobServerCorruptedFile() throws Exception {
org.apache.flink.configuration.Configuration config = new org.apache.flink.configuration.Configuration();
config.setString(HighAvailabilityOptions.HA_MODE, "ZOOKEEPER");
config.setString(BlobServerOptions.STORAGE_DIRECTORY,
temporaryFolder.newFolder().getAbsolutePath());
config.setString(HighAvailabilityOptions.HA_STORAGE_PATH, hdfsURI);
BlobStoreService blobStoreService = BlobUtils.createBlobStoreFromConfig(config);
try {
BlobServerCorruptionTest.testGetFailsFromCorruptFile(config, blobStoreService, exception);
} finally {
blobStoreService.closeAndCleanupAllData();
}
}
/**
 * Tests that, with {@link HighAvailabilityMode#ZOOKEEPER}, distributed JARs are recoverable from any
 * participating BlobServer when uploaded via a BLOB cache.
 */
@Test
public void testBlobCacheRecovery() throws Exception {
org.apache.flink.configuration.Configuration config = new org.apache.flink.configuration.Configuration();
config.setString(HighAvailabilityOptions.HA_MODE, "ZOOKEEPER");
config.setString(BlobServerOptions.STORAGE_DIRECTORY,
temporaryFolder.newFolder().getAbsolutePath());
config.setString(HighAvailabilityOptions.HA_STORAGE_PATH, hdfsURI);
BlobStoreService blobStoreService = BlobUtils.createBlobStoreFromConfig(config);
try {
BlobCacheRecoveryTest.testBlobCacheRecovery(config, blobStoreService);
} finally {
blobStoreService.closeAndCleanupAllData();
}
}
/**
 * Tests that, with {@link HighAvailabilityMode#ZOOKEEPER}, corrupted distributed JARs are
 * recognised during the download via a BLOB cache.
 */
@Test
public void testBlobCacheCorruptedFile() throws Exception {
org.apache.flink.configuration.Configuration config = new org.apache.flink.configuration.Configuration();
config.setString(HighAvailabilityOptions.HA_MODE, "ZOOKEEPER");
config.setString(BlobServerOptions.STORAGE_DIRECTORY,
temporaryFolder.newFolder().getAbsolutePath());
config.setString(HighAvailabilityOptions.HA_STORAGE_PATH, hdfsURI);
BlobStoreService blobStoreService = BlobUtils.createBlobStoreFromConfig(config);
try {
BlobCacheCorruptionTest.testGetFailsFromCorruptFile(new JobID(), config, blobStoreService, exception);
} finally {
blobStoreService.closeAndCleanupAllData();
}
}
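
/**
 * Helper that installs a {@link LocalEnvironment} with parallelism one as the context
 * execution environment, so that example programs such as WordCount pick it up.
 */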
abstract static class DopOneTestEnvironment extends ExecutionEnvironment {
public static void setAsContext() {
final LocalEnvironment le = new LocalEnvironment();
le.setParallelism(1);
initializeContextEnvironment(new ExecutionEnvironmentFactory() {
@Override
public ExecutionEnvironment createExecutionEnvironment() {
return le;
}
});
}
public static void unsetAsContext() {
resetContextEnvironment();
}
}
}