/*
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements.  See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License.  You may obtain a copy of the License at
 *
 *    http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.spark.sql.carbondata.datasource

import java.io.File

import org.apache.carbondata.core.constants.CarbonCommonConstants
import org.apache.carbondata.core.util.CarbonProperties
import org.apache.carbondata.sdk.util.BinaryUtil
import org.apache.commons.codec.binary.{Base64, Hex}
import org.apache.commons.io.FileUtils
import org.apache.spark.sql.Row
import org.apache.spark.sql.carbondata.datasource.TestUtil._
import org.apache.spark.sql.catalyst.expressions.GenericRowWithSchema
import org.apache.spark.util.SparkUtil
import org.scalatest.{BeforeAndAfterAll, FunSuite}
| |
| class SparkCarbonDataSourceBinaryTest extends FunSuite with BeforeAndAfterAll { |
| |
| var writerPath = new File(this.getClass.getResource("/").getPath |
| + "../../target/SparkCarbonFileFormat/WriterOutput/") |
| .getCanonicalPath |
| var resourcesPath = new File(this.getClass.getResource("/").getPath |
| + "../../../spark-common-test/src/test/resources/") |
| .getCanonicalPath |
| var outputPath = writerPath + 2 |
| //getCanonicalPath gives path with \, but the code expects /. |
| writerPath = writerPath.replace("\\", "/") |
| |
| var sdkPath = new File(this.getClass.getResource("/").getPath + "../../../../store/sdk/") |
| .getCanonicalPath |
| |
| def buildTestBinaryData(): Any = { |
| FileUtils.deleteDirectory(new File(writerPath)) |
| FileUtils.deleteDirectory(new File(outputPath)) |
| |
| val sourceImageFolder = sdkPath + "/src/test/resources/image/flowers" |
| val sufAnnotation = ".txt" |
| BinaryUtil.binaryToCarbon(sourceImageFolder, writerPath, sufAnnotation, ".jpg") |
| } |
| |
| def cleanTestData() = { |
| FileUtils.deleteDirectory(new File(writerPath)) |
| FileUtils.deleteDirectory(new File(outputPath)) |
| } |
| |
| import spark._ |
| |
| override def beforeAll(): Unit = { |
| CarbonProperties.getInstance() |
| .addProperty(CarbonCommonConstants.CARBON_TIMESTAMP_FORMAT, |
| CarbonCommonConstants.CARBON_TIMESTAMP_DEFAULT_FORMAT) |
| buildTestBinaryData() |
| |
| FileUtils.deleteDirectory(new File(outputPath)) |
| sql("DROP TABLE IF EXISTS sdkOutputTable") |
| } |
| |
| override def afterAll(): Unit = { |
| cleanTestData() |
| sql("DROP TABLE IF EXISTS sdkOutputTable") |
| } |
| |
| test("Test direct sql read carbon") { |
| assert(new File(writerPath).exists()) |
| checkAnswer( |
| sql(s"SELECT COUNT(*) FROM carbon.`$writerPath`"), |
| Seq(Row(3))) |
| } |
| |
| test("Test read image carbon with spark carbon file format, generate by sdk, CTAS") { |
| sql("DROP TABLE IF EXISTS binaryCarbon") |
| sql("DROP TABLE IF EXISTS binaryCarbon3") |
| FileUtils.deleteDirectory(new File(outputPath)) |
| if (SparkUtil.isSparkVersionEqualTo("2.1")) { |
| sql(s"CREATE TABLE binaryCarbon USING CARBON OPTIONS(PATH '$writerPath')") |
| sql(s"CREATE TABLE binaryCarbon3 USING CARBON OPTIONS(PATH '$outputPath')" + " AS SELECT * FROM binaryCarbon") |
| } else { |
| sql(s"CREATE TABLE binaryCarbon USING CARBON LOCATION '$writerPath'") |
| sql(s"CREATE TABLE binaryCarbon3 USING CARBON LOCATION '$outputPath'" + " AS SELECT * FROM binaryCarbon") |
| } |
| checkAnswer(sql("SELECT COUNT(*) FROM binaryCarbon"), |
| Seq(Row(3))) |
| checkAnswer(sql("SELECT COUNT(*) FROM binaryCarbon3"), |
| Seq(Row(3))) |
| sql("DROP TABLE IF EXISTS binaryCarbon") |
| sql("DROP TABLE IF EXISTS binaryCarbon3") |
| FileUtils.deleteDirectory(new File(outputPath)) |
| } |
| |
| test("Don't support sort_columns") { |
| import spark._ |
| sql("DROP TABLE IF EXISTS binaryTable") |
| var exception = intercept[Exception] { |
| sql( |
| s""" |
| | CREATE TABLE binaryTable ( |
| | id DOUBLE, |
| | label BOOLEAN, |
| | name STRING, |
| | image BINARY, |
| | autoLabel BOOLEAN) |
| | using carbon |
| | options('SORT_COLUMNS'='image') |
| """.stripMargin) |
| // TODO: it should throw exception when create table |
| sql("SELECT COUNT(*) FROM binaryTable").show() |
| } |
| assert(exception.getCause.getMessage.contains("sort columns not supported for array, struct, map, double, float, decimal, varchar, binary")) |
| |
| sql("DROP TABLE IF EXISTS binaryTable") |
| if (SparkUtil.isSparkVersionEqualTo("2.1")) { |
| exception = intercept[Exception] { |
| sql( |
| s""" |
| | CREATE TABLE binaryTable |
| | using carbon |
| | options(PATH '$writerPath', |
| | 'SORT_COLUMNS'='image') |
| """.stripMargin) |
| sql("SELECT COUNT(*) FROM binaryTable").show() |
| } |
| } else { |
| exception = intercept[Exception] { |
| sql( |
| s""" |
| | CREATE TABLE binaryTable |
| | using carbon |
| | options('SORT_COLUMNS'='image') |
| | LOCATION '$writerPath' |
| """.stripMargin) |
| sql("SELECT COUNT(*) FROM binaryTable").show() |
| } |
| } |
| assert(exception.getMessage.contains("Cannot use sort columns during infer schema")) |
| |
| |
| sql("DROP TABLE IF EXISTS binaryTable") |
| if (SparkUtil.isSparkVersionEqualTo("2.1")) { |
| exception = intercept[Exception] { |
| sql( |
| s""" |
| | CREATE TABLE binaryTable ( |
| | id DOUBLE, |
| | label BOOLEAN, |
| | name STRING, |
| | image BINARY, |
| | autoLabel BOOLEAN) |
| | using carbon |
| | options(PATH '$writerPath', |
| | 'SORT_COLUMNS'='image') |
| """.stripMargin) |
| sql("SELECT COUNT(*) FROM binaryTable").show() |
| } |
| } else { |
| exception = intercept[Exception] { |
| sql( |
| s""" |
| | CREATE TABLE binaryTable ( |
| | id DOUBLE, |
| | label BOOLEAN, |
| | name STRING, |
| | image BINARY, |
| | autoLabel BOOLEAN) |
| | using carbon |
| | options('SORT_COLUMNS'='image') |
| | LOCATION '$writerPath' |
| """.stripMargin) |
| sql("SELECT COUNT(*) FROM binaryTable").show() |
| } |
| } |
| assert(exception.getCause.getMessage.contains("sort columns not supported for array, struct, map, double, float, decimal, varchar, binary")) |
| } |
| |
| test("Don't support long_string_columns for binary") { |
| import spark._ |
| sql("DROP TABLE IF EXISTS binaryTable") |
| val exception = intercept[Exception] { |
| sql( |
| s""" |
| | CREATE TABLE binaryTable ( |
| | id DOUBLE, |
| | label BOOLEAN, |
| | name STRING, |
| | image BINARY, |
| | autoLabel BOOLEAN) |
| | using carbon |
| | options('long_string_columns'='image') |
| """.stripMargin) |
| sql("SELECT COUNT(*) FROM binaryTable").show() |
| } |
| assert(exception.getCause.getMessage.contains("long string column : image is not supported for data type: BINARY")) |
| } |
| |
| test("Don't support insert into partition table") { |
| if (SparkUtil.isSparkVersionXandAbove("2.2")) { |
| sql("DROP TABLE IF EXISTS binaryCarbon") |
| sql("DROP TABLE IF EXISTS binaryCarbon2") |
| sql("DROP TABLE IF EXISTS binaryCarbon3") |
| sql("DROP TABLE IF EXISTS binaryCarbon4") |
| sql(s"CREATE TABLE binaryCarbon USING CARBON LOCATION '$writerPath'") |
| sql( |
| s""" |
| | CREATE TABLE binaryCarbon2( |
| | binaryId INT, |
| | binaryName STRING, |
| | binary BINARY, |
| | labelName STRING, |
| | labelContent STRING |
| |) USING CARBON""".stripMargin) |
| sql( |
| s""" |
| | CREATE TABLE binaryCarbon3( |
| | binaryId INT, |
| | binaryName STRING, |
| | binary BINARY, |
| | labelName STRING, |
| | labelContent STRING |
| |) USING CARBON partitioned by (binary) """.stripMargin) |
| sql("select binaryId,binaryName,binary,labelName,labelContent from binaryCarbon where binaryId=0").show() |
| |
| sql("insert into binaryCarbon2 select binaryId,binaryName,binary,labelName,labelContent from binaryCarbon where binaryId=0 ") |
| val carbonResult2 = sql("SELECT * FROM binaryCarbon2") |
| |
| sql("create table binaryCarbon4 using carbon select binaryId,binaryName,binary,labelName,labelContent from binaryCarbon where binaryId=0 ") |
| val carbonResult4 = sql("SELECT * FROM binaryCarbon4") |
| val carbonResult = sql("SELECT * FROM binaryCarbon") |
| |
| assert(3 == carbonResult.collect().length) |
| assert(1 == carbonResult4.collect().length) |
| assert(1 == carbonResult2.collect().length) |
| checkAnswer(carbonResult4, carbonResult2) |
| |
| try { |
| sql("insert into binaryCarbon3 select binaryId,binaryName,binary,labelName,labelContent from binaryCarbon where binaryId=0 ") |
| assert(false) |
| } catch { |
| case e: Exception => |
| e.printStackTrace() |
| assert(true) |
| } |
| sql("DROP TABLE IF EXISTS binaryCarbon") |
| sql("DROP TABLE IF EXISTS binaryCarbon2") |
| sql("DROP TABLE IF EXISTS binaryCarbon3") |
| sql("DROP TABLE IF EXISTS binaryCarbon4") |
| } |
| } |
| |
| test("Test unsafe as false") { |
| CarbonProperties.getInstance() |
| .addProperty(CarbonCommonConstants.ENABLE_UNSAFE_COLUMN_PAGE, "false") |
| FileUtils.deleteDirectory(new File(outputPath)) |
| sql("DROP TABLE IF EXISTS binaryCarbon") |
| sql("DROP TABLE IF EXISTS binaryCarbon3") |
| if (SparkUtil.isSparkVersionEqualTo("2.1")) { |
| sql(s"CREATE TABLE binaryCarbon USING CARBON OPTIONS(PATH '$writerPath')") |
| sql(s"CREATE TABLE binaryCarbon3 USING CARBON OPTIONS(PATH '$outputPath')" + " AS SELECT * FROM binaryCarbon") |
| } else { |
| sql(s"CREATE TABLE binaryCarbon USING CARBON LOCATION '$writerPath'") |
| sql(s"CREATE TABLE binaryCarbon3 USING CARBON LOCATION '$outputPath'" + " AS SELECT * FROM binaryCarbon") |
| } |
| checkAnswer(sql("SELECT COUNT(*) FROM binaryCarbon"), |
| Seq(Row(3))) |
| checkAnswer(sql("SELECT COUNT(*) FROM binaryCarbon3"), |
| Seq(Row(3))) |
| sql("DROP TABLE IF EXISTS binaryCarbon") |
| sql("DROP TABLE IF EXISTS binaryCarbon3") |
| |
| FileUtils.deleteDirectory(new File(outputPath)) |
| CarbonProperties.getInstance() |
| .addProperty(CarbonCommonConstants.ENABLE_UNSAFE_COLUMN_PAGE, |
| CarbonCommonConstants.ENABLE_UNSAFE_COLUMN_PAGE_DEFAULT) |
| } |
| |
| test("insert into for hive and carbon, CTAS") { |
| sql("DROP TABLE IF EXISTS hiveTable") |
| sql("DROP TABLE IF EXISTS carbon_table") |
| sql("DROP TABLE IF EXISTS hiveTable2") |
| sql("DROP TABLE IF EXISTS carbon_table2") |
| sql( |
| s""" |
| | CREATE TABLE IF NOT EXISTS hivetable ( |
| | id int, |
| | label boolean, |
| | name string, |
| | image binary, |
| | autoLabel boolean) |
| | row format delimited fields terminated by ',' |
| """.stripMargin) |
| sql("insert into hivetable values(1,true,'Bob','binary',false)") |
| sql("insert into hivetable values(2,false,'Xu','test',true)") |
| |
| sql( |
| s""" |
| | CREATE TABLE IF NOT EXISTS carbon_table ( |
| | id int, |
| | label boolean, |
| | name string, |
| | image binary, |
| | autoLabel boolean) |
| | using carbon |
| """.stripMargin) |
| sql("insert into carbon_table values(1,true,'Bob','binary',false)") |
| sql("insert into carbon_table values(2,false,'Xu','test',true)") |
| |
| val hexHiveResult = sql("SELECT hex(image) FROM hivetable") |
| val hexCarbonResult = sql("SELECT hex(image) FROM carbon_table") |
| checkAnswer(hexHiveResult, hexCarbonResult) |
| hexCarbonResult.collect().foreach { each => |
| val result = new String(Hex.decodeHex((each.getAs[Array[Char]](0)).toString.toCharArray)) |
| assert("binary".equals(result) |
| || "test".equals(result)) |
| } |
| |
| val base64HiveResult = sql("SELECT base64(image) FROM hivetable") |
| val base64CarbonResult = sql("SELECT base64(image) FROM carbon_table") |
| checkAnswer(base64HiveResult, base64CarbonResult) |
| base64CarbonResult.collect().foreach { each => |
| val result = new String(Base64.decodeBase64((each.getAs[Array[Char]](0)).toString)) |
| assert("binary".equals(result) |
| || "test".equals(result)) |
| } |
| |
| val carbonResult = sql("SELECT * FROM carbon_table") |
| val hiveResult = sql("SELECT * FROM hivetable") |
| |
| assert(2 == carbonResult.collect().length) |
| assert(2 == hiveResult.collect().length) |
| checkAnswer(hiveResult, carbonResult) |
| carbonResult.collect().foreach { each => |
| if (1 == each.get(0)) { |
| assert("binary".equals(new String(each.getAs[Array[Byte]](3)))) |
| } else if (2 == each.get(0)) { |
| assert("test".equals(new String(each.getAs[Array[Byte]](3)))) |
| } else { |
| assert(false) |
| } |
| } |
| |
| sql("CREATE TABLE hivetable2 AS SELECT * FROM carbon_table") |
| sql("CREATE TABLE carbon_table2 USING CARBON AS SELECT * FROM hivetable") |
| val carbonResult2 = sql("SELECT * FROM carbon_table2") |
| val hiveResult2 = sql("SELECT * FROM hivetable2") |
| checkAnswer(hiveResult2, carbonResult2) |
| checkAnswer(carbonResult, carbonResult2) |
| checkAnswer(hiveResult, hiveResult2) |
| assert(2 == carbonResult2.collect().length) |
| assert(2 == hiveResult2.collect().length) |
| |
| sql("INSERT INTO hivetable2 SELECT * FROM carbon_table") |
| sql("INSERT INTO carbon_table2 SELECT * FROM hivetable") |
| val carbonResult3 = sql("SELECT * FROM carbon_table2") |
| val hiveResult3 = sql("SELECT * FROM hivetable2") |
| checkAnswer(carbonResult3, hiveResult3) |
| assert(4 == carbonResult3.collect().length) |
| assert(4 == hiveResult3.collect().length) |
| } |
| |
| test("insert into for parquet and carbon, CTAS") { |
| sql("DROP TABLE IF EXISTS parquetTable") |
| sql("DROP TABLE IF EXISTS carbon_table") |
| sql("DROP TABLE IF EXISTS parquetTable2") |
| sql("DROP TABLE IF EXISTS carbon_table2") |
| sql( |
| s""" |
| | CREATE TABLE IF NOT EXISTS parquettable ( |
| | id int, |
| | label boolean, |
| | name string, |
| | image binary, |
| | autoLabel boolean) |
| | using parquet |
| """.stripMargin) |
| sql("insert into parquettable values(1,true,'Bob','binary',false)") |
| sql("insert into parquettable values(2,false,'Xu','test',true)") |
| |
| sql( |
| s""" |
| | CREATE TABLE IF NOT EXISTS carbon_table ( |
| | id int, |
| | label boolean, |
| | name string, |
| | image binary, |
| | autoLabel boolean) |
| | using carbon |
| """.stripMargin) |
| sql("insert into carbon_table values(1,true,'Bob','binary',false)") |
| sql("insert into carbon_table values(2,false,'Xu','test',true)") |
| val carbonResult = sql("SELECT * FROM carbon_table") |
| val parquetResult = sql("SELECT * FROM parquettable") |
| |
| assert(2 == carbonResult.collect().length) |
| assert(2 == parquetResult.collect().length) |
| checkAnswer(parquetResult, carbonResult) |
| carbonResult.collect().foreach { each => |
| if (1 == each.get(0)) { |
| assert("binary".equals(new String(each.getAs[Array[Byte]](3)))) |
| } else if (2 == each.get(0)) { |
| assert("test".equals(new String(each.getAs[Array[Byte]](3)))) |
| } else { |
| assert(false) |
| } |
| } |
| |
| sql("CREATE TABLE parquettable2 AS SELECT * FROM carbon_table") |
| sql("CREATE TABLE carbon_table2 USING CARBON AS SELECT * FROM parquettable") |
| val carbonResult2 = sql("SELECT * FROM carbon_table2") |
| val parquetResult2 = sql("SELECT * FROM parquettable2") |
| checkAnswer(parquetResult2, carbonResult2) |
| checkAnswer(carbonResult, carbonResult2) |
| checkAnswer(parquetResult, parquetResult2) |
| assert(2 == carbonResult2.collect().length) |
| assert(2 == parquetResult2.collect().length) |
| |
| sql("INSERT INTO parquettable2 SELECT * FROM carbon_table") |
| sql("INSERT INTO carbon_table2 SELECT * FROM parquettable") |
| val carbonResult3 = sql("SELECT * FROM carbon_table2") |
| val parquetResult3 = sql("SELECT * FROM parquettable2") |
| checkAnswer(carbonResult3, parquetResult3) |
| assert(4 == carbonResult3.collect().length) |
| assert(4 == parquetResult3.collect().length) |
| } |
| |
| test("insert into carbon as select from hive after hive load data") { |
| sql("DROP TABLE IF EXISTS hiveTable") |
| sql("DROP TABLE IF EXISTS carbon_table") |
| sql("DROP TABLE IF EXISTS hiveTable2") |
| sql("DROP TABLE IF EXISTS carbon_table2") |
| |
| sql( |
| s""" |
| | CREATE TABLE IF NOT EXISTS hivetable ( |
| | id int, |
| | label boolean, |
| | name string, |
| | image binary, |
| | autoLabel boolean) |
| | row format delimited fields terminated by '|' |
| """.stripMargin) |
| sql( |
| s""" |
| | LOAD DATA LOCAL INPATH '$resourcesPath/binarystringdata.csv' |
| | INTO TABLE hivetable |
| """.stripMargin) |
| |
| sql( |
| s""" |
| | CREATE TABLE IF NOT EXISTS carbon_table ( |
| | id int, |
| | label boolean, |
| | name string, |
| | image binary, |
| | autoLabel boolean) |
| | using carbon |
| """.stripMargin) |
| sql("insert into carbon_table select * from hivetable") |
| |
| sqlContext.udf.register("decodeHex", (str: String) => |
| Hex.decodeHex(str.toList.map(_.toInt.toBinaryString).mkString.toCharArray)) |
| sqlContext.udf.register("unHexValue", (str: String) => |
| org.apache.spark.sql.catalyst.expressions.Hex.unhex(str.toList.map(_.toInt.toBinaryString).mkString.getBytes)) |
| sqlContext.udf.register("decodeBase64", (str: String) => Base64.decodeBase64(str.getBytes())) |
| |
| val udfHexResult = sql("SELECT decodeHex(image) FROM carbon_table") |
| val unHexResult = sql("SELECT unHexValue(image) FROM carbon_table") |
| checkAnswer(udfHexResult, unHexResult) |
| |
| val udfBase64Result = sql("SELECT decodeBase64(image) FROM carbon_table") |
| val unbase64Result = sql("SELECT unbase64(image) FROM carbon_table") |
| checkAnswer(udfBase64Result, unbase64Result) |
| |
| val carbonResult = sql("SELECT * FROM carbon_table") |
| val hiveResult = sql("SELECT * FROM hivetable") |
| |
| assert(3 == carbonResult.collect().length) |
| assert(3 == hiveResult.collect().length) |
| checkAnswer(hiveResult, carbonResult) |
| carbonResult.collect().foreach { each => |
| if (2 == each.get(0)) { |
| assert("\u0001history\u0002".equals(new String(each.getAs[Array[Byte]](3)))) |
| } else if (1 == each.get(0)) { |
| assert("\u0001education\u0002".equals(new String(each.getAs[Array[Byte]](3)))) |
| } else if (3 == each.get(0)) { |
| assert("".equals(new String(each.getAs[Array[Byte]](3))) |
| || "\u0001biology\u0002".equals(new String(each.getAs[Array[Byte]](3)))) |
| } else { |
| assert(false) |
| } |
| } |
| |
| sql("CREATE TABLE hivetable2 AS SELECT * FROM carbon_table") |
| sql("CREATE TABLE carbon_table2 USING CARBON AS SELECT * FROM hivetable") |
| val carbonResult2 = sql("SELECT * FROM carbon_table2") |
| val hiveResult2 = sql("SELECT * FROM hivetable2") |
| checkAnswer(hiveResult2, carbonResult2) |
| checkAnswer(carbonResult, carbonResult2) |
| checkAnswer(hiveResult, hiveResult2) |
| assert(3 == carbonResult2.collect().length) |
| assert(3 == hiveResult2.collect().length) |
| |
| sql("INSERT INTO hivetable2 SELECT * FROM carbon_table") |
| sql("INSERT INTO carbon_table2 SELECT * FROM hivetable") |
| val carbonResult3 = sql("SELECT * FROM carbon_table2") |
| val hiveResult3 = sql("SELECT * FROM hivetable2") |
| checkAnswer(carbonResult3, hiveResult3) |
| assert(6 == carbonResult3.collect().length) |
| assert(6 == hiveResult3.collect().length) |
| |
| } |
| |
| test("filter for hive and carbon") { |
| sql("DROP TABLE IF EXISTS hiveTable") |
| sql("DROP TABLE IF EXISTS carbon_table") |
| |
| sql( |
| s""" |
| | CREATE TABLE IF NOT EXISTS hivetable ( |
| | id int, |
| | label boolean, |
| | name string, |
| | image binary, |
| | autoLabel boolean) |
| | row format delimited fields terminated by ',' |
| """.stripMargin) |
| sql("insert into hivetable values(1,true,'Bob','binary',false)") |
| sql("insert into hivetable values(2,false,'Xu','test',true)") |
| |
| sql( |
| s""" |
| | CREATE TABLE IF NOT EXISTS carbon_table ( |
| | id int, |
| | label boolean, |
| | name string, |
| | image binary, |
| | autoLabel boolean) |
| | using carbon |
| """.stripMargin) |
| sql("insert into carbon_table values(1,true,'Bob','binary',false)") |
| sql("insert into carbon_table values(2,false,'Xu','test',true)") |
| |
| // filter with equal |
| val hiveResult = sql("SELECT * FROM hivetable where image=cast('binary' as binary)") |
| val carbonResult = sql("SELECT * FROM carbon_table where image=cast('binary' as binary)") |
| |
| checkAnswer(hiveResult, carbonResult) |
| assert(1 == carbonResult.collect().length) |
| carbonResult.collect().foreach { each => |
| assert(1 == each.get(0)) |
| assert("binary".equals(new String(each.getAs[Array[Byte]](3)))) |
| } |
| |
| // filter with non string |
| val exception = intercept[Exception] { |
| sql("SELECT * FROM carbon_table where image=binary").collect() |
| } |
| assert(exception.getMessage.contains("cannot resolve '`binary`' given input columns")) |
| |
| // filter with not equal |
| val hiveResult3 = sql("SELECT * FROM hivetable where image!=cast('binary' as binary)") |
| val carbonResult3 = sql("SELECT * FROM carbon_table where image!=cast('binary' as binary)") |
| checkAnswer(hiveResult3, carbonResult3) |
| assert(1 == carbonResult3.collect().length) |
| carbonResult3.collect().foreach { each => |
| assert(2 == each.get(0)) |
| assert("test".equals(new String(each.getAs[Array[Byte]](3)))) |
| } |
| |
| // filter with in |
| val hiveResult4 = sql("SELECT * FROM hivetable where image in (cast('binary' as binary))") |
| val carbonResult4 = sql("SELECT * FROM carbon_table where image in (cast('binary' as binary))") |
| checkAnswer(hiveResult4, carbonResult4) |
| assert(1 == carbonResult4.collect().length) |
| carbonResult4.collect().foreach { each => |
| assert(1 == each.get(0)) |
| assert("binary".equals(new String(each.getAs[Array[Byte]](3)))) |
| } |
| |
| // filter with not in |
| val hiveResult5 = sql("SELECT * FROM hivetable where image not in (cast('binary' as binary))") |
| val carbonResult5 = sql("SELECT * FROM carbon_table where image not in (cast('binary' as binary))") |
| checkAnswer(hiveResult5, carbonResult5) |
| assert(1 == carbonResult5.collect().length) |
| carbonResult5.collect().foreach { each => |
| assert(2 == each.get(0)) |
| assert("test".equals(new String(each.getAs[Array[Byte]](3)))) |
| } |
| } |
| |
| test("Spark DataSource don't support update, delete") { |
| sql("DROP TABLE IF EXISTS carbon_table") |
| sql("DROP TABLE IF EXISTS carbon_table2") |
| |
| sql( |
| s""" |
| | CREATE TABLE IF NOT EXISTS carbon_table ( |
| | id int, |
| | label boolean, |
| | name string, |
| | binaryField binary, |
| | autoLabel boolean) |
| | using carbon |
| """.stripMargin) |
| sql("insert into carbon_table values(1,true,'Bob','binary',false)") |
| sql("insert into carbon_table values(2,false,'Xu','test',true)") |
| |
| val carbonResult = sql("SELECT * FROM carbon_table") |
| |
| carbonResult.collect().foreach { each => |
| if (1 == each.get(0)) { |
| assert("binary".equals(new String(each.getAs[Array[Byte]](3)))) |
| } else if (2 == each.get(0)) { |
| assert("test".equals(new String(each.getAs[Array[Byte]](3)))) |
| } else { |
| assert(false) |
| } |
| } |
| |
| var exception = intercept[Exception] { |
| sql("UPDATE carbon_table SET binaryField = 'binary2' WHERE id = 1").show() |
| } |
| assert(exception.getMessage.contains("mismatched input 'UPDATE' expecting")) |
| |
| exception = intercept[Exception] { |
| sql("DELETE FROM carbon_table WHERE id = 1").show() |
| } |
| assert(exception.getMessage.contains("Operation not allowed: DELETE FROM")) |
| } |
| |
| test("test array of binary data type with sparkfileformat ") { |
| sql("drop table if exists carbon_table") |
| sql("drop table if exists parquet_table") |
| sql("create table if not exists carbon_table(id int, label boolean, name string," + |
| "binaryField array<binary>, autoLabel boolean) using carbon") |
| sql("insert into carbon_table values(1,true,'abc',array('binary'),false)") |
| sql("create table if not exists parquet_table(id int, label boolean, name string," + |
| "binaryField array<binary>, autoLabel boolean) using parquet") |
| sql("insert into parquet_table values(1,true,'abc',array('binary'),false)") |
| checkAnswer(sql("SELECT binaryField[0] FROM carbon_table"), |
| sql("SELECT binaryField[0] FROM parquet_table")) |
| sql("drop table if exists carbon_table") |
| sql("drop table if exists parquet_table") |
| } |
| |
| test("test struct of binary data type with sparkfileformat ") { |
| sql("drop table if exists carbon_table") |
| sql("drop table if exists parquet_table") |
| sql("create table if not exists carbon_table(id int, label boolean, name string," + |
| "binaryField struct<b:binary>, autoLabel boolean) using carbon") |
| sql("insert into carbon_table values(1,true,'abc',named_struct('b','binary'),false)") |
| sql("create table if not exists parquet_table(id int, label boolean, name string," + |
| "binaryField struct<b:binary>, autoLabel boolean) using parquet") |
| sql("insert into parquet_table values(1,true,'abc',named_struct('b','binary'),false)") |
| checkAnswer(sql("SELECT binaryField.b FROM carbon_table"), |
| sql("SELECT binaryField.b FROM parquet_table")) |
| sql("drop table if exists carbon_table") |
| sql("drop table if exists parquet_table") |
| } |
| |
| test("test map of binary data type with sparkfileformat") { |
| sql("drop table if exists carbon_table") |
| sql("drop table if exists parquet_table") |
| sql("create table if not exists parquet_table(id int, label boolean, name string," + |
| "binaryField map<int, binary>, autoLabel boolean) using parquet") |
| sql("insert into parquet_table values(1,true,'abc',map(1,'binary'),false)") |
| sql("create table if not exists carbon_table(id int, label boolean, name string," + |
| "binaryField map<int, binary>, autoLabel boolean) using carbon") |
| sql("insert into carbon_table values(1,true,'abc',map(1,'binary'),false)") |
| checkAnswer(sql("SELECT binaryField[1] FROM carbon_table"), |
| sql("SELECT binaryField[1] FROM parquet_table")) |
| sql("drop table if exists carbon_table") |
| sql("drop table if exists parquet_table") |
| } |
| |
| test("test map of array and struct binary data type with sparkfileformat") { |
| sql("drop table if exists carbon_table") |
| sql("drop table if exists parquet_table") |
| sql("create table if not exists parquet_table(id int, label boolean, name string," + |
| "binaryField1 map<int, array<binary>>, binaryField2 map<int, struct<b:binary>> ) " + |
| "using parquet") |
| sql("insert into parquet_table values(1,true,'abc',map(1,array('binary')),map(1," + |
| "named_struct('b','binary')))") |
| sql("create table if not exists carbon_table(id int, label boolean, name string," + |
| "binaryField1 map<int, array<binary>>, binaryField2 map<int, struct<b:binary>> ) " + |
| "using carbon") |
| sql("insert into carbon_table values(1,true,'abc',map(1,array('binary')),map(1," + |
| "named_struct('b','binary')))") |
| checkAnswer(sql("SELECT binaryField1[1][1] FROM carbon_table"), |
| sql("SELECT binaryField1[1][1] FROM parquet_table")) |
| checkAnswer(sql("SELECT binaryField2[1].b FROM carbon_table"), |
| sql("SELECT binaryField2[1].b FROM parquet_table")) |
| sql("drop table if exists carbon_table") |
| } |
| |
| test("test of array of struct and struct of array of binary data type with sparkfileformat") { |
| sql("drop table if exists carbon_table") |
| sql("drop table if exists parquet_table") |
| sql("create table if not exists parquet_table(id int, label boolean, name string," + |
| "binaryField1 array<struct<b1:binary>>, binaryField2 struct<b2:array<binary>> ) " + |
| "using parquet") |
| sql("insert into parquet_table values(1,true,'abc',array(named_struct('b1','binary'))," + |
| "named_struct('b2',array('binary')))") |
| sql("create table if not exists carbon_table(id int, label boolean, name string," + |
| "binaryField1 array<struct<b1:binary>>, binaryField2 struct<b2:array<binary>> ) " + |
| "using carbon") |
| sql("insert into carbon_table values(1,true,'abc',array(named_struct('b1','binary'))," + |
| "named_struct('b2',array('binary')))") |
| checkAnswer(sql("SELECT binaryField1[1].b1 FROM carbon_table"), |
| sql("SELECT binaryField1[1].b1 FROM parquet_table")) |
| checkAnswer(sql("SELECT binaryField2.b2[0] FROM carbon_table"), |
| sql("SELECT binaryField2.b2[0] FROM parquet_table")) |
| sql("drop table if exists carbon_table") |
| sql("drop table if exists parquet_table") |
| } |
| |
| } |