| /* |
| * Licensed to the Apache Software Foundation (ASF) under one |
| * or more contributor license agreements. See the NOTICE file |
| * distributed with this work for additional information |
| * regarding copyright ownership. The ASF licenses this file |
| * to you under the Apache License, Version 2.0 (the |
| * "License"); you may not use this file except in compliance |
| * with the License. You may obtain a copy of the License at |
| * |
| * http://www.apache.org/licenses/LICENSE-2.0 |
| * |
| * Unless required by applicable law or agreed to in writing, |
| * software distributed under the License is distributed on an |
| * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY |
| * KIND, either express or implied. See the License for the |
| * specific language governing permissions and limitations |
| * under the License. |
| */ |
| |
| package org.apache.hyracks.examples.tpch.client; |
| |
| import static org.apache.hyracks.examples.tpch.client.Common.createPartitionConstraint; |
| import static org.apache.hyracks.examples.tpch.client.Common.orderParserFactories; |
| import static org.apache.hyracks.examples.tpch.client.Common.ordersDesc; |
| import static org.apache.hyracks.examples.tpch.client.Common.parseFileSplits; |
| |
| import java.util.EnumSet; |
| |
| import org.apache.hyracks.api.client.HyracksConnection; |
| import org.apache.hyracks.api.client.IHyracksClientConnection; |
| import org.apache.hyracks.api.dataflow.IOperatorDescriptor; |
| import org.apache.hyracks.api.dataflow.value.IBinaryComparatorFactory; |
| import org.apache.hyracks.api.dataflow.value.IBinaryHashFunctionFactory; |
| import org.apache.hyracks.api.io.FileSplit; |
| import org.apache.hyracks.api.job.JobFlag; |
| import org.apache.hyracks.api.job.JobId; |
| import org.apache.hyracks.api.job.JobSpecification; |
| import org.apache.hyracks.data.std.accessors.PointableBinaryComparatorFactory; |
| import org.apache.hyracks.data.std.accessors.PointableBinaryHashFunctionFactory; |
| import org.apache.hyracks.data.std.primitive.UTF8StringPointable; |
| import org.apache.hyracks.dataflow.common.data.normalizers.UTF8StringNormalizedKeyComputerFactory; |
| import org.apache.hyracks.dataflow.common.data.partition.FieldHashPartitionComputerFactory; |
| import org.apache.hyracks.dataflow.std.buffermanager.EnumFreeSlotPolicy; |
| import org.apache.hyracks.dataflow.std.connectors.MToNPartitioningMergingConnectorDescriptor; |
| import org.apache.hyracks.dataflow.std.connectors.OneToOneConnectorDescriptor; |
| import org.apache.hyracks.dataflow.std.file.ConstantFileSplitProvider; |
| import org.apache.hyracks.dataflow.std.file.DelimitedDataTupleParserFactory; |
| import org.apache.hyracks.dataflow.std.file.FileScanOperatorDescriptor; |
| import org.apache.hyracks.dataflow.std.file.IFileSplitProvider; |
| import org.apache.hyracks.dataflow.std.file.PlainFileWriterOperatorDescriptor; |
| import org.apache.hyracks.dataflow.std.sort.AbstractSorterOperatorDescriptor; |
| import org.apache.hyracks.dataflow.std.sort.Algorithm; |
| import org.apache.hyracks.dataflow.std.sort.ExternalSortOperatorDescriptor; |
| import org.apache.hyracks.dataflow.std.sort.TopKSorterOperatorDescriptor; |
| import org.kohsuke.args4j.CmdLineParser; |
| import org.kohsuke.args4j.Option; |
| |
| public class Sort { |
| private static class Options { |
| @Option(name = "-host", usage = "Hyracks Cluster Controller Host name", required = true) |
| public String host; |
| |
| @Option(name = "-port", usage = "Hyracks Cluster Controller Port (default: 1098)", required = false) |
| public int port = 1098; |
| |
| @Option(name = "-frame-size", usage = "Hyracks frame size (default: 32768)", required = false) |
| public int frameSize = 32768; |
| |
| @Option(name = "-frame-limit", usage = "memory limit for sorting (default: 4)", required = false) |
| public int frameLimit = 4; |
| |
| @Option(name = "-infile-splits", usage = "Comma separated list of file-splits for the ORDER input. A file-split is <node-name>:<path>", required = true) |
| public String inFileOrderSplits; |
| |
| @Option(name = "-outfile-splits", usage = "Comma separated list of file-splits for the output", required = true) |
| public String outFileSplits; |
| |
| @Option(name = "-membuffer-alg", usage = "bestfit or lastfit (default: lastfit)", required = false) |
| public String memBufferAlg = "lastfit"; |
| |
| @Option(name = "-profile", usage = "Enable/Disable profiling. (default: enabled)") |
| public boolean profile = true; |
| |
| @Option(name = "-topK", usage = "only output topK for each node. (default: not set)") |
| public int topK = Integer.MAX_VALUE; |
| |
| @Option(name = "-heapSort", usage = "using heap sort for topK result. (default: false)") |
| public boolean usingHeapSorter = false; |
| } |
| |
| static int[] SortFields = new int[] { 1, 0 }; |
| static IBinaryComparatorFactory[] SortFieldsComparatorFactories = new IBinaryComparatorFactory[] { |
| PointableBinaryComparatorFactory.of(UTF8StringPointable.FACTORY), |
| PointableBinaryComparatorFactory.of(UTF8StringPointable.FACTORY) }; |
| |
| static IBinaryHashFunctionFactory[] orderBinaryHashFunctionFactories = new IBinaryHashFunctionFactory[] { |
| PointableBinaryHashFunctionFactory.of(UTF8StringPointable.FACTORY), |
| PointableBinaryHashFunctionFactory.of(UTF8StringPointable.FACTORY) }; |
| |
| public static void main(String[] args) throws Exception { |
| Options options = new Options(); |
| CmdLineParser parser = new CmdLineParser(options); |
| if (args.length == 0) { |
| parser.printUsage(System.err); |
| return; |
| } |
| parser.parseArgument(args); |
| |
| IHyracksClientConnection hcc = new HyracksConnection(options.host, options.port); |
| |
| JobSpecification job = createJob(parseFileSplits(options.inFileOrderSplits), |
| parseFileSplits(options.outFileSplits), |
| options.memBufferAlg, options.frameLimit, options.frameSize, options.topK, options.usingHeapSorter); |
| |
| long start = System.currentTimeMillis(); |
| JobId jobId = hcc.startJob(job, |
| options.profile ? EnumSet.of(JobFlag.PROFILE_RUNTIME) : EnumSet.noneOf(JobFlag.class)); |
| hcc.waitForCompletion(jobId); |
| long end = System.currentTimeMillis(); |
| System.err.println("finished in:" + (end - start) + "ms"); |
| } |
| |
| private static JobSpecification createJob(FileSplit[] ordersSplits, FileSplit[] outputSplit, String memBufferAlg, |
| int frameLimit, int frameSize, int limit, boolean usingHeapSorter) { |
| JobSpecification spec = new JobSpecification(); |
| |
| spec.setFrameSize(frameSize); |
| IFileSplitProvider ordersSplitProvider = new ConstantFileSplitProvider(ordersSplits); |
| FileScanOperatorDescriptor ordScanner = new FileScanOperatorDescriptor(spec, ordersSplitProvider, |
| new DelimitedDataTupleParserFactory(orderParserFactories, '|'), ordersDesc); |
| createPartitionConstraint(spec, ordScanner, ordersSplits); |
| AbstractSorterOperatorDescriptor sorter; |
| if (usingHeapSorter && limit < Integer.MAX_VALUE) { |
| sorter = new TopKSorterOperatorDescriptor(spec, frameLimit, limit, SortFields, null, |
| SortFieldsComparatorFactories, ordersDesc); |
| } else { |
| if (memBufferAlg.equalsIgnoreCase("bestfit")) { |
| sorter = new ExternalSortOperatorDescriptor(spec, frameLimit, SortFields, |
| null, SortFieldsComparatorFactories, ordersDesc, Algorithm.MERGE_SORT, |
| EnumFreeSlotPolicy.SMALLEST_FIT, limit); |
| } else if (memBufferAlg.equalsIgnoreCase("biggestfit")) { |
| sorter = new ExternalSortOperatorDescriptor(spec, frameLimit, SortFields, null, |
| SortFieldsComparatorFactories, ordersDesc, Algorithm.MERGE_SORT, EnumFreeSlotPolicy.BIGGEST_FIT, |
| limit); |
| } else { |
| sorter = new ExternalSortOperatorDescriptor(spec, frameLimit, SortFields, null, |
| SortFieldsComparatorFactories, ordersDesc, Algorithm.MERGE_SORT, EnumFreeSlotPolicy.LAST_FIT, |
| limit); |
| |
| } |
| } |
| createPartitionConstraint(spec, sorter, ordersSplits); |
| IFileSplitProvider outputSplitProvider = new ConstantFileSplitProvider(outputSplit); |
| IOperatorDescriptor printer = new PlainFileWriterOperatorDescriptor(spec, outputSplitProvider, "|"); |
| createPartitionConstraint(spec, printer, outputSplit); |
| |
| spec.connect(new OneToOneConnectorDescriptor(spec), ordScanner, 0, sorter, 0); |
| |
| spec.connect( |
| new MToNPartitioningMergingConnectorDescriptor(spec, |
| new FieldHashPartitionComputerFactory(SortFields, orderBinaryHashFunctionFactories), |
| SortFields, SortFieldsComparatorFactories, new UTF8StringNormalizedKeyComputerFactory()), |
| sorter, 0, printer, 0); |
| |
| spec.addRoot(printer); |
| return spec; |
| } |
| } |