/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.hudi.client;

import org.apache.hudi.common.fs.FSUtils;
import org.apache.hudi.common.model.HoodieAvroPayload;
import org.apache.hudi.common.model.HoodieRecord;
import org.apache.hudi.common.model.HoodieTableType;
import org.apache.hudi.common.table.HoodieTableMetaClient;
import org.apache.hudi.common.table.timeline.HoodieActiveTimeline;
import org.apache.hudi.common.table.timeline.HoodieTimeline;
import org.apache.hudi.common.testutils.HoodieTestDataGenerator;
import org.apache.hudi.common.testutils.HoodieTestUtils;
import org.apache.hudi.config.HoodieIndexConfig;
import org.apache.hudi.config.HoodieWriteConfig;
import org.apache.hudi.index.HoodieIndex;
import org.apache.hudi.testutils.HoodieClientTestHarness;
import org.apache.hudi.testutils.HoodieClientTestUtils;
import org.apache.hadoop.fs.FileSystem;
import org.apache.log4j.LogManager;
import org.apache.log4j.Logger;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import java.util.List;
import static org.junit.jupiter.api.Assertions.assertEquals;
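
/**
 * Tests that a single Spark application can write to and read from Hudi tables
 * that live on different file systems (a DFS base path and a local file system path).
 */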
public class TestMultiFS extends HoodieClientTestHarness {

  private static final Logger LOG = LogManager.getLogger(TestMultiFS.class);
  private String tablePath = "file:///tmp/hoodie/sample-table";
  protected String tableName = "hoodie_rt";
  private String tableType = HoodieTableType.COPY_ON_WRITE.name();

  @BeforeEach
  public void setUp() throws Exception {
    initSparkContexts();
    initDFS();
    initTestDataGenerator();
  }

  @AfterEach
  public void tearDown() throws Exception {
    cleanupResources();
  }
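
  /**
   * Builds a Hudi write config rooted at the given base path; the same settings
   * (schema, parallelism, bloom index) are reused for both the DFS-backed table
   * and the local file system table.
   */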
  protected HoodieWriteConfig getHoodieWriteConfig(String basePath) {
    return HoodieWriteConfig.newBuilder().withPath(basePath).withEmbeddedTimelineServerEnabled(true)
        .withSchema(HoodieTestDataGenerator.TRIP_EXAMPLE_SCHEMA).withParallelism(2, 2).forTable(tableName)
        .withIndexConfig(HoodieIndexConfig.newBuilder().withIndexType(HoodieIndex.IndexType.BLOOM).build()).build();
  }

  @Test
  public void readLocalWriteHDFS() throws Exception {
    // Initialize table and filesystem
    HoodieTableMetaClient.initTableType(hadoopConf, dfsBasePath, HoodieTableType.valueOf(tableType),
        tableName, HoodieAvroPayload.class.getName());

    // Create write client to write some records in
    HoodieWriteConfig cfg = getHoodieWriteConfig(dfsBasePath);
    HoodieWriteConfig localConfig = getHoodieWriteConfig(tablePath);
    try (SparkRDDWriteClient hdfsWriteClient = getHoodieWriteClient(cfg);
         SparkRDDWriteClient localWriteClient = getHoodieWriteClient(localConfig)) {

      // Write generated data to hdfs (only inserts)
      String readCommitTime = hdfsWriteClient.startCommit();
      LOG.info("Starting commit " + readCommitTime);
      List<HoodieRecord> records = dataGen.generateInserts(readCommitTime, 100);
      JavaRDD<HoodieRecord> writeRecords = jsc.parallelize(records, 1);
      hdfsWriteClient.upsert(writeRecords, readCommitTime);

      // Read from hdfs
      FileSystem fs = FSUtils.getFs(dfsBasePath, HoodieTestUtils.getDefaultHadoopConf());
      HoodieTableMetaClient metaClient = new HoodieTableMetaClient(fs.getConf(), dfsBasePath);
      HoodieTimeline timeline = new HoodieActiveTimeline(metaClient).getCommitTimeline();
      Dataset<Row> readRecords = HoodieClientTestUtils.readCommit(dfsBasePath, sqlContext, timeline, readCommitTime);
      assertEquals(records.size(), readRecords.count(), "Should contain 100 records");

      // Write to local
      HoodieTableMetaClient.initTableType(hadoopConf, tablePath, HoodieTableType.valueOf(tableType),
          tableName, HoodieAvroPayload.class.getName());
      String writeCommitTime = localWriteClient.startCommit();
      LOG.info("Starting write commit " + writeCommitTime);
      List<HoodieRecord> localRecords = dataGen.generateInserts(writeCommitTime, 100);
      JavaRDD<HoodieRecord> localWriteRecords = jsc.parallelize(localRecords, 1);
      LOG.info("Writing to path: " + tablePath);
      localWriteClient.upsert(localWriteRecords, writeCommitTime);

      LOG.info("Reading from path: " + tablePath);
      fs = FSUtils.getFs(tablePath, HoodieTestUtils.getDefaultHadoopConf());
      metaClient = new HoodieTableMetaClient(fs.getConf(), tablePath);
      timeline = new HoodieActiveTimeline(metaClient).getCommitTimeline();
      Dataset<Row> localReadRecords =
          HoodieClientTestUtils.readCommit(tablePath, sqlContext, timeline, writeCommitTime);
      assertEquals(localRecords.size(), localReadRecords.count(), "Should contain 100 records");
    }
  }
}