blob: ac1d5cd59674a5ae335b1ee351eb05b0de22fa74 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.wayang.giraph.operators;
import java.io.IOException;
import java.net.URISyntaxException;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.List;
import java.util.stream.Stream;
import org.apache.giraph.conf.GiraphConfiguration;
import org.apache.giraph.job.GiraphJob;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.apache.wayang.basic.channels.FileChannel;
import org.apache.wayang.basic.data.Tuple2;
import org.apache.wayang.basic.operators.PageRankOperator;
import org.apache.wayang.core.api.Configuration;
import org.apache.wayang.core.api.exception.WayangException;
import org.apache.wayang.core.optimizer.OptimizationContext;
import org.apache.wayang.core.optimizer.costs.LoadProfileEstimators;
import org.apache.wayang.core.plan.wayangplan.Operator;
import org.apache.wayang.core.platform.ChannelDescriptor;
import org.apache.wayang.core.platform.ChannelInstance;
import org.apache.wayang.core.platform.Platform;
import org.apache.wayang.core.platform.lineage.ExecutionLineageNode;
import org.apache.wayang.core.util.Tuple;
import org.apache.wayang.core.util.fs.FileSystem;
import org.apache.wayang.core.util.fs.FileSystems;
import org.apache.wayang.giraph.Algorithm.PageRankAlgorithm;
import org.apache.wayang.giraph.Algorithm.PageRankParameters;
import org.apache.wayang.giraph.execution.GiraphExecutor;
import org.apache.wayang.giraph.platform.GiraphPlatform;
import org.apache.wayang.java.channels.StreamChannel;
/**
 * PageRank {@link Operator} implementation for the {@link GiraphPlatform}.
 * <p>
 * The input graph is read from a {@link FileChannel} ({@link FileChannel#HDFS_TSV_DESCRIPTOR}),
 * {@link PageRankAlgorithm} is executed as a Giraph job that writes into an output directory,
 * and the resulting {@code (vertexId, rank)} pairs are emitted through a {@link StreamChannel}.
 */
public class GiraphPageRankOperator extends PageRankOperator implements GiraphExecutionOperator {

    private final Logger logger = LogManager.getLogger(this.getClass());

    /**
     * Directory the Giraph job writes its output to. When not set explicitly it is resolved
     * lazily from the {@code wayang.giraph.hdfs.tempdir} configuration property.
     */
    private String path_out;

    /**
     * Creates a new instance.
     *
     * @param numIterations number of PageRank iterations handed to the Giraph algorithm
     */
    public GiraphPageRankOperator(Integer numIterations) {
        super(numIterations);
        setPathOut(null, null);
    }

    /**
     * Creates a new instance based on the given {@link PageRankOperator}.
     *
     * @param pageRankOperator the operator whose settings are taken over
     */
    public GiraphPageRankOperator(PageRankOperator pageRankOperator) {
        super(pageRankOperator);
        setPathOut(null, null);
    }

    /**
     * Executes PageRank as a Giraph job.
     *
     * @param inputChannelInstances  a single {@link FileChannel.Instance} holding the input graph
     * @param outputChannelInstances a single {@link StreamChannel.Instance} receiving the ranks
     * @param giraphExecutor         provides the {@link GiraphConfiguration} to run with
     * @param operatorContext        optimization context of this operator
     * @return the lineage collected from the main execution node
     * @throws WayangException if the job cannot be set up, fails, or is interrupted
     */
    @Override
    public Tuple<Collection<ExecutionLineageNode>, Collection<ChannelInstance>> execute(
            ChannelInstance[] inputChannelInstances,
            ChannelInstance[] outputChannelInstances,
            GiraphExecutor giraphExecutor,
            OptimizationContext.OperatorContext operatorContext) {
        assert inputChannelInstances.length == this.getNumInputs();
        assert outputChannelInstances.length == this.getNumOutputs();

        final FileChannel.Instance inputChannel = (FileChannel.Instance) inputChannelInstances[0];
        final StreamChannel.Instance outputChannel = (StreamChannel.Instance) outputChannelInstances[0];
        try {
            return this.runGiraph(inputChannel, outputChannel, giraphExecutor, operatorContext);
        } catch (InterruptedException e) {
            // Restore the interrupt flag so that code further up the stack can still observe it.
            Thread.currentThread().interrupt();
            throw new WayangException(String.format("Running %s was interrupted.", this), e);
        } catch (IOException | URISyntaxException | ClassNotFoundException e) {
            throw new WayangException(String.format("Running %s failed.", this), e);
        }
    }

    /**
     * Configures and runs the Giraph PageRank job, then streams its output into the output channel.
     *
     * @throws WayangException if the output file system is inaccessible or the Giraph job fails
     */
    private Tuple<Collection<ExecutionLineageNode>, Collection<ChannelInstance>> runGiraph(
            FileChannel.Instance inputFileChannelInstance,
            StreamChannel.Instance outputChannelInstance,
            GiraphExecutor giraphExecutor,
            OptimizationContext.OperatorContext operatorContext)
            throws IOException, URISyntaxException, ClassNotFoundException, InterruptedException {
        assert inputFileChannelInstance.wasProduced();
        Configuration configuration = operatorContext.getOptimizationContext().getConfiguration();
        String tempDirPath = this.getPathOut(configuration);
        // NOTE(review): PageRankParameters.setParameter is invoked statically; if it stores
        // process-wide state, concurrently running PageRank operators may interfere — confirm.
        PageRankParameters.setParameter(PageRankParameters.PageRankEnum.ITERATION, this.getNumIterations());
        FileSystem fs = FileSystems.getFileSystem(tempDirPath).orElseThrow(
                () -> new WayangException(String.format("Cannot access file system of %s.", tempDirPath))
        );
        // Remove output left over from a previous run, if any (recursive delete flag set).
        fs.delete(tempDirPath, true);
        final String inputPath = inputFileChannelInstance.getSinglePath();

        GiraphConfiguration conf = giraphExecutor.getGiraphConfiguration();
        // Vertex reader input.
        conf.set("giraph.vertex.input.dir", inputPath);
        conf.set("mapred.job.tracker", configuration.getStringProperty("wayang.giraph.job.tracker"));
        conf.set("mapreduce.job.counters.limit", configuration.getStringProperty("wayang.mapreduce.job.counters.limit"));
        final int minWorkers = (int) configuration.getLongProperty("wayang.giraph.minWorkers");
        final int maxWorkers = (int) configuration.getLongProperty("wayang.giraph.maxWorkers");
        // GiraphConfiguration#setWorkerConfiguration expects (minWorkers, maxWorkers, minPercentResponded).
        conf.setWorkerConfiguration(minWorkers, maxWorkers, 100.0f);
        conf.set("giraph.SplitMasterWorker", "false");
        conf.set("mapreduce.output.fileoutputformat.outputdir", tempDirPath);
        conf.setComputationClass(PageRankAlgorithm.class);
        conf.setVertexInputFormatClass(PageRankAlgorithm.PageRankVertexInputFormat.class);
        conf.setWorkerContextClass(PageRankAlgorithm.PageRankWorkerContext.class);
        conf.setMasterComputeClass(PageRankAlgorithm.PageRankMasterCompute.class);
        conf.setNumComputeThreads((int) configuration.getLongProperty("wayang.giraph.numThread"));
        conf.setVertexOutputFormatClass(PageRankAlgorithm.PageRankVertexOutputFormat.class);

        GiraphJob job = new GiraphJob(conf, "wayang-giraph");
        // GiraphJob#run reports failure through its return value; do not read output of a failed job.
        if (!job.run(true)) {
            throw new WayangException(String.format("Giraph job of %s did not complete successfully.", this));
        }

        final String actualInputPath = FileSystems.findActualSingleInputPath(tempDirPath);
        Stream<Tuple2<Long, Float>> stream = this.createStream(actualInputPath);
        outputChannelInstance.accept(stream);

        final ExecutionLineageNode mainExecutionLineage = new ExecutionLineageNode(operatorContext);
        mainExecutionLineage.add(LoadProfileEstimators.createFromSpecification(
                "wayang.giraph.pagerank.load.main", configuration
        ));
        mainExecutionLineage.addPredecessor(inputFileChannelInstance.getLineage());

        final ExecutionLineageNode outputExecutionLineage = new ExecutionLineageNode(operatorContext);
        outputExecutionLineage.add(LoadProfileEstimators.createFromSpecification(
                "wayang.giraph.pagerank.load.output", configuration
        ));
        outputChannelInstance.getLineage().addPredecessor(outputExecutionLineage);

        return mainExecutionLineage.collectAndMark();
    }

    @Override
    public Platform getPlatform() {
        return GiraphPlatform.getInstance();
    }

    @Override
    public Collection<String> getLoadProfileEstimatorConfigurationKeys() {
        return Arrays.asList("wayang.giraph.pagerank.load.main", "wayang.giraph.pagerank.load.output");
    }

    @Override
    public List<ChannelDescriptor> getSupportedInputChannels(int index) {
        return Collections.singletonList(FileChannel.HDFS_TSV_DESCRIPTOR);
    }

    @Override
    public List<ChannelDescriptor> getSupportedOutputChannels(int index) {
        return Collections.singletonList(StreamChannel.DESCRIPTOR);
    }

    /**
     * Sets the directory the Giraph job writes its output to.
     *
     * @param path          the output directory; if {@code null}, the value of
     *                      {@code wayang.giraph.hdfs.tempdir} from {@code configuration} is used
     * @param configuration source of the fallback value; may be {@code null}, in which case a
     *                      {@code null} path is stored as-is
     */
    public void setPathOut(String path, Configuration configuration) {
        if (path == null && configuration != null) {
            path = configuration.getStringProperty("wayang.giraph.hdfs.tempdir");
        }
        this.path_out = path;
    }

    /**
     * Returns the output directory, resolving it from {@code configuration} if it is not yet set.
     *
     * @param configuration used to resolve the directory when none has been set
     * @return the output directory, or {@code null} if unset and {@code configuration} is {@code null}
     */
    public String getPathOut(Configuration configuration) {
        if (this.path_out == null) {
            setPathOut(null, configuration);
        }
        return this.path_out;
    }

    /**
     * Reads the Giraph output file as tab-separated {@code vertexId<TAB>rank} lines.
     *
     * @param path the file to read
     * @return a stream of {@code (vertexId, rank)} pairs
     */
    private Stream<Tuple2<Long, Float>> createStream(String path) {
        return org.apache.wayang.core.util.fs.FileUtils.streamLines(path).map(line -> {
            String[] part = line.split("\t");
            return new Tuple2<>(Long.parseLong(part[0]), Float.parseFloat(part[1]));
        });
    }
}