blob: 87c2d84ea5290af9aee9d7118da004a3f8ae41e4 [file] [log] [blame]
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
//   http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
package org.apache.kudu.backup
import org.apache.kudu.backup.Backup.TableMetadataPB
import org.apache.kudu.client.AlterTableOptions
import org.apache.kudu.client.KuduPartitioner
import org.apache.kudu.client.Partition
import org.apache.kudu.client.RangePartitionBound
import org.apache.kudu.client.RangePartitionWithCustomHashSchema
import org.apache.kudu.client.SessionConfiguration.FlushMode
import org.apache.kudu.spark.kudu.KuduContext
import org.apache.kudu.spark.kudu.RowConverter
import org.apache.spark.sql.DataFrame
import org.apache.spark.sql.SparkSession
import org.apache.yetus.audience.InterfaceAudience
import org.apache.yetus.audience.InterfaceStability
import org.slf4j.Logger
import org.slf4j.LoggerFactory
import scala.collection.JavaConverters._
import scala.collection.parallel.ForkJoinTaskSupport
import scala.concurrent.forkjoin.ForkJoinPool
import scala.util.Failure
import scala.util.Success
import scala.util.Try
/**
 * The main class for a Kudu restore Spark job.
 */
object KuduRestore {
// slf4j logger for this object, looked up from the object's runtime class.
val log: Logger = LoggerFactory.getLogger(getClass)
// Table-name prefix that getRestoreTableName checks for and, when
// options.removeImpalaPrefix is set, drops from the restored table name.
val ImpalaPrefix = "impala::"
/**
 * Returns the name of the table into which the data will be restored, taking the
 * removeImpalaPrefix, newDatabaseName and tableSuffix options into account.
 */
// Derives the restore target name from the backed-up table name.
//
// The full name is split into three parts: an optional "impala::" prefix, an
// optional "database." part and the bare table name. The prefix and database
// parts are then overridden from `options`.
//
// fullTableName: the table name recorded in the backup metadata.
// options:       restore options; removeImpalaPrefix and newDatabaseName are read here.
//
// NOTE(review): the closing braces and the final expression that assembles and
// returns the name are not visible in this copy of the file — confirm against
// the upstream source before editing.
def getRestoreTableName(fullTableName: String, options: RestoreOptions): String = {
// Break the table down into prefix::databaseName.tableName
var prefix = ""
var databaseName = ""
var tableName = fullTableName
val hasImpalaPrefix = tableName.startsWith(ImpalaPrefix)
if (hasImpalaPrefix) {
prefix = ImpalaPrefix
// Strip the prefix so the remaining text can be split on the first ".".
tableName = tableName.substring(ImpalaPrefix.length)
val hasDatabase = tableName.contains(".")
if (hasDatabase) {
// The end index is indexOf(".") + 1, so databaseName keeps its trailing ".".
databaseName = tableName.substring(0, tableName.indexOf(".") + 1)
// Everything after the first "." is the bare table name.
tableName = tableName.substring(tableName.indexOf(".") + 1)
// If the user does not want the Impala prefix, drop it
if (options.removeImpalaPrefix) {
prefix = ""
// If there is a databaseName specified by the user, use that
if (options.newDatabaseName.nonEmpty) {
// A "." is appended so the value has the same "database." form as above.
databaseName = options.newDatabaseName.concat(".")
// Restores a single table by applying each backup in its restore path, in order.
//
// tableName: key used to look up the table's BackupGraph in backupMap.
// context:   KuduContext whose syncClient is used to open the table and write rows.
// session:   SparkSession; used here to set the Spark job description.
// io:        BackupIO for the backup root (no use of it is visible in this copy).
// options:   restore options (createTables is read here, others via getRestoreTableName).
// backupMap: backup graphs keyed by table name.
//
// Throws RuntimeException when backupMap has no entry for tableName, and (inside
// the Spark task) when the Kudu session reports row errors.
//
// NOTE(review): this copy of the method is missing lines — closing braces, the
// logging call prefixes, the table-creation calls, the right-hand sides of
// `rowActionCol` and `data`, the session configuration and the `session.apply`
// call are not visible. Confirm against the upstream source before editing.
private def doRestore(
tableName: String,
context: KuduContext,
session: SparkSession,
io: BackupIO,
options: RestoreOptions,
backupMap: Map[String, BackupGraph]): Unit = {
if (!backupMap.contains(tableName)) {
throw new RuntimeException(s"No valid backups found for table: $tableName")
val graph = backupMap(tableName)
val restorePath = graph.restorePath
// The metadata of the last backup in the path drives the restore name and is the
// target that adjustSchema and createPartitionFilter compare each backup against.
val lastMetadata = restorePath.backups.last.metadata
val restoreName = getRestoreTableName(lastMetadata.getTableName, options)
val numJobs = restorePath.backups.size
// 1-based counter used only for the "(current/total)" job description.
var currentJob = 1
restorePath.backups.foreach { backup =>
// NOTE(review): a dangling string literal follows the setJobDescription call;
// the call it belonged to (presumably a log statement) is missing — TODO confirm.
session.sparkContext.setJobDescription(s"Kudu Restore($currentJob/$numJobs): $tableName")"Restoring table $tableName from path: ${backup.path}")
val metadata = backup.metadata
// A backup whose fromMs is 0 is treated as a full backup.
val isFullRestore = metadata.getFromMs == 0
// TODO (KUDU-2788): Store the full metadata to compare/validate for each applied partial.
// On the full restore we may need to create the table.
if (isFullRestore) {
// NOTE(review): the body of this branch is truncated; only a dangling string
// literal remains on the line below.
if (options.createTables) {"Creating restore table $restoreName")
// We use the last schema in the restore path when creating the table to
// ensure the table is created in its final state.
val backupSchema = BackupUtils.dataSchema(TableMetadata.getKuduSchema(metadata))
// NOTE(review): the right-hand sides of the next two definitions are missing.
val rowActionCol =
var data =
// Default the row action column with a value of "UPSERT" so that the
// rows from a full backup, which don't have a row action, are upserted.
// TODO(ghenke): Consider using INSERT_IGNORE for full backups.
.fill(RowAction.UPSERT.getValue, Seq(rowActionCol))
// Adjust for dropped and renamed columns.
data = adjustSchema(data, metadata, lastMetadata, rowActionCol)
val restoreSchema = data.schema
// Write the data to Kudu.
// The closure below runs once per partition of the underlying InternalRow RDD.
data.queryExecution.toRdd.foreachPartition { internalRows =>
val table = context.syncClient.openTable(restoreName)
val converter = new RowConverter(table.getSchema, restoreSchema, false)
val partitioner = createPartitionFilter(metadata, lastMetadata)
// This `session` comes from syncClient.newSession and shadows the SparkSession
// parameter of the same name for the rest of the closure.
val session = context.syncClient.newSession
// In the case of task retries we need to ignore NotFound errors for deleted rows.
// This can't occur if DELETE_IGNORE is used, but still needs to be set in the case
// DELETE is used for backwards compatibility.
try for (internalRow <- internalRows) {
// Convert the InternalRows to Rows.
// This avoids any corruption as reported in SPARK-26880.
val row = converter.toRow(internalRow)
// Get the operation type based on the row action column.
// This will always be the last column in the row.
val rowActionValue = row.getByte(row.length - 1)
val rowAction = RowAction.fromValue(rowActionValue)
// Generate an operation based on the row action.
val operation = rowAction match {
case RowAction.UPSERT => table.newUpsert()
case RowAction.DELETE => {
// NOTE(review): both branch bodies are missing here; the surrounding comment
// suggests a choice between DELETE_IGNORE and DELETE — TODO confirm.
if (context.supportsIgnoreOperations) {
} else {
// Any other row action is rejected.
case _ => throw new IllegalStateException(s"Unsupported RowAction: $rowAction")
// Convert the Spark row to a partial row and set it on the operation.
val partialRow = converter.toPartialRow(row)
// Drop rows that are not covered by the partitioner. This is how we
// detect a partition which was dropped between backups and filter
// out the rows from that dropped partition.
// NOTE(review): the body of this `if` is not visible.
if (partitioner.isCovered(partialRow)) {
} finally {
// Fail the task if there are any errors.
// It is important to capture all of the errors via getRowErrors and then check
// the length because each call to session.getPendingErrors clears the ErrorCollector.
val pendingErrors = session.getPendingErrors
if (pendingErrors.getRowErrors.nonEmpty) {
val errors = pendingErrors.getRowErrors
// Include the status of at most the first five errors in the exception message.
val sample = errors.take(5).map(_.getErrorStatus).mkString
// When the collector overflowed, errors.length is only a lower bound, so the
// message says "at least".
if (pendingErrors.isOverflowed) {
throw new RuntimeException(
s"PendingErrors overflowed. Failed to write at least ${errors.length} rows " +
s"to Kudu; Sample errors: $sample")
} else {
throw new RuntimeException(
s"Failed to write ${errors.length} rows to Kudu; Sample errors: $sample")
// Advance the counter shown in the next backup's job description.
currentJob += 1
// Runs the restore for every table in options.tables and reports the outcome
// as a Boolean.
//
// options: restore options (rootPath, tables, timestampMs, numParallelRestores and
//          failOnFirstError are read here).
// session: SparkSession used for job grouping and the Hadoop configuration.
//
// NOTE(review): this copy is missing lines — the KuduContext constructor
// arguments, the transformation that builds backupMap, the receiver of the
// per-table closure, the logging call prefixes, the closing braces and the
// expression that produces the Boolean result are not visible. Confirm against
// the upstream source before editing.
def run(options: RestoreOptions, session: SparkSession): Boolean = {
// Set the job group for all the spark restore jobs.
// Note: The job description will be overridden by each Kudu table job.
// NOTE(review): a dangling string literal follows the setJobGroup call; the call
// it belonged to (presumably a log statement) is missing — TODO confirm.
session.sparkContext.setJobGroup(s"Kudu Restore @ ${options.timestampMs}", "Kudu Restore")"Restoring from path: ${options.rootPath}")
val context =
new KuduContext(
val io = new BackupIO(session.sparkContext.hadoopConfiguration, options.rootPath)
// Read the required backup metadata.
val backupGraphs = io.readBackupGraphsByTableName(options.tables, options.timestampMs)
// Key the backupMap by the last table name.
val backupMap = backupGraphs
// Parallelize the processing. Managing resources of parallel restore jobs is very complex, so
// only the simplest possible thing is attempted. Kudu trusts Spark to manage resources.
// TODO (KUDU-2832): If the job fails to restore a table it may still create the table, which
// will cause subsequent restores to fail unless the table is deleted or the restore suffix is
// changed. We ought to try to clean up the mess when a failure happens.
val parallelTables = options.tables.par
// Run the parallel collection on a ForkJoinPool sized by options.numParallelRestores.
parallelTables.tasksupport = new ForkJoinTaskSupport(
new ForkJoinPool(options.numParallelRestores))
// NOTE(review): the receiver of this closure is missing (presumably a map over
// parallelTables — TODO confirm).
val restoreResults = { tableName =>
// Capture the outcome as a Try so one table's failure does not stop the others.
val restoreResult =
Try(doRestore(tableName, context, session, io, options, backupMap))
restoreResult match {
case Success(()) =>"Successfully restored table $tableName")
case Failure(ex) =>
// Rethrow immediately only when restores are not parallel and the caller asked
// to fail on the first error; otherwise log the failure and continue.
if (options.numParallelRestores == 1 && options.failOnFirstError)
throw ex
log.error(s"Failed to restore table $tableName", ex)
// Pair each table name with its Try result for the failure summary below.
(tableName, restoreResult)
// Walk the failed results; `ex` here is the second tuple element, i.e. the Try.
restoreResults.filter(_._2.isFailure).foreach {
case (tableName, ex) =>
s"Failed to restore table $tableName: Look back in the logs for the full exception. Error: ${ex.toString}")
// Kudu isn't good at creating a lot of tablets at once, and by default tables may only be created
// with at most 60 tablets. Additional tablets can be added later by adding range partitions. So,
// to restore tables with more tablets than that, we need to create the table piece-by-piece. This
// does so in the simplest way: creating the table with the first range partition, if there is
// one, and then altering it to add the rest of the partitions, one partition at a time.
// Creates the restore table piece by piece: one range partition goes into the
// create call and the remaining ones are added through alter-table calls.
//
// restoreName:  name of the table to create.
// metadata:     backup table metadata supplying the schema, create options and range bounds.
// restoreOwner: passed through to getCreateTableOptionsWithoutRangePartitions.
// context:      KuduContext used for createTable and, via syncClient, alterTable.
//
// NOTE(review): this copy is missing lines — the right-hand side of
// boundsWithCustomHashSchema, the RangePartitionWithCustomHashSchema constructor
// arguments, the column-name mapping, the calls that attach the range partition
// to the create/alter options, any final `else` branch and the closing braces
// are not visible. Confirm against the upstream source before editing.
private def createTableRangePartitionByRangePartition(
restoreName: String,
metadata: TableMetadataPB,
restoreOwner: Boolean,
context: KuduContext): Unit = {
// Create the table with the first range partition (or none if there are none).
val schema = TableMetadata.getKuduSchema(metadata)
val options = TableMetadata.getCreateTableOptionsWithoutRangePartitions(metadata, restoreOwner)
// Returns the range bounds of the ranges that contain the table wide hash schema.
val boundsWithoutHashSchema = TableMetadata.getRangeBoundPartialRows(metadata)
// Returns the range bounds and hash schema of the ranges that contain a custom hash schema.
val boundsWithCustomHashSchema =
// Case 1: at least one range uses the table-wide hash schema.
if (boundsWithoutHashSchema.nonEmpty) {
// Adds the first range partition with table wide hash schema through create.
boundsWithoutHashSchema.headOption.foreach(bound => {
val (lower, upper) = bound
options.addRangePartition(lower, upper)
context.createTable(restoreName, schema, options)
// Add the rest of the range partitions with table wide hash schema through alters.
boundsWithoutHashSchema.tail.foreach(bound => {
val (lower, upper) = bound
// This AlterTableOptions shadows the outer create-table `options` inside the lambda.
val options = new AlterTableOptions()
options.addRangePartition(lower, upper)
context.syncClient.alterTable(restoreName, options)
// Adds range partitions with custom hash schema through alters.
boundsWithCustomHashSchema.foreach(bound => {
val rangePartition = new RangePartitionWithCustomHashSchema(
// Copy each hash-schema entry of the bound onto the range partition.
bound.hashSchemas.asScala.foreach { hp =>
val columnNames = { id =>
rangePartition.addHashPartitions(columnNames.asJava, hp.getNumBuckets, hp.getSeed)
val options = new AlterTableOptions()
context.syncClient.alterTable(restoreName, options)
// Case 2: no table-wide ranges, but at least one range with a custom hash schema.
} else if (boundsWithCustomHashSchema.nonEmpty) {
// Adds first range partition with custom hash schema through create.
boundsWithCustomHashSchema.headOption.foreach(bound => {
val rangePartition = new RangePartitionWithCustomHashSchema(
bound.hashSchemas.asScala.foreach { hp =>
val columnNames = { id =>
rangePartition.addHashPartitions(columnNames.asJava, hp.getNumBuckets, hp.getSeed)
context.createTable(restoreName, schema, options)
// Adds rest of range partitions with custom hash schema through alters.
boundsWithCustomHashSchema.tail.foreach(bound => {
val rangePartition = new RangePartitionWithCustomHashSchema(
bound.hashSchemas.asScala.foreach { hp =>
val columnNames = { id =>
rangePartition.addHashPartitions(columnNames.asJava, hp.getNumBuckets, hp.getSeed)
val options = new AlterTableOptions()
context.syncClient.alterTable(restoreName, options)
/**
 * Returns a modified DataFrame with columns adjusted to match the lastMetadata.
 */
// Aligns the columns of a backup's DataFrame with the final table metadata, in
// two passes: drop columns whose id is absent from the last metadata, then
// rename columns whose final name differs from the current one.
//
// df:              the backup data to adjust.
// currentMetadata: metadata of the backup that `df` was read from.
// lastMetadata:    metadata of the last backup in the restore path.
// rowActionCol:    name of the synthetic row-action column, which is excluded
//                  from the adjustment.
//
// NOTE(review): this copy is missing lines — the right-hand side of `idToName`
// (presumably a column id -> name map built from lastMetadata — TODO confirm),
// the logging call prefixes, the closing braces and the final expression that
// returns `result` are not visible.
private def adjustSchema(
df: DataFrame,
currentMetadata: TableMetadataPB,
lastMetadata: TableMetadataPB,
rowActionCol: String): DataFrame = {"Adjusting columns to handle alterations")
val idToName =
// Ignore the rowActionCol, which isn't a real column.
val currentColumns = currentMetadata.getColumnIdsMap.asScala.filter(_._1 != rowActionCol)
// Reassigned as columns are dropped and renamed; DataFrames are not modified in place.
var result = df
// First drop all the columns that no longer exist.
// This is required to be sure a rename doesn't collide with an old column.
currentColumns.foreach {
case (colName, id) =>
if (!idToName.contains(id)) {
// If the last metadata doesn't contain the id, the column is dropped."Dropping the column $colName from backup data")
result = result.drop(colName)
// Then rename all the columns that were renamed in the last metadata.
currentColumns.foreach {
case (colName, id) =>
// Only columns that still exist and whose name changed need a rename.
if (idToName.contains(id) && idToName(id) != colName) {
// If the final name doesn't match the current name, the column is renamed."Renamed the column $colName to ${idToName(id)} in backup data")
result = result.withColumnRenamed(colName, idToName(id))
/**
 * Creates a KuduPartitioner that can be used to filter out rows for the current
 * backup data which no longer apply to partitions in the last metadata.
 *
 * In order to do this, tablet metadata are compared in the current metadata to the
 * last metadata. Tablet IDs that are not in the final metadata are filtered out and
 * the remaining tablet metadata is used to create a KuduPartitioner. The resulting
 * KuduPartitioner can then be used to filter out rows that are no longer valid
 * because those rows will fall into a non-covered range.
 */
// Builds a KuduPartitioner from the tablets of currentMetadata whose ids are
// also present in lastMetadata.
//
// currentMetadata: metadata of the backup being applied; supplies the tablets and
//                  the partition schema.
// lastMetadata:    metadata of the last backup; its tablet map decides which
//                  tablets are kept.
//
// NOTE(review): this copy is missing lines — the Partition constructor arguments,
// the body of the `else` branch (presumably None — TODO confirm) and the closing
// braces are not visible.
private def createPartitionFilter(
currentMetadata: TableMetadataPB,
lastMetadata: TableMetadataPB): KuduPartitioner = {
val lastTablets = lastMetadata.getTabletsMap
// flatMap keeps only the entries for which the function yields Some(...).
val validTablets =
currentMetadata.getTabletsMap.asScala.flatMap {
case (id, pm) =>
// Keep a tablet only if its id still exists in the last metadata.
if (lastTablets.containsKey(id)) {
// Create the partition object needed for the KuduPartitioner.
val partition = new Partition(
Some((id, partition))
} else {
// Ignore tablets that are no longer valid
// The partition schema is taken from the current (not the last) metadata.
val partitionSchema = TableMetadata.getPartitionSchema(currentMetadata)
new KuduPartitioner(partitionSchema, validTablets.asJava)
// Command-line entry point: obtains RestoreOptions, builds a SparkSession named
// "Kudu Table Restore", calls run, and fails the application if run returns false.
//
// Throws IllegalArgumentException when no options are obtained (the getOrElse
// fallback below) and RuntimeException when the restore run reports failure.
//
// NOTE(review): this copy is missing lines — the call that parses `args` into
// RestoreOptions, the SparkSession builder calls around appName and the closing
// braces are not visible.
def main(args: Array[String]): Unit = {
val options = RestoreOptions
.getOrElse(throw new IllegalArgumentException("could not parse the arguments"))
val session = SparkSession
.appName("Kudu Table Restore")
val isRunSuccessful: Boolean = run(options, session)
// Throw so that a failed restore surfaces as a failed application.
if (!isRunSuccessful) {
throw new RuntimeException("Kudu Table Restore application failed!")