blob: 671e315b9b439279527ff816f5cc0b13886350f6 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.flink.statefun.testutils.function;
import static org.apache.flink.statefun.testutils.matchers.StatefulFunctionMatchers.*;
import static org.hamcrest.Matchers.contains;
import static org.hamcrest.Matchers.equalTo;
import java.time.Duration;
import java.util.concurrent.CompletableFuture;
import org.apache.flink.statefun.sdk.Address;
import org.apache.flink.statefun.sdk.AsyncOperationResult;
import org.apache.flink.statefun.sdk.Context;
import org.apache.flink.statefun.sdk.FunctionType;
import org.apache.flink.statefun.sdk.StatefulFunction;
import org.apache.flink.statefun.sdk.io.EgressIdentifier;
import org.junit.Assert;
import org.junit.Test;
/** Simple validation tests of the test harness. */
public class FunctionTestHarnessTest {
private static final FunctionType UNDER_TEST = new FunctionType("flink", "undertest");
private static final FunctionType OTHER_FUNCTION = new FunctionType("flink", "function");
private static final Address CALLER = new Address(OTHER_FUNCTION, "id");
private static final Address SOME_ADDRESS = new Address(OTHER_FUNCTION, "id2");
private static final EgressIdentifier<String> EGRESS =
new EgressIdentifier<>("flink", "egress", String.class);
@Test
public void basicMessageTest() {
FunctionTestHarness harness =
FunctionTestHarness.test(ignore -> new BasicFunction(), UNDER_TEST, "id");
Assert.assertThat(harness.invoke(CALLER, "ping"), sent(messagesTo(CALLER, equalTo("pong"))));
}
@Test
public void multiReturnTest() {
FunctionTestHarness harness =
FunctionTestHarness.test(ignore -> new MultiResponseFunction(), UNDER_TEST, "id");
Assert.assertThat(
harness.invoke("hello"),
sent(
messagesTo(CALLER, equalTo("a"), equalTo("b")),
messagesTo(SOME_ADDRESS, equalTo("c"))));
}
@Test
public void egressTest() {
FunctionTestHarness harness =
FunctionTestHarness.test(ignore -> new EgressFunction(), UNDER_TEST, "id");
Assert.assertThat(harness.invoke(CALLER, "ping"), sentNothing());
Assert.assertThat(harness.getEgress(EGRESS), contains(equalTo("pong")));
}
@Test
public void delayedMessageTest() {
FunctionTestHarness harness =
FunctionTestHarness.test(ignore -> new DelayedResponse(), UNDER_TEST, "id");
Assert.assertThat(harness.invoke(CALLER, "ping"), sentNothing());
Assert.assertThat(
harness.tick(Duration.ofMinutes(1)), sent(messagesTo(CALLER, equalTo("pong"))));
}
@Test
public void asyncMessageTest() {
FunctionTestHarness harness =
FunctionTestHarness.test(ignore -> new AsyncOperation(), UNDER_TEST, "id");
Assert.assertThat(harness.invoke(CALLER, "ping"), sent(messagesTo(CALLER, equalTo("pong"))));
}
private static class BasicFunction implements StatefulFunction {
@Override
public void invoke(Context context, Object input) {
context.reply("pong");
}
}
private static class MultiResponseFunction implements StatefulFunction {
@Override
public void invoke(Context context, Object input) {
context.send(CALLER, "a");
context.send(CALLER, "b");
context.send(SOME_ADDRESS, "c");
}
}
private static class EgressFunction implements StatefulFunction {
@Override
public void invoke(Context context, Object input) {
context.send(EGRESS, "pong");
}
}
private static class DelayedResponse implements StatefulFunction {
@Override
public void invoke(Context context, Object input) {
context.sendAfter(Duration.ofMinutes(1), context.caller(), "pong");
}
}
private static class AsyncOperation implements StatefulFunction {
@Override
public void invoke(Context context, Object input) {
if (input instanceof String) {
CompletableFuture<String> future = CompletableFuture.completedFuture("pong");
context.registerAsyncOperation(context.caller(), future);
}
if (input instanceof AsyncOperationResult) {
AsyncOperationResult<Address, String> result =
(AsyncOperationResult<Address, String>) input;
context.send(result.metadata(), result.value());
}
}
}
}