blob: d9ae8be3462b03073ea4b8dd79aa330f273ce55f [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.flink.statefun.e2e.sanity;
import static org.hamcrest.CoreMatchers.is;
import static org.hamcrest.MatcherAssert.assertThat;
import java.util.Collections;
import java.util.Properties;
import org.apache.flink.statefun.e2e.common.StatefulFunctionsAppContainers;
import org.apache.flink.statefun.e2e.common.kafka.KafkaIOVerifier;
import org.apache.flink.statefun.e2e.common.kafka.KafkaProtobufSerializer;
import org.apache.flink.statefun.e2e.sanity.generated.VerificationMessages.Command;
import org.apache.flink.statefun.e2e.sanity.generated.VerificationMessages.FnAddress;
import org.apache.flink.statefun.e2e.sanity.generated.VerificationMessages.Modify;
import org.apache.flink.statefun.e2e.sanity.generated.VerificationMessages.Noop;
import org.apache.flink.statefun.e2e.sanity.generated.VerificationMessages.Send;
import org.apache.flink.statefun.e2e.sanity.generated.VerificationMessages.StateSnapshot;
import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.junit.Rule;
import org.junit.Test;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.testcontainers.containers.KafkaContainer;
/**
* Sanity verification end-to-end test based on the {@link SanityVerificationModule} application.
*
* <p>The test setups Kafka brokers and the verification application using Docker, sends a few
* commands to Kafka to be consumed by the application, and finally verifies that outputs sent to
* Kafka from the application are correct.
*/
public class SanityVerificationE2E {
private static final Logger LOG = LoggerFactory.getLogger(SanityVerificationE2E.class);
private static final String CONFLUENT_PLATFORM_VERSION = "5.0.3";
private static final String KAFKA_HOST = "kafka-broker";
@Rule
public KafkaContainer kafka =
new KafkaContainer(CONFLUENT_PLATFORM_VERSION).withNetworkAliases(KAFKA_HOST);
@Rule
public StatefulFunctionsAppContainers verificationApp =
StatefulFunctionsAppContainers.builder("sanity-verification", 2)
.dependsOn(kafka)
.exposeLogs(LOG)
.withModuleGlobalConfiguration(
Constants.KAFKA_BOOTSTRAP_SERVERS_CONF, KAFKA_HOST + ":9092")
.build();
@Test(timeout = 60_000L)
public void run() throws Exception {
final String kafkaAddress = kafka.getBootstrapServers();
final Producer<FnAddress, Command> commandProducer = kafkaCommandProducer(kafkaAddress);
final Consumer<FnAddress, StateSnapshot> stateSnapshotConsumer =
kafkaStateSnapshotConsumer(kafkaAddress);
final KafkaIOVerifier<FnAddress, Command, FnAddress, StateSnapshot> verifier =
new KafkaIOVerifier<>(commandProducer, stateSnapshotConsumer);
assertThat(
verifier.sending(
producerRecord(modifyAction(fnAddress(0, "id-1"), 100)),
producerRecord(modifyAction(fnAddress(0, "id-2"), 300)),
producerRecord(modifyAction(fnAddress(1, "id-3"), 200)),
producerRecord(
sendAction(fnAddress(1, "id-2"), modifyAction(fnAddress(0, "id-2"), 50))),
producerRecord(sendAction(fnAddress(0, "id-1"), noOpAction(fnAddress(1, "id-1"))))),
verifier.resultsInOrder(
is(stateSnapshot(fnAddress(0, "id-1"), 100)),
is(stateSnapshot(fnAddress(0, "id-2"), 300)),
is(stateSnapshot(fnAddress(1, "id-3"), 200)),
is(stateSnapshot(fnAddress(0, "id-2"), 350))));
}
// =================================================================================
// Kafka IO utility methods
// =================================================================================
private static Producer<FnAddress, Command> kafkaCommandProducer(String bootstrapServers) {
Properties props = new Properties();
props.put("bootstrap.servers", bootstrapServers);
return new KafkaProducer<>(
props,
new KafkaProtobufSerializer<>(FnAddress.parser()),
new KafkaProtobufSerializer<>(Command.parser()));
}
private static Consumer<FnAddress, StateSnapshot> kafkaStateSnapshotConsumer(
String bootstrapServers) {
Properties consumerProps = new Properties();
consumerProps.setProperty("bootstrap.servers", bootstrapServers);
consumerProps.setProperty("group.id", "sanity-itcase");
consumerProps.setProperty("auto.offset.reset", "earliest");
KafkaConsumer<FnAddress, StateSnapshot> consumer =
new KafkaConsumer<>(
consumerProps,
new KafkaProtobufSerializer<>(FnAddress.parser()),
new KafkaProtobufSerializer<>(StateSnapshot.parser()));
consumer.subscribe(Collections.singletonList(KafkaIO.STATE_SNAPSHOTS_TOPIC_NAME));
return consumer;
}
private static ProducerRecord<FnAddress, Command> producerRecord(Command command) {
return new ProducerRecord<>(KafkaIO.COMMAND_TOPIC_NAME, command.getTarget(), command);
}
// =================================================================================
// Protobuf message building utilities
// =================================================================================
private static StateSnapshot stateSnapshot(FnAddress fromFnAddress, int stateSnapshotValue) {
return StateSnapshot.newBuilder().setFrom(fromFnAddress).setState(stateSnapshotValue).build();
}
private static Command sendAction(FnAddress targetAddress, Command commandToSend) {
final Send sendAction = Send.newBuilder().addCommandToSend(commandToSend).build();
return Command.newBuilder().setTarget(targetAddress).setSend(sendAction).build();
}
private static Command modifyAction(FnAddress targetAddress, int stateValueDelta) {
final Modify modifyAction = Modify.newBuilder().setDelta(stateValueDelta).build();
return Command.newBuilder().setTarget(targetAddress).setModify(modifyAction).build();
}
private static Command noOpAction(FnAddress targetAddress) {
return Command.newBuilder().setTarget(targetAddress).setNoop(Noop.getDefaultInstance()).build();
}
private static FnAddress fnAddress(int typeIndex, String fnId) {
if (typeIndex > Constants.FUNCTION_TYPES.length - 1) {
throw new IndexOutOfBoundsException(
"Type index is out of bounds. Max index: " + (Constants.FUNCTION_TYPES.length - 1));
}
return FnAddress.newBuilder().setType(typeIndex).setId(fnId).build();
}
}