blob: 93ed2c76d1efd85d5b0bedf1b1c0b66fbe069496 [file] [log] [blame]
/*
* Copyright 2009-2010 by The Regents of the University of California
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* you may obtain a copy of the License from
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package edu.uci.ics.hyracks.tests.integration;
import java.io.File;
import java.io.IOException;
import java.util.BitSet;
import org.junit.Test;
import edu.uci.ics.hyracks.api.constraints.PartitionConstraintHelper;
import edu.uci.ics.hyracks.api.dataflow.IConnectorDescriptor;
import edu.uci.ics.hyracks.api.dataflow.value.IBinaryComparatorFactory;
import edu.uci.ics.hyracks.api.dataflow.value.IBinaryHashFunctionFactory;
import edu.uci.ics.hyracks.api.dataflow.value.ISerializerDeserializer;
import edu.uci.ics.hyracks.api.dataflow.value.RecordDescriptor;
import edu.uci.ics.hyracks.api.dataset.ResultSetId;
import edu.uci.ics.hyracks.api.io.FileReference;
import edu.uci.ics.hyracks.api.job.JobSpecification;
import edu.uci.ics.hyracks.data.std.accessors.PointableBinaryComparatorFactory;
import edu.uci.ics.hyracks.data.std.accessors.PointableBinaryHashFunctionFactory;
import edu.uci.ics.hyracks.data.std.primitive.UTF8StringPointable;
import edu.uci.ics.hyracks.dataflow.common.data.marshalling.FloatSerializerDeserializer;
import edu.uci.ics.hyracks.dataflow.common.data.marshalling.IntegerSerializerDeserializer;
import edu.uci.ics.hyracks.dataflow.common.data.marshalling.UTF8StringSerializerDeserializer;
import edu.uci.ics.hyracks.dataflow.common.data.parsers.FloatParserFactory;
import edu.uci.ics.hyracks.dataflow.common.data.parsers.IValueParserFactory;
import edu.uci.ics.hyracks.dataflow.common.data.parsers.IntegerParserFactory;
import edu.uci.ics.hyracks.dataflow.common.data.parsers.UTF8StringParserFactory;
import edu.uci.ics.hyracks.dataflow.common.data.partition.FieldHashPartitionComputerFactory;
import edu.uci.ics.hyracks.dataflow.std.base.AbstractSingleActivityOperatorDescriptor;
import edu.uci.ics.hyracks.dataflow.std.connectors.GlobalHashingLocalityMap;
import edu.uci.ics.hyracks.dataflow.std.connectors.HashtableLocalityMap;
import edu.uci.ics.hyracks.dataflow.std.connectors.LocalityAwareMToNPartitioningConnectorDescriptor;
import edu.uci.ics.hyracks.dataflow.std.connectors.MToNPartitioningConnectorDescriptor;
import edu.uci.ics.hyracks.dataflow.std.connectors.OneToOneConnectorDescriptor;
import edu.uci.ics.hyracks.dataflow.std.file.ConstantFileSplitProvider;
import edu.uci.ics.hyracks.dataflow.std.file.DelimitedDataTupleParserFactory;
import edu.uci.ics.hyracks.dataflow.std.file.FileScanOperatorDescriptor;
import edu.uci.ics.hyracks.dataflow.std.file.FileSplit;
import edu.uci.ics.hyracks.dataflow.std.file.IFileSplitProvider;
import edu.uci.ics.hyracks.dataflow.std.file.ITupleParserFactory;
import edu.uci.ics.hyracks.dataflow.std.group.IFieldAggregateDescriptorFactory;
import edu.uci.ics.hyracks.dataflow.std.group.aggregators.FloatSumFieldAggregatorFactory;
import edu.uci.ics.hyracks.dataflow.std.group.aggregators.IntSumFieldAggregatorFactory;
import edu.uci.ics.hyracks.dataflow.std.group.aggregators.MultiFieldsAggregatorFactory;
import edu.uci.ics.hyracks.dataflow.std.group.hash.HashGroupOperatorDescriptor;
import edu.uci.ics.hyracks.dataflow.std.result.ResultWriterOperatorDescriptor;
import edu.uci.ics.hyracks.tests.util.ResultSerializerFactoryProvider;
public class LocalityAwareConnectorTest extends AbstractMultiNCIntegrationTest {
final IFileSplitProvider splitProvider = new ConstantFileSplitProvider(new FileSplit[] {
new FileSplit("asterix-001", new FileReference(new File("data/tpch0.001/lineitem.tbl"))),
new FileSplit("asterix-002", new FileReference(new File("data/tpch0.001/lineitem.tbl"))),
new FileSplit("asterix-003", new FileReference(new File("data/tpch0.001/lineitem.tbl"))),
new FileSplit("asterix-004", new FileReference(new File("data/tpch0.001/lineitem.tbl"))) });
final RecordDescriptor desc = new RecordDescriptor(new ISerializerDeserializer[] {
UTF8StringSerializerDeserializer.INSTANCE, IntegerSerializerDeserializer.INSTANCE,
IntegerSerializerDeserializer.INSTANCE, IntegerSerializerDeserializer.INSTANCE,
IntegerSerializerDeserializer.INSTANCE, FloatSerializerDeserializer.INSTANCE,
FloatSerializerDeserializer.INSTANCE, FloatSerializerDeserializer.INSTANCE,
UTF8StringSerializerDeserializer.INSTANCE, UTF8StringSerializerDeserializer.INSTANCE,
UTF8StringSerializerDeserializer.INSTANCE, UTF8StringSerializerDeserializer.INSTANCE,
UTF8StringSerializerDeserializer.INSTANCE, UTF8StringSerializerDeserializer.INSTANCE,
UTF8StringSerializerDeserializer.INSTANCE, UTF8StringSerializerDeserializer.INSTANCE });
final ITupleParserFactory tupleParserFactory = new DelimitedDataTupleParserFactory(new IValueParserFactory[] {
UTF8StringParserFactory.INSTANCE, IntegerParserFactory.INSTANCE, IntegerParserFactory.INSTANCE,
IntegerParserFactory.INSTANCE, IntegerParserFactory.INSTANCE, FloatParserFactory.INSTANCE,
FloatParserFactory.INSTANCE, FloatParserFactory.INSTANCE, UTF8StringParserFactory.INSTANCE,
UTF8StringParserFactory.INSTANCE, UTF8StringParserFactory.INSTANCE, UTF8StringParserFactory.INSTANCE,
UTF8StringParserFactory.INSTANCE, UTF8StringParserFactory.INSTANCE, UTF8StringParserFactory.INSTANCE,
UTF8StringParserFactory.INSTANCE, }, '|');
/**
* Test of aggregations using locality aware connector. The output two files should be the
* same, each of which is the aggregation of two copies of the lineitem.tbl.
* Note that if the hashing connector is not working correctly, the two files may be different. This
* also means that even the output files are the same, the hashing may have other problems.
*
* @throws Exception
*/
@Test
public void localityAwareAggregationTest() throws Exception {
JobSpecification spec = new JobSpecification();
FileScanOperatorDescriptor csvScanner = new FileScanOperatorDescriptor(spec, splitProvider, tupleParserFactory,
desc);
PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, csvScanner, "asterix-001", "asterix-002",
"asterix-003", "asterix-004");
RecordDescriptor outputRec = new RecordDescriptor(new ISerializerDeserializer[] {
UTF8StringSerializerDeserializer.INSTANCE, IntegerSerializerDeserializer.INSTANCE,
IntegerSerializerDeserializer.INSTANCE, FloatSerializerDeserializer.INSTANCE });
int[] keyFields = new int[] { 0 };
int tableSize = 8;
HashGroupOperatorDescriptor grouper = new HashGroupOperatorDescriptor(spec, keyFields,
new FieldHashPartitionComputerFactory(keyFields,
new IBinaryHashFunctionFactory[] { PointableBinaryHashFunctionFactory
.of(UTF8StringPointable.FACTORY) }),
new IBinaryComparatorFactory[] { PointableBinaryComparatorFactory.of(UTF8StringPointable.FACTORY) },
new MultiFieldsAggregatorFactory(new IFieldAggregateDescriptorFactory[] {
new IntSumFieldAggregatorFactory(1, true), new IntSumFieldAggregatorFactory(3, true),
new FloatSumFieldAggregatorFactory(5, true) }), outputRec, tableSize);
PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, grouper, "asterix-005", "asterix-006");
BitSet nodemap = new BitSet(8);
nodemap.set(0);
nodemap.set(2);
nodemap.set(5);
nodemap.set(7);
IConnectorDescriptor conn1 = new LocalityAwareMToNPartitioningConnectorDescriptor(spec,
new FieldHashPartitionComputerFactory(keyFields,
new IBinaryHashFunctionFactory[] { PointableBinaryHashFunctionFactory
.of(UTF8StringPointable.FACTORY) }), new HashtableLocalityMap(nodemap));
spec.connect(conn1, csvScanner, 0, grouper, 0);
AbstractSingleActivityOperatorDescriptor printer = getPrinter(spec, "localityAwareSumInmemGroupTest");
PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, printer, "asterix-005", "asterix-006");
IConnectorDescriptor conn2 = new OneToOneConnectorDescriptor(spec);
spec.connect(conn2, grouper, 0, printer, 0);
spec.addRoot(printer);
runTest(spec);
}
/**
* Test for locality aware connector, using the global hashing node mapper. This should have
* the exactly the same result as using {@link MToNPartitioningConnectorDescriptor}.
*
* @throws Exception
*/
@Test
public void globalPartitionAggregationTest() throws Exception {
JobSpecification spec = new JobSpecification();
FileScanOperatorDescriptor csvScanner = new FileScanOperatorDescriptor(spec, splitProvider, tupleParserFactory,
desc);
PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, csvScanner, "asterix-001", "asterix-002",
"asterix-003", "asterix-004");
RecordDescriptor outputRec = new RecordDescriptor(new ISerializerDeserializer[] {
UTF8StringSerializerDeserializer.INSTANCE, IntegerSerializerDeserializer.INSTANCE,
IntegerSerializerDeserializer.INSTANCE, FloatSerializerDeserializer.INSTANCE });
int[] keyFields = new int[] { 0 };
int tableSize = 8;
HashGroupOperatorDescriptor grouper = new HashGroupOperatorDescriptor(spec, keyFields,
new FieldHashPartitionComputerFactory(keyFields,
new IBinaryHashFunctionFactory[] { PointableBinaryHashFunctionFactory
.of(UTF8StringPointable.FACTORY) }),
new IBinaryComparatorFactory[] { PointableBinaryComparatorFactory.of(UTF8StringPointable.FACTORY) },
new MultiFieldsAggregatorFactory(new IFieldAggregateDescriptorFactory[] {
new IntSumFieldAggregatorFactory(1, true), new IntSumFieldAggregatorFactory(3, true),
new FloatSumFieldAggregatorFactory(5, true) }), outputRec, tableSize);
PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, grouper, "asterix-005", "asterix-006");
IConnectorDescriptor conn1 = new LocalityAwareMToNPartitioningConnectorDescriptor(spec,
new FieldHashPartitionComputerFactory(keyFields,
new IBinaryHashFunctionFactory[] { PointableBinaryHashFunctionFactory
.of(UTF8StringPointable.FACTORY) }), new GlobalHashingLocalityMap());
spec.connect(conn1, csvScanner, 0, grouper, 0);
AbstractSingleActivityOperatorDescriptor printer = getPrinter(spec, "localityAwareSumInmemGroupTest");
PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, printer, "asterix-005", "asterix-006");
IConnectorDescriptor conn2 = new OneToOneConnectorDescriptor(spec);
spec.connect(conn2, grouper, 0, printer, 0);
spec.addRoot(printer);
runTest(spec);
}
private AbstractSingleActivityOperatorDescriptor getPrinter(JobSpecification spec, String prefix)
throws IOException {
ResultSetId rsId = new ResultSetId(1);
AbstractSingleActivityOperatorDescriptor printer = new ResultWriterOperatorDescriptor(spec, rsId, true,
ResultSerializerFactoryProvider.INSTANCE.getResultSerializerFactoryProvider());
spec.addResultSetId(rsId);
return printer;
}
}