/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.flink.table.filesystem.stream;

import org.apache.flink.api.common.serialization.Encoder;
import org.apache.flink.core.fs.Path;
import org.apache.flink.core.io.SimpleVersionedSerializer;
import org.apache.flink.runtime.checkpoint.OperatorSubtaskState;
import org.apache.flink.streaming.api.TimeCharacteristic;
import org.apache.flink.streaming.api.functions.sink.filesystem.BucketAssigner;
import org.apache.flink.streaming.api.functions.sink.filesystem.StreamingFileSink;
import org.apache.flink.streaming.api.functions.sink.filesystem.bucketassigners.SimpleVersionedStringSerializer;
import org.apache.flink.streaming.api.functions.sink.filesystem.rollingpolicies.OnCheckpointRollingPolicy;
import org.apache.flink.streaming.util.OneInputStreamOperatorTestHarness;
import org.apache.flink.table.data.GenericRowData;
import org.apache.flink.table.data.RowData;
import org.apache.flink.table.data.StringData;
import org.apache.flink.table.filesystem.stream.StreamingFileCommitter.CommitMessage;

import org.junit.Assert;
import org.junit.Before;
import org.junit.ClassRule;
import org.junit.Test;
import org.junit.rules.TemporaryFolder;

import java.io.File;
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

/**
* Test for {@link StreamingFileWriter}.
*/
public class StreamingFileWriterTest {

@ClassRule
	public static final TemporaryFolder TEMPORARY_FOLDER = new TemporaryFolder();

	private Path path;

@Before
public void before() throws IOException {
File file = TEMPORARY_FOLDER.newFile();
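		// newFile() materializes an empty file; delete it so the sink starts from a fresh, non-existent path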
file.delete();
path = new Path(file.toURI());
}
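
	/**
	 * Simulates failover by restoring a fresh harness from the previous snapshot
	 * several times: partitions captured in the snapshot are still committed after
	 * the restore, a replayed record simply re-commits its partition, and a
	 * partition whose checkpoint never completed (here {9}) is not committed.
	 */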
@Test
public void testFailover() throws Exception {
OperatorSubtaskState state;
try (OneInputStreamOperatorTestHarness<RowData, CommitMessage> harness = create()) {
harness.setup();
harness.initializeEmptyState();
harness.open();
harness.processElement(row("1"), 0);
harness.processElement(row("2"), 0);
harness.processElement(row("2"), 0);
state = harness.snapshot(1, 1);
harness.processElement(row("3"), 0);
harness.processElement(row("4"), 0);
harness.notifyOfCompletedCheckpoint(1);
List<String> partitions = collect(harness);
Assert.assertEquals(Arrays.asList("1", "2"), partitions);
}

		// first retry: restore from the snapshot of checkpoint 1;
		// no records for partitions {1, 2} are replayed
try (OneInputStreamOperatorTestHarness<RowData, CommitMessage> harness = create()) {
harness.setup();
harness.initializeState(state);
harness.open();
harness.processElement(row("3"), 0);
harness.processElement(row("4"), 0);
state = harness.snapshot(2, 2);
harness.notifyOfCompletedCheckpoint(2);
List<String> partitions = collect(harness);
Assert.assertEquals(Arrays.asList("1", "2", "3", "4"), partitions);
}

		// second retry: the record for partition {4} is replayed
try (OneInputStreamOperatorTestHarness<RowData, CommitMessage> harness = create()) {
harness.setup();
harness.initializeState(state);
harness.open();
harness.processElement(row("4"), 0);
harness.processElement(row("5"), 0);
state = harness.snapshot(3, 3);
harness.notifyOfCompletedCheckpoint(3);
List<String> partitions = collect(harness);
Assert.assertEquals(Arrays.asList("3", "4", "5"), partitions);
}

		// third retry: several snapshots are taken, but only checkpoint 5 completes
try (OneInputStreamOperatorTestHarness<RowData, CommitMessage> harness = create()) {
harness.setup();
harness.initializeState(state);
harness.open();
harness.processElement(row("6"), 0);
harness.processElement(row("7"), 0);
harness.snapshot(4, 4);
harness.processElement(row("8"), 0);
harness.snapshot(5, 5);
harness.processElement(row("9"), 0);
harness.snapshot(6, 6);
harness.notifyOfCompletedCheckpoint(5);
List<String> partitions = collect(harness);
			// should not contain partition {9}: its snapshot (checkpoint 6) never completed
Assert.assertEquals(Arrays.asList("4", "5", "6", "7", "8"), partitions);
}
}
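
	/** Wraps a string into a one-field {@link RowData}; the bucket assigner also uses this value as the partition. */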
private static RowData row(String s) {
return GenericRowData.of(StringData.fromString(s));
}
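
	/** Flattens the partitions of every {@link CommitMessage} the harness has emitted, in emission order. */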
private static List<String> collect(
OneInputStreamOperatorTestHarness<RowData, CommitMessage> harness) {
List<String> parts = new ArrayList<>();
harness.extractOutputValues().forEach(m -> parts.addAll(m.partitions));
return parts;
}
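
	/**
	 * Builds a test harness around a {@link StreamingFileWriter} that encodes each
	 * row as a UTF-8 line, buckets rows by their single string field and rolls
	 * in-progress files on every checkpoint.
	 */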
private OneInputStreamOperatorTestHarness<RowData, CommitMessage> create() throws Exception {
StreamingFileWriter writer = new StreamingFileWriter(1000, StreamingFileSink.forRowFormat(
path, (Encoder<RowData>) (element, stream) ->
stream.write((element.getString(0) + "\n").getBytes(StandardCharsets.UTF_8)))
.withBucketAssigner(new BucketAssigner<RowData, String>() {
@Override
public String getBucketId(RowData element, Context context) {
return element.getString(0).toString();
}
@Override
public SimpleVersionedSerializer<String> getSerializer() {
return SimpleVersionedStringSerializer.INSTANCE;
}
})
.withRollingPolicy(OnCheckpointRollingPolicy.build()));
OneInputStreamOperatorTestHarness<RowData, CommitMessage> harness = new OneInputStreamOperatorTestHarness<>(
writer, 1, 1, 0);
harness.getStreamConfig().setTimeCharacteristic(TimeCharacteristic.ProcessingTime);
return harness;
}
}