blob: 6d53c2fadc71e664447e5eb88b58b6b88aca6a05 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.flink.test.runtime;
import org.apache.flink.client.program.ClusterClient;
import org.apache.flink.core.io.IOReadableWritable;
import org.apache.flink.core.memory.DataInputView;
import org.apache.flink.core.memory.DataOutputView;
import org.apache.flink.runtime.execution.Environment;
import org.apache.flink.runtime.io.network.api.reader.RecordReader;
import org.apache.flink.runtime.io.network.api.writer.RecordWriter;
import org.apache.flink.runtime.io.network.api.writer.RecordWriterBuilder;
import org.apache.flink.runtime.io.network.partition.ResultPartitionType;
import org.apache.flink.runtime.jobgraph.DistributionPattern;
import org.apache.flink.runtime.jobgraph.JobGraph;
import org.apache.flink.runtime.jobgraph.JobGraphTestUtils;
import org.apache.flink.runtime.jobgraph.JobVertex;
import org.apache.flink.runtime.jobgraph.tasks.AbstractInvokable;
import org.apache.flink.runtime.jobmanager.scheduler.SlotSharingGroup;
import org.apache.flink.runtime.jobmaster.JobResult;
import org.apache.flink.runtime.testutils.MiniClusterResourceConfiguration;
import org.apache.flink.test.util.MiniClusterWithClientResource;
import org.apache.flink.util.TestLogger;
import org.junit.Assert;
import org.junit.Test;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.TimeUnit;
/** Manually test the throughput of the network stack. */
public class NetworkStackThroughputITCase extends TestLogger {
/** Logger shared by the test method and the nested invokables. */
private static final Logger LOG = LoggerFactory.getLogger(NetworkStackThroughputITCase.class);
/**
 * Task-configuration key for the nominal data volume. Written by {@code createJobGraph} on the
 * producer vertex and read by {@code SpeedTestProducer} (which falls back to 1 if absent).
 */
private static final String DATA_VOLUME_GB_CONFIG_KEY = "data.volume.gb";
/** Task-configuration key telling {@code SpeedTestProducer} to throttle itself. */
private static final String IS_SLOW_SENDER_CONFIG_KEY = "is.slow.sender";
/** Task-configuration key telling {@code SpeedTestConsumer} to throttle itself. */
private static final String IS_SLOW_RECEIVER_CONFIG_KEY = "is.slow.receiver";
/** Milliseconds a throttled producer/consumer sleeps each time it pauses. */
private static final int IS_SLOW_SLEEP_MS = 10;
/**
 * A throttled task pauses whenever its record counter is a multiple of this value. With {@code
 * SpeedTestRecord.RECORD_SIZE} being 128 this evaluates to 512.
 *
 * <p>NOTE(review): {@code 2 * 32 * 1024} presumably stands for two 32 KiB buffers' worth of
 * bytes - confirm.
 */
private static final int IS_SLOW_EVERY_NUM_RECORDS =
(2 * 32 * 1024) / SpeedTestRecord.RECORD_SIZE;
// ------------------------------------------------------------------------
/**
 * Source task of the speed test: emits a fixed number of identical {@link SpeedTestRecord}s
 * through writer 0 of its environment.
 *
 * <p>The per-subtask volume in MB is computed as {@code (dataVolumeGb * 10) / parallelism},
 * where {@code dataVolumeGb} comes from the {@code data.volume.gb} task-configuration entry
 * (default 1). If {@code is.slow.sender} is set, the task sleeps {@code IS_SLOW_SLEEP_MS}
 * milliseconds once every {@code IS_SLOW_EVERY_NUM_RECORDS} records.
 *
 * <p>NOTE(review): the factor {@code 10} means the emitted volume is well below the nominal
 * gigabyte figure; presumably this keeps the test short - confirm.
 *
 * <p>NOTE: needs to be <tt>public</tt> so that a task can be run with this!
 */
public static class SpeedTestProducer extends AbstractInvokable {

    public SpeedTestProducer(Environment environment) {
        super(environment);
    }

    /**
     * Emits the configured number of records, optionally throttled, and always closes and
     * flushes the writer afterwards.
     *
     * @throws Exception if emitting fails or the throttling sleep is interrupted
     */
    @Override
    public void invoke() throws Exception {
        final RecordWriter<SpeedTestRecord> recordWriter =
                new RecordWriterBuilder<SpeedTestRecord>().build(getEnvironment().getWriter(0));

        try {
            // Work out how many records this subtask has to produce.
            final int configuredVolume =
                    getTaskConfiguration()
                            .getInteger(
                                    NetworkStackThroughputITCase.DATA_VOLUME_GB_CONFIG_KEY, 1);
            final long mbPerSubtask = (configuredVolume * 10) / getCurrentNumberOfSubtasks();
            final long recordsToEmit =
                    (mbPerSubtask * 1024 * 1024) / SpeedTestRecord.RECORD_SIZE;

            final String summary =
                    String.format(
                            "%d/%d: Producing %d records (each record: %d bytes, total: %.2f GB)",
                            getIndexInSubtaskGroup() + 1,
                            getCurrentNumberOfSubtasks(),
                            recordsToEmit,
                            SpeedTestRecord.RECORD_SIZE,
                            mbPerSubtask / 1024.0);
            LOG.info(summary);

            final boolean throttled =
                    getTaskConfiguration().getBoolean(IS_SLOW_SENDER_CONFIG_KEY, false);

            // The same record instance is re-sent every time; only the count matters.
            int throttleCounter = 0;
            final SpeedTestRecord payload = new SpeedTestRecord();

            long emitted = 0;
            while (emitted < recordsToEmit) {
                if (throttled) {
                    // The counter only advances while throttling is enabled.
                    final int seenSoFar = throttleCounter++;
                    if (seenSoFar % IS_SLOW_EVERY_NUM_RECORDS == 0) {
                        Thread.sleep(IS_SLOW_SLEEP_MS);
                    }
                }
                recordWriter.emit(payload);
                emitted++;
            }
        } finally {
            recordWriter.close();
            recordWriter.flushAll();
        }
    }
}
/**
 * Intermediate task of the speed test: reads every {@link SpeedTestRecord} from input gate 0 and
 * re-emits it unchanged through writer 0.
 *
 * <p>NOTE: needs to be <tt>public</tt> so that a task can be run with this!
 */
public static class SpeedTestForwarder extends AbstractInvokable {

    public SpeedTestForwarder(Environment environment) {
        super(environment);
    }

    /**
     * Forwards records until the reader reports the end of input ({@code null}), then clears
     * the reader's buffers and closes and flushes the writer.
     *
     * @throws Exception if reading or emitting a record fails
     */
    @Override
    public void invoke() throws Exception {
        final RecordReader<SpeedTestRecord> input =
                new RecordReader<>(
                        getEnvironment().getInputGate(0),
                        SpeedTestRecord.class,
                        getEnvironment().getTaskManagerInfo().getTmpDirectories());
        final RecordWriter<SpeedTestRecord> output =
                new RecordWriterBuilder<SpeedTestRecord>().build(getEnvironment().getWriter(0));

        try {
            for (SpeedTestRecord next = input.next(); next != null; next = input.next()) {
                output.emit(next);
            }
        } finally {
            input.clearBuffers();
            output.close();
            output.flushAll();
        }
    }
}
/**
 * Sink task of the speed test: drains input gate 0 and discards every record.
 *
 * <p>If {@code is.slow.receiver} is set in the task configuration, the task sleeps {@code
 * IS_SLOW_SLEEP_MS} milliseconds once every {@code IS_SLOW_EVERY_NUM_RECORDS} records.
 *
 * <p>NOTE: needs to be <tt>public</tt> so that a task can be run with this!
 */
public static class SpeedTestConsumer extends AbstractInvokable {

    public SpeedTestConsumer(Environment environment) {
        super(environment);
    }

    /**
     * Reads until the reader reports the end of input ({@code null}) and always clears the
     * reader's buffers afterwards.
     *
     * @throws Exception if reading fails or the throttling sleep is interrupted
     */
    @Override
    public void invoke() throws Exception {
        final RecordReader<SpeedTestRecord> input =
                new RecordReader<>(
                        getEnvironment().getInputGate(0),
                        SpeedTestRecord.class,
                        getEnvironment().getTaskManagerInfo().getTmpDirectories());

        try {
            final boolean throttled =
                    getTaskConfiguration().getBoolean(IS_SLOW_RECEIVER_CONFIG_KEY, false);

            int throttleCounter = 0;
            while (true) {
                if (input.next() == null) {
                    break;
                }
                if (!throttled) {
                    continue;
                }
                // The counter only advances while throttling is enabled.
                final int seenSoFar = throttleCounter++;
                if (seenSoFar % IS_SLOW_EVERY_NUM_RECORDS == 0) {
                    Thread.sleep(IS_SLOW_SLEEP_MS);
                }
            }
        } finally {
            input.clearBuffers();
        }
    }
}
/**
 * Fixed-size payload used by the speed test. Every instance carries {@code RECORD_SIZE} bytes,
 * initialised to the pattern {@code index % 128}.
 *
 * <p>NOTE: needs to be <tt>public</tt> to allow deserialization!
 */
public static class SpeedTestRecord implements IOReadableWritable {

    /** Number of payload bytes in one record. */
    private static final int RECORD_SIZE = 128;

    /** Payload; filled by the constructor and overwritten in place by {@link #read}. */
    private final byte[] buf = new byte[RECORD_SIZE];

    public SpeedTestRecord() {
        int position = 0;
        while (position < buf.length) {
            buf[position] = (byte) (position % 128);
            position++;
        }
    }

    /** Writes the whole payload to {@code out}. */
    @Override
    public void write(DataOutputView out) throws IOException {
        out.write(buf);
    }

    /** Fills the whole payload from {@code in}. */
    @Override
    public void read(DataInputView in) throws IOException {
        in.readFully(buf);
    }
}
// ------------------------------------------------------------------------
/**
 * Runs the throughput job once per scenario, each on its own mini cluster that is started
 * before the run and torn down afterwards.
 *
 * @throws RuntimeException if a scenario's parallelism is not a multiple of its slots per task
 *     manager
 * @throws Exception if starting the cluster or running the job fails
 */
@Test
public void testThroughput() throws Exception {
    // Columns: dataVolumeGb, useForwarder, isSlowSender, isSlowReceiver, parallelism,
    // numSlotsPerTaskManager.
    final Object[][] scenarios = {
        {1, false, false, false, 4, 2},
        {1, true, false, false, 4, 2},
        {1, true, true, false, 4, 2},
        {1, true, false, true, 4, 2},
        {2, true, false, false, 4, 2},
        {4, true, false, false, 4, 2},
        {4, true, false, false, 8, 4},
    };

    for (int idx = 0; idx < scenarios.length; idx++) {
        final Object[] scenario = scenarios[idx];

        final int volumeGb = (Integer) scenario[0];
        final boolean withForwarder = (Boolean) scenario[1];
        final boolean slowSender = (Boolean) scenario[2];
        final boolean slowReceiver = (Boolean) scenario[3];
        final int degree = (Integer) scenario[4];
        final int slotsPerTm = (Integer) scenario[5];

        final boolean fillsTaskManagersEvenly = degree % slotsPerTm == 0;
        if (!fillsTaskManagersEvenly) {
            throw new RuntimeException(
                    "The test case defines a parallelism that is not a multiple of the slots per task manager.");
        }

        // One slot per parallel subtask, spread over as many task managers as needed.
        final int taskManagerCount = degree / slotsPerTm;
        final MiniClusterResourceConfiguration clusterConfig =
                new MiniClusterResourceConfiguration.Builder()
                        .setNumberTaskManagers(taskManagerCount)
                        .setNumberSlotsPerTaskManager(slotsPerTm)
                        .build();
        final MiniClusterWithClientResource cluster =
                new MiniClusterWithClientResource(clusterConfig);

        cluster.before();
        try {
            final String banner =
                    String.format(
                            "Running test with parameters: dataVolumeGB=%s, useForwarder=%s, isSlowSender=%s, isSlowReceiver=%s, parallelism=%s, numSlotsPerTM=%s",
                            volumeGb,
                            withForwarder,
                            slowSender,
                            slowReceiver,
                            degree,
                            slotsPerTm);
            System.out.println(banner);

            testProgram(cluster, volumeGb, withForwarder, slowSender, slowReceiver, degree);
        } finally {
            cluster.after();
        }
    }
}
/**
 * Builds the speed-test job for the given parameters, runs it on {@code cluster}, asserts that
 * it finished without a failure cause, and logs the resulting throughput.
 *
 * <p>Throughput is derived from the job's net runtime in milliseconds. Using whole seconds
 * would divide by zero for runs shorter than one second and badly distort short runs.
 *
 * <p>NOTE(review): the reported volume is the nominal {@code dataVolumeGb}. {@code
 * SpeedTestProducer} computes its per-subtask volume as {@code (dataVolumeGb * 10) /
 * parallelism} MB, so the nominal figure may overstate the bytes actually sent - confirm which
 * one the report should use.
 *
 * @param cluster running mini cluster to submit the job to
 * @param dataVolumeGb nominal data volume passed to the producer
 * @param useForwarder whether a forwarder vertex sits between producer and consumer
 * @param isSlowSender whether the producer throttles itself
 * @param isSlowReceiver whether the consumer throttles itself
 * @param parallelism parallelism of every vertex
 * @throws Exception if submitting the job or waiting for its result fails
 */
private void testProgram(
        final MiniClusterWithClientResource cluster,
        final int dataVolumeGb,
        final boolean useForwarder,
        final boolean isSlowSender,
        final boolean isSlowReceiver,
        final int parallelism)
        throws Exception {
    final ClusterClient<?> client = cluster.getClusterClient();
    final JobGraph jobGraph =
            createJobGraph(
                    dataVolumeGb, useForwarder, isSlowSender, isSlowReceiver, parallelism);

    final JobResult jobResult =
            client.submitJob(jobGraph).thenCompose(client::requestJobResult).get();

    Assert.assertFalse(jobResult.getSerializedThrowable().isPresent());

    // Widen before multiplying so large volumes cannot overflow int arithmetic.
    final long dataVolumeMbit = dataVolumeGb * 8192L;

    final long runtimeMillis = jobResult.getNetRuntime();
    // Whole seconds are kept for the log line only; they are 0 for sub-second runs.
    final long runtimeSecs = TimeUnit.MILLISECONDS.toSeconds(runtimeMillis);

    // Compute from milliseconds; report 0 if the runtime is too short to be measured.
    final int mbitPerSecond =
            runtimeMillis > 0 ? (int) ((dataVolumeMbit * 1000.0) / runtimeMillis) : 0;

    LOG.info(
            String.format(
                    "Test finished with throughput of %d MBit/s (runtime [secs]: %d, "
                            + "data volume [gb/mbits]: %d/%d)",
                    mbitPerSecond, runtimeSecs, dataVolumeGb, dataVolumeMbit));
}
/**
 * Assembles the speed-test job: a producer, an optional forwarder, and a consumer, all with the
 * same parallelism and in one slot sharing group. Each vertex is connected to its predecessor
 * with an all-to-all, pipelined data set.
 *
 * @param dataVolumeGb nominal data volume stored in the producer's configuration
 * @param useForwarder whether to insert a forwarder between producer and consumer
 * @param isSlowSender throttling flag stored in the producer's configuration
 * @param isSlowReceiver throttling flag stored in the consumer's configuration
 * @param numSubtasks parallelism applied to every vertex
 * @return the job graph containing the vertices in pipeline order
 */
private JobGraph createJobGraph(
        int dataVolumeGb,
        boolean useForwarder,
        boolean isSlowSender,
        boolean isSlowReceiver,
        int numSubtasks) {
    final SlotSharingGroup slotGroup = new SlotSharingGroup();

    // Vertices are collected in pipeline order: producer, [forwarder], consumer.
    final List<JobVertex> pipeline = new ArrayList<>();

    final JobVertex source = new JobVertex("Speed Test Producer");
    source.setSlotSharingGroup(slotGroup);
    source.setInvokableClass(SpeedTestProducer.class);
    source.setParallelism(numSubtasks);
    source.getConfiguration().setInteger(DATA_VOLUME_GB_CONFIG_KEY, dataVolumeGb);
    source.getConfiguration().setBoolean(IS_SLOW_SENDER_CONFIG_KEY, isSlowSender);
    pipeline.add(source);

    if (useForwarder) {
        final JobVertex relay = new JobVertex("Speed Test Forwarder");
        relay.setSlotSharingGroup(slotGroup);
        relay.setInvokableClass(SpeedTestForwarder.class);
        relay.setParallelism(numSubtasks);
        pipeline.add(relay);
    }

    final JobVertex sink = new JobVertex("Speed Test Consumer");
    sink.setSlotSharingGroup(slotGroup);
    sink.setInvokableClass(SpeedTestConsumer.class);
    sink.setParallelism(numSubtasks);
    sink.getConfiguration().setBoolean(IS_SLOW_RECEIVER_CONFIG_KEY, isSlowReceiver);
    pipeline.add(sink);

    // Wire every vertex to the one directly upstream of it.
    for (int i = 1; i < pipeline.size(); i++) {
        pipeline.get(i)
                .connectNewDataSetAsInput(
                        pipeline.get(i - 1),
                        DistributionPattern.ALL_TO_ALL,
                        ResultPartitionType.PIPELINED);
    }

    return JobGraphTestUtils.streamingJobGraph(pipeline.toArray(new JobVertex[0]));
}
/**
 * Stand-alone entry point: runs {@link #testThroughput()} on a fresh test instance and prints
 * {@code Done.} once it returns.
 *
 * @param args ignored
 * @throws Exception if the throughput test fails
 */
public static void main(String[] args) throws Exception {
    final NetworkStackThroughputITCase throughputTest = new NetworkStackThroughputITCase();
    throughputTest.testThroughput();
    System.out.println("Done.");
}
}