blob: 5622930fedcc47410099ccaf37c8ad1636687ea9 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.flink.api.java.io;
import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.operators.DataSource;
import org.apache.flink.api.java.tuple.Tuple4;
import org.apache.flink.api.java.tuple.Tuple5;
import org.apache.flink.api.java.tuple.Tuple8;
import org.apache.flink.api.java.typeutils.TupleTypeInfo;
import org.apache.flink.api.java.typeutils.ValueTypeInfo;
import org.apache.flink.core.memory.DataInputView;
import org.apache.flink.core.memory.DataOutputView;
import org.apache.flink.types.BooleanValue;
import org.apache.flink.types.ByteValue;
import org.apache.flink.types.CharValue;
import org.apache.flink.types.DoubleValue;
import org.apache.flink.types.FloatValue;
import org.apache.flink.types.IntValue;
import org.apache.flink.types.LongValue;
import org.apache.flink.types.ShortValue;
import org.apache.flink.types.StringValue;
import org.apache.flink.types.Value;
import org.junit.Assert;
import org.junit.Test;
import java.io.IOException;
import java.util.Arrays;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertNull;
/**
 * Tests for the {@link CsvReader} builder: its configuration flags, the field-inclusion masks it
 * derives from the various {@code includeFields} overloads, and the tuple/value types it assigns
 * to the resulting {@link DataSource}.
 */
public class CSVReaderTest {

	@Test
	public void testIgnoreHeaderConfigure() {
		CsvReader reader = getCsvReader();
		// Mirror the other flag tests: check the default before flipping it.
		Assert.assertFalse(reader.skipFirstLineAsHeader);
		reader.ignoreFirstLine();
		Assert.assertTrue(reader.skipFirstLineAsHeader);
	}

	@Test
	public void testIgnoreInvalidLinesConfigure() {
		CsvReader reader = getCsvReader();
		Assert.assertFalse(reader.ignoreInvalidLines);
		reader.ignoreInvalidLines();
		Assert.assertTrue(reader.ignoreInvalidLines);
	}

	@Test
	public void testIgnoreComments() {
		CsvReader reader = getCsvReader();
		assertNull(reader.commentPrefix);
		reader.ignoreComments("#");
		assertEquals("#", reader.commentPrefix);
	}

	@Test
	public void testCharset() {
		CsvReader reader = getCsvReader();
		assertEquals("UTF-8", reader.getCharset());
		reader.setCharset("US-ASCII");
		assertEquals("US-ASCII", reader.getCharset());
	}

	@Test
	public void testIncludeFieldsDense() {
		final boolean[] expected = {true, true, true};

		CsvReader reader = getCsvReader();
		reader.includeFields(true, true, true);
		assertIncludedMask(expected, reader);

		reader = getCsvReader();
		reader.includeFields("ttt");
		assertIncludedMask(expected, reader);

		reader = getCsvReader();
		reader.includeFields("TTT");
		assertIncludedMask(expected, reader);

		reader = getCsvReader();
		reader.includeFields("111");
		assertIncludedMask(expected, reader);

		reader = getCsvReader();
		reader.includeFields(0x7L);
		assertIncludedMask(expected, reader);
	}

	@Test
	public void testIncludeFieldsSparse() {
		// Trailing excluded fields are expected to be trimmed from the mask.
		final boolean[] expected = {false, true, true, false, false, true};

		CsvReader reader = getCsvReader();
		reader.includeFields(false, true, true, false, false, true, false, false);
		assertIncludedMask(expected, reader);

		reader = getCsvReader();
		reader.includeFields("fttfftff");
		assertIncludedMask(expected, reader);

		reader = getCsvReader();
		reader.includeFields("FTTFFTFF");
		assertIncludedMask(expected, reader);

		reader = getCsvReader();
		reader.includeFields("01100100");
		assertIncludedMask(expected, reader);

		reader = getCsvReader();
		reader.includeFields("0t1f0TFF");
		assertIncludedMask(expected, reader);

		// 0x26 = 0b100110: bit i set means field i is included.
		reader = getCsvReader();
		reader.includeFields(0x26L);
		assertIncludedMask(expected, reader);
	}

	@Test
	public void testIllegalCharInStringMask() {
		CsvReader reader = getCsvReader();
		try {
			reader.includeFields("1t0Tfht");
			Assert.fail("Reader accepted an invalid mask string");
		}
		catch (IllegalArgumentException expected) {
			// 'h' is not a valid mask character
		}
	}

	@Test
	public void testIncludeFieldsErrorWhenExcludingAll() {
		CsvReader reader = getCsvReader();

		// Every overload of includeFields(...) must reject a mask that selects no field.
		try {
			reader.includeFields(false, false, false, false, false, false);
			Assert.fail("The reader accepted a fields configuration that excludes all fields.");
		}
		catch (IllegalArgumentException expected) {
			// all good
		}

		try {
			reader.includeFields(0);
			Assert.fail("The reader accepted a fields configuration that excludes all fields.");
		}
		catch (IllegalArgumentException expected) {
			// all good
		}

		try {
			reader.includeFields("ffffffffffffff");
			Assert.fail("The reader accepted a fields configuration that excludes all fields.");
		}
		catch (IllegalArgumentException expected) {
			// all good
		}

		try {
			reader.includeFields("00000000000000000");
			Assert.fail("The reader accepted a fields configuration that excludes all fields.");
		}
		catch (IllegalArgumentException expected) {
			// all good
		}
	}

	@Test
	public void testReturnType() throws Exception {
		CsvReader reader = getCsvReader();
		DataSource<Item> items = reader.tupleType(Item.class);
		Assert.assertEquals(Item.class, items.getType().getTypeClass());
	}

	@Test
	public void testFieldTypes() throws Exception {
		CsvReader reader = getCsvReader();
		DataSource<Item> items = reader.tupleType(Item.class);

		TypeInformation<?> info = items.getType();
		Assert.assertTrue("Expected a tuple type for " + Item.class, info.isTupleType());

		TupleTypeInfo<?> tinfo = (TupleTypeInfo<?>) info;
		Assert.assertEquals(BasicTypeInfo.INT_TYPE_INFO, tinfo.getTypeAt(0));
		Assert.assertEquals(BasicTypeInfo.STRING_TYPE_INFO, tinfo.getTypeAt(1));
		Assert.assertEquals(BasicTypeInfo.DOUBLE_TYPE_INFO, tinfo.getTypeAt(2));
		Assert.assertEquals(BasicTypeInfo.STRING_TYPE_INFO, tinfo.getTypeAt(3));

		CsvInputFormat<?> inputFormat = (CsvInputFormat<?>) items.getInputFormat();
		Assert.assertArrayEquals(new Class<?>[]{Integer.class, String.class, Double.class, String.class}, inputFormat.getFieldTypes());
	}

	@Test
	public void testSubClass() throws Exception {
		CsvReader reader = getCsvReader();
		DataSource<SubItem> sitems = reader.tupleType(SubItem.class);

		TypeInformation<?> info = sitems.getType();
		Assert.assertTrue(info.isTupleType());
		Assert.assertEquals(SubItem.class, info.getTypeClass());

		// A wildcard cast needs no unchecked-warning suppression.
		TupleTypeInfo<?> tinfo = (TupleTypeInfo<?>) info;
		Assert.assertEquals(BasicTypeInfo.INT_TYPE_INFO, tinfo.getTypeAt(0));
		Assert.assertEquals(BasicTypeInfo.STRING_TYPE_INFO, tinfo.getTypeAt(1));
		Assert.assertEquals(BasicTypeInfo.DOUBLE_TYPE_INFO, tinfo.getTypeAt(2));
		Assert.assertEquals(BasicTypeInfo.STRING_TYPE_INFO, tinfo.getTypeAt(3));

		CsvInputFormat<?> inputFormat = (CsvInputFormat<?>) sitems.getInputFormat();
		Assert.assertArrayEquals(new Class<?>[]{Integer.class, String.class, Double.class, String.class}, inputFormat.getFieldTypes());
	}

	@Test
	public void testSubClassWithPartialsInHierarchie() throws Exception {
		CsvReader reader = getCsvReader();
		DataSource<FinalItem> sitems = reader.tupleType(FinalItem.class);

		TypeInformation<?> info = sitems.getType();
		Assert.assertTrue(info.isTupleType());
		Assert.assertEquals(FinalItem.class, info.getTypeClass());

		// The type info describes FinalItem, not SubItem; use a wildcard instead of a
		// misleading unchecked cast.
		TupleTypeInfo<?> tinfo = (TupleTypeInfo<?>) info;
		Assert.assertEquals(BasicTypeInfo.INT_TYPE_INFO, tinfo.getTypeAt(0));
		Assert.assertEquals(BasicTypeInfo.STRING_TYPE_INFO, tinfo.getTypeAt(1));
		Assert.assertEquals(BasicTypeInfo.DOUBLE_TYPE_INFO, tinfo.getTypeAt(2));
		// Fields 3 and 4 are bound through PartialItem's type variables to Value types.
		Assert.assertEquals(ValueTypeInfo.class, tinfo.getTypeAt(3).getClass());
		Assert.assertEquals(ValueTypeInfo.class, tinfo.getTypeAt(4).getClass());
		Assert.assertEquals(StringValue.class, ((ValueTypeInfo<?>) tinfo.getTypeAt(3)).getTypeClass());
		Assert.assertEquals(LongValue.class, ((ValueTypeInfo<?>) tinfo.getTypeAt(4)).getTypeClass());

		CsvInputFormat<?> inputFormat = (CsvInputFormat<?>) sitems.getInputFormat();
		Assert.assertArrayEquals(new Class<?>[] {Integer.class, String.class, Double.class, StringValue.class, LongValue.class}, inputFormat.getFieldTypes());
	}

	@Test
	public void testUnsupportedPartialitem() throws Exception {
		CsvReader reader = getCsvReader();
		try {
			reader.tupleType(PartialItem.class);
			Assert.fail("tupleType() accepted an underspecified generic class.");
		}
		catch (Exception expected) {
			// okay. Assert.fail throws an AssertionError (an Error), so it is not swallowed here.
		}
	}

	@Test
	public void testWithValueType() throws Exception {
		CsvReader reader = getCsvReader();
		DataSource<Tuple8<StringValue, BooleanValue, ByteValue, ShortValue, IntValue, LongValue, FloatValue, DoubleValue>> items =
			reader.types(StringValue.class, BooleanValue.class, ByteValue.class, ShortValue.class, IntValue.class, LongValue.class, FloatValue.class, DoubleValue.class);

		TypeInformation<?> info = items.getType();
		Assert.assertTrue(info.isTupleType());
		Assert.assertEquals(Tuple8.class, info.getTypeClass());
	}

	@Test(expected = IllegalArgumentException.class)
	public void testWithInvalidValueType1() throws Exception {
		CsvReader reader = getCsvReader();
		// CsvReader doesn't support CharValue
		reader.types(CharValue.class);
	}

	@Test(expected = IllegalArgumentException.class)
	public void testWithInvalidValueType2() throws Exception {
		CsvReader reader = getCsvReader();
		// CsvReader doesn't support custom Value type
		reader.types(ValueItem.class);
	}

	/**
	 * Creates a reader on a path that is never read; these tests only inspect the builder state
	 * and the derived type information.
	 */
	private static CsvReader getCsvReader() {
		return new CsvReader("/some/none/existing/path", ExecutionEnvironment.createLocalEnvironment(1));
	}

	/**
	 * Asserts that the reader's included-fields mask equals {@code expected}, reporting both masks
	 * on failure (a bare {@code assertTrue(Arrays.equals(..))} reports nothing useful).
	 */
	private static void assertIncludedMask(boolean[] expected, CsvReader reader) {
		Assert.assertTrue(
			"expected mask " + Arrays.toString(expected) + " but was " + Arrays.toString(reader.includedMask),
			Arrays.equals(expected, reader.includedMask));
	}

	// --------------------------------------------------------------------------------------------
	// Custom types for testing
	// --------------------------------------------------------------------------------------------

	private static class Item extends Tuple4<Integer, String, Double, String> {
		private static final long serialVersionUID = -7444437337392053502L;
	}

	private static class SubItem extends Item {
		private static final long serialVersionUID = 1L;
	}

	/** Leaves three field types unbound; only usable through a fully-binding subclass. */
	private static class PartialItem<A, B, C> extends Tuple5<Integer, A, Double, B, C> {
		private static final long serialVersionUID = 1L;
	}

	private static class FinalItem extends PartialItem<String, StringValue, LongValue> {
		private static final long serialVersionUID = 1L;
	}

	/** A user-defined {@link Value}, which the CSV reader is expected to reject. */
	private static class ValueItem implements Value {
		private static final long serialVersionUID = 1L;

		private int v1;

		public int getV1() {
			return v1;
		}

		public void setV1(int v1) {
			this.v1 = v1;
		}

		@Override
		public void write(DataOutputView out) throws IOException {
			out.writeInt(v1);
		}

		@Override
		public void read(DataInputView in) throws IOException {
			v1 = in.readInt();
		}
	}
}