/*
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at

http://www.apache.org/licenses/LICENSE-2.0

Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package org.apache.phoenix.spark

import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.io.NullWritable
import org.apache.phoenix.mapreduce.PhoenixOutputFormat
import org.apache.phoenix.mapreduce.util.PhoenixConfigurationUtil
import org.apache.phoenix.util.SchemaUtil
import org.apache.spark.sql.DataFrame

import scala.collection.JavaConversions._
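
/**
 * Adds a saveToPhoenix operation to Spark DataFrames, writing rows to a Phoenix table through
 * PhoenixOutputFormat.
 *
 * A minimal usage sketch, assuming the implicit conversion defined in the
 * org.apache.phoenix.spark package object is in scope; the table name and ZooKeeper quorum
 * below are placeholders:
 *
 * {{{
 * import org.apache.phoenix.spark._
 *
 * df.saveToPhoenix("OUTPUT_TABLE", zkUrl = Some("zookeeper-host:2181"))
 * }}}
 */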
@deprecated("Use the DataSource V2 API implementation (see PhoenixDataSource)")
class DataFrameFunctions(data: DataFrame) extends Serializable {
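  /**
   * Saves the DataFrame using a map of options. The keys read below are "table" (required),
   * "zkUrl", "TenantId", and "skipNormalizingIdentifier" (enabled by the key's presence,
   * regardless of its value). A minimal sketch, with placeholder table and quorum values:
   *
   * {{{
   * df.saveToPhoenix(Map("table" -> "OUTPUT_TABLE", "zkUrl" -> "zookeeper-host:2181"))
   * }}}
   */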
  def saveToPhoenix(parameters: Map[String, String]): Unit = {
    saveToPhoenix(parameters("table"), zkUrl = parameters.get("zkUrl"), tenantId = parameters.get("TenantId"),
      skipNormalizingIdentifier = parameters.contains("skipNormalizingIdentifier"))
  }
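
  /**
   * Saves the DataFrame to the given Phoenix table. Rows are mapped to PhoenixRecordWritable
   * instances and written through PhoenixOutputFormat. zkUrl and tenantId are optional; when
   * skipNormalizingIdentifier is false, field names are normalized with
   * SchemaUtil.normalizeIdentifier before being resolved against the table's columns.
   */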
  def saveToPhoenix(tableName: String, conf: Configuration = new Configuration,
                    zkUrl: Option[String] = None, tenantId: Option[String] = None, skipNormalizingIdentifier: Boolean = false): Unit = {

    // Retrieve the schema field names and normalize them for Phoenix; this needs to happen
    // on the driver, outside of mapPartitions
    val fieldArray = getFieldArray(skipNormalizingIdentifier, data)

    // Create a configuration object to use for saving
    @transient val outConfig = ConfigurationUtil.getOutputConfiguration(tableName, fieldArray, zkUrl, tenantId, Some(conf))

    // Retrieve the ZooKeeper URL
    val zkUrlFinal = ConfigurationUtil.getZookeeperURL(outConfig)

    // Map the row objects into PhoenixRecordWritable
    val phxRDD = data.rdd.mapPartitions { rows =>

      // Create a within-partition config to retrieve the ColumnInfo list
      @transient val partitionConfig = ConfigurationUtil.getOutputConfiguration(tableName, fieldArray, zkUrlFinal, tenantId)
      @transient val columns = PhoenixConfigurationUtil.getUpsertColumnMetadataList(partitionConfig).toList

      rows.map { row =>
        val rec = new PhoenixRecordWritable(columns)
        row.toSeq.foreach { e => rec.add(e) }
        // The key is unused; PhoenixOutputFormat only writes the PhoenixRecordWritable value
        (null, rec)
      }
    }

    // Save the RDD to Phoenix via PhoenixOutputFormat. saveAsNewAPIHadoopFile requires an output
    // path, but PhoenixOutputFormat writes to the Phoenix table rather than to files, so the
    // path is only needed to satisfy the API.
    phxRDD.saveAsNewAPIHadoopFile(
      Option(
        conf.get("mapreduce.output.fileoutputformat.outputdir")
      ).getOrElse(
        Option(conf.get("mapred.output.dir")).getOrElse("")
      ),
      classOf[NullWritable],
      classOf[PhoenixRecordWritable],
      classOf[PhoenixOutputFormat[PhoenixRecordWritable]],
      outConfig
    )
  }
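
  /**
   * Returns the DataFrame's field names, either verbatim (when skipNormalizingIdentifier is true)
   * or normalized with SchemaUtil.normalizeIdentifier so they match Phoenix column naming.
   */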
  def getFieldArray(skipNormalizingIdentifier: Boolean = false, data: DataFrame) = {
    if (skipNormalizingIdentifier) {
      data.schema.fieldNames.map(x => x)
    } else {
      data.schema.fieldNames.map(x => SchemaUtil.normalizeIdentifier(x))
    }
  }
}