| /* |
| * Copyright 2009-2010 by The Regents of the University of California |
| * Licensed under the Apache License, Version 2.0 (the "License"); |
| * you may not use this file except in compliance with the License. |
| * You may obtain a copy of the License from |
| * |
| * http://www.apache.org/licenses/LICENSE-2.0 |
| * |
| * Unless required by applicable law or agreed to in writing, software |
| * distributed under the License is distributed on an "AS IS" BASIS, |
| * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| * See the License for the specific language governing permissions and |
| * limitations under the License. |
| */ |
| package edu.uci.ics.hyracks.examples.tpch.client; |
| |
| import java.io.File; |
| import java.util.EnumSet; |
| import java.util.UUID; |
| |
| import org.kohsuke.args4j.CmdLineParser; |
| import org.kohsuke.args4j.Option; |
| |
| import edu.uci.ics.hyracks.api.client.HyracksRMIConnection; |
| import edu.uci.ics.hyracks.api.client.IHyracksClientConnection; |
| import edu.uci.ics.hyracks.api.constraints.PartitionConstraintHelper; |
| import edu.uci.ics.hyracks.api.dataflow.IConnectorDescriptor; |
| import edu.uci.ics.hyracks.api.dataflow.IOperatorDescriptor; |
| import edu.uci.ics.hyracks.api.dataflow.value.IBinaryComparatorFactory; |
| import edu.uci.ics.hyracks.api.dataflow.value.IBinaryHashFunctionFactory; |
| import edu.uci.ics.hyracks.api.dataflow.value.ISerializerDeserializer; |
| import edu.uci.ics.hyracks.api.dataflow.value.RecordDescriptor; |
| import edu.uci.ics.hyracks.api.io.FileReference; |
| import edu.uci.ics.hyracks.api.job.JobFlag; |
| import edu.uci.ics.hyracks.api.job.JobSpecification; |
| import edu.uci.ics.hyracks.dataflow.common.data.comparators.UTF8StringBinaryComparatorFactory; |
| import edu.uci.ics.hyracks.dataflow.common.data.hash.UTF8StringBinaryHashFunctionFactory; |
| import edu.uci.ics.hyracks.dataflow.common.data.marshalling.IntegerSerializerDeserializer; |
| import edu.uci.ics.hyracks.dataflow.common.data.marshalling.UTF8StringSerializerDeserializer; |
| import edu.uci.ics.hyracks.dataflow.common.data.parsers.IValueParserFactory; |
| import edu.uci.ics.hyracks.dataflow.common.data.parsers.UTF8StringParserFactory; |
| import edu.uci.ics.hyracks.dataflow.common.data.partition.FieldHashPartitionComputerFactory; |
| import edu.uci.ics.hyracks.dataflow.std.aggregators.CountAggregatorFactory; |
| import edu.uci.ics.hyracks.dataflow.std.aggregators.IFieldValueResultingAggregatorFactory; |
| import edu.uci.ics.hyracks.dataflow.std.aggregators.MultiAggregatorFactory; |
| import edu.uci.ics.hyracks.dataflow.std.connectors.MToNHashPartitioningConnectorDescriptor; |
| import edu.uci.ics.hyracks.dataflow.std.connectors.OneToOneConnectorDescriptor; |
| import edu.uci.ics.hyracks.dataflow.std.file.ConstantFileSplitProvider; |
| import edu.uci.ics.hyracks.dataflow.std.file.DelimitedDataTupleParserFactory; |
| import edu.uci.ics.hyracks.dataflow.std.file.FileScanOperatorDescriptor; |
| import edu.uci.ics.hyracks.dataflow.std.file.FileSplit; |
| import edu.uci.ics.hyracks.dataflow.std.file.FrameFileWriterOperatorDescriptor; |
| import edu.uci.ics.hyracks.dataflow.std.file.IFileSplitProvider; |
| import edu.uci.ics.hyracks.dataflow.std.group.HashGroupOperatorDescriptor; |
| import edu.uci.ics.hyracks.dataflow.std.join.InMemoryHashJoinOperatorDescriptor; |
| |
| public class Main { |
| private static class Options { |
| @Option(name = "-host", usage = "Hyracks Cluster Controller Host name", required = true) |
| public String host; |
| |
| @Option(name = "-port", usage = "Hyracks Cluster Controller Port (default: 1099)", required = false) |
| public int port = 1099; |
| |
| @Option(name = "-app", usage = "Hyracks Application name", required = true) |
| public String app; |
| |
| @Option(name = "-infile-customer-splits", usage = "Comma separated list of file-splits for the CUSTOMER input. A file-split is <node-name>:<path>", required = true) |
| public String inFileCustomerSplits; |
| |
| @Option(name = "-infile-order-splits", usage = "Comma separated list of file-splits for the ORDER input. A file-split is <node-name>:<path>", required = true) |
| public String inFileOrderSplits; |
| |
| @Option(name = "-outfile-splits", usage = "Comma separated list of file-splits for the output", required = true) |
| public String outFileSplits; |
| |
| @Option(name = "-num-join-partitions", usage = "Number of Join partitions to use (default: 1)", required = false) |
| public int numJoinPartitions = 1; |
| |
| @Option(name = "-profile", usage = "Enable/Disable profiling. (default: enabled)") |
| public boolean profile = true; |
| } |
| |
| public static void main(String[] args) throws Exception { |
| Options options = new Options(); |
| CmdLineParser parser = new CmdLineParser(options); |
| parser.parseArgument(args); |
| |
| IHyracksClientConnection hcc = new HyracksRMIConnection(options.host, options.port); |
| |
| JobSpecification job = createJob(parseFileSplits(options.inFileCustomerSplits), |
| parseFileSplits(options.inFileOrderSplits), parseFileSplits(options.outFileSplits), |
| options.numJoinPartitions); |
| |
| long start = System.currentTimeMillis(); |
| UUID jobId = hcc.createJob(options.app, job, |
| options.profile ? EnumSet.of(JobFlag.PROFILE_RUNTIME) : EnumSet.noneOf(JobFlag.class)); |
| hcc.start(jobId); |
| hcc.waitForCompletion(jobId); |
| long end = System.currentTimeMillis(); |
| System.err.println(start + " " + end + " " + (end - start)); |
| } |
| |
| private static FileSplit[] parseFileSplits(String fileSplits) { |
| String[] splits = fileSplits.split(","); |
| FileSplit[] fSplits = new FileSplit[splits.length]; |
| for (int i = 0; i < splits.length; ++i) { |
| String s = splits[i].trim(); |
| int idx = s.indexOf(':'); |
| if (idx < 0) { |
| throw new IllegalArgumentException("File split " + s + " not well formed"); |
| } |
| fSplits[i] = new FileSplit(s.substring(0, idx), new FileReference(new File(s.substring(idx + 1)))); |
| } |
| return fSplits; |
| } |
| |
| private static JobSpecification createJob(FileSplit[] customerSplits, FileSplit[] orderSplits, |
| FileSplit[] resultSplits, int numJoinPartitions) { |
| JobSpecification spec = new JobSpecification(); |
| |
| IFileSplitProvider custSplitsProvider = new ConstantFileSplitProvider(customerSplits); |
| RecordDescriptor custDesc = new RecordDescriptor(new ISerializerDeserializer[] { |
| UTF8StringSerializerDeserializer.INSTANCE, UTF8StringSerializerDeserializer.INSTANCE, |
| UTF8StringSerializerDeserializer.INSTANCE, UTF8StringSerializerDeserializer.INSTANCE, |
| UTF8StringSerializerDeserializer.INSTANCE, UTF8StringSerializerDeserializer.INSTANCE, |
| UTF8StringSerializerDeserializer.INSTANCE, UTF8StringSerializerDeserializer.INSTANCE }); |
| |
| IFileSplitProvider ordersSplitsProvider = new ConstantFileSplitProvider(orderSplits); |
| RecordDescriptor ordersDesc = new RecordDescriptor(new ISerializerDeserializer[] { |
| UTF8StringSerializerDeserializer.INSTANCE, UTF8StringSerializerDeserializer.INSTANCE, |
| UTF8StringSerializerDeserializer.INSTANCE, UTF8StringSerializerDeserializer.INSTANCE, |
| UTF8StringSerializerDeserializer.INSTANCE, UTF8StringSerializerDeserializer.INSTANCE, |
| UTF8StringSerializerDeserializer.INSTANCE, UTF8StringSerializerDeserializer.INSTANCE, |
| UTF8StringSerializerDeserializer.INSTANCE }); |
| |
| RecordDescriptor custOrderJoinDesc = new RecordDescriptor(new ISerializerDeserializer[] { |
| UTF8StringSerializerDeserializer.INSTANCE, UTF8StringSerializerDeserializer.INSTANCE, |
| UTF8StringSerializerDeserializer.INSTANCE, UTF8StringSerializerDeserializer.INSTANCE, |
| UTF8StringSerializerDeserializer.INSTANCE, UTF8StringSerializerDeserializer.INSTANCE, |
| UTF8StringSerializerDeserializer.INSTANCE, UTF8StringSerializerDeserializer.INSTANCE, |
| UTF8StringSerializerDeserializer.INSTANCE, UTF8StringSerializerDeserializer.INSTANCE, |
| UTF8StringSerializerDeserializer.INSTANCE, UTF8StringSerializerDeserializer.INSTANCE, |
| UTF8StringSerializerDeserializer.INSTANCE, UTF8StringSerializerDeserializer.INSTANCE, |
| UTF8StringSerializerDeserializer.INSTANCE, UTF8StringSerializerDeserializer.INSTANCE, |
| UTF8StringSerializerDeserializer.INSTANCE }); |
| |
| FileScanOperatorDescriptor ordScanner = new FileScanOperatorDescriptor(spec, ordersSplitsProvider, |
| new DelimitedDataTupleParserFactory(new IValueParserFactory[] { UTF8StringParserFactory.INSTANCE, |
| UTF8StringParserFactory.INSTANCE, UTF8StringParserFactory.INSTANCE, |
| UTF8StringParserFactory.INSTANCE, UTF8StringParserFactory.INSTANCE, |
| UTF8StringParserFactory.INSTANCE, UTF8StringParserFactory.INSTANCE, |
| UTF8StringParserFactory.INSTANCE, UTF8StringParserFactory.INSTANCE }, '|'), ordersDesc); |
| createPartitionConstraint(spec, ordScanner, orderSplits); |
| |
| FileScanOperatorDescriptor custScanner = new FileScanOperatorDescriptor(spec, custSplitsProvider, |
| new DelimitedDataTupleParserFactory(new IValueParserFactory[] { UTF8StringParserFactory.INSTANCE, |
| UTF8StringParserFactory.INSTANCE, UTF8StringParserFactory.INSTANCE, |
| UTF8StringParserFactory.INSTANCE, UTF8StringParserFactory.INSTANCE, |
| UTF8StringParserFactory.INSTANCE, UTF8StringParserFactory.INSTANCE, |
| UTF8StringParserFactory.INSTANCE }, '|'), custDesc); |
| createPartitionConstraint(spec, custScanner, customerSplits); |
| |
| InMemoryHashJoinOperatorDescriptor join = new InMemoryHashJoinOperatorDescriptor(spec, new int[] { 0 }, |
| new int[] { 1 }, new IBinaryHashFunctionFactory[] { UTF8StringBinaryHashFunctionFactory.INSTANCE }, |
| new IBinaryComparatorFactory[] { UTF8StringBinaryComparatorFactory.INSTANCE }, custOrderJoinDesc, |
| 6000000); |
| PartitionConstraintHelper.addPartitionCountConstraint(spec, join, numJoinPartitions); |
| |
| RecordDescriptor groupResultDesc = new RecordDescriptor(new ISerializerDeserializer[] { |
| UTF8StringSerializerDeserializer.INSTANCE, IntegerSerializerDeserializer.INSTANCE }); |
| |
| HashGroupOperatorDescriptor gby = new HashGroupOperatorDescriptor( |
| spec, |
| new int[] { 6 }, |
| new FieldHashPartitionComputerFactory(new int[] { 6 }, |
| new IBinaryHashFunctionFactory[] { UTF8StringBinaryHashFunctionFactory.INSTANCE }), |
| new IBinaryComparatorFactory[] { UTF8StringBinaryComparatorFactory.INSTANCE }, |
| new MultiAggregatorFactory(new IFieldValueResultingAggregatorFactory[] { new CountAggregatorFactory() }), |
| groupResultDesc, 16); |
| createPartitionConstraint(spec, gby, resultSplits); |
| |
| IFileSplitProvider outSplitProvider = new ConstantFileSplitProvider(resultSplits); |
| FrameFileWriterOperatorDescriptor writer = new FrameFileWriterOperatorDescriptor(spec, outSplitProvider); |
| createPartitionConstraint(spec, writer, resultSplits); |
| |
| IConnectorDescriptor ordJoinConn = new MToNHashPartitioningConnectorDescriptor(spec, |
| new FieldHashPartitionComputerFactory(new int[] { 1 }, |
| new IBinaryHashFunctionFactory[] { UTF8StringBinaryHashFunctionFactory.INSTANCE })); |
| spec.connect(ordJoinConn, ordScanner, 0, join, 1); |
| |
| IConnectorDescriptor custJoinConn = new MToNHashPartitioningConnectorDescriptor(spec, |
| new FieldHashPartitionComputerFactory(new int[] { 0 }, |
| new IBinaryHashFunctionFactory[] { UTF8StringBinaryHashFunctionFactory.INSTANCE })); |
| spec.connect(custJoinConn, custScanner, 0, join, 0); |
| |
| IConnectorDescriptor joinGroupConn = new MToNHashPartitioningConnectorDescriptor(spec, |
| new FieldHashPartitionComputerFactory(new int[] { 6 }, |
| new IBinaryHashFunctionFactory[] { UTF8StringBinaryHashFunctionFactory.INSTANCE })); |
| spec.connect(joinGroupConn, join, 0, gby, 0); |
| |
| IConnectorDescriptor gbyPrinterConn = new OneToOneConnectorDescriptor(spec); |
| spec.connect(gbyPrinterConn, gby, 0, writer, 0); |
| |
| spec.addRoot(writer); |
| return spec; |
| } |
| |
| private static void createPartitionConstraint(JobSpecification spec, IOperatorDescriptor op, FileSplit[] splits) { |
| String[] parts = new String[splits.length]; |
| for (int i = 0; i < splits.length; ++i) { |
| parts[i] = splits[i].getNodeName(); |
| } |
| PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, op, parts); |
| } |
| } |