/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements. See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership. The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License. You may obtain a copy of the License at
 *
 *      http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hudi.io.storage;

import org.apache.hudi.common.bloom.BloomFilter;
import org.apache.hudi.common.bloom.BloomFilterFactory;
import org.apache.hudi.common.testutils.HoodieTestDataGenerator;
import org.apache.hudi.config.HoodieWriteConfig;
import org.apache.hudi.testutils.HoodieClientTestHarness;
import org.apache.hudi.testutils.SparkDatasetTestUtils;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.parquet.hadoop.metadata.CompressionCodecName;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.catalyst.InternalRow;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;

import java.io.IOException;
import java.util.List;
import java.util.Random;
import java.util.UUID;

import static org.junit.jupiter.api.Assertions.assertEquals;

/**
 * Unit tests for {@link HoodieInternalRowParquetWriter}.
 */
public class TestHoodieInternalRowParquetWriter extends HoodieClientTestHarness {

  private static final Random RANDOM = new Random();

  @BeforeEach
  public void setUp() throws Exception {
    initSparkContexts("TestHoodieInternalRowParquetWriter");
    initPath();
    initFileSystem();
    initTestDataGenerator();
    initMetaClient();
  }

  @AfterEach
  public void tearDown() throws Exception {
    cleanupResources();
  }
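
  /**
   * Writes several batches of random {@link InternalRow}s with {@link HoodieInternalRowParquetWriter}
   * and verifies that reading the parquet output back returns the same rows.
   */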
  @Test
  public void endToEndTest() throws IOException {
    HoodieWriteConfig cfg = SparkDatasetTestUtils.getConfigBuilder(basePath).build();
    for (int i = 0; i < 5; i++) {
      // init write support and parquet config
      HoodieRowParquetWriteSupport writeSupport = getWriteSupport(cfg, hadoopConf);
      HoodieRowParquetConfig parquetConfig = new HoodieRowParquetConfig(writeSupport,
          CompressionCodecName.SNAPPY, cfg.getParquetBlockSize(), cfg.getParquetPageSize(), cfg.getParquetMaxFileSize(),
          writeSupport.getHadoopConf(), cfg.getParquetCompressionRatio());

      // prepare path
      String fileId = UUID.randomUUID().toString();
      Path filePath = new Path(basePath + "/" + fileId);
      String partitionPath = HoodieTestDataGenerator.DEFAULT_FIRST_PARTITION_PATH;
      metaClient.getFs().mkdirs(new Path(basePath));

      // init writer
      HoodieInternalRowParquetWriter writer = new HoodieInternalRowParquetWriter(filePath, parquetConfig);

      // generate an input batch of random size (between 10 and 109 rows)
      int size = 10 + RANDOM.nextInt(100);
      Dataset<Row> inputRows = SparkDatasetTestUtils.getRandomRows(sqlContext, size, partitionPath, false);
      List<InternalRow> internalRows = SparkDatasetTestUtils.toInternalRows(inputRows, SparkDatasetTestUtils.ENCODER);

      // issue writes
      for (InternalRow internalRow : internalRows) {
        writer.write(internalRow);
      }

      // close the writer
      writer.close();

      // verify rows: every input row should be present in the parquet files under the base path
      Dataset<Row> result = sqlContext.read().parquet(basePath);
      assertEquals(0, inputRows.except(result).count());
    }
  }
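
  /**
   * Builds a {@link HoodieRowParquetWriteSupport} for the test schema, backed by a bloom filter
   * configured from the given write config.
   */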
  private HoodieRowParquetWriteSupport getWriteSupport(HoodieWriteConfig writeConfig, Configuration hadoopConf) {
    BloomFilter filter = BloomFilterFactory.createBloomFilter(
        writeConfig.getBloomFilterNumEntries(),
        writeConfig.getBloomFilterFPP(),
        writeConfig.getDynamicBloomFilterMaxNumEntries(),
        writeConfig.getBloomFilterType());
    return new HoodieRowParquetWriteSupport(hadoopConf, SparkDatasetTestUtils.STRUCT_TYPE, filter);
  }
}