/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.beam.sdk.io.kafka;
import static org.apache.beam.sdk.io.synthetic.SyntheticOptions.fromJsonString;
import com.google.cloud.Timestamp;
import java.io.IOException;
import java.util.Arrays;
import java.util.Set;
import java.util.UUID;
import java.util.function.BiFunction;
import org.apache.beam.sdk.PipelineResult;
import org.apache.beam.sdk.io.Read;
import org.apache.beam.sdk.io.common.HashingFn;
import org.apache.beam.sdk.io.common.IOITHelper;
import org.apache.beam.sdk.io.common.IOTestPipelineOptions;
import org.apache.beam.sdk.io.synthetic.SyntheticBoundedSource;
import org.apache.beam.sdk.io.synthetic.SyntheticSourceOptions;
import org.apache.beam.sdk.options.Description;
import org.apache.beam.sdk.options.StreamingOptions;
import org.apache.beam.sdk.options.Validation;
import org.apache.beam.sdk.testing.PAssert;
import org.apache.beam.sdk.testing.TestPipeline;
import org.apache.beam.sdk.testutils.NamedTestResult;
import org.apache.beam.sdk.testutils.metrics.IOITMetrics;
import org.apache.beam.sdk.testutils.metrics.MetricsReader;
import org.apache.beam.sdk.testutils.metrics.TimeMonitor;
import org.apache.beam.sdk.transforms.Combine;
import org.apache.beam.sdk.transforms.MapElements;
import org.apache.beam.sdk.transforms.ParDo;
import org.apache.beam.sdk.transforms.SimpleFunction;
import org.apache.beam.sdk.values.PCollection;
import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.ImmutableMap;
import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.ImmutableSet;
import org.apache.kafka.common.serialization.ByteArraySerializer;
import org.joda.time.Duration;
import org.junit.BeforeClass;
import org.junit.Rule;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.junit.runners.JUnit4;
/**
* IO Integration test for {@link org.apache.beam.sdk.io.kafka.KafkaIO}.
*
 * <p>See <a
 * href="https://beam.apache.org/documentation/io/testing/#i-o-transform-integration-tests">the
 * Beam IO transform integration testing guide</a> for more details.
 *
 * <p>NOTE: This test sets the retention policy of the messages so that all messages are retained
 * in the topic and can be read back after writing.
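 *
 * <p>For reference, a plausible invocation could pass options like the following. The option names
 * come from the {@link Options} interface below; the server address, topic, and BigQuery names are
 * placeholders, and the exact JSON fields accepted by {@code --sourceOptions} should be checked
 * against {@link org.apache.beam.sdk.io.synthetic.SyntheticSourceOptions} (the values shown mirror
 * the 1000-record, 10B-key/90B-value shape assumed by {@code EXPECTED_HASHCODE}):
 *
 * <pre>{@code
 * --sourceOptions={"numRecords":1000,"keySizeBytes":10,"valueSizeBytes":90}
 * --kafkaBootstrapServerAddress=localhost:9092
 * --kafkaTopic=beam-kafkaioit-topic
 * --readTimeout=120
 * --bigQueryDataset=my_dataset
 * --bigQueryTable=kafkaioit_results
 * }</pre>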
*/
@RunWith(JUnit4.class)
public class KafkaIOIT {
private static final String READ_TIME_METRIC_NAME = "read_time";
private static final String WRITE_TIME_METRIC_NAME = "write_time";
private static final String RUN_TIME_METRIC_NAME = "run_time";
private static final String NAMESPACE = KafkaIOIT.class.getName();
private static final String TEST_ID = UUID.randomUUID().toString();
private static final String TIMESTAMP = Timestamp.now().toString();
/** Hash for 1000 uniformly distributed records with 10B keys and 90B values (100kB total). */
private static final String EXPECTED_HASHCODE = "4507649971ee7c51abbb446e65a5c660";
private static SyntheticSourceOptions sourceOptions;
private static Options options;
@Rule public TestPipeline writePipeline = TestPipeline.create();
@Rule public TestPipeline readPipeline = TestPipeline.create();
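
  /** Parses the pipeline options and the JSON synthetic-source configuration once per test class. */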
@BeforeClass
public static void setup() throws IOException {
options = IOITHelper.readIOTestPipelineOptions(Options.class);
sourceOptions = fromJsonString(options.getSourceOptions(), SyntheticSourceOptions.class);
}
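
  /**
   * Writes synthetic records to Kafka, reads them back, verifies a hash of the payloads, and
   * publishes the collected timing metrics to BigQuery.
   */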
@Test
public void testKafkaIOReadsAndWritesCorrectly() throws IOException {
writePipeline
.apply("Generate records", Read.from(new SyntheticBoundedSource(sourceOptions)))
.apply("Measure write time", ParDo.of(new TimeMonitor<>(NAMESPACE, WRITE_TIME_METRIC_NAME)))
.apply("Write to Kafka", writeToKafka());
PCollection<String> hashcode =
readPipeline
.apply("Read from Kafka", readFromKafka())
.apply(
"Measure read time", ParDo.of(new TimeMonitor<>(NAMESPACE, READ_TIME_METRIC_NAME)))
.apply("Map records to strings", MapElements.via(new MapKafkaRecordsToStrings()))
.apply("Calculate hashcode", Combine.globally(new HashingFn()).withoutDefaults());
PAssert.thatSingleton(hashcode).isEqualTo(EXPECTED_HASHCODE);
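    // Run the write pipeline to completion first, so that every record the read pipeline
    // expects is already in the topic before the read starts.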
PipelineResult writeResult = writePipeline.run();
writeResult.waitUntilFinish();
PipelineResult readResult = readPipeline.run();
PipelineResult.State readState =
readResult.waitUntilFinish(Duration.standardSeconds(options.getReadTimeout()));
cancelIfNotTerminal(readResult, readState);
Set<NamedTestResult> metrics = readMetrics(writeResult, readResult);
IOITMetrics.publish(
TEST_ID, TIMESTAMP, options.getBigQueryDataset(), options.getBigQueryTable(), metrics);
}
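
  /**
   * Collects the write, read, and total run times (in seconds) recorded by the {@link TimeMonitor}
   * transforms in both pipelines.
   */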
private Set<NamedTestResult> readMetrics(PipelineResult writeResult, PipelineResult readResult) {
    BiFunction<MetricsReader, String, NamedTestResult> toNamedTestResult =
(reader, metricName) -> {
long start = reader.getStartTimeMetric(metricName);
long end = reader.getEndTimeMetric(metricName);
return NamedTestResult.create(TEST_ID, TIMESTAMP, metricName, (end - start) / 1e3);
};
    NamedTestResult writeTime =
        toNamedTestResult.apply(new MetricsReader(writeResult, NAMESPACE), WRITE_TIME_METRIC_NAME);
    NamedTestResult readTime =
        toNamedTestResult.apply(new MetricsReader(readResult, NAMESPACE), READ_TIME_METRIC_NAME);
NamedTestResult runTime =
NamedTestResult.create(
TEST_ID, TIMESTAMP, RUN_TIME_METRIC_NAME, writeTime.getValue() + readTime.getValue());
return ImmutableSet.of(readTime, writeTime, runTime);
}
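
  /** Cancels the read pipeline if it timed out instead of reaching a terminal state. */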
private void cancelIfNotTerminal(PipelineResult readResult, PipelineResult.State readState)
throws IOException {
if (!readState.isTerminal()) {
readResult.cancel();
}
}
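
  /** Builds a write transform targeting the configured topic, with raw byte-array serialization. */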
private KafkaIO.Write<byte[], byte[]> writeToKafka() {
return KafkaIO.<byte[], byte[]>write()
.withBootstrapServers(options.getKafkaBootstrapServerAddress())
.withTopic(options.getKafkaTopic())
.withKeySerializer(ByteArraySerializer.class)
.withValueSerializer(ByteArraySerializer.class);
}
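
  /**
   * Builds a read transform for the configured topic. Offsets are reset to "earliest" so the test
   * sees records written before the consumer group existed, and {@code withMaxNumRecords} bounds
   * the otherwise unbounded source so the pipeline can terminate.
   */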
private KafkaIO.Read<byte[], byte[]> readFromKafka() {
return KafkaIO.readBytes()
.withBootstrapServers(options.getKafkaBootstrapServerAddress())
.withConsumerConfigUpdates(ImmutableMap.of("auto.offset.reset", "earliest"))
.withTopic(options.getKafkaTopic())
.withMaxNumRecords(sourceOptions.numRecords);
}
/** Pipeline options specific for this test. */
public interface Options extends IOTestPipelineOptions, StreamingOptions {
@Description("Options for synthetic source.")
@Validation.Required
String getSourceOptions();
void setSourceOptions(String sourceOptions);
@Description("Kafka server address")
@Validation.Required
String getKafkaBootstrapServerAddress();
void setKafkaBootstrapServerAddress(String address);
@Description("Kafka topic")
@Validation.Required
String getKafkaTopic();
void setKafkaTopic(String topic);
@Description("Time to wait for the events to be processed by the read pipeline (in seconds)")
@Validation.Required
Integer getReadTimeout();
void setReadTimeout(Integer readTimeout);
}
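
  /** Formats each Kafka record's key and value byte arrays as a single space-separated string. */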
private static class MapKafkaRecordsToStrings
extends SimpleFunction<KafkaRecord<byte[], byte[]>, String> {
@Override
public String apply(KafkaRecord<byte[], byte[]> input) {
String key = Arrays.toString(input.getKV().getKey());
String value = Arrays.toString(input.getKV().getValue());
return String.format("%s %s", key, value);
}
}
}