blob: 93bf86902006f72813ceec14607386ea6ae0078c [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.beam.sdk.io.xml;
import static org.apache.beam.sdk.testing.SourceTestUtils.assertSplitAtFractionExhaustive;
import static org.apache.beam.sdk.testing.SourceTestUtils.assertSplitAtFractionFails;
import static org.apache.beam.sdk.testing.SourceTestUtils.assertSplitAtFractionSucceedsAndConsistent;
import static org.hamcrest.Matchers.containsInAnyOrder;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertThat;
import static org.junit.Assert.assertTrue;
import java.io.BufferedWriter;
import java.io.File;
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.util.ArrayList;
import java.util.List;
import java.util.Random;
import javax.xml.bind.ValidationEventHandler;
import javax.xml.bind.annotation.XmlAttribute;
import javax.xml.bind.annotation.XmlRootElement;
import org.apache.beam.sdk.io.BoundedSource;
import org.apache.beam.sdk.io.Source.Reader;
import org.apache.beam.sdk.options.PipelineOptions;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.beam.sdk.testing.PAssert;
import org.apache.beam.sdk.testing.TestPipeline;
import org.apache.beam.sdk.values.PCollection;
import org.apache.beam.vendor.guava.v20_0.com.google.common.collect.ImmutableList;
import org.junit.Ignore;
import org.junit.Rule;
import org.junit.Test;
import org.junit.rules.ExpectedException;
import org.junit.rules.TemporaryFolder;
import org.junit.runner.RunWith;
import org.junit.runners.JUnit4;
/**
* Tests for {@link XmlSource} in particular - the rest of {@link XmlIO} is tested in {@link
* XmlIOTest}.
*/
@RunWith(JUnit4.class)
public class XmlSourceTest {
@Rule public TestPipeline p = TestPipeline.create();
@Rule public TemporaryFolder tempFolder = new TemporaryFolder();
@Rule public ExpectedException exception = ExpectedException.none();
private String tinyXML =
"<trains><train><name>Thomas</name></train><train><name>Henry</name></train>"
+ "<train><name>James</name></train></trains>";
private String xmlWithMultiByteElementName =
"<දුම්රියන්><දුම්රිය><name>Thomas</name></දුම්රිය><දුම්රිය><name>Henry</name></දුම්රිය>"
+ "<දුම්රිය><name>James</name></දුම්රිය></දුම්රියන්>";
private String xmlWithMultiByteChars =
"<trains><train><name>Thomas¥</name></train><train><name>Hen¶ry</name></train>"
+ "<train><name>Jamßes</name></train></trains>";
private String trainXML =
"<trains>"
+ "<train><name>Thomas</name><number>1</number><color>blue</color></train>"
+ "<train><name>Henry</name><number>3</number><color>green</color></train>"
+ "<train><name>Toby</name><number>7</number><color>brown</color></train>"
+ "<train><name>Gordon</name><number>4</number><color>blue</color></train>"
+ "<train><name>Emily</name><number>-1</number><color>red</color></train>"
+ "<train><name>Percy</name><number>6</number><color>green</color></train>"
+ "</trains>";
private String trainXMLWithEmptyTags =
"<trains>"
+ "<train/>"
+ "<train><name>Thomas</name><number>1</number><color>blue</color></train>"
+ "<train><name>Henry</name><number>3</number><color>green</color></train>"
+ "<train/>"
+ "<train><name>Toby</name><number>7</number><color>brown</color></train>"
+ "<train><name>Gordon</name><number>4</number><color>blue</color></train>"
+ "<train><name>Emily</name><number>-1</number><color>red</color></train>"
+ "<train><name>Percy</name><number>6</number><color>green</color></train>"
+ "</trains>";
private String trainXMLWithAttributes =
"<trains>"
+ "<train size=\"small\"><name>Thomas</name><number>1</number><color>blue</color></train>"
+ "<train size=\"big\"><name>Henry</name><number>3</number><color>green</color></train>"
+ "<train size=\"small\"><name>Toby</name><number>7</number><color>brown</color></train>"
+ "<train size=\"big\"><name>Gordon</name><number>4</number><color>blue</color></train>"
+ "<train size=\"small\"><name>Emily</name><number>-1</number><color>red</color></train>"
+ "<train size=\"small\"><name>Percy</name><number>6</number><color>green</color></train>"
+ "</trains>";
private String trainXMLWithSpaces =
"<trains>"
+ "<train><name>Thomas </name> <number>1</number><color>blue</color></train>"
+ "<train><name>Henry</name><number>3</number><color>green</color></train>\n"
+ "<train><name>Toby</name><number>7</number><color> brown </color></train> "
+ "<train><name>Gordon</name> <number>4</number><color>blue</color>\n</train>\t"
+ "<train><name>Emily</name><number>-1</number>\t<color>red</color></train>"
+ "<train>\n<name>Percy</name> <number>6 </number> <color>green</color></train>"
+ "</trains>";
private String trainXMLWithAllFeaturesMultiByte =
"<දුම්රියන්>"
+ "<දුම්රිය/>"
+ "<දුම්රිය size=\"small\"><name> Thomas¥</name><number>1</number><color>blue</color>"
+ "</දුම්රිය>"
+ "<දුම්රිය size=\"big\"><name>He nry</name><number>3</number><color>green</color></දුම්රිය>"
+ "<දුම්රිය size=\"small\"><name>Toby </name><number>7</number><color>br¶own</color>"
+ "</දුම්රිය>"
+ "<දුම්රිය/>"
+ "<දුම්රිය size=\"big\"><name>Gordon</name><number>4</number><color> blue</color></දුම්රිය>"
+ "<දුම්රිය size=\"small\"><name>Emily</name><number>-1</number><color>red</color></දුම්රිය>"
+ "<දුම්රිය size=\"small\"><name>Percy</name><number>6</number><color>green</color>"
+ "</දුම්රිය>"
+ "</දුම්රියන්>";
private String trainXMLWithAllFeaturesSingleByte =
"<trains>"
+ "<train/>"
+ "<train size=\"small\"><name> Thomas</name><number>1</number><color>blue</color>"
+ "</train>"
+ "<train size=\"big\"><name>He nry</name><number>3</number><color>green</color></train>"
+ "<train size=\"small\"><name>Toby </name><number>7</number><color>brown</color>"
+ "</train>"
+ "<train/>"
+ "<train size=\"big\"><name>Gordon</name><number>4</number><color> blue</color></train>"
+ "<train size=\"small\"><name>Emily</name><number>-1</number><color>red</color></train>"
+ "<train size=\"small\"><name>Percy</name><number>6</number><color>green</color>"
+ "</train>"
+ "</trains>";
private String trainXMLWithISO88591 =
"<trains>"
+ "<train size=\"small\"><name>Cédric</name><number>7</number><color>blue</color></train>"
+ "</trains>";
@XmlRootElement
static class TinyTrain {
TinyTrain(String name) {
this.name = name;
}
public TinyTrain() {}
public String name = null;
@Override
public String toString() {
String str = "Train[";
if (name != null) {
str = str + "name=" + name;
}
str = str + "]";
return str;
}
}
@XmlRootElement
static class Train {
public static final int TRAIN_NUMBER_UNDEFINED = -1;
public String name = null;
public String color = null;
public int number = TRAIN_NUMBER_UNDEFINED;
@XmlAttribute(name = "size")
public String size = null;
public Train() {}
public Train(String name, int number, String color, String size) {
this.name = name;
this.number = number;
this.color = color;
this.size = size;
}
@Override
public int hashCode() {
int hashCode = 1;
hashCode = 31 * hashCode + (name == null ? 0 : name.hashCode());
hashCode = 31 * hashCode + number;
hashCode = 31 * hashCode + (color == null ? 0 : name.hashCode());
hashCode = 31 * hashCode + (size == null ? 0 : name.hashCode());
return hashCode;
}
@Override
public boolean equals(Object obj) {
if (!(obj instanceof Train)) {
return false;
}
Train other = (Train) obj;
return (name == null || name.equals(other.name))
&& (number == other.number)
&& (color == null || color.equals(other.color))
&& (size == null || size.equals(other.size));
}
@Override
public String toString() {
String str = "Train[";
boolean first = true;
if (name != null) {
str = str + "name=" + name;
first = false;
}
if (number != Integer.MIN_VALUE) {
if (!first) {
str = str + ",";
}
str = str + "number=" + number;
first = false;
}
if (color != null) {
if (!first) {
str = str + ",";
}
str = str + "color=" + color;
first = false;
}
if (size != null) {
if (!first) {
str = str + ",";
}
str = str + "size=" + size;
}
str = str + "]";
return str;
}
}
private List<Train> generateRandomTrainList(int size) {
String[] names = {
"Thomas", "Henry", "Gordon", "Emily", "Toby", "Percy", "Mavis", "Edward", "Bertie", "Harold",
"Hiro", "Terence", "Salty", "Trevor"
};
int[] numbers = {-1, 1, 2, 3, 4, 5, 6, 7, 8, 9};
String[] colors = {"red", "blue", "green", "orange", "brown", "black", "white"};
String[] sizes = {"small", "medium", "big"};
Random random = new Random(System.currentTimeMillis());
List<Train> trains = new ArrayList<>();
for (int i = 0; i < size; i++) {
trains.add(
new Train(
names[random.nextInt(names.length - 1)],
numbers[random.nextInt(numbers.length - 1)],
colors[random.nextInt(colors.length - 1)],
sizes[random.nextInt(sizes.length - 1)]));
}
return trains;
}
private String trainToXMLElement(Train train) {
return "<train size=\""
+ train.size
+ "\"><name>"
+ train.name
+ "</name><number>"
+ train.number
+ "</number><color>"
+ train.color
+ "</color></train>";
}
private File createRandomTrainXML(String fileName, List<Train> trains) throws IOException {
File file = tempFolder.newFile(fileName);
try (BufferedWriter writer = Files.newBufferedWriter(file.toPath(), StandardCharsets.UTF_8)) {
writer.write("<trains>");
writer.newLine();
for (Train train : trains) {
String str = trainToXMLElement(train);
writer.write(str);
writer.newLine();
}
writer.write("</trains>");
writer.newLine();
}
return file;
}
private <T> List<T> readEverythingFromReader(Reader<T> reader) throws IOException {
List<T> results = new ArrayList<>();
for (boolean available = reader.start(); available; available = reader.advance()) {
T train = reader.getCurrent();
results.add(train);
}
return results;
}
@Test
public void testReadXMLTiny() throws IOException {
File file = tempFolder.newFile("trainXMLTiny");
Files.write(file.toPath(), tinyXML.getBytes(StandardCharsets.UTF_8));
BoundedSource<Train> source =
XmlIO.<Train>read()
.from(file.toPath().toString())
.withRootElement("trains")
.withRecordElement("train")
.withRecordClass(Train.class)
.withMinBundleSize(1024)
.createSource();
List<Train> expectedResults =
ImmutableList.of(
new Train("Thomas", Train.TRAIN_NUMBER_UNDEFINED, null, null),
new Train("Henry", Train.TRAIN_NUMBER_UNDEFINED, null, null),
new Train("James", Train.TRAIN_NUMBER_UNDEFINED, null, null));
assertThat(
trainsToStrings(expectedResults),
containsInAnyOrder(
trainsToStrings(readEverythingFromReader(source.createReader(null))).toArray()));
}
@Test
public void testReadXMLWithMultiByteChars() throws IOException {
File file = tempFolder.newFile("trainXMLTiny");
Files.write(file.toPath(), xmlWithMultiByteChars.getBytes(StandardCharsets.UTF_8));
BoundedSource<Train> source =
XmlIO.<Train>read()
.from(file.toPath().toString())
.withRootElement("trains")
.withRecordElement("train")
.withRecordClass(Train.class)
.withMinBundleSize(1024)
.createSource();
List<Train> expectedResults =
ImmutableList.of(
new Train("Thomas¥", Train.TRAIN_NUMBER_UNDEFINED, null, null),
new Train("Hen¶ry", Train.TRAIN_NUMBER_UNDEFINED, null, null),
new Train("Jamßes", Train.TRAIN_NUMBER_UNDEFINED, null, null));
assertThat(
trainsToStrings(expectedResults),
containsInAnyOrder(
trainsToStrings(readEverythingFromReader(source.createReader(null))).toArray()));
}
@Test
@Ignore(
"Multi-byte characters in XML are not supported because the parser "
+ "currently does not correctly report byte offsets")
public void testReadXMLWithMultiByteElementName() throws IOException {
File file = tempFolder.newFile("trainXMLTiny");
Files.write(file.toPath(), xmlWithMultiByteElementName.getBytes(StandardCharsets.UTF_8));
BoundedSource<Train> source =
XmlIO.<Train>read()
.from(file.toPath().toString())
.withRootElement("දුම්රියන්")
.withRecordElement("දුම්රිය")
.withRecordClass(Train.class)
.withMinBundleSize(1024)
.createSource();
List<Train> expectedResults =
ImmutableList.of(
new Train("Thomas", Train.TRAIN_NUMBER_UNDEFINED, null, null),
new Train("Henry", Train.TRAIN_NUMBER_UNDEFINED, null, null),
new Train("James", Train.TRAIN_NUMBER_UNDEFINED, null, null));
assertThat(
trainsToStrings(expectedResults),
containsInAnyOrder(
trainsToStrings(readEverythingFromReader(source.createReader(null))).toArray()));
}
@Test
public void testSplitWithEmptyBundleAtEnd() throws Exception {
File file = tempFolder.newFile("trainXMLTiny");
Files.write(file.toPath(), tinyXML.getBytes(StandardCharsets.UTF_8));
BoundedSource<Train> source =
XmlIO.<Train>read()
.from(file.toPath().toString())
.withRootElement("trains")
.withRecordElement("train")
.withRecordClass(Train.class)
.withMinBundleSize(10)
.createSource();
List<? extends BoundedSource<Train>> splits = source.split(50, null);
assertTrue(splits.size() > 2);
List<Train> results = new ArrayList<>();
for (BoundedSource<Train> split : splits) {
results.addAll(readEverythingFromReader(split.createReader(null)));
}
List<Train> expectedResults =
ImmutableList.of(
new Train("Thomas", Train.TRAIN_NUMBER_UNDEFINED, null, null),
new Train("Henry", Train.TRAIN_NUMBER_UNDEFINED, null, null),
new Train("James", Train.TRAIN_NUMBER_UNDEFINED, null, null));
assertThat(
trainsToStrings(expectedResults), containsInAnyOrder(trainsToStrings(results).toArray()));
}
List<String> trainsToStrings(List<Train> input) {
List<String> strings = new ArrayList<>();
for (Object data : input) {
strings.add(data.toString());
}
return strings;
}
List<String> tinyTrainsToStrings(List<TinyTrain> input) {
List<String> strings = new ArrayList<>();
for (Object data : input) {
strings.add(data.toString());
}
return strings;
}
@Test
public void testReadXMLSmall() throws IOException {
File file = tempFolder.newFile("trainXMLSmall");
Files.write(file.toPath(), trainXML.getBytes(StandardCharsets.UTF_8));
BoundedSource<Train> source =
XmlIO.<Train>read()
.from(file.toPath().toString())
.withRootElement("trains")
.withRecordElement("train")
.withRecordClass(Train.class)
.withMinBundleSize(1024)
.createSource();
List<Train> expectedResults =
ImmutableList.of(
new Train("Thomas", 1, "blue", null),
new Train("Henry", 3, "green", null),
new Train("Toby", 7, "brown", null),
new Train("Gordon", 4, "blue", null),
new Train("Emily", -1, "red", null),
new Train("Percy", 6, "green", null));
assertThat(
trainsToStrings(expectedResults),
containsInAnyOrder(
trainsToStrings(readEverythingFromReader(source.createReader(null))).toArray()));
}
@Test
public void testReadXMLIncorrectRootElement() throws IOException {
File file = tempFolder.newFile("trainXMLSmall");
Files.write(file.toPath(), trainXML.getBytes(StandardCharsets.UTF_8));
BoundedSource<Train> source =
XmlIO.<Train>read()
.from(file.toPath().toString())
.withRootElement("something")
.withRecordElement("train")
.withRecordClass(Train.class)
.createSource();
exception.expectMessage("Unexpected close tag </trains>; expected </something>.");
readEverythingFromReader(source.createReader(null));
}
@Test
public void testReadXMLIncorrectRecordElement() throws IOException {
File file = tempFolder.newFile("trainXMLSmall");
Files.write(file.toPath(), trainXML.getBytes(StandardCharsets.UTF_8));
BoundedSource<Train> source =
XmlIO.<Train>read()
.from(file.toPath().toString())
.withRootElement("trains")
.withRecordElement("something")
.withRecordClass(Train.class)
.createSource();
assertEquals(readEverythingFromReader(source.createReader(null)), new ArrayList<Train>());
}
@XmlRootElement
private static class WrongTrainType {
@SuppressWarnings("unused")
public String something;
}
@Test
public void testReadXMLInvalidRecordClassWithCustomEventHandler() throws IOException {
File file = tempFolder.newFile("trainXMLSmall");
Files.write(file.toPath(), trainXML.getBytes(StandardCharsets.UTF_8));
ValidationEventHandler validationEventHandler =
event -> {
throw new RuntimeException("MyCustomValidationEventHandler failure mesage");
};
BoundedSource<WrongTrainType> source =
XmlIO.<WrongTrainType>read()
.from(file.toPath().toString())
.withRootElement("trains")
.withRecordElement("train")
.withRecordClass(WrongTrainType.class)
.withValidationEventHandler(validationEventHandler)
.createSource();
exception.expect(RuntimeException.class);
// JAXB internationalizes the error message. So this is all we can match for.
exception.expectMessage("MyCustomValidationEventHandler failure mesage");
try (Reader<WrongTrainType> reader = source.createReader(null)) {
List<WrongTrainType> results = new ArrayList<>();
for (boolean available = reader.start(); available; available = reader.advance()) {
WrongTrainType train = reader.getCurrent();
results.add(train);
}
}
}
@Test
public void testReadXmlWithAdditionalFieldsShouldNotThrowException() throws IOException {
File file = tempFolder.newFile("trainXMLSmall");
Files.write(file.toPath(), trainXML.getBytes(StandardCharsets.UTF_8));
BoundedSource<TinyTrain> source =
XmlIO.<TinyTrain>read()
.from(file.toPath().toString())
.withRootElement("trains")
.withRecordElement("train")
.withRecordClass(TinyTrain.class)
.createSource();
List<TinyTrain> expectedResults =
ImmutableList.of(
new TinyTrain("Thomas"),
new TinyTrain("Henry"),
new TinyTrain("Toby"),
new TinyTrain("Gordon"),
new TinyTrain("Emily"),
new TinyTrain("Percy"));
assertThat(
tinyTrainsToStrings(expectedResults),
containsInAnyOrder(
tinyTrainsToStrings(readEverythingFromReader(source.createReader(null))).toArray()));
}
@Test
public void testReadXMLNoBundleSize() throws IOException {
File file = tempFolder.newFile("trainXMLSmall");
Files.write(file.toPath(), trainXML.getBytes(StandardCharsets.UTF_8));
BoundedSource<Train> source =
XmlIO.<Train>read()
.from(file.toPath().toString())
.withRootElement("trains")
.withRecordElement("train")
.withRecordClass(Train.class)
.createSource();
List<Train> expectedResults =
ImmutableList.of(
new Train("Thomas", 1, "blue", null),
new Train("Henry", 3, "green", null),
new Train("Toby", 7, "brown", null),
new Train("Gordon", 4, "blue", null),
new Train("Emily", -1, "red", null),
new Train("Percy", 6, "green", null));
assertThat(
trainsToStrings(expectedResults),
containsInAnyOrder(
trainsToStrings(readEverythingFromReader(source.createReader(null))).toArray()));
}
@Test
public void testReadXMLWithEmptyTags() throws IOException {
File file = tempFolder.newFile("trainXMLSmall");
Files.write(file.toPath(), trainXMLWithEmptyTags.getBytes(StandardCharsets.UTF_8));
BoundedSource<Train> source =
XmlIO.<Train>read()
.from(file.toPath().toString())
.withRootElement("trains")
.withRecordElement("train")
.withRecordClass(Train.class)
.withMinBundleSize(1024)
.createSource();
List<Train> expectedResults =
ImmutableList.of(
new Train("Thomas", 1, "blue", null),
new Train("Henry", 3, "green", null),
new Train("Toby", 7, "brown", null),
new Train("Gordon", 4, "blue", null),
new Train("Emily", -1, "red", null),
new Train("Percy", 6, "green", null),
new Train(),
new Train());
assertThat(
trainsToStrings(expectedResults),
containsInAnyOrder(
trainsToStrings(readEverythingFromReader(source.createReader(null))).toArray()));
}
@Test
public void testReadXMLWithCharset() throws IOException {
File file = tempFolder.newFile("trainXMLISO88591");
Files.write(file.toPath(), trainXMLWithISO88591.getBytes(StandardCharsets.ISO_8859_1));
PCollection<Train> output =
p.apply(
"ReadFileData",
XmlIO.<Train>read()
.from(file.toPath().toString())
.withRootElement("trains")
.withRecordElement("train")
.withRecordClass(Train.class)
.withMinBundleSize(1024)
.withCharset(StandardCharsets.ISO_8859_1));
List<Train> expectedResults = ImmutableList.of(new Train("Cédric", 7, "blue", "small"));
PAssert.that(output).containsInAnyOrder(expectedResults);
p.run();
}
@Test
public void testReadXMLSmallPipeline() throws IOException {
File file = tempFolder.newFile("trainXMLSmall");
Files.write(file.toPath(), trainXML.getBytes(StandardCharsets.UTF_8));
PCollection<Train> output =
p.apply(
"ReadFileData",
XmlIO.<Train>read()
.from(file.toPath().toString())
.withRootElement("trains")
.withRecordElement("train")
.withRecordClass(Train.class)
.withMinBundleSize(1024));
List<Train> expectedResults =
ImmutableList.of(
new Train("Thomas", 1, "blue", null),
new Train("Henry", 3, "green", null),
new Train("Toby", 7, "brown", null),
new Train("Gordon", 4, "blue", null),
new Train("Emily", -1, "red", null),
new Train("Percy", 6, "green", null));
PAssert.that(output).containsInAnyOrder(expectedResults);
p.run();
}
@Test
public void testReadXMLWithAttributes() throws IOException {
File file = tempFolder.newFile("trainXMLSmall");
Files.write(file.toPath(), trainXMLWithAttributes.getBytes(StandardCharsets.UTF_8));
BoundedSource<Train> source =
XmlIO.<Train>read()
.from(file.toPath().toString())
.withRootElement("trains")
.withRecordElement("train")
.withRecordClass(Train.class)
.withMinBundleSize(1024)
.createSource();
List<Train> expectedResults =
ImmutableList.of(
new Train("Thomas", 1, "blue", "small"),
new Train("Henry", 3, "green", "big"),
new Train("Toby", 7, "brown", "small"),
new Train("Gordon", 4, "blue", "big"),
new Train("Emily", -1, "red", "small"),
new Train("Percy", 6, "green", "small"));
assertThat(
trainsToStrings(expectedResults),
containsInAnyOrder(
trainsToStrings(readEverythingFromReader(source.createReader(null))).toArray()));
}
@Test
public void testReadXMLWithWhitespaces() throws IOException {
File file = tempFolder.newFile("trainXMLSmall");
Files.write(file.toPath(), trainXMLWithSpaces.getBytes(StandardCharsets.UTF_8));
BoundedSource<Train> source =
XmlIO.<Train>read()
.from(file.toPath().toString())
.withRootElement("trains")
.withRecordElement("train")
.withRecordClass(Train.class)
.withMinBundleSize(1024)
.createSource();
List<Train> expectedResults =
ImmutableList.of(
new Train("Thomas ", 1, "blue", null),
new Train("Henry", 3, "green", null),
new Train("Toby", 7, " brown ", null),
new Train("Gordon", 4, "blue", null),
new Train("Emily", -1, "red", null),
new Train("Percy", 6, "green", null));
assertThat(
trainsToStrings(expectedResults),
containsInAnyOrder(
trainsToStrings(readEverythingFromReader(source.createReader(null))).toArray()));
}
@Test
public void testReadXMLLarge() throws IOException {
String fileName = "temp.xml";
List<Train> trains = generateRandomTrainList(100);
File file = createRandomTrainXML(fileName, trains);
BoundedSource<Train> source =
XmlIO.<Train>read()
.from(file.toPath().toString())
.withRootElement("trains")
.withRecordElement("train")
.withRecordClass(Train.class)
.withMinBundleSize(1024)
.createSource();
assertThat(
trainsToStrings(trains),
containsInAnyOrder(
trainsToStrings(readEverythingFromReader(source.createReader(null))).toArray()));
}
@Test
public void testSplitWithEmptyBundles() throws Exception {
String fileName = "temp.xml";
List<Train> trains = generateRandomTrainList(10);
File file = createRandomTrainXML(fileName, trains);
BoundedSource<Train> source =
XmlIO.<Train>read()
.from(file.toPath().toString())
.withRootElement("trains")
.withRecordElement("train")
.withRecordClass(Train.class)
.withMinBundleSize(10)
.createSource();
List<? extends BoundedSource<Train>> splits = source.split(100, null);
assertTrue(splits.size() > 2);
List<Train> results = new ArrayList<>();
for (BoundedSource<Train> split : splits) {
results.addAll(readEverythingFromReader(split.createReader(null)));
}
assertThat(trainsToStrings(trains), containsInAnyOrder(trainsToStrings(results).toArray()));
}
@Test
public void testXMLWithSplits() throws Exception {
String fileName = "temp.xml";
List<Train> trains = generateRandomTrainList(100);
File file = createRandomTrainXML(fileName, trains);
BoundedSource<Train> source =
XmlIO.<Train>read()
.from(file.toPath().toString())
.withRootElement("trains")
.withRecordElement("train")
.withRecordClass(Train.class)
.withMinBundleSize(10)
.createSource();
List<? extends BoundedSource<Train>> splits = source.split(256, null);
// Not a trivial split
assertTrue(splits.size() > 2);
List<Train> results = new ArrayList<>();
for (BoundedSource<Train> split : splits) {
results.addAll(readEverythingFromReader(split.createReader(null)));
}
assertThat(trainsToStrings(trains), containsInAnyOrder(trainsToStrings(results).toArray()));
}
@Test
public void testSplitAtFraction() throws Exception {
PipelineOptions options = PipelineOptionsFactory.create();
String fileName = "temp.xml";
List<Train> trains = generateRandomTrainList(100);
File file = createRandomTrainXML(fileName, trains);
BoundedSource<Train> fileSource =
XmlIO.<Train>read()
.from(file.toPath().toString())
.withRootElement("trains")
.withRecordElement("train")
.withRecordClass(Train.class)
.withMinBundleSize(10)
.createSource();
List<? extends BoundedSource<Train>> splits = fileSource.split(file.length() / 3, null);
for (BoundedSource<Train> splitSource : splits) {
int numItems = readEverythingFromReader(splitSource.createReader(null)).size();
// Should not split while unstarted.
assertSplitAtFractionFails(splitSource, 0, 0.7, options);
assertSplitAtFractionSucceedsAndConsistent(splitSource, 1, 0.7, options);
assertSplitAtFractionSucceedsAndConsistent(splitSource, 15, 0.7, options);
assertSplitAtFractionFails(splitSource, 0, 0.0, options);
assertSplitAtFractionFails(splitSource, 20, 0.3, options);
assertSplitAtFractionFails(splitSource, numItems, 1.0, options);
// After reading 100 elements we will be approximately at position
// 0.99 * (endOffset - startOffset) hence trying to split at fraction 0.9 will be
// unsuccessful.
assertSplitAtFractionFails(splitSource, numItems, 0.9, options);
// Following passes since we can always find a fraction that is extremely close to 1 such that
// the position suggested by the fraction will be larger than the position the reader is at
// after reading "items - 1" elements.
// This also passes for "numItemsToReadBeforeSplit = items" if the position at suggested
// fraction is larger than the position the reader is at after reading all "items" elements
// (i.e., the start position of the last element). This is true for most cases but will not
// be true if reader position is only one less than the end position. (i.e., the last element
// of the bundle start at the last byte that belongs to the bundle).
assertSplitAtFractionSucceedsAndConsistent(splitSource, numItems - 1, 0.999, options);
}
}
@Test
public void testSplitAtFractionExhaustiveSingleByte() throws Exception {
PipelineOptions options = PipelineOptionsFactory.create();
File file = tempFolder.newFile("trainXMLSmall");
Files.write(file.toPath(), trainXMLWithAllFeaturesSingleByte.getBytes(StandardCharsets.UTF_8));
BoundedSource<Train> source =
XmlIO.<Train>read()
.from(file.toPath().toString())
.withRootElement("trains")
.withRecordElement("train")
.withRecordClass(Train.class)
.createSource();
assertSplitAtFractionExhaustive(source, options);
}
@Test
@Ignore(
"Multi-byte characters in XML are not supported because the parser "
+ "currently does not correctly report byte offsets")
public void testSplitAtFractionExhaustiveMultiByte() throws Exception {
PipelineOptions options = PipelineOptionsFactory.create();
File file = tempFolder.newFile("trainXMLSmall");
Files.write(file.toPath(), trainXMLWithAllFeaturesMultiByte.getBytes(StandardCharsets.UTF_8));
BoundedSource<Train> source =
XmlIO.<Train>read()
.from(file.toPath().toString())
.withRootElement("දුම්රියන්")
.withRecordElement("දුම්රිය")
.withRecordClass(Train.class)
.createSource();
assertSplitAtFractionExhaustive(source, options);
}
}