| /* |
| * Licensed to the Apache Software Foundation (ASF) under one or more |
| * contributor license agreements. See the NOTICE file distributed with |
| * this work for additional information regarding copyright ownership. |
| * The ASF licenses this file to You under the Apache License, Version 2.0 |
| * (the "License"); you may not use this file except in compliance with |
| * the License. You may obtain a copy of the License at |
| * |
| * http://www.apache.org/licenses/LICENSE-2.0 |
| * |
| * Unless required by applicable law or agreed to in writing, software |
| * distributed under the License is distributed on an "AS IS" BASIS, |
| * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| * See the License for the specific language governing permissions and |
| * limitations under the License. |
| */ |
| |
| package org.apache.carbondata.index |
| |
| import java.io.IOException |
| |
| import scala.collection.JavaConverters._ |
| import scala.collection.mutable |
| import scala.collection.mutable.ListBuffer |
| |
| import org.apache.hadoop.mapreduce.{TaskAttemptID, TaskType} |
| import org.apache.hadoop.mapreduce.task.TaskAttemptContextImpl |
| import org.apache.spark.{CarbonInputMetrics, Partition, TaskContext} |
| import org.apache.spark.sql.SparkSession |
| import org.apache.spark.sql.types.Decimal |
| |
| import org.apache.carbondata.common.logging.LogServiceFactory |
| import org.apache.carbondata.converter.SparkDataTypeConverterImpl |
| import org.apache.carbondata.core.datastore.block.SegmentProperties |
| import org.apache.carbondata.core.datastore.impl.FileFactory |
| import org.apache.carbondata.core.index.{IndexStoreManager, Segment} |
| import org.apache.carbondata.core.index.dev.IndexBuilder |
| import org.apache.carbondata.core.indexstore.SegmentPropertiesFetcher |
| import org.apache.carbondata.core.indexstore.blockletindex.BlockletIndexFactory |
| import org.apache.carbondata.core.metadata.datatype.{DataType, DataTypes} |
| import org.apache.carbondata.core.metadata.schema.table.{CarbonTable, IndexSchema, TableInfo} |
| import org.apache.carbondata.core.metadata.schema.table.column.CarbonColumn |
| import org.apache.carbondata.core.readcommitter.ReadCommittedScope |
| import org.apache.carbondata.core.scan.wrappers.ByteArrayWrapper |
| import org.apache.carbondata.core.statusmanager.SegmentStatusManager |
| import org.apache.carbondata.core.util.{ByteUtil, DataTypeUtil, TaskMetricsMap} |
| import org.apache.carbondata.core.util.path.CarbonTablePath |
| import org.apache.carbondata.events.{BuildIndexPostExecutionEvent, BuildIndexPreExecutionEvent, OperationContext, OperationListenerBus} |
| import org.apache.carbondata.hadoop.{CarbonInputSplit, CarbonMultiBlockSplit, CarbonProjection, CarbonRecordReader} |
| import org.apache.carbondata.hadoop.api.{CarbonInputFormat, CarbonTableInputFormat} |
| import org.apache.carbondata.hadoop.readsupport.CarbonReadSupport |
| import org.apache.carbondata.hadoop.util.CarbonInputFormatUtil |
| import org.apache.carbondata.index.bloom.DataConvertUtil |
| import org.apache.carbondata.spark.{RefreshResult, RefreshResultImpl} |
| import org.apache.carbondata.spark.rdd.{CarbonRDDWithTableInfo, CarbonSparkPartition} |
| import org.apache.carbondata.spark.util.CarbonSparkUtil |
| |
| |
/**
 * Helper object to rebuild the index for all existing data in a table.
 */
object IndexRebuildRDD {
  private val LOGGER = LogServiceFactory.getLogService(this.getClass.getName)

  /**
   * Rebuild the indexSchema for all existing data in the table.
   *
   * Fires the pre-build event, creates the index store directory for every
   * valid segment that does not already have one, runs an [[IndexRebuildRDD]]
   * over those segments, cleans up the store of every segment whose rebuild
   * task failed, and finally fires the post-build event.
   *
   * @param sparkSession session used to run the rebuild job
   * @param carbonTable  table whose index is being rebuilt
   * @param schema       schema of the index to rebuild
   * @throws IOException if an index store directory cannot be created
   * @throws Exception   if any segment failed to rebuild
   */
  def rebuildIndex(
      sparkSession: SparkSession,
      carbonTable: CarbonTable,
      schema: IndexSchema
  ): Unit = {
    val tableIdentifier = carbonTable.getAbsoluteTableIdentifier
    val segmentStatusManager = new SegmentStatusManager(tableIdentifier)
    val validAndInvalidSegments = segmentStatusManager
      .getValidAndInvalidSegments(carbonTable.isMV)
    val validSegments = validAndInvalidSegments.getValidSegments
    val indexedCarbonColumns = carbonTable.getIndexedColumns(schema.getIndexColumns)
    val operationContext = new OperationContext()
    val buildIndexPreExecutionEvent = new BuildIndexPreExecutionEvent(sparkSession,
      tableIdentifier,
      mutable.Seq[String](schema.getIndexName))
    OperationListenerBus.getInstance().fireEvent(buildIndexPreExecutionEvent, operationContext)

    // only rebuild segments whose index store directory does not exist yet
    val segments2DmStorePath = validSegments.asScala.map { segment =>
      val indexStorePath = CarbonTablePath.getIndexesStorePath(carbonTable.getTablePath,
        segment.getSegmentNo, schema.getIndexName)
      segment -> indexStorePath
    }.filter(p => !FileFactory.isFileExist(p._2)).toMap

    segments2DmStorePath.foreach { case (_, dmPath) =>
      if (!FileFactory.mkdirs(dmPath)) {
        throw new IOException(
          s"Failed to create directory $dmPath for rebuilding indexSchema ${ schema.getIndexName }")
      }
    }

    // one (taskId -> (segmentId, success)) entry per shard/task
    val status = new IndexRebuildRDD[String, (String, Boolean)](
      sparkSession,
      new RefreshResultImpl(),
      carbonTable.getTableInfo,
      schema,
      indexedCarbonColumns.asScala.toArray,
      segments2DmStorePath.keySet
    ).collect

    // for failed segments, clean the result.
    // NOTE: use `filter` (not `find`) so that the index store of EVERY failed
    // segment is removed, not just the first one encountered.
    val failedSegments = status
      .filter { case (taskId, (segmentId, rebuildStatus)) =>
        !rebuildStatus
      }
      .map { task =>
        val segmentId = task._2._1
        val dmPath = segments2DmStorePath.filter(p => p._1.getSegmentNo.equals(segmentId)).values
        val cleanResult = dmPath.map(p =>
          FileFactory.deleteAllCarbonFilesOfDir(FileFactory.getCarbonFile(p)))
        if (cleanResult.exists(!_)) {
          LOGGER.error(s"Failed to clean up indexSchema store for segment_$segmentId")
          false
        } else {
          true
        }
      }

    if (failedSegments.nonEmpty) {
      throw new Exception(s"Failed to refresh indexSchema ${ schema.getIndexName }")
    }

    val buildIndexPostExecutionEvent = BuildIndexPostExecutionEvent(sparkSession,
      tableIdentifier, schema.getIndexName, validSegments.asScala.map(_.getSegmentNo), true)
    OperationListenerBus.getInstance().fireEvent(buildIndexPostExecutionEvent, operationContext)
  }
}
| |
/**
 * Read support that returns query rows mostly untouched; the only adjustment
 * is that non-null values of STRING columns are normalized via toString.
 */
class OriginalReadSupport(dataTypes: Array[DataType]) extends CarbonReadSupport[Array[Object]] {
  override def initialize(carbonColumns: Array[CarbonColumn],
      carbonTable: CarbonTable): Unit = {
    // nothing to prepare: the row is consumed as produced by the query
  }

  override def readRow(data: Array[Object]): Array[Object] = {
    // in-place normalization of string cells; all other cells pass through
    var idx = 0
    while (idx < dataTypes.length) {
      if (dataTypes(idx) == DataTypes.STRING && data(idx) != null) {
        data(idx) = data(idx).toString
      }
      idx += 1
    }
    data
  }

  override def close(): Unit = {
    // no resources held
  }
}
| |
| /** |
| * This class will generate row value which is raw bytes for the dimensions. |
| */ |
/**
 * This class will generate row value which is raw bytes for the dimensions.
 *
 * During initialize it records, for each index column, the column's ordinal
 * within the dictionary / no-dictionary / measure groups of the query output,
 * so that readRow can locate each raw value in the packed result row.
 */
class RawBytesReadSupport(segmentProperties: SegmentProperties, indexColumns: Array[CarbonColumn])
  extends CarbonReadSupport[Array[Object]] {
  // for the dictionary dimensions: column name -> position in the surrogate key array
  var indexCol2IdxInDictArray: Map[String, Int] = Map()
  // for the non dictionary dimensions: column name -> position in the no-dict key list
  var indexCol2IdxInNoDictArray: Map[String, Int] = Map()
  // for the measures: column name -> position among the measure values
  var indexCol2IdxInMeasureArray: Map[String, Int] = Map()

  override def initialize(carbonColumns: Array[CarbonColumn],
      carbonTable: CarbonTable): Unit = {
    // prepare index info to extract data from query result.
    // NOTE: the previous version also collected DATE columns into an unused
    // ListBuffer (dictIndexColumns); that dead local has been removed.
    indexColumns.foreach { col =>
      if (col.isDimension) {
        val dim = carbonTable.getDimensionByName(col.getColName)
        if (dim.getDataType != DataTypes.DATE) {
          indexCol2IdxInNoDictArray =
            indexCol2IdxInNoDictArray + (col.getColName -> indexCol2IdxInNoDictArray.size)
        } else {
          // DATE dimensions are the dictionary-encoded ones here
          indexCol2IdxInDictArray =
            indexCol2IdxInDictArray + (col.getColName -> indexCol2IdxInDictArray.size)
        }
      } else {
        indexCol2IdxInMeasureArray =
          indexCol2IdxInMeasureArray + (col.getColName -> indexCol2IdxInMeasureArray.size)
      }
    }
  }

  /**
   * input: all the dimensions are bundled in one ByteArrayWrapper in position 0,
   * then comes the measures one by one; last 3 elements are block/page/row id
   * output: all the dimensions and measures comes one after another
   */
  override def readRow(data: Array[Object]): Array[Object] = {
    // decode all dictionary surrogate keys once for the whole row
    val surrogateKeys = if (segmentProperties.getNumberOfDictDimensions > 0) {
      ByteUtil.convertBytesToLongArray(
        data(0).asInstanceOf[ByteArrayWrapper].getDictionaryKey)
    } else {
      new Array[Long](0)
    }

    // fill return row from data: one slot per index column plus 3 position ids
    val rtn = new Array[Object](indexColumns.length + 3)
    indexColumns.zipWithIndex.foreach { case (col, i) =>
      rtn(i) = if (indexCol2IdxInDictArray.contains(col.getColName)) {
        // dictionary dimension: surrogate key boxed as Integer
        surrogateKeys(indexCol2IdxInDictArray(col.getColName)).toInt.asInstanceOf[Integer]
      } else if (indexCol2IdxInNoDictArray.contains(col.getColName)) {
        val bytes = data(0).asInstanceOf[ByteArrayWrapper].getNoDictionaryKeyByIndex(
          indexCol2IdxInNoDictArray(col.getColName))
        // no dictionary primitive columns are expected to be in original data while loading,
        // so convert it to original data
        if (DataTypeUtil.isPrimitiveColumn(col.getDataType)) {
          var dataFromBytes = DataTypeUtil
            .getDataBasedOnDataTypeForNoDictionaryColumn(bytes, col.getDataType)
          if (dataFromBytes == null) {
            dataFromBytes = DataConvertUtil
              .getNullValueForMeasure(col.getDataType, col.getColumnSchema.getScale)
          }
          // for timestamp the above method will give the original data, so it should be
          // converted again to the format to be loaded (without micros)
          if (null != dataFromBytes && col.getDataType == DataTypes.TIMESTAMP) {
            dataFromBytes = (dataFromBytes.asInstanceOf[Long] / 1000L).asInstanceOf[Object]
          }
          dataFromBytes
        } else {
          // non-primitive no-dict columns keep their raw bytes
          bytes
        }
      } else {
        // measures start from 1
        val value = data(1 + indexCol2IdxInMeasureArray(col.getColName))
        if (null == value) {
          DataConvertUtil.getNullValueForMeasure(col.getDataType,
            col.getColumnSchema.getScale)
        } else if (DataTypes.isDecimal(col.getDataType)) {
          // In rebuild process, value is built for spark
          // here we need to convert it to java BigDecimal for carbon
          value.asInstanceOf[Decimal].toBigDecimal.bigDecimal
        } else {
          value
        }
      }
    }
    // carry over block/page/row ids from the tail of the input row
    rtn(indexColumns.length) = data(data.length - 3)
    rtn(indexColumns.length + 1) = data(data.length - 2)
    rtn(indexColumns.length + 2) = data(data.length - 1)
    rtn
  }

  override def close(): Unit = {
  }
}
| |
/**
 * RDD that rebuilds the index described by `indexSchema` for the given
 * segments. One partition is created per shard (unique block path within a
 * segment) — see internalGetPartitions — and each task scans that shard and
 * feeds every row into an IndexBuilder. K/V are the key/value types produced
 * by the supplied RefreshResult.
 */
class IndexRebuildRDD[K, V](
    @transient private val session: SparkSession,
    result: RefreshResult[K, V],
    @transient private val tableInfo: TableInfo,
    indexSchema: IndexSchema,
    indexColumns: Array[CarbonColumn],
    segments: Set[Segment])
  extends CarbonRDDWithTableInfo[(K, V)](session, Nil, tableInfo.serialize()) {

  // one query id shared by all tasks; falls back to nanoTime when not configured
  private val queryId = sparkContext.getConf.get("queryId", System.nanoTime() + "")
  private val jobTrackerId = CarbonInputFormatUtil.createJobTrackerID()

  // set in internalGetPartitions (driver side) and read in internalCompute;
  // NOTE(review): relies on the scope being serialized with the RDD — confirm
  private var readCommittedScope: ReadCommittedScope = _

  /**
   * Scan all rows of the shard backing this partition and add them to the
   * index builder. Returns a single-element iterator holding
   * (taskId -> (segmentId, success)). If the split's segment is not among
   * the requested segments, nothing is read and success stays false.
   */
  override def internalCompute(split: Partition, context: TaskContext): Iterator[(K, V)] = {
    val LOGGER = LogServiceFactory.getLogService(this.getClass.getName)
    val carbonTable = CarbonTable.buildFromTableInfo(getTableInfo)
    val indexFactory = new IndexProvider(carbonTable, indexSchema, session).getIndexFactory
    var status = false
    val inputMetrics = new CarbonInputMetrics
    TaskMetricsMap.getInstance().registerThreadCallback()
    val inputSplit = split.asInstanceOf[CarbonSparkPartition].split.value
    // all splits in this partition belong to one segment (grouped by shard)
    val segmentId = inputSplit.getAllSplits.get(0).getSegment.getSegmentNo
    val segment = segments.find(p => p.getSegmentNo.equals(segmentId))
    if (segment.isDefined) {
      inputMetrics.initBytesReadCallback(context, inputSplit, inputMetricsInterval)

      val attemptId = new TaskAttemptID(jobTrackerId, id, TaskType.MAP, split.index, 0)
      val attemptContext = new TaskAttemptContextImpl(FileFactory.getConfiguration, attemptId)
      val format = createInputFormat(segment.get, attemptContext)
      segment.get.setReadCommittedScope(readCommittedScope)
      val model = format.createQueryModel(inputSplit, attemptContext)
      // one query id per table
      model.setQueryId(queryId)
      // row-by-row reading with row ids so positions can be fed to the builder
      model.setVectorReader(false)
      model.setRequiredRowId(true)

      var reader: CarbonRecordReader[Array[Object]] = null
      var refresher: IndexBuilder = null
      try {
        val segmentPropertiesFetcher = IndexStoreManager.getInstance().getIndex(carbonTable,
          BlockletIndexFactory.INDEX_SCHEMA).getIndexFactory
          .asInstanceOf[SegmentPropertiesFetcher]
        val segmentProperties = segmentPropertiesFetcher.getSegmentProperties(segment.get)

        // we use task name as shard name to create the folder for this indexSchema
        val shardName = CarbonTablePath.getShardName(inputSplit.getAllSplits.get(0).getBlockPath)
        refresher = indexFactory.createBuilder(segment.get, shardName, segmentProperties)
        refresher.initialize()

        // choose raw-bytes extraction when the builder works on carbon raw bytes
        model.setForcedDetailRawQuery(refresher.isIndexForCarbonRawBytes)
        val readSupport = if (refresher.isIndexForCarbonRawBytes) {
          new RawBytesReadSupport(segmentProperties, indexColumns)
        } else {
          new OriginalReadSupport(indexColumns.map(_.getDataType))
        }
        reader = new CarbonRecordReader[Array[Object]](model, readSupport, inputMetrics,
          attemptContext.getConfiguration)
        reader.initialize(inputSplit, attemptContext)
        // skip clear indexSchema and we will do this after rebuild
        reader.setSkipClearIndexAtClose(true)

        // Note that indexSchema rebuilding is based on query, the blockletId in rowWithPosition
        // is set to relative number in carbondata file in query process.
        // In order to get absolute blockletId in shard like the one filled in loading process,
        // here we use another way to generate it.
        var blockletId = 0
        var firstRow = true
        while (reader.nextKeyValue()) {
          val rowWithPosition = reader.getCurrentValue
          val size = rowWithPosition.length
          // last two elements of the row are pageId and rowId (see read support)
          val pageId = rowWithPosition(size - 2).asInstanceOf[Int]
          val rowId = rowWithPosition(size - 1).asInstanceOf[Int]

          // a (pageId, rowId) reset to (0, 0) marks the start of the next blocklet
          if (!firstRow && pageId == 0 && rowId == 0) {
            blockletId = blockletId + 1
          } else {
            firstRow = false
          }
          refresher.addRow(blockletId, pageId, rowId, rowWithPosition)
        }

        refresher.finish()

        status = true
      } finally {
        // close reader and builder independently so a failure in one
        // does not prevent closing the other
        if (reader != null) {
          try {
            reader.close()
          } catch {
            case ex: Throwable =>
              LOGGER.error("Failed to close reader", ex)
          }
        }

        if (refresher != null) {
          try {
            refresher.close()
          } catch {
            case ex: Throwable =>
              LOGGER.error("Failed to close index writer", ex)
          }
        }
      }
    }

    // single-result iterator: report this task's (segmentId, status) once
    new Iterator[(K, V)] {

      var finished = false

      override def hasNext: Boolean = {
        !finished
      }

      override def next(): (K, V) = {
        finished = true
        result.getKey(split.index.toString, (segmentId, status))
      }
    }
  }


  /**
   * Build a CarbonTableInputFormat configured to read exactly `segment`,
   * projecting only the indexed columns.
   */
  private def createInputFormat(segment: Segment,
      attemptContext: TaskAttemptContextImpl) = {
    val format = new CarbonTableInputFormat[Object]
    val tableInfo1 = getTableInfo
    val conf = attemptContext.getConfiguration
    CarbonInputFormat.setTableInfo(conf, tableInfo1)
    CarbonInputFormat.setDatabaseName(conf, tableInfo1.getDatabaseName)
    CarbonInputFormat.setTableName(conf, tableInfo1.getFactTable.getTableName)
    CarbonInputFormat.setDataTypeConverter(conf, classOf[SparkDataTypeConverterImpl])

    val identifier = tableInfo1.getOrCreateAbsoluteTableIdentifier()
    CarbonInputFormat.setTablePath(
      conf,
      identifier.appendWithLocalPrefix(identifier.getTablePath))

    // restrict the read to the single segment being rebuilt
    CarbonInputFormat.setSegmentsToAccess(
      conf,
      List(segment).asJava)

    // only the indexed columns are needed to build the index
    CarbonInputFormat.setColumnProjection(
      conf,
      new CarbonProjection(indexColumns.map(_.getColName)))
    format
  }

  /**
   * Compute one CarbonSparkPartition per shard: splits are grouped by
   * (segmentId, taskId, blockPath) so each task sees all CarbonInputSplits
   * of a single unique block path.
   */
  override protected def internalGetPartitions = {
    if (!indexSchema.isIndex) {
      throw new UnsupportedOperationException
    }
    val job = CarbonSparkUtil.createHadoopJob()
    job.getConfiguration.set("query.id", queryId)

    val format = new CarbonTableInputFormat[Object]

    CarbonInputFormat.setSegmentsToAccess(
      job.getConfiguration,
      segments.toList.asJava)

    CarbonInputFormat.setTableInfo(
      job.getConfiguration,
      tableInfo)
    CarbonInputFormat.setTablePath(
      job.getConfiguration,
      tableInfo.getOrCreateAbsoluteTableIdentifier().getTablePath)
    CarbonInputFormat.setDatabaseName(
      job.getConfiguration,
      tableInfo.getDatabaseName)
    CarbonInputFormat.setTableName(
      job.getConfiguration,
      tableInfo.getFactTable.getTableName)

    // make the partitions based on block path so that all the CarbonInputSplits in a
    // MultiBlockSplit are used for bloom reading. This means 1 task for 1 shard(unique block path).
    val splits = format.getSplits(job)
    readCommittedScope = format.getReadCommitted(job, null)
    splits.asScala
      .map(_.asInstanceOf[CarbonInputSplit])
      .groupBy(p => (p.getSegmentId, p.taskId, p.getBlockPath))
      .map { group =>
        new CarbonMultiBlockSplit(
          group._2.asJava,
          group._2.flatMap(_.getLocations).toArray)
      }
      .zipWithIndex
      .map { split =>
        new CarbonSparkPartition(id, split._2, split._1)
      }
      .toArray
  }
}