/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.flink.hdfstests;

import org.apache.flink.api.common.io.FilePathFilter;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.java.io.TextInputFormat;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.api.java.typeutils.TypeExtractor;
import org.apache.flink.configuration.ConfigConstants;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.core.fs.Path;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;
import org.apache.flink.streaming.api.functions.source.ContinuousFileMonitoringFunction;
import org.apache.flink.streaming.api.functions.source.ContinuousFileReaderOperator;
import org.apache.flink.streaming.api.functions.source.FileProcessingMode;
import org.apache.flink.streaming.api.functions.source.TimestampedFileInputSplit;
import org.apache.flink.streaming.util.StreamingProgramTestBase;

import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileUtil;
import org.apache.hadoop.hdfs.MiniDFSCluster;

import org.junit.After;
import org.junit.Assert;
import org.junit.Before;

import java.io.File;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Collections;
import java.util.Comparator;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;

import static org.junit.Assert.assertEquals;
/**
* IT cases for the {@link ContinuousFileMonitoringFunction} and {@link ContinuousFileReaderOperator}.
*/
public class ContinuousFileProcessingITCase extends StreamingProgramTestBase {
private static final int NO_OF_FILES = 5;
private static final int LINES_PER_FILE = 100;
private static final int PARALLELISM = 4;
private static final long INTERVAL = 100;
private File baseDir;
private org.apache.hadoop.fs.FileSystem hdfs;
private String hdfsURI;
private MiniDFSCluster hdfsCluster;
private static Map<Integer, String> expectedContents = new HashMap<>();
// PREPARING FOR THE TESTS
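	/**
	 * Starts a {@link MiniDFSCluster} under ./target/hdfs/hdfsTesting and initializes the
	 * HDFS {@code FileSystem} handle and URI used by the test.
	 */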
@Before
public void createHDFS() {
try {
baseDir = new File("./target/hdfs/hdfsTesting").getAbsoluteFile();
FileUtil.fullyDelete(baseDir);
org.apache.hadoop.conf.Configuration hdConf = new org.apache.hadoop.conf.Configuration();
hdConf.set(MiniDFSCluster.HDFS_MINIDFS_BASEDIR, baseDir.getAbsolutePath());
hdConf.set("dfs.block.size", String.valueOf(1048576)); // this is the minimum we can set.
MiniDFSCluster.Builder builder = new MiniDFSCluster.Builder(hdConf);
hdfsCluster = builder.build();
hdfsURI = "hdfs://" + hdfsCluster.getURI().getHost() + ":" + hdfsCluster.getNameNodePort() + "/";
hdfs = new org.apache.hadoop.fs.Path(hdfsURI).getFileSystem(hdConf);
} catch (Throwable e) {
e.printStackTrace();
Assert.fail("Test failed " + e.getMessage());
}
}
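	/** Deletes the working directory and shuts down the {@link MiniDFSCluster}. */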
@After
public void destroyHDFS() {
try {
FileUtil.fullyDelete(baseDir);
hdfsCluster.shutdown();
} catch (Throwable t) {
throw new RuntimeException(t);
}
}
// END OF PREPARATIONS
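	/**
	 * Builds and runs the pipeline under test: a {@link ContinuousFileMonitoringFunction} with
	 * parallelism 1, a {@link ContinuousFileReaderOperator} with parallelism {@code PARALLELISM},
	 * and a {@code TestingSinkFunction} with parallelism 1 that verifies the read contents.
	 * While the job is running, the test creates {@code NO_OF_FILES} files in HDFS for it to pick up.
	 */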
@Override
protected void testProgram() throws Exception {
		/*
		 * This test checks the interplay between the monitor and the reader,
		 * and also the failExternally() functionality. To test the latter, the
		 * sink, which throws the SuccessException to signal the end of the test,
		 * runs with a parallelism of 1.
		 */
TextInputFormat format = new TextInputFormat(new Path(hdfsURI));
format.setFilePath(hdfsURI);
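		// the default filter ignores hidden files (names starting with '.' or '_'), so the
		// temporary dot-prefixed files written by fillWithData() only become visible to the
		// monitor once they are renamed further below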
format.setFilesFilter(FilePathFilter.createDefaultFilter());
		// create the stream execution environment with a parallelism > 1 so that the readers run in parallel
final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setParallelism(PARALLELISM);
ContinuousFileMonitoringFunction<String> monitoringFunction =
new ContinuousFileMonitoringFunction<>(format,
FileProcessingMode.PROCESS_CONTINUOUSLY,
env.getParallelism(), INTERVAL);
		// the monitor always runs with a parallelism (DOP) of 1
DataStream<TimestampedFileInputSplit> splits = env.addSource(monitoringFunction);
Assert.assertEquals(1, splits.getParallelism());
ContinuousFileReaderOperator<String> reader = new ContinuousFileReaderOperator<>(format);
TypeInformation<String> typeInfo = TypeExtractor.getInputFormatTypes(format);
		// the readers can run with a higher parallelism
DataStream<String> content = splits.transform("FileSplitReader", typeInfo, reader);
Assert.assertEquals(PARALLELISM, content.getParallelism());
		// finally, for the sink, we set the parallelism to 1 so that we can verify the output
TestingSinkFunction sink = new TestingSinkFunction();
content.addSink(sink).setParallelism(1);
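		// run the job in a separate thread so that this thread can create the input files while
		// the job is running; the streaming job only terminates when the sink throws the
		// SuccessException, which then surfaces in the cause chain of the execution exception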
Thread job = new Thread() {
@Override
public void run() {
try {
env.execute("ContinuousFileProcessingITCase Job.");
} catch (Exception e) {
Throwable th = e;
for (int depth = 0; depth < 20; depth++) {
if (th instanceof SuccessException) {
try {
postSubmit();
} catch (Exception e1) {
e1.printStackTrace();
}
return;
} else if (th.getCause() != null) {
th = th.getCause();
} else {
break;
}
}
e.printStackTrace();
Assert.fail(e.getMessage());
}
}
};
job.start();
// The modification time of the last created file.
long lastCreatedModTime = Long.MIN_VALUE;
// create the files to be read
for (int i = 0; i < NO_OF_FILES; i++) {
Tuple2<org.apache.hadoop.fs.Path, String> tmpFile;
long modTime;
do {
// give it some time so that the files have
// different modification timestamps.
Thread.sleep(50);
tmpFile = fillWithData(hdfsURI, "file", i, "This is test line.");
modTime = hdfs.getFileStatus(tmpFile.f0).getModificationTime();
if (modTime <= lastCreatedModTime) {
// delete the last created file to recreate it with a different timestamp
hdfs.delete(tmpFile.f0, false);
}
} while (modTime <= lastCreatedModTime);
lastCreatedModTime = modTime;
			// put the contents in the expected results map before the reader picks them up,
			// to guarantee that they are registered before the reader finishes (avoids race conditions)
expectedContents.put(i, tmpFile.f1);
org.apache.hadoop.fs.Path file =
new org.apache.hadoop.fs.Path(hdfsURI + "/file" + i);
hdfs.rename(tmpFile.f0, file);
Assert.assertTrue(hdfs.exists(file));
}
// wait for the job to finish.
job.join();
}
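	/**
	 * Sink with parallelism 1 that groups the received lines by file index, fails on duplicate
	 * lines, throws a {@code SuccessException} once all {@code NO_OF_FILES * LINES_PER_FILE}
	 * lines have arrived, and compares the reassembled file contents against
	 * {@code expectedContents} in {@code close()}.
	 */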
private static class TestingSinkFunction extends RichSinkFunction<String> {
private int elementCounter = 0;
private Map<Integer, Set<String>> actualContent = new HashMap<>();
private transient Comparator<String> comparator;
@Override
public void open(Configuration parameters) throws Exception {
// this sink can only work with DOP 1
assertEquals(1, getRuntimeContext().getNumberOfParallelSubtasks());
comparator = new Comparator<String>() {
@Override
public int compare(String o1, String o2) {
return getLineNo(o1) - getLineNo(o2);
}
};
}
@Override
public void invoke(String value) throws Exception {
int fileIdx = getFileIdx(value);
Set<String> content = actualContent.get(fileIdx);
if (content == null) {
content = new HashSet<>();
actualContent.put(fileIdx, content);
}
if (!content.add(value + "\n")) {
Assert.fail("Duplicate line: " + value);
System.exit(0);
}
elementCounter++;
if (elementCounter == NO_OF_FILES * LINES_PER_FILE) {
throw new SuccessException();
}
}
@Override
public void close() {
			// check that the data we collected matches the expected contents.
Assert.assertEquals(expectedContents.size(), actualContent.size());
for (Integer fileIdx: expectedContents.keySet()) {
Assert.assertTrue(actualContent.keySet().contains(fileIdx));
List<String> cntnt = new ArrayList<>(actualContent.get(fileIdx));
Collections.sort(cntnt, comparator);
StringBuilder cntntStr = new StringBuilder();
for (String line: cntnt) {
cntntStr.append(line);
}
Assert.assertEquals(expectedContents.get(fileIdx), cntntStr.toString());
}
expectedContents.clear();
}
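		// lines are written by fillWithData() in the format "<fileIdx>: <sampleLine> <lineNo>",
		// so the file index is the token before the ':' and the line number is the last token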
private int getLineNo(String line) {
String[] tkns = line.split("\\s");
return Integer.parseInt(tkns[tkns.length - 1]);
}
private int getFileIdx(String line) {
String[] tkns = line.split(":");
return Integer.parseInt(tkns[0]);
}
}
	/**
	 * Creates a hidden ("." prefixed) file under {@code base} and fills it with
	 * {@code LINES_PER_FILE} lines. Returns the path of the file and its expected content.
	 */
private Tuple2<org.apache.hadoop.fs.Path, String> fillWithData(
String base, String fileName, int fileIdx, String sampleLine) throws IOException, InterruptedException {
assert (hdfs != null);
org.apache.hadoop.fs.Path tmp =
new org.apache.hadoop.fs.Path(base + "/." + fileName + fileIdx);
FSDataOutputStream stream = hdfs.create(tmp);
StringBuilder str = new StringBuilder();
for (int i = 0; i < LINES_PER_FILE; i++) {
String line = fileIdx + ": " + sampleLine + " " + i + "\n";
str.append(line);
stream.write(line.getBytes(ConfigConstants.DEFAULT_CHARSET));
}
stream.close();
return new Tuple2<>(tmp, str.toString());
}
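	/**
	 * Marker exception thrown by the sink once all expected lines have been received;
	 * it is used to terminate the otherwise endless {@code PROCESS_CONTINUOUSLY} streaming job.
	 */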
private static class SuccessException extends Exception {
private static final long serialVersionUID = -7011865671593955887L;
}
}