// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
package org.apache.kudu.backup

import org.apache.kudu.spark.kudu.KuduContext
import org.apache.spark.sql.SaveMode
import org.apache.spark.sql.SparkSession
import org.apache.yetus.audience.InterfaceAudience
import org.apache.yetus.audience.InterfaceStability
import org.slf4j.Logger
import org.slf4j.LoggerFactory

import scala.collection.JavaConverters._
import scala.collection.parallel.ForkJoinTaskSupport
import scala.concurrent.forkjoin.ForkJoinPool
import scala.util.Failure
import scala.util.Success
import scala.util.Try

/**
 * The main class for a Kudu backup spark job.
 */
@InterfaceAudience.Private
@InterfaceStability.Unstable
object KuduBackup {
  val log: Logger = LoggerFactory.getLogger(getClass)
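
  /**
   * Backs up a single table to the backup root path, performing either a full
   * or an incremental backup depending on the options and on any previous
   * backups found for the table.
   *
   * @param tableName the name of the Kudu table to back up
   * @param context the KuduContext used to read the table from Kudu
   * @param session the SparkSession used to write the backup data
   * @param io the BackupIO used to resolve backup paths and write metadata
   * @param options the options for the backup job
   * @param backupMap the previous backup graphs, keyed by table ID
   */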
  private def doBackup(
      tableName: String,
      context: KuduContext,
      session: SparkSession,
      io: BackupIO,
      options: BackupOptions,
      backupMap: Map[String, BackupGraph]): Unit = {
    var tableOptions = options.copy() // Copy the options so we can modify them for the table.
    val table = context.syncClient.openTable(tableName)
    val tableId = table.getTableId
    val backupPath = io.backupPath(tableId, tableName, tableOptions.toMs)
    val metadataPath = io.backupMetadataPath(backupPath)
    log.info(s"Backing up table $tableName to path: $backupPath")

    // Unless we are forcing a full backup or a fromMs was set, find the previous backup and
    // use the `to_ms` metadata as the `from_ms` time for this backup.
    var incremental = false
    if (tableOptions.forceFull) {
      log.info("Performing a full backup: forceFull was set to true")
    } else if (tableOptions.fromMs != BackupOptions.DefaultFromMS) {
      log.info(s"Performing an incremental backup: fromMs was set to ${tableOptions.fromMs}")
      incremental = true
    } else {
      log.info("Looking for a previous backup: forceFull and fromMs options are not set.")
      if (backupMap.contains(tableId) && backupMap(tableId).hasFullBackup) {
        val base = backupMap(tableId).backupBase
        log.info(s"Setting fromMs to ${base.metadata.getToMs} from backup in path: ${base.path}")
        tableOptions = tableOptions.copy(fromMs = base.metadata.getToMs)
        incremental = true
      } else {
        log.info("No previous backup was found. Starting a full backup.")
        tableOptions = tableOptions.copy(forceFull = true)
      }
    }

    val jobTypeStr = if (incremental) "incremental" else "full"
    session.sparkContext.setJobDescription(s"Kudu Backup($jobTypeStr): $tableName")

    val rdd = new KuduBackupRDD(table, tableOptions, incremental, context, session.sparkContext)
    val df =
      session.sqlContext
        .createDataFrame(rdd, BackupUtils.dataSchema(table.getSchema, incremental))

    // Ensure maximum compatibility for dates before 1582-10-15 or timestamps before
    // 1900-01-01T00:00:00Z in Parquet. Otherwise incorrect values may be read by
    // Spark 2 or legacy versions of Hive. See more details in SPARK-31404.
    session.conf.set("spark.sql.legacy.parquet.datetimeRebaseModeInWrite", "LEGACY")
    session.conf.set("spark.sql.legacy.parquet.int96RebaseModeInWrite", "LEGACY")

    // Write the data to the backup path.
    // The backup path contains the timestampMs and should not already exist.
    val writer = df.write.mode(SaveMode.ErrorIfExists)
    writer
      .format(tableOptions.format)
      .save(backupPath.toString)

    // Generate and output the new metadata for this table.
    // The existence of metadata indicates this backup was successful.
    val tableMetadata = TableMetadata
      .getTableMetadata(table, tableOptions.fromMs, tableOptions.toMs, tableOptions.format)
    io.writeTableMetadata(tableMetadata, metadataPath)
  }
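
  /**
   * Runs a backup job for each of the tables in the options, backing up at most
   * numParallelBackups tables at a time.
   *
   * @return true if all table backups succeeded, false otherwise
   */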
  def run(options: BackupOptions, session: SparkSession): Boolean = {
    // Set the job group for all the spark backup jobs.
    // Note: The job description will be overridden by each Kudu table job.
    session.sparkContext.setJobGroup(s"Kudu Backup @ ${options.toMs}", "Kudu Backup")
    log.info(s"Backing up to root path: ${options.rootPath}")
    val context =
      new KuduContext(
        options.kuduMasterAddresses,
        session.sparkContext
      )
    val io = new BackupIO(session.sparkContext.hadoopConfiguration, options.rootPath)

    // Read the required backup metadata.
    val backupGraphs =
      // Only read the backup metadata if it will be used.
      if (!options.forceFull || options.fromMs != BackupOptions.DefaultFromMS) {
        // Convert the input table names to be backed up into table IDs.
        // This will allow us to link against old backup data by referencing
        // the static table ID even when the table name changes between backups.
        val nameToId = context.syncClient.getTablesList.getTableInfosList.asScala
          .filter(info => options.tables.contains(info.getTableName))
          .map(info => (info.getTableName, info.getTableId))
          .toMap
        val tableIds = options.tables.flatMap(nameToId.get)
        io.readBackupGraphsByTableId(tableIds)
      } else {
        Seq[BackupGraph]()
      }

    // Key the backupMap by the table ID.
    val backupMap = backupGraphs.map { graph =>
      (graph.tableId, graph)
    }.toMap

    // Parallelize the processing. Managing resources of parallel backup jobs is very complex, so
    // only the simplest possible thing is attempted. Kudu trusts Spark to manage resources.
    val parallelTables = options.tables.par
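    // Bound the parallelism with a dedicated pool so that at most numParallelBackups
    // tables are backed up concurrently.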
    parallelTables.tasksupport = new ForkJoinTaskSupport(
      new ForkJoinPool(options.numParallelBackups))
    val backupResults = parallelTables.map { tableName =>
      val backupResult = Try(doBackup(tableName, context, session, io, options, backupMap))
      backupResult match {
        case Success(()) =>
          log.info(s"Successfully backed up table $tableName")
        case Failure(ex) =>
          if (options.numParallelBackups == 1 && options.failOnFirstError)
            throw ex
          else
            log.error(s"Failed to back up table $tableName", ex)
      }
      (tableName, backupResult)
    }
    backupResults.filter(_._2.isFailure).foreach {
      case (tableName, ex) =>
        log.error(
          s"Failed to back up table $tableName: Look back in the logs for the full exception. Error: ${ex.toString}")
    }
    !backupResults.exists(_._2.isFailure)
  }
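
  /**
   * The entry point of the backup job: parses the arguments, creates a
   * SparkSession, and fails the application if any table backup failed.
   *
   * A sketch of a typical invocation (illustrative only; the jar name is a
   * placeholder and the flag spellings are assumed to match the BackupOptions
   * fields used above):
   *
   * {{{
   * spark-submit --class org.apache.kudu.backup.KuduBackup <kudu-backup jar> \
   *   --rootPath hdfs:///kudu-backups \
   *   --kuduMasterAddresses master1:7051 \
   *   MyTable
   * }}}
   */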
  def main(args: Array[String]): Unit = {
    val options = BackupOptions
      .parse(args)
      .getOrElse(throw new IllegalArgumentException("could not parse the arguments"))
    val session = SparkSession
      .builder()
      .appName("Kudu Table Backup")
      .getOrCreate()
    val isRunSuccessful: Boolean = run(options, session)
    if (!isRunSuccessful) {
      throw new RuntimeException("Kudu Table Backup application failed")
    }
    session.stop()
  }
}