blob: 6e5f9db2a6706c7be2e153aecbd6092b85e7c5a5 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.flink.cep.nfa;
import org.apache.flink.api.common.typeutils.CompatibilityResult;
import org.apache.flink.api.common.typeutils.CompatibilityUtil;
import org.apache.flink.api.common.typeutils.CompositeTypeSerializerConfigSnapshot;
import org.apache.flink.api.common.typeutils.TypeDeserializerAdapter;
import org.apache.flink.api.common.typeutils.TypeSerializer;
import org.apache.flink.api.common.typeutils.TypeSerializerConfigSnapshot;
import org.apache.flink.api.common.typeutils.UnloadableDummyTypeSerializer;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.cep.nfa.compiler.NFAStateNameHandler;
import org.apache.flink.cep.nfa.sharedbuffer.EventId;
import org.apache.flink.cep.nfa.sharedbuffer.Lockable;
import org.apache.flink.cep.nfa.sharedbuffer.NodeId;
import org.apache.flink.cep.nfa.sharedbuffer.SharedBufferEdge;
import org.apache.flink.cep.nfa.sharedbuffer.SharedBufferNode;
import org.apache.flink.core.memory.DataInputView;
import org.apache.flink.core.memory.DataOutputView;
import java.io.IOException;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.stream.Collectors;
/**
* @deprecated everything in this class is deprecated. Those are only migration procedures from older versions.
*/
@Deprecated
public class SharedBuffer<V> {
private final Map<Tuple2<String, ValueTimeWrapper<V>>, NodeId> mappingContext;
/** Run number (first block in DeweyNumber) -> EventId. */
private Map<Integer, EventId> starters;
private final Map<EventId, Lockable<V>> eventsBuffer;
private final Map<NodeId, Lockable<SharedBufferNode>> pages;
public Map<EventId, Lockable<V>> getEventsBuffer() {
return eventsBuffer;
}
public Map<NodeId, Lockable<SharedBufferNode>> getPages() {
return pages;
}
public SharedBuffer(
Map<EventId, Lockable<V>> eventsBuffer,
Map<NodeId, Lockable<SharedBufferNode>> pages,
Map<Tuple2<String, ValueTimeWrapper<V>>, NodeId> mappingContext,
Map<Integer, EventId> starters) {
this.eventsBuffer = eventsBuffer;
this.pages = pages;
this.mappingContext = mappingContext;
this.starters = starters;
}
public NodeId getNodeId(String prevState, long timestamp, int counter, V event) {
return mappingContext.get(Tuple2.of(NFAStateNameHandler.getOriginalNameFromInternal(prevState),
new ValueTimeWrapper<>(event, timestamp, counter)));
}
public EventId getStartEventId(int run) {
return starters.get(run);
}
/**
* Wrapper for a value-timestamp pair.
*
* @param <V> Type of the value
*/
private static class ValueTimeWrapper<V> {
private final V value;
private final long timestamp;
private final int counter;
ValueTimeWrapper(final V value, final long timestamp, final int counter) {
this.value = value;
this.timestamp = timestamp;
this.counter = counter;
}
/**
* Returns a counter used to disambiguate between different accepted
* elements with the same value and timestamp that refer to the same
* looping state.
*/
public int getCounter() {
return counter;
}
public V getValue() {
return value;
}
public long getTimestamp() {
return timestamp;
}
@Override
public String toString() {
return "ValueTimeWrapper(" + value + ", " + timestamp + ", " + counter + ")";
}
@Override
public boolean equals(Object obj) {
if (!(obj instanceof ValueTimeWrapper)) {
return false;
}
@SuppressWarnings("unchecked")
ValueTimeWrapper<V> other = (ValueTimeWrapper<V>) obj;
return timestamp == other.getTimestamp()
&& Objects.equals(value, other.getValue())
&& counter == other.getCounter();
}
@Override
public int hashCode() {
return (int) (31 * (31 * (timestamp ^ timestamp >>> 32) + value.hashCode()) + counter);
}
public static <V> ValueTimeWrapper<V> deserialize(
final TypeSerializer<V> valueSerializer,
final DataInputView source) throws IOException {
final V value = valueSerializer.deserialize(source);
final long timestamp = source.readLong();
final int counter = source.readInt();
return new ValueTimeWrapper<>(value, timestamp, counter);
}
}
/**
* The {@link TypeSerializerConfigSnapshot} serializer configuration to be stored with the managed state.
*/
public static final class SharedBufferSerializerConfigSnapshot<K, V> extends CompositeTypeSerializerConfigSnapshot {
private static final int VERSION = 1;
/** This empty constructor is required for deserializing the configuration. */
public SharedBufferSerializerConfigSnapshot() {
}
public SharedBufferSerializerConfigSnapshot(
final TypeSerializer<K> keySerializer,
final TypeSerializer<V> valueSerializer,
final TypeSerializer<DeweyNumber> versionSerializer) {
super(keySerializer, valueSerializer, versionSerializer);
}
@Override
public int getVersion() {
return VERSION;
}
}
/**
* A {@link TypeSerializer} for the {@link SharedBuffer}.
*/
public static class SharedBufferSerializer<K, V> extends TypeSerializer<SharedBuffer<V>> {
private static final long serialVersionUID = -3254176794680331560L;
private final TypeSerializer<K> keySerializer;
private final TypeSerializer<V> valueSerializer;
private final TypeSerializer<DeweyNumber> versionSerializer;
public SharedBufferSerializer(
final TypeSerializer<K> keySerializer,
final TypeSerializer<V> valueSerializer) {
this(keySerializer, valueSerializer, DeweyNumber.DeweyNumberSerializer.INSTANCE);
}
public SharedBufferSerializer(
final TypeSerializer<K> keySerializer,
final TypeSerializer<V> valueSerializer,
final TypeSerializer<DeweyNumber> versionSerializer) {
this.keySerializer = keySerializer;
this.valueSerializer = valueSerializer;
this.versionSerializer = versionSerializer;
}
public TypeSerializer<DeweyNumber> getVersionSerializer() {
return versionSerializer;
}
public TypeSerializer<K> getKeySerializer() {
return keySerializer;
}
public TypeSerializer<V> getValueSerializer() {
return valueSerializer;
}
@Override
public boolean isImmutableType() {
return false;
}
@Override
public SharedBufferSerializer<K, V> duplicate() {
return new SharedBufferSerializer<>(keySerializer.duplicate(), valueSerializer.duplicate());
}
@Override
public SharedBuffer<V> createInstance() {
throw new UnsupportedOperationException();
}
@Override
public SharedBuffer<V> copy(SharedBuffer<V> from) {
throw new UnsupportedOperationException();
}
@Override
public SharedBuffer<V> copy(SharedBuffer<V> from, SharedBuffer<V> reuse) {
throw new UnsupportedOperationException();
}
@Override
public int getLength() {
return -1;
}
@Override
public void serialize(SharedBuffer<V> record, DataOutputView target) throws IOException {
throw new UnsupportedOperationException();
}
@Override
public SharedBuffer<V> deserialize(DataInputView source) throws IOException {
List<Tuple2<NodeId, Lockable<SharedBufferNode>>> entries = new ArrayList<>();
Map<ValueTimeWrapper<V>, EventId> values = new HashMap<>();
Map<EventId, Lockable<V>> valuesWithIds = new HashMap<>();
Map<Tuple2<String, ValueTimeWrapper<V>>, NodeId> mappingContext = new HashMap<>();
Map<Long, Integer> totalEventsPerTimestamp = new HashMap<>();
int totalPages = source.readInt();
for (int i = 0; i < totalPages; i++) {
// key of the page
K stateName = keySerializer.deserialize(source);
int numberEntries = source.readInt();
for (int j = 0; j < numberEntries; j++) {
ValueTimeWrapper<V> wrapper = ValueTimeWrapper.deserialize(valueSerializer, source);
EventId eventId = values.get(wrapper);
if (eventId == null) {
int id = totalEventsPerTimestamp.computeIfAbsent(wrapper.timestamp, k -> 0);
eventId = new EventId(id, wrapper.timestamp);
values.put(wrapper, eventId);
valuesWithIds.put(eventId, new Lockable<>(wrapper.value, 1));
totalEventsPerTimestamp.computeIfPresent(wrapper.timestamp, (k, v) -> v + 1);
} else {
Lockable<V> eventWrapper = valuesWithIds.get(eventId);
eventWrapper.lock();
}
NodeId nodeId = new NodeId(eventId, (String) stateName);
int refCount = source.readInt();
entries.add(Tuple2.of(nodeId, new Lockable<>(new SharedBufferNode(), refCount)));
mappingContext.put(Tuple2.of((String) stateName, wrapper), nodeId);
}
}
// read the edges of the shared buffer entries
int totalEdges = source.readInt();
Map<Integer, EventId> starters = new HashMap<>();
for (int j = 0; j < totalEdges; j++) {
int sourceIdx = source.readInt();
int targetIdx = source.readInt();
DeweyNumber version = versionSerializer.deserialize(source);
// We've already deserialized the shared buffer entry. Simply read its ID and
// retrieve the buffer entry from the list of entries
Tuple2<NodeId, Lockable<SharedBufferNode>> sourceEntry = entries.get(sourceIdx);
Tuple2<NodeId, Lockable<SharedBufferNode>> targetEntry =
targetIdx < 0 ? Tuple2.of(null, null) : entries.get(targetIdx);
sourceEntry.f1.getElement().addEdge(new SharedBufferEdge(targetEntry.f0, version));
if (version.length() == 1) {
starters.put(version.getRun(), sourceEntry.f0.getEventId());
}
}
Map<NodeId, Lockable<SharedBufferNode>> entriesMap = entries.stream().collect(Collectors.toMap(e -> e.f0, e -> e.f1));
return new SharedBuffer<>(valuesWithIds, entriesMap, mappingContext, starters);
}
@Override
public SharedBuffer<V> deserialize(SharedBuffer<V> reuse, DataInputView source) throws IOException {
return deserialize(source);
}
@Override
public void copy(DataInputView source, DataOutputView target) throws IOException {
throw new UnsupportedOperationException();
}
@Override
public boolean equals(Object obj) {
if (obj == this) {
return true;
}
if (obj == null || !Objects.equals(obj.getClass(), getClass())) {
return false;
}
SharedBufferSerializer other = (SharedBufferSerializer) obj;
return
Objects.equals(keySerializer, other.getKeySerializer()) &&
Objects.equals(valueSerializer, other.getValueSerializer()) &&
Objects.equals(versionSerializer, other.getVersionSerializer());
}
@Override
public boolean canEqual(Object obj) {
return true;
}
@Override
public int hashCode() {
return 37 * keySerializer.hashCode() + valueSerializer.hashCode();
}
@Override
public TypeSerializerConfigSnapshot snapshotConfiguration() {
return new SharedBufferSerializerConfigSnapshot<>(
keySerializer,
valueSerializer,
versionSerializer);
}
@Override
public CompatibilityResult<SharedBuffer<V>> ensureCompatibility(TypeSerializerConfigSnapshot configSnapshot) {
if (configSnapshot instanceof SharedBufferSerializerConfigSnapshot) {
List<Tuple2<TypeSerializer<?>, TypeSerializerConfigSnapshot>> serializerConfigSnapshots =
((SharedBufferSerializerConfigSnapshot) configSnapshot).getNestedSerializersAndConfigs();
CompatibilityResult<K> keyCompatResult = CompatibilityUtil.resolveCompatibilityResult(
serializerConfigSnapshots.get(0).f0,
UnloadableDummyTypeSerializer.class,
serializerConfigSnapshots.get(0).f1,
keySerializer);
CompatibilityResult<V> valueCompatResult = CompatibilityUtil.resolveCompatibilityResult(
serializerConfigSnapshots.get(1).f0,
UnloadableDummyTypeSerializer.class,
serializerConfigSnapshots.get(1).f1,
valueSerializer);
CompatibilityResult<DeweyNumber> versionCompatResult = CompatibilityUtil.resolveCompatibilityResult(
serializerConfigSnapshots.get(2).f0,
UnloadableDummyTypeSerializer.class,
serializerConfigSnapshots.get(2).f1,
versionSerializer);
if (!keyCompatResult.isRequiresMigration() && !valueCompatResult.isRequiresMigration() &&
!versionCompatResult.isRequiresMigration()) {
return CompatibilityResult.compatible();
} else {
if (keyCompatResult.getConvertDeserializer() != null
&& valueCompatResult.getConvertDeserializer() != null
&& versionCompatResult.getConvertDeserializer() != null) {
return CompatibilityResult.requiresMigration(
new SharedBufferSerializer<>(
new TypeDeserializerAdapter<>(keyCompatResult.getConvertDeserializer()),
new TypeDeserializerAdapter<>(valueCompatResult.getConvertDeserializer()),
new TypeDeserializerAdapter<>(versionCompatResult.getConvertDeserializer())
));
}
}
}
return CompatibilityResult.requiresMigration();
}
}
}