blob: 202f03d650103686e4ffba22d34a5cfe29c05ada [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.flink.statefun.testutils.function;
import java.time.Duration;
import java.time.Instant;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.concurrent.CompletableFuture;
import org.apache.flink.statefun.sdk.Address;
import org.apache.flink.statefun.sdk.Context;
import org.apache.flink.statefun.sdk.FunctionType;
import org.apache.flink.statefun.sdk.StatefulFunction;
import org.apache.flink.statefun.sdk.StatefulFunctionProvider;
import org.apache.flink.statefun.sdk.io.EgressIdentifier;
/**
* The {@link FunctionTestHarness} provides a thin convenience wrapper around a {@link
* StatefulFunction} to capture its results under test.
*
* <p>The harness captures all messages sent using the {@link org.apache.flink.statefun.sdk.Context}
* from within the functions {@link StatefulFunction#invoke(Context, Object)} method and returns
* them. Values sent to an egress are also captured and can be queried via the {@link
* #getEgress(EgressIdentifier)} method.
*
* <p><b>Important</b>This test harness is intended strictly for basic unit tests of functions. As
* such, {@link Context#registerAsyncOperation(Object, CompletableFuture)} awaits all futures. If
* you want to test in an asyncronous environment please consider using the the {@code
* statefun-flink-harness}.
*
* <pre>{@code
* {@code @Test}
* public void test() {
* FunctionType type = new FunctionType("flink", "testfunc");
* FunctionTestHarness harness = TestHarness.test(new TestFunctionProvider(), type, "my-id");
*
* Assert.assertThat(
* harness.invoke("ping"),
* sent(
* messagesTo(
* new Address(new FunctionType("flink", "func"), "id"), equalTo("pong"));
* }
* }</pre>
*/
@SuppressWarnings("WeakerAccess")
public class FunctionTestHarness {
private final TestContext context;
/**
* Creates a test harness, pinning the function to a particular address.
*
* @param provider A provider for the function under test.
* @param type The type of the function.
* @param id The static id of the function for the duration of the test.
* @param startTime The initial timestamp of the internal clock.
* @return A fully configured test harness.
*/
public static FunctionTestHarness test(
StatefulFunctionProvider provider, FunctionType type, String id, Instant startTime) {
Objects.requireNonNull(provider, "Function provider can not be null");
return new FunctionTestHarness(provider.functionOfType(type), type, id, startTime);
}
/**
* Creates a test harness, pinning the function to a particular address.
*
* @param provider A provider for the function under test.
* @param type The type of the function.
* @param id The static id of the function for the duration of the test.
* @return A fully configured test harness.
*/
public static FunctionTestHarness test(
StatefulFunctionProvider provider, FunctionType type, String id) {
Objects.requireNonNull(provider, "Function provider can not be null");
return new FunctionTestHarness(provider.functionOfType(type), type, id, Instant.EPOCH);
}
private FunctionTestHarness(
StatefulFunction function, FunctionType type, String id, Instant startTime) {
this.context = new TestContext(new Address(type, id), function, startTime);
}
/**
* @param message A message that will be sent to the function.
* @return A responses sent from the function after invocation using {@link Context#send(Address,
* Object)}.
*/
public Map<Address, List<Object>> invoke(Object message) {
return context.invoke(null, message);
}
/**
* @param message A message that will be sent to the function.
* @param from The address of the function that sent the message.
* @return A responses sent from the function after invocation using {@link Context#send(Address,
* Object)}.
*/
public Map<Address, List<Object>> invoke(Address from, Object message) {
Objects.requireNonNull(from);
return context.invoke(from, message);
}
/**
* Advances the internal clock the harness and fires and pending timers.
*
* @param duration the amount of time to advance for this tick.
* @return A responses sent from the function after invocation.
*/
public Map<Address, List<Object>> tick(Duration duration) {
Objects.requireNonNull(duration);
return context.tick(duration);
}
/**
* @param identifier An egress identifier
* @param <T> the data type consumed by the egress.
* @return All the messages sent to that egress.
*/
public <T> List<T> getEgress(EgressIdentifier<T> identifier) {
return context.getEgress(identifier);
}
}