[FLINK-22039] Add Javadocs for new Java SDK
diff --git a/statefun-sdk-java/src/main/java/org/apache/flink/statefun/sdk/java/AddressScopedStorage.java b/statefun-sdk-java/src/main/java/org/apache/flink/statefun/sdk/java/AddressScopedStorage.java
index d04310a..710b9dd 100644
--- a/statefun-sdk-java/src/main/java/org/apache/flink/statefun/sdk/java/AddressScopedStorage.java
+++ b/statefun-sdk-java/src/main/java/org/apache/flink/statefun/sdk/java/AddressScopedStorage.java
@@ -18,11 +18,53 @@
package org.apache.flink.statefun.sdk.java;
import java.util.Optional;
+import org.apache.flink.statefun.sdk.java.storage.IllegalStorageAccessException;
+/**
+ * An {@link AddressScopedStorage} is used for reading and writing persistent values that is managed
+ * by Stateful Functions for fault-tolerance and consistency.
+ *
+ * <p>All access to the storage is scoped to the current invoked function instance, identified by
+ * the instance's {@link Address}. This means that within an invocation, function instances may only
+ * access it's own persisted values through this storage.
+ */
public interface AddressScopedStorage {
- <T> Optional<T> get(ValueSpec<T> descriptor);
- <T> void set(ValueSpec<T> key, T value);
+ /**
+ * Gets the value of the provided {@link ValueSpec}, scoped to the current invoked {@link
+ * Address}.
+ *
+ * @param spec the {@link ValueSpec} to read the value for.
+ * @param <T> the type of the value.
+ * @return the value, or {@link Optional#empty()} if there was not prior value set.
+ * @throws IllegalStorageAccessException if the provided {@link ValueSpec} is not recognized by
+ * the storage (e.g., if it wasn't registered for the accessing function).
+ */
+ <T> Optional<T> get(ValueSpec<T> spec);
- <T> void remove(ValueSpec<T> key);
+ /**
+ * Sets the value for the provided {@link ValueSpec}, scoped to the current invoked {@link
+ * Address}.
+ *
+ * @param spec the {@link ValueSpec} to write the new value for.
+ * @param value the new value to set.
+ * @param <T> the type of the value.
+ * @throws IllegalStorageAccessException if the provided {@link ValueSpec} is not recognized by
+ * the storage (e.g., if it wasn't registered for the accessing function).
+ */
+ <T> void set(ValueSpec<T> spec, T value);
+
+ /**
+ * Removes the prior value set for the provided {@link ValueSpec}, scoped to the current invoked
+ * {@link Address}.
+ *
+ * <p>After removing the value, calling {@link #get(ValueSpec)} for the same spec will return an
+ * {@link Optional#empty()}.
+ *
+ * @param spec the {@link ValueSpec} to remove the prior value for.
+ * @param <T> the type of the value.
+ * @throws IllegalStorageAccessException if the provided {@link ValueSpec} is not recognized by
+ * the storage (e.g., if it wasn't registered for the accessing function).
+ */
+ <T> void remove(ValueSpec<T> spec);
}
diff --git a/statefun-sdk-java/src/main/java/org/apache/flink/statefun/sdk/java/Context.java b/statefun-sdk-java/src/main/java/org/apache/flink/statefun/sdk/java/Context.java
index 9bc011b..43ab6b8 100644
--- a/statefun-sdk-java/src/main/java/org/apache/flink/statefun/sdk/java/Context.java
+++ b/statefun-sdk-java/src/main/java/org/apache/flink/statefun/sdk/java/Context.java
@@ -23,18 +23,50 @@
import org.apache.flink.statefun.sdk.java.message.EgressMessage;
import org.apache.flink.statefun.sdk.java.message.Message;
+/**
+ * A {@link Context} contains information about the current function invocation, such as the invoked
+ * function instance's and caller's {@link Address}. It is also used for side-effects as a result of
+ * the invocation such as send messages to other functions or egresses, and provides access to
+ * {@link AddressScopedStorage} scoped to the current {@link Address}.
+ */
public interface Context {
+ /** @return The current invoked function instance's {@link Address}. */
Address self();
+ /**
+ * @return The caller function instance's {@link Address}, if applicable. This is {@link
+ * Optional#empty()} if the message was sent to this function via an ingress.
+ */
Optional<Address> caller();
+ /**
+ * Sends out a {@link Message} to another function.
+ *
+ * @param message the message to send.
+ */
void send(Message message);
+ /**
+ * Sends out a {@link Message} to another function, after a specified {@link Duration} delay.
+ *
+ * @param duration the amount of time to delay the message delivery.
+ * @param message the message to send.
+ */
void sendAfter(Duration duration, Message message);
+ /**
+ * Sends out a {@link EgressMessage} to an egress.
+ *
+ * @param message the message to send.
+ */
void send(EgressMessage message);
+ /**
+ * @return The {@link AddressScopedStorage}, providing access to stored values scoped to the
+ * current invoked function instance's {@link Address} (which is obtainable using {@link
+ * #self()}).
+ */
AddressScopedStorage storage();
default CompletableFuture<Void> done() {
diff --git a/statefun-sdk-java/src/main/java/org/apache/flink/statefun/sdk/java/Expiration.java b/statefun-sdk-java/src/main/java/org/apache/flink/statefun/sdk/java/Expiration.java
index 3ceaa7b..521fa3f 100644
--- a/statefun-sdk-java/src/main/java/org/apache/flink/statefun/sdk/java/Expiration.java
+++ b/statefun-sdk-java/src/main/java/org/apache/flink/statefun/sdk/java/Expiration.java
@@ -43,7 +43,7 @@
}
/**
- * Returns an Expiration configuration that would expire a @duration after the last write.
+ * Returns an {@link Expiration} configuration that would expire a @duration after the last write.
*
* @param duration a duration to wait before considering the state expired.
*/
@@ -52,8 +52,8 @@
}
/**
- * Returns an Expiration configuration that would expire a @duration after the last invocation of
- * the function.
+ * Returns an {@link Expiration} configuration that would expire a @duration after the last
+ * invocation of the function.
*
* @param duration a duration to wait before considering the state expired.
*/
@@ -61,6 +61,13 @@
return new Expiration(Mode.AFTER_CALL, duration);
}
+ /**
+ * Returns an {@link Expiration} configuration that has an expiration characteristic based on the
+ * provided expire {@link Mode}.
+ *
+ * @param duration a duration to wait before considering the state expired.
+ * @param mode the expire mode.
+ */
public static Expiration expireAfter(Duration duration, Mode mode) {
return new Expiration(mode, duration);
}
@@ -78,10 +85,12 @@
this.duration = Objects.requireNonNull(duration);
}
+ /** @return The expire mode of this {@link Expiration} configuration. */
public Mode mode() {
return mode;
}
+ /** @return The duration of this {@link Expiration} configuration. */
public Duration duration() {
return duration;
}
diff --git a/statefun-sdk-java/src/main/java/org/apache/flink/statefun/sdk/java/StatefulFunction.java b/statefun-sdk-java/src/main/java/org/apache/flink/statefun/sdk/java/StatefulFunction.java
index bd9aef6..b6830c3 100644
--- a/statefun-sdk-java/src/main/java/org/apache/flink/statefun/sdk/java/StatefulFunction.java
+++ b/statefun-sdk-java/src/main/java/org/apache/flink/statefun/sdk/java/StatefulFunction.java
@@ -20,7 +20,46 @@
import java.util.concurrent.CompletableFuture;
import org.apache.flink.statefun.sdk.java.message.Message;
+/**
+ * A {@link StatefulFunction} is a user-defined function that can be invoked with a given input.
+ * This is the primitive building block for a Stateful Functions application.
+ *
+ * <h2>Concept</h2>
+ *
+ * <p>Each individual {@code StatefulFunction} is an uniquely invokable "instance" of a registered
+ * {@link StatefulFunctionSpec}. Each instance is identified by an {@link Address}, representing the
+ * function's unique id (a string) within its type. From a user's perspective, it would seem as if
+ * for each unique function id, there exists a stateful instance of the function that is always
+ * available to be invoked within a Stateful Functions application.
+ *
+ * <h2>Invoking a {@code StatefulFunction}</h2>
+ *
+ * <p>An individual {@code StatefulFunction} can be invoked with arbitrary input from any another
+ * {@code StatefulFunction} (including itself), or routed from ingresses. To invoke a {@code
+ * StatefulFunction}, the caller simply needs to know the {@code Address} of the target function.
+ *
+ * <p>As a result of invoking a {@code StatefulFunction}, the function may continue to invoke other
+ * functions, access persisted values, or send messages to egresses.
+ *
+ * <h2>Persistent State</h2>
+ *
+ * <p>Each individual {@code StatefulFunction} may have persistent values written to storage that is
+ * maintained by the system, providing consistent exactly-once and fault-tolerant guarantees. Please
+ * see Javadocs in {@link ValueSpec} and {@link AddressScopedStorage} for an overview of how to
+ * register persistent values and access the storage.
+ *
+ * @see Address
+ * @see StatefulFunctionSpec
+ * @see ValueSpec
+ * @see AddressScopedStorage
+ */
public interface StatefulFunction {
+ /**
+ * Applies an input message to this function.
+ *
+ * @param context the {@link Context} of the current invocation.
+ * @param argument the input message.
+ */
CompletableFuture<Void> apply(Context context, Message argument) throws Throwable;
}
diff --git a/statefun-sdk-java/src/main/java/org/apache/flink/statefun/sdk/java/StatefulFunctionSpec.java b/statefun-sdk-java/src/main/java/org/apache/flink/statefun/sdk/java/StatefulFunctionSpec.java
index dbf4b7e..ce0074a 100644
--- a/statefun-sdk-java/src/main/java/org/apache/flink/statefun/sdk/java/StatefulFunctionSpec.java
+++ b/statefun-sdk-java/src/main/java/org/apache/flink/statefun/sdk/java/StatefulFunctionSpec.java
@@ -22,11 +22,17 @@
import java.util.Objects;
import java.util.function.Supplier;
+/** Specification for a {@link StatefulFunction}, identifiable by a unique {@link TypeName}. */
public final class StatefulFunctionSpec {
private final TypeName typeName;
private final Map<String, ValueSpec<?>> knownValues;
private final Supplier<? extends StatefulFunction> supplier;
+ /**
+ * Creates a {@link Builder} for the spec.
+ *
+ * @param typeName the associated {@link TypeName} for the spec.
+ */
public static Builder builder(TypeName typeName) {
return new Builder(typeName);
}
@@ -40,27 +46,46 @@
this.knownValues = Objects.requireNonNull(knownValues);
}
+ /** @return The {@link TypeName} of the function. */
public TypeName typeName() {
return typeName;
}
+ /** @return The registered {@link ValueSpec}s for the function. */
public Map<String, ValueSpec<?>> knownValues() {
return knownValues;
}
+ /** @return The supplier for instances of the function. */
public Supplier<? extends StatefulFunction> supplier() {
return supplier;
}
+ /** Builder for a {@link StatefulFunctionSpec}. */
public static final class Builder {
private final TypeName typeName;
private final Map<String, ValueSpec<?>> knownValues = new HashMap<>();
private Supplier<? extends StatefulFunction> supplier;
+ /**
+ * Creates a {@link Builder} for a {@link StatefulFunctionSpec} which is identifiable via the
+ * specified {@link TypeName}.
+ *
+ * @param typeName the associated {@link TypeName} of the {@link StatefulFunctionSpec} being
+ * built.
+ */
private Builder(TypeName typeName) {
this.typeName = Objects.requireNonNull(typeName);
}
+ /**
+ * Registers a {@link ValueSpec} that will be used by this function. A function may only access
+ * values which have a registered {@link ValueSpec}.
+ *
+ * @param valueSpec the value spec to register.
+ * @throws IllegalArgumentException if multiple {@link ValueSpec}s with the same name was
+ * registered.
+ */
public Builder withValueSpec(ValueSpec<?> valueSpec) {
Objects.requireNonNull(valueSpec);
if (knownValues.put(valueSpec.name(), valueSpec) != null) {
@@ -71,6 +96,14 @@
return this;
}
+ /**
+ * Registers multiple {@link ValueSpec}s that will be used by this function. A function may only
+ * access values which have a registered {@link ValueSpec}.
+ *
+ * @param valueSpecs the value specs to register.
+ * @throws IllegalArgumentException if multiple {@link ValueSpec}s with the same name was
+ * registered.
+ */
public Builder withValueSpecs(ValueSpec<?>... valueSpecs) {
for (ValueSpec<?> spec : valueSpecs) {
withValueSpec(spec);
@@ -78,11 +111,18 @@
return this;
}
+ /**
+ * A {@link Supplier} used to create instances of a {@link StatefulFunction} for this {@link
+ * StatefulFunctionSpec}.
+ *
+ * @param supplier the supplier.
+ */
public Builder withSupplier(Supplier<? extends StatefulFunction> supplier) {
this.supplier = Objects.requireNonNull(supplier);
return this;
}
+ /** Builds the {@link StatefulFunctionSpec}. */
public StatefulFunctionSpec build() {
return new StatefulFunctionSpec(typeName, knownValues, supplier);
}
diff --git a/statefun-sdk-java/src/main/java/org/apache/flink/statefun/sdk/java/StatefulFunctions.java b/statefun-sdk-java/src/main/java/org/apache/flink/statefun/sdk/java/StatefulFunctions.java
index 60dbacc..8558228 100644
--- a/statefun-sdk-java/src/main/java/org/apache/flink/statefun/sdk/java/StatefulFunctions.java
+++ b/statefun-sdk-java/src/main/java/org/apache/flink/statefun/sdk/java/StatefulFunctions.java
@@ -22,13 +22,34 @@
import org.apache.flink.statefun.sdk.java.handler.ConcurrentRequestReplyHandler;
import org.apache.flink.statefun.sdk.java.handler.RequestReplyHandler;
+/**
+ * A registry for multiple {@link StatefulFunction}s. A {@link RequestReplyHandler} can be created
+ * from the registry that understands how to dispatch invocation requests to the registered
+ * functions as well as encode side-effects (e.g., sending messages to other functions or updating
+ * values in storage) as the response.
+ */
public class StatefulFunctions {
private final Map<TypeName, StatefulFunctionSpec> specs = new HashMap<>();
+ /**
+ * Registers a {@link StatefulFunctionSpec} builder, which will be used to build the function
+ * spec.
+ *
+ * @param builder a builder for the function spec to register.
+ * @throws IllegalArgumentException if multiple {@link StatefulFunctionSpec} under the same {@link
+ * TypeName} have been registered.
+ */
public StatefulFunctions withStatefulFunction(StatefulFunctionSpec.Builder builder) {
return withStatefulFunction(builder.build());
}
+ /**
+ * Registers a {@link StatefulFunctionSpec}.
+ *
+ * @param spec the function spec to register.
+ * @throws IllegalArgumentException if multiple {@link StatefulFunctionSpec} under the same {@link
+ * TypeName} have been registered.
+ */
public StatefulFunctions withStatefulFunction(StatefulFunctionSpec spec) {
if (specs.put(spec.typeName(), spec) != null) {
throw new IllegalArgumentException(
@@ -38,10 +59,18 @@
return this;
}
+ /** @return The registered {@link StatefulFunctionSpec}s. */
public Map<TypeName, StatefulFunctionSpec> functionSpecs() {
return specs;
}
+ /**
+ * Creates a {@link RequestReplyHandler} that understands how to dispatch invocation requests to
+ * the registered functions as well as encode side-effects (e.g., sending messages to other
+ * functions or updating values in storage) as the response.
+ *
+ * @return a {@link RequestReplyHandler} for the registered functions.
+ */
public RequestReplyHandler requestReplyHandler() {
return new ConcurrentRequestReplyHandler(specs);
}
diff --git a/statefun-sdk-java/src/main/java/org/apache/flink/statefun/sdk/java/TypeName.java b/statefun-sdk-java/src/main/java/org/apache/flink/statefun/sdk/java/TypeName.java
index c30bc83..064b254 100644
--- a/statefun-sdk-java/src/main/java/org/apache/flink/statefun/sdk/java/TypeName.java
+++ b/statefun-sdk-java/src/main/java/org/apache/flink/statefun/sdk/java/TypeName.java
@@ -19,16 +19,17 @@
import java.io.Serializable;
import java.util.Objects;
+import org.apache.flink.statefun.sdk.java.types.Type;
import org.apache.flink.statefun.sdk.shaded.com.google.protobuf.ByteString;
/**
- * This class represents the type of a {@code StatefulFunction}, consisting of a namespace of the
- * function type as well as the type's name.
- *
- * <p>A function's type is part of a function's {@link Address} and serves as integral part of an
- * individual function's identity.
+ * A {@link TypeName} is used to uniquely identify objects within a Stateful Functions application,
+ * including functions, egresses, and types. Typenames serves as an integral part of identifying
+ * these objects for message delivery as well as message data serialization and deserialization.
*
* @see Address
+ * @see Type
+ * @see StatefulFunctionSpec
*/
public final class TypeName implements Serializable {
diff --git a/statefun-sdk-java/src/main/java/org/apache/flink/statefun/sdk/java/ValueSpec.java b/statefun-sdk-java/src/main/java/org/apache/flink/statefun/sdk/java/ValueSpec.java
index eb46e6b..380105e 100644
--- a/statefun-sdk-java/src/main/java/org/apache/flink/statefun/sdk/java/ValueSpec.java
+++ b/statefun-sdk-java/src/main/java/org/apache/flink/statefun/sdk/java/ValueSpec.java
@@ -24,6 +24,16 @@
import org.apache.flink.statefun.sdk.java.types.Types;
import org.apache.flink.statefun.sdk.shaded.com.google.protobuf.ByteString;
+/**
+ * A {@link ValueSpec} identifies a registered persistent value of a function, which will be managed
+ * by the Stateful Functions runtime for consistency and fault-tolerance. A {@link ValueSpec} is
+ * registered for a function by configuring it on the function's assoicated {@link
+ * StatefulFunctionSpec}.
+ *
+ * @see StatefulFunctionSpec.Builder#withValueSpec(ValueSpec)
+ * @see AddressScopedStorage
+ * @param <T> the type of the value.
+ */
public final class ValueSpec<T> {
/**
@@ -66,18 +76,22 @@
this.nameByteString = ByteString.copyFromUtf8(untyped.stateName);
}
+ /** @return The name of the value. */
public String name() {
return name;
}
+ /** @return The expiration configuration of the value. */
public Expiration expiration() {
return expiration;
}
+ /** @return The {@link TypeName} of the value's type. */
public TypeName typeName() {
return type.typeName();
}
+ /** @return The {@link Type} of the value. */
public Type<T> type() {
return type;
}
diff --git a/statefun-sdk-java/src/main/java/org/apache/flink/statefun/sdk/java/types/SimpleType.java b/statefun-sdk-java/src/main/java/org/apache/flink/statefun/sdk/java/types/SimpleType.java
index 072fb83..f556a44 100644
--- a/statefun-sdk-java/src/main/java/org/apache/flink/statefun/sdk/java/types/SimpleType.java
+++ b/statefun-sdk-java/src/main/java/org/apache/flink/statefun/sdk/java/types/SimpleType.java
@@ -25,6 +25,11 @@
import org.apache.flink.statefun.sdk.java.slice.Slice;
import org.apache.flink.statefun.sdk.java.slice.Slices;
+/**
+ * A utility to create simple {@link Type} implementations.
+ *
+ * @param <T> the Java type handled by this {@link Type}.
+ */
public final class SimpleType<T> implements Type<T> {
@FunctionalInterface
diff --git a/statefun-sdk-java/src/main/java/org/apache/flink/statefun/sdk/java/types/Type.java b/statefun-sdk-java/src/main/java/org/apache/flink/statefun/sdk/java/types/Type.java
index 7343b5a..e855689 100644
--- a/statefun-sdk-java/src/main/java/org/apache/flink/statefun/sdk/java/types/Type.java
+++ b/statefun-sdk-java/src/main/java/org/apache/flink/statefun/sdk/java/types/Type.java
@@ -20,11 +20,45 @@
import java.util.Collections;
import java.util.Set;
import org.apache.flink.statefun.sdk.java.TypeName;
+import org.apache.flink.statefun.sdk.java.ValueSpec;
+import org.apache.flink.statefun.sdk.shaded.com.google.protobuf.Message;
+/**
+ * This class is the core abstraction used by Stateful Functions's type system, and consists of a
+ * few things that StateFun uses to handle {@link Message}s and {@link ValueSpec}s:
+ *
+ * <ul>
+ * <li>A {@link TypeName} to identify the type.
+ * <li>A {@link TypeSerializer} for serializing and deserializing instances of the type.
+ * </ul>
+ *
+ * <h2>Cross-language primitive types</h2>
+ *
+ * <p>StateFun's type system has cross-language support for common primitive types, such as boolean,
+ * integer, long, etc. These primitive types have built-in {@link Type}s implemented for them
+ * already, with predefined {@link TypeName}s.
+ *
+ * <p>This is of course all transparent for the user, so you don't need to worry about it. Functions
+ * implemented in various languages (e.g. Java or Python) can message each other by directly sending
+ * supported primitive values as message arguments. Moreover, the type system is used for state
+ * values as well; so, you can expect that a function can safely read previous state after
+ * reimplementing it in a different language.
+ *
+ * <h2>Common custom types (e.g. JSON or Protobuf)</h2>
+ *
+ * <p>The type system is also very easily extensible to support custom message types, such as JSON
+ * or Protobuf messages. This is just a matter of implementing your own {@link Type} with a custom
+ * typename and serializer. Alternatively, you can also use the {@link SimpleType} class to do this
+ * easily.
+ *
+ * @param <T> the Java type of serialized / deserialized instances.
+ */
public interface Type<T> {
+ /** @return The unique {@link TypeName} of this type. */
TypeName typeName();
+ /** @return A {@link TypeSerializer} that can serialize and deserialize this type. */
TypeSerializer<T> typeSerializer();
default Set<TypeCharacteristics> typeCharacteristics() {