blob: 9233e39ff43e732ce67285261b29a863ddde61b0 [file] [log] [blame]
/*
* Copyright 2009-2010 by The Regents of the University of California
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* you may obtain a copy of the License from
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package edu.uci.ics.hyracks.tests.integration;
import java.io.File;
import org.junit.Test;
import edu.uci.ics.hyracks.api.comm.IFrameTupleAccessor;
import edu.uci.ics.hyracks.api.constraints.PartitionConstraintHelper;
import edu.uci.ics.hyracks.api.context.IHyracksTaskContext;
import edu.uci.ics.hyracks.api.dataflow.IConnectorDescriptor;
import edu.uci.ics.hyracks.api.dataflow.IOperatorDescriptor;
import edu.uci.ics.hyracks.api.dataflow.value.IBinaryComparator;
import edu.uci.ics.hyracks.api.dataflow.value.IBinaryComparatorFactory;
import edu.uci.ics.hyracks.api.dataflow.value.INullWriterFactory;
import edu.uci.ics.hyracks.api.dataflow.value.ISerializerDeserializer;
import edu.uci.ics.hyracks.api.dataflow.value.ITuplePairComparator;
import edu.uci.ics.hyracks.api.dataflow.value.ITuplePairComparatorFactory;
import edu.uci.ics.hyracks.api.dataflow.value.RecordDescriptor;
import edu.uci.ics.hyracks.api.io.FileReference;
import edu.uci.ics.hyracks.api.job.JobSpecification;
import edu.uci.ics.hyracks.data.std.accessors.PointableBinaryComparatorFactory;
import edu.uci.ics.hyracks.data.std.primitive.UTF8StringPointable;
import edu.uci.ics.hyracks.dataflow.common.data.marshalling.UTF8StringSerializerDeserializer;
import edu.uci.ics.hyracks.dataflow.common.data.parsers.IValueParserFactory;
import edu.uci.ics.hyracks.dataflow.common.data.parsers.UTF8StringParserFactory;
import edu.uci.ics.hyracks.dataflow.std.connectors.MToNReplicatingConnectorDescriptor;
import edu.uci.ics.hyracks.dataflow.std.connectors.OneToOneConnectorDescriptor;
import edu.uci.ics.hyracks.dataflow.std.file.ConstantFileSplitProvider;
import edu.uci.ics.hyracks.dataflow.std.file.DelimitedDataTupleParserFactory;
import edu.uci.ics.hyracks.dataflow.std.file.FileScanOperatorDescriptor;
import edu.uci.ics.hyracks.dataflow.std.file.FileSplit;
import edu.uci.ics.hyracks.dataflow.std.file.IFileSplitProvider;
import edu.uci.ics.hyracks.dataflow.std.file.PlainFileWriterOperatorDescriptor;
import edu.uci.ics.hyracks.dataflow.std.join.NestedLoopJoinOperatorDescriptor;
import edu.uci.ics.hyracks.tests.util.NoopNullWriterFactory;
public class TPCHCustomerOrderNestedLoopJoinTest extends AbstractIntegrationTest {
private static class JoinComparatorFactory implements ITuplePairComparatorFactory {
private static final long serialVersionUID = 1L;
private final IBinaryComparatorFactory bFactory;
private final int pos0;
private final int pos1;
public JoinComparatorFactory(IBinaryComparatorFactory bFactory, int pos0, int pos1) {
this.bFactory = bFactory;
this.pos0 = pos0;
this.pos1 = pos1;
}
@Override
public ITuplePairComparator createTuplePairComparator(IHyracksTaskContext ctx) {
return new JoinComparator(bFactory.createBinaryComparator(), pos0, pos1);
}
}
private static class JoinComparator implements ITuplePairComparator {
private final IBinaryComparator bComparator;
private final int field0;
private final int field1;
public JoinComparator(IBinaryComparator bComparator, int field0, int field1) {
this.bComparator = bComparator;
this.field0 = field0;
this.field1 = field1;
}
@Override
public int compare(IFrameTupleAccessor accessor0, int tIndex0, IFrameTupleAccessor accessor1, int tIndex1) {
int tStart0 = accessor0.getTupleStartOffset(tIndex0);
int fStartOffset0 = accessor0.getFieldSlotsLength() + tStart0;
int tStart1 = accessor1.getTupleStartOffset(tIndex1);
int fStartOffset1 = accessor1.getFieldSlotsLength() + tStart1;
int fStart0 = accessor0.getFieldStartOffset(tIndex0, field0);
int fEnd0 = accessor0.getFieldEndOffset(tIndex0, field0);
int fLen0 = fEnd0 - fStart0;
int fStart1 = accessor1.getFieldStartOffset(tIndex1, field1);
int fEnd1 = accessor1.getFieldEndOffset(tIndex1, field1);
int fLen1 = fEnd1 - fStart1;
int c = bComparator.compare(accessor0.getBuffer().array(), fStart0 + fStartOffset0, fLen0, accessor1
.getBuffer().array(), fStart1 + fStartOffset1, fLen1);
if (c != 0) {
return c;
}
return 0;
}
}
/*
* TPCH Customer table: CREATE TABLE CUSTOMER ( C_CUSTKEY INTEGER NOT NULL,
* C_NAME VARCHAR(25) NOT NULL, C_ADDRESS VARCHAR(40) NOT NULL, C_NATIONKEY
* INTEGER NOT NULL, C_PHONE CHAR(15) NOT NULL, C_ACCTBAL DECIMAL(15,2) NOT
* NULL, C_MKTSEGMENT CHAR(10) NOT NULL, C_COMMENT VARCHAR(117) NOT NULL );
* TPCH Orders table: CREATE TABLE ORDERS ( O_ORDERKEY INTEGER NOT NULL,
* O_CUSTKEY INTEGER NOT NULL, O_ORDERSTATUS CHAR(1) NOT NULL, O_TOTALPRICE
* DECIMAL(15,2) NOT NULL, O_ORDERDATE DATE NOT NULL, O_ORDERPRIORITY
* CHAR(15) NOT NULL, O_CLERK CHAR(15) NOT NULL, O_SHIPPRIORITY INTEGER NOT
* NULL, O_COMMENT VARCHAR(79) NOT NULL );
*/
@Test
public void customerOrderCIDJoin() throws Exception {
JobSpecification spec = new JobSpecification();
FileSplit[] custSplits = new FileSplit[] { new FileSplit(NC1_ID, new FileReference(new File(
"data/tpch0.001/customer.tbl"))) };
IFileSplitProvider custSplitsProvider = new ConstantFileSplitProvider(custSplits);
RecordDescriptor custDesc = new RecordDescriptor(new ISerializerDeserializer[] {
UTF8StringSerializerDeserializer.INSTANCE, UTF8StringSerializerDeserializer.INSTANCE,
UTF8StringSerializerDeserializer.INSTANCE, UTF8StringSerializerDeserializer.INSTANCE,
UTF8StringSerializerDeserializer.INSTANCE, UTF8StringSerializerDeserializer.INSTANCE,
UTF8StringSerializerDeserializer.INSTANCE, UTF8StringSerializerDeserializer.INSTANCE });
FileSplit[] ordersSplits = new FileSplit[] { new FileSplit(NC1_ID, new FileReference(new File(
"data/tpch0.001/orders.tbl"))) };
IFileSplitProvider ordersSplitsProvider = new ConstantFileSplitProvider(ordersSplits);
RecordDescriptor ordersDesc = new RecordDescriptor(new ISerializerDeserializer[] {
UTF8StringSerializerDeserializer.INSTANCE, UTF8StringSerializerDeserializer.INSTANCE,
UTF8StringSerializerDeserializer.INSTANCE, UTF8StringSerializerDeserializer.INSTANCE,
UTF8StringSerializerDeserializer.INSTANCE, UTF8StringSerializerDeserializer.INSTANCE,
UTF8StringSerializerDeserializer.INSTANCE, UTF8StringSerializerDeserializer.INSTANCE,
UTF8StringSerializerDeserializer.INSTANCE });
RecordDescriptor custOrderJoinDesc = new RecordDescriptor(new ISerializerDeserializer[] {
UTF8StringSerializerDeserializer.INSTANCE, UTF8StringSerializerDeserializer.INSTANCE,
UTF8StringSerializerDeserializer.INSTANCE, UTF8StringSerializerDeserializer.INSTANCE,
UTF8StringSerializerDeserializer.INSTANCE, UTF8StringSerializerDeserializer.INSTANCE,
UTF8StringSerializerDeserializer.INSTANCE, UTF8StringSerializerDeserializer.INSTANCE,
UTF8StringSerializerDeserializer.INSTANCE, UTF8StringSerializerDeserializer.INSTANCE,
UTF8StringSerializerDeserializer.INSTANCE, UTF8StringSerializerDeserializer.INSTANCE,
UTF8StringSerializerDeserializer.INSTANCE, UTF8StringSerializerDeserializer.INSTANCE,
UTF8StringSerializerDeserializer.INSTANCE, UTF8StringSerializerDeserializer.INSTANCE,
UTF8StringSerializerDeserializer.INSTANCE });
FileScanOperatorDescriptor ordScanner = new FileScanOperatorDescriptor(spec, ordersSplitsProvider,
new DelimitedDataTupleParserFactory(new IValueParserFactory[] { UTF8StringParserFactory.INSTANCE,
UTF8StringParserFactory.INSTANCE, UTF8StringParserFactory.INSTANCE,
UTF8StringParserFactory.INSTANCE, UTF8StringParserFactory.INSTANCE,
UTF8StringParserFactory.INSTANCE, UTF8StringParserFactory.INSTANCE,
UTF8StringParserFactory.INSTANCE, UTF8StringParserFactory.INSTANCE }, '|'), ordersDesc);
PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, ordScanner, NC1_ID);
FileScanOperatorDescriptor custScanner = new FileScanOperatorDescriptor(spec, custSplitsProvider,
new DelimitedDataTupleParserFactory(new IValueParserFactory[] { UTF8StringParserFactory.INSTANCE,
UTF8StringParserFactory.INSTANCE, UTF8StringParserFactory.INSTANCE,
UTF8StringParserFactory.INSTANCE, UTF8StringParserFactory.INSTANCE,
UTF8StringParserFactory.INSTANCE, UTF8StringParserFactory.INSTANCE,
UTF8StringParserFactory.INSTANCE }, '|'), custDesc);
PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, custScanner, NC1_ID);
NestedLoopJoinOperatorDescriptor join = new NestedLoopJoinOperatorDescriptor(spec, new JoinComparatorFactory(
PointableBinaryComparatorFactory.of(UTF8StringPointable.FACTORY), 1, 0), custOrderJoinDesc, 4, false,
null);
PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, join, NC1_ID);
IFileSplitProvider outSplits = new ConstantFileSplitProvider(new FileSplit[] { new FileSplit(NC1_ID,
createTempFile().getAbsolutePath()) });
IOperatorDescriptor printer = new PlainFileWriterOperatorDescriptor(spec, outSplits, ",");
PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, printer, NC1_ID);
IConnectorDescriptor ordJoinConn = new OneToOneConnectorDescriptor(spec);
spec.connect(ordJoinConn, ordScanner, 0, join, 0);
IConnectorDescriptor custJoinConn = new MToNReplicatingConnectorDescriptor(spec);
spec.connect(custJoinConn, custScanner, 0, join, 1);
IConnectorDescriptor joinPrinterConn = new OneToOneConnectorDescriptor(spec);
spec.connect(joinPrinterConn, join, 0, printer, 0);
spec.addRoot(printer);
runTest(spec);
}
@Test
public void customerOrderCIDJoinMulti() throws Exception {
JobSpecification spec = new JobSpecification();
FileSplit[] custSplits = new FileSplit[] {
new FileSplit(NC1_ID, new FileReference(new File("data/tpch0.001/customer-part1.tbl"))),
new FileSplit(NC2_ID, new FileReference(new File("data/tpch0.001/customer-part2.tbl"))) };
IFileSplitProvider custSplitsProvider = new ConstantFileSplitProvider(custSplits);
RecordDescriptor custDesc = new RecordDescriptor(new ISerializerDeserializer[] {
UTF8StringSerializerDeserializer.INSTANCE, UTF8StringSerializerDeserializer.INSTANCE,
UTF8StringSerializerDeserializer.INSTANCE, UTF8StringSerializerDeserializer.INSTANCE,
UTF8StringSerializerDeserializer.INSTANCE, UTF8StringSerializerDeserializer.INSTANCE,
UTF8StringSerializerDeserializer.INSTANCE, UTF8StringSerializerDeserializer.INSTANCE });
FileSplit[] ordersSplits = new FileSplit[] {
new FileSplit(NC1_ID, new FileReference(new File("data/tpch0.001/orders-part1.tbl"))),
new FileSplit(NC2_ID, new FileReference(new File("data/tpch0.001/orders-part2.tbl"))) };
IFileSplitProvider ordersSplitsProvider = new ConstantFileSplitProvider(ordersSplits);
RecordDescriptor ordersDesc = new RecordDescriptor(new ISerializerDeserializer[] {
UTF8StringSerializerDeserializer.INSTANCE, UTF8StringSerializerDeserializer.INSTANCE,
UTF8StringSerializerDeserializer.INSTANCE, UTF8StringSerializerDeserializer.INSTANCE,
UTF8StringSerializerDeserializer.INSTANCE, UTF8StringSerializerDeserializer.INSTANCE,
UTF8StringSerializerDeserializer.INSTANCE, UTF8StringSerializerDeserializer.INSTANCE,
UTF8StringSerializerDeserializer.INSTANCE });
RecordDescriptor custOrderJoinDesc = new RecordDescriptor(new ISerializerDeserializer[] {
UTF8StringSerializerDeserializer.INSTANCE, UTF8StringSerializerDeserializer.INSTANCE,
UTF8StringSerializerDeserializer.INSTANCE, UTF8StringSerializerDeserializer.INSTANCE,
UTF8StringSerializerDeserializer.INSTANCE, UTF8StringSerializerDeserializer.INSTANCE,
UTF8StringSerializerDeserializer.INSTANCE, UTF8StringSerializerDeserializer.INSTANCE,
UTF8StringSerializerDeserializer.INSTANCE, UTF8StringSerializerDeserializer.INSTANCE,
UTF8StringSerializerDeserializer.INSTANCE, UTF8StringSerializerDeserializer.INSTANCE,
UTF8StringSerializerDeserializer.INSTANCE, UTF8StringSerializerDeserializer.INSTANCE,
UTF8StringSerializerDeserializer.INSTANCE, UTF8StringSerializerDeserializer.INSTANCE,
UTF8StringSerializerDeserializer.INSTANCE });
FileScanOperatorDescriptor ordScanner = new FileScanOperatorDescriptor(spec, ordersSplitsProvider,
new DelimitedDataTupleParserFactory(new IValueParserFactory[] { UTF8StringParserFactory.INSTANCE,
UTF8StringParserFactory.INSTANCE, UTF8StringParserFactory.INSTANCE,
UTF8StringParserFactory.INSTANCE, UTF8StringParserFactory.INSTANCE,
UTF8StringParserFactory.INSTANCE, UTF8StringParserFactory.INSTANCE,
UTF8StringParserFactory.INSTANCE, UTF8StringParserFactory.INSTANCE }, '|'), ordersDesc);
PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, ordScanner, NC1_ID, NC2_ID);
FileScanOperatorDescriptor custScanner = new FileScanOperatorDescriptor(spec, custSplitsProvider,
new DelimitedDataTupleParserFactory(new IValueParserFactory[] { UTF8StringParserFactory.INSTANCE,
UTF8StringParserFactory.INSTANCE, UTF8StringParserFactory.INSTANCE,
UTF8StringParserFactory.INSTANCE, UTF8StringParserFactory.INSTANCE,
UTF8StringParserFactory.INSTANCE, UTF8StringParserFactory.INSTANCE,
UTF8StringParserFactory.INSTANCE }, '|'), custDesc);
PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, custScanner, NC1_ID, NC2_ID);
NestedLoopJoinOperatorDescriptor join = new NestedLoopJoinOperatorDescriptor(spec, new JoinComparatorFactory(
PointableBinaryComparatorFactory.of(UTF8StringPointable.FACTORY), 1, 0), custOrderJoinDesc, 5, false,
null);
PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, join, NC1_ID, NC2_ID);
IFileSplitProvider outSplits = new ConstantFileSplitProvider(new FileSplit[] { new FileSplit(NC1_ID,
createTempFile().getAbsolutePath()) });
IOperatorDescriptor printer = new PlainFileWriterOperatorDescriptor(spec, outSplits, ",");
PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, printer, NC1_ID);
IConnectorDescriptor ordJoinConn = new OneToOneConnectorDescriptor(spec);
spec.connect(ordJoinConn, ordScanner, 0, join, 0);
IConnectorDescriptor custJoinConn = new MToNReplicatingConnectorDescriptor(spec);
spec.connect(custJoinConn, custScanner, 0, join, 1);
IConnectorDescriptor joinPrinterConn = new MToNReplicatingConnectorDescriptor(spec);
spec.connect(joinPrinterConn, join, 0, printer, 0);
spec.addRoot(printer);
runTest(spec);
}
@Test
public void customerOrderCIDJoinAutoExpand() throws Exception {
JobSpecification spec = new JobSpecification();
FileSplit[] custSplits = new FileSplit[] {
new FileSplit(NC1_ID, new FileReference(new File("data/tpch0.001/customer-part1.tbl"))),
new FileSplit(NC2_ID, new FileReference(new File("data/tpch0.001/customer-part2.tbl"))) };
IFileSplitProvider custSplitsProvider = new ConstantFileSplitProvider(custSplits);
RecordDescriptor custDesc = new RecordDescriptor(new ISerializerDeserializer[] {
UTF8StringSerializerDeserializer.INSTANCE, UTF8StringSerializerDeserializer.INSTANCE,
UTF8StringSerializerDeserializer.INSTANCE, UTF8StringSerializerDeserializer.INSTANCE,
UTF8StringSerializerDeserializer.INSTANCE, UTF8StringSerializerDeserializer.INSTANCE,
UTF8StringSerializerDeserializer.INSTANCE, UTF8StringSerializerDeserializer.INSTANCE });
FileSplit[] ordersSplits = new FileSplit[] {
new FileSplit(NC1_ID, new FileReference(new File("data/tpch0.001/orders-part1.tbl"))),
new FileSplit(NC2_ID, new FileReference(new File("data/tpch0.001/orders-part2.tbl"))) };
IFileSplitProvider ordersSplitsProvider = new ConstantFileSplitProvider(ordersSplits);
RecordDescriptor ordersDesc = new RecordDescriptor(new ISerializerDeserializer[] {
UTF8StringSerializerDeserializer.INSTANCE, UTF8StringSerializerDeserializer.INSTANCE,
UTF8StringSerializerDeserializer.INSTANCE, UTF8StringSerializerDeserializer.INSTANCE,
UTF8StringSerializerDeserializer.INSTANCE, UTF8StringSerializerDeserializer.INSTANCE,
UTF8StringSerializerDeserializer.INSTANCE, UTF8StringSerializerDeserializer.INSTANCE,
UTF8StringSerializerDeserializer.INSTANCE });
RecordDescriptor custOrderJoinDesc = new RecordDescriptor(new ISerializerDeserializer[] {
UTF8StringSerializerDeserializer.INSTANCE, UTF8StringSerializerDeserializer.INSTANCE,
UTF8StringSerializerDeserializer.INSTANCE, UTF8StringSerializerDeserializer.INSTANCE,
UTF8StringSerializerDeserializer.INSTANCE, UTF8StringSerializerDeserializer.INSTANCE,
UTF8StringSerializerDeserializer.INSTANCE, UTF8StringSerializerDeserializer.INSTANCE,
UTF8StringSerializerDeserializer.INSTANCE, UTF8StringSerializerDeserializer.INSTANCE,
UTF8StringSerializerDeserializer.INSTANCE, UTF8StringSerializerDeserializer.INSTANCE,
UTF8StringSerializerDeserializer.INSTANCE, UTF8StringSerializerDeserializer.INSTANCE,
UTF8StringSerializerDeserializer.INSTANCE, UTF8StringSerializerDeserializer.INSTANCE,
UTF8StringSerializerDeserializer.INSTANCE });
FileScanOperatorDescriptor ordScanner = new FileScanOperatorDescriptor(spec, ordersSplitsProvider,
new DelimitedDataTupleParserFactory(new IValueParserFactory[] { UTF8StringParserFactory.INSTANCE,
UTF8StringParserFactory.INSTANCE, UTF8StringParserFactory.INSTANCE,
UTF8StringParserFactory.INSTANCE, UTF8StringParserFactory.INSTANCE,
UTF8StringParserFactory.INSTANCE, UTF8StringParserFactory.INSTANCE,
UTF8StringParserFactory.INSTANCE, UTF8StringParserFactory.INSTANCE }, '|'), ordersDesc);
PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, ordScanner, NC1_ID, NC2_ID);
FileScanOperatorDescriptor custScanner = new FileScanOperatorDescriptor(spec, custSplitsProvider,
new DelimitedDataTupleParserFactory(new IValueParserFactory[] { UTF8StringParserFactory.INSTANCE,
UTF8StringParserFactory.INSTANCE, UTF8StringParserFactory.INSTANCE,
UTF8StringParserFactory.INSTANCE, UTF8StringParserFactory.INSTANCE,
UTF8StringParserFactory.INSTANCE, UTF8StringParserFactory.INSTANCE,
UTF8StringParserFactory.INSTANCE }, '|'), custDesc);
PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, custScanner, NC1_ID, NC2_ID);
NestedLoopJoinOperatorDescriptor join = new NestedLoopJoinOperatorDescriptor(spec, new JoinComparatorFactory(
PointableBinaryComparatorFactory.of(UTF8StringPointable.FACTORY), 1, 0), custOrderJoinDesc, 6, false,
null);
PartitionConstraintHelper.addPartitionCountConstraint(spec, join, 2);
IFileSplitProvider outSplits = new ConstantFileSplitProvider(new FileSplit[] { new FileSplit(NC1_ID,
createTempFile().getAbsolutePath()) });
IOperatorDescriptor printer = new PlainFileWriterOperatorDescriptor(spec, outSplits, ",");
PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, printer, NC1_ID);
IConnectorDescriptor ordJoinConn = new OneToOneConnectorDescriptor(spec);
spec.connect(ordJoinConn, ordScanner, 0, join, 0);
IConnectorDescriptor custJoinConn = new MToNReplicatingConnectorDescriptor(spec);
spec.connect(custJoinConn, custScanner, 0, join, 1);
IConnectorDescriptor joinPrinterConn = new MToNReplicatingConnectorDescriptor(spec);
spec.connect(joinPrinterConn, join, 0, printer, 0);
spec.addRoot(printer);
runTest(spec);
}
@Test
public void customerOrderCIDOuterJoinMulti() throws Exception {
JobSpecification spec = new JobSpecification();
FileSplit[] custSplits = new FileSplit[] {
new FileSplit(NC1_ID, new FileReference(new File("data/tpch0.001/customer-part1.tbl"))),
new FileSplit(NC2_ID, new FileReference(new File("data/tpch0.001/customer-part2.tbl"))) };
IFileSplitProvider custSplitsProvider = new ConstantFileSplitProvider(custSplits);
RecordDescriptor custDesc = new RecordDescriptor(new ISerializerDeserializer[] {
UTF8StringSerializerDeserializer.INSTANCE, UTF8StringSerializerDeserializer.INSTANCE,
UTF8StringSerializerDeserializer.INSTANCE, UTF8StringSerializerDeserializer.INSTANCE,
UTF8StringSerializerDeserializer.INSTANCE, UTF8StringSerializerDeserializer.INSTANCE,
UTF8StringSerializerDeserializer.INSTANCE, UTF8StringSerializerDeserializer.INSTANCE });
FileSplit[] ordersSplits = new FileSplit[] {
new FileSplit(NC1_ID, new FileReference(new File("data/tpch0.001/orders-part1.tbl"))),
new FileSplit(NC2_ID, new FileReference(new File("data/tpch0.001/orders-part2.tbl"))) };
IFileSplitProvider ordersSplitsProvider = new ConstantFileSplitProvider(ordersSplits);
RecordDescriptor ordersDesc = new RecordDescriptor(new ISerializerDeserializer[] {
UTF8StringSerializerDeserializer.INSTANCE, UTF8StringSerializerDeserializer.INSTANCE,
UTF8StringSerializerDeserializer.INSTANCE, UTF8StringSerializerDeserializer.INSTANCE,
UTF8StringSerializerDeserializer.INSTANCE, UTF8StringSerializerDeserializer.INSTANCE,
UTF8StringSerializerDeserializer.INSTANCE, UTF8StringSerializerDeserializer.INSTANCE,
UTF8StringSerializerDeserializer.INSTANCE });
RecordDescriptor custOrderJoinDesc = new RecordDescriptor(new ISerializerDeserializer[] {
UTF8StringSerializerDeserializer.INSTANCE, UTF8StringSerializerDeserializer.INSTANCE,
UTF8StringSerializerDeserializer.INSTANCE, UTF8StringSerializerDeserializer.INSTANCE,
UTF8StringSerializerDeserializer.INSTANCE, UTF8StringSerializerDeserializer.INSTANCE,
UTF8StringSerializerDeserializer.INSTANCE, UTF8StringSerializerDeserializer.INSTANCE,
UTF8StringSerializerDeserializer.INSTANCE, UTF8StringSerializerDeserializer.INSTANCE,
UTF8StringSerializerDeserializer.INSTANCE, UTF8StringSerializerDeserializer.INSTANCE,
UTF8StringSerializerDeserializer.INSTANCE, UTF8StringSerializerDeserializer.INSTANCE,
UTF8StringSerializerDeserializer.INSTANCE, UTF8StringSerializerDeserializer.INSTANCE,
UTF8StringSerializerDeserializer.INSTANCE });
FileScanOperatorDescriptor ordScanner = new FileScanOperatorDescriptor(spec, ordersSplitsProvider,
new DelimitedDataTupleParserFactory(new IValueParserFactory[] { UTF8StringParserFactory.INSTANCE,
UTF8StringParserFactory.INSTANCE, UTF8StringParserFactory.INSTANCE,
UTF8StringParserFactory.INSTANCE, UTF8StringParserFactory.INSTANCE,
UTF8StringParserFactory.INSTANCE, UTF8StringParserFactory.INSTANCE,
UTF8StringParserFactory.INSTANCE, UTF8StringParserFactory.INSTANCE }, '|'), ordersDesc);
PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, ordScanner, NC1_ID, NC2_ID);
FileScanOperatorDescriptor custScanner = new FileScanOperatorDescriptor(spec, custSplitsProvider,
new DelimitedDataTupleParserFactory(new IValueParserFactory[] { UTF8StringParserFactory.INSTANCE,
UTF8StringParserFactory.INSTANCE, UTF8StringParserFactory.INSTANCE,
UTF8StringParserFactory.INSTANCE, UTF8StringParserFactory.INSTANCE,
UTF8StringParserFactory.INSTANCE, UTF8StringParserFactory.INSTANCE,
UTF8StringParserFactory.INSTANCE }, '|'), custDesc);
PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, custScanner, NC1_ID, NC2_ID);
INullWriterFactory[] nullWriterFactories = new INullWriterFactory[ordersDesc.getFieldCount()];
for (int j = 0; j < nullWriterFactories.length; j++) {
nullWriterFactories[j] = NoopNullWriterFactory.INSTANCE;
}
NestedLoopJoinOperatorDescriptor join = new NestedLoopJoinOperatorDescriptor(spec, new JoinComparatorFactory(
PointableBinaryComparatorFactory.of(UTF8StringPointable.FACTORY), 1, 0), custOrderJoinDesc, 5, true,
nullWriterFactories);
PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, join, NC1_ID, NC2_ID);
IFileSplitProvider outSplits = new ConstantFileSplitProvider(new FileSplit[] { new FileSplit(NC1_ID,
createTempFile().getAbsolutePath()) });
IOperatorDescriptor printer = new PlainFileWriterOperatorDescriptor(spec, outSplits, ",");
PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, printer, NC1_ID);
IConnectorDescriptor ordJoinConn = new OneToOneConnectorDescriptor(spec);
spec.connect(ordJoinConn, ordScanner, 0, join, 0);
IConnectorDescriptor custJoinConn = new MToNReplicatingConnectorDescriptor(spec);
spec.connect(custJoinConn, custScanner, 0, join, 1);
IConnectorDescriptor joinPrinterConn = new MToNReplicatingConnectorDescriptor(spec);
spec.connect(joinPrinterConn, join, 0, printer, 0);
spec.addRoot(printer);
runTest(spec);
}
}