/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.carbondata.examples

import org.apache.spark.sql.{SaveMode, SparkSession}

import org.apache.carbondata.examples.util.ExampleUtils
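
// Row models for the DataFrames built below: plain Scala arrays and case
// classes map onto the tables' array<string> and struct<...> column types.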
case class StructElement(school: Array[String], age: Int)
case class StructElement1(school: Array[String], school1: Array[String], age: Int)
case class ComplexTypeData(id: Int, name: String, city: String, salary: Float, file: StructElement)
case class ComplexTypeData1(id: Int,
    name: String,
    city: String,
    salary: Float,
    file: StructElement1)
case class ComplexTypeData2(id: Int, name: String, city: String, salary: Float, file: Array[String])
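
/**
 * Example of writing Spark DataFrames that contain complex (array and struct)
 * columns into CarbonData tables and querying them back with SQL.
 */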
// scalastyle:off println
object DataFrameComplexTypeExample {

  def main(args: Array[String]): Unit = {
    val spark = ExampleUtils.createCarbonSession("DataFrameComplexTypeExample", 4)
    exampleBody(spark)
    spark.close()
  }

  def exampleBody(spark: SparkSession): Unit = {
    val complexTypeNoDictionaryTableName = "complex_type_noDictionary_table"
    val complexTypeNoDictionaryTableNameArray = "complex_type_noDictionary_array_table"

    import spark.implicits._

    // drop the tables if they exist from a previous run
    spark.sql(s"DROP TABLE IF EXISTS ${ complexTypeNoDictionaryTableName }")
    spark.sql(s"DROP TABLE IF EXISTS ${ complexTypeNoDictionaryTableNameArray }")
    spark.sql(
      s"""
         | CREATE TABLE ${ complexTypeNoDictionaryTableNameArray }(
         | id INT,
         | name STRING,
         | city STRING,
         | salary FLOAT,
         | file array<string>
         | )
         | STORED BY 'carbondata'
         | TBLPROPERTIES(
         | 'sort_columns'='name')
         | """.stripMargin)
    spark.sql(
      s"""
         | CREATE TABLE ${ complexTypeNoDictionaryTableName }(
         | id INT,
         | name STRING,
         | city STRING,
         | salary FLOAT,
         | file struct<school:array<string>, school1:array<string>, age:int>
         | )
         | STORED BY 'carbondata'
         | TBLPROPERTIES(
         | 'sort_columns'='name')
         | """.stripMargin)

    val sc = spark.sparkContext

    // generate rows for the array-column table
    val df2 = sc.parallelize(Seq(
      ComplexTypeData2(1, "index_1", "city_1", 10000.0f, Array("struct_11", "struct_12")),
      ComplexTypeData2(2, "index_2", "city_2", 20000.0f, Array("struct_21", "struct_22")),
      ComplexTypeData2(3, "index_3", "city_3", 30000.0f, Array("struct_31", "struct_32"))
    )).toDF

    // generate rows for the struct-column table
    val df1 = sc.parallelize(Seq(
      ComplexTypeData1(1, "index_1", "city_1", 10000.0f,
        StructElement1(Array("struct_11", "struct_12"), Array("struct_11", "struct_12"), 10)),
      ComplexTypeData1(2, "index_2", "city_2", 20000.0f,
        StructElement1(Array("struct_21", "struct_22"), Array("struct_11", "struct_12"), 20)),
      ComplexTypeData1(3, "index_3", "city_3", 30000.0f,
        StructElement1(Array("struct_31", "struct_32"), Array("struct_11", "struct_12"), 30))
    )).toDF
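
    // print the schema and append the struct rows into the struct table;
    // each write in Append mode lands as a new CarbonData segment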
    df1.printSchema()
    df1.write
      .format("carbondata")
      .option("tableName", complexTypeNoDictionaryTableName)
      .mode(SaveMode.Append)
      .save()
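
    // likewise, append the array rows into the array table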
    df2.printSchema()
    df2.write
      .format("carbondata")
      .option("tableName", complexTypeNoDictionaryTableNameArray)
      .mode(SaveMode.Append)
      .save()
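
    // A minimal sketch (commented out) of the symmetric read path: loading a
    // CarbonData table back as a DataFrame via the DataSource API. The
    // format/option pair mirrors the writes above but is an assumption here,
    // not part of the original example.
    // val readBack = spark.read
    //   .format("carbondata")
    //   .option("tableName", complexTypeNoDictionaryTableName)
    //   .load()

    // query the struct table: row count, full ordered scan, filtered lookups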
spark.sql(s"select count(*) from ${ complexTypeNoDictionaryTableName }")
.show(100, truncate = false)
spark.sql(s"select * from ${ complexTypeNoDictionaryTableName } order by id desc")
.show(300, truncate = false)
spark.sql(s"select * " +
s"from ${ complexTypeNoDictionaryTableName } " +
s"where id = 100000001 or id = 1 limit 100").show(100, truncate = false)
spark.sql(s"select * " +
s"from ${ complexTypeNoDictionaryTableName } " +
s"where id > 10 limit 100").show(100, truncate = false)
// show segments
spark.sql(s"SHOW SEGMENTS FOR TABLE ${ complexTypeNoDictionaryTableName }").show(false)
// drop table
spark.sql(s"DROP TABLE IF EXISTS ${ complexTypeNoDictionaryTableName }")
spark.sql(s"select count(*) from ${ complexTypeNoDictionaryTableNameArray }")
.show(100, truncate = false)
spark.sql(s"select * from ${ complexTypeNoDictionaryTableNameArray } order by id desc")
.show(300, truncate = false)
spark.sql(s"select * " +
s"from ${ complexTypeNoDictionaryTableNameArray } " +
s"where id = 100000001 or id = 1 limit 100").show(100, truncate = false)
spark.sql(s"select * " +
s"from ${ complexTypeNoDictionaryTableNameArray } " +
s"where id > 10 limit 100").show(100, truncate = false)
// show segments
spark.sql(s"SHOW SEGMENTS FOR TABLE ${ complexTypeNoDictionaryTableNameArray }").show(false)
// drop table
spark.sql(s"DROP TABLE IF EXISTS ${ complexTypeNoDictionaryTableNameArray }")
}
}
// scalastyle:on println