/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.kududb.spark

import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.io.NullWritable
import org.apache.spark.rdd.RDD
import org.apache.spark.SparkContext
import org.kududb.annotations.InterfaceStability
import org.kududb.client.{AsyncKuduClient, KuduClient, RowResult}
import org.kududb.mapreduce.KuduTableInputFormat
/**
 * KuduContext is a façade for Kudu operations.
 *
 * If a Kudu client connection is needed as part of a Spark application, a
 * [[KuduContext]] should be used as a broadcast variable in the job in order
 * to share connections among the tasks in a JVM.
 */
@InterfaceStability.Unstable
class KuduContext(kuduMaster: String) extends Serializable {

  // The clients are @transient and lazy so that the context itself can be
  // serialized into tasks; each JVM builds its own connection on first use.
  @transient lazy val syncClient = new KuduClient.KuduClientBuilder(kuduMaster).build()

  @transient lazy val asyncClient = new AsyncKuduClient.AsyncKuduClientBuilder(kuduMaster).build()
  /**
   * Create an RDD from a Kudu table.
   *
   * @param sc the Spark context
   * @param tableName table to read from
   * @param columnProjection list of columns to read. Leaving this as the
   *                         default (an empty list) or passing the single
   *                         special string '*' means to project all columns.
   * @return a new RDD that maps over the given table for the selected columns
   */
  def kuduRDD(sc: SparkContext,
              tableName: String,
              columnProjection: Seq[String] = Nil): RDD[RowResult] = {
    // Configure the Hadoop input format with the master address, the table
    // to read, and (optionally) a comma-separated column projection.
    val conf = new Configuration
    conf.set("kudu.mapreduce.master.address", kuduMaster)
    conf.set("kudu.mapreduce.input.table", tableName)
    if (columnProjection.nonEmpty) {
      conf.set("kudu.mapreduce.column.projection", columnProjection.mkString(","))
    }
    val rdd = sc.newAPIHadoopRDD(conf, classOf[KuduTableInputFormat],
      classOf[NullWritable], classOf[RowResult])
    // Drop the NullWritable keys, keeping only the row results, and give the
    // RDD a descriptive name for the Spark UI.
    val columnNames = if (columnProjection.nonEmpty) columnProjection.mkString(", ") else "(*)"
    rdd.values.setName(s"KuduRDD { table=$tableName, columnProjection=$columnNames }")
  }
}
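
/**
 * A minimal usage sketch, not part of the original file: it assumes a
 * reachable Kudu master at "kudu-master-host:7051" and a table named
 * "example_table" with string columns "key" and "value". All of those names,
 * and the object name itself, are hypothetical.
 */
object KuduContextExample {
  import org.apache.spark.SparkConf

  def main(args: Array[String]): Unit = {
    val sc = new SparkContext(new SparkConf().setAppName("KuduContextExample"))

    // Build the KuduContext once on the driver and broadcast it, so that
    // tasks running in the same JVM share the lazily created connections.
    val kuduContext = sc.broadcast(new KuduContext("kudu-master-host:7051"))

    // Read two projected columns; pass Nil (the default) to read all columns.
    val rows = kuduContext.value.kuduRDD(sc, "example_table", Seq("key", "value"))
    println("row count: " + rows.map(_.getString("value")).count())

    sc.stop()
  }
}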