/*
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements. See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License. You may obtain a copy of the License at
 *
 *    http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.carbondata.examples

import java.io.File
import java.sql.{DriverManager, ResultSet, Statement}

import org.apache.hadoop.fs.Path
import org.apache.hadoop.fs.permission.{FsAction, FsPermission}

import org.apache.carbondata.common.logging.LogServiceFactory
import org.apache.carbondata.core.datastore.impl.FileFactory
import org.apache.carbondata.examples.util.ExampleUtils
import org.apache.carbondata.hive.test.server.HiveEmbeddedServer2

// scalastyle:off println
object HiveExample {

  private val driverName: String = "org.apache.hive.jdbc.HiveDriver"

  val rootPath = new File(this.getClass.getResource("/").getPath
    + "../../../..").getCanonicalPath
  private val targetLoc = s"$rootPath/examples/spark2/target"
  val metaStoreLoc = s"$targetLoc/metastore_db"
  val storeLocation = s"$targetLoc/store"

  val logger = LogServiceFactory.getLogService(this.getClass.getCanonicalName)
  def main(args: Array[String]): Unit = {
    createCarbonTable(storeLocation)
    readFromHive()
    System.exit(0)
  }
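
  // Create the example tables through a CarbonSession, load the sample CSV data,
  // and then release the Derby metastore so the embedded HiveServer2 can reuse it.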
  def createCarbonTable(store: String): Unit = {
    val carbonSession = ExampleUtils.createCarbonSession("HiveExample")

    carbonSession.sql("DROP TABLE IF EXISTS HIVE_CARBON_EXAMPLE")

    carbonSession.sql(
      s"""
         | CREATE TABLE HIVE_CARBON_EXAMPLE
         | (ID int, NAME string, SALARY double)
         | STORED BY 'carbondata'
       """.stripMargin)
    var inputPath = FileFactory
      .getUpdatedFilePath(s"$rootPath/examples/spark2/src/main/resources/sample.csv")

    carbonSession.sql(
      s"""
         | LOAD DATA LOCAL INPATH '$inputPath'
         | INTO TABLE HIVE_CARBON_EXAMPLE
       """.stripMargin)

    carbonSession.sql(
      s"""
         | LOAD DATA LOCAL INPATH '$inputPath'
         | INTO TABLE HIVE_CARBON_EXAMPLE
       """.stripMargin)

    carbonSession.sql("SELECT * FROM HIVE_CARBON_EXAMPLE").show()
carbonSession.sql("DROP TABLE IF EXISTS TEST_BOUNDARY")
carbonSession
.sql(
s"""CREATE TABLE TEST_BOUNDARY (c1_int int,c2_Bigint Bigint,c3_Decimal Decimal(38,30),
|c4_double double,c5_string string,c6_Timestamp Timestamp,c7_Datatype_Desc string)
|STORED BY 'org.apache.carbondata.format' """.stripMargin)
inputPath = FileFactory
.getUpdatedFilePath(s"$rootPath/examples/spark2/src/main/resources/Test_Data1.csv")
carbonSession
.sql(
s"LOAD DATA INPATH '$inputPath' INTO table TEST_BOUNDARY OPTIONS('DELIMITER'=','," +
"'QUOTECHAR'='\"', 'BAD_RECORDS_ACTION'='FORCE','FILEHEADER'='c1_int,c2_Bigint," +
"c3_Decimal,c4_double,c5_string,c6_Timestamp,c7_Datatype_Desc')")
carbonSession.sql("""DROP TABLE IF EXISTS complexMap""".stripMargin)
carbonSession.sql("create table complexMap(name map<string,string>) stored by 'carbondata'")
carbonSession
.sql(
"insert into complexMap values(map('Manish','Nalla','Shardul','Singh','Vishal','Kumar'," +
"'EmptyVal','','NullVal', 'null'))")
carbonSession.close()
// delete the already existing lock on metastore so that new derby instance
// for HiveServer can run on the same metastore
checkAndDeleteDBLock
}
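
  // Derby guards an open database with db.lck/dbex.lck marker files; deleting them
  // lets the embedded HiveServer2 open the same metastore_db after the session closes.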
  def checkAndDeleteDBLock(): Unit = {
    val dbLockPath = FileFactory.getUpdatedFilePath(s"$metaStoreLoc/db.lck")
    val dbexLockPath = FileFactory.getUpdatedFilePath(s"$metaStoreLoc/dbex.lck")
    if (FileFactory.isFileExist(dbLockPath)) {
      FileFactory.deleteFile(dbLockPath)
    }
    if (FileFactory.isFileExist(dbexLockPath)) {
      FileFactory.deleteFile(dbexLockPath)
    }
  }
  def readFromHive(): Unit = {
    try {
      Class.forName(driverName)
    }
    catch {
      case classNotFoundException: ClassNotFoundException =>
        classNotFoundException.printStackTrace()
    }

    // make the target directory world-writable so the embedded HiveServer2 can use it
    val path = new Path(targetLoc)
    val fileSys = path.getFileSystem(FileFactory.getConfiguration)
    fileSys.setPermission(path, new FsPermission(FsAction.ALL, FsAction.ALL, FsAction.ALL))

    val hiveEmbeddedServer2 = new HiveEmbeddedServer2()
    hiveEmbeddedServer2.start(targetLoc)
    val port = hiveEmbeddedServer2.getFreePort
    val connection = DriverManager.getConnection(s"jdbc:hive2://localhost:$port/default", "", "")
    val statement: Statement = connection.createStatement

    logger.info(s"============HIVE SERVER IS STARTED ON PORT $port ==============")

    val resultSet: ResultSet = statement.executeQuery("SELECT * FROM HIVE_CARBON_EXAMPLE")
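
    // Full scan: the two CSV loads above should yield exactly four rows.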
    var rowsFetched = 0
    var resultId = ""
    var resultName = ""
    var resultSalary = ""
    while (resultSet.next) {
      if (rowsFetched == 0) {
        println("+---+-------+--------------+")
        println("| ID| NAME  |    SALARY    |")
        println("+---+-------+--------------+")
      }
      resultId = resultSet.getString("ID")
      resultName = resultSet.getString("NAME")
      resultSalary = resultSet.getString("SALARY")
      println(s"| $resultId | $resultName | $resultSalary |")
      println("+---+-------+--------------+")
      rowsFetched = rowsFetched + 1
    }
    println(s"******Total Number Of Rows Fetched ****** $rowsFetched")
    assert(rowsFetched == 4)
logger.info("Fetching the Individual Columns ")
// fetching the separate columns
var individualColRowsFetched = 0
val resultIndividualCol = statement.executeQuery("SELECT NAME FROM HIVE_CARBON_EXAMPLE")
while (resultIndividualCol.next) {
if (individualColRowsFetched == 0) {
println("+--------------+")
println("| NAME |")
println("+---++---------+")
resultName = resultIndividualCol.getString("name")
println(s"| $resultName |")
println("+---+" + "+---------+")
}
else {
resultName = resultIndividualCol.getString("NAME")
println(s"| $resultName |")
println("+---+" + "+---------+")
}
individualColRowsFetched = individualColRowsFetched + 1
}
println(" ********** Total Rows Fetched When Quering The Individual Columns **********" +
s"$individualColRowsFetched")
assert(individualColRowsFetched == 4)
logger.info("Fetching the Out Of Order Columns ")
val resultOutOfOrderCol = statement
.executeQuery("SELECT SALARY,ID,NAME FROM HIVE_CARBON_EXAMPLE")
var outOfOrderColFetched = 0
while (resultOutOfOrderCol.next()) {
if (outOfOrderColFetched == 0) {
println("+---+" + "+-------+" + "+--------------+")
println("| Salary|" + "| ID |" + "| NAME |")
println("+---+" + "+-------+" + "+--------------+")
resultId = resultOutOfOrderCol.getString("id")
resultName = resultOutOfOrderCol.getString("name")
resultSalary = resultOutOfOrderCol.getString("salary")
println(s"| $resultSalary |" + s"| $resultId |" + s"| $resultName |")
println("+---+" + "+-------+" + "+--------------+")
}
else {
resultId = resultOutOfOrderCol.getString("ID")
resultName = resultOutOfOrderCol.getString("NAME")
resultSalary = resultOutOfOrderCol.getString("SALARY")
println(s"| $resultSalary |" + s"| $resultId |" + s"| $resultName |")
println("+---+" + "+-------+" + "+--------------+")
}
outOfOrderColFetched = outOfOrderColFetched + 1
}
println(" ********** Total Rows Fetched When Quering The Out Of Order Columns **********" +
s"$outOfOrderColFetched")
assert(outOfOrderColFetched == 4)
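
    // Aggregations over the high-precision Decimal(38,30) column of TEST_BOUNDARY;
    // a single result row is expected.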
    val resultAggQuery = statement
      .executeQuery(
        "SELECT min(c3_Decimal) as min, max(c3_Decimal) as max, " +
        "sum(c3_Decimal) as sum FROM TEST_BOUNDARY")
    var resultAggQueryFetched = 0
    var resultMin = ""
    var resultMax = ""
    var resultSum = ""
    while (resultAggQuery.next) {
      if (resultAggQueryFetched == 0) {
        println("+-----+-------------------+--------------------------------+")
        println("| min |        max        |              sum               |")
        println("+-----+-------------------+--------------------------------+")
        resultMin = resultAggQuery.getString("min")
        resultMax = resultAggQuery.getString("max")
        resultSum = resultAggQuery.getString("sum")
        println(s"| $resultMin | $resultMax | $resultSum |")
        println("+-----+-------------------+--------------------------------+")
      }
      resultAggQueryFetched = resultAggQueryFetched + 1
    }
    println(" ********** Total Rows Fetched For The Aggregate Query **********" +
      s" $resultAggQueryFetched")
    assert(resultAggQueryFetched == 1)
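
    // Read the map<string,string> column of complexMap back through Hive; getString
    // returns the whole map as one string value.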
    val resultComplexQuery = statement
      .executeQuery("SELECT name FROM complexMap")
    var resultComplex = 0
    var name = ""
    val border = "+" + "-" * 84 + "+"
    while (resultComplexQuery.next) {
      if (resultComplex == 0) {
        println(border)
        println("| name" + " " * 79 + "|")
        println(border)
        name = resultComplexQuery.getString("name")
        println(s"|$name|")
        println(border)
      }
      resultComplex = resultComplex + 1
    }
    println(" ********** Total Rows Fetched For The Complex Query **********" +
      s" $resultComplex")
    assert(resultComplex == 1)

    // close JDBC resources before shutting down the embedded server
    statement.close()
    connection.close()
    hiveEmbeddedServer2.stop()
  }
}
// scalastyle:on println