Enabling Filters on Active Datasets
Change-Id: I499b1d9a1525ba5ea9396d21118c6f6cf3e870d2
diff --git a/asterix-bad/src/main/java/org/apache/asterix/bad/BADConstants.java b/asterix-bad/src/main/java/org/apache/asterix/bad/BADConstants.java
index 391d84e..7178db1 100644
--- a/asterix-bad/src/main/java/org/apache/asterix/bad/BADConstants.java
+++ b/asterix-bad/src/main/java/org/apache/asterix/bad/BADConstants.java
@@ -1,18 +1,18 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
+ * or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
+ * regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
+ * with the License. You may obtain a copy of the License at
*
- * http://www.apache.org/licenses/LICENSE-2.0
+ * http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
+ * KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
diff --git a/asterix-bad/src/main/java/org/apache/asterix/bad/extension/BADMetadataExtension.java b/asterix-bad/src/main/java/org/apache/asterix/bad/extension/BADMetadataExtension.java
index d0639e2..28f7a79 100644
--- a/asterix-bad/src/main/java/org/apache/asterix/bad/extension/BADMetadataExtension.java
+++ b/asterix-bad/src/main/java/org/apache/asterix/bad/extension/BADMetadataExtension.java
@@ -135,6 +135,7 @@
BuiltinFunctions.addFunction(BADFunctions.CURRENT_CHANNEL_TIME, ADateTimeTypeComputer.INSTANCE, false);
BuiltinFunctions.addFunction(BADFunctions.PREVIOUS_CHANNEL_TIME, ADateTimeTypeComputer.INSTANCE, false);
BuiltinFunctions.addFunction(BADFunctions.IS_NEW, ABooleanTypeComputer.INSTANCE, true);
+ BuiltinFunctions.addFunction(BADFunctions.ACTIVE_TIMESTAMP, ADateTimeTypeComputer.INSTANCE, true);
// to shadow the master feed rewriter
BuiltinFunctions.addPrivateFunction(BuiltinFunctions.FEED_COLLECT, BADFeedRewriter.INSTANCE, true);
diff --git a/asterix-bad/src/main/java/org/apache/asterix/bad/feed/operators/BADLSMPrimaryInsertOperatorNodePushable.java b/asterix-bad/src/main/java/org/apache/asterix/bad/feed/operators/BADLSMPrimaryInsertOperatorNodePushable.java
index 813fd0b..3239584 100644
--- a/asterix-bad/src/main/java/org/apache/asterix/bad/feed/operators/BADLSMPrimaryInsertOperatorNodePushable.java
+++ b/asterix-bad/src/main/java/org/apache/asterix/bad/feed/operators/BADLSMPrimaryInsertOperatorNodePushable.java
@@ -44,13 +44,15 @@
@Override
protected void beforeModification(ITupleReference tuple) {
- if ((tuple.getFieldCount() == 3
- && tuple.getFieldData(0)[tuple.getFieldStart(2)] == ATypeTag.SERIALIZED_RECORD_TYPE_TAG)
- || (tuple.getFieldCount() == 4
- && tuple.getFieldData(0)[tuple.getFieldStart(2)] == ATypeTag.SERIALIZED_RECORD_TYPE_TAG)) {
- int targetIdx = tuple.getFieldStart(2) + 14;
+ if (tuple.getFieldCount() >= 3
+ && tuple.getFieldData(0)[tuple.getFieldStart(2)] == ATypeTag.SERIALIZED_RECORD_TYPE_TAG
+ && tuple.getFieldLength(2) == 22) {
+ long currMilli = System.currentTimeMillis();
ByteBuffer tupleBuff = ByteBuffer.wrap(tuple.getFieldData(0));
- tupleBuff.putLong(targetIdx, System.currentTimeMillis());
+ tupleBuff.putLong(tuple.getFieldStart(2) + 14, currMilli);
+ if (tuple.getFieldCount() == 4) {
+ tupleBuff.putLong(tuple.getFieldStart(3) + 1, currMilli);
+ }
}
}
}
diff --git a/asterix-bad/src/main/java/org/apache/asterix/bad/feed/operators/BADLSMPrimaryUpsertOperatorDescriptor.java b/asterix-bad/src/main/java/org/apache/asterix/bad/feed/operators/BADLSMPrimaryUpsertOperatorDescriptor.java
index 46f9bc8..0f1ce21 100644
--- a/asterix-bad/src/main/java/org/apache/asterix/bad/feed/operators/BADLSMPrimaryUpsertOperatorDescriptor.java
+++ b/asterix-bad/src/main/java/org/apache/asterix/bad/feed/operators/BADLSMPrimaryUpsertOperatorDescriptor.java
@@ -37,11 +37,11 @@
IMissingWriterFactory missingWriterFactory,
IModificationOperationCallbackFactory modificationOpCallbackFactory,
ISearchOperationCallbackFactory searchOpCallbackFactory,
- IFrameOperationCallbackFactory frameOpCallbackFactory, int numPrimaryKeys, ARecordType recordType,
- int filterIndex, boolean hasSecondaries) {
+ IFrameOperationCallbackFactory frameOpCallbackFactory, int numPrimaryKeys, Integer filterSourceIndicator,
+ ARecordType itemType, int filterIndex, boolean hasSecondaries) {
super(spec, outRecDesc, fieldPermutation, indexHelperFactory, missingWriterFactory,
modificationOpCallbackFactory, searchOpCallbackFactory, frameOpCallbackFactory, numPrimaryKeys,
- recordType, filterIndex, hasSecondaries);
+ filterSourceIndicator, itemType, filterIndex, hasSecondaries);
}
@Override
@@ -49,7 +49,7 @@
IRecordDescriptorProvider recordDescProvider, int partition, int nPartitions) throws HyracksDataException {
RecordDescriptor intputRecDesc = recordDescProvider.getInputRecordDescriptor(getActivityId(), 0);
return new BADLSMPrimaryUpsertOperatorNodePushable(ctx, partition, indexHelperFactory, fieldPermutation,
- intputRecDesc, modCallbackFactory, searchOpCallbackFactory, numPrimaryKeys, recordType, filterIndex,
- frameOpCallbackFactory, missingWriterFactory, hasSecondaries);
+ intputRecDesc, modCallbackFactory, searchOpCallbackFactory, numPrimaryKeys, filterSourceIndicator,
+ filterItemType, filterIndex, frameOpCallbackFactory, missingWriterFactory, hasSecondaries);
}
}
diff --git a/asterix-bad/src/main/java/org/apache/asterix/bad/feed/operators/BADLSMPrimaryUpsertOperatorNodePushable.java b/asterix-bad/src/main/java/org/apache/asterix/bad/feed/operators/BADLSMPrimaryUpsertOperatorNodePushable.java
index b0c79d5..fc5ccde 100644
--- a/asterix-bad/src/main/java/org/apache/asterix/bad/feed/operators/BADLSMPrimaryUpsertOperatorNodePushable.java
+++ b/asterix-bad/src/main/java/org/apache/asterix/bad/feed/operators/BADLSMPrimaryUpsertOperatorNodePushable.java
@@ -36,23 +36,25 @@
public BADLSMPrimaryUpsertOperatorNodePushable(IHyracksTaskContext ctx, int partition,
IIndexDataflowHelperFactory indexHelperFactory, int[] fieldPermutation, RecordDescriptor inputRecDesc,
IModificationOperationCallbackFactory modCallbackFactory,
- ISearchOperationCallbackFactory searchCallbackFactory, int numOfPrimaryKeys, ARecordType recordType,
- int filterFieldIndex, IFrameOperationCallbackFactory frameOpCallbackFactory,
+ ISearchOperationCallbackFactory searchCallbackFactory, int numOfPrimaryKeys, Integer filterSourceIndicator,
+ ARecordType recordType, int filterFieldIndex, IFrameOperationCallbackFactory frameOpCallbackFactory,
IMissingWriterFactory missingWriterFactory, boolean hasSecondaries) throws HyracksDataException {
super(ctx, partition, indexHelperFactory, fieldPermutation, inputRecDesc, modCallbackFactory,
- searchCallbackFactory, numOfPrimaryKeys, recordType, filterFieldIndex, frameOpCallbackFactory,
- missingWriterFactory, hasSecondaries);
+ searchCallbackFactory, numOfPrimaryKeys, filterSourceIndicator, recordType, filterFieldIndex,
+ frameOpCallbackFactory, missingWriterFactory, hasSecondaries);
}
@Override
protected void beforeModification(ITupleReference tuple) {
- if ((tuple.getFieldCount() == 3
- && tuple.getFieldData(0)[tuple.getFieldStart(2)] == ATypeTag.SERIALIZED_RECORD_TYPE_TAG)
- || (tuple.getFieldCount() == 4
- && tuple.getFieldData(0)[tuple.getFieldStart(2)] == ATypeTag.SERIALIZED_RECORD_TYPE_TAG)) {
- int targetIdx = tuple.getFieldStart(2) + 14;
+ if (tuple.getFieldCount() >= 3
+ && tuple.getFieldData(0)[tuple.getFieldStart(2)] == ATypeTag.SERIALIZED_RECORD_TYPE_TAG
+ && tuple.getFieldLength(2) == 22) {
+ long currMilli = System.currentTimeMillis();
ByteBuffer tupleBuff = ByteBuffer.wrap(tuple.getFieldData(0));
- tupleBuff.putLong(targetIdx, System.currentTimeMillis());
+ tupleBuff.putLong(tuple.getFieldStart(2) + 14, currMilli);
+ if (tuple.getFieldCount() == 4) {
+ tupleBuff.putLong(tuple.getFieldStart(3) + 1, currMilli);
+ }
}
}
}
diff --git a/asterix-bad/src/main/java/org/apache/asterix/bad/function/BADFunctionCollection.java b/asterix-bad/src/main/java/org/apache/asterix/bad/function/BADFunctionCollection.java
index 15bb617..e6d2309 100644
--- a/asterix-bad/src/main/java/org/apache/asterix/bad/function/BADFunctionCollection.java
+++ b/asterix-bad/src/main/java/org/apache/asterix/bad/function/BADFunctionCollection.java
@@ -1,22 +1,20 @@
/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
*
- * * Licensed to the Apache Software Foundation (ASF) under one
- * * or more contributor license agreements. See the NOTICE file
- * * distributed with this work for additional information
- * * regarding copyright ownership. The ASF licenses this file
- * * to you under the Apache License, Version 2.0 (the
- * * "License"); you may not use this file except in compliance
- * * with the License. You may obtain a copy of the License at
- * *
- * * http://www.apache.org/licenses/LICENSE-2.0
- * *
- * * Unless required by applicable law or agreed to in writing,
- * * software distributed under the License is distributed on an
- * * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * * KIND, either express or implied. See the License for the
- * * specific language governing permissions and limitations
- * * under the License.
+ * http://www.apache.org/licenses/LICENSE-2.0
*
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
*/
package org.apache.asterix.bad.function;
diff --git a/asterix-bad/src/main/java/org/apache/asterix/bad/function/BADFunctionRegistrant.java b/asterix-bad/src/main/java/org/apache/asterix/bad/function/BADFunctionRegistrant.java
index 15a5fc5..5dfb62d 100644
--- a/asterix-bad/src/main/java/org/apache/asterix/bad/function/BADFunctionRegistrant.java
+++ b/asterix-bad/src/main/java/org/apache/asterix/bad/function/BADFunctionRegistrant.java
@@ -1,22 +1,20 @@
/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
*
- * * Licensed to the Apache Software Foundation (ASF) under one
- * * or more contributor license agreements. See the NOTICE file
- * * distributed with this work for additional information
- * * regarding copyright ownership. The ASF licenses this file
- * * to you under the Apache License, Version 2.0 (the
- * * "License"); you may not use this file except in compliance
- * * with the License. You may obtain a copy of the License at
- * *
- * * http://www.apache.org/licenses/LICENSE-2.0
- * *
- * * Unless required by applicable law or agreed to in writing,
- * * software distributed under the License is distributed on an
- * * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * * KIND, either express or implied. See the License for the
- * * specific language governing permissions and limitations
- * * under the License.
+ * http://www.apache.org/licenses/LICENSE-2.0
*
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
*/
package org.apache.asterix.bad.function;
diff --git a/asterix-bad/src/main/java/org/apache/asterix/bad/function/BADFunctions.java b/asterix-bad/src/main/java/org/apache/asterix/bad/function/BADFunctions.java
index a342210..085bd80 100644
--- a/asterix-bad/src/main/java/org/apache/asterix/bad/function/BADFunctions.java
+++ b/asterix-bad/src/main/java/org/apache/asterix/bad/function/BADFunctions.java
@@ -1,22 +1,20 @@
/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
*
- * * Licensed to the Apache Software Foundation (ASF) under one
- * * or more contributor license agreements. See the NOTICE file
- * * distributed with this work for additional information
- * * regarding copyright ownership. The ASF licenses this file
- * * to you under the Apache License, Version 2.0 (the
- * * "License"); you may not use this file except in compliance
- * * with the License. You may obtain a copy of the License at
- * *
- * * http://www.apache.org/licenses/LICENSE-2.0
- * *
- * * Unless required by applicable law or agreed to in writing,
- * * software distributed under the License is distributed on an
- * * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * * KIND, either express or implied. See the License for the
- * * specific language governing permissions and limitations
- * * under the License.
+ * http://www.apache.org/licenses/LICENSE-2.0
*
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
*/
package org.apache.asterix.bad.function;
diff --git a/asterix-bad/src/main/java/org/apache/asterix/bad/function/runtime/CurrentChannelTimeDescriptor.java b/asterix-bad/src/main/java/org/apache/asterix/bad/function/runtime/CurrentChannelTimeDescriptor.java
index f3392ad..90191a3 100644
--- a/asterix-bad/src/main/java/org/apache/asterix/bad/function/runtime/CurrentChannelTimeDescriptor.java
+++ b/asterix-bad/src/main/java/org/apache/asterix/bad/function/runtime/CurrentChannelTimeDescriptor.java
@@ -1,22 +1,20 @@
/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
*
- * * Licensed to the Apache Software Foundation (ASF) under one
- * * or more contributor license agreements. See the NOTICE file
- * * distributed with this work for additional information
- * * regarding copyright ownership. The ASF licenses this file
- * * to you under the Apache License, Version 2.0 (the
- * * "License"); you may not use this file except in compliance
- * * with the License. You may obtain a copy of the License at
- * *
- * * http://www.apache.org/licenses/LICENSE-2.0
- * *
- * * Unless required by applicable law or agreed to in writing,
- * * software distributed under the License is distributed on an
- * * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * * KIND, either express or implied. See the License for the
- * * specific language governing permissions and limitations
- * * under the License.
+ * http://www.apache.org/licenses/LICENSE-2.0
*
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
*/
package org.apache.asterix.bad.function.runtime;
diff --git a/asterix-bad/src/main/java/org/apache/asterix/bad/function/runtime/PreviousChannelTimeDescriptor.java b/asterix-bad/src/main/java/org/apache/asterix/bad/function/runtime/PreviousChannelTimeDescriptor.java
index f4dd1f5..800c3c4 100644
--- a/asterix-bad/src/main/java/org/apache/asterix/bad/function/runtime/PreviousChannelTimeDescriptor.java
+++ b/asterix-bad/src/main/java/org/apache/asterix/bad/function/runtime/PreviousChannelTimeDescriptor.java
@@ -1,22 +1,20 @@
/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
*
- * * Licensed to the Apache Software Foundation (ASF) under one
- * * or more contributor license agreements. See the NOTICE file
- * * distributed with this work for additional information
- * * regarding copyright ownership. The ASF licenses this file
- * * to you under the Apache License, Version 2.0 (the
- * * "License"); you may not use this file except in compliance
- * * with the License. You may obtain a copy of the License at
- * *
- * * http://www.apache.org/licenses/LICENSE-2.0
- * *
- * * Unless required by applicable law or agreed to in writing,
- * * software distributed under the License is distributed on an
- * * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * * KIND, either express or implied. See the License for the
- * * specific language governing permissions and limitations
- * * under the License.
+ * http://www.apache.org/licenses/LICENSE-2.0
*
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
*/
package org.apache.asterix.bad.function.runtime;
diff --git a/asterix-bad/src/main/java/org/apache/asterix/bad/lang/BADExpressionToPlanTranslator.java b/asterix-bad/src/main/java/org/apache/asterix/bad/lang/BADExpressionToPlanTranslator.java
index 23d10c5..abcd70d 100644
--- a/asterix-bad/src/main/java/org/apache/asterix/bad/lang/BADExpressionToPlanTranslator.java
+++ b/asterix-bad/src/main/java/org/apache/asterix/bad/lang/BADExpressionToPlanTranslator.java
@@ -27,6 +27,7 @@
import org.apache.asterix.lang.common.struct.VarIdentifier;
import org.apache.asterix.metadata.declared.DatasetDataSource;
import org.apache.asterix.metadata.declared.MetadataProvider;
+import org.apache.asterix.metadata.utils.DatasetUtil;
import org.apache.asterix.om.base.ADateTime;
import org.apache.asterix.om.base.ARecord;
import org.apache.asterix.om.base.IAObject;
@@ -72,32 +73,43 @@
@Override
protected ILogicalOperator translateDelete(DatasetDataSource targetDatasource, Mutable<ILogicalExpression> varRef,
- List<Mutable<ILogicalExpression>> varRefsForLoading,
- List<Mutable<ILogicalExpression>> additionalFilteringExpressions, ILogicalOperator inputOp,
+ List<Mutable<ILogicalExpression>> varRefsForLoading, LogicalVariable seqVar, ILogicalOperator pkeyAssignOp,
CompiledStatements.ICompiledDmlStatement stmt) throws AlgebricksException {
SourceLocation sourceLoc = stmt.getSourceLocation();
InsertDeleteUpsertOperator deleteOp;
+ LogicalVariable metaVar = null;
+
if (!targetDatasource.getDataset().hasMetaPart()) {
deleteOp = new InsertDeleteUpsertOperator(targetDatasource, varRef, varRefsForLoading,
InsertDeleteUpsertOperator.Kind.DELETE, false);
} else {
// prepare meta record
IAType metaType = metadataProvider.findMetaType(targetDatasource.getDataset());
- LogicalVariable metaVar = context.newVar();
+ metaVar = context.newVar();
AssignOperator metaVariableAssignOp =
new AssignOperator(metaVar, new MutableObject<>(makeMetaRecordExpr(metaType)));
- metaVariableAssignOp.getInputs().add(new MutableObject<>(inputOp));
+ metaVariableAssignOp.getInputs().add(new MutableObject<>(pkeyAssignOp));
metaVariableAssignOp.setSourceLocation(sourceLoc);
// create insert op uses meta record
deleteOp = new InsertDeleteUpsertOperator(targetDatasource, varRef, varRefsForLoading,
Collections.singletonList(new MutableObject<>(new VariableReferenceExpression(metaVar))),
InsertDeleteUpsertOperator.Kind.DELETE, false);
// change current inputOp to be meta op
- inputOp = metaVariableAssignOp;
+ pkeyAssignOp = metaVariableAssignOp;
}
- deleteOp.setAdditionalFilteringExpressions(additionalFilteringExpressions);
- deleteOp.getInputs().add(new MutableObject<>(inputOp));
+
+ deleteOp.getInputs().add(new MutableObject<>(pkeyAssignOp));
deleteOp.setSourceLocation(sourceLoc);
+
+ List<String> filterField = DatasetUtil.getFilterField(targetDatasource.getDataset());
+ List<Mutable<ILogicalExpression>> filterExprs = null;
+ Integer filterSourceIndicator = DatasetUtil.getFilterSourceIndicator(targetDatasource.getDataset());
+ if (filterField != null) {
+ filterExprs = generatedFilterExprs(deleteOp, filterField, filterSourceIndicator == 0 ? seqVar : metaVar,
+ sourceLoc);
+ }
+ deleteOp.setAdditionalFilteringExpressions(filterExprs);
+
DelegateOperator leafOperator = new DelegateOperator(new CommitOperator(true));
leafOperator.getInputs().add(new MutableObject<>(deleteOp));
leafOperator.setSourceLocation(sourceLoc);
@@ -105,23 +117,23 @@
}
@Override
- protected ILogicalOperator translateUpsert(DatasetDataSource targetDatasource, Mutable<ILogicalExpression> varRef,
- List<Mutable<ILogicalExpression>> varRefsForLoading, List<Mutable<ILogicalExpression>> filterExprs,
- ILogicalOperator pkeyAssignOp, List<String> additionalFilteringField, LogicalVariable unnestVar,
- ILogicalOperator topOp, List<Mutable<ILogicalExpression>> exprs, LogicalVariable resVar,
- AssignOperator additionalFilteringAssign, CompiledStatements.ICompiledDmlStatement stmt,
- IResultMetadata resultMetadata) throws AlgebricksException {
+ protected ILogicalOperator translateUpsert(DatasetDataSource targetDatasource,
+ Mutable<ILogicalExpression> payloadVarRef, List<Mutable<ILogicalExpression>> varRefsForLoading,
+ ILogicalOperator pkeyAssignOp, LogicalVariable unnestVar, ILogicalOperator topOp,
+ List<Mutable<ILogicalExpression>> pkeyExprs, LogicalVariable seqVar,
+ CompiledStatements.ICompiledDmlStatement stmt, IResultMetadata resultMetadata) throws AlgebricksException {
SourceLocation sourceLoc = stmt.getSourceLocation();
CompiledStatements.CompiledUpsertStatement compiledUpsert = (CompiledStatements.CompiledUpsertStatement) stmt;
Expression returnExpression = compiledUpsert.getReturnExpression();
InsertDeleteUpsertOperator upsertOp;
ILogicalOperator rootOperator;
+ LogicalVariable metaVar = null;
ARecordType recordType = (ARecordType) targetDatasource.getItemType();
if (targetDatasource.getDataset().hasMetaPart()) {
IAType metaType = metadataProvider.findMetaType(targetDatasource.getDataset());
- LogicalVariable metaVar = context.newVar();
+ metaVar = context.newVar();
AssignOperator metaVariableAssignOp =
new AssignOperator(metaVar, new MutableObject<>(makeMetaRecordExpr(metaType)));
metaVariableAssignOp.getInputs().add(new MutableObject<>(pkeyAssignOp));
@@ -131,7 +143,7 @@
List<Mutable<ILogicalExpression>> metaExprs = new ArrayList<>(1);
VariableReferenceExpression metaVarRef = new VariableReferenceExpression(metaVar);
metaExprs.add(new MutableObject<>(metaVarRef));
- upsertOp = new InsertDeleteUpsertOperator(targetDatasource, varRef, varRefsForLoading, metaExprs,
+ upsertOp = new InsertDeleteUpsertOperator(targetDatasource, payloadVarRef, varRefsForLoading, metaExprs,
InsertDeleteUpsertOperator.Kind.UPSERT, false);
// set previous meta vars
@@ -142,23 +154,33 @@
metaTypes.add(targetDatasource.getMetaItemType());
upsertOp.setPrevAdditionalNonFilteringTypes(metaTypes);
} else {
- upsertOp = new InsertDeleteUpsertOperator(targetDatasource, varRef, varRefsForLoading,
+ upsertOp = new InsertDeleteUpsertOperator(targetDatasource, payloadVarRef, varRefsForLoading,
InsertDeleteUpsertOperator.Kind.UPSERT, false);
- // Create and add a new variable used for representing the original record
- if (additionalFilteringField != null) {
- upsertOp.setPrevFilterVar(context.newVar());
- upsertOp.setPrevFilterType(recordType.getFieldType(additionalFilteringField.get(0)));
- }
}
+
// Create and add a new variable used for representing the original record
upsertOp.setUpsertIndicatorVar(context.newVar());
upsertOp.setUpsertIndicatorVarType(BuiltinType.ABOOLEAN);
upsertOp.setPrevRecordVar(context.newVar());
upsertOp.setPrevRecordType(recordType);
upsertOp.setSourceLocation(sourceLoc);
- upsertOp.setAdditionalFilteringExpressions(filterExprs);
upsertOp.getInputs().add(new MutableObject<>(pkeyAssignOp));
+ List<String> filterField = DatasetUtil.getFilterField(targetDatasource.getDataset());
+ List<Mutable<ILogicalExpression>> filterExprs = null;
+ Integer filterSourceIndicator = DatasetUtil.getFilterSourceIndicator(targetDatasource.getDataset());
+ if (filterField != null) {
+ filterExprs = generatedFilterExprs(upsertOp, filterField, filterSourceIndicator == 0 ? seqVar : metaVar,
+ sourceLoc);
+ ARecordType filterSourceType = filterSourceIndicator == 0 ? (ARecordType) targetDatasource.getItemType()
+ : (ARecordType) targetDatasource.getMetaItemType();
+ upsertOp.setAdditionalFilteringExpressions(filterExprs);
+ upsertOp.setPrevFilterVar(context.newVar());
+ upsertOp.setPrevFilterType(filterSourceType.getFieldType(filterField.get(0)));
+ } else {
+ upsertOp.setAdditionalFilteringExpressions(null);
+ }
+
// Set up delegate operator
DelegateOperator delegateOperator = new DelegateOperator(new CommitOperator(returnExpression == null));
delegateOperator.getInputs().add(new MutableObject<>(upsertOp));
@@ -171,10 +193,10 @@
@Override
protected ILogicalOperator translateInsert(DatasetDataSource targetDatasource, Mutable<ILogicalExpression> varRef,
- List<Mutable<ILogicalExpression>> varRefsForLoading, List<Mutable<ILogicalExpression>> filterExprs,
- ILogicalOperator inputOp, CompiledStatements.ICompiledDmlStatement stmt, IResultMetadata resultMetadata)
- throws AlgebricksException {
+ List<Mutable<ILogicalExpression>> varRefsForLoading, LogicalVariable seqVar, ILogicalOperator pkeyAssignOp,
+ CompiledStatements.ICompiledDmlStatement stmt, IResultMetadata resultMetadata) throws AlgebricksException {
SourceLocation sourceLoc = stmt.getSourceLocation();
+ LogicalVariable metaVar = null;
InsertDeleteUpsertOperator insertOp;
if (!targetDatasource.getDataset().hasMetaPart()) {
@@ -183,21 +205,30 @@
} else {
// prepare meta record
IAType metaType = metadataProvider.findMetaType(targetDatasource.getDataset());
- LogicalVariable metaVar = context.newVar();
+ metaVar = context.newVar();
AssignOperator metaVariableAssignOp =
new AssignOperator(metaVar, new MutableObject<>(makeMetaRecordExpr(metaType)));
- metaVariableAssignOp.getInputs().add(new MutableObject<>(inputOp));
+ metaVariableAssignOp.getInputs().add(new MutableObject<>(pkeyAssignOp));
metaVariableAssignOp.setSourceLocation(sourceLoc);
// create insert op uses meta record
insertOp = new InsertDeleteUpsertOperator(targetDatasource, varRef, varRefsForLoading,
Collections.singletonList(new MutableObject<>(new VariableReferenceExpression(metaVar))),
InsertDeleteUpsertOperator.Kind.INSERT, false);
// change current inputOp to be meta op
- inputOp = metaVariableAssignOp;
+ pkeyAssignOp = metaVariableAssignOp;
+ }
+
+ insertOp.getInputs().add(new MutableObject<>(pkeyAssignOp));
+ insertOp.setSourceLocation(sourceLoc);
+
+ List<String> filterField = DatasetUtil.getFilterField(targetDatasource.getDataset());
+ List<Mutable<ILogicalExpression>> filterExprs = null;
+ Integer filterSourceIndicator = DatasetUtil.getFilterSourceIndicator(targetDatasource.getDataset());
+ if (filterField != null) {
+ filterExprs = generatedFilterExprs(insertOp, filterField, filterSourceIndicator == 0 ? seqVar : metaVar,
+ sourceLoc);
}
insertOp.setAdditionalFilteringExpressions(filterExprs);
- insertOp.getInputs().add(new MutableObject<>(inputOp));
- insertOp.setSourceLocation(sourceLoc);
// Adds the commit operator.
CompiledStatements.CompiledInsertStatement compiledInsert = (CompiledStatements.CompiledInsertStatement) stmt;
diff --git a/asterix-bad/src/main/java/org/apache/asterix/bad/lang/BADRuleSetFactory.java b/asterix-bad/src/main/java/org/apache/asterix/bad/lang/BADRuleSetFactory.java
index 3c3ec59..be93512 100644
--- a/asterix-bad/src/main/java/org/apache/asterix/bad/lang/BADRuleSetFactory.java
+++ b/asterix-bad/src/main/java/org/apache/asterix/bad/lang/BADRuleSetFactory.java
@@ -22,6 +22,7 @@
import java.util.List;
import org.apache.asterix.bad.rules.InsertBrokerNotifierForChannelRule;
+import org.apache.asterix.bad.rules.RewriteActiveFunctionsRule;
import org.apache.asterix.bad.rules.RewriteChannelTimeFunctionToLocalVarRule;
import org.apache.asterix.common.dataflow.ICcApplicationContext;
import org.apache.asterix.compiler.provider.DefaultRuleSetFactory;
@@ -92,6 +93,7 @@
if (rule instanceof FeedScanCollectionToUnnest) {
alteredRuleCollection.add(i + 1, new MetaFunctionToMetaVariableRule());
alteredRuleCollection.add(i + 1, new RewriteChannelTimeFunctionToLocalVarRule());
+ alteredRuleCollection.add(i + 1, new RewriteActiveFunctionsRule());
}
}
diff --git a/asterix-bad/src/main/java/org/apache/asterix/bad/lang/statement/AbstractCreateChannelStatement.java b/asterix-bad/src/main/java/org/apache/asterix/bad/lang/statement/AbstractCreateChannelStatement.java
index 43f11fd..943e0a4 100644
--- a/asterix-bad/src/main/java/org/apache/asterix/bad/lang/statement/AbstractCreateChannelStatement.java
+++ b/asterix-bad/src/main/java/org/apache/asterix/bad/lang/statement/AbstractCreateChannelStatement.java
@@ -136,7 +136,7 @@
List<String> fieldNames = new ArrayList<>();
fieldNames.add(BADConstants.SubscriptionId);
partitionFields.add(fieldNames);
- IDatasetDetailsDecl idd = new InternalDetailsDecl(partitionFields, keyIndicators, true, null);
+ IDatasetDetailsDecl idd = new InternalDetailsDecl(partitionFields, keyIndicators, true, null, null);
TypeExpression subItemType = new TypeReferenceExpression(
new Pair<>(MetadataConstants.METADATA_DATAVERSE_NAME, subscriptionsTypeName));
DatasetDecl createSubscriptionsDataset = new DatasetDecl(dataverseName, new Identifier(subscriptionsTableName),
@@ -151,7 +151,7 @@
fieldNames = new ArrayList<>();
fieldNames.add(BADConstants.ResultId);
partitionFields.add(fieldNames);
- idd = new InternalDetailsDecl(partitionFields, keyIndicators, true, null);
+ idd = new InternalDetailsDecl(partitionFields, keyIndicators, true, null, null);
TypeExpression resultItemType =
new TypeReferenceExpression(new Pair<>(MetadataConstants.METADATA_DATAVERSE_NAME, resultsTypeName));
DatasetDecl createResultsDataset = new DatasetDecl(dataverseName, new Identifier(resultsTableName),
diff --git a/asterix-bad/src/main/java/org/apache/asterix/bad/metadata/BADMetadataProvider.java b/asterix-bad/src/main/java/org/apache/asterix/bad/metadata/BADMetadataProvider.java
index 290db49..5a1e151 100644
--- a/asterix-bad/src/main/java/org/apache/asterix/bad/metadata/BADMetadataProvider.java
+++ b/asterix-bad/src/main/java/org/apache/asterix/bad/metadata/BADMetadataProvider.java
@@ -120,9 +120,13 @@
}
// add the previous filter third
int fieldIdx = -1;
+ Integer filterSourceIndicator = null;
+ ARecordType filterItemType = null;
if (numFilterFields > 0) {
String filterField = DatasetUtil.getFilterField(dataset).get(0);
- String[] fieldNames = itemType.getFieldNames();
+ filterSourceIndicator = DatasetUtil.getFilterSourceIndicator(dataset);
+ filterItemType = filterSourceIndicator == 0 ? itemType : metaItemType;
+ String[] fieldNames = filterItemType.getFieldNames();
int i = 0;
for (; i < fieldNames.length; i++) {
if (fieldNames[i].equals(filterField)) {
@@ -130,9 +134,10 @@
}
}
fieldIdx = i;
- outputTypeTraits[f] = dataFormat.getTypeTraitProvider().getTypeTrait(itemType.getFieldTypes()[fieldIdx]);
+ outputTypeTraits[f] =
+ dataFormat.getTypeTraitProvider().getTypeTrait(filterItemType.getFieldTypes()[fieldIdx]);
outputSerDes[f] =
- dataFormat.getSerdeProvider().getSerializerDeserializer(itemType.getFieldTypes()[fieldIdx]);
+ dataFormat.getSerdeProvider().getSerializerDeserializer(filterItemType.getFieldTypes()[fieldIdx]);
f++;
}
for (int j = 0; j < inputRecordDesc.getFieldCount(); j++) {
@@ -142,7 +147,8 @@
RecordDescriptor outputRecordDesc = new RecordDescriptor(outputSerDes, outputTypeTraits);
op = new BADLSMPrimaryUpsertOperatorDescriptor(spec, outputRecordDesc, fieldPermutation, idfh,
missingWriterFactory, modificationCallbackFactory, searchCallbackFactory,
- dataset.getFrameOpCallbackFactory(metadataProvider), numKeys, itemType, fieldIdx, hasSecondaries);
+ dataset.getFrameOpCallbackFactory(metadataProvider), numKeys, filterSourceIndicator, filterItemType,
+ fieldIdx, hasSecondaries);
return new Pair<>(op, splitsAndConstraint.second);
}
}
diff --git a/asterix-bad/src/main/java/org/apache/asterix/bad/rules/RewriteActiveFunctionsRule.java b/asterix-bad/src/main/java/org/apache/asterix/bad/rules/RewriteActiveFunctionsRule.java
new file mode 100644
index 0000000..a58341f
--- /dev/null
+++ b/asterix-bad/src/main/java/org/apache/asterix/bad/rules/RewriteActiveFunctionsRule.java
@@ -0,0 +1,143 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.bad.rules;
+
+import java.util.ArrayList;
+import java.util.List;
+
+import org.apache.asterix.bad.BADConstants;
+import org.apache.asterix.bad.function.BADFunctions;
+import org.apache.asterix.lang.common.util.FunctionUtil;
+import org.apache.asterix.om.base.AString;
+import org.apache.asterix.om.constants.AsterixConstantValue;
+import org.apache.asterix.om.functions.BuiltinFunctions;
+import org.apache.commons.lang3.mutable.Mutable;
+import org.apache.commons.lang3.mutable.MutableObject;
+import org.apache.hyracks.algebricks.common.exceptions.AlgebricksException;
+import org.apache.hyracks.algebricks.core.algebra.base.ILogicalExpression;
+import org.apache.hyracks.algebricks.core.algebra.base.ILogicalOperator;
+import org.apache.hyracks.algebricks.core.algebra.base.IOptimizationContext;
+import org.apache.hyracks.algebricks.core.algebra.base.LogicalExpressionTag;
+import org.apache.hyracks.algebricks.core.algebra.base.LogicalOperatorTag;
+import org.apache.hyracks.algebricks.core.algebra.base.LogicalVariable;
+import org.apache.hyracks.algebricks.core.algebra.expressions.AbstractFunctionCallExpression;
+import org.apache.hyracks.algebricks.core.algebra.expressions.ConstantExpression;
+import org.apache.hyracks.algebricks.core.algebra.expressions.ScalarFunctionCallExpression;
+import org.apache.hyracks.algebricks.core.algebra.expressions.VariableReferenceExpression;
+import org.apache.hyracks.algebricks.core.algebra.functions.AlgebricksBuiltinFunctions;
+import org.apache.hyracks.algebricks.core.algebra.operators.logical.AssignOperator;
+import org.apache.hyracks.algebricks.core.algebra.operators.logical.InnerJoinOperator;
+import org.apache.hyracks.algebricks.core.algebra.operators.logical.SelectOperator;
+import org.apache.hyracks.algebricks.core.algebra.operators.logical.visitors.VariableUtilities;
+import org.apache.hyracks.algebricks.core.rewriter.base.IAlgebraicRewriteRule;
+
+public class RewriteActiveFunctionsRule implements IAlgebraicRewriteRule {
+
+ @Override
+ public boolean rewritePre(Mutable<ILogicalOperator> opRef, IOptimizationContext context)
+ throws AlgebricksException {
+ Mutable<ILogicalExpression> exprRef;
+ if (opRef.getValue().getOperatorTag() == LogicalOperatorTag.INNERJOIN) {
+ InnerJoinOperator selectOp = (InnerJoinOperator) opRef.getValue();
+ exprRef = selectOp.getCondition();
+ } else if (opRef.getValue().getOperatorTag() == LogicalOperatorTag.SELECT) {
+ SelectOperator selectOp = (SelectOperator) opRef.getValue();
+ exprRef = selectOp.getCondition();
+ } else {
+ return false;
+ }
+ return visit(exprRef, opRef, context);
+ }
+
+ private ScalarFunctionCallExpression makeActiveTsAccessExpr(LogicalVariable dsVar) {
+ ILogicalExpression activeTsFunc =
+ new ScalarFunctionCallExpression(FunctionUtil.getFunctionInfo(BuiltinFunctions.META),
+ new MutableObject<>(new VariableReferenceExpression(dsVar)));
+ return new ScalarFunctionCallExpression(FunctionUtil.getFunctionInfo(BuiltinFunctions.FIELD_ACCESS_BY_NAME),
+ new MutableObject<>(activeTsFunc), new MutableObject<>(new ConstantExpression(
+ new AsterixConstantValue(new AString(BADConstants.FIELD_NAME_ACTIVE_TS)))));
+ }
+
+ private AbstractFunctionCallExpression makeAndForIsNew(AbstractFunctionCallExpression funcExpr) {
+ LogicalVariable dsVar = getDsVar(funcExpr);
+ ILogicalExpression previousChannelTimeExpr = new ScalarFunctionCallExpression(
+ BuiltinFunctions.getBuiltinFunctionInfo(BADFunctions.PREVIOUS_CHANNEL_TIME),
+ new MutableObject<>(new VariableReferenceExpression(dsVar)));
+ ILogicalExpression currentChannelTimeExpr = new ScalarFunctionCallExpression(
+ BuiltinFunctions.getBuiltinFunctionInfo(BADFunctions.CURRENT_CHANNEL_TIME),
+ new MutableObject<>(new VariableReferenceExpression(dsVar)));
+ ILogicalExpression lessThanExpr = new ScalarFunctionCallExpression(
+ FunctionUtil.getFunctionInfo(AlgebricksBuiltinFunctions.LT),
+ new MutableObject<>(makeActiveTsAccessExpr(dsVar)), new MutableObject<>(currentChannelTimeExpr));
+ ILogicalExpression greaterThanExpr = new ScalarFunctionCallExpression(
+ FunctionUtil.getFunctionInfo(AlgebricksBuiltinFunctions.GT),
+ new MutableObject<>(makeActiveTsAccessExpr(dsVar)), new MutableObject<>(previousChannelTimeExpr));
+ return new ScalarFunctionCallExpression(FunctionUtil.getFunctionInfo(BuiltinFunctions.AND),
+ new MutableObject<>(lessThanExpr), new MutableObject<>(greaterThanExpr));
+ }
+
+ private LogicalVariable getDsVar(AbstractFunctionCallExpression funcExpr) {
+ return ((VariableReferenceExpression) funcExpr.getArguments().get(0).getValue()).getVariableReference();
+ }
+
+ private boolean visit(Mutable<ILogicalExpression> exprRef, Mutable<ILogicalOperator> opRef,
+ IOptimizationContext context) throws AlgebricksException {
+ boolean changed = false;
+ if (exprRef.getValue().getExpressionTag() == LogicalExpressionTag.FUNCTION_CALL) {
+ AbstractFunctionCallExpression funcExpr = (AbstractFunctionCallExpression) exprRef.getValue();
+
+ if (funcExpr.getFunctionIdentifier() == BADFunctions.IS_NEW) {
+ exprRef.setValue(makeAndForIsNew(funcExpr));
+ changed = true;
+ } else if (funcExpr.getFunctionIdentifier() == BADFunctions.ACTIVE_TIMESTAMP) {
+ LogicalVariable channelTimeVar = context.newVar();
+ LogicalVariable dsVar = getDsVar(funcExpr);
+ ILogicalExpression activeTsFunc =
+ new ScalarFunctionCallExpression(FunctionUtil.getFunctionInfo(BuiltinFunctions.META),
+ new MutableObject<>(new VariableReferenceExpression(dsVar)));
+ ScalarFunctionCallExpression faExpr = new ScalarFunctionCallExpression(
+ FunctionUtil.getFunctionInfo(BuiltinFunctions.FIELD_ACCESS_BY_NAME),
+ new MutableObject<>(activeTsFunc), new MutableObject<>(new ConstantExpression(
+ new AsterixConstantValue(new AString(BADConstants.FIELD_NAME_ACTIVE_TS)))));
+
+ AssignOperator assignOp = new AssignOperator(channelTimeVar, new MutableObject<>(faExpr));
+
+ List<LogicalVariable> liveVars = new ArrayList<>();
+ for (int i = 0; i < opRef.getValue().getInputs().size(); i++) {
+ Mutable<ILogicalOperator> inputOpRef = opRef.getValue().getInputs().get(i);
+ VariableUtilities.getLiveVariables(inputOpRef.getValue(), liveVars);
+ if (liveVars.contains(dsVar)) {
+ assignOp.getInputs().add(new MutableObject<>(inputOpRef.getValue()));
+ inputOpRef.setValue(assignOp);
+ exprRef.setValue(new VariableReferenceExpression(channelTimeVar));
+ context.computeAndSetTypeEnvironmentForOperator(assignOp);
+ }
+ }
+
+ changed = true;
+ } else {
+ for (Mutable<ILogicalExpression> argExpr : funcExpr.getArguments()) {
+ changed = changed || visit(argExpr, opRef, context);
+ }
+ }
+ }
+ return changed;
+ }
+
+}
diff --git a/asterix-bad/src/main/java/org/apache/asterix/bad/rules/RewriteChannelTimeFunctionToLocalVarRule.java b/asterix-bad/src/main/java/org/apache/asterix/bad/rules/RewriteChannelTimeFunctionToLocalVarRule.java
index 56f5e39..1542d1e 100644
--- a/asterix-bad/src/main/java/org/apache/asterix/bad/rules/RewriteChannelTimeFunctionToLocalVarRule.java
+++ b/asterix-bad/src/main/java/org/apache/asterix/bad/rules/RewriteChannelTimeFunctionToLocalVarRule.java
@@ -1,22 +1,20 @@
/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
*
- * * Licensed to the Apache Software Foundation (ASF) under one
- * * or more contributor license agreements. See the NOTICE file
- * * distributed with this work for additional information
- * * regarding copyright ownership. The ASF licenses this file
- * * to you under the Apache License, Version 2.0 (the
- * * "License"); you may not use this file except in compliance
- * * with the License. You may obtain a copy of the License at
- * *
- * * http://www.apache.org/licenses/LICENSE-2.0
- * *
- * * Unless required by applicable law or agreed to in writing,
- * * software distributed under the License is distributed on an
- * * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * * KIND, either express or implied. See the License for the
- * * specific language governing permissions and limitations
- * * under the License.
+ * http://www.apache.org/licenses/LICENSE-2.0
*
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
*/
package org.apache.asterix.bad.rules;
@@ -27,7 +25,6 @@
import org.apache.asterix.bad.BADConstants;
import org.apache.asterix.bad.function.BADFunctions;
-import org.apache.asterix.lang.common.util.FunctionUtil;
import org.apache.asterix.metadata.declared.DataSource;
import org.apache.asterix.om.base.AString;
import org.apache.asterix.om.constants.AsterixConstantValue;
@@ -45,7 +42,6 @@
import org.apache.hyracks.algebricks.core.algebra.expressions.ConstantExpression;
import org.apache.hyracks.algebricks.core.algebra.expressions.ScalarFunctionCallExpression;
import org.apache.hyracks.algebricks.core.algebra.expressions.VariableReferenceExpression;
-import org.apache.hyracks.algebricks.core.algebra.functions.AlgebricksBuiltinFunctions;
import org.apache.hyracks.algebricks.core.algebra.operators.logical.AssignOperator;
import org.apache.hyracks.algebricks.core.algebra.operators.logical.DataSourceScanOperator;
import org.apache.hyracks.algebricks.core.algebra.operators.logical.InnerJoinOperator;
@@ -57,7 +53,6 @@
@Override
public boolean rewritePre(Mutable<ILogicalOperator> opRef, IOptimizationContext context)
throws AlgebricksException {
-
Mutable<ILogicalExpression> exprRef;
if (opRef.getValue().getOperatorTag() == LogicalOperatorTag.INNERJOIN) {
InnerJoinOperator selectOp = (InnerJoinOperator) opRef.getValue();
@@ -72,32 +67,30 @@
Set<Mutable<ILogicalExpression>> activeFunctionSet = new HashSet<>();
Set<LogicalVariable> needPrevDsSet = new HashSet<>();
Set<LogicalVariable> needCurrDsSet = new HashSet<>();
- Set<LogicalVariable> needActiveDsSet = new HashSet<>();
+ String channelName =
+ (String) context.getMetadataProvider().getConfig().getOrDefault(BADConstants.CONFIG_CHANNEL_NAME, "");
// collect active functions
- collectChannelTimeFunctions(exprRef, activeFunctionSet, needPrevDsSet, needCurrDsSet, needActiveDsSet);
+ boolean rewriteFunc = collectChannelTimeFunctions(exprRef, activeFunctionSet, needPrevDsSet, needCurrDsSet,
+ true, channelName);
if (activeFunctionSet.size() == 0) {
- return false;
+ return rewriteFunc;
}
// add assigns for active functions
Map<LogicalVariable, LogicalVariable> prevMap = new HashMap<>();
Map<LogicalVariable, LogicalVariable> currMap = new HashMap<>();
- Map<LogicalVariable, LogicalVariable> activeMap = new HashMap<>();
- createChannelTimeAssignOps(opRef, needPrevDsSet, needCurrDsSet, needActiveDsSet, prevMap, currMap, activeMap,
- context);
+ createChannelTimeAssignOps(opRef, needPrevDsSet, needCurrDsSet, prevMap, currMap, context, channelName);
// update expressions with new vars
- updateActiveFuncExprsWithVars(prevMap, currMap, activeMap, activeFunctionSet);
+ updateActiveFuncExprsWithVars(prevMap, currMap, activeFunctionSet);
context.computeAndSetTypeEnvironmentForOperator(opRef.getValue());
return true;
}
private void updateActiveFuncExprsWithVars(Map<LogicalVariable, LogicalVariable> prevMap,
- Map<LogicalVariable, LogicalVariable> currMap, Map<LogicalVariable, LogicalVariable> activeMap,
- Set<Mutable<ILogicalExpression>> activeFuncExprs) {
-
+ Map<LogicalVariable, LogicalVariable> currMap, Set<Mutable<ILogicalExpression>> activeFuncExprs) {
for (Mutable<ILogicalExpression> expr : activeFuncExprs) {
AbstractFunctionCallExpression funcExpr = (AbstractFunctionCallExpression) expr.getValue();
LogicalVariable dsVar = ((VariableReferenceExpression) ((AbstractFunctionCallExpression) expr.getValue())
@@ -106,36 +99,20 @@
expr.setValue(new VariableReferenceExpression(currMap.get(dsVar)));
} else if (funcExpr.getFunctionIdentifier() == BADFunctions.PREVIOUS_CHANNEL_TIME) {
expr.setValue(new VariableReferenceExpression(prevMap.get(dsVar)));
- } else {
- ILogicalExpression lessThanExpr =
- new ScalarFunctionCallExpression(FunctionUtil.getFunctionInfo(AlgebricksBuiltinFunctions.LT),
- new MutableObject<>(new VariableReferenceExpression(activeMap.get(dsVar))),
- new MutableObject<>(new VariableReferenceExpression(currMap.get(dsVar))));
- ILogicalExpression greaterThanExpr =
- new ScalarFunctionCallExpression(FunctionUtil.getFunctionInfo(AlgebricksBuiltinFunctions.GT),
- new MutableObject<>(new VariableReferenceExpression(activeMap.get(dsVar))),
- new MutableObject<>(new VariableReferenceExpression(prevMap.get(dsVar))));
- ScalarFunctionCallExpression andExpr =
- new ScalarFunctionCallExpression(FunctionUtil.getFunctionInfo(BuiltinFunctions.AND),
- new MutableObject<>(lessThanExpr), new MutableObject<>(greaterThanExpr));
- expr.setValue(andExpr);
}
}
}
private void createChannelTimeAssignOps(Mutable<ILogicalOperator> opRef, Set<LogicalVariable> needPrevDsSet,
- Set<LogicalVariable> needCurrDsSet, Set<LogicalVariable> needActiveDsSet,
- Map<LogicalVariable, LogicalVariable> prevMap, Map<LogicalVariable, LogicalVariable> currMap,
- Map<LogicalVariable, LogicalVariable> activeMap, IOptimizationContext context) {
+ Set<LogicalVariable> needCurrDsSet, Map<LogicalVariable, LogicalVariable> prevMap,
+ Map<LogicalVariable, LogicalVariable> currMap, IOptimizationContext context, String channelName) {
ILogicalOperator currOp = opRef.getValue();
- String channelName =
- (String) context.getMetadataProvider().getConfig().getOrDefault(BADConstants.CONFIG_CHANNEL_NAME, "");
if (opRef.getValue().getOperatorTag() == LogicalOperatorTag.DATASOURCESCAN) {
DataSourceScanOperator dataScanOp = (DataSourceScanOperator) opRef.getValue();
DataSource ds = (DataSource) dataScanOp.getDataSource();
LogicalVariable dsVar = ds.getDataRecordVariable(dataScanOp.getScanVariables());
- if (needPrevDsSet.contains(dsVar) || needActiveDsSet.contains(dsVar)) {
+ if (needPrevDsSet.contains(dsVar)) {
LogicalVariable channelTimeVar = context.newVar();
ILogicalExpression previousChannelTimeExpr = new ScalarFunctionCallExpression(
BuiltinFunctions.getBuiltinFunctionInfo(BADFunctions.PREVIOUS_CHANNEL_TIME),
@@ -148,7 +125,7 @@
prevMap.put(dsVar, channelTimeVar);
}
- if (needCurrDsSet.contains(dsVar) || needActiveDsSet.contains(dsVar)) {
+ if (needCurrDsSet.contains(dsVar)) {
LogicalVariable channelTimeVar = context.newVar();
ILogicalExpression previousChannelTimeExpr = new ScalarFunctionCallExpression(
BuiltinFunctions.getBuiltinFunctionInfo(BADFunctions.CURRENT_CHANNEL_TIME), new MutableObject<>(
@@ -159,55 +136,51 @@
opRef.setValue(assignOp);
currMap.put(dsVar, channelTimeVar);
}
-
- if (needActiveDsSet.contains(dsVar)) {
- LogicalVariable channelTimeVar = context.newVar();
-
- ILogicalExpression activeTsFunc =
- new ScalarFunctionCallExpression(FunctionUtil.getFunctionInfo(BuiltinFunctions.META),
- new MutableObject<>(new VariableReferenceExpression(dsVar)));
- ScalarFunctionCallExpression faExpr = new ScalarFunctionCallExpression(
- FunctionUtil.getFunctionInfo(BuiltinFunctions.FIELD_ACCESS_BY_NAME),
- new MutableObject<>(activeTsFunc), new MutableObject<>(new ConstantExpression(
- new AsterixConstantValue(new AString(BADConstants.FIELD_NAME_ACTIVE_TS)))));
- AssignOperator assignOp = new AssignOperator(channelTimeVar, new MutableObject<>(faExpr));
- assignOp.getInputs().add(new MutableObject<>(opRef.getValue()));
- opRef.setValue(assignOp);
- activeMap.put(dsVar, channelTimeVar);
- }
}
for (Mutable<ILogicalOperator> input : currOp.getInputs()) {
- createChannelTimeAssignOps(input, needPrevDsSet, needCurrDsSet, needActiveDsSet, prevMap, currMap,
- activeMap, context);
+ createChannelTimeAssignOps(input, needPrevDsSet, needCurrDsSet, prevMap, currMap, context, channelName);
}
}
- private void collectChannelTimeFunctions(Mutable<ILogicalExpression> exprRef,
+ private boolean collectChannelTimeFunctions(Mutable<ILogicalExpression> exprRef,
Set<Mutable<ILogicalExpression>> activeFunctionSet, Set<LogicalVariable> needPrevDsSet,
- Set<LogicalVariable> needCurrDsSet, Set<LogicalVariable> needActive) {
+ Set<LogicalVariable> needCurrDsSet, boolean disjunctiveFlag, String channelName) {
+ boolean rewriteFunc = false;
if (exprRef.getValue().getExpressionTag() == LogicalExpressionTag.FUNCTION_CALL) {
AbstractFunctionCallExpression funcExpr = (AbstractFunctionCallExpression) exprRef.getValue();
- if (funcExpr.getFunctionIdentifier() == BADFunctions.IS_NEW
- || funcExpr.getFunctionIdentifier() == BADFunctions.PREVIOUS_CHANNEL_TIME
+ if (funcExpr.getFunctionIdentifier().equals(BuiltinFunctions.OR)) {
+ disjunctiveFlag = false;
+ }
+ if (funcExpr.getFunctionIdentifier() == BADFunctions.PREVIOUS_CHANNEL_TIME
|| funcExpr.getFunctionIdentifier() == BADFunctions.CURRENT_CHANNEL_TIME) {
- // add to active func set for later replacement
- activeFunctionSet.add(exprRef);
// collect ds var to see what assign op needs to be added
- LogicalVariable dsVar = ((VariableReferenceExpression) funcExpr.getArguments().get(0).getValue())
- .getVariableReference();
- if (funcExpr.getFunctionIdentifier() == BADFunctions.PREVIOUS_CHANNEL_TIME) {
- needPrevDsSet.add(dsVar);
- } else if (funcExpr.getFunctionIdentifier() == BADFunctions.CURRENT_CHANNEL_TIME) {
- needCurrDsSet.add(dsVar);
+ LogicalExpressionTag arg0ExprTag = funcExpr.getArguments().get(0).getValue().getExpressionTag();
+ if (arg0ExprTag == LogicalExpressionTag.CONSTANT) {
+ return false;
+ }
+ if (!disjunctiveFlag) {
+ LogicalVariable dsVar = ((VariableReferenceExpression) funcExpr.getArguments().get(0).getValue())
+ .getVariableReference();
+ activeFunctionSet.add(exprRef);
+ if (funcExpr.getFunctionIdentifier() == BADFunctions.PREVIOUS_CHANNEL_TIME) {
+ needPrevDsSet.add(dsVar);
+ } else if (funcExpr.getFunctionIdentifier() == BADFunctions.CURRENT_CHANNEL_TIME) {
+ needCurrDsSet.add(dsVar);
+ }
} else {
- needActive.add(dsVar);
+ // if disjunctive, modify to get ts and wait for introduce filter rule to work
+ funcExpr.getArguments().set(0, new MutableObject<>(
+ new ConstantExpression(new AsterixConstantValue(new AString(channelName)))));
+ rewriteFunc = true;
}
} else {
for (Mutable<ILogicalExpression> argExpr : funcExpr.getArguments()) {
- collectChannelTimeFunctions(argExpr, activeFunctionSet, needPrevDsSet, needCurrDsSet, needActive);
+ rewriteFunc = rewriteFunc || collectChannelTimeFunctions(argExpr, activeFunctionSet, needPrevDsSet,
+ needCurrDsSet, disjunctiveFlag, channelName);
}
}
}
+ return rewriteFunc;
}
}
diff --git a/asterix-bad/src/main/java/org/apache/asterix/bad/runtime/ActiveTimestampManager.java b/asterix-bad/src/main/java/org/apache/asterix/bad/runtime/ActiveTimestampManager.java
index adbbd03..0a24a72 100644
--- a/asterix-bad/src/main/java/org/apache/asterix/bad/runtime/ActiveTimestampManager.java
+++ b/asterix-bad/src/main/java/org/apache/asterix/bad/runtime/ActiveTimestampManager.java
@@ -1,22 +1,20 @@
/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
*
- * * Licensed to the Apache Software Foundation (ASF) under one
- * * or more contributor license agreements. See the NOTICE file
- * * distributed with this work for additional information
- * * regarding copyright ownership. The ASF licenses this file
- * * to you under the Apache License, Version 2.0 (the
- * * "License"); you may not use this file except in compliance
- * * with the License. You may obtain a copy of the License at
- * *
- * * http://www.apache.org/licenses/LICENSE-2.0
- * *
- * * Unless required by applicable law or agreed to in writing,
- * * software distributed under the License is distributed on an
- * * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * * KIND, either express or implied. See the License for the
- * * specific language governing permissions and limitations
- * * under the License.
+ * http://www.apache.org/licenses/LICENSE-2.0
*
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
*/
package org.apache.asterix.bad.runtime;
diff --git a/asterix-bad/src/main/java/org/apache/asterix/bad/runtime/ActiveTimestampState.java b/asterix-bad/src/main/java/org/apache/asterix/bad/runtime/ActiveTimestampState.java
index ad7e424..9067073 100644
--- a/asterix-bad/src/main/java/org/apache/asterix/bad/runtime/ActiveTimestampState.java
+++ b/asterix-bad/src/main/java/org/apache/asterix/bad/runtime/ActiveTimestampState.java
@@ -1,22 +1,20 @@
/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
*
- * * Licensed to the Apache Software Foundation (ASF) under one
- * * or more contributor license agreements. See the NOTICE file
- * * distributed with this work for additional information
- * * regarding copyright ownership. The ASF licenses this file
- * * to you under the Apache License, Version 2.0 (the
- * * "License"); you may not use this file except in compliance
- * * with the License. You may obtain a copy of the License at
- * *
- * * http://www.apache.org/licenses/LICENSE-2.0
- * *
- * * Unless required by applicable law or agreed to in writing,
- * * software distributed under the License is distributed on an
- * * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * * KIND, either express or implied. See the License for the
- * * specific language governing permissions and limitations
- * * under the License.
+ * http://www.apache.org/licenses/LICENSE-2.0
*
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
*/
package org.apache.asterix.bad.runtime;
diff --git a/asterix-bad/src/main/resources/lang-extension/lang.txt b/asterix-bad/src/main/resources/lang-extension/lang.txt
index 49f0cb4..f7eba70 100644
--- a/asterix-bad/src/main/resources/lang-extension/lang.txt
+++ b/asterix-bad/src/main/resources/lang-extension/lang.txt
@@ -144,12 +144,12 @@
( <HINTS> hints = Properties() )?
( <WITH> withRecord = RecordConstructor() )?
{
- // TODO: add filters on meta records
+ try{
InternalDetailsDecl idd = new InternalDetailsDecl(primaryKeyFields.second,
primaryKeyFields.first,
autogenerated,
- null);
- try{
+ 1,
+ Collections.singletonList("_active_timestamp"));
stmt = new DatasetDecl(nameComponents.first,
nameComponents.second,
datasetTypeExpr,
diff --git a/asterix-bad/src/test/resources/optimizerts/queries/bad_cq/new_tweets_active_ts.sqlpp b/asterix-bad/src/test/resources/optimizerts/queries/bad_cq/new_tweets_active_ts.sqlpp
new file mode 100644
index 0000000..86b0f28
--- /dev/null
+++ b/asterix-bad/src/test/resources/optimizerts/queries/bad_cq/new_tweets_active_ts.sqlpp
@@ -0,0 +1,40 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+DROP DATAVERSE test IF EXISTS;
+CREATE DATAVERSE test;
+USE test;
+
+CREATE TYPE Tweet AS OPEN {
+ tid: bigint,
+ area_code: string,
+ text: string,
+ location: point,
+ timestamp: datetime
+};
+
+
+CREATE ACTIVE DATASET Tweets(Tweet) PRIMARY KEY tid;
+
+write output to nc1:"rttest/new_tweets.sqlpp";
+set `compiler.indexonly` "false";
+
+CREATE CONTINUOUS CHANNEL NearbyTweets(oid) PERIOD duration("PT3S")
+{
+ SELECT t FROM Tweets t WHERE active_timestamp(t) > previous_channel_time(t) AND active_timestamp(t) < current_channel_time(t)
+};
\ No newline at end of file
diff --git a/asterix-bad/src/test/resources/optimizerts/queries/bad_cq/new_tweets_meta_and.sqlpp b/asterix-bad/src/test/resources/optimizerts/queries/bad_cq/new_tweets_meta_and.sqlpp
new file mode 100644
index 0000000..27184bf
--- /dev/null
+++ b/asterix-bad/src/test/resources/optimizerts/queries/bad_cq/new_tweets_meta_and.sqlpp
@@ -0,0 +1,41 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+DROP DATAVERSE test IF EXISTS;
+CREATE DATAVERSE test;
+USE test;
+
+CREATE TYPE Tweet AS OPEN {
+ tid: bigint,
+ area_code: string,
+ text: string,
+ location: point,
+ timestamp: datetime
+};
+
+
+CREATE ACTIVE DATASET Tweets(Tweet) PRIMARY KEY tid;
+
+write output to nc1:"rttest/new_tweets.sqlpp";
+set `compiler.indexonly` "false";
+
+CREATE CONTINUOUS CHANNEL NearbyTweets(oid) PERIOD duration("PT3S")
+{
+ SELECT t FROM Tweets t WHERE meta(t).`_active_timestamp` > current_channel_time(t)
+ AND meta(t).`_active_timestamp` < previous_channel_time(t)
+};
diff --git a/asterix-bad/src/test/resources/optimizerts/queries/bad_cq/new_tweets_meta_or.sqlpp b/asterix-bad/src/test/resources/optimizerts/queries/bad_cq/new_tweets_meta_or.sqlpp
new file mode 100644
index 0000000..311caf0
--- /dev/null
+++ b/asterix-bad/src/test/resources/optimizerts/queries/bad_cq/new_tweets_meta_or.sqlpp
@@ -0,0 +1,41 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+DROP DATAVERSE test IF EXISTS;
+CREATE DATAVERSE test;
+USE test;
+
+CREATE TYPE Tweet AS OPEN {
+ tid: bigint,
+ area_code: string,
+ text: string,
+ location: point,
+ timestamp: datetime
+};
+
+
+CREATE ACTIVE DATASET Tweets(Tweet) PRIMARY KEY tid;
+
+write output to nc1:"rttest/new_tweets.sqlpp";
+set `compiler.indexonly` "false";
+
+CREATE CONTINUOUS CHANNEL NearbyTweets(oid) PERIOD duration("PT3S")
+{
+ SELECT t FROM Tweets t WHERE meta(t).`_active_timestamp` > current_channel_time(t)
+ OR meta(t).`_active_timestamp` < previous_channel_time(t)
+};
diff --git a/asterix-bad/src/test/resources/optimizerts/results/bad_cq/new_nearby_tweets.plan b/asterix-bad/src/test/resources/optimizerts/results/bad_cq/new_nearby_tweets.plan
index e4c1ec7..617ce18 100644
--- a/asterix-bad/src/test/resources/optimizerts/results/bad_cq/new_nearby_tweets.plan
+++ b/asterix-bad/src/test/resources/optimizerts/results/bad_cq/new_nearby_tweets.plan
@@ -1,13 +1,13 @@
-- NOTIFY_BROKERS |PARTITIONED|
-- ONE_TO_ONE_EXCHANGE |PARTITIONED|
- -- PRE_CLUSTERED_GROUP_BY[$$120, $$121, $$channelExecutionTime] |PARTITIONED|
+ -- PRE_CLUSTERED_GROUP_BY[$$118, $$119, $$channelExecutionTime] |PARTITIONED|
{
-- AGGREGATE |LOCAL|
-- NESTED_TUPLE_SOURCE |LOCAL|
}
-- ONE_TO_ONE_EXCHANGE |PARTITIONED|
- -- STABLE_SORT [$$120(ASC), $$121(ASC), $$channelExecutionTime(ASC)] |PARTITIONED|
- -- HASH_PARTITION_EXCHANGE [$$120, $$121, $$channelExecutionTime] |PARTITIONED|
+ -- STABLE_SORT [$$118(ASC), $$119(ASC), $$channelExecutionTime(ASC)] |PARTITIONED|
+ -- HASH_PARTITION_EXCHANGE [$$118, $$119, $$channelExecutionTime] |PARTITIONED|
-- PRE_SORTED_DISTINCT_BY |PARTITIONED|
-- ONE_TO_ONE_EXCHANGE |PARTITIONED|
-- STABLE_SORT [$$97(ASC)] |PARTITIONED|
@@ -58,16 +58,14 @@
-- ONE_TO_ONE_EXCHANGE |PARTITIONED|
-- STREAM_PROJECT |PARTITIONED|
-- STREAM_SELECT |PARTITIONED|
- -- ASSIGN |PARTITIONED|
+ -- STREAM_PROJECT |PARTITIONED|
-- ASSIGN |PARTITIONED|
- -- ASSIGN |PARTITIONED|
- -- STREAM_PROJECT |PARTITIONED|
- -- ASSIGN |PARTITIONED|
- -- STREAM_PROJECT |PARTITIONED|
- -- ONE_TO_ONE_EXCHANGE |PARTITIONED|
- -- DATASOURCE_SCAN |PARTITIONED|
- -- ONE_TO_ONE_EXCHANGE |PARTITIONED|
- -- EMPTY_TUPLE_SOURCE |PARTITIONED|
+ -- STREAM_PROJECT |PARTITIONED|
+ -- ONE_TO_ONE_EXCHANGE |PARTITIONED|
+ -- DATASOURCE_SCAN |PARTITIONED|
+ -- BROADCAST_EXCHANGE |PARTITIONED|
+ -- ASSIGN |PARTITIONED|
+ -- EMPTY_TUPLE_SOURCE |PARTITIONED|
-- BROADCAST_EXCHANGE |PARTITIONED|
-- STREAM_PROJECT |PARTITIONED|
-- ASSIGN |PARTITIONED|
diff --git a/asterix-bad/src/test/resources/optimizerts/results/bad_cq/new_tweets.plan b/asterix-bad/src/test/resources/optimizerts/results/bad_cq/new_tweets.plan
index ebf5624..6134bd4 100644
--- a/asterix-bad/src/test/resources/optimizerts/results/bad_cq/new_tweets.plan
+++ b/asterix-bad/src/test/resources/optimizerts/results/bad_cq/new_tweets.plan
@@ -1,13 +1,13 @@
-- NOTIFY_BROKERS |PARTITIONED|
-- ONE_TO_ONE_EXCHANGE |PARTITIONED|
- -- PRE_CLUSTERED_GROUP_BY[$$93, $$94, $$channelExecutionTime] |PARTITIONED|
+ -- PRE_CLUSTERED_GROUP_BY[$$91, $$92, $$channelExecutionTime] |PARTITIONED|
{
-- AGGREGATE |LOCAL|
-- NESTED_TUPLE_SOURCE |LOCAL|
}
-- ONE_TO_ONE_EXCHANGE |PARTITIONED|
- -- STABLE_SORT [$$93(ASC), $$94(ASC), $$channelExecutionTime(ASC)] |PARTITIONED|
- -- HASH_PARTITION_EXCHANGE [$$93, $$94, $$channelExecutionTime] |PARTITIONED|
+ -- STABLE_SORT [$$91(ASC), $$92(ASC), $$channelExecutionTime(ASC)] |PARTITIONED|
+ -- HASH_PARTITION_EXCHANGE [$$91, $$92, $$channelExecutionTime] |PARTITIONED|
-- PRE_SORTED_DISTINCT_BY |PARTITIONED|
-- ONE_TO_ONE_EXCHANGE |PARTITIONED|
-- STABLE_SORT [$$79(ASC)] |PARTITIONED|
@@ -55,12 +55,11 @@
-- ASSIGN |PARTITIONED|
-- STREAM_PROJECT |PARTITIONED|
-- STREAM_SELECT |PARTITIONED|
- -- ASSIGN |PARTITIONED|
+ -- STREAM_PROJECT |PARTITIONED|
-- ASSIGN |PARTITIONED|
-- STREAM_PROJECT |PARTITIONED|
- -- ASSIGN |PARTITIONED|
- -- STREAM_PROJECT |PARTITIONED|
- -- ONE_TO_ONE_EXCHANGE |PARTITIONED|
- -- DATASOURCE_SCAN |PARTITIONED|
- -- ONE_TO_ONE_EXCHANGE |PARTITIONED|
- -- EMPTY_TUPLE_SOURCE |PARTITIONED|
+ -- ONE_TO_ONE_EXCHANGE |PARTITIONED|
+ -- DATASOURCE_SCAN |PARTITIONED|
+ -- BROADCAST_EXCHANGE |PARTITIONED|
+ -- ASSIGN |PARTITIONED|
+ -- EMPTY_TUPLE_SOURCE |PARTITIONED|
diff --git a/asterix-bad/src/test/resources/optimizerts/results/bad_cq/new_tweets_active_ts.plan b/asterix-bad/src/test/resources/optimizerts/results/bad_cq/new_tweets_active_ts.plan
new file mode 100644
index 0000000..7ac1e96
--- /dev/null
+++ b/asterix-bad/src/test/resources/optimizerts/results/bad_cq/new_tweets_active_ts.plan
@@ -0,0 +1,65 @@
+-- NOTIFY_BROKERS |PARTITIONED|
+ -- ONE_TO_ONE_EXCHANGE |PARTITIONED|
+ -- PRE_CLUSTERED_GROUP_BY[$$96, $$97, $$channelExecutionTime] |PARTITIONED|
+ {
+ -- AGGREGATE |LOCAL|
+ -- NESTED_TUPLE_SOURCE |LOCAL|
+ }
+ -- ONE_TO_ONE_EXCHANGE |PARTITIONED|
+ -- STABLE_SORT [$$96(ASC), $$97(ASC), $$channelExecutionTime(ASC)] |PARTITIONED|
+ -- HASH_PARTITION_EXCHANGE [$$96, $$97, $$channelExecutionTime] |PARTITIONED|
+ -- PRE_SORTED_DISTINCT_BY |PARTITIONED|
+ -- ONE_TO_ONE_EXCHANGE |PARTITIONED|
+ -- STABLE_SORT [$$83(ASC)] |PARTITIONED|
+ -- HASH_PARTITION_EXCHANGE [$$83] |PARTITIONED|
+ -- STREAM_PROJECT |PARTITIONED|
+ -- COMMIT |PARTITIONED|
+ -- STREAM_PROJECT |PARTITIONED|
+ -- ONE_TO_ONE_EXCHANGE |PARTITIONED|
+ -- INDEX_INSERT_DELETE |PARTITIONED|
+ -- ONE_TO_ONE_EXCHANGE |PARTITIONED|
+ -- STREAM_PROJECT |PARTITIONED|
+ -- ONE_TO_ONE_EXCHANGE |PARTITIONED|
+ -- INSERT_DELETE |PARTITIONED|
+ -- HASH_PARTITION_EXCHANGE [$$77] |PARTITIONED|
+ -- ASSIGN |PARTITIONED|
+ -- STREAM_PROJECT |PARTITIONED|
+ -- ASSIGN |PARTITIONED|
+ -- STREAM_PROJECT |PARTITIONED|
+ -- ASSIGN |PARTITIONED|
+ -- STREAM_PROJECT |PARTITIONED|
+ -- ASSIGN |PARTITIONED|
+ -- ONE_TO_ONE_EXCHANGE |PARTITIONED|
+ -- NESTED_LOOP |PARTITIONED|
+ -- ONE_TO_ONE_EXCHANGE |PARTITIONED|
+ -- STREAM_PROJECT |PARTITIONED|
+ -- ONE_TO_ONE_EXCHANGE |PARTITIONED|
+ -- HYBRID_HASH_JOIN [$$90, $$88][$$84, $$85] |PARTITIONED|
+ -- HASH_PARTITION_EXCHANGE [$$90, $$88] |PARTITIONED|
+ -- STREAM_PROJECT |PARTITIONED|
+ -- ASSIGN |PARTITIONED|
+ -- ONE_TO_ONE_EXCHANGE |PARTITIONED|
+ -- DATASOURCE_SCAN |PARTITIONED|
+ -- BROADCAST_EXCHANGE |PARTITIONED|
+ -- ASSIGN |UNPARTITIONED|
+ -- EMPTY_TUPLE_SOURCE |UNPARTITIONED|
+ -- HASH_PARTITION_EXCHANGE [$$84, $$85] |PARTITIONED|
+ -- STREAM_PROJECT |PARTITIONED|
+ -- ASSIGN |PARTITIONED|
+ -- ONE_TO_ONE_EXCHANGE |PARTITIONED|
+ -- DATASOURCE_SCAN |PARTITIONED|
+ -- ONE_TO_ONE_EXCHANGE |PARTITIONED|
+ -- EMPTY_TUPLE_SOURCE |PARTITIONED|
+ -- BROADCAST_EXCHANGE |PARTITIONED|
+ -- STREAM_PROJECT |PARTITIONED|
+ -- ASSIGN |PARTITIONED|
+ -- STREAM_PROJECT |PARTITIONED|
+ -- STREAM_SELECT |PARTITIONED|
+ -- STREAM_PROJECT |PARTITIONED|
+ -- ASSIGN |PARTITIONED|
+ -- STREAM_PROJECT |PARTITIONED|
+ -- ONE_TO_ONE_EXCHANGE |PARTITIONED|
+ -- DATASOURCE_SCAN |PARTITIONED|
+ -- BROADCAST_EXCHANGE |PARTITIONED|
+ -- ASSIGN |PARTITIONED|
+ -- EMPTY_TUPLE_SOURCE |PARTITIONED|
diff --git a/asterix-bad/src/test/resources/optimizerts/results/bad_cq/new_tweets_meta_active_ts.plan b/asterix-bad/src/test/resources/optimizerts/results/bad_cq/new_tweets_meta_active_ts.plan
new file mode 100644
index 0000000..2bcef50
--- /dev/null
+++ b/asterix-bad/src/test/resources/optimizerts/results/bad_cq/new_tweets_meta_active_ts.plan
@@ -0,0 +1,65 @@
+-- NOTIFY_BROKERS |PARTITIONED|
+ -- ONE_TO_ONE_EXCHANGE |PARTITIONED|
+ -- PRE_CLUSTERED_GROUP_BY[$$95, $$96, $$channelExecutionTime] |PARTITIONED|
+ {
+ -- AGGREGATE |LOCAL|
+ -- NESTED_TUPLE_SOURCE |LOCAL|
+ }
+ -- ONE_TO_ONE_EXCHANGE |PARTITIONED|
+ -- STABLE_SORT [$$95(ASC), $$96(ASC), $$channelExecutionTime(ASC)] |PARTITIONED|
+ -- HASH_PARTITION_EXCHANGE [$$95, $$96, $$channelExecutionTime] |PARTITIONED|
+ -- PRE_SORTED_DISTINCT_BY |PARTITIONED|
+ -- ONE_TO_ONE_EXCHANGE |PARTITIONED|
+ -- STABLE_SORT [$$85(ASC)] |PARTITIONED|
+ -- HASH_PARTITION_EXCHANGE [$$85] |PARTITIONED|
+ -- STREAM_PROJECT |PARTITIONED|
+ -- COMMIT |PARTITIONED|
+ -- STREAM_PROJECT |PARTITIONED|
+ -- ONE_TO_ONE_EXCHANGE |PARTITIONED|
+ -- INDEX_INSERT_DELETE |PARTITIONED|
+ -- ONE_TO_ONE_EXCHANGE |PARTITIONED|
+ -- STREAM_PROJECT |PARTITIONED|
+ -- ONE_TO_ONE_EXCHANGE |PARTITIONED|
+ -- INSERT_DELETE |PARTITIONED|
+ -- HASH_PARTITION_EXCHANGE [$$79] |PARTITIONED|
+ -- ASSIGN |PARTITIONED|
+ -- STREAM_PROJECT |PARTITIONED|
+ -- ASSIGN |PARTITIONED|
+ -- STREAM_PROJECT |PARTITIONED|
+ -- ASSIGN |PARTITIONED|
+ -- STREAM_PROJECT |PARTITIONED|
+ -- ASSIGN |PARTITIONED|
+ -- ONE_TO_ONE_EXCHANGE |PARTITIONED|
+ -- NESTED_LOOP |PARTITIONED|
+ -- ONE_TO_ONE_EXCHANGE |PARTITIONED|
+ -- STREAM_PROJECT |PARTITIONED|
+ -- ONE_TO_ONE_EXCHANGE |PARTITIONED|
+ -- HYBRID_HASH_JOIN [$$92, $$90][$$86, $$87] |PARTITIONED|
+ -- HASH_PARTITION_EXCHANGE [$$92, $$90] |PARTITIONED|
+ -- STREAM_PROJECT |PARTITIONED|
+ -- ASSIGN |PARTITIONED|
+ -- ONE_TO_ONE_EXCHANGE |PARTITIONED|
+ -- DATASOURCE_SCAN |PARTITIONED|
+ -- BROADCAST_EXCHANGE |PARTITIONED|
+ -- ASSIGN |UNPARTITIONED|
+ -- EMPTY_TUPLE_SOURCE |UNPARTITIONED|
+ -- HASH_PARTITION_EXCHANGE [$$86, $$87] |PARTITIONED|
+ -- STREAM_PROJECT |PARTITIONED|
+ -- ASSIGN |PARTITIONED|
+ -- ONE_TO_ONE_EXCHANGE |PARTITIONED|
+ -- DATASOURCE_SCAN |PARTITIONED|
+ -- ONE_TO_ONE_EXCHANGE |PARTITIONED|
+ -- EMPTY_TUPLE_SOURCE |PARTITIONED|
+ -- BROADCAST_EXCHANGE |PARTITIONED|
+ -- STREAM_PROJECT |PARTITIONED|
+ -- ASSIGN |PARTITIONED|
+ -- STREAM_PROJECT |PARTITIONED|
+ -- STREAM_SELECT |PARTITIONED|
+ -- STREAM_PROJECT |PARTITIONED|
+ -- ASSIGN |PARTITIONED|
+ -- STREAM_PROJECT |PARTITIONED|
+ -- ONE_TO_ONE_EXCHANGE |PARTITIONED|
+ -- DATASOURCE_SCAN |PARTITIONED|
+ -- BROADCAST_EXCHANGE |PARTITIONED|
+ -- ASSIGN |PARTITIONED|
+ -- EMPTY_TUPLE_SOURCE |PARTITIONED|
diff --git a/asterix-bad/src/test/resources/optimizerts/results/bad_cq/new_tweets_meta_and.plan b/asterix-bad/src/test/resources/optimizerts/results/bad_cq/new_tweets_meta_and.plan
new file mode 100644
index 0000000..2bcef50
--- /dev/null
+++ b/asterix-bad/src/test/resources/optimizerts/results/bad_cq/new_tweets_meta_and.plan
@@ -0,0 +1,65 @@
+-- NOTIFY_BROKERS |PARTITIONED|
+ -- ONE_TO_ONE_EXCHANGE |PARTITIONED|
+ -- PRE_CLUSTERED_GROUP_BY[$$95, $$96, $$channelExecutionTime] |PARTITIONED|
+ {
+ -- AGGREGATE |LOCAL|
+ -- NESTED_TUPLE_SOURCE |LOCAL|
+ }
+ -- ONE_TO_ONE_EXCHANGE |PARTITIONED|
+ -- STABLE_SORT [$$95(ASC), $$96(ASC), $$channelExecutionTime(ASC)] |PARTITIONED|
+ -- HASH_PARTITION_EXCHANGE [$$95, $$96, $$channelExecutionTime] |PARTITIONED|
+ -- PRE_SORTED_DISTINCT_BY |PARTITIONED|
+ -- ONE_TO_ONE_EXCHANGE |PARTITIONED|
+ -- STABLE_SORT [$$85(ASC)] |PARTITIONED|
+ -- HASH_PARTITION_EXCHANGE [$$85] |PARTITIONED|
+ -- STREAM_PROJECT |PARTITIONED|
+ -- COMMIT |PARTITIONED|
+ -- STREAM_PROJECT |PARTITIONED|
+ -- ONE_TO_ONE_EXCHANGE |PARTITIONED|
+ -- INDEX_INSERT_DELETE |PARTITIONED|
+ -- ONE_TO_ONE_EXCHANGE |PARTITIONED|
+ -- STREAM_PROJECT |PARTITIONED|
+ -- ONE_TO_ONE_EXCHANGE |PARTITIONED|
+ -- INSERT_DELETE |PARTITIONED|
+ -- HASH_PARTITION_EXCHANGE [$$79] |PARTITIONED|
+ -- ASSIGN |PARTITIONED|
+ -- STREAM_PROJECT |PARTITIONED|
+ -- ASSIGN |PARTITIONED|
+ -- STREAM_PROJECT |PARTITIONED|
+ -- ASSIGN |PARTITIONED|
+ -- STREAM_PROJECT |PARTITIONED|
+ -- ASSIGN |PARTITIONED|
+ -- ONE_TO_ONE_EXCHANGE |PARTITIONED|
+ -- NESTED_LOOP |PARTITIONED|
+ -- ONE_TO_ONE_EXCHANGE |PARTITIONED|
+ -- STREAM_PROJECT |PARTITIONED|
+ -- ONE_TO_ONE_EXCHANGE |PARTITIONED|
+ -- HYBRID_HASH_JOIN [$$92, $$90][$$86, $$87] |PARTITIONED|
+ -- HASH_PARTITION_EXCHANGE [$$92, $$90] |PARTITIONED|
+ -- STREAM_PROJECT |PARTITIONED|
+ -- ASSIGN |PARTITIONED|
+ -- ONE_TO_ONE_EXCHANGE |PARTITIONED|
+ -- DATASOURCE_SCAN |PARTITIONED|
+ -- BROADCAST_EXCHANGE |PARTITIONED|
+ -- ASSIGN |UNPARTITIONED|
+ -- EMPTY_TUPLE_SOURCE |UNPARTITIONED|
+ -- HASH_PARTITION_EXCHANGE [$$86, $$87] |PARTITIONED|
+ -- STREAM_PROJECT |PARTITIONED|
+ -- ASSIGN |PARTITIONED|
+ -- ONE_TO_ONE_EXCHANGE |PARTITIONED|
+ -- DATASOURCE_SCAN |PARTITIONED|
+ -- ONE_TO_ONE_EXCHANGE |PARTITIONED|
+ -- EMPTY_TUPLE_SOURCE |PARTITIONED|
+ -- BROADCAST_EXCHANGE |PARTITIONED|
+ -- STREAM_PROJECT |PARTITIONED|
+ -- ASSIGN |PARTITIONED|
+ -- STREAM_PROJECT |PARTITIONED|
+ -- STREAM_SELECT |PARTITIONED|
+ -- STREAM_PROJECT |PARTITIONED|
+ -- ASSIGN |PARTITIONED|
+ -- STREAM_PROJECT |PARTITIONED|
+ -- ONE_TO_ONE_EXCHANGE |PARTITIONED|
+ -- DATASOURCE_SCAN |PARTITIONED|
+ -- BROADCAST_EXCHANGE |PARTITIONED|
+ -- ASSIGN |PARTITIONED|
+ -- EMPTY_TUPLE_SOURCE |PARTITIONED|
diff --git a/asterix-bad/src/test/resources/optimizerts/results/bad_cq/new_tweets_meta_or.plan b/asterix-bad/src/test/resources/optimizerts/results/bad_cq/new_tweets_meta_or.plan
new file mode 100644
index 0000000..7ed194c
--- /dev/null
+++ b/asterix-bad/src/test/resources/optimizerts/results/bad_cq/new_tweets_meta_or.plan
@@ -0,0 +1,66 @@
+-- NOTIFY_BROKERS |PARTITIONED|
+ -- ONE_TO_ONE_EXCHANGE |PARTITIONED|
+ -- PRE_CLUSTERED_GROUP_BY[$$97, $$98, $$channelExecutionTime] |PARTITIONED|
+ {
+ -- AGGREGATE |LOCAL|
+ -- NESTED_TUPLE_SOURCE |LOCAL|
+ }
+ -- ONE_TO_ONE_EXCHANGE |PARTITIONED|
+ -- STABLE_SORT [$$97(ASC), $$98(ASC), $$channelExecutionTime(ASC)] |PARTITIONED|
+ -- HASH_PARTITION_EXCHANGE [$$97, $$98, $$channelExecutionTime] |PARTITIONED|
+ -- PRE_SORTED_DISTINCT_BY |PARTITIONED|
+ -- ONE_TO_ONE_EXCHANGE |PARTITIONED|
+ -- STABLE_SORT [$$85(ASC)] |PARTITIONED|
+ -- HASH_PARTITION_EXCHANGE [$$85] |PARTITIONED|
+ -- STREAM_PROJECT |PARTITIONED|
+ -- COMMIT |PARTITIONED|
+ -- STREAM_PROJECT |PARTITIONED|
+ -- ONE_TO_ONE_EXCHANGE |PARTITIONED|
+ -- INDEX_INSERT_DELETE |PARTITIONED|
+ -- ONE_TO_ONE_EXCHANGE |PARTITIONED|
+ -- STREAM_PROJECT |PARTITIONED|
+ -- ONE_TO_ONE_EXCHANGE |PARTITIONED|
+ -- INSERT_DELETE |PARTITIONED|
+ -- HASH_PARTITION_EXCHANGE [$$79] |PARTITIONED|
+ -- ASSIGN |PARTITIONED|
+ -- STREAM_PROJECT |PARTITIONED|
+ -- ASSIGN |PARTITIONED|
+ -- STREAM_PROJECT |PARTITIONED|
+ -- ASSIGN |PARTITIONED|
+ -- STREAM_PROJECT |PARTITIONED|
+ -- ASSIGN |PARTITIONED|
+ -- ONE_TO_ONE_EXCHANGE |PARTITIONED|
+ -- NESTED_LOOP |PARTITIONED|
+ -- ONE_TO_ONE_EXCHANGE |PARTITIONED|
+ -- STREAM_PROJECT |PARTITIONED|
+ -- ONE_TO_ONE_EXCHANGE |PARTITIONED|
+ -- HYBRID_HASH_JOIN [$$92, $$90][$$86, $$87] |PARTITIONED|
+ -- HASH_PARTITION_EXCHANGE [$$92, $$90] |PARTITIONED|
+ -- STREAM_PROJECT |PARTITIONED|
+ -- ASSIGN |PARTITIONED|
+ -- ONE_TO_ONE_EXCHANGE |PARTITIONED|
+ -- DATASOURCE_SCAN |PARTITIONED|
+ -- BROADCAST_EXCHANGE |PARTITIONED|
+ -- ASSIGN |UNPARTITIONED|
+ -- EMPTY_TUPLE_SOURCE |UNPARTITIONED|
+ -- HASH_PARTITION_EXCHANGE [$$86, $$87] |PARTITIONED|
+ -- STREAM_PROJECT |PARTITIONED|
+ -- ASSIGN |PARTITIONED|
+ -- ONE_TO_ONE_EXCHANGE |PARTITIONED|
+ -- DATASOURCE_SCAN |PARTITIONED|
+ -- ONE_TO_ONE_EXCHANGE |PARTITIONED|
+ -- EMPTY_TUPLE_SOURCE |PARTITIONED|
+ -- BROADCAST_EXCHANGE |PARTITIONED|
+ -- STREAM_PROJECT |PARTITIONED|
+ -- ASSIGN |PARTITIONED|
+ -- STREAM_PROJECT |PARTITIONED|
+ -- STREAM_SELECT |PARTITIONED|
+ -- STREAM_PROJECT |PARTITIONED|
+ -- ASSIGN |PARTITIONED|
+ -- ASSIGN |PARTITIONED|
+ -- ASSIGN |PARTITIONED|
+ -- STREAM_PROJECT |PARTITIONED|
+ -- ONE_TO_ONE_EXCHANGE |PARTITIONED|
+ -- DATASOURCE_SCAN |PARTITIONED|
+ -- ONE_TO_ONE_EXCHANGE |PARTITIONED|
+ -- EMPTY_TUPLE_SOURCE |PARTITIONED|
diff --git a/asterix-bad/src/test/resources/optimizerts/results/bad_cq/new_tweets_push.plan b/asterix-bad/src/test/resources/optimizerts/results/bad_cq/new_tweets_push.plan
index ebf5624..6134bd4 100644
--- a/asterix-bad/src/test/resources/optimizerts/results/bad_cq/new_tweets_push.plan
+++ b/asterix-bad/src/test/resources/optimizerts/results/bad_cq/new_tweets_push.plan
@@ -1,13 +1,13 @@
-- NOTIFY_BROKERS |PARTITIONED|
-- ONE_TO_ONE_EXCHANGE |PARTITIONED|
- -- PRE_CLUSTERED_GROUP_BY[$$93, $$94, $$channelExecutionTime] |PARTITIONED|
+ -- PRE_CLUSTERED_GROUP_BY[$$91, $$92, $$channelExecutionTime] |PARTITIONED|
{
-- AGGREGATE |LOCAL|
-- NESTED_TUPLE_SOURCE |LOCAL|
}
-- ONE_TO_ONE_EXCHANGE |PARTITIONED|
- -- STABLE_SORT [$$93(ASC), $$94(ASC), $$channelExecutionTime(ASC)] |PARTITIONED|
- -- HASH_PARTITION_EXCHANGE [$$93, $$94, $$channelExecutionTime] |PARTITIONED|
+ -- STABLE_SORT [$$91(ASC), $$92(ASC), $$channelExecutionTime(ASC)] |PARTITIONED|
+ -- HASH_PARTITION_EXCHANGE [$$91, $$92, $$channelExecutionTime] |PARTITIONED|
-- PRE_SORTED_DISTINCT_BY |PARTITIONED|
-- ONE_TO_ONE_EXCHANGE |PARTITIONED|
-- STABLE_SORT [$$79(ASC)] |PARTITIONED|
@@ -55,12 +55,11 @@
-- ASSIGN |PARTITIONED|
-- STREAM_PROJECT |PARTITIONED|
-- STREAM_SELECT |PARTITIONED|
- -- ASSIGN |PARTITIONED|
+ -- STREAM_PROJECT |PARTITIONED|
-- ASSIGN |PARTITIONED|
-- STREAM_PROJECT |PARTITIONED|
- -- ASSIGN |PARTITIONED|
- -- STREAM_PROJECT |PARTITIONED|
- -- ONE_TO_ONE_EXCHANGE |PARTITIONED|
- -- DATASOURCE_SCAN |PARTITIONED|
- -- ONE_TO_ONE_EXCHANGE |PARTITIONED|
- -- EMPTY_TUPLE_SOURCE |PARTITIONED|
+ -- ONE_TO_ONE_EXCHANGE |PARTITIONED|
+ -- DATASOURCE_SCAN |PARTITIONED|
+ -- BROADCAST_EXCHANGE |PARTITIONED|
+ -- ASSIGN |PARTITIONED|
+ -- EMPTY_TUPLE_SOURCE |PARTITIONED|
diff --git a/asterix-bad/src/test/resources/optimizerts/results/bad_cq/unseen_nearby_tweets.plan b/asterix-bad/src/test/resources/optimizerts/results/bad_cq/unseen_nearby_tweets.plan
index 5bf7f10..a6a17de 100644
--- a/asterix-bad/src/test/resources/optimizerts/results/bad_cq/unseen_nearby_tweets.plan
+++ b/asterix-bad/src/test/resources/optimizerts/results/bad_cq/unseen_nearby_tweets.plan
@@ -1,13 +1,13 @@
-- NOTIFY_BROKERS |PARTITIONED|
-- ONE_TO_ONE_EXCHANGE |PARTITIONED|
- -- PRE_CLUSTERED_GROUP_BY[$$125, $$126, $$channelExecutionTime] |PARTITIONED|
+ -- PRE_CLUSTERED_GROUP_BY[$$128, $$129, $$channelExecutionTime] |PARTITIONED|
{
-- AGGREGATE |LOCAL|
-- NESTED_TUPLE_SOURCE |LOCAL|
}
-- ONE_TO_ONE_EXCHANGE |PARTITIONED|
- -- STABLE_SORT [$$125(ASC), $$126(ASC), $$channelExecutionTime(ASC)] |PARTITIONED|
- -- HASH_PARTITION_EXCHANGE [$$125, $$126, $$channelExecutionTime] |PARTITIONED|
+ -- STABLE_SORT [$$128(ASC), $$129(ASC), $$channelExecutionTime(ASC)] |PARTITIONED|
+ -- HASH_PARTITION_EXCHANGE [$$128, $$129, $$channelExecutionTime] |PARTITIONED|
-- PRE_SORTED_DISTINCT_BY |PARTITIONED|
-- ONE_TO_ONE_EXCHANGE |PARTITIONED|
-- STABLE_SORT [$$99(ASC)] |PARTITIONED|
diff --git a/asterix-bad/src/test/resources/optimizerts/results/bad_rq/channel-subscribe.plan b/asterix-bad/src/test/resources/optimizerts/results/bad_rq/channel-subscribe.plan
index a5a40c1..017bd9e 100644
--- a/asterix-bad/src/test/resources/optimizerts/results/bad_rq/channel-subscribe.plan
+++ b/asterix-bad/src/test/resources/optimizerts/results/bad_rq/channel-subscribe.plan
@@ -1,17 +1,17 @@
-- NOTIFY_BROKERS |PARTITIONED|
-- ONE_TO_ONE_EXCHANGE |PARTITIONED|
- -- PRE_CLUSTERED_GROUP_BY[$$100, $$101, $$channelExecutionTime] |PARTITIONED|
+ -- PRE_CLUSTERED_GROUP_BY[$$106, $$107, $$channelExecutionTime] |PARTITIONED|
{
-- AGGREGATE |LOCAL|
-- NESTED_TUPLE_SOURCE |LOCAL|
}
-- ONE_TO_ONE_EXCHANGE |PARTITIONED|
- -- STABLE_SORT [$$100(ASC), $$101(ASC), $$channelExecutionTime(ASC)] |PARTITIONED|
- -- HASH_PARTITION_EXCHANGE [$$100, $$101, $$channelExecutionTime] |PARTITIONED|
+ -- STABLE_SORT [$$106(ASC), $$107(ASC), $$channelExecutionTime(ASC)] |PARTITIONED|
+ -- HASH_PARTITION_EXCHANGE [$$106, $$107, $$channelExecutionTime] |PARTITIONED|
-- PRE_SORTED_DISTINCT_BY |PARTITIONED|
-- ONE_TO_ONE_EXCHANGE |PARTITIONED|
- -- STABLE_SORT [$$85(ASC)] |PARTITIONED|
- -- HASH_PARTITION_EXCHANGE [$$85] |PARTITIONED|
+ -- STABLE_SORT [$$91(ASC)] |PARTITIONED|
+ -- HASH_PARTITION_EXCHANGE [$$91] |PARTITIONED|
-- STREAM_PROJECT |PARTITIONED|
-- COMMIT |PARTITIONED|
-- STREAM_PROJECT |PARTITIONED|
@@ -21,7 +21,7 @@
-- STREAM_PROJECT |PARTITIONED|
-- ONE_TO_ONE_EXCHANGE |PARTITIONED|
-- INSERT_DELETE |PARTITIONED|
- -- HASH_PARTITION_EXCHANGE [$$79] |PARTITIONED|
+ -- HASH_PARTITION_EXCHANGE [$$85] |PARTITIONED|
-- ASSIGN |PARTITIONED|
-- STREAM_PROJECT |PARTITIONED|
-- ASSIGN |PARTITIONED|
@@ -35,8 +35,8 @@
-- ONE_TO_ONE_EXCHANGE |PARTITIONED|
-- STREAM_PROJECT |PARTITIONED|
-- ONE_TO_ONE_EXCHANGE |PARTITIONED|
- -- HYBRID_HASH_JOIN [$$91, $$89][$$86, $$87] |PARTITIONED|
- -- HASH_PARTITION_EXCHANGE [$$91, $$89] |PARTITIONED|
+ -- HYBRID_HASH_JOIN [$$97, $$95][$$92, $$93] |PARTITIONED|
+ -- HASH_PARTITION_EXCHANGE [$$97, $$95] |PARTITIONED|
-- STREAM_PROJECT |PARTITIONED|
-- ASSIGN |PARTITIONED|
-- ONE_TO_ONE_EXCHANGE |PARTITIONED|
@@ -44,7 +44,7 @@
-- BROADCAST_EXCHANGE |PARTITIONED|
-- ASSIGN |UNPARTITIONED|
-- EMPTY_TUPLE_SOURCE |UNPARTITIONED|
- -- HASH_PARTITION_EXCHANGE [$$86, $$87] |PARTITIONED|
+ -- HASH_PARTITION_EXCHANGE [$$92, $$93] |PARTITIONED|
-- STREAM_PROJECT |PARTITIONED|
-- ASSIGN |PARTITIONED|
-- ONE_TO_ONE_EXCHANGE |PARTITIONED|
@@ -65,7 +65,7 @@
-- STREAM_PROJECT |PARTITIONED|
-- ONE_TO_ONE_EXCHANGE |PARTITIONED|
-- INSERT_DELETE |PARTITIONED|
- -- HASH_PARTITION_EXCHANGE [$$13] |PARTITIONED|
+ -- HASH_PARTITION_EXCHANGE [$$14] |PARTITIONED|
-- ASSIGN |UNPARTITIONED|
-- STREAM_PROJECT |UNPARTITIONED|
-- ASSIGN |UNPARTITIONED|
diff --git a/asterix-bad/src/test/resources/runtimets/queries/bad_rq/add_index/add_index.5.query.sqlpp b/asterix-bad/src/test/resources/runtimets/queries/bad_rq/add_index/add_index.5.query.sqlpp
index eebfbc4..f5b5a03 100644
--- a/asterix-bad/src/test/resources/runtimets/queries/bad_rq/add_index/add_index.5.query.sqlpp
+++ b/asterix-bad/src/test/resources/runtimets/queries/bad_rq/add_index/add_index.5.query.sqlpp
@@ -19,4 +19,4 @@
use channels;
select value array_count(
-(select * from EmergencyChannelResults where deliveryTime > datetime("2017-05-02T17:52:59.570Z")));
\ No newline at end of file
+(select * from EmergencyChannelResults where deliveryTime > datetime("2017-05-02T17:52:59.570Z"))) > 1000;
\ No newline at end of file
diff --git a/asterix-bad/src/test/resources/runtimets/results/bad_rq/add_index/add_index.1.adm b/asterix-bad/src/test/resources/runtimets/results/bad_rq/add_index/add_index.1.adm
index 2e9bdd9..f32a580 100644
--- a/asterix-bad/src/test/resources/runtimets/results/bad_rq/add_index/add_index.1.adm
+++ b/asterix-bad/src/test/resources/runtimets/results/bad_rq/add_index/add_index.1.adm
@@ -1 +1 @@
-1074
\ No newline at end of file
+true
\ No newline at end of file