blob: 99c569cc0c2f71358d2c7ed778d17ef5ad1a6884 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.flink.api.java.io;
import org.apache.flink.api.common.io.ParseException;
import org.apache.flink.api.java.tuple.Tuple1;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.api.java.tuple.Tuple3;
import org.apache.flink.api.java.tuple.Tuple5;
import org.apache.flink.api.java.tuple.Tuple6;
import org.apache.flink.api.java.typeutils.PojoTypeInfo;
import org.apache.flink.api.java.typeutils.TupleTypeInfo;
import org.apache.flink.api.java.typeutils.TypeExtractor;
import org.apache.flink.configuration.ConfigConstants;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.core.fs.FileInputSplit;
import org.apache.flink.core.fs.Path;
import org.apache.flink.types.parser.FieldParser;
import org.apache.flink.types.parser.StringParser;
import org.junit.Assert;
import org.junit.Test;
import java.io.File;
import java.io.FileOutputStream;
import java.io.IOException;
import java.io.OutputStreamWriter;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.List;
import static org.hamcrest.CoreMatchers.is;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertNull;
import static org.junit.Assert.assertThat;
import static org.junit.Assert.assertTrue;
import static org.junit.Assert.fail;
/**
* Tests for {@link CsvInputFormat}.
*/
public class CsvInputFormatTest {
private static final Path PATH = new Path("an/ignored/file/");
//Static variables for testing the removal of \r\n to \n
private static final String FIRST_PART = "That is the first part";
private static final String SECOND_PART = "That is the second part";
@Test
public void testSplitCsvInputStreamInLargeBuffer() throws Exception {
testSplitCsvInputStream(1024 * 1024, false);
}
@Test
public void testSplitCsvInputStreamInSmallBuffer() throws Exception {
testSplitCsvInputStream(2, false);
}
private void testSplitCsvInputStream(int bufferSize, boolean failAtStart) throws Exception {
final String fileContent =
"this is|1|2.0|\n" +
"a test|3|4.0|\n" +
"#next|5|6.0|\n" +
"asdadas|5|30.0|\n";
// create temporary file with 3 blocks
final File tempFile = File.createTempFile("input-stream-decoration-test", "tmp");
tempFile.deleteOnExit();
try (FileOutputStream fileOutputStream = new FileOutputStream(tempFile)) {
fileOutputStream.write(fileContent.getBytes(ConfigConstants.DEFAULT_CHARSET));
}
// fix the number of blocks and the size of each one.
final int noOfBlocks = 3;
final TupleTypeInfo<Tuple3<String, Integer, Double>> typeInfo = TupleTypeInfo.getBasicTupleTypeInfo(String.class, Integer.class, Double.class);
CsvInputFormat<Tuple3<String, Integer, Double>> format = new TupleCsvInputFormat<>(new Path(tempFile.toURI()), "\n", "|", typeInfo);
format.setLenient(true);
format.setBufferSize(bufferSize);
final Configuration config = new Configuration();
format.configure(config);
long[] offsetsAfterRecord = new long[]{ 15, 29, 42, 58};
long[] offsetAtEndOfSplit = new long[]{ 20, 40, 58};
int recordCounter = 0;
int splitCounter = 0;
FileInputSplit[] inputSplits = format.createInputSplits(noOfBlocks);
Tuple3<String, Integer, Double> result = new Tuple3<>();
for (FileInputSplit inputSplit : inputSplits) {
assertEquals(inputSplit.getStart() + inputSplit.getLength(), offsetAtEndOfSplit[splitCounter]);
splitCounter++;
format.open(inputSplit);
format.reopen(inputSplit, format.getCurrentState());
while (!format.reachedEnd()) {
if ((result = format.nextRecord(result)) != null) {
assertEquals((long) format.getCurrentState(), offsetsAfterRecord[recordCounter]);
recordCounter++;
if (recordCounter == 1) {
assertNotNull(result);
assertEquals("this is", result.f0);
assertEquals(Integer.valueOf(1), result.f1);
assertEquals(new Double(2.0), result.f2);
assertEquals((long) format.getCurrentState(), 15);
} else if (recordCounter == 2) {
assertNotNull(result);
assertEquals("a test", result.f0);
assertEquals(Integer.valueOf(3), result.f1);
assertEquals(new Double(4.0), result.f2);
assertEquals((long) format.getCurrentState(), 29);
} else if (recordCounter == 3) {
assertNotNull(result);
assertEquals("#next", result.f0);
assertEquals(Integer.valueOf(5), result.f1);
assertEquals(new Double(6.0), result.f2);
assertEquals((long) format.getCurrentState(), 42);
} else {
assertNotNull(result);
assertEquals("asdadas", result.f0);
assertEquals(new Integer(5), result.f1);
assertEquals(new Double(30.0), result.f2);
assertEquals((long) format.getCurrentState(), 58);
}
// simulate checkpoint
Long state = format.getCurrentState();
long offsetToRestore = state;
// create a new format
format = new TupleCsvInputFormat<>(new Path(tempFile.toURI()), "\n", "|", typeInfo);
format.setLenient(true);
format.setBufferSize(bufferSize);
format.configure(config);
// simulate the restore operation.
format.reopen(inputSplit, offsetToRestore);
} else {
result = new Tuple3<>();
}
}
format.close();
}
Assert.assertEquals(4, recordCounter);
}
@Test
public void ignoreInvalidLinesAndGetOffsetInLargeBuffer() {
ignoreInvalidLines(1024 * 1024);
}
@Test
public void ignoreInvalidLinesAndGetOffsetInSmallBuffer() {
ignoreInvalidLines(2);
}
private void ignoreInvalidLines(int bufferSize) {
try {
final String fileContent = "#description of the data\n" +
"header1|header2|header3|\n" +
"this is|1|2.0|\n" +
"//a comment\n" +
"a test|3|4.0|\n" +
"#next|5|6.0|\n" +
"asdasdas";
final FileInputSplit split = createTempFile(fileContent);
final TupleTypeInfo<Tuple3<String, Integer, Double>> typeInfo = TupleTypeInfo.getBasicTupleTypeInfo(String.class, Integer.class, Double.class);
final CsvInputFormat<Tuple3<String, Integer, Double>> format = new TupleCsvInputFormat<Tuple3<String, Integer, Double>>(PATH, "\n", "|", typeInfo);
format.setLenient(true);
format.setBufferSize(bufferSize);
final Configuration parameters = new Configuration();
format.configure(parameters);
format.open(split);
Tuple3<String, Integer, Double> result = new Tuple3<String, Integer, Double>();
result = format.nextRecord(result);
assertNotNull(result);
assertEquals("this is", result.f0);
assertEquals(Integer.valueOf(1), result.f1);
assertEquals(new Double(2.0), result.f2);
assertEquals((long) format.getCurrentState(), 65);
result = format.nextRecord(result);
assertNotNull(result);
assertEquals("a test", result.f0);
assertEquals(Integer.valueOf(3), result.f1);
assertEquals(new Double(4.0), result.f2);
assertEquals((long) format.getCurrentState(), 91);
result = format.nextRecord(result);
assertNotNull(result);
assertEquals("#next", result.f0);
assertEquals(Integer.valueOf(5), result.f1);
assertEquals(new Double(6.0), result.f2);
assertEquals((long) format.getCurrentState(), 104);
result = format.nextRecord(result);
assertNull(result);
assertEquals(fileContent.length(), (long) format.getCurrentState());
}
catch (Exception ex) {
ex.printStackTrace();
fail("Test failed due to a " + ex.getClass().getName() + ": " + ex.getMessage());
}
}
@Test
public void ignoreSingleCharPrefixComments() {
try {
final String fileContent = "#description of the data\n" +
"#successive commented line\n" +
"this is|1|2.0|\n" +
"a test|3|4.0|\n" +
"#next|5|6.0|\n";
final FileInputSplit split = createTempFile(fileContent);
final TupleTypeInfo<Tuple3<String, Integer, Double>> typeInfo = TupleTypeInfo.getBasicTupleTypeInfo(String.class, Integer.class, Double.class);
final CsvInputFormat<Tuple3<String, Integer, Double>> format = new TupleCsvInputFormat<Tuple3<String, Integer, Double>>(PATH, "\n", "|", typeInfo);
format.setCommentPrefix("#");
final Configuration parameters = new Configuration();
format.configure(parameters);
format.open(split);
Tuple3<String, Integer, Double> result = new Tuple3<String, Integer, Double>();
result = format.nextRecord(result);
assertNotNull(result);
assertEquals("this is", result.f0);
assertEquals(Integer.valueOf(1), result.f1);
assertEquals(new Double(2.0), result.f2);
result = format.nextRecord(result);
assertNotNull(result);
assertEquals("a test", result.f0);
assertEquals(Integer.valueOf(3), result.f1);
assertEquals(new Double(4.0), result.f2);
result = format.nextRecord(result);
assertNull(result);
}
catch (Exception ex) {
ex.printStackTrace();
fail("Test failed due to a " + ex.getClass().getName() + ": " + ex.getMessage());
}
}
@Test
public void ignoreMultiCharPrefixComments() {
try {
final String fileContent = "//description of the data\n" +
"//successive commented line\n" +
"this is|1|2.0|\n" +
"a test|3|4.0|\n" +
"//next|5|6.0|\n";
final FileInputSplit split = createTempFile(fileContent);
final TupleTypeInfo<Tuple3<String, Integer, Double>> typeInfo = TupleTypeInfo.getBasicTupleTypeInfo(String.class, Integer.class, Double.class);
final CsvInputFormat<Tuple3<String, Integer, Double>> format = new TupleCsvInputFormat<Tuple3<String, Integer, Double>>(PATH, "\n", "|", typeInfo);
format.setCommentPrefix("//");
final Configuration parameters = new Configuration();
format.configure(parameters);
format.open(split);
Tuple3<String, Integer, Double> result = new Tuple3<String, Integer, Double>();
result = format.nextRecord(result);
assertNotNull(result);
assertEquals("this is", result.f0);
assertEquals(Integer.valueOf(1), result.f1);
assertEquals(new Double(2.0), result.f2);
result = format.nextRecord(result);
assertNotNull(result);
assertEquals("a test", result.f0);
assertEquals(Integer.valueOf(3), result.f1);
assertEquals(new Double(4.0), result.f2);
result = format.nextRecord(result);
assertNull(result);
}
catch (Exception ex) {
ex.printStackTrace();
fail("Test failed due to a " + ex.getClass().getName() + ": " + ex.getMessage());
}
}
@Test
public void readStringFields() {
try {
final String fileContent = "abc|def|ghijk\nabc||hhg\n|||";
final FileInputSplit split = createTempFile(fileContent);
final TupleTypeInfo<Tuple3<String, String, String>> typeInfo = TupleTypeInfo.getBasicTupleTypeInfo(String.class, String.class, String.class);
final CsvInputFormat<Tuple3<String, String, String>> format = new TupleCsvInputFormat<Tuple3<String, String, String>>(PATH, "\n", "|", typeInfo);
final Configuration parameters = new Configuration();
format.configure(parameters);
format.open(split);
Tuple3<String, String, String> result = new Tuple3<String, String, String>();
result = format.nextRecord(result);
assertNotNull(result);
assertEquals("abc", result.f0);
assertEquals("def", result.f1);
assertEquals("ghijk", result.f2);
result = format.nextRecord(result);
assertNotNull(result);
assertEquals("abc", result.f0);
assertEquals("", result.f1);
assertEquals("hhg", result.f2);
result = format.nextRecord(result);
assertNotNull(result);
assertEquals("", result.f0);
assertEquals("", result.f1);
assertEquals("", result.f2);
result = format.nextRecord(result);
assertNull(result);
assertTrue(format.reachedEnd());
}
catch (Exception ex) {
ex.printStackTrace();
fail("Test failed due to a " + ex.getClass().getName() + ": " + ex.getMessage());
}
}
@Test
public void readMixedQuotedStringFields() {
try {
final String fileContent = "@a|b|c@|def|@ghijk@\nabc||@|hhg@\n|||";
final FileInputSplit split = createTempFile(fileContent);
final TupleTypeInfo<Tuple3<String, String, String>> typeInfo = TupleTypeInfo.getBasicTupleTypeInfo(String.class, String.class, String.class);
final CsvInputFormat<Tuple3<String, String, String>> format = new TupleCsvInputFormat<Tuple3<String, String, String>>(PATH, "\n", "|", typeInfo);
final Configuration parameters = new Configuration();
format.configure(parameters);
format.enableQuotedStringParsing('@');
format.open(split);
Tuple3<String, String, String> result = new Tuple3<String, String, String>();
result = format.nextRecord(result);
assertNotNull(result);
assertEquals("a|b|c", result.f0);
assertEquals("def", result.f1);
assertEquals("ghijk", result.f2);
result = format.nextRecord(result);
assertNotNull(result);
assertEquals("abc", result.f0);
assertEquals("", result.f1);
assertEquals("|hhg", result.f2);
result = format.nextRecord(result);
assertNotNull(result);
assertEquals("", result.f0);
assertEquals("", result.f1);
assertEquals("", result.f2);
result = format.nextRecord(result);
assertNull(result);
assertTrue(format.reachedEnd());
}
catch (Exception ex) {
ex.printStackTrace();
fail("Test failed due to a " + ex.getClass().getName() + ": " + ex.getMessage());
}
}
@Test
public void readStringFieldsWithTrailingDelimiters() {
try {
final String fileContent = "abc|-def|-ghijk\nabc|-|-hhg\n|-|-|-\n";
final FileInputSplit split = createTempFile(fileContent);
final TupleTypeInfo<Tuple3<String, String, String>> typeInfo = TupleTypeInfo.getBasicTupleTypeInfo(String.class, String.class, String.class);
final CsvInputFormat<Tuple3<String, String, String>> format = new TupleCsvInputFormat<Tuple3<String, String, String>>(PATH, typeInfo);
format.setFieldDelimiter("|-");
format.configure(new Configuration());
format.open(split);
Tuple3<String, String, String> result = new Tuple3<String, String, String>();
result = format.nextRecord(result);
assertNotNull(result);
assertEquals("abc", result.f0);
assertEquals("def", result.f1);
assertEquals("ghijk", result.f2);
result = format.nextRecord(result);
assertNotNull(result);
assertEquals("abc", result.f0);
assertEquals("", result.f1);
assertEquals("hhg", result.f2);
result = format.nextRecord(result);
assertNotNull(result);
assertEquals("", result.f0);
assertEquals("", result.f1);
assertEquals("", result.f2);
result = format.nextRecord(result);
assertNull(result);
assertTrue(format.reachedEnd());
}
catch (Exception ex) {
fail("Test failed due to a " + ex.getClass().getName() + ": " + ex.getMessage());
}
}
@Test
public void testTailingEmptyFields() throws Exception {
final String fileContent = "aa,bb,cc\n" + // ok
"aa,bb,\n" + // the last field is empty
"aa,,\n" + // the last two fields are empty
",,\n" + // all fields are empty
"aa,bb"; // row too short
final FileInputSplit split = createTempFile(fileContent);
final TupleTypeInfo<Tuple3<String, String, String>> typeInfo =
TupleTypeInfo.getBasicTupleTypeInfo(String.class, String.class, String.class);
final CsvInputFormat<Tuple3<String, String, String>> format =
new TupleCsvInputFormat<Tuple3<String, String, String>>(PATH, typeInfo);
format.setFieldDelimiter(",");
format.configure(new Configuration());
format.open(split);
Tuple3<String, String, String> result = new Tuple3<String, String, String>();
result = format.nextRecord(result);
assertNotNull(result);
assertEquals("aa", result.f0);
assertEquals("bb", result.f1);
assertEquals("cc", result.f2);
result = format.nextRecord(result);
assertNotNull(result);
assertEquals("aa", result.f0);
assertEquals("bb", result.f1);
assertEquals("", result.f2);
result = format.nextRecord(result);
assertNotNull(result);
assertEquals("aa", result.f0);
assertEquals("", result.f1);
assertEquals("", result.f2);
result = format.nextRecord(result);
assertNotNull(result);
assertEquals("", result.f0);
assertEquals("", result.f1);
assertEquals("", result.f2);
try {
format.nextRecord(result);
fail("Parse Exception was not thrown! (Row too short)");
} catch (ParseException e) {}
}
@Test
public void testIntegerFields() throws IOException {
try {
final String fileContent = "111|222|333|444|555\n666|777|888|999|000|\n";
final FileInputSplit split = createTempFile(fileContent);
final TupleTypeInfo<Tuple5<Integer, Integer, Integer, Integer, Integer>> typeInfo =
TupleTypeInfo.getBasicTupleTypeInfo(Integer.class, Integer.class, Integer.class, Integer.class, Integer.class);
final CsvInputFormat<Tuple5<Integer, Integer, Integer, Integer, Integer>> format = new TupleCsvInputFormat<Tuple5<Integer, Integer, Integer, Integer, Integer>>(PATH, typeInfo);
format.setFieldDelimiter("|");
format.configure(new Configuration());
format.open(split);
Tuple5<Integer, Integer, Integer, Integer, Integer> result = new Tuple5<Integer, Integer, Integer, Integer, Integer>();
result = format.nextRecord(result);
assertNotNull(result);
assertEquals(Integer.valueOf(111), result.f0);
assertEquals(Integer.valueOf(222), result.f1);
assertEquals(Integer.valueOf(333), result.f2);
assertEquals(Integer.valueOf(444), result.f3);
assertEquals(Integer.valueOf(555), result.f4);
result = format.nextRecord(result);
assertNotNull(result);
assertEquals(Integer.valueOf(666), result.f0);
assertEquals(Integer.valueOf(777), result.f1);
assertEquals(Integer.valueOf(888), result.f2);
assertEquals(Integer.valueOf(999), result.f3);
assertEquals(Integer.valueOf(000), result.f4);
result = format.nextRecord(result);
assertNull(result);
assertTrue(format.reachedEnd());
}
catch (Exception ex) {
fail("Test failed due to a " + ex.getClass().getName() + ": " + ex.getMessage());
}
}
@Test
public void testEmptyFields() throws IOException {
try {
final String fileContent = "|0|0|0|0|0|\n" +
"1||1|1|1|1|\n" +
"2|2||2|2|2|\n" +
"3|3|3| |3|3|\n" +
"4|4|4|4||4|\n" +
"5|5|5|5|5||\n";
final FileInputSplit split = createTempFile(fileContent);
final TupleTypeInfo<Tuple6<Short, Integer, Long, Float, Double, Byte>> typeInfo =
TupleTypeInfo.getBasicTupleTypeInfo(Short.class, Integer.class, Long.class, Float.class, Double.class, Byte.class);
final CsvInputFormat<Tuple6<Short, Integer, Long, Float, Double, Byte>> format = new TupleCsvInputFormat<Tuple6<Short, Integer, Long, Float, Double, Byte>>(PATH, typeInfo);
format.setFieldDelimiter("|");
format.configure(new Configuration());
format.open(split);
Tuple6<Short, Integer, Long, Float, Double, Byte> result = new Tuple6<Short, Integer, Long, Float, Double, Byte>();
try {
result = format.nextRecord(result);
fail("Empty String Parse Exception was not thrown! (ShortParser)");
} catch (ParseException e) {}
try {
result = format.nextRecord(result);
fail("Empty String Parse Exception was not thrown! (IntegerParser)");
} catch (ParseException e) {}
try {
result = format.nextRecord(result);
fail("Empty String Parse Exception was not thrown! (LongParser)");
} catch (ParseException e) {}
try {
result = format.nextRecord(result);
fail("Empty String Parse Exception was not thrown! (FloatParser)");
} catch (ParseException e) {}
try {
result = format.nextRecord(result);
fail("Empty String Parse Exception was not thrown! (DoubleParser)");
} catch (ParseException e) {}
try {
result = format.nextRecord(result);
fail("Empty String Parse Exception was not thrown! (ByteParser)");
} catch (ParseException e) {}
result = format.nextRecord(result);
assertNull(result);
assertTrue(format.reachedEnd());
}
catch (Exception ex) {
fail("Test failed due to a " + ex.getClass().getName() + ": " + ex.getMessage());
}
}
@Test
public void testDoubleFields() throws IOException {
try {
final String fileContent = "11.1|22.2|33.3|44.4|55.5\n66.6|77.7|88.8|99.9|00.0|\n";
final FileInputSplit split = createTempFile(fileContent);
final TupleTypeInfo<Tuple5<Double, Double, Double, Double, Double>> typeInfo =
TupleTypeInfo.getBasicTupleTypeInfo(Double.class, Double.class, Double.class, Double.class, Double.class);
final CsvInputFormat<Tuple5<Double, Double, Double, Double, Double>> format = new TupleCsvInputFormat<Tuple5<Double, Double, Double, Double, Double>>(PATH, typeInfo);
format.setFieldDelimiter("|");
format.configure(new Configuration());
format.open(split);
Tuple5<Double, Double, Double, Double, Double> result = new Tuple5<Double, Double, Double, Double, Double>();
result = format.nextRecord(result);
assertNotNull(result);
assertEquals(Double.valueOf(11.1), result.f0);
assertEquals(Double.valueOf(22.2), result.f1);
assertEquals(Double.valueOf(33.3), result.f2);
assertEquals(Double.valueOf(44.4), result.f3);
assertEquals(Double.valueOf(55.5), result.f4);
result = format.nextRecord(result);
assertNotNull(result);
assertEquals(Double.valueOf(66.6), result.f0);
assertEquals(Double.valueOf(77.7), result.f1);
assertEquals(Double.valueOf(88.8), result.f2);
assertEquals(Double.valueOf(99.9), result.f3);
assertEquals(Double.valueOf(00.0), result.f4);
result = format.nextRecord(result);
assertNull(result);
assertTrue(format.reachedEnd());
}
catch (Exception ex) {
fail("Test failed due to a " + ex.getClass().getName() + ": " + ex.getMessage());
}
}
@Test
public void testReadFirstN() throws IOException {
try {
final String fileContent = "111|222|333|444|555|\n666|777|888|999|000|\n";
final FileInputSplit split = createTempFile(fileContent);
final TupleTypeInfo<Tuple2<Integer, Integer>> typeInfo = TupleTypeInfo.getBasicTupleTypeInfo(Integer.class, Integer.class);
final CsvInputFormat<Tuple2<Integer, Integer>> format = new TupleCsvInputFormat<Tuple2<Integer, Integer>>(PATH, typeInfo);
format.setFieldDelimiter("|");
format.configure(new Configuration());
format.open(split);
Tuple2<Integer, Integer> result = new Tuple2<Integer, Integer>();
result = format.nextRecord(result);
assertNotNull(result);
assertEquals(Integer.valueOf(111), result.f0);
assertEquals(Integer.valueOf(222), result.f1);
result = format.nextRecord(result);
assertNotNull(result);
assertEquals(Integer.valueOf(666), result.f0);
assertEquals(Integer.valueOf(777), result.f1);
result = format.nextRecord(result);
assertNull(result);
assertTrue(format.reachedEnd());
}
catch (Exception ex) {
fail("Test failed due to a " + ex.getClass().getName() + ": " + ex.getMessage());
}
}
@Test
public void testReadSparseWithNullFieldsForTypes() throws IOException {
try {
final String fileContent = "111|x|222|x|333|x|444|x|555|x|666|x|777|x|888|x|999|x|000|x|\n" +
"000|x|999|x|888|x|777|x|666|x|555|x|444|x|333|x|222|x|111|x|";
final FileInputSplit split = createTempFile(fileContent);
final TupleTypeInfo<Tuple3<Integer, Integer, Integer>> typeInfo = TupleTypeInfo.getBasicTupleTypeInfo(Integer.class, Integer.class, Integer.class);
final CsvInputFormat<Tuple3<Integer, Integer, Integer>> format = new TupleCsvInputFormat<Tuple3<Integer, Integer, Integer>>(PATH, typeInfo, new boolean[]{true, false, false, true, false, false, false, true});
format.setFieldDelimiter("|x|");
format.configure(new Configuration());
format.open(split);
Tuple3<Integer, Integer, Integer> result = new Tuple3<Integer, Integer, Integer>();
result = format.nextRecord(result);
assertNotNull(result);
assertEquals(Integer.valueOf(111), result.f0);
assertEquals(Integer.valueOf(444), result.f1);
assertEquals(Integer.valueOf(888), result.f2);
result = format.nextRecord(result);
assertNotNull(result);
assertEquals(Integer.valueOf(000), result.f0);
assertEquals(Integer.valueOf(777), result.f1);
assertEquals(Integer.valueOf(333), result.f2);
result = format.nextRecord(result);
assertNull(result);
assertTrue(format.reachedEnd());
}
catch (Exception ex) {
fail("Test failed due to a " + ex.getClass().getName() + ": " + ex.getMessage());
}
}
@Test
public void testReadSparseWithPositionSetter() throws IOException {
try {
final String fileContent = "111|222|333|444|555|666|777|888|999|000|\n000|999|888|777|666|555|444|333|222|111|";
final FileInputSplit split = createTempFile(fileContent);
final TupleTypeInfo<Tuple3<Integer, Integer, Integer>> typeInfo = TupleTypeInfo.getBasicTupleTypeInfo(Integer.class, Integer.class, Integer.class);
final CsvInputFormat<Tuple3<Integer, Integer, Integer>> format = new TupleCsvInputFormat<Tuple3<Integer, Integer, Integer>>(PATH, typeInfo, new int[]{0, 3, 7});
format.setFieldDelimiter("|");
format.configure(new Configuration());
format.open(split);
Tuple3<Integer, Integer, Integer> result = new Tuple3<Integer, Integer, Integer>();
result = format.nextRecord(result);
assertNotNull(result);
assertEquals(Integer.valueOf(111), result.f0);
assertEquals(Integer.valueOf(444), result.f1);
assertEquals(Integer.valueOf(888), result.f2);
result = format.nextRecord(result);
assertNotNull(result);
assertEquals(Integer.valueOf(000), result.f0);
assertEquals(Integer.valueOf(777), result.f1);
assertEquals(Integer.valueOf(333), result.f2);
result = format.nextRecord(result);
assertNull(result);
assertTrue(format.reachedEnd());
}
catch (Exception ex) {
fail("Test failed due to a " + ex.getClass().getName() + ": " + ex.getMessage());
}
}
@Test
public void testReadSparseWithMask() throws IOException {
try {
final String fileContent = "111&&222&&333&&444&&555&&666&&777&&888&&999&&000&&\n" +
"000&&999&&888&&777&&666&&555&&444&&333&&222&&111&&";
final FileInputSplit split = createTempFile(fileContent);
final TupleTypeInfo<Tuple3<Integer, Integer, Integer>> typeInfo = TupleTypeInfo.getBasicTupleTypeInfo(Integer.class, Integer.class, Integer.class);
final CsvInputFormat<Tuple3<Integer, Integer, Integer>> format = new TupleCsvInputFormat<Tuple3<Integer, Integer, Integer>>(PATH, typeInfo, new boolean[]{true, false, false, true, false, false, false, true});
format.setFieldDelimiter("&&");
format.configure(new Configuration());
format.open(split);
Tuple3<Integer, Integer, Integer> result = new Tuple3<Integer, Integer, Integer>();
result = format.nextRecord(result);
assertNotNull(result);
assertEquals(Integer.valueOf(111), result.f0);
assertEquals(Integer.valueOf(444), result.f1);
assertEquals(Integer.valueOf(888), result.f2);
result = format.nextRecord(result);
assertNotNull(result);
assertEquals(Integer.valueOf(000), result.f0);
assertEquals(Integer.valueOf(777), result.f1);
assertEquals(Integer.valueOf(333), result.f2);
result = format.nextRecord(result);
assertNull(result);
assertTrue(format.reachedEnd());
}
catch (Exception ex) {
fail("Test failed due to a " + ex.getClass().getName() + ": " + ex.getMessage());
}
}
@Test
public void testParseStringErrors() throws Exception {
StringParser stringParser = new StringParser();
stringParser.enableQuotedStringParsing((byte) '"');
Object[][] failures = {
{"\"string\" trailing", FieldParser.ParseErrorState.UNQUOTED_CHARS_AFTER_QUOTED_STRING},
{"\"unterminated ", FieldParser.ParseErrorState.UNTERMINATED_QUOTED_STRING}
};
for (Object[] failure : failures) {
String input = (String) failure[0];
int result = stringParser.parseField(input.getBytes(ConfigConstants.DEFAULT_CHARSET), 0,
input.length(), new byte[]{'|'}, null);
assertThat(result, is(-1));
assertThat(stringParser.getErrorState(), is(failure[1]));
}
}
// Test disabled because we do not support double-quote escaped quotes right now.
// @Test
public void testParserCorrectness() throws Exception {
// RFC 4180 Compliance Test content
// Taken from http://en.wikipedia.org/wiki/Comma-separated_values#Example
final String fileContent =
"Year,Make,Model,Description,Price\n" +
"1997,Ford,E350,\"ac, abs, moon\",3000.00\n" +
"1999,Chevy,\"Venture \"\"Extended Edition\"\"\",\"\",4900.00\n" +
"1996,Jeep,Grand Cherokee,\"MUST SELL! air, moon roof, loaded\",4799.00\n" +
"1999,Chevy,\"Venture \"\"Extended Edition, Very Large\"\"\",,5000.00\n" +
",,\"Venture \"\"Extended Edition\"\"\",\"\",4900.00";
final FileInputSplit split = createTempFile(fileContent);
final TupleTypeInfo<Tuple5<Integer, String, String, String, Double>> typeInfo =
TupleTypeInfo.getBasicTupleTypeInfo(Integer.class, String.class, String.class, String.class, Double.class);
final CsvInputFormat<Tuple5<Integer, String, String, String, Double>> format = new TupleCsvInputFormat<Tuple5<Integer, String, String, String, Double>>(PATH, typeInfo);
format.setSkipFirstLineAsHeader(true);
format.setFieldDelimiter(",");
format.configure(new Configuration());
format.open(split);
Tuple5<Integer, String, String, String, Double> result = new Tuple5<Integer, String, String, String, Double>();
@SuppressWarnings("unchecked")
Tuple5<Integer, String, String, String, Double>[] expectedLines = new Tuple5[] {
new Tuple5<Integer, String, String, String, Double>(1997, "Ford", "E350", "ac, abs, moon", 3000.0),
new Tuple5<Integer, String, String, String, Double>(1999, "Chevy", "Venture \"Extended Edition\"", "", 4900.0),
new Tuple5<Integer, String, String, String, Double>(1996, "Jeep", "Grand Cherokee", "MUST SELL! air, moon roof, loaded", 4799.00),
new Tuple5<Integer, String, String, String, Double>(1999, "Chevy", "Venture \"Extended Edition, Very Large\"", "", 5000.00),
new Tuple5<Integer, String, String, String, Double>(0, "", "Venture \"Extended Edition\"", "", 4900.0)
};
try {
for (Tuple5<Integer, String, String, String, Double> expected : expectedLines) {
result = format.nextRecord(result);
assertEquals(expected, result);
}
assertNull(format.nextRecord(result));
assertTrue(format.reachedEnd());
} catch (Exception ex) {
fail("Test failed due to a " + ex.getClass().getName() + ": " + ex.getMessage());
}
}
private FileInputSplit createTempFile(String content) throws IOException {
File tempFile = File.createTempFile("test_contents", "tmp");
tempFile.deleteOnExit();
OutputStreamWriter wrt = new OutputStreamWriter(
new FileOutputStream(tempFile), StandardCharsets.UTF_8
);
wrt.write(content);
wrt.close();
return new FileInputSplit(0, new Path(tempFile.toURI().toString()), 0, tempFile.length(), new String[] {"localhost"});
}
@Test
public void testWindowsLineEndRemoval() {
//Check typical use case -- linux file is correct and it is set up to linux (\n)
this.testRemovingTrailingCR("\n", "\n");
//Check typical windows case -- windows file endings and file has windows file endings set up
this.testRemovingTrailingCR("\r\n", "\r\n");
//Check problematic case windows file -- windows file endings (\r\n) but linux line endings (\n) set up
this.testRemovingTrailingCR("\r\n", "\n");
//Check problematic case linux file -- linux file endings (\n) but windows file endings set up (\r\n)
//Specific setup for windows line endings will expect \r\n because it has to be set up and is not standard.
}
private void testRemovingTrailingCR(String lineBreakerInFile, String lineBreakerSetup) {
File tempFile = null;
String fileContent = CsvInputFormatTest.FIRST_PART + lineBreakerInFile + CsvInputFormatTest.SECOND_PART + lineBreakerInFile;
try {
// create input file
tempFile = File.createTempFile("CsvInputFormatTest", "tmp");
tempFile.deleteOnExit();
tempFile.setWritable(true);
OutputStreamWriter wrt = new OutputStreamWriter(new FileOutputStream(tempFile));
wrt.write(fileContent);
wrt.close();
final TupleTypeInfo<Tuple1<String>> typeInfo = TupleTypeInfo.getBasicTupleTypeInfo(String.class);
final CsvInputFormat<Tuple1<String>> inputFormat = new TupleCsvInputFormat<Tuple1<String>>(new Path(tempFile.toURI().toString()), typeInfo);
Configuration parameters = new Configuration();
inputFormat.configure(parameters);
inputFormat.setDelimiter(lineBreakerSetup);
FileInputSplit[] splits = inputFormat.createInputSplits(1);
inputFormat.open(splits[0]);
Tuple1<String> result = inputFormat.nextRecord(new Tuple1<String>());
assertNotNull("Expecting to not return null", result);
assertEquals(FIRST_PART, result.f0);
result = inputFormat.nextRecord(result);
assertNotNull("Expecting to not return null", result);
assertEquals(SECOND_PART, result.f0);
}
catch (Throwable t) {
System.err.println("test failed with exception: " + t.getMessage());
t.printStackTrace(System.err);
fail("Test erroneous");
}
}
private void validatePojoItem(CsvInputFormat<PojoItem> format) throws Exception {
PojoItem item = new PojoItem();
format.nextRecord(item);
assertEquals(123, item.field1);
assertEquals("AAA", item.field2);
assertEquals(Double.valueOf(3.123), item.field3);
assertEquals("BBB", item.field4);
format.nextRecord(item);
assertEquals(456, item.field1);
assertEquals("BBB", item.field2);
assertEquals(Double.valueOf(1.123), item.field3);
assertEquals("AAA", item.field4);
}
@Test
public void testPojoType() throws Exception {
File tempFile = File.createTempFile("CsvReaderPojoType", "tmp");
tempFile.deleteOnExit();
tempFile.setWritable(true);
OutputStreamWriter wrt = new OutputStreamWriter(new FileOutputStream(tempFile));
wrt.write("123,AAA,3.123,BBB\n");
wrt.write("456,BBB,1.123,AAA\n");
wrt.close();
@SuppressWarnings("unchecked")
PojoTypeInfo<PojoItem> typeInfo = (PojoTypeInfo<PojoItem>) TypeExtractor.createTypeInfo(PojoItem.class);
CsvInputFormat<PojoItem> inputFormat = new PojoCsvInputFormat<PojoItem>(new Path(tempFile.toURI().toString()), typeInfo);
inputFormat.configure(new Configuration());
FileInputSplit[] splits = inputFormat.createInputSplits(1);
inputFormat.open(splits[0]);
validatePojoItem(inputFormat);
}
@Test
public void testPojoTypeWithPrivateField() throws Exception {
File tempFile = File.createTempFile("CsvReaderPojoType", "tmp");
tempFile.deleteOnExit();
tempFile.setWritable(true);
OutputStreamWriter wrt = new OutputStreamWriter(new FileOutputStream(tempFile));
wrt.write("123,AAA,3.123,BBB\n");
wrt.write("456,BBB,1.123,AAA\n");
wrt.close();
@SuppressWarnings("unchecked")
PojoTypeInfo<PrivatePojoItem> typeInfo = (PojoTypeInfo<PrivatePojoItem>) TypeExtractor.createTypeInfo(PrivatePojoItem.class);
CsvInputFormat<PrivatePojoItem> inputFormat = new PojoCsvInputFormat<PrivatePojoItem>(new Path(tempFile.toURI().toString()), typeInfo);
inputFormat.configure(new Configuration());
FileInputSplit[] splits = inputFormat.createInputSplits(1);
inputFormat.open(splits[0]);
PrivatePojoItem item = new PrivatePojoItem();
inputFormat.nextRecord(item);
assertEquals(123, item.field1);
assertEquals("AAA", item.field2);
assertEquals(Double.valueOf(3.123), item.field3);
assertEquals("BBB", item.field4);
inputFormat.nextRecord(item);
assertEquals(456, item.field1);
assertEquals("BBB", item.field2);
assertEquals(Double.valueOf(1.123), item.field3);
assertEquals("AAA", item.field4);
}
@Test
public void testPojoTypeWithTrailingEmptyFields() throws Exception {
final String fileContent = "123,,3.123,,\n456,BBB,3.23,,";
final FileInputSplit split = createTempFile(fileContent);
@SuppressWarnings("unchecked")
PojoTypeInfo<PrivatePojoItem> typeInfo = (PojoTypeInfo<PrivatePojoItem>) TypeExtractor.createTypeInfo(PrivatePojoItem.class);
CsvInputFormat<PrivatePojoItem> inputFormat = new PojoCsvInputFormat<PrivatePojoItem>(PATH, typeInfo);
inputFormat.configure(new Configuration());
inputFormat.open(split);
PrivatePojoItem item = new PrivatePojoItem();
inputFormat.nextRecord(item);
assertEquals(123, item.field1);
assertEquals("", item.field2);
assertEquals(Double.valueOf(3.123), item.field3);
assertEquals("", item.field4);
inputFormat.nextRecord(item);
assertEquals(456, item.field1);
assertEquals("BBB", item.field2);
assertEquals(Double.valueOf(3.23), item.field3);
assertEquals("", item.field4);
}
@Test
public void testPojoTypeWithMappingInformation() throws Exception {
File tempFile = File.createTempFile("CsvReaderPojoType", "tmp");
tempFile.deleteOnExit();
tempFile.setWritable(true);
OutputStreamWriter wrt = new OutputStreamWriter(new FileOutputStream(tempFile));
wrt.write("123,3.123,AAA,BBB\n");
wrt.write("456,1.123,BBB,AAA\n");
wrt.close();
@SuppressWarnings("unchecked")
PojoTypeInfo<PojoItem> typeInfo = (PojoTypeInfo<PojoItem>) TypeExtractor.createTypeInfo(PojoItem.class);
CsvInputFormat<PojoItem> inputFormat = new PojoCsvInputFormat<PojoItem>(new Path(tempFile.toURI().toString()), typeInfo, new String[]{"field1", "field3", "field2", "field4"});
inputFormat.configure(new Configuration());
FileInputSplit[] splits = inputFormat.createInputSplits(1);
inputFormat.open(splits[0]);
validatePojoItem(inputFormat);
}
@Test
public void testPojoTypeWithPartialFieldInCSV() throws Exception {
File tempFile = File.createTempFile("CsvReaderPojoType", "tmp");
tempFile.deleteOnExit();
tempFile.setWritable(true);
OutputStreamWriter wrt = new OutputStreamWriter(new FileOutputStream(tempFile));
wrt.write("123,NODATA,AAA,NODATA,3.123,BBB\n");
wrt.write("456,NODATA,BBB,NODATA,1.123,AAA\n");
wrt.close();
@SuppressWarnings("unchecked")
PojoTypeInfo<PojoItem> typeInfo = (PojoTypeInfo<PojoItem>) TypeExtractor.createTypeInfo(PojoItem.class);
CsvInputFormat<PojoItem> inputFormat = new PojoCsvInputFormat<PojoItem>(new Path(tempFile.toURI().toString()), typeInfo, new boolean[]{true, false, true, false, true, true});
inputFormat.configure(new Configuration());
FileInputSplit[] splits = inputFormat.createInputSplits(1);
inputFormat.open(splits[0]);
validatePojoItem(inputFormat);
}
@Test
public void testPojoTypeWithMappingInfoAndPartialField() throws Exception {
File tempFile = File.createTempFile("CsvReaderPojoType", "tmp");
tempFile.deleteOnExit();
tempFile.setWritable(true);
OutputStreamWriter wrt = new OutputStreamWriter(new FileOutputStream(tempFile));
wrt.write("123,3.123,AAA,BBB\n");
wrt.write("456,1.123,BBB,AAA\n");
wrt.close();
@SuppressWarnings("unchecked")
PojoTypeInfo<PojoItem> typeInfo = (PojoTypeInfo<PojoItem>) TypeExtractor.createTypeInfo(PojoItem.class);
CsvInputFormat<PojoItem> inputFormat = new PojoCsvInputFormat<PojoItem>(new Path(tempFile.toURI().toString()), typeInfo, new String[]{"field1", "field4"}, new boolean[]{true, false, false, true});
inputFormat.configure(new Configuration());
FileInputSplit[] splits = inputFormat.createInputSplits(1);
inputFormat.open(splits[0]);
PojoItem item = new PojoItem();
inputFormat.nextRecord(item);
assertEquals(123, item.field1);
assertEquals("BBB", item.field4);
}
@Test
public void testPojoTypeWithInvalidFieldMapping() throws Exception {
File tempFile = File.createTempFile("CsvReaderPojoType", "tmp");
tempFile.deleteOnExit();
tempFile.setWritable(true);
@SuppressWarnings("unchecked")
PojoTypeInfo<PojoItem> typeInfo = (PojoTypeInfo<PojoItem>) TypeExtractor.createTypeInfo(PojoItem.class);
try {
new PojoCsvInputFormat<PojoItem>(new Path(tempFile.toURI().toString()), typeInfo, new String[]{"field1", "field2"});
fail("The number of POJO fields cannot be same as that of selected CSV fields");
} catch (IllegalArgumentException e) {
// success
}
try {
new PojoCsvInputFormat<PojoItem>(new Path(tempFile.toURI().toString()), typeInfo, new String[]{"field1", "field2", null, "field4"});
fail("Fields mapping cannot contain null.");
} catch (NullPointerException e) {
// success
}
try {
new PojoCsvInputFormat<PojoItem>(new Path(tempFile.toURI().toString()), typeInfo, new String[]{"field1", "field2", "field3", "field5"});
fail("Invalid field name");
} catch (IllegalArgumentException e) {
// success
}
}
@Test
public void testQuotedStringParsingWithIncludeFields() throws Exception {
final String fileContent = "\"20:41:52-1-3-2015\"|\"Re: Taskmanager memory error in Eclipse\"|" +
"\"Blahblah <blah@blahblah.org>\"|\"blaaa|\"blubb\"";
final File tempFile = File.createTempFile("CsvReaderQuotedString", "tmp");
tempFile.deleteOnExit();
tempFile.setWritable(true);
OutputStreamWriter writer = new OutputStreamWriter(new FileOutputStream(tempFile));
writer.write(fileContent);
writer.close();
TupleTypeInfo<Tuple2<String, String>> typeInfo = TupleTypeInfo.getBasicTupleTypeInfo(String.class, String.class);
CsvInputFormat<Tuple2<String, String>> inputFormat = new TupleCsvInputFormat<Tuple2<String, String>>(new Path(tempFile.toURI().toString()), typeInfo, new boolean[]{true, false, true});
inputFormat.enableQuotedStringParsing('"');
inputFormat.setFieldDelimiter("|");
inputFormat.setDelimiter('\n');
inputFormat.configure(new Configuration());
FileInputSplit[] splits = inputFormat.createInputSplits(1);
inputFormat.open(splits[0]);
Tuple2<String, String> record = inputFormat.nextRecord(new Tuple2<String, String>());
assertEquals("20:41:52-1-3-2015", record.f0);
assertEquals("Blahblah <blah@blahblah.org>", record.f1);
}
@Test
public void testQuotedStringParsingWithEscapedQuotes() throws Exception {
final String fileContent = "\"\\\"Hello\\\" World\"|\"We are\\\" young\"";
final File tempFile = File.createTempFile("CsvReaderQuotedString", "tmp");
tempFile.deleteOnExit();
tempFile.setWritable(true);
OutputStreamWriter writer = new OutputStreamWriter(new FileOutputStream(tempFile));
writer.write(fileContent);
writer.close();
TupleTypeInfo<Tuple2<String, String>> typeInfo = TupleTypeInfo.getBasicTupleTypeInfo(String.class, String.class);
CsvInputFormat<Tuple2<String, String>> inputFormat = new TupleCsvInputFormat<>(new Path(tempFile.toURI().toString()), typeInfo);
inputFormat.enableQuotedStringParsing('"');
inputFormat.setFieldDelimiter("|");
inputFormat.setDelimiter('\n');
inputFormat.configure(new Configuration());
FileInputSplit[] splits = inputFormat.createInputSplits(1);
inputFormat.open(splits[0]);
Tuple2<String, String> record = inputFormat.nextRecord(new Tuple2<String, String>());
assertEquals("\\\"Hello\\\" World", record.f0);
assertEquals("We are\\\" young", record.f1);
}
/**
* Tests that the CSV input format can deal with POJOs which are subclasses.
*
* @throws Exception
*/
@Test
public void testPojoSubclassType() throws Exception {
final String fileContent = "t1,foobar,tweet2\nt2,barfoo,tweet2";
final File tempFile = File.createTempFile("CsvReaderPOJOSubclass", "tmp");
tempFile.deleteOnExit();
OutputStreamWriter writer = new OutputStreamWriter(new FileOutputStream(tempFile));
writer.write(fileContent);
writer.close();
@SuppressWarnings("unchecked")
PojoTypeInfo<TwitterPOJO> typeInfo = (PojoTypeInfo<TwitterPOJO>) TypeExtractor.createTypeInfo(TwitterPOJO.class);
CsvInputFormat<TwitterPOJO> inputFormat = new PojoCsvInputFormat<>(new Path(tempFile.toURI().toString()), typeInfo);
inputFormat.configure(new Configuration());
FileInputSplit[] splits = inputFormat.createInputSplits(1);
inputFormat.open(splits[0]);
List<TwitterPOJO> expected = new ArrayList<>();
for (String line: fileContent.split("\n")) {
String[] elements = line.split(",");
expected.add(new TwitterPOJO(elements[0], elements[1], elements[2]));
}
List<TwitterPOJO> actual = new ArrayList<>();
TwitterPOJO pojo;
while ((pojo = inputFormat.nextRecord(new TwitterPOJO())) != null) {
actual.add(pojo);
}
assertEquals(expected, actual);
}
// --------------------------------------------------------------------------------------------
// Custom types for testing
// --------------------------------------------------------------------------------------------
/**
* Sample test pojo.
*/
public static class PojoItem {
public int field1;
public String field2;
public Double field3;
public String field4;
}
/**
* Sample test pojo with private fields.
*/
public static class PrivatePojoItem {
private int field1;
private String field2;
private Double field3;
private String field4;
public int getField1() {
return field1;
}
public void setField1(int field1) {
this.field1 = field1;
}
public String getField2() {
return field2;
}
public void setField2(String field2) {
this.field2 = field2;
}
public Double getField3() {
return field3;
}
public void setField3(Double field3) {
this.field3 = field3;
}
public String getField4() {
return field4;
}
public void setField4(String field4) {
this.field4 = field4;
}
}
/**
* Sample test pojo.
*/
public static class POJO {
public String table;
public String time;
public POJO() {
this("", "");
}
public POJO(String table, String time) {
this.table = table;
this.time = time;
}
@Override
public boolean equals(Object obj) {
if (obj instanceof POJO) {
POJO other = (POJO) obj;
return table.equals(other.table) && time.equals(other.time);
} else {
return false;
}
}
}
/**
* Sample test pojo representing tweets.
*/
public static class TwitterPOJO extends POJO {
public String tweet;
public TwitterPOJO() {
this("", "", "");
}
public TwitterPOJO(String table, String time, String tweet) {
super(table, time);
this.tweet = tweet;
}
@Override
public boolean equals(Object obj) {
if (obj instanceof TwitterPOJO) {
TwitterPOJO other = (TwitterPOJO) obj;
return super.equals(other) && tweet.equals(other.tweet);
} else {
return false;
}
}
}
}