blob: bdb67e3a850cfa526daaa84f1916e279a8910cdf [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.flink.statefun.flink.core.functions;
import static org.hamcrest.CoreMatchers.notNullValue;
import static org.junit.Assert.assertThat;
import java.io.Serializable;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
import java.util.Set;
import java.util.stream.Stream;
import javax.annotation.Nonnull;
import org.apache.flink.api.common.ExecutionConfig;
import org.apache.flink.api.common.accumulators.Accumulator;
import org.apache.flink.api.common.accumulators.DoubleCounter;
import org.apache.flink.api.common.accumulators.Histogram;
import org.apache.flink.api.common.accumulators.IntCounter;
import org.apache.flink.api.common.accumulators.LongCounter;
import org.apache.flink.api.common.cache.DistributedCache;
import org.apache.flink.api.common.externalresource.ExternalResourceInfo;
import org.apache.flink.api.common.functions.BroadcastVariableInitializer;
import org.apache.flink.api.common.functions.RuntimeContext;
import org.apache.flink.api.common.state.AggregatingState;
import org.apache.flink.api.common.state.AggregatingStateDescriptor;
import org.apache.flink.api.common.state.ListState;
import org.apache.flink.api.common.state.ListStateDescriptor;
import org.apache.flink.api.common.state.MapState;
import org.apache.flink.api.common.state.MapStateDescriptor;
import org.apache.flink.api.common.state.ReducingState;
import org.apache.flink.api.common.state.ReducingStateDescriptor;
import org.apache.flink.api.common.state.State;
import org.apache.flink.api.common.state.StateDescriptor;
import org.apache.flink.api.common.state.ValueState;
import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.api.common.typeutils.TypeSerializer;
import org.apache.flink.metrics.CharacterFilter;
import org.apache.flink.metrics.Counter;
import org.apache.flink.metrics.Gauge;
import org.apache.flink.metrics.Meter;
import org.apache.flink.metrics.MetricGroup;
import org.apache.flink.metrics.SimpleCounter;
import org.apache.flink.runtime.state.KeyGroupedInternalPriorityQueue;
import org.apache.flink.runtime.state.Keyed;
import org.apache.flink.runtime.state.KeyedStateBackend;
import org.apache.flink.runtime.state.KeyedStateFunction;
import org.apache.flink.runtime.state.PriorityComparable;
import org.apache.flink.runtime.state.StateSnapshotTransformer.StateSnapshotTransformFactory;
import org.apache.flink.runtime.state.VoidNamespace;
import org.apache.flink.runtime.state.heap.HeapPriorityQueueElement;
import org.apache.flink.runtime.state.internal.InternalListState;
import org.apache.flink.shaded.guava18.com.google.common.util.concurrent.MoreExecutors;
import org.apache.flink.statefun.flink.core.StatefulFunctionsUniverse;
import org.apache.flink.statefun.flink.core.TestUtils;
import org.apache.flink.statefun.flink.core.backpressure.ThresholdBackPressureValve;
import org.apache.flink.statefun.flink.core.message.Message;
import org.apache.flink.statefun.flink.core.message.MessageFactoryType;
import org.apache.flink.streaming.api.operators.InternalTimerService;
import org.apache.flink.streaming.api.operators.Output;
import org.apache.flink.streaming.api.operators.Triggerable;
import org.apache.flink.streaming.api.watermark.Watermark;
import org.apache.flink.streaming.runtime.streamrecord.LatencyMarker;
import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
import org.apache.flink.util.OutputTag;
import org.apache.flink.util.function.BiConsumerWithException;
import org.junit.Test;
public class ReductionsTest {
/**
 * Smoke test: building a {@link Reductions} instance from fake collaborators must complete and
 * produce a non-null result.
 */
@Test
public void testFactory() {
  // Collaborators are created up front, in the same order they are handed to the factory.
  final ThresholdBackPressureValve valve = new ThresholdBackPressureValve(-1);
  final StatefulFunctionsUniverse universe =
      new StatefulFunctionsUniverse(MessageFactoryType.WITH_KRYO_PAYLOADS);
  final FakeRuntimeContext runtimeContext = new FakeRuntimeContext();
  final FakeKeyedStateBackend keyedStateBackend = new FakeKeyedStateBackend();
  final FakeTimerServiceFactory timerServiceFactory = new FakeTimerServiceFactory();
  final FakeInternalListState delayedMessages = new FakeInternalListState();
  final FakeOutput output = new FakeOutput();
  final FakeMetricGroup metricGroup = new FakeMetricGroup();
  final FakeMapState asyncOperations = new FakeMapState();

  final Reductions created =
      Reductions.create(
          valve,
          universe,
          runtimeContext,
          keyedStateBackend,
          timerServiceFactory,
          delayedMessages,
          new HashMap<>(),
          output,
          TestUtils.ENVELOPE_FACTORY,
          MoreExecutors.directExecutor(),
          metricGroup,
          asyncOperations);

  assertThat(created, notNullValue());
}
/**
 * Minimal {@link RuntimeContext} used to construct {@link Reductions} in this test.
 *
 * <p>Only {@link #getState}, {@link #getMapState} and {@link #getExecutionConfig} return usable
 * objects. Every other method either throws {@link UnsupportedOperationException}, does nothing,
 * or returns a trivial default ({@code 0} / {@code false}).
 */
@SuppressWarnings("deprecation")
private static final class FakeRuntimeContext implements RuntimeContext {

  /** Returns a {@link ValueState} that never holds a value and ignores every write. */
  @Override
  public <T> ValueState<T> getState(ValueStateDescriptor<T> stateProperties) {
    return new ValueState<T>() {
      @Override
      public T value() {
        return null;
      }

      @Override
      public void update(T value) {}

      @Override
      public void clear() {}
    };
  }

  /**
   * Returns a {@link MapState} that is always empty and ignores every write.
   *
   * <p>The views and the iterator are empty rather than {@code null}, which keeps them
   * consistent with {@code isEmpty()} returning {@code true} and lets callers iterate without a
   * null check.
   */
  @Override
  public <UK, UV> MapState<UK, UV> getMapState(MapStateDescriptor<UK, UV> stateProperties) {
    return new MapState<UK, UV>() {
      @Override
      public UV get(UK key) {
        return null;
      }

      @Override
      public void put(UK key, UV value) {}

      @Override
      public void putAll(Map<UK, UV> map) {}

      @Override
      public void remove(UK key) {}

      @Override
      public boolean contains(UK key) {
        return false;
      }

      @Override
      public Iterable<Entry<UK, UV>> entries() {
        return Collections.emptySet();
      }

      @Override
      public Iterable<UK> keys() {
        return Collections.emptySet();
      }

      @Override
      public Iterable<UV> values() {
        return Collections.emptyList();
      }

      @Override
      public Iterator<Entry<UK, UV>> iterator() {
        return Collections.emptyIterator();
      }

      @Override
      public boolean isEmpty() throws Exception {
        return true;
      }

      @Override
      public void clear() {}
    };
  }

  /** Returns a fresh default {@link ExecutionConfig} on every call. */
  @Override
  public ExecutionConfig getExecutionConfig() {
    return new ExecutionConfig();
  }

  // Nothing below is implemented for real: each method throws
  // UnsupportedOperationException, does nothing, or returns a trivial default (0 / false).

  @Override
  public String getTaskName() {
    throw new UnsupportedOperationException();
  }

  @Override
  public MetricGroup getMetricGroup() {
    throw new UnsupportedOperationException();
  }

  @Override
  public int getNumberOfParallelSubtasks() {
    return 0;
  }

  @Override
  public int getMaxNumberOfParallelSubtasks() {
    return 0;
  }

  @Override
  public int getIndexOfThisSubtask() {
    return 0;
  }

  @Override
  public int getAttemptNumber() {
    return 0;
  }

  @Override
  public String getTaskNameWithSubtasks() {
    throw new UnsupportedOperationException();
  }

  @Override
  public ClassLoader getUserCodeClassLoader() {
    throw new UnsupportedOperationException();
  }

  @Override
  public <V, A extends Serializable> void addAccumulator(
      String name, Accumulator<V, A> accumulator) {}

  @Override
  public <V, A extends Serializable> Accumulator<V, A> getAccumulator(String name) {
    throw new UnsupportedOperationException();
  }

  @Override
  public Map<String, Accumulator<?, ?>> getAllAccumulators() {
    throw new UnsupportedOperationException();
  }

  @Override
  public IntCounter getIntCounter(String name) {
    throw new UnsupportedOperationException();
  }

  @Override
  public LongCounter getLongCounter(String name) {
    throw new UnsupportedOperationException();
  }

  @Override
  public DoubleCounter getDoubleCounter(String name) {
    throw new UnsupportedOperationException();
  }

  @Override
  public Histogram getHistogram(String name) {
    throw new UnsupportedOperationException();
  }

  @Override
  public Set<ExternalResourceInfo> getExternalResourceInfos(String resourceName) {
    throw new UnsupportedOperationException();
  }

  @Override
  public boolean hasBroadcastVariable(String name) {
    return false;
  }

  @Override
  public <RT> List<RT> getBroadcastVariable(String name) {
    throw new UnsupportedOperationException();
  }

  @Override
  public <T, C> C getBroadcastVariableWithInitializer(
      String name, BroadcastVariableInitializer<T, C> initializer) {
    throw new UnsupportedOperationException();
  }

  @Override
  public DistributedCache getDistributedCache() {
    throw new UnsupportedOperationException();
  }

  @Override
  public <T> ListState<T> getListState(ListStateDescriptor<T> stateProperties) {
    throw new UnsupportedOperationException();
  }

  @Override
  public <T> ReducingState<T> getReducingState(ReducingStateDescriptor<T> stateProperties) {
    throw new UnsupportedOperationException();
  }

  @Override
  public <IN, ACC, OUT> AggregatingState<IN, OUT> getAggregatingState(
      AggregatingStateDescriptor<IN, ACC, OUT> stateProperties) {
    throw new UnsupportedOperationException();
  }
}
/**
 * Inert {@link KeyedStateBackend} for wiring tests.
 *
 * <p>{@code setCurrentKey}, {@code dispose}, listener (de)registration and {@code
 * applyToAllKeys} are accepted and ignored; every accessor for keys, state or serializers fails
 * with {@link UnsupportedOperationException}.
 */
private static final class FakeKeyedStateBackend implements KeyedStateBackend<Object> {

  /** Builds the exception raised by every operation this fake does not implement. */
  private static UnsupportedOperationException unsupported() {
    return new UnsupportedOperationException();
  }

  // ---- calls that are accepted and ignored ----

  @Override
  public void setCurrentKey(Object key) {}

  @Override
  public void dispose() {}

  @Override
  public void registerKeySelectionListener(KeySelectionListener<Object> keyListener) {}

  @Override
  public boolean deregisterKeySelectionListener(KeySelectionListener<Object> keyListener) {
    // Nothing is ever registered, so there is never anything to remove.
    return false;
  }

  @Override
  public <N, S extends State, T> void applyToAllKeys(
      N ns,
      TypeSerializer<N> nsSerializer,
      StateDescriptor<S, T> descriptor,
      KeyedStateFunction<Object, S> action) {}

  // ---- calls that are not supported ----

  @Override
  public Object getCurrentKey() {
    throw unsupported();
  }

  @Override
  public TypeSerializer<Object> getKeySerializer() {
    throw unsupported();
  }

  @Override
  public <N> Stream<Object> getKeys(String stateName, N ns) {
    throw unsupported();
  }

  @Override
  public <N, S extends State, T> S getOrCreateKeyedState(
      TypeSerializer<N> nsSerializer, StateDescriptor<S, T> descriptor) {
    throw unsupported();
  }

  @Override
  public <N, S extends State> S getPartitionedState(
      N ns, TypeSerializer<N> nsSerializer, StateDescriptor<S, ?> descriptor) {
    throw unsupported();
  }

  @Nonnull
  @Override
  public <N, SV, SEV, S extends State, IS extends S> IS createInternalState(
      @Nonnull TypeSerializer<N> nsSerializer,
      @Nonnull StateDescriptor<S, SV> descriptor,
      @Nonnull StateSnapshotTransformFactory<SEV> transformFactory) {
    throw unsupported();
  }

  @Nonnull
  @Override
  public <T extends HeapPriorityQueueElement & PriorityComparable & Keyed>
      KeyGroupedInternalPriorityQueue<T> create(
          @Nonnull String queueName, @Nonnull TypeSerializer<T> elementSerializer) {
    throw unsupported();
  }
}
/** {@link TimerServiceFactory} that hands out a new {@link FakeTimerService} on every call. */
private static final class FakeTimerServiceFactory implements TimerServiceFactory {

  @Override
  public InternalTimerService<VoidNamespace> createTimerService(
      Triggerable<String, VoidNamespace> ignoredTriggerable) {
    // The triggerable is not retained; the returned fake has no use for it.
    final InternalTimerService<VoidNamespace> timerService = new FakeTimerService();
    return timerService;
  }
}
/**
 * {@link InternalTimerService} whose clock readings are always {@code 0} and whose timer
 * operations all fail with {@link UnsupportedOperationException}.
 */
private static final class FakeTimerService implements InternalTimerService<VoidNamespace> {

  /** Builds the exception raised by every timer operation of this fake. */
  private static UnsupportedOperationException unsupported() {
    return new UnsupportedOperationException();
  }

  // ---- clock readings ----

  @Override
  public long currentProcessingTime() {
    return 0;
  }

  @Override
  public long currentWatermark() {
    return 0;
  }

  // ---- timer registration, deletion and traversal: not supported ----

  @Override
  public void registerProcessingTimeTimer(VoidNamespace ns, long timestamp) {
    throw unsupported();
  }

  @Override
  public void registerEventTimeTimer(VoidNamespace ns, long timestamp) {
    throw unsupported();
  }

  @Override
  public void deleteProcessingTimeTimer(VoidNamespace ns, long timestamp) {
    throw unsupported();
  }

  @Override
  public void deleteEventTimeTimer(VoidNamespace ns, long timestamp) {
    throw unsupported();
  }

  @Override
  public void forEachProcessingTimeTimer(
      BiConsumerWithException<VoidNamespace, Long, Exception> visitor) throws Exception {
    throw unsupported();
  }

  @Override
  public void forEachEventTimeTimer(
      BiConsumerWithException<VoidNamespace, Long, Exception> visitor) throws Exception {
    throw unsupported();
  }
}
/**
 * {@link InternalListState} placeholder: it exists only so that a non-null instance can be
 * passed around. Every single operation fails with {@link UnsupportedOperationException}.
 */
private static final class FakeInternalListState
    implements InternalListState<String, Long, Message> {

  /** Builds the exception raised by every operation of this fake. */
  private static UnsupportedOperationException unsupported() {
    return new UnsupportedOperationException();
  }

  // ---- reads ----

  @Override
  public Iterable<Message> get() throws Exception {
    throw unsupported();
  }

  @Override
  public List<Message> getInternal() throws Exception {
    throw unsupported();
  }

  @Override
  public byte[] getSerializedValue(
      byte[] keyAndNamespaceBytes,
      TypeSerializer<String> keySerializer,
      TypeSerializer<Long> namespaceSerializer,
      TypeSerializer<List<Message>> valueSerializer)
      throws Exception {
    throw unsupported();
  }

  @Override
  public StateIncrementalVisitor<String, Long, List<Message>> getStateIncrementalVisitor(
      int maxRecordsHint) {
    throw unsupported();
  }

  // ---- writes ----

  @Override
  public void add(Message element) throws Exception {
    throw unsupported();
  }

  @Override
  public void addAll(List<Message> elements) throws Exception {
    throw unsupported();
  }

  @Override
  public void update(List<Message> elements) throws Exception {
    throw unsupported();
  }

  @Override
  public void updateInternal(List<Message> elements) throws Exception {
    throw unsupported();
  }

  @Override
  public void clear() {
    throw unsupported();
  }

  // ---- namespaces ----

  @Override
  public void setCurrentNamespace(Long ns) {
    throw unsupported();
  }

  @Override
  public void mergeNamespaces(Long destination, Collection<Long> origins) throws Exception {
    throw unsupported();
  }

  // ---- serializers ----

  @Override
  public TypeSerializer<String> getKeySerializer() {
    throw unsupported();
  }

  @Override
  public TypeSerializer<Long> getNamespaceSerializer() {
    throw unsupported();
  }

  @Override
  public TypeSerializer<List<Message>> getValueSerializer() {
    throw unsupported();
  }
}
/**
 * {@link MapState} that is always empty and silently drops every write.
 *
 * <p>Lookups return {@code null} / {@code false}; the entry, key and value views and the
 * iterator are empty rather than {@code null}, so they agree with {@link #isEmpty()} returning
 * {@code true} and can be iterated without a null check.
 */
private static final class FakeMapState implements MapState<Long, Message> {

  @Override
  public Message get(Long key) throws Exception {
    return null;
  }

  @Override
  public void put(Long key, Message value) throws Exception {}

  @Override
  public void putAll(Map<Long, Message> map) throws Exception {}

  @Override
  public void remove(Long key) throws Exception {}

  @Override
  public boolean contains(Long key) throws Exception {
    return false;
  }

  @Override
  public Iterable<Entry<Long, Message>> entries() throws Exception {
    return Collections.emptySet();
  }

  @Override
  public Iterable<Long> keys() throws Exception {
    return Collections.emptySet();
  }

  @Override
  public Iterable<Message> values() throws Exception {
    return Collections.emptyList();
  }

  @Override
  public Iterator<Entry<Long, Message>> iterator() throws Exception {
    return Collections.emptyIterator();
  }

  @Override
  public boolean isEmpty() throws Exception {
    return true;
  }

  @Override
  public void clear() {}
}
/** {@link Output} sink that discards everything handed to it. */
private static final class FakeOutput implements Output<StreamRecord<Message>> {

  // ---- records ----

  @Override
  public void collect(StreamRecord<Message> element) {}

  @Override
  public <X> void collect(OutputTag<X> sideOutput, StreamRecord<X> element) {}

  // ---- stream control events ----

  @Override
  public void emitWatermark(Watermark watermark) {}

  @Override
  public void emitLatencyMarker(LatencyMarker marker) {}

  // ---- lifecycle ----

  @Override
  public void close() {}
}
/**
 * {@link MetricGroup} that supports only what the test needs: {@link #counter(String)} yields a
 * new {@link SimpleCounter} and {@link #getScopeComponents()} yields an empty array. Every other
 * method fails with {@link UnsupportedOperationException}.
 */
private static final class FakeMetricGroup implements MetricGroup {

  /** Builds the exception raised by every operation this fake does not implement. */
  private static UnsupportedOperationException unsupported() {
    return new UnsupportedOperationException();
  }

  // ---- supported ----

  @Override
  public Counter counter(String name) {
    return new SimpleCounter();
  }

  @Override
  public String[] getScopeComponents() {
    return new String[] {};
  }

  // ---- counters: not supported ----

  @Override
  public Counter counter(int name) {
    throw unsupported();
  }

  @Override
  public <C extends Counter> C counter(int name, C counter) {
    throw unsupported();
  }

  @Override
  public <C extends Counter> C counter(String name, C counter) {
    throw unsupported();
  }

  // ---- gauges, histograms, meters: not supported ----

  @Override
  public <T, G extends Gauge<T>> G gauge(int name, G gauge) {
    throw unsupported();
  }

  @Override
  public <T, G extends Gauge<T>> G gauge(String name, G gauge) {
    throw unsupported();
  }

  @Override
  public <H extends org.apache.flink.metrics.Histogram> H histogram(int name, H histogram) {
    throw unsupported();
  }

  @Override
  public <H extends org.apache.flink.metrics.Histogram> H histogram(String name, H histogram) {
    throw unsupported();
  }

  @Override
  public <M extends Meter> M meter(int name, M meter) {
    throw unsupported();
  }

  @Override
  public <M extends Meter> M meter(String name, M meter) {
    throw unsupported();
  }

  // ---- sub-groups: not supported ----

  @Override
  public MetricGroup addGroup(int name) {
    throw unsupported();
  }

  @Override
  public MetricGroup addGroup(String name) {
    throw unsupported();
  }

  @Override
  public MetricGroup addGroup(String key, String value) {
    throw unsupported();
  }

  // ---- scope and identifiers: not supported ----

  @Override
  public Map<String, String> getAllVariables() {
    throw unsupported();
  }

  @Override
  public String getMetricIdentifier(String metricName) {
    throw unsupported();
  }

  @Override
  public String getMetricIdentifier(String metricName, CharacterFilter filter) {
    throw unsupported();
  }
}
}