/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.flink.hdfstests;
import org.apache.commons.io.IOUtils;
import org.apache.flink.api.common.io.FileOutputFormat;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.ExecutionEnvironmentFactory;
import org.apache.flink.api.java.LocalEnvironment;
import org.apache.flink.api.java.io.AvroOutputFormat;
import org.apache.flink.core.fs.FileSystem;
import org.apache.flink.core.fs.Path;
import org.apache.flink.examples.java.wordcount.WordCount;
import org.apache.flink.runtime.fs.hdfs.HadoopFileSystem;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileUtil;
import org.apache.hadoop.hdfs.MiniDFSCluster;
import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
import java.io.File;
import java.io.IOException;
import java.io.StringWriter;
/**
 * This test should logically be located in the 'flink-runtime' tests. However, this project
 * already has all the required dependencies (flink-java-examples). Also, the DopOneTestEnvironment is defined here.
 */
public class HDFSTest {
protected String hdfsURI;
private MiniDFSCluster hdfsCluster;
private org.apache.hadoop.fs.Path hdPath;
protected org.apache.hadoop.fs.FileSystem hdfs;
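/** Starts a single-node MiniDFSCluster and writes a small test file into it. */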
@Before
public void createHDFS() {
try {
Configuration hdConf = new Configuration();
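// run the mini cluster out of a freshly cleaned base directory under ./target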
File baseDir = new File("./target/hdfs/hdfsTest").getAbsoluteFile();
FileUtil.fullyDelete(baseDir);
hdConf.set(MiniDFSCluster.HDFS_MINIDFS_BASEDIR, baseDir.getAbsolutePath());
MiniDFSCluster.Builder builder = new MiniDFSCluster.Builder(hdConf);
hdfsCluster = builder.build();
hdfsURI = "hdfs://" + hdfsCluster.getURI().getHost() + ":" + hdfsCluster.getNameNodePort() +"/";
hdPath = new org.apache.hadoop.fs.Path("/test");
hdfs = hdPath.getFileSystem(hdConf);
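// write the WordCount input: ten lines of "Hello HDFS"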
FSDataOutputStream stream = hdfs.create(hdPath);
for(int i = 0; i < 10; i++) {
stream.write("Hello HDFS\n".getBytes());
}
stream.close();
} catch(Throwable e) {
e.printStackTrace();
Assert.fail("Test failed " + e.getMessage());
}
}
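/** Deletes the test file and shuts the MiniDFSCluster down again. */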
@After
public void destroyHDFS() {
try {
hdfs.delete(hdPath, false);
hdfsCluster.shutdown();
} catch (IOException e) {
throw new RuntimeException(e);
}
}
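/** Runs the WordCount example with its input and output on HDFS and validates the word counts in the result file. */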
@Test
public void testHDFS() {
Path file = new Path(hdfsURI + hdPath);
org.apache.hadoop.fs.Path result = new org.apache.hadoop.fs.Path(hdfsURI + "/result");
try {
FileSystem fs = file.getFileSystem();
Assert.assertTrue("Must be HadoopFileSystem", fs instanceof HadoopFileSystem);
DopOneTestEnvironment.setAsContext();
try {
WordCount.main(new String[]{
"--input", file.toString(),
"--output", result.toString()});
}
catch(Throwable t) {
t.printStackTrace();
Assert.fail("Test failed with " + t.getMessage());
}
finally {
DopOneTestEnvironment.unsetAsContext();
}
Assert.assertTrue("No result file present", hdfs.exists(result));
// validate output:
org.apache.hadoop.fs.FSDataInputStream inStream = hdfs.open(result);
StringWriter writer = new StringWriter();
IOUtils.copy(inStream, writer);
String resultString = writer.toString();
Assert.assertEquals("hdfs 10\n" +
"hello 10\n", resultString);
inStream.close();
} catch (IOException e) {
e.printStackTrace();
Assert.fail("Error in test: " + e.getMessage() );
}
}
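/** Writes Avro records from two simulated parallel subtasks and checks that both part files end up in the output directory on HDFS. */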
@Test
public void testAvroOut() {
String type = "one";
AvroOutputFormat<String> avroOut = new AvroOutputFormat<String>(String.class);
org.apache.hadoop.fs.Path result = new org.apache.hadoop.fs.Path(hdfsURI + "/avroTest");
avroOut.setOutputFilePath(new Path(result.toString()));
avroOut.setWriteMode(FileSystem.WriteMode.NO_OVERWRITE);
avroOut.setOutputDirectoryMode(FileOutputFormat.OutputDirectoryMode.ALWAYS);
try {
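// open the format twice, as subtask 0 and subtask 1 of 2, so that two part files are written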
avroOut.open(0, 2);
avroOut.writeRecord(type);
avroOut.close();
avroOut.open(1, 2);
avroOut.writeRecord(type);
avroOut.close();
Assert.assertTrue("No result file present", hdfs.exists(result));
FileStatus[] files = hdfs.listStatus(result);
Assert.assertEquals(2, files.length);
for(FileStatus file : files) {
Assert.assertTrue("1.avro".equals(file.getPath().getName()) || "2.avro".equals(file.getPath().getName()));
}
} catch (IOException e) {
e.printStackTrace();
Assert.fail(e.getMessage());
}
}
/** Package-visible helper that installs a LocalEnvironment with parallelism 1 as the context execution environment. */
abstract static class DopOneTestEnvironment extends ExecutionEnvironment {
public static void setAsContext() {
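// install a parallelism-1 LocalEnvironment as the context environment, so the example picks it up via getExecutionEnvironment()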
final LocalEnvironment le = new LocalEnvironment();
le.setParallelism(1);
initializeContextEnvironment(new ExecutionEnvironmentFactory() {
@Override
public ExecutionEnvironment createExecutionEnvironment() {
return le;
}
});
}
public static void unsetAsContext() {
resetContextEnvironment();
}
}
}