/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements. See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership. The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License. You may obtain a copy of the License at
 *
 *      http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing,
 * software distributed under the License is distributed on an
 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
 * KIND, either express or implied. See the License for the
 * specific language governing permissions and limitations
 * under the License.
 */
package org.apache.hudi.testutils;

import org.apache.hudi.avro.HoodieAvroUtils;
import org.apache.hudi.avro.HoodieAvroWriteSupport;
import org.apache.hudi.client.SparkTaskContextSupplier;
import org.apache.hudi.common.bloom.BloomFilter;
import org.apache.hudi.common.bloom.BloomFilterFactory;
import org.apache.hudi.common.bloom.BloomFilterTypeCode;
import org.apache.hudi.common.model.HoodieRecord;
import org.apache.hudi.common.table.HoodieTableMetaClient;
import org.apache.hudi.common.testutils.FileCreateUtils;
import org.apache.hudi.common.testutils.HoodieTestTable;
import org.apache.hudi.config.HoodieStorageConfig;
import org.apache.hudi.io.storage.HoodieAvroParquetConfig;
import org.apache.hudi.io.storage.HoodieParquetWriter;
import org.apache.hudi.table.HoodieTable;

import org.apache.avro.Schema;
import org.apache.avro.generic.GenericRecord;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.parquet.avro.AvroSchemaConverter;
import org.apache.parquet.hadoop.ParquetWriter;
import org.apache.parquet.hadoop.metadata.CompressionCodecName;

import java.nio.file.Paths;
import java.util.UUID;

import static org.apache.hudi.common.testutils.FileCreateUtils.baseFileName;
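
/**
 * Extends {@link HoodieTestTable} to also write actual parquet base files containing the
 * given records, using the configured Avro schema and bloom filter.
 *
 * <p>A minimal usage sketch (the instant time, partition path, file id and records are
 * placeholders the surrounding test is assumed to provide, not part of this class):
 *
 * <pre>
 *   HoodieWriteableTestTable testTable = HoodieWriteableTestTable.of(metaClient, schema);
 *   testTable.addCommit("001")
 *       .withInserts("2016/03/15", "file-1", record1, record2);
 * </pre>
 */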
public class HoodieWriteableTestTable extends HoodieTestTable {

  private final Schema schema;
  private final BloomFilter filter;

  private HoodieWriteableTestTable(String basePath, FileSystem fs, HoodieTableMetaClient metaClient, Schema schema, BloomFilter filter) {
    super(basePath, fs, metaClient);
    this.schema = schema;
    this.filter = filter;
  }

  public static HoodieWriteableTestTable of(HoodieTableMetaClient metaClient, Schema schema, BloomFilter filter) {
    return new HoodieWriteableTestTable(metaClient.getBasePath(), metaClient.getRawFs(), metaClient, schema, filter);
  }
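
  /**
   * Same as {@link #of(HoodieTableMetaClient, Schema, BloomFilter)}, but with a default SIMPLE
   * bloom filter sized for 10,000 entries at a 0.0000001 false positive rate.
   */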
  public static HoodieWriteableTestTable of(HoodieTableMetaClient metaClient, Schema schema) {
    BloomFilter filter = BloomFilterFactory
        .createBloomFilter(10000, 0.0000001, -1, BloomFilterTypeCode.SIMPLE.name());
    return of(metaClient, schema, filter);
  }

  public static HoodieWriteableTestTable of(HoodieTable hoodieTable, Schema schema) {
    HoodieTableMetaClient metaClient = hoodieTable.getMetaClient();
    return of(metaClient, schema);
  }

  public static HoodieWriteableTestTable of(HoodieTable hoodieTable, Schema schema, BloomFilter filter) {
    HoodieTableMetaClient metaClient = hoodieTable.getMetaClient();
    return of(metaClient, schema, filter);
  }
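
  // The overrides below only narrow the return type so that calls can be chained fluently.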
  @Override
  public HoodieWriteableTestTable addCommit(String instantTime) throws Exception {
    return (HoodieWriteableTestTable) super.addCommit(instantTime);
  }

  @Override
  public HoodieWriteableTestTable forCommit(String instantTime) {
    return (HoodieWriteableTestTable) super.forCommit(instantTime);
  }
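
  /**
   * Creates a base file with no records under {@code partition}, using a randomly generated
   * file id, and returns that file id.
   */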
  public String withInserts(String partition) throws Exception {
    return withInserts(partition, new HoodieRecord[0]);
  }

  public String withInserts(String partition, HoodieRecord... records) throws Exception {
    String fileId = UUID.randomUUID().toString();
    withInserts(partition, fileId, records);
    return fileId;
  }

  public HoodieWriteableTestTable withInserts(String partition, String fileId) throws Exception {
    return withInserts(partition, fileId, new HoodieRecord[0]);
  }
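
  /**
   * Writes the given records into a parquet base file for {@code fileId} under {@code partition},
   * stamping the Hudi metadata columns on each record and registering its key with the bloom filter.
   */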
  public HoodieWriteableTestTable withInserts(String partition, String fileId, HoodieRecord... records) throws Exception {
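    // Ensure the partition metadata file exists before writing the base file into the partition.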
    FileCreateUtils.createPartitionMetaFile(basePath, partition);
    String fileName = baseFileName(currentInstantTime, fileId);
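
    // Build the Avro parquet write support (backed by the schema and bloom filter) and the writer
    // config: GZIP compression, default block/page sizes, and a 120MB max file size.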
    HoodieAvroWriteSupport writeSupport = new HoodieAvroWriteSupport(
        new AvroSchemaConverter().convert(schema), schema, filter);
    HoodieAvroParquetConfig config = new HoodieAvroParquetConfig(writeSupport, CompressionCodecName.GZIP,
        ParquetWriter.DEFAULT_BLOCK_SIZE, ParquetWriter.DEFAULT_PAGE_SIZE, 120 * 1024 * 1024,
        new Configuration(), Double.parseDouble(HoodieStorageConfig.DEFAULT_STREAM_COMPRESSION_RATIO));
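
    // Open a HoodieParquetWriter scoped to the current instant time and the target base file path.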
    try (HoodieParquetWriter writer = new HoodieParquetWriter(
        currentInstantTime,
        new Path(Paths.get(basePath, partition, fileName).toString()),
        config, schema, new SparkTaskContextSupplier())) {
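      // Stamp each record with the Hudi meta fields (commit time, sequence number, record key,
      // partition path, file name), write it out, and track its key in the bloom filter.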
      int seqId = 1;
      for (HoodieRecord record : records) {
        GenericRecord avroRecord = (GenericRecord) record.getData().getInsertValue(schema).get();
        HoodieAvroUtils.addCommitMetadataToRecord(avroRecord, currentInstantTime, String.valueOf(seqId++));
        HoodieAvroUtils.addHoodieKeyToRecord(avroRecord, record.getRecordKey(), record.getPartitionPath(), fileName);
        writer.writeAvro(record.getRecordKey(), avroRecord);
        filter.add(record.getRecordKey());
      }
    }
    return this;
  }
}