Create a BTree index on the channel execution time field of each channel's results dataset
diff --git a/asterix-bad/src/main/java/org/apache/asterix/bad/lang/statement/CreateChannelStatement.java b/asterix-bad/src/main/java/org/apache/asterix/bad/lang/statement/CreateChannelStatement.java
index 1ea8e7f..0dbc9e0 100644
--- a/asterix-bad/src/main/java/org/apache/asterix/bad/lang/statement/CreateChannelStatement.java
+++ b/asterix-bad/src/main/java/org/apache/asterix/bad/lang/statement/CreateChannelStatement.java
@@ -44,6 +44,7 @@
import org.apache.asterix.bad.metadata.ChannelEventsListener;
import org.apache.asterix.bad.runtime.RepetitiveChannelOperatorDescriptor;
import org.apache.asterix.common.config.DatasetConfig.DatasetType;
+import org.apache.asterix.common.config.DatasetConfig.IndexType;
import org.apache.asterix.common.exceptions.AsterixException;
import org.apache.asterix.common.exceptions.CompilationException;
import org.apache.asterix.common.functions.FunctionSignature;
@@ -56,7 +57,9 @@
import org.apache.asterix.lang.common.base.Statement;
import org.apache.asterix.lang.common.expression.CallExpr;
import org.apache.asterix.lang.common.expression.LiteralExpr;
+import org.apache.asterix.lang.common.expression.TypeExpression;
import org.apache.asterix.lang.common.literal.StringLiteral;
+import org.apache.asterix.lang.common.statement.CreateIndexStatement;
import org.apache.asterix.lang.common.statement.DatasetDecl;
import org.apache.asterix.lang.common.statement.IDatasetDetailsDecl;
import org.apache.asterix.lang.common.statement.InsertStatement;
@@ -241,10 +244,26 @@
new Identifier(BADConstants.BAD_DATAVERSE_NAME), resultsTypeName, null, null, null, null,
new HashMap<String, String>(), new HashMap<String, String>(), DatasetType.INTERNAL, idd, true);
+ //Create an index on timestamp for results
+ CreateIndexStatement createTimeIndex = new CreateIndexStatement();
+ createTimeIndex.setDatasetName(resultsName);
+ createTimeIndex.setDataverseName(new Identifier(dataverse));
+ createTimeIndex.setIndexName(new Identifier(resultsName + "TimeIndex"));
+ createTimeIndex.setIfNotExists(false);
+ createTimeIndex.setIndexType(IndexType.BTREE);
+ createTimeIndex.setEnforced(false);
+ createTimeIndex.setGramLength(0);
+ List<String> fNames = new ArrayList<>();
+ fNames.add(BADConstants.ChannelExecutionTime);
+ Pair<List<String>, TypeExpression> fields = new Pair<List<String>, TypeExpression>(fNames, null);
+ createTimeIndex.addFieldExprPair(fields);
+ createTimeIndex.addFieldIndexIndicator(0);
+
//Run both statements to create datasets
((QueryTranslator) statementExecutor).handleCreateDatasetStatement(metadataProvider, createSubscriptionsDataset,
hcc);
((QueryTranslator) statementExecutor).handleCreateDatasetStatement(metadataProvider, createResultsDataset, hcc);
+ ((QueryTranslator) statementExecutor).handleCreateIndexStatement(metadataProvider, createTimeIndex, hcc);
}
@@ -345,6 +364,7 @@
throw new AsterixException("Channel " + channelName + " is already running");
}
initialize(mdTxnCtx, subscriptionsName.getValue(), resultsName.getValue());
+
channel = new Channel(dataverse, channelName.getValue(), subscriptionsTableName, resultsTableName, function,
duration);
@@ -356,6 +376,10 @@
throw new AsterixException("The channel name:" + channelName + " is not available.");
}
+ //Create Channel Datasets
+ createDatasets(statementExecutor, subscriptionsName, resultsName, metadataProvider, hcc, hdc, stats,
+ dataverse);
+
// Now we subscribe
if (listener == null) {
listener = new ChannelEventsListener(entityId);
@@ -364,9 +388,6 @@
listener.registerEventSubscriber(eventSubscriber);
subscriberRegistered = true;
- //Create Channel Datasets
- createDatasets(statementExecutor, subscriptionsName, resultsName, metadataProvider, hcc, hdc, stats,
- dataverse);
//Create Channel Internal Job
JobSpecification channeljobSpec = createChannelJob(statementExecutor, subscriptionsName, resultsName,