blob: d8360947b7e04f5482da3a45579dd8df9a7dae7a [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.flink.statefun.e2e.smoke;
import org.apache.flink.statefun.e2e.smoke.generated.Command;
import org.apache.flink.statefun.e2e.smoke.generated.Commands;
import org.apache.flink.statefun.e2e.smoke.generated.FunctionTrackerSnapshot;
import org.apache.flink.statefun.e2e.smoke.generated.SourceCommand;
final class FunctionStateTracker {
private final long[] expectedStates;
public FunctionStateTracker(int numberOfFunctionInstances) {
this.expectedStates = new long[numberOfFunctionInstances];
}
/**
* Find any state modification commands nested under @sourceCommand, and apply them in the
* internal state representation.
*/
public void apply(SourceCommand sourceCommand) {
updateInternally(sourceCommand.getTarget(), sourceCommand.getCommands());
}
/** Apply all the state modification stored in the snapshot represented by the snapshotBytes. */
public FunctionStateTracker apply(FunctionTrackerSnapshot snapshot) {
for (int i = 0; i < snapshot.getStateCount(); i++) {
expectedStates[i] += snapshot.getState(i);
}
return this;
}
/** Get the current expected state of a function instance. */
public long stateOf(int id) {
return expectedStates[id];
}
public FunctionTrackerSnapshot.Builder snapshot() {
FunctionTrackerSnapshot.Builder snapshot = FunctionTrackerSnapshot.newBuilder();
for (long state : expectedStates) {
snapshot.addState(state);
}
return snapshot;
}
/**
* Recursively traverse the commands tree and look for {@link Command.IncrementState} commands.
* For each {@code ModifyState} command found update the corresponding expected state.
*/
private void updateInternally(int currentAddress, Commands commands) {
for (Command command : commands.getCommandList()) {
if (command.hasIncrement()) {
expectedStates[currentAddress]++;
} else if (command.hasSend()) {
updateInternally(command.getSend().getTarget(), command.getSend().getCommands());
} else if (command.hasSendAfter()) {
updateInternally(command.getSendAfter().getTarget(), command.getSendAfter().getCommands());
} else if (command.hasAsyncOperation()) {
updateInternally(currentAddress, command.getAsyncOperation().getResolvedCommands());
}
}
}
}