/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hudi.integ.testsuite;

import java.util.Arrays;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Properties;
import java.util.Set;
import org.apache.hadoop.conf.Configuration;
import org.apache.hudi.avro.model.HoodieCompactionPlan;
import org.apache.hudi.client.HoodieReadClient;
import org.apache.hudi.client.SparkRDDWriteClient;
import org.apache.hudi.client.WriteStatus;
import org.apache.hudi.client.common.HoodieSparkEngineContext;
import org.apache.hudi.common.model.HoodieRecord;
import org.apache.hudi.common.table.timeline.HoodieActiveTimeline;
import org.apache.hudi.common.util.Option;
import org.apache.hudi.common.util.collection.Pair;
import org.apache.hudi.config.HoodieCompactionConfig;
import org.apache.hudi.config.HoodieIndexConfig;
import org.apache.hudi.config.HoodieWriteConfig;
import org.apache.hudi.index.HoodieIndex;
import org.apache.hudi.integ.testsuite.HoodieTestSuiteJob.HoodieTestSuiteConfig;
import org.apache.hudi.integ.testsuite.dag.nodes.CleanNode;
import org.apache.hudi.integ.testsuite.dag.nodes.DagNode;
import org.apache.hudi.integ.testsuite.dag.nodes.RollbackNode;
import org.apache.hudi.integ.testsuite.dag.nodes.ScheduleCompactNode;
import org.apache.hudi.utilities.deltastreamer.HoodieDeltaStreamer.Operation;
import org.apache.hudi.utilities.schema.SchemaProvider;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;

/**
 * A writer abstraction for the Hudi test suite. This class wraps the different writer implementations used to
 * perform write operations against the target Hudi dataset. The currently supported writers are
 * {@link HoodieDeltaStreamerWrapper} and {@link SparkRDDWriteClient}.
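 *
 * <p>A minimal usage sketch of the non-deltastreamer path; {@code jsc}, {@code props}, {@code cfg} and
 * {@code schema} stand in for the caller's own Spark context, properties, test suite config and Avro schema:
 * <pre>{@code
 *   HoodieTestSuiteWriter writer = new HoodieTestSuiteWriter(jsc, props, cfg, schema);
 *   Option<String> instantTime = writer.startCommit();
 *   JavaRDD<WriteStatus> statuses = writer.upsert(instantTime);
 *   writer.commit(statuses, instantTime);
 * }</pre>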
*/
public class HoodieTestSuiteWriter {

  private HoodieDeltaStreamerWrapper deltaStreamerWrapper;
  private SparkRDDWriteClient writeClient;
  protected HoodieTestSuiteConfig cfg;
  private Option<String> lastCheckpoint;
  private HoodieReadClient hoodieReadClient;
  private Properties props;
  private String schema;
  private transient Configuration configuration;
  private transient JavaSparkContext sparkContext;

  private static final Set<String> VALID_DAG_NODES_TO_ALLOW_WRITE_CLIENT_IN_DELTASTREAMER_MODE = new HashSet<>(
      Arrays.asList(RollbackNode.class.getName(), CleanNode.class.getName(), ScheduleCompactNode.class.getName()));
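
  /**
   * Convenience constructor that delegates to the five-argument constructor with {@code rollbackInflight} set to
   * {@code true}.
   */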
  public HoodieTestSuiteWriter(JavaSparkContext jsc, Properties props, HoodieTestSuiteConfig cfg, String schema) throws
      Exception {
    this(jsc, props, cfg, schema, true);
  }
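
  /**
   * Builds the test suite writer. The {@link HoodieDeltaStreamerWrapper} and {@link HoodieReadClient} are always
   * created; the {@link SparkRDDWriteClient} is only created here when {@code cfg.useDeltaStreamer} is disabled,
   * otherwise it is instantiated lazily in {@link #getWriteClient(DagNode)}.
   */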
  public HoodieTestSuiteWriter(JavaSparkContext jsc, Properties props, HoodieTestSuiteConfig cfg, String schema,
      boolean rollbackInflight) throws Exception {
    // We ensure that only 1 instance of HoodieWriteClient is instantiated for a HoodieTestSuiteWriter.
    // This does not instantiate a HoodieWriteClient until a
    // {@link HoodieDeltaStreamer#commit(HoodieWriteClient, JavaRDD, Option)} is invoked.
    HoodieSparkEngineContext context = new HoodieSparkEngineContext(jsc);
    this.deltaStreamerWrapper = new HoodieDeltaStreamerWrapper(cfg, jsc);
    this.hoodieReadClient = new HoodieReadClient(context, cfg.targetBasePath);
    if (!cfg.useDeltaStreamer) {
      this.writeClient = new SparkRDDWriteClient(context, getHoodieClientConfig(cfg, props, schema), rollbackInflight);
    }
    this.cfg = cfg;
    this.configuration = jsc.hadoopConfiguration();
    this.sparkContext = jsc;
    this.props = props;
    this.schema = schema;
  }
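
  /**
   * Builds the {@link HoodieWriteConfig} used by the write client: input records are combined before writing,
   * auto-commit is disabled so this writer can commit explicitly, and a BLOOM index is used.
   */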
  private HoodieWriteConfig getHoodieClientConfig(HoodieTestSuiteConfig cfg, Properties props, String schema) {
    return HoodieWriteConfig.newBuilder().combineInput(true, true).withPath(cfg.targetBasePath)
        .withAutoCommit(false)
        .withCompactionConfig(HoodieCompactionConfig.newBuilder().withPayloadClass(cfg.payloadClassName).build())
        .forTable(cfg.targetTableName)
        .withIndexConfig(HoodieIndexConfig.newBuilder().withIndexType(HoodieIndex.IndexType.BLOOM).build())
        .withProps(props)
        .withSchema(schema)
        .build();
  }
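
  /**
   * Returns true if the given DAG node is allowed to use the write client even in deltastreamer mode.
   */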
  private boolean allowWriteClientAccess(DagNode dagNode) {
    return VALID_DAG_NODES_TO_ALLOW_WRITE_CLIENT_IN_DELTASTREAMER_MODE.contains(dagNode.getClass().getName());
  }
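
  /**
   * Fetches the next source batch through the deltastreamer wrapper, returning the schema provider together with
   * the batch's checkpoint and the records themselves.
   */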
  public Pair<SchemaProvider, Pair<String, JavaRDD<HoodieRecord>>> fetchSource() throws Exception {
    return this.deltaStreamerWrapper.fetchSource();
  }
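
  /**
   * Starts a new commit. In deltastreamer mode only a fresh instant time is generated (the deltastreamer manages
   * its own commits); otherwise the commit is started through the write client.
   */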
  public Option<String> startCommit() {
    if (cfg.useDeltaStreamer) {
      return Option.of(HoodieActiveTimeline.createNewInstantTime());
    } else {
      return Option.of(writeClient.startCommit());
    }
  }
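
  /**
   * Upserts the next source batch, either through the deltastreamer wrapper or directly through the write client,
   * remembering the batch's checkpoint so {@link #commit(JavaRDD, Option)} can store it in the commit metadata.
   */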
  public JavaRDD<WriteStatus> upsert(Option<String> instantTime) throws Exception {
    if (cfg.useDeltaStreamer) {
      return deltaStreamerWrapper.upsert(Operation.UPSERT);
    } else {
      Pair<SchemaProvider, Pair<String, JavaRDD<HoodieRecord>>> nextBatch = fetchSource();
      lastCheckpoint = Option.of(nextBatch.getValue().getLeft());
      return writeClient.upsert(nextBatch.getRight().getRight(), instantTime.get());
    }
  }
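
  /**
   * Inserts the next source batch; mirrors {@link #upsert(Option)} but issues inserts instead of upserts.
   */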
  public JavaRDD<WriteStatus> insert(Option<String> instantTime) throws Exception {
    if (cfg.useDeltaStreamer) {
      return deltaStreamerWrapper.insert();
    } else {
      Pair<SchemaProvider, Pair<String, JavaRDD<HoodieRecord>>> nextBatch = fetchSource();
      lastCheckpoint = Option.of(nextBatch.getValue().getLeft());
      return writeClient.insert(nextBatch.getRight().getRight(), instantTime.get());
    }
  }
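
  /**
   * Bulk-inserts the next source batch; mirrors {@link #upsert(Option)} but issues bulk inserts.
   */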
  public JavaRDD<WriteStatus> bulkInsert(Option<String> instantTime) throws Exception {
    if (cfg.useDeltaStreamer) {
      return deltaStreamerWrapper.bulkInsert();
    } else {
      Pair<SchemaProvider, Pair<String, JavaRDD<HoodieRecord>>> nextBatch = fetchSource();
      lastCheckpoint = Option.of(nextBatch.getValue().getLeft());
      return writeClient.bulkInsert(nextBatch.getRight().getRight(), instantTime.get());
    }
  }
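
  /**
   * Runs a compaction. In write client mode, if no instant time is given, the first pending compaction reported by
   * the read client is picked; returns {@code null} when there is no compaction to execute.
   */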
  public JavaRDD<WriteStatus> compact(Option<String> instantTime) throws Exception {
    if (cfg.useDeltaStreamer) {
      return deltaStreamerWrapper.compact();
    } else {
      if (!instantTime.isPresent()) {
        Option<Pair<String, HoodieCompactionPlan>> compactionPlanPair = Option
            .fromJavaOptional(hoodieReadClient.getPendingCompactions()
                .stream().findFirst());
        if (compactionPlanPair.isPresent()) {
          instantTime = Option.of(compactionPlanPair.get().getLeft());
        }
      }
      if (instantTime.isPresent()) {
        return (JavaRDD<WriteStatus>) writeClient.compact(instantTime.get());
      } else {
        return null;
      }
    }
  }
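
  /**
   * Schedules a compaction. In deltastreamer mode the wrapper schedules it and no instant time is returned;
   * otherwise the write client schedules it with the given extra metadata.
   */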
  public Option<String> scheduleCompaction(Option<Map<String, String>> previousCommitExtraMetadata) throws
      Exception {
    if (cfg.useDeltaStreamer) {
      deltaStreamerWrapper.scheduleCompact();
      return Option.empty();
    } else {
      return writeClient.scheduleCompaction(previousCommitExtraMetadata);
    }
  }
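
  /**
   * Commits the given write statuses through the write client, storing the last fetched checkpoint in the commit
   * metadata; a no-op in deltastreamer mode, where the deltastreamer commits on its own.
   */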
  public void commit(JavaRDD<WriteStatus> records, Option<String> instantTime) {
    if (!cfg.useDeltaStreamer) {
      Map<String, String> extraMetadata = new HashMap<>();
      // Store the checkpoint in the commit metadata just like
      // {@link HoodieDeltaStreamer#commit(SparkRDDWriteClient, JavaRDD, Option)}.
      extraMetadata.put(HoodieDeltaStreamerWrapper.CHECKPOINT_KEY, lastCheckpoint.get());
      writeClient.commit(instantTime.get(), records, Option.of(extraMetadata));
    }
  }
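
  /**
   * Returns the write client, lazily creating it if necessary. In deltastreamer mode, access is limited to the DAG
   * nodes listed in {@code VALID_DAG_NODES_TO_ALLOW_WRITE_CLIENT_IN_DELTASTREAMER_MODE}.
   */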
  public SparkRDDWriteClient getWriteClient(DagNode dagNode) throws IllegalAccessException {
    if (cfg.useDeltaStreamer && !allowWriteClientAccess(dagNode)) {
      throw new IllegalAccessException("Cannot access the write client when testing in deltastreamer mode");
    }
    synchronized (this) {
      if (writeClient == null) {
        this.writeClient = new SparkRDDWriteClient(new HoodieSparkEngineContext(this.sparkContext),
            getHoodieClientConfig(cfg, props, schema), false);
      }
    }
    return writeClient;
  }
  public HoodieDeltaStreamerWrapper getDeltaStreamerWrapper() {
    return deltaStreamerWrapper;
  }

  public HoodieTestSuiteConfig getCfg() {
    return cfg;
  }

  public Configuration getConfiguration() {
    return configuration;
  }

  public JavaSparkContext getSparkContext() {
    return sparkContext;
  }
}