blob: 67b1c672ec86b2fdfae0255d701f723b1fa89817 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.spark;
import org.apache.hudi.common.fs.FSUtils;
import org.apache.hudi.common.model.HoodieColumnRangeMetadata;
import org.apache.hudi.common.model.HoodieFileFormat;
import org.apache.hudi.common.util.BaseFileUtils;
import org.apache.hudi.common.util.Option;
import org.apache.hudi.common.util.ParquetUtils;
import org.apache.hudi.config.HoodieClusteringConfig;
import org.apache.hudi.exception.HoodieException;
import org.apache.hudi.index.zorder.ZOrderingIndexHelper;
import org.apache.hudi.optimize.HilbertCurveUtils;
import org.apache.hudi.optimize.ZOrderingUtil;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.parquet.io.api.Binary;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.Row$;
import org.apache.spark.sql.SparkSession;
import org.apache.spark.sql.hudi.execution.RangeSampleSort$;
import org.apache.spark.sql.hudi.execution.ZorderingBinarySort;
import org.apache.spark.sql.types.BinaryType;
import org.apache.spark.sql.types.BinaryType$;
import org.apache.spark.sql.types.BooleanType;
import org.apache.spark.sql.types.ByteType;
import org.apache.spark.sql.types.DataType;
import org.apache.spark.sql.types.DateType;
import org.apache.spark.sql.types.DecimalType;
import org.apache.spark.sql.types.DoubleType;
import org.apache.spark.sql.types.FloatType;
import org.apache.spark.sql.types.IntegerType;
import org.apache.spark.sql.types.LongType;
import org.apache.spark.sql.types.LongType$;
import org.apache.spark.sql.types.Metadata;
import org.apache.spark.sql.types.ShortType;
import org.apache.spark.sql.types.StringType;
import org.apache.spark.sql.types.StringType$;
import org.apache.spark.sql.types.StructField;
import org.apache.spark.sql.types.StructType$;
import org.apache.spark.sql.types.TimestampType;
import org.apache.spark.util.SerializableConfiguration;
import org.davidmoten.hilbert.HilbertCurve;
import java.io.IOException;
import java.math.BigDecimal;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;
import scala.collection.JavaConversions;
public class OrderingIndexHelper {
private static final String SPARK_JOB_DESCRIPTION = "spark.job.description";
/**
* Create optimized DataFrame directly
* only support base type data. long,int,short,double,float,string,timestamp,decimal,date,byte
* this method is more effective than createOptimizeDataFrameBySample
*
* @param df a spark DataFrame holds parquet files to be read.
* @param sortCols ordering columns for the curve
* @param fileNum spark partition num
* @param sortMode layout optimization strategy
* @return a dataFrame ordered by the curve.
*/
public static Dataset<Row> createOptimizedDataFrameByMapValue(Dataset<Row> df, List<String> sortCols, int fileNum, String sortMode) {
Map<String, StructField> columnsMap = Arrays.stream(df.schema().fields()).collect(Collectors.toMap(e -> e.name(), e -> e));
int fieldNum = df.schema().fields().length;
List<String> checkCols = sortCols.stream().filter(f -> columnsMap.containsKey(f)).collect(Collectors.toList());
if (sortCols.size() != checkCols.size()) {
return df;
}
// only one col to sort, no need to use z-order
if (sortCols.size() == 1) {
return df.repartitionByRange(fieldNum, org.apache.spark.sql.functions.col(sortCols.get(0)));
}
Map<Integer, StructField> fieldMap = sortCols
.stream().collect(Collectors.toMap(e -> Arrays.asList(df.schema().fields()).indexOf(columnsMap.get(e)), e -> columnsMap.get(e)));
// do optimize
JavaRDD<Row> sortedRDD = null;
switch (HoodieClusteringConfig.BuildLayoutOptimizationStrategy.fromValue(sortMode)) {
case ZORDER:
sortedRDD = createZCurveSortedRDD(df.toJavaRDD(), fieldMap, fieldNum, fileNum);
break;
case HILBERT:
sortedRDD = createHilbertSortedRDD(df.toJavaRDD(), fieldMap, fieldNum, fileNum);
break;
default:
throw new IllegalArgumentException(String.format("new only support z-order/hilbert optimize but find: %s", sortMode));
}
// create new StructType
List<StructField> newFields = new ArrayList<>();
newFields.addAll(Arrays.asList(df.schema().fields()));
newFields.add(new StructField("Index", BinaryType$.MODULE$, true, Metadata.empty()));
// create new DataFrame
return df.sparkSession().createDataFrame(sortedRDD, StructType$.MODULE$.apply(newFields)).drop("Index");
}
private static JavaRDD<Row> createZCurveSortedRDD(JavaRDD<Row> originRDD, Map<Integer, StructField> fieldMap, int fieldNum, int fileNum) {
return originRDD.map(row -> {
List<byte[]> zBytesList = fieldMap.entrySet().stream().map(entry -> {
int index = entry.getKey();
StructField field = entry.getValue();
DataType dataType = field.dataType();
if (dataType instanceof LongType) {
return ZOrderingUtil.longTo8Byte(row.isNullAt(index) ? Long.MAX_VALUE : row.getLong(index));
} else if (dataType instanceof DoubleType) {
return ZOrderingUtil.doubleTo8Byte(row.isNullAt(index) ? Double.MAX_VALUE : row.getDouble(index));
} else if (dataType instanceof IntegerType) {
return ZOrderingUtil.intTo8Byte(row.isNullAt(index) ? Integer.MAX_VALUE : row.getInt(index));
} else if (dataType instanceof FloatType) {
return ZOrderingUtil.doubleTo8Byte(row.isNullAt(index) ? Float.MAX_VALUE : row.getFloat(index));
} else if (dataType instanceof StringType) {
return ZOrderingUtil.utf8To8Byte(row.isNullAt(index) ? "" : row.getString(index));
} else if (dataType instanceof DateType) {
return ZOrderingUtil.longTo8Byte(row.isNullAt(index) ? Long.MAX_VALUE : row.getDate(index).getTime());
} else if (dataType instanceof TimestampType) {
return ZOrderingUtil.longTo8Byte(row.isNullAt(index) ? Long.MAX_VALUE : row.getTimestamp(index).getTime());
} else if (dataType instanceof ByteType) {
return ZOrderingUtil.byteTo8Byte(row.isNullAt(index) ? Byte.MAX_VALUE : row.getByte(index));
} else if (dataType instanceof ShortType) {
return ZOrderingUtil.intTo8Byte(row.isNullAt(index) ? Short.MAX_VALUE : row.getShort(index));
} else if (dataType instanceof DecimalType) {
return ZOrderingUtil.longTo8Byte(row.isNullAt(index) ? Long.MAX_VALUE : row.getDecimal(index).longValue());
} else if (dataType instanceof BooleanType) {
boolean value = row.isNullAt(index) ? false : row.getBoolean(index);
return ZOrderingUtil.intTo8Byte(value ? 1 : 0);
} else if (dataType instanceof BinaryType) {
return ZOrderingUtil.paddingTo8Byte(row.isNullAt(index) ? new byte[] {0} : (byte[]) row.get(index));
}
return null;
}).filter(f -> f != null).collect(Collectors.toList());
byte[][] zBytes = new byte[zBytesList.size()][];
for (int i = 0; i < zBytesList.size(); i++) {
zBytes[i] = zBytesList.get(i);
}
List<Object> zVaules = new ArrayList<>();
zVaules.addAll(scala.collection.JavaConverters.bufferAsJavaListConverter(row.toSeq().toBuffer()).asJava());
zVaules.add(ZOrderingUtil.interleaving(zBytes, 8));
return Row$.MODULE$.apply(JavaConversions.asScalaBuffer(zVaules));
}).sortBy(f -> new ZorderingBinarySort((byte[]) f.get(fieldNum)), true, fileNum);
}
private static JavaRDD<Row> createHilbertSortedRDD(JavaRDD<Row> originRDD, Map<Integer, StructField> fieldMap, int fieldNum, int fileNum) {
return originRDD.mapPartitions(rows -> {
HilbertCurve hilbertCurve = HilbertCurve.bits(63).dimensions(fieldMap.size());
return new Iterator<Row>() {
@Override
public boolean hasNext() {
return rows.hasNext();
}
@Override
public Row next() {
Row row = rows.next();
List<Long> longList = fieldMap.entrySet().stream().map(entry -> {
int index = entry.getKey();
StructField field = entry.getValue();
DataType dataType = field.dataType();
if (dataType instanceof LongType) {
return row.isNullAt(index) ? Long.MAX_VALUE : row.getLong(index);
} else if (dataType instanceof DoubleType) {
return row.isNullAt(index) ? Long.MAX_VALUE : Double.doubleToLongBits(row.getDouble(index));
} else if (dataType instanceof IntegerType) {
return row.isNullAt(index) ? Long.MAX_VALUE : (long)row.getInt(index);
} else if (dataType instanceof FloatType) {
return row.isNullAt(index) ? Long.MAX_VALUE : Double.doubleToLongBits((double) row.getFloat(index));
} else if (dataType instanceof StringType) {
return row.isNullAt(index) ? Long.MAX_VALUE : ZOrderingUtil.convertStringToLong(row.getString(index));
} else if (dataType instanceof DateType) {
return row.isNullAt(index) ? Long.MAX_VALUE : row.getDate(index).getTime();
} else if (dataType instanceof TimestampType) {
return row.isNullAt(index) ? Long.MAX_VALUE : row.getTimestamp(index).getTime();
} else if (dataType instanceof ByteType) {
return row.isNullAt(index) ? Long.MAX_VALUE : ZOrderingUtil.convertBytesToLong(new byte[] {row.getByte(index)});
} else if (dataType instanceof ShortType) {
return row.isNullAt(index) ? Long.MAX_VALUE : (long)row.getShort(index);
} else if (dataType instanceof DecimalType) {
return row.isNullAt(index) ? Long.MAX_VALUE : row.getDecimal(index).longValue();
} else if (dataType instanceof BooleanType) {
boolean value = row.isNullAt(index) ? false : row.getBoolean(index);
return value ? Long.MAX_VALUE : 0;
} else if (dataType instanceof BinaryType) {
return row.isNullAt(index) ? Long.MAX_VALUE : ZOrderingUtil.convertBytesToLong((byte[]) row.get(index));
}
return null;
}).filter(f -> f != null).collect(Collectors.toList());
byte[] hilbertValue = HilbertCurveUtils.indexBytes(
hilbertCurve, longList.stream().mapToLong(l -> l).toArray(), 63);
List<Object> values = new ArrayList<>();
values.addAll(scala.collection.JavaConverters.bufferAsJavaListConverter(row.toSeq().toBuffer()).asJava());
values.add(hilbertValue);
return Row$.MODULE$.apply(JavaConversions.asScalaBuffer(values));
}
};
}).sortBy(f -> new ZorderingBinarySort((byte[]) f.get(fieldNum)), true, fileNum);
}
public static Dataset<Row> createOptimizedDataFrameByMapValue(Dataset<Row> df, String sortCols, int fileNum, String sortMode) {
if (sortCols == null || sortCols.isEmpty() || fileNum <= 0) {
return df;
}
return createOptimizedDataFrameByMapValue(df,
Arrays.stream(sortCols.split(",")).map(f -> f.trim()).collect(Collectors.toList()), fileNum, sortMode);
}
public static Dataset<Row> createOptimizeDataFrameBySample(Dataset<Row> df, List<String> zCols, int fileNum, String sortMode) {
return RangeSampleSort$.MODULE$.sortDataFrameBySample(df, JavaConversions.asScalaBuffer(zCols), fileNum, sortMode);
}
public static Dataset<Row> createOptimizeDataFrameBySample(Dataset<Row> df, String zCols, int fileNum, String sortMode) {
if (zCols == null || zCols.isEmpty() || fileNum <= 0) {
return df;
}
return createOptimizeDataFrameBySample(df, Arrays.stream(zCols.split(",")).map(f -> f.trim()).collect(Collectors.toList()), fileNum, sortMode);
}
/**
* Parse min/max statistics stored in parquet footers for z-sort cols.
* no support collect statistics from timeStampType, since parquet file has not collect the statistics for timeStampType.
* to do adapt for rfc-27
*
* @param df a spark DataFrame holds parquet files to be read.
* @param cols z-sort cols
* @return a dataFrame holds all statistics info.
*/
public static Dataset<Row> getMinMaxValue(Dataset<Row> df, List<String> cols) {
Map<String, DataType> columnsMap = Arrays.stream(df.schema().fields()).collect(Collectors.toMap(e -> e.name(), e -> e.dataType()));
List<String> scanFiles = Arrays.asList(df.inputFiles());
SparkContext sc = df.sparkSession().sparkContext();
JavaSparkContext jsc = new JavaSparkContext(sc);
SerializableConfiguration serializableConfiguration = new SerializableConfiguration(sc.hadoopConfiguration());
int numParallelism = (scanFiles.size() / 3 + 1);
List<HoodieColumnRangeMetadata<Comparable>> colMinMaxInfos;
String previousJobDescription = sc.getLocalProperty(SPARK_JOB_DESCRIPTION);
try {
jsc.setJobDescription("Listing parquet column statistics");
colMinMaxInfos = jsc.parallelize(scanFiles, numParallelism).mapPartitions(paths -> {
Configuration conf = serializableConfiguration.value();
ParquetUtils parquetUtils = (ParquetUtils) BaseFileUtils.getInstance(HoodieFileFormat.PARQUET);
List<Collection<HoodieColumnRangeMetadata<Comparable>>> results = new ArrayList<>();
while (paths.hasNext()) {
String path = paths.next();
results.add(parquetUtils.readRangeFromParquetMetadata(conf, new Path(path), cols));
}
return results.stream().flatMap(f -> f.stream()).iterator();
}).collect();
} finally {
jsc.setJobDescription(previousJobDescription);
}
Map<String, List<HoodieColumnRangeMetadata<Comparable>>> fileToStatsListMap = colMinMaxInfos.stream().collect(Collectors.groupingBy(e -> e.getFilePath()));
JavaRDD<Row> allMetaDataRDD = jsc.parallelize(new ArrayList<>(fileToStatsListMap.values()), 1).map(f -> {
int colSize = f.size();
if (colSize == 0) {
return null;
} else {
List<Object> rows = new ArrayList<>();
rows.add(f.get(0).getFilePath());
cols.stream().forEach(col -> {
HoodieColumnRangeMetadata<Comparable> currentColRangeMetaData =
f.stream().filter(s -> s.getColumnName().trim().equalsIgnoreCase(col)).findFirst().orElse(null);
DataType colType = columnsMap.get(col);
if (currentColRangeMetaData == null || colType == null) {
throw new HoodieException(String.format("cannot collect min/max statistics for col: %s", col));
}
if (colType instanceof IntegerType) {
rows.add(currentColRangeMetaData.getMinValue());
rows.add(currentColRangeMetaData.getMaxValue());
} else if (colType instanceof DoubleType) {
rows.add(currentColRangeMetaData.getMinValue());
rows.add(currentColRangeMetaData.getMaxValue());
} else if (colType instanceof StringType) {
rows.add(currentColRangeMetaData.getMinValue().toString());
rows.add(currentColRangeMetaData.getMaxValue().toString());
} else if (colType instanceof DecimalType) {
rows.add(new BigDecimal(currentColRangeMetaData.getMinValue().toString()));
rows.add(new BigDecimal(currentColRangeMetaData.getMaxValue().toString()));
} else if (colType instanceof DateType) {
rows.add(java.sql.Date.valueOf(currentColRangeMetaData.getMinValue().toString()));
rows.add(java.sql.Date.valueOf(currentColRangeMetaData.getMaxValue().toString()));
} else if (colType instanceof LongType) {
rows.add(currentColRangeMetaData.getMinValue());
rows.add(currentColRangeMetaData.getMaxValue());
} else if (colType instanceof ShortType) {
rows.add(Short.parseShort(currentColRangeMetaData.getMinValue().toString()));
rows.add(Short.parseShort(currentColRangeMetaData.getMaxValue().toString()));
} else if (colType instanceof FloatType) {
rows.add(currentColRangeMetaData.getMinValue());
rows.add(currentColRangeMetaData.getMaxValue());
} else if (colType instanceof BinaryType) {
rows.add(((Binary)currentColRangeMetaData.getMinValue()).getBytes());
rows.add(((Binary)currentColRangeMetaData.getMaxValue()).getBytes());
} else if (colType instanceof BooleanType) {
rows.add(currentColRangeMetaData.getMinValue());
rows.add(currentColRangeMetaData.getMaxValue());
} else if (colType instanceof ByteType) {
rows.add(Byte.valueOf(currentColRangeMetaData.getMinValue().toString()));
rows.add(Byte.valueOf(currentColRangeMetaData.getMaxValue().toString()));
} else {
throw new HoodieException(String.format("Not support type: %s", colType));
}
rows.add(currentColRangeMetaData.getNumNulls());
});
return Row$.MODULE$.apply(JavaConversions.asScalaBuffer(rows));
}
}).filter(f -> f != null);
List<StructField> allMetaDataSchema = new ArrayList<>();
allMetaDataSchema.add(new StructField("file", StringType$.MODULE$, true, Metadata.empty()));
cols.forEach(col -> {
allMetaDataSchema.add(new StructField(col + "_minValue", columnsMap.get(col), true, Metadata.empty()));
allMetaDataSchema.add(new StructField(col + "_maxValue", columnsMap.get(col), true, Metadata.empty()));
allMetaDataSchema.add(new StructField(col + "_num_nulls", LongType$.MODULE$, true, Metadata.empty()));
});
return df.sparkSession().createDataFrame(allMetaDataRDD, StructType$.MODULE$.apply(allMetaDataSchema));
}
public static Dataset<Row> getMinMaxValue(Dataset<Row> df, String cols) {
List<String> rawCols = Arrays.asList(cols.split(",")).stream().map(f -> f.trim()).collect(Collectors.toList());
return getMinMaxValue(df, rawCols);
}
/**
* Update statistics info.
* this method will update old index table by full out join,
* and save the updated table into a new index table based on commitTime.
* old index table will be cleaned also.
*
* @param df a spark DataFrame holds parquet files to be read.
* @param cols z-sort cols.
* @param indexPath index store path.
* @param commitTime current operation commitTime.
* @param validateCommits all validate commits for current table.
* @return
*/
public static void saveStatisticsInfo(Dataset<Row> df, String cols, String indexPath, String commitTime, List<String> validateCommits) {
Path savePath = new Path(indexPath, commitTime);
SparkSession spark = df.sparkSession();
FileSystem fs = FSUtils.getFs(indexPath, spark.sparkContext().hadoopConfiguration());
Dataset<Row> statisticsDF = OrderingIndexHelper.getMinMaxValue(df, cols);
// try to find last validate index table from index path
try {
// If there's currently no index, create one
if (!fs.exists(new Path(indexPath))) {
statisticsDF.repartition(1).write().mode("overwrite").save(savePath.toString());
return;
}
// Otherwise, clean up all indexes but the most recent one
List<String> allIndexTables = Arrays
.stream(fs.listStatus(new Path(indexPath))).filter(f -> f.isDirectory()).map(f -> f.getPath().getName()).collect(Collectors.toList());
List<String> candidateIndexTables = allIndexTables.stream().filter(f -> validateCommits.contains(f)).sorted().collect(Collectors.toList());
List<String> residualTables = allIndexTables.stream().filter(f -> !validateCommits.contains(f)).collect(Collectors.toList());
Option<Dataset> latestIndexData = Option.empty();
if (!candidateIndexTables.isEmpty()) {
latestIndexData = Option.of(spark.read().load(new Path(indexPath, candidateIndexTables.get(candidateIndexTables.size() - 1)).toString()));
// clean old index table, keep at most 1 index table.
candidateIndexTables.remove(candidateIndexTables.size() - 1);
candidateIndexTables.forEach(f -> {
try {
fs.delete(new Path(indexPath, f));
} catch (IOException ie) {
throw new HoodieException(ie);
}
});
}
// clean residualTables
// retried cluster operations at the same instant time is also considered,
// the residual files produced by retried are cleaned up before save statistics
// save statistics info to index table which named commitTime
residualTables.forEach(f -> {
try {
fs.delete(new Path(indexPath, f));
} catch (IOException ie) {
throw new HoodieException(ie);
}
});
if (latestIndexData.isPresent() && latestIndexData.get().schema().equals(statisticsDF.schema())) {
// update the statistics info
String originalTable = "indexTable_" + java.util.UUID.randomUUID().toString().replace("-", "");
String updateTable = "updateTable_" + java.util.UUID.randomUUID().toString().replace("-", "");
latestIndexData.get().registerTempTable(originalTable);
statisticsDF.registerTempTable(updateTable);
// update table by full out join
List columns = Arrays.asList(statisticsDF.schema().fieldNames());
spark.sql(ZOrderingIndexHelper.createIndexMergeSql(originalTable, updateTable, columns)).repartition(1).write().save(savePath.toString());
} else {
statisticsDF.repartition(1).write().mode("overwrite").save(savePath.toString());
}
} catch (IOException e) {
throw new HoodieException(e);
}
}
}