| /* |
| * Licensed to the Apache Software Foundation (ASF) under one |
| * or more contributor license agreements. See the NOTICE file |
| * distributed with this work for additional information |
| * regarding copyright ownership. The ASF licenses this file |
| * to you under the Apache License, Version 2.0 (the |
| * "License"); you may not use this file except in compliance |
| * with the License. You may obtain a copy of the License at |
| * |
| * http://www.apache.org/licenses/LICENSE-2.0 |
| * |
| * Unless required by applicable law or agreed to in writing, software |
| * distributed under the License is distributed on an "AS IS" BASIS, |
| * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| * See the License for the specific language governing permissions and |
| * limitations under the License. |
| */ |
| |
| package org.apache.hudi.utilities.deltastreamer; |
| |
| import org.apache.hudi.AvroConversionUtils; |
| import org.apache.hudi.DataSourceUtils; |
| import org.apache.hudi.avro.HoodieAvroUtils; |
| import org.apache.hudi.client.SparkRDDWriteClient; |
| import org.apache.hudi.client.WriteStatus; |
| import org.apache.hudi.client.common.HoodieSparkEngineContext; |
| import org.apache.hudi.common.config.TypedProperties; |
| import org.apache.hudi.common.fs.FSUtils; |
| import org.apache.hudi.common.model.HoodieCommitMetadata; |
| import org.apache.hudi.common.model.HoodieRecord; |
| import org.apache.hudi.common.model.HoodieRecordPayload; |
| import org.apache.hudi.common.model.HoodieTableType; |
| import org.apache.hudi.common.table.HoodieTableMetaClient; |
| import org.apache.hudi.common.table.timeline.HoodieInstant; |
| import org.apache.hudi.common.table.timeline.HoodieTimeline; |
| import org.apache.hudi.common.util.Option; |
| import org.apache.hudi.common.util.ReflectionUtils; |
| import org.apache.hudi.common.util.ValidationUtils; |
| import org.apache.hudi.common.util.collection.Pair; |
| import org.apache.hudi.config.HoodieCompactionConfig; |
| import org.apache.hudi.config.HoodieWriteConfig; |
| import org.apache.hudi.exception.HoodieDeltaStreamerException; |
| import org.apache.hudi.exception.HoodieException; |
| import org.apache.hudi.hive.HiveSyncConfig; |
| import org.apache.hudi.hive.HiveSyncTool; |
| import org.apache.hudi.keygen.KeyGenerator; |
| import org.apache.hudi.sync.common.AbstractSyncTool; |
| import org.apache.hudi.utilities.UtilHelpers; |
| import org.apache.hudi.utilities.callback.kafka.HoodieWriteCommitKafkaCallback; |
| import org.apache.hudi.utilities.callback.kafka.HoodieWriteCommitKafkaCallbackConfig; |
| import org.apache.hudi.utilities.deltastreamer.HoodieDeltaStreamer.Config; |
| import org.apache.hudi.utilities.schema.DelegatingSchemaProvider; |
| import org.apache.hudi.utilities.schema.SchemaProvider; |
| import org.apache.hudi.utilities.sources.InputBatch; |
| import org.apache.hudi.utilities.transform.Transformer; |
| |
| import com.codahale.metrics.Timer; |
| import org.apache.avro.Schema; |
| import org.apache.avro.generic.GenericRecord; |
| import org.apache.hadoop.conf.Configuration; |
| import org.apache.hadoop.fs.FileSystem; |
| import org.apache.hadoop.fs.Path; |
| import org.apache.hadoop.hive.conf.HiveConf; |
| import org.apache.log4j.LogManager; |
| import org.apache.log4j.Logger; |
| import org.apache.spark.api.java.JavaRDD; |
| import org.apache.spark.api.java.JavaSparkContext; |
| import org.apache.spark.sql.Dataset; |
| import org.apache.spark.sql.Row; |
| import org.apache.spark.sql.SparkSession; |
| |
| import java.io.IOException; |
| import java.io.Serializable; |
| import java.util.ArrayList; |
| import java.util.Arrays; |
| import java.util.function.Function; |
| import java.util.HashMap; |
| import java.util.HashSet; |
| import java.util.List; |
| import java.util.Objects; |
| import java.util.Properties; |
| import java.util.Set; |
| import java.util.stream.Collectors; |
| |
| import scala.collection.JavaConversions; |
| |
| import static org.apache.hudi.config.HoodieCompactionConfig.INLINE_COMPACT_PROP; |
| import static org.apache.hudi.config.HoodieWriteConfig.COMBINE_BEFORE_INSERT_PROP; |
| import static org.apache.hudi.config.HoodieWriteConfig.COMBINE_BEFORE_UPSERT_PROP; |
| import static org.apache.hudi.config.HoodieWriteConfig.HOODIE_AUTO_COMMIT_PROP; |
| import static org.apache.hudi.utilities.deltastreamer.HoodieDeltaStreamer.CHECKPOINT_KEY; |
| import static org.apache.hudi.utilities.deltastreamer.HoodieDeltaStreamer.CHECKPOINT_RESET_KEY; |
| import static org.apache.hudi.utilities.schema.RowBasedSchemaProvider.HOODIE_RECORD_NAMESPACE; |
| import static org.apache.hudi.utilities.schema.RowBasedSchemaProvider.HOODIE_RECORD_STRUCT_NAME; |
| |
| /** |
| * Syncs one batch of data to the Hoodie table. |
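| * <p> |
| * A minimal driver sketch (illustrative only; the arguments mirror the constructor below, while the single |
| * sync round and the no-op write-client callback are simplifying assumptions): |
| * <pre>{@code |
| * DeltaSync deltaSync = new DeltaSync(cfg, sparkSession, schemaProvider, props, jssc, fs, conf, |
| * writeClient -> true); |
| * Pair<Option<String>, JavaRDD<WriteStatus>> result = deltaSync.syncOnce(); |
| * deltaSync.close(); |
| * }</pre> |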
| */ |
| public class DeltaSync implements Serializable { |
| |
| private static final long serialVersionUID = 1L; |
| private static final Logger LOG = LogManager.getLogger(DeltaSync.class); |
| |
| /** |
| * Delta Sync Config. |
| */ |
| private final HoodieDeltaStreamer.Config cfg; |
| |
| /** |
| * Adapter around the source to pull deltas from, in Avro or Row format. |
| */ |
| private transient SourceFormatAdapter formatAdapter; |
| |
| /** |
| * User Provided Schema Provider. |
| */ |
| private transient SchemaProvider userProvidedSchemaProvider; |
| |
| /** |
| * Schema provider that supplies the schema for reading the input and writing out the target table. |
| */ |
| private transient SchemaProvider schemaProvider; |
| |
| /** |
| * Allows transforming source to target table before writing. |
| */ |
| private transient Option<Transformer> transformer; |
| |
| /** |
| * Extracts the record key for the target table. |
| */ |
| private KeyGenerator keyGenerator; |
| |
| /** |
| * Filesystem used. |
| */ |
| private transient FileSystem fs; |
| |
| /** |
| * Spark context. |
| */ |
| private transient JavaSparkContext jssc; |
| |
| /** |
| * Spark Session. |
| */ |
| private transient SparkSession sparkSession; |
| |
| /** |
| * Hadoop configuration, used to build the HiveConf for Hive/meta sync. |
| */ |
| private transient Configuration conf; |
| |
| /** |
| * Bag of properties with source, hoodie client, key generator etc. |
| */ |
| private final TypedProperties props; |
| |
| /** |
| * Callback when write client is instantiated. |
| */ |
| private transient Function<SparkRDDWriteClient, Boolean> onInitializingHoodieWriteClient; |
| |
| /** |
| * Timeline with completed commits. |
| */ |
| private transient Option<HoodieTimeline> commitTimelineOpt; |
| |
| /** |
| * Write Client. |
| */ |
| private transient SparkRDDWriteClient writeClient; |
| |
| private transient HoodieDeltaStreamerMetrics metrics; |
| |
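| /** |
| * Instantiates one delta sync pipeline: refreshes the target table timeline, registers any user provided |
| * schemas, and wires up the transformer, key generator, metrics and source format adapter. |
| * |
| * @param cfg delta streamer config |
| * @param sparkSession spark session |
| * @param schemaProvider user provided schema provider, may be null |
| * @param props properties for the source, write client, key generator etc. |
| * @param jssc java spark context |
| * @param fs file system of the target table |
| * @param conf hadoop configuration used for hive/meta sync |
| * @param onInitializingHoodieWriteClient callback invoked whenever the write client is (re)created |
| * @throws IOException if the target table timeline cannot be read or initialized |
| */ |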
| public DeltaSync(HoodieDeltaStreamer.Config cfg, SparkSession sparkSession, SchemaProvider schemaProvider, |
| TypedProperties props, JavaSparkContext jssc, FileSystem fs, Configuration conf, |
| Function<SparkRDDWriteClient, Boolean> onInitializingHoodieWriteClient) throws IOException { |
| |
| this.cfg = cfg; |
| this.jssc = jssc; |
| this.sparkSession = sparkSession; |
| this.fs = fs; |
| this.onInitializingHoodieWriteClient = onInitializingHoodieWriteClient; |
| this.props = props; |
| this.userProvidedSchemaProvider = schemaProvider; |
| |
| refreshTimeline(); |
| // Register User Provided schema first |
| registerAvroSchemas(schemaProvider); |
| |
| this.transformer = UtilHelpers.createTransformer(cfg.transformerClassNames); |
| this.keyGenerator = DataSourceUtils.createKeyGenerator(props); |
| |
| this.metrics = new HoodieDeltaStreamerMetrics(getHoodieClientConfig(this.schemaProvider)); |
| |
| this.formatAdapter = new SourceFormatAdapter( |
| UtilHelpers.createSource(cfg.sourceClassName, props, jssc, sparkSession, schemaProvider, metrics)); |
| this.conf = conf; |
| } |
| |
| /** |
| * Refreshes the timeline of completed commits for the target table, initializing the table on storage if it |
| * does not exist yet. |
| * |
| * @throws IOException if the base path cannot be accessed or the table cannot be initialized |
| */ |
| private void refreshTimeline() throws IOException { |
| if (fs.exists(new Path(cfg.targetBasePath))) { |
| HoodieTableMetaClient meta = new HoodieTableMetaClient(new Configuration(fs.getConf()), cfg.targetBasePath, |
| cfg.payloadClassName); |
| switch (meta.getTableType()) { |
| case COPY_ON_WRITE: |
| this.commitTimelineOpt = Option.of(meta.getActiveTimeline().getCommitTimeline().filterCompletedInstants()); |
| break; |
| case MERGE_ON_READ: |
| this.commitTimelineOpt = Option.of(meta.getActiveTimeline().getDeltaCommitTimeline().filterCompletedInstants()); |
| break; |
| default: |
| throw new HoodieException("Unsupported table type :" + meta.getTableType()); |
| } |
| } else { |
| this.commitTimelineOpt = Option.empty(); |
| HoodieTableMetaClient.initTableType(new Configuration(jssc.hadoopConfiguration()), cfg.targetBasePath, |
| HoodieTableType.valueOf(cfg.tableType), cfg.targetTableName, "archived", cfg.payloadClassName, cfg.baseFileFormat); |
| } |
| } |
| |
| /** |
| * Run one round of delta sync and return new compaction instant if one got scheduled. |
| */ |
| public Pair<Option<String>, JavaRDD<WriteStatus>> syncOnce() throws IOException { |
| Pair<Option<String>, JavaRDD<WriteStatus>> result = null; |
| Timer.Context overallTimerContext = metrics.getOverallTimerContext(); |
| |
| // Refresh Timeline |
| refreshTimeline(); |
| |
| Pair<SchemaProvider, Pair<String, JavaRDD<HoodieRecord>>> srcRecordsWithCkpt = readFromSource(commitTimelineOpt); |
| |
| if (null != srcRecordsWithCkpt) { |
| // This is the first input batch. If the write client is not set up yet, adopt the schema provider |
| // returned by the source, then set up the write client and compaction |
| if (null == writeClient) { |
| this.schemaProvider = srcRecordsWithCkpt.getKey(); |
| // Setup HoodieWriteClient and compaction now that we decided on schema |
| setupWriteClient(); |
| } |
| |
| result = writeToSink(srcRecordsWithCkpt.getRight().getRight(), |
| srcRecordsWithCkpt.getRight().getLeft(), metrics, overallTimerContext); |
| } |
| |
| // Clear persistent RDDs |
| jssc.getPersistentRDDs().values().forEach(JavaRDD::unpersist); |
| return result; |
| } |
| |
| /** |
| * Read from Upstream Source and apply transformation if needed. |
| * |
| * @param commitTimelineOpt Timeline with completed commits |
| * @return Pair of the resolved schema provider and a (checkpoint, records) pair read from the upstream source, |
| * or null if the source checkpoint has not advanced |
| * @throws IOException if the commit timeline or commit metadata cannot be read |
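| * <p> |
| * The checkpoint to resume from is resolved as follows (a summary of the logic below): the {@code cfg.checkpoint} |
| * override wins unless it equals the {@code CHECKPOINT_RESET_KEY} recorded in the last completed commit; otherwise |
| * the {@code CHECKPOINT_KEY} stored in that commit's metadata is used; a non-empty timeline carrying neither |
| * (and not produced by a full bootstrap) results in a {@link HoodieDeltaStreamerException}. |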
| */ |
| public Pair<SchemaProvider, Pair<String, JavaRDD<HoodieRecord>>> readFromSource(Option<HoodieTimeline> commitTimelineOpt) throws IOException { |
| // Retrieve the previous round checkpoints, if any |
| Option<String> resumeCheckpointStr = Option.empty(); |
| if (commitTimelineOpt.isPresent()) { |
| Option<HoodieInstant> lastCommit = commitTimelineOpt.get().lastInstant(); |
| if (lastCommit.isPresent()) { |
| HoodieCommitMetadata commitMetadata = HoodieCommitMetadata |
| .fromBytes(commitTimelineOpt.get().getInstantDetails(lastCommit.get()).get(), HoodieCommitMetadata.class); |
| if (cfg.checkpoint != null && !cfg.checkpoint.equals(commitMetadata.getMetadata(CHECKPOINT_RESET_KEY))) { |
| resumeCheckpointStr = Option.of(cfg.checkpoint); |
| } else if (commitMetadata.getMetadata(CHECKPOINT_KEY) != null) { |
| // If the previous checkpoint is an empty string, skip resuming by leaving the checkpoint as Option.empty() |
| if (!commitMetadata.getMetadata(CHECKPOINT_KEY).isEmpty()) { |
| resumeCheckpointStr = Option.of(commitMetadata.getMetadata(CHECKPOINT_KEY)); |
| } |
| } else if (HoodieTimeline.compareTimestamps(HoodieTimeline.FULL_BOOTSTRAP_INSTANT_TS, |
| HoodieTimeline.LESSER_THAN, lastCommit.get().getTimestamp())) { |
| throw new HoodieDeltaStreamerException( |
| "Unable to find previous checkpoint. Please double check if this table " |
| + "was indeed built via delta streamer. Last Commit :" + lastCommit + ", Instants :" |
| + commitTimelineOpt.get().getInstants().collect(Collectors.toList()) + ", CommitMetadata=" |
| + commitMetadata.toJsonString()); |
| } |
| } |
| } else { |
| HoodieTableMetaClient.initTableType(new Configuration(jssc.hadoopConfiguration()), cfg.targetBasePath, |
| HoodieTableType.valueOf(cfg.tableType), cfg.targetTableName, "archived", cfg.payloadClassName, cfg.baseFileFormat); |
| } |
| |
| if (!resumeCheckpointStr.isPresent() && cfg.checkpoint != null) { |
| resumeCheckpointStr = Option.of(cfg.checkpoint); |
| } |
| LOG.info("Checkpoint to resume from : " + resumeCheckpointStr); |
| |
| final Option<JavaRDD<GenericRecord>> avroRDDOptional; |
| final String checkpointStr; |
| final SchemaProvider schemaProvider; |
| if (transformer.isPresent()) { |
| // Transformation is needed. Fetch new rows in Row format, apply the transformation, and then convert them |
| // to generic records for writing |
| InputBatch<Dataset<Row>> dataAndCheckpoint = |
| formatAdapter.fetchNewDataInRowFormat(resumeCheckpointStr, cfg.sourceLimit); |
| |
| Option<Dataset<Row>> transformed = |
| dataAndCheckpoint.getBatch().map(data -> transformer.get().apply(jssc, sparkSession, data, props)); |
| checkpointStr = dataAndCheckpoint.getCheckpointForNextBatch(); |
| if (this.userProvidedSchemaProvider != null && this.userProvidedSchemaProvider.getTargetSchema() != null) { |
| // If the target schema is specified through Avro schema, |
| // pass in the schema for the Row-to-Avro conversion |
| // to avoid nullability mismatch between Avro schema and Row schema |
| avroRDDOptional = transformed |
| .map(t -> AvroConversionUtils.createRdd( |
| t, this.userProvidedSchemaProvider.getTargetSchema(), |
| HOODIE_RECORD_STRUCT_NAME, HOODIE_RECORD_NAMESPACE).toJavaRDD()); |
| schemaProvider = this.userProvidedSchemaProvider; |
| } else { |
| // Use the transformed Row's schema if not overridden. If the target schema is not specified, |
| // default to RowBasedSchemaProvider |
| schemaProvider = |
| transformed |
| .map(r -> (SchemaProvider) new DelegatingSchemaProvider(props, jssc, |
| dataAndCheckpoint.getSchemaProvider(), |
| UtilHelpers.createRowBasedSchemaProvider(r.schema(), props, jssc))) |
| .orElse(dataAndCheckpoint.getSchemaProvider()); |
| avroRDDOptional = transformed |
| .map(t -> AvroConversionUtils.createRdd( |
| t, HOODIE_RECORD_STRUCT_NAME, HOODIE_RECORD_NAMESPACE).toJavaRDD()); |
| } |
| } else { |
| // Pull the data from the source & prepare the write |
| InputBatch<JavaRDD<GenericRecord>> dataAndCheckpoint = |
| formatAdapter.fetchNewDataInAvroFormat(resumeCheckpointStr, cfg.sourceLimit); |
| avroRDDOptional = dataAndCheckpoint.getBatch(); |
| checkpointStr = dataAndCheckpoint.getCheckpointForNextBatch(); |
| schemaProvider = dataAndCheckpoint.getSchemaProvider(); |
| } |
| |
| if (Objects.equals(checkpointStr, resumeCheckpointStr.orElse(null))) { |
| LOG.info("No new data, source checkpoint has not changed. Nothing to commit. Old checkpoint=(" |
| + resumeCheckpointStr + "). New Checkpoint=(" + checkpointStr + ")"); |
| return null; |
| } |
| |
| if ((!avroRDDOptional.isPresent()) || (avroRDDOptional.get().isEmpty())) { |
| LOG.info("No new data, perform empty commit."); |
| return Pair.of(schemaProvider, Pair.of(checkpointStr, jssc.emptyRDD())); |
| } |
| |
| JavaRDD<GenericRecord> avroRDD = avroRDDOptional.get(); |
| JavaRDD<HoodieRecord> records = avroRDD.map(gr -> { |
| HoodieRecordPayload payload = DataSourceUtils.createPayload(cfg.payloadClassName, gr, |
| (Comparable) HoodieAvroUtils.getNestedFieldVal(gr, cfg.sourceOrderingField, false)); |
| return new HoodieRecord<>(keyGenerator.getKey(gr), payload); |
| }); |
| |
| return Pair.of(schemaProvider, Pair.of(checkpointStr, records)); |
| } |
| |
| /** |
| * Performs the Hoodie write. Runs the cleaner, schedules compaction and syncs to Hive if needed. |
| * |
| * @param records Input Records |
| * @param checkpointStr Checkpoint String |
| * @param metrics Metrics |
| * @param overallTimerContext Timer Context |
| * @return Option Compaction instant if one is scheduled |
| */ |
| private Pair<Option<String>, JavaRDD<WriteStatus>> writeToSink(JavaRDD<HoodieRecord> records, String checkpointStr, |
| HoodieDeltaStreamerMetrics metrics, |
| Timer.Context overallTimerContext) { |
| Option<String> scheduledCompactionInstant = Option.empty(); |
| // filter dupes if needed |
| if (cfg.filterDupes) { |
| records = DataSourceUtils.dropDuplicates(jssc, records, writeClient.getConfig()); |
| } |
| |
| boolean isEmpty = records.isEmpty(); |
| |
| // try to start a new commit |
| String instantTime = startCommit(); |
| LOG.info("Starting commit : " + instantTime); |
| |
| JavaRDD<WriteStatus> writeStatusRDD; |
| switch (cfg.operation) { |
| case INSERT: |
| writeStatusRDD = writeClient.insert(records, instantTime); |
| break; |
| case UPSERT: |
| writeStatusRDD = writeClient.upsert(records, instantTime); |
| break; |
| case BULK_INSERT: |
| writeStatusRDD = writeClient.bulkInsert(records, instantTime); |
| break; |
| default: |
| throw new HoodieDeltaStreamerException("Unknown operation : " + cfg.operation); |
| } |
| |
| long totalErrorRecords = writeStatusRDD.mapToDouble(WriteStatus::getTotalErrorRecords).sum().longValue(); |
| long totalRecords = writeStatusRDD.mapToDouble(WriteStatus::getTotalRecords).sum().longValue(); |
| boolean hasErrors = totalErrorRecords > 0; |
| if (!hasErrors || cfg.commitOnErrors) { |
| HashMap<String, String> checkpointCommitMetadata = new HashMap<>(); |
| checkpointCommitMetadata.put(CHECKPOINT_KEY, checkpointStr); |
| if (cfg.checkpoint != null) { |
| checkpointCommitMetadata.put(CHECKPOINT_RESET_KEY, cfg.checkpoint); |
| } |
| |
| if (hasErrors) { |
| LOG.warn("Some records failed to be merged but forcing commit since commitOnErrors set. Errors/Total=" |
| + totalErrorRecords + "/" + totalRecords); |
| } |
| |
| boolean success = writeClient.commit(instantTime, writeStatusRDD, Option.of(checkpointCommitMetadata)); |
| if (success) { |
| LOG.info("Commit " + instantTime + " successful!"); |
| |
| // Schedule compaction if needed |
| if (cfg.isAsyncCompactionEnabled()) { |
| scheduledCompactionInstant = writeClient.scheduleCompaction(Option.empty()); |
| } |
| |
| if (!isEmpty) { |
| syncMeta(metrics); |
| } |
| } else { |
| LOG.info("Commit " + instantTime + " failed!"); |
| throw new HoodieException("Commit " + instantTime + " failed!"); |
| } |
| } else { |
| LOG.error("Delta Sync found errors when writing. Errors/Total=" + totalErrorRecords + "/" + totalRecords); |
| LOG.error("Printing out the top 100 errors"); |
| writeStatusRDD.filter(WriteStatus::hasErrors).take(100).forEach(ws -> { |
| LOG.error("Global error :", ws.getGlobalError()); |
| if (ws.getErrors().size() > 0) { |
| ws.getErrors().forEach((key, value) -> LOG.trace("Error for key:" + key + " is " + value)); |
| } |
| }); |
| // Rolling back instant |
| writeClient.rollback(instantTime); |
| throw new HoodieException("Commit " + instantTime + " failed and rolled-back !"); |
| } |
| long overallTimeMs = overallTimerContext != null ? overallTimerContext.stop() : 0; |
| |
| // Send DeltaStreamer Metrics |
| metrics.updateDeltaStreamerMetrics(overallTimeMs); |
| return Pair.of(scheduledCompactionInstant, writeStatusRDD); |
| } |
| |
| /** |
| * Try to start a new commit. |
| * <p> |
| * An exception is thrown if it still fails after 2 attempts. |
| * |
| * @return Instant time of the commit |
| */ |
| private String startCommit() { |
| final int maxRetries = 2; |
| int retryNum = 1; |
| RuntimeException lastException = null; |
| while (retryNum <= maxRetries) { |
| try { |
| return writeClient.startCommit(); |
| } catch (IllegalArgumentException ie) { |
| lastException = ie; |
| LOG.error("Got error trying to start a new commit. Retrying after sleeping for a sec", ie); |
| retryNum++; |
| try { |
| Thread.sleep(1000); |
| } catch (InterruptedException e) { |
| // No-Op |
| } |
| } |
| } |
| throw lastException; |
| } |
| |
| private String getSyncClassShortName(String syncClassName) { |
| return syncClassName.substring(syncClassName.lastIndexOf(".") + 1); |
| } |
| |
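| /** |
| * Syncs table metadata to every configured meta sync tool. {@link HiveSyncTool} is handled natively; any other |
| * class is loaded reflectively and must expose a {@code (Properties, FileSystem)} constructor. A hypothetical |
| * custom tool (sketch only, {@code MyCatalogSyncTool} is not part of Hudi) might look like: |
| * <pre>{@code |
| * public class MyCatalogSyncTool extends AbstractSyncTool { |
| * public MyCatalogSyncTool(Properties props, FileSystem fs) { |
| * super(props, fs); // assumes AbstractSyncTool exposes this constructor |
| * } |
| * public void syncHoodieTable() { |
| * // push table and partition metadata to the external catalog |
| * } |
| * } |
| * }</pre> |
| */ |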
| private void syncMeta(HoodieDeltaStreamerMetrics metrics) { |
| Set<String> syncClientToolClasses = new HashSet<>(Arrays.asList(cfg.syncClientToolClass.split(","))); |
| // for backward compatibility |
| if (cfg.enableHiveSync) { |
| cfg.enableMetaSync = true; |
| syncClientToolClasses.add(HiveSyncTool.class.getName()); |
| LOG.info("When set --enable-hive-sync will use HiveSyncTool for backward compatibility"); |
| } |
| if (cfg.enableMetaSync) { |
| for (String impl : syncClientToolClasses) { |
| Timer.Context syncContext = metrics.getMetaSyncTimerContext(); |
| impl = impl.trim(); |
| AbstractSyncTool syncTool = null; |
| switch (impl) { |
| case "org.apache.hudi.hive.HiveSyncTool": |
| HiveSyncConfig hiveSyncConfig = DataSourceUtils.buildHiveSyncConfig(props, cfg.targetBasePath, cfg.baseFileFormat); |
| LOG.info("Syncing target hoodie table with hive table(" + hiveSyncConfig.tableName + "). Hive metastore URL :" |
| + hiveSyncConfig.jdbcUrl + ", basePath :" + cfg.targetBasePath); |
| syncTool = new HiveSyncTool(hiveSyncConfig, new HiveConf(conf, HiveConf.class), fs); |
| break; |
| default: |
| FileSystem fs = FSUtils.getFs(cfg.targetBasePath, jssc.hadoopConfiguration()); |
| Properties properties = new Properties(); |
| properties.putAll(props); |
| properties.put("basePath", cfg.targetBasePath); |
| syncTool = (AbstractSyncTool) ReflectionUtils.loadClass(impl, new Class[]{Properties.class, FileSystem.class}, properties, fs); |
| } |
| syncTool.syncHoodieTable(); |
| long metaSyncTimeMs = syncContext != null ? syncContext.stop() : 0; |
| metrics.updateDeltaStreamerMetaSyncMetrics(getSyncClassShortName(impl), metaSyncTimeMs); |
| } |
| } |
| } |
| |
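| /** |
| * Syncs the target Hudi table to Hive using the Hive sync config built from the properties bag; both the |
| * HiveConf and the resolved sync config are logged before the sync runs. |
| */ |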
| public void syncHive() { |
| HiveSyncConfig hiveSyncConfig = DataSourceUtils.buildHiveSyncConfig(props, cfg.targetBasePath, cfg.baseFileFormat); |
| LOG.info("Syncing target hoodie table with hive table(" + hiveSyncConfig.tableName + "). Hive metastore URL :" |
| + hiveSyncConfig.jdbcUrl + ", basePath :" + cfg.targetBasePath); |
| HiveConf hiveConf = new HiveConf(conf, HiveConf.class); |
| LOG.info("Hive Conf => " + hiveConf.getAllProperties().toString()); |
| LOG.info("Hive Sync Conf => " + hiveSyncConfig.toString()); |
| new HiveSyncTool(hiveSyncConfig, hiveConf, fs).syncHoodieTable(); |
| } |
| |
| public void syncHive(HiveConf conf) { |
| this.conf = conf; |
| syncHive(); |
| } |
| |
| /** |
| * Note that depending on configs and source-type, schemaProvider could either be eagerly or lazily created. |
| * SchemaProvider creation is a precursor to HoodieWriteClient and AsyncCompactor creation. This method takes care of |
| * this constraint. |
| */ |
| private void setupWriteClient() { |
| LOG.info("Setting up Hoodie Write Client"); |
| if ((null != schemaProvider) && (null == writeClient)) { |
| registerAvroSchemas(schemaProvider); |
| HoodieWriteConfig hoodieCfg = getHoodieClientConfig(schemaProvider); |
| writeClient = new SparkRDDWriteClient<>(new HoodieSparkEngineContext(jssc), hoodieCfg, true); |
| onInitializingHoodieWriteClient.apply(writeClient); |
| } |
| } |
| |
| /** |
| * Helper to construct Write Client config. |
| * |
| * @param schemaProvider Schema Provider |
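| * <p> |
| * After building the config, a few delta streamer invariants are validated: auto commit must be disabled, |
| * combine-before-upsert must remain enabled, combine-before-insert must match {@code cfg.filterDupes} and |
| * inline compaction must match {@code cfg.isInlineCompactionEnabled()}. |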
| */ |
| private HoodieWriteConfig getHoodieClientConfig(SchemaProvider schemaProvider) { |
| final boolean combineBeforeUpsert = true; |
| final boolean autoCommit = false; |
| HoodieWriteConfig.Builder builder = |
| HoodieWriteConfig.newBuilder().withPath(cfg.targetBasePath).combineInput(cfg.filterDupes, combineBeforeUpsert) |
| .withCompactionConfig(HoodieCompactionConfig.newBuilder().withPayloadClass(cfg.payloadClassName) |
| // Inline compaction is disabled in continuous mode; otherwise it is enabled for MOR tables |
| .withInlineCompaction(cfg.isInlineCompactionEnabled()).build()) |
| .forTable(cfg.targetTableName) |
| .withAutoCommit(autoCommit).withProps(props); |
| |
| if (null != schemaProvider && null != schemaProvider.getTargetSchema()) { |
| builder = builder.withSchema(schemaProvider.getTargetSchema().toString()); |
| } |
| HoodieWriteConfig config = builder.build(); |
| |
| // set default value for {@link HoodieWriteCommitKafkaCallbackConfig} if needed. |
| if (config.writeCommitCallbackOn() && HoodieWriteCommitKafkaCallback.class.getName().equals(config.getCallbackClass())) { |
| HoodieWriteCommitKafkaCallbackConfig.setCallbackKafkaConfigIfNeeded(config.getProps()); |
| } |
| |
| // Validate the write-config assumptions the delta streamer relies on, to be safe |
| ValidationUtils.checkArgument(config.isInlineCompaction() == cfg.isInlineCompactionEnabled(), |
| String.format("%s should be set to %s", INLINE_COMPACT_PROP, cfg.isInlineCompactionEnabled())); |
| ValidationUtils.checkArgument(!config.shouldAutoCommit(), |
| String.format("%s should be set to %s", HOODIE_AUTO_COMMIT_PROP, autoCommit)); |
| ValidationUtils.checkArgument(config.shouldCombineBeforeInsert() == cfg.filterDupes, |
| String.format("%s should be set to %s", COMBINE_BEFORE_INSERT_PROP, cfg.filterDupes)); |
| ValidationUtils.checkArgument(config.shouldCombineBeforeUpsert(), |
| String.format("%s should be set to %s", COMBINE_BEFORE_UPSERT_PROP, combineBeforeUpsert)); |
| |
| return config; |
| } |
| |
| /** |
| * Register Avro Schemas. |
| * |
| * @param schemaProvider Schema Provider |
| */ |
| private void registerAvroSchemas(SchemaProvider schemaProvider) { |
| // Register the schemas so that shuffles do not have to serialize the full schema with every record |
| if (null != schemaProvider) { |
| List<Schema> schemas = new ArrayList<>(); |
| schemas.add(schemaProvider.getSourceSchema()); |
| if (schemaProvider.getTargetSchema() != null) { |
| schemas.add(schemaProvider.getTargetSchema()); |
| } |
| |
| LOG.info("Registering Schema :" + schemas); |
| jssc.sc().getConf().registerAvroSchemas(JavaConversions.asScalaBuffer(schemas).toList()); |
| } |
| } |
| |
| /** |
| * Close all resources. |
| */ |
| public void close() { |
| if (null != writeClient) { |
| writeClient.close(); |
| writeClient = null; |
| } |
| } |
| |
| public FileSystem getFs() { |
| return fs; |
| } |
| |
| public TypedProperties getProps() { |
| return props; |
| } |
| |
| public Config getCfg() { |
| return cfg; |
| } |
| |
| public Option<HoodieTimeline> getCommitTimelineOpt() { |
| return commitTimelineOpt; |
| } |
| } |