| /* |
| * Licensed to the Apache Software Foundation (ASF) under one |
| * or more contributor license agreements. See the NOTICE file |
| * distributed with this work for additional information |
| * regarding copyright ownership. The ASF licenses this file |
| * to you under the Apache License, Version 2.0 (the |
| * "License"); you may not use this file except in compliance |
| * with the License. You may obtain a copy of the License at |
| * |
| * http://www.apache.org/licenses/LICENSE-2.0 |
| * |
| * Unless required by applicable law or agreed to in writing, software |
| * distributed under the License is distributed on an "AS IS" BASIS, |
| * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| * See the License for the specific language governing permissions and |
| * limitations under the License. |
| */ |
| package org.apache.beam.runners.core.construction; |
| |
| import static org.hamcrest.Matchers.equalTo; |
| import static org.junit.Assert.assertThat; |
| |
| import java.util.Objects; |
| import javax.annotation.Nullable; |
| import org.apache.beam.model.pipeline.v1.RunnerApi; |
| import org.apache.beam.sdk.io.DynamicFileDestinations; |
| import org.apache.beam.sdk.io.FileBasedSink; |
| import org.apache.beam.sdk.io.FileBasedSink.FilenamePolicy; |
| import org.apache.beam.sdk.io.FileBasedSink.OutputFileHints; |
| import org.apache.beam.sdk.io.FileSystems; |
| import org.apache.beam.sdk.io.WriteFiles; |
| import org.apache.beam.sdk.io.WriteFilesResult; |
| import org.apache.beam.sdk.io.fs.ResourceId; |
| import org.apache.beam.sdk.options.ValueProvider.StaticValueProvider; |
| import org.apache.beam.sdk.runners.AppliedPTransform; |
| import org.apache.beam.sdk.testing.TestPipeline; |
| import org.apache.beam.sdk.transforms.Create; |
| import org.apache.beam.sdk.transforms.SerializableFunctions; |
| import org.apache.beam.sdk.transforms.windowing.BoundedWindow; |
| import org.apache.beam.sdk.transforms.windowing.PaneInfo; |
| import org.apache.beam.sdk.values.PCollection; |
| import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.ImmutableList; |
| import org.junit.Test; |
| import org.junit.runner.RunWith; |
| import org.junit.runners.Parameterized; |
| import org.junit.runners.Parameterized.Parameter; |
| import org.junit.runners.Parameterized.Parameters; |
| |
| /** Tests for {@link WriteFilesTranslation}. */ |
| @RunWith(Parameterized.class) |
| public class WriteFilesTranslationTest { |
| @Parameters(name = "{index}: {0}") |
| public static Iterable<WriteFiles<Object, Void, Object>> data() { |
| return ImmutableList.of( |
| WriteFiles.to(new DummySink()), |
| WriteFiles.to(new DummySink()).withWindowedWrites(), |
| WriteFiles.to(new DummySink()).withNumShards(17), |
| WriteFiles.to(new DummySink()).withWindowedWrites().withNumShards(42)); |
| } |
| |
| @Parameter(0) |
| public WriteFiles<String, Void, String> writeFiles; |
| |
| public static TestPipeline p = TestPipeline.create().enableAbandonedNodeEnforcement(false); |
| |
| @Test |
| public void testEncodedProto() throws Exception { |
| SdkComponents components = SdkComponents.create(); |
| components.registerEnvironment(Environments.createDockerEnvironment("java")); |
| RunnerApi.WriteFilesPayload payload = |
| WriteFilesTranslation.payloadForWriteFiles(writeFiles, components); |
| |
| assertThat( |
| payload.getRunnerDeterminedSharding(), |
| equalTo( |
| writeFiles.getNumShardsProvider() == null && writeFiles.getComputeNumShards() == null)); |
| |
| assertThat(payload.getWindowedWrites(), equalTo(writeFiles.getWindowedWrites())); |
| |
| assertThat( |
| (FileBasedSink<String, Void, String>) |
| WriteFilesTranslation.sinkFromProto(payload.getSink()), |
| equalTo(writeFiles.getSink())); |
| } |
| |
| @Test |
| public void testExtractionDirectFromTransform() throws Exception { |
| PCollection<String> input = p.apply(Create.of("hello")); |
| WriteFilesResult<Void> output = input.apply(writeFiles); |
| |
| AppliedPTransform<PCollection<String>, WriteFilesResult<Void>, WriteFiles<String, Void, String>> |
| appliedPTransform = |
| AppliedPTransform.of("foo", input.expand(), output.expand(), writeFiles, p); |
| |
| assertThat( |
| WriteFilesTranslation.isRunnerDeterminedSharding(appliedPTransform), |
| equalTo( |
| writeFiles.getNumShardsProvider() == null && writeFiles.getComputeNumShards() == null)); |
| |
| assertThat( |
| WriteFilesTranslation.isWindowedWrites(appliedPTransform), |
| equalTo(writeFiles.getWindowedWrites())); |
| assertThat( |
| WriteFilesTranslation.<String, Void, String>getSink(appliedPTransform), |
| equalTo(writeFiles.getSink())); |
| } |
| |
| /** |
| * A simple {@link FileBasedSink} for testing serialization/deserialization. Not mocked to avoid |
| * any issues serializing mocks. |
| */ |
| private static class DummySink extends FileBasedSink<Object, Void, Object> { |
| |
| DummySink() { |
| super( |
| StaticValueProvider.of(FileSystems.matchNewResource("nowhere", false)), |
| DynamicFileDestinations.constant( |
| new DummyFilenamePolicy(), SerializableFunctions.constant(null))); |
| } |
| |
| @Override |
| public WriteOperation<Void, Object> createWriteOperation() { |
| return new DummyWriteOperation(this); |
| } |
| |
| @Override |
| public boolean equals(Object other) { |
| if (!(other instanceof DummySink)) { |
| return false; |
| } |
| |
| DummySink that = (DummySink) other; |
| |
| return getTempDirectoryProvider().isAccessible() |
| && that.getTempDirectoryProvider().isAccessible() |
| && getTempDirectoryProvider().get().equals(that.getTempDirectoryProvider().get()); |
| } |
| |
| @Override |
| public int hashCode() { |
| return Objects.hash( |
| DummySink.class, |
| getTempDirectoryProvider().isAccessible() ? getTempDirectoryProvider().get() : null); |
| } |
| } |
| |
| private static class DummyWriteOperation extends FileBasedSink.WriteOperation<Void, Object> { |
| public DummyWriteOperation(FileBasedSink<Object, Void, Object> sink) { |
| super(sink); |
| } |
| |
| @Override |
| public FileBasedSink.Writer<Void, Object> createWriter() throws Exception { |
| throw new UnsupportedOperationException("Should never be called."); |
| } |
| } |
| |
| private static class DummyFilenamePolicy extends FilenamePolicy { |
| @Override |
| public ResourceId windowedFilename( |
| int shardNumber, |
| int numShards, |
| BoundedWindow window, |
| PaneInfo paneInfo, |
| OutputFileHints outputFileHints) { |
| throw new UnsupportedOperationException("Should never be called."); |
| } |
| |
| @Nullable |
| @Override |
| public ResourceId unwindowedFilename( |
| int shardNumber, int numShards, OutputFileHints outputFileHints) { |
| throw new UnsupportedOperationException("Should never be called."); |
| } |
| |
| @Override |
| public boolean equals(Object other) { |
| return other instanceof DummyFilenamePolicy; |
| } |
| |
| @Override |
| public int hashCode() { |
| return DummyFilenamePolicy.class.hashCode(); |
| } |
| } |
| } |