/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.flink.hdfstests;
import org.apache.flink.api.common.io.FilePathFilter;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.java.io.TextInputFormat;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.api.java.typeutils.TypeExtractor;
import org.apache.flink.configuration.ConfigConstants;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.core.fs.Path;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;
import org.apache.flink.streaming.api.functions.source.ContinuousFileMonitoringFunction;
import org.apache.flink.streaming.api.functions.source.ContinuousFileReaderOperatorFactory;
import org.apache.flink.streaming.api.functions.source.FileProcessingMode;
import org.apache.flink.streaming.api.functions.source.TimestampedFileInputSplit;
import org.apache.flink.test.util.AbstractTestBase;
import org.apache.flink.util.ExceptionUtils;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileUtil;
import org.apache.hadoop.hdfs.MiniDFSCluster;
import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
import java.io.File;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Collections;
import java.util.Comparator;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
import static org.junit.Assert.assertEquals;
/**
* IT cases for the {@link ContinuousFileMonitoringFunction} and {@link
* ContinuousFileReaderOperator}.
*/
public class ContinuousFileProcessingITCase extends AbstractTestBase {
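// Test configuration: number of input files, lines written per file,
// reader parallelism, and the monitoring interval of the source (in milliseconds).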
private static final int NO_OF_FILES = 5;
private static final int LINES_PER_FILE = 100;
private static final int PARALLELISM = 4;
private static final long INTERVAL = 100;
private File baseDir;
private org.apache.hadoop.fs.FileSystem hdfs;
private String hdfsURI;
private MiniDFSCluster hdfsCluster;
private static Map<Integer, String> expectedContents = new HashMap<>();
// PREPARING FOR THE TESTS
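// Spins up an in-process MiniDFSCluster under ./target/hdfs and exposes it
// through the 'hdfs' file system handle and the 'hdfsURI' string.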
@Before
public void createHDFS() throws IOException {
baseDir = new File("./target/hdfs/hdfsTesting").getAbsoluteFile();
FileUtil.fullyDelete(baseDir);
org.apache.hadoop.conf.Configuration hdConf = new org.apache.hadoop.conf.Configuration();
hdConf.set(MiniDFSCluster.HDFS_MINIDFS_BASEDIR, baseDir.getAbsolutePath());
hdConf.set("dfs.block.size", String.valueOf(1048576)); // this is the minimum we can set.
MiniDFSCluster.Builder builder = new MiniDFSCluster.Builder(hdConf);
hdfsCluster = builder.build();
hdfsURI =
"hdfs://"
+ hdfsCluster.getURI().getHost()
+ ":"
+ hdfsCluster.getNameNodePort()
+ "/";
hdfs = new org.apache.hadoop.fs.Path(hdfsURI).getFileSystem(hdConf);
}
@After
public void destroyHDFS() {
FileUtil.fullyDelete(baseDir);
if (hdfsCluster != null) {
    hdfsCluster.shutdown();
}
}
// END OF PREPARATIONS
@Test
public void testProgram() throws Exception {
/*
 * This test checks the interplay between the monitor and the reader,
 * and also the failExternally() functionality. To test the latter, the
 * sink parallelism is set to 1 so that the sink, which throws the
 * SuccessException to signal the end of the test, is chained with the
 * reader.
 */
TextInputFormat format = new TextInputFormat(new Path(hdfsURI));
format.setFilePath(hdfsURI);
format.setFilesFilter(FilePathFilter.createDefaultFilter());
// create the stream execution environment with a parallelism > 1 to test parallel readers
final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setParallelism(PARALLELISM);
ContinuousFileMonitoringFunction<String> monitoringFunction =
new ContinuousFileMonitoringFunction<>(
format,
FileProcessingMode.PROCESS_CONTINUOUSLY,
env.getParallelism(),
INTERVAL);
// the monitor always runs with a parallelism (DOP) of 1
DataStream<TimestampedFileInputSplit> splits = env.addSource(monitoringFunction);
Assert.assertEquals(1, splits.getParallelism());
TypeInformation<String> typeInfo = TypeExtractor.getInputFormatTypes(format);
// the readers can run with a higher parallelism
DataStream<String> content =
splits.transform(
"FileSplitReader",
typeInfo,
new ContinuousFileReaderOperatorFactory<>(format));
Assert.assertEquals(PARALLELISM, content.getParallelism());
// finally for the sink we set the parallelism to 1 so that we can verify the output
TestingSinkFunction sink = new TestingSinkFunction();
content.addSink(sink).setParallelism(1);
CompletableFuture<Void> jobFuture = new CompletableFuture<>();
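// Submit the (continuously monitoring, hence never-ending) job in a separate
// thread; it only terminates through the SuccessException thrown by the sink.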
new Thread(
() -> {
try {
env.execute("ContinuousFileProcessingITCase Job.");
jobFuture.complete(null);
} catch (Exception e) {
if (ExceptionUtils.findThrowable(e, SuccessException.class)
.isPresent()) {
jobFuture.complete(null);
} else {
jobFuture.completeExceptionally(e);
}
}
})
.start();
// The modification time of the last created file.
long lastCreatedModTime = Long.MIN_VALUE;
// create the files to be read
for (int i = 0; i < NO_OF_FILES; i++) {
Tuple2<org.apache.hadoop.fs.Path, String> tmpFile;
long modTime;
do {
// give it some time so that the files have
// different modification timestamps.
Thread.sleep(50);
tmpFile = fillWithData(hdfsURI, "file", i, "This is test line.");
modTime = hdfs.getFileStatus(tmpFile.f0).getModificationTime();
if (modTime <= lastCreatedModTime) {
// delete the last created file to recreate it with a different timestamp
hdfs.delete(tmpFile.f0, false);
}
} while (modTime <= lastCreatedModTime);
lastCreatedModTime = modTime;
// register the contents in the expected results map before the reader picks
// them up; this guarantees that they are present before the reader finishes
// (avoids race conditions)
expectedContents.put(i, tmpFile.f1);
org.apache.hadoop.fs.Path file = new org.apache.hadoop.fs.Path(hdfsURI + "/file" + i);
hdfs.rename(tmpFile.f0, file);
Assert.assertTrue(hdfs.exists(file));
}
jobFuture.get();
}
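/**
 * Sink with parallelism 1 that groups the received lines by file index, fails on
 * duplicates, and throws a {@link SuccessException} once all expected lines have
 * arrived. On {@link #close()} it verifies the collected content against {@code
 * expectedContents}.
 */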
private static class TestingSinkFunction extends RichSinkFunction<String> {
private int elementCounter = 0;
private Map<Integer, Set<String>> actualContent = new HashMap<>();
private transient Comparator<String> comparator;
@Override
public void open(Configuration parameters) throws Exception {
// this sink can only work with DOP 1
assertEquals(1, getRuntimeContext().getNumberOfParallelSubtasks());
comparator = Comparator.comparingInt(this::getLineNo);
}
@Override
public void invoke(String value) throws Exception {
int fileIdx = getFileIdx(value);
Set<String> content = actualContent.get(fileIdx);
if (content == null) {
content = new HashSet<>();
actualContent.put(fileIdx, content);
}
if (!content.add(value + "\n")) {
    // Assert.fail throws an AssertionError, so nothing further is needed here.
    Assert.fail("Duplicate line: " + value);
}
elementCounter++;
if (elementCounter == NO_OF_FILES * LINES_PER_FILE) {
throw new SuccessException();
}
}
@Override
public void close() {
// check that the data we collected matches the expected content.
Assert.assertEquals(expectedContents.size(), actualContent.size());
for (Integer fileIdx : expectedContents.keySet()) {
Assert.assertTrue(actualContent.containsKey(fileIdx));
List<String> cntnt = new ArrayList<>(actualContent.get(fileIdx));
Collections.sort(cntnt, comparator);
StringBuilder cntntStr = new StringBuilder();
for (String line : cntnt) {
cntntStr.append(line);
}
Assert.assertEquals(expectedContents.get(fileIdx), cntntStr.toString());
}
expectedContents.clear();
}
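/** Extracts the trailing line number from a line of the form "fileIdx: sampleLine lineNo". */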
private int getLineNo(String line) {
String[] tkns = line.split("\\s");
return Integer.parseInt(tkns[tkns.length - 1]);
}
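/** Extracts the file index, i.e. the prefix before the first ':' in a generated line. */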
private int getFileIdx(String line) {
String[] tkns = line.split(":");
return Integer.parseInt(tkns[0]);
}
}
/** Create a file and fill it with content. */
private Tuple2<org.apache.hadoop.fs.Path, String> fillWithData(
String base, String fileName, int fileIdx, String sampleLine)
throws IOException, InterruptedException {
assert (hdfs != null);
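// Write to a dot-prefixed (hidden) file so that the default file filter ignores it
// until the caller renames it to its final, visible name.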
org.apache.hadoop.fs.Path tmp =
new org.apache.hadoop.fs.Path(base + "/." + fileName + fileIdx);
FSDataOutputStream stream = hdfs.create(tmp);
StringBuilder str = new StringBuilder();
for (int i = 0; i < LINES_PER_FILE; i++) {
String line = fileIdx + ": " + sampleLine + " " + i + "\n";
str.append(line);
stream.write(line.getBytes(ConfigConstants.DEFAULT_CHARSET));
}
stream.close();
return new Tuple2<>(tmp, str.toString());
}
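/** Signals that the sink has received all expected records and the test can finish. */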
private static class SuccessException extends Exception {
private static final long serialVersionUID = -7011865671593955887L;
}
}