/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hudi.client;
import org.apache.hudi.common.model.HoodieKey;
import org.apache.hudi.common.model.HoodieRecord;
import org.apache.hudi.common.util.Option;
import org.apache.hudi.config.HoodieWriteConfig;
import org.apache.hudi.testutils.HoodieClientTestBase;
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.sql.AnalysisException;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.junit.jupiter.api.Test;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.List;
import java.util.stream.Collectors;
import static org.apache.hudi.testutils.Assertions.assertNoWriteErrors;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertThrows;
/**
* Test cases covering the HoodieReadClient APIs.
*/
@SuppressWarnings("unchecked")
public class TestHoodieReadClient extends HoodieClientTestBase {
/**
* Test ReadFilter API after writing new records using HoodieWriteClient.insert.
*/
@Test
public void testReadFilterExistAfterInsert() throws Exception {
testReadFilterExist(getConfig(), SparkRDDWriteClient::insert);
}
/**
* Test ReadFilter API after writing new records using HoodieWriteClient.insertPrepped.
*/
@Test
public void testReadFilterExistAfterInsertPrepped() throws Exception {
testReadFilterExist(getConfig(), SparkRDDWriteClient::insertPreppedRecords);
}
/**
* Test ReadFilter API after writing new records using HoodieWriteClient.bulkInsert.
*/
@Test
public void testReadFilterExistAfterBulkInsert() throws Exception {
testReadFilterExist(getConfigBuilder().withBulkInsertParallelism(1).build(), SparkRDDWriteClient::bulkInsert);
}
/**
* Test ReadFilter API after writing new records using HoodieWriteClient.bulkInsertPrepped.
*/
@Test
public void testReadFilterExistAfterBulkInsertPrepped() throws Exception {
testReadFilterExist(getConfigBuilder().withBulkInsertParallelism(1).build(),
(writeClient, recordRDD, instantTime) -> {
return writeClient.bulkInsertPreppedRecords(recordRDD, instantTime, Option.empty());
});
}
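/**
* Test that readROView fails with an IllegalStateException when the read client is created without a SQLContext.
*/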
@Test
public void testReadROViewFailsWithoutSqlContext() {
HoodieReadClient readClient = new HoodieReadClient(context, getConfig());
JavaRDD<HoodieKey> recordsRDD = jsc.parallelize(new ArrayList<>(), 1);
assertThrows(IllegalStateException.class, () -> {
readClient.readROView(recordsRDD, 1);
});
}
/**
* Helper to write new records using one of HoodieWriteClient's write APIs and then use the ReadClient to verify that
* the filterExists() API works correctly.
*
* @param config Hoodie Write Config
* @param writeFn Write Function for writing records
* @throws Exception in case of error
*/
private void testReadFilterExist(HoodieWriteConfig config,
Function3<JavaRDD<WriteStatus>, SparkRDDWriteClient, JavaRDD<HoodieRecord>, String> writeFn) throws Exception {
try (SparkRDDWriteClient writeClient = getHoodieWriteClient(config)) {
HoodieReadClient readClient = getHoodieReadClient(config.getBasePath());
String newCommitTime = writeClient.startCommit();
List<HoodieRecord> records = dataGen.generateInserts(newCommitTime, 100);
JavaRDD<HoodieRecord> recordsRDD = jsc.parallelize(records, 1);
JavaRDD<HoodieRecord> filteredRDD = readClient.filterExists(recordsRDD);
// Nothing has been written yet, so filterExists should return all 100 records
assertEquals(100, filteredRDD.collect().size());
JavaRDD<HoodieRecord> smallRecordsRDD = jsc.parallelize(records.subList(0, 75), 1);
// Insert only the first 75 of the generated records (spread across the 3 test partitions)
List<WriteStatus> statuses = writeFn.apply(writeClient, smallRecordsRDD, newCommitTime).collect();
// Verify there are no errors
assertNoWriteErrors(statuses);
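// Create a fresh read client after the commit so the newly written data is visible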
HoodieReadClient anotherReadClient = getHoodieReadClient(config.getBasePath());
filteredRDD = anotherReadClient.filterExists(recordsRDD);
List<HoodieRecord> result = filteredRDD.collect();
// Only the 25 records that were never written should remain after filterExists
assertEquals(25, result.size());
// Check that a file path exists for each written key
JavaPairRDD<HoodieKey, Option<String>> keyToPathPair =
anotherReadClient.checkExists(recordsRDD.map(HoodieRecord::getKey));
JavaRDD<HoodieKey> keysWithPaths = keyToPathPair.filter(keyPath -> keyPath._2.isPresent())
.map(keyPath -> keyPath._1);
assertEquals(75, keysWithPaths.count());
// Verify the read rows match the inserted records
Dataset<Row> rows = anotherReadClient.readROView(keysWithPaths, 1);
assertEquals(75, rows.count());
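// Keys that were never written come back without a file path; reading those is expected to fail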
JavaRDD<HoodieKey> keysWithoutPaths = keyToPathPair.filter(keyPath -> !keyPath._2.isPresent())
.map(keyPath -> keyPath._1);
assertThrows(AnalysisException.class, () -> {
anotherReadClient.readROView(keysWithoutPaths, 1);
});
// Detailed tests of getPendingCompactions() are in TestAsyncCompaction;
// here we only verify that the list is empty
assertEquals(0, anotherReadClient.getPendingCompactions().size());
}
}
/**
* Test tagLocation API after insert().
*/
@Test
public void testTagLocationAfterInsert() throws Exception {
testTagLocation(getConfig(), SparkRDDWriteClient::insert, SparkRDDWriteClient::upsert, false);
}
/**
* Test tagLocation API after insertPrepped().
*/
@Test
public void testTagLocationAfterInsertPrepped() throws Exception {
testTagLocation(getConfig(), SparkRDDWriteClient::insertPreppedRecords, SparkRDDWriteClient::upsertPreppedRecords,
true);
}
/**
* Test tagLocation API after bulk-insert().
*/
@Test
public void testTagLocationAfterBulkInsert() throws Exception {
testTagLocation(getConfigBuilder().withBulkInsertParallelism(1).build(), SparkRDDWriteClient::bulkInsert,
SparkRDDWriteClient::upsert, false);
}
/**
* Test tagLocation API after bulkInsertPrepped().
*/
@Test
public void testTagLocationAfterBulkInsertPrepped() throws Exception {
testTagLocation(
getConfigBuilder().withBulkInsertParallelism(1).build(), (writeClient, recordRDD, instantTime) -> writeClient
.bulkInsertPreppedRecords(recordRDD, instantTime, Option.empty()),
SparkRDDWriteClient::upsertPreppedRecords, true);
}
/**
* Helper method to test tagLocation after using different HoodieWriteClient write APIs.
*
* @param hoodieWriteConfig Write Config
* @param insertFn Hoodie Write Client insert API used for the first (insert-only) batch
* @param updateFn Hoodie Write Client upsert API used for the second (update) batch
* @param isPrepped whether the write APIs operate on already-prepared (prepped) records
* @throws Exception in case of error
*/
private void testTagLocation(HoodieWriteConfig hoodieWriteConfig,
Function3<JavaRDD<WriteStatus>, SparkRDDWriteClient, JavaRDD<HoodieRecord>, String> insertFn,
Function3<JavaRDD<WriteStatus>, SparkRDDWriteClient, JavaRDD<HoodieRecord>, String> updateFn, boolean isPrepped)
throws Exception {
try (SparkRDDWriteClient client = getHoodieWriteClient(hoodieWriteConfig)) {
// Write 1 (only inserts)
String newCommitTime = "001";
String initCommitTime = "000";
int numRecords = 200;
JavaRDD<WriteStatus> result = insertFirstBatch(hoodieWriteConfig, client, newCommitTime, initCommitTime,
numRecords, insertFn, isPrepped, true, numRecords);
// Rebuild plain HoodieRecords (key only, no data or location) from the WriteStatuses,
// since the originals were modified while flowing through the write DAG
JavaRDD<HoodieRecord> recordRDD =
jsc.parallelize(result.collect().stream().map(WriteStatus::getWrittenRecords).flatMap(Collection::stream)
.map(record -> new HoodieRecord(record.getKey(), null)).collect(Collectors.toList()));
// The table should now have 200 records (checked via the index), all tagged with the location written at this commit
HoodieReadClient readClient = getHoodieReadClient(hoodieWriteConfig.getBasePath());
List<HoodieRecord> taggedRecords = readClient.tagLocation(recordRDD).collect();
checkTaggedRecords(taggedRecords, newCommitTime);
// Write 2 (updates)
String prevCommitTime = newCommitTime;
newCommitTime = "004";
numRecords = 100;
String commitTimeBetweenPrevAndNew = "002";
result = updateBatch(hoodieWriteConfig, client, newCommitTime, prevCommitTime,
Option.of(Arrays.asList(commitTimeBetweenPrevAndNew)), initCommitTime, numRecords, updateFn, isPrepped, true,
numRecords, 200, 2);
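// Rebuild key-only HoodieRecords from the update WriteStatuses, same as after the first batch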
recordRDD =
jsc.parallelize(result.collect().stream().map(WriteStatus::getWrittenRecords).flatMap(Collection::stream)
.map(record -> new HoodieRecord(record.getKey(), null)).collect(Collectors.toList()));
// Index should be able to locate all updates in correct locations.
readClient = getHoodieReadClient(hoodieWriteConfig.getBasePath());
taggedRecords = readClient.tagLocation(recordRDD).collect();
checkTaggedRecords(taggedRecords, newCommitTime);
}
}
}