package edu.uci.ics.asterix.file;
import java.io.DataOutput;
import java.io.File;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import edu.uci.ics.asterix.common.context.AsterixIndexRegistryProvider;
import edu.uci.ics.asterix.common.context.AsterixStorageManagerInterface;
import edu.uci.ics.asterix.common.exceptions.AsterixException;
import edu.uci.ics.asterix.dataflow.data.nontagged.comparators.AObjectAscBinaryComparatorFactory;
import edu.uci.ics.asterix.dataflow.data.nontagged.serde.AObjectSerializerDeserializer;
import edu.uci.ics.asterix.om.base.AString;
import edu.uci.ics.asterix.om.types.ATypeTag;
import edu.uci.ics.hyracks.algebricks.core.algebra.base.LogicalVariable;
import edu.uci.ics.hyracks.api.client.HyracksConnection;
import edu.uci.ics.hyracks.api.client.IHyracksClientConnection;
import edu.uci.ics.hyracks.api.constraints.PartitionConstraintHelper;
import edu.uci.ics.hyracks.api.dataflow.value.IBinaryComparatorFactory;
import edu.uci.ics.hyracks.api.dataflow.value.ISerializerDeserializer;
import edu.uci.ics.hyracks.api.dataflow.value.ITypeTraits;
import edu.uci.ics.hyracks.api.dataflow.value.RecordDescriptor;
import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
import edu.uci.ics.hyracks.api.io.FileReference;
import edu.uci.ics.hyracks.api.job.JobId;
import edu.uci.ics.hyracks.api.job.JobSpecification;
import edu.uci.ics.hyracks.dataflow.common.comm.io.ArrayTupleBuilder;
import edu.uci.ics.hyracks.dataflow.common.data.parsers.DoubleParserFactory;
import edu.uci.ics.hyracks.dataflow.common.data.parsers.FloatParserFactory;
import edu.uci.ics.hyracks.dataflow.common.data.parsers.IValueParserFactory;
import edu.uci.ics.hyracks.dataflow.common.data.parsers.IntegerParserFactory;
import edu.uci.ics.hyracks.dataflow.common.data.parsers.LongParserFactory;
import edu.uci.ics.hyracks.dataflow.common.data.parsers.UTF8StringParserFactory;
import edu.uci.ics.hyracks.dataflow.std.connectors.OneToOneConnectorDescriptor;
import edu.uci.ics.hyracks.dataflow.std.file.ConstantFileSplitProvider;
import edu.uci.ics.hyracks.dataflow.std.file.FileSplit;
import edu.uci.ics.hyracks.dataflow.std.file.IFileSplitProvider;
import edu.uci.ics.hyracks.dataflow.std.misc.ConstantTupleSourceOperatorDescriptor;
import edu.uci.ics.hyracks.dataflow.std.misc.PrinterOperatorDescriptor;
import edu.uci.ics.hyracks.storage.am.btree.dataflow.BTreeDataflowHelperFactory;
import edu.uci.ics.hyracks.storage.am.btree.dataflow.BTreeSearchOperatorDescriptor;
import edu.uci.ics.hyracks.storage.am.common.dataflow.IIndex;
import edu.uci.ics.hyracks.storage.am.common.dataflow.IIndexRegistryProvider;
import edu.uci.ics.hyracks.storage.am.common.impls.NoOpOperationCallbackProvider;
import edu.uci.ics.hyracks.storage.common.IStorageManagerInterface;
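/**
 * Standalone Hyracks client job for manually exercising the secondary keyword
 * index: a constant-tuple source feeds a BTree search over the
 * Customers_idx_NameInvIndex files on nc1/nc2, and the resulting entries are
 * printed on each node.
 *
 * Usage: TestKeywordIndexJob [host appName ddlFile]
 * (with no arguments, local defaults are used)
 */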
public class TestKeywordIndexJob {
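// Maps Asterix type tags to textual value parsers; declared here but not referenced by createJobSpec().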
private static final HashMap<ATypeTag, IValueParserFactory> typeToValueParserFactMap = new HashMap<ATypeTag, IValueParserFactory>();
static {
typeToValueParserFactMap.put(ATypeTag.INT32, IntegerParserFactory.INSTANCE);
typeToValueParserFactMap.put(ATypeTag.FLOAT, FloatParserFactory.INSTANCE);
typeToValueParserFactMap.put(ATypeTag.DOUBLE, DoubleParserFactory.INSTANCE);
typeToValueParserFactMap.put(ATypeTag.INT64, LongParserFactory.INSTANCE);
typeToValueParserFactMap.put(ATypeTag.STRING, UTF8StringParserFactory.INSTANCE);
}
public static int DEFAULT_INPUT_DATA_COLUMN = 0;
public static LogicalVariable METADATA_DUMMY_VAR = new LogicalVariable(-1);
@SuppressWarnings("unchecked")
public JobSpecification createJobSpec() throws AsterixException, HyracksDataException {
JobSpecification spec = new JobSpecification();
// ---------- START GENERAL BTREE STUFF
IIndexRegistryProvider<IIndex> indexRegistryProvider = AsterixIndexRegistryProvider.INSTANCE;
IStorageManagerInterface storageManager = AsterixStorageManagerInterface.INSTANCE;
// ---------- END GENERAL BTREE STUFF
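// Constrain every operator to the two node controllers holding the index partitions.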
List<String> nodeGroup = new ArrayList<String>();
nodeGroup.add("nc1");
nodeGroup.add("nc2");
// ---------- START KEY PROVIDER OP
// TODO: should actually be empty tuple source
// build tuple containing low and high search keys
ArrayTupleBuilder tb = new ArrayTupleBuilder(1); // just one dummy field
DataOutput dos = tb.getDataOutput();
tb.reset();
AObjectSerializerDeserializer.INSTANCE.serialize(new AString("Jodi Rotruck"), dos); // dummy field
tb.addFieldEndOffset();
ISerializerDeserializer[] keyRecDescSers = { AObjectSerializerDeserializer.INSTANCE };
RecordDescriptor keyRecDesc = new RecordDescriptor(keyRecDescSers);
ConstantTupleSourceOperatorDescriptor keyProviderOp = new ConstantTupleSourceOperatorDescriptor(spec,
keyRecDesc, tb.getFieldEndOffsets(), tb.getByteArray(), tb.getSize());
String[] keyProviderOpLocationConstraint = new String[nodeGroup.size()];
for (int p = 0; p < nodeGroup.size(); p++) {
keyProviderOpLocationConstraint[p] = nodeGroup.get(p);
}
PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, keyProviderOp, keyProviderOpLocationConstraint);
// ---------- END KEY PROVIDER OP
// ---------- START SECONDARY INDEX SCAN
ITypeTraits[] secondaryTypeTraits = new ITypeTraits[2];
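// Field 0 is the variable-length token; field 1 is fixed at 5 bytes (presumably a 1-byte type tag followed by a 32-bit value).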
secondaryTypeTraits[0] = new ITypeTraits() {
@Override
public boolean isFixedLength() {
return false;
}
@Override
public int getFixedLength() {
return -1;
}
};
secondaryTypeTraits[1] = new ITypeTraits() {
@Override
public boolean isFixedLength() {
return true;
}
@Override
public int getFixedLength() {
return 5;
}
};
ISerializerDeserializer[] secondaryRecFields = new ISerializerDeserializer[2];
secondaryRecFields[0] = AObjectSerializerDeserializer.INSTANCE;
secondaryRecFields[1] = AObjectSerializerDeserializer.INSTANCE;
IBinaryComparatorFactory[] secondaryComparatorFactories = new IBinaryComparatorFactory[2];
secondaryComparatorFactories[0] = AObjectAscBinaryComparatorFactory.INSTANCE;
secondaryComparatorFactories[1] = AObjectAscBinaryComparatorFactory.INSTANCE;
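// No low/high key fields are specified, so the BTree search degenerates into a full index scan;
// the constant "Jodi Rotruck" tuple above is piped in but not actually used as a search key.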
int[] lowKeyFields = null;
int[] highKeyFields = null;
RecordDescriptor secondaryRecDesc = new RecordDescriptor(secondaryRecFields);
// TODO: change file splits according to mount points in cluster config
IFileSplitProvider secondarySplitProvider = new ConstantFileSplitProvider(new FileSplit[] {
new FileSplit("nc1", new FileReference(new File("/tmp/nc1/demo1112/Customers_idx_NameInvIndex"))),
new FileSplit("nc2", new FileReference(new File("/tmp/nc2/demo1112/Customers_idx_NameInvIndex"))) });
BTreeSearchOperatorDescriptor secondarySearchOp = new BTreeSearchOperatorDescriptor(spec, secondaryRecDesc,
storageManager, indexRegistryProvider, secondarySplitProvider, secondaryTypeTraits,
secondaryComparatorFactories, lowKeyFields, highKeyFields, true, true,
new BTreeDataflowHelperFactory(), NoOpOperationCallbackProvider.INSTANCE);
String[] secondarySearchOpLocationConstraint = new String[nodeGroup.size()];
for (int p = 0; p < nodeGroup.size(); p++) {
secondarySearchOpLocationConstraint[p] = nodeGroup.get(p);
}
PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, secondarySearchOp,
secondarySearchOpLocationConstraint);
// ---------- END SECONDARY INDEX SCAN
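// Print every tuple returned by the index scan, partitioned the same way as the scan itself.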
PrinterOperatorDescriptor printer = new PrinterOperatorDescriptor(spec);
String[] printerLocationConstraint = new String[nodeGroup.size()];
for (int p = 0; p < nodeGroup.size(); p++) {
printerLocationConstraint[p] = nodeGroup.get(p);
}
PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, printer, printerLocationConstraint);
// ---------- START CONNECT THE OPERATORS
spec.connect(new OneToOneConnectorDescriptor(spec), keyProviderOp, 0, secondarySearchOp, 0);
spec.connect(new OneToOneConnectorDescriptor(spec), secondarySearchOp, 0, printer, 0);
// ---------- END CONNECT THE OPERATORS
spec.addRoot(printer);
return spec;
}
public static void main(String[] args) throws Exception {
String host;
String appName;
String ddlFile;
switch (args.length) {
case 0: {
host = "127.0.0.1";
appName = "asterix";
ddlFile = "/home/abehm/workspace/asterix/src/test/resources/demo0927/local/create-index.aql";
System.out.println("No arguments specified, using defauls:");
System.out.println("HYRACKS HOST: " + host);
System.out.println("APPNAME: " + appName);
System.out.println("DDLFILE: " + ddlFile);
}
break;
case 3: {
host = args[0];
appName = args[1];
ddlFile = args[2];
}
break;
default: {
System.out.println("USAGE:");
System.out.println("ARG 1: Hyracks Host (IP or Hostname)");
System.out.println("ARG 2: Application Name (e.g., asterix)");
System.out.println("ARG 3: DDL File");
host = null;
appName = null;
ddlFile = null;
System.exit(1); // non-zero exit: invalid arguments
}
break;
}
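// Assumed default client port of the Hyracks cluster controller in this test setup.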
int port = 1098;
IHyracksClientConnection hcc = new HyracksConnection(host, port);
TestKeywordIndexJob tij = new TestKeywordIndexJob();
JobSpecification jobSpec = tij.createJobSpec();
JobId jobId = hcc.createJob(appName, jobSpec);
long start = System.currentTimeMillis();
hcc.start(jobId);
hcc.waitForCompletion(jobId);
long end = System.currentTimeMillis();
System.err.println("start=" + start + " end=" + end + " elapsed(ms)=" + (end - start));
}
}