// blob: 8087987d6e8f54ee26ce0de7d7283d74dd6279e3 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.flink.benchmark;
import org.apache.flink.api.common.ExecutionConfig;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.api.java.tuple.Tuple8;
import org.apache.flink.api.java.typeutils.ResultTypeQueryable;
import org.apache.flink.benchmark.full.StringSerializationBenchmark;
import org.apache.flink.benchmark.functions.BaseSourceWithKeyRange;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.sink.DiscardingSink;
import org.apache.flink.types.Row;
import org.openjdk.jmh.annotations.Benchmark;
import org.openjdk.jmh.annotations.OperationsPerInvocation;
import org.openjdk.jmh.runner.Runner;
import org.openjdk.jmh.runner.RunnerException;
import org.openjdk.jmh.runner.options.Options;
import org.openjdk.jmh.runner.options.OptionsBuilder;
import org.openjdk.jmh.runner.options.VerboseMode;
import java.util.Arrays;
import java.util.Random;
import java.util.concurrent.ThreadLocalRandom;
/**
* Benchmarks for serializing POJOs, Tuples, Rows, Avro-generated records and long Strings
* with different serialization frameworks.
*/
public class SerializationFrameworkMiniBenchmarks extends BenchmarkBase {
/**
* Number of events every benchmark's source is constructed with; the same value is reported
* to JMH through {@code @OperationsPerInvocation} on each benchmark method.
*/
protected static final int RECORDS_PER_INVOCATION = 300_000;
/**
 * Launches the JMH runner restricted to the benchmarks declared in this class.
 *
 * @param args command-line arguments; not read
 * @throws RunnerException propagated from the JMH {@link Runner}
 */
public static void main(String[] args) throws RunnerException {
    // Regex matching every benchmark whose name contains this class' canonical name.
    final String includePattern =
            ".*" + SerializationFrameworkMiniBenchmarks.class.getCanonicalName() + ".*";

    final Options jmhOptions = new OptionsBuilder()
            .verbosity(VerboseMode.NORMAL)
            .include(includePattern)
            .build();

    new Runner(jmhOptions).run();
}
/**
 * Runs a job at parallelism 4 that moves {@link #RECORDS_PER_INVOCATION} {@link MyPojo}
 * records from a {@link PojoSource} (10 keys) through a rebalance into a discarding sink.
 * {@link MyPojo} and {@link MyOperation} are registered as POJO types beforehand.
 *
 * @param context supplies the {@link StreamExecutionEnvironment} the job is built on
 * @throws Exception propagated from job execution
 */
@Benchmark
@OperationsPerInvocation(value = SerializationFrameworkMiniBenchmarks.RECORDS_PER_INVOCATION)
public void serializerPojo(FlinkEnvironmentContext context) throws Exception {
    final StreamExecutionEnvironment environment = context.env;
    environment.setParallelism(4);

    final ExecutionConfig config = environment.getConfig();
    config.registerPojoType(MyPojo.class);
    config.registerPojoType(MyOperation.class);

    final PojoSource source = new PojoSource(RECORDS_PER_INVOCATION, 10);
    environment.addSource(source).rebalance().addSink(new DiscardingSink<>());

    environment.execute();
}
/**
 * Runs a job at parallelism 1 that moves {@link #RECORDS_PER_INVOCATION} long Strings from a
 * {@link LongStringSource} (12 keys) through a rebalance into a discarding sink.
 *
 * @param context supplies the {@link StreamExecutionEnvironment} the job is built on
 * @throws Exception propagated from job execution
 */
@Benchmark
@OperationsPerInvocation(value = SerializationFrameworkMiniBenchmarks.RECORDS_PER_INVOCATION)
public void serializerHeavyString(FlinkEnvironmentContext context) throws Exception {
    final StreamExecutionEnvironment environment = context.env;
    environment.setParallelism(1);

    // NOTE(review): MyPojo and MyOperation are registered although this job's source emits
    // Strings - confirm whether these registrations are needed here.
    final ExecutionConfig config = environment.getConfig();
    config.registerPojoType(MyPojo.class);
    config.registerPojoType(MyOperation.class);

    final LongStringSource source = new LongStringSource(RECORDS_PER_INVOCATION, 12);
    environment.addSource(source).rebalance().addSink(new DiscardingSink<>());

    environment.execute();
}
/**
 * Runs a job at parallelism 4 that moves {@link #RECORDS_PER_INVOCATION} tuples from a
 * {@link TupleSource} (10 keys) through a rebalance into a discarding sink.
 *
 * @param context supplies the {@link StreamExecutionEnvironment} the job is built on
 * @throws Exception propagated from job execution
 */
@Benchmark
@OperationsPerInvocation(value = SerializationFrameworkMiniBenchmarks.RECORDS_PER_INVOCATION)
public void serializerTuple(FlinkEnvironmentContext context) throws Exception {
    final StreamExecutionEnvironment environment = context.env;
    environment.setParallelism(4);

    final TupleSource source = new TupleSource(RECORDS_PER_INVOCATION, 10);
    environment.addSource(source).rebalance().addSink(new DiscardingSink<>());

    environment.execute();
}
/**
 * Runs a job at parallelism 4 that moves {@link #RECORDS_PER_INVOCATION} {@link MyPojo}
 * records from a {@link PojoSource} (10 keys) through a rebalance into a discarding sink,
 * with Kryo forced on the execution config and both POJO classes registered as Kryo types.
 *
 * @param context supplies the {@link StreamExecutionEnvironment} the job is built on
 * @throws Exception propagated from job execution
 */
@Benchmark
@OperationsPerInvocation(value = SerializationFrameworkMiniBenchmarks.RECORDS_PER_INVOCATION)
public void serializerKryo(FlinkEnvironmentContext context) throws Exception {
    final StreamExecutionEnvironment environment = context.env;
    environment.setParallelism(4);

    final ExecutionConfig config = environment.getConfig();
    config.enableForceKryo();
    config.registerKryoType(MyPojo.class);
    config.registerKryoType(MyOperation.class);

    final PojoSource source = new PojoSource(RECORDS_PER_INVOCATION, 10);
    environment.addSource(source).rebalance().addSink(new DiscardingSink<>());

    environment.execute();
}
/**
 * Runs a job at parallelism 4 that moves {@link #RECORDS_PER_INVOCATION} Avro-generated
 * records from an {@link AvroPojoSource} (10 keys) through a rebalance into a discarding
 * sink.
 *
 * @param context supplies the {@link StreamExecutionEnvironment} the job is built on
 * @throws Exception propagated from job execution
 */
@Benchmark
@OperationsPerInvocation(value = SerializationFrameworkMiniBenchmarks.RECORDS_PER_INVOCATION)
public void serializerAvro(FlinkEnvironmentContext context) throws Exception {
    final StreamExecutionEnvironment environment = context.env;
    environment.setParallelism(4);

    final AvroPojoSource source = new AvroPojoSource(RECORDS_PER_INVOCATION, 10);
    environment.addSource(source).rebalance().addSink(new DiscardingSink<>());

    environment.execute();
}
/**
 * Runs a job at parallelism 4 that moves {@link #RECORDS_PER_INVOCATION} {@link Row}
 * records from a {@link RowSource} (10 keys) through a rebalance into a discarding sink.
 *
 * @param context supplies the {@link StreamExecutionEnvironment} the job is built on
 * @throws Exception propagated from job execution
 */
@Benchmark
@OperationsPerInvocation(value = SerializationFrameworkMiniBenchmarks.RECORDS_PER_INVOCATION)
public void serializerRow(FlinkEnvironmentContext context) throws Exception {
    final StreamExecutionEnvironment environment = context.env;
    environment.setParallelism(4);

    final RowSource source = new RowSource(RECORDS_PER_INVOCATION, 10);
    environment.addSource(source).rebalance().addSink(new DiscardingSink<>());

    environment.execute();
}
/**
 * Source emitting long Strings.
 *
 * <p>{@link #init()} builds three template strings of {@link #TEMPLATE_LENGTH} characters,
 * one each from the ASCII, Russian and Chinese character sets of
 * {@link StringSerializationBenchmark}; {@link #getElement(int)} cycles through them by key id.
 */
public static class LongStringSource extends BaseSourceWithKeyRange<String> {
    private static final long serialVersionUID = 3746240885982877398L;

    /** Length, in chars, of every generated template string. */
    private static final int TEMPLATE_LENGTH = 1024;

    // Rebuilt by init(); transient like the template fields of the other sources in this
    // file, so the generated strings are never part of the serialized function.
    private transient String[] templates;

    /**
     * @param numEvents forwarded to the base source
     * @param numKeys forwarded to the base source
     */
    public LongStringSource(int numEvents, int numKeys) {
        super(numEvents, numKeys);
    }

    /** Creates the three template strings after delegating to the base initialisation. */
    @Override
    protected void init() {
        super.init();
        templates = new String[] {
            makeString(StringSerializationBenchmark.asciiChars, TEMPLATE_LENGTH),
            makeString(StringSerializationBenchmark.russianChars, TEMPLATE_LENGTH),
            makeString(StringSerializationBenchmark.chineseChars, TEMPLATE_LENGTH)
        };
    }

    /**
     * Builds a string of {@code length} chars by picking one random entry of {@code symbols}
     * and repeating it.
     *
     * @param symbols candidate characters; must be non-empty
     * @param length number of chars in the result
     * @return the generated string
     */
    private String makeString(char[] symbols, int length) {
        char[] buffer = new char[length];
        Random random = ThreadLocalRandom.current();
        // NOTE(review): a single random symbol fills the whole buffer, so each template is one
        // repeated character - confirm this (rather than a random symbol per position) is the
        // intended workload before changing it, since it affects the measured numbers.
        Arrays.fill(buffer, symbols[random.nextInt(symbols.length)]);
        return new String(buffer);
    }

    /**
     * @param keyId key the element is requested for
     * @return the template at index {@code keyId % templates.length}
     */
    @Override
    protected String getElement(int keyId) {
        return templates[keyId % templates.length];
    }
}
/**
 * Source emitting a simple {@link MyPojo POJO}.
 *
 * <p>A single template instance is created in {@link #init()}; every call to
 * {@link #getElement(int)} overwrites its id and hands back that same instance.
 */
public static class PojoSource extends BaseSourceWithKeyRange<MyPojo> {
    private static final long serialVersionUID = 2941333602938145526L;

    private transient MyPojo template;

    /**
     * @param numEvents forwarded to the base source
     * @param numKeys forwarded to the base source
     */
    public PojoSource(int numEvents, int numKeys) {
        super(numEvents, numKeys);
    }

    /** Creates the reusable template record after delegating to the base initialisation. */
    @Override
    protected void init() {
        super.init();

        final String[] operationNames = {"op1", "op2", "op3", "op4"};
        final MyOperation[] operations = {
            new MyOperation(1, "op1"),
            new MyOperation(2, "op2"),
            new MyOperation(3, "op3")
        };

        // The last argument is the four-character String "null", not a null reference.
        template = new MyPojo(0, "myName", operationNames, operations, 1, 2, 3, "null");
    }

    /**
     * @param keyId value written into the template's id
     * @return the shared template instance, now carrying {@code keyId}
     */
    @Override
    protected MyPojo getElement(int keyId) {
        template.setId(keyId);
        return template;
    }
}
/**
 * Source emitting a {@link org.apache.flink.benchmark.avro.MyPojo POJO} generated by an Avro schema.
 *
 * <p>A single template instance is created in {@link #init()}; every call to
 * {@link #getElement(int)} overwrites its id and hands back that same instance.
 */
public static class AvroPojoSource extends BaseSourceWithKeyRange<org.apache.flink.benchmark.avro.MyPojo> {
    private static final long serialVersionUID = 2941333602938145526L;

    private transient org.apache.flink.benchmark.avro.MyPojo template;

    /**
     * @param numEvents forwarded to the base source
     * @param numKeys forwarded to the base source
     */
    public AvroPojoSource(int numEvents, int numKeys) {
        super(numEvents, numKeys);
    }

    /** Creates the reusable template record after delegating to the base initialisation. */
    @Override
    protected void init() {
        super.init();
        template = createTemplate();
    }

    /**
     * Builds the Avro record used as the template.
     *
     * <p>The arguments are deliberately kept inline so their types are inferred against the
     * generated constructor's parameter types.
     */
    private static org.apache.flink.benchmark.avro.MyPojo createTemplate() {
        return new org.apache.flink.benchmark.avro.MyPojo(
                0,
                "myName",
                Arrays.asList("op1", "op2", "op3", "op4"),
                Arrays.asList(
                        new org.apache.flink.benchmark.avro.MyOperation(1, "op1"),
                        new org.apache.flink.benchmark.avro.MyOperation(2, "op2"),
                        new org.apache.flink.benchmark.avro.MyOperation(3, "op3")),
                1,
                2,
                3,
                "null"); // the String "null", not a null reference
    }

    /**
     * @param keyId value written into the template's id
     * @return the shared template instance, now carrying {@code keyId}
     */
    @Override
    protected org.apache.flink.benchmark.avro.MyPojo getElement(int keyId) {
        template.setId(keyId);
        return template;
    }
}
/**
 * Source emitting a {@code Tuple} based on {@link MyPojo}.
 *
 * <p>A single template tuple is created in {@link #init()}; every call to
 * {@link #getElement(int)} overwrites field 0 and hands back that same instance.
 */
public static class TupleSource extends BaseSourceWithKeyRange<Tuple8<Integer, String, String[], Tuple2<Integer, String>[], Integer, Integer, Integer, Object>> {
    private static final long serialVersionUID = 2941333602938145526L;

    // Fully parameterized (previously a raw Tuple8) so that setField and the return in
    // getElement are type-checked instead of producing unchecked warnings.
    private transient Tuple8<Integer, String, String[], Tuple2<Integer, String>[], Integer, Integer, Integer, Object> template;

    /**
     * @param numEvents forwarded to the base source
     * @param numKeys forwarded to the base source
     */
    public TupleSource(int numEvents, int numKeys) {
        super(numEvents, numKeys);
    }

    /** Creates the reusable template tuple after delegating to the base initialisation. */
    // Java cannot create a Tuple2<Integer, String>[] directly; the raw Tuple2[] below is
    // the only unchecked operation in this method.
    @SuppressWarnings("unchecked")
    @Override
    protected void init() {
        super.init();
        template = MyPojo.createTuple(
                0,
                "myName",
                new String[] {"op1", "op2", "op3", "op4"},
                new Tuple2[] {
                    MyOperation.createTuple(1, "op1"),
                    MyOperation.createTuple(2, "op2"),
                    MyOperation.createTuple(3, "op3")},
                1,
                2,
                3,
                "null"); // the String "null", not a null reference
    }

    /**
     * @param keyId value written into field 0 of the template
     * @return the shared template instance, now carrying {@code keyId}
     */
    @Override
    protected Tuple8<Integer, String, String[], Tuple2<Integer, String>[], Integer, Integer, Integer, Object> getElement(int keyId) {
        // Tuple#setField takes (value, position) - the reverse of Row#setField.
        template.setField(keyId, 0);
        return template;
    }
}
/**
 * Source emitting a {@link Row} based on {@link MyPojo}.
 *
 * <p>A single template row is created in {@link #init()}; every call to
 * {@link #getElement(int)} overwrites field 0 and hands back that same instance. The
 * produced type is declared explicitly through {@link #getProducedType()}.
 */
public static class RowSource extends BaseSourceWithKeyRange<Row> implements ResultTypeQueryable<Row> {
    private static final long serialVersionUID = 2941333602938145526L;

    private transient Row template;

    /**
     * @param numEvents forwarded to the base source
     * @param numKeys forwarded to the base source
     */
    public RowSource(int numEvents, int numKeys) {
        super(numEvents, numKeys);
    }

    /** Creates the reusable template row after delegating to the base initialisation. */
    // No @SuppressWarnings("unchecked") here: nothing in this method involves generics,
    // so the suppression could only hide warnings introduced later.
    @Override
    protected void init() {
        super.init();
        template = MyPojo.createRow(
                0,
                "myName",
                new String[] {"op1", "op2", "op3", "op4"},
                new Row[] {
                    MyOperation.createRow(1, "op1"),
                    MyOperation.createRow(2, "op2"),
                    MyOperation.createRow(3, "op3")},
                1,
                2,
                3,
                "null"); // the String "null", not a null reference
    }

    /**
     * @param keyId value written into field 0 of the template
     * @return the shared template instance, now carrying {@code keyId}
     */
    @Override
    protected Row getElement(int keyId) {
        // Row#setField takes (position, value) - the reverse of Tuple#setField.
        template.setField(0, keyId);
        return template;
    }

    /** @return the row type described by {@link MyPojo#getProducedRowType()} */
    @Override
    public TypeInformation<Row> getProducedType() {
        return MyPojo.getProducedRowType();
    }
}
/**
 * Not so simple POJO.
 *
 * <p>Besides the bean itself, this class offers factories that lay the same eight values
 * out as a {@link Tuple8} or a {@link Row}, plus the matching {@link Row} type information.
 *
 * <p>NOTE(review): {@code id} is public while every other field is private with accessors;
 * presumably deliberate - verify before changing modifiers or field order.
 */
@SuppressWarnings({"WeakerAccess", "unused"})
public static class MyPojo {
    public int id;
    private String name;
    private String[] operationNames;
    private MyOperation[] operations;
    private int otherId1;
    private int otherId2;
    private int otherId3;
    private Object someObject;

    /** Creates an instance with all fields left at their Java defaults. */
    public MyPojo() {
    }

    /** Creates an instance with every field set from the corresponding argument. */
    public MyPojo(
            int id,
            String name,
            String[] operationNames,
            MyOperation[] operations,
            int otherId1,
            int otherId2,
            int otherId3,
            Object someObject) {
        this.id = id;
        this.name = name;
        this.operationNames = operationNames;
        this.operations = operations;
        this.otherId1 = otherId1;
        this.otherId2 = otherId2;
        this.otherId3 = otherId3;
        this.someObject = someObject;
    }

    // ------------------------------------------------------------------
    //  Plain accessors: arrays and objects are handed through uncopied.
    // ------------------------------------------------------------------

    public int getId() {
        return this.id;
    }

    public void setId(int id) {
        this.id = id;
    }

    public String getName() {
        return this.name;
    }

    public void setName(String name) {
        this.name = name;
    }

    public String[] getOperationNames() {
        return this.operationNames;
    }

    public void setOperationNames(String[] operationNames) {
        this.operationNames = operationNames;
    }

    public MyOperation[] getOperations() {
        return this.operations;
    }

    public void setOperations(MyOperation[] operations) {
        this.operations = operations;
    }

    public int getOtherId1() {
        return this.otherId1;
    }

    public void setOtherId1(int otherId1) {
        this.otherId1 = otherId1;
    }

    public int getOtherId2() {
        return this.otherId2;
    }

    public void setOtherId2(int otherId2) {
        this.otherId2 = otherId2;
    }

    public int getOtherId3() {
        return this.otherId3;
    }

    public void setOtherId3(int otherId3) {
        this.otherId3 = otherId3;
    }

    public Object getSomeObject() {
        return this.someObject;
    }

    public void setSomeObject(Object someObject) {
        this.someObject = someObject;
    }

    // ------------------------------------------------------------------
    //  Alternative representations of the same eight values.
    // ------------------------------------------------------------------

    /**
     * Packs the arguments, in declaration order, into a {@link Tuple8}.
     *
     * @return a new tuple holding the eight values
     */
    public static Tuple8<Integer, String, String[], Tuple2<Integer, String>[], Integer, Integer, Integer, Object> createTuple(
            int id,
            String name,
            String[] operationNames,
            Tuple2<Integer, String>[] operations,
            int otherId1,
            int otherId2,
            int otherId3,
            Object someObject) {
        return Tuple8.of(
                id, name, operationNames, operations, otherId1, otherId2, otherId3, someObject);
    }

    /**
     * Packs the arguments, in declaration order, into a {@link Row}.
     *
     * @return a new row holding the eight values
     */
    public static Row createRow(
            int id,
            String name,
            String[] operationNames,
            Row[] operations,
            int otherId1,
            int otherId2,
            int otherId3,
            Object someObject) {
        return Row.of(
                id, name, operationNames, operations, otherId1, otherId2, otherId3, someObject);
    }

    /**
     * Describes the row layout used by {@link #createRow}: int, String, String[], an array
     * of (int, String) rows, three ints and a generic {@link Object}.
     *
     * @return type information for the eight-field row
     */
    public static TypeInformation<Row> getProducedRowType() {
        final TypeInformation<Row> operationRowType = Types.ROW(Types.INT, Types.STRING);
        return Types.ROW(
                Types.INT,
                Types.STRING,
                Types.OBJECT_ARRAY(Types.STRING),
                Types.OBJECT_ARRAY(operationRowType),
                Types.INT,
                Types.INT,
                Types.INT,
                Types.GENERIC(Object.class));
    }
}
/**
 * Another POJO: an (id, name) pair used as the element type of {@link MyPojo}'s operations.
 *
 * <p>NOTE(review): {@code id} is package-private and {@code name} is protected; presumably
 * deliberate - verify before changing modifiers or field order.
 */
@SuppressWarnings({"WeakerAccess", "unused"})
public static class MyOperation {
    int id;
    protected String name;

    /** Creates an instance with both fields left at their Java defaults. */
    public MyOperation() {
    }

    /** Creates an instance with both fields set from the arguments. */
    public MyOperation(int id, String name) {
        this.id = id;
        this.name = name;
    }

    public int getId() {
        return this.id;
    }

    public void setId(int id) {
        this.id = id;
    }

    public String getName() {
        return this.name;
    }

    public void setName(String name) {
        this.name = name;
    }

    /** @return a new {@link Tuple2} holding {@code id} and {@code name} */
    public static Tuple2<Integer, String> createTuple(int id, String name) {
        return Tuple2.of(id, name);
    }

    /** @return a new two-field {@link Row} holding {@code id} and {@code name} */
    public static Row createRow(int id, String name) {
        return Row.of(id, name);
    }
}
}