// blob: b3e6f64d7d5dfc01636ab086f30b901418ef69f2 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.beam.fn.harness.state;
import static org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.Preconditions.checkState;
import java.util.ArrayList;
import java.util.Collections;
import java.util.concurrent.CompletableFuture;
import org.apache.beam.model.fnexecution.v1.BeamFnApi.StateAppendRequest;
import org.apache.beam.model.fnexecution.v1.BeamFnApi.StateClearRequest;
import org.apache.beam.model.fnexecution.v1.BeamFnApi.StateRequest;
import org.apache.beam.sdk.coders.Coder;
import org.apache.beam.sdk.fn.stream.DataStreams;
import org.apache.beam.vendor.grpc.v1p21p0.com.google.protobuf.ByteString;
import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.Iterables;
/**
* An implementation of a bag user state that utilizes the Beam Fn State API to fetch, clear and
* persist values.
*
* <p>Calling {@link #asyncClose()} schedules any required persistence changes. This object should
* no longer be used after it is closed.
*
* <p>TODO: Move to an async persist model where persistence is signalled based upon cache memory
* pressure and its need to flush.
*
* <p>TODO: Support block level caching and prefetch.
*/
public class BagUserState<T> {
  /** Client used both to fetch the existing bag contents and to send clear/append requests. */
  private final BeamFnStateClient beamFnStateClient;

  /** Template request carrying the instruction id and the bag user state key; never mutated. */
  private final StateRequest request;

  /** Coder used to decode fetched values and to encode appended values. */
  private final Coder<T> valueCoder;

  /**
   * Values that were already persisted when this object was created, or {@code null} once {@link
   * #clear()} has been called.
   */
  private Iterable<T> oldValues;

  /** Values appended locally since construction or since the last {@link #clear()}. */
  private ArrayList<T> newValues;

  /** Set once {@link #asyncClose()} has handed all pending requests to the client. */
  private boolean isClosed;

  /**
   * Creates a bag user state bound to a single (transform, state id, window, key) cell.
   *
   * @param beamFnStateClient client used to issue state requests
   * @param instructionId id of the instruction the state requests are issued under
   * @param ptransformId id of the transform that owns the state
   * @param stateId id of the user state within the transform
   * @param encodedWindow encoded window the state is scoped to
   * @param encodedKey encoded key the state is scoped to
   * @param valueCoder coder for the elements stored in the bag
   */
  public BagUserState(
      BeamFnStateClient beamFnStateClient,
      String instructionId,
      String ptransformId,
      String stateId,
      ByteString encodedWindow,
      ByteString encodedKey,
      Coder<T> valueCoder) {
    this.beamFnStateClient = beamFnStateClient;
    this.valueCoder = valueCoder;

    StateRequest.Builder requestBuilder = StateRequest.newBuilder();
    requestBuilder
        .setInstructionId(instructionId)
        .getStateKeyBuilder()
        .getBagUserStateBuilder()
        .setTransformId(ptransformId)
        .setUserStateId(stateId)
        .setWindow(encodedWindow)
        .setKey(encodedKey);
    request = requestBuilder.build();

    // The decoder is instantiated with the diamond operator (not as a raw type) so that its
    // element type is checked against valueCoder at compile time.
    // NOTE(review): LazyCachingIteratorToIterable presumably defers the fetch until the first
    // iteration and caches what it reads - confirm against that class.
    this.oldValues =
        new LazyCachingIteratorToIterable<>(
            new DataStreams.DataStreamDecoder<>(
                valueCoder,
                DataStreams.inbound(
                    StateFetchingIterators.forFirstChunk(beamFnStateClient, request))));
    this.newValues = new ArrayList<>();
  }

  /**
   * Returns the current contents of the bag: the previously persisted values (unless the bag was
   * cleared) followed by the locally appended values.
   *
   * <p>The locally appended part is a read-only view limited to the number of elements present
   * when this method is called, so elements appended afterwards are not exposed through the
   * returned iterable.
   *
   * @return an iterable over the bag contents
   * @throws IllegalStateException if this state has already been closed
   */
  public Iterable<T> get() {
    checkNotClosed();
    if (oldValues == null) {
      // If we were cleared we should disregard old values.
      return Iterables.limit(Collections.unmodifiableList(newValues), newValues.size());
    } else if (newValues.isEmpty()) {
      // If we have no new values then just return the old values.
      return oldValues;
    }
    return Iterables.concat(
        oldValues, Iterables.limit(Collections.unmodifiableList(newValues), newValues.size()));
  }

  /**
   * Appends a value to the bag. The value is only buffered locally; it is sent to the state
   * client by {@link #asyncClose()}.
   *
   * @param t the value to append
   * @throws IllegalStateException if this state has already been closed
   */
  public void append(T t) {
    checkNotClosed();
    newValues.add(t);
  }

  /**
   * Clears the bag: previously persisted values are disregarded and locally appended values are
   * dropped. The clear is sent to the state client by {@link #asyncClose()}.
   *
   * @throws IllegalStateException if this state has already been closed
   */
  public void clear() {
    checkNotClosed();
    oldValues = null;
    // A fresh list is allocated (rather than clearing in place) so that iterables previously
    // returned by get() keep referring to the list they were created over.
    newValues = new ArrayList<>();
  }

  /**
   * Hands the pending changes to the state client: a clear request if {@link #clear()} was
   * called, followed by a single append request containing all locally appended values encoded
   * back to back with the value coder.
   *
   * <p>The futures passed to the client are not retained, so the outcome of the requests is not
   * observed by this method. The state is marked closed only after every request has been handed
   * off; if an exception is thrown earlier, the state remains open.
   *
   * @throws IllegalStateException if this state has already been closed
   * @throws Exception if encoding a value or handing a request to the state client fails
   */
  public void asyncClose() throws Exception {
    checkNotClosed();
    if (oldValues == null) {
      beamFnStateClient.handle(
          request.toBuilder().setClear(StateClearRequest.getDefaultInstance()),
          new CompletableFuture<>());
    }
    if (!newValues.isEmpty()) {
      ByteString.Output out = ByteString.newOutput();
      for (T newValue : newValues) {
        // TODO: Replace with chunking output stream
        valueCoder.encode(newValue, out);
      }
      beamFnStateClient.handle(
          request
              .toBuilder()
              .setAppend(StateAppendRequest.newBuilder().setData(out.toByteString())),
          new CompletableFuture<>());
    }
    isClosed = true;
  }

  /** Fails with {@link IllegalStateException} if {@link #asyncClose()} has already completed. */
  private void checkNotClosed() {
    checkState(
        !isClosed,
        "Bag user state is no longer usable because it is closed for %s",
        request.getStateKey());
  }
}