/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/

package org.apache.hyracks.examples.tpch.client;

import static org.apache.hyracks.examples.tpch.client.Common.createPartitionConstraint;
import static org.apache.hyracks.examples.tpch.client.Common.lineitemDesc;
import static org.apache.hyracks.examples.tpch.client.Common.lineitemParserFactories;
import static org.apache.hyracks.examples.tpch.client.Common.parseFileSplits;

import org.apache.hyracks.api.client.IHyracksClientConnection;
import org.apache.hyracks.api.dataflow.IConnectorDescriptor;
import org.apache.hyracks.api.dataflow.value.IBinaryComparatorFactory;
import org.apache.hyracks.api.dataflow.value.IBinaryHashFunctionFactory;
import org.apache.hyracks.api.dataflow.value.IBinaryHashFunctionFamily;
import org.apache.hyracks.api.dataflow.value.ISerializerDeserializer;
import org.apache.hyracks.api.dataflow.value.RecordDescriptor;
import org.apache.hyracks.api.io.FileSplit;
import org.apache.hyracks.api.job.JobId;
import org.apache.hyracks.api.job.JobSpecification;
import org.apache.hyracks.data.std.accessors.IntegerBinaryComparatorFactory;
import org.apache.hyracks.data.std.accessors.MurmurHash3BinaryHashFunctionFamily;
import org.apache.hyracks.data.std.accessors.PointableBinaryHashFunctionFactory;
import org.apache.hyracks.data.std.primitive.IntegerPointable;
import org.apache.hyracks.dataflow.common.data.marshalling.IntegerSerializerDeserializer;
import org.apache.hyracks.dataflow.common.data.normalizers.IntegerNormalizedKeyComputerFactory;
import org.apache.hyracks.dataflow.common.data.partition.FieldHashPartitionComputerFactory;
import org.apache.hyracks.dataflow.std.base.AbstractOperatorDescriptor;
import org.apache.hyracks.dataflow.std.base.AbstractSingleActivityOperatorDescriptor;
import org.apache.hyracks.dataflow.std.connectors.MToNPartitioningConnectorDescriptor;
import org.apache.hyracks.dataflow.std.connectors.OneToOneConnectorDescriptor;
import org.apache.hyracks.dataflow.std.file.ConstantFileSplitProvider;
import org.apache.hyracks.dataflow.std.file.DelimitedDataTupleParserFactory;
import org.apache.hyracks.dataflow.std.file.FileScanOperatorDescriptor;
import org.apache.hyracks.dataflow.std.file.FrameFileWriterOperatorDescriptor;
import org.apache.hyracks.dataflow.std.file.IFileSplitProvider;
import org.apache.hyracks.dataflow.std.file.PlainFileWriterOperatorDescriptor;
import org.apache.hyracks.dataflow.std.group.HashSpillableTableFactory;
import org.apache.hyracks.dataflow.std.group.IFieldAggregateDescriptorFactory;
import org.apache.hyracks.dataflow.std.group.aggregators.CountFieldAggregatorFactory;
import org.apache.hyracks.dataflow.std.group.aggregators.IntSumFieldAggregatorFactory;
import org.apache.hyracks.dataflow.std.group.aggregators.MultiFieldsAggregatorFactory;
import org.apache.hyracks.dataflow.std.group.external.ExternalGroupOperatorDescriptor;
import org.apache.hyracks.dataflow.std.group.sort.SortGroupByOperatorDescriptor;
import org.apache.hyracks.ipc.impl.HyracksConnection;
import org.kohsuke.args4j.CmdLineParser;
import org.kohsuke.args4j.Option;

/**
 * The application client for the performance tests of the group-by operator:
 * it scans the TPC-H lineitem table, groups it on the first field, and writes
 * one (key, count) pair per group, using either a hash-based or a sort-based
 * group-by operator.
 */
public class Groupby {
    private static class Options {
        @Option(name = "-host", usage = "Hyracks Cluster Controller Host name", required = true)
        public String host;
        @Option(name = "-port", usage = "Hyracks Cluster Controller Port (default: 1098)")
        public int port = 1098;
        @Option(name = "-infile-splits", usage = "Comma separated list of file-splits for the input. A file-split is <node-name>:<path>", required = true)
        public String inFileSplits;
        @Option(name = "-outfile-splits", usage = "Comma separated list of file-splits for the output", required = true)
        public String outFileSplits;
        @Option(name = "-input-tuples", usage = "Hash table size", required = true)
        public int htSize;
        @Option(name = "-input-size", usage = "Physical file size in bytes", required = true)
        public long fileSize;
        @Option(name = "-frame-size", usage = "Frame size (default: 32768)", required = false)
        public int frameSize = 32768;
        @Option(name = "-frame-limit", usage = "Memory limit in frames for the group-by (default: 4)", required = false)
        public int frameLimit = 4;
        @Option(name = "-out-plain", usage = "Whether to output plain text (default: true)", required = false)
        public boolean outPlain = true;
        @Option(name = "-algo", usage = "The algorithm to be used: hash|sort", required = true)
        public String algo = "hash";
    }
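
    /*
     * Example invocation (a minimal sketch; the node name "NC1" and the file
     * paths below are illustrative placeholders, not part of this example):
     *
     *   java org.apache.hyracks.examples.tpch.client.Groupby \
     *       -host localhost -port 1098 \
     *       -infile-splits NC1:/data/tpch/lineitem.tbl \
     *       -outfile-splits NC1:/tmp/groupby-out.txt \
     *       -input-tuples 1000000 -input-size 750000000 -algo hash
     */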
    public static void main(String[] args) throws Exception {
        Options options = new Options();
        CmdLineParser parser = new CmdLineParser(options);
        if (args.length == 0) {
            parser.printUsage(System.err);
            return;
        }
        parser.parseArgument(args);

        // Connect to the cluster controller, build the job, and time both the
        // construction and the execution of the job.
        IHyracksClientConnection hcc = new HyracksConnection(options.host, options.port);
        long start = System.currentTimeMillis();
        JobSpecification job = createJob(parseFileSplits(options.inFileSplits), parseFileSplits(options.outFileSplits),
                options.htSize, options.fileSize, options.frameLimit, options.frameSize, options.algo,
                options.outPlain);
        if (job != null) {
            System.out.println("CreateJobTime:" + (System.currentTimeMillis() - start));
            start = System.currentTimeMillis();
            JobId jobId = hcc.startJob(job);
            hcc.waitForCompletion(jobId);
            System.out.println("JobExecuteTime:" + (System.currentTimeMillis() - start));
        }
    }
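
    /*
     * Builds a job that is roughly equivalent to the following query, assuming
     * field 0 of lineitem is L_ORDERKEY as in the TPC-H schema:
     *
     *   SELECT L_ORDERKEY, COUNT(*) FROM LINEITEM GROUP BY L_ORDERKEY
     */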
    private static JobSpecification createJob(FileSplit[] inSplits, FileSplit[] outSplits, int htSize, long fileSize,
            int frameLimit, int frameSize, String alg, boolean outPlain) {
        JobSpecification spec = new JobSpecification(frameSize);

        // Scan and parse the '|'-delimited lineitem records from the input splits.
        IFileSplitProvider splitsProvider = new ConstantFileSplitProvider(inSplits);
        FileScanOperatorDescriptor fileScanner = new FileScanOperatorDescriptor(spec, splitsProvider,
                new DelimitedDataTupleParserFactory(lineitemParserFactories, '|'), lineitemDesc);
        createPartitionConstraint(spec, fileScanner, inSplits);
        // Output: each unique key (an integer) with an integer count.
        RecordDescriptor outDesc = new RecordDescriptor(new ISerializerDeserializer[] {
                IntegerSerializerDeserializer.INSTANCE, IntegerSerializerDeserializer.INSTANCE });

        // The grouping key: the first field extracted during the scan.
        int[] keys = new int[] { 0 };
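
        // Both grouper variants below compute the same (key, count) aggregate;
        // they differ only in whether groups are kept in a spillable hash table
        // or in sorted runs.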
        AbstractOperatorDescriptor grouper;
        if (alg.equalsIgnoreCase("hash")) {
            // External hash group-by: aggregates into an htSize-bucket hash table,
            // spilling to disk when the frame budget is exceeded. The partial
            // aggregator counts tuples; the merge aggregator sums the partial
            // counts (found at field index keys.length of the partial records).
            grouper = new ExternalGroupOperatorDescriptor(spec, htSize, fileSize, keys, null, frameLimit,
                    new IBinaryComparatorFactory[] { IntegerBinaryComparatorFactory.INSTANCE },
                    new IntegerNormalizedKeyComputerFactory(),
                    new MultiFieldsAggregatorFactory(
                            new IFieldAggregateDescriptorFactory[] { new CountFieldAggregatorFactory(false) }),
                    new MultiFieldsAggregatorFactory(new IFieldAggregateDescriptorFactory[] {
                            new IntSumFieldAggregatorFactory(keys.length, false) }),
                    outDesc, outDesc, new HashSpillableTableFactory(
                            new IBinaryHashFunctionFamily[] { MurmurHash3BinaryHashFunctionFamily.INSTANCE }));
            createPartitionConstraint(spec, grouper, outSplits);
        } else if (alg.equalsIgnoreCase("sort")) {
            // Sort-based group-by: sorts runs within the frameLimit memory budget
            // on the key and aggregates while merging the runs.
            grouper = new SortGroupByOperatorDescriptor(spec, frameLimit, keys, keys,
                    new IntegerNormalizedKeyComputerFactory(),
                    new IBinaryComparatorFactory[] { IntegerBinaryComparatorFactory.INSTANCE },
                    new MultiFieldsAggregatorFactory(
                            new IFieldAggregateDescriptorFactory[] { new CountFieldAggregatorFactory(true) }),
                    new MultiFieldsAggregatorFactory(new IFieldAggregateDescriptorFactory[] {
                            new IntSumFieldAggregatorFactory(keys.length, true) }),
                    outDesc, outDesc, false);
            createPartitionConstraint(spec, grouper, outSplits);
        } else {
            System.err.println("Unknown group-by algorithm: " + alg);
            return null;
        }
        // Connect the scanner to the grouper, hash-partitioning on the grouping
        // key so that all tuples with equal keys reach the same grouper partition.
        IConnectorDescriptor scanGroupConn = new MToNPartitioningConnectorDescriptor(spec,
                new FieldHashPartitionComputerFactory(keys, new IBinaryHashFunctionFactory[] {
                        PointableBinaryHashFunctionFactory.of(IntegerPointable.FACTORY) }));
        spec.connect(scanGroupConn, fileScanner, 0, grouper, 0);
        // Write the results either as plain '|'-delimited text or as raw frames.
        IFileSplitProvider outSplitProvider = new ConstantFileSplitProvider(outSplits);
        AbstractSingleActivityOperatorDescriptor writer =
                outPlain ? new PlainFileWriterOperatorDescriptor(spec, outSplitProvider, "|")
                        : new FrameFileWriterOperatorDescriptor(spec, outSplitProvider);
        createPartitionConstraint(spec, writer, outSplits);

        IConnectorDescriptor groupOutConn = new OneToOneConnectorDescriptor(spec);
        spec.connect(groupOutConn, grouper, 0, writer, 0);

        spec.addRoot(writer);
        return spec;
    }
}