// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
package org.apache.kudu.spark.tools

import java.io.File
import java.nio.file.Files
import java.nio.file.Paths

import com.google.common.collect.ImmutableList
import org.apache.kudu.ColumnSchema.ColumnSchemaBuilder
import org.apache.kudu.Schema
import org.apache.kudu.Type
import org.apache.kudu.client.CreateTableOptions
import org.apache.kudu.client.KuduTable
import org.apache.kudu.spark.kudu._
import org.junit.Assert.assertEquals
import org.junit.Before
import org.junit.Test

import scala.collection.JavaConverters._

class TestImportExportFiles extends KuduTestSuite {
  private val TableDataPath = "/TestImportExportFiles.csv"
  private val TableName = "TestImportExportFiles"

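  // Every column is a STRING so that values round-trip through the text
  // formats under test without type coercion; column2_d is the one
  // nullable column.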
  private val TableSchema = {
    val columns = ImmutableList.of(
      new ColumnSchemaBuilder("key", Type.STRING).key(true).build(),
      new ColumnSchemaBuilder("column1_i", Type.STRING).build(),
      new ColumnSchemaBuilder("column2_d", Type.STRING)
        .nullable(true)
        .build(),
      new ColumnSchemaBuilder("column3_s", Type.STRING).build(),
      new ColumnSchemaBuilder("column4_b", Type.STRING).build()
    )
    new Schema(columns)
  }

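  // A single-replica, range-partitioned table is enough for the
  // mini cluster that backs KuduTestSuite.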
  private val options = new CreateTableOptions()
    .setRangePartitionColumns(List("key").asJava)
    .setNumReplicas(1)

  @Before
  def setUp(): Unit = {
    kuduClient.createTable(TableName, TableSchema, options)
  }

  @Test
  def testCSVImport(): Unit = {
    // Get the absolute path of the test data resource file.
    val dataResource =
      classOf[TestImportExportFiles].getResource(TableDataPath)
    val dataPath = Paths.get(dataResource.toURI).toAbsolutePath
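    // Run the import end-to-end. Outside of a test, the same tool would be
    // launched through spark-submit with these flags (jar name illustrative):
    //   spark-submit --class org.apache.kudu.spark.tools.ImportExportFiles \
    //     kudu-spark-tools.jar --operation=import --format=csv \
    //     --master-addrs=<masters> --path=<file> --table-name=<table>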
    ImportExportFiles.testMain(
      Array(
        "--operation=import",
        "--format=csv",
        s"--master-addrs=${harness.getMasterAddressesAsString}",
        s"--path=$dataPath",
        s"--table-name=$TableName",
        "--delimiter=,",
        "--header=true",
        "--inferschema=true"
      ),
      ss
    )
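    // Read back only the key column and verify that all four CSV rows landed.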
    val rdd = kuduContext.kuduRDD(ss.sparkContext, TableName, List("key"))
    val rows = rdd.collect()
    assertEquals(4, rows.length)
    assertEquals("[1],[2],[3],[4]", rows.mkString(","))
  }

  @Test
  def testRoundTrips(): Unit = {
    val table = kuduClient.openTable(TableName)
    loadSampleData(table, 50)
    runRoundTripTest(TableName, s"$TableName-avro", "avro")
    runRoundTripTest(TableName, s"$TableName-csv", "csv")
    runRoundTripTest(TableName, s"$TableName-parquet", "parquet")
  }

  // TODO(KUDU-2454): Use random schemas and random data to ensure all type/values round-trip.
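  // Inserts numRows sample rows (string keys "0" through numRows - 1)
  // through a Kudu session.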
  private def loadSampleData(table: KuduTable, numRows: Int): Unit = {
    val session = kuduClient.newSession()
    Range(0, numRows).foreach { i =>
      val insert = table.newInsert
      val row = insert.getRow
      row.addString(0, i.toString)
      row.addString(1, i.toString)
      // Column 2 ("column2_d") is nullable and is left unset.
      row.addString(3, i.toString)
      row.addString(4, i.toString)
      session.apply(insert)
    }
    session.close()
  }

  private def runRoundTripTest(fromTable: String, toTable: String, format: String): Unit = {
    val dir = Files.createTempDirectory("round-trip")
    val path = new File(dir.toFile, s"$fromTable-$format").getAbsolutePath
    // Export the data.
    ImportExportFiles.testMain(
      Array(
        "--operation=export",
        s"--format=$format",
        s"--master-addrs=${harness.getMasterAddressesAsString}",
        s"--path=$path",
        s"--table-name=$fromTable",
        "--header=true"
      ),
      ss
    )
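    // The temp directory now holds the exported files in the requested format.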
    // Create the target table.
    kuduClient.createTable(toTable, TableSchema, options)
    // Import the data into the new table.
    ImportExportFiles.testMain(
      Array(
        "--operation=import",
        s"--format=$format",
        s"--master-addrs=${harness.getMasterAddressesAsString}",
        s"--path=$path",
        s"--table-name=$toTable",
        "--header=true"
      ),
      ss
    )
    // Verify the tables match.
    // TODO(KUDU-2454): Verify every value to ensure all values round trip.
    val rdd1 = kuduContext.kuduRDD(ss.sparkContext, fromTable, List("key"))
    val rdd2 = kuduContext.kuduRDD(ss.sparkContext, toTable, List("key"))
    assertEquals(rdd1.count(), rdd2.count())
  }
}