| /* |
| * Licensed to the Apache Software Foundation (ASF) under one |
| * or more contributor license agreements. See the NOTICE file |
| * distributed with this work for additional information |
| * regarding copyright ownership. The ASF licenses this file |
| * to you under the Apache License, Version 2.0 (the |
| * "License"); you may not use this file except in compliance |
| * with the License. You may obtain a copy of the License at |
| * |
| * http://www.apache.org/licenses/LICENSE-2.0 |
| * |
| * Unless required by applicable law or agreed to in writing, software |
| * distributed under the License is distributed on an "AS IS" BASIS, |
| * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| * See the License for the specific language governing permissions and |
| * limitations under the License. |
| */ |
| |
| package org.apache.hudi |
| |
| import org.apache.avro.Schema |
| import org.apache.avro.generic.{GenericRecord, GenericRecordBuilder, IndexedRecord} |
| import org.apache.hadoop.conf.Configuration |
| import org.apache.hadoop.fs.Path |
| import org.apache.hadoop.mapred.JobConf |
| import org.apache.hudi.HoodieConversionUtils.toScalaOption |
| import org.apache.hudi.HoodieMergeOnReadRDD.{AvroDeserializerSupport, collectFieldOrdinals, getPartitionPath, projectAvro, projectAvroUnsafe, projectRowUnsafe, resolveAvroSchemaNullability} |
| import org.apache.hudi.MergeOnReadSnapshotRelation.getFilePath |
| import org.apache.hudi.common.config.HoodieMetadataConfig |
| import org.apache.hudi.common.engine.HoodieLocalEngineContext |
| import org.apache.hudi.common.fs.FSUtils |
| import org.apache.hudi.common.fs.FSUtils.getRelativePartitionPath |
| import org.apache.hudi.common.model.{HoodieLogFile, HoodieRecord, HoodieRecordPayload, OverwriteWithLatestAvroPayload} |
| import org.apache.hudi.common.table.log.HoodieMergedLogRecordScanner |
| import org.apache.hudi.common.util.ValidationUtils.checkState |
| import org.apache.hudi.config.HoodiePayloadConfig |
| import org.apache.hudi.exception.HoodieException |
| import org.apache.hudi.hadoop.config.HoodieRealtimeConfig |
| import org.apache.hudi.hadoop.utils.HoodieRealtimeRecordReaderUtils.getMaxCompactionMemoryInBytes |
| import org.apache.hudi.metadata.HoodieTableMetadata.getDataTableBasePathFromMetadataTable |
| import org.apache.hudi.metadata.{HoodieBackedTableMetadata, HoodieTableMetadata} |
| import org.apache.hudi.internal.schema.InternalSchema |
| import org.apache.spark.rdd.RDD |
| import org.apache.spark.sql.avro.HoodieAvroDeserializer |
| import org.apache.spark.sql.catalyst.InternalRow |
| import org.apache.spark.sql.catalyst.expressions.{SpecificInternalRow, UnsafeProjection} |
| import org.apache.spark.sql.execution.datasources.PartitionedFile |
| import org.apache.spark.sql.types.StructType |
| import org.apache.spark.{Partition, SerializableWritable, SparkContext, TaskContext} |
| |
| import java.io.Closeable |
| import java.util.Properties |
| import scala.annotation.tailrec |
| import scala.collection.JavaConverters._ |
| import scala.util.Try |
| |
| case class HoodieMergeOnReadPartition(index: Int, split: HoodieMergeOnReadFileSplit) extends Partition |
| |
| class HoodieMergeOnReadRDD(@transient sc: SparkContext, |
| @transient config: Configuration, |
| fullSchemaFileReader: PartitionedFile => Iterator[InternalRow], |
| requiredSchemaFileReader: PartitionedFile => Iterator[InternalRow], |
| tableSchema: HoodieTableSchema, |
| requiredSchema: HoodieTableSchema, |
| tableState: HoodieTableState, |
| mergeType: String, |
| @transient fileSplits: Seq[HoodieMergeOnReadFileSplit]) |
| extends RDD[InternalRow](sc, Nil) with HoodieUnsafeRDD { |
| |
| protected val maxCompactionMemoryInBytes: Long = getMaxCompactionMemoryInBytes(new JobConf(config)) |
| |
| private val confBroadcast = sc.broadcast(new SerializableWritable(config)) |
| private val payloadProps = tableState.preCombineFieldOpt |
| .map(preCombineField => |
| HoodiePayloadConfig.newBuilder |
| .withPayloadOrderingField(preCombineField) |
| .build |
| .getProps |
| ) |
| .getOrElse(new Properties()) |
| |
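|   // Record Payload classes for which merging can be performed correctly w/ just the projected (required) |
|   // schema of the base file (see [[readBaseFile]]) |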
| private val whitelistedPayloadClasses: Set[String] = Seq( |
| classOf[OverwriteWithLatestAvroPayload] |
| ).map(_.getName).toSet |
| |
| override def compute(split: Partition, context: TaskContext): Iterator[InternalRow] = { |
| val mergeOnReadPartition = split.asInstanceOf[HoodieMergeOnReadPartition] |
| val iter = mergeOnReadPartition.split match { |
| case dataFileOnlySplit if dataFileOnlySplit.logFiles.isEmpty => |
| requiredSchemaFileReader.apply(dataFileOnlySplit.dataFile.get) |
| |
| case logFileOnlySplit if logFileOnlySplit.dataFile.isEmpty => |
| new LogFileIterator(logFileOnlySplit, getConfig) |
| |
| case split if mergeType.equals(DataSourceReadOptions.REALTIME_SKIP_MERGE_OPT_VAL) => |
| val baseFileIterator = requiredSchemaFileReader.apply(split.dataFile.get) |
| new SkipMergeIterator(split, baseFileIterator, getConfig) |
| |
| case split if mergeType.equals(DataSourceReadOptions.REALTIME_PAYLOAD_COMBINE_OPT_VAL) => |
| val (baseFileIterator, schema) = readBaseFile(split) |
| new RecordMergingFileIterator(split, baseFileIterator, schema, getConfig) |
| |
|       case _ => throw new HoodieException(s"Unable to select an Iterator to read the Hoodie MOR File Split for " + |
|         s"file path: ${mergeOnReadPartition.split.dataFile.get.filePath}, " + |
|         s"log paths: ${mergeOnReadPartition.split.logFiles.toString}, " + |
|         s"hoodie table path: ${tableState.tablePath}, " + |
|         s"spark partition index: ${mergeOnReadPartition.index}, " + |
|         s"merge type: $mergeType") |
| } |
| |
| if (iter.isInstanceOf[Closeable]) { |
|       // Register a callback to close the iterator (and thereby its log scanner) on task completion, |
|       // releasing the underlying resources once the task has finished. |
| Option(TaskContext.get()).foreach(_.addTaskCompletionListener[Unit](_ => iter.asInstanceOf[Closeable].close())) |
| } |
| |
| iter |
| } |
| |
| private def readBaseFile(split: HoodieMergeOnReadFileSplit): (Iterator[InternalRow], HoodieTableSchema) = { |
|     // NOTE: This is an optimization making sure that even for MOR tables we fetch the absolute minimum |
|     //       of the stored data, while still properly executing the corresponding relation's semantics |
|     //       and meeting the query's requirements. |
|     // |
|     //       Here we assume that iff the queried table uses one of the standard (whitelisted) Record |
|     //       Payload classes, then we can avoid reading and parsing the records w/ the _full_ schema, and |
|     //       instead rely only on the projected one, while still being able to perform merging correctly |
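|     // |
|     //       For example, [[OverwriteWithLatestAvroPayload]] resolves a merge by simply picking the latest |
|     //       (incoming) record, so the values of the non-projected base-file columns are never consulted |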
| if (!whitelistedPayloadClasses.contains(tableState.recordPayloadClassName)) |
| (fullSchemaFileReader(split.dataFile.get), tableSchema) |
| else |
| (requiredSchemaFileReader(split.dataFile.get), requiredSchema) |
| } |
| |
|   override protected def getPartitions: Array[Partition] = |
|     fileSplits.zipWithIndex.map { case (split, index) => HoodieMergeOnReadPartition(index, split) }.toArray |
| |
| private def getConfig: Configuration = { |
| val conf = confBroadcast.value.value |
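|     // NOTE: [[Configuration]] instantiation (incl. its copy-constructor) is known not to be thread-safe, |
|     //       hence copies are created under a global lock (the same pattern used by Spark's HadoopRDD) |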
| HoodieMergeOnReadRDD.CONFIG_INSTANTIATION_LOCK.synchronized { |
| new Configuration(conf) |
| } |
| } |
| |
| /** |
|    * Provided w/ an instance of [[HoodieMergeOnReadFileSplit]], iterates over all of the records stored in |
|    * its Delta Log files (returning them as [[InternalRow]]s) |
| */ |
| private class LogFileIterator(split: HoodieMergeOnReadFileSplit, |
| config: Configuration) |
| extends Iterator[InternalRow] with Closeable with AvroDeserializerSupport { |
| |
| protected override val requiredAvroSchema: Schema = new Schema.Parser().parse(requiredSchema.avroSchemaStr) |
| protected override val requiredStructTypeSchema: StructType = requiredSchema.structTypeSchema |
| |
| protected val logFileReaderAvroSchema: Schema = new Schema.Parser().parse(tableSchema.avroSchemaStr) |
| |
| protected val recordBuilder: GenericRecordBuilder = new GenericRecordBuilder(requiredAvroSchema) |
| protected var recordToLoad: InternalRow = _ |
| |
| // TODO validate whether we need to do UnsafeProjection |
| protected val unsafeProjection: UnsafeProjection = UnsafeProjection.create(requiredStructTypeSchema) |
| |
|     // NOTE: This maps the _required_ schema fields onto the _full_ table schema, collecting their "ordinals" |
|     //       w/in the record payload. It is needed to project records read from the Delta Log files, which |
|     //       are always read w/ the full schema (never projected, since DL files might be stored in |
|     //       non-columnar formats like Avro, HFile, etc) |
| private val requiredSchemaFieldOrdinals: List[Int] = collectFieldOrdinals(requiredAvroSchema, logFileReaderAvroSchema) |
| |
|     // TODO: now that the log scanner supports column projection via InternalSchema, projectAvroUnsafe may no longer be needed |
| private var logScanner = |
| HoodieMergeOnReadRDD.scanLog(split.logFiles, getPartitionPath(split), logFileReaderAvroSchema, tableState, |
| maxCompactionMemoryInBytes, config, tableSchema.internalSchema) |
| |
| private val logRecords = logScanner.getRecords.asScala |
| |
|     // NOTE: This iterator iterates over records already projected into the required schema |
|     // NOTE: This has to stay lazy to make sure it's initialized only at the point where it's |
|     //       going to be used, since we modify `logRecords` before that and therefore can't do it any earlier |
| protected lazy val logRecordsIterator: Iterator[Option[GenericRecord]] = |
| logRecords.iterator.map { |
| case (_, record) => |
| val avroRecordOpt = toScalaOption(record.getData.getInsertValue(logFileReaderAvroSchema, payloadProps)) |
| avroRecordOpt.map { |
| avroRecord => projectAvroUnsafe(avroRecord, requiredAvroSchema, requiredSchemaFieldOrdinals, recordBuilder) |
| } |
| } |
| |
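|     // NOTE: Removing a record from [[logRecords]] makes sure that a log record already merged w/ its |
|     //       base-file counterpart won't be emitted a second time once the (lazy) [[logRecordsIterator]] |
|     //       is eventually consumed |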
| protected def removeLogRecord(key: String): Option[HoodieRecord[_ <: HoodieRecordPayload[_]]] = |
| logRecords.remove(key) |
| |
| override def hasNext: Boolean = hasNextInternal |
| |
| // NOTE: It's crucial for this method to be annotated w/ [[@tailrec]] to make sure |
| // that recursion is unfolded into a loop to avoid stack overflows while |
| // handling records |
| @tailrec private def hasNextInternal: Boolean = { |
| logRecordsIterator.hasNext && { |
| val avroRecordOpt = logRecordsIterator.next() |
| if (avroRecordOpt.isEmpty) { |
| // Record has been deleted, skipping |
| this.hasNextInternal |
| } else { |
| recordToLoad = unsafeProjection(deserialize(avroRecordOpt.get)) |
| true |
| } |
| } |
| } |
| |
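|     // NOTE: [[hasNext]] is responsible for pre-loading [[recordToLoad]], hence it has to be invoked |
|     //       before every call to [[next]] |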
| override final def next(): InternalRow = recordToLoad |
| |
| override def close(): Unit = |
| if (logScanner != null) { |
| try { |
| logScanner.close() |
| } finally { |
| logScanner = null |
| } |
| } |
| } |
| |
| /** |
|    * Provided w/ an instance of [[HoodieMergeOnReadFileSplit]], provides an iterator over all of the records stored in |
|    * the Base file as well as all of the Delta Log files, simply returning the concatenation of these streams, while |
|    * not performing any combination/merging of the records w/ the same primary keys (i.e. potentially producing duplicates) |
| */ |
| private class SkipMergeIterator(split: HoodieMergeOnReadFileSplit, |
| baseFileIterator: Iterator[InternalRow], |
| config: Configuration) |
| extends LogFileIterator(split, config) { |
| |
| override def hasNext: Boolean = { |
| if (baseFileIterator.hasNext) { |
| val curRow = baseFileIterator.next() |
| recordToLoad = unsafeProjection(curRow) |
| true |
| } else { |
| super[LogFileIterator].hasNext |
| } |
| } |
| } |
| |
| /** |
|    * Provided w/ an instance of [[HoodieMergeOnReadFileSplit]], provides an iterator over all of the records stored in |
|    * a) the Base file and b) all of the Delta Log files, combining records bearing the same primary key from both of |
|    * these streams |
| */ |
| private class RecordMergingFileIterator(split: HoodieMergeOnReadFileSplit, |
| baseFileIterator: Iterator[InternalRow], |
| baseFileReaderSchema: HoodieTableSchema, |
| config: Configuration) |
| extends LogFileIterator(split, config) { |
| |
|     // NOTE: The record-merging iterator supports 2 modes of operation, merging records bearing either |
|     //        - The full table schema |
|     //        - The projected schema |
|     //       As such, no particular schema can be assumed, and we therefore rely on the caller |
|     //       to correspondingly set the schema of the expected output of the base-file reader |
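|     //       (see [[readBaseFile]]: w/ a whitelisted payload class the base file is read w/ the projected |
|     //       (required) schema, otherwise w/ the full table schema) |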
| private val baseFileReaderAvroSchema = new Schema.Parser().parse(baseFileReaderSchema.avroSchemaStr) |
| private val requiredSchemaFieldOrdinals: List[Int] = collectFieldOrdinals(requiredAvroSchema, baseFileReaderAvroSchema) |
| |
| private val serializer = sparkAdapter.createAvroSerializer(baseFileReaderSchema.structTypeSchema, |
| baseFileReaderAvroSchema, resolveAvroSchemaNullability(baseFileReaderAvroSchema)) |
| |
| private val recordKeyOrdinal = baseFileReaderSchema.structTypeSchema.fieldIndex(tableState.recordKeyField) |
| |
| override def hasNext: Boolean = hasNextInternal |
| |
| // NOTE: It's crucial for this method to be annotated w/ [[@tailrec]] to make sure |
| // that recursion is unfolded into a loop to avoid stack overflows while |
| // handling records |
| @tailrec private def hasNextInternal: Boolean = { |
| if (baseFileIterator.hasNext) { |
| val curRowRecord = baseFileIterator.next() |
| val curKey = curRowRecord.getString(recordKeyOrdinal) |
| val updatedRecordOpt = removeLogRecord(curKey) |
| if (updatedRecordOpt.isEmpty) { |
| // No merge needed, load current row with required projected schema |
| recordToLoad = unsafeProjection(projectRowUnsafe(curRowRecord, requiredSchema.structTypeSchema, requiredSchemaFieldOrdinals)) |
| true |
| } else { |
| val mergedAvroRecordOpt = merge(serialize(curRowRecord), updatedRecordOpt.get) |
| if (mergedAvroRecordOpt.isEmpty) { |
| // Record has been deleted, skipping |
| this.hasNextInternal |
| } else { |
|           // NOTE: When a merge occurs we can't know the schema of the record being returned, b/c |
|           //       the record from the Delta Log will bear the (full) Table schema, while the record from |
|           //       the Base file might have already been read w/ the projected one (as an optimization). |
|           //       As such we can't use the more performant [[projectAvroUnsafe]], and instead have to fall back |
|           //       to [[projectAvro]] |
| val projectedAvroRecord = projectAvro(mergedAvroRecordOpt.get, requiredAvroSchema, recordBuilder) |
| recordToLoad = unsafeProjection(deserialize(projectedAvroRecord)) |
| true |
| } |
| } |
| } else { |
| super[LogFileIterator].hasNext |
| } |
| } |
| |
| private def serialize(curRowRecord: InternalRow): GenericRecord = |
| serializer.serialize(curRowRecord).asInstanceOf[GenericRecord] |
| |
| private def merge(curAvroRecord: GenericRecord, newRecord: HoodieRecord[_ <: HoodieRecordPayload[_]]): Option[IndexedRecord] = { |
|       // NOTE: We have to pass in the Avro schema used to read the Delta Log file, since we invoke the |
|       //       combining API on the record coming from the Delta Log |
| toScalaOption(newRecord.getData.combineAndGetUpdateValue(curAvroRecord, logFileReaderAvroSchema, payloadProps)) |
| } |
| } |
| } |
| |
| private object HoodieMergeOnReadRDD { |
| |
| val CONFIG_INSTANTIATION_LOCK = new Object() |
| |
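|   /** |
|    * Builds a [[HoodieMergedLogRecordScanner]] over the provided Delta Log files; for the Metadata Table, |
|    * the scanner is instead obtained from [[HoodieBackedTableMetadata]] |
|    */ |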
| def scanLog(logFiles: List[HoodieLogFile], |
| partitionPath: Path, |
| logSchema: Schema, |
| tableState: HoodieTableState, |
| maxCompactionMemoryInBytes: Long, |
|               hadoopConf: Configuration, |
|               internalSchema: InternalSchema = InternalSchema.getEmptyInternalSchema): HoodieMergedLogRecordScanner = { |
| val tablePath = tableState.tablePath |
| val fs = FSUtils.getFs(tablePath, hadoopConf) |
| |
| if (HoodieTableMetadata.isMetadataTable(tablePath)) { |
| val metadataConfig = HoodieMetadataConfig.newBuilder().enable(true).build() |
| val dataTableBasePath = getDataTableBasePathFromMetadataTable(tablePath) |
| val metadataTable = new HoodieBackedTableMetadata( |
| new HoodieLocalEngineContext(hadoopConf), metadataConfig, |
| dataTableBasePath, |
| hadoopConf.get(HoodieRealtimeConfig.SPILLABLE_MAP_BASE_PATH_PROP, HoodieRealtimeConfig.DEFAULT_SPILLABLE_MAP_BASE_PATH)) |
| |
|       // NOTE: In the case of the Metadata Table, the partition path equates to the partition name (since |
|       //       there's just one level of indirection among MT partitions) |
| val relativePartitionPath = getRelativePartitionPath(new Path(tablePath), partitionPath) |
| metadataTable.getLogRecordScanner(logFiles.asJava, relativePartitionPath).getLeft |
| } else { |
| val logRecordScannerBuilder = HoodieMergedLogRecordScanner.newBuilder() |
| .withFileSystem(fs) |
| .withBasePath(tablePath) |
| .withLogFilePaths(logFiles.map(logFile => getFilePath(logFile.getPath)).asJava) |
| .withReaderSchema(logSchema) |
| .withLatestInstantTime(tableState.latestCommitTimestamp) |
| .withReadBlocksLazily( |
| Try(hadoopConf.get(HoodieRealtimeConfig.COMPACTION_LAZY_BLOCK_READ_ENABLED_PROP, |
| HoodieRealtimeConfig.DEFAULT_COMPACTION_LAZY_BLOCK_READ_ENABLED).toBoolean) |
| .getOrElse(false)) |
| .withReverseReader(false) |
| .withInternalSchema(internalSchema) |
| .withBufferSize( |
| hadoopConf.getInt(HoodieRealtimeConfig.MAX_DFS_STREAM_BUFFER_SIZE_PROP, |
| HoodieRealtimeConfig.DEFAULT_MAX_DFS_STREAM_BUFFER_SIZE)) |
| .withMaxMemorySizeInBytes(maxCompactionMemoryInBytes) |
| .withSpillableMapBasePath( |
| hadoopConf.get(HoodieRealtimeConfig.SPILLABLE_MAP_BASE_PATH_PROP, |
| HoodieRealtimeConfig.DEFAULT_SPILLABLE_MAP_BASE_PATH)) |
| |
| if (logFiles.nonEmpty) { |
| logRecordScannerBuilder.withPartition( |
| getRelativePartitionPath(new Path(tableState.tablePath), logFiles.head.getPath.getParent)) |
| } |
| |
| logRecordScannerBuilder.build() |
| } |
| } |
| |
| /** |
|    * Projects the provided instance of [[InternalRow]] into the provided schema, assuming that |
|    * the schema of the original row is strictly a superset of the given one |
| */ |
| private def projectRowUnsafe(row: InternalRow, |
| projectedSchema: StructType, |
| ordinals: Seq[Int]): InternalRow = { |
| val projectedRow = new SpecificInternalRow(projectedSchema) |
| var curIndex = 0 |
| projectedSchema.zip(ordinals).foreach { case (field, pos) => |
| val curField = if (row.isNullAt(pos)) { |
| null |
| } else { |
| row.get(pos, field.dataType) |
| } |
| projectedRow.update(curIndex, curField) |
| curIndex += 1 |
| } |
| projectedRow |
| } |
| |
| /** |
|    * Projects the provided instance of [[IndexedRecord]] into the provided schema, assuming that |
|    * the schema of the original record is strictly a superset of the given one |
| */ |
| def projectAvroUnsafe(record: IndexedRecord, |
| projectedSchema: Schema, |
| ordinals: List[Int], |
| recordBuilder: GenericRecordBuilder): GenericRecord = { |
| val fields = projectedSchema.getFields.asScala |
| checkState(fields.length == ordinals.length) |
| fields.zip(ordinals).foreach { |
| case (field, pos) => recordBuilder.set(field, record.get(pos)) |
| } |
| recordBuilder.build() |
| } |
| |
| /** |
|    * Projects the provided instance of [[IndexedRecord]] into the provided schema, assuming that |
|    * the schema of the original record is strictly a superset of the given one |
|    * |
|    * This is a "safe" counterpart of [[projectAvroUnsafe]]: it builds the mapping of the record's |
|    * schema onto the projected one itself (instead of expecting such a mapping from the caller) |
| */ |
| def projectAvro(record: IndexedRecord, |
| projectedSchema: Schema, |
| recordBuilder: GenericRecordBuilder): GenericRecord = { |
| projectAvroUnsafe(record, projectedSchema, collectFieldOrdinals(projectedSchema, record.getSchema), recordBuilder) |
| } |
| |
| /** |
|    * Maps the [[projected]] [[Schema]] onto the [[source]] one, collecting the corresponding field ordinals w/in it, |
|    * which are subsequently used by either the [[projectRowUnsafe]] or the [[projectAvroUnsafe()]] method |
|    * |
|    * @param projected target projected schema (which is a proper subset of the [[source]] [[Schema]]) |
|    * @param source source schema of the record being projected |
|    * @return list of ordinals of the corresponding fields of the [[projected]] schema w/in the [[source]] one |
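|    * |
|    * For example, projecting a schema w/ fields [key, c] out of a source schema w/ fields [key, a, b, c] |
|    * would yield List(0, 3) |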
| */ |
| private def collectFieldOrdinals(projected: Schema, source: Schema): List[Int] = { |
| projected.getFields.asScala.map(f => source.getField(f.name()).pos()).toList |
| } |
| |
| private def getPartitionPath(split: HoodieMergeOnReadFileSplit): Path = { |
|     // Determine the partition path as the immediate parent folder of either |
|     //    - The base file, or |
|     //    - The first log file (when no base file is present) |
| split.dataFile.map(baseFile => new Path(baseFile.filePath)) |
| .getOrElse(split.logFiles.head.getPath) |
| .getParent |
| } |
| |
| private def resolveAvroSchemaNullability(schema: Schema) = { |
| AvroConversionUtils.resolveAvroTypeNullability(schema) match { |
| case (nullable, _) => nullable |
| } |
| } |
| |
| trait AvroDeserializerSupport extends SparkAdapterSupport { |
| protected val requiredAvroSchema: Schema |
| protected val requiredStructTypeSchema: StructType |
| |
| private lazy val deserializer: HoodieAvroDeserializer = |
| sparkAdapter.createAvroDeserializer(requiredAvroSchema, requiredStructTypeSchema) |
| |
| protected def deserialize(avroRecord: GenericRecord): InternalRow = { |
| checkState(avroRecord.getSchema.getFields.size() == requiredStructTypeSchema.fields.length) |
| deserializer.deserialize(avroRecord).get.asInstanceOf[InternalRow] |
| } |
| } |
| } |