/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.hudi.utilities.functional;

import org.apache.hudi.common.table.timeline.HoodieActiveTimeline;
import org.apache.hudi.common.table.timeline.HoodieTimeline;
import org.apache.hudi.common.testutils.HoodieTestDataGenerator;
import org.apache.hudi.common.testutils.HoodieTestUtils;
import org.apache.hudi.testutils.FunctionalTestHarness;
import org.apache.hudi.testutils.HoodieClientTestUtils;
import org.apache.hudi.utilities.HDFSParquetImporter;
import org.apache.avro.generic.GenericRecord;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.LocatedFileStatus;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.RemoteIterator;
import org.apache.parquet.avro.AvroParquetWriter;
import org.apache.parquet.hadoop.ParquetWriter;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Tag;
import org.junit.jupiter.api.Test;
import java.io.IOException;
import java.io.Serializable;
import java.text.ParseException;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
import java.util.Objects;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.stream.Collectors;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertTrue;
@Tag("functional")
public class TestHDFSParquetImporter extends FunctionalTestHarness implements Serializable {
private String basePath;
private transient Path hoodieFolder;
private transient Path srcFolder;
private transient List<GenericRecord> insertData;
@BeforeEach
public void init() throws IOException, ParseException {
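// Base path under the DFS test root, named after the current method on the call stack.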
basePath = (new Path(dfsBasePath(), Thread.currentThread().getStackTrace()[1].getMethodName())).toString();
// Hoodie root folder.
hoodieFolder = new Path(basePath, "testTarget");
// Create generic records.
srcFolder = new Path(basePath, "testSrc");
insertData = createInsertRecords(srcFolder);
}
@AfterEach
public void clean() throws IOException {
dfs().delete(new Path(basePath), true);
}
/**
* Test successful data import with retries.
*/
@Test
public void testImportWithRetries() throws Exception {
// Create schema file.
String schemaFile = new Path(basePath, "file.schema").toString();
HDFSParquetImporter.Config cfg = getHDFSParquetImporterConfig(srcFolder.toString(), hoodieFolder.toString(),
"testTable", "COPY_ON_WRITE", "_row_key", "timestamp", 1, schemaFile);
AtomicInteger retry = new AtomicInteger(3);
AtomicInteger fileCreated = new AtomicInteger(0);
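// Override dataImport so each attempt decrements the retry counter and the schema file is created
// only once the counter reaches zero, so only the final attempt can succeed.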
HDFSParquetImporter dataImporter = new HDFSParquetImporter(cfg) {
@Override
protected int dataImport(JavaSparkContext jsc) throws IOException {
int ret = super.dataImport(jsc);
if (retry.decrementAndGet() == 0) {
fileCreated.incrementAndGet();
createSchemaFile(schemaFile);
}
return ret;
}
};
// The schema file is missing for the first attempts; once the override above creates it,
// the final retry succeeds and the overall import returns 0.
assertEquals(0, dataImporter.dataImport(jsc(), retry.get()));
assertEquals(-1, retry.get());
assertEquals(1, fileCreated.get());
// Verify that:
// 1. a .commit file is present
// 2. each partition contains 24 records
// 3. there are 4 partitions in total
boolean isCommitFilePresent = false;
Map<String, Long> recordCounts = new HashMap<String, Long>();
RemoteIterator<LocatedFileStatus> hoodieFiles = dfs().listFiles(hoodieFolder, true);
while (hoodieFiles.hasNext()) {
LocatedFileStatus f = hoodieFiles.next();
isCommitFilePresent = isCommitFilePresent || f.getPath().toString().endsWith(HoodieTimeline.COMMIT_EXTENSION);
if (f.getPath().toString().endsWith("parquet")) {
String partitionPath = f.getPath().getParent().toString();
long count = sqlContext().read().parquet(f.getPath().toString()).count();
recordCounts.merge(partitionPath, count, Long::sum);
}
}
assertTrue(isCommitFilePresent, "commit file is missing");
assertEquals(4, recordCounts.size(), "partition is missing");
for (Entry<String, Long> e : recordCounts.entrySet()) {
assertEquals(24, e.getValue().longValue(), "missing records");
}
}
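/**
* Creates the schema file and imports the generated insert records into the target table.
*/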
private void insert(JavaSparkContext jsc) throws IOException {
// Create schema file.
String schemaFile = new Path(basePath, "file.schema").toString();
createSchemaFile(schemaFile);
HDFSParquetImporter.Config cfg = getHDFSParquetImporterConfig(srcFolder.toString(), hoodieFolder.toString(),
"testTable", "COPY_ON_WRITE", "_row_key", "timestamp", 1, schemaFile);
HDFSParquetImporter dataImporter = new HDFSParquetImporter(cfg);
dataImporter.dataImport(jsc, 0);
}
/**
* Test a successful insert and verify data consistency.
*/
@Test
public void testImportWithInsert() throws IOException, ParseException {
insert(jsc());
Dataset<Row> ds = HoodieClientTestUtils.read(jsc(), basePath + "/testTarget", sqlContext(), dfs(), basePath + "/testTarget/*/*/*/*");
List<Row> readData = ds.select("timestamp", "_row_key", "rider", "driver", "begin_lat", "begin_lon", "end_lat", "end_lon").collectAsList();
List<HoodieTripModel> result = readData.stream().map(row ->
new HoodieTripModel(row.getLong(0), row.getString(1), row.getString(2), row.getString(3), row.getDouble(4),
row.getDouble(5), row.getDouble(6), row.getDouble(7)))
.collect(Collectors.toList());
List<HoodieTripModel> expected = insertData.stream().map(g ->
new HoodieTripModel(Long.parseLong(g.get("timestamp").toString()),
g.get("_row_key").toString(),
g.get("rider").toString(),
g.get("driver").toString(),
Double.parseDouble(g.get("begin_lat").toString()),
Double.parseDouble(g.get("begin_lon").toString()),
Double.parseDouble(g.get("end_lat").toString()),
Double.parseDouble(g.get("end_lon").toString())))
.collect(Collectors.toList());
assertTrue(result.containsAll(expected) && expected.containsAll(result) && result.size() == expected.size());
}
/**
* Test a successful upsert and verify data consistency.
*/
@Test
public void testImportWithUpsert() throws IOException, ParseException {
insert(jsc());
// Reuse the schema file written by the initial insert.
String schemaFile = new Path(basePath, "file.schema").toString();
Path upsertFolder = new Path(basePath, "testUpsertSrc");
List<GenericRecord> upsertData = createUpsertRecords(upsertFolder);
HDFSParquetImporter.Config cfg = getHDFSParquetImporterConfig(upsertFolder.toString(), hoodieFolder.toString(),
"testTable", "COPY_ON_WRITE", "_row_key", "timestamp", 1, schemaFile);
cfg.command = "upsert";
HDFSParquetImporter dataImporter = new HDFSParquetImporter(cfg);
dataImporter.dataImport(jsc(), 0);
// Construct the expected data: drop the first 11 records (they are replaced by the upserts) and append the upsert data.
List<GenericRecord> expectData = insertData.subList(11, 96);
expectData.addAll(upsertData);
// read latest data
Dataset<Row> ds = HoodieClientTestUtils.read(jsc(), basePath + "/testTarget", sqlContext(), dfs(), basePath + "/testTarget/*/*/*/*");
List<Row> readData = ds.select("timestamp", "_row_key", "rider", "driver", "begin_lat", "begin_lon", "end_lat", "end_lon").collectAsList();
List<HoodieTripModel> result = readData.stream().map(row ->
new HoodieTripModel(row.getLong(0), row.getString(1), row.getString(2), row.getString(3), row.getDouble(4),
row.getDouble(5), row.getDouble(6), row.getDouble(7)))
.collect(Collectors.toList());
// get expected result.
List<HoodieTripModel> expected = expectData.stream().map(g ->
new HoodieTripModel(Long.parseLong(g.get("timestamp").toString()),
g.get("_row_key").toString(),
g.get("rider").toString(),
g.get("driver").toString(),
Double.parseDouble(g.get("begin_lat").toString()),
Double.parseDouble(g.get("begin_lon").toString()),
Double.parseDouble(g.get("end_lat").toString()),
Double.parseDouble(g.get("end_lon").toString())))
.collect(Collectors.toList());
assertTrue(result.containsAll(expected) && expected.containsAll(result) && result.size() == expected.size());
}
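/**
* Writes 96 trip records with consecutive hourly timestamps (24 per day over four days) to a parquet
* file under the given folder and returns them.
*/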
public List<GenericRecord> createInsertRecords(Path srcFolder) throws ParseException, IOException {
Path srcFile = new Path(srcFolder.toString(), "file1.parquet");
long startTime = HoodieActiveTimeline.COMMIT_FORMATTER.parse("20170203000000").getTime() / 1000;
List<GenericRecord> records = new ArrayList<GenericRecord>();
for (long recordNum = 0; recordNum < 96; recordNum++) {
records.add(HoodieTestDataGenerator.generateGenericRecord(Long.toString(recordNum), "rider-" + recordNum,
"driver-" + recordNum, startTime + TimeUnit.HOURS.toSeconds(recordNum)));
}
try (ParquetWriter<GenericRecord> writer = AvroParquetWriter.<GenericRecord>builder(srcFile)
.withSchema(HoodieTestDataGenerator.AVRO_SCHEMA).withConf(HoodieTestUtils.getDefaultHadoopConf()).build()) {
for (GenericRecord record : records) {
writer.write(record);
}
}
return records;
}
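/**
* Writes 15 upsert records to a parquet file under the given folder: 11 updates for existing keys 0-10
* plus 4 new records with keys 96-99.
*/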
public List<GenericRecord> createUpsertRecords(Path srcFolder) throws ParseException, IOException {
Path srcFile = new Path(srcFolder.toString(), "file1.parquet");
long startTime = HoodieActiveTimeline.COMMIT_FORMATTER.parse("20170203000000").getTime() / 1000;
List<GenericRecord> records = new ArrayList<GenericRecord>();
// 11 records (keys 0-10) that update existing rows
for (long recordNum = 0; recordNum < 11; recordNum++) {
records.add(HoodieTestDataGenerator.generateGenericRecord(Long.toString(recordNum), "rider-upsert-" + recordNum,
"driver-upsert" + recordNum, startTime + TimeUnit.HOURS.toSeconds(recordNum)));
}
// 4 new records (keys 96-99) for insert
for (long recordNum = 96; recordNum < 100; recordNum++) {
records.add(HoodieTestDataGenerator.generateGenericRecord(Long.toString(recordNum), "rider-upsert-" + recordNum,
"driver-upsert" + recordNum, startTime + TimeUnit.HOURS.toSeconds(recordNum)));
}
try (ParquetWriter<GenericRecord> writer = AvroParquetWriter.<GenericRecord>builder(srcFile)
.withSchema(HoodieTestDataGenerator.AVRO_SCHEMA).withConf(HoodieTestUtils.getDefaultHadoopConf()).build()) {
for (GenericRecord record : records) {
writer.write(record);
}
}
return records;
}
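/**
* Writes the trip example Avro schema to the given DFS path.
*/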
private void createSchemaFile(String schemaFile) throws IOException {
try (FSDataOutputStream schemaFileOS = dfs().create(new Path(schemaFile))) {
schemaFileOS.write(HoodieTestDataGenerator.TRIP_EXAMPLE_SCHEMA.getBytes());
}
}
/**
* Tests for the schema file: 1. the file is missing; 2. the file contains invalid data.
*/
@Test
public void testSchemaFile() throws Exception {
// Hoodie root folder
Path hoodieFolder = new Path(basePath, "testTarget");
Path srcFolder = new Path(basePath.toString(), "srcTest");
Path schemaFile = new Path(basePath.toString(), "missingFile.schema");
HDFSParquetImporter.Config cfg = getHDFSParquetImporterConfig(srcFolder.toString(), hoodieFolder.toString(),
"testTable", "COPY_ON_WRITE", "_row_key", "timestamp", 1, schemaFile.toString());
HDFSParquetImporter dataImporter = new HDFSParquetImporter(cfg);
// Should fail because the schema file does not exist (returns -1).
assertEquals(-1, dataImporter.dataImport(jsc(), 0));
// Create the schema file with invalid content; the import should still fail and return -1.
try (FSDataOutputStream invalidSchemaOS = dfs().create(schemaFile)) {
invalidSchemaOS.write("Random invalid schema data".getBytes());
}
assertEquals(-1, dataImporter.dataImport(jsc(), 0));
}
/**
* Test behavior when the configured rowKey or partitionKey does not exist in the schema.
*/
@Test
public void testRowAndPartitionKey() throws Exception {
// Create schema file.
Path schemaFile = new Path(basePath.toString(), "missingFile.schema");
createSchemaFile(schemaFile.toString());
HDFSParquetImporter dataImporter;
HDFSParquetImporter.Config cfg;
// Check for invalid row key.
cfg = getHDFSParquetImporterConfig(srcFolder.toString(), hoodieFolder.toString(), "testTable", "COPY_ON_WRITE",
"invalidRowKey", "timestamp", 1, schemaFile.toString());
dataImporter = new HDFSParquetImporter(cfg);
assertEquals(-1, dataImporter.dataImport(jsc(), 0));
// Check for invalid partition key.
cfg = getHDFSParquetImporterConfig(srcFolder.toString(), hoodieFolder.toString(), "testTable", "COPY_ON_WRITE",
"_row_key", "invalidTimeStamp", 1, schemaFile.toString());
dataImporter = new HDFSParquetImporter(cfg);
assertEquals(-1, dataImporter.dataImport(jsc(), 0));
}
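/**
* Builds an importer config from the given source and target paths, table name and type, record and
* partition keys, parallelism and schema file location.
*/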
public HDFSParquetImporter.Config getHDFSParquetImporterConfig(String srcPath, String targetPath, String tableName,
String tableType, String rowKey, String partitionKey, int parallelism, String schemaFile) {
HDFSParquetImporter.Config cfg = new HDFSParquetImporter.Config();
cfg.srcPath = srcPath;
cfg.targetPath = targetPath;
cfg.tableName = tableName;
cfg.tableType = tableType;
cfg.rowKey = rowKey;
cfg.partitionKey = partitionKey;
cfg.parallelism = parallelism;
cfg.schemaFile = schemaFile;
return cfg;
}
/**
* Helper class used to compare the read results against the expected records.
*/
public static class HoodieTripModel {
long timestamp;
String rowKey;
String rider;
String driver;
double beginLat;
double beginLon;
double endLat;
double endLon;
public HoodieTripModel(long timestamp, String rowKey, String rider, String driver, double beginLat,
double beginLon, double endLat, double endLon) {
this.timestamp = timestamp;
this.rowKey = rowKey;
this.rider = rider;
this.driver = driver;
this.beginLat = beginLat;
this.beginLon = beginLon;
this.endLat = endLat;
this.endLon = endLon;
}
@Override
public boolean equals(Object o) {
if (this == o) {
return true;
}
if (o == null || getClass() != o.getClass()) {
return false;
}
HoodieTripModel other = (HoodieTripModel) o;
return timestamp == other.timestamp && rowKey.equals(other.rowKey) && rider.equals(other.rider)
&& driver.equals(other.driver) && beginLat == other.beginLat && beginLon == other.beginLon
&& endLat == other.endLat && endLon == other.endLon;
}
@Override
public int hashCode() {
return Objects.hash(timestamp, rowKey, rider, driver, beginLat, beginLon, endLat, endLon);
}
}
}