/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.carbondata.spark.testsuite.insertQuery

import org.apache.spark.sql.{Row, SaveMode}
import org.apache.spark.sql.test.util.QueryTest
import org.scalatest.BeforeAndAfterAll

import org.apache.carbondata.core.constants.CarbonCommonConstants
import org.apache.carbondata.core.util.CarbonProperties

class InsertIntoNonCarbonTableTestCase extends QueryTest with BeforeAndAfterAll {
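
  // Set up: create the CarbonData source table, load the sample CSV, and
  // enable unsafe (off-heap) processing for query execution and column pages.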
  override def beforeAll {
    sql("drop table if exists TCarbonSource")
    sql(
      "create table TCarbonSource (imei string,deviceInformationId int,MAC string,deviceColor " +
      "string,device_backColor string,modelId string,marketName string,AMSize string,ROMSize " +
      "string,CUPAudit string,CPIClocked string,series string,productionDate timestamp,bomCode " +
      "string,internalModels string, deliveryTime string, channelsId string, channelsName string " +
      ", deliveryAreaId string, deliveryCountry string, deliveryProvince string, deliveryCity " +
      "string,deliveryDistrict string, deliveryStreet string, oxSingleNumber string, " +
      "ActiveCheckTime string, ActiveAreaId string, ActiveCountry string, ActiveProvince string, " +
      "Activecity string, ActiveDistrict string, ActiveStreet string, ActiveOperatorId string, " +
      "Active_releaseId string, Active_EMUIVersion string, Active_operaSysVersion string, " +
      "Active_BacVerNumber string, Active_BacFlashVer string, Active_webUIVersion string, " +
      "Active_webUITypeCarrVer string,Active_webTypeDataVerNumber string, Active_operatorsVersion" +
      " string, Active_phonePADPartitionedVersions string, Latest_YEAR int, Latest_MONTH int, " +
      "Latest_DAY Decimal(30,10), Latest_HOUR string, Latest_areaId string, Latest_country " +
      "string, Latest_province string, Latest_city string, Latest_district string, Latest_street " +
      "string, Latest_releaseId string, Latest_EMUIVersion string, Latest_operaSysVersion string," +
      " Latest_BacVerNumber string, Latest_BacFlashVer string, Latest_webUIVersion string, " +
      "Latest_webUITypeCarrVer string, Latest_webTypeDataVerNumber string, " +
      "Latest_operatorsVersion string, Latest_phonePADPartitionedVersions string, " +
      "Latest_operatorId string, gamePointDescription string,gamePointId double,contractNumber " +
      "BigInt) STORED AS carbondata")
    sql(
      s"LOAD DATA INPATH '$resourcesPath/100_olap.csv' INTO table TCarbonSource options " +
      "('DELIMITER'=',', 'QUOTECHAR'='\"', 'FILEHEADER'='imei,deviceInformationId,MAC,deviceColor," +
      "device_backColor,modelId,marketName,AMSize,ROMSize,CUPAudit,CPIClocked,series," +
      "productionDate,bomCode,internalModels,deliveryTime,channelsId,channelsName,deliveryAreaId," +
      "deliveryCountry,deliveryProvince,deliveryCity,deliveryDistrict,deliveryStreet," +
      "oxSingleNumber,ActiveCheckTime,ActiveAreaId,ActiveCountry,ActiveProvince,Activecity," +
      "ActiveDistrict,ActiveStreet,ActiveOperatorId,Active_releaseId,Active_EMUIVersion," +
      "Active_operaSysVersion,Active_BacVerNumber,Active_BacFlashVer,Active_webUIVersion," +
      "Active_webUITypeCarrVer,Active_webTypeDataVerNumber,Active_operatorsVersion," +
      "Active_phonePADPartitionedVersions,Latest_YEAR,Latest_MONTH,Latest_DAY,Latest_HOUR," +
      "Latest_areaId,Latest_country,Latest_province,Latest_city,Latest_district,Latest_street," +
      "Latest_releaseId,Latest_EMUIVersion,Latest_operaSysVersion,Latest_BacVerNumber," +
      "Latest_BacFlashVer,Latest_webUIVersion,Latest_webUITypeCarrVer," +
      "Latest_webTypeDataVerNumber,Latest_operatorsVersion,Latest_phonePADPartitionedVersions," +
      "Latest_operatorId,gamePointDescription,gamePointId,contractNumber', " +
      "'bad_records_logger_enable'='false','bad_records_action'='FORCE')")
    CarbonProperties.getInstance()
      .addProperty(CarbonCommonConstants.ENABLE_UNSAFE_IN_QUERY_EXECUTION, "true")
    CarbonProperties.getInstance()
      .addProperty(CarbonCommonConstants.ENABLE_UNSAFE_COLUMN_PAGE, "true")
  }
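
  // Verify that a Hive textfile table created via CTAS from the CarbonData
  // table returns the same rows as the source.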
test("insert into hive") {
sql("drop table if exists thive2")
sql(
"create table thive2 row format delimited fields terminated by '\017' stored as textfile as" +
" select imei,deviceInformationId,MAC from TCarbonSource")
checkAnswer(
sql(
"select imei,deviceInformationId,MAC from TCarbonSource order by imei, " +
"deviceInformationId,MAC"),
sql("select imei,deviceInformationId,MAC from thive2 order by imei,deviceInformationId,MAC")
)
sql("drop table thive2")
}
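
  // Verify INSERT INTO a Parquet table selecting from the CarbonData table.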
test("insert into parquet") {
sql("drop table if exists tparquet")
sql("create table tparquet(imei string,deviceInformationId int) STORED AS PARQUET")
sql("insert into tparquet select imei,deviceInformationId from TCarbonSource")
checkAnswer(
sql("select imei,deviceInformationId from TCarbonSource order by imei,deviceInformationId"),
sql("select imei,deviceInformationId from tparquet order by imei,deviceInformationId")
)
sql("drop table tparquet")
}
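
  // Verify CTAS into a Hive table with a conditional (CASE WHEN) projection
  // and a LIMIT clause.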
test("insert into hive conditional") {
sql("drop table if exists thive_cond")
sql(
"create table thive_cond row format delimited fields terminated by '\017' stored as " +
"textfile as SELECT(CASE WHEN imei IS NOT NULL THEN imei ELSE MAC END) AS temp FROM " +
"TCarbonSource limit 10")
checkAnswer(
sql("select count(*) from thive_cond"),
Seq(Row(10))
)
sql("drop table thive_cond")
}
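
  // Regression test: inserting from a datasource ("carbon") table into a
  // CarbonData session table previously crashed the JVM.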
test("jvm crash when insert data from datasource table to session table") {
val spark = sqlContext.sparkSession
import spark.implicits._
import scala.util.Random
val r = new Random()
val df = spark.sparkContext.parallelize(1 to 10)
.map(x => (r.nextInt(100000), "name" + x % 8, "city" + x % 50, BigDecimal.apply(x % 60)))
.toDF("ID", "name", "city", "age")
spark.sql("DROP TABLE IF EXISTS personTable")
spark.sql("DROP TABLE IF EXISTS test_table")
df.write.format("carbon").saveAsTable("personTable")
spark.sql("create table test_table(ID int, name string, city string, age decimal) STORED AS carbondata tblproperties('sort_columns'='ID')")
spark.sql("insert into test_table select * from personTable")
spark.sql("insert into test_table select * from personTable limit 2")
assert(spark.sql("select * from test_table").count() == 12)
spark.sql("DROP TABLE IF EXISTS personTable")
spark.sql("DROP TABLE IF EXISTS test_table")
}
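
  // Regression test: inserting from a datasource ("carbon") table into
  // another datasource ("using carbon") table previously crashed the JVM.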
test("jvm crash when insert data from datasource table to datasource table") {
val spark = sqlContext.sparkSession
import spark.implicits._
import scala.util.Random
val r = new Random()
val df = spark.sparkContext.parallelize(1 to 10)
.map(x => (r.nextInt(100000), "name" + x % 8, "city" + x % 50, BigDecimal.apply(x % 60)))
.toDF("ID", "name", "city", "age")
spark.sql("DROP TABLE IF EXISTS personTable")
spark.sql("DROP TABLE IF EXISTS test_table")
df.write.format("carbon").saveAsTable("personTable")
spark.sql("create table test_table(ID int, name string, city string, age decimal) using carbon")
spark.sql("insert into test_table select * from personTable")
spark.sql("insert into test_table select * from personTable limit 2")
assert(spark.sql("select * from test_table").count() == 12)
spark.sql("DROP TABLE IF EXISTS personTable")
spark.sql("DROP TABLE IF EXISTS test_table")
}
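
  // Regression test: inserting from a session table (written through the
  // "carbondata" source with an explicit tableName) into a datasource table
  // previously crashed the JVM.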
test("jvm crash when insert data from session table to datasource table") {
val spark = sqlContext.sparkSession
import spark.implicits._
import scala.util.Random
val r = new Random()
val df = spark.sparkContext.parallelize(1 to 10)
.map(x => (r.nextInt(100000), "name" + x % 8, "city" + x % 50, BigDecimal.apply(x % 60)))
.toDF("ID", "name", "city", "age")
spark.sql("DROP TABLE IF EXISTS personTable")
spark.sql("DROP TABLE IF EXISTS test_table")
df.write
.format("carbondata")
.option("tableName", "personTable")
.mode(SaveMode.Overwrite)
.save()
spark.sql("create table test_table(ID int, name string, city string, age decimal) using carbon")
spark.sql("insert into test_table select * from personTable")
spark.sql("insert into test_table select * from personTable limit 2")
assert(spark.sql("select * from test_table").count() == 12)
spark.sql("DROP TABLE IF EXISTS personTable")
spark.sql("DROP TABLE IF EXISTS test_table")
}
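
  // Tear down: restore the unsafe-processing properties to their defaults
  // and drop the source table.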
  override def afterAll {
    CarbonProperties.getInstance().addProperty(
      CarbonCommonConstants.ENABLE_UNSAFE_IN_QUERY_EXECUTION,
      CarbonCommonConstants.ENABLE_UNSAFE_IN_QUERY_EXECUTION_DEFAULTVALUE)
    CarbonProperties.getInstance().addProperty(
      CarbonCommonConstants.ENABLE_UNSAFE_COLUMN_PAGE,
      CarbonCommonConstants.ENABLE_UNSAFE_COLUMN_PAGE_DEFAULT)
    sql("DROP TABLE IF EXISTS TCarbonSource")
  }
}