blob: 4368fd709b236e8a266b7c7eaef6421513f8ffd7 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.beam.sdk.jmh.schemas;
import java.math.BigDecimal;
import java.nio.charset.StandardCharsets;
import org.apache.beam.sdk.schemas.Factory;
import org.apache.beam.sdk.schemas.GetterBasedSchemaProvider;
import org.apache.beam.sdk.schemas.NoSuchSchemaException;
import org.apache.beam.sdk.schemas.Schema;
import org.apache.beam.sdk.schemas.Schema.FieldType;
import org.apache.beam.sdk.schemas.SchemaCoder;
import org.apache.beam.sdk.schemas.SchemaRegistry;
import org.apache.beam.sdk.transforms.SerializableFunction;
import org.apache.beam.sdk.values.Row;
import org.apache.beam.sdk.values.RowWithGetters;
import org.apache.beam.sdk.values.RowWithStorage;
import org.apache.beam.sdk.values.TypeDescriptor;
import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.ImmutableList;
import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.ImmutableMap;
import org.joda.time.DateTime;
import org.joda.time.Duration;
import org.joda.time.Instant;
import org.openjdk.jmh.annotations.Level;
import org.openjdk.jmh.annotations.Param;
import org.openjdk.jmh.annotations.Scope;
import org.openjdk.jmh.annotations.Setup;
import org.openjdk.jmh.annotations.State;
import org.openjdk.jmh.infra.Blackhole;
/**
 * Bundle of rows according to the configured {@link Factory} as input for benchmarks.
 *
 * <p>When reading, rows are created during {@link #setup()} to exclude initialization costs from
 * the measurement. To prevent unintended cache hits in {@link RowWithGetters}, a new bundle of rows
 * must be generated before every invocation.
 *
 * <p>Setup per {@link Level#Invocation} has considerable drawbacks. However, because processing a
 * bundle of rows (n={@link #bundleSize}) takes well above 1 ms, each individual invocation can be
 * timestamped adequately without risking wrong results.
 *
 * @param <T> type of the single-field class whose row conversions are benchmarked
 */
@State(Scope.Benchmark)
public class RowBundle<T> {
  public enum Action {
    /**
     * Write field to object using {@link
     * GetterBasedSchemaProvider#fromRowFunction(TypeDescriptor)}.
     *
     * <p>Use {@link RowWithStorage} to bypass optimizations in RowWithGetters for writes.
     */
    WRITE,
    /**
     * Read field from {@link RowWithGetters} provided by {@link
     * GetterBasedSchemaProvider#toRowFunction(TypeDescriptor)}.
     */
    READ_ONCE,
    /**
     * Repeatedly (3x) read field from {@link RowWithGetters} provided by {@link
     * GetterBasedSchemaProvider#toRowFunction(TypeDescriptor)}.
     */
    READ_REPEATED
  }

  private static final SchemaRegistry REGISTRY = SchemaRegistry.createDefault();

  /** Start of the current day, computed once at class initialization; base for DATETIME values. */
  private static final Instant TODAY = DateTime.now().withTimeAtStartOfDay().toInstant();

  private final SerializableFunction<Row, T> fromRow;
  private final SerializableFunction<T, Row> toRow;

  /** Row backed by plain storage; the input for {@link Action#WRITE}. */
  private final Row rowWithStorage;

  /** Object converted from {@link #rowWithStorage}; the source of rows for the read actions. */
  private final T rowTarget;

  /** Rows (re)created in {@link #setup()} for the read actions; stays {@code null} for writes. */
  private Row[] rows;

  @Param("1000000")
  int bundleSize;

  @Param({"READ_ONCE", "READ_REPEATED", "WRITE"})
  Action action;

  /**
   * Unused; exists only to prevent warnings. Benchmarks should use {@link #RowBundle(Class)}.
   */
  public RowBundle() {
    this(null); // unused, just to prevent warnings
  }

  /**
   * Creates a bundle for the given class, which must have a registered schema with exactly one
   * field.
   *
   * @param clazz class whose schema is looked up in the default {@link SchemaRegistry}
   * @throws IllegalArgumentException if no schema is registered for {@code clazz}, or its schema
   *     does not have exactly one field
   */
  public RowBundle(Class<T> clazz) {
    SchemaCoder<T> coder;
    try {
      coder = REGISTRY.getSchemaCoder(clazz);
    } catch (NoSuchSchemaException e) {
      throw new IllegalArgumentException("No schema found for " + clazz, e);
    }
    Schema schema = coder.getSchema();
    if (schema.getFieldCount() != 1) {
      throw new IllegalArgumentException(
          "Expected class with a single field, but " + clazz + " has schema " + schema);
    }
    fromRow = coder.getFromRowFunction();
    toRow = coder.getToRowFunction();
    rowWithStorage = createRowWithStorage(schema);
    rowTarget = fromRow.apply(rowWithStorage);
  }

  /**
   * Regenerates the bundle of rows before every invocation of a read benchmark, so that each
   * invocation reads from fresh {@link RowWithGetters} instances.
   */
  @Setup(Level.Invocation)
  public void setup() {
    // no mutable state in case of writes, skip setup
    if (action == Action.WRITE) {
      return;
    }
    if (rows == null) {
      rows = new Row[bundleSize];
    }
    // new rows (with getters) for each invocation to prevent accidental cache hits
    for (int i = 0; i < bundleSize; i++) {
      rows[i] = toRow.apply(rowTarget);
    }
  }

  /** Runs benchmark iteration on a bundle of rows according to the configured {@link #action}. */
  public void processRows(Blackhole blackhole) {
    if (action == Action.READ_ONCE) {
      readRowsOnce(blackhole);
    } else if (action == Action.READ_REPEATED) {
      readRowsRepeatedly(blackhole);
    } else {
      writeRows(blackhole);
    }
  }

  /** Reads single field from row (of type {@link RowWithGetters}). */
  protected void readField(Row row, Blackhole blackhole) {
    blackhole.consume(row.getValue(0));
  }

  /** Reads the field of every row in the bundle exactly once. */
  private void readRowsOnce(Blackhole blackhole) {
    for (Row row : rows) {
      readField(row, blackhole);
    }
  }

  /** Reads the field of every row in the bundle three times in a row. */
  private void readRowsRepeatedly(Blackhole blackhole) {
    for (Row row : rows) {
      readField(row, blackhole);
      readField(row, blackhole);
      readField(row, blackhole);
    }
  }

  /** Converts {@link #rowWithStorage} back to an object {@link #bundleSize} times. */
  private void writeRows(Blackhole blackhole) {
    for (int i = 0; i < bundleSize; i++) {
      blackhole.consume(fromRow.apply(rowWithStorage));
    }
  }

  /** Creates row of type {@link RowWithStorage} with single field matching the provided schema. */
  private static Row createRowWithStorage(Schema schema) {
    return RowWithStorage.withSchema(schema)
        .attachValues(createValue(42, schema.getField(0).getType()));
  }

  /**
   * Creates a sample value of the given field type, derived from {@code val}.
   *
   * <p>Primitive types map to their standard Java representation in Beam rows (e.g. INT64 to
   * {@link Long}, DECIMAL to {@link BigDecimal}); containers hold a single derived element.
   *
   * @throws IllegalArgumentException if no value factory exists for the type
   */
  private static Object createValue(int val, FieldType type) {
    switch (type.getTypeName()) {
      case BYTE:
        return (byte) val;
      case INT16:
        return (short) val;
      case INT32:
        return val;
      case INT64:
        return (long) val;
      case FLOAT:
        return (float) val;
      case DOUBLE:
        return (double) val;
      case DECIMAL:
        return BigDecimal.valueOf(val);
      case BOOLEAN:
        return val != 0;
      case STRING:
        return String.valueOf(val);
      case BYTES:
        return String.valueOf(val).getBytes(StandardCharsets.UTF_8);
      case DATETIME:
        return TODAY.minus(Duration.standardHours(val));
      case ROW:
        return createRowWithStorage(type.getRowSchema());
      case ARRAY:
      case ITERABLE:
        return ImmutableList.of(createValue(val, type.getCollectionElementType()));
      case MAP:
        return ImmutableMap.of(
            createValue(val, type.getMapKeyType()), createValue(val, type.getMapValueType()));
      default:
        throw new IllegalArgumentException("No value factory for type " + type);
    }
  }
}