/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hudi.utilities.deltastreamer;

import org.apache.hudi.DataSourceUtils;
import org.apache.hudi.DataSourceWriteOptions;
import org.apache.hudi.client.SparkRDDWriteClient;
import org.apache.hudi.client.common.HoodieSparkEngineContext;
import org.apache.hudi.common.config.TypedProperties;
import org.apache.hudi.common.model.HoodieTableType;
import org.apache.hudi.common.table.HoodieTableConfig;
import org.apache.hudi.common.table.HoodieTableMetaClient;
import org.apache.hudi.common.util.Option;
import org.apache.hudi.common.util.ValidationUtils;
import org.apache.hudi.config.HoodieCompactionConfig;
import org.apache.hudi.config.HoodieIndexConfig;
import org.apache.hudi.config.HoodieWriteConfig;
import org.apache.hudi.exception.HoodieException;
import org.apache.hudi.hive.HiveSyncConfig;
import org.apache.hudi.hive.HiveSyncTool;
import org.apache.hudi.index.HoodieIndex;
import org.apache.hudi.utilities.UtilHelpers;
import org.apache.hudi.utilities.schema.SchemaProvider;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hive.conf.HiveConf;
import org.apache.log4j.LogManager;
import org.apache.log4j.Logger;
import org.apache.spark.api.java.JavaSparkContext;

import java.io.IOException;
import java.io.Serializable;
import java.util.HashMap;

/**
 * Performs bootstrap from a non-hudi source.
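 *
 * <p>A minimal usage sketch (assuming an already populated {@link HoodieDeltaStreamer.Config}
 * and {@link TypedProperties}; the variable names below are illustrative only):
 * <pre>{@code
 *   BootstrapExecutor executor = new BootstrapExecutor(cfg, jssc, fs, jssc.hadoopConfiguration(), props);
 *   executor.execute();
 * }</pre>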
 */
public class BootstrapExecutor implements Serializable {

  private static final Logger LOG = LogManager.getLogger(BootstrapExecutor.class);

  /**
   * Config.
   */
  private final HoodieDeltaStreamer.Config cfg;

  /**
   * Schema provider that supplies the schemas for reading the input and writing out the target table.
   */
  private transient SchemaProvider schemaProvider;

  /**
   * Spark context.
   */
  private transient JavaSparkContext jssc;

  /**
   * Bag of properties with source, hoodie client, key generator etc.
   */
  private final TypedProperties props;

  /**
   * Hadoop Configuration.
   */
  private final Configuration configuration;

  /**
   * Bootstrap Configuration.
   */
  private final HoodieWriteConfig bootstrapConfig;

  /**
   * FileSystem instance.
   */
  private transient FileSystem fs;

  private String bootstrapBasePath;

  /**
   * Bootstrap Executor.
   *
   * @param cfg DeltaStreamer Config
   * @param jssc Java Spark Context
   * @param fs File System
   * @param conf Hadoop Configuration
   * @param properties Bootstrap Writer Properties
   * @throws IOException if the schema provider cannot be instantiated
   */
  public BootstrapExecutor(HoodieDeltaStreamer.Config cfg, JavaSparkContext jssc, FileSystem fs, Configuration conf,
                           TypedProperties properties) throws IOException {
    this.cfg = cfg;
    this.jssc = jssc;
    this.fs = fs;
    this.configuration = conf;
    this.props = properties;

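    // The bootstrap source base path is mandatory: it points at the existing (non-hudi) dataset to bootstrap from.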
    ValidationUtils.checkArgument(properties.containsKey(HoodieTableConfig.HOODIE_BOOTSTRAP_BASE_PATH),
        HoodieTableConfig.HOODIE_BOOTSTRAP_BASE_PATH + " must be specified.");
    this.bootstrapBasePath = properties.getString(HoodieTableConfig.HOODIE_BOOTSTRAP_BASE_PATH);

    // Add more defaults if full bootstrap requested
    this.props.putIfAbsent(DataSourceWriteOptions.PAYLOAD_CLASS_OPT_KEY(),
        DataSourceWriteOptions.DEFAULT_PAYLOAD_OPT_VAL());
    this.schemaProvider = UtilHelpers.createSchemaProvider(cfg.schemaProviderClassName, props, jssc);

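    // Build the write config for the bootstrap commit: inline compaction is disabled (bootstrap is a
    // single commit), auto-commit is enabled, and the BLOOM index is used as the lookup strategy.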
    HoodieWriteConfig.Builder builder =
        HoodieWriteConfig.newBuilder().withPath(cfg.targetBasePath)
            .withCompactionConfig(HoodieCompactionConfig.newBuilder().withInlineCompaction(false).build())
            .forTable(cfg.targetTableName)
            .withIndexConfig(HoodieIndexConfig.newBuilder().withIndexType(HoodieIndex.IndexType.BLOOM).build())
            .withAutoCommit(true)
            .withProps(props);
    if (null != schemaProvider && null != schemaProvider.getTargetSchema()) {
      builder = builder.withSchema(schemaProvider.getTargetSchema().toString());
    }
    this.bootstrapConfig = builder.build();
    LOG.info("Created bootstrap executor with configs: " + bootstrapConfig.getProps());
  }

  /**
   * Executes Bootstrap.
   */
  public void execute() throws IOException {
    initializeTable();
    SparkRDDWriteClient bootstrapClient = new SparkRDDWriteClient(new HoodieSparkEngineContext(jssc), bootstrapConfig, true);

    try {
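      // Attach the resume checkpoint (if one was supplied) to the bootstrap commit metadata so that
      // a subsequent delta sync can continue from it rather than re-reading the source from scratch.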
      HashMap<String, String> checkpointCommitMetadata = new HashMap<>();
      checkpointCommitMetadata.put(HoodieDeltaStreamer.CHECKPOINT_KEY, cfg.checkpoint);
      if (cfg.checkpoint != null) {
        checkpointCommitMetadata.put(HoodieDeltaStreamer.CHECKPOINT_RESET_KEY, cfg.checkpoint);
      }
      bootstrapClient.bootstrap(Option.of(checkpointCommitMetadata));
      syncHive();
    } finally {
      bootstrapClient.close();
    }
  }

  /**
   * Sync to Hive.
   */
  private void syncHive() {
    if (cfg.enableHiveSync) {
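      // Build the sync config from the writer properties, then register (or update) the target
      // table in the Hive metastore.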
      HiveSyncConfig hiveSyncConfig = DataSourceUtils.buildHiveSyncConfig(props, cfg.targetBasePath, cfg.baseFileFormat);
      LOG.info("Syncing target hoodie table with hive table(" + hiveSyncConfig.tableName + "). Hive metastore URL: "
          + hiveSyncConfig.jdbcUrl + ", basePath: " + cfg.targetBasePath);
      new HiveSyncTool(hiveSyncConfig, new HiveConf(configuration, HiveConf.class), fs).syncHoodieTable();
    }
  }

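  /**
   * Initializes the target hoodie table at {@code cfg.targetBasePath}, refusing to run on top of an
   * existing table and recording the bootstrap base path in the new table's metadata.
   */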
  private void initializeTable() throws IOException {
    if (fs.exists(new Path(cfg.targetBasePath))) {
      throw new HoodieException("target base path already exists at " + cfg.targetBasePath
          + ". Cannot bootstrap data on top of an existing table");
    }
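    // "archived" is the archive log folder; the bootstrap base path and index class are persisted in
    // the table's hoodie.properties so bootstrapped file slices can be mapped back to the source files.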
    HoodieTableMetaClient.initTableTypeWithBootstrap(new Configuration(jssc.hadoopConfiguration()),
        cfg.targetBasePath, HoodieTableType.valueOf(cfg.tableType), cfg.targetTableName, "archived", cfg.payloadClassName,
        cfg.baseFileFormat, cfg.bootstrapIndexClass, bootstrapBasePath);
  }

  public HoodieWriteConfig getBootstrapConfig() {
    return bootstrapConfig;
  }
}