blob: abcd70d13ae458b4476745ef0761ea5b79ed311a [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
* http://www.apache.org/licenses/LICENSE-2.0
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.asterix.bad.lang;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import org.apache.asterix.algebra.operators.CommitOperator;
import org.apache.asterix.lang.common.base.Expression;
import org.apache.asterix.lang.common.struct.VarIdentifier;
import org.apache.asterix.metadata.declared.DatasetDataSource;
import org.apache.asterix.metadata.declared.MetadataProvider;
import org.apache.asterix.metadata.utils.DatasetUtil;
import org.apache.asterix.om.base.ADateTime;
import org.apache.asterix.om.base.ARecord;
import org.apache.asterix.om.base.IAObject;
import org.apache.asterix.om.constants.AsterixConstantValue;
import org.apache.asterix.om.types.ARecordType;
import org.apache.asterix.om.types.BuiltinType;
import org.apache.asterix.om.types.IAType;
import org.apache.asterix.translator.CompiledStatements;
import org.apache.asterix.translator.SqlppExpressionToPlanTranslator;
import org.apache.commons.lang3.mutable.Mutable;
import org.apache.commons.lang3.mutable.MutableObject;
import org.apache.hyracks.algebricks.common.exceptions.AlgebricksException;
import org.apache.hyracks.algebricks.core.algebra.base.ILogicalExpression;
import org.apache.hyracks.algebricks.core.algebra.base.ILogicalOperator;
import org.apache.hyracks.algebricks.core.algebra.base.LogicalVariable;
import org.apache.hyracks.algebricks.core.algebra.expressions.ConstantExpression;
import org.apache.hyracks.algebricks.core.algebra.expressions.IAlgebricksConstantValue;
import org.apache.hyracks.algebricks.core.algebra.expressions.VariableReferenceExpression;
import org.apache.hyracks.algebricks.core.algebra.operators.logical.AssignOperator;
import org.apache.hyracks.algebricks.core.algebra.operators.logical.DelegateOperator;
import org.apache.hyracks.algebricks.core.algebra.operators.logical.InsertDeleteUpsertOperator;
import org.apache.hyracks.api.exceptions.SourceLocation;
import org.apache.hyracks.api.result.IResultMetadata;
/**
* This class overrides the SqlppExpressionToPlanTranslator in AsterixDB to allow insert/upsert to
* datasets with meta records (active datasets). If inserting/upserting into an active dataset, the plan
* translator would attach a dummy active records containing the current timestamp as the active timestamp.
* In the case of data feeds, this active timestamp would then be updated by
* BADLSMPrimaryInsertOperatorNodePushable/BADLSMPrimaryUpsertOperatorNodePushable.
* Updates to SqlppExpressionToPlanTranslator in the AsterixDB master needs to be propagated into this class when
* bringing the BAD codebase to latest master.
*
* IMPORTANT NOTE: Currently, we assume active datasets are the only user of the datasets with meta records.
* If one want to use meta datasets in the BAD branch in the future, this needs to be refactored.
*/
public class BADExpressionToPlanTranslator extends SqlppExpressionToPlanTranslator {
public BADExpressionToPlanTranslator(MetadataProvider metadataProvider, int currentVarCounter,
Map<VarIdentifier, IAObject> externalVars) throws AlgebricksException {
super(metadataProvider, currentVarCounter, externalVars);
}
@Override
protected ILogicalOperator translateDelete(DatasetDataSource targetDatasource, Mutable<ILogicalExpression> varRef,
List<Mutable<ILogicalExpression>> varRefsForLoading, LogicalVariable seqVar, ILogicalOperator pkeyAssignOp,
CompiledStatements.ICompiledDmlStatement stmt) throws AlgebricksException {
SourceLocation sourceLoc = stmt.getSourceLocation();
InsertDeleteUpsertOperator deleteOp;
LogicalVariable metaVar = null;
if (!targetDatasource.getDataset().hasMetaPart()) {
deleteOp = new InsertDeleteUpsertOperator(targetDatasource, varRef, varRefsForLoading,
InsertDeleteUpsertOperator.Kind.DELETE, false);
} else {
// prepare meta record
IAType metaType = metadataProvider.findMetaType(targetDatasource.getDataset());
metaVar = context.newVar();
AssignOperator metaVariableAssignOp =
new AssignOperator(metaVar, new MutableObject<>(makeMetaRecordExpr(metaType)));
metaVariableAssignOp.getInputs().add(new MutableObject<>(pkeyAssignOp));
metaVariableAssignOp.setSourceLocation(sourceLoc);
// create insert op uses meta record
deleteOp = new InsertDeleteUpsertOperator(targetDatasource, varRef, varRefsForLoading,
Collections.singletonList(new MutableObject<>(new VariableReferenceExpression(metaVar))),
InsertDeleteUpsertOperator.Kind.DELETE, false);
// change current inputOp to be meta op
pkeyAssignOp = metaVariableAssignOp;
}
deleteOp.getInputs().add(new MutableObject<>(pkeyAssignOp));
deleteOp.setSourceLocation(sourceLoc);
List<String> filterField = DatasetUtil.getFilterField(targetDatasource.getDataset());
List<Mutable<ILogicalExpression>> filterExprs = null;
Integer filterSourceIndicator = DatasetUtil.getFilterSourceIndicator(targetDatasource.getDataset());
if (filterField != null) {
filterExprs = generatedFilterExprs(deleteOp, filterField, filterSourceIndicator == 0 ? seqVar : metaVar,
sourceLoc);
}
deleteOp.setAdditionalFilteringExpressions(filterExprs);
DelegateOperator leafOperator = new DelegateOperator(new CommitOperator(true));
leafOperator.getInputs().add(new MutableObject<>(deleteOp));
leafOperator.setSourceLocation(sourceLoc);
return leafOperator;
}
@Override
protected ILogicalOperator translateUpsert(DatasetDataSource targetDatasource,
Mutable<ILogicalExpression> payloadVarRef, List<Mutable<ILogicalExpression>> varRefsForLoading,
ILogicalOperator pkeyAssignOp, LogicalVariable unnestVar, ILogicalOperator topOp,
List<Mutable<ILogicalExpression>> pkeyExprs, LogicalVariable seqVar,
CompiledStatements.ICompiledDmlStatement stmt, IResultMetadata resultMetadata) throws AlgebricksException {
SourceLocation sourceLoc = stmt.getSourceLocation();
CompiledStatements.CompiledUpsertStatement compiledUpsert = (CompiledStatements.CompiledUpsertStatement) stmt;
Expression returnExpression = compiledUpsert.getReturnExpression();
InsertDeleteUpsertOperator upsertOp;
ILogicalOperator rootOperator;
LogicalVariable metaVar = null;
ARecordType recordType = (ARecordType) targetDatasource.getItemType();
if (targetDatasource.getDataset().hasMetaPart()) {
IAType metaType = metadataProvider.findMetaType(targetDatasource.getDataset());
metaVar = context.newVar();
AssignOperator metaVariableAssignOp =
new AssignOperator(metaVar, new MutableObject<>(makeMetaRecordExpr(metaType)));
metaVariableAssignOp.getInputs().add(new MutableObject<>(pkeyAssignOp));
pkeyAssignOp = metaVariableAssignOp;
metaVariableAssignOp.setSourceLocation(sourceLoc);
List<Mutable<ILogicalExpression>> metaExprs = new ArrayList<>(1);
VariableReferenceExpression metaVarRef = new VariableReferenceExpression(metaVar);
metaExprs.add(new MutableObject<>(metaVarRef));
upsertOp = new InsertDeleteUpsertOperator(targetDatasource, payloadVarRef, varRefsForLoading, metaExprs,
InsertDeleteUpsertOperator.Kind.UPSERT, false);
// set previous meta vars
List<LogicalVariable> metaVars = new ArrayList<>();
metaVars.add(context.newVar());
upsertOp.setPrevAdditionalNonFilteringVars(metaVars);
List<Object> metaTypes = new ArrayList<>();
metaTypes.add(targetDatasource.getMetaItemType());
upsertOp.setPrevAdditionalNonFilteringTypes(metaTypes);
} else {
upsertOp = new InsertDeleteUpsertOperator(targetDatasource, payloadVarRef, varRefsForLoading,
InsertDeleteUpsertOperator.Kind.UPSERT, false);
}
// Create and add a new variable used for representing the original record
upsertOp.setUpsertIndicatorVar(context.newVar());
upsertOp.setUpsertIndicatorVarType(BuiltinType.ABOOLEAN);
upsertOp.setPrevRecordVar(context.newVar());
upsertOp.setPrevRecordType(recordType);
upsertOp.setSourceLocation(sourceLoc);
upsertOp.getInputs().add(new MutableObject<>(pkeyAssignOp));
List<String> filterField = DatasetUtil.getFilterField(targetDatasource.getDataset());
List<Mutable<ILogicalExpression>> filterExprs = null;
Integer filterSourceIndicator = DatasetUtil.getFilterSourceIndicator(targetDatasource.getDataset());
if (filterField != null) {
filterExprs = generatedFilterExprs(upsertOp, filterField, filterSourceIndicator == 0 ? seqVar : metaVar,
sourceLoc);
ARecordType filterSourceType = filterSourceIndicator == 0 ? (ARecordType) targetDatasource.getItemType()
: (ARecordType) targetDatasource.getMetaItemType();
upsertOp.setAdditionalFilteringExpressions(filterExprs);
upsertOp.setPrevFilterVar(context.newVar());
upsertOp.setPrevFilterType(filterSourceType.getFieldType(filterField.get(0)));
} else {
upsertOp.setAdditionalFilteringExpressions(null);
}
// Set up delegate operator
DelegateOperator delegateOperator = new DelegateOperator(new CommitOperator(returnExpression == null));
delegateOperator.getInputs().add(new MutableObject<>(upsertOp));
delegateOperator.setSourceLocation(sourceLoc);
rootOperator = delegateOperator;
// Compiles the return expression.
return processReturningExpression(rootOperator, upsertOp, compiledUpsert, resultMetadata);
}
@Override
protected ILogicalOperator translateInsert(DatasetDataSource targetDatasource, Mutable<ILogicalExpression> varRef,
List<Mutable<ILogicalExpression>> varRefsForLoading, LogicalVariable seqVar, ILogicalOperator pkeyAssignOp,
CompiledStatements.ICompiledDmlStatement stmt, IResultMetadata resultMetadata) throws AlgebricksException {
SourceLocation sourceLoc = stmt.getSourceLocation();
LogicalVariable metaVar = null;
InsertDeleteUpsertOperator insertOp;
if (!targetDatasource.getDataset().hasMetaPart()) {
insertOp = new InsertDeleteUpsertOperator(targetDatasource, varRef, varRefsForLoading,
InsertDeleteUpsertOperator.Kind.INSERT, false);
} else {
// prepare meta record
IAType metaType = metadataProvider.findMetaType(targetDatasource.getDataset());
metaVar = context.newVar();
AssignOperator metaVariableAssignOp =
new AssignOperator(metaVar, new MutableObject<>(makeMetaRecordExpr(metaType)));
metaVariableAssignOp.getInputs().add(new MutableObject<>(pkeyAssignOp));
metaVariableAssignOp.setSourceLocation(sourceLoc);
// create insert op uses meta record
insertOp = new InsertDeleteUpsertOperator(targetDatasource, varRef, varRefsForLoading,
Collections.singletonList(new MutableObject<>(new VariableReferenceExpression(metaVar))),
InsertDeleteUpsertOperator.Kind.INSERT, false);
// change current inputOp to be meta op
pkeyAssignOp = metaVariableAssignOp;
}
insertOp.getInputs().add(new MutableObject<>(pkeyAssignOp));
insertOp.setSourceLocation(sourceLoc);
List<String> filterField = DatasetUtil.getFilterField(targetDatasource.getDataset());
List<Mutable<ILogicalExpression>> filterExprs = null;
Integer filterSourceIndicator = DatasetUtil.getFilterSourceIndicator(targetDatasource.getDataset());
if (filterField != null) {
filterExprs = generatedFilterExprs(insertOp, filterField, filterSourceIndicator == 0 ? seqVar : metaVar,
sourceLoc);
}
insertOp.setAdditionalFilteringExpressions(filterExprs);
// Adds the commit operator.
CompiledStatements.CompiledInsertStatement compiledInsert = (CompiledStatements.CompiledInsertStatement) stmt;
Expression returnExpression = compiledInsert.getReturnExpression();
DelegateOperator rootOperator = new DelegateOperator(new CommitOperator(returnExpression == null));
rootOperator.getInputs().add(new MutableObject<>(insertOp));
rootOperator.setSourceLocation(sourceLoc);
// Compiles the return expression.
return processReturningExpression(rootOperator, insertOp, compiledInsert, resultMetadata);
}
private ILogicalExpression makeMetaRecordExpr(IAType metaRecordType) {
ARecord metaRecord =
new ARecord((ARecordType) metaRecordType, new IAObject[] { new ADateTime(System.currentTimeMillis()) });
IAlgebricksConstantValue metaConstantVal = new AsterixConstantValue(metaRecord);
ILogicalExpression expr = new ConstantExpression(metaConstantVal);
return expr;
}
}