blob: 2db4d703397d80f72870294fe87c93343331077d [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.beam.runners.core.construction;
import static org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.Preconditions.checkState;
import static org.junit.Assert.assertEquals;
import java.util.concurrent.atomic.AtomicReference;
import org.apache.beam.model.pipeline.v1.RunnerApi.WindowIntoPayload;
import org.apache.beam.sdk.Pipeline.PipelineVisitor;
import org.apache.beam.sdk.coders.Coder;
import org.apache.beam.sdk.io.GenerateSequence;
import org.apache.beam.sdk.runners.AppliedPTransform;
import org.apache.beam.sdk.runners.TransformHierarchy.Node;
import org.apache.beam.sdk.testing.TestPipeline;
import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
import org.apache.beam.sdk.transforms.windowing.FixedWindows;
import org.apache.beam.sdk.transforms.windowing.GlobalWindow;
import org.apache.beam.sdk.transforms.windowing.GlobalWindows;
import org.apache.beam.sdk.transforms.windowing.PartitioningWindowFn;
import org.apache.beam.sdk.transforms.windowing.Sessions;
import org.apache.beam.sdk.transforms.windowing.SlidingWindows;
import org.apache.beam.sdk.transforms.windowing.Window;
import org.apache.beam.sdk.transforms.windowing.Window.Assign;
import org.apache.beam.sdk.transforms.windowing.WindowFn;
import org.apache.beam.vendor.grpc.v1p21p0.com.google.protobuf.InvalidProtocolBufferException;
import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.ImmutableList;
import org.joda.time.Duration;
import org.joda.time.Instant;
import org.junit.Rule;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.junit.runners.Parameterized;
import org.junit.runners.Parameterized.Parameter;
import org.junit.runners.Parameterized.Parameters;
/**
 * Tests for {@link WindowIntoTranslation}.
 *
 * <p>The test is parameterized over several {@link WindowFn} implementations. For each one, a
 * {@link Window.Assign} transform is built, translated to a {@link WindowIntoPayload}, and the
 * {@link WindowFn} recovered from that payload is compared with the original.
 */
@RunWith(Parameterized.class)
public class WindowIntoTranslationTest {
  /** Returns the {@link WindowFn} instances the test is run against, one per parameterized run. */
  @Parameters(name = "{index}: {0}")
  public static Iterable<WindowFn<?, ?>> data() {
    return ImmutableList.<WindowFn<?, ?>>builder()
        .add(FixedWindows.of(Duration.standardMinutes(10L)))
        .add(new GlobalWindows())
        .add(Sessions.withGapDuration(Duration.standardMinutes(15L)))
        .add(SlidingWindows.of(Duration.standardMinutes(5L)).every(Duration.standardMinutes(1L)))
        .add(new CustomWindows())
        .build();
  }

  /** The {@link WindowFn} under test for the current parameterized run. */
  @Parameter(0)
  public WindowFn<?, ?> windowFn;

  // This pipeline exists for construction, not to run any test.
  @Rule public TestPipeline pipeline = TestPipeline.create();

  /**
   * Applies {@link #windowFn} to a pipeline, translates the resulting {@link Window.Assign} to its
   * proto payload, and checks that the {@link WindowFn} read back from the payload equals the
   * original.
   *
   * @throws InvalidProtocolBufferException declared because reading the {@link WindowFn} back from
   *     the payload may throw it
   */
  @Test
  public void testToFromProto() throws InvalidProtocolBufferException {
    // The parameter is declared with wildcards, so it is narrowed to a WindowFn accepting Long
    // elements before being applied. Every WindowFn supplied by data() is used only to build the
    // transform here; no element is ever pushed through it.
    @SuppressWarnings("unchecked")
    WindowFn<? super Long, ?> typedWindowFn = (WindowFn<? super Long, ?>) windowFn;
    pipeline.apply(GenerateSequence.from(0)).apply(Window.<Long>into(typedWindowFn));

    // Locate the single Window.Assign primitive that the Window.into expansion produced.
    final AtomicReference<AppliedPTransform<?, ?, Assign<?>>> assign = new AtomicReference<>(null);
    pipeline.traverseTopologically(
        new PipelineVisitor.Defaults() {
          @Override
          public void visitPrimitiveTransform(Node node) {
            if (node.getTransform() instanceof Window.Assign) {
              checkState(
                  assign.get() == null,
                  "Expected exactly one Window.Assign primitive, but found more than one");
              // The instanceof check above is what makes this narrowing cast safe.
              @SuppressWarnings("unchecked")
              AppliedPTransform<?, ?, Assign<?>> applied =
                  (AppliedPTransform<?, ?, Assign<?>>) node.toAppliedPTransform(getPipeline());
              assign.set(applied);
            }
          }
        });
    checkState(
        assign.get() != null, "Expected exactly one Window.Assign primitive, but found none");

    SdkComponents components = SdkComponents.create();
    components.registerEnvironment(Environments.createDockerEnvironment("java"));
    WindowIntoPayload payload =
        WindowIntoTranslation.toProto(assign.get().getTransform(), components);

    assertEquals(windowFn, WindowingStrategyTranslation.windowFnFromProto(payload.getWindowFn()));
  }

  /**
   * A user-defined {@link WindowFn} that assigns every timestamp to {@link GlobalWindow#INSTANCE}.
   *
   * <p>All instances of this class are equal to one another, so an instance recovered from a proto
   * compares equal to the one that was translated.
   */
  private static class CustomWindows extends PartitioningWindowFn<String, BoundedWindow> {
    @Override
    public BoundedWindow assignWindow(Instant timestamp) {
      return GlobalWindow.INSTANCE;
    }

    @Override
    public boolean isCompatible(WindowFn<?, ?> other) {
      return getClass().equals(other.getClass());
    }

    // The only window this WindowFn ever assigns is GlobalWindow.INSTANCE, so the GlobalWindow
    // coder is presented as a Coder<BoundedWindow>. The intermediate wildcard cast is needed
    // because the two parameterizations cannot be cast to each other directly.
    @Override
    @SuppressWarnings("unchecked")
    public Coder<BoundedWindow> windowCoder() {
      return (Coder<BoundedWindow>) (Coder<?>) GlobalWindow.Coder.INSTANCE;
    }

    @Override
    public boolean equals(Object other) {
      return other != null && other.getClass().equals(this.getClass());
    }

    @Override
    public int hashCode() {
      return getClass().hashCode();
    }
  }
}