blob: 893fbe5852532e592afbff173092cc88beb811b4 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.beam.runners.core.construction;
import com.google.protobuf.ByteString;
import com.google.protobuf.InvalidProtocolBufferException;
import com.google.protobuf.util.Durations;
import com.google.protobuf.util.Timestamps;
import java.io.IOException;
import java.io.Serializable;
import org.apache.beam.model.pipeline.v1.RunnerApi;
import org.apache.beam.model.pipeline.v1.RunnerApi.Components;
import org.apache.beam.model.pipeline.v1.RunnerApi.FunctionSpec;
import org.apache.beam.model.pipeline.v1.RunnerApi.OutputTime;
import org.apache.beam.model.pipeline.v1.RunnerApi.SdkFunctionSpec;
import org.apache.beam.model.pipeline.v1.StandardWindowFns;
import org.apache.beam.model.pipeline.v1.StandardWindowFns.FixedWindowsPayload;
import org.apache.beam.model.pipeline.v1.StandardWindowFns.SessionsPayload;
import org.apache.beam.model.pipeline.v1.StandardWindowFns.SlidingWindowsPayload;
import org.apache.beam.sdk.transforms.windowing.FixedWindows;
import org.apache.beam.sdk.transforms.windowing.GlobalWindows;
import org.apache.beam.sdk.transforms.windowing.Sessions;
import org.apache.beam.sdk.transforms.windowing.SlidingWindows;
import org.apache.beam.sdk.transforms.windowing.TimestampCombiner;
import org.apache.beam.sdk.transforms.windowing.Trigger;
import org.apache.beam.sdk.transforms.windowing.Window.ClosingBehavior;
import org.apache.beam.sdk.transforms.windowing.Window.OnTimeBehavior;
import org.apache.beam.sdk.transforms.windowing.WindowFn;
import org.apache.beam.sdk.util.SerializableUtils;
import org.apache.beam.sdk.values.WindowingStrategy;
import org.apache.beam.sdk.values.WindowingStrategy.AccumulationMode;
import org.joda.time.Duration;
/**
 * Utilities for converting {@link WindowingStrategy WindowingStrategies}, and the pieces they are
 * made of, to and from their {@link RunnerApi} protocol buffer representations.
 *
 * <p>All members are static; the class carries no instance state.
 */
public class WindowingStrategyTranslation implements Serializable {

  /**
   * Converts a proto accumulation mode to the SDK's {@link AccumulationMode}.
   *
   * @param proto the proto enum value to convert
   * @return the corresponding SDK {@link AccumulationMode}
   * @throws IllegalArgumentException if {@code proto} is not one of the values handled here
   */
  public static AccumulationMode fromProto(RunnerApi.AccumulationMode.Enum proto) {
    switch (proto) {
      case DISCARDING:
        return AccumulationMode.DISCARDING_FIRED_PANES;
      case ACCUMULATING:
        return AccumulationMode.ACCUMULATING_FIRED_PANES;
      case UNRECOGNIZED:
      default:
        // Whether it is proto that cannot recognize the value (due to the version of the
        // generated code we link to) or this switch that hasn't been updated to handle it,
        // the situation is the same: we don't know what this AccumulationMode means.
        throw cannotConvert(RunnerApi.AccumulationMode.class, AccumulationMode.class, proto);
    }
  }

  /**
   * Converts an SDK {@link AccumulationMode} to its proto enum value.
   *
   * @param accumulationMode the SDK value to convert
   * @return the corresponding proto enum value
   * @throws IllegalArgumentException if {@code accumulationMode} is not handled by this method
   */
  public static RunnerApi.AccumulationMode.Enum toProto(AccumulationMode accumulationMode) {
    switch (accumulationMode) {
      case DISCARDING_FIRED_PANES:
        return RunnerApi.AccumulationMode.Enum.DISCARDING;
      case ACCUMULATING_FIRED_PANES:
        return RunnerApi.AccumulationMode.Enum.ACCUMULATING;
      default:
        throw cannotConvert(
            AccumulationMode.class, RunnerApi.AccumulationMode.class, accumulationMode);
    }
  }

  /**
   * Converts an SDK {@link ClosingBehavior} to its proto enum value.
   *
   * @param closingBehavior the SDK value to convert
   * @return the corresponding proto enum value
   * @throws IllegalArgumentException if {@code closingBehavior} is not handled by this method
   */
  public static RunnerApi.ClosingBehavior.Enum toProto(ClosingBehavior closingBehavior) {
    switch (closingBehavior) {
      case FIRE_ALWAYS:
        return RunnerApi.ClosingBehavior.Enum.EMIT_ALWAYS;
      case FIRE_IF_NON_EMPTY:
        return RunnerApi.ClosingBehavior.Enum.EMIT_IF_NONEMPTY;
      default:
        throw cannotConvert(
            ClosingBehavior.class, RunnerApi.ClosingBehavior.class, closingBehavior);
    }
  }

  /**
   * Converts a proto closing behavior to the SDK's {@link ClosingBehavior}.
   *
   * @param proto the proto enum value to convert
   * @return the corresponding SDK {@link ClosingBehavior}
   * @throws IllegalArgumentException if {@code proto} is not one of the values handled here
   */
  public static ClosingBehavior fromProto(RunnerApi.ClosingBehavior.Enum proto) {
    switch (proto) {
      case EMIT_ALWAYS:
        return ClosingBehavior.FIRE_ALWAYS;
      case EMIT_IF_NONEMPTY:
        return ClosingBehavior.FIRE_IF_NON_EMPTY;
      case UNRECOGNIZED:
      default:
        // Whether it is proto that cannot recognize the value (due to the version of the
        // generated code we link to) or this switch that hasn't been updated to handle it,
        // the situation is the same: we don't know what this ClosingBehavior means.
        throw cannotConvert(RunnerApi.ClosingBehavior.class, ClosingBehavior.class, proto);
    }
  }

  /**
   * Converts an SDK {@link OnTimeBehavior} to its proto enum value.
   *
   * @param onTimeBehavior the SDK value to convert
   * @return the corresponding proto enum value
   * @throws IllegalArgumentException if {@code onTimeBehavior} is not handled by this method
   */
  public static RunnerApi.OnTimeBehavior.Enum toProto(OnTimeBehavior onTimeBehavior) {
    switch (onTimeBehavior) {
      case FIRE_ALWAYS:
        return RunnerApi.OnTimeBehavior.Enum.FIRE_ALWAYS;
      case FIRE_IF_NON_EMPTY:
        return RunnerApi.OnTimeBehavior.Enum.FIRE_IF_NONEMPTY;
      default:
        throw cannotConvert(OnTimeBehavior.class, RunnerApi.OnTimeBehavior.class, onTimeBehavior);
    }
  }

  /**
   * Converts a proto on-time behavior to the SDK's {@link OnTimeBehavior}.
   *
   * @param proto the proto enum value to convert
   * @return the corresponding SDK {@link OnTimeBehavior}
   * @throws IllegalArgumentException if {@code proto} is not one of the values handled here
   */
  public static OnTimeBehavior fromProto(RunnerApi.OnTimeBehavior.Enum proto) {
    switch (proto) {
      case FIRE_ALWAYS:
        return OnTimeBehavior.FIRE_ALWAYS;
      case FIRE_IF_NONEMPTY:
        return OnTimeBehavior.FIRE_IF_NON_EMPTY;
      case UNRECOGNIZED:
      default:
        // Whether it is proto that cannot recognize the value (due to the version of the
        // generated code we link to) or this switch that hasn't been updated to handle it,
        // the situation is the same: we don't know what this OnTimeBehavior means.
        throw cannotConvert(RunnerApi.OnTimeBehavior.class, OnTimeBehavior.class, proto);
    }
  }

  /**
   * Converts an SDK {@link TimestampCombiner} to the proto {@link OutputTime} enum value.
   *
   * @param timestampCombiner the SDK value to convert
   * @return the corresponding proto enum value
   * @throws IllegalArgumentException if {@code timestampCombiner} is not handled by this method
   */
  public static RunnerApi.OutputTime.Enum toProto(TimestampCombiner timestampCombiner) {
    switch (timestampCombiner) {
      case EARLIEST:
        return OutputTime.Enum.EARLIEST_IN_PANE;
      case END_OF_WINDOW:
        return OutputTime.Enum.END_OF_WINDOW;
      case LATEST:
        return OutputTime.Enum.LATEST_IN_PANE;
      default:
        throw new IllegalArgumentException(
            String.format(
                "Unknown %s: %s",
                TimestampCombiner.class.getSimpleName(),
                timestampCombiner));
    }
  }

  /**
   * Converts a proto {@link OutputTime} enum value to the SDK's {@link TimestampCombiner}.
   *
   * @param proto the proto enum value to convert
   * @return the corresponding SDK {@link TimestampCombiner}
   * @throws IllegalArgumentException if {@code proto} is not one of the values handled here
   */
  public static TimestampCombiner timestampCombinerFromProto(RunnerApi.OutputTime.Enum proto) {
    switch (proto) {
      case EARLIEST_IN_PANE:
        return TimestampCombiner.EARLIEST;
      case END_OF_WINDOW:
        return TimestampCombiner.END_OF_WINDOW;
      case LATEST_IN_PANE:
        return TimestampCombiner.LATEST;
      case UNRECOGNIZED:
      default:
        // Whether it is proto that cannot recognize the value (due to the version of the
        // generated code we link to) or this switch that hasn't been updated to handle it,
        // the situation is the same: we don't know what this OutputTime means.
        // The target type named in the message is TimestampCombiner, not OutputTime.
        throw cannotConvert(RunnerApi.OutputTime.class, TimestampCombiner.class, proto);
    }
  }

  /** URN identifying a {@link GlobalWindows} window function; it carries no payload. */
  public static final String GLOBAL_WINDOWS_FN = "beam:windowfn:global_windows:v0.1";

  /** URN identifying a {@link FixedWindows} function with a {@link FixedWindowsPayload}. */
  public static final String FIXED_WINDOWS_FN = "beam:windowfn:fixed_windows:v0.1";

  /** URN identifying a {@link SlidingWindows} function with a {@link SlidingWindowsPayload}. */
  public static final String SLIDING_WINDOWS_FN = "beam:windowfn:sliding_windows:v0.1";

  /** URN identifying a {@link Sessions} function with a {@link SessionsPayload}. */
  public static final String SESSION_WINDOWS_FN = "beam:windowfn:session_windows:v0.1";

  // This URN says that the WindowFn is just a UDF blob the Java SDK understands
  // TODO: standardize such things
  public static final String SERIALIZED_JAVA_WINDOWFN_URN = "beam:windowfn:javasdk:v0.1";

  /** Legacy spelling of {@link #SERIALIZED_JAVA_WINDOWFN_URN}; still accepted when decoding. */
  public static final String OLD_SERIALIZED_JAVA_WINDOWFN_URN = "urn:beam:windowfn:javasdk:0.1";

  // Remove this once the dataflow worker understands all the above formats.
  private static final boolean USE_OLD_SERIALIZED_JAVA_WINDOWFN_URN = true;

  /**
   * Converts a {@link WindowFn} into an {@link SdkFunctionSpec} whose {@link
   * SdkFunctionSpec#getSpec() spec} identifies the function by URN.
   *
   * <p>While {@code USE_OLD_SERIALIZED_JAVA_WINDOWFN_URN} is set, every {@link WindowFn} is
   * emitted as a Java-serialized payload under {@link #OLD_SERIALIZED_JAVA_WINDOWFN_URN}.
   * Otherwise the well-known window functions are encoded with their dedicated URN and payload,
   * and any other function falls back to Java serialization under {@link
   * #SERIALIZED_JAVA_WINDOWFN_URN}.
   *
   * @param windowFn the window function to encode
   * @param components currently unused
   * @return the function spec describing {@code windowFn}
   */
  public static SdkFunctionSpec toProto(
      WindowFn<?, ?> windowFn, @SuppressWarnings("unused") SdkComponents components) {
    // TODO: Set environment IDs
    if (USE_OLD_SERIALIZED_JAVA_WINDOWFN_URN) {
      return serializedJavaWindowFn(windowFn, OLD_SERIALIZED_JAVA_WINDOWFN_URN);
    } else if (windowFn instanceof GlobalWindows) {
      return SdkFunctionSpec.newBuilder()
          .setSpec(FunctionSpec.newBuilder().setUrn(GLOBAL_WINDOWS_FN))
          .build();
    } else if (windowFn instanceof FixedWindows) {
      FixedWindows fixedWindows = (FixedWindows) windowFn;
      FixedWindowsPayload fixedWindowsPayload =
          FixedWindowsPayload.newBuilder()
              .setSize(Durations.fromMillis(fixedWindows.getSize().getMillis()))
              .setOffset(Timestamps.fromMillis(fixedWindows.getOffset().getMillis()))
              .build();
      return SdkFunctionSpec.newBuilder()
          .setSpec(
              FunctionSpec.newBuilder()
                  .setUrn(FIXED_WINDOWS_FN)
                  .setPayload(fixedWindowsPayload.toByteString()))
          .build();
    } else if (windowFn instanceof SlidingWindows) {
      SlidingWindows slidingWindows = (SlidingWindows) windowFn;
      SlidingWindowsPayload slidingWindowsPayload =
          SlidingWindowsPayload.newBuilder()
              .setSize(Durations.fromMillis(slidingWindows.getSize().getMillis()))
              .setOffset(Timestamps.fromMillis(slidingWindows.getOffset().getMillis()))
              .setPeriod(Durations.fromMillis(slidingWindows.getPeriod().getMillis()))
              .build();
      return SdkFunctionSpec.newBuilder()
          .setSpec(
              FunctionSpec.newBuilder()
                  .setUrn(SLIDING_WINDOWS_FN)
                  .setPayload(slidingWindowsPayload.toByteString()))
          .build();
    } else if (windowFn instanceof Sessions) {
      SessionsPayload sessionsPayload =
          SessionsPayload.newBuilder()
              .setGapSize(Durations.fromMillis(((Sessions) windowFn).getGapDuration().getMillis()))
              .build();
      return SdkFunctionSpec.newBuilder()
          .setSpec(
              FunctionSpec.newBuilder()
                  .setUrn(SESSION_WINDOWS_FN)
                  .setPayload(sessionsPayload.toByteString()))
          .build();
    } else {
      return serializedJavaWindowFn(windowFn, SERIALIZED_JAVA_WINDOWFN_URN);
    }
  }

  /**
   * Builds a spec whose payload is the Java-serialized form of {@code windowFn}, tagged with
   * {@code urn}. Serialization happens here so that it is only paid for when actually needed.
   */
  private static SdkFunctionSpec serializedJavaWindowFn(WindowFn<?, ?> windowFn, String urn) {
    ByteString serializedFn = ByteString.copyFrom(SerializableUtils.serializeToByteArray(windowFn));
    return SdkFunctionSpec.newBuilder()
        .setSpec(FunctionSpec.newBuilder().setUrn(urn).setPayload(serializedFn).build())
        .build();
  }

  /**
   * Converts a {@link WindowingStrategy} into a {@link RunnerApi.MessageWithComponents} where
   * {@link RunnerApi.MessageWithComponents#getWindowingStrategy()} is a {@link
   * RunnerApi.WindowingStrategy RunnerApi.WindowingStrategy (proto)} for the input {@link
   * WindowingStrategy}.
   *
   * <p>A fresh {@link SdkComponents} is created for the conversion and its contents are attached
   * to the returned message.
   *
   * @param windowingStrategy the strategy to convert
   * @return a message holding the converted strategy together with its components
   * @throws IOException declared by the conversion calls this method delegates to
   */
  public static RunnerApi.MessageWithComponents toProto(WindowingStrategy<?, ?> windowingStrategy)
      throws IOException {
    SdkComponents components = SdkComponents.create();
    RunnerApi.WindowingStrategy windowingStrategyProto = toProto(windowingStrategy, components);

    return RunnerApi.MessageWithComponents.newBuilder()
        .setWindowingStrategy(windowingStrategyProto)
        .setComponents(components.toComponents())
        .build();
  }

  /**
   * Converts a {@link WindowingStrategy} into a {@link RunnerApi.WindowingStrategy}, registering
   * any components in the provided {@link SdkComponents}.
   *
   * <p>The window coder of the strategy's {@link WindowFn} is registered with {@code components}
   * and referenced from the result by the id returned from that registration.
   *
   * @param windowingStrategy the strategy to convert
   * @param components the registry that receives the window coder
   * @return the proto form of {@code windowingStrategy}
   * @throws IOException declared by the conversion calls this method delegates to
   */
  public static RunnerApi.WindowingStrategy toProto(
      WindowingStrategy<?, ?> windowingStrategy, SdkComponents components) throws IOException {
    SdkFunctionSpec windowFnSpec = toProto(windowingStrategy.getWindowFn(), components);

    RunnerApi.WindowingStrategy.Builder windowingStrategyProto =
        RunnerApi.WindowingStrategy.newBuilder()
            .setOutputTime(toProto(windowingStrategy.getTimestampCombiner()))
            .setAccumulationMode(toProto(windowingStrategy.getMode()))
            .setClosingBehavior(toProto(windowingStrategy.getClosingBehavior()))
            // Allowed lateness is carried in the proto as a millisecond count.
            .setAllowedLateness(windowingStrategy.getAllowedLateness().getMillis())
            .setTrigger(TriggerTranslation.toProto(windowingStrategy.getTrigger()))
            .setWindowFn(windowFnSpec)
            .setAssignsToOneWindow(windowingStrategy.getWindowFn().assignsToOneWindow())
            .setOnTimeBehavior(toProto(windowingStrategy.getOnTimeBehavior()))
            .setWindowCoderId(
                components.registerCoder(windowingStrategy.getWindowFn().windowCoder()));

    return windowingStrategyProto.build();
  }

  /**
   * Converts from a {@link RunnerApi.WindowingStrategy} accompanied by {@link Components}
   * to the SDK's {@link WindowingStrategy}.
   *
   * @param proto a message whose root is a windowing strategy
   * @return the SDK form of the contained windowing strategy
   * @throws IllegalArgumentException if the root of {@code proto} is not a windowing strategy
   * @throws InvalidProtocolBufferException declared by the conversion this method delegates to
   */
  public static WindowingStrategy<?, ?> fromProto(RunnerApi.MessageWithComponents proto)
      throws InvalidProtocolBufferException {
    switch (proto.getRootCase()) {
      case WINDOWING_STRATEGY:
        return fromProto(
            proto.getWindowingStrategy(),
            RehydratedComponents.forComponents(proto.getComponents()));
      default:
        throw new IllegalArgumentException(
            String.format(
                "Expected a %s with components but received %s",
                RunnerApi.WindowingStrategy.class.getCanonicalName(), proto));
    }
  }

  /**
   * Converts from {@link RunnerApi.WindowingStrategy} to the SDK's {@link WindowingStrategy} using
   * the provided components to dereference identifiers found in the proto.
   *
   * <p>Note that this implementation does not currently consult {@code components}; every part
   * of the strategy is rebuilt from {@code proto} alone.
   *
   * @param proto the proto windowing strategy to convert
   * @param components the components accompanying {@code proto}
   * @return the SDK form of {@code proto}
   * @throws IllegalArgumentException if any enum value or the window function in {@code proto}
   *     cannot be converted
   * @throws InvalidProtocolBufferException declared by the conversion calls made here
   */
  public static WindowingStrategy<?, ?> fromProto(
      RunnerApi.WindowingStrategy proto, RehydratedComponents components)
      throws InvalidProtocolBufferException {
    SdkFunctionSpec windowFnSpec = proto.getWindowFn();
    WindowFn<?, ?> windowFn = windowFnFromProto(windowFnSpec);
    TimestampCombiner timestampCombiner = timestampCombinerFromProto(proto.getOutputTime());
    AccumulationMode accumulationMode = fromProto(proto.getAccumulationMode());
    Trigger trigger = TriggerTranslation.fromProto(proto.getTrigger());
    ClosingBehavior closingBehavior = fromProto(proto.getClosingBehavior());
    // The proto stores allowed lateness as a millisecond count.
    Duration allowedLateness = Duration.millis(proto.getAllowedLateness());
    OnTimeBehavior onTimeBehavior = fromProto(proto.getOnTimeBehavior());

    return WindowingStrategy.of(windowFn)
        .withAllowedLateness(allowedLateness)
        .withMode(accumulationMode)
        .withTrigger(trigger)
        .withTimestampCombiner(timestampCombiner)
        .withClosingBehavior(closingBehavior)
        .withOnTimeBehavior(onTimeBehavior);
  }

  /**
   * Rebuilds a {@link WindowFn} from the URN and payload of an {@link SdkFunctionSpec}.
   *
   * <p>The well-known URNs are decoded from their dedicated payload messages. Both {@link
   * #SERIALIZED_JAVA_WINDOWFN_URN} and {@link #OLD_SERIALIZED_JAVA_WINDOWFN_URN} are decoded by
   * Java-deserializing the payload.
   *
   * @param windowFnSpec the spec describing the window function
   * @return the decoded window function
   * @throws IllegalArgumentException if the URN is not supported, or if the payload of a
   *     well-known URN cannot be parsed as the expected proto message
   */
  public static WindowFn<?, ?> windowFnFromProto(SdkFunctionSpec windowFnSpec) {
    FunctionSpec spec = windowFnSpec.getSpec();
    String urn = spec.getUrn();
    try {
      switch (urn) {
        case GLOBAL_WINDOWS_FN:
          return new GlobalWindows();
        case FIXED_WINDOWS_FN:
          StandardWindowFns.FixedWindowsPayload fixedParams =
              StandardWindowFns.FixedWindowsPayload.parseFrom(spec.getPayload());
          return FixedWindows.of(Duration.millis(Durations.toMillis(fixedParams.getSize())))
              .withOffset(Duration.millis(Timestamps.toMillis(fixedParams.getOffset())));
        case SLIDING_WINDOWS_FN:
          StandardWindowFns.SlidingWindowsPayload slidingParams =
              StandardWindowFns.SlidingWindowsPayload.parseFrom(spec.getPayload());
          return SlidingWindows.of(Duration.millis(Durations.toMillis(slidingParams.getSize())))
              .every(Duration.millis(Durations.toMillis(slidingParams.getPeriod())))
              .withOffset(Duration.millis(Timestamps.toMillis(slidingParams.getOffset())));
        case SESSION_WINDOWS_FN:
          StandardWindowFns.SessionsPayload sessionParams =
              StandardWindowFns.SessionsPayload.parseFrom(spec.getPayload());
          return Sessions.withGapDuration(
              Duration.millis(Durations.toMillis(sessionParams.getGapSize())));
        case SERIALIZED_JAVA_WINDOWFN_URN:
        case OLD_SERIALIZED_JAVA_WINDOWFN_URN:
          // NOTE(review): this Java-deserializes the payload bytes as-is. That is only safe when
          // the proto comes from a trusted source -- confirm against callers.
          return (WindowFn<?, ?>)
              SerializableUtils.deserializeFromByteArray(
                  spec.getPayload().toByteArray(), "WindowFn");
        default:
          throw new IllegalArgumentException("Unknown or unsupported WindowFn: " + urn);
      }
    } catch (InvalidProtocolBufferException e) {
      // Surface payload parse failures as an argument error, keeping the original cause.
      throw new IllegalArgumentException(
          String.format(
              "%s for %s with URN %s did not contain expected proto message for payload",
              FunctionSpec.class.getSimpleName(),
              WindowFn.class.getSimpleName(),
              urn),
          e);
    }
  }

  /**
   * Creates the exception thrown when {@code value} of type {@code from} has no known
   * counterpart in type {@code to}. The exception is returned rather than thrown so that call
   * sites can write {@code throw cannotConvert(...)} and keep the compiler's flow analysis intact.
   */
  private static IllegalArgumentException cannotConvert(
      Class<?> from, Class<?> to, Object value) {
    return new IllegalArgumentException(
        String.format(
            "Cannot convert unknown %s to %s: %s",
            from.getCanonicalName(), to.getCanonicalName(), value));
  }
}