blob: a5d3ebd711878a8c7a7e33951cd4fa1420eb40bc [file] [log] [blame]
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.tez.examples;
import java.io.IOException;
import java.util.Set;
import com.google.common.annotations.VisibleForTesting;
import org.apache.tez.dag.api.Vertex.VertexExecutionContext;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import org.apache.hadoop.util.ToolRunner;
import org.apache.tez.client.TezClient;
import org.apache.tez.common.counters.TezCounter;
import org.apache.tez.dag.api.DAG;
import org.apache.tez.dag.api.Edge;
import org.apache.tez.dag.api.ProcessorDescriptor;
import org.apache.tez.dag.api.TezConfiguration;
import org.apache.tez.dag.api.Vertex;
import org.apache.tez.dag.api.client.DAGClient;
import org.apache.tez.dag.api.client.DAGStatus;
import org.apache.tez.dag.api.client.StatusGetOpts;
import org.apache.tez.examples.HashJoinExample.ForwardingProcessor;
import org.apache.tez.mapreduce.input.MRInput;
import org.apache.tez.runtime.api.LogicalInput;
import org.apache.tez.runtime.api.ProcessorContext;
import org.apache.tez.runtime.api.Reader;
import org.apache.tez.runtime.library.api.KeyValuesReader;
import org.apache.tez.runtime.library.conf.OrderedPartitionedKVEdgeConfig;
import org.apache.tez.runtime.library.partitioner.HashPartitioner;
import org.apache.tez.runtime.library.processor.SimpleProcessor;
import org.apache.tez.common.Preconditions;
import com.google.common.collect.Sets;
/**
 * Example job that checks whether two text inputs contain the same keys.
 *
 * <p>{@link #createDag} builds a DAG with one source vertex per input path; both feed a single
 * "joinvalidate" vertex through edges configured by {@link OrderedPartitionedKVEdgeConfig} with
 * {@link Text} keys, {@link NullWritable} values and a {@link HashPartitioner}.
 * {@link JoinValidateProcessor} walks the two key streams in lock-step and increments the
 * {@code JOIN_VALIDATE / MISSING_KEY_EXISTS} counter for every discrepancy it sees;
 * {@link #runJob} reads that counter back to decide the exit code.
 */
public class JoinValidate extends TezExampleBase {
  private static final Logger LOG = LoggerFactory.getLogger(JoinValidate.class);

  // Vertex names. JoinValidateProcessor also uses them to look up its two inputs.
  private static final String LHS_INPUT_NAME = "lhsfile";
  private static final String RHS_INPUT_NAME = "rhsfile";

  // Counter that carries the validation result from the tasks back to the client.
  private static final String COUNTER_GROUP_NAME = "JOIN_VALIDATE";
  private static final String MISSING_KEY_COUNTER_NAME = "MISSING_KEY_EXISTS";

  /**
   * Command line entry point; runs the tool through {@link ToolRunner} and exits the JVM with
   * the status it returns.
   */
  public static void main(String[] args) throws Exception {
    JoinValidate validate = new JoinValidate();
    int status = ToolRunner.run(new Configuration(), validate, args);
    System.exit(status);
  }

  /** Prints the command line synopsis to stderr. */
  @Override
  protected void printUsage() {
    // The third argument is optional: validateArgs accepts 2 or 3 arguments.
    System.err.println("Usage: " + "joinvalidate <path1> <path2> [numPartitions]");
  }

  /**
   * Builds and submits the validation DAG, waits for it to finish and evaluates the result.
   *
   * @param args {@code <lhs path> <rhs path> [numPartitions]}; numPartitions defaults to 1
   * @param tezConf configuration used to build the DAG
   * @param tezClient client used to submit the DAG
   * @return 0 if no discrepancy was counted, 4 if numPartitions is not a positive integer,
   *         -1 if the DAG did not succeed, -2 if the result counter could not be found,
   *         -3 if the counter is non-zero
   */
  @Override
  protected int runJob(String[] args, TezConfiguration tezConf,
      TezClient tezClient) throws Exception {
    LOG.info("Running JoinValidate");
    String lhsDir = args[0];
    String rhsDir = args[1];

    int numPartitions = 1;
    if (args.length == 3) {
      try {
        numPartitions = Integer.parseInt(args[2]);
      } catch (NumberFormatException e) {
        // Report a malformed number the same way as an out-of-range one instead of
        // letting the exception escape as a stack trace.
        System.err.println("NumPartitions must be an integer, got: " + args[2]);
        return 4;
      }
    }
    if (numPartitions <= 0) {
      System.err.println("NumPartitions must be > 0");
      return 4;
    }

    Path lhsPath = new Path(lhsDir);
    Path rhsPath = new Path(rhsDir);
    DAG dag = createDag(tezConf, lhsPath, rhsPath, numPartitions);

    tezClient.waitTillReady();
    DAGClient dagClient = tezClient.submitDAG(dag);

    // Counters are only requested with the progress updates when counter logging is enabled.
    Set<StatusGetOpts> getOpts = Sets.newHashSet();
    if (isCountersLog()) {
      getOpts.add(StatusGetOpts.GET_COUNTERS);
    }
    DAGStatus dagStatus = dagClient.waitForCompletionWithStatusUpdates(getOpts);
    if (dagStatus.getState() != DAGStatus.State.SUCCEEDED) {
      LOG.info("DAG diagnostics: {}", dagStatus.getDiagnostics());
      return -1;
    }

    // Fetch the status again, this time always with counters, to read the validation result.
    dagStatus = dagClient.getDAGStatus(Sets.newHashSet(StatusGetOpts.GET_COUNTERS));
    TezCounter counter = dagStatus.getDAGCounters().findCounter(COUNTER_GROUP_NAME,
        MISSING_KEY_COUNTER_NAME);
    if (counter == null) {
      LOG.info("Unable to determine equality");
      return -2;
    }
    if (counter.getValue() != 0) {
      LOG.info("Validate failed. The two sides are not equivalent");
      return -3;
    }
    LOG.info("Validation successful. The two sides are equivalent");
    return 0;
  }

  /**
   * Accepts exactly two or three arguments.
   *
   * @return 0 if the argument count is acceptable, 2 otherwise
   */
  @Override
  protected int validateArgs(String[] otherArgs) {
    if (otherArgs.length != 3 && otherArgs.length != 2) {
      return 2;
    }
    return 0;
  }

  /**
   * Creates the three-vertex validation DAG: two source vertices, one per input path, each
   * connected to a single "joinvalidate" vertex.
   *
   * @param tezConf configuration copied into each input and applied to the edge configuration
   * @param lhs path of the first input
   * @param rhs path of the second input
   * @param numPartitions parallelism of the "joinvalidate" vertex
   * @return the assembled DAG
   */
  @VisibleForTesting
  DAG createDag(TezConfiguration tezConf, Path lhs, Path rhs, int numPartitions)
      throws IOException {
    DAG dag = DAG.create(getDagName());
    if (getDefaultExecutionContext() != null) {
      dag.setExecutionContext(getDefaultExecutionContext());
    }

    // Configuration for intermediate output - shared by both source vertices.
    // This should only be setting selective keys from the underlying conf. Fix after there's a
    // better mechanism to configure the IOs. The setFromConfiguration call is optional and allows
    // overriding the config options with command line parameters.
    OrderedPartitionedKVEdgeConfig edgeConf = OrderedPartitionedKVEdgeConfig
        .newBuilder(Text.class.getName(), NullWritable.class.getName(),
            HashPartitioner.class.getName())
        .setFromConfiguration(tezConf)
        .build();

    Vertex lhsVertex = createSourceVertex(tezConf, LHS_INPUT_NAME, "lhs", lhs);
    setVertexExecutionContext(lhsVertex, getLhsExecutionContext());

    Vertex rhsVertex = createSourceVertex(tezConf, RHS_INPUT_NAME, "rhs", rhs);
    setVertexExecutionContext(rhsVertex, getRhsExecutionContext());

    Vertex joinValidateVertex = Vertex.create("joinvalidate", ProcessorDescriptor.create(
        JoinValidateProcessor.class.getName()), numPartitions);
    setVertexExecutionContext(joinValidateVertex, getValidateExecutionContext());

    Edge e1 = Edge.create(lhsVertex, joinValidateVertex, edgeConf.createDefaultEdgeProperty());
    Edge e2 = Edge.create(rhsVertex, joinValidateVertex, edgeConf.createDefaultEdgeProperty());

    dag.addVertex(lhsVertex).addVertex(rhsVertex).addVertex(joinValidateVertex).addEdge(e1)
        .addEdge(e2);
    return dag;
  }

  /**
   * Creates a {@link ForwardingProcessor} vertex that reads {@code path} as text through an
   * {@link MRInput} data source.
   */
  private Vertex createSourceVertex(TezConfiguration tezConf, String vertexName,
      String dataSourceName, Path path) {
    return Vertex.create(vertexName, ProcessorDescriptor.create(
        ForwardingProcessor.class.getName())).addDataSource(dataSourceName,
        MRInput
            .createConfigBuilder(new Configuration(tezConf), TextInputFormat.class,
                path.toUri().toString()).groupSplits(!isDisableSplitGrouping())
            .generateSplitsInAM(!isGenerateSplitInClient()).build());
  }

  /** Applies {@code executionContext} to {@code vertex} unless it is null. */
  private void setVertexExecutionContext(Vertex vertex, VertexExecutionContext executionContext) {
    if (executionContext != null) {
      vertex.setExecutionContext(executionContext);
    }
  }

  /**
   * Execution context applied to the whole DAG, or null for none.
   * This is for internal use only, to use this example for external service testing.
   * Not meant as documentation for the example.
   */
  protected VertexExecutionContext getDefaultExecutionContext() {
    return null;
  }

  /**
   * Execution context applied to the lhs source vertex, or null for none.
   * This is for internal use only, to use this example for external service testing.
   * Not meant as documentation for the example.
   */
  protected VertexExecutionContext getLhsExecutionContext() {
    return null;
  }

  /**
   * Execution context applied to the rhs source vertex, or null for none.
   * This is for internal use only, to use this example for external service testing.
   * Not meant as documentation for the example.
   */
  protected VertexExecutionContext getRhsExecutionContext() {
    return null;
  }

  /**
   * Execution context applied to the "joinvalidate" vertex, or null for none.
   * This is for internal use only, to use this example for external service testing.
   * Not meant as documentation for the example.
   */
  protected VertexExecutionContext getValidateExecutionContext() {
    return null;
  }

  /** Name given to the DAG created by {@link #createDag}. */
  protected String getDagName() {
    return "JoinValidate";
  }

  /**
   * Processor of the "joinvalidate" vertex. Reads the keys of its two inputs side by side and
   * counts every position at which they disagree, plus one if either side has keys left over.
   */
  public static class JoinValidateProcessor extends SimpleProcessor {
    private static final Logger LOG = LoggerFactory.getLogger(JoinValidateProcessor.class);

    public JoinValidateProcessor(ProcessorContext context) {
      super(context);
    }

    /**
     * Compares the key streams of the two inputs pairwise and records discrepancies in the
     * {@code JOIN_VALIDATE / MISSING_KEY_EXISTS} counter.
     *
     * <p>Only keys are compared; the values grouped under each key are never read.
     * NOTE(review): this means differing duplicate counts for the same key would presumably go
     * unnoticed - confirm whether that is intended.
     */
    @Override
    public void run() throws Exception {
      Preconditions.checkState(getInputs().size() == 2);
      Preconditions.checkState(getOutputs().size() == 0);
      LogicalInput lhsInput = getInputs().get(LHS_INPUT_NAME);
      LogicalInput rhsInput = getInputs().get(RHS_INPUT_NAME);
      // Fail with an explicit state error rather than a bare NullPointerException if the
      // inputs are not registered under the expected vertex names.
      Preconditions.checkState(lhsInput != null);
      Preconditions.checkState(rhsInput != null);
      Reader lhsReaderRaw = lhsInput.getReader();
      Reader rhsReaderRaw = rhsInput.getReader();
      Preconditions.checkState(lhsReaderRaw instanceof KeyValuesReader);
      Preconditions.checkState(rhsReaderRaw instanceof KeyValuesReader);
      KeyValuesReader lhsReader = (KeyValuesReader) lhsReaderRaw;
      KeyValuesReader rhsReader = (KeyValuesReader) rhsReaderRaw;

      TezCounter missingKeyCounter = getContext().getCounters().findCounter(COUNTER_GROUP_NAME,
          MISSING_KEY_COUNTER_NAME);

      // Set once rhs has reported end-of-input, so that next() is not called on it again.
      boolean rhsExhausted = false;
      while (lhsReader.next()) {
        if (!rhsReader.next()) {
          // lhs still has a key but rhs is finished: count it once and stop comparing.
          missingKeyCounter.increment(1);
          LOG.info("ExtraKey in lhs: {}", lhsReader.getCurrentKey());
          rhsExhausted = true;
          break;
        }
        Object lhsKey = lhsReader.getCurrentKey();
        Object rhsKey = rhsReader.getCurrentKey();
        if (!lhsKey.equals(rhsKey)) {
          LOG.info("MismatchedKeys: lhs={}, rhs={}", lhsKey, rhsKey);
          missingKeyCounter.increment(1);
        }
      }

      // lhs is finished; any remaining rhs key is a discrepancy (counted once).
      if (!rhsExhausted && rhsReader.next()) {
        missingKeyCounter.increment(1);
        LOG.info("ExtraKey in rhs: {}", rhsReader.getCurrentKey());
      }
    }
  }
}