/*
* Copyright 2009-2013 by The Regents of the University of California
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package edu.uci.ics.pregelix.core.jobgen;
import java.io.DataOutput;
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import java.util.logging.Logger;
import org.apache.commons.lang3.tuple.Pair;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.VLongWritable;
import org.apache.hadoop.io.Writable;
import org.apache.hadoop.io.WritableComparable;
import org.apache.hadoop.io.WritableComparator;
import org.apache.hadoop.mapreduce.InputFormat;
import org.apache.hadoop.mapreduce.InputSplit;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.SequenceFileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.mapreduce.lib.output.SequenceFileOutputFormat;
import edu.uci.ics.hyracks.api.constraints.PartitionConstraintHelper;
import edu.uci.ics.hyracks.api.dataflow.IOperatorDescriptor;
import edu.uci.ics.hyracks.api.dataflow.value.IBinaryComparatorFactory;
import edu.uci.ics.hyracks.api.dataflow.value.INormalizedKeyComputerFactory;
import edu.uci.ics.hyracks.api.dataflow.value.ISerializerDeserializer;
import edu.uci.ics.hyracks.api.dataflow.value.ITuplePartitionComputerFactory;
import edu.uci.ics.hyracks.api.dataflow.value.ITypeTraits;
import edu.uci.ics.hyracks.api.dataflow.value.RecordDescriptor;
import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
import edu.uci.ics.hyracks.api.exceptions.HyracksException;
import edu.uci.ics.hyracks.api.job.JobSpecification;
import edu.uci.ics.hyracks.dataflow.common.comm.io.ArrayTupleBuilder;
import edu.uci.ics.hyracks.dataflow.common.data.marshalling.UTF8StringSerializerDeserializer;
import edu.uci.ics.hyracks.dataflow.std.connectors.MToNPartitioningConnectorDescriptor;
import edu.uci.ics.hyracks.dataflow.std.connectors.OneToOneConnectorDescriptor;
import edu.uci.ics.hyracks.dataflow.std.file.IFileSplitProvider;
import edu.uci.ics.hyracks.dataflow.std.misc.ConstantTupleSourceOperatorDescriptor;
import edu.uci.ics.hyracks.dataflow.std.sort.Algorithm;
import edu.uci.ics.hyracks.dataflow.std.sort.ExternalSortOperatorDescriptor;
import edu.uci.ics.hyracks.hdfs2.dataflow.HDFSReadOperatorDescriptor;
import edu.uci.ics.hyracks.storage.am.btree.dataflow.BTreeSearchOperatorDescriptor;
import edu.uci.ics.hyracks.storage.am.common.dataflow.TreeIndexInsertUpdateDeleteOperatorDescriptor;
import edu.uci.ics.hyracks.storage.am.common.impls.NoOpOperationCallbackFactory;
import edu.uci.ics.hyracks.storage.am.common.ophelpers.IndexOperation;
import edu.uci.ics.pregelix.api.graph.MsgList;
import edu.uci.ics.pregelix.api.job.PregelixJob;
import edu.uci.ics.pregelix.api.util.BspUtils;
import edu.uci.ics.pregelix.core.data.TypeTraits;
import edu.uci.ics.pregelix.core.jobgen.clusterconfig.ClusterConfig;
import edu.uci.ics.pregelix.core.optimizer.IOptimizer;
import edu.uci.ics.pregelix.core.util.DataflowUtils;
import edu.uci.ics.pregelix.dataflow.ConnectorPolicyAssignmentPolicy;
import edu.uci.ics.pregelix.dataflow.EmptySinkOperatorDescriptor;
import edu.uci.ics.pregelix.dataflow.EmptyTupleSourceOperatorDescriptor;
import edu.uci.ics.pregelix.dataflow.FinalAggregateOperatorDescriptor;
import edu.uci.ics.pregelix.dataflow.HDFSFileWriteOperatorDescriptor;
import edu.uci.ics.pregelix.dataflow.KeyValueParserFactory;
import edu.uci.ics.pregelix.dataflow.MaterializingReadOperatorDescriptor;
import edu.uci.ics.pregelix.dataflow.MaterializingWriteOperatorDescriptor;
import edu.uci.ics.pregelix.dataflow.TerminationStateWriterOperatorDescriptor;
import edu.uci.ics.pregelix.dataflow.base.IConfigurationFactory;
import edu.uci.ics.pregelix.dataflow.std.IndexNestedLoopJoinFunctionUpdateOperatorDescriptor;
import edu.uci.ics.pregelix.dataflow.std.IndexNestedLoopJoinOperatorDescriptor;
import edu.uci.ics.pregelix.dataflow.std.RuntimeHookOperatorDescriptor;
import edu.uci.ics.pregelix.dataflow.std.TreeIndexBulkReLoadOperatorDescriptor;
import edu.uci.ics.pregelix.dataflow.std.TreeSearchFunctionUpdateOperatorDescriptor;
import edu.uci.ics.pregelix.dataflow.std.base.IRecordDescriptorFactory;
import edu.uci.ics.pregelix.dataflow.std.base.IRuntimeHookFactory;
import edu.uci.ics.pregelix.runtime.function.ComputeUpdateFunctionFactory;
import edu.uci.ics.pregelix.runtime.function.StartComputeUpdateFunctionFactory;
import edu.uci.ics.pregelix.runtime.touchpoint.MergePartitionComputerFactory;
import edu.uci.ics.pregelix.runtime.touchpoint.PostSuperStepRuntimeHookFactory;
import edu.uci.ics.pregelix.runtime.touchpoint.PreSuperStepRuntimeHookFactory;
import edu.uci.ics.pregelix.runtime.touchpoint.RuntimeHookFactory;
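/**
 * Job generator for the "inner-join" Pregelix plan: each superstep joins the
 * materialized message lists (kept in alternating odd/even secondary B-tree
 * indexes) with the vertex primary index through an index nested-loop join,
 * rather than re-scanning the whole vertex index.
 */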
public class JobGenInnerJoin extends JobGen {
private static final Logger LOGGER = Logger.getLogger(JobGenInnerJoin.class.getName());
public JobGenInnerJoin(PregelixJob job, IOptimizer optimizer) {
super(job, optimizer);
}
public JobGenInnerJoin(PregelixJob job, String jobId, IOptimizer optimizer) {
super(job, jobId, optimizer);
}
@Override
protected JobSpecification generateFirstIteration(int iteration) throws HyracksException {
Class<? extends WritableComparable<?>> vertexIdClass = BspUtils.getVertexIndexClass(conf);
Class<? extends Writable> vertexClass = BspUtils.getVertexClass(conf);
Class<? extends Writable> messageValueClass = BspUtils.getMessageValueClass(conf);
String[] partialAggregateValueClassNames = BspUtils.getPartialAggregateValueClassNames(conf);
IConfigurationFactory confFactory = getConfigurationFactory();
JobSpecification spec = new JobSpecification(frameSize);
/**
 * construct empty tuple source operator
 */
EmptyTupleSourceOperatorDescriptor emptyTupleSource = new EmptyTupleSourceOperatorDescriptor(spec);
setLocationConstraint(spec, emptyTupleSource);
/** construct the pre-superstep runtime hook */
RuntimeHookOperatorDescriptor preSuperStep = new RuntimeHookOperatorDescriptor(spec,
new PreSuperStepRuntimeHookFactory(jobId, confFactory));
setLocationConstraint(spec, preSuperStep);
/**
 * construct the file split providers for the primary index and the
 * (odd) secondary message index
 */
IFileSplitProvider secondaryFileSplitProvider = getFileSplitProvider(jobId, SECONDARY_INDEX_ODD);
IFileSplitProvider fileSplitProvider = getFileSplitProvider(jobId, PRIMARY_INDEX);
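/**
 * the first superstep bulk-loads the odd secondary index; subsequent
 * supersteps alternate between the odd and even copies (see
 * generateNonFirstIteration)
 */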
/**
* construct btree search and function call update operator
*/
RecordDescriptor recordDescriptor = DataflowUtils.getRecordDescriptorFromKeyValueClasses(conf,
vertexIdClass.getName(), vertexClass.getName());
IBinaryComparatorFactory[] comparatorFactories = new IBinaryComparatorFactory[1];
comparatorFactories[0] = JobGenUtil.getIBinaryComparatorFactory(iteration, vertexIdClass);
ITypeTraits[] typeTraits = new ITypeTraits[2];
typeTraits[0] = new TypeTraits(false);
typeTraits[1] = new TypeTraits(false);
RecordDescriptor rdDummy = DataflowUtils.getRecordDescriptorFromWritableClasses(conf,
VLongWritable.class.getName());
RecordDescriptor rdPartialAggregate = DataflowUtils.getRecordDescriptorFromWritableClasses(conf,
partialAggregateValueClassNames);
IConfigurationFactory configurationFactory = getConfigurationFactory();
IRuntimeHookFactory preHookFactory = new RuntimeHookFactory(configurationFactory);
IRecordDescriptorFactory inputRdFactory = DataflowUtils.getWritableRecordDescriptorFactoryFromWritableClasses(
getConfigurationFactory(), vertexIdClass.getName(), vertexClass.getName());
RecordDescriptor rdFinal = DataflowUtils.getRecordDescriptorFromKeyValueClasses(conf, vertexIdClass.getName(),
MsgList.class.getName());
RecordDescriptor rdUnnestedMessage = DataflowUtils.getRecordDescriptorFromKeyValueClasses(conf,
vertexIdClass.getName(), messageValueClass.getName());
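/** record layouts for the insert (vertex id, vertex) and delete (vertex id) channels */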
RecordDescriptor rdInsert = DataflowUtils.getRecordDescriptorFromKeyValueClasses(conf, vertexIdClass.getName(),
vertexClass.getName());
RecordDescriptor rdDelete = DataflowUtils.getRecordDescriptorFromWritableClasses(conf, vertexIdClass.getName());
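/**
 * the search-and-update operator fans out six channels, wired up below:
 * 0 = unnested messages (to the group-by), 1 = termination state,
 * 2 = partial aggregates, 3 = vertex inserts, 4 = vertex deletes,
 * 5 = bulk-load input for the secondary index
 */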
TreeSearchFunctionUpdateOperatorDescriptor scanner = new TreeSearchFunctionUpdateOperatorDescriptor(spec,
recordDescriptor, storageManagerInterface, lcManagerProvider, fileSplitProvider, typeTraits,
comparatorFactories, JobGenUtil.getForwardScan(iteration), null, null, true, true,
getIndexDataflowHelperFactory(), inputRdFactory, 6, new StartComputeUpdateFunctionFactory(confFactory),
preHookFactory, null, rdUnnestedMessage, rdDummy, rdPartialAggregate, rdInsert, rdDelete, rdFinal);
setLocationConstraint(spec, scanner);
/**
* termination state write operator
*/
TerminationStateWriterOperatorDescriptor terminateWriter = new TerminationStateWriterOperatorDescriptor(spec,
configurationFactory, jobId);
PartitionConstraintHelper.addPartitionCountConstraint(spec, terminateWriter, 1);
/**
* final aggregate write operator
*/
IRecordDescriptorFactory aggRdFactory = DataflowUtils.getWritableRecordDescriptorFactoryFromWritableClasses(
getConfigurationFactory(), partialAggregateValueClassNames);
FinalAggregateOperatorDescriptor finalAggregator = new FinalAggregateOperatorDescriptor(spec,
configurationFactory, aggRdFactory, jobId);
PartitionConstraintHelper.addPartitionCountConstraint(spec, finalAggregator, 1);
/**
* construct bulk-load index operator
*/
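/** bulk-load both fields (vertex id, message list); field 0 is the key */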
int[] fieldPermutation = new int[] { 0, 1 };
int[] keyFields = new int[] { 0 };
IBinaryComparatorFactory[] indexCmpFactories = new IBinaryComparatorFactory[1];
indexCmpFactories[0] = JobGenUtil.getIBinaryComparatorFactory(iteration + 1,
WritableComparator.get(vertexIdClass).getClass());
TreeIndexBulkReLoadOperatorDescriptor btreeBulkLoad = new TreeIndexBulkReLoadOperatorDescriptor(spec,
storageManagerInterface, lcManagerProvider, secondaryFileSplitProvider, typeTraits, indexCmpFactories,
fieldPermutation, keyFields, DEFAULT_BTREE_FILL_FACTOR, getIndexDataflowHelperFactory());
setLocationConstraint(spec, btreeBulkLoad);
/**
* construct group-by operator pipeline
*/
Pair<IOperatorDescriptor, IOperatorDescriptor> groupOps = generateGroupingOperators(spec, iteration,
vertexIdClass);
IOperatorDescriptor groupStartOperator = groupOps.getLeft();
IOperatorDescriptor groupEndOperator = groupOps.getRight();
/**
* construct the materializing write operator
*/
MaterializingWriteOperatorDescriptor materialize = new MaterializingWriteOperatorDescriptor(spec, rdFinal,
jobId, iteration);
setLocationConstraint(spec, materialize);
/**
 * construct the post-superstep runtime hook
 */
RuntimeHookOperatorDescriptor postSuperStep = new RuntimeHookOperatorDescriptor(spec,
new PostSuperStepRuntimeHookFactory(jobId));
setLocationConstraint(spec, postSuperStep);
/** construct empty sink operator */
EmptySinkOperatorDescriptor emptySink = new EmptySinkOperatorDescriptor(spec);
setLocationConstraint(spec, emptySink);
/**
 * add the insert operator to insert vertices
 */
TreeIndexInsertUpdateDeleteOperatorDescriptor insertOp = new TreeIndexInsertUpdateDeleteOperatorDescriptor(
spec, rdInsert, storageManagerInterface, lcManagerProvider, fileSplitProvider, typeTraits,
comparatorFactories, null, fieldPermutation, IndexOperation.INSERT, getIndexDataflowHelperFactory(),
null, NoOpOperationCallbackFactory.INSTANCE);
setLocationConstraint(spec, insertOp);
/**
 * add the delete operator to delete vertices
 */
int[] fieldPermutationDelete = new int[] { 0 };
TreeIndexInsertUpdateDeleteOperatorDescriptor deleteOp = new TreeIndexInsertUpdateDeleteOperatorDescriptor(
spec, rdDelete, storageManagerInterface, lcManagerProvider, fileSplitProvider, typeTraits,
comparatorFactories, null, fieldPermutationDelete, IndexOperation.DELETE,
getIndexDataflowHelperFactory(), null, NoOpOperationCallbackFactory.INSTANCE);
setLocationConstraint(spec, deleteOp);
/** construct empty sink operator */
EmptySinkOperatorDescriptor emptySink3 = new EmptySinkOperatorDescriptor(spec);
setLocationConstraint(spec, emptySink3);
/** construct empty sink operator */
EmptySinkOperatorDescriptor emptySink4 = new EmptySinkOperatorDescriptor(spec);
setLocationConstraint(spec, emptySink4);
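/**
 * the merge factory funnels all tuples into the single partition that hosts
 * the terminate/aggregate writers; the vertex partitioner hash-partitions
 * tuples by vertex id
 */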
ITuplePartitionComputerFactory partitionFactory = getVertexPartitionComputerFactory();
ITuplePartitionComputerFactory unifyingPartitionComputerFactory = new MergePartitionComputerFactory();
/** connect all operators **/
spec.connect(new OneToOneConnectorDescriptor(spec), emptyTupleSource, 0, preSuperStep, 0);
spec.connect(new OneToOneConnectorDescriptor(spec), preSuperStep, 0, scanner, 0);
spec.connect(new OneToOneConnectorDescriptor(spec), scanner, 0, groupStartOperator, 0);
spec.connect(new MToNPartitioningConnectorDescriptor(spec, unifyingPartitionComputerFactory), scanner, 1,
terminateWriter, 0);
spec.connect(new MToNPartitioningConnectorDescriptor(spec, unifyingPartitionComputerFactory), scanner, 2,
finalAggregator, 0);
/**
* connect the insert/delete operator
*/
spec.connect(new MToNPartitioningConnectorDescriptor(spec, partitionFactory), scanner, 3, insertOp, 0);
spec.connect(new OneToOneConnectorDescriptor(spec), insertOp, 0, emptySink3, 0);
spec.connect(new MToNPartitioningConnectorDescriptor(spec, partitionFactory), scanner, 4, deleteOp, 0);
spec.connect(new OneToOneConnectorDescriptor(spec), deleteOp, 0, emptySink4, 0);
spec.connect(new MToNPartitioningConnectorDescriptor(spec, partitionFactory), scanner, 5, btreeBulkLoad, 0);
spec.connect(new OneToOneConnectorDescriptor(spec), groupEndOperator, 0, materialize, 0);
spec.connect(new OneToOneConnectorDescriptor(spec), materialize, 0, postSuperStep, 0);
spec.connect(new OneToOneConnectorDescriptor(spec), postSuperStep, 0, emptySink, 0);
spec.addRoot(emptySink);
spec.addRoot(btreeBulkLoad);
spec.addRoot(terminateWriter);
spec.addRoot(finalAggregator);
spec.addRoot(emptySink3);
spec.addRoot(emptySink4);
spec.setConnectorPolicyAssignmentPolicy(new ConnectorPolicyAssignmentPolicy(spec));
spec.setFrameSize(frameSize);
return spec;
}
@Override
protected JobSpecification generateNonFirstIteration(int iteration) throws HyracksException {
Class<? extends WritableComparable<?>> vertexIdClass = BspUtils.getVertexIndexClass(conf);
Class<? extends Writable> vertexClass = BspUtils.getVertexClass(conf);
Class<? extends Writable> messageValueClass = BspUtils.getMessageValueClass(conf);
String[] partialAggregateValueClassNames = BspUtils.getPartialAggregateValueClassNames(conf);
JobSpecification spec = new JobSpecification(frameSize);
/**
* source aggregate
*/
int[] keyFields = new int[] { 0 };
RecordDescriptor rdUnnestedMessage = DataflowUtils.getRecordDescriptorFromKeyValueClasses(conf,
vertexIdClass.getName(), messageValueClass.getName());
IBinaryComparatorFactory[] comparatorFactories = new IBinaryComparatorFactory[1];
comparatorFactories[0] = JobGenUtil.getIBinaryComparatorFactory(iteration, vertexIdClass);
RecordDescriptor rdFinal = DataflowUtils.getRecordDescriptorFromKeyValueClasses(conf, vertexIdClass.getName(),
MsgList.class.getName());
RecordDescriptor rdInsert = DataflowUtils.getRecordDescriptorFromKeyValueClasses(conf, vertexIdClass.getName(),
vertexClass.getName());
RecordDescriptor rdDelete = DataflowUtils.getRecordDescriptorFromWritableClasses(conf, vertexIdClass.getName());
/**
 * construct empty tuple source operator
 */
EmptyTupleSourceOperatorDescriptor emptyTupleSource = new EmptyTupleSourceOperatorDescriptor(spec);
setLocationConstraint(spec, emptyTupleSource);
/**
* construct pre-superstep
*/
IConfigurationFactory confFactory = getConfigurationFactory();
RuntimeHookOperatorDescriptor preSuperStep = new RuntimeHookOperatorDescriptor(spec,
new PreSuperStepRuntimeHookFactory(jobId, confFactory));
setLocationConstraint(spec, preSuperStep);
/**
 * construct the materializing read operator, which replays the message
 * state materialized at the end of the previous superstep
 */
MaterializingReadOperatorDescriptor materializeRead = new MaterializingReadOperatorDescriptor(spec, rdFinal,
true, jobId, iteration);
setLocationConstraint(spec, materializeRead);
/**
 * construct the index-set-union operator, which merges the materialized
 * messages with the secondary (message) index written by the previous
 * superstep
 */
String readFile = iteration % 2 == 0 ? SECONDARY_INDEX_ODD : SECONDARY_INDEX_EVEN;
IFileSplitProvider secondaryFileSplitProviderRead = getFileSplitProvider(jobId, readFile);
ITypeTraits[] typeTraits = new ITypeTraits[2];
typeTraits[0] = new TypeTraits(false);
typeTraits[1] = new TypeTraits(false);
IndexNestedLoopJoinOperatorDescriptor setUnion = new IndexNestedLoopJoinOperatorDescriptor(spec, rdFinal,
storageManagerInterface, lcManagerProvider, secondaryFileSplitProviderRead, typeTraits,
comparatorFactories, true, keyFields, keyFields, true, true, getIndexDataflowHelperFactory(), true);
setLocationConstraint(spec, setUnion);
/**
* construct index-join-function-update operator
*/
IFileSplitProvider fileSplitProvider = getFileSplitProvider(jobId, PRIMARY_INDEX);
RecordDescriptor rdDummy = DataflowUtils.getRecordDescriptorFromWritableClasses(conf,
VLongWritable.class.getName());
RecordDescriptor rdPartialAggregate = DataflowUtils.getRecordDescriptorFromWritableClasses(conf,
partialAggregateValueClassNames);
IConfigurationFactory configurationFactory = getConfigurationFactory();
IRuntimeHookFactory preHookFactory = new RuntimeHookFactory(configurationFactory);
IRecordDescriptorFactory inputRdFactory = DataflowUtils.getWritableRecordDescriptorFactoryFromWritableClasses(
getConfigurationFactory(), vertexIdClass.getName(), MsgList.class.getName(), vertexIdClass.getName(),
vertexClass.getName());
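/**
 * like the first-iteration scanner, the join fans out six channels:
 * 0 = unnested messages, 1 = termination state, 2 = partial aggregates,
 * 3 = vertex inserts, 4 = vertex deletes, 5 = secondary-index bulk-load input
 */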
IndexNestedLoopJoinFunctionUpdateOperatorDescriptor join = new IndexNestedLoopJoinFunctionUpdateOperatorDescriptor(
spec, storageManagerInterface, lcManagerProvider, fileSplitProvider, typeTraits, comparatorFactories,
JobGenUtil.getForwardScan(iteration), keyFields, keyFields, true, true,
getIndexDataflowHelperFactory(), inputRdFactory, 6, new ComputeUpdateFunctionFactory(confFactory),
preHookFactory, null, rdUnnestedMessage, rdDummy, rdPartialAggregate, rdInsert, rdDelete, rdFinal);
setLocationConstraint(spec, join);
/**
* construct bulk-load index operator
*/
int[] fieldPermutation = new int[] { 0, 1 };
IBinaryComparatorFactory[] indexCmpFactories = new IBinaryComparatorFactory[1];
indexCmpFactories[0] = JobGenUtil.getIBinaryComparatorFactory(iteration + 1,
WritableComparator.get(vertexIdClass).getClass());
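/** write the flip side of the index read above, so superstep i + 1 consumes what superstep i produced */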
String writeFile = iteration % 2 == 0 ? SECONDARY_INDEX_EVEN : SECONDARY_INDEX_ODD;
IFileSplitProvider secondaryFileSplitProviderWrite = getFileSplitProvider(jobId, writeFile);
TreeIndexBulkReLoadOperatorDescriptor btreeBulkLoad = new TreeIndexBulkReLoadOperatorDescriptor(spec,
storageManagerInterface, lcManagerProvider, secondaryFileSplitProviderWrite, typeTraits,
indexCmpFactories, fieldPermutation, keyFields, DEFAULT_BTREE_FILL_FACTOR,
getIndexDataflowHelperFactory());
setLocationConstraint(spec, btreeBulkLoad);
/**
* construct group-by operator pipeline
*/
Pair<IOperatorDescriptor, IOperatorDescriptor> groupOps = generateGroupingOperators(spec, iteration,
vertexIdClass);
IOperatorDescriptor groupStartOperator = groupOps.getLeft();
IOperatorDescriptor groupEndOperator = groupOps.getRight();
/**
* construct the materializing write operator
*/
MaterializingWriteOperatorDescriptor materialize = new MaterializingWriteOperatorDescriptor(spec, rdFinal,
jobId, iteration);
setLocationConstraint(spec, materialize);
/** construct the post-superstep runtime hook */
RuntimeHookOperatorDescriptor postSuperStep = new RuntimeHookOperatorDescriptor(spec,
new PostSuperStepRuntimeHookFactory(jobId));
setLocationConstraint(spec, postSuperStep);
/** construct empty sink operator */
EmptySinkOperatorDescriptor emptySink = new EmptySinkOperatorDescriptor(spec);
setLocationConstraint(spec, emptySink);
/**
* termination state write operator
*/
TerminationStateWriterOperatorDescriptor terminateWriter = new TerminationStateWriterOperatorDescriptor(spec,
configurationFactory, jobId);
PartitionConstraintHelper.addPartitionCountConstraint(spec, terminateWriter, 1);
/**
* final aggregate write operator
*/
IRecordDescriptorFactory aggRdFactory = DataflowUtils.getWritableRecordDescriptorFactoryFromWritableClasses(
getConfigurationFactory(), partialAggregateValueClassNames);
FinalAggregateOperatorDescriptor finalAggregator = new FinalAggregateOperatorDescriptor(spec,
configurationFactory, aggRdFactory, jobId);
PartitionConstraintHelper.addPartitionCountConstraint(spec, finalAggregator, 1);
/**
 * add the insert operator to insert vertices
 */
TreeIndexInsertUpdateDeleteOperatorDescriptor insertOp = new TreeIndexInsertUpdateDeleteOperatorDescriptor(
spec, rdInsert, storageManagerInterface, lcManagerProvider, fileSplitProvider, typeTraits,
comparatorFactories, null, fieldPermutation, IndexOperation.INSERT, getIndexDataflowHelperFactory(),
null, NoOpOperationCallbackFactory.INSTANCE);
setLocationConstraint(spec, insertOp);
/**
 * add the delete operator to delete vertices
 */
int[] fieldPermutationDelete = new int[] { 0 };
TreeIndexInsertUpdateDeleteOperatorDescriptor deleteOp = new TreeIndexInsertUpdateDeleteOperatorDescriptor(
spec, rdDelete, storageManagerInterface, lcManagerProvider, fileSplitProvider, typeTraits,
comparatorFactories, null, fieldPermutationDelete, IndexOperation.DELETE,
getIndexDataflowHelperFactory(), null, NoOpOperationCallbackFactory.INSTANCE);
setLocationConstraint(spec, deleteOp);
/** construct empty sink operator */
EmptySinkOperatorDescriptor emptySink3 = new EmptySinkOperatorDescriptor(spec);
setLocationConstraint(spec, emptySink3);
/** construct empty sink operator */
EmptySinkOperatorDescriptor emptySink4 = new EmptySinkOperatorDescriptor(spec);
setLocationConstraint(spec, emptySink4);
ITuplePartitionComputerFactory unifyingPartitionComputerFactory = new MergePartitionComputerFactory();
ITuplePartitionComputerFactory partitionFactory = getVertexPartitionComputerFactory();
/** connect all operators **/
spec.connect(new OneToOneConnectorDescriptor(spec), emptyTupleSource, 0, preSuperStep, 0);
spec.connect(new OneToOneConnectorDescriptor(spec), preSuperStep, 0, materializeRead, 0);
spec.connect(new OneToOneConnectorDescriptor(spec), materializeRead, 0, setUnion, 0);
spec.connect(new OneToOneConnectorDescriptor(spec), setUnion, 0, join, 0);
spec.connect(new OneToOneConnectorDescriptor(spec), join, 0, groupStartOperator, 0);
spec.connect(new MToNPartitioningConnectorDescriptor(spec, unifyingPartitionComputerFactory), join, 1,
terminateWriter, 0);
spec.connect(new MToNPartitioningConnectorDescriptor(spec, unifyingPartitionComputerFactory), join, 2,
finalAggregator, 0);
/**
* connect the insert/delete operator
*/
spec.connect(new MToNPartitioningConnectorDescriptor(spec, partitionFactory), join, 3, insertOp, 0);
spec.connect(new OneToOneConnectorDescriptor(spec), insertOp, 0, emptySink3, 0);
spec.connect(new MToNPartitioningConnectorDescriptor(spec, partitionFactory), join, 4, deleteOp, 0);
spec.connect(new OneToOneConnectorDescriptor(spec), deleteOp, 0, emptySink4, 0);
spec.connect(new MToNPartitioningConnectorDescriptor(spec, partitionFactory), join, 5, btreeBulkLoad, 0);
spec.connect(new OneToOneConnectorDescriptor(spec), groupEndOperator, 0, materialize, 0);
spec.connect(new OneToOneConnectorDescriptor(spec), materialize, 0, postSuperStep, 0);
spec.connect(new OneToOneConnectorDescriptor(spec), postSuperStep, 0, emptySink, 0);
spec.addRoot(emptySink);
spec.addRoot(btreeBulkLoad);
spec.addRoot(terminateWriter);
spec.addRoot(finalAggregator);
spec.addRoot(emptySink3);
spec.addRoot(emptySink4);
spec.setConnectorPolicyAssignmentPolicy(new ConnectorPolicyAssignmentPolicy(spec));
spec.setFrameSize(frameSize);
return spec;
}
/** generate plan specific state checkpointing */
@Override
protected JobSpecification[] generateStateCheckpointing(int lastSuccessfulIteration) throws HyracksException {
JobSpecification[] msgCkpSpecs = super.generateStateCheckpointing(lastSuccessfulIteration);
/** generate secondary index checkpoint */
PregelixJob tmpJob = this.createCloneJob("Secondary index checkpointing for job " + jobId, pregelixJob);
JobSpecification secondaryBTreeCkp = generateSecondaryBTreeCheckpoint(lastSuccessfulIteration, tmpJob);
JobSpecification[] specs = new JobSpecification[msgCkpSpecs.length + 1];
System.arraycopy(msgCkpSpecs, 0, specs, 0, msgCkpSpecs.length);
specs[specs.length - 1] = secondaryBTreeCkp;
return specs;
}
/**
* generate plan specific checkpoint loading
*/
@Override
protected JobSpecification[] generateStateCheckpointLoading(int lastSuccessfulIteration, PregelixJob job)
throws HyracksException {
/** generate message checkpoint load */
JobSpecification[] msgCkpSpecs = super.generateStateCheckpointLoading(lastSuccessfulIteration, job);
/** generate secondary index checkpoint load */
PregelixJob tmpJob = this.createCloneJob("Secondary index checkpoint loading for job " + jobId, pregelixJob);
tmpJob.setOutputFormatClass(SequenceFileOutputFormat.class);
JobSpecification secondaryBTreeCkpLoad = generateSecondaryBTreeCheckpointLoad(lastSuccessfulIteration, tmpJob);
JobSpecification[] specs = new JobSpecification[msgCkpSpecs.length + 1];
System.arraycopy(msgCkpSpecs, 0, specs, 0, msgCkpSpecs.length);
specs[specs.length - 1] = secondaryBTreeCkpLoad;
return specs;
}
@SuppressWarnings({ "unchecked", "rawtypes" })
private JobSpecification generateSecondaryBTreeCheckpointLoad(int lastSuccessfulIteration, PregelixJob job)
throws HyracksException {
Class<? extends WritableComparable<?>> vertexIdClass = BspUtils.getVertexIndexClass(job.getConfiguration());
JobSpecification spec = new JobSpecification(frameSize);
String checkpointPath = BspUtils.getSecondaryIndexCheckpointPath(conf, lastSuccessfulIteration);
PregelixJob tmpJob = createCloneJob("State checkpoint loading for job " + jobId, job);
tmpJob.setInputFormatClass(SequenceFileInputFormat.class);
try {
FileInputFormat.setInputPaths(tmpJob, checkpointPath);
} catch (IOException e) {
throw new HyracksException(e);
}
/**
 * HDFS read operator
 */
List<InputSplit> splits = new ArrayList<InputSplit>();
try {
InputFormat inputFormat = org.apache.hadoop.util.ReflectionUtils.newInstance(job.getInputFormatClass(),
job.getConfiguration());
splits = inputFormat.getSplits(tmpJob);
LOGGER.info("number of splits: " + splits.size());
for (InputSplit split : splits) {
LOGGER.info(split.toString());
}
} catch (Exception e) {
throw new HyracksDataException(e);
}
RecordDescriptor recordDescriptor = DataflowUtils.getRecordDescriptorFromKeyValueClasses(conf,
vertexIdClass.getName(), MsgList.class.getName());
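/** schedule the HDFS block reads according to data locality */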
String[] readSchedule = ClusterConfig.getHdfsScheduler().getLocationConstraints(splits);
HDFSReadOperatorDescriptor scanner = new HDFSReadOperatorDescriptor(spec, recordDescriptor, tmpJob, splits,
readSchedule, new KeyValueParserFactory());
setLocationConstraint(spec, scanner);
/** construct the sort operator to sort message states, since bulk load requires key-ordered input */
int[] keyFields = new int[] { 0 };
INormalizedKeyComputerFactory nkmFactory = JobGenUtil.getINormalizedKeyComputerFactory(conf);
IBinaryComparatorFactory[] sortCmpFactories = new IBinaryComparatorFactory[1];
sortCmpFactories[0] = JobGenUtil.getIBinaryComparatorFactory(lastSuccessfulIteration, vertexIdClass);
ExternalSortOperatorDescriptor sort = new ExternalSortOperatorDescriptor(spec, maxFrameNumber, keyFields,
nkmFactory, sortCmpFactories, recordDescriptor, Algorithm.QUICK_SORT);
setLocationConstraint(spec, sort);
/**
* construct bulk-load index operator
*/
ITypeTraits[] typeTraits = new ITypeTraits[2];
typeTraits[0] = new TypeTraits(false);
typeTraits[1] = new TypeTraits(false);
int[] fieldPermutation = new int[] { 0, 1 };
IBinaryComparatorFactory[] indexCmpFactories = new IBinaryComparatorFactory[1];
indexCmpFactories[0] = JobGenUtil.getIBinaryComparatorFactory(lastSuccessfulIteration + 1, WritableComparator
.get(vertexIdClass).getClass());
String writeFile = lastSuccessfulIteration % 2 == 0 ? SECONDARY_INDEX_EVEN : SECONDARY_INDEX_ODD;
IFileSplitProvider secondaryFileSplitProviderWrite = getFileSplitProvider(jobId, writeFile);
TreeIndexBulkReLoadOperatorDescriptor btreeBulkLoad = new TreeIndexBulkReLoadOperatorDescriptor(spec,
storageManagerInterface, lcManagerProvider, secondaryFileSplitProviderWrite, typeTraits,
indexCmpFactories, fieldPermutation, new int[] { 0 }, DEFAULT_BTREE_FILL_FACTOR,
getIndexDataflowHelperFactory());
setLocationConstraint(spec, btreeBulkLoad);
/**
* connect operator descriptors
*/
ITuplePartitionComputerFactory hashPartitionComputerFactory = getVertexPartitionComputerFactory();
spec.connect(new MToNPartitioningConnectorDescriptor(spec, hashPartitionComputerFactory), scanner, 0, sort, 0);
spec.connect(new OneToOneConnectorDescriptor(spec), sort, 0, btreeBulkLoad, 0);
spec.setFrameSize(frameSize);
return spec;
}
@SuppressWarnings({ "rawtypes" })
private JobSpecification generateSecondaryBTreeCheckpoint(int lastSuccessfulIteration, PregelixJob job)
throws HyracksException {
job.setOutputFormatClass(SequenceFileOutputFormat.class);
String checkpointPath = BspUtils.getSecondaryIndexCheckpointPath(conf, lastSuccessfulIteration);
FileOutputFormat.setOutputPath(job, new Path(checkpointPath));
job.setOutputKeyClass(BspUtils.getVertexIndexClass(job.getConfiguration()));
job.setOutputValueClass(MsgList.class);
Class<? extends WritableComparable<?>> vertexIdClass = BspUtils.getVertexIndexClass(job.getConfiguration());
Class<? extends Writable> msgListClass = MsgList.class;
String readFile = lastSuccessfulIteration % 2 == 0 ? SECONDARY_INDEX_EVEN : SECONDARY_INDEX_ODD;
IFileSplitProvider secondaryFileSplitProviderRead = getFileSplitProvider(jobId, readFile);
JobSpecification spec = new JobSpecification(frameSize);
/**
 * construct the constant tuple source operator, which emits a single dummy
 * tuple to trigger the B-tree scan
 */
ArrayTupleBuilder tb = new ArrayTupleBuilder(2);
DataOutput dos = tb.getDataOutput();
tb.reset();
UTF8StringSerializerDeserializer.INSTANCE.serialize("0", dos);
tb.addFieldEndOffset();
ISerializerDeserializer[] keyRecDescSers = { UTF8StringSerializerDeserializer.INSTANCE,
UTF8StringSerializerDeserializer.INSTANCE };
RecordDescriptor keyRecDesc = new RecordDescriptor(keyRecDescSers);
ConstantTupleSourceOperatorDescriptor emptyTupleSource = new ConstantTupleSourceOperatorDescriptor(spec,
keyRecDesc, tb.getFieldEndOffsets(), tb.getByteArray(), tb.getSize());
setLocationConstraint(spec, emptyTupleSource);
/**
* construct btree search operator
*/
RecordDescriptor recordDescriptor = DataflowUtils.getRecordDescriptorFromKeyValueClasses(conf,
vertexIdClass.getName(), msgListClass.getName());
IBinaryComparatorFactory[] comparatorFactories = new IBinaryComparatorFactory[1];
comparatorFactories[0] = JobGenUtil.getIBinaryComparatorFactory(0, vertexIdClass);
ITypeTraits[] typeTraits = new ITypeTraits[2];
typeTraits[0] = new TypeTraits(false);
typeTraits[1] = new TypeTraits(false);
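/** null low/high search keys make this a full scan of the secondary index */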
BTreeSearchOperatorDescriptor scanner = new BTreeSearchOperatorDescriptor(spec, recordDescriptor,
storageManagerInterface, lcManagerProvider, secondaryFileSplitProviderRead, typeTraits,
comparatorFactories, null, null, null, true, true, getIndexDataflowHelperFactory(), false, false, null,
NoOpOperationCallbackFactory.INSTANCE, null, null);
setLocationConstraint(spec, scanner);
/**
* construct write file operator
*/
IRecordDescriptorFactory inputRdFactory = DataflowUtils.getWritableRecordDescriptorFactoryFromWritableClasses(
getConfigurationFactory(), vertexIdClass.getName(), MsgList.class.getName());
HDFSFileWriteOperatorDescriptor writer = new HDFSFileWriteOperatorDescriptor(spec, job, inputRdFactory);
setLocationConstraint(spec, writer);
/**
* connect operator descriptors
*/
spec.connect(new OneToOneConnectorDescriptor(spec), emptyTupleSource, 0, scanner, 0);
spec.connect(new OneToOneConnectorDescriptor(spec), scanner, 0, writer, 0);
spec.setFrameSize(frameSize);
return spec;
}
@Override
public JobSpecification[] generateCleanup() throws HyracksException {
JobSpecification[] cleanups = new JobSpecification[3];
cleanups[0] = this.dropIndex(PRIMARY_INDEX);
cleanups[1] = this.dropIndex(SECONDARY_INDEX_ODD);
cleanups[2] = this.dropIndex(SECONDARY_INDEX_EVEN);
return cleanups;
}
}