| /* |
| * Licensed to the Apache Software Foundation (ASF) under one |
| * or more contributor license agreements. See the NOTICE file |
| * distributed with this work for additional information |
| * regarding copyright ownership. The ASF licenses this file |
| * to you under the Apache License, Version 2.0 (the |
| * "License"); you may not use this file except in compliance |
| * with the License. You may obtain a copy of the License at |
| * |
| * http://www.apache.org/licenses/LICENSE-2.0 |
| * |
| * Unless required by applicable law or agreed to in writing, software |
| * distributed under the License is distributed on an "AS IS" BASIS, |
| * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| * See the License for the specific language governing permissions and |
| * limitations under the License. |
| */ |
| |
| package org.apache.flink.test.checkpointing; |
| |
| import org.apache.flink.api.common.functions.FlatMapFunction; |
| import org.apache.flink.api.common.io.FilePathFilter; |
| import org.apache.flink.api.common.state.CheckpointListener; |
| import org.apache.flink.api.java.io.TextInputFormat; |
| import org.apache.flink.api.java.tuple.Tuple2; |
| import org.apache.flink.configuration.ConfigConstants; |
| import org.apache.flink.configuration.Configuration; |
| import org.apache.flink.runtime.execution.SuppressRestartsException; |
| import org.apache.flink.streaming.api.checkpoint.ListCheckpointed; |
| import org.apache.flink.streaming.api.datastream.DataStream; |
| import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; |
| import org.apache.flink.streaming.api.functions.sink.RichSinkFunction; |
| import org.apache.flink.streaming.api.functions.source.ContinuousFileMonitoringFunction; |
| import org.apache.flink.streaming.api.functions.source.FileProcessingMode; |
| import org.apache.flink.test.util.SuccessException; |
| import org.apache.flink.util.Collector; |
| |
| import org.apache.hadoop.fs.FSDataOutputStream; |
| import org.apache.hadoop.fs.FileUtil; |
| import org.apache.hadoop.fs.Path; |
| import org.junit.After; |
| import org.junit.Assert; |
| import org.junit.AssumptionViolatedException; |
| import org.junit.Before; |
| |
| import java.io.File; |
| import java.io.IOException; |
| import java.util.ArrayList; |
| import java.util.Collections; |
| import java.util.Comparator; |
| import java.util.HashMap; |
| import java.util.HashSet; |
| import java.util.List; |
| import java.util.Map; |
| import java.util.Random; |
| import java.util.Set; |
| |
| import static org.junit.Assert.assertEquals; |
| import static org.junit.Assert.fail; |
| |
/** Test checkpointing of a job whose source continuously monitors a directory for new files. */
| public class ContinuousFileProcessingCheckpointITCase extends StreamFaultToleranceTestBase { |
| |
    private static final int NO_OF_FILES = 5; // number of input files created by the FileCreator
    private static final int LINES_PER_FILE = 150; // number of lines written to each input file
    private static final long INTERVAL = 100; // re-scan interval of the file source, in ms
| |
| private static File baseDir; |
| private static org.apache.hadoop.fs.FileSystem localFs; |
| private static String localFsURI; |
| private FileCreator fc; |
| |
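    /** Lines collected by the sink, grouped by file index; verified in {@link #postSubmit()}. */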
| private static Map<Integer, Set<String>> actualCollectedContent = new HashMap<>(); |
| |
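    /**
     * Prepares a local Hadoop {@link org.apache.hadoop.fs.FileSystem} rooted under
     * ./target/localfs/fs_tests; despite the method name, no HDFS cluster is started.
     */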
| @Before |
| public void createHDFS() throws IOException { |
| if (failoverStrategy.equals(FailoverStrategy.RestartPipelinedRegionFailoverStrategy)) { |
            // TODO: 'NO_OF_RETRIES' has no effect with the current RestartPipelinedRegionStrategy;
            // running this ContinuousFileProcessingCheckpointITCase with that strategy
            // would never terminate.
| throw new AssumptionViolatedException( |
| "ignored ContinuousFileProcessingCheckpointITCase when using RestartPipelinedRegionStrategy"); |
| } |
| |
| baseDir = new File("./target/localfs/fs_tests").getAbsoluteFile(); |
| FileUtil.fullyDelete(baseDir); |
| |
| org.apache.hadoop.conf.Configuration hdConf = new org.apache.hadoop.conf.Configuration(); |
| |
| localFsURI = "file:///" + baseDir + "/"; |
| localFs = new org.apache.hadoop.fs.Path(localFsURI).getFileSystem(hdConf); |
| } |
| |
| @After |
| public void destroyHDFS() { |
| if (baseDir != null) { |
| FileUtil.fullyDelete(baseDir); |
| } |
| } |
| |
| @Override |
| public void testProgram(StreamExecutionEnvironment env) { |
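
        // Topology: a continuously monitored file source reads the files produced by the
        // FileCreator thread, a pass-through flatMap forwards the lines, and a parallelism-1
        // sink collects them, triggers a simulated failure, and eventually ends the job.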
| |
        // checkpoint aggressively (every 10 ms) so that several checkpoints can complete
        // before the sink triggers the simulated failure
        env.enableCheckpointing(10);
| |
| // create and start the file creating thread. |
| fc = new FileCreator(); |
| fc.start(); |
| |
| // create the monitoring source along with the necessary readers. |
| TextInputFormat format = new TextInputFormat(new org.apache.flink.core.fs.Path(localFsURI)); |
| format.setFilesFilter(FilePathFilter.createDefaultFilter()); |
| |
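        // PROCESS_CONTINUOUSLY re-scans the directory every INTERVAL milliseconds; the default
        // filter above excludes hidden files, so the FileCreator's in-progress ".file*"
        // temporaries are only picked up once they have been renamed to their final names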
| DataStream<String> inputStream = |
| env.readFile(format, localFsURI, FileProcessingMode.PROCESS_CONTINUOUSLY, INTERVAL); |
| |
| TestingSinkFunction sink = new TestingSinkFunction(); |
| |
| inputStream |
| .flatMap( |
| new FlatMapFunction<String, String>() { |
| @Override |
| public void flatMap(String value, Collector<String> out) |
| throws Exception { |
| out.collect(value); |
| } |
| }) |
| .addSink(sink) |
| .setParallelism(1); |
| } |
| |
| @Override |
| public void postSubmit() throws Exception { |
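
        // verify that the sink saw exactly what the FileCreator wrote: the same set of files,
        // and for each file the collected lines reassemble the original file content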
| |
| // be sure that the file creating thread is done. |
| fc.join(); |
| |
| Map<Integer, Set<String>> collected = actualCollectedContent; |
        Assert.assertEquals(fc.getFileContent().size(), collected.size());
| |
| for (Integer fileIdx : fc.getFileContent().keySet()) { |
            Assert.assertTrue(collected.containsKey(fileIdx));
| |
            List<String> content = new ArrayList<>(collected.get(fileIdx));
            Collections.sort(
                    content,
                    new Comparator<String>() {
                        @Override
                        public int compare(String o1, String o2) {
                            return getLineNo(o1) - getLineNo(o2);
                        }
                    });

            StringBuilder contentBuilder = new StringBuilder();
            for (String line : content) {
                contentBuilder.append(line);
            }
            Assert.assertEquals(fc.getFileContent().get(fileIdx), contentBuilder.toString());
| } |
| |
        actualCollectedContent.clear();
| fc.clean(); |
| } |
| |
| private int getLineNo(String line) { |
| String[] tkns = line.split("\\s"); |
| return Integer.parseInt(tkns[tkns.length - 1]); |
| } |
| |
| // -------------------------- Task Sink ------------------------------ |
| |
| private static class TestingSinkFunction extends RichSinkFunction<String> |
| implements ListCheckpointed<Tuple2<Long, Map<Integer, Set<String>>>>, |
| CheckpointListener { |
| |
        /** Whether the sink has been restored from non-empty state, i.e. after the failure. */
        private boolean hasRestoredAfterFailure;

        /** Number of checkpoints that have completed successfully so far. */
        private volatile int successfulCheckpoints;

        /** Element count after which the simulated failure is triggered. */
        private long elementsToFailure;

        /** Number of elements seen so far; part of the checkpointed state. */
        private long elementCounter;

        /** Collected lines, grouped by file index; part of the checkpointed state. */
        private Map<Integer, Set<String>> actualContent = new HashMap<>();
| |
| TestingSinkFunction() { |
| hasRestoredAfterFailure = false; |
| elementCounter = 0; |
| successfulCheckpoints = 0; |
| } |
| |
| @Override |
| public void open(Configuration parameters) throws Exception { |
| // this sink can only work with DOP 1 |
| assertEquals(1, getRuntimeContext().getNumberOfParallelSubtasks()); |
| |
            long failurePosMin = (long) (0.4 * LINES_PER_FILE);
            long failurePosMax = (long) (0.7 * LINES_PER_FILE);

            // pick the failure position uniformly from [failurePosMin, failurePosMax);
            // Random#nextLong() can be negative, so use a bounded nextInt() for the offset
            elementsToFailure =
                    failurePosMin + new Random().nextInt((int) (failurePosMax - failurePosMin));
| } |
| |
| @Override |
| public void invoke(String value) throws Exception { |
| int fileIdx = getFileIdx(value); |
| |
| Set<String> content = actualContent.get(fileIdx); |
| if (content == null) { |
| content = new HashSet<>(); |
| actualContent.put(fileIdx, content); |
| } |
| |
            // detect duplicate lines; fail() throws, so no further handling is needed
            if (!content.add(value + "\n")) {
                fail("Duplicate line: " + value);
            }
| |
| elementCounter++; |
| |
            // terminate once all expected elements have been seen: SuccessException is treated
            // as success by the test base, and SuppressRestartsException prevents a restart
            if (elementCounter >= NO_OF_FILES * LINES_PER_FILE) {
                actualCollectedContent = actualContent;
                throw new SuppressRestartsException(new SuccessException());
            }
| |
            // add some latency so that at least two checkpoints complete before the failure
| if (!hasRestoredAfterFailure && successfulCheckpoints < 2) { |
| Thread.sleep(5); |
| } |
| |
| // simulate a node failure |
| if (!hasRestoredAfterFailure |
| && successfulCheckpoints >= 2 |
| && elementCounter >= elementsToFailure) { |
| throw new Exception( |
| "Task Failure @ elem: " + elementCounter + " / " + elementsToFailure); |
| } |
| } |
| |
| @Override |
| public List<Tuple2<Long, Map<Integer, Set<String>>>> snapshotState( |
| long checkpointId, long checkpointTimestamp) throws Exception { |
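            // snapshot both the element counter and the collected content, so that after a
            // restore counting and duplicate detection continue exactly from the checkpoint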
| Tuple2<Long, Map<Integer, Set<String>>> state = |
| new Tuple2<>(elementCounter, actualContent); |
| return Collections.singletonList(state); |
| } |
| |
| @Override |
| public void restoreState(List<Tuple2<Long, Map<Integer, Set<String>>>> state) |
| throws Exception { |
| Tuple2<Long, Map<Integer, Set<String>>> s = state.get(0); |
| this.elementCounter = s.f0; |
| this.actualContent = s.f1; |
            // restoreState() is also called at initialization (with an empty state), so only a
            // non-zero counter indicates a restore after the simulated failure
            this.hasRestoredAfterFailure = this.elementCounter != 0;
| } |
| |
| @Override |
| public void notifyCheckpointComplete(long checkpointId) throws Exception { |
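            // count completed checkpoints; the simulated failure is only triggered once at
            // least two checkpoints have completed, so there is meaningful state to restore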
| this.successfulCheckpoints++; |
| } |
| |
| @Override |
| public void notifyCheckpointAborted(long checkpointId) {} |
| |
| private int getFileIdx(String line) { |
| String[] tkns = line.split(":"); |
| return Integer.parseInt(tkns[0]); |
| } |
| } |
| |
| // ------------------------- FILE CREATION ------------------------------- |
| |
| /** |
| * A separate thread creating {@link #NO_OF_FILES} files, one file every {@link #INTERVAL} |
| * milliseconds. It serves for testing the file monitoring functionality of the {@link |
| * ContinuousFileMonitoringFunction}. The files are filled with data by the {@link |
| * #fillWithData(String, String, int, String)} method. |
| */ |
| private class FileCreator extends Thread { |
| |
        /** The files created so far, so that {@link #clean()} can delete them afterwards. */
        private final Set<Path> filesCreated = new HashSet<>();

        /** The expected content of each created file, keyed by file index. */
        private final Map<Integer, String> fileContents = new HashMap<>();
| |
| /** The modification time of the last created file. */ |
| private long lastCreatedModTime = Long.MIN_VALUE; |
| |
        @Override
        public void run() {
| try { |
| for (int i = 0; i < NO_OF_FILES; i++) { |
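                    // each new file must get a strictly larger modification timestamp than the
                    // previous one, otherwise the monitoring source, which tracks the largest
                    // modification time seen so far, could skip it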
| Tuple2<org.apache.hadoop.fs.Path, String> tmpFile; |
| long modTime; |
| do { |
| |
| // give it some time so that the files have |
| // different modification timestamps. |
| Thread.sleep(50); |
| |
| tmpFile = fillWithData(localFsURI, "file", i, "This is test line."); |
| |
| modTime = localFs.getFileStatus(tmpFile.f0).getModificationTime(); |
| if (modTime <= lastCreatedModTime) { |
| // delete the last created file to recreate it with a different |
| // timestamp |
| localFs.delete(tmpFile.f0, false); |
| } |
| } while (modTime <= lastCreatedModTime); |
| lastCreatedModTime = modTime; |
| |
                    // rename the hidden temporary file to its final name so that the default
                    // file filter no longer excludes it and the monitoring source picks it up
| org.apache.hadoop.fs.Path file = |
| new org.apache.hadoop.fs.Path(localFsURI + "/file" + i); |
| localFs.rename(tmpFile.f0, file); |
| Assert.assertTrue(localFs.exists(file)); |
| |
| filesCreated.add(file); |
| fileContents.put(i, tmpFile.f1); |
| } |
| } catch (IOException | InterruptedException e) { |
| e.printStackTrace(); |
| } |
| } |
| |
| void clean() throws IOException { |
| assert (localFs != null); |
| for (org.apache.hadoop.fs.Path path : filesCreated) { |
| localFs.delete(path, false); |
| } |
| fileContents.clear(); |
| } |
| |
| Map<Integer, String> getFileContent() { |
| return this.fileContents; |
| } |
| } |
| |
    /** Fill a new (hidden) file with content and return its path together with that content. */
| private Tuple2<Path, String> fillWithData( |
| String base, String fileName, int fileIdx, String sampleLine) |
| throws IOException, InterruptedException { |
| |
| assert (localFs != null); |
| |
        // create the file with a leading '.' in its name so that the default file filter
        // ignores it while it is being written; the FileCreator renames it once it is complete
        org.apache.hadoop.fs.Path tmp =
                new org.apache.hadoop.fs.Path(base + "/." + fileName + fileIdx);
| |
| FSDataOutputStream stream = localFs.create(tmp); |
| StringBuilder str = new StringBuilder(); |
| for (int i = 0; i < LINES_PER_FILE; i++) { |
| String line = fileIdx + ": " + sampleLine + " " + i + "\n"; |
| str.append(line); |
| stream.write(line.getBytes(ConfigConstants.DEFAULT_CHARSET)); |
| } |
| stream.close(); |
| return new Tuple2<>(tmp, str.toString()); |
| } |
| } |