blob: 588f91faf3b1a8c1a394f7c8959746d7879fb49a [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.beam.sdk.io;
import static org.apache.beam.sdk.TestUtils.LINES2_ARRAY;
import static org.apache.beam.sdk.TestUtils.LINES_ARRAY;
import static org.apache.beam.sdk.TestUtils.NO_LINES_ARRAY;
import static org.apache.beam.sdk.transforms.display.DisplayDataMatchers.hasDisplayItem;
import static org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.MoreObjects.firstNonNull;
import static org.hamcrest.MatcherAssert.assertThat;
import static org.hamcrest.Matchers.containsInAnyOrder;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertTrue;
import static org.junit.Assume.assumeFalse;
import java.io.BufferedReader;
import java.io.File;
import java.io.FileOutputStream;
import java.io.IOException;
import java.nio.channels.Channels;
import java.nio.channels.WritableByteChannel;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.stream.Collectors;
import java.util.stream.StreamSupport;
import org.apache.beam.sdk.coders.AvroCoder;
import org.apache.beam.sdk.coders.Coder;
import org.apache.beam.sdk.coders.DefaultCoder;
import org.apache.beam.sdk.coders.StringUtf8Coder;
import org.apache.beam.sdk.io.FileBasedSink.WritableByteChannelFactory;
import org.apache.beam.sdk.io.fs.MatchResult;
import org.apache.beam.sdk.io.fs.MatchResult.Metadata;
import org.apache.beam.sdk.io.fs.ResolveOptions;
import org.apache.beam.sdk.io.fs.ResourceId;
import org.apache.beam.sdk.options.PipelineOptions;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.beam.sdk.options.ValueProvider;
import org.apache.beam.sdk.testing.NeedsRunner;
import org.apache.beam.sdk.testing.PAssert;
import org.apache.beam.sdk.testing.TestPipeline;
import org.apache.beam.sdk.transforms.Create;
import org.apache.beam.sdk.transforms.SerializableFunction;
import org.apache.beam.sdk.transforms.Values;
import org.apache.beam.sdk.transforms.display.DisplayData;
import org.apache.beam.sdk.transforms.windowing.AfterPane;
import org.apache.beam.sdk.transforms.windowing.FixedWindows;
import org.apache.beam.sdk.transforms.windowing.Window;
import org.apache.beam.sdk.util.CoderUtils;
import org.apache.beam.sdk.values.PCollection;
import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.Charsets;
import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.Function;
import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.Functions;
import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.Predicate;
import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.Predicates;
import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.FluentIterable;
import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.ImmutableList;
import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.Iterables;
import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.Lists;
import org.apache.commons.lang3.SystemUtils;
import org.checkerframework.checker.nullness.qual.Nullable;
import org.joda.time.Duration;
import org.junit.Rule;
import org.junit.Test;
import org.junit.experimental.categories.Category;
import org.junit.rules.ExpectedException;
import org.junit.rules.TemporaryFolder;
import org.junit.runner.RunWith;
import org.junit.runners.JUnit4;
/** Tests for {@link TextIO.Write}. */
@RunWith(JUnit4.class)
public class TextIOWriteTest {
private static final String MY_HEADER = "myHeader";
private static final String MY_FOOTER = "myFooter";
@Rule public transient TemporaryFolder tempFolder = new TemporaryFolder();
@Rule public transient TestPipeline p = TestPipeline.create();
@Rule public transient ExpectedException expectedException = ExpectedException.none();
static class TestDynamicDestinations
extends FileBasedSink.DynamicDestinations<String, String, String> {
ResourceId baseDir;
TestDynamicDestinations(ResourceId baseDir) {
this.baseDir = baseDir;
}
@Override
public String formatRecord(String record) {
return record;
}
@Override
public String getDestination(String element) {
// Destination is based on first character of string.
return element.substring(0, 1);
}
@Override
public String getDefaultDestination() {
return "";
}
@Override
public @Nullable Coder<String> getDestinationCoder() {
return StringUtf8Coder.of();
}
@Override
public FileBasedSink.FilenamePolicy getFilenamePolicy(String destination) {
return DefaultFilenamePolicy.fromStandardParameters(
ValueProvider.StaticValueProvider.of(
baseDir.resolve(
"file_" + destination + ".txt",
ResolveOptions.StandardResolveOptions.RESOLVE_FILE)),
null,
null,
false);
}
}
static class StartsWith implements Predicate<String> {
String prefix;
StartsWith(String prefix) {
this.prefix = prefix;
}
@Override
public boolean apply(@Nullable String input) {
return input.startsWith(prefix);
}
}
@Test
@Category(NeedsRunner.class)
public void testDynamicDestinations() throws Exception {
testDynamicDestinations(false);
}
@Test
@Category(NeedsRunner.class)
public void testDynamicDestinationsWithCustomType() throws Exception {
testDynamicDestinations(true);
}
private void testDynamicDestinations(boolean customType) throws Exception {
ResourceId baseDir =
FileSystems.matchNewResource(
Files.createTempDirectory(tempFolder.getRoot().toPath(), "testDynamicDestinations")
.toString(),
true);
List<String> elements = Lists.newArrayList("aaaa", "aaab", "baaa", "baab", "caaa", "caab");
PCollection<String> input = p.apply(Create.of(elements).withCoder(StringUtf8Coder.of()));
if (customType) {
input.apply(
TextIO.<String>writeCustomType()
.to(new TestDynamicDestinations(baseDir))
.withTempDirectory(baseDir));
} else {
input.apply(
TextIO.write().to(new TestDynamicDestinations(baseDir)).withTempDirectory(baseDir));
}
p.run();
assertOutputFiles(
Iterables.toArray(Iterables.filter(elements, new StartsWith("a")), String.class),
null,
null,
0,
baseDir.resolve("file_a.txt", ResolveOptions.StandardResolveOptions.RESOLVE_FILE),
DefaultFilenamePolicy.DEFAULT_UNWINDOWED_SHARD_TEMPLATE);
assertOutputFiles(
Iterables.toArray(Iterables.filter(elements, new StartsWith("b")), String.class),
null,
null,
0,
baseDir.resolve("file_b.txt", ResolveOptions.StandardResolveOptions.RESOLVE_FILE),
DefaultFilenamePolicy.DEFAULT_UNWINDOWED_SHARD_TEMPLATE);
assertOutputFiles(
Iterables.toArray(Iterables.filter(elements, new StartsWith("c")), String.class),
null,
null,
0,
baseDir.resolve("file_c.txt", ResolveOptions.StandardResolveOptions.RESOLVE_FILE),
DefaultFilenamePolicy.DEFAULT_UNWINDOWED_SHARD_TEMPLATE);
}
@DefaultCoder(AvroCoder.class)
private static class UserWriteType {
String destination;
String metadata;
UserWriteType() {
this.destination = "";
this.metadata = "";
}
UserWriteType(String destination, String metadata) {
this.destination = destination;
this.metadata = metadata;
}
@Override
public String toString() {
return String.format("destination: %s metadata : %s", destination, metadata);
}
}
private static class SerializeUserWrite implements SerializableFunction<UserWriteType, String> {
@Override
public String apply(UserWriteType input) {
return input.toString();
}
}
private static class UserWriteDestination
implements SerializableFunction<UserWriteType, DefaultFilenamePolicy.Params> {
private ResourceId baseDir;
UserWriteDestination(ResourceId baseDir) {
this.baseDir = baseDir;
}
@Override
public DefaultFilenamePolicy.Params apply(UserWriteType input) {
return new DefaultFilenamePolicy.Params()
.withBaseFilename(
baseDir.resolve(
"file_" + input.destination.substring(0, 1) + ".txt",
ResolveOptions.StandardResolveOptions.RESOLVE_FILE));
}
}
private static class ExtractWriteDestination implements Function<UserWriteType, String> {
@Override
public String apply(@Nullable UserWriteType input) {
return input.destination;
}
}
@Test
@Category(NeedsRunner.class)
public void testDynamicDefaultFilenamePolicy() throws Exception {
ResourceId baseDir =
FileSystems.matchNewResource(
Files.createTempDirectory(tempFolder.getRoot().toPath(), "testDynamicDestinations")
.toString(),
true);
List<UserWriteType> elements =
Lists.newArrayList(
new UserWriteType("aaaa", "first"),
new UserWriteType("aaab", "second"),
new UserWriteType("baaa", "third"),
new UserWriteType("baab", "fourth"),
new UserWriteType("caaa", "fifth"),
new UserWriteType("caab", "sixth"));
PCollection<UserWriteType> input = p.apply(Create.of(elements));
input.apply(
TextIO.<UserWriteType>writeCustomType()
.to(
new UserWriteDestination(baseDir),
new DefaultFilenamePolicy.Params()
.withBaseFilename(
baseDir.resolve(
"empty", ResolveOptions.StandardResolveOptions.RESOLVE_FILE)))
.withFormatFunction(new SerializeUserWrite())
.withTempDirectory(FileSystems.matchNewResource(baseDir.toString(), true)));
p.run();
String[] aElements =
Iterables.toArray(
StreamSupport.stream(
elements.stream()
.filter(
Predicates.compose(new StartsWith("a"), new ExtractWriteDestination())
::apply)
.collect(Collectors.toList())
.spliterator(),
false)
.map(Functions.toStringFunction()::apply)
.collect(Collectors.toList()),
String.class);
String[] bElements =
Iterables.toArray(
StreamSupport.stream(
elements.stream()
.filter(
Predicates.compose(new StartsWith("b"), new ExtractWriteDestination())
::apply)
.collect(Collectors.toList())
.spliterator(),
false)
.map(Functions.toStringFunction()::apply)
.collect(Collectors.toList()),
String.class);
String[] cElements =
Iterables.toArray(
StreamSupport.stream(
elements.stream()
.filter(
Predicates.compose(new StartsWith("c"), new ExtractWriteDestination())
::apply)
.collect(Collectors.toList())
.spliterator(),
false)
.map(Functions.toStringFunction()::apply)
.collect(Collectors.toList()),
String.class);
assertOutputFiles(
aElements,
null,
null,
0,
baseDir.resolve("file_a.txt", ResolveOptions.StandardResolveOptions.RESOLVE_FILE),
DefaultFilenamePolicy.DEFAULT_UNWINDOWED_SHARD_TEMPLATE);
assertOutputFiles(
bElements,
null,
null,
0,
baseDir.resolve("file_b.txt", ResolveOptions.StandardResolveOptions.RESOLVE_FILE),
DefaultFilenamePolicy.DEFAULT_UNWINDOWED_SHARD_TEMPLATE);
assertOutputFiles(
cElements,
null,
null,
0,
baseDir.resolve("file_c.txt", ResolveOptions.StandardResolveOptions.RESOLVE_FILE),
DefaultFilenamePolicy.DEFAULT_UNWINDOWED_SHARD_TEMPLATE);
}
private void runTestWrite(String[] elems) throws Exception {
runTestWrite(elems, null, null, 1);
}
private void runTestWrite(String[] elems, int numShards) throws Exception {
runTestWrite(elems, null, null, numShards);
}
private void runTestWrite(String[] elems, String header, String footer) throws Exception {
runTestWrite(elems, header, footer, 1);
}
private void runTestWrite(String[] elems, String header, String footer, int numShards)
throws Exception {
String outputName = "file.txt";
Path baseDir = Files.createTempDirectory(tempFolder.getRoot().toPath(), "testwrite");
ResourceId baseFilename =
FileBasedSink.convertToFileResourceIfPossible(baseDir.resolve(outputName).toString());
PCollection<String> input =
p.apply("CreateInput", Create.of(Arrays.asList(elems)).withCoder(StringUtf8Coder.of()));
TextIO.TypedWrite<String, Void> write =
TextIO.write().to(baseFilename).withHeader(header).withFooter(footer).withOutputFilenames();
if (numShards == 1) {
write = write.withoutSharding();
} else if (numShards > 0) {
write = write.withNumShards(numShards).withShardNameTemplate(ShardNameTemplate.INDEX_OF_MAX);
}
input.apply(write);
p.run();
assertOutputFiles(
elems,
header,
footer,
numShards,
baseFilename,
firstNonNull(
write.getShardTemplate(), DefaultFilenamePolicy.DEFAULT_UNWINDOWED_SHARD_TEMPLATE));
}
private static void assertOutputFiles(
String[] elems,
final String header,
final String footer,
int numShards,
ResourceId outputPrefix,
String shardNameTemplate)
throws Exception {
List<File> expectedFiles = new ArrayList<>();
if (numShards == 0) {
String pattern = outputPrefix.toString() + "*";
List<MatchResult> matches = FileSystems.match(Collections.singletonList(pattern));
for (Metadata expectedFile : Iterables.getOnlyElement(matches).metadata()) {
expectedFiles.add(new File(expectedFile.resourceId().toString()));
}
} else {
for (int i = 0; i < numShards; i++) {
expectedFiles.add(
new File(
DefaultFilenamePolicy.constructName(
outputPrefix, shardNameTemplate, "", i, numShards, null, null)
.toString()));
}
}
List<List<String>> actual = new ArrayList<>();
for (File tmpFile : expectedFiles) {
List<String> currentFile = readLinesFromFile(tmpFile);
actual.add(currentFile);
}
List<String> expectedElements = new ArrayList<>(elems.length);
for (String elem : elems) {
byte[] encodedElem = CoderUtils.encodeToByteArray(StringUtf8Coder.of(), elem);
String line = new String(encodedElem, Charsets.UTF_8);
expectedElements.add(line);
}
List<String> actualElements =
Lists.newArrayList(
Iterables.concat(
FluentIterable.from(actual)
.transform(removeHeaderAndFooter(header, footer))
.toList()));
assertThat(actualElements, containsInAnyOrder(expectedElements.toArray()));
assertTrue(actual.stream().allMatch(haveProperHeaderAndFooter(header, footer)::apply));
}
private static List<String> readLinesFromFile(File f) throws IOException {
List<String> currentFile = new ArrayList<>();
try (BufferedReader reader = Files.newBufferedReader(f.toPath(), Charsets.UTF_8)) {
while (true) {
String line = reader.readLine();
if (line == null) {
break;
}
currentFile.add(line);
}
}
return currentFile;
}
private static Function<List<String>, List<String>> removeHeaderAndFooter(
final String header, final String footer) {
return new Function<List<String>, List<String>>() {
@Override
public @Nullable List<String> apply(List<String> lines) {
ArrayList<String> newLines = Lists.newArrayList(lines);
if (header != null) {
newLines.remove(0);
}
if (footer != null) {
int last = newLines.size() - 1;
newLines.remove(last);
}
return newLines;
}
};
}
private static Predicate<List<String>> haveProperHeaderAndFooter(
final String header, final String footer) {
return fileLines -> {
int last = fileLines.size() - 1;
return (header == null || fileLines.get(0).equals(header))
&& (footer == null || fileLines.get(last).equals(footer));
};
}
@Test
@Category(NeedsRunner.class)
public void testWriteStrings() throws Exception {
runTestWrite(LINES_ARRAY);
}
@Test
@Category(NeedsRunner.class)
public void testWriteEmptyStringsNoSharding() throws Exception {
runTestWrite(NO_LINES_ARRAY, 0);
}
@Test
@Category(NeedsRunner.class)
public void testWriteEmptyStrings() throws Exception {
runTestWrite(NO_LINES_ARRAY);
}
@Test
@Category(NeedsRunner.class)
public void testShardedWrite() throws Exception {
runTestWrite(LINES_ARRAY, 5);
}
@Test
@Category(NeedsRunner.class)
public void testWriteWithHeader() throws Exception {
runTestWrite(LINES_ARRAY, MY_HEADER, null);
}
@Test
@Category(NeedsRunner.class)
public void testWriteWithFooter() throws Exception {
runTestWrite(LINES_ARRAY, null, MY_FOOTER);
}
@Test
@Category(NeedsRunner.class)
public void testWriteWithHeaderAndFooter() throws Exception {
runTestWrite(LINES_ARRAY, MY_HEADER, MY_FOOTER);
}
@Test
@Category(NeedsRunner.class)
public void testWriteWithWritableByteChannelFactory() throws Exception {
Coder<String> coder = StringUtf8Coder.of();
String outputName = "file.txt";
ResourceId baseDir =
FileSystems.matchNewResource(
Files.createTempDirectory(tempFolder.getRoot().toPath(), "testwrite").toString(), true);
PCollection<String> input = p.apply(Create.of(Arrays.asList(LINES2_ARRAY)).withCoder(coder));
final WritableByteChannelFactory writableByteChannelFactory =
new DrunkWritableByteChannelFactory();
TextIO.Write write =
TextIO.write()
.to(
baseDir
.resolve(outputName, ResolveOptions.StandardResolveOptions.RESOLVE_FILE)
.toString())
.withoutSharding()
.withWritableByteChannelFactory(writableByteChannelFactory);
DisplayData displayData = DisplayData.from(write);
assertThat(displayData, hasDisplayItem("writableByteChannelFactory", "DRUNK"));
input.apply(write);
p.run();
final List<String> drunkElems = new ArrayList<>(LINES2_ARRAY.length * 2 + 2);
for (String elem : LINES2_ARRAY) {
drunkElems.add(elem);
drunkElems.add(elem);
}
assertOutputFiles(
drunkElems.toArray(new String[0]),
null,
null,
1,
baseDir.resolve(
outputName + writableByteChannelFactory.getSuggestedFilenameSuffix(),
ResolveOptions.StandardResolveOptions.RESOLVE_FILE),
write.inner.getShardTemplate());
}
@Test
public void testWriteDisplayData() {
// TODO: Java core test failing on windows, https://issues.apache.org/jira/browse/BEAM-10737
assumeFalse(SystemUtils.IS_OS_WINDOWS);
TextIO.Write write =
TextIO.write()
.to("/foo")
.withSuffix("bar")
.withShardNameTemplate("-SS-of-NN-")
.withNumShards(100)
.withFooter("myFooter")
.withHeader("myHeader");
DisplayData displayData = DisplayData.from(write);
assertThat(displayData, hasDisplayItem("filePrefix", "/foo"));
assertThat(displayData, hasDisplayItem("fileSuffix", "bar"));
assertThat(displayData, hasDisplayItem("fileHeader", "myHeader"));
assertThat(displayData, hasDisplayItem("fileFooter", "myFooter"));
assertThat(displayData, hasDisplayItem("shardNameTemplate", "-SS-of-NN-"));
assertThat(displayData, hasDisplayItem("numShards", 100));
assertThat(displayData, hasDisplayItem("writableByteChannelFactory", "UNCOMPRESSED"));
}
@Test
public void testWriteDisplayDataValidateThenHeader() {
TextIO.Write write = TextIO.write().to("foo").withHeader("myHeader");
DisplayData displayData = DisplayData.from(write);
assertThat(displayData, hasDisplayItem("fileHeader", "myHeader"));
}
@Test
public void testWriteDisplayDataValidateThenFooter() {
TextIO.Write write = TextIO.write().to("foo").withFooter("myFooter");
DisplayData displayData = DisplayData.from(write);
assertThat(displayData, hasDisplayItem("fileFooter", "myFooter"));
}
@Test
public void testGetName() {
assertEquals("TextIO.Write", TextIO.write().to("somefile").getName());
}
/** Options for testing. */
public interface RuntimeTestOptions extends PipelineOptions {
ValueProvider<String> getOutput();
void setOutput(ValueProvider<String> value);
}
@Test
public void testRuntimeOptionsNotCalledInApply() throws Exception {
p.enableAbandonedNodeEnforcement(false);
RuntimeTestOptions options = PipelineOptionsFactory.as(RuntimeTestOptions.class);
p.apply(Create.of("")).apply(TextIO.write().to(options.getOutput()));
}
@Test
@Category(NeedsRunner.class)
public void testWindowedWritesWithOnceTrigger() throws Throwable {
p.enableAbandonedNodeEnforcement(false);
expectedException.expect(IllegalArgumentException.class);
expectedException.expectMessage("Unsafe trigger");
// Tests for https://issues.apache.org/jira/browse/BEAM-3169
PCollection<String> data =
p.apply(Create.of("0", "1", "2"))
.apply(
Window.<String>into(FixedWindows.of(Duration.standardSeconds(1)))
// According to this trigger, all data should be written.
// However, the continuation of this trigger is elementCountAtLeast(1),
// so with a buggy implementation that used a GBK before renaming files,
// only 1 file would be renamed.
.triggering(AfterPane.elementCountAtLeast(3))
.withAllowedLateness(Duration.standardMinutes(1))
.discardingFiredPanes());
PCollection<String> filenames =
data.apply(
TextIO.write()
.to(new File(tempFolder.getRoot(), "windowed-writes").getAbsolutePath())
.withNumShards(2)
.withWindowedWrites()
.<Void>withOutputFilenames())
.getPerDestinationOutputFilenames()
.apply(Values.create());
}
@Test
@Category(NeedsRunner.class)
public void testWriteViaSink() throws Exception {
List<String> data = ImmutableList.of("a", "b", "c", "d", "e", "f");
PAssert.that(
p.apply("Create Data ReadFiles", Create.of(data))
.apply(
"Write ReadFiles",
FileIO.<String>write()
.to(tempFolder.getRoot().toString())
.withSuffix(".txt")
.via(TextIO.sink())
.withIgnoreWindowing())
.getPerDestinationOutputFilenames()
.apply("Extract Values ReadFiles", Values.create())
.apply("Match All", FileIO.matchAll())
.apply("Read Matches", FileIO.readMatches())
.apply("Read Files", TextIO.readFiles()))
.containsInAnyOrder(data);
PAssert.that(
p.apply("Create Data ReadAll", Create.of(data))
.apply(
"Write ReadAll",
FileIO.<String>write()
.to(tempFolder.getRoot().toString())
.withSuffix(".txt")
.via(TextIO.sink())
.withIgnoreWindowing())
.getPerDestinationOutputFilenames()
.apply("Extract Values ReadAll", Values.create())
.apply("Read All", TextIO.readAll()))
.containsInAnyOrder(data);
p.run();
}
@Test
public void testSink() throws Exception {
TextIO.Sink sink = TextIO.sink().withHeader("header").withFooter("footer");
File f = new File(tempFolder.getRoot(), "file");
try (WritableByteChannel chan = Channels.newChannel(new FileOutputStream(f))) {
sink.open(chan);
sink.write("a");
sink.write("b");
sink.write("c");
sink.flush();
}
assertEquals(Arrays.asList("header", "a", "b", "c", "footer"), readLinesFromFile(f));
}
}