blob: 81e5e78abb8447e30d13cc6654b9375b77b1b3dc [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.beam.runners.twister2;
import static org.apache.beam.runners.core.construction.resources.PipelineResources.detectClassPathResourcesToStage;
import edu.iu.dsc.tws.api.JobConfig;
import edu.iu.dsc.tws.api.Twister2Job;
import edu.iu.dsc.tws.api.config.Config;
import edu.iu.dsc.tws.api.exceptions.Twister2RuntimeException;
import edu.iu.dsc.tws.api.scheduler.Twister2JobState;
import edu.iu.dsc.tws.api.tset.sets.TSet;
import edu.iu.dsc.tws.api.tset.sets.batch.BatchTSet;
import edu.iu.dsc.tws.local.LocalSubmitter;
import edu.iu.dsc.tws.rsched.core.ResourceAllocator;
import edu.iu.dsc.tws.rsched.job.Twister2Submitter;
import java.io.File;
import java.io.FileInputStream;
import java.io.FileNotFoundException;
import java.io.FileOutputStream;
import java.io.IOException;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.HashSet;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.logging.LogManager;
import java.util.logging.Logger;
import java.util.stream.Collectors;
import java.util.zip.ZipEntry;
import java.util.zip.ZipOutputStream;
import org.apache.beam.runners.core.construction.PTransformMatchers;
import org.apache.beam.runners.core.construction.PTransformTranslation;
import org.apache.beam.runners.core.construction.SplittableParDo;
import org.apache.beam.runners.core.construction.SplittableParDoNaiveBounded;
import org.apache.beam.runners.core.construction.resources.PipelineResources;
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.PipelineResult;
import org.apache.beam.sdk.PipelineRunner;
import org.apache.beam.sdk.options.PipelineOptions;
import org.apache.beam.sdk.options.PipelineOptionsValidator;
import org.apache.beam.sdk.runners.PTransformOverride;
import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.MoreObjects;
import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.ImmutableList;
/**
* A {@link PipelineRunner} that executes the operations in the pipeline by first translating them
* to a Twister2 Plan and then executing them either locally or on a Twister2 cluster, depending on
* the configuration.
*/
@SuppressWarnings({
"rawtypes", // TODO(https://issues.apache.org/jira/browse/BEAM-10556)
"nullness" // TODO(https://issues.apache.org/jira/browse/BEAM-10402)
})
public class Twister2Runner extends PipelineRunner<PipelineResult> {
private static final Logger LOG = Logger.getLogger(Twister2Runner.class.getName());
private static final String SIDEINPUTS = "sideInputs";
private static final String LEAVES = "leaves";
private static final String GRAPH = "graph";
/** Provided options. */
private final Twister2PipelineOptions options;
protected Twister2Runner(Twister2PipelineOptions options) {
this.options = options;
}
public static Twister2Runner fromOptions(PipelineOptions options) {
Twister2PipelineOptions pipelineOptions =
PipelineOptionsValidator.validate(Twister2PipelineOptions.class, options);
if (pipelineOptions.getFilesToStage() == null) {
pipelineOptions.setFilesToStage(
detectClassPathResourcesToStage(Twister2Runner.class.getClassLoader(), pipelineOptions));
LOG.info(
"PipelineOptions.filesToStage was not specified. "
+ "Defaulting to files from the classpath: will stage {} files. "
+ "Enable logging at DEBUG level to see which files will be staged"
+ pipelineOptions.getFilesToStage().size());
}
return new Twister2Runner(pipelineOptions);
}
@Override
public PipelineResult run(Pipeline pipeline) {
// create a worker and pass in the pipeline and then do the translation
Twister2PipelineExecutionEnvironment env = new Twister2PipelineExecutionEnvironment(options);
LOG.info("Translating pipeline to Twister2 program.");
pipeline.replaceAll(getDefaultOverrides());
SplittableParDo.convertReadBasedSplittableDoFnsToPrimitiveReadsIfNecessary(pipeline);
env.translate(pipeline);
setupSystem(options);
Map configMap = new HashMap();
JobConfig jobConfig = new JobConfig();
if (isLocalMode(options)) {
options.setParallelism(1);
configMap.put(SIDEINPUTS, extractNames(env.getSideInputs()));
configMap.put(LEAVES, extractNames(env.getLeaves()));
configMap.put(GRAPH, env.getTSetGraph());
configMap.put("twister2.network.buffer.size", 32000);
configMap.put("twister2.network.sendBuffer.count", options.getParallelism());
LOG.warning("Twister2 Local Mode currently only supports single worker");
} else {
jobConfig.put(SIDEINPUTS, extractNames(env.getSideInputs()));
jobConfig.put(LEAVES, extractNames(env.getLeaves()));
jobConfig.put(GRAPH, env.getTSetGraph());
}
Config config = ResourceAllocator.loadConfig(configMap);
int workers = options.getParallelism();
Twister2Job twister2Job =
Twister2Job.newBuilder()
.setJobName(options.getJobName())
.setWorkerClass(BeamBatchWorker.class)
.addComputeResource(options.getWorkerCPUs(), options.getRamMegaBytes(), workers)
.setConfig(jobConfig)
.build();
Twister2JobState jobState;
if (isLocalMode(options)) {
jobState = LocalSubmitter.submitJob(twister2Job, config);
} else {
jobState = Twister2Submitter.submitJob(twister2Job, config);
}
Twister2PipelineResult result = new Twister2PipelineResult(jobState);
return result;
}
/** Check if the Runner is set to use Twister local mode or pointing to a deployment. */
private boolean isLocalMode(Twister2PipelineOptions options) {
if (options.getTwister2Home() == null || "".equals(options.getTwister2Home())) {
return true;
} else {
return false;
}
}
public PipelineResult runTest(Pipeline pipeline) {
// create a worker and pass in the pipeline and then do the translation
Twister2PipelineExecutionEnvironment env = new Twister2PipelineExecutionEnvironment(options);
LOG.info("Translating pipeline to Twister2 program.");
pipeline.replaceAll(getDefaultOverrides());
SplittableParDo.convertReadBasedSplittableDoFnsToPrimitiveReadsIfNecessary(pipeline);
env.translate(pipeline);
setupSystemTest(options);
Map configMap = new HashMap();
configMap.put(SIDEINPUTS, extractNames(env.getSideInputs()));
configMap.put(LEAVES, extractNames(env.getLeaves()));
configMap.put(GRAPH, env.getTSetGraph());
configMap.put("twister2.network.buffer.size", 32000);
configMap.put("twister2.network.sendBuffer.count", options.getParallelism());
Config config = ResourceAllocator.loadConfig(configMap);
JobConfig jobConfig = new JobConfig();
int workers = options.getParallelism();
Twister2Job twister2Job =
Twister2Job.newBuilder()
.setJobName(options.getJobName())
.setWorkerClass(BeamBatchWorker.class)
.addComputeResource(options.getWorkerCPUs(), options.getRamMegaBytes(), workers)
.setConfig(jobConfig)
.build();
Twister2JobState jobState = LocalSubmitter.submitJob(twister2Job, config);
Twister2PipelineResult result = new Twister2PipelineResult(jobState);
// TODO: Need to fix the check for "RUNNING" once fix for this is done on Twister2 end.
if (result.state == PipelineResult.State.FAILED) {
throw new RuntimeException("Pipeline execution failed", jobState.getCause());
}
return result;
}
private void setupSystem(Twister2PipelineOptions options) {
prepareFilesToStage(options);
zipFilesToStage(options);
System.setProperty("cluster_type", options.getClusterType());
System.setProperty("job_file", options.getJobFileZip());
System.setProperty("job_type", options.getJobType());
if (isLocalMode(options)) {
System.setProperty("twister2_home", System.getProperty("java.io.tmpdir"));
System.setProperty("config_dir", System.getProperty("java.io.tmpdir") + "/conf/");
} else {
System.setProperty("twister2_home", options.getTwister2Home());
System.setProperty("config_dir", options.getTwister2Home() + "/conf/");
// do a simple config dir validation
File cDir = new File(System.getProperty("config_dir"), options.getClusterType());
String[] filesList =
new String[] {
"core.yaml", "network.yaml", "data.yaml", "resource.yaml", "task.yaml",
};
for (String file : filesList) {
File toCheck = new File(cDir, file);
if (!toCheck.exists()) {
throw new Twister2RuntimeException(
"Couldn't find " + file + " in config directory specified.");
}
}
// setup logging
FileInputStream fis = null;
try {
fis = new FileInputStream(new File(cDir, "logger.properties"));
LogManager.getLogManager().readConfiguration(fis);
fis.close();
} catch (IOException e) {
LOG.warning("Couldn't load logging configuration");
} finally {
if (fis != null) {
try {
fis.close();
} catch (IOException e) {
LOG.info(e.getMessage());
}
}
}
}
}
private void setupSystemTest(Twister2PipelineOptions options) {
prepareFilesToStage(options);
zipFilesToStage(options);
System.setProperty("cluster_type", options.getClusterType());
System.setProperty("twister2_home", System.getProperty("java.io.tmpdir"));
System.setProperty("job_file", options.getJobFileZip());
System.setProperty("job_type", options.getJobType());
}
private Set<String> extractNames(Set<TSet> leaves) {
Set<String> results = new HashSet<>();
for (TSet leaf : leaves) {
results.add(leaf.getId());
}
return results;
}
private Map<String, String> extractNames(Map<String, BatchTSet<?>> sideInputs) {
Map<String, String> results = new LinkedHashMap<>();
for (Map.Entry<String, BatchTSet<?>> entry : sideInputs.entrySet()) {
results.put(entry.getKey(), entry.getValue().getId());
}
return results;
}
/**
* Classpath contains non jar files (eg. directories with .class files or empty directories) will
* cause exception in running log.
*/
private void prepareFilesToStage(Twister2PipelineOptions options) {
List<String> filesToStage =
options.getFilesToStage().stream()
.map(File::new)
.filter(File::exists)
.map(
file -> {
return file.getAbsolutePath();
})
.collect(Collectors.toList());
options.setFilesToStage(
PipelineResources.prepareFilesForStaging(
filesToStage,
MoreObjects.firstNonNull(
options.getTempLocation(), System.getProperty("java.io.tmpdir"))));
}
/**
* creates a single zip file from all the jar files that are listed as files to stage in options.
*
* @param options
*/
private void zipFilesToStage(Twister2PipelineOptions options) {
File zipFile = null;
Set<String> jarSet = new HashSet<>();
// TODO figure out if we can remove all the dependencies that come with
// the twister2 jars as well since they will be anyway provided from the
// system we do not need to pack them
List<String> filesToStage = options.getFilesToStage();
List<String> trimmed = new ArrayList<>();
// remove twister2 jars from the list
for (String file : filesToStage) {
if (!file.contains("/org/twister2")) {
trimmed.add(file);
}
}
FileInputStream fis = null;
try {
zipFile = File.createTempFile("twister2-", ".zip");
FileOutputStream fos = new FileOutputStream(zipFile);
ZipOutputStream zipOut = new ZipOutputStream(fos);
zipOut.putNextEntry(new ZipEntry("lib/"));
for (String srcFile : trimmed) {
File fileToZip = new File(srcFile);
if (!jarSet.contains(fileToZip.getName())) {
jarSet.add(fileToZip.getName());
} else {
continue;
}
fis = new FileInputStream(fileToZip);
ZipEntry zipEntry = new ZipEntry("lib/" + fileToZip.getName());
zipOut.putNextEntry(zipEntry);
byte[] bytes = new byte[1024];
int length;
while ((length = fis.read(bytes)) >= 0) {
zipOut.write(bytes, 0, length);
}
fis.close();
}
zipOut.close();
fos.close();
zipFile.deleteOnExit();
} catch (FileNotFoundException e) {
LOG.info(e.getMessage());
} catch (IOException e) {
LOG.info(e.getMessage());
} finally {
if (fis != null) {
try {
fis.close();
} catch (IOException e) {
LOG.info(e.getMessage());
}
}
}
if (zipFile != null) {
options.setJobFileZip(zipFile.getPath());
}
}
private static List<PTransformOverride> getDefaultOverrides() {
List<PTransformOverride> overrides =
ImmutableList.<PTransformOverride>builder()
.add(
PTransformOverride.of(
PTransformMatchers.splittableParDo(), new SplittableParDo.OverrideFactory()))
.add(
PTransformOverride.of(
PTransformMatchers.urnEqualTo(
PTransformTranslation.SPLITTABLE_PROCESS_KEYED_URN),
new SplittableParDoNaiveBounded.OverrideFactory()))
.build();
return overrides;
}
}