blob: 5ef77b2e7b6465022500a1ac75c605bff4f6e91f [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hudi.integ.testsuite.generator;
import java.io.Serializable;
import java.nio.ByteBuffer;
import java.nio.charset.Charset;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Random;
import java.util.UUID;
import org.apache.avro.Schema;
import org.apache.avro.Schema.Type;
import org.apache.avro.generic.GenericData;
import org.apache.avro.generic.GenericData.Fixed;
import org.apache.avro.generic.GenericFixed;
import org.apache.avro.generic.GenericRecord;
import org.apache.hudi.common.util.collection.Pair;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
 * This is a GenericRecord payload generator that generates full generic records {@link GenericRecord}.
 * Every field of a generic record created using this generator contains a random value.
 *
 * <p>When the estimated size of a record is smaller than the requested minimum payload size, arrays and maps whose
 * elements are primitive are packed with additional entries to make up the difference. The packing budget is restored
 * at the start of every {@link #getNewPayload()} / {@link #getUpdatePayload(GenericRecord, List)} call, so every
 * generated record is padded, not only the first one.
 *
 * <p>The packing budget is plain mutable instance state that is updated without synchronization while a record is
 * being generated.
 *
 * <p>{@code baseSchema} and {@code genericData} are declared {@code transient} and are therefore not part of the
 * Java-serialized form of this class.
 */
public class GenericRecordFullPayloadGenerator implements Serializable {

  public static final int DEFAULT_PAYLOAD_SIZE = 1024 * 10; // 10 KB

  private static final Logger log = LoggerFactory.getLogger(GenericRecordFullPayloadGenerator.class);

  // LogicalTypes in Avro 1.8.2
  private static final String DECIMAL = "decimal";
  private static final String UUID_NAME = "uuid";
  private static final String DATE = "date";
  private static final String TIME_MILLIS = "time-millis";
  private static final String TIME_MICROS = "time-micros";
  private static final String TIMESTAMP_MILLIS = "timestamp-millis";
  private static final String TIMESTAMP_MICROS = "timestamp-micros";

  // Length of the canonical textual form of a UUID, which is what STRING and BYTES values are generated from
  private static final int UUID_STRING_LENGTH = 36;

  protected final Random random = new Random();
  // The source schema used to generate a payload
  private final transient Schema baseSchema;
  // Used to validate a generic record
  private final transient GenericData genericData = new GenericData();
  // Packing budget computed once in the constructor; the working copies below are reset from these for every record
  private final int initialNumberOfBytesToAdd;
  private final int initialNumberOfComplexFields;
  // Number of more bytes to add based on the estimated full record payload size and min payload size
  private int numberOfBytesToAdd;
  // If more elements should be packed to meet the minPayloadSize
  private boolean shouldAddMore;
  // How many complex fields have we visited that can help us pack more entries and increase the size of the record
  private int numberOfComplexFields;
  // The size of a full record where every field of a generic record created contains 1 random value
  private int estimatedFullPayloadSize;

  /**
   * Create a generator for the given schema using {@link #DEFAULT_PAYLOAD_SIZE} as the minimum payload size.
   *
   * @param schema Schema of the records to generate
   */
  public GenericRecordFullPayloadGenerator(Schema schema) {
    this(schema, DEFAULT_PAYLOAD_SIZE);
  }

  /**
   * Create a generator for the given schema.
   *
   * @param schema         Schema of the records to generate
   * @param minPayloadSize Minimum size in bytes a generated record should have; if the estimated size of a record
   *                       is below this value, collections of primitive elements are packed with extra entries
   */
  public GenericRecordFullPayloadGenerator(Schema schema, int minPayloadSize) {
    Pair<Integer, Integer> sizeInfo = new GenericRecordFullPayloadSizeEstimator(schema)
        .typeEstimateAndNumComplexFields();
    this.estimatedFullPayloadSize = sizeInfo.getLeft();
    this.initialNumberOfComplexFields = sizeInfo.getRight();
    this.baseSchema = schema;
    this.initialNumberOfBytesToAdd =
        estimatedFullPayloadSize < minPayloadSize ? minPayloadSize - estimatedFullPayloadSize : 0;
    resetPackingState();
    if (this.shouldAddMore && numberOfComplexFields < 1) {
      log.warn("The schema does not have any collections/complex fields. Cannot achieve minPayloadSize : {}",
          minPayloadSize);
    }
  }

  /**
   * Check whether a schema is treated as primitive by this generator, i.e. it is none of ARRAY, MAP, RECORD or UNION.
   *
   * @param localSchema Schema to check
   * @return True if the schema is not an array, map, record or union
   */
  protected static boolean isPrimitive(Schema localSchema) {
    Type type = localSchema.getType();
    return type != Type.ARRAY
        && type != Type.MAP
        && type != Type.RECORD
        && type != Type.UNION;
  }

  /**
   * Create a new {@link GenericRecord} with random value according to given schema.
   *
   * @return {@link GenericRecord} with random value
   */
  public GenericRecord getNewPayload() {
    resetPackingState();
    return convert(baseSchema);
  }

  /**
   * Update a given {@link GenericRecord} with random value. The fields in {@code blacklistFields} will not be updated.
   *
   * @param record          GenericRecord to update
   * @param blacklistFields Fields whose value should not be touched
   * @return The updated {@link GenericRecord}
   */
  public GenericRecord getUpdatePayload(GenericRecord record, List<String> blacklistFields) {
    resetPackingState();
    return randomize(record, blacklistFields);
  }

  /**
   * Create a {@link GenericRecord} with random value according to given schema.
   *
   * @param schema Schema to create record with
   * @return {@link GenericRecord} with random value
   */
  protected GenericRecord convert(Schema schema) {
    GenericRecord result = new GenericData.Record(schema);
    for (Schema.Field f : schema.getFields()) {
      result.put(f.name(), typeConvert(f.schema()));
    }
    return result;
  }

  /**
   * Create a new {@link GenericRecord} with random values. Not all the fields have value, it is random, and its value
   * is random too.
   *
   * @param schema Schema to create with.
   * @return A {@link GenericRecord} with random value.
   */
  protected GenericRecord convertPartial(Schema schema) {
    GenericRecord result = new GenericData.Record(schema);
    for (Schema.Field f : schema.getFields()) {
      boolean setNull = random.nextBoolean();
      if (!setNull) {
        result.put(f.name(), typeConvert(f.schema()));
      } else {
        result.put(f.name(), null);
      }
    }
    // TODO : pack remaining bytes into a complex field
    return result;
  }

  /**
   * Set random value to {@link GenericRecord} according to the schema type of field.
   * The field in blacklist will not be set.
   *
   * @param record          GenericRecord to randomize.
   * @param blacklistFields Names of the fields that will not be randomized; {@code null} means no field is excluded.
   * @return Randomized GenericRecord (the same instance that was passed in).
   */
  protected GenericRecord randomize(GenericRecord record, List<String> blacklistFields) {
    for (Schema.Field f : record.getSchema().getFields()) {
      if (blacklistFields == null || !blacklistFields.contains(f.name())) {
        record.put(f.name(), typeConvert(f.schema()));
      }
    }
    return record;
  }

  /**
   * Restore the working packing budget to the values computed in the constructor, so that the record about to be
   * generated is padded the same way as the first one.
   */
  private void resetPackingState() {
    this.numberOfBytesToAdd = initialNumberOfBytesToAdd;
    this.numberOfComplexFields = initialNumberOfComplexFields;
    this.shouldAddMore = initialNumberOfBytesToAdd > 0;
  }

  /**
   * Generate random value according to their type.
   *
   * @param schema Schema of the value to generate; an optional type (union of null and one other type) is generated
   *               as its non-null branch
   * @return A random value for the schema
   * @throws IllegalArgumentException if the schema type is not handled (e.g. a union that is not an option)
   */
  private Object typeConvert(Schema schema) {
    Schema localSchema = schema;
    if (isOption(schema)) {
      localSchema = getNonNull(schema);
    }
    switch (localSchema.getType()) {
      case BOOLEAN:
        return random.nextBoolean();
      case DOUBLE:
        return random.nextDouble();
      case FLOAT:
        return random.nextFloat();
      case INT:
        return random.nextInt();
      case LONG:
        return random.nextLong();
      case STRING:
        return UUID.randomUUID().toString();
      case ENUM:
        List<String> enumSymbols = localSchema.getEnumSymbols();
        // nextInt's bound is exclusive, so the bound must be the full size for the last symbol to be reachable
        return new GenericData.EnumSymbol(localSchema, enumSymbols.get(random.nextInt(enumSymbols.size())));
      case RECORD:
        return convert(localSchema);
      case ARRAY: {
        Schema elementSchema = localSchema.getElementType();
        int numEntries = entriesToGenerate(elementSchema);
        List<Object> listRes = new ArrayList<>(numEntries);
        for (int i = 0; i < numEntries; i++) {
          listRes.add(typeConvert(elementSchema));
        }
        return listRes;
      }
      case MAP: {
        Schema valueSchema = localSchema.getValueType();
        int numEntries = entriesToGenerate(valueSchema);
        Map<String, Object> mapRes = new HashMap<>();
        for (int i = 0; i < numEntries; i++) {
          mapRes.put(UUID.randomUUID().toString(), typeConvert(valueSchema));
        }
        return mapRes;
      }
      case BYTES:
        return ByteBuffer.wrap(UUID.randomUUID().toString().getBytes(Charset.defaultCharset()));
      case FIXED:
        return generateFixedType(localSchema);
      default:
        throw new IllegalArgumentException(
            "Cannot handle type: " + localSchema.getType());
    }
  }

  /**
   * Decide how many entries an array or map with the given element/value schema should get.
   *
   * @param elementSchema Schema of the collection's elements (array) or values (map)
   * @return The padded entry count when padding is active and the element is primitive, otherwise 1; never less
   *         than 1, so that a padded collection is not smaller than an unpadded one
   */
  private int entriesToGenerate(Schema elementSchema) {
    if (isPrimitive(elementSchema) && this.shouldAddMore) {
      return Math.max(1, numEntriesToAdd(elementSchema));
    }
    return 1;
  }

  /**
   * Generate a value for a FIXED schema. The returned value always holds exactly {@code getFixedSize()} bytes.
   *
   * @param localSchema Schema of type FIXED
   * @return Random bytes when the schema has no logical type, the binary form of a random UUID (fitted to the fixed
   *         size) for {@code uuid}, and a zero-filled placeholder for the other known logical types
   * @throws IllegalArgumentException if the schema carries a logical type this generator does not know
   */
  private GenericFixed generateFixedType(Schema localSchema) {
    // TODO: Need to implement valid data generation for fixed type
    Fixed genericFixed = new Fixed(localSchema);
    if (localSchema.getLogicalType() == null) {
      // Plain fixed without logical type: any byte content of the declared size is valid
      byte[] randomBytes = new byte[localSchema.getFixedSize()];
      random.nextBytes(randomBytes);
      genericFixed.bytes(randomBytes);
      return genericFixed;
    }
    switch (localSchema.getLogicalType().getName()) {
      case UUID_NAME:
        UUID uuid = UUID.randomUUID();
        byte[] uuidBytes = ByteBuffer.allocate(2 * Long.BYTES)
            .putLong(uuid.getMostSignificantBits())
            .putLong(uuid.getLeastSignificantBits())
            .array();
        // Fit the 16 UUID bytes into the declared size so the value stays consistent with the schema
        byte[] sizedBytes = new byte[localSchema.getFixedSize()];
        System.arraycopy(uuidBytes, 0, sizedBytes, 0, Math.min(uuidBytes.length, sizedBytes.length));
        genericFixed.bytes(sizedBytes);
        return genericFixed;
      case DECIMAL:
      case DATE:
      case TIME_MILLIS:
      case TIME_MICROS:
      case TIMESTAMP_MILLIS:
      case TIMESTAMP_MICROS:
        // Placeholder: the fixed is returned as constructed, without assigning any bytes
        return genericFixed;
      default:
        throw new IllegalArgumentException(
            "Cannot handle type: " + localSchema.getLogicalType());
    }
  }

  /**
   * Validate whether the record match schema.
   *
   * @param record Record to validate.
   * @return True if matches.
   */
  public boolean validate(GenericRecord record) {
    return genericData.validate(baseSchema, record);
  }

  /**
   * Check whether a schema is option.
   * return true if it match the follows:
   * 1. Its type is Type.UNION
   * 2. Has two types
   * 3. Has a NULL type.
   *
   * @param schema Schema to check
   * @return True if the schema is a two-branch union where one branch is NULL
   */
  protected boolean isOption(Schema schema) {
    if (!schema.getType().equals(Schema.Type.UNION)) {
      return false;
    }
    List<Schema> types = schema.getTypes();
    return types.size() == 2
        && (types.get(0).getType().equals(Schema.Type.NULL)
        || types.get(1).getType().equals(Schema.Type.NULL));
  }

  /**
   * Pick the non-null branch of an option schema.
   *
   * @param schema A union schema for which {@link #isOption(Schema)} holds
   * @return The second branch if the first one is NULL, otherwise the first branch
   */
  protected Schema getNonNull(Schema schema) {
    List<Schema> types = schema.getTypes();
    return types.get(0).getType().equals(Schema.Type.NULL) ? types.get(1) : types.get(0);
  }

  /**
   * @return The estimated size in bytes of a record in which every field holds one random value
   */
  public int getEstimatedFullPayloadSize() {
    return estimatedFullPayloadSize;
  }

  /**
   * Size in bytes this generator assumes for one value of a primitive schema.
   *
   * @param elementSchema Schema of a primitive type
   * @return Assumed size in bytes of one generated value
   * @throws RuntimeException if the type has no known size
   */
  private int getSize(Schema elementSchema) {
    switch (elementSchema.getType()) {
      case BOOLEAN:
        return 1;
      case DOUBLE:
        return Double.BYTES;
      case FLOAT:
        return Float.BYTES;
      case INT:
        return Integer.BYTES;
      case LONG:
        return Long.BYTES;
      case STRING:
      case BYTES:
        // Both are generated from the textual form of a random UUID
        return UUID_STRING_LENGTH;
      case ENUM:
        return 1;
      case FIXED:
        return elementSchema.getFixedSize();
      default:
        throw new RuntimeException("Unknown type " + elementSchema.getType());
    }
  }

  /**
   * Method help to calculate the number of entries to add, and to update the remaining packing budget accordingly.
   *
   * @param elementSchema Schema of the primitive element that will be repeated
   * @return Number of entries to add; may be 0 when the remaining budget is smaller than one element
   */
  private int numEntriesToAdd(Schema elementSchema) {
    // Find the size of the primitive data type in bytes; clamp to 1 so a zero-sized fixed cannot divide by zero
    int primitiveDataTypeSize = Math.max(1, getSize(elementSchema));
    int numEntriesToAdd = numberOfBytesToAdd / primitiveDataTypeSize;
    // If more than 10 entries are being added for this same complex field and there are still more complex fields to
    // be visited in the schema, reduce the number of entries to add by a factor of 10 to allow for other complex
    // fields to pack some entries
    if (numEntriesToAdd > 10 && this.numberOfComplexFields > 1) {
      numEntriesToAdd = numEntriesToAdd / 10;
      numberOfBytesToAdd -= numEntriesToAdd * primitiveDataTypeSize;
      this.shouldAddMore = true;
    } else {
      // Last chance to pack (or too few entries to be worth splitting): spend the whole remaining budget here
      this.numberOfBytesToAdd = 0;
      this.shouldAddMore = false;
    }
    this.numberOfComplexFields -= 1;
    return numEntriesToAdd;
  }
}