/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hudi.common.testutils;

import org.apache.hudi.avro.HoodieAvroUtils;
import org.apache.hudi.avro.model.HoodieActionInstant;
import org.apache.hudi.avro.model.HoodieCleanMetadata;
import org.apache.hudi.avro.model.HoodieCleanerPlan;
import org.apache.hudi.avro.model.HoodieCompactionPlan;
import org.apache.hudi.common.HoodieCleanStat;
import org.apache.hudi.common.fs.FSUtils;
import org.apache.hudi.common.model.FileSlice;
import org.apache.hudi.common.model.HoodieAvroPayload;
import org.apache.hudi.common.model.HoodieCleaningPolicy;
import org.apache.hudi.common.model.HoodieFileFormat;
import org.apache.hudi.common.model.HoodieLogFile;
import org.apache.hudi.common.model.HoodieRecord;
import org.apache.hudi.common.model.HoodieRecordLocation;
import org.apache.hudi.common.model.HoodieTableType;
import org.apache.hudi.common.model.HoodieWriteStat;
import org.apache.hudi.common.model.HoodieWriteStat.RuntimeStats;
import org.apache.hudi.common.table.HoodieTableConfig;
import org.apache.hudi.common.table.HoodieTableMetaClient;
import org.apache.hudi.common.table.log.HoodieLogFormat;
import org.apache.hudi.common.table.log.HoodieLogFormat.Writer;
import org.apache.hudi.common.table.log.block.HoodieAvroDataBlock;
import org.apache.hudi.common.table.log.block.HoodieLogBlock;
import org.apache.hudi.common.table.timeline.HoodieInstant;
import org.apache.hudi.common.table.timeline.HoodieInstant.State;
import org.apache.hudi.common.table.timeline.HoodieTimeline;
import org.apache.hudi.common.table.timeline.TimelineMetadataUtils;
import org.apache.hudi.common.table.timeline.versioning.clean.CleanPlanV2MigrationHandler;
import org.apache.hudi.common.util.CleanerUtils;
import org.apache.hudi.common.util.CompactionUtils;
import org.apache.hudi.common.util.Option;
import org.apache.hudi.common.util.collection.Pair;
import org.apache.hudi.exception.HoodieIOException;

import com.esotericsoftware.kryo.Kryo;
import com.esotericsoftware.kryo.io.Input;
import com.esotericsoftware.kryo.io.Output;
import com.esotericsoftware.kryo.serializers.JavaSerializer;
import org.apache.avro.Schema;
import org.apache.avro.generic.GenericRecord;
import org.apache.avro.generic.IndexedRecord;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.LocatedFileStatus;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.RemoteIterator;
import org.apache.hadoop.util.StringUtils;

import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.File;
import java.io.IOException;
import java.io.Serializable;
import java.nio.file.Paths;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Properties;
import java.util.Random;
import java.util.UUID;
import java.util.stream.Collectors;
import java.util.stream.Stream;

import static org.junit.jupiter.api.Assertions.fail;

/**
 * A utility class for testing: table initialization, timeline file creation,
 * log-file writing, and synthetic write-stat generation.
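 *
 * <p>Typical usage (path is illustrative):
 * <pre>{@code
 *   HoodieTableMetaClient metaClient =
 *       HoodieTestUtils.init("/tmp/hoodie-test", HoodieTableType.MERGE_ON_READ);
 * }</pre>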
*/
public class HoodieTestUtils {
public static final String RAW_TRIPS_TEST_NAME = "raw_trips";
public static final String DEFAULT_WRITE_TOKEN = "1-0-1";
public static final int DEFAULT_LOG_VERSION = 1;
public static final String[] DEFAULT_PARTITION_PATHS = {"2016/03/15", "2015/03/16", "2015/03/17"};
  private static final Random rand = new Random(46474747);
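
  /**
   * Returns a fresh Hadoop {@link Configuration} with default settings.
   */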
public static Configuration getDefaultHadoopConf() {
return new Configuration();
}
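
  /**
   * Initializes a test table at the given base path (COPY_ON_WRITE by default). The
   * overloads below allow specifying the table type, base file format, table name,
   * bootstrap base path, or arbitrary table properties; they all funnel into the
   * {@link Properties}-based overload.
   */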
public static HoodieTableMetaClient init(String basePath) throws IOException {
return init(basePath, HoodieTableType.COPY_ON_WRITE);
  }

public static HoodieTableMetaClient init(String basePath, HoodieTableType tableType) throws IOException {
return init(getDefaultHadoopConf(), basePath, tableType);
  }

public static HoodieTableMetaClient init(String basePath, HoodieTableType tableType, String bootstrapBasePath) throws IOException {
Properties props = new Properties();
props.setProperty(HoodieTableConfig.HOODIE_BOOTSTRAP_BASE_PATH, bootstrapBasePath);
return init(getDefaultHadoopConf(), basePath, tableType, props);
  }

public static HoodieTableMetaClient init(String basePath, HoodieFileFormat baseFileFormat) throws IOException {
return init(getDefaultHadoopConf(), basePath, HoodieTableType.COPY_ON_WRITE, baseFileFormat);
  }

public static HoodieTableMetaClient init(Configuration hadoopConf, String basePath) throws IOException {
return init(hadoopConf, basePath, HoodieTableType.COPY_ON_WRITE);
  }

public static HoodieTableMetaClient init(Configuration hadoopConf, String basePath, HoodieTableType tableType)
throws IOException {
return init(hadoopConf, basePath, tableType, new Properties());
  }

public static HoodieTableMetaClient init(Configuration hadoopConf, String basePath, HoodieTableType tableType,
String tableName)
throws IOException {
Properties properties = new Properties();
properties.setProperty(HoodieTableConfig.HOODIE_TABLE_NAME_PROP_NAME, tableName);
return init(hadoopConf, basePath, tableType, properties);
  }

public static HoodieTableMetaClient init(Configuration hadoopConf, String basePath, HoodieTableType tableType,
HoodieFileFormat baseFileFormat)
throws IOException {
Properties properties = new Properties();
properties.setProperty(HoodieTableConfig.HOODIE_BASE_FILE_FORMAT_PROP_NAME, baseFileFormat.toString());
return init(hadoopConf, basePath, tableType, properties);
}
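
  /**
   * Base overload: fills in default table name, table type, and payload class when absent,
   * then initializes the table under {@code basePath} and returns its meta client.
   */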
public static HoodieTableMetaClient init(Configuration hadoopConf, String basePath, HoodieTableType tableType,
Properties properties)
throws IOException {
properties.putIfAbsent(HoodieTableConfig.HOODIE_TABLE_NAME_PROP_NAME, RAW_TRIPS_TEST_NAME);
properties.putIfAbsent(HoodieTableConfig.HOODIE_TABLE_TYPE_PROP_NAME, tableType.name());
properties.putIfAbsent(HoodieTableConfig.HOODIE_PAYLOAD_CLASS_PROP_NAME, HoodieAvroPayload.class.getName());
return HoodieTableMetaClient.initTableAndGetMetaClient(hadoopConf, basePath, properties);
}

  /**
   * Creates requested, inflight, and completed commit files for each of the given instant times.
   *
   * @deprecated Use {@link HoodieTestTable} instead.
   */
public static void createCommitFiles(String basePath, String... instantTimes) throws IOException {
for (String instantTime : instantTimes) {
new File(
basePath + "/" + HoodieTableMetaClient.METAFOLDER_NAME + "/"
+ HoodieTimeline.makeRequestedCommitFileName(instantTime)).createNewFile();
new File(
basePath + "/" + HoodieTableMetaClient.METAFOLDER_NAME + "/"
+ HoodieTimeline.makeInflightCommitFileName(instantTime)).createNewFile();
new File(
basePath + "/" + HoodieTableMetaClient.METAFOLDER_NAME + "/" + HoodieTimeline.makeCommitFileName(instantTime))
.createNewFile();
}
}
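
  /**
   * Creates requested and inflight cleaner files on the timeline, each holding an empty
   * serialized {@link HoodieCleanerPlan}.
   */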
public static void createPendingCleanFiles(HoodieTableMetaClient metaClient, String... instantTimes) {
for (String instantTime : instantTimes) {
Arrays.asList(HoodieTimeline.makeRequestedCleanerFileName(instantTime),
HoodieTimeline.makeInflightCleanerFileName(instantTime))
          .forEach(f -> {
            Path commitFile = new Path(Paths
                .get(metaClient.getBasePath(), HoodieTableMetaClient.METAFOLDER_NAME, f).toString());
            try (FSDataOutputStream os = metaClient.getFs().create(commitFile, true)) {
              // Write an empty cleaner plan
              os.write(TimelineMetadataUtils.serializeCleanerPlan(
                  new HoodieCleanerPlan(new HoodieActionInstant("", "", ""), "", new HashMap<>(),
                      CleanPlanV2MigrationHandler.VERSION, new HashMap<>())).get());
            } catch (IOException ioe) {
              throw new HoodieIOException(ioe.getMessage(), ioe);
            }
          });
}
}

  /**
   * Creates an empty base data file for the given partition, instant time, and file ID.
   *
   * @deprecated Use {@link HoodieTestTable} instead.
   */
public static String createDataFile(String basePath, String partitionPath, String instantTime, String fileID)
throws IOException {
String folderPath = basePath + "/" + partitionPath + "/";
new File(folderPath).mkdirs();
new File(folderPath + FSUtils.makeDataFileName(instantTime, DEFAULT_WRITE_TOKEN, fileID)).createNewFile();
return fileID;
}

  /**
   * Creates an empty log file for the given partition, instant time, file ID, and optional version.
   *
   * @deprecated Use {@link HoodieTestTable} instead.
   */
public static String createNewLogFile(FileSystem fs, String basePath, String partitionPath, String instantTime,
String fileID, Option<Integer> version) throws IOException {
String folderPath = basePath + "/" + partitionPath + "/";
boolean makeDir = fs.mkdirs(new Path(folderPath));
if (!makeDir) {
throw new IOException("cannot create directory for path " + folderPath);
}
boolean createFile = fs.createNewFile(new Path(folderPath + FSUtils.makeLogFileName(fileID, ".log", instantTime,
version.orElse(DEFAULT_LOG_VERSION), HoodieLogFormat.UNKNOWN_WRITE_TOKEN)));
if (!createFile) {
      throw new IOException(
          StringUtils.format("cannot create log file for commit %s and fileId %s", instantTime, fileID));
}
return fileID;
}

  /**
   * Creates a requested compaction instant whose plan is built from the given file slices.
   *
   * <p>TODO: incorporate into {@link HoodieTestTable}.
   *
   * @deprecated Use {@link HoodieTestTable} instead.
   */
public static void createCompactionRequest(HoodieTableMetaClient metaClient, String instant,
List<Pair<String, FileSlice>> fileSliceList) throws IOException {
HoodieCompactionPlan plan = CompactionUtils.buildFromFileSlices(fileSliceList, Option.empty(), Option.empty());
HoodieInstant compactionInstant = new HoodieInstant(State.REQUESTED, HoodieTimeline.COMPACTION_ACTION, instant);
metaClient.getActiveTimeline().saveToCompactionRequested(compactionInstant,
TimelineMetadataUtils.serializeCompactionPlan(plan));
}
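
  /**
   * Creates a completed clean instant: the pending cleaner files plus a completed clean file
   * containing {@link HoodieCleanMetadata} with one empty clean stat for a random default partition.
   */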
public static void createCleanFiles(HoodieTableMetaClient metaClient, String basePath,
String instantTime, Configuration configuration)
throws IOException {
createPendingCleanFiles(metaClient, instantTime);
Path commitFile = new Path(
basePath + "/" + HoodieTableMetaClient.METAFOLDER_NAME + "/" + HoodieTimeline.makeCleanerFileName(instantTime));
FileSystem fs = FSUtils.getFs(basePath, configuration);
try (FSDataOutputStream os = fs.create(commitFile, true)) {
HoodieCleanStat cleanStats = new HoodieCleanStat(HoodieCleaningPolicy.KEEP_LATEST_FILE_VERSIONS,
DEFAULT_PARTITION_PATHS[rand.nextInt(DEFAULT_PARTITION_PATHS.length)], new ArrayList<>(), new ArrayList<>(),
new ArrayList<>(), instantTime);
// Create the clean metadata
HoodieCleanMetadata cleanMetadata =
CleanerUtils.convertCleanMetadata(instantTime, Option.of(0L), Collections.singletonList(cleanStats));
      // Write the clean metadata to the completed clean file
os.write(TimelineMetadataUtils.serializeCleanMetadata(cleanMetadata).get());
}
}
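
  /**
   * Round-trips an object through Kryo, mirroring the serializer used in Spark jobs, to verify
   * that it survives serialization.
   *
   * <p>Example (sketch): {@code HoodieTableMetaClient copy =
   * serializeDeserialize(metaClient, HoodieTableMetaClient.class);}
   */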
public static <T extends Serializable> T serializeDeserialize(T object, Class<T> clazz) {
    // Use Kryo, as Spark jobs do by configuration
Kryo kryo = new Kryo();
kryo.register(HoodieTableMetaClient.class, new JavaSerializer());
ByteArrayOutputStream baos = new ByteArrayOutputStream();
Output output = new Output(baos);
kryo.writeObject(output, object);
output.close();
Input input = new Input(new ByteArrayInputStream(baos.toByteArray()));
    T deserializedObject = kryo.readObject(input, clazz);
    input.close();
    return deserializedObject;
}

  /**
   * Writes the given updated records into delta log files, one writer per record location.
   *
   * @deprecated Use {@link HoodieTestTable} instead.
   */
public static void writeRecordsToLogFiles(FileSystem fs, String basePath, Schema schema,
List<HoodieRecord> updatedRecords) {
Map<HoodieRecordLocation, List<HoodieRecord>> groupedUpdated =
updatedRecords.stream().collect(Collectors.groupingBy(HoodieRecord::getCurrentLocation));
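    // One log writer per (fileId, baseInstant) location; all records for a location are
    // appended as a single Avro data block with the instant time and schema in the header.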
groupedUpdated.forEach((location, value) -> {
String partitionPath = value.get(0).getPartitionPath();
Writer logWriter;
try {
logWriter = HoodieLogFormat.newWriterBuilder().onParentPath(new Path(basePath, partitionPath))
.withFileExtension(HoodieLogFile.DELTA_EXTENSION).withFileId(location.getFileId())
.overBaseCommit(location.getInstantTime()).withFs(fs).build();
Map<HoodieLogBlock.HeaderMetadataType, String> header = new HashMap<>();
header.put(HoodieLogBlock.HeaderMetadataType.INSTANT_TIME, location.getInstantTime());
header.put(HoodieLogBlock.HeaderMetadataType.SCHEMA, schema.toString());
logWriter.appendBlock(new HoodieAvroDataBlock(value.stream().map(r -> {
try {
GenericRecord val = (GenericRecord) r.getData().getInsertValue(schema).get();
HoodieAvroUtils.addHoodieKeyToRecord(val, r.getRecordKey(), r.getPartitionPath(), "");
return (IndexedRecord) val;
        } catch (IOException e) {
          throw new HoodieIOException(e.getMessage(), e);
        }
}).collect(Collectors.toList()), header));
logWriter.close();
} catch (Exception e) {
fail(e.toString());
}
});
}

  // TODO: should be removed
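  /**
   * Recursively lists all base files (default Parquet extension) under the given path,
   * excluding marker files.
   */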
public static FileStatus[] listAllDataFilesInPath(FileSystem fs, String basePath) throws IOException {
return listAllDataFilesInPath(fs, basePath, HoodieFileFormat.PARQUET.getFileExtension());
}
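
  /**
   * Recursively lists all files under the given path whose names contain the given
   * data file extension, excluding marker files.
   */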
public static FileStatus[] listAllDataFilesInPath(FileSystem fs, String basePath, String datafileExtension)
throws IOException {
RemoteIterator<LocatedFileStatus> itr = fs.listFiles(new Path(basePath), true);
List<FileStatus> returns = new ArrayList<>();
while (itr.hasNext()) {
LocatedFileStatus status = itr.next();
if (status.getPath().getName().contains(datafileExtension) && !status.getPath().getName().contains(".marker")) {
returns.add(status);
}
}
    return returns.toArray(new FileStatus[0]);
}
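
  /**
   * Recursively lists all Hoodie log files under the given path.
   */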
public static FileStatus[] listAllLogFilesInPath(FileSystem fs, String basePath)
throws IOException {
RemoteIterator<LocatedFileStatus> itr = fs.listFiles(new Path(basePath), true);
List<FileStatus> returns = new ArrayList<>();
while (itr.hasNext()) {
LocatedFileStatus status = itr.next();
if (status.getPath().getName().contains(HoodieFileFormat.HOODIE_LOG.getFileExtension())) {
returns.add(status);
}
}
    return returns.toArray(new FileStatus[0]);
}
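
  /**
   * Returns the union of all base files and log files found under the given path.
   */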
public static FileStatus[] listAllDataFilesAndLogFilesInPath(FileSystem fs, String basePath) throws IOException {
return Stream.concat(Arrays.stream(listAllDataFilesInPath(fs, basePath)), Arrays.stream(listAllLogFilesInPath(fs, basePath)))
.toArray(FileStatus[]::new);
}
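
  /**
   * Generates {@code limit} synthetic {@link HoodieWriteStat} entries with fixed counts and
   * random file IDs, for tests that need plausible commit metadata.
   *
   * <p>Example (sketch): {@code List<HoodieWriteStat> stats = generateFakeHoodieWriteStat(3);}
   */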
public static List<HoodieWriteStat> generateFakeHoodieWriteStat(int limit) {
List<HoodieWriteStat> writeStatList = new ArrayList<>();
for (int i = 0; i < limit; i++) {
HoodieWriteStat writeStat = new HoodieWriteStat();
writeStat.setFileId(UUID.randomUUID().toString());
writeStat.setNumDeletes(0);
writeStat.setNumUpdateWrites(100);
writeStat.setNumWrites(100);
writeStat.setPath("/some/fake/path" + i);
writeStat.setPartitionPath("/some/fake/partition/path" + i);
writeStat.setTotalLogFilesCompacted(100L);
RuntimeStats runtimeStats = new RuntimeStats();
runtimeStats.setTotalScanTime(100);
runtimeStats.setTotalCreateTime(100);
runtimeStats.setTotalUpsertTime(100);
writeStat.setRuntimeStats(runtimeStats);
writeStatList.add(writeStat);
}
return writeStatList;
}
}