blob: b764bfbeabf15086eceb066068ea3221c7e207a3 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.beam.runners.dataflow.worker;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.io.UncheckedIOException;
import java.util.Arrays;
import java.util.Collection;
import java.util.List;
import java.util.Objects;
import org.apache.beam.runners.core.KeyedWorkItem;
import org.apache.beam.runners.core.KeyedWorkItemCoder;
import org.apache.beam.runners.core.TimerInternals.TimerData;
import org.apache.beam.runners.dataflow.worker.windmill.Windmill;
import org.apache.beam.runners.dataflow.worker.windmill.Windmill.Timer;
import org.apache.beam.sdk.coders.Coder;
import org.apache.beam.sdk.coders.KvCoder;
import org.apache.beam.sdk.coders.StructuredCoder;
import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
import org.apache.beam.sdk.transforms.windowing.PaneInfo;
import org.apache.beam.sdk.util.WindowedValue;
import org.apache.beam.sdk.util.common.ElementByteSizeObserver;
import org.apache.beam.vendor.guava.v20_0.com.google.common.base.Predicate;
import org.apache.beam.vendor.guava.v20_0.com.google.common.base.Predicates;
import org.apache.beam.vendor.guava.v20_0.com.google.common.collect.FluentIterable;
import org.apache.beam.vendor.guava.v20_0.com.google.common.collect.Lists;
import org.joda.time.Instant;
/**
 * An implementation of {@link KeyedWorkItem} that wraps around a {@code Windmill.WorkItem}.
 *
 * <p>Timers and elements are read from the wrapped work item on demand: {@link
 * #timersIterable()} and {@link #elementsIterable()} return lazy views that convert or decode each
 * entry while it is being iterated. Equality and hashing consider only the key and the wrapped work
 * item; the coders are not part of an item's identity.
 *
 * @param <K> the key type
 * @param <ElemT> the element type
 */
public class WindmillKeyedWorkItem<K, ElemT> implements KeyedWorkItem<K, ElemT> {
  /** Matches Windmill timers whose type is {@code WATERMARK}. */
  private static final Predicate<Timer> IS_WATERMARK =
      input -> input.getType() == Timer.Type.WATERMARK;

  private final Windmill.WorkItem workItem;
  private final K key;
  private final transient Coder<? extends BoundedWindow> windowCoder;
  private final transient Coder<Collection<? extends BoundedWindow>> windowsCoder;
  private final transient Coder<ElemT> valueCoder;

  /**
   * Creates a keyed work item view over {@code workItem}.
   *
   * @param key the value returned by {@link #key()}
   * @param workItem the Windmill work item supplying timers and message bundles
   * @param windowCoder coder passed along when converting Windmill timers to {@link TimerData}
   * @param windowsCoder coder used to decode the windows stored in each message's metadata
   * @param valueCoder coder used to decode each message's data into an element value
   */
  public WindmillKeyedWorkItem(
      K key,
      Windmill.WorkItem workItem,
      Coder<? extends BoundedWindow> windowCoder,
      Coder<Collection<? extends BoundedWindow>> windowsCoder,
      Coder<ElemT> valueCoder) {
    this.key = key;
    this.workItem = workItem;
    this.windowCoder = windowCoder;
    this.windowsCoder = windowsCoder;
    this.valueCoder = valueCoder;
  }

  @Override
  public K key() {
    return key;
  }

  /**
   * Returns the work item's timers converted to {@link TimerData}.
   *
   * <p>All {@code WATERMARK} timers come first, followed by every other timer; within each group
   * the order of the Windmill timer list is kept. The result is a lazy view, so conversion happens
   * during iteration.
   */
  @Override
  public Iterable<TimerData> timersIterable() {
    FluentIterable<Timer> allTimers = FluentIterable.from(workItem.getTimers().getTimersList());
    FluentIterable<Timer> eventTimers = allTimers.filter(IS_WATERMARK);
    FluentIterable<Timer> nonEventTimers = allTimers.filter(Predicates.not(IS_WATERMARK));
    return eventTimers
        .append(nonEventTimers)
        .transform(
            timer ->
                WindmillTimerInternals.windmillTimerToTimerData(
                    WindmillNamespacePrefix.SYSTEM_NAMESPACE_PREFIX, timer, windowCoder));
  }

  /**
   * Returns every message of every input bundle decoded as a {@link WindowedValue}.
   *
   * <p>The result is a lazy view: each message's timestamp, windows, pane and value are decoded
   * while iterating, and re-decoded on every traversal.
   *
   * @throws UncheckedIOException (during iteration) if a message's metadata or data cannot be
   *     decoded
   */
  @Override
  public Iterable<WindowedValue<ElemT>> elementsIterable() {
    return FluentIterable.from(workItem.getMessageBundlesList())
        .transformAndConcat(Windmill.InputMessageBundle::getMessagesList)
        .transform(
            message -> {
              try {
                Instant timestamp =
                    WindmillTimeUtils.windmillToHarnessTimestamp(message.getTimestamp());
                Collection<? extends BoundedWindow> windows =
                    WindmillSink.decodeMetadataWindows(windowsCoder, message.getMetadata());
                PaneInfo pane = WindmillSink.decodeMetadataPane(message.getMetadata());
                InputStream inputStream = message.getData().newInput();
                // The OUTER context is kept deliberately; it may decode differently from the
                // context-free overload for some coders.
                ElemT value = valueCoder.decode(inputStream, Coder.Context.OUTER);
                return WindowedValue.of(value, timestamp, windows, pane);
              } catch (IOException e) {
                throw new UncheckedIOException(e);
              }
            });
  }

  /** Two items are equal when their keys and wrapped work items are equal; coders are ignored. */
  @Override
  public boolean equals(Object other) {
    if (!(other instanceof WindmillKeyedWorkItem)) {
      return false;
    }
    WindmillKeyedWorkItem<?, ?> that = (WindmillKeyedWorkItem<?, ?>) other;
    return Objects.equals(this.key, that.key) && Objects.equals(this.workItem, that.workItem);
  }

  @Override
  public int hashCode() {
    return Objects.hash(key, workItem);
  }

  /**
   * Renders the key, all decoded elements, all timers and the coders. Note that this decodes every
   * element, so it can throw if decoding fails.
   */
  @Override
  public String toString() {
    return "WindmillKeyedWorkItem{"
        + "key="
        + key()
        + ", elements="
        + Lists.newArrayList(elementsIterable())
        + ", timers="
        + Lists.newArrayList(timersIterable())
        + ", windowCoder="
        + windowCoder
        + ", windowsCoder="
        + windowsCoder
        + ", valueCoder="
        + valueCoder
        + '}';
  }

  /**
   * Placeholder coder for {@link KeyedWorkItem} values that supports only byte-size observation.
   *
   * <p>For a {@link WindmillKeyedWorkItem} the observed size is the serialized size of the wrapped
   * {@code Windmill.WorkItem}. The {@code encode} and {@code decode} methods are not supported and
   * always throw {@link UnsupportedOperationException}.
   */
  public static class FakeKeyedWorkItemCoder<K, ElemT>
      extends StructuredCoder<KeyedWorkItem<K, ElemT>> {
    final KvCoder<K, ElemT> kvCoder;

    /**
     * Creates a new {@code FakeKeyedWorkItemCoder} whose key and element coders are taken from
     * {@code elemCoder}.
     *
     * @param elemCoder a {@link KeyedWorkItemCoder} or a {@link KvCoder}
     * @throws IllegalArgumentException if {@code elemCoder} is neither
     */
    public static <T> FakeKeyedWorkItemCoder<?, ?> of(Coder<T> elemCoder) {
      return new FakeKeyedWorkItemCoder<>(elemCoder);
    }

    /**
     * Builds the coder from a {@link KeyedWorkItemCoder} (key and element coders are repackaged as a
     * {@link KvCoder}) or directly from a {@link KvCoder}.
     *
     * @throws IllegalArgumentException if {@code elemCoder} is neither kind of coder
     */
    // The incoming coder's type arguments are erased; callers are trusted to supply a coder whose
    // key/element types match K/ElemT.
    @SuppressWarnings("unchecked")
    protected FakeKeyedWorkItemCoder(Coder<?> elemCoder) {
      if (elemCoder instanceof KeyedWorkItemCoder) {
        KeyedWorkItemCoder<K, ElemT> kwiCoder = (KeyedWorkItemCoder<K, ElemT>) elemCoder;
        this.kvCoder = KvCoder.of(kwiCoder.getKeyCoder(), kwiCoder.getElementCoder());
      } else if (elemCoder instanceof KvCoder) {
        this.kvCoder = (KvCoder<K, ElemT>) elemCoder;
      } else {
        throw new IllegalArgumentException(
            "FakeKeyedWorkItemCoder only works with KeyedWorkItemCoder or KvCoder; was: "
                + elemCoder.getClass());
      }
    }

    @Override
    public List<? extends Coder<?>> getCoderArguments() {
      return Arrays.asList(kvCoder);
    }

    /** Not supported; always throws. */
    @Override
    public void encode(KeyedWorkItem<K, ElemT> value, OutputStream outStream) {
      throw new UnsupportedOperationException("FakeKeyedWorkItemCoder does not support encode");
    }

    /** Not supported; always throws. */
    @Override
    public KeyedWorkItem<K, ElemT> decode(InputStream inStream) {
      throw new UnsupportedOperationException("FakeKeyedWorkItemCoder does not support decode");
    }

    @Override
    public boolean isRegisterByteSizeObserverCheap(KeyedWorkItem<K, ElemT> value) {
      return true;
    }

    /**
     * Reports the serialized size of the wrapped {@code Windmill.WorkItem}.
     *
     * @throws UnsupportedOperationException if {@code value} is not a {@link WindmillKeyedWorkItem}
     */
    @Override
    public void registerByteSizeObserver(
        KeyedWorkItem<K, ElemT> value, ElementByteSizeObserver observer) throws Exception {
      if (!(value instanceof WindmillKeyedWorkItem)) {
        throw new UnsupportedOperationException(
            "FakeKeyedWorkItemCoder can only observe a WindmillKeyedWorkItem; was: "
                + (value == null ? "null" : value.getClass().getName()));
      }
      // Keep this widened to long before the call. NOTE(review): the observer's update appears to
      // take a boxed value, so passing the raw int would box to Integer instead of Long — confirm
      // against ElementByteSizeObserver before changing.
      long serializedSize = ((WindmillKeyedWorkItem<?, ?>) value).workItem.getSerializedSize();
      observer.update(serializedSize);
    }

    /** Performs no checks; this coder never encodes values. */
    @Override
    public void verifyDeterministic() throws NonDeterministicException {}

    public Coder<K> getKeyCoder() {
      return kvCoder.getKeyCoder();
    }

    public Coder<ElemT> getElementCoder() {
      return kvCoder.getValueCoder();
    }
  }
}