// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
package org.apache.kudu.spark.tools

import java.net.InetAddress

import scala.annotation.tailrec

import org.apache.kudu.client.KuduClient
import org.apache.kudu.spark.kudu._
import org.apache.kudu.spark.tools.ImportExportKudu.ArgsCls
import org.apache.spark.SparkConf
import org.apache.spark.sql.SparkSession
import org.apache.yetus.audience.InterfaceAudience
import org.apache.yetus.audience.InterfaceStability
import org.slf4j.Logger
import org.slf4j.LoggerFactory

@InterfaceAudience.Public
@InterfaceStability.Unstable // TODO: Unstable due to KUDU-2454
object ImportExportKudu {
  val log: Logger = LoggerFactory.getLogger(ImportExportKudu.getClass)
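
  /** Prints the given message to stderr and exits with a non-zero status. */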
  def fail(msg: String): Nothing = {
    System.err.println(msg)
    sys.exit(1)
  }
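
  /** Fallback master address: the canonical hostname of the local machine. */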
  def defaultMasterAddrs: String = InetAddress.getLocalHost.getCanonicalHostName

  def usage: String =
    s"""
       | Usage: --operation=import/export --format=<data-format(csv,parquet,avro)> --master-addrs=<master-addrs> --path=<path> --table-name=<table-name>
       | where
       | operation: whether to import data into or export data from a Kudu table, default: import
       | format: format of the data to import/export; supported formats are csv, parquet, and avro, default: csv
       | masterAddrs: comma-separated addresses of Kudu master nodes, default: $defaultMasterAddrs
       | path: input or output path for the import/export operation, default: file://
       | tableName: name of the Kudu table to import into or export from, default: ""
       | columns: comma-separated column names to select on export from a Kudu table, default: *
       | delimiter: delimiter for csv import/export, default: ,
       | header: whether the csv data includes (or should include) a header row, default: false
       | inferschema: whether to infer the schema when importing csv data, default: false
     """.stripMargin

  case class ArgsCls(
      operation: String = "import",
      format: String = "csv",
      masterAddrs: String = defaultMasterAddrs,
      path: String = "file://",
      tableName: String = "",
      columns: String = "*",
      delimiter: String = ",",
      header: String = "false",
      inferschema: String = "false")

  object ArgsCls {
    @tailrec
    private def parseInner(options: ArgsCls, args: List[String]): ArgsCls = {
      log.info(args.mkString(","))
      args match {
        case Nil => options
        case "--help" :: _ =>
          System.err.println(usage)
          sys.exit(0)
        case flag :: Nil => fail(s"flag $flag has no value\n$usage")
        case flag :: value :: tail =>
          val newOptions: ArgsCls = flag match {
            case "--operation" => options.copy(operation = value)
            case "--format" => options.copy(format = value)
            case "--master-addrs" => options.copy(masterAddrs = value)
            case "--path" => options.copy(path = value)
            case "--table-name" => options.copy(tableName = value)
            case "--columns" => options.copy(columns = value)
            case "--delimiter" => options.copy(delimiter = value)
            case "--header" => options.copy(header = value)
            case "--inferschema" => options.copy(inferschema = value)
            case _ => fail(s"unknown flag given: $flag\n$usage")
          }
          parseInner(newOptions, tail)
      }
    }
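
    // Splitting each raw argument on '=' means callers may write either
    // "--flag=value" or "--flag value"; both forms normalize to the same list.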
    def parse(args: Array[String]): ArgsCls = {
      parseInner(ArgsCls(), args.flatMap(_.split('=')).toList)
    }
  }
}

object ImportExportFiles {
  import ImportExportKudu.fail
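
  /**
   * Runs the import or export described by `args` against the Kudu cluster,
   * using the supplied SparkSession.
   */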
  def run(args: ArgsCls, ss: SparkSession): Unit = {
    val kc = new KuduContext(args.masterAddrs, ss.sparkContext)
    val sqlContext = ss.sqlContext
    val client: KuduClient = kc.syncClient
    if (!client.tableExists(args.tableName)) {
      fail(s"table ${args.tableName} doesn't exist")
    }
    val kuduOptions =
      Map("kudu.table" -> args.tableName, "kudu.master" -> args.masterAddrs)
    args.operation match {
      case "import" =>
        args.format match {
          case "csv" =>
            val df = sqlContext.read
              .option("header", args.header)
              // "inferSchema" is how Spark's csv reader spells the
              // --inferschema flag.
              .option("inferSchema", args.inferschema)
              .option("delimiter", args.delimiter)
              .csv(args.path)
            kc.upsertRows(df, args.tableName)
          case "parquet" =>
            val df = sqlContext.read.parquet(args.path)
            kc.upsertRows(df, args.tableName)
          case "avro" =>
            val df = sqlContext.read
              .format("com.databricks.spark.avro")
              .load(args.path)
            kc.upsertRows(df, args.tableName)
          case _ => fail(s"unknown format given: ${args.format}")
        }
case "export" =>
val df = sqlContext.read.options(kuduOptions).format("kudu").load.select(args.columns)
args.format match {
case "csv" =>
df.write
.format("com.databricks.spark.csv")
.option("header", args.header)
.option("delimiter", args.delimiter)
.save(args.path)
case "parquet" =>
df.write.parquet(args.path)
case "avro" =>
df.write.format("com.databricks.spark.avro").save(args.path)
case _ => fail(args.format + s"unknown argument given ")
}
case _ => fail(args.operation + s"unknown argument given ")
}
}

  /**
   * Entry point for testing. SparkContext is a singleton,
   * so tests must create and manage their own.
   */
  @InterfaceAudience.LimitedPrivate(Array("Test"))
  def testMain(args: Array[String], ss: SparkSession): Unit = {
    run(ArgsCls.parse(args), ss)
  }
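
  // A test can therefore drive the tool with its own session, e.g.
  // (a sketch; the master URL and arguments are illustrative):
  //
  //   val ss = SparkSession.builder().master("local[*]").getOrCreate()
  //   ImportExportFiles.testMain(
  //     Array("--operation=export", "--table-name=users", "--path=/tmp/out"), ss)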

  def main(args: Array[String]): Unit = {
    val conf =
      new SparkConf().setAppName("Import or export files to/from Kudu")
    val ss = SparkSession.builder().config(conf).getOrCreate()
    testMain(args, ss)
  }
}