/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.kudu.spark.tools

import java.io.{File, FileOutputStream}

import org.apache.kudu.ColumnSchema.ColumnSchemaBuilder
import org.apache.kudu.{Schema, Type}
import org.apache.kudu.client.CreateTableOptions
import org.apache.kudu.spark.kudu._
import org.apache.spark.sql.SQLContext
import org.junit.Assert._
import org.junit.runner.RunWith
import org.scalatest.junit.JUnitRunner
import org.scalatest.{FunSuite, Matchers}
import org.spark_project.guava.collect.ImmutableList

import scala.collection.JavaConverters._
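
/**
 * Tests the ImportExportFiles Spark tool by importing a generated CSV file
 * into a Kudu table and verifying the loaded rows.
 */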
@RunWith(classOf[JUnitRunner])
class TestImportExportFiles extends FunSuite with TestContext with Matchers {

  private val TABLE_NAME: String =
    classOf[TestImportExportFiles].getName + "-" + System.currentTimeMillis

  var sqlContext: SQLContext = _
  var kuduOptions: Map[String, String] = _
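
  // End-to-end check: create the table, import the CSV through the tool's
  // command-line entry point, then read the rows back with a Kudu RDD.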
  test("Spark Import Export") {
    // Every column is declared as STRING, regardless of the type hinted
    // at by its name.
    val schema: Schema = {
      val columns = ImmutableList.of(
        new ColumnSchemaBuilder("key", Type.STRING).key(true).build(),
        new ColumnSchemaBuilder("column1_i", Type.STRING).build(),
        new ColumnSchemaBuilder("column2_d", Type.STRING).nullable(true).build(),
        new ColumnSchemaBuilder("column3_s", Type.STRING).build(),
        new ColumnSchemaBuilder("column4_b", Type.STRING).build())
      new Schema(columns)
    }
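    // Range-partition on the key column with no explicit splits (a single
    // tablet), and one replica, which is enough for the test mini cluster.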
    val tableOptions = new CreateTableOptions()
      .setRangePartitionColumns(List("key").asJava)
      .setNumReplicas(1)
    kuduClient.createTable(TABLE_NAME, schema, tableOptions)
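
    // Write the sample CSV to the build's target/ directory.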
    val data: File = new File("target/", TABLE_NAME + ".csv")
    writeCsvFile(data)
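
    // Run the import against the mini cluster's masters, treating the first
    // CSV line as a header and letting the tool infer the column types.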
    ImportExportFiles.testMain(Array(
      "--operation=import",
      "--format=csv",
      s"--master-addrs=${miniCluster.getMasterAddresses}",
      s"--path=target/$TABLE_NAME.csv",
      s"--table-name=$TABLE_NAME",
      "--delimiter=,",
      "--header=true",
      "--inferschema=true"), sc)
    // Read the table back and verify that all four rows arrived.
    val rdd = kuduContext.kuduRDD(sc, TABLE_NAME, List("key"))
    val rows = rdd.collect()
    assertEquals(4, rows.length)
    assertEquals("[1],[2],[3],[4]", rows.mkString(","))
  }
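
  /** Writes the five-column CSV fixture consumed by the import step. */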
  def writeCsvFile(data: File): Unit = {
    val fos: FileOutputStream = new FileOutputStream(data)
    try {
      fos.write("key,column1_i,column2_d,column3_s,column4_b\n".getBytes)
      fos.write("1,3,2.3,some string,true\n".getBytes)
      fos.write("2,5,4.5,some more,false\n".getBytes)
      fos.write("3,7,1.2,wait this is not a double bad row,true\n".getBytes)
      fos.write("4,9,10.1,trailing separator isn't bad mkay?,true\n".getBytes)
    } finally {
      fos.close()
    }
  }
}