| /* |
| * Licensed to the Apache Software Foundation (ASF) under one |
| * or more contributor license agreements. See the NOTICE file |
| * distributed with this work for additional information |
| * regarding copyright ownership. The ASF licenses this file |
| * to you under the Apache License, Version 2.0 (the |
| * "License"); you may not use this file except in compliance |
| * with the License. You may obtain a copy of the License at |
| * |
| * http://www.apache.org/licenses/LICENSE-2.0 |
| * |
| * Unless required by applicable law or agreed to in writing, software |
| * distributed under the License is distributed on an "AS IS" BASIS, |
| * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| * See the License for the specific language governing permissions and |
| * limitations under the License. |
| */ |
| |
| package org.apache.flink.formats.avro; |
| |
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.core.fs.FileInputSplit;
import org.apache.flink.core.fs.Path;
import org.apache.flink.formats.avro.generated.Address;
import org.apache.flink.formats.avro.generated.Colors;
import org.apache.flink.formats.avro.generated.Fixed16;
import org.apache.flink.formats.avro.generated.Fixed2;
import org.apache.flink.formats.avro.generated.User;

import org.apache.avro.file.DataFileWriter;
import org.apache.avro.io.DatumWriter;
import org.apache.avro.specific.SpecificDatumWriter;
import org.joda.time.DateTime;
import org.joda.time.LocalDate;
import org.joda.time.LocalTime;
import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;

import java.io.File;
import java.io.IOException;
import java.math.BigDecimal;
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Random;

import static org.junit.Assert.assertEquals;
| |
| /** |
| * Test the avro input format. |
| * (The testcase is mostly the getting started tutorial of avro) |
| * http://avro.apache.org/docs/current/gettingstartedjava.html |
| */ |
| public class AvroSplittableInputFormatTest { |
| |
| private File testFile; |
| |
| static final String TEST_NAME = "Alyssa"; |
| |
| static final String TEST_ARRAY_STRING_1 = "ELEMENT 1"; |
| static final String TEST_ARRAY_STRING_2 = "ELEMENT 2"; |
| |
| static final boolean TEST_ARRAY_BOOLEAN_1 = true; |
| static final boolean TEST_ARRAY_BOOLEAN_2 = false; |
| |
| static final Colors TEST_ENUM_COLOR = Colors.GREEN; |
| |
| static final String TEST_MAP_KEY1 = "KEY 1"; |
| static final long TEST_MAP_VALUE1 = 8546456L; |
| static final String TEST_MAP_KEY2 = "KEY 2"; |
| static final long TEST_MAP_VALUE2 = 17554L; |
| |
| static final Integer TEST_NUM = 239; |
| static final String TEST_STREET = "Baker Street"; |
| static final String TEST_CITY = "London"; |
| static final String TEST_STATE = "London"; |
| static final String TEST_ZIP = "NW1 6XE"; |
| |
| static final int NUM_RECORDS = 5000; |
| |
| @Before |
| public void createFiles() throws IOException { |
| testFile = File.createTempFile("AvroSplittableInputFormatTest", null); |
| |
| ArrayList<CharSequence> stringArray = new ArrayList<>(); |
| stringArray.add(TEST_ARRAY_STRING_1); |
| stringArray.add(TEST_ARRAY_STRING_2); |
| |
| ArrayList<Boolean> booleanArray = new ArrayList<>(); |
| booleanArray.add(TEST_ARRAY_BOOLEAN_1); |
| booleanArray.add(TEST_ARRAY_BOOLEAN_2); |
| |
| HashMap<CharSequence, Long> longMap = new HashMap<>(); |
| longMap.put(TEST_MAP_KEY1, TEST_MAP_VALUE1); |
| longMap.put(TEST_MAP_KEY2, TEST_MAP_VALUE2); |
| |
| Address addr = new Address(); |
| addr.setNum(TEST_NUM); |
| addr.setStreet(TEST_STREET); |
| addr.setCity(TEST_CITY); |
| addr.setState(TEST_STATE); |
| addr.setZip(TEST_ZIP); |
| |
| User user1 = new User(); |
| user1.setName(TEST_NAME); |
| user1.setFavoriteNumber(256); |
| user1.setTypeDoubleTest(123.45d); |
| user1.setTypeBoolTest(true); |
| user1.setTypeArrayString(stringArray); |
| user1.setTypeArrayBoolean(booleanArray); |
| user1.setTypeEnum(TEST_ENUM_COLOR); |
| user1.setTypeMap(longMap); |
| user1.setTypeNested(addr); |
| user1.setTypeBytes(ByteBuffer.allocate(10)); |
| user1.setTypeDate(LocalDate.parse("2014-03-01")); |
| user1.setTypeTimeMillis(LocalTime.parse("12:12:12")); |
| user1.setTypeTimeMicros(123456); |
| user1.setTypeTimestampMillis(DateTime.parse("2014-03-01T12:12:12.321Z")); |
| user1.setTypeTimestampMicros(123456L); |
| // 20.00 |
| user1.setTypeDecimalBytes(ByteBuffer.wrap(BigDecimal.valueOf(2000, 2).unscaledValue().toByteArray())); |
| // 20.00 |
| user1.setTypeDecimalFixed(new Fixed2(BigDecimal.valueOf(2000, 2).unscaledValue().toByteArray())); |
| |
| // Construct via builder |
| User user2 = User.newBuilder() |
| .setName(TEST_NAME) |
| .setFavoriteColor("blue") |
| .setFavoriteNumber(null) |
| .setTypeBoolTest(false) |
| .setTypeDoubleTest(1.337d) |
| .setTypeNullTest(null) |
| .setTypeLongTest(1337L) |
| .setTypeArrayString(new ArrayList<>()) |
| .setTypeArrayBoolean(new ArrayList<>()) |
| .setTypeNullableArray(null) |
| .setTypeEnum(Colors.RED) |
| .setTypeMap(new HashMap<>()) |
| .setTypeFixed(new Fixed16()) |
| .setTypeUnion(123L) |
| .setTypeNested( |
| Address.newBuilder().setNum(TEST_NUM).setStreet(TEST_STREET) |
| .setCity(TEST_CITY).setState(TEST_STATE).setZip(TEST_ZIP) |
| .build()) |
| .setTypeBytes(ByteBuffer.allocate(10)) |
| .setTypeDate(LocalDate.parse("2014-03-01")) |
| .setTypeTimeMillis(LocalTime.parse("12:12:12")) |
| .setTypeTimeMicros(123456) |
| .setTypeTimestampMillis(DateTime.parse("2014-03-01T12:12:12.321Z")) |
| .setTypeTimestampMicros(123456L) |
| // 20.00 |
| .setTypeDecimalBytes(ByteBuffer.wrap(BigDecimal.valueOf(2000, 2).unscaledValue().toByteArray())) |
| // 20.00 |
| .setTypeDecimalFixed(new Fixed2(BigDecimal.valueOf(2000, 2).unscaledValue().toByteArray())) |
| .build(); |
| DatumWriter<User> userDatumWriter = new SpecificDatumWriter<>(User.class); |
| DataFileWriter<User> dataFileWriter = new DataFileWriter<>(userDatumWriter); |
| dataFileWriter.create(user1.getSchema(), testFile); |
| dataFileWriter.append(user1); |
| dataFileWriter.append(user2); |
| |
| Random rnd = new Random(1337); |
| for (int i = 0; i < NUM_RECORDS - 2; i++) { |
| User user = new User(); |
| user.setName(TEST_NAME + rnd.nextInt()); |
| user.setFavoriteNumber(rnd.nextInt()); |
| user.setTypeDoubleTest(rnd.nextDouble()); |
| user.setTypeBoolTest(true); |
| user.setTypeArrayString(stringArray); |
| user.setTypeArrayBoolean(booleanArray); |
| user.setTypeEnum(TEST_ENUM_COLOR); |
| user.setTypeMap(longMap); |
| Address address = new Address(); |
| address.setNum(TEST_NUM); |
| address.setStreet(TEST_STREET); |
| address.setCity(TEST_CITY); |
| address.setState(TEST_STATE); |
| address.setZip(TEST_ZIP); |
| user.setTypeNested(address); |
| user.setTypeBytes(ByteBuffer.allocate(10)); |
| user.setTypeDate(LocalDate.parse("2014-03-01")); |
| user.setTypeTimeMillis(LocalTime.parse("12:12:12")); |
| user.setTypeTimeMicros(123456); |
| user.setTypeTimestampMillis(DateTime.parse("2014-03-01T12:12:12.321Z")); |
| user.setTypeTimestampMicros(123456L); |
| // 20.00 |
| user.setTypeDecimalBytes(ByteBuffer.wrap(BigDecimal.valueOf(2000, 2).unscaledValue().toByteArray())); |
| // 20.00 |
| user.setTypeDecimalFixed(new Fixed2(BigDecimal.valueOf(2000, 2).unscaledValue().toByteArray())); |
| |
| dataFileWriter.append(user); |
| } |
| dataFileWriter.close(); |
| } |
| |
| @Test |
| public void testSplittedIF() throws IOException { |
| Configuration parameters = new Configuration(); |
| |
| AvroInputFormat<User> format = new AvroInputFormat<>(new Path(testFile.getAbsolutePath()), User.class); |
| |
| format.configure(parameters); |
| FileInputSplit[] splits = format.createInputSplits(4); |
| assertEquals(splits.length, 4); |
| int elements = 0; |
| int[] elementsPerSplit = new int[4]; |
| for (int i = 0; i < splits.length; i++) { |
| format.open(splits[i]); |
| while (!format.reachedEnd()) { |
| User u = format.nextRecord(null); |
| Assert.assertTrue(u.getName().toString().startsWith(TEST_NAME)); |
| elements++; |
| elementsPerSplit[i]++; |
| } |
| format.close(); |
| } |
| |
| Assert.assertEquals(1604, elementsPerSplit[0]); |
| Assert.assertEquals(1203, elementsPerSplit[1]); |
| Assert.assertEquals(1203, elementsPerSplit[2]); |
| Assert.assertEquals(990, elementsPerSplit[3]); |
| Assert.assertEquals(NUM_RECORDS, elements); |
| format.close(); |
| } |
| |
| @Test |
| public void testAvroRecoveryWithFailureAtStart() throws Exception { |
| final int recordsUntilCheckpoint = 132; |
| |
| Configuration parameters = new Configuration(); |
| |
| AvroInputFormat<User> format = new AvroInputFormat<>(new Path(testFile.getAbsolutePath()), User.class); |
| format.configure(parameters); |
| |
| FileInputSplit[] splits = format.createInputSplits(4); |
| assertEquals(splits.length, 4); |
| |
| int elements = 0; |
| int[] elementsPerSplit = new int[4]; |
| for (int i = 0; i < splits.length; i++) { |
| format.reopen(splits[i], format.getCurrentState()); |
| while (!format.reachedEnd()) { |
| User u = format.nextRecord(null); |
| Assert.assertTrue(u.getName().toString().startsWith(TEST_NAME)); |
| elements++; |
| |
| if (format.getRecordsReadFromBlock() == recordsUntilCheckpoint) { |
| |
| // do the whole checkpoint-restore procedure and see if we pick up from where we left off. |
| Tuple2<Long, Long> state = format.getCurrentState(); |
| |
| // this is to make sure that nothing stays from the previous format |
| // (as it is going to be in the normal case) |
| format = new AvroInputFormat<>(new Path(testFile.getAbsolutePath()), User.class); |
| |
| format.reopen(splits[i], state); |
| assertEquals(format.getRecordsReadFromBlock(), recordsUntilCheckpoint); |
| } |
| elementsPerSplit[i]++; |
| } |
| format.close(); |
| } |
| |
| Assert.assertEquals(1604, elementsPerSplit[0]); |
| Assert.assertEquals(1203, elementsPerSplit[1]); |
| Assert.assertEquals(1203, elementsPerSplit[2]); |
| Assert.assertEquals(990, elementsPerSplit[3]); |
| Assert.assertEquals(NUM_RECORDS, elements); |
| format.close(); |
| } |
| |
| @Test |
| public void testAvroRecovery() throws Exception { |
| final int recordsUntilCheckpoint = 132; |
| |
| Configuration parameters = new Configuration(); |
| |
| AvroInputFormat<User> format = new AvroInputFormat<>(new Path(testFile.getAbsolutePath()), User.class); |
| format.configure(parameters); |
| |
| FileInputSplit[] splits = format.createInputSplits(4); |
| assertEquals(splits.length, 4); |
| |
| int elements = 0; |
| int[] elementsPerSplit = new int[4]; |
| for (int i = 0; i < splits.length; i++) { |
| format.open(splits[i]); |
| while (!format.reachedEnd()) { |
| User u = format.nextRecord(null); |
| Assert.assertTrue(u.getName().toString().startsWith(TEST_NAME)); |
| elements++; |
| |
| if (format.getRecordsReadFromBlock() == recordsUntilCheckpoint) { |
| |
| // do the whole checkpoint-restore procedure and see if we pick up from where we left off. |
| Tuple2<Long, Long> state = format.getCurrentState(); |
| |
| // this is to make sure that nothing stays from the previous format |
| // (as it is going to be in the normal case) |
| format = new AvroInputFormat<>(new Path(testFile.getAbsolutePath()), User.class); |
| |
| format.reopen(splits[i], state); |
| assertEquals(format.getRecordsReadFromBlock(), recordsUntilCheckpoint); |
| } |
| elementsPerSplit[i]++; |
| } |
| format.close(); |
| } |
| |
| Assert.assertEquals(1604, elementsPerSplit[0]); |
| Assert.assertEquals(1203, elementsPerSplit[1]); |
| Assert.assertEquals(1203, elementsPerSplit[2]); |
| Assert.assertEquals(990, elementsPerSplit[3]); |
| Assert.assertEquals(NUM_RECORDS, elements); |
| format.close(); |
| } |
| |
| /* |
| This test is gave the reference values for the test of Flink's IF. |
| |
| This dependency needs to be added |
| |
| <dependency> |
| <groupId>org.apache.avro</groupId> |
| <artifactId>avro-mapred</artifactId> |
| <version>1.7.6</version> |
| </dependency> |
| |
| <dependency> |
| <groupId>com.alibaba.blink</groupId> |
| <artifactId>flink-hadoop-compatibility_2.11</artifactId> |
| <version>1.6-SNAPSHOT</version> |
| </dependency> |
| |
| <dependency> |
| <groupId>com.google.guava</groupId> |
| <artifactId>guava</artifactId> |
| <version>16.0</version> |
| </dependency> |
| |
| @Test |
| public void testHadoop() throws Exception { |
| JobConf jf = new JobConf(); |
| FileInputFormat.addInputPath(jf, new org.apache.hadoop.fs.Path(testFile.toURI())); |
| jf.setBoolean(org.apache.avro.mapred.AvroInputFormat.IGNORE_FILES_WITHOUT_EXTENSION_KEY, false); |
| org.apache.avro.mapred.AvroInputFormat<User> format = new org.apache.avro.mapred.AvroInputFormat<User>(); |
| InputSplit[] sp = format.getSplits(jf, 4); |
| int elementsPerSplit[] = new int[4]; |
| int cnt = 0; |
| int i = 0; |
| for (InputSplit s:sp) { |
| RecordReader<AvroWrapper<User>, NullWritable> r = format.getRecordReader(s, jf, new HadoopDummyReporter()); |
| AvroWrapper<User> k = r.createKey(); |
| NullWritable v = r.createValue(); |
| |
| while (r.next(k, v)) { |
| cnt++; |
| elementsPerSplit[i]++; |
| } |
| i++; |
| } |
| System.out.println("Status " + Arrays.toString(elementsPerSplit)); |
| } */ |
| |
| @After |
| @SuppressWarnings("ResultOfMethodCallIgnored") |
| public void deleteFiles() { |
| testFile.delete(); |
| } |
| } |