// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.

package org.apache.kudu.spark.tools

import java.io.{File, FileOutputStream}

import org.apache.kudu.ColumnSchema.ColumnSchemaBuilder
import org.apache.kudu.client.CreateTableOptions
import org.apache.kudu.spark.kudu._
import org.apache.kudu.{Schema, Type}
import org.apache.spark.sql.SQLContext
import org.junit.Assert._
import org.junit.runner.RunWith
import org.scalatest.junit.JUnitRunner
import org.scalatest.{FunSuite, Matchers}
import org.spark_project.guava.collect.ImmutableList

import scala.collection.JavaConverters._

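/**
 * Verifies that ImportExportFiles can import a CSV file into a Kudu table
 * and that the loaded rows can be read back through KuduContext.
 */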
@RunWith(classOf[JUnitRunner])
class TestImportExportFiles extends FunSuite with TestContext with Matchers {

  private val TABLE_NAME: String =
    classOf[TestImportExportFiles].getName + "-" + System.currentTimeMillis

  var sqlContext: SQLContext = _
  var kuduOptions: Map[String, String] = _

  test("Spark Import Export") {
    // All columns are deliberately STRING-typed so that the malformed values
    // in the CSV (see writeCsvFile below) still import cleanly.
    val schema: Schema = {
      val columns = ImmutableList.of(
        new ColumnSchemaBuilder("key", Type.STRING).key(true).build(),
        new ColumnSchemaBuilder("column1_i", Type.STRING).build(),
        new ColumnSchemaBuilder("column2_d", Type.STRING).nullable(true).build(),
        new ColumnSchemaBuilder("column3_s", Type.STRING).build(),
        new ColumnSchemaBuilder("column4_b", Type.STRING).build())
      new Schema(columns)
    }
    // Single-replica, range-partitioned table on the mini cluster.
    val tableOptions = new CreateTableOptions()
      .setRangePartitionColumns(List("key").asJava)
      .setNumReplicas(1)
    kuduClient.createTable(TABLE_NAME, schema, tableOptions)

    val data: File = new File("target/", TABLE_NAME + ".csv")
    writeCsvFile(data)

    ImportExportFiles.testMain(Array(
      "--operation=import",
      "--format=csv",
      s"--master-addrs=${miniCluster.getMasterAddresses}",
      s"--path=${data.getPath}",
      s"--table-name=$TABLE_NAME",
      "--delimiter=,",
      "--header=true",
      "--inferschema=true"), sc)

    // Read the table back once and check both the row count and the keys.
    // JUnit's assertEquals takes the expected value first.
    val rows = kuduContext.kuduRDD(sc, TABLE_NAME, List("key")).collect()
    assertEquals(4, rows.length)
    assertEquals("[1],[2],[3],[4]", rows.mkString(","))
  }
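
  // A minimal sketch, not exercised by the test above, of reading the
  // imported table back through the DataFrame API. It assumes the standard
  // kudu-spark reader options ("kudu.master", "kudu.table") and the `.kudu`
  // implicit brought in by org.apache.kudu.spark.kudu._; this is where the
  // kuduOptions field declared above would come into play.
  def readBackAsDataFrame(): Unit = {
    sqlContext = new SQLContext(sc)
    kuduOptions = Map(
      "kudu.table" -> TABLE_NAME,
      "kudu.master" -> miniCluster.getMasterAddresses)
    val df = sqlContext.read.options(kuduOptions).kudu
    assertEquals(4L, df.count())
  }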

  // Writes the test CSV, including two intentionally messy rows: one value
  // that is not a valid double and one line with a trailing separator.
  def writeCsvFile(data: File): Unit = {
    val fos: FileOutputStream = new FileOutputStream(data)
    try {
      fos.write("key,column1_i,column2_d,column3_s,column4_b\n".getBytes)
      fos.write("1,3,2.3,some string,true\n".getBytes)
      fos.write("2,5,4.5,some more,false\n".getBytes)
      fos.write("3,7,1.2,wait this is not a double bad row,true\n".getBytes)
      fos.write("4,9,10.1,trailing separator isn't bad mkay?,true\n".getBytes)
    } finally {
      fos.close()
    }
  }
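
  // Hedged sketch of the reverse direction: assuming ImportExportFiles
  // accepts --operation=export with the same flag set used for the import
  // above (not verified by this test), the table could be dumped back to CSV.
  def exportTableToCsv(outputPath: String): Unit = {
    ImportExportFiles.testMain(Array(
      "--operation=export",
      "--format=csv",
      s"--master-addrs=${miniCluster.getMasterAddresses}",
      s"--path=$outputPath",
      s"--table-name=$TABLE_NAME",
      "--delimiter=,",
      "--header=true"), sc)
  }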
}