/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.carbondata.spark.testsuite.index
import java.io.{ByteArrayInputStream, ByteArrayOutputStream, DataOutputStream, ObjectInputStream, ObjectOutputStream}
import scala.collection.JavaConverters._
import scala.collection.mutable.ArrayBuffer
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.Path
import org.apache.spark.sql.test.util.QueryTest
import org.scalatest.BeforeAndAfterAll
import org.apache.carbondata.core.constants.CarbonCommonConstants
import org.apache.carbondata.core.index.dev.cgindex.{CoarseGrainIndex, CoarseGrainIndexFactory}
import org.apache.carbondata.core.index.dev.{IndexBuilder, IndexModel, IndexWriter}
import org.apache.carbondata.core.index.{IndexInputSplit, IndexMeta, Segment}
import org.apache.carbondata.core.datastore.FileReader
import org.apache.carbondata.core.datastore.block.SegmentProperties
import org.apache.carbondata.core.datastore.compression.SnappyCompressor
import org.apache.carbondata.core.datastore.impl.FileFactory
import org.apache.carbondata.core.datastore.page.ColumnPage
import org.apache.carbondata.core.features.TableOperation
import org.apache.carbondata.core.indexstore.blockletindex.BlockletIndexInputSplit
import org.apache.carbondata.core.indexstore.{Blocklet, PartitionSpec}
import org.apache.carbondata.core.metadata.AbsoluteTableIdentifier
import org.apache.carbondata.core.metadata.schema.table.{CarbonTable, IndexSchema}
import org.apache.carbondata.core.scan.expression.Expression
import org.apache.carbondata.core.scan.expression.conditional.EqualToExpression
import org.apache.carbondata.core.scan.filter.executer.FilterExecutor
import org.apache.carbondata.core.scan.filter.intf.ExpressionType
import org.apache.carbondata.core.scan.filter.resolver.FilterResolverIntf
import org.apache.carbondata.core.util.{ByteUtil, CarbonProperties}
import org.apache.carbondata.events.Event
import org.apache.carbondata.spark.testsuite.datacompaction.CompactionSupportGlobalSortBigFileTest
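// Test-only sample implementation of a coarse-grain (CG) index: CGIndexFactory wires the index
// into the table, CGIndexWriter records per-blocklet min/max values of the indexed column while
// data is loaded, and CGIndex reads them back to prune blocklets for equality filters.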
class CGIndexFactory(
carbonTable: CarbonTable,
indexSchema: IndexSchema) extends CoarseGrainIndexFactory(carbonTable, indexSchema) {
var identifier: AbsoluteTableIdentifier = carbonTable.getAbsoluteTableIdentifier
/**
* Return a new writer for this indexSchema
*/
override def createWriter(segment: Segment, shardName: String, segmentProperties: SegmentProperties): IndexWriter = {
new CGIndexWriter(carbonTable, segment, shardName, indexSchema)
}
/**
* Get the indexes for the segment
*/
override def getIndexes(segment: Segment): java.util.List[CoarseGrainIndex] = {
val path = identifier.getTablePath
val file = FileFactory.getCarbonFile(
path + "/" + indexSchema.getIndexName + "/" + segment.getSegmentNo)
val files = file.listFiles()
files.map {f =>
val index: CoarseGrainIndex = new CGIndex()
index.init(new IndexModel(f.getCanonicalPath, new Configuration(false)))
index
}.toList.asJava
}
/**
* Get indexes for distributable object.
*/
override def getIndexes(distributable: IndexInputSplit): java.util.List[CoarseGrainIndex] = {
val mapDistributable = distributable.asInstanceOf[BlockletIndexInputSplit]
val index: CoarseGrainIndex = new CGIndex()
index.init(new IndexModel(mapDistributable.getFilePath, new Configuration(false)))
Seq(index).asJava
}
/**
* Fire an event to this index factory; not implemented in this sample.
*/
override def fireEvent(event: Event): Unit = {
???
}
/**
* Get all distributable objects of a segment
*/
override def toDistributable(segment: Segment): java.util.List[IndexInputSplit] = {
val path = identifier.getTablePath
val file = FileFactory.getCarbonFile(
path + "/" + indexSchema.getIndexName + "/" + segment.getSegmentNo)
val files = file.listFiles()
files.map { f =>
val d:IndexInputSplit = new BlockletIndexInputSplit(f.getCanonicalPath)
d
}.toList.asJava
}
/**
* Clear all indexes from memory
*/
override def clear(): Unit = {
}
/**
* Return metadata of this indexSchema
*/
override def getMeta: IndexMeta = {
new IndexMeta(carbonTable.getIndexedColumns(indexSchema.getIndexColumns),
List(ExpressionType.EQUALS, ExpressionType.IN).asJava)
}
/**
* Delete the index data of the segment
*/
override def deleteIndexData(segment: Segment): Unit = {
}
/**
* Delete all index data if any
*/
override def deleteIndexData(): Unit = {
}
/**
* Defines whether the given table operation will make this index stale
*/
override def willBecomeStale(feature: TableOperation): Boolean = {
false
}
override def createBuilder(segment: Segment,
shardName: String, segmentProperties: SegmentProperties): IndexBuilder = {
???
}
/**
* Get the indexes for the segment and partition locations
*/
override def getIndexes(segment: Segment,
partitionLocations: java.util.Set[Path]): java.util.List[CoarseGrainIndex] = {
getIndexes(segment)
}
}
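// Sample CG index instance: loads the per-blocklet (max, min) metadata written by CGIndexWriter
// and returns the blocklets whose value range may contain the filter literal.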
class CGIndex extends CoarseGrainIndex {
var maxMin: ArrayBuffer[(Int, (Array[Byte], Array[Byte]))] = _
var fileReader: FileReader = _
var filePath: String = _
val compressor = new SnappyCompressor
var shardName: String = _
/**
* It is called to load the index to memory or to initialize it.
*/
override def init(indexModel: IndexModel): Unit = {
val indexPath = FileFactory.getPath(indexModel.getFilePath)
this.shardName = indexPath.getName
this.filePath = indexModel.getFilePath + "/testcg.indexSchema"
val carbonFile = FileFactory.getCarbonFile(filePath)
val size = carbonFile.getSize
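// The index file ends with a 4-byte footer holding the length of the compressed payload
// (see CGIndexWriter.finish), so read that length first and then the payload itself.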
fileReader = FileFactory.getFileHolder(FileFactory.getFileType(filePath))
val footerLen = fileReader.readInt(filePath, size - 4)
val bytes = fileReader.readByteArray(filePath, size - footerLen - 4, footerLen)
val in = new ByteArrayInputStream(compressor.unCompressByte(bytes))
val obj = new ObjectInputStream(in)
maxMin = obj.readObject().asInstanceOf[ArrayBuffer[(Int, (Array[Byte], Array[Byte]))]]
}
/**
* Prune the index with the filter expression. It returns the list of
* blocklets where these filters can exist.
*/
override def prune(
filterExp: FilterResolverIntf,
segmentProperties: SegmentProperties,
filterExecuter: FilterExecutor,
carbonTable: CarbonTable): java.util.List[Blocklet] = {
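// Collect the EqualTo expressions from the filter tree and prune using the literal value
// of the first one; this sample index only evaluates equality filters.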
val buffer: ArrayBuffer[Expression] = new ArrayBuffer[Expression]()
val expression = filterExp.getFilterExpression
getEqualToExpression(expression, buffer)
val value = buffer.map { f =>
f.getChildren.get(1).evaluate(null).getString
}
val meta = findMeta(value(0).getBytes)
meta.map { f=>
new Blocklet(shardName, f._1 + "")
}.asJava
}
private def findMeta(value: Array[Byte]) = {
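// A blocklet is a pruning candidate when the value falls inside its range; note the stored
// tuple is (max, min), matching the order written in CGIndexWriter.onBlockletEnd.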
val tuples = maxMin.filter { f =>
ByteUtil.UnsafeComparer.INSTANCE.compareTo(value, f._2._1) <= 0 &&
ByteUtil.UnsafeComparer.INSTANCE.compareTo(value, f._2._2) >= 0
}
tuples
}
private def getEqualToExpression(expression: Expression, buffer: ArrayBuffer[Expression]): Unit = {
if (expression.isInstanceOf[EqualToExpression]) {
buffer += expression
} else if (expression.getChildren != null) {
expression.getChildren.asScala.foreach { f =>
getEqualToExpression(f, buffer)
}
}
}
/**
* Clear complete index table and release memory.
*/
override def clear() = {
???
}
override def isScanRequired(filterExp: FilterResolverIntf): Boolean = ???
/**
* Clears all the resources held by the index
*/
override def finish() = {
???
}
override def getNumberOfEntries: Int = 1
}
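// Writer for the sample CG index: collects the min/max of the first index column for every
// blocklet and persists the collected ranges when the load finishes.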
class CGIndexWriter(
carbonTable: CarbonTable,
segment: Segment,
shardName: String,
indexSchema: IndexSchema)
extends IndexWriter(carbonTable.getTablePath, indexSchema.getIndexName,
carbonTable.getIndexedColumns(indexSchema.getIndexColumns), segment, shardName) {
val blockletList = new ArrayBuffer[Array[Byte]]()
val maxMin = new ArrayBuffer[(Int, (Array[Byte], Array[Byte]))]()
val compressor = new SnappyCompressor
/**
* Start of new block notification.
*
* @param blockId file name of the carbondata file
*/
override def onBlockStart(blockId: String): Unit = {
}
/**
* End of block notification
*/
override def onBlockEnd(blockId: String): Unit = {
}
/**
* Start of new blocklet notification.
*
* @param blockletId sequence number of blocklet in the block
*/
override def onBlockletStart(blockletId: Int): Unit = {
}
/**
* End of blocklet notification
*
* @param blockletId sequence number of blocklet in the block
*/
override def onBlockletEnd(blockletId: Int): Unit = {
val sorted = blockletList
.sortWith((l, r) => ByteUtil.UnsafeComparer.INSTANCE.compareTo(l, r) <= 0)
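// record (max, min) for this blocklet; CGIndex.findMeta compares against this order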
maxMin +=
((blockletId, (sorted.last, sorted.head)))
blockletList.clear()
}
/**
* Add the column pages of a blocklet page to the index. The order of pages is the same as
* `index_columns` in the IndexMeta returned by IndexFactory.
*
* Implementation should copy the content of `pages` as needed, because `pages` memory
* may be freed after this method returns, if using unsafe column page.
*/
override def onPageAdded(blockletId: Int,
pageId: Int,
pageSize: Int,
pages: Array[ColumnPage]): Unit = {
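// Only the first index column is used here; the leading 2 bytes of each value (the length
// header of the string value) are dropped before comparison.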
val size = pages(0).getPageSize
val list = new ArrayBuffer[Array[Byte]]()
var i = 0
while (i < size) {
val bytes = pages(0).getBytes(i)
val newBytes = new Array[Byte](bytes.length - 2)
System.arraycopy(bytes, 2, newBytes, 0, newBytes.length)
list += newBytes
i = i + 1
}
// Sort based on the column data in order to create index.
val sorted = list
.sortWith((l, r) => ByteUtil.UnsafeComparer.INSTANCE.compareTo(l, r) <= 0)
blockletList += sorted.head
blockletList += sorted.last
}
/**
* This is called while the writer is being closed, so after this call no more data will be
* sent to this class.
*/
override def finish(): Unit = {
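// Persist the collected ranges: the serialized and compressed maxMin buffer is written first,
// followed by a 4-byte int with its length, which CGIndex.init reads back as the footer.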
FileFactory.mkdirs(indexPath)
val file = indexPath + "/testcg.indexSchema"
val stream: DataOutputStream = FileFactory
.getDataOutputStream(file)
val out = new ByteArrayOutputStream()
val outStream = new ObjectOutputStream(out)
outStream.writeObject(maxMin)
outStream.close()
val bytes = compressor.compressByte(out.toByteArray)
stream.write(bytes.array(), 0, bytes.position())
stream.writeInt(bytes.position())
stream.close()
}
}
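// Test cases: pruning with a single CG index, two CG indexes on different columns, and
// toggling index visibility through CarbonCommonConstants.CARBON_INDEX_VISIBLE.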
class CGIndexTestCase extends QueryTest with BeforeAndAfterAll {
val file2 = resourcesPath + "/compaction/fil2.csv"
override protected def beforeAll(): Unit = {
// n should be about 5000000 if the default size of 1024 is used
val n = 150000
CompactionSupportGlobalSortBigFileTest.createFile(file2, n * 4, n)
CarbonProperties.getInstance()
.addProperty(CarbonCommonConstants.ENABLE_QUERY_STATISTICS, "true")
sql("DROP TABLE IF EXISTS normal_test")
sql(
"""
| CREATE TABLE normal_test(id INT, name STRING, city STRING, age INT)
| STORED AS carbondata
| TBLPROPERTIES('SORT_COLUMNS'='city,name', 'SORT_SCOPE'='LOCAL_SORT')
""".stripMargin)
sql(s"LOAD DATA LOCAL INPATH '$file2' INTO TABLE normal_test OPTIONS('header'='false')")
}
test("test cg index") {
sql("DROP TABLE IF EXISTS index_test_cg")
sql(
"""
| CREATE TABLE index_test_cg(id INT, name STRING, city STRING, age INT)
| STORED AS carbondata
| TBLPROPERTIES('SORT_COLUMNS'='city,name', 'SORT_SCOPE'='LOCAL_SORT')
""".stripMargin)
sql(s"create index cgindex on table index_test_cg (name)" +
s"as '${classOf[CGIndexFactory].getName}' ")
sql(s"LOAD DATA LOCAL INPATH '$file2' INTO TABLE index_test_cg OPTIONS('header'='false')")
checkAnswer(sql("select * from index_test_cg where name='n502670'"),
sql("select * from normal_test where name='n502670'"))
}
test("test cg index with 2 indexes") {
sql("DROP TABLE IF EXISTS index_test")
sql(
"""
| CREATE TABLE index_test(id INT, name STRING, city STRING, age INT)
| STORED AS carbondata
| TBLPROPERTIES('SORT_COLUMNS'='city,name', 'SORT_SCOPE'='LOCAL_SORT')
""".stripMargin)
sql(s"create index ggindex1 on table index_test (name) as '${classOf[CGIndexFactory].getName}' ")
sql(s"create index ggindex2 on table index_test (city) as '${classOf[CGIndexFactory].getName}' ")
sql(s"LOAD DATA LOCAL INPATH '$file2' INTO TABLE index_test OPTIONS('header'='false')")
checkAnswer(sql("select * from index_test where name='n502670' and city='c2670'"),
sql("select * from normal_test where name='n502670' and city='c2670'"))
}
test("test invisible index during query") {
val tableName = "index_test"
val indexName1 = "index1"
val indexName2 = "index2"
sql(s"DROP INDEX IF EXISTS $indexName1 ON TABLE $tableName")
sql(s"DROP INDEX IF EXISTS $indexName2 ON TABLE $tableName")
sql(s"DROP TABLE IF EXISTS $tableName")
sql(
s"""
| CREATE TABLE $tableName(id INT, name STRING, city STRING, age INT)
| STORED AS carbondata
| TBLPROPERTIES('SORT_COLUMNS'='city,name', 'SORT_SCOPE'='LOCAL_SORT')
""".stripMargin)
// create the two CG indexes (index1 on name, index2 on city)
sql(
s"""
| CREATE INDEX $indexName1
| ON TABLE $tableName (name)
| AS '${classOf[CGIndexFactory].getName}'
""".stripMargin)
sql(
s"""
| CREATE INDEX $indexName2
| ON TABLE $tableName (city)
| AS '${classOf[CGIndexFactory].getName}'
""".stripMargin)
sql(s"LOAD DATA LOCAL INPATH '$file2' INTO TABLE $tableName OPTIONS('header'='false')")
val df1 = sql(s"EXPLAIN EXTENDED SELECT * FROM $tableName WHERE name='n502670' AND city='c2670'").collect()
assert(df1(0).getString(0).contains("CG Index"))
assert(df1(0).getString(0).contains(indexName1))
assert(df1(0).getString(0).contains(indexName2))
// make index1 invisible
sql(s"SET ${CarbonCommonConstants.CARBON_INDEX_VISIBLE}default.$tableName.$indexName1 = false")
val df2 = sql(s"EXPLAIN EXTENDED SELECT * FROM $tableName WHERE name='n502670' AND city='c2670'").collect()
val e = intercept[Exception] {
assert(df2(0).getString(0).contains(indexName1))
}
assert(e.getMessage.contains("did not contain \"" + indexName1))
assert(df2(0).getString(0).contains(indexName2))
checkAnswer(sql(s"SELECT * FROM $tableName WHERE name='n502670' AND city='c2670'"),
sql("SELECT * FROM normal_test WHERE name='n502670' AND city='c2670'"))
// also make index2 invisible
sql(s"SET ${CarbonCommonConstants.CARBON_INDEX_VISIBLE}default.$tableName.$indexName2 = false")
checkAnswer(sql(s"SELECT * FROM $tableName WHERE name='n502670' AND city='c2670'"),
sql("SELECT * FROM normal_test WHERE name='n502670' AND city='c2670'"))
val df3 = sql(s"EXPLAIN EXTENDED SELECT * FROM $tableName WHERE name='n502670' AND city='c2670'").collect()
val e31 = intercept[Exception] {
assert(df3(0).getString(0).contains(indexName1))
}
assert(e31.getMessage.contains("did not contain \"" + indexName1))
val e32 = intercept[Exception] {
assert(df3(0).getString(0).contains(indexName2))
}
assert(e32.getMessage.contains("did not contain \"" + indexName2))
// make index1 and index2 visible again
sql(s"SET ${CarbonCommonConstants.CARBON_INDEX_VISIBLE}default.$tableName.$indexName1 = true")
sql(s"SET ${CarbonCommonConstants.CARBON_INDEX_VISIBLE}default.$tableName.$indexName2 = true")
checkAnswer(sql(s"SELECT * FROM $tableName WHERE name='n502670' AND city='c2670'"),
sql("SELECT * FROM normal_test WHERE name='n502670' AND city='c2670'"))
val df4 = sql(s"EXPLAIN EXTENDED SELECT * FROM $tableName WHERE name='n502670' AND city='c2670'").collect()
assert(df4(0).getString(0).contains(indexName1))
assert(df4(0).getString(0).contains(indexName2))
}
override protected def afterAll(): Unit = {
defaultConfig()
CompactionSupportGlobalSortBigFileTest.deleteFile(file2)
sql("DROP TABLE IF EXISTS normal_test")
sql("DROP TABLE IF EXISTS index_test")
sql("DROP TABLE IF EXISTS index_test_cg")
sql("DROP TABLE IF EXISTS index_store_test")
sql("DROP TABLE IF EXISTS index_store_test1")
sql("DROP TABLE IF EXISTS index_store_test2")
CarbonProperties.getInstance()
.addProperty(CarbonCommonConstants.ENABLE_QUERY_STATISTICS,
CarbonCommonConstants.ENABLE_QUERY_STATISTICS_DEFAULT)
}
}