Create index for result times
diff --git a/asterix-bad/src/main/java/org/apache/asterix/bad/lang/statement/CreateChannelStatement.java b/asterix-bad/src/main/java/org/apache/asterix/bad/lang/statement/CreateChannelStatement.java
index 1ea8e7f..0dbc9e0 100644
--- a/asterix-bad/src/main/java/org/apache/asterix/bad/lang/statement/CreateChannelStatement.java
+++ b/asterix-bad/src/main/java/org/apache/asterix/bad/lang/statement/CreateChannelStatement.java
@@ -44,6 +44,7 @@
 import org.apache.asterix.bad.metadata.ChannelEventsListener;
 import org.apache.asterix.bad.runtime.RepetitiveChannelOperatorDescriptor;
 import org.apache.asterix.common.config.DatasetConfig.DatasetType;
+import org.apache.asterix.common.config.DatasetConfig.IndexType;
 import org.apache.asterix.common.exceptions.AsterixException;
 import org.apache.asterix.common.exceptions.CompilationException;
 import org.apache.asterix.common.functions.FunctionSignature;
@@ -56,7 +57,9 @@
 import org.apache.asterix.lang.common.base.Statement;
 import org.apache.asterix.lang.common.expression.CallExpr;
 import org.apache.asterix.lang.common.expression.LiteralExpr;
+import org.apache.asterix.lang.common.expression.TypeExpression;
 import org.apache.asterix.lang.common.literal.StringLiteral;
+import org.apache.asterix.lang.common.statement.CreateIndexStatement;
 import org.apache.asterix.lang.common.statement.DatasetDecl;
 import org.apache.asterix.lang.common.statement.IDatasetDetailsDecl;
 import org.apache.asterix.lang.common.statement.InsertStatement;
@@ -241,10 +244,26 @@
                 new Identifier(BADConstants.BAD_DATAVERSE_NAME), resultsTypeName, null, null, null, null,
                 new HashMap<String, String>(), new HashMap<String, String>(), DatasetType.INTERNAL, idd, true);
 
+        //Create an index on timestamp for results
+        CreateIndexStatement createTimeIndex = new CreateIndexStatement();
+        createTimeIndex.setDatasetName(resultsName);
+        createTimeIndex.setDataverseName(new Identifier(dataverse));
+        createTimeIndex.setIndexName(new Identifier(resultsName + "TimeIndex"));
+        createTimeIndex.setIfNotExists(false);
+        createTimeIndex.setIndexType(IndexType.BTREE);
+        createTimeIndex.setEnforced(false);
+        createTimeIndex.setGramLength(0);
+        List<String> fNames = new ArrayList<>();
+        fNames.add(BADConstants.ChannelExecutionTime);
+        Pair<List<String>, TypeExpression> fields = new Pair<List<String>, TypeExpression>(fNames, null);
+        createTimeIndex.addFieldExprPair(fields);
+        createTimeIndex.addFieldIndexIndicator(0);
+
         //Run both statements to create datasets
         ((QueryTranslator) statementExecutor).handleCreateDatasetStatement(metadataProvider, createSubscriptionsDataset,
                 hcc);
         ((QueryTranslator) statementExecutor).handleCreateDatasetStatement(metadataProvider, createResultsDataset, hcc);
+        ((QueryTranslator) statementExecutor).handleCreateIndexStatement(metadataProvider, createTimeIndex, hcc);
 
     }
 
@@ -345,6 +364,7 @@
                 throw new AsterixException("Channel " + channelName + " is already running");
             }
             initialize(mdTxnCtx, subscriptionsName.getValue(), resultsName.getValue());
+
             channel = new Channel(dataverse, channelName.getValue(), subscriptionsTableName, resultsTableName, function,
                     duration);
 
@@ -356,6 +376,10 @@
                 throw new AsterixException("The channel name:" + channelName + " is not available.");
             }
 
+            //Create Channel Datasets
+            createDatasets(statementExecutor, subscriptionsName, resultsName, metadataProvider, hcc, hdc, stats,
+                    dataverse);
+
             // Now we subscribe
             if (listener == null) {
                 listener = new ChannelEventsListener(entityId);
@@ -364,9 +388,6 @@
             listener.registerEventSubscriber(eventSubscriber);
             subscriberRegistered = true;
 
-            //Create Channel Datasets
-            createDatasets(statementExecutor, subscriptionsName, resultsName, metadataProvider, hcc, hdc, stats,
-                    dataverse);
 
             //Create Channel Internal Job
             JobSpecification channeljobSpec = createChannelJob(statementExecutor, subscriptionsName, resultsName,