/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.hudi.integ.testsuite.generator;

import java.io.IOException;
import java.io.Serializable;
import java.io.UncheckedIOException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.stream.StreamSupport;
import org.apache.avro.generic.GenericRecord;
import org.apache.hudi.common.util.Option;
import org.apache.hudi.integ.testsuite.converter.Converter;
import org.apache.hudi.integ.testsuite.converter.UpdateConverter;
import org.apache.hudi.integ.testsuite.reader.DFSAvroDeltaInputReader;
import org.apache.hudi.integ.testsuite.reader.DFSHoodieDatasetInputReader;
import org.apache.hudi.integ.testsuite.reader.DeltaInputReader;
import org.apache.hudi.integ.testsuite.writer.DeltaOutputMode;
import org.apache.hudi.integ.testsuite.writer.DeltaWriteStats;
import org.apache.hudi.integ.testsuite.writer.DeltaWriterAdapter;
import org.apache.hudi.integ.testsuite.writer.DeltaWriterFactory;
import org.apache.hudi.keygen.BuiltinKeyGenerator;
import org.apache.hudi.integ.testsuite.configuration.DFSDeltaConfig;
import org.apache.hudi.integ.testsuite.configuration.DeltaConfig;
import org.apache.hudi.integ.testsuite.configuration.DeltaConfig.Config;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.sql.SparkSession;
import org.apache.spark.storage.StorageLevel;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import scala.Tuple2;

/**
 * The delta generator generates the different types of workloads (inserts, updates) described by the given configs.
 */
public class DeltaGenerator implements Serializable {

  private static final Logger log = LoggerFactory.getLogger(DeltaGenerator.class);
private DeltaConfig deltaOutputConfig;
private transient JavaSparkContext jsc;
private transient SparkSession sparkSession;
private String schemaStr;
private List<String> recordRowKeyFieldNames;
private List<String> partitionPathFieldNames;
private int batchId;
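
  /**
   * @param deltaOutputConfig config describing how and where the generated deltas are written
   * @param jsc               Java Spark context used to parallelize generation
   * @param sparkSession      Spark session used when reading back previously written avro data
   * @param schemaStr         avro schema of the generated records
   * @param keyGenerator      key generator supplying the record key and partition path field names
   */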
public DeltaGenerator(DeltaConfig deltaOutputConfig, JavaSparkContext jsc, SparkSession sparkSession,
String schemaStr, BuiltinKeyGenerator keyGenerator) {
this.deltaOutputConfig = deltaOutputConfig;
this.jsc = jsc;
this.sparkSession = sparkSession;
this.schemaStr = schemaStr;
this.recordRowKeyFieldNames = keyGenerator.getRecordKeyFields();
this.partitionPathFieldNames = keyGenerator.getPartitionPathFields();
}
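
  /**
   * Writes the given records through a {@link DeltaWriterAdapter} for the configured output mode and returns
   * the resulting write statistics.
   */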
public JavaRDD<DeltaWriteStats> writeRecords(JavaRDD<GenericRecord> records) {
    // Note: the lambda below captures instance fields (deltaOutputConfig, batchId), which pulls in `this`;
    // DeltaGenerator therefore implements Serializable so Spark can ship the closure to executors.
JavaRDD<DeltaWriteStats> ws = records.mapPartitions(itr -> {
try {
DeltaWriterAdapter<GenericRecord> deltaWriterAdapter = DeltaWriterFactory
.getDeltaWriterAdapter(deltaOutputConfig, batchId);
return Collections.singletonList(deltaWriterAdapter.write(itr)).iterator();
} catch (IOException io) {
throw new UncheckedIOException(io);
}
}).flatMap(List::iterator);
batchId++;
return ws;
}
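
  /**
   * Generates {@code getNumRecordsInsert()} fresh records per insert partition, using the configured schema
   * and minimum record size.
   */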
public JavaRDD<GenericRecord> generateInserts(Config operation) {
long recordsPerPartition = operation.getNumRecordsInsert();
int minPayloadSize = operation.getRecordSize();
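    // start from an empty RDD repartitioned into the desired number of insert partitions; each partition then
    // lazily generates its share of records through the generator iterator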
JavaRDD<GenericRecord> inputBatch = jsc.parallelize(Collections.EMPTY_LIST)
.repartition(operation.getNumInsertPartitions()).mapPartitions(p -> {
return new LazyRecordGeneratorIterator(new FlexibleSchemaRecordGenerationIterator(recordsPerPartition,
minPayloadSize, schemaStr, partitionPathFieldNames));
});
return inputBatch;
}
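
  /**
   * Generates update records by reading back previously written data (raw avro input when no upsert partitions
   * are configured, otherwise the Hudi dataset itself), converting those records into updates, and unioning
   * them with freshly generated inserts when the config also asks for inserts.
   */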
public JavaRDD<GenericRecord> generateUpdates(Config config) throws IOException {
if (deltaOutputConfig.getDeltaOutputMode() == DeltaOutputMode.DFS) {
JavaRDD<GenericRecord> inserts = null;
if (config.getNumRecordsInsert() > 0) {
inserts = generateInserts(config);
}
DeltaInputReader deltaInputReader = null;
JavaRDD<GenericRecord> adjustedRDD = null;
if (config.getNumUpsertPartitions() < 1) {
// randomly generate updates for a given number of records without regard to partitions and files
deltaInputReader = new DFSAvroDeltaInputReader(sparkSession, schemaStr,
((DFSDeltaConfig) deltaOutputConfig).getDeltaBasePath(), Option.empty(), Option.empty());
adjustedRDD = deltaInputReader.read(config.getNumRecordsUpsert());
adjustedRDD = adjustRDDToGenerateExactNumUpdates(adjustedRDD, jsc, config.getNumRecordsUpsert());
} else {
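        // read the records back from the Hudi dataset itself, spread across the requested partitions and files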
deltaInputReader =
new DFSHoodieDatasetInputReader(jsc, ((DFSDeltaConfig) deltaOutputConfig).getDatasetOutputPath(),
schemaStr);
if (config.getFractionUpsertPerFile() > 0) {
adjustedRDD = deltaInputReader.read(config.getNumUpsertPartitions(), config.getNumUpsertFiles(),
config.getFractionUpsertPerFile());
} else {
adjustedRDD = deltaInputReader.read(config.getNumUpsertPartitions(), config.getNumUpsertFiles(), config
.getNumRecordsUpsert());
}
}
log.info("Repartitioning records");
// persist this since we will make multiple passes over this
adjustedRDD = adjustedRDD.repartition(jsc.defaultParallelism());
log.info("Repartitioning records done");
Converter converter = new UpdateConverter(schemaStr, config.getRecordSize(),
partitionPathFieldNames, recordRowKeyFieldNames);
JavaRDD<GenericRecord> updates = converter.convert(adjustedRDD);
log.info("Records converted");
      // persist to disk since the updates may be consumed more than once downstream
      updates.persist(StorageLevel.DISK_ONLY());
      // TODO : Generate updates for only N partitions.
      return inserts != null ? inserts.union(updates) : updates;
} else {
      throw new IllegalArgumentException("Only DFS delta output mode is supported at the moment");
}
}
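
  /**
   * Returns a map from Spark partition index to the number of records held in that partition.
   */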
public Map<Integer, Long> getPartitionToCountMap(JavaRDD<GenericRecord> records) {
    // preservesPartitioning is set to true so the partition indexes reported here line up with the input RDD
return records.mapPartitionsWithIndex((index, itr) -> {
Iterable<GenericRecord> newIterable = () -> itr;
// parallelize counting for speed
long count = StreamSupport.stream(newIterable.spliterator(), true).count();
return Arrays.asList(new Tuple2<>(index, count)).iterator();
}, true).mapToPair(i -> i).collectAsMap();
}
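
  /**
   * Given a partition-to-count map and a number of records to drop, returns a map holding, for every partition
   * that was adjusted, the number of records to keep. Partitions absent from the returned map keep all of
   * their records.
   */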
public Map<Integer, Long> getAdjustedPartitionsCount(Map<Integer, Long> partitionCountMap, long
recordsToRemove) {
long remainingRecordsToRemove = recordsToRemove;
Iterator<Map.Entry<Integer, Long>> iterator = partitionCountMap.entrySet().iterator();
Map<Integer, Long> adjustedPartitionCountMap = new HashMap<>();
while (iterator.hasNext()) {
Map.Entry<Integer, Long> entry = iterator.next();
if (entry.getValue() < remainingRecordsToRemove) {
remainingRecordsToRemove -= entry.getValue();
adjustedPartitionCountMap.put(entry.getKey(), 0L);
} else {
long newValue = entry.getValue() - remainingRecordsToRemove;
remainingRecordsToRemove = 0;
adjustedPartitionCountMap.put(entry.getKey(), newValue);
}
if (remainingRecordsToRemove == 0) {
break;
}
}
return adjustedPartitionCountMap;
}
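
  /**
   * Grows or trims the given updates RDD so that it holds exactly {@code totalRecordsRequired} records:
   * existing records are duplicated (via self-union or re-parallelized takes) when too few were read,
   * and records are dropped per partition when too many were.
   */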
public JavaRDD<GenericRecord> adjustRDDToGenerateExactNumUpdates(JavaRDD<GenericRecord> updates, JavaSparkContext
jsc, long totalRecordsRequired) {
Map<Integer, Long> actualPartitionCountMap = getPartitionToCountMap(updates);
long totalRecordsGenerated = actualPartitionCountMap.values().stream().mapToLong(Long::longValue).sum();
if (isSafeToTake(totalRecordsRequired, totalRecordsGenerated)) {
      // Generate totalRecordsRequired - totalRecordsGenerated new records and union the RDDs
      // NOTE : This performs poorly when totalRecordsRequired >> totalRecordsGenerated. Hence, always
      // ensure that enough inserts are created beforehand (this needs to be noted during the WorkflowDag creation)
long sizeOfUpdateRDD = totalRecordsGenerated;
      while (totalRecordsRequired != sizeOfUpdateRDD) {
        long recordsToTake = (totalRecordsRequired - sizeOfUpdateRDD) > sizeOfUpdateRDD
            ? sizeOfUpdateRDD : (totalRecordsRequired - sizeOfUpdateRDD);
        if ((totalRecordsRequired - sizeOfUpdateRDD) > recordsToTake && recordsToTake <= sizeOfUpdateRDD) {
          // still short by more than the current size of the RDD => double it by unioning it with itself
          updates = updates.union(updates);
          sizeOfUpdateRDD *= 2;
        } else {
          // short by at most the current size of the RDD => take just the remaining records and append them
          List<GenericRecord> remainingUpdates = updates.take((int) (recordsToTake));
          updates = updates.union(jsc.parallelize(remainingUpdates));
          sizeOfUpdateRDD = sizeOfUpdateRDD + recordsToTake;
        }
}
return updates;
} else if (totalRecordsRequired < totalRecordsGenerated) {
final Map<Integer, Long> adjustedPartitionCountMap = getAdjustedPartitionsCount(actualPartitionCountMap,
totalRecordsGenerated - totalRecordsRequired);
// limit counts across partitions to meet the exact number of updates required
JavaRDD<GenericRecord> trimmedRecords = updates.mapPartitionsWithIndex((index, itr) -> {
int counter = 1;
List<GenericRecord> entriesToKeep = new ArrayList<>();
if (!adjustedPartitionCountMap.containsKey(index)) {
return itr;
} else {
long recordsToKeepForThisPartition = adjustedPartitionCountMap.get(index);
while (counter <= recordsToKeepForThisPartition && itr.hasNext()) {
entriesToKeep.add(itr.next());
counter++;
}
return entriesToKeep.iterator();
}
}, true);
return trimmedRecords;
}
return updates;
}
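
  /**
   * Returns true when more records are required than were generated, i.e. when the updates RDD needs to be
   * grown by duplicating existing records rather than trimmed.
   */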
private boolean isSafeToTake(long totalRecords, long totalRecordsGenerated) {
    // TODO : If totalRecords far exceeds totalRecordsGenerated, there are too few records on disk and duplicating
    // them performs poorly; find another way to generate updates when requiredUpdates >> insertedRecords
return totalRecords > totalRecordsGenerated;
}
}