| /* |
| * Licensed to the Apache Software Foundation (ASF) under one or more |
| * contributor license agreements. See the NOTICE file distributed with |
| * this work for additional information regarding copyright ownership. |
| * The ASF licenses this file to You under the Apache License, Version 2.0 |
| * (the "License"); you may not use this file except in compliance with |
| * the License. You may obtain a copy of the License at |
| * |
| * http://www.apache.org/licenses/LICENSE-2.0 |
| * |
| * Unless required by applicable law or agreed to in writing, software |
| * distributed under the License is distributed on an "AS IS" BASIS, |
| * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| * See the License for the specific language governing permissions and |
| * limitations under the License. |
| */ |
| package org.apache.carbondata.integration.spark.testsuite.binary |
| |
| import java.util.Arrays |
| import org.apache.carbondata.common.exceptions.sql.MalformedCarbonCommandException |
| import org.apache.carbondata.core.constants.CarbonCommonConstants |
| import org.apache.carbondata.core.metadata.CarbonMetadata |
| import org.apache.carbondata.core.metadata.schema.table.CarbonTable |
| import org.apache.carbondata.core.util.CarbonProperties |
| import org.apache.commons.codec.binary.Hex |
| import org.apache.spark.sql.{AnalysisException, Row} |
| import org.apache.commons.codec.binary.{Base64, Hex} |
| import org.apache.spark.SparkException |
| import org.apache.spark.sql.{AnalysisException, Row} |
| import org.apache.spark.sql.test.util.QueryTest |
| import org.scalatest.BeforeAndAfterAll |
| |
| /** |
| * Test cases for testing binary |
| */ |
| class TestBinaryDataType extends QueryTest with BeforeAndAfterAll { |
| override def beforeAll { |
| } |
| |
| test("Create table and load data with binary column") { |
| sql("DROP TABLE IF EXISTS binaryTable") |
| sql( |
| s""" |
| | CREATE TABLE IF NOT EXISTS binaryTable ( |
| | id int, |
| | label boolean, |
| | name string, |
| | binaryField binary, |
| | autoLabel boolean) |
| | STORED BY 'carbondata' |
| | TBLPROPERTIES('SORT_COLUMNS'='') |
| """.stripMargin) |
| sql( |
| s""" |
| | LOAD DATA LOCAL INPATH '$resourcesPath/binaryDataHex.csv' |
| | INTO TABLE binaryTable |
| | OPTIONS('header'='false') |
| """.stripMargin) |
| |
| val result = sql("desc formatted binaryTable").collect() |
| var flag = false |
| result.foreach { each => |
| if ("binary".equals(each.get(1))) { |
| flag = true |
| } |
| } |
| assert(flag) |
| |
| sqlContext.udf.register("decodeHex", (str: String) => Hex.decodeHex(str.toCharArray)) |
| sqlContext.udf.register("decodeBase64", (str: String) => Base64.decodeBase64(str.getBytes())) |
| |
| val udfHexResult = sql("SELECT decodeHex(binaryField) FROM binaryTable") |
| val unhexResult = sql("SELECT unhex(binaryField) FROM binaryTable") |
| checkAnswer(udfHexResult, unhexResult) |
| |
| val udfBase64Result = sql("SELECT decodeBase64(binaryField) FROM binaryTable") |
| val unbase64Result = sql("SELECT unbase64(binaryField) FROM binaryTable") |
| checkAnswer(udfBase64Result, unbase64Result) |
| |
| checkAnswer(sql("SELECT COUNT(*) FROM binaryTable"), Seq(Row(3))) |
| try { |
| val df = sql("SELECT * FROM binaryTable").collect() |
| assert(3 == df.length) |
| df.foreach { each => |
| assert(5 == each.length) |
| |
| assert(Integer.valueOf(each(0).toString) > 0) |
| assert(each(1).toString.equalsIgnoreCase("false") || (each(1).toString.equalsIgnoreCase("true"))) |
| assert(each(2).toString.contains(".png")) |
| |
| val bytes40 = each.getAs[Array[Byte]](3).slice(0, 40) |
| val binaryName = each(2).toString |
| val expectedBytes = Hex.encodeHex(firstBytes20.get(binaryName).get) |
| assert(Arrays.equals(String.valueOf(expectedBytes).getBytes(), bytes40), "incorrect value for binary data") |
| |
| assert(each(4).toString.equalsIgnoreCase("false") || (each(4).toString.equalsIgnoreCase("true"))) |
| |
| val df = sql("SELECT name,binaryField FROM binaryTable").collect() |
| assert(3 == df.length) |
| df.foreach { each => |
| assert(2 == each.length) |
| val binaryName = each(0).toString |
| val bytes40 = each.getAs[Array[Byte]](1).slice(0, 40) |
| val expectedBytes = Hex.encodeHex(firstBytes20.get(binaryName).get) |
| assert(Arrays.equals(String.valueOf(expectedBytes).getBytes(), bytes40), "incorrect value for binary data") |
| } |
| } |
| } catch { |
| case e: Exception => |
| e.printStackTrace() |
| assert(false) |
| } |
| } |
| |
| private val firstBytes20 = Map("1.png" -> Array[Byte](-119, 80, 78, 71, 13, 10, 26, 10, 0, 0, 0, 13, 73, 72, 68, 82, 0, 0, 1, 74), |
| "2.png" -> Array[Byte](-119, 80, 78, 71, 13, 10, 26, 10, 0, 0, 0, 13, 73, 72, 68, 82, 0, 0, 2, -11), |
| "3.png" -> Array[Byte](-119, 80, 78, 71, 13, 10, 26, 10, 0, 0, 0, 13, 73, 72, 68, 82, 0, 0, 1, 54) |
| ) |
| |
| test("Don't support sort_columns") { |
| sql("DROP TABLE IF EXISTS binaryTable") |
| val exception = intercept[Exception] { |
| sql( |
| s""" |
| | CREATE TABLE IF NOT EXISTS binaryTable ( |
| | id double, |
| | label boolean, |
| | name STRING, |
| | binaryField BINARY, |
| | autoLabel boolean) |
| | STORED BY 'carbondata' |
| | TBLPROPERTIES('SORT_COLUMNS'='binaryField') |
| """.stripMargin) |
| } |
| assert(exception.getMessage.contains("sort_columns is unsupported for binary datatype column")) |
| } |
| |
| test("Unsupport LOCAL_DICTIONARY_INCLUDE for binary") { |
| |
| sql("DROP TABLE IF EXISTS binaryTable") |
| val exception = intercept[MalformedCarbonCommandException] { |
| sql( |
| """ |
| | CREATE TABLE binaryTable( |
| | id int, |
| | name string, |
| | city string, |
| | age int, |
| | binaryField binary) |
| | STORED BY 'org.apache.carbondata.format' |
| | tblproperties('local_dictionary_enable'='true','local_dictionary_include'='binaryField') |
| """.stripMargin) |
| } |
| assert(exception.getMessage.contains( |
| "LOCAL_DICTIONARY_INCLUDE/LOCAL_DICTIONARY_EXCLUDE column: binaryField is not a string/complex/varchar datatype column. " + |
| "LOCAL_DICTIONARY_COLUMN should be no dictionary string/complex/varchar datatype column")) |
| } |
| |
| test("Supports LOCAL_DICTIONARY_EXCLUDE for binary") { |
| sql("DROP TABLE IF EXISTS binaryTable") |
| sql( |
| """ |
| | CREATE TABLE binaryTable( |
| | id int, |
| | name string, |
| | city string, |
| | age int, |
| | binaryField binary) |
| | STORED BY 'org.apache.carbondata.format' |
| | tblproperties('local_dictionary_enable'='true','LOCAL_DICTIONARY_EXCLUDE'='binaryField') |
| """.stripMargin) |
| assert(true) |
| } |
| |
| test("Unsupport inverted_index for binary") { |
| sql("DROP TABLE IF EXISTS binaryTable") |
| val exception = intercept[MalformedCarbonCommandException] { |
| sql( |
| """ |
| | CREATE TABLE binaryTable( |
| | id int, |
| | name string, |
| | city string, |
| | age int, |
| | binaryField binary) |
| | STORED BY 'org.apache.carbondata.format' |
| | tblproperties('inverted_index'='binaryField') |
| """.stripMargin) |
| } |
| assert(exception.getMessage.contains("INVERTED_INDEX column: binaryfield should be present in SORT_COLUMNS")) |
| } |
| |
| test("Unsupport inverted_index and sort_columns for binary") { |
| sql("DROP TABLE IF EXISTS binaryTable") |
| val exception = intercept[MalformedCarbonCommandException] { |
| sql( |
| """ |
| | CREATE TABLE binaryTable( |
| | id int, |
| | name string, |
| | city string, |
| | age int, |
| | binaryField binary) |
| | STORED BY 'org.apache.carbondata.format' |
| | tblproperties('inverted_index'='binaryField','SORT_COLUMNS'='binaryField') |
| """.stripMargin) |
| } |
| assert(exception.getMessage.contains("sort_columns is unsupported for binary datatype column: binaryField")) |
| } |
| |
| test("COLUMN_META_CACHE doesn't support binary") { |
| sql("DROP TABLE IF EXISTS binaryTable") |
| val exception = intercept[Exception] { |
| sql( |
| s""" |
| | CREATE TABLE IF NOT EXISTS binaryTable ( |
| | id INT, |
| | label boolean, |
| | name STRING, |
| | binaryField BINARY, |
| | autoLabel boolean) |
| | STORED BY 'carbondata' |
| | TBLPROPERTIES('COLUMN_META_CACHE'='binaryField') |
| """.stripMargin) |
| } |
| assert(exception.getMessage.contains("binaryfield is a binary data type column and binary data type is not allowed for the option")) |
| } |
| |
| test("RANGE_COLUMN doesn't support binary") { |
| sql("DROP TABLE IF EXISTS binaryTable") |
| val exception = intercept[Exception] { |
| sql( |
| s""" |
| | CREATE TABLE IF NOT EXISTS binaryTable ( |
| | id INT, |
| | label boolean, |
| | name STRING, |
| | binaryField BINARY, |
| | autoLabel boolean) |
| | STORED BY 'carbondata' |
| | TBLPROPERTIES('RANGE_COLUMN'='binaryField') |
| """.stripMargin) |
| } |
| assert(exception.getMessage.contains("RANGE_COLUMN doesn't support binary data type")) |
| } |
| |
| test("Test carbon.column.compressor=zstd") { |
| sql("DROP TABLE IF EXISTS binaryTable") |
| sql( |
| s""" |
| | CREATE TABLE IF NOT EXISTS binaryTable ( |
| | id INT, |
| | label boolean, |
| | name STRING, |
| | binaryField BINARY, |
| | autoLabel boolean) |
| | STORED BY 'carbondata' |
| | TBLPROPERTIES('carbon.column.compressor'='zstd') |
| """.stripMargin) |
| sql( |
| s""" |
| | LOAD DATA LOCAL INPATH '$resourcesPath/binaryDataHex.csv' |
| | INTO TABLE binaryTable |
| | OPTIONS('header'='false') |
| """.stripMargin) |
| checkAnswer(sql("SELECT COUNT(*) FROM binaryTable"), Seq(Row(3))) |
| val value = sql("SELECT * FROM binaryTable").collect() |
| value.foreach { each => |
| assert(5 == each.length) |
| assert(1 == each.getAs[Int](0) || 2 == each.getAs[Int](0) || 3 == each.getAs[Int](0)) |
| assert(".png".equals(each.getAs(2).toString.substring(1, 5))) |
| assert("89504e470d0a1a0a0000000d4948445200000".equals(new String(each.getAs[Array[Byte]](3).slice(0, 37)))) |
| } |
| } |
| |
| test("Test carbon.column.compressor=gzip") { |
| sql("DROP TABLE IF EXISTS binaryTable") |
| sql( |
| s""" |
| | CREATE TABLE IF NOT EXISTS binaryTable ( |
| | id INT, |
| | label boolean, |
| | name STRING, |
| | binaryField BINARY, |
| | autoLabel boolean) |
| | STORED BY 'carbondata' |
| | TBLPROPERTIES('carbon.column.compressor'='gzip') |
| """.stripMargin) |
| sql( |
| s""" |
| | LOAD DATA LOCAL INPATH '$resourcesPath/binaryDataHex.csv' |
| | INTO TABLE binaryTable |
| | OPTIONS('header'='false') |
| """.stripMargin) |
| checkAnswer(sql("SELECT COUNT(*) FROM binaryTable"), Seq(Row(3))) |
| val value = sql("SELECT * FROM binaryTable").collect() |
| value.foreach { each => |
| assert(5 == each.length) |
| assert(1 == each.getAs[Int](0) || 2 == each.getAs[Int](0) || 3 == each.getAs[Int](0)) |
| assert(".png".equals(each.getAs(2).toString.substring(1, 5))) |
| assert("89504e470d0a1a0a0000000d4948445200000".equals(new String(each.getAs[Array[Byte]](3).slice(0, 37)))) |
| } |
| } |
| |
| test("Test carbon.column.compressor=snappy") { |
| sql("DROP TABLE IF EXISTS binaryTable") |
| sql( |
| s""" |
| | CREATE TABLE IF NOT EXISTS binaryTable ( |
| | id INT, |
| | label boolean, |
| | name STRING, |
| | binaryField BINARY, |
| | autoLabel boolean) |
| | STORED BY 'carbondata' |
| | TBLPROPERTIES('carbon.column.compressor'='snappy') |
| """.stripMargin) |
| sql( |
| s""" |
| | LOAD DATA LOCAL INPATH '$resourcesPath/binaryDataHex.csv' |
| | INTO TABLE binaryTable |
| | OPTIONS('header'='false') |
| """.stripMargin) |
| checkAnswer(sql("SELECT COUNT(*) FROM binaryTable"), Seq(Row(3))) |
| val value = sql("SELECT * FROM binaryTable").collect() |
| value.foreach { each => |
| assert(5 == each.length) |
| assert(1 == each.getAs[Int](0) || 2 == each.getAs[Int](0) || 3 == each.getAs[Int](0)) |
| assert(".png".equals(each.getAs(2).toString.substring(1, 5))) |
| assert("89504e470d0a1a0a0000000d4948445200000".equals(new String(each.getAs[Array[Byte]](3).slice(0, 37)))) |
| } |
| } |
| |
| test("Support filter other column in binary table") { |
| sql("DROP TABLE IF EXISTS binaryTable") |
| sql( |
| s""" |
| | CREATE TABLE IF NOT EXISTS binaryTable ( |
| | id INT, |
| | label boolean, |
| | name STRING, |
| | binaryField BINARY, |
| | autoLabel boolean) |
| | STORED BY 'carbondata' |
| | TBLPROPERTIES('carbon.column.compressor'='zstd') |
| """.stripMargin) |
| sql( |
| s""" |
| | LOAD DATA LOCAL INPATH '$resourcesPath/binaryDataHex.csv' |
| | INTO TABLE binaryTable |
| | OPTIONS('header'='false') |
| """.stripMargin) |
| checkAnswer(sql("SELECT COUNT(*) FROM binaryTable where id =1"), Seq(Row(1))) |
| |
| |
| sql("insert into binaryTable values(1,true,'Bob','hello',false)") |
| checkAnswer(sql("SELECT COUNT(*) FROM binaryTable where binaryField =cast('hello' as binary)"), Seq(Row(1))) |
| } |
| |
| test("Test create table with buckets unsafe") { |
| CarbonProperties.getInstance().addProperty(CarbonCommonConstants.ENABLE_UNSAFE_SORT, "true") |
| sql("DROP TABLE IF EXISTS binaryTable") |
| sql( |
| s""" |
| | CREATE TABLE IF NOT EXISTS binaryTable ( |
| | id INT, |
| | label boolean, |
| | name STRING, |
| | binaryField BINARY, |
| | autoLabel boolean) |
| | STORED BY 'carbondata' |
| | TBLPROPERTIES('BUCKETNUMBER'='4', 'BUCKETCOLUMNS'='binaryField') |
| """.stripMargin) |
| sql( |
| s""" |
| | LOAD DATA LOCAL INPATH '$resourcesPath/binaryDataHex.csv' |
| | INTO TABLE binaryTable |
| | OPTIONS('header'='false') |
| """.stripMargin) |
| |
| CarbonProperties.getInstance().addProperty(CarbonCommonConstants.ENABLE_UNSAFE_SORT, "false") |
| val table: CarbonTable = CarbonMetadata.getInstance().getCarbonTable("default", "binaryTable") |
| if (table != null && table.getBucketingInfo() != null) { |
| assert(true) |
| } else { |
| assert(false, "Bucketing info does not exist") |
| } |
| } |
| |
| test("insert into for hive and carbon") { |
| sql("DROP TABLE IF EXISTS hiveTable") |
| sql("DROP TABLE IF EXISTS carbontable") |
| sql("DROP TABLE IF EXISTS hiveTable2") |
| sql("DROP TABLE IF EXISTS carbontable2") |
| |
| sql( |
| s""" |
| | CREATE TABLE IF NOT EXISTS hivetable ( |
| | id int, |
| | label boolean, |
| | name string, |
| | binaryField binary, |
| | autoLabel boolean) |
| | row format delimited fields terminated by ',' |
| """.stripMargin) |
| sql("insert into hivetable values(1,true,'Bob','binary',false)") |
| sql("insert into hivetable values(2,false,'Xu','test',true)") |
| sql("insert into hivetable select 2,false,'Xu',cast('carbon' as binary),true") |
| val hiveResult = sql("SELECT * FROM hivetable") |
| |
| sql( |
| s""" |
| | CREATE TABLE IF NOT EXISTS carbontable ( |
| | id int, |
| | label boolean, |
| | name string, |
| | binaryField binary, |
| | autoLabel boolean) |
| | STORED BY 'carbondata' |
| """.stripMargin) |
| sql("insert into carbontable values(1,true,'Bob','binary',false)") |
| sql("insert into carbontable values(2,false,'Xu','test',true)") |
| sql("insert into carbontable select 2,false,'Xu',cast('carbon' as binary),true") |
| val carbonResult = sql("SELECT * FROM carbontable") |
| checkAnswer(hiveResult, carbonResult) |
| |
| sql("CREATE TABLE hivetable2 AS SELECT * FROM carbontable") |
| sql("CREATE TABLE carbontable2 AS SELECT * FROM hivetable") |
| val carbonResult2 = sql("SELECT * FROM carbontable2") |
| val hiveResult2 = sql("SELECT * FROM hivetable2") |
| checkAnswer(hiveResult2, carbonResult2) |
| checkAnswer(carbonResult, carbonResult2) |
| checkAnswer(hiveResult, hiveResult2) |
| assert(3 == carbonResult2.collect().length) |
| assert(3 == hiveResult2.collect().length) |
| |
| sql("INSERT INTO hivetable2 SELECT * FROM carbontable") |
| sql("INSERT INTO carbontable2 SELECT * FROM hivetable") |
| val carbonResult3 = sql("SELECT * FROM carbontable2") |
| val hiveResult3 = sql("SELECT * FROM hivetable2") |
| checkAnswer(carbonResult3, hiveResult3) |
| assert(6 == carbonResult3.collect().length) |
| assert(6 == hiveResult3.collect().length) |
| } |
| |
| test("Support filter for hive and carbon") { |
| sql("DROP TABLE IF EXISTS hiveTable") |
| sql("DROP TABLE IF EXISTS carbontable") |
| |
| sql( |
| s""" |
| | CREATE TABLE IF NOT EXISTS hivetable ( |
| | id int, |
| | label boolean, |
| | name string, |
| | binaryField binary, |
| | autoLabel boolean) |
| | row format delimited fields terminated by ',' |
| """.stripMargin) |
| sql("insert into hivetable values(1,true,'Bob','binary',false)") |
| sql("insert into hivetable values(2,false,'Xu','test',true)") |
| val hiveResult = sql("SELECT * FROM hivetable where binaryField=cast('binary' as binary)") |
| |
| sql( |
| s""" |
| | CREATE TABLE IF NOT EXISTS carbontable ( |
| | id int, |
| | label boolean, |
| | name string, |
| | binaryField binary, |
| | autoLabel boolean) |
| | STORED BY 'carbondata' |
| """.stripMargin) |
| sql("insert into carbontable values(1,true,'Bob','binary',false)") |
| sql("insert into carbontable values(2,false,'Xu','test',true)") |
| val carbonResult = sql("SELECT * FROM carbontable where binaryField=cast('binary' as binary)") |
| checkAnswer(hiveResult, carbonResult) |
| assert(1 == carbonResult.collect().length) |
| carbonResult.collect().foreach { each => |
| if (1 == each.get(0)) { |
| assert("binary".equals(new String(each.getAs[Array[Byte]](3)))) |
| } else if (2 == each.get(0)) { |
| assert("test".equals(new String(each.getAs[Array[Byte]](3)))) |
| } else { |
| assert(false) |
| } |
| } |
| |
| // filter with non string |
| val exception = intercept[Exception] { |
| sql("SELECT * FROM carbontable where binaryField=binary").collect() |
| } |
| assert(exception.getMessage.contains("cannot resolve '`binary`' given input columns")) |
| |
| // filter with not equal |
| val hiveResult3 = sql("SELECT * FROM hivetable where binaryField!=cast('binary' as binary)") |
| val carbonResult3 = sql("SELECT * FROM carbontable where binaryField!=cast('binary' as binary)") |
| checkAnswer(hiveResult3, carbonResult3) |
| assert(1 == carbonResult3.collect().length) |
| carbonResult3.collect().foreach { each => |
| assert(2 == each.get(0)) |
| assert("test".equals(new String(each.getAs[Array[Byte]](3)))) |
| } |
| |
| // filter with in |
| val hiveResult4 = sql("SELECT * FROM hivetable where binaryField in (cast('binary' as binary))") |
| val carbonResult4 = sql("SELECT * FROM carbontable where binaryField in (cast('binary' as binary))") |
| checkAnswer(hiveResult4, carbonResult4) |
| assert(1 == carbonResult4.collect().length) |
| carbonResult4.collect().foreach { each => |
| assert(1 == each.get(0)) |
| assert("binary".equals(new String(each.getAs[Array[Byte]](3)))) |
| } |
| |
| // filter with not in |
| val hiveResult5 = sql("SELECT * FROM hivetable where binaryField not in (cast('binary' as binary))") |
| val carbonResult5 = sql("SELECT * FROM carbontable where binaryField not in (cast('binary' as binary))") |
| checkAnswer(hiveResult5, carbonResult5) |
| assert(1 == carbonResult5.collect().length) |
| carbonResult5.collect().foreach { each => |
| assert(2 == each.get(0)) |
| assert("test".equals(new String(each.getAs[Array[Byte]](3)))) |
| } |
| } |
| |
| test("Support update and delete ") { |
| sql("DROP TABLE IF EXISTS carbontable") |
| |
| sql( |
| s""" |
| | CREATE TABLE IF NOT EXISTS carbontable ( |
| | id int, |
| | label boolean, |
| | name string, |
| | binaryField binary, |
| | autoLabel boolean) |
| | STORED BY 'carbondata' |
| """.stripMargin) |
| sql("insert into carbontable values(1,true,'Bob','binary',false)") |
| sql("insert into carbontable values(2,false,'Xu','test',true)") |
| var carbonResult = sql("SELECT * FROM carbontable where binaryField=cast('binary' as binary)") |
| assert(1 == carbonResult.collect().length) |
| carbonResult.collect().foreach { each => |
| if (1 == each.get(0)) { |
| assert("binary".equals(new String(each.getAs[Array[Byte]](3)))) |
| } else if (2 == each.get(0)) { |
| assert("test".equals(new String(each.getAs[Array[Byte]](3)))) |
| } else { |
| assert(false) |
| } |
| } |
| |
| // Update for binary in carbon |
| sql("UPDATE carbontable SET (name) = ('David') WHERE id = 1").show() |
| sql("UPDATE carbontable SET (binaryField) = ('carbon2') WHERE id = 1").show() |
| |
| carbonResult = sql("SELECT * FROM carbontable where binaryField=cast('binary' as binary)") |
| carbonResult.collect().foreach { each => |
| if (1 == each.get(0)) { |
| assert("carbon2".equals(new String(each.getAs[Array[Byte]](3)))) |
| } else if (2 == each.get(0)) { |
| assert("test".equals(new String(each.getAs[Array[Byte]](3)))) |
| } else { |
| assert(false) |
| } |
| } |
| |
| // test cast string to binary, binary to string |
| val stringValue = sql("SELECT cast(binaryField as string) FROM carbontable WHERE id = 1").collect() |
| stringValue.foreach { each => |
| assert("carbon2".equals(each.getAs(0))) |
| } |
| val binaryValue = sql("SELECT cast(name as binary) FROM carbontable WHERE id = 1").collect() |
| binaryValue.foreach { each => |
| assert("David".equals(new String(each.getAs[Array[Byte]](0)))) |
| } |
| |
| // Test delete |
| sql("DELETE FROM carbontable WHERE id = 2").show() |
| |
| carbonResult = sql("SELECT * FROM carbontable where binaryField=cast('binary' as binary)") |
| carbonResult.collect().foreach { each => |
| if (1 == each.get(0)) { |
| assert("carbon2".equals(new String(each.getAs[Array[Byte]](3)))) |
| } else { |
| assert(false) |
| } |
| } |
| } |
| |
| test("Create table and load data with binary column for hive and carbon, CTAS and insert int hive table select from carbon table") { |
| sql("DROP TABLE IF EXISTS hivetable") |
| sql("DROP TABLE IF EXISTS hivetable2") |
| sql("DROP TABLE IF EXISTS hivetable3") |
| sql("DROP TABLE IF EXISTS carbontable") |
| sql( |
| s""" |
| | CREATE TABLE IF NOT EXISTS hivetable ( |
| | id int, |
| | label boolean, |
| | name string, |
| | binaryField binary, |
| | autoLabel boolean) |
| | row format delimited fields terminated by '|' |
| """.stripMargin) |
| sql( |
| s""" |
| | LOAD DATA LOCAL INPATH '$resourcesPath/binarystringdata.csv' |
| | INTO TABLE hivetable |
| """.stripMargin) |
| |
| sql( |
| s""" |
| | CREATE TABLE IF NOT EXISTS carbontable ( |
| | id int, |
| | label boolean, |
| | name string, |
| | binaryField binary, |
| | autoLabel boolean) |
| | STORED BY 'carbondata' |
| """.stripMargin) |
| sql( |
| s""" |
| | LOAD DATA LOCAL INPATH '$resourcesPath/binarystringdata.csv' |
| | INTO TABLE carbontable |
| | OPTIONS('header'='false','DELIMITER'='|','bad_records_action'='fail') |
| """.stripMargin) |
| |
| val hexHiveResult = sql("SELECT hex(binaryField) FROM hivetable") |
| val hexCarbonResult = sql("SELECT hex(binaryField) FROM carbontable") |
| checkAnswer(hexHiveResult, hexCarbonResult) |
| hexCarbonResult.collect().foreach { each => |
| val result = new String(Hex.decodeHex((each.getAs[Array[Char]](0)).toString.toCharArray)) |
| assert("\u0001history\u0002".equals(result) |
| || "\u0001biology\u0002".equals(result) |
| || "\u0001education\u0002".equals(result)) |
| } |
| |
| val base64HiveResult = sql("SELECT base64(binaryField) FROM hivetable") |
| val base64CarbonResult = sql("SELECT base64(binaryField) FROM carbontable") |
| checkAnswer(base64HiveResult, base64CarbonResult) |
| base64CarbonResult.collect().foreach { each => |
| val result = new String(Base64.decodeBase64((each.getAs[Array[Char]](0)).toString)) |
| assert("\u0001history\u0002".equals(result) |
| || "\u0001biology\u0002".equals(result) |
| || "\u0001education\u0002".equals(result)) |
| } |
| |
| |
| val hiveResult = sql("SELECT * FROM hivetable") |
| val carbonResult = sql("SELECT * FROM carbontable") |
| checkAnswer(hiveResult, carbonResult) |
| checkAnswer(sql("SELECT COUNT(*) FROM hivetable"), Seq(Row(3))) |
| try { |
| val carbonDF = carbonResult.collect() |
| assert(3 == carbonDF.length) |
| carbonDF.foreach { each => |
| assert(5 == each.length) |
| |
| assert(Integer.valueOf(each(0).toString) > 0) |
| assert(each(1).toString.equalsIgnoreCase("false") || (each(1).toString.equalsIgnoreCase("true"))) |
| assert(each(2).toString.contains(".png")) |
| |
| val value = new String(each.getAs[Array[Byte]](3)) |
| assert("\u0001history\u0002".equals(value) || "\u0001biology\u0002".equals(value) |
| || "\u0001education\u0002".equals(value) || "".equals(value)) |
| assert(each(4).toString.equalsIgnoreCase("false") || (each(4).toString.equalsIgnoreCase("true"))) |
| } |
| |
| val df = hiveResult.collect() |
| assert(3 == df.length) |
| df.foreach { each => |
| assert(5 == each.length) |
| |
| assert(Integer.valueOf(each(0).toString) > 0) |
| assert(each(1).toString.equalsIgnoreCase("false") || (each(1).toString.equalsIgnoreCase("true"))) |
| assert(each(2).toString.contains(".png")) |
| |
| |
| val value = new String(each.getAs[Array[Byte]](3)) |
| assert("\u0001history\u0002".equals(value) || "\u0001biology\u0002".equals(value) |
| || "\u0001education\u0002".equals(value) || "".equals(value)) |
| assert(each(4).toString.equalsIgnoreCase("false") || (each(4).toString.equalsIgnoreCase("true"))) |
| } |
| |
| sql( |
| s""" |
| | CREATE TABLE IF NOT EXISTS hivetable2 ( |
| | id int, |
| | label boolean, |
| | name string, |
| | binaryField binary, |
| | autoLabel boolean) |
| | row format delimited fields terminated by '|' |
| """.stripMargin) |
| sql("insert into hivetable2 select * from carbontable") |
| sql("create table hivetable3 as select * from carbontable") |
| val hiveResult2 = sql("SELECT * FROM hivetable2") |
| val hiveResult3 = sql("SELECT * FROM hivetable3") |
| checkAnswer(hiveResult, hiveResult2) |
| checkAnswer(hiveResult2, hiveResult3) |
| } catch { |
| case e: Exception => |
| e.printStackTrace() |
| assert(false) |
| } |
| } |
| |
| // TODO |
| ignore("Create table and load data with binary column for hive and carbon, CTAS and insert int hive table select from carbon table, for null value") { |
| sql("DROP TABLE IF EXISTS hivetable") |
| sql("DROP TABLE IF EXISTS hivetable2") |
| sql("DROP TABLE IF EXISTS hivetable3") |
| sql("DROP TABLE IF EXISTS carbontable") |
| sql( |
| s""" |
| | CREATE TABLE IF NOT EXISTS hivetable ( |
| | id int, |
| | label boolean, |
| | name string, |
| | binaryField binary, |
| | autoLabel boolean) |
| | row format delimited fields terminated by '|' |
| """.stripMargin) |
| sql( |
| s""" |
| | LOAD DATA LOCAL INPATH '$resourcesPath/binaryStringNullData.csv' |
| | INTO TABLE hivetable |
| """.stripMargin) |
| |
| sql( |
| s""" |
| | CREATE TABLE IF NOT EXISTS carbontable ( |
| | id int, |
| | label boolean, |
| | name string, |
| | binaryField binary, |
| | autoLabel boolean) |
| | STORED BY 'carbondata' |
| """.stripMargin) |
| sql( |
| s""" |
| | LOAD DATA LOCAL INPATH '$resourcesPath/binaryStringNullData.csv' |
| | INTO TABLE carbontable |
| | OPTIONS('header'='false','DELIMITER'='|','bad_records_action'='fail') |
| """.stripMargin) |
| |
| val hiveResult = sql("SELECT * FROM hivetable") |
| val carbonResult = sql("SELECT * FROM carbontable") |
| checkAnswer(hiveResult, carbonResult) |
| checkAnswer(sql("SELECT COUNT(*) FROM hivetable"), Seq(Row(4))) |
| try { |
| val carbonDF = carbonResult.collect() |
| assert(4 == carbonDF.length) |
| carbonDF.foreach { each => |
| assert(5 == each.length) |
| |
| assert(Integer.valueOf(each(0).toString) > 0) |
| assert(each(1).toString.equalsIgnoreCase("false") || (each(1).toString.equalsIgnoreCase("true"))) |
| assert(each(2).toString.contains(".png")) |
| |
| val value = new String(each.getAs[Array[Byte]](3)) |
| assert("\u0001history\u0002".equals(value) || "\u0001biology\u0002".equals(value) |
| || "\u0001education\u0002".equals(value) || "".equals(value)) |
| assert(each(4).toString.equalsIgnoreCase("false") || (each(4).toString.equalsIgnoreCase("true"))) |
| } |
| |
| val df = hiveResult.collect() |
| assert(4 == df.length) |
| df.foreach { each => |
| assert(5 == each.length) |
| |
| assert(Integer.valueOf(each(0).toString) > 0) |
| assert(each(1).toString.equalsIgnoreCase("false") || (each(1).toString.equalsIgnoreCase("true"))) |
| assert(each(2).toString.contains(".png")) |
| |
| |
| val value = new String(each.getAs[Array[Byte]](3)) |
| assert("\u0001history\u0002".equals(value) || "\u0001biology\u0002".equals(value) |
| || "\u0001education\u0002".equals(value) || "".equals(value)) |
| assert(each(4).toString.equalsIgnoreCase("false") || (each(4).toString.equalsIgnoreCase("true"))) |
| } |
| |
| sql( |
| s""" |
| | CREATE TABLE IF NOT EXISTS hivetable2 ( |
| | id int, |
| | label boolean, |
| | name string, |
| | binaryField binary, |
| | autoLabel boolean) |
| | row format delimited fields terminated by '|' |
| """.stripMargin) |
| sql("insert into hivetable2 select * from carbontable") |
| sql("create table hivetable3 as select * from carbontable") |
| val hiveResult2 = sql("SELECT * FROM hivetable2") |
| val hiveResult3 = sql("SELECT * FROM hivetable3") |
| checkAnswer(hiveResult, hiveResult2) |
| checkAnswer(hiveResult2, hiveResult3) |
| } catch { |
| case e: Exception => |
| e.printStackTrace() |
| assert(false) |
| } |
| } |
| |
| test("insert into carbon as select from hive after hive load data") { |
| sql("DROP TABLE IF EXISTS hiveTable") |
| sql("DROP TABLE IF EXISTS carbontable") |
| sql("DROP TABLE IF EXISTS hiveTable2") |
| sql("DROP TABLE IF EXISTS carbontable2") |
| |
| sql( |
| s""" |
| | CREATE TABLE IF NOT EXISTS hivetable ( |
| | id int, |
| | label boolean, |
| | name string, |
| | binaryField binary, |
| | autoLabel boolean) |
| | row format delimited fields terminated by '|' |
| """.stripMargin) |
| sql( |
| s""" |
| | LOAD DATA LOCAL INPATH '$resourcesPath/binarystringdata.csv' |
| | INTO TABLE hivetable |
| """.stripMargin) |
| |
| sql( |
| s""" |
| | CREATE TABLE IF NOT EXISTS carbontable ( |
| | id int, |
| | label boolean, |
| | name string, |
| | binaryField binary, |
| | autoLabel boolean) |
| | STORED BY 'carbondata' |
| """.stripMargin) |
| sql("insert into carbontable select * from hivetable") |
| val carbonResult = sql("SELECT * FROM carbontable") |
| val hiveResult = sql("SELECT * FROM hivetable") |
| |
| assert(3 == carbonResult.collect().length) |
| assert(3 == hiveResult.collect().length) |
| checkAnswer(hiveResult, carbonResult) |
| carbonResult.collect().foreach { each => |
| if (2 == each.get(0)) { |
| assert("\u0001history\u0002".equals(new String(each.getAs[Array[Byte]](3)))) |
| } else if (1 == each.get(0)) { |
| assert("\u0001education\u0002".equals(new String(each.getAs[Array[Byte]](3)))) |
| } else if (3 == each.get(0)) { |
| assert("\u0001biology\u0002".equals(new String(each.getAs[Array[Byte]](3))) |
| || "".equals(new String(each.getAs[Array[Byte]](3)))) |
| } else { |
| assert(false) |
| } |
| } |
| |
| sql("CREATE TABLE hivetable2 AS SELECT * FROM carbontable") |
| sql("CREATE TABLE carbontable2 STORED BY 'carbondata' AS SELECT * FROM hivetable") |
| val carbonResult2 = sql("SELECT * FROM carbontable2") |
| val hiveResult2 = sql("SELECT * FROM hivetable2") |
| checkAnswer(hiveResult2, carbonResult2) |
| checkAnswer(carbonResult, carbonResult2) |
| checkAnswer(hiveResult, hiveResult2) |
| assert(3 == carbonResult2.collect().length) |
| assert(3 == hiveResult2.collect().length) |
| |
| sql("INSERT INTO hivetable2 SELECT * FROM carbontable") |
| sql("INSERT INTO carbontable2 SELECT * FROM hivetable") |
| val carbonResult3 = sql("SELECT * FROM carbontable2") |
| val hiveResult3 = sql("SELECT * FROM hivetable2") |
| checkAnswer(carbonResult3, hiveResult3) |
| assert(6 == carbonResult3.collect().length) |
| assert(6 == hiveResult3.collect().length) |
| } |
| |
| test("compaction for binary") { |
| CarbonProperties.getInstance() |
| .addProperty(CarbonCommonConstants.ENABLE_AUTO_LOAD_MERGE, "false") |
| .addProperty(CarbonCommonConstants.COMPACTION_SEGMENT_LEVEL_THRESHOLD, |
| CarbonCommonConstants.DEFAULT_SEGMENT_LEVEL_THRESHOLD) |
| sql("DROP TABLE IF EXISTS carbontable") |
| sql( |
| s""" |
| | CREATE TABLE IF NOT EXISTS carbontable ( |
| | id int, |
| | label boolean, |
| | name string, |
| | binaryField binary, |
| | autoLabel boolean) |
| | STORED BY 'carbondata' |
| """.stripMargin) |
| for (i <- 0 until (3)) { |
| sql( |
| s""" |
| | LOAD DATA LOCAL INPATH '$resourcesPath/binarystringdata.csv' |
| | INTO TABLE carbontable |
| | OPTIONS('header'='false','DELIMITER'='|') |
| """.stripMargin) |
| } |
| // 3 segments, no compaction |
| var segments = sql("SHOW SEGMENTS FOR TABLE carbontable") |
| var SegmentSequenceIds = segments.collect().map { each => (each.toSeq) (0) } |
| assert(!SegmentSequenceIds.contains("0.1")) |
| assert(SegmentSequenceIds.length == 3) |
| for (i <- 0 until (3)) { |
| sql("insert into carbontable values(1,true,'Bob','binary',false)") |
| } |
| |
| // without auto compaction will not compact |
| segments = sql("SHOW SEGMENTS FOR TABLE carbontable") |
| SegmentSequenceIds = segments.collect().map { each => (each.toSeq) (0) } |
| assert(!SegmentSequenceIds.contains("0.1")) |
| assert(SegmentSequenceIds.length == 6) |
| |
| // minor compaction |
| sql("alter table carbontable compact 'MINOR'") |
| segments = sql("SHOW SEGMENTS FOR TABLE carbontable") |
| SegmentSequenceIds = segments.collect().map { each => (each.toSeq) (0) } |
| assert(SegmentSequenceIds.contains("0.1")) |
| assert(!SegmentSequenceIds.contains("0.2")) |
| assert(SegmentSequenceIds.length == 7) |
| |
| // major compaction |
| sql("alter table carbontable compact 'major'") |
| segments = sql("SHOW SEGMENTS FOR TABLE carbontable") |
| SegmentSequenceIds = segments.collect().map { each => (each.toSeq) (0) } |
| assert(SegmentSequenceIds.contains("0.2")) |
| assert(SegmentSequenceIds.contains("0.1")) |
| assert(SegmentSequenceIds.length == 8) |
| |
| // clean files |
| segments = sql("CLEAN FILES FOR TABLE carbontable") |
| segments = sql("SHOW SEGMENTS FOR TABLE carbontable") |
| SegmentSequenceIds = segments.collect().map { each => (each.toSeq) (0) } |
| assert(SegmentSequenceIds.contains("0.2")) |
| assert(!SegmentSequenceIds.contains("0.1")) |
| assert(SegmentSequenceIds.length == 1) |
| |
| CarbonProperties.getInstance() |
| .addProperty(CarbonCommonConstants.ENABLE_AUTO_LOAD_MERGE, "true") |
| for (i <- 0 until (4)) { |
| sql("insert into carbontable values(1,true,'Bob','binary',false)") |
| } |
| // auto compaction |
| segments = sql("SHOW SEGMENTS FOR TABLE carbontable") |
| SegmentSequenceIds = segments.collect().map { each => (each.toSeq) (0) } |
| assert(SegmentSequenceIds.contains("6.1")) |
| assert(!SegmentSequenceIds.contains("0.1")) |
| assert(SegmentSequenceIds.contains("0.2")) |
| assert(SegmentSequenceIds.length == 6) |
| |
| // check the data |
| val carbonResult = sql("SELECT * FROM carbontable") |
| carbonResult.collect().foreach { each => |
| if (2 == each.get(0)) { |
| assert("\u0001history\u0002".equals(new String(each.getAs[Array[Byte]](3)))) |
| } else if (1 == each.get(0)) { |
| assert("\u0001education\u0002".equals(new String(each.getAs[Array[Byte]](3))) |
| || "binary".equals(new String(each.getAs[Array[Byte]](3)))) |
| } else if (3 == each.get(0)) { |
| assert("\u0001biology\u0002".equals(new String(each.getAs[Array[Byte]](3))) |
| || "".equals(new String(each.getAs[Array[Byte]](3)))) |
| } else { |
| assert(false) |
| } |
| } |
| |
| CarbonProperties.getInstance() |
| .addProperty(CarbonCommonConstants.ENABLE_AUTO_LOAD_MERGE, |
| CarbonCommonConstants.DEFAULT_ENABLE_AUTO_LOAD_MERGE) |
| } |
| |
| test("alter table for binary") { |
| sql("DROP TABLE IF EXISTS carbontable") |
| sql("DROP TABLE IF EXISTS binarytable") |
| sql( |
| s""" |
| | CREATE TABLE IF NOT EXISTS carbontable ( |
| | id int, |
| | label boolean, |
| | name string) |
| | STORED BY 'carbondata' |
| """.stripMargin) |
| |
| |
| sql("insert into carbontable values(1,true,'Bob')") |
| |
| sql( |
| s""" |
| | alter table carbontable add columns ( |
| | binaryField binary, |
| | autoLabel boolean) |
| | TBLPROPERTIES('DEFAULT.VALUE.binaryField'='binary','DEFAULT.VALUE.autoLabel'='true') |
| """.stripMargin) |
| |
| var carbonResult = sql("SELECT * FROM carbontable") |
| carbonResult.collect().foreach { each => |
| if (1 == each.get(0)) { |
| assert("binary".equals(new String(each.getAs[Array[Byte]](3)))) |
| } else { |
| assert(false) |
| } |
| } |
| |
| sql( |
| s""" |
| | LOAD DATA LOCAL INPATH '$resourcesPath/binarystringdata.csv' |
| | INTO TABLE carbontable |
| | OPTIONS('header'='false','DELIMITER'='|') |
| """.stripMargin) |
| |
| |
| sql("insert into carbontable values(1,true,'Bob','binary',false)") |
| |
| carbonResult = sql("SELECT * FROM carbontable") |
| carbonResult.collect().foreach { each => |
| if (2 == each.get(0)) { |
| assert("\u0001history\u0002".equals(new String(each.getAs[Array[Byte]](3)))) |
| } else if (1 == each.get(0)) { |
| assert("\u0001education\u0002".equals(new String(each.getAs[Array[Byte]](3))) |
| || "binary".equals(new String(each.getAs[Array[Byte]](3)))) |
| } else if (3 == each.get(0)) { |
| assert("\u0001biology\u0002".equals(new String(each.getAs[Array[Byte]](3))) |
| || "".equals(new String(each.getAs[Array[Byte]](3)))) |
| } else { |
| assert(false) |
| } |
| } |
| |
| var result = sql("show tables") |
| result.collect().foreach { each => |
| assert(!"binarytable".equalsIgnoreCase(each.getAs[String](1))) |
| } |
| |
| // rename |
| sql( |
| s""" |
| | alter table carbontable RENAME TO binarytable |
| """.stripMargin) |
| result = sql("show tables") |
| assert(result.collect().exists { each => |
| "binarytable".equalsIgnoreCase(each.getAs[String](1)) |
| }) |
| |
| // add columns after rename |
| sql( |
| s""" |
| | alter table binarytable add columns ( |
| | binaryField2 binary, |
| | autoLabel2 boolean) |
| | TBLPROPERTIES('DEFAULT.VALUE.binaryField2'='binary','DEFAULT.VALUE.autoLabel2'='true') |
| """.stripMargin) |
| sql("insert into binarytable values(1,true,'Bob','binary',false,'binary',false)") |
| |
| carbonResult = sql("SELECT * FROM binarytable") |
| carbonResult.collect().foreach { each => |
| if (2 == each.get(0)) { |
| assert("\u0001history\u0002".equals(new String(each.getAs[Array[Byte]](3)))) |
| } else if (1 == each.get(0)) { |
| assert("\u0001education\u0002".equals(new String(each.getAs[Array[Byte]](3))) |
| || "binary".equals(new String(each.getAs[Array[Byte]](3)))) |
| } else if (3 == each.get(0)) { |
| assert(null == each.getAs[Array[Byte]](3) |
| || "\u0001biology\u0002".equals(new String(each.getAs[Array[Byte]](3)))) |
| } else { |
| assert(false) |
| } |
| } |
| |
| // drop columns after rename |
| sql( |
| s""" |
| | alter table binarytable drop columns ( |
| | binaryField2, |
| | autoLabel2) |
| """.stripMargin) |
| sql("insert into binarytable values(1,true,'Bob','binary',false)") |
| |
| carbonResult = sql("SELECT * FROM binarytable") |
| carbonResult.collect().foreach { each => |
| if (2 == each.get(0)) { |
| assert("\u0001history\u0002".equals(new String(each.getAs[Array[Byte]](3)))) |
| } else if (1 == each.get(0)) { |
| assert("\u0001education\u0002".equals(new String(each.getAs[Array[Byte]](3))) |
| || "binary".equals(new String(each.getAs[Array[Byte]](3)))) |
| } else if (3 == each.get(0)) { |
| assert("\u0001biology\u0002".equals(new String(each.getAs[Array[Byte]](3))) |
| || "".equals(new String(each.getAs[Array[Byte]](3)))) |
| } else { |
| assert(false) |
| } |
| } |
| |
| // change data type |
| val e = intercept[Exception] { |
| sql(s"alter table binarytable CHANGE binaryField binaryField3 STRING ") |
| } |
| assert(e.getMessage.contains("operation failed for default.binarytable: Alter table data type change operation failed: Given column binaryfield with data type BINARY cannot be modified. Only Int and Decimal data types are allowed for modification")) |
| } |
| |
| test("Create table and load data with binary column for hive: test encode with base64") { |
| sql("DROP TABLE IF EXISTS hivetable") |
| sql("DROP TABLE IF EXISTS carbontable") |
| sql( |
| s""" |
| | CREATE TABLE IF NOT EXISTS hivetable ( |
| | id int, |
| | label boolean, |
| | name string, |
| | binaryField binary, |
| | autoLabel boolean) |
| | row format delimited fields terminated by ',' |
| """.stripMargin) |
| sql( |
| s""" |
| | LOAD DATA LOCAL INPATH '$resourcesPath/binaryDataBase64.csv' |
| | INTO TABLE hivetable |
| """.stripMargin) |
| |
| sql( |
| s""" |
| | CREATE TABLE IF NOT EXISTS carbontable ( |
| | id int, |
| | label boolean, |
| | name string, |
| | binaryField binary, |
| | autoLabel boolean) |
| | STORED BY 'carbondata' |
| """.stripMargin) |
| sql( |
| s""" |
| | LOAD DATA LOCAL INPATH '$resourcesPath/binaryDataBase64.csv' |
| | INTO TABLE carbontable |
| | OPTIONS('header'='false','DELIMITER'=',','binary_decoder'='baSe64') |
| """.stripMargin) |
| |
| val hiveResult = sql("SELECT * FROM hivetable") |
| val carbonResult = sql("SELECT * FROM carbontable") |
| checkAnswer(hiveResult, carbonResult) |
| |
| checkAnswer(sql("SELECT COUNT(*) FROM hivetable"), Seq(Row(3))) |
| try { |
| val carbonDF = carbonResult.collect() |
| assert(3 == carbonDF.length) |
| carbonDF.foreach { each => |
| assert(5 == each.length) |
| |
| assert(Integer.valueOf(each(0).toString) > 0) |
| assert(each(1).toString.equalsIgnoreCase("false") || (each(1).toString.equalsIgnoreCase("true"))) |
| assert(each(2).toString.contains(".png")) |
| |
| val value = each.getAs[Array[Byte]](3).slice(0, 10) |
| assert(new String(Base64.encodeBase64(value)).equals("iVBORw0KGgoAAA==")) |
| assert(each(4).toString.equalsIgnoreCase("false") || (each(4).toString.equalsIgnoreCase("true"))) |
| } |
| |
| val df = hiveResult.collect() |
| assert(3 == df.length) |
| df.foreach { each => |
| assert(5 == each.length) |
| |
| assert(Integer.valueOf(each(0).toString) > 0) |
| assert(each(1).toString.equalsIgnoreCase("false") || (each(1).toString.equalsIgnoreCase("true"))) |
| assert(each(2).toString.contains(".png")) |
| |
| |
| val value = each.getAs[Array[Byte]](3).slice(0, 10) |
| assert(new String(Base64.encodeBase64(value)).equals("iVBORw0KGgoAAA==")) |
| assert(each(4).toString.equalsIgnoreCase("false") || (each(4).toString.equalsIgnoreCase("true"))) |
| } |
| } catch { |
| case e: Exception => |
| e.printStackTrace() |
| assert(false) |
| } |
| } |
| |
| test("Create table and load data with binary column for hive: test encode with base64 and streaming = true") { |
| sql("DROP TABLE IF EXISTS hivetable") |
| sql("DROP TABLE IF EXISTS carbontable") |
| sql( |
| s""" |
| | CREATE TABLE IF NOT EXISTS hivetable ( |
| | id int, |
| | label boolean, |
| | name string, |
| | binaryField binary, |
| | autoLabel boolean) |
| | row format delimited fields terminated by ',' |
| """.stripMargin) |
| sql( |
| s""" |
| | LOAD DATA LOCAL INPATH '$resourcesPath/binaryDataBase64.csv' |
| | INTO TABLE hivetable |
| """.stripMargin) |
| |
| sql( |
| s""" |
| | CREATE TABLE IF NOT EXISTS carbontable ( |
| | id int, |
| | label boolean, |
| | name string, |
| | binaryField binary, |
| | autoLabel boolean) |
| | STORED BY 'carbondata' |
| | tblproperties('streaming'='true') |
| """.stripMargin) |
| sql( |
| s""" |
| | LOAD DATA LOCAL INPATH '$resourcesPath/binaryDataBase64.csv' |
| | INTO TABLE carbontable |
| | OPTIONS('header'='false','DELIMITER'=',','binary_decoder'='baSe64') |
| """.stripMargin) |
| |
| val hiveResult = sql("SELECT * FROM hivetable") |
| val carbonResult = sql("SELECT * FROM carbontable") |
| checkAnswer(hiveResult, carbonResult) |
| |
| checkAnswer(sql("SELECT COUNT(*) FROM hivetable"), Seq(Row(3))) |
| try { |
| val carbonDF = carbonResult.collect() |
| assert(3 == carbonDF.length) |
| carbonDF.foreach { each => |
| assert(5 == each.length) |
| |
| assert(Integer.valueOf(each(0).toString) > 0) |
| assert(each(1).toString.equalsIgnoreCase("false") || (each(1).toString.equalsIgnoreCase("true"))) |
| assert(each(2).toString.contains(".png")) |
| |
| val value = each.getAs[Array[Byte]](3).slice(0, 10) |
| assert(new String(Base64.encodeBase64(value)).equals("iVBORw0KGgoAAA==")) |
| assert(each(4).toString.equalsIgnoreCase("false") || (each(4).toString.equalsIgnoreCase("true"))) |
| } |
| |
| val df = hiveResult.collect() |
| assert(3 == df.length) |
| df.foreach { each => |
| assert(5 == each.length) |
| |
| assert(Integer.valueOf(each(0).toString) > 0) |
| assert(each(1).toString.equalsIgnoreCase("false") || (each(1).toString.equalsIgnoreCase("true"))) |
| assert(each(2).toString.contains(".png")) |
| |
| |
| val value = each.getAs[Array[Byte]](3).slice(0, 10) |
| assert(new String(Base64.encodeBase64(value)).equals("iVBORw0KGgoAAA==")) |
| assert(each(4).toString.equalsIgnoreCase("false") || (each(4).toString.equalsIgnoreCase("true"))) |
| } |
| } catch { |
| case e: Exception => |
| e.printStackTrace() |
| assert(false) |
| } |
| } |
| |
| test("Create table and load data with binary column for hive: test encode without \u0001 and not base64") { |
| // Carbon will throw exception if the data is not base64 when carbon set binary_decoder is base64 |
| // hive will save as original data if the data is not base64 |
| sql("DROP TABLE IF EXISTS hivetable") |
| sql("DROP TABLE IF EXISTS carbontable") |
| sql( |
| s""" |
| | CREATE TABLE IF NOT EXISTS hivetable ( |
| | id int, |
| | label boolean, |
| | name string, |
| | binaryField binary, |
| | autoLabel boolean) |
| | row format delimited fields terminated by '|' |
| """.stripMargin) |
| sql( |
| s""" |
| | LOAD DATA LOCAL INPATH '$resourcesPath/binarystringdata2.csv' |
| | INTO TABLE hivetable |
| """.stripMargin) |
| |
| sql( |
| s""" |
| | CREATE TABLE IF NOT EXISTS carbontable ( |
| | id int, |
| | label boolean, |
| | name string, |
| | binaryField binary, |
| | autoLabel boolean) |
| | STORED BY 'carbondata' |
| """.stripMargin) |
| val e = intercept[Exception] { |
| sql( |
| s""" |
| | LOAD DATA LOCAL INPATH '$resourcesPath/binarystringdata2.csv' |
| | INTO TABLE carbontable |
| | OPTIONS('header'='false','DELIMITER'='|','binary_decoder'='baSe64') |
| """.stripMargin) |
| } |
| assert(e.getMessage.contains("Binary decoder is base64, but data is not base64")) |
| } |
| |
| test("Create table and load data with binary column with Hex decode") { |
| sql("DROP TABLE IF EXISTS binaryTable") |
| sql( |
| s""" |
| | CREATE TABLE IF NOT EXISTS binaryTable ( |
| | id int, |
| | label boolean, |
| | name string, |
| | binaryField binary, |
| | autoLabel boolean) |
| | STORED BY 'carbondata' |
| | TBLPROPERTIES('SORT_COLUMNS'='') |
| """.stripMargin) |
| sql( |
| s""" |
| | LOAD DATA LOCAL INPATH '$resourcesPath/binaryDataHex.csv' |
| | INTO TABLE binaryTable |
| | OPTIONS('header'='false','binary_decoder'='hex') |
| """.stripMargin) |
| |
| val result = sql("desc formatted binaryTable").collect() |
| var flag = false |
| result.foreach { each => |
| if ("binary".equals(each.get(1))) { |
| flag = true |
| } |
| } |
| assert(flag) |
| |
| checkAnswer(sql("SELECT COUNT(*) FROM binaryTable"), Seq(Row(3))) |
| try { |
| val df = sql("SELECT * FROM binaryTable").collect() |
| assert(3 == df.length) |
| df.foreach { each => |
| assert(5 == each.length) |
| |
| assert(Integer.valueOf(each(0).toString) > 0) |
| assert(each(1).toString.equalsIgnoreCase("false") || (each(1).toString.equalsIgnoreCase("true"))) |
| assert(each(2).toString.contains(".png")) |
| |
| val bytes20 = each.getAs[Array[Byte]](3).slice(0, 20) |
| val binaryName = each(2).toString |
| assert(Arrays.equals(firstBytes20.get(binaryName).get, bytes20), "incorrect value for binary data") |
| |
| assert(each(4).toString.equalsIgnoreCase("false") || (each(4).toString.equalsIgnoreCase("true"))) |
| |
| val df = sql("SELECT name,binaryField FROM binaryTable").collect() |
| assert(3 == df.length) |
| df.foreach { each => |
| assert(2 == each.length) |
| val binaryName = each(0).toString |
| val bytes20 = each.getAs[Array[Byte]](1).slice(0, 20) |
| assert(Arrays.equals(firstBytes20.get(binaryName).get, bytes20), "incorrect value for binary data") |
| } |
| } |
| } catch { |
| case e: Exception => |
| e.printStackTrace() |
| assert(false) |
| } |
| } |
| |
| test("Create table and load data with binary column with invalid value") { |
| sql("DROP TABLE IF EXISTS binaryTable") |
| sql( |
| s""" |
| | CREATE TABLE IF NOT EXISTS binaryTable ( |
| | id int, |
| | label boolean, |
| | name string, |
| | binaryField binary, |
| | autoLabel boolean) |
| | STORED BY 'carbondata' |
| | TBLPROPERTIES('SORT_COLUMNS'='') |
| """.stripMargin) |
| val e = intercept[Exception] { |
| sql( |
| s""" |
| | LOAD DATA LOCAL INPATH '$resourcesPath/binaryDataHex.csv' |
| | INTO TABLE binaryTable |
| | OPTIONS('header'='false','binary_decoder'='he') |
| """.stripMargin) |
| } |
| assert(e.getMessage().contains( |
| "Binary decoder only support Base64, Hex or no decode for string, don't support he")) |
| } |
| |
| test("insert into partition table") { |
| sql("DROP TABLE IF EXISTS hive_table") |
| sql("DROP TABLE IF EXISTS hive_table2") |
| sql("DROP TABLE IF EXISTS parquet_table") |
| sql("DROP TABLE IF EXISTS carbon_partition_table") |
| |
| sql("set hive.exec.dynamic.partition.mode=strict") |
| |
| sql( |
| s""" |
| | CREATE TABLE IF NOT EXISTS hive_table ( |
| | name STRING, |
| | id string) |
| | PARTITIONED BY(photo binary) |
| | row format delimited fields terminated by '|' |
| """.stripMargin) |
| |
| sql("INSERT INTO hive_table PARTITION(photo='binary') select 'a','b'"); |
| sql("INSERT INTO hive_table PARTITION(photo='1') select 'a','b'"); |
| checkAnswer(sql("select cast(photo as string) from hive_table"), Seq(Row("binary"), Row("1"))); |
| |
| |
| sql( |
| s""" |
| | CREATE TABLE IF NOT EXISTS hive_table2 ( |
| | name STRING, |
| | id string) |
| | PARTITIONED BY(photo binary) |
| """.stripMargin) |
| |
| sql("INSERT INTO hive_table2 PARTITION(photo='binary') select 'a','b'"); |
| sql("INSERT INTO hive_table2 PARTITION(photo='1') select 'a','b'"); |
| checkAnswer(sql("select cast(photo as string) from hive_table2"), Seq(Row("binary"), Row("1"))); |
| |
| sql( |
| s""" |
| | CREATE TABLE IF NOT EXISTS parquet_table ( |
| | name STRING, |
| | id string) |
| | PARTITIONED BY(photo binary) |
| | STORED AS PARQUET |
| """.stripMargin) |
| |
| sql("INSERT INTO parquet_table PARTITION(photo='binary') select 'a','b'"); |
| sql("INSERT INTO parquet_table PARTITION(photo='1') select 'a','b'"); |
| |
| sql("select cast(photo as string) from parquet_table").show() |
| //TODOļ¼ is it a bug in parquet? |
| // checkAnswer(sql("select cast(photo as string) from parquet_table"), Seq(Row(),Row())); |
| |
| sql( |
| s""" |
| | CREATE TABLE IF NOT EXISTS carbon_partition_table ( |
| | name STRING, |
| | id string) |
| | PARTITIONED BY(photo binary) |
| | STORED BY 'carbondata' |
| """.stripMargin) |
| |
| |
| sql("INSERT INTO carbon_partition_table PARTITION(photo='binary') select 'a','b'"); |
| sql("INSERT INTO carbon_partition_table PARTITION(photo='1') select 'a','b'"); |
| sql("select * from carbon_partition_table").show() |
| sql("select cast(photo as string) from carbon_partition_table").show() |
| checkAnswer(sql("select cast(photo as string) from carbon_partition_table"), Seq(Row("binary"), Row("1"))) |
| checkAnswer(sql("select * from carbon_partition_table"), sql("select * from hive_table")) |
| |
| val e = intercept[SparkException] { |
| sql("insert into hive_table select 'a','b','binary'"); |
| } |
| |
| assert(e.getMessage.contains("Dynamic partition strict mode requires at least one static partition column")) |
| |
| val eInt = intercept[Exception] { |
| sql("insert into hive_table select 'a','b','1'"); |
| } |
| |
| val e2 = intercept[SparkException] { |
| sql("insert into hive_table2 select 'a','b','binary'"); |
| } |
| |
| assert(e2.getMessage.contains("Dynamic partition strict mode requires at least one static partition column")) |
| |
| val eInt2 = intercept[Exception] { |
| sql("insert into hive_table2 select 'a','b','1'"); |
| } |
| |
| val e3 = intercept[SparkException] { |
| sql("insert into parquet_table select 'a','b','binary'"); |
| } |
| |
| assert(e3.getMessage.contains("Dynamic partition strict mode requires at least one static partition column")) |
| |
| val eInt3 = intercept[Exception] { |
| sql("insert into parquet_table select 'a','b','1'"); |
| } |
| |
| sql("insert into carbon_partition_table select 'a','b','binary'"); |
| sql("insert into carbon_partition_table select 'a','b','1'"); |
| |
| checkAnswer(sql("select cast(photo as string) from carbon_partition_table"), |
| Seq(Row("binary"), Row("1"), Row("binary"), Row("1"))) |
| |
| sql("select * from carbon_partition_table").show() |
| |
| // set hive.exec.dynamic.partition.mode=nonstrict |
| sql("set hive.exec.dynamic.partition.mode=nonstrict") |
| sql("insert into hive_table select 'a','b','binary'"); |
| val eInt11 = intercept[AnalysisException] { |
| sql("insert into hive_table select 'a','b',1"); |
| } |
| assert(eInt11.getMessage.contains("cannot resolve 'CAST(`1` AS BINARY)' due to data type mismatch: cannot cast ")) |
| |
| checkAnswer(sql("select cast(photo as string) from hive_table"), |
| Seq(Row("binary"), Row("1"), Row("binary"))) |
| |
| sql("insert into hive_table2 select 'a','b','binary'"); |
| val eInt22 = intercept[AnalysisException] { |
| sql("insert into hive_table2 select 'a','b',1"); |
| } |
| assert(eInt22.getMessage.contains("cannot resolve 'CAST(`1` AS BINARY)' due to data type mismatch: cannot cast ")) |
| |
| checkAnswer(sql("select cast(photo as string) from hive_table2"), |
| Seq(Row("binary"), Row("1"), Row("binary"))) |
| |
| sql("insert into parquet_table select 'a','b','binary'"); |
| val eInt32 = intercept[AnalysisException] { |
| sql("insert into parquet_table select 'a','b',1"); |
| } |
| assert(eInt32.getMessage.contains("cannot resolve 'CAST(`1` AS BINARY)' due to data type mismatch: cannot cast ")) |
| |
| //TODO: is it bug in parquet? |
| // checkAnswer(sql("select cast(photo as string) from parquet_table"), |
| // Seq(Row(),Row(),Row())) |
| |
| sql("insert into carbon_partition_table select 'a','b','binary'"); |
| sql("insert into carbon_partition_table select 'a','b','1'"); |
| |
| checkAnswer(sql("select cast(photo as string) from carbon_partition_table"), |
| Seq(Row("binary"), Row("1"), Row("binary"), Row("1"), Row("binary"), Row("1"))) |
| } |
| |
| test("Select query with average function for substring of binary column is executed.") { |
| sql("DROP TABLE IF EXISTS uniqdata") |
| sql( |
| s""" |
| | CREATE TABLE uniqdata ( |
| | CUST_ID int, |
| | CUST_NAME binary, |
| | ACTIVE_EMUI_VERSION string, |
| | DOB timestamp, |
| | DOJ timestamp, |
| | BIGINT_COLUMN1 bigint, |
| | BIGINT_COLUMN2 bigint, |
| | DECIMAL_COLUMN1 decimal(30,10), |
| | DECIMAL_COLUMN2 decimal(36,10), |
| | Double_COLUMN1 double, |
| | Double_COLUMN2 double, |
| | INTEGER_COLUMN1 int) |
| | STORED BY 'org.apache.carbondata.format' |
| | TBLPROPERTIES('table_blocksize'='2000') |
| """.stripMargin) |
| sql( |
| s""" |
| | LOAD DATA inpath '$resourcesPath/restructure/data_2000.csv' |
| | into table uniqdata |
| | OPTIONS( |
| | 'DELIMITER'=',' , |
| | 'QUOTECHAR'='"', |
| | 'BAD_RECORDS_ACTION'='FORCE', |
| | 'FILEHEADER'='CUST_ID,CUST_NAME,ACTIVE_EMUI_VERSION,DOB,DOJ,BIGINT_COLUMN1,BIGINT_COLUMN2,DECIMAL_COLUMN1,DECIMAL_COLUMN2,Double_COLUMN1,Double_COLUMN2,INTEGER_COLUMN1') |
| """.stripMargin) |
| |
| sql( |
| s"""select substr(CUST_NAME,1,2) |
| | from uniqdata |
| """.stripMargin).show() |
| |
| val e1 = intercept[Exception] { |
| sql( |
| s"""select avg(substr(CUST_NAME,1,2)) |
| | from uniqdata |
| """.stripMargin).show() |
| } |
| assert(e1.getMessage.contains("cannot resolve 'avg(substring(uniqdata.`CUST_NAME`, 1, 2))' due to data type mismatch: function average requires numeric types, not BinaryType")) |
| |
| val e2 = intercept[Exception] { |
| sql( |
| s""" |
| | select |
| | max(substr(CUST_NAME,1,2)), |
| | min(substr(CUST_NAME,1,2)), |
| | avg(substr(CUST_NAME,1,2)), |
| | count(substr(CUST_NAME,1,2)), |
| | sum(substr(CUST_NAME,1,2)), |
| | variance(substr(CUST_NAME,1,2)) |
| | from uniqdata |
| | where CUST_ID IS NULL or DOB IS NOT NULL or BIGINT_COLUMN1 =1233720368578 or DECIMAL_COLUMN1 = 12345678901.1234000058 or Double_COLUMN1 = 1.12345674897976E10 or INTEGER_COLUMN1 IS NULL limit 10 |
| """.stripMargin) |
| } |
| assert(e2.getMessage.contains("cannot resolve 'avg(substring(uniqdata.`CUST_NAME`, 1, 2))' due to data type mismatch: function average requires numeric types, not BinaryType")) |
| |
| val e3 = intercept[Exception] { |
| sql( |
| s""" |
| | select |
| | max(substring(CUST_NAME,1,2)), |
| | min(substring(CUST_NAME,1,2)), |
| | avg(substring(CUST_NAME,1,2)), |
| | count(substring(CUST_NAME,1,2)), |
| | sum(substring(CUST_NAME,1,2)), |
| | variance(substring(CUST_NAME,1,2)) |
| | from uniqdata |
| | where CUST_ID IS NULL or DOB IS NOT NULL or BIGINT_COLUMN1 =1233720368578 or DECIMAL_COLUMN1 = 12345678901.1234000058 or Double_COLUMN1 = 1.12345674897976E10 or INTEGER_COLUMN1 IS NULL limit 10 |
| """.stripMargin) |
| } |
| assert(e3.getMessage.contains("cannot resolve 'avg(substring(uniqdata.`CUST_NAME`, 1, 2))' due to data type mismatch: function average requires numeric types, not BinaryType")) |
| |
| } |
| |
| test("test binary insert with int value") { |
| sql("DROP TABLE IF EXISTS binaryTable") |
| sql( |
| s""" |
| | CREATE TABLE IF NOT EXISTS binaryTable ( |
| | binaryField binary) |
| | STORED BY 'carbondata' |
| """.stripMargin) |
| val exception = intercept[AnalysisException] { |
| sql("insert into binaryTable select 1") |
| } |
| assert(exception.getMessage() |
| .contains( |
| "cannot resolve 'CAST(`1` AS BINARY)' due to data type mismatch: ")) |
| sql("DROP TABLE binaryTable") |
| } |
| |
| override def afterAll: Unit = { |
| sql("DROP TABLE IF EXISTS binaryTable") |
| sql("DROP TABLE IF EXISTS hiveTable") |
| sql("DROP TABLE IF EXISTS hive_table") |
| } |
| } |