/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements. See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership. The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License. You may obtain a copy of the License at
 *
 *   http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing,
 * software distributed under the License is distributed on an
 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
 * KIND, either express or implied. See the License for the
 * specific language governing permissions and limitations
 * under the License.
 */
package org.apache.hyracks.examples.tpch.client;
import static org.apache.hyracks.examples.tpch.client.Common.createPartitionConstraint;
import static org.apache.hyracks.examples.tpch.client.Common.custParserFactories;
import static org.apache.hyracks.examples.tpch.client.Common.orderParserFactories;
import static org.apache.hyracks.examples.tpch.client.Common.parseFileSplits;
import java.util.EnumSet;
import org.apache.hyracks.api.client.IHyracksClientConnection;
import org.apache.hyracks.api.constraints.PartitionConstraintHelper;
import org.apache.hyracks.api.dataflow.IConnectorDescriptor;
import org.apache.hyracks.api.dataflow.IOperatorDescriptor;
import org.apache.hyracks.api.dataflow.value.IBinaryComparatorFactory;
import org.apache.hyracks.api.dataflow.value.IBinaryHashFunctionFactory;
import org.apache.hyracks.api.dataflow.value.IBinaryHashFunctionFamily;
import org.apache.hyracks.api.dataflow.value.ISerializerDeserializer;
import org.apache.hyracks.api.dataflow.value.RecordDescriptor;
import org.apache.hyracks.api.exceptions.HyracksDataException;
import org.apache.hyracks.api.io.FileSplit;
import org.apache.hyracks.api.job.JobFlag;
import org.apache.hyracks.api.job.JobId;
import org.apache.hyracks.api.job.JobSpecification;
import org.apache.hyracks.data.std.accessors.PointableBinaryHashFunctionFactory;
import org.apache.hyracks.data.std.accessors.UTF8StringBinaryComparatorFactory;
import org.apache.hyracks.data.std.accessors.UTF8StringBinaryHashFunctionFamily;
import org.apache.hyracks.data.std.primitive.UTF8StringPointable;
import org.apache.hyracks.dataflow.common.data.marshalling.IntegerSerializerDeserializer;
import org.apache.hyracks.dataflow.common.data.marshalling.UTF8StringSerializerDeserializer;
import org.apache.hyracks.dataflow.common.data.normalizers.UTF8StringNormalizedKeyComputerFactory;
import org.apache.hyracks.dataflow.common.data.partition.FieldHashPartitionComputerFactory;
import org.apache.hyracks.dataflow.std.connectors.MToNPartitioningConnectorDescriptor;
import org.apache.hyracks.dataflow.std.connectors.OneToOneConnectorDescriptor;
import org.apache.hyracks.dataflow.std.file.ConstantFileSplitProvider;
import org.apache.hyracks.dataflow.std.file.DelimitedDataTupleParserFactory;
import org.apache.hyracks.dataflow.std.file.FileScanOperatorDescriptor;
import org.apache.hyracks.dataflow.std.file.IFileSplitProvider;
import org.apache.hyracks.dataflow.std.file.PlainFileWriterOperatorDescriptor;
import org.apache.hyracks.dataflow.std.group.HashSpillableTableFactory;
import org.apache.hyracks.dataflow.std.group.IFieldAggregateDescriptorFactory;
import org.apache.hyracks.dataflow.std.group.aggregators.FloatSumFieldAggregatorFactory;
import org.apache.hyracks.dataflow.std.group.aggregators.IntSumFieldAggregatorFactory;
import org.apache.hyracks.dataflow.std.group.aggregators.MultiFieldsAggregatorFactory;
import org.apache.hyracks.dataflow.std.group.external.ExternalGroupOperatorDescriptor;
import org.apache.hyracks.dataflow.std.join.InMemoryHashJoinOperatorDescriptor;
import org.apache.hyracks.dataflow.std.join.JoinComparatorFactory;
import org.apache.hyracks.dataflow.std.join.NestedLoopJoinOperatorDescriptor;
import org.apache.hyracks.dataflow.std.join.OptimizedHybridHashJoinOperatorDescriptor;
import org.apache.hyracks.ipc.impl.HyracksConnection;
import org.kohsuke.args4j.CmdLineParser;
import org.kohsuke.args4j.Option;
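/**
 * TPC-H example client that joins the CUSTOMER and ORDERS tables on the customer
 * key using a nested-loop, in-memory hash, or hybrid hash join, optionally
 * aggregates the join result, and writes the output as '|'-delimited text files.
 */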
public class Join {
    private static class Options {
        @Option(name = "-host", usage = "Hyracks Cluster Controller Host name", required = true)
        public String host;

        @Option(name = "-port", usage = "Hyracks Cluster Controller Port (default: 1098)", required = false)
        public int port = 1098;

        @Option(name = "-infile-customer-splits", usage = "Comma separated list of file-splits for the CUSTOMER input. A file-split is <node-name>:<path>", required = true)
        public String inFileCustomerSplits;

        @Option(name = "-infile-order-splits", usage = "Comma separated list of file-splits for the ORDER input. A file-split is <node-name>:<path>", required = true)
        public String inFileOrderSplits;

        @Option(name = "-outfile-splits", usage = "Comma separated list of file-splits for the output", required = true)
        public String outFileSplits;

        @Option(name = "-num-join-partitions", usage = "Number of join partitions to use (default: 1)", required = false)
        public int numJoinPartitions = 1;

        @Option(name = "-profile", usage = "Enable/Disable profiling. (default: enabled)")
        public boolean profile = true;

        @Option(name = "-table-size", usage = "Table size for the in-memory hash join (default: 8191)", required = false)
        public int tableSize = 8191;

        @Option(name = "-algo", usage = "Join type: InMem|NestedLoop|Hybrid", required = true)
        public String algo;

        @Option(name = "-mem-size", usage = "Memory budget for the join, in frames", required = true)
        public int memSize;

        // Used by the hybrid hash join only
        @Option(name = "-input-size", usage = "Estimated input size of the hybrid hash join", required = false)
        public int graceInputSize = 100000;

        @Option(name = "-records-per-frame", usage = "Records per frame for the hybrid hash join", required = false)
        public int graceRecordsPerFrame = 200;

        @Option(name = "-grace-factor", usage = "Fudge factor of the hybrid hash join (default: 1.2)", required = false)
        public double graceFactor = 1.2;

        // Whether a group-by is applied to the join result
        @Option(name = "-has-groupby", usage = "Whether to have a group-by operation after the join (default: disabled)", required = false)
        public boolean hasGroupBy = false;

        @Option(name = "-frame-size", usage = "Hyracks frame size (default: 32768)", required = false)
        public int frameSize = 32768;
    }
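    /**
     * Parses the command-line options, connects to the cluster controller, submits
     * the join job, and prints the job's start time, end time, and elapsed
     * milliseconds to stderr.
     *
     * A hypothetical invocation (host, node names, and paths are placeholders):
     *
     *   java org.apache.hyracks.examples.tpch.client.Join -host localhost -algo InMem -mem-size 64
     *       -infile-customer-splits nc1:/data/customer.tbl -infile-order-splits nc1:/data/orders.tbl
     *       -outfile-splits nc1:/data/join_result
     */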
    public static void main(String[] args) throws Exception {
        Options options = new Options();
        CmdLineParser parser = new CmdLineParser(options);
        if (args.length == 0) {
            parser.printUsage(System.err);
            return;
        }
        parser.parseArgument(args);

        IHyracksClientConnection hcc = new HyracksConnection(options.host, options.port);

        JobSpecification job = createJob(parseFileSplits(options.inFileCustomerSplits),
                parseFileSplits(options.inFileOrderSplits), parseFileSplits(options.outFileSplits),
                options.numJoinPartitions, options.algo, options.graceInputSize, options.graceRecordsPerFrame,
                options.graceFactor, options.memSize, options.tableSize, options.hasGroupBy, options.frameSize);
        if (job == null) {
            return;
        }

        long start = System.currentTimeMillis();
        JobId jobId = hcc.startJob(job,
                options.profile ? EnumSet.of(JobFlag.PROFILE_RUNTIME) : EnumSet.noneOf(JobFlag.class));
        hcc.waitForCompletion(jobId);
        long end = System.currentTimeMillis();
        System.err.println(start + " " + end + " " + (end - start));
    }
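    /**
     * Builds the job: scan CUSTOMER and ORDERS, hash-partition both on the customer
     * key, join them with the requested algorithm, optionally group and aggregate
     * the result, and write it out. Returns null if the algorithm is not recognized.
     */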
    private static JobSpecification createJob(FileSplit[] customerSplits, FileSplit[] orderSplits,
            FileSplit[] resultSplits, int numJoinPartitions, String algo, int graceInputSize,
            int graceRecordsPerFrame, double graceFactor, int memSize, int tableSize, boolean hasGroupBy,
            int frameSize) throws HyracksDataException {
        JobSpecification spec = new JobSpecification(frameSize);

        IFileSplitProvider custSplitsProvider = new ConstantFileSplitProvider(customerSplits);
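        // Sum the sizes of all input files; the total is later passed to the external
        // group-by as an estimate of its input size.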
        long custFileSize = 0;
        for (int i = 0; i < customerSplits.length; i++) {
            custFileSize += customerSplits[i].getFile(null).length();
        }
        IFileSplitProvider ordersSplitsProvider = new ConstantFileSplitProvider(orderSplits);
        long orderFileSize = 0;
        for (int i = 0; i < orderSplits.length; i++) {
            orderFileSize += orderSplits[i].getFile(null).length();
        }
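        // Scan the '|'-delimited ORDERS and CUSTOMER input files, constrained to run
        // on the nodes that host the corresponding splits.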
        FileScanOperatorDescriptor ordScanner = new FileScanOperatorDescriptor(spec, ordersSplitsProvider,
                new DelimitedDataTupleParserFactory(orderParserFactories, '|'), Common.ordersDesc);
        createPartitionConstraint(spec, ordScanner, orderSplits);

        FileScanOperatorDescriptor custScanner = new FileScanOperatorDescriptor(spec, custSplitsProvider,
                new DelimitedDataTupleParserFactory(custParserFactories, '|'), Common.custDesc);
        createPartitionConstraint(spec, custScanner, customerSplits);

        IOperatorDescriptor join;
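        // Select the join operator. Every variant joins CUSTOMER (input 0) on field 0
        // (C_CUSTKEY) with ORDERS (input 1) on field 1 (O_CUSTKEY).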
if ("nestedloop".equalsIgnoreCase(algo)) {
join = new NestedLoopJoinOperatorDescriptor(spec,
new JoinComparatorFactory(UTF8StringBinaryComparatorFactory.INSTANCE, 0, 1),
Common.custOrderJoinDesc, memSize, false, null);
} else if ("inmem".equalsIgnoreCase(algo)) {
join = new InMemoryHashJoinOperatorDescriptor(spec, new int[] { 0 }, new int[] { 1 },
new IBinaryHashFunctionFactory[] {
PointableBinaryHashFunctionFactory.of(UTF8StringPointable.FACTORY) },
new IBinaryHashFunctionFactory[] {
PointableBinaryHashFunctionFactory.of(UTF8StringPointable.FACTORY) },
new JoinComparatorFactory(UTF8StringBinaryComparatorFactory.INSTANCE, 0, 1),
Common.custOrderJoinDesc, tableSize, memSize * frameSize);
} else if ("hybrid".equalsIgnoreCase(algo)) {
join = new OptimizedHybridHashJoinOperatorDescriptor(spec, memSize, graceInputSize, graceFactor,
new int[] { 0 }, new int[] { 1 },
new IBinaryHashFunctionFamily[] { UTF8StringBinaryHashFunctionFamily.INSTANCE },
new IBinaryHashFunctionFamily[] { UTF8StringBinaryHashFunctionFamily.INSTANCE },
Common.custOrderJoinDesc,
new JoinComparatorFactory(UTF8StringBinaryComparatorFactory.INSTANCE, 0, 1),
new JoinComparatorFactory(UTF8StringBinaryComparatorFactory.INSTANCE, 1, 0), null, null);
} else {
System.err.println("unknown algorithm:" + algo);
return null;
}
        PartitionConstraintHelper.addPartitionCountConstraint(spec, join, numJoinPartitions);
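        // Hash-partition both inputs on the customer key so that matching CUSTOMER and
        // ORDERS tuples are routed to the same join partition.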
        IConnectorDescriptor ordJoinConn = new MToNPartitioningConnectorDescriptor(spec,
                new FieldHashPartitionComputerFactory(new int[] { 1 }, new IBinaryHashFunctionFactory[] {
                        PointableBinaryHashFunctionFactory.of(UTF8StringPointable.FACTORY) }));
        spec.connect(ordJoinConn, ordScanner, 0, join, 1);

        IConnectorDescriptor custJoinConn = new MToNPartitioningConnectorDescriptor(spec,
                new FieldHashPartitionComputerFactory(new int[] { 0 }, new IBinaryHashFunctionFactory[] {
                        PointableBinaryHashFunctionFactory.of(UTF8StringPointable.FACTORY) }));
        spec.connect(custJoinConn, custScanner, 0, join, 0);
        IOperatorDescriptor endingOp = join;
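        // Optional group-by: hash-partition the join output on field 6 and aggregate it
        // with an external (spillable) group-by limited to memSize frames.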
        if (hasGroupBy) {
            RecordDescriptor groupResultDesc = new RecordDescriptor(new ISerializerDeserializer[] {
                    new UTF8StringSerializerDeserializer(), IntegerSerializerDeserializer.INSTANCE });

            ExternalGroupOperatorDescriptor gby = new ExternalGroupOperatorDescriptor(spec, tableSize,
                    custFileSize + orderFileSize, new int[] { 6 }, null, memSize,
                    new IBinaryComparatorFactory[] { UTF8StringBinaryComparatorFactory.INSTANCE },
                    new UTF8StringNormalizedKeyComputerFactory(),
                    new MultiFieldsAggregatorFactory(new IFieldAggregateDescriptorFactory[] {
                            new IntSumFieldAggregatorFactory(1, false), new IntSumFieldAggregatorFactory(3, false),
                            new FloatSumFieldAggregatorFactory(5, false) }),
                    new MultiFieldsAggregatorFactory(new IFieldAggregateDescriptorFactory[] {
                            new IntSumFieldAggregatorFactory(1, false), new IntSumFieldAggregatorFactory(2, false),
                            new FloatSumFieldAggregatorFactory(3, false) }),
                    groupResultDesc, groupResultDesc, new HashSpillableTableFactory(
                            new IBinaryHashFunctionFamily[] { UTF8StringBinaryHashFunctionFamily.INSTANCE }));
            createPartitionConstraint(spec, gby, resultSplits);

            IConnectorDescriptor joinGroupConn = new MToNPartitioningConnectorDescriptor(spec,
                    new FieldHashPartitionComputerFactory(new int[] { 6 }, new IBinaryHashFunctionFactory[] {
                            PointableBinaryHashFunctionFactory.of(UTF8StringPointable.FACTORY) }));
            spec.connect(joinGroupConn, join, 0, gby, 0);

            endingOp = gby;
        }
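        // Write the final result as '|'-delimited text, one output file per result split.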
        IFileSplitProvider outSplitProvider = new ConstantFileSplitProvider(resultSplits);
        //FrameFileWriterOperatorDescriptor writer = new FrameFileWriterOperatorDescriptor(spec, outSplitProvider);
        IOperatorDescriptor writer = new PlainFileWriterOperatorDescriptor(spec, outSplitProvider, "|");
        createPartitionConstraint(spec, writer, resultSplits);

        IConnectorDescriptor endingPrinterConn = new OneToOneConnectorDescriptor(spec);
        spec.connect(endingPrinterConn, endingOp, 0, writer, 0);

        spec.addRoot(writer);
        return spec;
    }
}