Add parameterized procedures

Add tests, including concurrent/parameterized execution.
Delete and query procedures can both use parameters;
these are assigned at runtime via Asterix job parameters.
Add a timestamp index to channel results.
Clean up the result handling for query procedures.
Prevent repetitive jobs from executing multiple iterations
concurrently.
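
As a rough illustration of the intended usage (the procedure and
argument below are examples only, not statements taken from this
change):

    create procedure removeReportsOfType(etype) {
        delete from EmergencyReports r where r.Etype = etype
    };

    execute removeReportsOfType("flood");

Adding a period clause (e.g. period duration("PT10S")) schedules the
procedure to run repetitively with the same parameters.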

Change-Id: I999879b1cae0de179a1d3c232fa940228979f4fe
diff --git a/asterix-bad/pom.xml b/asterix-bad/pom.xml
index 8738162..4f85b0a 100644
--- a/asterix-bad/pom.xml
+++ b/asterix-bad/pom.xml
@@ -244,6 +244,11 @@
       <version>${asterix.version}</version>
     </dependency>
     <dependency>
+      <groupId>org.apache.asterix</groupId>
+      <artifactId>asterix-lang-common</artifactId>
+      <version>${asterix.version}</version>
+    </dependency>
+    <dependency>
       <groupId>log4j</groupId>
       <artifactId>log4j</artifactId>
       <version>1.2.17</version>
@@ -289,6 +294,11 @@
       <version>${asterix.version}</version>
     </dependency>
     <dependency>
+      <groupId>org.apache.asterix</groupId>
+      <artifactId>asterix-transactions</artifactId>
+      <version>${asterix.version}</version>
+    </dependency>
+    <dependency>
       <groupId>org.apache.hyracks</groupId>
       <artifactId>hyracks-api</artifactId>
       <version>${hyracks.version}</version>
diff --git a/asterix-bad/src/main/java/org/apache/asterix/bad/BADConstants.java b/asterix-bad/src/main/java/org/apache/asterix/bad/BADConstants.java
index a906ae6..548f1ba 100644
--- a/asterix-bad/src/main/java/org/apache/asterix/bad/BADConstants.java
+++ b/asterix-bad/src/main/java/org/apache/asterix/bad/BADConstants.java
@@ -49,6 +49,8 @@
     public static final String FIELD_NAME_RETURN_TYPE = "ReturnType";
     public static final String FIELD_NAME_DEFINITION = "Definition";
     public static final String FIELD_NAME_LANGUAGE = "Language";
+    //To enable new Asterix TxnId for separate deployed job spec invocations
+    public static final byte[] TRANSACTION_ID_PARAMETER_NAME = "TxnIdParameter".getBytes();
 
     public enum ChannelJobType {
         REPETITIVE
diff --git a/asterix-bad/src/main/java/org/apache/asterix/bad/ChannelJobService.java b/asterix-bad/src/main/java/org/apache/asterix/bad/ChannelJobService.java
index ae24e0e..1db0669 100644
--- a/asterix-bad/src/main/java/org/apache/asterix/bad/ChannelJobService.java
+++ b/asterix-bad/src/main/java/org/apache/asterix/bad/ChannelJobService.java
@@ -23,66 +23,25 @@
 import java.io.InputStreamReader;
 import java.net.HttpURLConnection;
 import java.net.URL;
-import java.util.EnumSet;
-import java.util.concurrent.Executors;
-import java.util.concurrent.ScheduledExecutorService;
-import java.util.concurrent.TimeUnit;
 import java.util.logging.Level;
 import java.util.logging.Logger;
 
 import org.apache.asterix.active.EntityId;
 import org.apache.asterix.om.base.AOrderedList;
 import org.apache.asterix.om.base.AUUID;
-import org.apache.hyracks.api.client.IHyracksClientConnection;
 import org.apache.hyracks.api.exceptions.HyracksDataException;
-import org.apache.hyracks.api.job.JobFlag;
-import org.apache.hyracks.api.job.JobId;
-import org.apache.hyracks.api.job.JobSpecification;
 
 /**
- * Provides functionality for running channel jobs and communicating with Brokers
+ * Provides functionality for channel jobs and for communicating with Brokers
  */
 public class ChannelJobService {
 
     private static final Logger LOGGER = Logger.getLogger(ChannelJobService.class.getName());
 
-    public static ScheduledExecutorService startJob(JobSpecification jobSpec, EnumSet<JobFlag> jobFlags, JobId jobId,
-            IHyracksClientConnection hcc, long duration)
-            throws Exception {
-        ScheduledExecutorService scheduledExecutorService = Executors.newScheduledThreadPool(1);
-        scheduledExecutorService.scheduleAtFixedRate(new Runnable() {
-            @Override
-            public void run() {
-                try {
-                    executeJob(jobSpec, jobFlags, jobId, hcc);
-                } catch (Exception e) {
-                    LOGGER.log(Level.WARNING, "Channel Job Failed to run.", e);
-                }
-            }
-        }, duration, duration, TimeUnit.MILLISECONDS);
-        return scheduledExecutorService;
-    }
-
-    public static void executeJob(JobSpecification jobSpec, EnumSet<JobFlag> jobFlags, JobId jobId,
-            IHyracksClientConnection hcc)
-            throws Exception {
-        LOGGER.info("Executing Channel Job");
-        if (jobId == null) {
-            hcc.startJob(jobSpec, jobFlags);
-        } else {
-            hcc.startJob(jobId);
-        }
-    }
-
-    public static void runChannelJob(JobSpecification channeljobSpec, IHyracksClientConnection hcc) throws Exception {
-        JobId jobId = hcc.startJob(channeljobSpec);
-        hcc.waitForCompletion(jobId);
-    }
-
     public static void sendBrokerNotificationsForChannel(EntityId activeJobId, String brokerEndpoint,
             AOrderedList subscriptionIds, String channelExecutionTime) throws HyracksDataException {
         String formattedString;
-            formattedString = formatJSON(activeJobId, subscriptionIds, channelExecutionTime);
+        formattedString = formatJSON(activeJobId, subscriptionIds, channelExecutionTime);
         sendMessage(brokerEndpoint, formattedString);
     }
 
diff --git a/asterix-bad/src/main/java/org/apache/asterix/bad/lang/statement/ChannelDropStatement.java b/asterix-bad/src/main/java/org/apache/asterix/bad/lang/statement/ChannelDropStatement.java
index f4ea2f3..2b189be 100644
--- a/asterix-bad/src/main/java/org/apache/asterix/bad/lang/statement/ChannelDropStatement.java
+++ b/asterix-bad/src/main/java/org/apache/asterix/bad/lang/statement/ChannelDropStatement.java
@@ -18,6 +18,10 @@
  */
 package org.apache.asterix.bad.lang.statement;
 
+import java.util.concurrent.TimeUnit;
+import java.util.logging.Level;
+import java.util.logging.Logger;
+
 import org.apache.asterix.active.EntityId;
 import org.apache.asterix.algebra.extension.IExtensionStatement;
 import org.apache.asterix.app.active.ActiveNotificationHandler;
@@ -25,7 +29,7 @@
 import org.apache.asterix.bad.BADConstants;
 import org.apache.asterix.bad.lang.BADLangExtension;
 import org.apache.asterix.bad.metadata.Channel;
-import org.apache.asterix.bad.metadata.PrecompiledJobEventListener;
+import org.apache.asterix.bad.metadata.DeployedJobSpecEventListener;
 import org.apache.asterix.common.dataflow.ICcApplicationContext;
 import org.apache.asterix.common.exceptions.CompilationException;
 import org.apache.asterix.lang.common.statement.DropDatasetStatement;
@@ -38,11 +42,11 @@
 import org.apache.asterix.translator.IStatementExecutor;
 import org.apache.hyracks.algebricks.common.exceptions.AlgebricksException;
 import org.apache.hyracks.api.client.IHyracksClientConnection;
-import org.apache.hyracks.api.exceptions.ErrorCode;
 import org.apache.hyracks.api.exceptions.HyracksDataException;
-import org.apache.hyracks.api.job.JobId;
+import org.apache.hyracks.api.job.DeployedJobSpecId;
 
 public class ChannelDropStatement implements IExtensionStatement {
+    private static final Logger LOGGER = Logger.getLogger(ChannelDropStatement.class.getName());
 
     private final Identifier dataverseName;
     private final Identifier channelName;
@@ -91,7 +95,7 @@
         ICcApplicationContext appCtx = metadataProvider.getApplicationContext();
         ActiveNotificationHandler activeEventHandler =
                 (ActiveNotificationHandler) appCtx.getActiveNotificationHandler();
-        PrecompiledJobEventListener listener = (PrecompiledJobEventListener) activeEventHandler.getListener(entityId);
+        DeployedJobSpecEventListener listener = (DeployedJobSpecEventListener) activeEventHandler.getListener(entityId);
         Channel channel = null;
 
         MetadataTransactionContext mdTxnCtx = null;
@@ -109,22 +113,30 @@
                 }
             }
 
-            listener.getExecutorService().shutdownNow();
-            JobId hyracksJobId = listener.getJobId();
-            listener.deActivate();
-            activeEventHandler.unregisterListener(listener);
-            if (hyracksJobId != null) {
-                hcc.destroyJob(hyracksJobId);
-                // wait for job completion to release any resources to be dropped
-                ensureJobDestroyed(hcc, hyracksJobId);
-            }
+            if (listener == null) {
+                //TODO: Channels need to better handle cluster failures
+                LOGGER.log(Level.SEVERE,
+                        "Tried to drop a Deployed Job whose listener no longer exists: "
+                                + entityId.getExtensionName() + " " + entityId.getDataverse() + "."
+                                + entityId.getEntityName() + ".");
 
+            } else {
+                listener.getExecutorService().shutdown();
+                listener.getExecutorService().awaitTermination(Long.MAX_VALUE, TimeUnit.SECONDS);
+                DeployedJobSpecId deployedJobSpecId = listener.getDeployedJobSpecId();
+                listener.deActivate();
+                activeEventHandler.unregisterListener(listener);
+                if (deployedJobSpecId != null) {
+                    hcc.undeployJobSpec(deployedJobSpecId);
+                }
+            }
             //Create a metadata provider to use in nested jobs.
             MetadataProvider tempMdProvider = new MetadataProvider(appCtx, metadataProvider.getDefaultDataverse());
             tempMdProvider.getConfig().putAll(metadataProvider.getConfig());
             //Drop the Channel Datasets
             //TODO: Need to find some way to handle if this fails.
             //TODO: Prevent datasets for Channels from being dropped elsewhere
+
             DropDatasetStatement dropStmt = new DropDatasetStatement(new Identifier(dataverse),
                     new Identifier(channel.getResultsDatasetName()), true);
             ((QueryTranslator) statementExecutor).handleDatasetDropStatement(tempMdProvider, dropStmt, hcc, null);
@@ -147,19 +159,4 @@
         }
     }
 
-    private void ensureJobDestroyed(IHyracksClientConnection hcc, JobId hyracksJobId) throws Exception {
-        try {
-            hcc.waitForCompletion(hyracksJobId);
-        } catch (Exception e) {
-            // if the job has already been destroyed, it is safe to complete
-            if (e instanceof HyracksDataException) {
-                HyracksDataException hde = (HyracksDataException) e;
-                if (hde.getComponent().equals(ErrorCode.HYRACKS)
-                        && hde.getErrorCode() == ErrorCode.JOB_HAS_BEEN_CLEARED_FROM_HISTORY) {
-                    return;
-                }
-            }
-            throw e;
-        }
-    }
 }
\ No newline at end of file
diff --git a/asterix-bad/src/main/java/org/apache/asterix/bad/lang/statement/CreateChannelStatement.java b/asterix-bad/src/main/java/org/apache/asterix/bad/lang/statement/CreateChannelStatement.java
index 6d04f20..0898eec 100644
--- a/asterix-bad/src/main/java/org/apache/asterix/bad/lang/statement/CreateChannelStatement.java
+++ b/asterix-bad/src/main/java/org/apache/asterix/bad/lang/statement/CreateChannelStatement.java
@@ -22,13 +22,13 @@
 import java.io.DataOutputStream;
 import java.io.StringReader;
 import java.util.ArrayList;
-import java.util.EnumSet;
 import java.util.HashMap;
 import java.util.List;
 import java.util.concurrent.ScheduledExecutorService;
 import java.util.logging.Level;
 import java.util.logging.Logger;
 
+import org.apache.asterix.active.DeployedJobService;
 import org.apache.asterix.active.EntityId;
 import org.apache.asterix.algebra.extension.IExtensionStatement;
 import org.apache.asterix.app.active.ActiveNotificationHandler;
@@ -38,9 +38,10 @@
 import org.apache.asterix.bad.lang.BADLangExtension;
 import org.apache.asterix.bad.lang.BADParserFactory;
 import org.apache.asterix.bad.metadata.Channel;
-import org.apache.asterix.bad.metadata.PrecompiledJobEventListener;
-import org.apache.asterix.bad.metadata.PrecompiledJobEventListener.PrecompiledType;
+import org.apache.asterix.bad.metadata.DeployedJobSpecEventListener;
+import org.apache.asterix.bad.metadata.DeployedJobSpecEventListener.PrecompiledType;
 import org.apache.asterix.common.config.DatasetConfig.DatasetType;
+import org.apache.asterix.common.config.DatasetConfig.IndexType;
 import org.apache.asterix.common.dataflow.ICcApplicationContext;
 import org.apache.asterix.common.exceptions.AsterixException;
 import org.apache.asterix.common.exceptions.CompilationException;
@@ -50,8 +51,10 @@
 import org.apache.asterix.lang.common.base.Expression;
 import org.apache.asterix.lang.common.base.Statement;
 import org.apache.asterix.lang.common.expression.CallExpr;
+import org.apache.asterix.lang.common.expression.IndexedTypeExpression;
 import org.apache.asterix.lang.common.expression.LiteralExpr;
 import org.apache.asterix.lang.common.literal.StringLiteral;
+import org.apache.asterix.lang.common.statement.CreateIndexStatement;
 import org.apache.asterix.lang.common.statement.DatasetDecl;
 import org.apache.asterix.lang.common.statement.IDatasetDetailsDecl;
 import org.apache.asterix.lang.common.statement.InsertStatement;
@@ -69,11 +72,11 @@
 import org.apache.asterix.translator.IStatementExecutor.ResultDelivery;
 import org.apache.asterix.translator.IStatementExecutor.Stats;
 import org.apache.hyracks.algebricks.common.exceptions.AlgebricksException;
+import org.apache.hyracks.algebricks.common.utils.Pair;
 import org.apache.hyracks.api.client.IHyracksClientConnection;
 import org.apache.hyracks.api.dataset.IHyracksDataset;
 import org.apache.hyracks.api.exceptions.HyracksDataException;
-import org.apache.hyracks.api.job.JobFlag;
-import org.apache.hyracks.api.job.JobId;
+import org.apache.hyracks.api.job.DeployedJobSpecId;
 import org.apache.hyracks.api.job.JobSpecification;
 import org.apache.hyracks.dataflow.common.data.parsers.IValueParser;
 
@@ -89,16 +92,14 @@
     private InsertStatement channelResultsInsertQuery;
     private String subscriptionsTableName;
     private String resultsTableName;
-    private boolean distributed;
 
     public CreateChannelStatement(Identifier dataverseName, Identifier channelName, FunctionSignature function,
-            Expression period, boolean distributed) {
+            Expression period) {
         this.channelName = channelName;
         this.dataverseName = dataverseName;
         this.function = function;
         this.period = (CallExpr) period;
         this.duration = "";
-        this.distributed = distributed;
     }
 
     public Identifier getDataverseName() {
@@ -196,12 +197,32 @@
                 new Identifier(BADConstants.BAD_DATAVERSE_NAME), resultsTypeName, null, null, null, null,
                 new HashMap<String, String>(), new HashMap<String, String>(), DatasetType.INTERNAL, idd, true);
 
+        //Create an index on timestamp for results
+        CreateIndexStatement createTimeIndex = new CreateIndexStatement();
+        createTimeIndex.setDatasetName(resultsName);
+        createTimeIndex.setDataverseName(new Identifier(dataverse));
+        createTimeIndex.setIndexName(new Identifier(resultsName + "TimeIndex"));
+        createTimeIndex.setIfNotExists(false);
+        createTimeIndex.setIndexType(IndexType.BTREE);
+        createTimeIndex.setEnforced(false);
+        createTimeIndex.setGramLength(0);
+        List<String> fNames = new ArrayList<>();
+        fNames.add(BADConstants.ChannelExecutionTime);
+        Pair<List<String>, IndexedTypeExpression> fields = new Pair<>(fNames, null);
+        createTimeIndex.addFieldExprPair(fields);
+        createTimeIndex.addFieldIndexIndicator(0);
+
+
         //Run both statements to create datasets
         ((QueryTranslator) statementExecutor).handleCreateDatasetStatement(metadataProvider, createSubscriptionsDataset,
                 hcc, null);
         metadataProvider.getLocks().reset();
         ((QueryTranslator) statementExecutor).handleCreateDatasetStatement(metadataProvider, createResultsDataset, hcc,
                 null);
+        metadataProvider.getLocks().reset();
+
+        //Create a time index for the results
+        ((QueryTranslator) statementExecutor).handleCreateIndexStatement(metadataProvider, createTimeIndex, hcc, null);
 
     }
 
@@ -240,16 +261,12 @@
     }
 
     private void setupExecutorJob(EntityId entityId, JobSpecification channeljobSpec, IHyracksClientConnection hcc,
-            PrecompiledJobEventListener listener, boolean predistributed) throws Exception {
+            DeployedJobSpecEventListener listener) throws Exception {
         if (channeljobSpec != null) {
-            //TODO: Find a way to fix optimizer tests so we don't need this check
-            JobId jobId = null;
-            if (predistributed) {
-                jobId = hcc.distributeJob(channeljobSpec);
-            }
-            ScheduledExecutorService ses = ChannelJobService.startJob(channeljobSpec, EnumSet.noneOf(JobFlag.class),
-                    jobId, hcc, ChannelJobService.findPeriod(duration));
-            listener.storeDistributedInfo(jobId, ses, null);
+            DeployedJobSpecId deployedId = hcc.deployJobSpec(channeljobSpec);
+            ScheduledExecutorService ses = DeployedJobService.startRepetitiveDeployedJobSpec(deployedId, hcc,
+                    ChannelJobService.findPeriod(duration), new HashMap<>(), entityId);
+            listener.storeDistributedInfo(deployedId, ses, null, null);
         }
 
     }
@@ -275,7 +292,7 @@
         ICcApplicationContext appCtx = metadataProvider.getApplicationContext();
         ActiveNotificationHandler activeEventHandler =
                 (ActiveNotificationHandler) appCtx.getActiveNotificationHandler();
-        PrecompiledJobEventListener listener = (PrecompiledJobEventListener) activeEventHandler.getListener(entityId);
+        DeployedJobSpecEventListener listener = (DeployedJobSpecEventListener) activeEventHandler.getListener(entityId);
         boolean alreadyActive = false;
         Channel channel = null;
 
@@ -322,16 +339,12 @@
                 datasets.add(MetadataManager.INSTANCE.getDataset(mdTxnCtx, dataverse, subscriptionsName.getValue()));
                 datasets.add(MetadataManager.INSTANCE.getDataset(mdTxnCtx, dataverse, resultsName.getValue()));
                 //TODO: Add datasets used by channel function
-                listener = new PrecompiledJobEventListener(appCtx, entityId, PrecompiledType.CHANNEL, datasets, null,
+                listener = new DeployedJobSpecEventListener(appCtx, entityId, PrecompiledType.CHANNEL, datasets, null,
                         "BadListener");
                 activeEventHandler.registerListener(listener);
             }
 
-            if (distributed) {
-                setupExecutorJob(entityId, channeljobSpec, hcc, listener, true);
-            } else {
-                setupExecutorJob(entityId, channeljobSpec, hcc, listener, false);
-            }
+            setupExecutorJob(entityId, channeljobSpec, hcc, listener);
 
             MetadataManager.INSTANCE.addEntity(mdTxnCtx, channel);
             MetadataManager.INSTANCE.commitTransaction(mdTxnCtx);
diff --git a/asterix-bad/src/main/java/org/apache/asterix/bad/lang/statement/CreateProcedureStatement.java b/asterix-bad/src/main/java/org/apache/asterix/bad/lang/statement/CreateProcedureStatement.java
index 0666b38..adfa485 100644
--- a/asterix-bad/src/main/java/org/apache/asterix/bad/lang/statement/CreateProcedureStatement.java
+++ b/asterix-bad/src/main/java/org/apache/asterix/bad/lang/statement/CreateProcedureStatement.java
@@ -20,7 +20,6 @@
 
 import java.io.ByteArrayOutputStream;
 import java.io.DataOutputStream;
-import java.io.StringReader;
 import java.util.ArrayList;
 import java.util.List;
 import java.util.logging.Level;
@@ -29,13 +28,11 @@
 import org.apache.asterix.active.EntityId;
 import org.apache.asterix.algebra.extension.IExtensionStatement;
 import org.apache.asterix.app.active.ActiveNotificationHandler;
-import org.apache.asterix.app.result.ResultReader;
 import org.apache.asterix.app.translator.QueryTranslator;
 import org.apache.asterix.bad.BADConstants;
 import org.apache.asterix.bad.lang.BADLangExtension;
-import org.apache.asterix.bad.lang.BADParserFactory;
-import org.apache.asterix.bad.metadata.PrecompiledJobEventListener;
-import org.apache.asterix.bad.metadata.PrecompiledJobEventListener.PrecompiledType;
+import org.apache.asterix.bad.metadata.DeployedJobSpecEventListener;
+import org.apache.asterix.bad.metadata.DeployedJobSpecEventListener.PrecompiledType;
 import org.apache.asterix.bad.metadata.Procedure;
 import org.apache.asterix.common.dataflow.ICcApplicationContext;
 import org.apache.asterix.common.exceptions.AsterixException;
@@ -44,30 +41,36 @@
 import org.apache.asterix.common.functions.FunctionSignature;
 import org.apache.asterix.lang.common.base.Expression;
 import org.apache.asterix.lang.common.base.Statement;
+import org.apache.asterix.lang.common.clause.LetClause;
 import org.apache.asterix.lang.common.expression.CallExpr;
 import org.apache.asterix.lang.common.expression.LiteralExpr;
+import org.apache.asterix.lang.common.expression.VariableExpr;
 import org.apache.asterix.lang.common.literal.StringLiteral;
+import org.apache.asterix.lang.common.statement.DeleteStatement;
 import org.apache.asterix.lang.common.statement.Query;
 import org.apache.asterix.lang.common.struct.Identifier;
 import org.apache.asterix.lang.common.struct.VarIdentifier;
 import org.apache.asterix.lang.common.visitor.base.ILangVisitor;
+import org.apache.asterix.lang.sqlpp.expression.SelectExpression;
 import org.apache.asterix.lang.sqlpp.visitor.SqlppDeleteRewriteVisitor;
 import org.apache.asterix.metadata.MetadataManager;
 import org.apache.asterix.metadata.MetadataTransactionContext;
 import org.apache.asterix.metadata.declared.MetadataProvider;
 import org.apache.asterix.metadata.entities.Function;
 import org.apache.asterix.om.base.temporal.ADurationParserFactory;
+import org.apache.asterix.om.functions.BuiltinFunctions;
 import org.apache.asterix.translator.IRequestParameters;
 import org.apache.asterix.translator.IStatementExecutor;
 import org.apache.asterix.translator.IStatementExecutor.ResultDelivery;
 import org.apache.asterix.translator.IStatementExecutor.Stats;
 import org.apache.hyracks.algebricks.common.exceptions.AlgebricksException;
 import org.apache.hyracks.algebricks.common.utils.Pair;
+import org.apache.hyracks.algebricks.core.algebra.functions.FunctionIdentifier;
 import org.apache.hyracks.api.client.IHyracksClientConnection;
 import org.apache.hyracks.api.dataset.IHyracksDataset;
 import org.apache.hyracks.api.dataset.ResultSetId;
 import org.apache.hyracks.api.exceptions.HyracksDataException;
-import org.apache.hyracks.api.job.JobId;
+import org.apache.hyracks.api.job.DeployedJobSpecId;
 import org.apache.hyracks.api.job.JobSpecification;
 import org.apache.hyracks.dataflow.common.data.parsers.IValueParser;
 
@@ -76,26 +79,35 @@
     private static final Logger LOGGER = Logger.getLogger(CreateProcedureStatement.class.getName());
 
     private final FunctionSignature signature;
-    private final String functionBody;
+    private final String procedureBody;
+    private final Statement procedureBodyStatement;
     private final List<String> paramList;
+    private final List<VariableExpr> varList;
     private final CallExpr period;
     private String duration = "";
 
-    public String getFunctionBody() {
-        return functionBody;
-    }
-
-    public CreateProcedureStatement(FunctionSignature signature, List<VarIdentifier> parameterList, String functionBody,
-            Expression period) {
+    public CreateProcedureStatement(FunctionSignature signature, List<VarIdentifier> parameterList,
+            List<Integer> paramIds, String functionBody, Statement procedureBodyStatement, Expression period) {
         this.signature = signature;
-        this.functionBody = functionBody;
+        this.procedureBody = functionBody;
+        this.procedureBodyStatement = procedureBodyStatement;
         this.paramList = new ArrayList<>();
-        for (VarIdentifier varId : parameterList) {
-            this.paramList.add(varId.getValue());
+        this.varList = new ArrayList<>();
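+        //Keep both the parameter names and their scoped variables; the variables are later
+        //bound to job parameters when the procedure body is compiled (see addLets)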
+        for (int i = 0; i < parameterList.size(); i++) {
+            this.paramList.add(parameterList.get(i).getValue());
+            this.varList.add(new VariableExpr(new VarIdentifier(parameterList.get(i).toString(), paramIds.get(i))));
         }
         this.period = (CallExpr) period;
     }
 
+    public String getProcedureBody() {
+        return procedureBody;
+    }
+
+    public Statement getProcedureBodyStatement() {
+        return procedureBodyStatement;
+    }
+
     @Override
     public byte getKind() {
         return Kind.EXTENSION;
@@ -105,6 +117,10 @@
         return paramList;
     }
 
+    public List<VariableExpr> getVarList() {
+        return varList;
+    }
+
     public FunctionSignature getSignature() {
         return signature;
     }
@@ -158,43 +174,56 @@
         return jobSpec;
     }
 
-    private Pair<JobSpecification, PrecompiledType> createProcedureJob(String body,
-            IStatementExecutor statementExecutor, MetadataProvider metadataProvider, IHyracksClientConnection hcc,
-            IHyracksDataset hdc, Stats stats) throws Exception {
-        StringBuilder builder = new StringBuilder();
-        builder.append(body);
-        builder.append(";");
-        BADParserFactory factory = new BADParserFactory();
-        List<Statement> fStatements = factory.createParser(new StringReader(builder.toString())).parse();
-        if (fStatements.size() > 1) {
-            throw new CompilationException("Procedure can only execute a single statement");
+    private void addLets(SelectExpression s) {
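+        //Bind each procedure parameter variable with a LET clause that calls get_job_parameter("<name>"),
+        //so its value comes from the job parameter map at start time rather than at compile time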
+        FunctionIdentifier function = BuiltinFunctions.GET_JOB_PARAMETER;
+        FunctionSignature sig =
+                new FunctionSignature(function.getNamespace(), function.getName(), function.getArity());
+        for (VariableExpr var : varList) {
+            List<Expression> strListForCall = new ArrayList<>();
+            LiteralExpr l = new LiteralExpr(new StringLiteral(var.getVar().getValue()));
+            strListForCall.add(l);
+            Expression con = new CallExpr(sig, strListForCall);
+            LetClause let = new LetClause(var, con);
+            s.getLetList().add(let);
         }
-        if (fStatements.get(0).getKind() == Statement.Kind.INSERT) {
+    }
+
+    private Pair<JobSpecification, PrecompiledType> createProcedureJob(IStatementExecutor statementExecutor,
+            MetadataProvider metadataProvider, IHyracksClientConnection hcc, IHyracksDataset hdc, Stats stats)
+                    throws Exception {
+        if (getProcedureBodyStatement().getKind() == Statement.Kind.INSERT) {
+            if (!varList.isEmpty()) {
+                throw new CompilationException("Insert procedures cannot have parameters");
+            }
             return new Pair<>(
                     ((QueryTranslator) statementExecutor).handleInsertUpsertStatement(metadataProvider,
-                            fStatements.get(0), hcc, hdc, ResultDelivery.ASYNC, null, stats, true, null),
+                            getProcedureBodyStatement(), hcc, hdc, ResultDelivery.ASYNC, null, stats, true, null),
                     PrecompiledType.INSERT);
-        } else if (fStatements.get(0).getKind() == Statement.Kind.QUERY) {
-            Pair<JobSpecification, PrecompiledType> pair =
-                    new Pair<>(compileQueryJob(statementExecutor, metadataProvider, hcc, (Query) fStatements.get(0)),
-                            PrecompiledType.QUERY);
+        } else if (getProcedureBodyStatement().getKind() == Statement.Kind.QUERY) {
+            Query s = (Query) getProcedureBodyStatement();
+            addLets((SelectExpression) s.getBody());
+            Pair<JobSpecification, PrecompiledType> pair = new Pair<>(
+                    compileQueryJob(statementExecutor, metadataProvider, hcc, (Query) getProcedureBodyStatement()),
+                    PrecompiledType.QUERY);
             metadataProvider.getLocks().unlock();
             return pair;
-        } else if (fStatements.get(0).getKind() == Statement.Kind.DELETE) {
+        } else if (getProcedureBodyStatement().getKind() == Statement.Kind.DELETE) {
             SqlppDeleteRewriteVisitor visitor = new SqlppDeleteRewriteVisitor();
-            fStatements.get(0).accept(visitor, null);
+            getProcedureBodyStatement().accept(visitor, null);
+            DeleteStatement delete = (DeleteStatement) getProcedureBodyStatement();
+            addLets((SelectExpression) delete.getQuery().getBody());
             return new Pair<>(((QueryTranslator) statementExecutor).handleDeleteStatement(metadataProvider,
-                    fStatements.get(0), hcc, true), PrecompiledType.DELETE);
+                    getProcedureBodyStatement(), hcc, true), PrecompiledType.DELETE);
         } else {
             throw new CompilationException("Procedure can only execute a single delete, insert, or query");
         }
     }
 
-    private void setupDistributedJob(EntityId entityId, JobSpecification jobSpec, IHyracksClientConnection hcc,
-            PrecompiledJobEventListener listener, ResultSetId resultSetId, IHyracksDataset hdc, Stats stats)
+    private void setupDeployedJobSpec(EntityId entityId, JobSpecification jobSpec, IHyracksClientConnection hcc,
+            DeployedJobSpecEventListener listener, ResultSetId resultSetId, IHyracksDataset hdc, Stats stats)
             throws Exception {
-        JobId jobId = hcc.distributeJob(jobSpec);
-        listener.storeDistributedInfo(jobId, null, new ResultReader(hdc, jobId, resultSetId));
+        DeployedJobSpecId deployedJobSpecId = hcc.deployJobSpec(jobSpec);
+        listener.storeDistributedInfo(deployedJobSpecId, null, hdc, resultSetId);
     }
 
     @Override
@@ -208,7 +237,7 @@
         String dataverse =
                 ((QueryTranslator) statementExecutor).getActiveDataverse(new Identifier(signature.getNamespace()));
         EntityId entityId = new EntityId(BADConstants.PROCEDURE_KEYWORD, dataverse, signature.getName());
-        PrecompiledJobEventListener listener = (PrecompiledJobEventListener) activeEventHandler.getListener(entityId);
+        DeployedJobSpecEventListener listener = (DeployedJobSpecEventListener) activeEventHandler.getListener(entityId);
         boolean alreadyActive = false;
         Procedure procedure = null;
 
@@ -228,7 +257,7 @@
                 throw new AsterixException("Procedure " + signature.getName() + " is already running");
             }
             procedure = new Procedure(dataverse, signature.getName(), signature.getArity(), getParamList(),
-                    Function.RETURNTYPE_VOID, getFunctionBody(), Function.LANGUAGE_AQL, duration);
+                    Function.RETURNTYPE_VOID, getProcedureBody(), Function.LANGUAGE_AQL, duration);
             MetadataProvider tempMdProvider = new MetadataProvider(metadataProvider.getApplicationContext(),
                     metadataProvider.getDefaultDataverse());
             tempMdProvider.getConfig().putAll(metadataProvider.getConfig());
@@ -246,16 +275,18 @@
 
             //Create Procedure Internal Job
             Pair<JobSpecification, PrecompiledType> procedureJobSpec =
-                    createProcedureJob(getFunctionBody(), statementExecutor, tempMdProvider, hcc, hdc, stats);
+                    createProcedureJob(statementExecutor, tempMdProvider, hcc, hdc, stats);
 
             // Now we subscribe
             if (listener == null) {
                 //TODO: Add datasets used by channel function
-                listener = new PrecompiledJobEventListener(appCtx, entityId, procedureJobSpec.second, new ArrayList<>(),
+                listener = new DeployedJobSpecEventListener(appCtx, entityId, procedureJobSpec.second,
+                        new ArrayList<>(),
                         null, "BadListener");
                 activeEventHandler.registerListener(listener);
             }
-            setupDistributedJob(entityId, procedureJobSpec.first, hcc, listener, tempMdProvider.getResultSetId(), hdc,
+            setupDeployedJobSpec(entityId, procedureJobSpec.first, hcc, listener, tempMdProvider.getResultSetId(),
+                    hdc,
                     stats);
 
             MetadataManager.INSTANCE.addEntity(mdTxnCtx, procedure);
diff --git a/asterix-bad/src/main/java/org/apache/asterix/bad/lang/statement/ExecuteProcedureStatement.java b/asterix-bad/src/main/java/org/apache/asterix/bad/lang/statement/ExecuteProcedureStatement.java
index 0dbd0a3..b6c66dc 100644
--- a/asterix-bad/src/main/java/org/apache/asterix/bad/lang/statement/ExecuteProcedureStatement.java
+++ b/asterix-bad/src/main/java/org/apache/asterix/bad/lang/statement/ExecuteProcedureStatement.java
@@ -18,9 +18,13 @@
  */
 package org.apache.asterix.bad.lang.statement;
 
-import java.util.EnumSet;
+import java.io.DataOutput;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
 import java.util.concurrent.ScheduledExecutorService;
 
+import org.apache.asterix.active.DeployedJobService;
 import org.apache.asterix.active.EntityId;
 import org.apache.asterix.algebra.extension.IExtensionStatement;
 import org.apache.asterix.api.http.server.ResultUtil;
@@ -30,35 +34,47 @@
 import org.apache.asterix.bad.BADConstants;
 import org.apache.asterix.bad.ChannelJobService;
 import org.apache.asterix.bad.lang.BADLangExtension;
-import org.apache.asterix.bad.metadata.PrecompiledJobEventListener;
-import org.apache.asterix.bad.metadata.PrecompiledJobEventListener.PrecompiledType;
+import org.apache.asterix.bad.metadata.DeployedJobSpecEventListener;
+import org.apache.asterix.bad.metadata.DeployedJobSpecEventListener.PrecompiledType;
 import org.apache.asterix.bad.metadata.Procedure;
 import org.apache.asterix.common.dataflow.ICcApplicationContext;
+import org.apache.asterix.common.exceptions.AsterixException;
 import org.apache.asterix.common.exceptions.CompilationException;
+import org.apache.asterix.common.exceptions.ErrorCode;
+import org.apache.asterix.formats.nontagged.SerializerDeserializerProvider;
+import org.apache.asterix.lang.common.base.Expression;
+import org.apache.asterix.lang.common.expression.LiteralExpr;
 import org.apache.asterix.lang.common.struct.Identifier;
 import org.apache.asterix.lang.common.visitor.base.ILangVisitor;
 import org.apache.asterix.metadata.MetadataManager;
 import org.apache.asterix.metadata.MetadataTransactionContext;
 import org.apache.asterix.metadata.declared.MetadataProvider;
+import org.apache.asterix.om.base.AString;
+import org.apache.asterix.om.base.IAObject;
+import org.apache.asterix.transaction.management.service.transaction.TxnIdFactory;
+import org.apache.asterix.translator.ConstantHelper;
 import org.apache.asterix.translator.IRequestParameters;
 import org.apache.asterix.translator.IStatementExecutor;
 import org.apache.asterix.translator.IStatementExecutor.Stats;
 import org.apache.hyracks.algebricks.common.exceptions.AlgebricksException;
 import org.apache.hyracks.api.client.IHyracksClientConnection;
 import org.apache.hyracks.api.exceptions.HyracksDataException;
-import org.apache.hyracks.api.job.JobFlag;
+import org.apache.hyracks.api.job.DeployedJobSpecId;
 import org.apache.hyracks.api.job.JobId;
+import org.apache.hyracks.data.std.util.ArrayBackedValueStorage;
 
 public class ExecuteProcedureStatement implements IExtensionStatement {
 
     private final String dataverseName;
     private final String procedureName;
     private final int arity;
+    private final List<Expression> argList;
 
-    public ExecuteProcedureStatement(String dataverseName, String procedureName, int arity) {
+    public ExecuteProcedureStatement(String dataverseName, String procedureName, int arity, List<Expression> argList) {
         this.dataverseName = dataverseName;
         this.procedureName = procedureName;
         this.arity = arity;
+        this.argList = argList;
     }
 
     public String getDataverseName() {
@@ -98,7 +114,7 @@
         String dataverse = ((QueryTranslator) statementExecutor).getActiveDataverse(new Identifier(dataverseName));
         boolean txnActive = false;
         EntityId entityId = new EntityId(BADConstants.PROCEDURE_KEYWORD, dataverse, procedureName);
-        PrecompiledJobEventListener listener = (PrecompiledJobEventListener) activeEventHandler.getListener(entityId);
+        DeployedJobSpecEventListener listener = (DeployedJobSpecEventListener) activeEventHandler.getListener(entityId);
         Procedure procedure = null;
 
         MetadataTransactionContext mdTxnCtx = null;
@@ -109,22 +125,30 @@
             if (procedure == null) {
                 throw new AlgebricksException("There is no procedure with this name " + procedureName + ".");
             }
-
-            JobId hyracksJobId = listener.getJobId();
+            Map<byte[], byte[]> contextRuntimeVarMap = createParameterMap(procedure);
+            DeployedJobSpecId deployedJobSpecId = listener.getDeployedJobSpecId();
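+            //A procedure without a duration runs once; otherwise it is scheduled to run repetitively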
             if (procedure.getDuration().equals("")) {
-                hcc.startJob(hyracksJobId);
+
+                //Add the Asterix Transaction Id to the map
+                contextRuntimeVarMap.put(BADConstants.TRANSACTION_ID_PARAMETER_NAME,
+                        String.valueOf(TxnIdFactory.create().getId()).getBytes());
+                JobId jobId = hcc.startJob(deployedJobSpecId, contextRuntimeVarMap);
 
                 if (listener.getType() == PrecompiledType.QUERY) {
-                    hcc.waitForCompletion(hyracksJobId);
-                    ResultReader resultReader = listener.getResultReader();
+                    hcc.waitForCompletion(jobId);
+                    ResultReader resultReader =
+                            new ResultReader(listener.getResultDataset(), jobId, listener.getResultId());
+
                     ResultUtil.printResults(appCtx, resultReader,
                             ((QueryTranslator) statementExecutor).getSessionOutput(), new Stats(), null);
                 }
 
             } else {
-                ScheduledExecutorService ses = ChannelJobService.startJob(null, EnumSet.noneOf(JobFlag.class),
-                        hyracksJobId, hcc, ChannelJobService.findPeriod(procedure.getDuration()));
-                listener.storeDistributedInfo(hyracksJobId, ses, listener.getResultReader());
+                ScheduledExecutorService ses =
+                        DeployedJobService.startRepetitiveDeployedJobSpec(deployedJobSpecId, hcc,
+                                ChannelJobService.findPeriod(procedure.getDuration()), contextRuntimeVarMap, entityId);
+                listener.storeDistributedInfo(deployedJobSpecId, ses, listener.getResultDataset(),
+                        listener.getResultId());
             }
 
             MetadataManager.INSTANCE.commitTransaction(mdTxnCtx);
@@ -140,4 +164,44 @@
         }
     }
 
+    private Map<byte[], byte[]> createParameterMap(Procedure procedure)
+            throws AsterixException, HyracksDataException {
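+        //Serialize each (parameter name, literal argument) pair into the byte-array map
+        //that is handed to the deployed job when it is started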
+        Map<byte[], byte[]> map = new HashMap<>();
+        if (procedure.getParams().size() != argList.size()) {
+            throw AsterixException.create(ErrorCode.COMPILATION_INVALID_PARAMETER_NUMBER,
+                    procedure.getEntityId().getEntityName(), argList.size());
+        }
+        ArrayBackedValueStorage abvsKey = new ArrayBackedValueStorage();
+        DataOutput dosKey = abvsKey.getDataOutput();
+        ArrayBackedValueStorage abvsValue = new ArrayBackedValueStorage();
+        DataOutput dosValue = abvsValue.getDataOutput();
+
+        for (int i = 0; i < procedure.getParams().size(); i++) {
+            if (!(argList.get(i) instanceof LiteralExpr)) {
+                //TODO handle nonliteral arguments to procedure
+                throw AsterixException.create(ErrorCode.TYPE_UNSUPPORTED, procedure.getEntityId().getEntityName(),
+                        argList.get(i).getClass());
+            }
+            //Turn the argument name into a byte array
+            IAObject str = new AString(procedure.getParams().get(i));
+            abvsKey.reset();
+            SerializerDeserializerProvider.INSTANCE.getSerializerDeserializer(str.getType()).serialize(str, dosKey);
+            //We do not save the type tag of the string key
+            byte[] key = new byte[abvsKey.getLength() - 1];
+            System.arraycopy(abvsKey.getByteArray(), 1, key, 0, abvsKey.getLength() - 1);
+
+            //Turn the argument value into a byte array
+            IAObject object = ConstantHelper.objectFromLiteral(((LiteralExpr) argList.get(i)).getValue());
+            abvsValue.reset();
+            SerializerDeserializerProvider.INSTANCE.getSerializerDeserializer(object.getType()).serialize(object,
+                    dosValue);
+            byte[] value = new byte[abvsValue.getLength()];
+            System.arraycopy(abvsValue.getByteArray(), abvsValue.getStartOffset(), value, 0, abvsValue.getLength());
+
+            map.put(key, value);
+        }
+
+        return map;
+    }
+
 }
\ No newline at end of file
diff --git a/asterix-bad/src/main/java/org/apache/asterix/bad/lang/statement/ProcedureDropStatement.java b/asterix-bad/src/main/java/org/apache/asterix/bad/lang/statement/ProcedureDropStatement.java
index 3c618ae..18e769d 100644
--- a/asterix-bad/src/main/java/org/apache/asterix/bad/lang/statement/ProcedureDropStatement.java
+++ b/asterix-bad/src/main/java/org/apache/asterix/bad/lang/statement/ProcedureDropStatement.java
@@ -18,13 +18,17 @@
  */
 package org.apache.asterix.bad.lang.statement;
 
+import java.util.concurrent.TimeUnit;
+import java.util.logging.Level;
+import java.util.logging.Logger;
+
 import org.apache.asterix.active.EntityId;
 import org.apache.asterix.algebra.extension.IExtensionStatement;
 import org.apache.asterix.app.active.ActiveNotificationHandler;
 import org.apache.asterix.app.translator.QueryTranslator;
 import org.apache.asterix.bad.BADConstants;
 import org.apache.asterix.bad.lang.BADLangExtension;
-import org.apache.asterix.bad.metadata.PrecompiledJobEventListener;
+import org.apache.asterix.bad.metadata.DeployedJobSpecEventListener;
 import org.apache.asterix.bad.metadata.Procedure;
 import org.apache.asterix.common.dataflow.ICcApplicationContext;
 import org.apache.asterix.common.exceptions.CompilationException;
@@ -39,9 +43,10 @@
 import org.apache.hyracks.algebricks.common.exceptions.AlgebricksException;
 import org.apache.hyracks.api.client.IHyracksClientConnection;
 import org.apache.hyracks.api.exceptions.HyracksDataException;
-import org.apache.hyracks.api.job.JobId;
+import org.apache.hyracks.api.job.DeployedJobSpecId;
 
 public class ProcedureDropStatement implements IExtensionStatement {
+    private static final Logger LOGGER = Logger.getLogger(ProcedureDropStatement.class.getName());
 
     private final FunctionSignature signature;
     private boolean ifExists;
@@ -87,7 +92,7 @@
         signature.setNamespace(dataverse);
         boolean txnActive = false;
         EntityId entityId = new EntityId(BADConstants.PROCEDURE_KEYWORD, dataverse, signature.getName());
-        PrecompiledJobEventListener listener = (PrecompiledJobEventListener) activeEventHandler.getListener(entityId);
+        DeployedJobSpecEventListener listener = (DeployedJobSpecEventListener) activeEventHandler.getListener(entityId);
         Procedure procedure = null;
 
         MetadataTransactionContext mdTxnCtx = null;
@@ -106,14 +111,24 @@
                 }
             }
 
-            if (listener.getExecutorService() != null) {
-                listener.getExecutorService().shutdownNow();
-            }
-            JobId hyracksJobId = listener.getJobId();
-            listener.deActivate();
-            activeEventHandler.unregisterListener(listener);
-            if (hyracksJobId != null) {
-                hcc.destroyJob(hyracksJobId);
+            if (listener == null) {
+                //TODO: Channels need to better handle cluster failures
+                LOGGER.log(Level.SEVERE,
+                        "Tried to drop a Deployed Job whose listener no longer exists: "
+                                + entityId.getExtensionName() + " " + entityId.getDataverse() + "."
+                                + entityId.getEntityName() + ".");
+            } else {
+                if (listener.getExecutorService() != null) {
+                    listener.getExecutorService().shutdown();
+                    listener.getExecutorService().awaitTermination(Long.MAX_VALUE, TimeUnit.SECONDS);
+                }
+                DeployedJobSpecId deployedJobSpecId = listener.getDeployedJobSpecId();
+                listener.deActivate();
+                activeEventHandler.unregisterListener(listener);
+                if (deployedJobSpecId != null) {
+                    hcc.undeployJobSpec(deployedJobSpecId);
+                }
+
             }
 
             //Remove the Channel Metadata
diff --git a/asterix-bad/src/main/java/org/apache/asterix/bad/metadata/PrecompiledJobEventListener.java b/asterix-bad/src/main/java/org/apache/asterix/bad/metadata/DeployedJobSpecEventListener.java
similarity index 85%
rename from asterix-bad/src/main/java/org/apache/asterix/bad/metadata/PrecompiledJobEventListener.java
rename to asterix-bad/src/main/java/org/apache/asterix/bad/metadata/DeployedJobSpecEventListener.java
index 5036549..950612c 100644
--- a/asterix-bad/src/main/java/org/apache/asterix/bad/metadata/PrecompiledJobEventListener.java
+++ b/asterix-bad/src/main/java/org/apache/asterix/bad/metadata/DeployedJobSpecEventListener.java
@@ -30,19 +30,22 @@
 import org.apache.asterix.active.IActiveEntityEventSubscriber;
 import org.apache.asterix.active.IActiveEntityEventsListener;
 import org.apache.asterix.active.message.ActivePartitionMessage;
-import org.apache.asterix.active.message.ActivePartitionMessage.Event;
 import org.apache.asterix.app.result.ResultReader;
 import org.apache.asterix.common.dataflow.ICcApplicationContext;
 import org.apache.asterix.common.metadata.IDataset;
 import org.apache.hyracks.algebricks.common.constraints.AlgebricksAbsolutePartitionConstraint;
+import org.apache.hyracks.api.dataset.IHyracksDataset;
+import org.apache.hyracks.api.dataset.ResultSetId;
 import org.apache.hyracks.api.exceptions.HyracksDataException;
+import org.apache.hyracks.api.job.DeployedJobSpecId;
 import org.apache.hyracks.api.job.JobId;
 import org.apache.log4j.Level;
 import org.apache.log4j.Logger;
 
-public class PrecompiledJobEventListener implements IActiveEntityEventsListener {
+public class DeployedJobSpecEventListener implements IActiveEntityEventsListener {
 
-    private static final Logger LOGGER = Logger.getLogger(PrecompiledJobEventListener.class);
+    private static final Logger LOGGER = Logger.getLogger(DeployedJobSpecEventListener.class);
+
 
     public enum PrecompiledType {
         CHANNEL,
@@ -57,9 +60,13 @@
         FINISHED
     }
 
+    private DeployedJobSpecId deployedJobSpecId;
     private ScheduledExecutorService executorService = null;
     private ResultReader resultReader;
     private final PrecompiledType type;
+
+    private IHyracksDataset hdc;
+    private ResultSetId resultSetId;
     // members
     protected volatile ActivityState state;
     protected JobId jobId;
@@ -75,7 +82,7 @@
     protected final AlgebricksAbsolutePartitionConstraint locations;
     protected int numRegistered;
 
-    public PrecompiledJobEventListener(ICcApplicationContext appCtx, EntityId entityId, PrecompiledType type,
+    public DeployedJobSpecEventListener(ICcApplicationContext appCtx, EntityId entityId, PrecompiledType type,
             List<IDataset> datasets, AlgebricksAbsolutePartitionConstraint locations, String runtimeName) {
         this.appCtx = appCtx;
         this.entityId = entityId;
@@ -92,8 +99,21 @@
         this.type = type;
     }
 
+
+    public IHyracksDataset getResultDataset() {
+        return hdc;
+    }
+
+    public ResultSetId getResultId() {
+        return resultSetId;
+    }
+
+    public DeployedJobSpecId getDeployedJobSpecId() {
+        return deployedJobSpecId;
+    }
+
     protected synchronized void handle(ActivePartitionMessage message) {
-        if (message.getEvent() == Event.RUNTIME_REGISTERED) {
+        if (message.getEvent() == ActivePartitionMessage.Event.RUNTIME_REGISTERED) {
             numRegistered++;
             if (numRegistered == locations.getLocations().length) {
                 state = ActivityState.RUNNING;
@@ -172,10 +192,12 @@
         return type;
     }
 
-    public void storeDistributedInfo(JobId jobId, ScheduledExecutorService ses, ResultReader resultReader) {
-        this.jobId = jobId;
+    public void storeDistributedInfo(DeployedJobSpecId deployedJobSpecId, ScheduledExecutorService ses,
+            IHyracksDataset hdc, ResultSetId resultSetId) {
+        this.deployedJobSpecId = deployedJobSpecId;
         this.executorService = ses;
-        this.resultReader = resultReader;
+        this.hdc = hdc;
+        this.resultSetId = resultSetId;
     }
 
     public ScheduledExecutorService getExecutorService() {
diff --git a/asterix-bad/src/test/resources/conf/asterix-build-configuration.xml b/asterix-bad/src/main/resources/asterix-build-configuration.xml
similarity index 100%
rename from asterix-bad/src/test/resources/conf/asterix-build-configuration.xml
rename to asterix-bad/src/main/resources/asterix-build-configuration.xml
diff --git a/asterix-bad/src/main/resources/lang-extension/lang.txt b/asterix-bad/src/main/resources/lang-extension/lang.txt
index 6a06ea6..7c5931c 100644
--- a/asterix-bad/src/main/resources/lang-extension/lang.txt
+++ b/asterix-bad/src/main/resources/lang-extension/lang.txt
@@ -101,16 +101,15 @@
   CreateChannelStatement ccs = null;
   String fqFunctionName = null;
   Expression period = null;
-  boolean distributed = true;
 }
 {
   (
     "repetitive" "channel"  nameComponents = QualifiedName()
     <USING> appliedFunction = FunctionSignature()
-    "period" period = FunctionCallExpr() ("nondistributed" { distributed = false; })?
+    "period" period = FunctionCallExpr()
     {
       ccs = new CreateChannelStatement(nameComponents.first,
-                                   nameComponents.second, appliedFunction, period, distributed);
+                                   nameComponents.second, appliedFunction, period);
     }
   )
     {
@@ -124,6 +123,7 @@
   FunctionName fctName = null;
   FunctionSignature signature;
   List<VarIdentifier> paramList = new ArrayList<VarIdentifier>();
+  List<Integer> paramIds = new ArrayList<Integer>();
   String functionBody;
   Token beginPos;
   Token endPos;
@@ -135,7 +135,13 @@
      paramList = ParameterList()
     <LEFTBRACE>
   {
-     beginPos = token;
+    for (VarIdentifier param : paramList)
+    {
+      VarIdentifier v = new VarIdentifier(param.toString());
+      getCurrentScope().addNewVarSymbolToScope(v);
+      paramIds.add(v.getId());
+    }
+    beginPos = token;
   }
   functionBodyExpr = SingleStatement() <RIGHTBRACE>
     {
@@ -146,7 +152,7 @@
     }
   ("period" period = FunctionCallExpr())?
   {
-  return new CreateProcedureStatement(signature, paramList, functionBody, period);
+  return new CreateProcedureStatement(signature, paramList, paramIds, functionBody, functionBodyExpr, period);
   }
 }
 
@@ -176,7 +182,7 @@
   )*)? <RIGHTPAREN>
     {
       String fqFunctionName =  funcName.function;
-      return new ExecuteProcedureStatement(funcName.dataverse, fqFunctionName, arity);
+      return new ExecuteProcedureStatement(funcName.dataverse, fqFunctionName, arity, argList);
     }
 }
 
diff --git a/asterix-bad/src/test/java/org/apache/asterix/bad/test/BADAsterixHyracksIntegrationUtil.java b/asterix-bad/src/test/java/org/apache/asterix/bad/test/BADAsterixHyracksIntegrationUtil.java
new file mode 100644
index 0000000..65c184c
--- /dev/null
+++ b/asterix-bad/src/test/java/org/apache/asterix/bad/test/BADAsterixHyracksIntegrationUtil.java
@@ -0,0 +1,60 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.bad.test;
+
+import org.apache.asterix.api.common.AsterixHyracksIntegrationUtil;
+import org.apache.asterix.common.config.GlobalConfig;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
+
+@RunWith(Parameterized.class)
+public class BADAsterixHyracksIntegrationUtil extends AsterixHyracksIntegrationUtil {
+
+    public static void main(String[] args) throws Exception {
+        BADAsterixHyracksIntegrationUtil integrationUtil = new BADAsterixHyracksIntegrationUtil();
+        try {
+            integrationUtil.run(Boolean.getBoolean("cleanup.start"), Boolean.getBoolean("cleanup.shutdown"),
+                    System.getProperty("external.lib", ""));
+        } catch (Exception e) {
+            System.exit(1);
+        }
+    }
+
+    @Override
+    protected void run(boolean cleanupOnStart, boolean cleanupOnShutdown, String loadExternalLibs) throws Exception {
+        Runtime.getRuntime().addShutdownHook(new Thread() {
+            @Override
+            public void run() {
+                try {
+                    deinit(cleanupOnShutdown);
+                } catch (Exception e) {
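+                    //Ignore failures during JVM shutdown; the process is exiting anyway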
+
+                }
+            }
+        });
+        System.setProperty(GlobalConfig.CONFIG_FILE_PROPERTY,
+                "asterixdb/asterix-opt/asterix-bad/src/main/resources/asterix-build-configuration.xml");
+
+        init(cleanupOnStart);
+        while (true) {
+            Thread.sleep(10000);
+        }
+    }
+
+}
diff --git a/asterix-bad/src/test/java/org/apache/asterix/bad/test/BADExecutionTest.java b/asterix-bad/src/test/java/org/apache/asterix/bad/test/BADExecutionTest.java
index d3ec0ba..57c9738 100644
--- a/asterix-bad/src/test/java/org/apache/asterix/bad/test/BADExecutionTest.java
+++ b/asterix-bad/src/test/java/org/apache/asterix/bad/test/BADExecutionTest.java
@@ -48,7 +48,7 @@
     protected static final String PATH_BASE = StringUtils.join(new String[] { "src", "test", "resources", "runtimets" },
             File.separator);
 
-    protected static final String TEST_CONFIG_FILE_NAME = "src/test/resources/conf/asterix-build-configuration.xml";
+    protected static final String TEST_CONFIG_FILE_NAME = "src/main/resources/asterix-build-configuration.xml";
 
     protected static TransactionProperties txnProperties;
     private static final TestExecutor testExecutor = new TestExecutor();
diff --git a/asterix-bad/src/test/java/org/apache/asterix/bad/test/BADOptimizerTest.java b/asterix-bad/src/test/java/org/apache/asterix/bad/test/BADOptimizerTest.java
index ad2f1bf..36ba481 100644
--- a/asterix-bad/src/test/java/org/apache/asterix/bad/test/BADOptimizerTest.java
+++ b/asterix-bad/src/test/java/org/apache/asterix/bad/test/BADOptimizerTest.java
@@ -38,7 +38,7 @@
 
     @BeforeClass
     public static void setUp() throws Exception {
-        TEST_CONFIG_FILE_NAME = "src/test/resources/conf/asterix-build-configuration.xml";
+        TEST_CONFIG_FILE_NAME = "src/main/resources/asterix-build-configuration.xml";
         System.setProperty(GlobalConfig.CONFIG_FILE_PROPERTY, TEST_CONFIG_FILE_NAME);
         final File outdir = new File(PATH_ACTUAL);
         outdir.mkdirs();
diff --git a/asterix-bad/src/test/resources/optimizerts/queries/channel/channel-advanced.sqlpp b/asterix-bad/src/test/resources/optimizerts/queries/channel/channel-advanced.sqlpp
new file mode 100644
index 0000000..53399e1
--- /dev/null
+++ b/asterix-bad/src/test/resources/optimizerts/queries/channel/channel-advanced.sqlpp
@@ -0,0 +1,70 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+/*
+ * Description  : Check an advanced channel for index usage
+ * Expected Res : Success
+ * Date         : Oct 2017
+ */
+
+drop dataverse channels5 if exists;
+create dataverse channels5;
+use channels5;
+
+create type UserLocation as closed {
+  recordId: uuid,
+  location: circle,
+  userName: string,
+  timeStamp: datetime
+};
+
+create type EmergencyReport as closed {
+  reportId: uuid,
+  Etype: string,
+  location: circle,
+  timeStamp: datetime
+};
+
+create type EmergencyShelter as closed {
+  shelterName: string,
+  location: circle
+};
+
+create dataset UserLocations(UserLocation)
+primary key recordId autogenerated;
+
+create dataset EmergencyReports(EmergencyReport)
+primary key reportId autogenerated;
+
+create index locTimes on UserLocations(timeStamp);
+create index repTimes on EmergencyReports(timeStamp);
+
+create function RecentEmergenciesNearUser(userName) {
+  (
+  select r as report
+  from
+  (select value r from EmergencyReports r where r.timeStamp > current_datetime() - day_time_duration("PT10S")) r,
+  (select value l from UserLocations l where l.timeStamp > current_datetime() - day_time_duration("PT10S")) l
+  where l.userName = userName
+  and spatial_intersect(r.location,l.location)
+  )
+};
+
+write output to nc1:"rttest/channel-advanced.adm";
+
+create repetitive channel EmergencyChannel using RecentEmergenciesNearUser@1 period duration("PT10S");
\ No newline at end of file
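The timeStamp predicates on both datasets are what the locTimes and repTimes indexes created above are meant to serve; the expected plan below presumably reflects that index usage in its BTREE_SEARCH operators.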
diff --git a/asterix-bad/src/test/resources/optimizerts/results/channel/channel-advanced.plan b/asterix-bad/src/test/resources/optimizerts/results/channel/channel-advanced.plan
new file mode 100644
index 0000000..907044b
--- /dev/null
+++ b/asterix-bad/src/test/resources/optimizerts/results/channel/channel-advanced.plan
@@ -0,0 +1,104 @@
+-- NOTIFY_BROKERS  |PARTITIONED|
+  -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
+    -- PRE_CLUSTERED_GROUP_BY[$$76, $$channelExecutionTime]  |PARTITIONED|
+            {
+              -- AGGREGATE  |LOCAL|
+                -- NESTED_TUPLE_SOURCE  |LOCAL|
+            }
+      -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
+        -- STABLE_SORT [$$76(ASC), $$channelExecutionTime(ASC)]  |PARTITIONED|
+          -- HASH_PARTITION_EXCHANGE [$$76, $$channelExecutionTime]  |PARTITIONED|
+            -- PRE_SORTED_DISTINCT_BY  |PARTITIONED|
+              -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
+                -- STABLE_SORT [$$51(ASC)]  |PARTITIONED|
+                  -- HASH_PARTITION_EXCHANGE [$$51]  |PARTITIONED|
+                    -- STREAM_PROJECT  |PARTITIONED|
+                      -- COMMIT  |PARTITIONED|
+                        -- STREAM_PROJECT  |PARTITIONED|
+                          -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
+                            -- INDEX_INSERT_DELETE  |PARTITIONED|
+                              -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
+                                -- STREAM_PROJECT  |PARTITIONED|
+                                  -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
+                                    -- INSERT_DELETE  |PARTITIONED|
+                                      -- HASH_PARTITION_EXCHANGE [$$47]  |PARTITIONED|
+                                        -- ASSIGN  |PARTITIONED|
+                                          -- STREAM_PROJECT  |PARTITIONED|
+                                            -- ASSIGN  |PARTITIONED|
+                                              -- STREAM_PROJECT  |PARTITIONED|
+                                                -- ASSIGN  |PARTITIONED|
+                                                  -- STREAM_PROJECT  |PARTITIONED|
+                                                    -- ASSIGN  |PARTITIONED|
+                                                      -- STREAM_PROJECT  |PARTITIONED|
+                                                        -- STREAM_SELECT  |PARTITIONED|
+                                                          -- STREAM_PROJECT  |PARTITIONED|
+                                                            -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
+                                                              -- HYBRID_HASH_JOIN [$$61][$$60]  |PARTITIONED|
+                                                                -- HASH_PARTITION_EXCHANGE [$$61]  |PARTITIONED|
+                                                                  -- STREAM_PROJECT  |PARTITIONED|
+                                                                    -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
+                                                                      -- HYBRID_HASH_JOIN [$$59, $$57][$$52, $$53]  |PARTITIONED|
+                                                                        -- HASH_PARTITION_EXCHANGE [$$59, $$57]  |PARTITIONED|
+                                                                          -- STREAM_PROJECT  |PARTITIONED|
+                                                                            -- ASSIGN  |PARTITIONED|
+                                                                              -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
+                                                                                -- DATASOURCE_SCAN  |PARTITIONED|
+                                                                                  -- BROADCAST_EXCHANGE  |PARTITIONED|
+                                                                                    -- ASSIGN  |UNPARTITIONED|
+                                                                                      -- EMPTY_TUPLE_SOURCE  |UNPARTITIONED|
+                                                                        -- HASH_PARTITION_EXCHANGE [$$52, $$53]  |PARTITIONED|
+                                                                          -- STREAM_PROJECT  |PARTITIONED|
+                                                                            -- ASSIGN  |PARTITIONED|
+                                                                              -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
+                                                                                -- DATASOURCE_SCAN  |PARTITIONED|
+                                                                                  -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
+                                                                                    -- EMPTY_TUPLE_SOURCE  |PARTITIONED|
+                                                                -- HASH_PARTITION_EXCHANGE [$$60]  |PARTITIONED|
+                                                                  -- STREAM_PROJECT  |PARTITIONED|
+                                                                    -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
+                                                                      -- HYBRID_HASH_JOIN [$$54][$$70]  |PARTITIONED|
+                                                                        -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
+                                                                          -- STREAM_SELECT  |PARTITIONED|
+                                                                            -- ASSIGN  |PARTITIONED|
+                                                                              -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
+                                                                                -- BTREE_SEARCH  |PARTITIONED|
+                                                                                  -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
+                                                                                    -- STABLE_SORT [$$80(ASC)]  |PARTITIONED|
+                                                                                      -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
+                                                                                        -- STREAM_PROJECT  |PARTITIONED|
+                                                                                          -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
+                                                                                            -- BTREE_SEARCH  |PARTITIONED|
+                                                                                              -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
+                                                                                                -- ASSIGN  |PARTITIONED|
+                                                                                                  -- EMPTY_TUPLE_SOURCE  |PARTITIONED|
+                                                                        -- HASH_PARTITION_EXCHANGE [$$70]  |PARTITIONED|
+                                                                          -- NESTED_LOOP  |PARTITIONED|
+                                                                            -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
+                                                                              -- STREAM_PROJECT  |PARTITIONED|
+                                                                                -- STREAM_SELECT  |PARTITIONED|
+                                                                                  -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
+                                                                                    -- BTREE_SEARCH  |PARTITIONED|
+                                                                                      -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
+                                                                                        -- STABLE_SORT [$$84(ASC)]  |PARTITIONED|
+                                                                                          -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
+                                                                                            -- STREAM_PROJECT  |PARTITIONED|
+                                                                                              -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
+                                                                                                -- BTREE_SEARCH  |PARTITIONED|
+                                                                                                  -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
+                                                                                                    -- ASSIGN  |PARTITIONED|
+                                                                                                      -- EMPTY_TUPLE_SOURCE  |PARTITIONED|
+                                                                            -- BROADCAST_EXCHANGE  |PARTITIONED|
+                                                                              -- STREAM_PROJECT  |PARTITIONED|
+                                                                                -- STREAM_SELECT  |PARTITIONED|
+                                                                                  -- STREAM_PROJECT  |PARTITIONED|
+                                                                                    -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
+                                                                                      -- BTREE_SEARCH  |PARTITIONED|
+                                                                                        -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
+                                                                                          -- STABLE_SORT [$$87(ASC)]  |PARTITIONED|
+                                                                                            -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
+                                                                                              -- STREAM_PROJECT  |PARTITIONED|
+                                                                                                -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
+                                                                                                  -- BTREE_SEARCH  |PARTITIONED|
+                                                                                                    -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
+                                                                                                      -- ASSIGN  |PARTITIONED|
+                                                                                                        -- EMPTY_TUPLE_SOURCE  |PARTITIONED|
\ No newline at end of file
diff --git a/asterix-bad/src/test/resources/optimizerts/results/channel/channel-create.plan b/asterix-bad/src/test/resources/optimizerts/results/channel/channel-create.plan
index 65e7dbc..a352739 100644
--- a/asterix-bad/src/test/resources/optimizerts/results/channel/channel-create.plan
+++ b/asterix-bad/src/test/resources/optimizerts/results/channel/channel-create.plan
@@ -16,42 +16,46 @@
                       -- COMMIT  |PARTITIONED|
                         -- STREAM_PROJECT  |PARTITIONED|
                           -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
-                            -- INSERT_DELETE  |PARTITIONED|
-                              -- HASH_PARTITION_EXCHANGE [$$32]  |PARTITIONED|
-                                -- ASSIGN  |PARTITIONED|
-                                  -- STREAM_PROJECT  |PARTITIONED|
-                                    -- ASSIGN  |PARTITIONED|
-                                      -- STREAM_PROJECT  |PARTITIONED|
+                            -- INDEX_INSERT_DELETE  |PARTITIONED|
+                              -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
+                                -- STREAM_PROJECT  |PARTITIONED|
+                                  -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
+                                    -- INSERT_DELETE  |PARTITIONED|
+                                      -- HASH_PARTITION_EXCHANGE [$$32]  |PARTITIONED|
                                         -- ASSIGN  |PARTITIONED|
                                           -- STREAM_PROJECT  |PARTITIONED|
                                             -- ASSIGN  |PARTITIONED|
                                               -- STREAM_PROJECT  |PARTITIONED|
-                                                -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
-                                                  -- NESTED_LOOP  |PARTITIONED|
-                                                    -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
+                                                -- ASSIGN  |PARTITIONED|
+                                                  -- STREAM_PROJECT  |PARTITIONED|
+                                                    -- ASSIGN  |PARTITIONED|
                                                       -- STREAM_PROJECT  |PARTITIONED|
                                                         -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
-                                                          -- HYBRID_HASH_JOIN [$$44, $$42][$$38, $$39]  |PARTITIONED|
-                                                            -- HASH_PARTITION_EXCHANGE [$$44, $$42]  |PARTITIONED|
-                                                              -- STREAM_PROJECT  |PARTITIONED|
-                                                                -- ASSIGN  |PARTITIONED|
-                                                                  -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
-                                                                    -- DATASOURCE_SCAN  |PARTITIONED|
-                                                                      -- BROADCAST_EXCHANGE  |PARTITIONED|
-                                                                        -- ASSIGN  |UNPARTITIONED|
-                                                                          -- EMPTY_TUPLE_SOURCE  |UNPARTITIONED|
-                                                            -- HASH_PARTITION_EXCHANGE [$$38, $$39]  |PARTITIONED|
-                                                              -- STREAM_PROJECT  |PARTITIONED|
-                                                                -- ASSIGN  |PARTITIONED|
-                                                                  -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
-                                                                    -- DATASOURCE_SCAN  |PARTITIONED|
-                                                                      -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
-                                                                        -- EMPTY_TUPLE_SOURCE  |PARTITIONED|
-                                                    -- BROADCAST_EXCHANGE  |PARTITIONED|
-                                                      -- STREAM_PROJECT  |PARTITIONED|
-                                                        -- ASSIGN  |PARTITIONED|
-                                                          -- STREAM_PROJECT  |PARTITIONED|
+                                                          -- NESTED_LOOP  |PARTITIONED|
                                                             -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
-                                                              -- DATASOURCE_SCAN  |PARTITIONED|
+                                                              -- STREAM_PROJECT  |PARTITIONED|
                                                                 -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
-                                                                  -- EMPTY_TUPLE_SOURCE  |PARTITIONED|
\ No newline at end of file
+                                                                  -- HYBRID_HASH_JOIN [$$44, $$42][$$38, $$39]  |PARTITIONED|
+                                                                    -- HASH_PARTITION_EXCHANGE [$$44, $$42]  |PARTITIONED|
+                                                                      -- STREAM_PROJECT  |PARTITIONED|
+                                                                        -- ASSIGN  |PARTITIONED|
+                                                                          -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
+                                                                            -- DATASOURCE_SCAN  |PARTITIONED|
+                                                                              -- BROADCAST_EXCHANGE  |PARTITIONED|
+                                                                                -- ASSIGN  |UNPARTITIONED|
+                                                                                  -- EMPTY_TUPLE_SOURCE  |UNPARTITIONED|
+                                                                    -- HASH_PARTITION_EXCHANGE [$$38, $$39]  |PARTITIONED|
+                                                                      -- STREAM_PROJECT  |PARTITIONED|
+                                                                        -- ASSIGN  |PARTITIONED|
+                                                                          -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
+                                                                            -- DATASOURCE_SCAN  |PARTITIONED|
+                                                                              -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
+                                                                                -- EMPTY_TUPLE_SOURCE  |PARTITIONED|
+                                                            -- BROADCAST_EXCHANGE  |PARTITIONED|
+                                                              -- STREAM_PROJECT  |PARTITIONED|
+                                                                -- ASSIGN  |PARTITIONED|
+                                                                  -- STREAM_PROJECT  |PARTITIONED|
+                                                                    -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
+                                                                      -- DATASOURCE_SCAN  |PARTITIONED|
+                                                                        -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
+                                                                          -- EMPTY_TUPLE_SOURCE  |PARTITIONED|
\ No newline at end of file
diff --git a/asterix-bad/src/test/resources/optimizerts/results/channel/channel-subscribe.plan b/asterix-bad/src/test/resources/optimizerts/results/channel/channel-subscribe.plan
index 06630e6..7ad21f6 100644
--- a/asterix-bad/src/test/resources/optimizerts/results/channel/channel-subscribe.plan
+++ b/asterix-bad/src/test/resources/optimizerts/results/channel/channel-subscribe.plan
@@ -16,45 +16,49 @@
                       -- COMMIT  |PARTITIONED|
                         -- STREAM_PROJECT  |PARTITIONED|
                           -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
-                            -- INSERT_DELETE  |PARTITIONED|
-                              -- HASH_PARTITION_EXCHANGE [$$32]  |PARTITIONED|
-                                -- ASSIGN  |PARTITIONED|
-                                  -- STREAM_PROJECT  |PARTITIONED|
-                                    -- ASSIGN  |PARTITIONED|
-                                      -- STREAM_PROJECT  |PARTITIONED|
+                            -- INDEX_INSERT_DELETE  |PARTITIONED|
+                              -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
+                                -- STREAM_PROJECT  |PARTITIONED|
+                                  -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
+                                    -- INSERT_DELETE  |PARTITIONED|
+                                      -- HASH_PARTITION_EXCHANGE [$$32]  |PARTITIONED|
                                         -- ASSIGN  |PARTITIONED|
                                           -- STREAM_PROJECT  |PARTITIONED|
                                             -- ASSIGN  |PARTITIONED|
                                               -- STREAM_PROJECT  |PARTITIONED|
-                                                -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
-                                                  -- NESTED_LOOP  |PARTITIONED|
-                                                    -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
+                                                -- ASSIGN  |PARTITIONED|
+                                                  -- STREAM_PROJECT  |PARTITIONED|
+                                                    -- ASSIGN  |PARTITIONED|
                                                       -- STREAM_PROJECT  |PARTITIONED|
                                                         -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
-                                                          -- HYBRID_HASH_JOIN [$$44, $$42][$$38, $$39]  |PARTITIONED|
-                                                            -- HASH_PARTITION_EXCHANGE [$$44, $$42]  |PARTITIONED|
-                                                              -- STREAM_PROJECT  |PARTITIONED|
-                                                                -- ASSIGN  |PARTITIONED|
-                                                                  -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
-                                                                    -- DATASOURCE_SCAN  |PARTITIONED|
-                                                                      -- BROADCAST_EXCHANGE  |PARTITIONED|
-                                                                        -- ASSIGN  |UNPARTITIONED|
-                                                                          -- EMPTY_TUPLE_SOURCE  |UNPARTITIONED|
-                                                            -- HASH_PARTITION_EXCHANGE [$$38, $$39]  |PARTITIONED|
-                                                              -- STREAM_PROJECT  |PARTITIONED|
-                                                                -- ASSIGN  |PARTITIONED|
-                                                                  -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
-                                                                    -- DATASOURCE_SCAN  |PARTITIONED|
-                                                                      -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
-                                                                        -- EMPTY_TUPLE_SOURCE  |PARTITIONED|
-                                                    -- BROADCAST_EXCHANGE  |PARTITIONED|
-                                                      -- STREAM_PROJECT  |PARTITIONED|
-                                                        -- ASSIGN  |PARTITIONED|
-                                                          -- STREAM_PROJECT  |PARTITIONED|
+                                                          -- NESTED_LOOP  |PARTITIONED|
                                                             -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
-                                                              -- DATASOURCE_SCAN  |PARTITIONED|
+                                                              -- STREAM_PROJECT  |PARTITIONED|
                                                                 -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
-                                                                  -- EMPTY_TUPLE_SOURCE  |PARTITIONED|
+                                                                  -- HYBRID_HASH_JOIN [$$44, $$42][$$38, $$39]  |PARTITIONED|
+                                                                    -- HASH_PARTITION_EXCHANGE [$$44, $$42]  |PARTITIONED|
+                                                                      -- STREAM_PROJECT  |PARTITIONED|
+                                                                        -- ASSIGN  |PARTITIONED|
+                                                                          -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
+                                                                            -- DATASOURCE_SCAN  |PARTITIONED|
+                                                                              -- BROADCAST_EXCHANGE  |PARTITIONED|
+                                                                                -- ASSIGN  |UNPARTITIONED|
+                                                                                  -- EMPTY_TUPLE_SOURCE  |UNPARTITIONED|
+                                                                    -- HASH_PARTITION_EXCHANGE [$$38, $$39]  |PARTITIONED|
+                                                                      -- STREAM_PROJECT  |PARTITIONED|
+                                                                        -- ASSIGN  |PARTITIONED|
+                                                                          -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
+                                                                            -- DATASOURCE_SCAN  |PARTITIONED|
+                                                                              -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
+                                                                                -- EMPTY_TUPLE_SOURCE  |PARTITIONED|
+                                                            -- BROADCAST_EXCHANGE  |PARTITIONED|
+                                                              -- STREAM_PROJECT  |PARTITIONED|
+                                                                -- ASSIGN  |PARTITIONED|
+                                                                  -- STREAM_PROJECT  |PARTITIONED|
+                                                                    -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
+                                                                      -- DATASOURCE_SCAN  |PARTITIONED|
+                                                                        -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
+                                                                          -- EMPTY_TUPLE_SOURCE  |PARTITIONED|
 -- DISTRIBUTE_RESULT  |PARTITIONED|
   -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
     -- STREAM_PROJECT  |PARTITIONED|
diff --git a/asterix-bad/src/test/resources/optimizerts/results/channel/channel-unsubscribe.plan b/asterix-bad/src/test/resources/optimizerts/results/channel/channel-unsubscribe.plan
index b3f4c51..3338e79 100644
--- a/asterix-bad/src/test/resources/optimizerts/results/channel/channel-unsubscribe.plan
+++ b/asterix-bad/src/test/resources/optimizerts/results/channel/channel-unsubscribe.plan
@@ -16,45 +16,49 @@
                       -- COMMIT  |PARTITIONED|
                         -- STREAM_PROJECT  |PARTITIONED|
                           -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
-                            -- INSERT_DELETE  |PARTITIONED|
-                              -- HASH_PARTITION_EXCHANGE [$$32]  |PARTITIONED|
-                                -- ASSIGN  |PARTITIONED|
-                                  -- STREAM_PROJECT  |PARTITIONED|
-                                    -- ASSIGN  |PARTITIONED|
-                                      -- STREAM_PROJECT  |PARTITIONED|
+                            -- INDEX_INSERT_DELETE  |PARTITIONED|
+                              -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
+                                -- STREAM_PROJECT  |PARTITIONED|
+                                  -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
+                                    -- INSERT_DELETE  |PARTITIONED|
+                                      -- HASH_PARTITION_EXCHANGE [$$32]  |PARTITIONED|
                                         -- ASSIGN  |PARTITIONED|
                                           -- STREAM_PROJECT  |PARTITIONED|
                                             -- ASSIGN  |PARTITIONED|
                                               -- STREAM_PROJECT  |PARTITIONED|
-                                                -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
-                                                  -- NESTED_LOOP  |PARTITIONED|
-                                                    -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
+                                                -- ASSIGN  |PARTITIONED|
+                                                  -- STREAM_PROJECT  |PARTITIONED|
+                                                    -- ASSIGN  |PARTITIONED|
                                                       -- STREAM_PROJECT  |PARTITIONED|
                                                         -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
-                                                          -- HYBRID_HASH_JOIN [$$44, $$42][$$38, $$39]  |PARTITIONED|
-                                                            -- HASH_PARTITION_EXCHANGE [$$44, $$42]  |PARTITIONED|
-                                                              -- STREAM_PROJECT  |PARTITIONED|
-                                                                -- ASSIGN  |PARTITIONED|
-                                                                  -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
-                                                                    -- DATASOURCE_SCAN  |PARTITIONED|
-                                                                      -- BROADCAST_EXCHANGE  |PARTITIONED|
-                                                                        -- ASSIGN  |UNPARTITIONED|
-                                                                          -- EMPTY_TUPLE_SOURCE  |UNPARTITIONED|
-                                                            -- HASH_PARTITION_EXCHANGE [$$38, $$39]  |PARTITIONED|
-                                                              -- STREAM_PROJECT  |PARTITIONED|
-                                                                -- ASSIGN  |PARTITIONED|
-                                                                  -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
-                                                                    -- DATASOURCE_SCAN  |PARTITIONED|
-                                                                      -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
-                                                                        -- EMPTY_TUPLE_SOURCE  |PARTITIONED|
-                                                    -- BROADCAST_EXCHANGE  |PARTITIONED|
-                                                      -- STREAM_PROJECT  |PARTITIONED|
-                                                        -- ASSIGN  |PARTITIONED|
-                                                          -- STREAM_PROJECT  |PARTITIONED|
+                                                          -- NESTED_LOOP  |PARTITIONED|
                                                             -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
-                                                              -- DATASOURCE_SCAN  |PARTITIONED|
+                                                              -- STREAM_PROJECT  |PARTITIONED|
                                                                 -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
-                                                                  -- EMPTY_TUPLE_SOURCE  |PARTITIONED|
+                                                                  -- HYBRID_HASH_JOIN [$$44, $$42][$$38, $$39]  |PARTITIONED|
+                                                                    -- HASH_PARTITION_EXCHANGE [$$44, $$42]  |PARTITIONED|
+                                                                      -- STREAM_PROJECT  |PARTITIONED|
+                                                                        -- ASSIGN  |PARTITIONED|
+                                                                          -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
+                                                                            -- DATASOURCE_SCAN  |PARTITIONED|
+                                                                              -- BROADCAST_EXCHANGE  |PARTITIONED|
+                                                                                -- ASSIGN  |UNPARTITIONED|
+                                                                                  -- EMPTY_TUPLE_SOURCE  |UNPARTITIONED|
+                                                                    -- HASH_PARTITION_EXCHANGE [$$38, $$39]  |PARTITIONED|
+                                                                      -- STREAM_PROJECT  |PARTITIONED|
+                                                                        -- ASSIGN  |PARTITIONED|
+                                                                          -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
+                                                                            -- DATASOURCE_SCAN  |PARTITIONED|
+                                                                              -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
+                                                                                -- EMPTY_TUPLE_SOURCE  |PARTITIONED|
+                                                            -- BROADCAST_EXCHANGE  |PARTITIONED|
+                                                              -- STREAM_PROJECT  |PARTITIONED|
+                                                                -- ASSIGN  |PARTITIONED|
+                                                                  -- STREAM_PROJECT  |PARTITIONED|
+                                                                    -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
+                                                                      -- DATASOURCE_SCAN  |PARTITIONED|
+                                                                        -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
+                                                                          -- EMPTY_TUPLE_SOURCE  |PARTITIONED|
 -- COMMIT  |PARTITIONED|
   -- STREAM_PROJECT  |PARTITIONED|
     -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
diff --git a/asterix-bad/src/test/resources/runtimets/queries/channel/disasters_with_friends/disasters_with_friends.1.ddl.sqlpp b/asterix-bad/src/test/resources/runtimets/queries/channel/disasters_with_friends/disasters_with_friends.1.ddl.sqlpp
new file mode 100644
index 0000000..e8d57a6
--- /dev/null
+++ b/asterix-bad/src/test/resources/runtimets/queries/channel/disasters_with_friends/disasters_with_friends.1.ddl.sqlpp
@@ -0,0 +1,70 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+/*
+* Description  : Disasters with friends
+* Expected Res : Success
+* Date         : May 2017
+* Author       : Steven Jacobs
+*/
+
+drop dataverse channels if exists;
+create dataverse channels;
+use channels;
+
+create type UserLocation as {
+  recordId: uuid,
+  latitude: double,
+  longitude: double,
+  user_id: int,
+  timestamp: datetime
+};
+
+create type EmergencyShelter as {
+  name: string,
+  location: point
+};
+
+create type EmergencyReport as {
+  reportId: uuid,
+  impactZone: circle,
+  timestamp: datetime,
+  emergencyType: string
+};
+
+//create datasets
+create dataset EmergencyReports(EmergencyReport) primary key reportId autogenerated;
+create dataset UserLocations(UserLocation) primary key recordId autogenerated;
+create dataset EmergencyShelters(EmergencyShelter) primary key name;
+
+create broker brokerA at "http://www.notifyA.com";
+
+create function EmergenciesNearMe(emergencyType, uid){
+  (with tenSecondsAgo as current_datetime() - day_time_duration("PT10S")
+  select report as report, shelters as shelters
+  from EmergencyReports report, UserLocations user
+  let shelters = (select * from EmergencyShelters shelter
+    where spatial_intersect(report.impactZone,shelter.location))
+  where user.user_id = uid
+    and report.timestamp >= tenSecondsAgo
+    and user.timestamp >= tenSecondsAgo
+    and report.emergencyType = emergencyType
+    and spatial_intersect(report.impactZone,create_point(user.latitude,user.longitude)))
+};
+
+create repetitive channel EmergenciesNearMeChannel using EmergenciesNearMe@2 period duration("PT5S");
diff --git a/asterix-bad/src/test/resources/runtimets/queries/channel/disasters_with_friends/disasters_with_friends.2.update.sqlpp b/asterix-bad/src/test/resources/runtimets/queries/channel/disasters_with_friends/disasters_with_friends.2.update.sqlpp
new file mode 100644
index 0000000..b1a132b
--- /dev/null
+++ b/asterix-bad/src/test/resources/runtimets/queries/channel/disasters_with_friends/disasters_with_friends.2.update.sqlpp
@@ -0,0 +1,27 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+/*
+* Description  : Disasters with friends
+* Expected Res : Success
+* Date         : May 2017
+* Author       : Steven Jacobs
+*/
+use channels;
+
+subscribe to EmergenciesNearMeChannel("tornado", 1) on brokerA;
diff --git a/asterix-bad/src/test/resources/runtimets/queries/channel/disasters_with_friends/disasters_with_friends.3.update.sqlpp b/asterix-bad/src/test/resources/runtimets/queries/channel/disasters_with_friends/disasters_with_friends.3.update.sqlpp
new file mode 100644
index 0000000..292151e
--- /dev/null
+++ b/asterix-bad/src/test/resources/runtimets/queries/channel/disasters_with_friends/disasters_with_friends.3.update.sqlpp
@@ -0,0 +1,41 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+/*
+* Description  : Disasters with friends
+* Expected Res : Success
+* Date         : May 2017
+* Author       : Steven Jacobs
+*/
+
+use channels;
+upsert into UserLocations([
+{"user_id":1, "latitude":5, "longitude":5, "timestamp":current_datetime()},
+{"user_id":2, "latitude":10, "longitude":10, "timestamp":current_datetime()},
+{"user_id":3, "latitude":15, "longitude":15, "timestamp":current_datetime()}]
+);
+upsert into EmergencyShelters([
+{"name":"A", "location":create_point(5.0,5.0)},
+{"name":"B", "location":create_point(10.0,10.0)},
+{"name":"C", "location":create_point(15.0,15.0)}]
+);
+upsert into EmergencyReports([
+{"emergencyType":"tornado", "impactZone":create_circle(create_point(5.0,6.0), 10.0), "timestamp":current_datetime()},
+{"emergencyType":"flood", "impactZone":create_circle(create_point(5.0,6.0), 5.0), "timestamp":current_datetime()},
+{"emergencyType":"tornado", "impactZone":create_circle(create_point(30.0,70.0), 5.0), "timestamp":current_datetime()}]
+);
\ No newline at end of file
diff --git a/asterix-bad/src/test/resources/runtimets/queries/channel/disasters_with_friends/disasters_with_friends.4.sleep.sqlpp b/asterix-bad/src/test/resources/runtimets/queries/channel/disasters_with_friends/disasters_with_friends.4.sleep.sqlpp
new file mode 100644
index 0000000..67c5efa
--- /dev/null
+++ b/asterix-bad/src/test/resources/runtimets/queries/channel/disasters_with_friends/disasters_with_friends.4.sleep.sqlpp
@@ -0,0 +1,26 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+/*
+* Description  : Disasters with friends
+* Expected Res : Success
+* Date         : May 2017
+* Author       : Steven Jacobs
+*/
+
+5000
diff --git a/asterix-bad/src/test/resources/runtimets/queries/channel/disasters_with_friends/disasters_with_friends.5.query.sqlpp b/asterix-bad/src/test/resources/runtimets/queries/channel/disasters_with_friends/disasters_with_friends.5.query.sqlpp
new file mode 100644
index 0000000..2fbd21d
--- /dev/null
+++ b/asterix-bad/src/test/resources/runtimets/queries/channel/disasters_with_friends/disasters_with_friends.5.query.sqlpp
@@ -0,0 +1,28 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+/*
+* Description  : Disasters with friends
+* Expected Res : Success
+* Date         : May 2017
+* Author       : Steven Jacobs
+*/
+
+use channels;
+select value array_count (result.result.shelters)
+from EmergenciesNearMeChannelResults result;
\ No newline at end of file
diff --git a/asterix-bad/src/test/resources/runtimets/queries/channel/disasters_with_friends/disasters_with_friends.6.ddl.sqlpp b/asterix-bad/src/test/resources/runtimets/queries/channel/disasters_with_friends/disasters_with_friends.6.ddl.sqlpp
new file mode 100644
index 0000000..220f05a
--- /dev/null
+++ b/asterix-bad/src/test/resources/runtimets/queries/channel/disasters_with_friends/disasters_with_friends.6.ddl.sqlpp
@@ -0,0 +1,28 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+/*
+* Description  : Disasters with friends
+* Expected Res : Success
+* Date         : May 2017
+* Author       : Steven Jacobs
+*/
+
+use channels;
+
+drop channel EmergenciesNearMeChannel;
diff --git a/asterix-bad/src/test/resources/runtimets/queries/channel/ten_minute_channel/ten_minute_channel.1.ddl.sqlpp b/asterix-bad/src/test/resources/runtimets/queries/channel/ten_minute_channel/ten_minute_channel.1.ddl.sqlpp
index 42fd4aa..11b7b33 100644
--- a/asterix-bad/src/test/resources/runtimets/queries/channel/ten_minute_channel/ten_minute_channel.1.ddl.sqlpp
+++ b/asterix-bad/src/test/resources/runtimets/queries/channel/ten_minute_channel/ten_minute_channel.1.ddl.sqlpp
@@ -35,12 +35,12 @@
 create dataset UserLocations(userLocation)
 primary key userId;
 
-create function RoomOccupants($room) {
-    for $location in dataset UserLocations
-    where $location.roomNumber = $room
-    return $location.userId
+create function RoomOccupants(room) {
+  (select location.userId
+  from UserLocations location
+  where location.roomNumber = room)
 };
 
 create broker brokerA at "http://www.notifyA.com";
 
-create repetitive channel roomRecords using RoomOccupants@1 period duration("PT1S");
+create repetitive channel roomRecords using RoomOccupants@1 period duration("PT30S");
diff --git a/asterix-bad/src/test/resources/runtimets/queries/channel/ten_minute_channel/ten_minute_channel.4.sleep.sqlpp b/asterix-bad/src/test/resources/runtimets/queries/channel/ten_minute_channel/ten_minute_channel.4.sleep.sqlpp
index 270326b..d5f4290 100644
--- a/asterix-bad/src/test/resources/runtimets/queries/channel/ten_minute_channel/ten_minute_channel.4.sleep.sqlpp
+++ b/asterix-bad/src/test/resources/runtimets/queries/channel/ten_minute_channel/ten_minute_channel.4.sleep.sqlpp
@@ -22,4 +22,4 @@
 * Date         : Sep 2016
 * Author       : Steven Jacobs
 */
-600000
+630000
diff --git a/asterix-bad/src/test/resources/runtimets/queries/channel/ten_minute_channel/ten_minute_channel.5.query.sqlpp b/asterix-bad/src/test/resources/runtimets/queries/channel/ten_minute_channel/ten_minute_channel.5.query.sqlpp
index 8db3fde..cfe92c9 100644
--- a/asterix-bad/src/test/resources/runtimets/queries/channel/ten_minute_channel/ten_minute_channel.5.query.sqlpp
+++ b/asterix-bad/src/test/resources/runtimets/queries/channel/ten_minute_channel/ten_minute_channel.5.query.sqlpp
@@ -25,6 +25,4 @@
 
 use channels;
 
-count (from $result in dataset roomRecordsResults
-order by $result.result
-select $result.result) > 599;
+(select value count(result) from roomRecordsResults)[0] > 19;
\ No newline at end of file
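With the channel period lengthened to PT30S and the test sleep extended to 630000 ms (10.5 minutes), roughly 630/30 = 21 channel executions are expected, so requiring more than 19 results leaves a small margin for startup and scheduling delay.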
diff --git a/asterix-bad/src/test/resources/runtimets/queries/procedure/concurrent_procedure/concurrent_procedure.1.ddl.sqlpp b/asterix-bad/src/test/resources/runtimets/queries/procedure/concurrent_procedure/concurrent_procedure.1.ddl.sqlpp
new file mode 100644
index 0000000..06ca3d5
--- /dev/null
+++ b/asterix-bad/src/test/resources/runtimets/queries/procedure/concurrent_procedure/concurrent_procedure.1.ddl.sqlpp
@@ -0,0 +1,44 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+/*
+* Description  : Run Procedure Concurrently
+* Expected Res : Success
+* Date         : Oct 2017
+* Author       : Steven Jacobs
+*/
+
+drop dataverse channels if exists;
+create dataverse channels;
+use channels;
+
+create type myLocation as {
+  id : uuid,
+  timeStamp: datetime,
+  roomNumber: int
+};
+
+
+create dataset UserLocations(myLocation)
+primary key id autogenerated;
+
+create procedure addMe() {
+  insert into UserLocations([
+    {"timeStamp":current_datetime(), "roomNumber":222}]
+  )
+};
diff --git a/asterix-bad/src/test/resources/runtimets/queries/procedure/concurrent_procedure/concurrent_procedure.2.update.sqlpp b/asterix-bad/src/test/resources/runtimets/queries/procedure/concurrent_procedure/concurrent_procedure.2.update.sqlpp
new file mode 100644
index 0000000..ac88f47
--- /dev/null
+++ b/asterix-bad/src/test/resources/runtimets/queries/procedure/concurrent_procedure/concurrent_procedure.2.update.sqlpp
@@ -0,0 +1,32 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+/*
+* Description  : Run Procedure Concurrently
+* Expected Res : Success
+* Date         : Jan 2017
+* Author       : Steven Jacobs
+*/
+
+use channels;
+execute addMe();
+execute addMe();
+execute addMe();
+execute addMe();
+execute addMe();
+execute addMe();
\ No newline at end of file
diff --git a/asterix-bad/src/test/resources/runtimets/queries/procedure/concurrent_procedure/concurrent_procedure.3.ddl.sqlpp b/asterix-bad/src/test/resources/runtimets/queries/procedure/concurrent_procedure/concurrent_procedure.3.ddl.sqlpp
new file mode 100644
index 0000000..5ed56cf
--- /dev/null
+++ b/asterix-bad/src/test/resources/runtimets/queries/procedure/concurrent_procedure/concurrent_procedure.3.ddl.sqlpp
@@ -0,0 +1,27 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+/*
+* Description  : Run Procedure Concurrently
+* Expected Res : Success
+* Date         : Jan 2017
+* Author       : Steven Jacobs
+*/
+
+use channels;
+drop procedure addMe@0;
diff --git a/asterix-bad/src/test/resources/runtimets/queries/procedure/concurrent_procedure/concurrent_procedure.4.query.sqlpp b/asterix-bad/src/test/resources/runtimets/queries/procedure/concurrent_procedure/concurrent_procedure.4.query.sqlpp
new file mode 100644
index 0000000..2094a77
--- /dev/null
+++ b/asterix-bad/src/test/resources/runtimets/queries/procedure/concurrent_procedure/concurrent_procedure.4.query.sqlpp
@@ -0,0 +1,27 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+/*
+* Description  : Run Procedure Concurrently
+* Expected Res : Success
+* Date         : Jan 2017
+* Author       : Steven Jacobs
+*/
+
+use channels;
+select value count(location) from UserLocations location;
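Each addMe() call above inserts exactly one record, so if all six executions commit independently the count returned here should presumably be 6.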
diff --git a/asterix-bad/src/test/resources/runtimets/queries/procedure/delete_procedure_with_parameters/delete_procedure_with_parameters.1.ddl.sqlpp b/asterix-bad/src/test/resources/runtimets/queries/procedure/delete_procedure_with_parameters/delete_procedure_with_parameters.1.ddl.sqlpp
new file mode 100644
index 0000000..42b0d40
--- /dev/null
+++ b/asterix-bad/src/test/resources/runtimets/queries/procedure/delete_procedure_with_parameters/delete_procedure_with_parameters.1.ddl.sqlpp
@@ -0,0 +1,44 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+/*
+* Description  : Simple Delete Procedure with parameters
+* Expected Res : Success
+* Date         : May 2017
+* Author       : Steven Jacobs
+*/
+
+drop dataverse channels if exists;
+create dataverse channels;
+use channels;
+create type myLocation as {
+  id: int
+};
+create dataset UserLocations(myLocation)
+primary key id;
+insert into UserLocations(
+  [{"id":0, "roomNumber":4815162342},
+  {"id":1, "roomNumber":"lost"},
+  {"id":2, "roomNumber":108},
+  {"id":3, "roomNumber":"jacob"}]
+);
+create procedure deleteSome(r, otherRoom) {
+delete from UserLocations
+where roomNumber = r
+or roomNumber = otherRoom
+};
diff --git a/asterix-bad/src/test/resources/runtimets/queries/procedure/delete_procedure_with_parameters/delete_procedure_with_parameters.2.query.sqlpp b/asterix-bad/src/test/resources/runtimets/queries/procedure/delete_procedure_with_parameters/delete_procedure_with_parameters.2.query.sqlpp
new file mode 100644
index 0000000..88166bb
--- /dev/null
+++ b/asterix-bad/src/test/resources/runtimets/queries/procedure/delete_procedure_with_parameters/delete_procedure_with_parameters.2.query.sqlpp
@@ -0,0 +1,27 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+/*
+* Description  : Simple Delete Procedure with parameters
+* Expected Res : Success
+* Date         : May 2017
+* Author       : Steven Jacobs
+*/
+
+use channels;
+select value count(roomNumber) from UserLocations;
\ No newline at end of file
diff --git a/asterix-bad/src/test/resources/runtimets/queries/procedure/delete_procedure_with_parameters/delete_procedure_with_parameters.3.update.sqlpp b/asterix-bad/src/test/resources/runtimets/queries/procedure/delete_procedure_with_parameters/delete_procedure_with_parameters.3.update.sqlpp
new file mode 100644
index 0000000..8d03794
--- /dev/null
+++ b/asterix-bad/src/test/resources/runtimets/queries/procedure/delete_procedure_with_parameters/delete_procedure_with_parameters.3.update.sqlpp
@@ -0,0 +1,27 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+/*
+* Description  : Simple Delete Procedure with parameters
+* Expected Res : Success
+* Date         : May 2017
+* Author       : Steven Jacobs
+*/
+
+use channels;
+execute deleteSome(108,"jacob");
\ No newline at end of file
diff --git a/asterix-bad/src/test/resources/runtimets/queries/procedure/delete_procedure_with_parameters/delete_procedure_with_parameters.4.query.sqlpp b/asterix-bad/src/test/resources/runtimets/queries/procedure/delete_procedure_with_parameters/delete_procedure_with_parameters.4.query.sqlpp
new file mode 100644
index 0000000..88166bb
--- /dev/null
+++ b/asterix-bad/src/test/resources/runtimets/queries/procedure/delete_procedure_with_parameters/delete_procedure_with_parameters.4.query.sqlpp
@@ -0,0 +1,27 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+/*
+* Description  : Simple Delete Procedure with parameters
+* Expected Res : Success
+* Date         : May 2017
+* Author       : Steven Jacobs
+*/
+
+use channels;
+select value count(roomNumber) from UserLocations;
\ No newline at end of file
diff --git a/asterix-bad/src/test/resources/runtimets/queries/procedure/query_procedure_with_parameters/query_procedure_with_parameters.1.ddl.sqlpp b/asterix-bad/src/test/resources/runtimets/queries/procedure/query_procedure_with_parameters/query_procedure_with_parameters.1.ddl.sqlpp
new file mode 100644
index 0000000..8949a4a
--- /dev/null
+++ b/asterix-bad/src/test/resources/runtimets/queries/procedure/query_procedure_with_parameters/query_procedure_with_parameters.1.ddl.sqlpp
@@ -0,0 +1,45 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+/*
+* Description  : Simple Query Procedure with parameters
+* Expected Res : Success
+* Date         : May 2017
+* Author       : Steven Jacobs
+*/
+
+drop dataverse channels if exists;
+create dataverse channels;
+use channels;
+create type myLocation as {
+  id: int
+};
+create dataset UserLocations(myLocation)
+primary key id;
+insert into UserLocations(
+  [{"id":0, "roomNumber":4815162342},
+  {"id":1, "roomNumber":"lost"},
+  {"id":2, "roomNumber":108},
+  {"id":3, "roomNumber":"jacob"}]
+);
+create procedure selectSome(r, otherRoom) {
+select roomNumber from UserLocations
+where roomNumber = r
+or roomNumber = otherRoom
+order by id
+};
diff --git a/asterix-bad/src/test/resources/runtimets/queries/procedure/query_procedure_with_parameters/query_procedure_with_parameters.2.update.sqlpp b/asterix-bad/src/test/resources/runtimets/queries/procedure/query_procedure_with_parameters/query_procedure_with_parameters.2.update.sqlpp
new file mode 100644
index 0000000..aa0722a
--- /dev/null
+++ b/asterix-bad/src/test/resources/runtimets/queries/procedure/query_procedure_with_parameters/query_procedure_with_parameters.2.update.sqlpp
@@ -0,0 +1,27 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+/*
+* Description  : Simple Query Procedure with parameters
+* Expected Res : Success
+* Date         : May 2017
+* Author       : Steven Jacobs
+*/
+
+use channels;
+execute selectSome(108,"jacob");
\ No newline at end of file
diff --git a/asterix-bad/src/test/resources/runtimets/queries/procedure/query_procedure_with_parameters/query_procedure_with_parameters.3.update.sqlpp b/asterix-bad/src/test/resources/runtimets/queries/procedure/query_procedure_with_parameters/query_procedure_with_parameters.3.update.sqlpp
new file mode 100644
index 0000000..aa0722a
--- /dev/null
+++ b/asterix-bad/src/test/resources/runtimets/queries/procedure/query_procedure_with_parameters/query_procedure_with_parameters.3.update.sqlpp
@@ -0,0 +1,27 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+/*
+* Description  : Simple Query Procedure with parameters
+* Expected Res : Success
+* Date         : May 2017
+* Author       : Steven Jacobs
+*/
+
+use channels;
+execute selectSome(108,"jacob");
\ No newline at end of file
diff --git a/asterix-bad/src/test/resources/runtimets/results/channel/disasters_with_friends/disasters_with_friends.1.adm b/asterix-bad/src/test/resources/runtimets/results/channel/disasters_with_friends/disasters_with_friends.1.adm
new file mode 100644
index 0000000..d8263ee
--- /dev/null
+++ b/asterix-bad/src/test/resources/runtimets/results/channel/disasters_with_friends/disasters_with_friends.1.adm
@@ -0,0 +1 @@
+2
\ No newline at end of file
diff --git a/asterix-bad/src/test/resources/runtimets/results/procedure/concurrent_procedure/concurrent_procedure.4.adm b/asterix-bad/src/test/resources/runtimets/results/procedure/concurrent_procedure/concurrent_procedure.4.adm
new file mode 100644
index 0000000..62f9457
--- /dev/null
+++ b/asterix-bad/src/test/resources/runtimets/results/procedure/concurrent_procedure/concurrent_procedure.4.adm
@@ -0,0 +1 @@
+6
\ No newline at end of file
diff --git a/asterix-bad/src/test/resources/runtimets/results/procedure/delete_procedure_with_parameters/delete_procedure_with_parameters.1.adm b/asterix-bad/src/test/resources/runtimets/results/procedure/delete_procedure_with_parameters/delete_procedure_with_parameters.1.adm
new file mode 100644
index 0000000..bf0d87a
--- /dev/null
+++ b/asterix-bad/src/test/resources/runtimets/results/procedure/delete_procedure_with_parameters/delete_procedure_with_parameters.1.adm
@@ -0,0 +1 @@
+4
\ No newline at end of file
diff --git a/asterix-bad/src/test/resources/runtimets/results/procedure/delete_procedure_with_parameters/delete_procedure_with_parameters.2.adm b/asterix-bad/src/test/resources/runtimets/results/procedure/delete_procedure_with_parameters/delete_procedure_with_parameters.2.adm
new file mode 100644
index 0000000..d8263ee
--- /dev/null
+++ b/asterix-bad/src/test/resources/runtimets/results/procedure/delete_procedure_with_parameters/delete_procedure_with_parameters.2.adm
@@ -0,0 +1 @@
+2
\ No newline at end of file
diff --git a/asterix-bad/src/test/resources/runtimets/results/procedure/query_procedure_with_parameters/query_procedure_with_parameters.1.adm b/asterix-bad/src/test/resources/runtimets/results/procedure/query_procedure_with_parameters/query_procedure_with_parameters.1.adm
new file mode 100644
index 0000000..bee5525
--- /dev/null
+++ b/asterix-bad/src/test/resources/runtimets/results/procedure/query_procedure_with_parameters/query_procedure_with_parameters.1.adm
@@ -0,0 +1,2 @@
+{ "roomNumber": 108 }
+{ "roomNumber": "jacob" }
\ No newline at end of file
diff --git a/asterix-bad/src/test/resources/runtimets/results/procedure/query_procedure_with_parameters/query_procedure_with_parameters.2.adm b/asterix-bad/src/test/resources/runtimets/results/procedure/query_procedure_with_parameters/query_procedure_with_parameters.2.adm
new file mode 100644
index 0000000..bee5525
--- /dev/null
+++ b/asterix-bad/src/test/resources/runtimets/results/procedure/query_procedure_with_parameters/query_procedure_with_parameters.2.adm
@@ -0,0 +1,2 @@
+{ "roomNumber": 108 }
+{ "roomNumber": "jacob" }
\ No newline at end of file
diff --git a/asterix-bad/src/test/resources/runtimets/testsuite.xml b/asterix-bad/src/test/resources/runtimets/testsuite.xml
index 12d7d55..1b2844b 100644
--- a/asterix-bad/src/test/resources/runtimets/testsuite.xml
+++ b/asterix-bad/src/test/resources/runtimets/testsuite.xml
@@ -21,60 +21,80 @@
              QueryOffsetPath="queries"
              QueryFileExtension=".sqlpp">
   <test-group name="channel">
-    <test-case FilePath="channel">
-        <compilation-unit name="room_occupants">
-            <output-dir compare="Text">room_occupants</output-dir>
-        </compilation-unit>
+    <test-case FilePath="procedure">
+      <compilation-unit name="delete_procedure">
+        <output-dir compare="Text">delete_procedure</output-dir>
+      </compilation-unit>
     </test-case>
     <test-case FilePath="procedure">
-        <compilation-unit name="insert_procedure">
-            <output-dir compare="Text">insert_procedure</output-dir>
-        </compilation-unit>
+      <compilation-unit name="delete_procedure_with_parameters">
+        <output-dir compare="Text">delete_procedure_with_parameters</output-dir>
+      </compilation-unit>
     </test-case>
     <test-case FilePath="procedure">
-        <compilation-unit name="delete_procedure">
-            <output-dir compare="Text">delete_procedure</output-dir>
-        </compilation-unit>
+      <compilation-unit name="query_procedure">
+        <output-dir compare="Text">query_procedure</output-dir>
+      </compilation-unit>
     </test-case>
     <test-case FilePath="procedure">
-        <compilation-unit name="query_procedure">
-            <output-dir compare="Text">query_procedure</output-dir>
-        </compilation-unit>
+      <compilation-unit name="query_procedure_with_parameters">
+        <output-dir compare="Text">query_procedure_with_parameters</output-dir>
+      </compilation-unit>
     </test-case>
     <test-case FilePath="procedure">
-        <compilation-unit name="repetitive_insert_procedure">
-            <output-dir compare="Text">repetitive_insert_procedure</output-dir>
-        </compilation-unit>
+      <compilation-unit name="insert_procedure">
+        <output-dir compare="Text">insert_procedure</output-dir>
+      </compilation-unit>
+    </test-case>
+    <test-case FilePath="procedure">
+      <compilation-unit name="concurrent_procedure">
+        <output-dir compare="Text">concurrent_procedure</output-dir>
+      </compilation-unit>
+    </test-case>
+    <test-case FilePath="procedure">
+      <compilation-unit name="repetitive_insert_procedure">
+        <output-dir compare="Text">repetitive_insert_procedure</output-dir>
+      </compilation-unit>
     </test-case>
     <test-case FilePath="channel">
-        <compilation-unit name="create_channel_check_datasets">
-            <output-dir compare="Text">create_channel_check_datasets</output-dir>
-        </compilation-unit>
+      <compilation-unit name="room_occupants">
+        <output-dir compare="Text">room_occupants</output-dir>
+      </compilation-unit>
     </test-case>
     <test-case FilePath="channel">
-        <compilation-unit name="create_channel_check_metadata">
-            <output-dir compare="Text">create_channel_check_metadata</output-dir>
-        </compilation-unit>
+      <compilation-unit name="create_channel_check_datasets">
+        <output-dir compare="Text">create_channel_check_datasets</output-dir>
+      </compilation-unit>
     </test-case>
     <test-case FilePath="channel">
-        <compilation-unit name="drop_channel_check_datasets">
-            <output-dir compare="Text">drop_channel_check_datasets</output-dir>
-        </compilation-unit>
+      <compilation-unit name="create_channel_check_metadata">
+        <output-dir compare="Text">create_channel_check_metadata</output-dir>
+      </compilation-unit>
     </test-case>
     <test-case FilePath="channel">
-        <compilation-unit name="drop_channel_check_metadata">
-            <output-dir compare="Text">drop_channel_check_metadata</output-dir>
-        </compilation-unit>
+      <compilation-unit name="drop_channel_check_datasets">
+        <output-dir compare="Text">drop_channel_check_datasets</output-dir>
+      </compilation-unit>
     </test-case>
     <test-case FilePath="channel">
-        <compilation-unit name="subscribe_channel_check_subscriptions">
-            <output-dir compare="Text">subscribe_channel_check_subscriptions</output-dir>
-        </compilation-unit>
+      <compilation-unit name="drop_channel_check_metadata">
+        <output-dir compare="Text">drop_channel_check_metadata</output-dir>
+      </compilation-unit>
     </test-case>
-    <!-- <test-case FilePath="channel">
-        <compilation-unit name="ten_minute_channel">
-            <output-dir compare="Text">ten_minute_channel</output-dir>
-        </compilation-unit>
-    </test-case> -->
+    <test-case FilePath="channel">
+      <compilation-unit name="subscribe_channel_check_subscriptions">
+        <output-dir compare="Text">subscribe_channel_check_subscriptions</output-dir>
+      </compilation-unit>
+    </test-case>
+    <test-case FilePath="channel">
+      <compilation-unit name="disasters_with_friends">
+        <output-dir compare="Text">disasters_with_friends</output-dir>
+      </compilation-unit>
+    </test-case>
+    <test-case FilePath="channel">
+      <compilation-unit name="ten_minute_channel">
+        <output-dir compare="Text">ten_minute_channel</output-dir>
+      </compilation-unit>
+    </test-case>
   </test-group>
 </test-suite>
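
A minimal SQL++ sketch, for illustration only: in the tests above the argument values are supplied directly in the execute statement, so the same procedure (here selectSome, as defined in query_procedure_with_parameters.1.ddl.sqlpp) could be invoked with different values on each run. The values 4815162342 and "lost" are taken from the UserLocations test data; these particular invocations are hypothetical and not part of the patch.

use channels;
-- arguments are bound per execute statement, so repeated runs may use different values
execute selectSome(108, "jacob");
execute selectSome(4815162342, "lost");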