blob: 4fb9f221cbdd7486ed90aa8b8c0882a8a4556a19 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hudi.client;
import org.apache.hadoop.conf.Configuration;
import org.apache.hudi.avro.model.HoodieCompactionPlan;
import org.apache.hudi.client.common.HoodieSparkEngineContext;
import org.apache.hudi.common.model.HoodieBaseFile;
import org.apache.hudi.common.model.HoodieKey;
import org.apache.hudi.common.model.HoodieRecord;
import org.apache.hudi.common.model.HoodieRecordPayload;
import org.apache.hudi.common.table.HoodieTableMetaClient;
import org.apache.hudi.common.util.CompactionUtils;
import org.apache.hudi.common.util.Option;
import org.apache.hudi.common.util.collection.Pair;
import org.apache.hudi.config.HoodieIndexConfig;
import org.apache.hudi.config.HoodieWriteConfig;
import org.apache.hudi.exception.HoodieIndexException;
import org.apache.hudi.index.HoodieIndex;
import org.apache.hudi.index.SparkHoodieIndex;
import org.apache.hudi.table.HoodieSparkTable;
import org.apache.hudi.table.HoodieTable;
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SQLContext;
import org.apache.spark.sql.types.StructType;
import java.io.Serializable;
import java.util.HashSet;
import java.util.List;
import java.util.Set;
import java.util.stream.Collectors;
import scala.Tuple2;
/**
* Provides an RDD based API for accessing/filtering Hoodie tables, based on keys.
*/
public class HoodieReadClient<T extends HoodieRecordPayload> implements Serializable {
private static final long serialVersionUID = 1L;
/**
* TODO: We need to persist the index type into hoodie.properties and be able to access the index just with a simple
* basepath pointing to the table. Until, then just always assume a BloomIndex
*/
private final transient HoodieIndex<T, JavaRDD<HoodieRecord<T>>, JavaRDD<HoodieKey>, JavaRDD<WriteStatus>> index;
private HoodieTable<T, JavaRDD<HoodieRecord<T>>, JavaRDD<HoodieKey>, JavaRDD<WriteStatus>> hoodieTable;
private transient Option<SQLContext> sqlContextOpt;
private final transient HoodieSparkEngineContext context;
private final transient Configuration hadoopConf;
/**
* @param basePath path to Hoodie table
*/
public HoodieReadClient(HoodieSparkEngineContext context, String basePath) {
this(context, HoodieWriteConfig.newBuilder().withPath(basePath)
// by default we use HoodieBloomIndex
.withIndexConfig(HoodieIndexConfig.newBuilder().withIndexType(HoodieIndex.IndexType.BLOOM).build()).build());
}
/**
* @param context
* @param basePath
* @param sqlContext
*/
public HoodieReadClient(HoodieSparkEngineContext context, String basePath, SQLContext sqlContext) {
this(context, basePath);
this.sqlContextOpt = Option.of(sqlContext);
}
/**
* @param clientConfig instance of HoodieWriteConfig
*/
public HoodieReadClient(HoodieSparkEngineContext context, HoodieWriteConfig clientConfig) {
this.context = context;
this.hadoopConf = context.getHadoopConf().get();
final String basePath = clientConfig.getBasePath();
// Create a Hoodie table which encapsulated the commits and files visible
HoodieTableMetaClient metaClient = new HoodieTableMetaClient(hadoopConf, basePath, true);
this.hoodieTable = HoodieSparkTable.create(clientConfig, context, metaClient);
this.index = SparkHoodieIndex.createIndex(clientConfig);
this.sqlContextOpt = Option.empty();
}
/**
* Adds support for accessing Hoodie built tables from SparkSQL, as you normally would.
*
* @return SparkConf object to be used to construct the SparkContext by caller
*/
public static SparkConf addHoodieSupport(SparkConf conf) {
conf.set("spark.sql.hive.convertMetastoreParquet", "false");
return conf;
}
private void assertSqlContext() {
if (!sqlContextOpt.isPresent()) {
throw new IllegalStateException("SQLContext must be set, when performing dataframe operations");
}
}
private Option<String> convertToDataFilePath(Option<Pair<String, String>> partitionPathFileIDPair) {
if (partitionPathFileIDPair.isPresent()) {
HoodieBaseFile dataFile = hoodieTable.getBaseFileOnlyView()
.getLatestBaseFile(partitionPathFileIDPair.get().getLeft(), partitionPathFileIDPair.get().getRight()).get();
return Option.of(dataFile.getPath());
} else {
return Option.empty();
}
}
/**
* Given a bunch of hoodie keys, fetches all the individual records out as a data frame.
*
* @return a dataframe
*/
public Dataset<Row> readROView(JavaRDD<HoodieKey> hoodieKeys, int parallelism) {
assertSqlContext();
JavaPairRDD<HoodieKey, Option<Pair<String, String>>> lookupResultRDD = checkExists(hoodieKeys);
JavaPairRDD<HoodieKey, Option<String>> keyToFileRDD =
lookupResultRDD.mapToPair(r -> new Tuple2<>(r._1, convertToDataFilePath(r._2)));
List<String> paths = keyToFileRDD.filter(keyFileTuple -> keyFileTuple._2().isPresent())
.map(keyFileTuple -> keyFileTuple._2().get()).collect();
// record locations might be same for multiple keys, so need a unique list
Set<String> uniquePaths = new HashSet<>(paths);
Dataset<Row> originalDF = sqlContextOpt.get().read().parquet(uniquePaths.toArray(new String[uniquePaths.size()]));
StructType schema = originalDF.schema();
JavaPairRDD<HoodieKey, Row> keyRowRDD = originalDF.javaRDD().mapToPair(row -> {
HoodieKey key = new HoodieKey(row.getAs(HoodieRecord.RECORD_KEY_METADATA_FIELD),
row.getAs(HoodieRecord.PARTITION_PATH_METADATA_FIELD));
return new Tuple2<>(key, row);
});
// Now, we need to further filter out, for only rows that match the supplied hoodie keys
JavaRDD<Row> rowRDD = keyRowRDD.join(keyToFileRDD, parallelism).map(tuple -> tuple._2()._1());
return sqlContextOpt.get().createDataFrame(rowRDD, schema);
}
/**
* Checks if the given [Keys] exists in the hoodie table and returns [Key, Option[FullFilePath]] If the optional
* FullFilePath value is not present, then the key is not found. If the FullFilePath value is present, it is the path
* component (without scheme) of the URI underlying file
*/
public JavaPairRDD<HoodieKey, Option<Pair<String, String>>> checkExists(JavaRDD<HoodieKey> hoodieKeys) {
return index.tagLocation(hoodieKeys.map(k -> new HoodieRecord<>(k, null)), context, hoodieTable)
.mapToPair(hr -> new Tuple2<>(hr.getKey(), hr.isCurrentLocationKnown()
? Option.of(Pair.of(hr.getPartitionPath(), hr.getCurrentLocation().getFileId()))
: Option.empty())
);
}
/**
* Filter out HoodieRecords that already exists in the output folder. This is useful in deduplication.
*
* @param hoodieRecords Input RDD of Hoodie records.
* @return A subset of hoodieRecords RDD, with existing records filtered out.
*/
public JavaRDD<HoodieRecord<T>> filterExists(JavaRDD<HoodieRecord<T>> hoodieRecords) {
JavaRDD<HoodieRecord<T>> recordsWithLocation = tagLocation(hoodieRecords);
return recordsWithLocation.filter(v1 -> !v1.isCurrentLocationKnown());
}
/**
* Looks up the index and tags each incoming record with a location of a file that contains the row (if it is actually
* present). Input RDD should contain no duplicates if needed.
*
* @param hoodieRecords Input RDD of Hoodie records
* @return Tagged RDD of Hoodie records
*/
public JavaRDD<HoodieRecord<T>> tagLocation(JavaRDD<HoodieRecord<T>> hoodieRecords) throws HoodieIndexException {
return index.tagLocation(hoodieRecords, context, hoodieTable);
}
/**
* Return all pending compactions with instant time for clients to decide what to compact next.
*
* @return
*/
public List<Pair<String, HoodieCompactionPlan>> getPendingCompactions() {
HoodieTableMetaClient metaClient =
new HoodieTableMetaClient(hadoopConf, hoodieTable.getMetaClient().getBasePath(), true);
return CompactionUtils.getAllPendingCompactionPlans(metaClient).stream()
.map(
instantWorkloadPair -> Pair.of(instantWorkloadPair.getKey().getTimestamp(), instantWorkloadPair.getValue()))
.collect(Collectors.toList());
}
}