| /* |
| * Licensed to the Apache Software Foundation (ASF) under one |
| * or more contributor license agreements. See the NOTICE file |
| * distributed with this work for additional information |
| * regarding copyright ownership. The ASF licenses this file |
| * to you under the Apache License, Version 2.0 (the |
| * "License"); you may not use this file except in compliance |
| * with the License. You may obtain a copy of the License at |
| * |
| * http://www.apache.org/licenses/LICENSE-2.0 |
| * |
| * Unless required by applicable law or agreed to in writing, software |
| * distributed under the License is distributed on an "AS IS" BASIS, |
| * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| * See the License for the specific language governing permissions and |
| * limitations under the License. |
| */ |
| |
| package org.apache.hudi.hadoop.realtime; |
| |
| import org.apache.hudi.avro.HoodieAvroUtils; |
| import org.apache.hudi.avro.model.HoodieCompactionPlan; |
| import org.apache.hudi.common.config.HoodieCommonConfig; |
| import org.apache.hudi.common.config.HoodieMemoryConfig; |
| import org.apache.hudi.common.fs.FSUtils; |
| import org.apache.hudi.common.model.FileSlice; |
| import org.apache.hudi.common.model.HoodieCommitMetadata; |
| import org.apache.hudi.common.model.HoodieLogFile; |
| import org.apache.hudi.common.model.HoodieReplaceCommitMetadata; |
| import org.apache.hudi.common.model.HoodieTableType; |
| import org.apache.hudi.common.model.HoodieWriteStat; |
| import org.apache.hudi.common.model.WriteOperationType; |
| import org.apache.hudi.common.table.log.HoodieLogFormat; |
| import org.apache.hudi.common.table.log.HoodieLogFormat.Writer; |
| import org.apache.hudi.common.table.log.block.HoodieLogBlock; |
| import org.apache.hudi.common.table.timeline.HoodieTimeline; |
| import org.apache.hudi.common.table.timeline.TimelineMetadataUtils; |
| import org.apache.hudi.common.testutils.FileCreateUtils; |
| import org.apache.hudi.common.testutils.HoodieTestUtils; |
| import org.apache.hudi.common.testutils.InProcessTimeGenerator; |
| import org.apache.hudi.common.testutils.SchemaTestUtil; |
| import org.apache.hudi.common.util.CommitUtils; |
| import org.apache.hudi.common.util.Option; |
| import org.apache.hudi.common.util.collection.ExternalSpillableMap; |
| import org.apache.hudi.common.util.collection.Pair; |
| import org.apache.hudi.exception.HoodieException; |
| import org.apache.hudi.hadoop.RealtimeFileStatus; |
| import org.apache.hudi.hadoop.testutils.InputFormatTestUtil; |
| import org.apache.hudi.hadoop.utils.HoodieRealtimeRecordReaderUtils; |
| |
| import org.apache.avro.Schema; |
| import org.apache.avro.Schema.Field; |
| import org.apache.avro.generic.GenericRecord; |
| import org.apache.hadoop.conf.Configuration; |
| import org.apache.hadoop.fs.FileStatus; |
| import org.apache.hadoop.fs.FileSystem; |
| import org.apache.hadoop.fs.Path; |
| import org.apache.hadoop.hive.metastore.api.hive_metastoreConstants; |
| import org.apache.hadoop.hive.ql.io.parquet.MapredParquetInputFormat; |
| import org.apache.hadoop.hive.serde2.ColumnProjectionUtils; |
| import org.apache.hadoop.io.ArrayWritable; |
| import org.apache.hadoop.io.BooleanWritable; |
| import org.apache.hadoop.io.DoubleWritable; |
| import org.apache.hadoop.io.FloatWritable; |
| import org.apache.hadoop.io.IntWritable; |
| import org.apache.hadoop.io.LongWritable; |
| import org.apache.hadoop.io.NullWritable; |
| import org.apache.hadoop.io.Writable; |
| import org.apache.hadoop.mapred.FileInputFormat; |
| import org.apache.hadoop.mapred.FileSplit; |
| import org.apache.hadoop.mapred.InputSplit; |
| import org.apache.hadoop.mapred.JobConf; |
| import org.apache.hadoop.mapred.RecordReader; |
| import org.apache.hadoop.mapred.Reporter; |
| import org.junit.jupiter.api.AfterEach; |
| import org.junit.jupiter.api.BeforeEach; |
| import org.junit.jupiter.api.Test; |
| import org.junit.jupiter.api.io.TempDir; |
| import org.junit.jupiter.params.ParameterizedTest; |
| import org.junit.jupiter.params.provider.Arguments; |
| import org.junit.jupiter.params.provider.MethodSource; |
| |
| import java.io.File; |
| import java.io.FileOutputStream; |
| import java.io.IOException; |
| import java.net.URI; |
| import java.util.ArrayList; |
| import java.util.Arrays; |
| import java.util.Collections; |
| import java.util.HashMap; |
| import java.util.HashSet; |
| import java.util.List; |
| import java.util.Map; |
| import java.util.Set; |
| import java.util.stream.Collectors; |
| import java.util.stream.Stream; |
| |
| import static org.apache.hudi.common.table.timeline.TimelineMetadataUtils.serializeCommitMetadata; |
| import static org.apache.hudi.hadoop.realtime.HoodieRealtimeRecordReader.REALTIME_SKIP_MERGE_PROP; |
| import static org.junit.jupiter.api.Assertions.assertEquals; |
| import static org.junit.jupiter.api.Assertions.assertFalse; |
| import static org.junit.jupiter.api.Assertions.assertTrue; |
| import static org.junit.jupiter.params.provider.Arguments.arguments; |
| |
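| /** |
| * Tests for {@link HoodieRealtimeRecordReader} and the realtime (merge-on-read) input format: |
| * merged and unmerged reads over base parquet files plus log files, schema evolution, rollback |
| * blocks, incremental pulls, and log-only file slices. |
| */ |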
| public class TestHoodieRealtimeRecordReader { |
| |
| private static final String PARTITION_COLUMN = "datestr"; |
| private JobConf baseJobConf; |
| private FileSystem fs; |
| private Configuration hadoopConf; |
| |
| @BeforeEach |
| public void setUp() { |
| hadoopConf = HoodieTestUtils.getDefaultHadoopConf(); |
| hadoopConf.set("fs.defaultFS", "file:///"); |
| hadoopConf.set("fs.file.impl", org.apache.hadoop.fs.LocalFileSystem.class.getName()); |
| baseJobConf = new JobConf(hadoopConf); |
| baseJobConf.set(HoodieMemoryConfig.MAX_DFS_STREAM_BUFFER_SIZE.key(), String.valueOf(1024 * 1024)); |
| fs = FSUtils.getFs(basePath.toUri().toString(), baseJobConf); |
| } |
| |
| @AfterEach |
| public void tearDown() throws Exception { |
| if (fs != null) { |
| fs.delete(new Path(basePath.toString()), true); |
| fs.close(); |
| } |
| if (baseJobConf != null) { |
| baseJobConf.clear(); |
| } |
| } |
| |
| @TempDir |
| public java.nio.file.Path basePath; |
| |
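| // Convenience wrapper that writes a single data block with the given number of records to a new log file. |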
| private Writer writeLogFile(File partitionDir, Schema schema, String fileId, String baseCommit, String newCommit, |
| int numberOfRecords) throws InterruptedException, IOException { |
| return InputFormatTestUtil.writeDataBlockToLogFile(partitionDir, fs, schema, fileId, baseCommit, newCommit, |
| numberOfRecords, 0, |
| 0); |
| } |
| |
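| // Populates the Hive/serde column properties on the JobConf: projected column names and ids, |
| // plus the ordered table columns (with the partition column appended last when partitioned). |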
| private void setHiveColumnNameProps(List<Schema.Field> fields, JobConf jobConf, boolean isPartitioned) { |
| String names = fields.stream().map(Field::name).collect(Collectors.joining(",")); |
| String positions = fields.stream().map(f -> String.valueOf(f.pos())).collect(Collectors.joining(",")); |
| jobConf.set(ColumnProjectionUtils.READ_COLUMN_NAMES_CONF_STR, names); |
| jobConf.set(ColumnProjectionUtils.READ_COLUMN_IDS_CONF_STR, positions); |
| |
| String hiveOrderedColumnNames = fields.stream().filter(field -> !field.name().equalsIgnoreCase(PARTITION_COLUMN)) |
| .map(Field::name).collect(Collectors.joining(",")); |
| if (isPartitioned) { |
| hiveOrderedColumnNames += "," + PARTITION_COLUMN; |
| jobConf.set(hive_metastoreConstants.META_TABLE_PARTITION_COLUMNS, PARTITION_COLUMN); |
| } |
| jobConf.set(hive_metastoreConstants.META_TABLE_COLUMNS, hiveOrderedColumnNames); |
| } |
| |
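| /** |
| * Merged read over a base parquet file and successive log file versions, parameterized by |
| * spillable disk map type, disk map compression, and partitioning. |
| */ |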
| @ParameterizedTest |
| @MethodSource("testArguments") |
| public void testReader(ExternalSpillableMap.DiskMapType diskMapType, |
| boolean isCompressionEnabled, |
| boolean partitioned) throws Exception { |
| testReaderInternal(diskMapType, isCompressionEnabled, partitioned); |
| } |
| |
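| // Same merged-read scenario as testReader, but the log data blocks are written as HFile / parquet blocks instead of Avro. |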
| @Test |
| public void testHFileInlineReader() throws Exception { |
| testReaderInternal(ExternalSpillableMap.DiskMapType.BITCASK, false, false, |
| HoodieLogBlock.HoodieLogBlockType.HFILE_DATA_BLOCK); |
| } |
| |
| @Test |
| public void testParquetInlineReader() throws Exception { |
| testReaderInternal(ExternalSpillableMap.DiskMapType.BITCASK, false, false, |
| HoodieLogBlock.HoodieLogBlockType.PARQUET_DATA_BLOCK); |
| } |
| |
| private void testReaderInternal(ExternalSpillableMap.DiskMapType diskMapType, |
| boolean isCompressionEnabled, |
| boolean partitioned) throws Exception { |
| testReaderInternal(diskMapType, isCompressionEnabled, partitioned, HoodieLogBlock.HoodieLogBlockType.AVRO_DATA_BLOCK); |
| } |
| |
| private void testReaderInternal(ExternalSpillableMap.DiskMapType diskMapType, |
| boolean isCompressionEnabled, |
| boolean partitioned, HoodieLogBlock.HoodieLogBlockType logBlockType) throws Exception { |
| // initial commit |
| Schema schema = HoodieAvroUtils.addMetadataFields(SchemaTestUtil.getEvolvedSchema()); |
| HoodieTestUtils.init(hadoopConf, basePath.toString(), HoodieTableType.MERGE_ON_READ); |
| String baseInstant = "100"; |
| File partitionDir = partitioned ? InputFormatTestUtil.prepareParquetTable(basePath, schema, 1, 100, baseInstant, |
| HoodieTableType.MERGE_ON_READ) |
| : InputFormatTestUtil.prepareNonPartitionedParquetTable(basePath, schema, 1, 100, baseInstant, |
| HoodieTableType.MERGE_ON_READ); |
| |
| HoodieCommitMetadata commitMetadata = CommitUtils.buildMetadata(Collections.emptyList(), Collections.emptyMap(), Option.empty(), WriteOperationType.UPSERT, |
| schema.toString(), HoodieTimeline.DELTA_COMMIT_ACTION); |
| FileCreateUtils.createDeltaCommit(basePath.toString(), baseInstant, commitMetadata); |
| // Add the paths |
| FileInputFormat.setInputPaths(baseJobConf, partitionDir.getPath()); |
| |
| List<Pair<String, Integer>> logVersionsWithAction = new ArrayList<>(); |
| logVersionsWithAction.add(Pair.of(HoodieTimeline.DELTA_COMMIT_ACTION, 1)); |
| logVersionsWithAction.add(Pair.of(HoodieTimeline.DELTA_COMMIT_ACTION, 2)); |
| // TODO: HUDI-154 Once Hive 2.x PR (PR-674) is merged, enable this change |
| // logVersionsWithAction.add(Pair.of(HoodieTimeline.ROLLBACK_ACTION, 3)); |
| FileSlice fileSlice = |
| new FileSlice(partitioned ? FSUtils.getRelativePartitionPath(new Path(basePath.toString()), |
| new Path(partitionDir.getAbsolutePath())) : "default", baseInstant, "fileid0"); |
| logVersionsWithAction.forEach(logVersionWithAction -> { |
| try { |
| // update files or generate new log file |
| int logVersion = logVersionWithAction.getRight(); |
| String action = logVersionWithAction.getKey(); |
| int baseInstantTs = Integer.parseInt(baseInstant); |
| String instantTime = String.valueOf(baseInstantTs + logVersion); |
| String latestInstant = |
| action.equals(HoodieTimeline.ROLLBACK_ACTION) ? String.valueOf(baseInstantTs + logVersion - 2) |
| : instantTime; |
| |
| HoodieLogFormat.Writer writer; |
| if (action.equals(HoodieTimeline.ROLLBACK_ACTION)) { |
| writer = InputFormatTestUtil.writeRollback(partitionDir, fs, "fileid0", baseInstant, instantTime, |
| String.valueOf(baseInstantTs + logVersion - 1), logVersion); |
| } else { |
| writer = |
| InputFormatTestUtil.writeDataBlockToLogFile(partitionDir, fs, schema, "fileid0", baseInstant, |
| instantTime, 120, 0, logVersion, logBlockType); |
| } |
| long size = writer.getCurrentSize(); |
| writer.close(); |
| assertTrue(size > 0, "block - size should be > 0"); |
| FileCreateUtils.createDeltaCommit(basePath.toString(), instantTime, commitMetadata); |
| |
| // create a split with baseFile (parquet file written earlier) and new log file(s) |
| fileSlice.addLogFile(writer.getLogFile()); |
| HoodieRealtimeFileSplit split = new HoodieRealtimeFileSplit( |
| new FileSplit(new Path(partitionDir + "/fileid0_1-0-1_" + baseInstant + ".parquet"), 0, 1, baseJobConf), |
| basePath.toUri().toString(), fileSlice.getLogFiles().sorted(HoodieLogFile.getLogFileComparator()) |
| .collect(Collectors.toList()), |
| instantTime, |
| false, |
| Option.empty()); |
| |
| // create a RecordReader to be used by HoodieRealtimeRecordReader |
| RecordReader<NullWritable, ArrayWritable> reader = new MapredParquetInputFormat().getRecordReader( |
| new FileSplit(split.getPath(), 0, fs.getLength(split.getPath()), (String[]) null), baseJobConf, null); |
| JobConf jobConf = new JobConf(baseJobConf); |
| List<Schema.Field> fields = schema.getFields(); |
| setHiveColumnNameProps(fields, jobConf, partitioned); |
| |
| jobConf.setEnum(HoodieCommonConfig.SPILLABLE_DISK_MAP_TYPE.key(), diskMapType); |
| jobConf.setBoolean(HoodieCommonConfig.DISK_MAP_BITCASK_COMPRESSION_ENABLED.key(), isCompressionEnabled); |
| |
| // create the merged (compacted-view) record reader and validate its output |
| long logTmpFileStartTime = System.currentTimeMillis(); |
| HoodieRealtimeRecordReader recordReader = new HoodieRealtimeRecordReader(split, jobConf, reader); |
| |
| // use the reader to read the base parquet file and the log file, merge them on the fly, and return the latest commit |
| // all 100 base records should be updated (see above), and the 20 new inserts from the log |
| // should also be emitted with the new commit time, for a total of 120 records. |
| NullWritable key = recordReader.createKey(); |
| ArrayWritable value = recordReader.createValue(); |
| int recordCnt = 0; |
| while (recordReader.next(key, value)) { |
| Writable[] values = value.get(); |
| // every record should carry the latest delta commit time (baseInstant + logVersion) |
| assertEquals(latestInstant, values[0].toString()); |
| key = recordReader.createKey(); |
| value = recordReader.createValue(); |
| recordCnt++; |
| } |
| recordReader.getPos(); |
| assertEquals(1.0, recordReader.getProgress(), 0.05); |
| assertEquals(120, recordCnt); |
| recordReader.close(); |
| // the temp file produced by logScanner should be deleted |
| assertFalse(getLogTempFile(logTmpFileStartTime, System.currentTimeMillis(), diskMapType.toString()).exists()); |
| } catch (Exception ioe) { |
| throw new HoodieException(ioe.getMessage(), ioe); |
| } |
| }); |
| |
| // Add Rollback last version to next log-file |
| |
| } |
| |
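| // Looks in /tmp for the spillable-map directory created by the log scanner between startTime and endTime; |
| // returns a non-existent placeholder File when none is found (i.e. the temp dir was cleaned up). |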
| private File getLogTempFile(long startTime, long endTime, String diskType) { |
| return Arrays.stream(new File("/tmp").listFiles()) |
| .filter(f -> f.isDirectory() && f.getName().startsWith("hudi-" + diskType) && f.lastModified() > startTime && f.lastModified() < endTime) |
| .findFirst() |
| .orElse(new File("")); |
| } |
| |
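| /** |
| * Skip-merge (unmerged) read: all base-file records (commit 100) and all log-file inserts |
| * (commit 101) should be returned, with log records in write order and every key seen exactly once. |
| */ |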
| @Test |
| public void testUnMergedReader() throws Exception { |
| // initial commit |
| Schema schema = HoodieAvroUtils.addMetadataFields(SchemaTestUtil.getEvolvedSchema()); |
| HoodieTestUtils.init(hadoopConf, basePath.toString(), HoodieTableType.MERGE_ON_READ); |
| String instantTime = "100"; |
| final int numRecords = 1000; |
| final int firstBatchLastRecordKey = numRecords - 1; |
| final int secondBatchLastRecordKey = 2 * numRecords - 1; |
| File partitionDir = InputFormatTestUtil.prepareParquetTable(basePath, schema, 1, numRecords, instantTime, |
| HoodieTableType.MERGE_ON_READ); |
| HoodieCommitMetadata commitMetadata = CommitUtils.buildMetadata(Collections.emptyList(), Collections.emptyMap(), Option.empty(), WriteOperationType.UPSERT, |
| schema.toString(), HoodieTimeline.DELTA_COMMIT_ACTION); |
| FileCreateUtils.createDeltaCommit(basePath.toString(), instantTime, commitMetadata); |
| // Add the paths |
| FileInputFormat.setInputPaths(baseJobConf, partitionDir.getPath()); |
| |
| // insert new records to log file |
| String newCommitTime = "101"; |
| HoodieLogFormat.Writer writer = |
| InputFormatTestUtil.writeDataBlockToLogFile(partitionDir, fs, schema, "fileid0", instantTime, newCommitTime, |
| numRecords, numRecords, 0); |
| long size = writer.getCurrentSize(); |
| writer.close(); |
| assertTrue(size > 0, "block - size should be > 0"); |
| FileCreateUtils.createDeltaCommit(basePath.toString(), newCommitTime, commitMetadata); |
| |
| // create a split with baseFile (parquet file written earlier) and new log file(s) |
| HoodieRealtimeFileSplit split = new HoodieRealtimeFileSplit( |
| new FileSplit(new Path(partitionDir + "/fileid0_1-0-1_" + instantTime + ".parquet"), 0, 1, baseJobConf), |
| basePath.toUri().toString(), Collections.singletonList(writer.getLogFile()), newCommitTime, false, Option.empty()); |
| |
| // create a RecordReader to be used by HoodieRealtimeRecordReader |
| RecordReader<NullWritable, ArrayWritable> reader = new MapredParquetInputFormat().getRecordReader( |
| new FileSplit(split.getPath(), 0, fs.getLength(split.getPath()), (String[]) null), baseJobConf, null); |
| JobConf jobConf = new JobConf(baseJobConf); |
| List<Schema.Field> fields = schema.getFields(); |
| setHiveColumnNameProps(fields, jobConf, true); |
| // Enable merge skipping. |
| jobConf.set(REALTIME_SKIP_MERGE_PROP, "true"); |
| |
| // validate unmerged record reader |
| RealtimeUnmergedRecordReader recordReader = new RealtimeUnmergedRecordReader(split, jobConf, reader); |
| |
| // use reader to read base Parquet File and log file |
| // here all records should be present. Also ensure log records are in order. |
| NullWritable key = recordReader.createKey(); |
| ArrayWritable value = recordReader.createValue(); |
| int numRecordsAtCommit1 = 0; |
| int numRecordsAtCommit2 = 0; |
| Set<Integer> seenKeys = new HashSet<>(); |
| int lastSeenKeyFromLog = firstBatchLastRecordKey; |
| while (recordReader.next(key, value)) { |
| Writable[] values = value.get(); |
| String gotCommit = values[0].toString(); |
| String keyStr = values[2].toString(); |
| int gotKey = Integer.parseInt(keyStr.substring("key".length())); |
| if (gotCommit.equals(newCommitTime)) { |
| numRecordsAtCommit2++; |
| assertTrue(gotKey > firstBatchLastRecordKey); |
| assertTrue(gotKey <= secondBatchLastRecordKey); |
| assertEquals(gotKey, lastSeenKeyFromLog + 1); |
| lastSeenKeyFromLog++; |
| } else { |
| numRecordsAtCommit1++; |
| assertTrue(gotKey >= 0); |
| assertTrue(gotKey <= firstBatchLastRecordKey); |
| } |
| // Ensure unique key |
| assertFalse(seenKeys.contains(gotKey)); |
| seenKeys.add(gotKey); |
| key = recordReader.createKey(); |
| value = recordReader.createValue(); |
| } |
| assertEquals(numRecords, numRecordsAtCommit1); |
| assertEquals(numRecords, numRecordsAtCommit2); |
| assertEquals(2 * numRecords, seenKeys.size()); |
| assertEquals(1.0, recordReader.getProgress(), 0.05); |
| recordReader.close(); |
| } |
| |
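| /** |
| * Merged read with a complex/evolved schema, asserting the Writable values produced for string, |
| * int, long, float, double, map, nested record and array fields. |
| */ |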
| @ParameterizedTest |
| @MethodSource("testArguments") |
| public void testReaderWithNestedAndComplexSchema(ExternalSpillableMap.DiskMapType diskMapType, |
| boolean isCompressionEnabled) throws Exception { |
| // initial commit |
| Schema schema = HoodieAvroUtils.addMetadataFields(SchemaTestUtil.getComplexEvolvedSchema()); |
| HoodieTestUtils.init(hadoopConf, basePath.toString(), HoodieTableType.MERGE_ON_READ); |
| String instantTime = "100"; |
| int numberOfRecords = 100; |
| int numberOfLogRecords = numberOfRecords / 2; |
| File partitionDir = InputFormatTestUtil.prepareParquetTable(basePath, schema, 1, numberOfRecords, |
| instantTime, HoodieTableType.MERGE_ON_READ); |
| |
| HoodieCommitMetadata commitMetadata = CommitUtils.buildMetadata(Collections.emptyList(), Collections.emptyMap(), Option.empty(), WriteOperationType.UPSERT, |
| schema.toString(), HoodieTimeline.COMMIT_ACTION); |
| FileCreateUtils.createCommit(basePath.toString(), instantTime, Option.of(commitMetadata)); |
| // Add the paths |
| FileInputFormat.setInputPaths(baseJobConf, partitionDir.getPath()); |
| |
| // update files or generate new log file |
| String newCommitTime = "101"; |
| HoodieLogFormat.Writer writer = |
| writeLogFile(partitionDir, schema, "fileid0", instantTime, newCommitTime, numberOfLogRecords); |
| long size = writer.getCurrentSize(); |
| writer.close(); |
| assertTrue(size > 0, "block - size should be > 0"); |
| commitMetadata = CommitUtils.buildMetadata(Collections.emptyList(), Collections.emptyMap(), Option.empty(), WriteOperationType.UPSERT, |
| schema.toString(), HoodieTimeline.DELTA_COMMIT_ACTION); |
| FileCreateUtils.createDeltaCommit(basePath.toString(), newCommitTime, commitMetadata); |
| |
| // create a split with baseFile (parquet file written earlier) and new log file(s) |
| HoodieRealtimeFileSplit split = new HoodieRealtimeFileSplit( |
| new FileSplit(new Path(partitionDir + "/fileid0_1-0-1_" + instantTime + ".parquet"), 0, 1, baseJobConf), |
| basePath.toUri().toString(), Collections.singletonList(writer.getLogFile()), newCommitTime, false, Option.empty()); |
| |
| // create a RecordReader to be used by HoodieRealtimeRecordReader |
| RecordReader<NullWritable, ArrayWritable> reader = new MapredParquetInputFormat().getRecordReader( |
| new FileSplit(split.getPath(), 0, fs.getLength(split.getPath()), (String[]) null), baseJobConf, null); |
| JobConf jobConf = new JobConf(baseJobConf); |
| List<Schema.Field> fields = schema.getFields(); |
| setHiveColumnNameProps(fields, jobConf, true); |
| |
| jobConf.setEnum(HoodieCommonConfig.SPILLABLE_DISK_MAP_TYPE.key(), diskMapType); |
| jobConf.setBoolean(HoodieCommonConfig.DISK_MAP_BITCASK_COMPRESSION_ENABLED.key(), isCompressionEnabled); |
| |
| // create the merged (compacted-view) record reader and validate its output |
| HoodieRealtimeRecordReader recordReader = new HoodieRealtimeRecordReader(split, jobConf, reader); |
| |
| // use reader to read base Parquet File and log file, merge in flight and return latest commit |
| // here the first 50 records should be updated, see above |
| NullWritable key = recordReader.createKey(); |
| ArrayWritable value = recordReader.createValue(); |
| int numRecordsRead = 0; |
| while (recordReader.next(key, value)) { |
| int currentRecordNo = numRecordsRead; |
| ++numRecordsRead; |
| Writable[] values = value.get(); |
| String recordCommitTime; |
| // the first `numberOfLogRecords` records are served from the log (commit "101"); |
| // the remaining records keep the base commit time "100" |
| if (numRecordsRead > numberOfLogRecords) { |
| recordCommitTime = instantTime; |
| } else { |
| recordCommitTime = newCommitTime; |
| } |
| String recordCommitTimeSuffix = "@" + recordCommitTime; |
| |
| assertEquals(values[0].toString(), recordCommitTime); |
| key = recordReader.createKey(); |
| value = recordReader.createValue(); |
| |
| // Assert type STRING |
| assertEquals(values[5].toString(), "field" + currentRecordNo, "test value for field: field1"); |
| assertEquals(values[6].toString(), "field" + currentRecordNo + recordCommitTimeSuffix, |
| "test value for field: field2"); |
| assertEquals(values[7].toString(), "name" + currentRecordNo, |
| "test value for field: name"); |
| |
| // Assert type INT |
| IntWritable intWritable = (IntWritable) values[8]; |
| assertEquals(intWritable.get(), currentRecordNo + recordCommitTime.hashCode(), |
| "test value for field: favoriteIntNumber"); |
| |
| // Assert type LONG |
| LongWritable longWritable = (LongWritable) values[9]; |
| assertEquals(longWritable.get(), currentRecordNo + recordCommitTime.hashCode(), |
| "test value for field: favoriteNumber"); |
| |
| // Assert type FLOAT |
| FloatWritable floatWritable = (FloatWritable) values[10]; |
| assertEquals(floatWritable.get(), (float) ((currentRecordNo + recordCommitTime.hashCode()) / 1024.0), 0, |
| "test value for field: favoriteFloatNumber"); |
| |
| // Assert type DOUBLE |
| DoubleWritable doubleWritable = (DoubleWritable) values[11]; |
| assertEquals(doubleWritable.get(), (currentRecordNo + recordCommitTime.hashCode()) / 1024.0, 0, |
| "test value for field: favoriteDoubleNumber"); |
| |
| // Assert type MAP |
| ArrayWritable mapItem = (ArrayWritable) values[12]; |
| Writable mapItemValue1 = mapItem.get()[0]; |
| Writable mapItemValue2 = mapItem.get()[1]; |
| |
| assertEquals(((ArrayWritable) mapItemValue1).get()[0].toString(), "mapItem1", |
| "test value for field: tags"); |
| assertEquals(((ArrayWritable) mapItemValue2).get()[0].toString(), "mapItem2", |
| "test value for field: tags"); |
| assertEquals(((ArrayWritable) mapItemValue1).get().length, 2, |
| "test value for field: tags"); |
| assertEquals(((ArrayWritable) mapItemValue2).get().length, 2, |
| "test value for field: tags"); |
| Writable mapItemValue1value = ((ArrayWritable) mapItemValue1).get()[1]; |
| Writable mapItemValue2value = ((ArrayWritable) mapItemValue2).get()[1]; |
| assertEquals(((ArrayWritable) mapItemValue1value).get()[0].toString(), "item" + currentRecordNo, |
| "test value for field: tags[\"mapItem1\"].item1"); |
| assertEquals(((ArrayWritable) mapItemValue2value).get()[0].toString(), "item2" + currentRecordNo, |
| "test value for field: tags[\"mapItem2\"].item1"); |
| assertEquals(((ArrayWritable) mapItemValue1value).get()[1].toString(), "item" + currentRecordNo + recordCommitTimeSuffix, |
| "test value for field: tags[\"mapItem1\"].item2"); |
| assertEquals(((ArrayWritable) mapItemValue2value).get()[1].toString(), "item2" + currentRecordNo + recordCommitTimeSuffix, |
| "test value for field: tags[\"mapItem2\"].item2"); |
| |
| // Assert type RECORD |
| ArrayWritable recordItem = (ArrayWritable) values[13]; |
| Writable[] nestedRecord = recordItem.get(); |
| assertFalse(((BooleanWritable) nestedRecord[0]).get(), "test value for field: testNestedRecord.isAdmin"); |
| assertEquals(nestedRecord[1].toString(), "UserId" + currentRecordNo + recordCommitTimeSuffix, |
| "test value for field: testNestedRecord.userId"); |
| |
| // Assert type ARRAY |
| ArrayWritable arrayValue = (ArrayWritable) values[14]; |
| Writable[] arrayValues = arrayValue.get(); |
| for (int i = 0; i < arrayValues.length; i++) { |
| assertEquals("stringArray" + i + recordCommitTimeSuffix, arrayValues[i].toString(), |
| "test value for field: stringArray"); |
| } |
| } |
| reader.close(); |
| } |
| |
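| /** |
| * Writes a log block with an evolved schema and then a rollback block for it; reading with the |
| * evolved projection should fail, while the original schema projection should still work. |
| */ |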
| @ParameterizedTest |
| @MethodSource("testArguments") |
| public void testSchemaEvolutionAndRollbackBlockInLastLogFile(ExternalSpillableMap.DiskMapType diskMapType, |
| boolean isCompressionEnabled) throws Exception { |
| // initial commit |
| List<HoodieLogFile> logFiles = new ArrayList<>(); |
| Schema schema = HoodieAvroUtils.addMetadataFields(SchemaTestUtil.getSimpleSchema()); |
| HoodieTestUtils.init(hadoopConf, basePath.toString(), HoodieTableType.MERGE_ON_READ); |
| String instantTime = "100"; |
| int numberOfRecords = 100; |
| int numberOfLogRecords = numberOfRecords / 2; |
| File partitionDir = |
| InputFormatTestUtil.prepareSimpleParquetTable(basePath, schema, 1, numberOfRecords, |
| instantTime, HoodieTableType.MERGE_ON_READ); |
| HoodieCommitMetadata commitMetadata = CommitUtils.buildMetadata(Collections.emptyList(), Collections.emptyMap(), Option.empty(), WriteOperationType.UPSERT, |
| schema.toString(), HoodieTimeline.COMMIT_ACTION); |
| FileCreateUtils.createCommit(basePath.toString(), instantTime, Option.of(commitMetadata)); |
| // Add the paths |
| FileInputFormat.setInputPaths(baseJobConf, partitionDir.getPath()); |
| List<Field> firstSchemaFields = schema.getFields(); |
| |
| // update files and generate new log file but don't commit |
| schema = SchemaTestUtil.getComplexEvolvedSchema(); |
| String newCommitTime = "101"; |
| HoodieLogFormat.Writer writer = |
| InputFormatTestUtil.writeDataBlockToLogFile(partitionDir, fs, schema, "fileid0", instantTime, newCommitTime, |
| numberOfLogRecords, 0, 1); |
| long size = writer.getCurrentSize(); |
| logFiles.add(writer.getLogFile()); |
| writer.close(); |
| assertTrue(size > 0, "block - size should be > 0"); |
| |
| // write rollback for the previous block in new log file version |
| newCommitTime = "102"; |
| writer = InputFormatTestUtil.writeRollbackBlockToLogFile(partitionDir, fs, schema, "fileid0", instantTime, |
| newCommitTime, "101", 1); |
| logFiles.add(writer.getLogFile()); |
| writer.close(); |
| |
| commitMetadata = CommitUtils.buildMetadata(Collections.emptyList(), Collections.emptyMap(), Option.empty(), WriteOperationType.UPSERT, |
| schema.toString(), HoodieTimeline.DELTA_COMMIT_ACTION); |
| FileCreateUtils.createDeltaCommit(basePath.toString(), instantTime, commitMetadata); |
| |
| // create a split with baseFile (parquet file written earlier) and new log file(s) |
| HoodieRealtimeFileSplit split = new HoodieRealtimeFileSplit( |
| new FileSplit(new Path(partitionDir + "/fileid0_1_" + instantTime + ".parquet"), 0, 1, baseJobConf), |
| basePath.toUri().toString(), logFiles, newCommitTime, false, Option.empty()); |
| |
| // create a RecordReader to be used by HoodieRealtimeRecordReader |
| RecordReader<NullWritable, ArrayWritable> reader = new MapredParquetInputFormat().getRecordReader( |
| new FileSplit(split.getPath(), 0, fs.getLength(split.getPath()), (String[]) null), baseJobConf, null); |
| JobConf jobConf = new JobConf(baseJobConf); |
| List<Schema.Field> fields = schema.getFields(); |
| |
| assertFalse(firstSchemaFields.containsAll(fields)); |
| |
| // Try to read all the fields passed by the new schema |
| setHiveColumnNameProps(fields, jobConf, true); |
| |
| jobConf.setEnum(HoodieCommonConfig.SPILLABLE_DISK_MAP_TYPE.key(), diskMapType); |
| jobConf.setBoolean(HoodieCommonConfig.DISK_MAP_BITCASK_COMPRESSION_ENABLED.key(), isCompressionEnabled); |
| |
| HoodieRealtimeRecordReader recordReader; |
| try { |
| // constructing the merged record reader with the evolved-schema projection should fail |
| recordReader = new HoodieRealtimeRecordReader(split, jobConf, reader); |
| throw new RuntimeException("should've failed the previous line"); |
| } catch (HoodieException e) { |
| // expected, field not found since the data written with the evolved schema was rolled back |
| } |
| |
| // Now project only the fields from the original (first) schema |
| setHiveColumnNameProps(firstSchemaFields, jobConf, true); |
| // This time read only the fields which are part of parquet |
| recordReader = new HoodieRealtimeRecordReader(split, jobConf, reader); |
| // use reader to read base Parquet File and log file |
| NullWritable key = recordReader.createKey(); |
| ArrayWritable value = recordReader.createValue(); |
| while (recordReader.next(key, value)) { |
| // keep reading |
| } |
| reader.close(); |
| } |
| |
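| /** |
| * Reads a base file written with the original schema while projecting the fields of a compatible |
| * evolved schema; the read should go through even with no log files. |
| */ |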
| @Test |
| public void testSchemaEvolution() throws Exception { |
| ExternalSpillableMap.DiskMapType diskMapType = ExternalSpillableMap.DiskMapType.BITCASK; |
| boolean isCompressionEnabled = true; |
| // initial commit |
| List<HoodieLogFile> logFiles = new ArrayList<>(); |
| Schema schema = HoodieAvroUtils.addMetadataFields(SchemaTestUtil.getSimpleSchema()); |
| HoodieTestUtils.init(hadoopConf, basePath.toString(), HoodieTableType.MERGE_ON_READ); |
| String instantTime = "100"; |
| int numberOfRecords = 100; |
| int numberOfLogRecords = numberOfRecords / 2; |
| File partitionDir = |
| InputFormatTestUtil.prepareSimpleParquetTable(basePath, schema, 1, numberOfRecords, |
| instantTime, HoodieTableType.MERGE_ON_READ); |
| HoodieCommitMetadata commitMetadata = CommitUtils.buildMetadata(Collections.emptyList(), Collections.emptyMap(), Option.empty(), WriteOperationType.UPSERT, |
| schema.toString(), HoodieTimeline.COMMIT_ACTION); |
| FileCreateUtils.createCommit(basePath.toString(), instantTime, Option.of(commitMetadata)); |
| // Add the paths |
| FileInputFormat.setInputPaths(baseJobConf, partitionDir.getPath()); |
| List<Field> firstSchemaFields = schema.getFields(); |
| |
| // 2nd commit w/ evolved schema |
| Schema evolvedSchema = HoodieAvroUtils.addMetadataFields(SchemaTestUtil.getEvolvedCompatibleSchema()); |
| List<Field> secondSchemaFields = evolvedSchema.getFields(); |
| String newCommitTime = "101"; |
| File partitionDir1 = |
| InputFormatTestUtil.prepareSimpleParquetTable(basePath, evolvedSchema, 1, numberOfRecords, |
| instantTime, HoodieTableType.MERGE_ON_READ, "2017", "05", "01"); |
| HoodieCommitMetadata commitMetadata1 = CommitUtils.buildMetadata(Collections.emptyList(), Collections.emptyMap(), Option.empty(), WriteOperationType.UPSERT, |
| evolvedSchema.toString(), HoodieTimeline.COMMIT_ACTION); |
| FileCreateUtils.createCommit(basePath.toString(), newCommitTime, Option.of(commitMetadata1)); |
| // Add the paths |
| FileInputFormat.setInputPaths(baseJobConf, partitionDir1.getPath()); |
| |
| // create a split with baseFile from 1st commit. |
| HoodieRealtimeFileSplit split = new HoodieRealtimeFileSplit( |
| new FileSplit(new Path(partitionDir + "/fileid0_1_" + instantTime + ".parquet"), 0, 1, baseJobConf), |
| basePath.toUri().toString(), logFiles, newCommitTime, false, Option.empty()); |
| |
| // create a RecordReader to be used by HoodieRealtimeRecordReader |
| RecordReader<NullWritable, ArrayWritable> reader = new MapredParquetInputFormat().getRecordReader( |
| new FileSplit(split.getPath(), 0, fs.getLength(split.getPath()), (String[]) null), baseJobConf, null); |
| JobConf jobConf = new JobConf(baseJobConf); |
| |
| // Project all the fields of the evolved (second) schema |
| setHiveColumnNameProps(secondSchemaFields, jobConf, true); |
| // the base parquet file was written with the original schema, so this exercises reading old data with the new projection |
| HoodieRealtimeRecordReader recordReader = new HoodieRealtimeRecordReader(split, jobConf, reader); |
| // use reader to read base Parquet File and log file |
| NullWritable key = recordReader.createKey(); |
| ArrayWritable value = recordReader.createValue(); |
| while (recordReader.next(key, value)) { |
| // keep reading |
| } |
| reader.close(); |
| } |
| |
| private static Stream<Arguments> testArguments() { |
| // Arg1: ExternalSpillableMap Type, Arg2: isDiskMapCompressionEnabled, Arg3: partitioned |
| return Stream.of( |
| arguments(ExternalSpillableMap.DiskMapType.BITCASK, false, false), |
| arguments(ExternalSpillableMap.DiskMapType.ROCKS_DB, false, false), |
| arguments(ExternalSpillableMap.DiskMapType.BITCASK, true, false), |
| arguments(ExternalSpillableMap.DiskMapType.ROCKS_DB, true, false), |
| arguments(ExternalSpillableMap.DiskMapType.BITCASK, false, true), |
| arguments(ExternalSpillableMap.DiskMapType.ROCKS_DB, false, true), |
| arguments(ExternalSpillableMap.DiskMapType.BITCASK, true, true), |
| arguments(ExternalSpillableMap.DiskMapType.ROCKS_DB, true, true) |
| ); |
| } |
| |
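| /** |
| * Incremental query over a file slice whose only new data after the start commit lives in a log |
| * file; records from the later delta commit (102) should be returned. |
| */ |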
| @Test |
| public void testIncrementalWithOnlylog() throws Exception { |
| // initial commit |
| Schema schema = HoodieAvroUtils.addMetadataFields(SchemaTestUtil.getEvolvedSchema()); |
| HoodieTestUtils.init(hadoopConf, basePath.toString(), HoodieTableType.MERGE_ON_READ); |
| String instantTime = "100"; |
| final int numRecords = 1000; |
| File partitionDir = InputFormatTestUtil.prepareParquetTable(basePath, schema, 1, numRecords, instantTime, |
| HoodieTableType.MERGE_ON_READ); |
| createDeltaCommitFile(basePath, instantTime, "2016/05/01", "2016/05/01/fileid0_1-0-1_100.parquet", "fileid0", schema.toString()); |
| // Add the paths |
| FileInputFormat.setInputPaths(baseJobConf, partitionDir.getPath()); |
| |
| // insert new records to log file |
| try { |
| String newCommitTime = "102"; |
| HoodieLogFormat.Writer writer = |
| InputFormatTestUtil.writeDataBlockToLogFile(partitionDir, fs, schema, "fileid0", instantTime, newCommitTime, |
| numRecords, numRecords, 0); |
| writer.close(); |
| createDeltaCommitFile(basePath, newCommitTime, "2016/05/01", "2016/05/01/.fileid0_102.log.0_1-0-1", "fileid0", schema.toString()); |
| |
| InputFormatTestUtil.setupIncremental(baseJobConf, "101", 1); |
| |
| HoodieParquetRealtimeInputFormat inputFormat = new HoodieParquetRealtimeInputFormat(); |
| inputFormat.setConf(baseJobConf); |
| InputSplit[] splits = inputFormat.getSplits(baseJobConf, 1); |
| assertEquals(1, splits.length); |
| JobConf newJobConf = new JobConf(baseJobConf); |
| List<Schema.Field> fields = schema.getFields(); |
| setHiveColumnNameProps(fields, newJobConf, false); |
| newJobConf.set("columns.types", "string,string,string,string,string,string,string,string,bigint,string,string"); |
| RecordReader<NullWritable, ArrayWritable> reader = inputFormat.getRecordReader(splits[0], newJobConf, Reporter.NULL); |
| // use reader to read log file. |
| NullWritable key = reader.createKey(); |
| ArrayWritable value = reader.createValue(); |
| while (reader.next(key, value)) { |
| Writable[] values = value.get(); |
| // since the incremental start commit is 101 and the max commit count is 1, |
| // only data belonging to commit 102 should be read out. |
| assertEquals(newCommitTime, values[0].toString()); |
| key = reader.createKey(); |
| value = reader.createValue(); |
| } |
| reader.close(); |
| } catch (IOException e) { |
| throw new HoodieException(e.getMessage(), e); |
| } |
| } |
| |
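| /** |
| * Incremental query when a later replacecommit replaces the base file group; with the start |
| * commit at 0 and a single commit pulled, only data from commit 100 should be returned. |
| */ |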
| @Test |
| public void testIncrementalWithReplace() throws Exception { |
| // initial commit |
| Schema schema = HoodieAvroUtils.addMetadataFields(SchemaTestUtil.getEvolvedSchema()); |
| HoodieTestUtils.init(hadoopConf, basePath.toString(), HoodieTableType.MERGE_ON_READ); |
| String baseInstant = "100"; |
| File partitionDir = InputFormatTestUtil.prepareParquetTable(basePath, schema, 1, 100, baseInstant, |
| HoodieTableType.MERGE_ON_READ); |
| createDeltaCommitFile(basePath, baseInstant, "2016/05/01", "2016/05/01/fileid0_1-0-1_100.parquet", "fileid0", schema.toString()); |
| // Add the paths |
| FileInputFormat.setInputPaths(baseJobConf, partitionDir.getPath()); |
| |
| InputFormatTestUtil.simulateInserts(partitionDir, ".parquet", "fileid1", 1, "200"); |
| Map<String, List<String>> partitionToReplaceFileIds = new HashMap<>(); |
| List<String> replacedFileId = new ArrayList<>(); |
| replacedFileId.add("fileid0"); |
| partitionToReplaceFileIds.put("2016/05/01", replacedFileId); |
| createReplaceCommitFile(basePath, |
| "200", "2016/05/01", "2016/05/01/fileid10_1-0-1_200.parquet", "fileid10", partitionToReplaceFileIds); |
| |
| InputFormatTestUtil.setupIncremental(baseJobConf, "0", 1); |
| |
| HoodieParquetRealtimeInputFormat inputFormat = new HoodieParquetRealtimeInputFormat(); |
| inputFormat.setConf(baseJobConf); |
| InputSplit[] splits = inputFormat.getSplits(baseJobConf, 1); |
| assertEquals(1, splits.length); |
| JobConf newJobConf = new JobConf(baseJobConf); |
| List<Schema.Field> fields = schema.getFields(); |
| setHiveColumnNameProps(fields, newJobConf, false); |
| newJobConf.set("columns.types", "string,string,string,string,string,string,string,string,bigint,string,string"); |
| RecordReader<NullWritable, ArrayWritable> reader = inputFormat.getRecordReader(splits[0], newJobConf, Reporter.NULL); |
| |
| // use reader to read log file. |
| NullWritable key = reader.createKey(); |
| ArrayWritable value = reader.createValue(); |
| while (reader.next(key, value)) { |
| Writable[] values = value.get(); |
| // since the incremental start commit is 0 and the max commit count is 1, |
| // only data belonging to commit 100 should be read out. |
| assertEquals("100", values[0].toString()); |
| key = reader.createKey(); |
| value = reader.createValue(); |
| } |
| reader.close(); |
| } |
| |
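| // Writes a completed replacecommit file to the timeline with a single write stat and the given |
| // partition-to-replaced-file-ids mapping. |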
| private void createReplaceCommitFile( |
| java.nio.file.Path basePath, |
| String commitNumber, |
| String partitionPath, |
| String filePath, |
| String fileId, |
| Map<String, List<String>> partitionToReplaceFileIds) throws IOException { |
| List<HoodieWriteStat> writeStats = new ArrayList<>(); |
| HoodieWriteStat writeStat = createHoodieWriteStat(basePath, commitNumber, partitionPath, filePath, fileId); |
| writeStats.add(writeStat); |
| HoodieReplaceCommitMetadata replaceMetadata = new HoodieReplaceCommitMetadata(); |
| replaceMetadata.setPartitionToReplaceFileIds(partitionToReplaceFileIds); |
| writeStats.forEach(stat -> replaceMetadata.addWriteStat(partitionPath, stat)); |
| File file = basePath.resolve(".hoodie") |
| .resolve(commitNumber + "_" + InProcessTimeGenerator.createNewInstantTime() + ".replacecommit").toFile(); |
| file.createNewFile(); |
| FileOutputStream fileOutputStream = new FileOutputStream(file); |
| fileOutputStream.write(serializeCommitMetadata(replaceMetadata).get()); |
| fileOutputStream.flush(); |
| fileOutputStream.close(); |
| } |
| |
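| // Builds a HoodieWriteStat with fixed counts and runtime stats for the given file; used by the |
| // synthetic commit helpers in this test. |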
| private HoodieWriteStat createHoodieWriteStat(java.nio.file.Path basePath, String commitNumber, String partitionPath, String filePath, String fileId) { |
| HoodieWriteStat writeStat = new HoodieWriteStat(); |
| writeStat.setFileId(fileId); |
| writeStat.setNumDeletes(0); |
| writeStat.setNumUpdateWrites(100); |
| writeStat.setNumWrites(100); |
| writeStat.setPath(filePath); |
| writeStat.setFileSizeInBytes(new File(new Path(basePath.toString(), filePath).toString()).length()); |
| writeStat.setPartitionPath(partitionPath); |
| writeStat.setTotalLogFilesCompacted(100L); |
| HoodieWriteStat.RuntimeStats runtimeStats = new HoodieWriteStat.RuntimeStats(); |
| runtimeStats.setTotalScanTime(100); |
| runtimeStats.setTotalCreateTime(100); |
| runtimeStats.setTotalUpsertTime(100); |
| writeStat.setRuntimeStats(runtimeStats); |
| return writeStat; |
| } |
| |
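| // Writes a completed deltacommit file to the timeline with a single write stat and, optionally, |
| // the table schema in the extra metadata. |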
| private void createDeltaCommitFile( |
| java.nio.file.Path basePath, |
| String commitNumber, |
| String partitionPath, |
| String filePath, |
| String fileId, |
| String schemaStr) throws IOException { |
| List<HoodieWriteStat> writeStats = new ArrayList<>(); |
| HoodieWriteStat writeStat = createHoodieWriteStat(basePath, commitNumber, partitionPath, filePath, fileId); |
| writeStats.add(writeStat); |
| |
| HoodieCommitMetadata commitMetadata = new HoodieCommitMetadata(); |
| writeStats.forEach(stat -> commitMetadata.addWriteStat(partitionPath, stat)); |
| if (schemaStr != null) { |
| commitMetadata.getExtraMetadata().put(HoodieCommitMetadata.SCHEMA_KEY, schemaStr); |
| } |
| File file = basePath.resolve(".hoodie") |
| .resolve(commitNumber + "_" + InProcessTimeGenerator.createNewInstantTime() + ".deltacommit").toFile(); |
| file.createNewFile(); |
| FileOutputStream fileOutputStream = new FileOutputStream(file); |
| fileOutputStream.write(serializeCommitMetadata(commitMetadata).get()); |
| fileOutputStream.flush(); |
| fileOutputStream.close(); |
| } |
| |
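| /** |
| * Reads a file slice that has no base file at all: a HoodieEmptyRecordReader stands in for the |
| * missing base file and all records come from the log. |
| */ |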
| @Test |
| public void testLogOnlyReader() throws Exception { |
| // initial commit |
| Schema schema = HoodieAvroUtils.addMetadataFields(SchemaTestUtil.getEvolvedSchema()); |
| URI baseUri = basePath.toUri(); |
| HoodieTestUtils.init(hadoopConf, baseUri.toString(), HoodieTableType.MERGE_ON_READ); |
| String baseInstant = "100"; |
| File partitionDir = InputFormatTestUtil.prepareNonPartitionedParquetTable(basePath, schema, 1, 100, baseInstant, |
| HoodieTableType.MERGE_ON_READ); |
| FileCreateUtils.createDeltaCommit(basePath.toString(), baseInstant); |
| // Add the paths |
| FileInputFormat.setInputPaths(baseJobConf, partitionDir.toURI().toString()); |
| |
| FileSlice fileSlice = new FileSlice("default", baseInstant, "fileid1"); |
| try { |
| // update files or generate new log file |
| int logVersion = 1; |
| int baseInstantTs = Integer.parseInt(baseInstant); |
| String instantTime = String.valueOf(baseInstantTs + logVersion); |
| HoodieLogFormat.Writer writer = InputFormatTestUtil.writeDataBlockToLogFile(partitionDir, fs, schema, "fileid1", baseInstant, |
| instantTime, 100, 0, logVersion); |
| long size = writer.getCurrentSize(); |
| writer.close(); |
| assertTrue(size > 0, "block - size should be > 0"); |
| HoodieCommitMetadata commitMetadata = CommitUtils.buildMetadata(Collections.emptyList(), Collections.emptyMap(), Option.empty(), WriteOperationType.UPSERT, |
| schema.toString(), HoodieTimeline.COMMIT_ACTION); |
| FileCreateUtils.createDeltaCommit(basePath.toString(), instantTime, commitMetadata); |
| // create a split with new log file(s) |
| fileSlice.addLogFile(new HoodieLogFile(writer.getLogFile().getPath(), size)); |
| RealtimeFileStatus realtimeFileStatus = new RealtimeFileStatus( |
| new FileStatus(writer.getLogFile().getFileSize(), false, 1, 1, 0, writer.getLogFile().getPath()), |
| baseUri.toString(), |
| fileSlice.getLogFiles().collect(Collectors.toList()), |
| false, |
| Option.empty()); |
| realtimeFileStatus.setMaxCommitTime(instantTime); |
| HoodieRealtimePath realtimePath = (HoodieRealtimePath) realtimeFileStatus.getPath(); |
| HoodieRealtimeFileSplit split = |
| new HoodieRealtimeFileSplit(new FileSplit(realtimePath, 0, 0, new String[] {""}), realtimePath); |
| |
| JobConf newJobConf = new JobConf(baseJobConf); |
| List<Schema.Field> fields = schema.getFields(); |
| setHiveColumnNameProps(fields, newJobConf, false); |
| // wrap an empty (dummy) base-file reader so that all records come from the log files |
| RecordReader<NullWritable, ArrayWritable> reader = new HoodieRealtimeRecordReader(split, newJobConf, new HoodieEmptyRecordReader(split, newJobConf)); |
| // use reader to read log file. |
| NullWritable key = reader.createKey(); |
| ArrayWritable value = reader.createValue(); |
| while (reader.next(key, value)) { |
| Writable[] values = value.get(); |
| assertEquals(instantTime, values[0].toString()); |
| key = reader.createKey(); |
| value = reader.createValue(); |
| } |
| reader.close(); |
| } catch (Exception e) { |
| throw new HoodieException(e.getMessage(), e); |
| } |
| } |
| |
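| // The realtime input format should still produce a single realtime split and file status for an empty input file. |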
| @Test |
| public void testRealtimeInputFormatEmptyFileSplit() throws Exception { |
| // Add the empty paths |
| String emptyPath = ClassLoader.getSystemResource("emptyFile").getPath(); |
| FileInputFormat.setInputPaths(baseJobConf, emptyPath); |
| |
| HoodieParquetRealtimeInputFormat inputFormat = new HoodieParquetRealtimeInputFormat(); |
| inputFormat.setConf(baseJobConf); |
| |
| InputSplit[] inputSplits = inputFormat.getSplits(baseJobConf, 10); |
| assertEquals(1, inputSplits.length); |
| assertTrue(inputSplits[0] instanceof RealtimeSplit); |
| |
| FileStatus[] files = inputFormat.listStatus(baseJobConf); |
| assertEquals(1, files.length); |
| assertTrue(files[0] instanceof RealtimeFileStatus); |
| } |
| |
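| /** |
| * A requested compaction at instant 125 acts as a barrier: incremental reads starting at 100 must |
| * not expose inserts committed after the compaction timestamp, so no splits are returned. |
| */ |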
| @Test |
| public void testIncrementalWithCompaction() throws Exception { |
| // initial commit |
| Schema schema = HoodieAvroUtils.addMetadataFields(SchemaTestUtil.getEvolvedSchema()); |
| HoodieTestUtils.init(hadoopConf, basePath.toString(), HoodieTableType.MERGE_ON_READ); |
| String baseInstant = "100"; |
| File partitionDir = InputFormatTestUtil.prepareParquetTable(basePath, schema, 1, 100, baseInstant, |
| HoodieTableType.MERGE_ON_READ); |
| createDeltaCommitFile(basePath, baseInstant, "2016/05/01", "2016/05/01/fileid0_1-0-1_100.parquet", "fileid0", schema.toString()); |
| // Add the paths |
| FileInputFormat.setInputPaths(baseJobConf, partitionDir.getPath()); |
| |
| createCompactionFile(basePath, "125"); |
| |
| // add inserts after compaction timestamp |
| InputFormatTestUtil.simulateInserts(partitionDir, ".parquet", "fileId2", 5, "200"); |
| InputFormatTestUtil.commit(basePath, "200"); |
| |
| InputFormatTestUtil.setupIncremental(baseJobConf, "100", 10); |
| |
| // verify that incremental reads do NOT show inserts after compaction timestamp |
| HoodieParquetRealtimeInputFormat inputFormat = new HoodieParquetRealtimeInputFormat(); |
| inputFormat.setConf(baseJobConf); |
| InputSplit[] splits = inputFormat.getSplits(baseJobConf, 1); |
| assertEquals(0, splits.length); |
| } |
| |
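| // avroToArrayWritable should emit one Writable per field of the target schema, even when the |
| // record itself lacks some of those fields (here, the Hudi metadata columns). |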
| @Test |
| public void testAvroToArrayWritable() throws IOException { |
| Schema schema = SchemaTestUtil.getEvolvedSchema(); |
| GenericRecord record = SchemaTestUtil.generateAvroRecordFromJson(schema, 1, "100", "100", false); |
| ArrayWritable aWritable = (ArrayWritable) HoodieRealtimeRecordReaderUtils.avroToArrayWritable(record, schema); |
| assertEquals(schema.getFields().size(), aWritable.get().length); |
| |
| // In some queries, the generic records Hudi receives contain only a subset of the full schema's fields. |
| // Test the case where some fields of the target schema are missing from the record. |
| Schema schemaWithMetaFields = HoodieAvroUtils.addMetadataFields(schema); |
| ArrayWritable aWritable2 = (ArrayWritable) HoodieRealtimeRecordReaderUtils.avroToArrayWritable(record, schemaWithMetaFields); |
| assertEquals(schemaWithMetaFields.getFields().size(), aWritable2.get().length); |
| } |
| |
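| // Creates a requested-compaction file on the timeline containing a minimal HoodieCompactionPlan. |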
| private File createCompactionFile(java.nio.file.Path basePath, String commitTime) |
| throws IOException { |
| File file = basePath.resolve(".hoodie") |
| .resolve(HoodieTimeline.makeRequestedCompactionFileName(commitTime)).toFile(); |
| assertTrue(file.createNewFile()); |
| FileOutputStream os = new FileOutputStream(file); |
| try { |
| HoodieCompactionPlan compactionPlan = HoodieCompactionPlan.newBuilder().setVersion(2).build(); |
| // Write a minimal (empty) compaction plan into the requested compaction file |
| os.write(TimelineMetadataUtils.serializeCompactionPlan(compactionPlan).get()); |
| return file; |
| } finally { |
| os.close(); |
| } |
| } |
| } |