| /* |
| * Licensed to the Apache Software Foundation (ASF) under one or more |
| * contributor license agreements. See the NOTICE file distributed with |
| * this work for additional information regarding copyright ownership. |
| * The ASF licenses this file to You under the Apache License, Version 2.0 |
| * (the "License"); you may not use this file except in compliance with |
| * the License. You may obtain a copy of the License at |
| * |
| * http://www.apache.org/licenses/LICENSE-2.0 |
| * |
| * Unless required by applicable law or agreed to in writing, software |
| * distributed under the License is distributed on an "AS IS" BASIS, |
| * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| * See the License for the specific language governing permissions and |
| * limitations under the License. |
| */ |
| |
| package org.apache.carbondata.datamap |
| |
| import java.io.{File, IOException} |
| import java.text.SimpleDateFormat |
| import java.util |
| |
| import scala.collection.JavaConverters._ |
| import scala.collection.mutable |
| import scala.collection.mutable.ListBuffer |
| |
| import org.apache.commons.lang3.ArrayUtils |
| import org.apache.hadoop.conf.Configuration |
| import org.apache.hadoop.mapred.JobConf |
| import org.apache.hadoop.mapreduce.{Job, TaskAttemptID, TaskType} |
| import org.apache.hadoop.mapreduce.task.TaskAttemptContextImpl |
| import org.apache.spark.{CarbonInputMetrics, Partition, TaskContext} |
| import org.apache.spark.deploy.SparkHadoopUtil |
| import org.apache.spark.sql.SparkSession |
| import org.apache.spark.sql.types.Decimal |
| |
| import org.apache.carbondata.common.logging.LogServiceFactory |
| import org.apache.carbondata.converter.SparkDataTypeConverterImpl |
| import org.apache.carbondata.core.constants.CarbonCommonConstants |
| import org.apache.carbondata.core.datamap.{DataMapStoreManager, Segment} |
| import org.apache.carbondata.core.datamap.dev.DataMapBuilder |
| import org.apache.carbondata.core.datastore.block.SegmentProperties |
| import org.apache.carbondata.core.datastore.impl.FileFactory |
| import org.apache.carbondata.core.indexstore.blockletindex.BlockletDataMapFactory |
| import org.apache.carbondata.core.indexstore.SegmentPropertiesFetcher |
| import org.apache.carbondata.core.keygenerator.KeyGenerator |
| import org.apache.carbondata.core.keygenerator.mdkey.MultiDimKeyVarLengthGenerator |
| import org.apache.carbondata.core.metadata.datatype.{DataType, DataTypes} |
| import org.apache.carbondata.core.metadata.encoder.Encoding |
| import org.apache.carbondata.core.metadata.schema.table.{CarbonTable, DataMapSchema, TableInfo} |
| import org.apache.carbondata.core.metadata.schema.table.column.CarbonColumn |
| import org.apache.carbondata.core.scan.wrappers.ByteArrayWrapper |
| import org.apache.carbondata.core.statusmanager.SegmentStatusManager |
| import org.apache.carbondata.core.util.{CarbonUtil, DataTypeUtil, TaskMetricsMap} |
| import org.apache.carbondata.core.util.path.CarbonTablePath |
| import org.apache.carbondata.datamap.bloom.DataConvertUtil |
| import org.apache.carbondata.events.{BuildDataMapPostExecutionEvent, BuildDataMapPreExecutionEvent, OperationContext, OperationListenerBus} |
| import org.apache.carbondata.hadoop.{CarbonInputSplit, CarbonMultiBlockSplit, CarbonProjection, CarbonRecordReader} |
| import org.apache.carbondata.hadoop.api.{CarbonInputFormat, CarbonTableInputFormat} |
| import org.apache.carbondata.hadoop.readsupport.CarbonReadSupport |
| import org.apache.carbondata.spark.{RefreshResult, RefreshResultImpl} |
| import org.apache.carbondata.spark.rdd.{CarbonRDDWithTableInfo, CarbonSparkPartition} |
| |
| |
| /** |
| * Helper object to rebuild the index DataMap |
| */ |
| object IndexDataMapRebuildRDD { |
| private val LOGGER = LogServiceFactory.getLogService(this.getClass.getName) |
| |
| /** |
| * Rebuild the datamap for all existing data in the table |
| */ |
| def rebuildDataMap( |
| sparkSession: SparkSession, |
| carbonTable: CarbonTable, |
| schema: DataMapSchema |
| ): Unit = { |
| val tableIdentifier = carbonTable.getAbsoluteTableIdentifier |
| val segmentStatusManager = new SegmentStatusManager(tableIdentifier) |
| val validAndInvalidSegments = segmentStatusManager |
| .getValidAndInvalidSegments(carbonTable.isChildTableForMV) |
| val validSegments = validAndInvalidSegments.getValidSegments |
| val indexedCarbonColumns = carbonTable.getIndexedColumns(schema) |
| val operationContext = new OperationContext() |
| val buildDataMapPreExecutionEvent = new BuildDataMapPreExecutionEvent(sparkSession, |
| tableIdentifier, |
| mutable.Seq[String](schema.getDataMapName)) |
| OperationListenerBus.getInstance().fireEvent(buildDataMapPreExecutionEvent, operationContext) |
| |
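| // map each valid segment to its datamap store path, keeping only the segments whose |
| // datamap store has not been created yet |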
| val segments2DmStorePath = validSegments.asScala.map { segment => |
| val dataMapStorePath = CarbonTablePath.getDataMapStorePath(carbonTable.getTablePath, |
| segment.getSegmentNo, schema.getDataMapName) |
| segment -> dataMapStorePath |
| }.filter(p => !FileFactory.isFileExist(p._2)).toMap |
| |
| segments2DmStorePath.foreach { case (_, dmPath) => |
| if (!FileFactory.mkdirs(dmPath)) { |
| throw new IOException( |
| s"Failed to create directory $dmPath for rebuilding datamap ${ schema.getDataMapName }") |
| } |
| } |
| |
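| // rebuild the datamap for the remaining segments; each task returns |
| // (taskId, (segmentId, rebuildStatus)) |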
| val status = new IndexDataMapRebuildRDD[String, (String, Boolean)]( |
| sparkSession, |
| new RefreshResultImpl(), |
| carbonTable.getTableInfo, |
| schema.getDataMapName, |
| indexedCarbonColumns.asScala.toArray, |
| segments2DmStorePath.keySet |
| ).collect |
| |
| // if any segment failed to rebuild, clean up the datamap store created for it |
| val failedSegments = status |
| .find { case (taskId, (segmentId, rebuildStatus)) => |
| !rebuildStatus |
| } |
| .map { task => |
| val segmentId = task._2._1 |
| val dmPath = segments2DmStorePath.filter(p => p._1.getSegmentNo.equals(segmentId)).values |
| val cleanResult = dmPath.map(p => |
| FileFactory.deleteAllCarbonFilesOfDir(FileFactory.getCarbonFile(p))) |
| if (cleanResult.exists(!_)) { |
| LOGGER.error(s"Failed to clean up datamap store for segment_$segmentId") |
| false |
| } else { |
| true |
| } |
| } |
| |
| if (failedSegments.nonEmpty) { |
| throw new Exception(s"Failed to refresh datamap ${ schema.getDataMapName }") |
| } |
| |
| val buildDataMapPostExecutionEvent = new BuildDataMapPostExecutionEvent(sparkSession, |
| tableIdentifier, schema.getDataMapName, validSegments.asScala.map(_.getSegmentNo), true) |
| OperationListenerBus.getInstance().fireEvent(buildDataMapPostExecutionEvent, operationContext) |
| } |
| } |
| |
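| /** |
| * Read support that returns query rows as-is, only converting STRING column values to |
| * java.lang.String. |
| */ |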
| class OriginalReadSupport(dataTypes: Array[DataType]) extends CarbonReadSupport[Array[Object]] { |
| override def initialize(carbonColumns: Array[CarbonColumn], |
| carbonTable: CarbonTable): Unit = { |
| } |
| |
| override def readRow(data: Array[Object]): Array[Object] = { |
| dataTypes.zipWithIndex.foreach { case (dataType, i) => |
| if (dataType == DataTypes.STRING && data(i) != null) { |
| data(i) = data(i).toString |
| } |
| } |
| data |
| } |
| |
| override def close(): Unit = { |
| } |
| } |
| |
| /** |
| * This read support generates row values in which the dimension values are raw bytes. |
| */ |
| class RawBytesReadSupport(segmentProperties: SegmentProperties, indexColumns: Array[CarbonColumn]) |
| extends CarbonReadSupport[Array[Object]] { |
| var dimensionKeyGenerator: KeyGenerator = _ |
| // for the dictionary dimensions |
| var indexCol2IdxInDictArray: Map[String, Int] = Map() |
| // for the non dictionary dimensions |
| var indexCol2IdxInNoDictArray: Map[String, Int] = Map() |
| // for the measures |
| var indexCol2IdxInMeasureArray: Map[String, Int] = Map() |
| |
| /** |
| * The rebuild process gets its data from a query. If columns were added to the table but are |
| * not present in this segment, they are filled with default values and new keys are generated |
| * for the dictionary dimensions. Here we prepare the key generator the same way as |
| * `RowIdRestructureBasedRawResultCollector` to get the surrogate values of the dictionary |
| * columns from the query result, so we do not need to make a fake MDK to split when adding |
| * rows to the datamap. |
| */ |
| def prepareKeyGenForDictIndexColumns(carbonTable: CarbonTable, |
| dictIndexColumns: ListBuffer[CarbonColumn]): Unit = { |
| |
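| // collect the cardinality and partition info of each dictionary index column |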
| val columnCardinality = new util.ArrayList[Integer](dictIndexColumns.length) |
| val columnPartitioner = new util.ArrayList[Integer](dictIndexColumns.length) |
| |
| dictIndexColumns.foreach { col => |
| val dim = carbonTable.getDimensionByName(col.getColName) |
| val currentBlockDimension = segmentProperties.getDimensionFromCurrentBlock(dim) |
| if (null != currentBlockDimension) { |
| columnCardinality.add(segmentProperties.getDimColumnsCardinality.apply( |
| currentBlockDimension.getKeyOrdinal)) |
| columnPartitioner.add(segmentProperties.getDimensionPartitions.apply( |
| currentBlockDimension.getKeyOrdinal |
| )) |
| } else { |
| columnPartitioner.add(1) |
| if (col.hasEncoding(Encoding.DIRECT_DICTIONARY)) { |
| columnCardinality.add(Integer.MAX_VALUE) |
| } else { |
| val defaultValue = col.getDefaultValue |
| if (null != defaultValue) { |
| columnCardinality.add(CarbonCommonConstants.DICTIONARY_DEFAULT_CARDINALITY + 1) |
| } else { |
| columnCardinality.add(CarbonCommonConstants.DICTIONARY_DEFAULT_CARDINALITY) |
| } |
| } |
| } |
| } |
| |
| if (!columnCardinality.isEmpty) { |
| val latestColumnCardinality = ArrayUtils.toPrimitive(columnCardinality.toArray( |
| new Array[Integer](columnCardinality.size))) |
| val latestColumnPartitioner = ArrayUtils.toPrimitive(columnPartitioner.toArray( |
| new Array[Integer](columnPartitioner.size))) |
| val dimensionBitLength = CarbonUtil.getDimensionBitLength( |
| latestColumnCardinality, latestColumnPartitioner) |
| this.dimensionKeyGenerator = new MultiDimKeyVarLengthGenerator(dimensionBitLength) |
| } |
| } |
| |
| override def initialize(carbonColumns: Array[CarbonColumn], |
| carbonTable: CarbonTable): Unit = { |
| |
| val dictIndexColumns = new ListBuffer[CarbonColumn]() |
| |
| // prepare index info to extract data from query result |
| indexColumns.foreach { col => |
| if (col.isDimension) { |
| val dim = carbonTable.getDimensionByName(col.getColName) |
| if (!dim.isGlobalDictionaryEncoding && !dim.isDirectDictionaryEncoding) { |
| indexCol2IdxInNoDictArray = |
| indexCol2IdxInNoDictArray + (col.getColName -> indexCol2IdxInNoDictArray.size) |
| } else { |
| dictIndexColumns.append(col) |
| indexCol2IdxInDictArray = |
| indexCol2IdxInDictArray + (col.getColName -> indexCol2IdxInDictArray.size) |
| } |
| } else { |
| indexCol2IdxInMeasureArray = |
| indexCol2IdxInMeasureArray + (col.getColName -> indexCol2IdxInMeasureArray.size) |
| } |
| } |
| |
| if (dictIndexColumns.size > 0) { |
| prepareKeyGenForDictIndexColumns(carbonTable, dictIndexColumns) |
| } |
| } |
| |
| /** |
| * Input: all dimensions are bundled in one ByteArrayWrapper at position 0, followed by the |
| * measures one by one; the last 3 elements are the block/page/row ids. |
| * Output: all dimensions and measures come one after another, followed by the 3 ids. |
| */ |
| override def readRow(data: Array[Object]): Array[Object] = { |
| |
| var surrogateKeys = new Array[Long](0) |
| if (null != dimensionKeyGenerator) { |
| surrogateKeys = dimensionKeyGenerator.getKeyArray( |
| data(0).asInstanceOf[ByteArrayWrapper].getDictionaryKey) |
| } |
| |
| // fill return row from data |
| val rtn = new Array[Object](indexColumns.length + 3) |
| indexColumns.zipWithIndex.foreach { case (col, i) => |
| rtn(i) = if (indexCol2IdxInDictArray.contains(col.getColName)) { |
| surrogateKeys(indexCol2IdxInDictArray(col.getColName)).toInt.asInstanceOf[Integer] |
| } else if (indexCol2IdxInNoDictArray.contains(col.getColName)) { |
| val bytes = data(0).asInstanceOf[ByteArrayWrapper].getNoDictionaryKeyByIndex( |
| indexCol2IdxInNoDictArray(col.getColName)) |
| // no-dictionary primitive columns are expected to hold the original data while loading, |
| // so convert the bytes back to the original value |
| if (DataTypeUtil.isPrimitiveColumn(col.getDataType)) { |
| var dataFromBytes = DataTypeUtil |
| .getDataBasedOnDataTypeForNoDictionaryColumn(bytes, col.getDataType) |
| if (dataFromBytes == null) { |
| dataFromBytes = DataConvertUtil |
| .getNullValueForMeasure(col.getDataType, col.getColumnSchema.getScale) |
| } |
| // for timestamp, the above method gives the original data, so it should be converted |
| // again to the format used while loading (without micros) |
| if (null != dataFromBytes && col.getDataType == DataTypes.TIMESTAMP) { |
| dataFromBytes = (dataFromBytes.asInstanceOf[Long] / 1000L).asInstanceOf[Object] |
| } |
| dataFromBytes |
| } else { |
| bytes |
| } |
| } else { |
| // measures start at index 1 in the query result (index 0 holds the bundled dimensions) |
| val value = data(1 + indexCol2IdxInMeasureArray(col.getColName)) |
| if (null == value) { |
| DataConvertUtil.getNullValueForMeasure(col.getDataType, |
| col.getColumnSchema.getScale) |
| } else if (DataTypes.isDecimal(col.getDataType)) { |
| // in the rebuild process the value was built for Spark, |
| // so convert it to java.math.BigDecimal for Carbon |
| value.asInstanceOf[Decimal].toBigDecimal.bigDecimal |
| } else { |
| value |
| } |
| } |
| } |
| rtn(indexColumns.length) = data(data.length - 3) |
| rtn(indexColumns.length + 1) = data(data.length - 2) |
| rtn(indexColumns.length + 2) = data(data.length - 1) |
| rtn |
| } |
| |
| override def close(): Unit = { |
| } |
| } |
| |
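| /** |
| * RDD that rebuilds an index datamap for the given segments. Each partition covers the |
| * input splits of one (segment, task) group, i.e. one shard, and reports whether the |
| * rebuild of that shard succeeded. |
| */ |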
| class IndexDataMapRebuildRDD[K, V]( |
| @transient private val session: SparkSession, |
| result: RefreshResult[K, V], |
| @transient private val tableInfo: TableInfo, |
| dataMapName: String, |
| indexColumns: Array[CarbonColumn], |
| segments: Set[Segment]) |
| extends CarbonRDDWithTableInfo[(K, V)](session, Nil, tableInfo.serialize()) { |
| |
| private val dataMapSchema = DataMapStoreManager.getInstance().getDataMapSchema(dataMapName) |
| private val queryId = sparkContext.getConf.get("queryId", System.nanoTime() + "") |
| private val jobTrackerId: String = { |
| val formatter = new SimpleDateFormat("yyyyMMddHHmm") |
| formatter.format(new util.Date()) |
| } |
| |
| override def internalCompute(split: Partition, context: TaskContext): Iterator[(K, V)] = { |
| val LOGGER = LogServiceFactory.getLogService(this.getClass.getName) |
| val carbonTable = CarbonTable.buildFromTableInfo(getTableInfo) |
| val dataMapFactory = DataMapManager.get().getDataMapProvider( |
| carbonTable, dataMapSchema, session).getDataMapFactory |
| var status = false |
| val inputMetrics = new CarbonInputMetrics |
| TaskMetricsMap.getInstance().registerThreadCallback() |
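| // all splits in this partition belong to the same segment and task (shard) |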
| val inputSplit = split.asInstanceOf[CarbonSparkPartition].split.value |
| val segmentId = inputSplit.getAllSplits.get(0).getSegment.getSegmentNo |
| val segment = segments.find(p => p.getSegmentNo.equals(segmentId)) |
| if (segment.isDefined) { |
| inputMetrics.initBytesReadCallback(context, inputSplit, inputMetricsInterval) |
| |
| val attemptId = new TaskAttemptID(jobTrackerId, id, TaskType.MAP, split.index, 0) |
| val attemptContext = new TaskAttemptContextImpl(FileFactory.getConfiguration, attemptId) |
| val format = createInputFormat(segment.get, attemptContext) |
| |
| val model = format.createQueryModel(inputSplit, attemptContext) |
| // one query id per table |
| model.setQueryId(queryId) |
| model.setVectorReader(false) |
| model.setRequiredRowId(true) |
| |
| var reader: CarbonRecordReader[Array[Object]] = null |
| var refresher: DataMapBuilder = null |
| try { |
| val segmentPropertiesFetcher = DataMapStoreManager.getInstance().getDataMap(carbonTable, |
| BlockletDataMapFactory.DATA_MAP_SCHEMA).getDataMapFactory |
| .asInstanceOf[SegmentPropertiesFetcher] |
| val segmentProperties = segmentPropertiesFetcher.getSegmentProperties(segment.get) |
| |
| // we use task name as shard name to create the folder for this datamap |
| val shardName = CarbonTablePath.getShardName(inputSplit.getAllSplits.get(0).getBlockPath) |
| refresher = dataMapFactory.createBuilder(segment.get, shardName, segmentProperties) |
| refresher.initialize() |
| |
| model.setForcedDetailRawQuery(refresher.isIndexForCarbonRawBytes) |
| val readSupport = if (refresher.isIndexForCarbonRawBytes) { |
| new RawBytesReadSupport(segmentProperties, indexColumns) |
| } else { |
| new OriginalReadSupport(indexColumns.map(_.getDataType)) |
| } |
| reader = new CarbonRecordReader[Array[Object]](model, readSupport, inputMetrics, |
| attemptContext.getConfiguration) |
| reader.initialize(inputSplit, attemptContext) |
| // skip clearing the datamap at close; it will be cleared after the rebuild |
| reader.setSkipClearDataMapAtClose(true) |
| |
| // Note that datamap rebuilding is based on a query; during query processing the |
| // blockletId in rowWithPosition is a number relative to the carbondata file. |
| // In order to get the absolute blockletId within the shard, like the one filled in |
| // during the loading process, we generate it in another way here. |
| var blockletId = 0 |
| var firstRow = true |
| while (reader.nextKeyValue()) { |
| val rowWithPosition = reader.getCurrentValue |
| val size = rowWithPosition.length |
| val pageId = rowWithPosition(size - 2).asInstanceOf[Int] |
| val rowId = rowWithPosition(size - 1).asInstanceOf[Int] |
| |
| if (!firstRow && pageId == 0 && rowId == 0) { |
| blockletId = blockletId + 1 |
| } else { |
| firstRow = false |
| } |
| refresher.addRow(blockletId, pageId, rowId, rowWithPosition) |
| } |
| |
| refresher.finish() |
| |
| status = true |
| } finally { |
| if (reader != null) { |
| try { |
| reader.close() |
| } catch { |
| case ex: Throwable => |
| LOGGER.error("Failed to close reader", ex) |
| } |
| } |
| |
| if (refresher != null) { |
| try { |
| refresher.close() |
| } catch { |
| case ex: Throwable => |
| LOGGER.error("Failed to close index writer", ex) |
| } |
| } |
| } |
| } |
| |
| new Iterator[(K, V)] { |
| |
| var finished = false |
| |
| override def hasNext: Boolean = { |
| !finished |
| } |
| |
| override def next(): (K, V) = { |
| finished = true |
| result.getKey(split.index.toString, (segmentId, status)) |
| } |
| } |
| } |
| |
| |
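| /** |
| * Creates an input format that reads only the index columns of the given segment. |
| */ |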
| private def createInputFormat(segment: Segment, |
| attemptContext: TaskAttemptContextImpl) = { |
| val format = new CarbonTableInputFormat[Object] |
| val tableInfo1 = getTableInfo |
| val conf = attemptContext.getConfiguration |
| CarbonInputFormat.setTableInfo(conf, tableInfo1) |
| CarbonInputFormat.setDatabaseName(conf, tableInfo1.getDatabaseName) |
| CarbonInputFormat.setTableName(conf, tableInfo1.getFactTable.getTableName) |
| CarbonInputFormat.setDataTypeConverter(conf, classOf[SparkDataTypeConverterImpl]) |
| |
| val identifier = tableInfo1.getOrCreateAbsoluteTableIdentifier() |
| CarbonInputFormat.setTablePath( |
| conf, |
| identifier.appendWithLocalPrefix(identifier.getTablePath)) |
| |
| CarbonInputFormat.setSegmentsToAccess( |
| conf, |
| List(segment).asJava) |
| |
| CarbonInputFormat.setColumnProjection( |
| conf, |
| new CarbonProjection(indexColumns.map(_.getColName))) |
| format |
| } |
| |
| override protected def internalGetPartitions = { |
| if (!dataMapSchema.isIndexDataMap) { |
| throw new UnsupportedOperationException |
| } |
| val conf = FileFactory.getConfiguration |
| val jobConf = new JobConf(conf) |
| SparkHadoopUtil.get.addCredentials(jobConf) |
| val job = Job.getInstance(jobConf) |
| job.getConfiguration.set("query.id", queryId) |
| |
| val format = new CarbonTableInputFormat[Object] |
| |
| CarbonInputFormat.setSegmentsToAccess( |
| job.getConfiguration, |
| segments.toList.asJava) |
| |
| CarbonInputFormat.setTableInfo( |
| job.getConfiguration, |
| tableInfo) |
| CarbonInputFormat.setTablePath( |
| job.getConfiguration, |
| tableInfo.getOrCreateAbsoluteTableIdentifier().getTablePath) |
| CarbonInputFormat.setDatabaseName( |
| job.getConfiguration, |
| tableInfo.getDatabaseName) |
| CarbonInputFormat.setTableName( |
| job.getConfiguration, |
| tableInfo.getFactTable.getTableName) |
| |
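| // group the splits by (segment, task) so that each partition maps to one shard, |
| // since the task name is used as the shard name when writing the datamap |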
| format |
| .getSplits(job) |
| .asScala |
| .map(_.asInstanceOf[CarbonInputSplit]) |
| .groupBy(p => (p.getSegmentId, p.taskId)) |
| .map { group => |
| new CarbonMultiBlockSplit( |
| group._2.asJava, |
| group._2.flatMap(_.getLocations).toArray) |
| } |
| .zipWithIndex |
| .map { split => |
| new CarbonSparkPartition(id, split._2, split._1) |
| } |
| .toArray |
| } |
| } |