blob: eb30869e446a8deefa68a86165b1d02b8466f025 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.beam.runners.core.construction;
import static org.junit.Assert.assertThat;
import org.apache.beam.model.pipeline.v1.RunnerApi.FunctionSpec;
import org.apache.beam.sdk.coders.StringUtf8Coder;
import org.apache.beam.sdk.runners.AppliedPTransform;
import org.apache.beam.sdk.testing.TestPipeline;
import org.apache.beam.sdk.transforms.Create;
import org.apache.beam.sdk.transforms.View.CreatePCollectionView;
import org.apache.beam.sdk.util.SerializableUtils;
import org.apache.beam.sdk.values.KV;
import org.apache.beam.sdk.values.PCollection;
import org.apache.beam.sdk.values.PCollectionView;
import org.apache.beam.sdk.values.PCollectionViews;
import org.apache.beam.vendor.guava.v20_0.com.google.common.collect.ImmutableList;
import org.hamcrest.Matchers;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.junit.runners.Parameterized;
import org.junit.runners.Parameterized.Parameter;
import org.junit.runners.Parameterized.Parameters;
/** Tests for {@link CreatePCollectionViewTranslation}. */
@RunWith(Parameterized.class)
public class CreatePCollectionViewTranslationTest {
// Two parameters suffices because the nature of the serialization/deserialization of
// the view is not what is being tested; it is just important that the round trip
// is not vacuous.
@Parameters(name = "{index}: {0}")
public static Iterable<CreatePCollectionView<?, ?>> data() {
return ImmutableList.of(
CreatePCollectionView.of(
PCollectionViews.singletonView(
testPCollection,
testPCollection.getWindowingStrategy(),
false,
null,
StringUtf8Coder.of())),
CreatePCollectionView.of(
PCollectionViews.listView(testPCollection, testPCollection.getWindowingStrategy())));
}
@Parameter(0)
public CreatePCollectionView<?, ?> createViewTransform;
public static TestPipeline p = TestPipeline.create().enableAbandonedNodeEnforcement(false);
private static final PCollection<KV<Void, String>> testPCollection =
p.apply(Create.of(KV.of((Void) null, "one")));
@Test
public void testEncodedProto() throws Exception {
SdkComponents components = SdkComponents.create();
components.registerEnvironment(Environments.createDockerEnvironment("java"));
components.registerPCollection(testPCollection);
AppliedPTransform<?, ?, ?> appliedPTransform =
AppliedPTransform.of(
"foo",
testPCollection.expand(),
createViewTransform.getView().expand(),
createViewTransform,
p);
FunctionSpec payload = PTransformTranslation.toProto(appliedPTransform, components).getSpec();
// Checks that the payload is what it should be
PCollectionView<?> deserializedView =
(PCollectionView<?>)
SerializableUtils.deserializeFromByteArray(
payload.getPayload().toByteArray(), PCollectionView.class.getSimpleName());
assertThat(deserializedView, Matchers.equalTo(createViewTransform.getView()));
}
@Test
public void testExtractionDirectFromTransform() throws Exception {
SdkComponents components = SdkComponents.create();
components.registerEnvironment(Environments.createDockerEnvironment("java"));
components.registerPCollection(testPCollection);
AppliedPTransform<?, ?, ?> appliedPTransform =
AppliedPTransform.of(
"foo",
testPCollection.expand(),
createViewTransform.getView().expand(),
createViewTransform,
p);
CreatePCollectionViewTranslation.getView((AppliedPTransform) appliedPTransform);
FunctionSpec payload = PTransformTranslation.toProto(appliedPTransform, components).getSpec();
// Checks that the payload is what it should be
PCollectionView<?> deserializedView =
(PCollectionView<?>)
SerializableUtils.deserializeFromByteArray(
payload.getPayload().toByteArray(), PCollectionView.class.getSimpleName());
assertThat(deserializedView, Matchers.equalTo(createViewTransform.getView()));
}
}