blob: 5a378972e9e70d5aeef2ca2abc84764a3e805f20 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.flink.cep.nfa.sharedbuffer;
import org.apache.flink.annotation.VisibleForTesting;
import org.apache.flink.api.common.state.KeyedStateStore;
import org.apache.flink.api.common.state.MapState;
import org.apache.flink.api.common.state.MapStateDescriptor;
import org.apache.flink.api.common.typeutils.TypeSerializer;
import org.apache.flink.api.common.typeutils.base.IntSerializer;
import org.apache.flink.api.common.typeutils.base.LongSerializer;
import org.apache.flink.util.WrappingRuntimeException;
import org.apache.flink.shaded.guava18.com.google.common.collect.Iterables;
import java.util.HashMap;
import java.util.Iterator;
import java.util.Map;
import java.util.stream.Collectors;
/**
 * A shared buffer implementation which stores values in keyed state obtained from a {@link KeyedStateStore}.
 * Additionally, the values can be versioned such that it is possible to retrieve their predecessor element in the
 * buffer.
 *
 * <p>The idea of the implementation is to have a buffer for incoming events with unique ids assigned to them. This way
 * we do not need to deserialize events during processing and we store only one copy of the event.
 *
 * <p>The entries in {@link SharedBuffer} are {@link SharedBufferNode}. The shared buffer node allows to store
 * relations between different entries. A dewey versioning scheme allows to discriminate between
 * different relations (e.g. preceding element).
 *
 * <p>Events and nodes that are registered, upserted or read are kept in in-memory caches; {@link #flushCache()}
 * writes every cached event and node to state and clears the caches.
 *
 * <p>The implementation is strongly based on the paper "Efficient Pattern Matching over Event Streams".
 *
 * @param <V> Type of the values
 * @see <a href="https://people.cs.umass.edu/~yanlei/publications/sase-sigmod08.pdf">
 * https://people.cs.umass.edu/~yanlei/publications/sase-sigmod08.pdf</a>
 */
public class SharedBuffer<V> {

	private static final String ENTRIES_STATE_NAME = "sharedBuffer-entries";
	private static final String EVENTS_STATE_NAME = "sharedBuffer-events";
	private static final String EVENTS_COUNT_STATE_NAME = "sharedBuffer-events-count";

	/** Events stored in state, keyed by their unique {@link EventId}. */
	private final MapState<EventId, Lockable<V>> eventsBuffer;

	/**
	 * The number of events seen so far in the stream per timestamp. This is also the next id that
	 * {@link #registerEvent(Object, long)} hands out for that timestamp.
	 */
	private final MapState<Long, Integer> eventsCount;

	/** Shared buffer nodes stored in state, keyed by {@link NodeId}. */
	private final MapState<NodeId, Lockable<SharedBufferNode>> entries;

	/** The cache of eventsBuffer State. */
	private final Map<EventId, Lockable<V>> eventsBufferCache = new HashMap<>();

	/** The cache of sharedBufferNode. */
	private final Map<NodeId, Lockable<SharedBufferNode>> entryCache = new HashMap<>();

	/**
	 * Creates a shared buffer whose events, nodes and per-timestamp event counters live in map states
	 * registered on the given state store.
	 *
	 * @param stateStore store used to obtain the underlying map states
	 * @param valueSerializer serializer for the buffered values
	 */
	public SharedBuffer(KeyedStateStore stateStore, TypeSerializer<V> valueSerializer) {
		this.eventsBuffer = stateStore.getMapState(
			new MapStateDescriptor<>(
				EVENTS_STATE_NAME,
				EventId.EventIdSerializer.INSTANCE,
				new Lockable.LockableTypeSerializer<>(valueSerializer)));

		this.entries = stateStore.getMapState(
			new MapStateDescriptor<>(
				ENTRIES_STATE_NAME,
				NodeId.NodeIdSerializer.INSTANCE,
				new Lockable.LockableTypeSerializer<>(new SharedBufferNode.SharedBufferNodeSerializer())));

		this.eventsCount = stateStore.getMapState(
			new MapStateDescriptor<>(
				EVENTS_COUNT_STATE_NAME,
				LongSerializer.INSTANCE,
				IntSerializer.INSTANCE));
	}

	/**
	 * Initializes underlying state with given map of events and entries. Should be used only in case of migration from
	 * old state.
	 *
	 * <p>The per-timestamp event counters are set to one past the largest migrated id for each timestamp, so that
	 * events registered afterwards receive fresh ids.
	 *
	 * @param events map of events with assigned unique ids
	 * @param entries map of SharedBufferNodes
	 * @throws Exception Thrown if the system cannot access the state.
	 * @deprecated Only for state migration!
	 */
	@Deprecated
	public void init(
			Map<EventId, Lockable<V>> events,
			Map<NodeId, Lockable<SharedBufferNode>> entries) throws Exception {
		eventsBuffer.putAll(events);
		this.entries.putAll(entries);

		// eventsCount stores the NEXT id to assign (registerEvent puts id + 1). Storing the max id itself
		// would make the next registered event at that timestamp reuse an existing id and overwrite it.
		Map<Long, Integer> nextIds = events.keySet().stream().collect(Collectors.toMap(
			EventId::getTimestamp,
			eventId -> eventId.getId() + 1,
			Math::max
		));
		eventsCount.putAll(nextIds);
	}

	/**
	 * Construct an accessor to deal with this sharedBuffer.
	 *
	 * @return an accessor to deal with this sharedBuffer.
	 */
	public SharedBufferAccessor<V> getAccessor() {
		return new SharedBufferAccessor<>(this);
	}

	/**
	 * Drops the event counters of all timestamps strictly smaller than the given one.
	 *
	 * @param timestamp counters for timestamps below this value are removed
	 * @throws Exception Thrown if the system cannot access the state.
	 */
	void advanceTime(long timestamp) throws Exception {
		Iterator<Long> iterator = eventsCount.keys().iterator();
		while (iterator.hasNext()) {
			Long next = iterator.next();
			if (next < timestamp) {
				iterator.remove();
			}
		}
	}

	/**
	 * Assigns a new unique id to the value and puts it into the event cache with a lock count of 1.
	 *
	 * @param value the event value
	 * @param timestamp timestamp of the event
	 * @return the id assigned to the event
	 * @throws Exception Thrown if the system cannot access the state.
	 */
	EventId registerEvent(V value, long timestamp) throws Exception {
		Integer id = eventsCount.get(timestamp);
		if (id == null) {
			id = 0;
		}
		EventId eventId = new EventId(id, timestamp);
		Lockable<V> lockableValue = new Lockable<>(value, 1);
		eventsCount.put(timestamp, id + 1);
		eventsBufferCache.put(eventId, lockableValue);
		return eventId;
	}

	/**
	 * Checks if there are no elements in the buffer, considering both the cache and the state.
	 *
	 * @return true if there are no elements in the buffer
	 * @throws Exception Thrown if the system cannot access the state.
	 */
	public boolean isEmpty() throws Exception {
		return eventsBufferCache.isEmpty() && Iterables.isEmpty(eventsBuffer.keys());
	}

	/**
	 * Inserts or updates an event in cache.
	 *
	 * @param eventId id of the event
	 * @param event event body
	 */
	void upsertEvent(EventId eventId, Lockable<V> event) {
		this.eventsBufferCache.put(eventId, event);
	}

	/**
	 * Inserts or updates a SharedBufferNode in cache.
	 *
	 * @param nodeId id of the node
	 * @param entry SharedBufferNode
	 */
	void upsertEntry(NodeId nodeId, Lockable<SharedBufferNode> entry) {
		this.entryCache.put(nodeId, entry);
	}

	/**
	 * Removes an event from cache and state.
	 *
	 * @param eventId id of the event
	 * @throws Exception Thrown if the system cannot access the state.
	 */
	void removeEvent(EventId eventId) throws Exception {
		this.eventsBufferCache.remove(eventId);
		this.eventsBuffer.remove(eventId);
	}

	/**
	 * Removes a SharedBufferNode from cache and state.
	 *
	 * @param nodeId id of the node
	 * @throws Exception Thrown if the system cannot access the state.
	 */
	void removeEntry(NodeId nodeId) throws Exception {
		this.entryCache.remove(nodeId);
		this.entries.remove(nodeId);
	}

	/**
	 * Returns the node from the cache, or loads it from state into the cache if it is not cached yet.
	 *
	 * @param nodeId id of the node
	 * @return SharedBufferNode, or {@code null} if it is present neither in cache nor in state
	 * @throws WrappingRuntimeException if the state cannot be accessed
	 */
	Lockable<SharedBufferNode> getEntry(NodeId nodeId) {
		return entryCache.computeIfAbsent(nodeId, id -> {
			try {
				return entries.get(id);
			} catch (Exception ex) {
				throw new WrappingRuntimeException(ex);
			}
		});
	}

	/**
	 * Returns the event from the cache, or loads it from state into the cache if it is not cached yet.
	 *
	 * @param eventId id of the event
	 * @return event, or {@code null} if it is present neither in cache nor in state
	 * @throws WrappingRuntimeException if the state cannot be accessed
	 */
	Lockable<V> getEvent(EventId eventId) {
		return eventsBufferCache.computeIfAbsent(eventId, id -> {
			try {
				return eventsBuffer.get(id);
			} catch (Exception ex) {
				throw new WrappingRuntimeException(ex);
			}
		});
	}

	/**
	 * Flush the event and node from cache to state and clears both caches.
	 *
	 * @throws Exception Thrown if the system cannot access the state.
	 */
	void flushCache() throws Exception {
		if (!entryCache.isEmpty()) {
			entries.putAll(entryCache);
			entryCache.clear();
		}
		if (!eventsBufferCache.isEmpty()) {
			eventsBuffer.putAll(eventsBufferCache);
			eventsBufferCache.clear();
		}
	}

	/** Returns an iterator over the per-timestamp event counters stored in state. */
	@VisibleForTesting
	Iterator<Map.Entry<Long, Integer>> getEventCounters() throws Exception {
		return eventsCount.iterator();
	}

	/** Returns the number of events currently held in the event cache. */
	@VisibleForTesting
	public int getEventsBufferCacheSize() {
		return eventsBufferCache.size();
	}

	/** Returns the number of events currently stored in state (cached, unflushed events are not counted). */
	@VisibleForTesting
	public int getEventsBufferSize() throws Exception {
		return Iterables.size(eventsBuffer.entries());
	}

	/** Returns the number of nodes currently stored in state (cached, unflushed nodes are not counted). */
	@VisibleForTesting
	public int getSharedBufferNodeSize() throws Exception {
		return Iterables.size(entries.entries());
	}

	/** Returns the number of nodes currently held in the node cache. */
	@VisibleForTesting
	public int getSharedBufferNodeCacheSize() throws Exception {
		return entryCache.size();
	}
}