blob: 5201e60fd9999846a399b71da6eb5bd212236317 [file] [log] [blame]
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
//   http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
package org.apache.kudu.backup
import org.apache.kudu.client.AsyncKuduClient
import org.apache.yetus.audience.InterfaceAudience
import org.apache.yetus.audience.InterfaceStability
import scopt.OptionParser
/**
 * Options for a Kudu backup job, as built by `BackupOptions.parse`.
 *
 * Default-argument expressions are evaluated each time the constructor is invoked
 * without that argument, so `kuduMasterAddresses` and `toMs` are resolved at
 * construction time.
 *
 * @param tables the tables to back up
 * @param rootPath the root path to write backup data to; accepts any Spark-compatible path
 * @param kuduMasterAddresses comma-separated addresses of the Kudu masters; defaults to
 *                            the canonical host name of the local machine
 * @param toMs UNIX timestamp in milliseconds since the epoch at which to execute the
 *             scans; defaults to the current time
 * @param forceFull if true, take a full backup even if another full backup already exists
 * @param fromMs UNIX timestamp in milliseconds that defines the start time of an
 *               incremental backup
 * @param format the file format used when writing the data
 * @param scanBatchSize the maximum number of bytes returned by the scanner on each batch
 * @param scanRequestTimeoutMs how long, in milliseconds, each scan request to a server
 *                             can last
 * @param scanLeaderOnly if true, scans only use the leader replica; otherwise scans take
 *                       place at the closest replica
 * @param scanPrefetching experimental: enables pre-fetching of scan data
 * @param keepAlivePeriodMs the period, in milliseconds, at which keep-alive requests are
 *                          sent to the tablet server so that scanners do not time out
 * @param failOnFirstError whether to fail the job as soon as a single table backup fails
 * @param numParallelBackups experimental: the number of tables to back up in parallel
 * @param splitSizeBytes experimental: target number of bytes per Spark task; when set,
 *                       tablet primary key ranges are split to produce uniformly sized
 *                       tasks instead of one task per tablet
 */
case class BackupOptions(
    tables: Seq[String],
    rootPath: String,
    // Fully qualified so this default does not depend on a java.net import.
    kuduMasterAddresses: String = java.net.InetAddress.getLocalHost.getCanonicalHostName,
    toMs: Long = System.currentTimeMillis(),
    forceFull: Boolean = BackupOptions.DefaultForceFull,
    fromMs: Long = BackupOptions.DefaultFromMS,
    format: String = BackupOptions.DefaultFormat,
    scanBatchSize: Int = BackupOptions.DefaultScanBatchSize,
    scanRequestTimeoutMs: Long = BackupOptions.DefaultScanRequestTimeoutMs,
    scanLeaderOnly: Boolean = BackupOptions.DefaultScanLeaderOnly,
    scanPrefetching: Boolean = BackupOptions.DefaultScanPrefetching,
    keepAlivePeriodMs: Long = BackupOptions.DefaultKeepAlivePeriodMs,
    failOnFirstError: Boolean = BackupOptions.DefaultFailOnFirstError,
    numParallelBackups: Int = BackupOptions.DefaultNumParallelBackups,
    // Use the companion default like every other parameter (its value is None).
    splitSizeBytes: Option[Long] = BackupOptions.DefaultSplitSizeBytes)
// Companion object for BackupOptions: the default option values referenced by the
// case class parameters, the scopt command-line parser, and the `parse` entry point.
object BackupOptions {
val DefaultForceFull: Boolean = false
val DefaultFromMS: Long = 0
val DefaultFormat: String = "parquet"
val DefaultScanBatchSize: Int = 1024 * 1024 * 20 // 20 MiB
val DefaultScanRequestTimeoutMs: Long =
AsyncKuduClient.DEFAULT_OPERATION_TIMEOUT_MS // 30 seconds
val DefaultScanLeaderOnly: Boolean = false
// TODO (KUDU-1260): Add a test and enable by default?
val DefaultScanPrefetching: Boolean = false
val DefaultKeepAlivePeriodMs: Long = AsyncKuduClient.DEFAULT_KEEP_ALIVE_PERIOD_MS
val DefaultFailOnFirstError: Boolean = false
// NOTE(review): the only default here without an explicit type annotation (inferred as Int).
val DefaultNumParallelBackups = 1
// None means no target split size, i.e. the default of one Spark task per tablet.
val DefaultSplitSizeBytes: Option[Long] = None
// We use the program name to make the help output show the spark invocation required.
val ClassName: String = KuduBackup.getClass.getCanonicalName.dropRight(1) // Remove trailing `$`
val ProgramName: String = "spark-submit --class " + ClassName + " [spark-options] " +
// scopt parser. Each option's action copies the parsed value into the BackupOptions
// accumulated so far; `parse` seeds it with BackupOptions(Seq(), null).
val parser: OptionParser[BackupOptions] =
new OptionParser[BackupOptions](ProgramName) {
.action((v, o) => o.copy(rootPath = v))
.text("The root path to output backup data. Accepts any Spark compatible path.")
// NOTE(review): the help text says "Default: localhost", but the case-class default is
// InetAddress.getLocalHost.getCanonicalHostName (the local machine's canonical host name).
.action((v, o) => o.copy(kuduMasterAddresses = v))
.text("Comma-separated addresses of Kudu masters. Default: localhost")
.action((v, o) => o.copy(forceFull = v))
.text("If true, this will be a full backup even if another full already exists. " +
"Default: " + DefaultForceFull)
.action((v, o) => o.copy(fromMs = v))
"A UNIX timestamp in milliseconds that defines the start time of an incremental " +
"backup. If unset, the fromMs will be defined by previous backups in the root " +
.action((v, o) => o.copy(toMs = v))
// TODO (KUDU-2677): Document the limitations based on cluster configuration.
.text("A UNIX timestamp in milliseconds since the epoch to execute scans at. " +
"Default: `System.currentTimeMillis()`")
.action((v, o) => o.copy(scanBatchSize = v))
.text("The maximum number of bytes returned by the scanner, on each batch. " +
"Default: " + DefaultScanBatchSize)
.action((v, o) => o.copy(scanRequestTimeoutMs = v))
.text("Sets how long in milliseconds each scan request to a server can last. " +
"Default: " + DefaultScanRequestTimeoutMs)
.action((v, o) => o.copy(keepAlivePeriodMs = v))
.text("Sets the period at which to send keep-alive requests to the tablet server to " +
"ensure that scanners do not time out. Default: " + DefaultKeepAlivePeriodMs)
.action((v, o) => o.copy(scanLeaderOnly = v))
.text("If true scans will only use the leader replica, otherwise scans will take place " +
"at the closest replica. Default: " + DefaultScanLeaderOnly)
.action((v, o) => o.copy(format = v))
.text("The file format to use when writing the data. Default: " + DefaultFormat)
// The parsed value is ignored and scanPrefetching is always set to true; presumably a
// presence-only flag -- confirm against the option's declaration.
.action((_, o) => o.copy(scanPrefetching = true))
.text("An experimental flag to enable pre-fetching data. " +
"Default: " + DefaultScanPrefetching)
// The parsed value is ignored and failOnFirstError is always set to true; presumably a
// presence-only flag -- confirm against the option's declaration.
.action((_, o) => o.copy(failOnFirstError = true))
.text("Whether to fail the backup job as soon as a single table backup fails. " +
"Default: " + DefaultFailOnFirstError)
.action((v, o) => o.copy(numParallelBackups = v))
"The number of tables to back up in parallel. Backup leaves it to Spark to manage " +
"the resources of parallel jobs. Overrides --failOnFirstError. This option is " +
"experimental. Default: " + DefaultNumParallelBackups)
// Wrapped in Some: supplying the option sets a split size; omitting it keeps the
// case-class default.
.action((v, o) => o.copy(splitSizeBytes = Some(v)))
"Sets the target number of bytes per spark task. If set, tablet's primary key range " +
"will be split to generate uniform task sizes instead of the default of 1 task per " +
"tablet. This option is experimental.")
help("help").text("prints this usage text")
// Each table value is appended to `tables`, preserving command-line order.
.action((v, o) => o.copy(tables = o.tables :+ v))
.text("A list of tables to be backed up.")
* Parses the passed arguments into Some[KuduBackupOptions].
* If the arguments are bad, an error message is displayed
* and None is returned.
* @param args The arguments to parse.
* @return Some[KuduBackupOptions] if parsing was successful, None if not.
def parse(args: Seq[String]): Option[BackupOptions] = {
// Parsing starts from BackupOptions(Seq(), null): rootPath stays null unless the
// rootPath option's action runs.
parser.parse(args, BackupOptions(Seq(), null))
/**
 * Options for a Kudu restore job, as built by `RestoreOptions.parse`.
 *
 * Default-argument expressions are evaluated each time the constructor is invoked
 * without that argument, so `kuduMasterAddresses` and `timestampMs` are resolved at
 * construction time.
 *
 * @param tables the tables to restore
 * @param rootPath the root path of the backup data; accepts any Spark-compatible path
 * @param kuduMasterAddresses comma-separated addresses of the Kudu masters; defaults to
 *                            the canonical host name of the local machine
 * @param removeImpalaPrefix if true, remove the "impala::" prefix, if present, from the
 *                           restored table names; only used when `createTables` is true
 * @param newDatabaseName if non-empty, replaces the database name of the restored tables
 *                        (or adds one when there is none); the empty default has the same
 *                        effect as not setting it
 * @param tableSuffix if non-empty, the suffix to add to the restored table names; only
 *                    used when `createTables` is true
 * @param createTables if true, create the tables during restore; set to false if the
 *                     target tables already exist
 * @param timestampMs UNIX timestamp in milliseconds that defines the latest time to use
 *                    when selecting restore candidates; defaults to the current time
 * @param failOnFirstError whether to fail the job as soon as a single table restore fails
 * @param numParallelRestores experimental: the number of tables to restore in parallel
 * @param restoreOwner if true, restore table ownership when creating new tables;
 *                     otherwise create tables as the logged-in user. Only used when
 *                     `createTables` is true
 */
case class RestoreOptions(
    tables: Seq[String],
    rootPath: String,
    // Fully qualified so this default does not depend on a java.net import.
    kuduMasterAddresses: String = java.net.InetAddress.getLocalHost.getCanonicalHostName,
    removeImpalaPrefix: Boolean = RestoreOptions.DefaultRemoveImpalaPrefix,
    newDatabaseName: String = "",
    tableSuffix: String = "",
    createTables: Boolean = RestoreOptions.DefaultCreateTables,
    timestampMs: Long = System.currentTimeMillis(),
    failOnFirstError: Boolean = RestoreOptions.DefaultFailOnFirstError,
    numParallelRestores: Int = RestoreOptions.DefaultNumParallelRestores,
    restoreOwner: Boolean = RestoreOptions.DefaultRestoreOwner)
// Companion object for RestoreOptions: the default option values referenced by the
// case class parameters, the scopt command-line parser, and the `parse` entry point.
object RestoreOptions {
val DefaultRemoveImpalaPrefix: Boolean = false
val DefaultCreateTables: Boolean = true
// NOTE(review): unlike the other defaults, these two have no explicit type annotation.
val DefaultFailOnFirstError = false
val DefaultNumParallelRestores = 1
val DefaultRestoreOwner: Boolean = true
// The program name makes the help output show the spark-submit invocation required.
val ClassName: String = KuduRestore.getClass.getCanonicalName.dropRight(1) // Remove trailing `$`
val ProgramName: String = "spark-submit --class " + ClassName + " [spark-options] " +
// scopt parser. Each option's action copies the parsed value into the RestoreOptions
// accumulated so far; `parse` seeds it with RestoreOptions(Seq(), null).
val parser: OptionParser[RestoreOptions] =
new OptionParser[RestoreOptions](ProgramName) {
.action((v, o) => o.copy(rootPath = v))
.text("The root path to the backup data. Accepts any Spark compatible path.")
// NOTE(review): the help text says "Default: localhost", but the case-class default is
// InetAddress.getLocalHost.getCanonicalHostName (the local machine's canonical host name).
.action((v, o) => o.copy(kuduMasterAddresses = v))
.text("Comma-separated addresses of Kudu masters. Default: localhost")
.action((v, o) => o.copy(createTables = v))
.text("If true, create the tables during restore. Set to false if the target tables " +
"already exist. Default: " + DefaultCreateTables)
.action((v, o) => o.copy(removeImpalaPrefix = v))
.text("If true, removes the \"impala::\" prefix, if present from the restored table names. This is " +
"advisable if backup was taken in a Kudu cluster without HMS sync and restoring to " +
"Kudu cluster which has HMS sync in place. Only used when createTables is true. Default: " +
.action((v, o) => o.copy(newDatabaseName = v))
"If set, replaces the existing database name and if there is no existing database name, a new database " +
"name is added. Setting this to an empty string will have the same effect of not using the flag at all. " +
"For example, if this is set to newdb for the tables testtable and impala::db.testtable the restored " +
"tables will have the names newdb.testtable and impala::newdb.testtable respectively, assuming " +
"removeImpalaPrefix is set to false")
.action((v, o) => o.copy(tableSuffix = v))
.text("If set, the suffix to add to the restored table names. Only used when " +
"createTables is true.")
.action((v, o) => o.copy(timestampMs = v))
.text("A UNIX timestamp in milliseconds that defines the latest time to use when " +
"selecting restore candidates. Default: `System.currentTimeMillis()`")
// NOTE(review): `v` is bound but ignored; failOnFirstError is always set to true.
// Presumably a presence-only flag -- confirm against the option's declaration.
.action((v, o) => o.copy(failOnFirstError = true))
.text("Whether to fail the restore job as soon as a single table restore fails. " +
"Default: " + DefaultFailOnFirstError)
.action((v, o) => o.copy(numParallelRestores = v))
"The number of tables to restore in parallel. Restore leaves it to Spark to manage " +
"the resources of parallel jobs. Overrides --failOnFirstError. This option is " +
"experimental. Default: " + DefaultNumParallelRestores)
.action((v, o) => o.copy(restoreOwner = v))
"If true, it restores table ownership when creating new tables, otherwise creates " +
"tables as the logged in user. Only used when createTables is true. Default: " +
help("help").text("prints this usage text")
// Each table value is appended to `tables`, preserving command-line order.
.action((v, o) => o.copy(tables = o.tables :+ v))
.text("A list of tables to be restored.")
* Parses the passed arguments into Some[KuduRestoreOptions].
* If the arguments are bad, an error message is displayed
* and None is returned.
* @param args The arguments to parse.
* @return Some[KuduRestoreOptions] if parsing was successful, None if not.
def parse(args: Seq[String]): Option[RestoreOptions] = {
// Parsing starts from RestoreOptions(Seq(), null): rootPath stays null unless the
// rootPath option's action runs.
parser.parse(args, RestoreOptions(Seq(), null))