blob: d555954b9e9fc3054662c6b3691d3432bb7a6169 [file] [log] [blame]
/*
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package org.apache.phoenix.spark
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.hbase.{HBaseConfiguration, HConstants}
import org.apache.phoenix.jdbc.PhoenixEmbeddedDriver
import org.apache.phoenix.mapreduce.util.{ColumnInfoToStringEncoderDecoder, PhoenixConfigurationUtil}
import org.apache.phoenix.util.{ColumnInfo, PhoenixRuntime}
import scala.collection.JavaConversions._
@deprecated("Use the DataSource V2 API implementation (see PhoenixDataSource)")
object ConfigurationUtil extends Serializable {
def getOutputConfiguration(tableName: String, columns: Seq[String], zkUrl: Option[String], tenantId: Option[String] = None, conf: Option[Configuration] = None): Configuration = {
// Create an HBaseConfiguration object from the passed in config, if present
val config = conf match {
case Some(c) => HBaseConfiguration.create(c)
case _ => HBaseConfiguration.create()
}
// Set the tenantId in the config if present
tenantId match {
case Some(id) => setTenantId(config, id)
case _ =>
}
// Set the table to save to
PhoenixConfigurationUtil.setOutputTableName(config, tableName)
PhoenixConfigurationUtil.setPhysicalTableName(config, tableName)
// Infer column names from the DataFrame schema
PhoenixConfigurationUtil.setUpsertColumnNames(config, Array(columns : _*))
// Override the Zookeeper URL if present. Throw exception if no address given.
zkUrl match {
case Some(url) => setZookeeperURL(config, url)
case _ => {
if (ConfigurationUtil.getZookeeperURL(config).isEmpty) {
throw new UnsupportedOperationException(
s"One of zkUrl or '${HConstants.ZOOKEEPER_QUORUM}' config property must be provided"
)
}
}
}
// Return the configuration object
config
}
def setZookeeperURL(conf: Configuration, zkUrl: String) = {
val info = PhoenixEmbeddedDriver.ConnectionInfo.create(zkUrl)
conf.set(HConstants.ZOOKEEPER_QUORUM, info.getZookeeperQuorum)
if (info.getPort != null)
conf.setInt(HConstants.ZOOKEEPER_CLIENT_PORT, info.getPort)
if (info.getRootNode != null)
conf.set(HConstants.ZOOKEEPER_ZNODE_PARENT, info.getRootNode)
}
def setTenantId(conf: Configuration, tenantId: String) = {
conf.set(PhoenixRuntime.TENANT_ID_ATTRIB, tenantId)
}
// Return a serializable representation of the columns
def encodeColumns(conf: Configuration) = {
ColumnInfoToStringEncoderDecoder.encode(conf, PhoenixConfigurationUtil.getUpsertColumnMetadataList(conf)
)
}
// Decode the columns to a list of ColumnInfo objects
def decodeColumns(conf: Configuration): List[ColumnInfo] = {
ColumnInfoToStringEncoderDecoder.decode(conf).toList
}
def getZookeeperURL(conf: Configuration): Option[String] = {
List(
Option(conf.get(HConstants.ZOOKEEPER_QUORUM)),
Option(conf.get(HConstants.ZOOKEEPER_CLIENT_PORT)),
Option(conf.get(HConstants.ZOOKEEPER_ZNODE_PARENT))
).flatten match {
case Nil => None
case x: List[String] => Some(x.mkString(":"))
}
}
}