blob: fcd25b440c6aa034ae1df88ab404952619247623 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.flink.table.runtime.functions;
import org.apache.flink.api.common.functions.RuntimeContext;
import org.apache.flink.api.common.state.ListStateDescriptor;
import org.apache.flink.api.common.state.MapStateDescriptor;
import org.apache.flink.api.common.state.SortedMapStateDescriptor;
import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.api.common.typeutils.TypeSerializer;
import org.apache.flink.api.common.typeutils.base.ListSerializer;
import org.apache.flink.api.common.typeutils.base.MapSerializer;
import org.apache.flink.api.common.typeutils.base.SortedMapSerializer;
import org.apache.flink.runtime.state.keyed.KeyedListState;
import org.apache.flink.runtime.state.keyed.KeyedListStateDescriptor;
import org.apache.flink.runtime.state.keyed.KeyedMapState;
import org.apache.flink.runtime.state.keyed.KeyedMapStateDescriptor;
import org.apache.flink.runtime.state.keyed.KeyedSortedMapState;
import org.apache.flink.runtime.state.keyed.KeyedSortedMapStateDescriptor;
import org.apache.flink.runtime.state.keyed.KeyedState;
import org.apache.flink.runtime.state.keyed.KeyedStateDescriptor;
import org.apache.flink.runtime.state.keyed.KeyedValueState;
import org.apache.flink.runtime.state.keyed.KeyedValueStateDescriptor;
import org.apache.flink.runtime.state.subkeyed.SubKeyedListState;
import org.apache.flink.runtime.state.subkeyed.SubKeyedListStateDescriptor;
import org.apache.flink.runtime.state.subkeyed.SubKeyedMapState;
import org.apache.flink.runtime.state.subkeyed.SubKeyedMapStateDescriptor;
import org.apache.flink.runtime.state.subkeyed.SubKeyedSortedMapState;
import org.apache.flink.runtime.state.subkeyed.SubKeyedSortedMapStateDescriptor;
import org.apache.flink.runtime.state.subkeyed.SubKeyedState;
import org.apache.flink.runtime.state.subkeyed.SubKeyedStateDescriptor;
import org.apache.flink.runtime.state.subkeyed.SubKeyedValueState;
import org.apache.flink.runtime.state.subkeyed.SubKeyedValueStateDescriptor;
import org.apache.flink.streaming.api.operators.AbstractStreamOperator;
import org.apache.flink.table.dataformat.BaseRow;
import org.apache.flink.table.dataview.KeyedStateListView;
import org.apache.flink.table.dataview.KeyedStateMapView;
import org.apache.flink.table.dataview.KeyedStateSortedMapView;
import org.apache.flink.table.dataview.NullAwareKeyedStateMapView;
import org.apache.flink.table.dataview.NullAwareSubKeyedStateMapView;
import org.apache.flink.table.dataview.StateDataView;
import org.apache.flink.table.dataview.StateListView;
import org.apache.flink.table.dataview.StateMapView;
import org.apache.flink.table.dataview.StateSortedMapView;
import org.apache.flink.table.dataview.SubKeyedStateListView;
import org.apache.flink.table.dataview.SubKeyedStateMapView;
import org.apache.flink.table.typeutils.ListViewTypeInfo;
import org.apache.flink.table.typeutils.MapViewTypeInfo;
import org.apache.flink.table.typeutils.SortedMapViewTypeInfo;
import org.apache.flink.util.Preconditions;
import java.util.ArrayList;
import java.util.List;
/**
* Implementation of ExecutionContext.
*/
@SuppressWarnings("unchecked")
public final class ExecutionContextImpl implements ExecutionContext {
private static final String NULL_STATE_POSTFIX = "_null_state";
private final AbstractStreamOperator<?> operator;
private final RuntimeContext runtimeContext;
private final TypeSerializer<?> namespaceSerializer;
private final List<StateDataView<BaseRow>> registeredStateDataViews;
public ExecutionContextImpl(
AbstractStreamOperator<?> operator,
RuntimeContext runtimeContext) {
this(operator, runtimeContext, null);
}
public ExecutionContextImpl(
AbstractStreamOperator<?> operator,
RuntimeContext runtimeContext,
TypeSerializer<?> namespaceSerializer) {
this.operator = operator;
this.runtimeContext = Preconditions.checkNotNull(runtimeContext);
this.namespaceSerializer = namespaceSerializer;
this.registeredStateDataViews = new ArrayList<>();
}
@Override
public <K, V, S extends KeyedState<K, V>> S getKeyedState(KeyedStateDescriptor<K, V, S> descriptor) throws Exception {
return operator.getKeyedState(descriptor);
}
@Override
public <K, N, V, S extends SubKeyedState<K, N, V>> S getSubKeyedState(SubKeyedStateDescriptor<K, N, V, S> descriptor) throws Exception {
return operator.getSubKeyedState(descriptor);
}
@Override
public <K, V> KeyedValueState<K, V> getKeyedValueState(
ValueStateDescriptor<V> descriptor) throws Exception {
descriptor.initializeSerializerUnlessSet(operator.getExecutionConfig());
return operator.getKeyedState(
new KeyedValueStateDescriptor<>(
descriptor.getName(),
(TypeSerializer<K>) operator.getKeySerializer(),
descriptor.getSerializer()
)
);
}
@Override
public <K, V> KeyedListState<K, V> getKeyedListState(
ListStateDescriptor<V> descriptor
) throws Exception {
descriptor.initializeSerializerUnlessSet(operator.getExecutionConfig());
return operator.getKeyedState(
new KeyedListStateDescriptor<>(
descriptor.getName(),
(TypeSerializer<K>) operator.getKeySerializer(),
(ListSerializer<V>) descriptor.getSerializer()
)
);
}
@Override
public <K, UK, UV> KeyedMapState<K, UK, UV> getKeyedMapState(
MapStateDescriptor<UK, UV> descriptor
) throws Exception {
descriptor.initializeSerializerUnlessSet(operator.getExecutionConfig());
return operator.getKeyedState(
new KeyedMapStateDescriptor<>(
descriptor.getName(),
(TypeSerializer<K>) operator.getKeySerializer(),
(MapSerializer<UK, UV>) descriptor.getSerializer()
)
);
}
@Override
public <K, UK, UV> KeyedSortedMapState<K, UK, UV> getKeyedSortedMapState(
SortedMapStateDescriptor<UK, UV> descriptor
) throws Exception {
descriptor.initializeSerializerUnlessSet(operator.getExecutionConfig());
return operator.getKeyedState(
new KeyedSortedMapStateDescriptor<>(
descriptor.getName(),
(TypeSerializer<K>) operator.getKeySerializer(),
(SortedMapSerializer<UK, UV>) descriptor.getSerializer()
)
);
}
@Override
public <K, N, V> SubKeyedValueState<K, N, V> getSubKeyedValueState(
ValueStateDescriptor<V> descriptor
) throws Exception {
if (namespaceSerializer == null) {
throw new RuntimeException("The namespace serializer has not been initialized.");
}
descriptor.initializeSerializerUnlessSet(operator.getExecutionConfig());
return operator.getSubKeyedState(
new SubKeyedValueStateDescriptor<>(
descriptor.getName(),
(TypeSerializer<K>) operator.getKeySerializer(),
(TypeSerializer<N>) namespaceSerializer,
descriptor.getSerializer()
)
);
}
@Override
public <K, N, V> SubKeyedListState<K, N, V> getSubKeyedListState(
ListStateDescriptor<V> descriptor
) throws Exception {
if (namespaceSerializer == null) {
throw new RuntimeException("The namespace serializer has not been initialized.");
}
descriptor.initializeSerializerUnlessSet(operator.getExecutionConfig());
return operator.getSubKeyedState(new SubKeyedListStateDescriptor<>(
descriptor.getName(),
(TypeSerializer<K>) operator.getKeySerializer(),
(TypeSerializer<N>) namespaceSerializer,
((ListSerializer<V>) descriptor.getSerializer()).getElementSerializer()));
}
@Override
public <K, N, UK, UV> SubKeyedMapState<K, N, UK, UV> getSubKeyedMapState(
MapStateDescriptor<UK, UV> descriptor
) throws Exception {
if (namespaceSerializer == null) {
throw new RuntimeException("The namespace serializer has not been initialized.");
}
descriptor.initializeSerializerUnlessSet(operator.getExecutionConfig());
MapSerializer<UK, UV> mapSerializer = (MapSerializer<UK, UV>) descriptor.getSerializer();
return operator.getSubKeyedState(new SubKeyedMapStateDescriptor<>(
descriptor.getName(),
(TypeSerializer<K>) operator.getKeySerializer(),
(TypeSerializer<N>) namespaceSerializer,
mapSerializer.getKeySerializer(),
mapSerializer.getValueSerializer()));
}
@Override
public <K, N, UK, UV> SubKeyedSortedMapState<K, N, UK, UV> getSubKeyedSortedMapState(
SortedMapStateDescriptor<UK, UV> descriptor
) throws Exception {
if (namespaceSerializer == null) {
throw new RuntimeException("The namespace serializer has not been initialized.");
}
descriptor.initializeSerializerUnlessSet(operator.getExecutionConfig());
SortedMapSerializer<UK, UV> sortedMapSerializer = (SortedMapSerializer<UK, UV>) descriptor.getSerializer();
return operator.getSubKeyedState(new SubKeyedSortedMapStateDescriptor<>(
descriptor.getName(),
(TypeSerializer<K>) operator.getKeySerializer(),
(TypeSerializer<N>) namespaceSerializer,
sortedMapSerializer.getComparator(),
sortedMapSerializer.getKeySerializer(),
sortedMapSerializer.getValueSerializer()));
}
@Override
public <K, UK, UV> StateMapView<K, UK, UV> getStateMapView(
String stateName,
MapViewTypeInfo<UK, UV> mapViewTypeInfo,
boolean hasNamespace) throws Exception {
MapStateDescriptor<UK, UV> mapStateDescriptor = new MapStateDescriptor<>(
stateName,
mapViewTypeInfo.keyType(),
mapViewTypeInfo.valueType());
ValueStateDescriptor<UV> nullStateDescriptor = new ValueStateDescriptor<>(
stateName + NULL_STATE_POSTFIX,
mapViewTypeInfo.valueType());
if (hasNamespace) {
SubKeyedMapState<K, Object, UK, UV> mapState = getSubKeyedMapState(mapStateDescriptor);
if (mapViewTypeInfo.nullAware()) {
SubKeyedValueState<K, Object, UV> nullState = getSubKeyedValueState(nullStateDescriptor);
return new NullAwareSubKeyedStateMapView<>(mapState, nullState);
} else {
return new SubKeyedStateMapView<>(mapState);
}
} else {
KeyedMapState<K, UK, UV> mapState = getKeyedMapState(mapStateDescriptor);
if (mapViewTypeInfo.nullAware()) {
KeyedValueState<K, UV> nullState = getKeyedValueState(nullStateDescriptor);
return new NullAwareKeyedStateMapView<>(mapState, nullState);
} else {
return new KeyedStateMapView<>(mapState);
}
}
}
@Override
public <K, UK, UV> StateSortedMapView<K, UK, UV> getStateSortedMapView(
String stateName,
SortedMapViewTypeInfo<UK, UV> sortedMapViewTypeInfo,
boolean hasNamespace) throws Exception {
SortedMapStateDescriptor<UK, UV> sortedMapStateDesc = new SortedMapStateDescriptor<>(
stateName,
sortedMapViewTypeInfo.comparator,
sortedMapViewTypeInfo.keyType,
sortedMapViewTypeInfo.valueType);
if (!hasNamespace) {
KeyedSortedMapState<K, UK, UV> mapState = getKeyedSortedMapState(sortedMapStateDesc);
return new KeyedStateSortedMapView<>(mapState);
} else {
throw new UnsupportedOperationException("SubKeyedState SortedMapView is not supported currently");
}
}
@Override
public <K, V> StateListView<K, V> getStateListView(
String stateName,
ListViewTypeInfo<V> listViewTypeInfo,
boolean hasNamespace) throws Exception {
ListStateDescriptor<V> listStateDesc = new ListStateDescriptor<>(
stateName,
listViewTypeInfo.elementType());
if (hasNamespace) {
SubKeyedListState<K, Object, V> listState = getSubKeyedListState(listStateDesc);
return new SubKeyedStateListView<>(listState);
} else {
KeyedListState<K, V> listState = getKeyedListState(listStateDesc);
return new KeyedStateListView<>(listState);
}
}
@Override
public void registerStateDataView(StateDataView<BaseRow> stateDataView) {
registeredStateDataViews.add(stateDataView);
}
@Override
public <K> TypeSerializer<K> getKeySerializer() {
return (TypeSerializer<K>) operator.getKeySerializer();
}
@Override
public BaseRow currentKey() {
return (BaseRow) operator.getCurrentKey();
}
@Override
public void setCurrentKey(BaseRow key) {
operator.setCurrentKey(key);
// set current key to all the registered stateDataviews
for (StateDataView<BaseRow> dataView : registeredStateDataViews) {
dataView.setCurrentKey(key);
}
}
@Override
public RuntimeContext getRuntimeContext() {
return runtimeContext;
}
}