// Source blob: ac7b9a5dbf994b4fe9a4e7a6bcf508ccdf64384d
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.flink.statefun.flink.core.functions;
import java.util.ArrayList;
import java.util.List;
import java.util.Objects;
import java.util.Optional;
import java.util.OptionalLong;
import java.util.function.Consumer;
import javax.annotation.Nullable;
import org.apache.flink.api.common.state.MapState;
import org.apache.flink.runtime.state.internal.InternalListState;
import org.apache.flink.statefun.flink.core.di.Inject;
import org.apache.flink.statefun.flink.core.di.Label;
import org.apache.flink.statefun.flink.core.message.Message;
/**
 * A {@link DelayedMessagesBuffer} that keeps delayed messages in Flink state.
 *
 * <p>Two state handles are used together:
 *
 * <ul>
 *   <li>{@code bufferState} - a list state whose namespace is the delivery timestamp; each
 *       namespace holds the messages that are due at that timestamp.
 *   <li>{@code cancellationTokenToTimestamp} - an index from a message's cancellation token to the
 *       timestamp (namespace) under which that message is buffered, so that a message can be
 *       located and removed given only its token.
 * </ul>
 */
final class FlinkStateDelayedMessagesBuffer implements DelayedMessagesBuffer {
  static final String BUFFER_STATE_NAME = "delayed-messages-buffer";
  static final String INDEX_STATE_NAME = "delayed-message-index";

  private final InternalListState<String, Long, Message> bufferState;
  private final MapState<String, Long> cancellationTokenToTimestamp;

  /**
   * Creates a buffer on top of the supplied state handles.
   *
   * @param bufferState list state, namespaced by delivery timestamp, that stores the messages.
   * @param cancellationTokenToTimestamp index from cancellation token to delivery timestamp.
   * @throws NullPointerException if either argument is {@code null}.
   */
  @Inject
  FlinkStateDelayedMessagesBuffer(
      @Label("delayed-messages-buffer-state") InternalListState<String, Long, Message> bufferState,
      @Label("delayed-message-index") MapState<String, Long> cancellationTokenToTimestamp) {
    this.bufferState = Objects.requireNonNull(bufferState);
    this.cancellationTokenToTimestamp = Objects.requireNonNull(cancellationTokenToTimestamp);
  }

  /**
   * Passes every message buffered for {@code timestamp} to {@code fn}, and then clears the buffer
   * for that timestamp. The cancellation token mapping of each message (if any) is removed right
   * before the message is handed to {@code fn}.
   *
   * @param timestamp the delivery timestamp whose messages should be consumed.
   * @param fn the consumer invoked once per buffered message.
   * @throws IllegalStateException if accessing the state, or {@code fn} itself, fails.
   */
  @Override
  public void forEachMessageAt(long timestamp, Consumer<Message> fn) {
    try {
      forEachMessageThrows(timestamp, fn);
    } catch (Exception e) {
      throw new IllegalStateException(
          "Failed processing the delayed messages buffered for timestamp " + timestamp, e);
    }
  }

  /**
   * Removes the buffered message that is associated with the given cancellation token.
   *
   * @param token the cancellation token of the message to remove.
   * @return the timestamp the message was buffered for, if (and only if) no other message remains
   *     buffered for that timestamp, which signals the caller that the timer for that timestamp
   *     can be removed. Empty if the token is unknown, or if other messages are still buffered
   *     for the same timestamp.
   * @throws IllegalStateException if accessing the state fails.
   */
  @Override
  public OptionalLong removeMessageByCancellationToken(String token) {
    try {
      return remove(token);
    } catch (Exception e) {
      throw new IllegalStateException(
          "Failed clearing a message with a cancellation token " + token, e);
    }
  }

  /**
   * Buffers {@code message} for delivery at {@code untilTimestamp}. If the message carries a
   * cancellation token, the token is indexed so the message can be removed later on.
   *
   * @param message the message to buffer.
   * @param untilTimestamp the timestamp the message is buffered for.
   * @throws RuntimeException if accessing the state fails, or if another buffered message already
   *     uses the same cancellation token.
   */
  @Override
  public void add(Message message, long untilTimestamp) {
    try {
      addThrows(message, untilTimestamp);
    } catch (Exception e) {
      throw new RuntimeException("Error adding delayed message to state buffer: " + message, e);
    }
  }

  // -----------------------------------------------------------------------------------------------------
  // Internal
  // -----------------------------------------------------------------------------------------------------

  /** Implementation of {@link #forEachMessageAt(long, Consumer)} that may throw checked errors. */
  private void forEachMessageThrows(long timestamp, Consumer<Message> fn) throws Exception {
    bufferState.setCurrentNamespace(timestamp);
    // Flink's state contract allows get() to return null when nothing is stored under the current
    // namespace, so an empty buffer must be treated as "nothing to deliver" rather than iterated.
    final @Nullable Iterable<Message> messages = bufferState.get();
    if (messages != null) {
      for (Message message : messages) {
        removeMessageIdMapping(message);
        fn.accept(message);
      }
    }
    bufferState.clear();
  }

  /** Implementation of {@link #add(Message, long)} that may throw checked errors. */
  private void addThrows(Message message, long untilTimestamp) throws Exception {
    final Optional<String> maybeToken = message.cancellationToken();
    // Validate the cancellation token BEFORE touching any state, so that a rejected message is
    // never left behind in the buffer without a matching index entry.
    if (maybeToken.isPresent()) {
      final String cancellationToken = maybeToken.get();
      final @Nullable Long previousTimestamp = cancellationTokenToTimestamp.get(cancellationToken);
      if (previousTimestamp != null) {
        throw new IllegalStateException(
            "Trying to associate a message with cancellation token "
                + cancellationToken
                + " and timestamp "
                + untilTimestamp
                + ", but a message with the same cancellation token exists and with a timestamp "
                + previousTimestamp);
      }
    }
    bufferState.setCurrentNamespace(untilTimestamp);
    bufferState.add(message);
    if (maybeToken.isPresent()) {
      cancellationTokenToTimestamp.put(maybeToken.get(), untilTimestamp);
    }
  }

  /**
   * Implementation of {@link #removeMessageByCancellationToken(String)} that may throw checked
   * errors.
   */
  private OptionalLong remove(String cancellationToken) throws Exception {
    final @Nullable Long untilTimestamp = cancellationTokenToTimestamp.get(cancellationToken);
    if (untilTimestamp == null) {
      // The message associated with @cancellationToken has already been delivered, or previously
      // removed.
      return OptionalLong.empty();
    }
    cancellationTokenToTimestamp.remove(cancellationToken);
    bufferState.setCurrentNamespace(untilTimestamp);
    List<Message> remaining = removeMessageByToken(bufferState.get(), cancellationToken);
    if (!remaining.isEmpty()) {
      // There are more messages to process, so we indicate to the caller that
      // they should NOT cancel the timer.
      bufferState.update(remaining);
      return OptionalLong.empty();
    }
    // There are no more messages to remove, we clear the buffer and indicate
    // to our caller to remove the timer for @untilTimestamp
    bufferState.clear();
    return OptionalLong.of(untilTimestamp);
  }

  // ---------------------------------------------------------------------------------------------------------
  // Helpers
  // ---------------------------------------------------------------------------------------------------------

  /** Drops the index entry of {@code message}'s cancellation token, if the message has one. */
  private void removeMessageIdMapping(Message message) throws Exception {
    Optional<String> maybeToken = message.cancellationToken();
    if (maybeToken.isPresent()) {
      cancellationTokenToTimestamp.remove(maybeToken.get());
    }
  }

  /**
   * Returns a new list with all the given messages, in their original order, except those whose
   * cancellation token equals {@code token}.
   *
   * @param messages the messages to filter; {@code null} is treated as an empty buffer.
   * @param token the cancellation token of the message(s) to leave out.
   * @return a new, possibly empty, mutable list of the retained messages.
   */
  private static List<Message> removeMessageByToken(
      @Nullable Iterable<Message> messages, String token) {
    List<Message> remaining = new ArrayList<>();
    if (messages == null) {
      return remaining;
    }
    for (Message message : messages) {
      Optional<String> thisMessageToken = message.cancellationToken();
      boolean matches =
          thisMessageToken.isPresent() && Objects.equals(thisMessageToken.get(), token);
      if (!matches) {
        remaining.add(message);
      }
    }
    return remaining;
  }
}