Enabling Filters on Active Datasets

Change-Id: I499b1d9a1525ba5ea9396d21118c6f6cf3e870d2
diff --git a/asterix-bad/src/main/java/org/apache/asterix/bad/BADConstants.java b/asterix-bad/src/main/java/org/apache/asterix/bad/BADConstants.java
index 391d84e..7178db1 100644
--- a/asterix-bad/src/main/java/org/apache/asterix/bad/BADConstants.java
+++ b/asterix-bad/src/main/java/org/apache/asterix/bad/BADConstants.java
@@ -1,18 +1,18 @@
 /*
  * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
+ * or more contributor license agreements.  See the NOTICE file
  * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
+ * regarding copyright ownership.  The ASF licenses this file
  * to you under the Apache License, Version 2.0 (the
  * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
+ * with the License.  You may obtain a copy of the License at
  *
- * http://www.apache.org/licenses/LICENSE-2.0
+ *   http://www.apache.org/licenses/LICENSE-2.0
  *
  * Unless required by applicable law or agreed to in writing,
  * software distributed under the License is distributed on an
  * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
+ * KIND, either express or implied.  See the License for the
  * specific language governing permissions and limitations
  * under the License.
  */
diff --git a/asterix-bad/src/main/java/org/apache/asterix/bad/extension/BADMetadataExtension.java b/asterix-bad/src/main/java/org/apache/asterix/bad/extension/BADMetadataExtension.java
index d0639e2..28f7a79 100644
--- a/asterix-bad/src/main/java/org/apache/asterix/bad/extension/BADMetadataExtension.java
+++ b/asterix-bad/src/main/java/org/apache/asterix/bad/extension/BADMetadataExtension.java
@@ -135,6 +135,7 @@
         BuiltinFunctions.addFunction(BADFunctions.CURRENT_CHANNEL_TIME, ADateTimeTypeComputer.INSTANCE, false);
         BuiltinFunctions.addFunction(BADFunctions.PREVIOUS_CHANNEL_TIME, ADateTimeTypeComputer.INSTANCE, false);
         BuiltinFunctions.addFunction(BADFunctions.IS_NEW, ABooleanTypeComputer.INSTANCE, true);
+        BuiltinFunctions.addFunction(BADFunctions.ACTIVE_TIMESTAMP, ADateTimeTypeComputer.INSTANCE, true);
 
         // to shadow the master feed rewriter
         BuiltinFunctions.addPrivateFunction(BuiltinFunctions.FEED_COLLECT, BADFeedRewriter.INSTANCE, true);
diff --git a/asterix-bad/src/main/java/org/apache/asterix/bad/feed/operators/BADLSMPrimaryInsertOperatorNodePushable.java b/asterix-bad/src/main/java/org/apache/asterix/bad/feed/operators/BADLSMPrimaryInsertOperatorNodePushable.java
index 813fd0b..3239584 100644
--- a/asterix-bad/src/main/java/org/apache/asterix/bad/feed/operators/BADLSMPrimaryInsertOperatorNodePushable.java
+++ b/asterix-bad/src/main/java/org/apache/asterix/bad/feed/operators/BADLSMPrimaryInsertOperatorNodePushable.java
@@ -44,13 +44,15 @@
 
     @Override
     protected void beforeModification(ITupleReference tuple) {
-        if ((tuple.getFieldCount() == 3
-                && tuple.getFieldData(0)[tuple.getFieldStart(2)] == ATypeTag.SERIALIZED_RECORD_TYPE_TAG)
-                || (tuple.getFieldCount() == 4
-                        && tuple.getFieldData(0)[tuple.getFieldStart(2)] == ATypeTag.SERIALIZED_RECORD_TYPE_TAG)) {
-            int targetIdx = tuple.getFieldStart(2) + 14;
+        if (tuple.getFieldCount() >= 3
+                && tuple.getFieldData(0)[tuple.getFieldStart(2)] == ATypeTag.SERIALIZED_RECORD_TYPE_TAG
+                && tuple.getFieldLength(2) == 22) {
+            long currMilli = System.currentTimeMillis();
             ByteBuffer tupleBuff = ByteBuffer.wrap(tuple.getFieldData(0));
-            tupleBuff.putLong(targetIdx, System.currentTimeMillis());
+            tupleBuff.putLong(tuple.getFieldStart(2) + 14, currMilli);
+            if (tuple.getFieldCount() == 4) {
+                tupleBuff.putLong(tuple.getFieldStart(3) + 1, currMilli);
+            }
         }
     }
 }
diff --git a/asterix-bad/src/main/java/org/apache/asterix/bad/feed/operators/BADLSMPrimaryUpsertOperatorDescriptor.java b/asterix-bad/src/main/java/org/apache/asterix/bad/feed/operators/BADLSMPrimaryUpsertOperatorDescriptor.java
index 46f9bc8..0f1ce21 100644
--- a/asterix-bad/src/main/java/org/apache/asterix/bad/feed/operators/BADLSMPrimaryUpsertOperatorDescriptor.java
+++ b/asterix-bad/src/main/java/org/apache/asterix/bad/feed/operators/BADLSMPrimaryUpsertOperatorDescriptor.java
@@ -37,11 +37,11 @@
             IMissingWriterFactory missingWriterFactory,
             IModificationOperationCallbackFactory modificationOpCallbackFactory,
             ISearchOperationCallbackFactory searchOpCallbackFactory,
-            IFrameOperationCallbackFactory frameOpCallbackFactory, int numPrimaryKeys, ARecordType recordType,
-            int filterIndex, boolean hasSecondaries) {
+            IFrameOperationCallbackFactory frameOpCallbackFactory, int numPrimaryKeys, Integer filterSourceIndicator,
+            ARecordType itemType, int filterIndex, boolean hasSecondaries) {
         super(spec, outRecDesc, fieldPermutation, indexHelperFactory, missingWriterFactory,
                 modificationOpCallbackFactory, searchOpCallbackFactory, frameOpCallbackFactory, numPrimaryKeys,
-                recordType, filterIndex, hasSecondaries);
+                filterSourceIndicator, itemType, filterIndex, hasSecondaries);
     }
 
     @Override
@@ -49,7 +49,7 @@
             IRecordDescriptorProvider recordDescProvider, int partition, int nPartitions) throws HyracksDataException {
         RecordDescriptor intputRecDesc = recordDescProvider.getInputRecordDescriptor(getActivityId(), 0);
         return new BADLSMPrimaryUpsertOperatorNodePushable(ctx, partition, indexHelperFactory, fieldPermutation,
-                intputRecDesc, modCallbackFactory, searchOpCallbackFactory, numPrimaryKeys, recordType, filterIndex,
-                frameOpCallbackFactory, missingWriterFactory, hasSecondaries);
+                intputRecDesc, modCallbackFactory, searchOpCallbackFactory, numPrimaryKeys, filterSourceIndicator,
+                filterItemType, filterIndex, frameOpCallbackFactory, missingWriterFactory, hasSecondaries);
     }
 }
diff --git a/asterix-bad/src/main/java/org/apache/asterix/bad/feed/operators/BADLSMPrimaryUpsertOperatorNodePushable.java b/asterix-bad/src/main/java/org/apache/asterix/bad/feed/operators/BADLSMPrimaryUpsertOperatorNodePushable.java
index b0c79d5..fc5ccde 100644
--- a/asterix-bad/src/main/java/org/apache/asterix/bad/feed/operators/BADLSMPrimaryUpsertOperatorNodePushable.java
+++ b/asterix-bad/src/main/java/org/apache/asterix/bad/feed/operators/BADLSMPrimaryUpsertOperatorNodePushable.java
@@ -36,23 +36,25 @@
     public BADLSMPrimaryUpsertOperatorNodePushable(IHyracksTaskContext ctx, int partition,
             IIndexDataflowHelperFactory indexHelperFactory, int[] fieldPermutation, RecordDescriptor inputRecDesc,
             IModificationOperationCallbackFactory modCallbackFactory,
-            ISearchOperationCallbackFactory searchCallbackFactory, int numOfPrimaryKeys, ARecordType recordType,
-            int filterFieldIndex, IFrameOperationCallbackFactory frameOpCallbackFactory,
+            ISearchOperationCallbackFactory searchCallbackFactory, int numOfPrimaryKeys, Integer filterSourceIndicator,
+            ARecordType recordType, int filterFieldIndex, IFrameOperationCallbackFactory frameOpCallbackFactory,
             IMissingWriterFactory missingWriterFactory, boolean hasSecondaries) throws HyracksDataException {
         super(ctx, partition, indexHelperFactory, fieldPermutation, inputRecDesc, modCallbackFactory,
-                searchCallbackFactory, numOfPrimaryKeys, recordType, filterFieldIndex, frameOpCallbackFactory,
-                missingWriterFactory, hasSecondaries);
+                searchCallbackFactory, numOfPrimaryKeys, filterSourceIndicator, recordType, filterFieldIndex,
+                frameOpCallbackFactory, missingWriterFactory, hasSecondaries);
     }
 
     @Override
     protected void beforeModification(ITupleReference tuple) {
-        if ((tuple.getFieldCount() == 3
-                && tuple.getFieldData(0)[tuple.getFieldStart(2)] == ATypeTag.SERIALIZED_RECORD_TYPE_TAG)
-                || (tuple.getFieldCount() == 4
-                        && tuple.getFieldData(0)[tuple.getFieldStart(2)] == ATypeTag.SERIALIZED_RECORD_TYPE_TAG)) {
-            int targetIdx = tuple.getFieldStart(2) + 14;
+        if (tuple.getFieldCount() >= 3
+                && tuple.getFieldData(0)[tuple.getFieldStart(2)] == ATypeTag.SERIALIZED_RECORD_TYPE_TAG
+                && tuple.getFieldLength(2) == 22) {
+            long currMilli = System.currentTimeMillis();
             ByteBuffer tupleBuff = ByteBuffer.wrap(tuple.getFieldData(0));
-            tupleBuff.putLong(targetIdx, System.currentTimeMillis());
+            tupleBuff.putLong(tuple.getFieldStart(2) + 14, currMilli);
+            if (tuple.getFieldCount() == 4) {
+                tupleBuff.putLong(tuple.getFieldStart(3) + 1, currMilli);
+            }
         }
     }
 }
diff --git a/asterix-bad/src/main/java/org/apache/asterix/bad/function/BADFunctionCollection.java b/asterix-bad/src/main/java/org/apache/asterix/bad/function/BADFunctionCollection.java
index 15bb617..e6d2309 100644
--- a/asterix-bad/src/main/java/org/apache/asterix/bad/function/BADFunctionCollection.java
+++ b/asterix-bad/src/main/java/org/apache/asterix/bad/function/BADFunctionCollection.java
@@ -1,22 +1,20 @@
 /*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
  *
- *  * Licensed to the Apache Software Foundation (ASF) under one
- *  * or more contributor license agreements. See the NOTICE file
- *  * distributed with this work for additional information
- *  * regarding copyright ownership. The ASF licenses this file
- *  * to you under the Apache License, Version 2.0 (the
- *  * "License"); you may not use this file except in compliance
- *  * with the License. You may obtain a copy of the License at
- *  *
- *  * http://www.apache.org/licenses/LICENSE-2.0
- *  *
- *  * Unless required by applicable law or agreed to in writing,
- *  * software distributed under the License is distributed on an
- *  * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- *  * KIND, either express or implied. See the License for the
- *  * specific language governing permissions and limitations
- *  * under the License.
+ * http://www.apache.org/licenses/LICENSE-2.0
  *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
  */
 
 package org.apache.asterix.bad.function;
diff --git a/asterix-bad/src/main/java/org/apache/asterix/bad/function/BADFunctionRegistrant.java b/asterix-bad/src/main/java/org/apache/asterix/bad/function/BADFunctionRegistrant.java
index 15a5fc5..5dfb62d 100644
--- a/asterix-bad/src/main/java/org/apache/asterix/bad/function/BADFunctionRegistrant.java
+++ b/asterix-bad/src/main/java/org/apache/asterix/bad/function/BADFunctionRegistrant.java
@@ -1,22 +1,20 @@
 /*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
  *
- *  * Licensed to the Apache Software Foundation (ASF) under one
- *  * or more contributor license agreements. See the NOTICE file
- *  * distributed with this work for additional information
- *  * regarding copyright ownership. The ASF licenses this file
- *  * to you under the Apache License, Version 2.0 (the
- *  * "License"); you may not use this file except in compliance
- *  * with the License. You may obtain a copy of the License at
- *  *
- *  * http://www.apache.org/licenses/LICENSE-2.0
- *  *
- *  * Unless required by applicable law or agreed to in writing,
- *  * software distributed under the License is distributed on an
- *  * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- *  * KIND, either express or implied. See the License for the
- *  * specific language governing permissions and limitations
- *  * under the License.
+ * http://www.apache.org/licenses/LICENSE-2.0
  *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
  */
 package org.apache.asterix.bad.function;
 
diff --git a/asterix-bad/src/main/java/org/apache/asterix/bad/function/BADFunctions.java b/asterix-bad/src/main/java/org/apache/asterix/bad/function/BADFunctions.java
index a342210..085bd80 100644
--- a/asterix-bad/src/main/java/org/apache/asterix/bad/function/BADFunctions.java
+++ b/asterix-bad/src/main/java/org/apache/asterix/bad/function/BADFunctions.java
@@ -1,22 +1,20 @@
 /*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
  *
- *  * Licensed to the Apache Software Foundation (ASF) under one
- *  * or more contributor license agreements. See the NOTICE file
- *  * distributed with this work for additional information
- *  * regarding copyright ownership. The ASF licenses this file
- *  * to you under the Apache License, Version 2.0 (the
- *  * "License"); you may not use this file except in compliance
- *  * with the License. You may obtain a copy of the License at
- *  *
- *  * http://www.apache.org/licenses/LICENSE-2.0
- *  *
- *  * Unless required by applicable law or agreed to in writing,
- *  * software distributed under the License is distributed on an
- *  * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- *  * KIND, either express or implied. See the License for the
- *  * specific language governing permissions and limitations
- *  * under the License.
+ *   http://www.apache.org/licenses/LICENSE-2.0
  *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
  */
 
 package org.apache.asterix.bad.function;
diff --git a/asterix-bad/src/main/java/org/apache/asterix/bad/function/runtime/CurrentChannelTimeDescriptor.java b/asterix-bad/src/main/java/org/apache/asterix/bad/function/runtime/CurrentChannelTimeDescriptor.java
index f3392ad..90191a3 100644
--- a/asterix-bad/src/main/java/org/apache/asterix/bad/function/runtime/CurrentChannelTimeDescriptor.java
+++ b/asterix-bad/src/main/java/org/apache/asterix/bad/function/runtime/CurrentChannelTimeDescriptor.java
@@ -1,22 +1,20 @@
 /*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
  *
- *  * Licensed to the Apache Software Foundation (ASF) under one
- *  * or more contributor license agreements. See the NOTICE file
- *  * distributed with this work for additional information
- *  * regarding copyright ownership. The ASF licenses this file
- *  * to you under the Apache License, Version 2.0 (the
- *  * "License"); you may not use this file except in compliance
- *  * with the License. You may obtain a copy of the License at
- *  *
- *  * http://www.apache.org/licenses/LICENSE-2.0
- *  *
- *  * Unless required by applicable law or agreed to in writing,
- *  * software distributed under the License is distributed on an
- *  * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- *  * KIND, either express or implied. See the License for the
- *  * specific language governing permissions and limitations
- *  * under the License.
+ *   http://www.apache.org/licenses/LICENSE-2.0
  *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
  */
 package org.apache.asterix.bad.function.runtime;
 
diff --git a/asterix-bad/src/main/java/org/apache/asterix/bad/function/runtime/PreviousChannelTimeDescriptor.java b/asterix-bad/src/main/java/org/apache/asterix/bad/function/runtime/PreviousChannelTimeDescriptor.java
index f4dd1f5..800c3c4 100644
--- a/asterix-bad/src/main/java/org/apache/asterix/bad/function/runtime/PreviousChannelTimeDescriptor.java
+++ b/asterix-bad/src/main/java/org/apache/asterix/bad/function/runtime/PreviousChannelTimeDescriptor.java
@@ -1,22 +1,20 @@
 /*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
  *
- *  * Licensed to the Apache Software Foundation (ASF) under one
- *  * or more contributor license agreements. See the NOTICE file
- *  * distributed with this work for additional information
- *  * regarding copyright ownership. The ASF licenses this file
- *  * to you under the Apache License, Version 2.0 (the
- *  * "License"); you may not use this file except in compliance
- *  * with the License. You may obtain a copy of the License at
- *  *
- *  * http://www.apache.org/licenses/LICENSE-2.0
- *  *
- *  * Unless required by applicable law or agreed to in writing,
- *  * software distributed under the License is distributed on an
- *  * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- *  * KIND, either express or implied. See the License for the
- *  * specific language governing permissions and limitations
- *  * under the License.
+ *   http://www.apache.org/licenses/LICENSE-2.0
  *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
  */
 package org.apache.asterix.bad.function.runtime;
 
diff --git a/asterix-bad/src/main/java/org/apache/asterix/bad/lang/BADExpressionToPlanTranslator.java b/asterix-bad/src/main/java/org/apache/asterix/bad/lang/BADExpressionToPlanTranslator.java
index 23d10c5..abcd70d 100644
--- a/asterix-bad/src/main/java/org/apache/asterix/bad/lang/BADExpressionToPlanTranslator.java
+++ b/asterix-bad/src/main/java/org/apache/asterix/bad/lang/BADExpressionToPlanTranslator.java
@@ -27,6 +27,7 @@
 import org.apache.asterix.lang.common.struct.VarIdentifier;
 import org.apache.asterix.metadata.declared.DatasetDataSource;
 import org.apache.asterix.metadata.declared.MetadataProvider;
+import org.apache.asterix.metadata.utils.DatasetUtil;
 import org.apache.asterix.om.base.ADateTime;
 import org.apache.asterix.om.base.ARecord;
 import org.apache.asterix.om.base.IAObject;
@@ -72,32 +73,43 @@
 
     @Override
     protected ILogicalOperator translateDelete(DatasetDataSource targetDatasource, Mutable<ILogicalExpression> varRef,
-            List<Mutable<ILogicalExpression>> varRefsForLoading,
-            List<Mutable<ILogicalExpression>> additionalFilteringExpressions, ILogicalOperator inputOp,
+            List<Mutable<ILogicalExpression>> varRefsForLoading, LogicalVariable seqVar, ILogicalOperator pkeyAssignOp,
             CompiledStatements.ICompiledDmlStatement stmt) throws AlgebricksException {
         SourceLocation sourceLoc = stmt.getSourceLocation();
         InsertDeleteUpsertOperator deleteOp;
+        LogicalVariable metaVar = null;
+
         if (!targetDatasource.getDataset().hasMetaPart()) {
             deleteOp = new InsertDeleteUpsertOperator(targetDatasource, varRef, varRefsForLoading,
                     InsertDeleteUpsertOperator.Kind.DELETE, false);
         } else {
             // prepare meta record
             IAType metaType = metadataProvider.findMetaType(targetDatasource.getDataset());
-            LogicalVariable metaVar = context.newVar();
+            metaVar = context.newVar();
             AssignOperator metaVariableAssignOp =
                     new AssignOperator(metaVar, new MutableObject<>(makeMetaRecordExpr(metaType)));
-            metaVariableAssignOp.getInputs().add(new MutableObject<>(inputOp));
+            metaVariableAssignOp.getInputs().add(new MutableObject<>(pkeyAssignOp));
             metaVariableAssignOp.setSourceLocation(sourceLoc);
             // create insert op uses meta record
             deleteOp = new InsertDeleteUpsertOperator(targetDatasource, varRef, varRefsForLoading,
                     Collections.singletonList(new MutableObject<>(new VariableReferenceExpression(metaVar))),
                     InsertDeleteUpsertOperator.Kind.DELETE, false);
             // change current inputOp to be meta op
-            inputOp = metaVariableAssignOp;
+            pkeyAssignOp = metaVariableAssignOp;
         }
-        deleteOp.setAdditionalFilteringExpressions(additionalFilteringExpressions);
-        deleteOp.getInputs().add(new MutableObject<>(inputOp));
+
+        deleteOp.getInputs().add(new MutableObject<>(pkeyAssignOp));
         deleteOp.setSourceLocation(sourceLoc);
+
+        List<String> filterField = DatasetUtil.getFilterField(targetDatasource.getDataset());
+        List<Mutable<ILogicalExpression>> filterExprs = null;
+        Integer filterSourceIndicator = DatasetUtil.getFilterSourceIndicator(targetDatasource.getDataset());
+        if (filterField != null) {
+            filterExprs = generatedFilterExprs(deleteOp, filterField, filterSourceIndicator == 0 ? seqVar : metaVar,
+                    sourceLoc);
+        }
+        deleteOp.setAdditionalFilteringExpressions(filterExprs);
+
         DelegateOperator leafOperator = new DelegateOperator(new CommitOperator(true));
         leafOperator.getInputs().add(new MutableObject<>(deleteOp));
         leafOperator.setSourceLocation(sourceLoc);
@@ -105,23 +117,23 @@
     }
 
     @Override
-    protected ILogicalOperator translateUpsert(DatasetDataSource targetDatasource, Mutable<ILogicalExpression> varRef,
-            List<Mutable<ILogicalExpression>> varRefsForLoading, List<Mutable<ILogicalExpression>> filterExprs,
-            ILogicalOperator pkeyAssignOp, List<String> additionalFilteringField, LogicalVariable unnestVar,
-            ILogicalOperator topOp, List<Mutable<ILogicalExpression>> exprs, LogicalVariable resVar,
-            AssignOperator additionalFilteringAssign, CompiledStatements.ICompiledDmlStatement stmt,
-            IResultMetadata resultMetadata) throws AlgebricksException {
+    protected ILogicalOperator translateUpsert(DatasetDataSource targetDatasource,
+            Mutable<ILogicalExpression> payloadVarRef, List<Mutable<ILogicalExpression>> varRefsForLoading,
+            ILogicalOperator pkeyAssignOp, LogicalVariable unnestVar, ILogicalOperator topOp,
+            List<Mutable<ILogicalExpression>> pkeyExprs, LogicalVariable seqVar,
+            CompiledStatements.ICompiledDmlStatement stmt, IResultMetadata resultMetadata) throws AlgebricksException {
         SourceLocation sourceLoc = stmt.getSourceLocation();
         CompiledStatements.CompiledUpsertStatement compiledUpsert = (CompiledStatements.CompiledUpsertStatement) stmt;
         Expression returnExpression = compiledUpsert.getReturnExpression();
         InsertDeleteUpsertOperator upsertOp;
         ILogicalOperator rootOperator;
+        LogicalVariable metaVar = null;
 
         ARecordType recordType = (ARecordType) targetDatasource.getItemType();
 
         if (targetDatasource.getDataset().hasMetaPart()) {
             IAType metaType = metadataProvider.findMetaType(targetDatasource.getDataset());
-            LogicalVariable metaVar = context.newVar();
+            metaVar = context.newVar();
             AssignOperator metaVariableAssignOp =
                     new AssignOperator(metaVar, new MutableObject<>(makeMetaRecordExpr(metaType)));
             metaVariableAssignOp.getInputs().add(new MutableObject<>(pkeyAssignOp));
@@ -131,7 +143,7 @@
             List<Mutable<ILogicalExpression>> metaExprs = new ArrayList<>(1);
             VariableReferenceExpression metaVarRef = new VariableReferenceExpression(metaVar);
             metaExprs.add(new MutableObject<>(metaVarRef));
-            upsertOp = new InsertDeleteUpsertOperator(targetDatasource, varRef, varRefsForLoading, metaExprs,
+            upsertOp = new InsertDeleteUpsertOperator(targetDatasource, payloadVarRef, varRefsForLoading, metaExprs,
                     InsertDeleteUpsertOperator.Kind.UPSERT, false);
 
             // set previous meta vars
@@ -142,23 +154,33 @@
             metaTypes.add(targetDatasource.getMetaItemType());
             upsertOp.setPrevAdditionalNonFilteringTypes(metaTypes);
         } else {
-            upsertOp = new InsertDeleteUpsertOperator(targetDatasource, varRef, varRefsForLoading,
+            upsertOp = new InsertDeleteUpsertOperator(targetDatasource, payloadVarRef, varRefsForLoading,
                     InsertDeleteUpsertOperator.Kind.UPSERT, false);
-            // Create and add a new variable used for representing the original record
-            if (additionalFilteringField != null) {
-                upsertOp.setPrevFilterVar(context.newVar());
-                upsertOp.setPrevFilterType(recordType.getFieldType(additionalFilteringField.get(0)));
-            }
         }
+
         // Create and add a new variable used for representing the original record
         upsertOp.setUpsertIndicatorVar(context.newVar());
         upsertOp.setUpsertIndicatorVarType(BuiltinType.ABOOLEAN);
         upsertOp.setPrevRecordVar(context.newVar());
         upsertOp.setPrevRecordType(recordType);
         upsertOp.setSourceLocation(sourceLoc);
-        upsertOp.setAdditionalFilteringExpressions(filterExprs);
         upsertOp.getInputs().add(new MutableObject<>(pkeyAssignOp));
 
+        List<String> filterField = DatasetUtil.getFilterField(targetDatasource.getDataset());
+        List<Mutable<ILogicalExpression>> filterExprs = null;
+        Integer filterSourceIndicator = DatasetUtil.getFilterSourceIndicator(targetDatasource.getDataset());
+        if (filterField != null) {
+            filterExprs = generatedFilterExprs(upsertOp, filterField, filterSourceIndicator == 0 ? seqVar : metaVar,
+                    sourceLoc);
+            ARecordType filterSourceType = filterSourceIndicator == 0 ? (ARecordType) targetDatasource.getItemType()
+                    : (ARecordType) targetDatasource.getMetaItemType();
+            upsertOp.setAdditionalFilteringExpressions(filterExprs);
+            upsertOp.setPrevFilterVar(context.newVar());
+            upsertOp.setPrevFilterType(filterSourceType.getFieldType(filterField.get(0)));
+        } else {
+            upsertOp.setAdditionalFilteringExpressions(null);
+        }
+
         // Set up delegate operator
         DelegateOperator delegateOperator = new DelegateOperator(new CommitOperator(returnExpression == null));
         delegateOperator.getInputs().add(new MutableObject<>(upsertOp));
@@ -171,10 +193,10 @@
 
     @Override
     protected ILogicalOperator translateInsert(DatasetDataSource targetDatasource, Mutable<ILogicalExpression> varRef,
-            List<Mutable<ILogicalExpression>> varRefsForLoading, List<Mutable<ILogicalExpression>> filterExprs,
-            ILogicalOperator inputOp, CompiledStatements.ICompiledDmlStatement stmt, IResultMetadata resultMetadata)
-            throws AlgebricksException {
+            List<Mutable<ILogicalExpression>> varRefsForLoading, LogicalVariable seqVar, ILogicalOperator pkeyAssignOp,
+            CompiledStatements.ICompiledDmlStatement stmt, IResultMetadata resultMetadata) throws AlgebricksException {
         SourceLocation sourceLoc = stmt.getSourceLocation();
+        LogicalVariable metaVar = null;
 
         InsertDeleteUpsertOperator insertOp;
         if (!targetDatasource.getDataset().hasMetaPart()) {
@@ -183,21 +205,30 @@
         } else {
             // prepare meta record
             IAType metaType = metadataProvider.findMetaType(targetDatasource.getDataset());
-            LogicalVariable metaVar = context.newVar();
+            metaVar = context.newVar();
             AssignOperator metaVariableAssignOp =
                     new AssignOperator(metaVar, new MutableObject<>(makeMetaRecordExpr(metaType)));
-            metaVariableAssignOp.getInputs().add(new MutableObject<>(inputOp));
+            metaVariableAssignOp.getInputs().add(new MutableObject<>(pkeyAssignOp));
             metaVariableAssignOp.setSourceLocation(sourceLoc);
             // create insert op uses meta record
             insertOp = new InsertDeleteUpsertOperator(targetDatasource, varRef, varRefsForLoading,
                     Collections.singletonList(new MutableObject<>(new VariableReferenceExpression(metaVar))),
                     InsertDeleteUpsertOperator.Kind.INSERT, false);
             // change current inputOp to be meta op
-            inputOp = metaVariableAssignOp;
+            pkeyAssignOp = metaVariableAssignOp;
+        }
+
+        insertOp.getInputs().add(new MutableObject<>(pkeyAssignOp));
+        insertOp.setSourceLocation(sourceLoc);
+
+        List<String> filterField = DatasetUtil.getFilterField(targetDatasource.getDataset());
+        List<Mutable<ILogicalExpression>> filterExprs = null;
+        Integer filterSourceIndicator = DatasetUtil.getFilterSourceIndicator(targetDatasource.getDataset());
+        if (filterField != null) {
+            filterExprs = generatedFilterExprs(insertOp, filterField, filterSourceIndicator == 0 ? seqVar : metaVar,
+                    sourceLoc);
         }
         insertOp.setAdditionalFilteringExpressions(filterExprs);
-        insertOp.getInputs().add(new MutableObject<>(inputOp));
-        insertOp.setSourceLocation(sourceLoc);
 
         // Adds the commit operator.
         CompiledStatements.CompiledInsertStatement compiledInsert = (CompiledStatements.CompiledInsertStatement) stmt;
diff --git a/asterix-bad/src/main/java/org/apache/asterix/bad/lang/BADRuleSetFactory.java b/asterix-bad/src/main/java/org/apache/asterix/bad/lang/BADRuleSetFactory.java
index 3c3ec59..be93512 100644
--- a/asterix-bad/src/main/java/org/apache/asterix/bad/lang/BADRuleSetFactory.java
+++ b/asterix-bad/src/main/java/org/apache/asterix/bad/lang/BADRuleSetFactory.java
@@ -22,6 +22,7 @@
 import java.util.List;
 
 import org.apache.asterix.bad.rules.InsertBrokerNotifierForChannelRule;
+import org.apache.asterix.bad.rules.RewriteActiveFunctionsRule;
 import org.apache.asterix.bad.rules.RewriteChannelTimeFunctionToLocalVarRule;
 import org.apache.asterix.common.dataflow.ICcApplicationContext;
 import org.apache.asterix.compiler.provider.DefaultRuleSetFactory;
@@ -92,6 +93,7 @@
             if (rule instanceof FeedScanCollectionToUnnest) {
                 alteredRuleCollection.add(i + 1, new MetaFunctionToMetaVariableRule());
                 alteredRuleCollection.add(i + 1, new RewriteChannelTimeFunctionToLocalVarRule());
+                alteredRuleCollection.add(i + 1, new RewriteActiveFunctionsRule());
             }
         }
 
diff --git a/asterix-bad/src/main/java/org/apache/asterix/bad/lang/statement/AbstractCreateChannelStatement.java b/asterix-bad/src/main/java/org/apache/asterix/bad/lang/statement/AbstractCreateChannelStatement.java
index 43f11fd..943e0a4 100644
--- a/asterix-bad/src/main/java/org/apache/asterix/bad/lang/statement/AbstractCreateChannelStatement.java
+++ b/asterix-bad/src/main/java/org/apache/asterix/bad/lang/statement/AbstractCreateChannelStatement.java
@@ -136,7 +136,7 @@
         List<String> fieldNames = new ArrayList<>();
         fieldNames.add(BADConstants.SubscriptionId);
         partitionFields.add(fieldNames);
-        IDatasetDetailsDecl idd = new InternalDetailsDecl(partitionFields, keyIndicators, true, null);
+        IDatasetDetailsDecl idd = new InternalDetailsDecl(partitionFields, keyIndicators, true, null, null);
         TypeExpression subItemType = new TypeReferenceExpression(
                 new Pair<>(MetadataConstants.METADATA_DATAVERSE_NAME, subscriptionsTypeName));
         DatasetDecl createSubscriptionsDataset = new DatasetDecl(dataverseName, new Identifier(subscriptionsTableName),
@@ -151,7 +151,7 @@
             fieldNames = new ArrayList<>();
             fieldNames.add(BADConstants.ResultId);
             partitionFields.add(fieldNames);
-            idd = new InternalDetailsDecl(partitionFields, keyIndicators, true, null);
+            idd = new InternalDetailsDecl(partitionFields, keyIndicators, true, null, null);
             TypeExpression resultItemType =
                     new TypeReferenceExpression(new Pair<>(MetadataConstants.METADATA_DATAVERSE_NAME, resultsTypeName));
             DatasetDecl createResultsDataset = new DatasetDecl(dataverseName, new Identifier(resultsTableName),
diff --git a/asterix-bad/src/main/java/org/apache/asterix/bad/metadata/BADMetadataProvider.java b/asterix-bad/src/main/java/org/apache/asterix/bad/metadata/BADMetadataProvider.java
index 290db49..5a1e151 100644
--- a/asterix-bad/src/main/java/org/apache/asterix/bad/metadata/BADMetadataProvider.java
+++ b/asterix-bad/src/main/java/org/apache/asterix/bad/metadata/BADMetadataProvider.java
@@ -120,9 +120,13 @@
         }
         // add the previous filter third
         int fieldIdx = -1;
+        Integer filterSourceIndicator = null;
+        ARecordType filterItemType = null;
         if (numFilterFields > 0) {
             String filterField = DatasetUtil.getFilterField(dataset).get(0);
-            String[] fieldNames = itemType.getFieldNames();
+            filterSourceIndicator = DatasetUtil.getFilterSourceIndicator(dataset);
+            filterItemType = filterSourceIndicator == 0 ? itemType : metaItemType;
+            String[] fieldNames = filterItemType.getFieldNames();
             int i = 0;
             for (; i < fieldNames.length; i++) {
                 if (fieldNames[i].equals(filterField)) {
@@ -130,9 +134,10 @@
                 }
             }
             fieldIdx = i;
-            outputTypeTraits[f] = dataFormat.getTypeTraitProvider().getTypeTrait(itemType.getFieldTypes()[fieldIdx]);
+            outputTypeTraits[f] =
+                    dataFormat.getTypeTraitProvider().getTypeTrait(filterItemType.getFieldTypes()[fieldIdx]);
             outputSerDes[f] =
-                    dataFormat.getSerdeProvider().getSerializerDeserializer(itemType.getFieldTypes()[fieldIdx]);
+                    dataFormat.getSerdeProvider().getSerializerDeserializer(filterItemType.getFieldTypes()[fieldIdx]);
             f++;
         }
         for (int j = 0; j < inputRecordDesc.getFieldCount(); j++) {
@@ -142,7 +147,8 @@
         RecordDescriptor outputRecordDesc = new RecordDescriptor(outputSerDes, outputTypeTraits);
         op = new BADLSMPrimaryUpsertOperatorDescriptor(spec, outputRecordDesc, fieldPermutation, idfh,
                 missingWriterFactory, modificationCallbackFactory, searchCallbackFactory,
-                dataset.getFrameOpCallbackFactory(metadataProvider), numKeys, itemType, fieldIdx, hasSecondaries);
+                dataset.getFrameOpCallbackFactory(metadataProvider), numKeys, filterSourceIndicator, filterItemType,
+                fieldIdx, hasSecondaries);
         return new Pair<>(op, splitsAndConstraint.second);
     }
 }
diff --git a/asterix-bad/src/main/java/org/apache/asterix/bad/rules/RewriteActiveFunctionsRule.java b/asterix-bad/src/main/java/org/apache/asterix/bad/rules/RewriteActiveFunctionsRule.java
new file mode 100644
index 0000000..a58341f
--- /dev/null
+++ b/asterix-bad/src/main/java/org/apache/asterix/bad/rules/RewriteActiveFunctionsRule.java
@@ -0,0 +1,143 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.bad.rules;
+
+import java.util.ArrayList;
+import java.util.List;
+
+import org.apache.asterix.bad.BADConstants;
+import org.apache.asterix.bad.function.BADFunctions;
+import org.apache.asterix.lang.common.util.FunctionUtil;
+import org.apache.asterix.om.base.AString;
+import org.apache.asterix.om.constants.AsterixConstantValue;
+import org.apache.asterix.om.functions.BuiltinFunctions;
+import org.apache.commons.lang3.mutable.Mutable;
+import org.apache.commons.lang3.mutable.MutableObject;
+import org.apache.hyracks.algebricks.common.exceptions.AlgebricksException;
+import org.apache.hyracks.algebricks.core.algebra.base.ILogicalExpression;
+import org.apache.hyracks.algebricks.core.algebra.base.ILogicalOperator;
+import org.apache.hyracks.algebricks.core.algebra.base.IOptimizationContext;
+import org.apache.hyracks.algebricks.core.algebra.base.LogicalExpressionTag;
+import org.apache.hyracks.algebricks.core.algebra.base.LogicalOperatorTag;
+import org.apache.hyracks.algebricks.core.algebra.base.LogicalVariable;
+import org.apache.hyracks.algebricks.core.algebra.expressions.AbstractFunctionCallExpression;
+import org.apache.hyracks.algebricks.core.algebra.expressions.ConstantExpression;
+import org.apache.hyracks.algebricks.core.algebra.expressions.ScalarFunctionCallExpression;
+import org.apache.hyracks.algebricks.core.algebra.expressions.VariableReferenceExpression;
+import org.apache.hyracks.algebricks.core.algebra.functions.AlgebricksBuiltinFunctions;
+import org.apache.hyracks.algebricks.core.algebra.operators.logical.AssignOperator;
+import org.apache.hyracks.algebricks.core.algebra.operators.logical.InnerJoinOperator;
+import org.apache.hyracks.algebricks.core.algebra.operators.logical.SelectOperator;
+import org.apache.hyracks.algebricks.core.algebra.operators.logical.visitors.VariableUtilities;
+import org.apache.hyracks.algebricks.core.rewriter.base.IAlgebraicRewriteRule;
+
+public class RewriteActiveFunctionsRule implements IAlgebraicRewriteRule {
+
+    @Override
+    public boolean rewritePre(Mutable<ILogicalOperator> opRef, IOptimizationContext context)
+            throws AlgebricksException {
+        Mutable<ILogicalExpression> exprRef;
+        if (opRef.getValue().getOperatorTag() == LogicalOperatorTag.INNERJOIN) {
+            InnerJoinOperator selectOp = (InnerJoinOperator) opRef.getValue();
+            exprRef = selectOp.getCondition();
+        } else if (opRef.getValue().getOperatorTag() == LogicalOperatorTag.SELECT) {
+            SelectOperator selectOp = (SelectOperator) opRef.getValue();
+            exprRef = selectOp.getCondition();
+        } else {
+            return false;
+        }
+        return visit(exprRef, opRef, context);
+    }
+
+    private ScalarFunctionCallExpression makeActiveTsAccessExpr(LogicalVariable dsVar) {
+        ILogicalExpression activeTsFunc =
+                new ScalarFunctionCallExpression(FunctionUtil.getFunctionInfo(BuiltinFunctions.META),
+                        new MutableObject<>(new VariableReferenceExpression(dsVar)));
+        return new ScalarFunctionCallExpression(FunctionUtil.getFunctionInfo(BuiltinFunctions.FIELD_ACCESS_BY_NAME),
+                new MutableObject<>(activeTsFunc), new MutableObject<>(new ConstantExpression(
+                        new AsterixConstantValue(new AString(BADConstants.FIELD_NAME_ACTIVE_TS)))));
+    }
+
+    private AbstractFunctionCallExpression makeAndForIsNew(AbstractFunctionCallExpression funcExpr) {
+        LogicalVariable dsVar = getDsVar(funcExpr);
+        ILogicalExpression previousChannelTimeExpr = new ScalarFunctionCallExpression(
+                BuiltinFunctions.getBuiltinFunctionInfo(BADFunctions.PREVIOUS_CHANNEL_TIME),
+                new MutableObject<>(new VariableReferenceExpression(dsVar)));
+        ILogicalExpression currentChannelTimeExpr = new ScalarFunctionCallExpression(
+                BuiltinFunctions.getBuiltinFunctionInfo(BADFunctions.CURRENT_CHANNEL_TIME),
+                new MutableObject<>(new VariableReferenceExpression(dsVar)));
+        ILogicalExpression lessThanExpr = new ScalarFunctionCallExpression(
+                FunctionUtil.getFunctionInfo(AlgebricksBuiltinFunctions.LT),
+                new MutableObject<>(makeActiveTsAccessExpr(dsVar)), new MutableObject<>(currentChannelTimeExpr));
+        ILogicalExpression greaterThanExpr = new ScalarFunctionCallExpression(
+                FunctionUtil.getFunctionInfo(AlgebricksBuiltinFunctions.GT),
+                new MutableObject<>(makeActiveTsAccessExpr(dsVar)), new MutableObject<>(previousChannelTimeExpr));
+        return new ScalarFunctionCallExpression(FunctionUtil.getFunctionInfo(BuiltinFunctions.AND),
+                new MutableObject<>(lessThanExpr), new MutableObject<>(greaterThanExpr));
+    }
+
+    private LogicalVariable getDsVar(AbstractFunctionCallExpression funcExpr) {
+        return ((VariableReferenceExpression) funcExpr.getArguments().get(0).getValue()).getVariableReference();
+    }
+
+    private boolean visit(Mutable<ILogicalExpression> exprRef, Mutable<ILogicalOperator> opRef,
+            IOptimizationContext context) throws AlgebricksException {
+        boolean changed = false;
+        if (exprRef.getValue().getExpressionTag() == LogicalExpressionTag.FUNCTION_CALL) {
+            AbstractFunctionCallExpression funcExpr = (AbstractFunctionCallExpression) exprRef.getValue();
+
+            if (funcExpr.getFunctionIdentifier() == BADFunctions.IS_NEW) {
+                exprRef.setValue(makeAndForIsNew(funcExpr));
+                changed = true;
+            } else if (funcExpr.getFunctionIdentifier() == BADFunctions.ACTIVE_TIMESTAMP) {
+                LogicalVariable channelTimeVar = context.newVar();
+                LogicalVariable dsVar = getDsVar(funcExpr);
+                ILogicalExpression activeTsFunc =
+                        new ScalarFunctionCallExpression(FunctionUtil.getFunctionInfo(BuiltinFunctions.META),
+                                new MutableObject<>(new VariableReferenceExpression(dsVar)));
+                ScalarFunctionCallExpression faExpr = new ScalarFunctionCallExpression(
+                        FunctionUtil.getFunctionInfo(BuiltinFunctions.FIELD_ACCESS_BY_NAME),
+                        new MutableObject<>(activeTsFunc), new MutableObject<>(new ConstantExpression(
+                                new AsterixConstantValue(new AString(BADConstants.FIELD_NAME_ACTIVE_TS)))));
+
+                AssignOperator assignOp = new AssignOperator(channelTimeVar, new MutableObject<>(faExpr));
+
+                List<LogicalVariable> liveVars = new ArrayList<>();
+                for (int i = 0; i < opRef.getValue().getInputs().size(); i++) {
+                    Mutable<ILogicalOperator> inputOpRef = opRef.getValue().getInputs().get(i);
+                    VariableUtilities.getLiveVariables(inputOpRef.getValue(), liveVars);
+                    if (liveVars.contains(dsVar)) {
+                        assignOp.getInputs().add(new MutableObject<>(inputOpRef.getValue()));
+                        inputOpRef.setValue(assignOp);
+                        exprRef.setValue(new VariableReferenceExpression(channelTimeVar));
+                        context.computeAndSetTypeEnvironmentForOperator(assignOp);
+                    }
+                }
+
+                changed = true;
+            } else {
+                for (Mutable<ILogicalExpression> argExpr : funcExpr.getArguments()) {
+                    changed = changed || visit(argExpr, opRef, context);
+                }
+            }
+        }
+        return changed;
+    }
+
+}
diff --git a/asterix-bad/src/main/java/org/apache/asterix/bad/rules/RewriteChannelTimeFunctionToLocalVarRule.java b/asterix-bad/src/main/java/org/apache/asterix/bad/rules/RewriteChannelTimeFunctionToLocalVarRule.java
index 56f5e39..1542d1e 100644
--- a/asterix-bad/src/main/java/org/apache/asterix/bad/rules/RewriteChannelTimeFunctionToLocalVarRule.java
+++ b/asterix-bad/src/main/java/org/apache/asterix/bad/rules/RewriteChannelTimeFunctionToLocalVarRule.java
@@ -1,22 +1,20 @@
 /*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
  *
- *  * Licensed to the Apache Software Foundation (ASF) under one
- *  * or more contributor license agreements. See the NOTICE file
- *  * distributed with this work for additional information
- *  * regarding copyright ownership. The ASF licenses this file
- *  * to you under the Apache License, Version 2.0 (the
- *  * "License"); you may not use this file except in compliance
- *  * with the License. You may obtain a copy of the License at
- *  *
- *  * http://www.apache.org/licenses/LICENSE-2.0
- *  *
- *  * Unless required by applicable law or agreed to in writing,
- *  * software distributed under the License is distributed on an
- *  * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- *  * KIND, either express or implied. See the License for the
- *  * specific language governing permissions and limitations
- *  * under the License.
+ * http://www.apache.org/licenses/LICENSE-2.0
  *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
  */
 package org.apache.asterix.bad.rules;
 
@@ -27,7 +25,6 @@
 
 import org.apache.asterix.bad.BADConstants;
 import org.apache.asterix.bad.function.BADFunctions;
-import org.apache.asterix.lang.common.util.FunctionUtil;
 import org.apache.asterix.metadata.declared.DataSource;
 import org.apache.asterix.om.base.AString;
 import org.apache.asterix.om.constants.AsterixConstantValue;
@@ -45,7 +42,6 @@
 import org.apache.hyracks.algebricks.core.algebra.expressions.ConstantExpression;
 import org.apache.hyracks.algebricks.core.algebra.expressions.ScalarFunctionCallExpression;
 import org.apache.hyracks.algebricks.core.algebra.expressions.VariableReferenceExpression;
-import org.apache.hyracks.algebricks.core.algebra.functions.AlgebricksBuiltinFunctions;
 import org.apache.hyracks.algebricks.core.algebra.operators.logical.AssignOperator;
 import org.apache.hyracks.algebricks.core.algebra.operators.logical.DataSourceScanOperator;
 import org.apache.hyracks.algebricks.core.algebra.operators.logical.InnerJoinOperator;
@@ -57,7 +53,6 @@
     @Override
     public boolean rewritePre(Mutable<ILogicalOperator> opRef, IOptimizationContext context)
             throws AlgebricksException {
-
         Mutable<ILogicalExpression> exprRef;
         if (opRef.getValue().getOperatorTag() == LogicalOperatorTag.INNERJOIN) {
             InnerJoinOperator selectOp = (InnerJoinOperator) opRef.getValue();
@@ -72,32 +67,30 @@
         Set<Mutable<ILogicalExpression>> activeFunctionSet = new HashSet<>();
         Set<LogicalVariable> needPrevDsSet = new HashSet<>();
         Set<LogicalVariable> needCurrDsSet = new HashSet<>();
-        Set<LogicalVariable> needActiveDsSet = new HashSet<>();
+        String channelName =
+                (String) context.getMetadataProvider().getConfig().getOrDefault(BADConstants.CONFIG_CHANNEL_NAME, "");
 
         // collect active functions
-        collectChannelTimeFunctions(exprRef, activeFunctionSet, needPrevDsSet, needCurrDsSet, needActiveDsSet);
+        boolean rewriteFunc = collectChannelTimeFunctions(exprRef, activeFunctionSet, needPrevDsSet, needCurrDsSet,
+                true, channelName);
         if (activeFunctionSet.size() == 0) {
-            return false;
+            return rewriteFunc;
         }
 
         // add assigns for active functions
         Map<LogicalVariable, LogicalVariable> prevMap = new HashMap<>();
         Map<LogicalVariable, LogicalVariable> currMap = new HashMap<>();
-        Map<LogicalVariable, LogicalVariable> activeMap = new HashMap<>();
-        createChannelTimeAssignOps(opRef, needPrevDsSet, needCurrDsSet, needActiveDsSet, prevMap, currMap, activeMap,
-                context);
+        createChannelTimeAssignOps(opRef, needPrevDsSet, needCurrDsSet, prevMap, currMap, context, channelName);
 
         // update expressions with new vars
-        updateActiveFuncExprsWithVars(prevMap, currMap, activeMap, activeFunctionSet);
+        updateActiveFuncExprsWithVars(prevMap, currMap, activeFunctionSet);
 
         context.computeAndSetTypeEnvironmentForOperator(opRef.getValue());
         return true;
     }
 
     private void updateActiveFuncExprsWithVars(Map<LogicalVariable, LogicalVariable> prevMap,
-            Map<LogicalVariable, LogicalVariable> currMap, Map<LogicalVariable, LogicalVariable> activeMap,
-            Set<Mutable<ILogicalExpression>> activeFuncExprs) {
-
+            Map<LogicalVariable, LogicalVariable> currMap, Set<Mutable<ILogicalExpression>> activeFuncExprs) {
         for (Mutable<ILogicalExpression> expr : activeFuncExprs) {
             AbstractFunctionCallExpression funcExpr = (AbstractFunctionCallExpression) expr.getValue();
             LogicalVariable dsVar = ((VariableReferenceExpression) ((AbstractFunctionCallExpression) expr.getValue())
@@ -106,36 +99,20 @@
                 expr.setValue(new VariableReferenceExpression(currMap.get(dsVar)));
             } else if (funcExpr.getFunctionIdentifier() == BADFunctions.PREVIOUS_CHANNEL_TIME) {
                 expr.setValue(new VariableReferenceExpression(prevMap.get(dsVar)));
-            } else {
-                ILogicalExpression lessThanExpr =
-                        new ScalarFunctionCallExpression(FunctionUtil.getFunctionInfo(AlgebricksBuiltinFunctions.LT),
-                                new MutableObject<>(new VariableReferenceExpression(activeMap.get(dsVar))),
-                                new MutableObject<>(new VariableReferenceExpression(currMap.get(dsVar))));
-                ILogicalExpression greaterThanExpr =
-                        new ScalarFunctionCallExpression(FunctionUtil.getFunctionInfo(AlgebricksBuiltinFunctions.GT),
-                                new MutableObject<>(new VariableReferenceExpression(activeMap.get(dsVar))),
-                                new MutableObject<>(new VariableReferenceExpression(prevMap.get(dsVar))));
-                ScalarFunctionCallExpression andExpr =
-                        new ScalarFunctionCallExpression(FunctionUtil.getFunctionInfo(BuiltinFunctions.AND),
-                                new MutableObject<>(lessThanExpr), new MutableObject<>(greaterThanExpr));
-                expr.setValue(andExpr);
             }
         }
     }
 
     private void createChannelTimeAssignOps(Mutable<ILogicalOperator> opRef, Set<LogicalVariable> needPrevDsSet,
-            Set<LogicalVariable> needCurrDsSet, Set<LogicalVariable> needActiveDsSet,
-            Map<LogicalVariable, LogicalVariable> prevMap, Map<LogicalVariable, LogicalVariable> currMap,
-            Map<LogicalVariable, LogicalVariable> activeMap, IOptimizationContext context) {
+            Set<LogicalVariable> needCurrDsSet, Map<LogicalVariable, LogicalVariable> prevMap,
+            Map<LogicalVariable, LogicalVariable> currMap, IOptimizationContext context, String channelName) {
         ILogicalOperator currOp = opRef.getValue();
-        String channelName =
-                (String) context.getMetadataProvider().getConfig().getOrDefault(BADConstants.CONFIG_CHANNEL_NAME, "");
         if (opRef.getValue().getOperatorTag() == LogicalOperatorTag.DATASOURCESCAN) {
             DataSourceScanOperator dataScanOp = (DataSourceScanOperator) opRef.getValue();
             DataSource ds = (DataSource) dataScanOp.getDataSource();
             LogicalVariable dsVar = ds.getDataRecordVariable(dataScanOp.getScanVariables());
 
-            if (needPrevDsSet.contains(dsVar) || needActiveDsSet.contains(dsVar)) {
+            if (needPrevDsSet.contains(dsVar)) {
                 LogicalVariable channelTimeVar = context.newVar();
                 ILogicalExpression previousChannelTimeExpr = new ScalarFunctionCallExpression(
                         BuiltinFunctions.getBuiltinFunctionInfo(BADFunctions.PREVIOUS_CHANNEL_TIME),
@@ -148,7 +125,7 @@
                 prevMap.put(dsVar, channelTimeVar);
             }
 
-            if (needCurrDsSet.contains(dsVar) || needActiveDsSet.contains(dsVar)) {
+            if (needCurrDsSet.contains(dsVar)) {
                 LogicalVariable channelTimeVar = context.newVar();
                 ILogicalExpression previousChannelTimeExpr = new ScalarFunctionCallExpression(
                         BuiltinFunctions.getBuiltinFunctionInfo(BADFunctions.CURRENT_CHANNEL_TIME), new MutableObject<>(
@@ -159,55 +136,51 @@
                 opRef.setValue(assignOp);
                 currMap.put(dsVar, channelTimeVar);
             }
-
-            if (needActiveDsSet.contains(dsVar)) {
-                LogicalVariable channelTimeVar = context.newVar();
-
-                ILogicalExpression activeTsFunc =
-                        new ScalarFunctionCallExpression(FunctionUtil.getFunctionInfo(BuiltinFunctions.META),
-                                new MutableObject<>(new VariableReferenceExpression(dsVar)));
-                ScalarFunctionCallExpression faExpr = new ScalarFunctionCallExpression(
-                        FunctionUtil.getFunctionInfo(BuiltinFunctions.FIELD_ACCESS_BY_NAME),
-                        new MutableObject<>(activeTsFunc), new MutableObject<>(new ConstantExpression(
-                                new AsterixConstantValue(new AString(BADConstants.FIELD_NAME_ACTIVE_TS)))));
-                AssignOperator assignOp = new AssignOperator(channelTimeVar, new MutableObject<>(faExpr));
-                assignOp.getInputs().add(new MutableObject<>(opRef.getValue()));
-                opRef.setValue(assignOp);
-                activeMap.put(dsVar, channelTimeVar);
-            }
         }
         for (Mutable<ILogicalOperator> input : currOp.getInputs()) {
-            createChannelTimeAssignOps(input, needPrevDsSet, needCurrDsSet, needActiveDsSet, prevMap, currMap,
-                    activeMap, context);
+            createChannelTimeAssignOps(input, needPrevDsSet, needCurrDsSet, prevMap, currMap, context, channelName);
         }
     }
 
-    private void collectChannelTimeFunctions(Mutable<ILogicalExpression> exprRef,
+    private boolean collectChannelTimeFunctions(Mutable<ILogicalExpression> exprRef,
             Set<Mutable<ILogicalExpression>> activeFunctionSet, Set<LogicalVariable> needPrevDsSet,
-            Set<LogicalVariable> needCurrDsSet, Set<LogicalVariable> needActive) {
+            Set<LogicalVariable> needCurrDsSet, boolean disjunctiveFlag, String channelName) {
+        boolean rewriteFunc = false;
         if (exprRef.getValue().getExpressionTag() == LogicalExpressionTag.FUNCTION_CALL) {
             AbstractFunctionCallExpression funcExpr = (AbstractFunctionCallExpression) exprRef.getValue();
-            if (funcExpr.getFunctionIdentifier() == BADFunctions.IS_NEW
-                    || funcExpr.getFunctionIdentifier() == BADFunctions.PREVIOUS_CHANNEL_TIME
+            if (funcExpr.getFunctionIdentifier().equals(BuiltinFunctions.OR)) {
+                disjunctiveFlag = false;
+            }
+            if (funcExpr.getFunctionIdentifier() == BADFunctions.PREVIOUS_CHANNEL_TIME
                     || funcExpr.getFunctionIdentifier() == BADFunctions.CURRENT_CHANNEL_TIME) {
-                // add to active func set for later replacement
-                activeFunctionSet.add(exprRef);
                 // collect ds var to see what assign op needs to be added
-                LogicalVariable dsVar = ((VariableReferenceExpression) funcExpr.getArguments().get(0).getValue())
-                        .getVariableReference();
-                if (funcExpr.getFunctionIdentifier() == BADFunctions.PREVIOUS_CHANNEL_TIME) {
-                    needPrevDsSet.add(dsVar);
-                } else if (funcExpr.getFunctionIdentifier() == BADFunctions.CURRENT_CHANNEL_TIME) {
-                    needCurrDsSet.add(dsVar);
+                LogicalExpressionTag arg0ExprTag = funcExpr.getArguments().get(0).getValue().getExpressionTag();
+                if (arg0ExprTag == LogicalExpressionTag.CONSTANT) {
+                    return false;
+                }
+                if (!disjunctiveFlag) {
+                    LogicalVariable dsVar = ((VariableReferenceExpression) funcExpr.getArguments().get(0).getValue())
+                            .getVariableReference();
+                    activeFunctionSet.add(exprRef);
+                    if (funcExpr.getFunctionIdentifier() == BADFunctions.PREVIOUS_CHANNEL_TIME) {
+                        needPrevDsSet.add(dsVar);
+                    } else if (funcExpr.getFunctionIdentifier() == BADFunctions.CURRENT_CHANNEL_TIME) {
+                        needCurrDsSet.add(dsVar);
+                    }
                 } else {
-                    needActive.add(dsVar);
+                    // if disjunctive, modify to get ts and wait for introduce filter rule to work
+                    funcExpr.getArguments().set(0, new MutableObject<>(
+                            new ConstantExpression(new AsterixConstantValue(new AString(channelName)))));
+                    rewriteFunc = true;
                 }
             } else {
                 for (Mutable<ILogicalExpression> argExpr : funcExpr.getArguments()) {
-                    collectChannelTimeFunctions(argExpr, activeFunctionSet, needPrevDsSet, needCurrDsSet, needActive);
+                    rewriteFunc = rewriteFunc || collectChannelTimeFunctions(argExpr, activeFunctionSet, needPrevDsSet,
+                            needCurrDsSet, disjunctiveFlag, channelName);
                 }
             }
         }
+        return rewriteFunc;
     }
 
 }
diff --git a/asterix-bad/src/main/java/org/apache/asterix/bad/runtime/ActiveTimestampManager.java b/asterix-bad/src/main/java/org/apache/asterix/bad/runtime/ActiveTimestampManager.java
index adbbd03..0a24a72 100644
--- a/asterix-bad/src/main/java/org/apache/asterix/bad/runtime/ActiveTimestampManager.java
+++ b/asterix-bad/src/main/java/org/apache/asterix/bad/runtime/ActiveTimestampManager.java
@@ -1,22 +1,20 @@
 /*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
  *
- *  * Licensed to the Apache Software Foundation (ASF) under one
- *  * or more contributor license agreements. See the NOTICE file
- *  * distributed with this work for additional information
- *  * regarding copyright ownership. The ASF licenses this file
- *  * to you under the Apache License, Version 2.0 (the
- *  * "License"); you may not use this file except in compliance
- *  * with the License. You may obtain a copy of the License at
- *  *
- *  * http://www.apache.org/licenses/LICENSE-2.0
- *  *
- *  * Unless required by applicable law or agreed to in writing,
- *  * software distributed under the License is distributed on an
- *  * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- *  * KIND, either express or implied. See the License for the
- *  * specific language governing permissions and limitations
- *  * under the License.
+ *   http://www.apache.org/licenses/LICENSE-2.0
  *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
  */
 package org.apache.asterix.bad.runtime;
 
diff --git a/asterix-bad/src/main/java/org/apache/asterix/bad/runtime/ActiveTimestampState.java b/asterix-bad/src/main/java/org/apache/asterix/bad/runtime/ActiveTimestampState.java
index ad7e424..9067073 100644
--- a/asterix-bad/src/main/java/org/apache/asterix/bad/runtime/ActiveTimestampState.java
+++ b/asterix-bad/src/main/java/org/apache/asterix/bad/runtime/ActiveTimestampState.java
@@ -1,22 +1,20 @@
 /*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
  *
- *  * Licensed to the Apache Software Foundation (ASF) under one
- *  * or more contributor license agreements. See the NOTICE file
- *  * distributed with this work for additional information
- *  * regarding copyright ownership. The ASF licenses this file
- *  * to you under the Apache License, Version 2.0 (the
- *  * "License"); you may not use this file except in compliance
- *  * with the License. You may obtain a copy of the License at
- *  *
- *  * http://www.apache.org/licenses/LICENSE-2.0
- *  *
- *  * Unless required by applicable law or agreed to in writing,
- *  * software distributed under the License is distributed on an
- *  * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- *  * KIND, either express or implied. See the License for the
- *  * specific language governing permissions and limitations
- *  * under the License.
+ *   http://www.apache.org/licenses/LICENSE-2.0
  *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
  */
 package org.apache.asterix.bad.runtime;
 
diff --git a/asterix-bad/src/main/resources/lang-extension/lang.txt b/asterix-bad/src/main/resources/lang-extension/lang.txt
index 49f0cb4..f7eba70 100644
--- a/asterix-bad/src/main/resources/lang-extension/lang.txt
+++ b/asterix-bad/src/main/resources/lang-extension/lang.txt
@@ -144,12 +144,12 @@
     ( <HINTS> hints = Properties() )?
     ( <WITH> withRecord = RecordConstructor() )?
       {
-        // TODO: add filters on meta records
+        try{
         InternalDetailsDecl idd = new InternalDetailsDecl(primaryKeyFields.second,
                                                           primaryKeyFields.first,
                                                           autogenerated,
-                                                          null);
-        try{
+                                                          1,
+                                                          Collections.singletonList("_active_timestamp"));
         stmt = new DatasetDecl(nameComponents.first,
                                    nameComponents.second,
                                    datasetTypeExpr,
diff --git a/asterix-bad/src/test/resources/optimizerts/queries/bad_cq/new_tweets_active_ts.sqlpp b/asterix-bad/src/test/resources/optimizerts/queries/bad_cq/new_tweets_active_ts.sqlpp
new file mode 100644
index 0000000..86b0f28
--- /dev/null
+++ b/asterix-bad/src/test/resources/optimizerts/queries/bad_cq/new_tweets_active_ts.sqlpp
@@ -0,0 +1,40 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+DROP DATAVERSE test IF EXISTS;
+CREATE DATAVERSE test;
+USE test;
+
+CREATE TYPE Tweet AS OPEN {
+ tid: bigint,
+ area_code: string,
+ text: string,
+ location: point,
+ timestamp: datetime
+};
+
+
+CREATE ACTIVE DATASET Tweets(Tweet) PRIMARY KEY tid;
+
+write output to nc1:"rttest/new_tweets.sqlpp";
+set `compiler.indexonly` "false";
+
+CREATE CONTINUOUS CHANNEL NearbyTweets(oid) PERIOD duration("PT3S")
+{
+ SELECT t FROM Tweets t WHERE active_timestamp(t) > previous_channel_time(t) AND active_timestamp(t) < current_channel_time(t)
+};
\ No newline at end of file
diff --git a/asterix-bad/src/test/resources/optimizerts/queries/bad_cq/new_tweets_meta_and.sqlpp b/asterix-bad/src/test/resources/optimizerts/queries/bad_cq/new_tweets_meta_and.sqlpp
new file mode 100644
index 0000000..27184bf
--- /dev/null
+++ b/asterix-bad/src/test/resources/optimizerts/queries/bad_cq/new_tweets_meta_and.sqlpp
@@ -0,0 +1,41 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+DROP DATAVERSE test IF EXISTS;
+CREATE DATAVERSE test;
+USE test;
+
+CREATE TYPE Tweet AS OPEN {
+ tid: bigint,
+ area_code: string,
+ text: string,
+ location: point,
+ timestamp: datetime
+};
+
+
+CREATE ACTIVE DATASET Tweets(Tweet) PRIMARY KEY tid;
+
+write output to nc1:"rttest/new_tweets.sqlpp";
+set `compiler.indexonly` "false";
+
+CREATE CONTINUOUS CHANNEL NearbyTweets(oid) PERIOD duration("PT3S")
+{
+ SELECT t FROM Tweets t WHERE meta(t).`_active_timestamp` > current_channel_time(t)
+ AND meta(t).`_active_timestamp` < previous_channel_time(t)
+};
diff --git a/asterix-bad/src/test/resources/optimizerts/queries/bad_cq/new_tweets_meta_or.sqlpp b/asterix-bad/src/test/resources/optimizerts/queries/bad_cq/new_tweets_meta_or.sqlpp
new file mode 100644
index 0000000..311caf0
--- /dev/null
+++ b/asterix-bad/src/test/resources/optimizerts/queries/bad_cq/new_tweets_meta_or.sqlpp
@@ -0,0 +1,41 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+DROP DATAVERSE test IF EXISTS;
+CREATE DATAVERSE test;
+USE test;
+
+CREATE TYPE Tweet AS OPEN {
+ tid: bigint,
+ area_code: string,
+ text: string,
+ location: point,
+ timestamp: datetime
+};
+
+
+CREATE ACTIVE DATASET Tweets(Tweet) PRIMARY KEY tid;
+
+write output to nc1:"rttest/new_tweets.sqlpp";
+set `compiler.indexonly` "false";
+
+CREATE CONTINUOUS CHANNEL NearbyTweets(oid) PERIOD duration("PT3S")
+{
+ SELECT t FROM Tweets t WHERE meta(t).`_active_timestamp` > current_channel_time(t)
+ OR meta(t).`_active_timestamp` < previous_channel_time(t)
+};
diff --git a/asterix-bad/src/test/resources/optimizerts/results/bad_cq/new_nearby_tweets.plan b/asterix-bad/src/test/resources/optimizerts/results/bad_cq/new_nearby_tweets.plan
index e4c1ec7..617ce18 100644
--- a/asterix-bad/src/test/resources/optimizerts/results/bad_cq/new_nearby_tweets.plan
+++ b/asterix-bad/src/test/resources/optimizerts/results/bad_cq/new_nearby_tweets.plan
@@ -1,13 +1,13 @@
 -- NOTIFY_BROKERS  |PARTITIONED|
   -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
-    -- PRE_CLUSTERED_GROUP_BY[$$120, $$121, $$channelExecutionTime]  |PARTITIONED|
+    -- PRE_CLUSTERED_GROUP_BY[$$118, $$119, $$channelExecutionTime]  |PARTITIONED|
             {
               -- AGGREGATE  |LOCAL|
                 -- NESTED_TUPLE_SOURCE  |LOCAL|
             }
       -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
-        -- STABLE_SORT [$$120(ASC), $$121(ASC), $$channelExecutionTime(ASC)]  |PARTITIONED|
-          -- HASH_PARTITION_EXCHANGE [$$120, $$121, $$channelExecutionTime]  |PARTITIONED|
+        -- STABLE_SORT [$$118(ASC), $$119(ASC), $$channelExecutionTime(ASC)]  |PARTITIONED|
+          -- HASH_PARTITION_EXCHANGE [$$118, $$119, $$channelExecutionTime]  |PARTITIONED|
             -- PRE_SORTED_DISTINCT_BY  |PARTITIONED|
               -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
                 -- STABLE_SORT [$$97(ASC)]  |PARTITIONED|
@@ -58,16 +58,14 @@
                                                                     -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
                                                                       -- STREAM_PROJECT  |PARTITIONED|
                                                                         -- STREAM_SELECT  |PARTITIONED|
-                                                                          -- ASSIGN  |PARTITIONED|
+                                                                          -- STREAM_PROJECT  |PARTITIONED|
                                                                             -- ASSIGN  |PARTITIONED|
-                                                                              -- ASSIGN  |PARTITIONED|
-                                                                                -- STREAM_PROJECT  |PARTITIONED|
-                                                                                  -- ASSIGN  |PARTITIONED|
-                                                                                    -- STREAM_PROJECT  |PARTITIONED|
-                                                                                      -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
-                                                                                        -- DATASOURCE_SCAN  |PARTITIONED|
-                                                                                          -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
-                                                                                            -- EMPTY_TUPLE_SOURCE  |PARTITIONED|
+                                                                              -- STREAM_PROJECT  |PARTITIONED|
+                                                                                -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
+                                                                                  -- DATASOURCE_SCAN  |PARTITIONED|
+                                                                                    -- BROADCAST_EXCHANGE  |PARTITIONED|
+                                                                                      -- ASSIGN  |PARTITIONED|
+                                                                                        -- EMPTY_TUPLE_SOURCE  |PARTITIONED|
                                                                     -- BROADCAST_EXCHANGE  |PARTITIONED|
                                                                       -- STREAM_PROJECT  |PARTITIONED|
                                                                         -- ASSIGN  |PARTITIONED|
diff --git a/asterix-bad/src/test/resources/optimizerts/results/bad_cq/new_tweets.plan b/asterix-bad/src/test/resources/optimizerts/results/bad_cq/new_tweets.plan
index ebf5624..6134bd4 100644
--- a/asterix-bad/src/test/resources/optimizerts/results/bad_cq/new_tweets.plan
+++ b/asterix-bad/src/test/resources/optimizerts/results/bad_cq/new_tweets.plan
@@ -1,13 +1,13 @@
 -- NOTIFY_BROKERS  |PARTITIONED|
   -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
-    -- PRE_CLUSTERED_GROUP_BY[$$93, $$94, $$channelExecutionTime]  |PARTITIONED|
+    -- PRE_CLUSTERED_GROUP_BY[$$91, $$92, $$channelExecutionTime]  |PARTITIONED|
             {
               -- AGGREGATE  |LOCAL|
                 -- NESTED_TUPLE_SOURCE  |LOCAL|
             }
       -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
-        -- STABLE_SORT [$$93(ASC), $$94(ASC), $$channelExecutionTime(ASC)]  |PARTITIONED|
-          -- HASH_PARTITION_EXCHANGE [$$93, $$94, $$channelExecutionTime]  |PARTITIONED|
+        -- STABLE_SORT [$$91(ASC), $$92(ASC), $$channelExecutionTime(ASC)]  |PARTITIONED|
+          -- HASH_PARTITION_EXCHANGE [$$91, $$92, $$channelExecutionTime]  |PARTITIONED|
             -- PRE_SORTED_DISTINCT_BY  |PARTITIONED|
               -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
                 -- STABLE_SORT [$$79(ASC)]  |PARTITIONED|
@@ -55,12 +55,11 @@
                                                               -- ASSIGN  |PARTITIONED|
                                                                 -- STREAM_PROJECT  |PARTITIONED|
                                                                   -- STREAM_SELECT  |PARTITIONED|
-                                                                    -- ASSIGN  |PARTITIONED|
+                                                                    -- STREAM_PROJECT  |PARTITIONED|
                                                                       -- ASSIGN  |PARTITIONED|
                                                                         -- STREAM_PROJECT  |PARTITIONED|
-                                                                          -- ASSIGN  |PARTITIONED|
-                                                                            -- STREAM_PROJECT  |PARTITIONED|
-                                                                              -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
-                                                                                -- DATASOURCE_SCAN  |PARTITIONED|
-                                                                                  -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
-                                                                                    -- EMPTY_TUPLE_SOURCE  |PARTITIONED|
+                                                                          -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
+                                                                            -- DATASOURCE_SCAN  |PARTITIONED|
+                                                                              -- BROADCAST_EXCHANGE  |PARTITIONED|
+                                                                                -- ASSIGN  |PARTITIONED|
+                                                                                  -- EMPTY_TUPLE_SOURCE  |PARTITIONED|
diff --git a/asterix-bad/src/test/resources/optimizerts/results/bad_cq/new_tweets_active_ts.plan b/asterix-bad/src/test/resources/optimizerts/results/bad_cq/new_tweets_active_ts.plan
new file mode 100644
index 0000000..7ac1e96
--- /dev/null
+++ b/asterix-bad/src/test/resources/optimizerts/results/bad_cq/new_tweets_active_ts.plan
@@ -0,0 +1,65 @@
+-- NOTIFY_BROKERS  |PARTITIONED|
+  -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
+    -- PRE_CLUSTERED_GROUP_BY[$$96, $$97, $$channelExecutionTime]  |PARTITIONED|
+            {
+              -- AGGREGATE  |LOCAL|
+                -- NESTED_TUPLE_SOURCE  |LOCAL|
+            }
+      -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
+        -- STABLE_SORT [$$96(ASC), $$97(ASC), $$channelExecutionTime(ASC)]  |PARTITIONED|
+          -- HASH_PARTITION_EXCHANGE [$$96, $$97, $$channelExecutionTime]  |PARTITIONED|
+            -- PRE_SORTED_DISTINCT_BY  |PARTITIONED|
+              -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
+                -- STABLE_SORT [$$83(ASC)]  |PARTITIONED|
+                  -- HASH_PARTITION_EXCHANGE [$$83]  |PARTITIONED|
+                    -- STREAM_PROJECT  |PARTITIONED|
+                      -- COMMIT  |PARTITIONED|
+                        -- STREAM_PROJECT  |PARTITIONED|
+                          -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
+                            -- INDEX_INSERT_DELETE  |PARTITIONED|
+                              -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
+                                -- STREAM_PROJECT  |PARTITIONED|
+                                  -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
+                                    -- INSERT_DELETE  |PARTITIONED|
+                                      -- HASH_PARTITION_EXCHANGE [$$77]  |PARTITIONED|
+                                        -- ASSIGN  |PARTITIONED|
+                                          -- STREAM_PROJECT  |PARTITIONED|
+                                            -- ASSIGN  |PARTITIONED|
+                                              -- STREAM_PROJECT  |PARTITIONED|
+                                                -- ASSIGN  |PARTITIONED|
+                                                  -- STREAM_PROJECT  |PARTITIONED|
+                                                    -- ASSIGN  |PARTITIONED|
+                                                      -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
+                                                        -- NESTED_LOOP  |PARTITIONED|
+                                                          -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
+                                                            -- STREAM_PROJECT  |PARTITIONED|
+                                                              -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
+                                                                -- HYBRID_HASH_JOIN [$$90, $$88][$$84, $$85]  |PARTITIONED|
+                                                                  -- HASH_PARTITION_EXCHANGE [$$90, $$88]  |PARTITIONED|
+                                                                    -- STREAM_PROJECT  |PARTITIONED|
+                                                                      -- ASSIGN  |PARTITIONED|
+                                                                        -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
+                                                                          -- DATASOURCE_SCAN  |PARTITIONED|
+                                                                            -- BROADCAST_EXCHANGE  |PARTITIONED|
+                                                                              -- ASSIGN  |UNPARTITIONED|
+                                                                                -- EMPTY_TUPLE_SOURCE  |UNPARTITIONED|
+                                                                  -- HASH_PARTITION_EXCHANGE [$$84, $$85]  |PARTITIONED|
+                                                                    -- STREAM_PROJECT  |PARTITIONED|
+                                                                      -- ASSIGN  |PARTITIONED|
+                                                                        -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
+                                                                          -- DATASOURCE_SCAN  |PARTITIONED|
+                                                                            -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
+                                                                              -- EMPTY_TUPLE_SOURCE  |PARTITIONED|
+                                                          -- BROADCAST_EXCHANGE  |PARTITIONED|
+                                                            -- STREAM_PROJECT  |PARTITIONED|
+                                                              -- ASSIGN  |PARTITIONED|
+                                                                -- STREAM_PROJECT  |PARTITIONED|
+                                                                  -- STREAM_SELECT  |PARTITIONED|
+                                                                    -- STREAM_PROJECT  |PARTITIONED|
+                                                                      -- ASSIGN  |PARTITIONED|
+                                                                        -- STREAM_PROJECT  |PARTITIONED|
+                                                                          -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
+                                                                            -- DATASOURCE_SCAN  |PARTITIONED|
+                                                                              -- BROADCAST_EXCHANGE  |PARTITIONED|
+                                                                                -- ASSIGN  |PARTITIONED|
+                                                                                  -- EMPTY_TUPLE_SOURCE  |PARTITIONED|
diff --git a/asterix-bad/src/test/resources/optimizerts/results/bad_cq/new_tweets_meta_active_ts.plan b/asterix-bad/src/test/resources/optimizerts/results/bad_cq/new_tweets_meta_active_ts.plan
new file mode 100644
index 0000000..2bcef50
--- /dev/null
+++ b/asterix-bad/src/test/resources/optimizerts/results/bad_cq/new_tweets_meta_active_ts.plan
@@ -0,0 +1,65 @@
+-- NOTIFY_BROKERS  |PARTITIONED|
+  -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
+    -- PRE_CLUSTERED_GROUP_BY[$$95, $$96, $$channelExecutionTime]  |PARTITIONED|
+            {
+              -- AGGREGATE  |LOCAL|
+                -- NESTED_TUPLE_SOURCE  |LOCAL|
+            }
+      -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
+        -- STABLE_SORT [$$95(ASC), $$96(ASC), $$channelExecutionTime(ASC)]  |PARTITIONED|
+          -- HASH_PARTITION_EXCHANGE [$$95, $$96, $$channelExecutionTime]  |PARTITIONED|
+            -- PRE_SORTED_DISTINCT_BY  |PARTITIONED|
+              -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
+                -- STABLE_SORT [$$85(ASC)]  |PARTITIONED|
+                  -- HASH_PARTITION_EXCHANGE [$$85]  |PARTITIONED|
+                    -- STREAM_PROJECT  |PARTITIONED|
+                      -- COMMIT  |PARTITIONED|
+                        -- STREAM_PROJECT  |PARTITIONED|
+                          -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
+                            -- INDEX_INSERT_DELETE  |PARTITIONED|
+                              -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
+                                -- STREAM_PROJECT  |PARTITIONED|
+                                  -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
+                                    -- INSERT_DELETE  |PARTITIONED|
+                                      -- HASH_PARTITION_EXCHANGE [$$79]  |PARTITIONED|
+                                        -- ASSIGN  |PARTITIONED|
+                                          -- STREAM_PROJECT  |PARTITIONED|
+                                            -- ASSIGN  |PARTITIONED|
+                                              -- STREAM_PROJECT  |PARTITIONED|
+                                                -- ASSIGN  |PARTITIONED|
+                                                  -- STREAM_PROJECT  |PARTITIONED|
+                                                    -- ASSIGN  |PARTITIONED|
+                                                      -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
+                                                        -- NESTED_LOOP  |PARTITIONED|
+                                                          -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
+                                                            -- STREAM_PROJECT  |PARTITIONED|
+                                                              -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
+                                                                -- HYBRID_HASH_JOIN [$$92, $$90][$$86, $$87]  |PARTITIONED|
+                                                                  -- HASH_PARTITION_EXCHANGE [$$92, $$90]  |PARTITIONED|
+                                                                    -- STREAM_PROJECT  |PARTITIONED|
+                                                                      -- ASSIGN  |PARTITIONED|
+                                                                        -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
+                                                                          -- DATASOURCE_SCAN  |PARTITIONED|
+                                                                            -- BROADCAST_EXCHANGE  |PARTITIONED|
+                                                                              -- ASSIGN  |UNPARTITIONED|
+                                                                                -- EMPTY_TUPLE_SOURCE  |UNPARTITIONED|
+                                                                  -- HASH_PARTITION_EXCHANGE [$$86, $$87]  |PARTITIONED|
+                                                                    -- STREAM_PROJECT  |PARTITIONED|
+                                                                      -- ASSIGN  |PARTITIONED|
+                                                                        -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
+                                                                          -- DATASOURCE_SCAN  |PARTITIONED|
+                                                                            -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
+                                                                              -- EMPTY_TUPLE_SOURCE  |PARTITIONED|
+                                                          -- BROADCAST_EXCHANGE  |PARTITIONED|
+                                                            -- STREAM_PROJECT  |PARTITIONED|
+                                                              -- ASSIGN  |PARTITIONED|
+                                                                -- STREAM_PROJECT  |PARTITIONED|
+                                                                  -- STREAM_SELECT  |PARTITIONED|
+                                                                    -- STREAM_PROJECT  |PARTITIONED|
+                                                                      -- ASSIGN  |PARTITIONED|
+                                                                        -- STREAM_PROJECT  |PARTITIONED|
+                                                                          -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
+                                                                            -- DATASOURCE_SCAN  |PARTITIONED|
+                                                                              -- BROADCAST_EXCHANGE  |PARTITIONED|
+                                                                                -- ASSIGN  |PARTITIONED|
+                                                                                  -- EMPTY_TUPLE_SOURCE  |PARTITIONED|
diff --git a/asterix-bad/src/test/resources/optimizerts/results/bad_cq/new_tweets_meta_and.plan b/asterix-bad/src/test/resources/optimizerts/results/bad_cq/new_tweets_meta_and.plan
new file mode 100644
index 0000000..2bcef50
--- /dev/null
+++ b/asterix-bad/src/test/resources/optimizerts/results/bad_cq/new_tweets_meta_and.plan
@@ -0,0 +1,65 @@
+-- NOTIFY_BROKERS  |PARTITIONED|
+  -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
+    -- PRE_CLUSTERED_GROUP_BY[$$95, $$96, $$channelExecutionTime]  |PARTITIONED|
+            {
+              -- AGGREGATE  |LOCAL|
+                -- NESTED_TUPLE_SOURCE  |LOCAL|
+            }
+      -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
+        -- STABLE_SORT [$$95(ASC), $$96(ASC), $$channelExecutionTime(ASC)]  |PARTITIONED|
+          -- HASH_PARTITION_EXCHANGE [$$95, $$96, $$channelExecutionTime]  |PARTITIONED|
+            -- PRE_SORTED_DISTINCT_BY  |PARTITIONED|
+              -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
+                -- STABLE_SORT [$$85(ASC)]  |PARTITIONED|
+                  -- HASH_PARTITION_EXCHANGE [$$85]  |PARTITIONED|
+                    -- STREAM_PROJECT  |PARTITIONED|
+                      -- COMMIT  |PARTITIONED|
+                        -- STREAM_PROJECT  |PARTITIONED|
+                          -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
+                            -- INDEX_INSERT_DELETE  |PARTITIONED|
+                              -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
+                                -- STREAM_PROJECT  |PARTITIONED|
+                                  -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
+                                    -- INSERT_DELETE  |PARTITIONED|
+                                      -- HASH_PARTITION_EXCHANGE [$$79]  |PARTITIONED|
+                                        -- ASSIGN  |PARTITIONED|
+                                          -- STREAM_PROJECT  |PARTITIONED|
+                                            -- ASSIGN  |PARTITIONED|
+                                              -- STREAM_PROJECT  |PARTITIONED|
+                                                -- ASSIGN  |PARTITIONED|
+                                                  -- STREAM_PROJECT  |PARTITIONED|
+                                                    -- ASSIGN  |PARTITIONED|
+                                                      -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
+                                                        -- NESTED_LOOP  |PARTITIONED|
+                                                          -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
+                                                            -- STREAM_PROJECT  |PARTITIONED|
+                                                              -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
+                                                                -- HYBRID_HASH_JOIN [$$92, $$90][$$86, $$87]  |PARTITIONED|
+                                                                  -- HASH_PARTITION_EXCHANGE [$$92, $$90]  |PARTITIONED|
+                                                                    -- STREAM_PROJECT  |PARTITIONED|
+                                                                      -- ASSIGN  |PARTITIONED|
+                                                                        -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
+                                                                          -- DATASOURCE_SCAN  |PARTITIONED|
+                                                                            -- BROADCAST_EXCHANGE  |PARTITIONED|
+                                                                              -- ASSIGN  |UNPARTITIONED|
+                                                                                -- EMPTY_TUPLE_SOURCE  |UNPARTITIONED|
+                                                                  -- HASH_PARTITION_EXCHANGE [$$86, $$87]  |PARTITIONED|
+                                                                    -- STREAM_PROJECT  |PARTITIONED|
+                                                                      -- ASSIGN  |PARTITIONED|
+                                                                        -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
+                                                                          -- DATASOURCE_SCAN  |PARTITIONED|
+                                                                            -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
+                                                                              -- EMPTY_TUPLE_SOURCE  |PARTITIONED|
+                                                          -- BROADCAST_EXCHANGE  |PARTITIONED|
+                                                            -- STREAM_PROJECT  |PARTITIONED|
+                                                              -- ASSIGN  |PARTITIONED|
+                                                                -- STREAM_PROJECT  |PARTITIONED|
+                                                                  -- STREAM_SELECT  |PARTITIONED|
+                                                                    -- STREAM_PROJECT  |PARTITIONED|
+                                                                      -- ASSIGN  |PARTITIONED|
+                                                                        -- STREAM_PROJECT  |PARTITIONED|
+                                                                          -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
+                                                                            -- DATASOURCE_SCAN  |PARTITIONED|
+                                                                              -- BROADCAST_EXCHANGE  |PARTITIONED|
+                                                                                -- ASSIGN  |PARTITIONED|
+                                                                                  -- EMPTY_TUPLE_SOURCE  |PARTITIONED|
diff --git a/asterix-bad/src/test/resources/optimizerts/results/bad_cq/new_tweets_meta_or.plan b/asterix-bad/src/test/resources/optimizerts/results/bad_cq/new_tweets_meta_or.plan
new file mode 100644
index 0000000..7ed194c
--- /dev/null
+++ b/asterix-bad/src/test/resources/optimizerts/results/bad_cq/new_tweets_meta_or.plan
@@ -0,0 +1,66 @@
+-- NOTIFY_BROKERS  |PARTITIONED|
+  -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
+    -- PRE_CLUSTERED_GROUP_BY[$$97, $$98, $$channelExecutionTime]  |PARTITIONED|
+            {
+              -- AGGREGATE  |LOCAL|
+                -- NESTED_TUPLE_SOURCE  |LOCAL|
+            }
+      -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
+        -- STABLE_SORT [$$97(ASC), $$98(ASC), $$channelExecutionTime(ASC)]  |PARTITIONED|
+          -- HASH_PARTITION_EXCHANGE [$$97, $$98, $$channelExecutionTime]  |PARTITIONED|
+            -- PRE_SORTED_DISTINCT_BY  |PARTITIONED|
+              -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
+                -- STABLE_SORT [$$85(ASC)]  |PARTITIONED|
+                  -- HASH_PARTITION_EXCHANGE [$$85]  |PARTITIONED|
+                    -- STREAM_PROJECT  |PARTITIONED|
+                      -- COMMIT  |PARTITIONED|
+                        -- STREAM_PROJECT  |PARTITIONED|
+                          -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
+                            -- INDEX_INSERT_DELETE  |PARTITIONED|
+                              -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
+                                -- STREAM_PROJECT  |PARTITIONED|
+                                  -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
+                                    -- INSERT_DELETE  |PARTITIONED|
+                                      -- HASH_PARTITION_EXCHANGE [$$79]  |PARTITIONED|
+                                        -- ASSIGN  |PARTITIONED|
+                                          -- STREAM_PROJECT  |PARTITIONED|
+                                            -- ASSIGN  |PARTITIONED|
+                                              -- STREAM_PROJECT  |PARTITIONED|
+                                                -- ASSIGN  |PARTITIONED|
+                                                  -- STREAM_PROJECT  |PARTITIONED|
+                                                    -- ASSIGN  |PARTITIONED|
+                                                      -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
+                                                        -- NESTED_LOOP  |PARTITIONED|
+                                                          -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
+                                                            -- STREAM_PROJECT  |PARTITIONED|
+                                                              -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
+                                                                -- HYBRID_HASH_JOIN [$$92, $$90][$$86, $$87]  |PARTITIONED|
+                                                                  -- HASH_PARTITION_EXCHANGE [$$92, $$90]  |PARTITIONED|
+                                                                    -- STREAM_PROJECT  |PARTITIONED|
+                                                                      -- ASSIGN  |PARTITIONED|
+                                                                        -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
+                                                                          -- DATASOURCE_SCAN  |PARTITIONED|
+                                                                            -- BROADCAST_EXCHANGE  |PARTITIONED|
+                                                                              -- ASSIGN  |UNPARTITIONED|
+                                                                                -- EMPTY_TUPLE_SOURCE  |UNPARTITIONED|
+                                                                  -- HASH_PARTITION_EXCHANGE [$$86, $$87]  |PARTITIONED|
+                                                                    -- STREAM_PROJECT  |PARTITIONED|
+                                                                      -- ASSIGN  |PARTITIONED|
+                                                                        -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
+                                                                          -- DATASOURCE_SCAN  |PARTITIONED|
+                                                                            -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
+                                                                              -- EMPTY_TUPLE_SOURCE  |PARTITIONED|
+                                                          -- BROADCAST_EXCHANGE  |PARTITIONED|
+                                                            -- STREAM_PROJECT  |PARTITIONED|
+                                                              -- ASSIGN  |PARTITIONED|
+                                                                -- STREAM_PROJECT  |PARTITIONED|
+                                                                  -- STREAM_SELECT  |PARTITIONED|
+                                                                    -- STREAM_PROJECT  |PARTITIONED|
+                                                                      -- ASSIGN  |PARTITIONED|
+                                                                        -- ASSIGN  |PARTITIONED|
+                                                                          -- ASSIGN  |PARTITIONED|
+                                                                            -- STREAM_PROJECT  |PARTITIONED|
+                                                                              -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
+                                                                                -- DATASOURCE_SCAN  |PARTITIONED|
+                                                                                  -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
+                                                                                    -- EMPTY_TUPLE_SOURCE  |PARTITIONED|
diff --git a/asterix-bad/src/test/resources/optimizerts/results/bad_cq/new_tweets_push.plan b/asterix-bad/src/test/resources/optimizerts/results/bad_cq/new_tweets_push.plan
index ebf5624..6134bd4 100644
--- a/asterix-bad/src/test/resources/optimizerts/results/bad_cq/new_tweets_push.plan
+++ b/asterix-bad/src/test/resources/optimizerts/results/bad_cq/new_tweets_push.plan
@@ -1,13 +1,13 @@
 -- NOTIFY_BROKERS  |PARTITIONED|
   -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
-    -- PRE_CLUSTERED_GROUP_BY[$$93, $$94, $$channelExecutionTime]  |PARTITIONED|
+    -- PRE_CLUSTERED_GROUP_BY[$$91, $$92, $$channelExecutionTime]  |PARTITIONED|
             {
               -- AGGREGATE  |LOCAL|
                 -- NESTED_TUPLE_SOURCE  |LOCAL|
             }
       -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
-        -- STABLE_SORT [$$93(ASC), $$94(ASC), $$channelExecutionTime(ASC)]  |PARTITIONED|
-          -- HASH_PARTITION_EXCHANGE [$$93, $$94, $$channelExecutionTime]  |PARTITIONED|
+        -- STABLE_SORT [$$91(ASC), $$92(ASC), $$channelExecutionTime(ASC)]  |PARTITIONED|
+          -- HASH_PARTITION_EXCHANGE [$$91, $$92, $$channelExecutionTime]  |PARTITIONED|
             -- PRE_SORTED_DISTINCT_BY  |PARTITIONED|
               -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
                 -- STABLE_SORT [$$79(ASC)]  |PARTITIONED|
@@ -55,12 +55,11 @@
                                                               -- ASSIGN  |PARTITIONED|
                                                                 -- STREAM_PROJECT  |PARTITIONED|
                                                                   -- STREAM_SELECT  |PARTITIONED|
-                                                                    -- ASSIGN  |PARTITIONED|
+                                                                    -- STREAM_PROJECT  |PARTITIONED|
                                                                       -- ASSIGN  |PARTITIONED|
                                                                         -- STREAM_PROJECT  |PARTITIONED|
-                                                                          -- ASSIGN  |PARTITIONED|
-                                                                            -- STREAM_PROJECT  |PARTITIONED|
-                                                                              -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
-                                                                                -- DATASOURCE_SCAN  |PARTITIONED|
-                                                                                  -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
-                                                                                    -- EMPTY_TUPLE_SOURCE  |PARTITIONED|
+                                                                          -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
+                                                                            -- DATASOURCE_SCAN  |PARTITIONED|
+                                                                              -- BROADCAST_EXCHANGE  |PARTITIONED|
+                                                                                -- ASSIGN  |PARTITIONED|
+                                                                                  -- EMPTY_TUPLE_SOURCE  |PARTITIONED|
diff --git a/asterix-bad/src/test/resources/optimizerts/results/bad_cq/unseen_nearby_tweets.plan b/asterix-bad/src/test/resources/optimizerts/results/bad_cq/unseen_nearby_tweets.plan
index 5bf7f10..a6a17de 100644
--- a/asterix-bad/src/test/resources/optimizerts/results/bad_cq/unseen_nearby_tweets.plan
+++ b/asterix-bad/src/test/resources/optimizerts/results/bad_cq/unseen_nearby_tweets.plan
@@ -1,13 +1,13 @@
 -- NOTIFY_BROKERS  |PARTITIONED|
   -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
-    -- PRE_CLUSTERED_GROUP_BY[$$125, $$126, $$channelExecutionTime]  |PARTITIONED|
+    -- PRE_CLUSTERED_GROUP_BY[$$128, $$129, $$channelExecutionTime]  |PARTITIONED|
             {
               -- AGGREGATE  |LOCAL|
                 -- NESTED_TUPLE_SOURCE  |LOCAL|
             }
       -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
-        -- STABLE_SORT [$$125(ASC), $$126(ASC), $$channelExecutionTime(ASC)]  |PARTITIONED|
-          -- HASH_PARTITION_EXCHANGE [$$125, $$126, $$channelExecutionTime]  |PARTITIONED|
+        -- STABLE_SORT [$$128(ASC), $$129(ASC), $$channelExecutionTime(ASC)]  |PARTITIONED|
+          -- HASH_PARTITION_EXCHANGE [$$128, $$129, $$channelExecutionTime]  |PARTITIONED|
             -- PRE_SORTED_DISTINCT_BY  |PARTITIONED|
               -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
                 -- STABLE_SORT [$$99(ASC)]  |PARTITIONED|
diff --git a/asterix-bad/src/test/resources/optimizerts/results/bad_rq/channel-subscribe.plan b/asterix-bad/src/test/resources/optimizerts/results/bad_rq/channel-subscribe.plan
index a5a40c1..017bd9e 100644
--- a/asterix-bad/src/test/resources/optimizerts/results/bad_rq/channel-subscribe.plan
+++ b/asterix-bad/src/test/resources/optimizerts/results/bad_rq/channel-subscribe.plan
@@ -1,17 +1,17 @@
 -- NOTIFY_BROKERS  |PARTITIONED|
   -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
-    -- PRE_CLUSTERED_GROUP_BY[$$100, $$101, $$channelExecutionTime]  |PARTITIONED|
+    -- PRE_CLUSTERED_GROUP_BY[$$106, $$107, $$channelExecutionTime]  |PARTITIONED|
             {
               -- AGGREGATE  |LOCAL|
                 -- NESTED_TUPLE_SOURCE  |LOCAL|
             }
       -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
-        -- STABLE_SORT [$$100(ASC), $$101(ASC), $$channelExecutionTime(ASC)]  |PARTITIONED|
-          -- HASH_PARTITION_EXCHANGE [$$100, $$101, $$channelExecutionTime]  |PARTITIONED|
+        -- STABLE_SORT [$$106(ASC), $$107(ASC), $$channelExecutionTime(ASC)]  |PARTITIONED|
+          -- HASH_PARTITION_EXCHANGE [$$106, $$107, $$channelExecutionTime]  |PARTITIONED|
             -- PRE_SORTED_DISTINCT_BY  |PARTITIONED|
               -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
-                -- STABLE_SORT [$$85(ASC)]  |PARTITIONED|
-                  -- HASH_PARTITION_EXCHANGE [$$85]  |PARTITIONED|
+                -- STABLE_SORT [$$91(ASC)]  |PARTITIONED|
+                  -- HASH_PARTITION_EXCHANGE [$$91]  |PARTITIONED|
                     -- STREAM_PROJECT  |PARTITIONED|
                       -- COMMIT  |PARTITIONED|
                         -- STREAM_PROJECT  |PARTITIONED|
@@ -21,7 +21,7 @@
                                 -- STREAM_PROJECT  |PARTITIONED|
                                   -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
                                     -- INSERT_DELETE  |PARTITIONED|
-                                      -- HASH_PARTITION_EXCHANGE [$$79]  |PARTITIONED|
+                                      -- HASH_PARTITION_EXCHANGE [$$85]  |PARTITIONED|
                                         -- ASSIGN  |PARTITIONED|
                                           -- STREAM_PROJECT  |PARTITIONED|
                                             -- ASSIGN  |PARTITIONED|
@@ -35,8 +35,8 @@
                                                             -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
                                                               -- STREAM_PROJECT  |PARTITIONED|
                                                                 -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
-                                                                  -- HYBRID_HASH_JOIN [$$91, $$89][$$86, $$87]  |PARTITIONED|
-                                                                    -- HASH_PARTITION_EXCHANGE [$$91, $$89]  |PARTITIONED|
+                                                                  -- HYBRID_HASH_JOIN [$$97, $$95][$$92, $$93]  |PARTITIONED|
+                                                                    -- HASH_PARTITION_EXCHANGE [$$97, $$95]  |PARTITIONED|
                                                                       -- STREAM_PROJECT  |PARTITIONED|
                                                                         -- ASSIGN  |PARTITIONED|
                                                                           -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
@@ -44,7 +44,7 @@
                                                                               -- BROADCAST_EXCHANGE  |PARTITIONED|
                                                                                 -- ASSIGN  |UNPARTITIONED|
                                                                                   -- EMPTY_TUPLE_SOURCE  |UNPARTITIONED|
-                                                                    -- HASH_PARTITION_EXCHANGE [$$86, $$87]  |PARTITIONED|
+                                                                    -- HASH_PARTITION_EXCHANGE [$$92, $$93]  |PARTITIONED|
                                                                       -- STREAM_PROJECT  |PARTITIONED|
                                                                         -- ASSIGN  |PARTITIONED|
                                                                           -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
@@ -65,7 +65,7 @@
       -- STREAM_PROJECT  |PARTITIONED|
         -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
           -- INSERT_DELETE  |PARTITIONED|
-            -- HASH_PARTITION_EXCHANGE [$$13]  |PARTITIONED|
+            -- HASH_PARTITION_EXCHANGE [$$14]  |PARTITIONED|
               -- ASSIGN  |UNPARTITIONED|
                 -- STREAM_PROJECT  |UNPARTITIONED|
                   -- ASSIGN  |UNPARTITIONED|
diff --git a/asterix-bad/src/test/resources/runtimets/queries/bad_rq/add_index/add_index.5.query.sqlpp b/asterix-bad/src/test/resources/runtimets/queries/bad_rq/add_index/add_index.5.query.sqlpp
index eebfbc4..f5b5a03 100644
--- a/asterix-bad/src/test/resources/runtimets/queries/bad_rq/add_index/add_index.5.query.sqlpp
+++ b/asterix-bad/src/test/resources/runtimets/queries/bad_rq/add_index/add_index.5.query.sqlpp
@@ -19,4 +19,4 @@
 use channels;
 
 select value array_count(
-(select * from EmergencyChannelResults where deliveryTime > datetime("2017-05-02T17:52:59.570Z")));
\ No newline at end of file
+(select * from EmergencyChannelResults where deliveryTime > datetime("2017-05-02T17:52:59.570Z"))) > 1000;
\ No newline at end of file
diff --git a/asterix-bad/src/test/resources/runtimets/results/bad_rq/add_index/add_index.1.adm b/asterix-bad/src/test/resources/runtimets/results/bad_rq/add_index/add_index.1.adm
index 2e9bdd9..f32a580 100644
--- a/asterix-bad/src/test/resources/runtimets/results/bad_rq/add_index/add_index.1.adm
+++ b/asterix-bad/src/test/resources/runtimets/results/bad_rq/add_index/add_index.1.adm
@@ -1 +1 @@
-1074
\ No newline at end of file
+true
\ No newline at end of file