blob: ec518014de30b5dfddb2e99448826cc35ce4a059 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.beam.examples.subprocess;
import static java.nio.charset.StandardCharsets.UTF_8;
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.ByteBuffer;
import java.nio.channels.FileChannel;
import java.nio.channels.SeekableByteChannel;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.StandardOpenOption;
import java.util.ArrayList;
import java.util.Comparator;
import java.util.List;
import java.util.stream.Collectors;
import java.util.stream.Stream;
import org.apache.beam.examples.subprocess.configuration.SubProcessConfiguration;
import org.apache.beam.examples.subprocess.kernel.SubProcessCommandLineArgs;
import org.apache.beam.examples.subprocess.kernel.SubProcessCommandLineArgs.Command;
import org.apache.beam.examples.subprocess.kernel.SubProcessKernel;
import org.apache.beam.examples.subprocess.utils.CallingSubProcessUtils;
import org.apache.beam.sdk.extensions.gcp.options.GcsOptions;
import org.apache.beam.sdk.extensions.gcp.util.GcsUtil;
import org.apache.beam.sdk.extensions.gcp.util.gcsfs.GcsPath;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.beam.sdk.testing.PAssert;
import org.apache.beam.sdk.testing.TestPipeline;
import org.apache.beam.sdk.transforms.Create;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.ParDo;
import org.apache.beam.sdk.values.KV;
import org.apache.beam.sdk.values.PCollection;
import org.apache.beam.vendor.guava.v20_0.com.google.common.collect.ImmutableList;
import org.junit.Rule;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.junit.runners.JUnit4;
import org.mockito.Mockito;
import org.mockito.invocation.InvocationOnMock;
import org.mockito.stubbing.Answer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
 * To keep {@link org.apache.beam.examples.subprocess.ExampleEchoPipeline} simple, it is not
 * factored or testable. This test file should be maintained with a copy of its code for a basic
 * smoke test.
 */
@RunWith(JUnit4.class)
public class ExampleEchoPipelineTest {
  static final Logger LOG = LoggerFactory.getLogger(ExampleEchoPipelineTest.class);

  /**
   * Stand-in for the echo binary: appends its second argument to the file named by its first
   * argument. Both pipeline stages run an identical copy of this script under different file names.
   * NOTE(review): presumably SubProcessKernel passes the output file path as $1 — confirm.
   */
  private static final String ECHO_SCRIPT =
      "#!/bin/sh\n" + "filename=$1;\n" + "echo $2 >> $filename;";

  @Rule public TestPipeline p = TestPipeline.create().enableAbandonedNodeEnforcement(false);

  /**
   * Runs 100 key/value pairs through two chained sub-process stages and asserts that the output
   * contains exactly the input pairs. The temporary scripts and worker directory are removed
   * afterwards, even if the pipeline fails.
   */
  @Test
  public void testExampleEchoPipeline() throws Exception {
    // Create two shell scripts that stand in for the binary files
    Path fileA = Files.createTempFile("test-Echo", ".sh");
    Path fileB = Files.createTempFile("test-EchoAgain", ".sh");
    Path workerTempFiles = Files.createTempDirectory("test-Echoo");
    try {
      writeScript(fileA, ECHO_SCRIPT);
      writeScript(fileB, ECHO_SCRIPT);

      // Read in the options for the pipeline
      SubProcessPipelineOptions options =
          PipelineOptionsFactory.as(SubProcessPipelineOptions.class);
      options.setConcurrency(2);
      // Both scripts were created in the default temp directory, so fileA's parent holds both.
      options.setSourcePath(fileA.getParent().toString());
      options.setWorkerPath(workerTempFiles.toAbsolutePath().toString());
      p.getOptions().as(GcsOptions.class).setGcsUtil(buildMockGcsUtil());

      // Setup the Configuration option used with all transforms
      SubProcessConfiguration configuration = options.getSubProcessConfiguration();

      // Sample data to be echoed by the scripts; key == value so the output is easy to check.
      List<KV<String, String>> sampleData = new ArrayList<>();
      for (int i = 0; i < 100; i++) {
        String str = String.valueOf(i);
        sampleData.add(KV.of(str, str));
      }

      // Two chained transforms, each running one of the shell scripts (in place of the real
      // binary) to echo every value back under its original key.
      PCollection<KV<String, String>> output =
          p.apply(Create.of(sampleData))
              .apply(
                  "Echo inputs round 1",
                  ParDo.of(new EchoInputDoFn(configuration, fileA.getFileName().toString())))
              .apply(
                  "Echo inputs round 2",
                  ParDo.of(new EchoInputDoFn(configuration, fileB.getFileName().toString())));
      PAssert.that(output).containsInAnyOrder(sampleData);
      p.run();
    } finally {
      // Best-effort cleanup; failures are logged so they never mask the test's own outcome.
      deleteRecursivelyQuietly(fileA);
      deleteRecursivelyQuietly(fileB);
      deleteRecursivelyQuietly(workerTempFiles);
    }
  }

  /**
   * Simple DoFn that echoes the element, used as an example of running a C++ library. In this test
   * the "library" is one of the shell scripts written by {@link #testExampleEchoPipeline()}.
   */
  @SuppressWarnings("serial")
  private static class EchoInputDoFn extends DoFn<KV<String, String>, KV<String, String>> {
    private static final Logger LOG = LoggerFactory.getLogger(EchoInputDoFn.class);
    private final SubProcessConfiguration configuration;
    private final String binaryName;

    /**
     * @param configuration sub-process settings shared by all transforms
     * @param binary file name (not full path) of the executable to run
     */
    public EchoInputDoFn(SubProcessConfiguration configuration, String binary) {
      // Pass in configuration information the name of the filename of the sub-process and the level
      // of concurrency
      this.configuration = configuration;
      this.binaryName = binary;
    }

    @Setup
    public void setUp() throws Exception {
      CallingSubProcessUtils.setUp(configuration, binaryName);
    }

    @ProcessElement
    public void processElement(ProcessContext c) throws Exception {
      try {
        // Our Library takes a single command in position 0 which it will echo back in the result
        SubProcessCommandLineArgs commands = new SubProcessCommandLineArgs();
        Command command = new Command(0, String.valueOf(c.element().getValue()));
        commands.putCommand(command);
        // The ProcessingKernel deals with the execution of the process
        SubProcessKernel kernel = new SubProcessKernel(configuration, binaryName);
        // Run the command and emit every result line under the element's original key
        List<String> results = kernel.exec(commands);
        for (String s : results) {
          c.output(KV.of(c.element().getKey(), s));
        }
      } catch (Exception ex) {
        LOG.error("Error processing element ", ex);
        throw ex;
      }
    }
  }

  /**
   * Writes {@code contents} to {@code file} as UTF-8.
   *
   * @throws IOException if the file cannot be opened or written
   */
  private static void writeScript(Path file, String contents) throws IOException {
    ByteBuffer buffer = ByteBuffer.wrap(contents.getBytes(UTF_8));
    try (SeekableByteChannel channel =
        FileChannel.open(file, StandardOpenOption.CREATE, StandardOpenOption.WRITE)) {
      // The WritableByteChannel contract does not promise one write() drains the whole buffer.
      while (buffer.hasRemaining()) {
        channel.write(buffer);
      }
    }
  }

  /**
   * Deletes {@code root} and, if it is a directory, everything beneath it. Missing paths are
   * ignored; I/O failures are logged rather than thrown so cleanup cannot hide a test failure.
   */
  private static void deleteRecursivelyQuietly(Path root) {
    if (!Files.exists(root)) {
      return;
    }
    try (Stream<Path> walk = Files.walk(root)) {
      // Reverse lexicographic order visits children before their parent directories.
      List<Path> paths = walk.sorted(Comparator.reverseOrder()).collect(Collectors.toList());
      for (Path path : paths) {
        Files.deleteIfExists(path);
      }
    } catch (IOException | UncheckedIOException e) {
      LOG.warn("Failed to clean up temporary path {}", root, e);
    }
  }

  /**
   * Builds a mock {@link GcsUtil} whose {@code open} returns a fresh, self-deleting local temp
   * file channel and whose {@code expand} returns its argument unchanged.
   */
  private GcsUtil buildMockGcsUtil() throws IOException {
    GcsUtil mockGcsUtil = Mockito.mock(GcsUtil.class);
    // Any request to open gets a new bogus channel
    Mockito.when(mockGcsUtil.open(Mockito.any(GcsPath.class)))
        .then(
            new Answer<SeekableByteChannel>() {
              @Override
              public SeekableByteChannel answer(InvocationOnMock invocation) throws Throwable {
                return FileChannel.open(
                    Files.createTempFile("channel-", ".tmp"),
                    StandardOpenOption.CREATE,
                    StandardOpenOption.DELETE_ON_CLOSE);
              }
            });
    // Any request for expansion returns a list containing the original GcsPath
    // This is required to pass validation that occurs in TextIO during apply()
    Mockito.when(mockGcsUtil.expand(Mockito.any(GcsPath.class)))
        .then(
            new Answer<List<GcsPath>>() {
              @Override
              public List<GcsPath> answer(InvocationOnMock invocation) throws Throwable {
                return ImmutableList.of((GcsPath) invocation.getArguments()[0]);
              }
            });
    return mockGcsUtil;
  }
}