blob: 9f8bbf2c95bce4a39ac81cd315ebeba6c6781353 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.beam.runners.dataflow.worker;
import static org.apache.beam.vendor.guava.v20_0.com.google.common.base.Preconditions.checkState;
import java.io.Closeable;
import java.util.Collections;
import java.util.Objects;
import java.util.concurrent.Callable;
import java.util.concurrent.TimeUnit;
import javax.annotation.Nullable;
import org.apache.beam.runners.core.InMemoryMultimapSideInputView;
import org.apache.beam.runners.dataflow.worker.windmill.Windmill;
import org.apache.beam.sdk.coders.Coder;
import org.apache.beam.sdk.coders.IterableCoder;
import org.apache.beam.sdk.coders.KvCoder;
import org.apache.beam.sdk.transforms.Materializations;
import org.apache.beam.sdk.transforms.Materializations.MultimapView;
import org.apache.beam.sdk.transforms.ViewFn;
import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
import org.apache.beam.sdk.values.KV;
import org.apache.beam.sdk.values.PCollectionView;
import org.apache.beam.sdk.values.TupleTag;
import org.apache.beam.sdk.values.WindowingStrategy;
import org.apache.beam.vendor.grpc.v1p13p1.com.google.protobuf.ByteString;
import org.apache.beam.vendor.guava.v20_0.com.google.common.base.Optional;
import org.apache.beam.vendor.guava.v20_0.com.google.common.base.Supplier;
import org.apache.beam.vendor.guava.v20_0.com.google.common.cache.Cache;
import org.apache.beam.vendor.guava.v20_0.com.google.common.cache.CacheBuilder;
import org.apache.beam.vendor.guava.v20_0.com.google.common.cache.Weigher;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/** Class responsible for fetching state from the windmill server. */
class StateFetcher {
private static final Logger LOG = LoggerFactory.getLogger(StateFetcher.class);

/**
 * Cache of fetched side inputs keyed by (tag, window). The same cache instance is handed to every
 * fetcher created by {@link #byteTrackingView()}.
 */
private final Cache<SideInputId, SideInputCacheEntry> sideInputCache;

/** Stub used to request side input data from the server. */
private final MetricTrackingWindmillServerStub server;

/**
 * Running total of the serialized sizes of the side input responses fetched by this instance.
 * Intentionally mutable and per-instance.
 */
private long bytesRead = 0L;
/**
 * Creates a fetcher backed by a default side input cache: entries are weighed by their encoded
 * size, the total weight is capped at 100,000,000 (100 MB), and entries expire one minute after
 * being written.
 */
public StateFetcher(MetricTrackingWindmillServerStub server) {
  this(server, newDefaultSideInputCache());
}

/** Builds the cache used when the caller does not supply one of its own. */
private static Cache<SideInputId, SideInputCacheEntry> newDefaultSideInputCache() {
  Weigher<SideInputId, SideInputCacheEntry> byEncodedSize = (id, entry) -> entry.size();
  return CacheBuilder.newBuilder()
      .maximumWeight(100000000 /* 100 MB */)
      .expireAfterWrite(1, TimeUnit.MINUTES)
      .weigher(byEncodedSize)
      .build();
}
/**
 * Creates a fetcher that requests side inputs through {@code server} and stores the results in
 * the supplied {@code sideInputCache}.
 */
public StateFetcher(
    MetricTrackingWindmillServerStub server,
    Cache<SideInputId, SideInputCacheEntry> sideInputCache) {
  this.sideInputCache = sideInputCache;
  this.server = server;
}
/**
 * Returns a new fetcher that shares this fetcher's server stub and side input cache but keeps its
 * own bytes-read counter, starting from zero.
 */
public StateFetcher byteTrackingView() {
  final StateFetcher trackingView = new StateFetcher(this.server, this.sideInputCache);
  return trackingView;
}
/**
 * Returns the accumulated serialized size of the side input responses this instance has fetched
 * from the server.
 */
public long getBytesRead() {
  return this.bytesRead;
}
/**
 * Indicates the caller's knowledge of whether a particular side input has been computed.
 *
 * <p>{@code fetchSideInput} only special-cases {@link #KNOWN_READY}; the other two values take the
 * same code path there.
 */
public enum SideInputState {
  /**
   * Handled by {@code fetchSideInput} like {@link #UNKNOWN}: whatever entry the cache holds is
   * used as-is. NOTE(review): presumably the data accompanies the work item; confirm with callers.
   */
  CACHED_IN_WORKITEM,

  /**
   * The caller knows the side input is ready, so a cached not-ready entry is discarded and the
   * side input is fetched again.
   */
  KNOWN_READY,

  /** No knowledge; the cached entry (ready or not) is used, loading it on a cache miss. */
  UNKNOWN
}
/**
 * Fetches the given side input, storing the result in the side input cache.
 *
 * <p>If {@code state} is {@code KNOWN_READY}, the data is fetched regardless of whether a
 * not-ready entry was cached: such an entry is invalidated and the side input is requested again.
 * For any other state the cached entry (ready or not) is returned, and the server is only
 * contacted on a cache miss.
 *
 * <p>Returns {@literal null} if the side input was not ready, {@literal Optional.absent()} if the
 * side input was null, and {@literal Optional.of(...)} if the side input was non-null.
 *
 * @param view the side input view; its tag, windowing strategy, coder and view function are used
 *     to build the request and to decode the response
 * @param sideWindow the side input window; together with the view's tag it forms the cache key
 * @param stateFamily the state family set on the request
 * @param state the caller's knowledge of whether the side input has been computed
 * @param scopedReadStateSupplier supplies a {@link Closeable} that is held open for the duration
 *     of the server call and closed right after it
 * @throws RuntimeException wrapping any exception raised while looking up or fetching the entry
 */
@Nullable
public <T, SideWindowT extends BoundedWindow> Optional<T> fetchSideInput(
    final PCollectionView<T> view,
    final SideWindowT sideWindow,
    final String stateFamily,
    SideInputState state,
    final Supplier<Closeable> scopedReadStateSupplier) {
  final SideInputId id = new SideInputId(view.getTagInternal(), sideWindow);
  final Callable<SideInputCacheEntry> fetchCallable =
      () -> loadSideInput(view, sideWindow, stateFamily, scopedReadStateSupplier);
  try {
    if (state == SideInputState.KNOWN_READY) {
      SideInputCacheEntry entry = sideInputCache.getIfPresent(id);
      if (entry != null) {
        if (entry.isReady()) {
          return entry.getValue();
        }
        // Invalidate the existing not-ready entry. This must be done atomically
        // so that another thread doesn't replace the entry with a ready entry, which
        // would then be deleted here.
        synchronized (entry) {
          SideInputCacheEntry newEntry = sideInputCache.getIfPresent(id);
          if (newEntry != null && !newEntry.isReady()) {
            sideInputCache.invalidate(id);
          }
        }
      }
    }
    // Cache miss, a just-invalidated not-ready entry, or a state other than KNOWN_READY:
    // let the cache return the current entry or load it through the callable.
    return sideInputCache.get(id, fetchCallable).getValue();
  } catch (Exception e) {
    LOG.error("Fetch failed: ", e);
    throw new RuntimeException("Exception while fetching side input: ", e);
  }
}

/**
 * Requests the side input for {@code sideWindow} from the server and converts the response into a
 * cache entry. Adds the serialized size of the response to this instance's bytes-read counter.
 *
 * <p>Declared {@code throws Exception} because it is only ever invoked as the body of a {@link
 * Callable}; encoding, decoding and closing the read scope can all fail.
 *
 * @return a ready entry holding the view function's result and the encoded size of the data, or a
 *     not-ready entry if the server reports the side input as not ready
 * @throws IllegalStateException if the view's materialization is not the multimap materialization
 */
private <T, SideWindowT extends BoundedWindow> SideInputCacheEntry loadSideInput(
    PCollectionView<T> view,
    SideWindowT sideWindow,
    String stateFamily,
    Supplier<Closeable> scopedReadStateSupplier)
    throws Exception {
  // Validate up front so an unsupported view fails before any request is sent.
  String materializationUrn = view.getViewFn().getMaterialization().getUrn();
  checkState(
      Materializations.MULTIMAP_MATERIALIZATION_URN.equals(materializationUrn),
      "Only materializations of type %s supported, received %s",
      Materializations.MULTIMAP_MATERIALIZATION_URN,
      materializationUrn);

  @SuppressWarnings("unchecked")
  WindowingStrategy<?, SideWindowT> sideWindowStrategy =
      (WindowingStrategy<?, SideWindowT>) view.getWindowingStrategyInternal();
  Coder<SideWindowT> windowCoder = sideWindowStrategy.getWindowFn().windowCoder();

  // The encoded window is sent as the "version" of the global data id.
  ByteString.Output windowStream = ByteString.newOutput();
  windowCoder.encode(sideWindow, windowStream, Coder.Context.OUTER);

  Windmill.GlobalDataRequest request =
      Windmill.GlobalDataRequest.newBuilder()
          .setDataId(
              Windmill.GlobalDataId.newBuilder()
                  .setTag(view.getTagInternal().getId())
                  .setVersion(windowStream.toByteString())
                  .build())
          .setStateFamily(stateFamily)
          .setExistenceWatermarkDeadline(
              WindmillTimeUtils.harnessToWindmillTimestamp(
                  sideWindowStrategy.getTrigger().getWatermarkThatGuaranteesFiring(sideWindow)))
          .build();

  Windmill.GlobalData data;
  try (Closeable scope = scopedReadStateSupplier.get()) {
    data = server.getSideInputData(request);
  }
  bytesRead += data.getSerializedSize();

  @SuppressWarnings({"unchecked", "rawtypes"})
  KvCoder<Object, Object> sideInputValueCoder = (KvCoder) view.getCoderInternal();

  if (!data.getIsReady()) {
    return SideInputCacheEntry.notReady();
  }

  Iterable<KV<Object, Object>> rawData;
  if (data.getData().size() > 0) {
    rawData =
        IterableCoder.of(sideInputValueCoder)
            .decode(data.getData().newInput(), Coder.Context.OUTER);
  } else {
    // Empty payload: treat as an empty multimap rather than attempting to decode.
    rawData = Collections.emptyList();
  }

  @SuppressWarnings({"unchecked", "rawtypes"})
  ViewFn<MultimapView, Object> viewFn =
      (ViewFn<MultimapView, Object>) (ViewFn) view.getViewFn();
  Object value =
      viewFn.apply(
          InMemoryMultimapSideInputView.fromIterable(
              sideInputValueCoder.getKeyCoder(), rawData));
  return SideInputCacheEntry.ready(value, data.getData().size());
}
/**
 * Struct representing a side input for a particular window. Instances are used as keys of the
 * side input cache, so equality and hashing are defined over the (tag, window) pair.
 */
static class SideInputId {
  private final TupleTag<?> tag;
  private final BoundedWindow window;

  public SideInputId(TupleTag<?> tag, BoundedWindow window) {
    this.tag = tag;
    this.window = window;
  }

  /**
   * Two ids are equal when both their tags and their windows are equal. Null fields are compared
   * null-safely, matching the null tolerance of {@link #hashCode()}.
   */
  @Override
  public boolean equals(Object other) {
    if (this == other) {
      return true;
    }
    if (!(other instanceof SideInputId)) {
      return false;
    }
    SideInputId otherId = (SideInputId) other;
    return Objects.equals(tag, otherId.tag) && Objects.equals(window, otherId.window);
  }

  @Override
  public int hashCode() {
    return Objects.hash(tag, window);
  }
}
/**
 * Entry in the side input cache. Holds a readiness flag, the value (null when the entry is not
 * ready) and the encoded size of the value, which the default cache uses as the entry's weight.
 */
static class SideInputCacheEntry {
  private final boolean ready;
  private final Object value;
  private final int encodedSize;

  private SideInputCacheEntry(boolean ready, Object value, int encodedSize) {
    this.encodedSize = encodedSize;
    this.value = value;
    this.ready = ready;
  }

  /** Creates an entry for a side input that is ready, with the given value and encoded size. */
  public static SideInputCacheEntry ready(Object value, int encodedSize) {
    return new SideInputCacheEntry(true, value, encodedSize);
  }

  /** Creates an entry for a side input that is not ready: no value and an encoded size of 0. */
  public static SideInputCacheEntry notReady() {
    return new SideInputCacheEntry(false, null, 0);
  }

  /** Returns whether this entry was created as ready. */
  public boolean isReady() {
    return this.ready;
  }

  /**
   * Returns {@literal null} if the side input was not ready, {@literal Optional.absent()} if the
   * side input was null, and {@literal Optional.of(...)} if the side input was non-null.
   */
  @Nullable
  public <T> Optional<T> getValue() {
    if (!ready) {
      return null;
    }
    @SuppressWarnings("unchecked")
    T typedValue = (T) value;
    return Optional.fromNullable(typedValue);
  }

  /** Returns the encoded size recorded for this entry. */
  public int size() {
    return this.encodedSize;
  }
}
}