blob: e1a0c873473aa71a396292cf2905dd6cbd4191e7 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.flink.statefun.flink.core.functions;
import java.time.Duration;
import java.util.Objects;
import java.util.concurrent.CompletableFuture;
import org.apache.flink.statefun.flink.core.backpressure.InternalContext;
import org.apache.flink.statefun.flink.core.di.Inject;
import org.apache.flink.statefun.flink.core.di.Label;
import org.apache.flink.statefun.flink.core.message.Message;
import org.apache.flink.statefun.flink.core.message.MessageFactory;
import org.apache.flink.statefun.flink.core.metrics.FunctionTypeMetrics;
import org.apache.flink.statefun.flink.core.state.State;
import org.apache.flink.statefun.sdk.Address;
import org.apache.flink.statefun.sdk.io.EgressIdentifier;
final class ReusableContext implements ApplyingContext, InternalContext {
private final Partition thisPartition;
private final LocalSink localSink;
private final RemoteSink remoteSink;
private final DelaySink delaySink;
private final AsyncSink asyncSink;
private final SideOutputSink sideOutputSink;
private final State state;
private final MessageFactory messageFactory;
private Message in;
private LiveFunction function;
@Inject
ReusableContext(
Partition partition,
LocalSink localSink,
RemoteSink remoteSink,
DelaySink delaySink,
AsyncSink asyncSink,
SideOutputSink sideoutputSink,
@Label("state") State state,
MessageFactory messageFactory) {
this.thisPartition = Objects.requireNonNull(partition);
this.localSink = Objects.requireNonNull(localSink);
this.remoteSink = Objects.requireNonNull(remoteSink);
this.delaySink = Objects.requireNonNull(delaySink);
this.sideOutputSink = Objects.requireNonNull(sideoutputSink);
this.state = Objects.requireNonNull(state);
this.messageFactory = Objects.requireNonNull(messageFactory);
this.asyncSink = Objects.requireNonNull(asyncSink);
}
@Override
public void apply(LiveFunction function, Message inMessage) {
this.in = inMessage;
this.function = function;
state.setCurrentKey(inMessage.target());
function.metrics().incomingMessage();
function.receive(this, in);
in.postApply();
this.in = null;
}
@Override
public void send(Address to, Object what) {
Objects.requireNonNull(to);
Objects.requireNonNull(what);
Message envelope = messageFactory.from(self(), to, what);
if (thisPartition.contains(to)) {
localSink.accept(envelope);
function.metrics().outgoingLocalMessage();
} else {
remoteSink.accept(envelope);
function.metrics().outgoingRemoteMessage();
}
}
@Override
public <T> void send(EgressIdentifier<T> egress, T what) {
Objects.requireNonNull(egress);
Objects.requireNonNull(what);
function.metrics().outgoingEgressMessage();
sideOutputSink.accept(egress, what);
}
@Override
public void sendAfter(Duration delay, Address to, Object message) {
Objects.requireNonNull(delay);
Objects.requireNonNull(to);
Objects.requireNonNull(message);
Message envelope = messageFactory.from(self(), to, message);
delaySink.accept(envelope, delay.toMillis());
}
@Override
public void sendAfter(Duration delay, Address to, Object message, String cancellationToken) {
Objects.requireNonNull(delay);
Objects.requireNonNull(to);
Objects.requireNonNull(message);
Objects.requireNonNull(cancellationToken);
Message envelope = messageFactory.from(self(), to, message, cancellationToken);
delaySink.accept(envelope, delay.toMillis());
}
@Override
public void cancelDelayedMessage(String cancellationToken) {
Objects.requireNonNull(cancellationToken);
delaySink.removeMessageByCancellationToken(cancellationToken);
}
@Override
public <M, T> void registerAsyncOperation(M metadata, CompletableFuture<T> future) {
Objects.requireNonNull(metadata);
Objects.requireNonNull(future);
Message message = messageFactory.from(self(), self(), metadata);
asyncSink.accept(self(), message, future);
}
@Override
public void awaitAsyncOperationComplete() {
asyncSink.blockAddress(self());
}
@Override
public FunctionTypeMetrics functionTypeMetrics() {
return function.metrics();
}
@Override
public Address caller() {
return in.source();
}
@Override
public Address self() {
return in.target();
}
}