/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.carbondata.spark.testsuite.datamap

import java.util

import scala.collection.JavaConverters._

import org.apache.spark.sql.{DataFrame, SaveMode}
import org.apache.spark.sql.test.util.CarbonQueryTest
import org.scalatest.BeforeAndAfterAll

import org.apache.carbondata.core.constants.CarbonCommonConstants
import org.apache.carbondata.core.datamap.{DataMapDistributable, DataMapMeta, Segment}
import org.apache.carbondata.core.datamap.dev.{DataMapBuilder, DataMapWriter}
import org.apache.carbondata.core.datamap.dev.cgdatamap.{CoarseGrainDataMap, CoarseGrainDataMapFactory}
import org.apache.carbondata.core.datastore.block.SegmentProperties
import org.apache.carbondata.core.datastore.page.ColumnPage
import org.apache.carbondata.core.features.TableOperation
import org.apache.carbondata.core.indexstore.PartitionSpec
import org.apache.carbondata.core.metadata.AbsoluteTableIdentifier
import org.apache.carbondata.core.metadata.datatype.DataTypes
import org.apache.carbondata.core.metadata.schema.table.{CarbonTable, DataMapSchema}
import org.apache.carbondata.core.scan.filter.intf.ExpressionType
import org.apache.carbondata.core.util.CarbonProperties
import org.apache.carbondata.events.Event
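
/**
 * Coarse-grain datamap factory used by the tests below. It plugs the mock
 * [[DataMapWriter]] from [[DataMapWriterSuite]] into the data load path so the
 * writer callbacks can be recorded; callbacks the tests never exercise are left
 * unimplemented (???).
 */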
class C2DataMapFactory(
    carbonTable: CarbonTable,
    dataMapSchema: DataMapSchema) extends CoarseGrainDataMapFactory(carbonTable, dataMapSchema) {

  var identifier: AbsoluteTableIdentifier = carbonTable.getAbsoluteTableIdentifier

  override def fireEvent(event: Event): Unit = ???

  override def clear(): Unit = {}

  override def getDataMaps(distributable: DataMapDistributable): util.List[CoarseGrainDataMap] = ???

  override def getDataMaps(segment: Segment): util.List[CoarseGrainDataMap] = ???

  override def createWriter(
      segment: Segment,
      shardName: String,
      segmentProperties: SegmentProperties): DataMapWriter =
    DataMapWriterSuite.dataMapWriterC2Mock(identifier, "testdm", segment, shardName)

  override def getMeta: DataMapMeta =
    new DataMapMeta(carbonTable.getIndexedColumns(dataMapSchema),
      List(ExpressionType.EQUALS).asJava)
  /**
   * Get all distributable objects of a segment.
   *
   * @return an empty list, since this test factory does not support distributable pruning
   */
  override def toDistributable(segmentId: Segment): util.List[DataMapDistributable] = {
    util.Collections.emptyList()
  }
  /**
   * Delete the datamap data belonging to the given segment.
   */
  override def deleteDatamapData(segment: Segment): Unit = {
  }
  /**
   * Delete all datamap data, if any exists.
   */
  override def deleteDatamapData(): Unit = {
  }
  /**
   * Whether the given table operation would make this datamap stale.
   */
  override def willBecomeStale(operation: TableOperation): Boolean = {
    false
  }
  override def createBuilder(segment: Segment,
      shardName: String, segmentProperties: SegmentProperties): DataMapBuilder = {
    ???
  }
  /**
   * Get the datamaps for the given segment and partition specs.
   */
  override def getDataMaps(segment: Segment,
      partitions: util.List[PartitionSpec]): util.List[CoarseGrainDataMap] = {
    ???
  }
}
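
/**
 * Tests that [[DataMapWriter]] events fire in the expected order with the
 * expected page layout when data is loaded through the Spark DataFrame writer.
 */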
class DataMapWriterSuite extends CarbonQueryTest with BeforeAndAfterAll {

  /** Builds numRows rows of (c1 = "a<n>", c2 = "b", c3 = n) in a single partition. */
  def buildTestData(numRows: Int): DataFrame = {
    import sqlContext.implicits._
    sqlContext.sparkContext.parallelize(1 to numRows, 1)
      .map(x => ("a" + x, "b", x))
      .toDF("c1", "c2", "c3")
  }

  def dropTable(): Unit = {
    sql("DROP TABLE IF EXISTS carbon1")
    sql("DROP TABLE IF EXISTS carbon2")
  }

  override def beforeAll {
    dropTable()
  }
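
  // 33,000 rows written in one task should span two column pages (a page holds
  // 32,000 rows by default), so the mock writer is expected to see exactly two
  // onPageAdded callbacks inside a single blocklet.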
test("test write datamap 2 pages") {
sql(s"CREATE TABLE carbon1(c1 STRING, c2 STRING, c3 INT) STORED BY 'org.apache.carbondata.format'")
// register datamap writer
sql(
s"""
| CREATE DATAMAP test1 ON TABLE carbon1
| USING '${classOf[C2DataMapFactory].getName}'
| DMPROPERTIES('index_columns'='c2')
""".stripMargin)
val df = buildTestData(33000)
// save dataframe to carbon file
df.write
.format("carbondata")
.option("tableName", "carbon1")
.option("tempCSV", "false")
.option("sort_columns","c1")
.mode(SaveMode.Overwrite)
.save()
assert(DataMapWriterSuite.callbackSeq.head.contains("block start"))
assert(DataMapWriterSuite.callbackSeq.last.contains("block end"))
assert(
DataMapWriterSuite.callbackSeq.slice(1, DataMapWriterSuite.callbackSeq.length - 1) == Seq(
"blocklet start 0",
"add page data: blocklet 0, page 0",
"add page data: blocklet 0, page 1",
"blocklet end: 0"
))
DataMapWriterSuite.callbackSeq = Seq()
}
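
  // Despite the test name, a single blocklet is expected: the 16 MB blocklet
  // group size requested below is smaller than the enforced minimum, so the ten
  // pages of the 300,000 rows all land in blocklet 0 (see the corrected
  // assertion at the end of the test).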
test("test write datamap 2 blocklet") {
sql(s"CREATE TABLE carbon2(c1 STRING, c2 STRING, c3 INT) STORED BY 'org.apache.carbondata.format'")
sql(
s"""
| CREATE DATAMAP test2 ON TABLE carbon2
| USING '${classOf[C2DataMapFactory].getName}'
| DMPROPERTIES('index_columns'='c2')
""".stripMargin)
CarbonProperties.getInstance()
.addProperty("carbon.blockletgroup.size.in.mb", "16")
CarbonProperties.getInstance()
.addProperty("carbon.number.of.cores.while.loading",
CarbonCommonConstants.NUM_CORES_DEFAULT_VAL)
val df = buildTestData(300000)
// save dataframe to carbon file
df.write
.format("carbondata")
.option("tableName", "carbon2")
.option("tempCSV", "false")
.option("sort_columns","c1")
.option("SORT_SCOPE","GLOBAL_SORT")
.mode(SaveMode.Overwrite)
.save()
    assert(DataMapWriterSuite.callbackSeq.head.contains("block start"))
    assert(DataMapWriterSuite.callbackSeq.last.contains("block end"))
    // Corrected expectation: "carbon.blockletgroup.size.in.mb" cannot be set below
    // its 64 MB minimum, so the 16 MB requested above is ignored and all pages go
    // into a single blocklet.
    assert(
      DataMapWriterSuite.callbackSeq.slice(1, DataMapWriterSuite.callbackSeq.length - 1) == Seq(
        "blocklet start 0",
        "add page data: blocklet 0, page 0",
        "add page data: blocklet 0, page 1",
        "add page data: blocklet 0, page 2",
        "add page data: blocklet 0, page 3",
        "add page data: blocklet 0, page 4",
        "add page data: blocklet 0, page 5",
        "add page data: blocklet 0, page 6",
        "add page data: blocklet 0, page 7",
        "add page data: blocklet 0, page 8",
        "add page data: blocklet 0, page 9",
        "blocklet end: 0"
      ))
    DataMapWriterSuite.callbackSeq = Seq()
  }
  override def afterAll {
    dropTable()
  }
}
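
/**
 * Companion object holding the recorded callback sequence and the factory
 * method for the mock [[DataMapWriter]] that records it.
 */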
object DataMapWriterSuite {

  var callbackSeq: Seq[String] = Seq[String]()

  def dataMapWriterC2Mock(identifier: AbsoluteTableIdentifier, dataMapName: String,
      segment: Segment, shardName: String) =
    new DataMapWriter(identifier.getTablePath, dataMapName, Seq().asJava, segment, shardName) {

      override def onPageAdded(
          blockletId: Int,
          pageId: Int,
          pageSize: Int,
          pages: Array[ColumnPage]): Unit = {
        assert(pages.length == 1)
        assert(pages(0).getDataType == DataTypes.STRING)
        val bytes: Array[Byte] = pages(0).getBytes(0)
        // every row of column c2 holds "b"; the first value comes back as a
        // 2-byte length prefix (0, 1) followed by the byte 'b'
        assert(bytes.sameElements(Seq(0, 1, 'b'.toByte)))
        callbackSeq :+= s"add page data: blocklet $blockletId, page $pageId"
      }

      override def onBlockletEnd(blockletId: Int): Unit = {
        callbackSeq :+= s"blocklet end: $blockletId"
      }

      override def onBlockEnd(blockId: String): Unit = {
        callbackSeq :+= s"block end $blockId"
      }

      override def onBlockletStart(blockletId: Int): Unit = {
        callbackSeq :+= s"blocklet start $blockletId"
      }
      /**
       * Start of new block notification.
       *
       * @param blockId file name of the carbondata file
       */
      override def onBlockStart(blockId: String): Unit = {
        callbackSeq :+= s"block start $blockId"
      }
      /**
       * Called while the writer is being closed, so after this call no more
       * data will be sent to this class.
       */
      override def finish() = {
      }
    }
}