| /* |
| * Licensed to the Apache Software Foundation (ASF) under one |
| * or more contributor license agreements. See the NOTICE file |
| * distributed with this work for additional information |
| * regarding copyright ownership. The ASF licenses this file |
| * to you under the Apache License, Version 2.0 (the |
| * "License"); you may not use this file except in compliance |
| * with the License. You may obtain a copy of the License at |
| * |
| * http://www.apache.org/licenses/LICENSE-2.0 |
| * |
| * Unless required by applicable law or agreed to in writing, |
| * software distributed under the License is distributed on an |
| * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY |
| * KIND, either express or implied. See the License for the |
| * specific language governing permissions and limitations |
| * under the License. |
| */ |
| package org.apache.asterix.app.translator; |
| |
| import static org.apache.asterix.common.api.IIdentifierMapper.Modifier.PLURAL; |
| import static org.apache.asterix.common.utils.IdentifierUtil.dataset; |
| import static org.apache.asterix.common.utils.IdentifierUtil.dataverse; |
| |
| import java.io.File; |
| import java.io.FileInputStream; |
| import java.io.InputStream; |
| import java.rmi.RemoteException; |
| import java.util.ArrayList; |
| import java.util.Collection; |
| import java.util.Collections; |
| import java.util.Date; |
| import java.util.EnumSet; |
| import java.util.HashMap; |
| import java.util.HashSet; |
| import java.util.Iterator; |
| import java.util.LinkedHashSet; |
| import java.util.List; |
| import java.util.Map; |
| import java.util.Objects; |
| import java.util.Properties; |
| import java.util.Set; |
| import java.util.concurrent.ExecutorService; |
| |
| import org.apache.asterix.active.ActivityState; |
| import org.apache.asterix.active.EntityId; |
| import org.apache.asterix.active.IActiveEntityEventsListener; |
| import org.apache.asterix.active.NoRetryPolicyFactory; |
| import org.apache.asterix.algebra.extension.ExtensionStatement; |
| import org.apache.asterix.api.common.APIFramework; |
| import org.apache.asterix.api.http.server.AbstractQueryApiServlet; |
| import org.apache.asterix.api.http.server.ApiServlet; |
| import org.apache.asterix.app.active.ActiveEntityEventsListener; |
| import org.apache.asterix.app.active.ActiveNotificationHandler; |
| import org.apache.asterix.app.active.FeedEventsListener; |
| import org.apache.asterix.app.external.ExternalLibraryJobUtils; |
| import org.apache.asterix.app.result.ExecutionError; |
| import org.apache.asterix.app.result.ResultHandle; |
| import org.apache.asterix.app.result.ResultReader; |
| import org.apache.asterix.app.result.fields.ErrorsPrinter; |
| import org.apache.asterix.app.result.fields.ResultHandlePrinter; |
| import org.apache.asterix.app.result.fields.ResultsPrinter; |
| import org.apache.asterix.app.result.fields.StatusPrinter; |
| import org.apache.asterix.common.api.IClientRequest; |
| import org.apache.asterix.common.api.IMetadataLockManager; |
| import org.apache.asterix.common.api.IRequestTracker; |
| import org.apache.asterix.common.api.IResponsePrinter; |
| import org.apache.asterix.common.cluster.IClusterStateManager; |
| import org.apache.asterix.common.config.DatasetConfig.DatasetType; |
| import org.apache.asterix.common.config.DatasetConfig.ExternalFilePendingOp; |
| import org.apache.asterix.common.config.DatasetConfig.IndexType; |
| import org.apache.asterix.common.config.DatasetConfig.TransactionState; |
| import org.apache.asterix.common.config.GlobalConfig; |
| import org.apache.asterix.common.dataflow.ICcApplicationContext; |
| import org.apache.asterix.common.exceptions.ACIDException; |
| import org.apache.asterix.common.exceptions.AsterixException; |
| import org.apache.asterix.common.exceptions.CompilationException; |
| import org.apache.asterix.common.exceptions.ErrorCode; |
| import org.apache.asterix.common.exceptions.ExceptionUtils; |
| import org.apache.asterix.common.exceptions.MetadataException; |
| import org.apache.asterix.common.exceptions.RuntimeDataException; |
| import org.apache.asterix.common.exceptions.WarningCollector; |
| import org.apache.asterix.common.exceptions.WarningUtil; |
| import org.apache.asterix.common.external.IDataSourceAdapter; |
| import org.apache.asterix.common.functions.ExternalFunctionLanguage; |
| import org.apache.asterix.common.functions.FunctionSignature; |
| import org.apache.asterix.common.metadata.DataverseName; |
| import org.apache.asterix.common.metadata.IMetadataLockUtil; |
| import org.apache.asterix.common.utils.JobUtils; |
| import org.apache.asterix.common.utils.JobUtils.ProgressState; |
| import org.apache.asterix.common.utils.StorageConstants; |
| import org.apache.asterix.compiler.provider.ILangCompilationProvider; |
| import org.apache.asterix.external.dataset.adapter.AdapterIdentifier; |
| import org.apache.asterix.external.indexing.ExternalFile; |
| import org.apache.asterix.external.indexing.IndexingConstants; |
| import org.apache.asterix.external.operators.FeedIntakeOperatorNodePushable; |
| import org.apache.asterix.external.util.ExternalDataConstants; |
| import org.apache.asterix.external.util.ExternalDataUtils; |
| import org.apache.asterix.lang.common.base.Expression; |
| import org.apache.asterix.lang.common.base.IReturningStatement; |
| import org.apache.asterix.lang.common.base.IRewriterFactory; |
| import org.apache.asterix.lang.common.base.IStatementRewriter; |
| import org.apache.asterix.lang.common.base.Statement; |
| import org.apache.asterix.lang.common.expression.CallExpr; |
| import org.apache.asterix.lang.common.expression.IndexedTypeExpression; |
| import org.apache.asterix.lang.common.expression.LiteralExpr; |
| import org.apache.asterix.lang.common.expression.TypeExpression; |
| import org.apache.asterix.lang.common.expression.TypeReferenceExpression; |
| import org.apache.asterix.lang.common.literal.MissingLiteral; |
| import org.apache.asterix.lang.common.statement.AdapterDropStatement; |
| import org.apache.asterix.lang.common.statement.CompactStatement; |
| import org.apache.asterix.lang.common.statement.ConnectFeedStatement; |
| import org.apache.asterix.lang.common.statement.CreateAdapterStatement; |
| import org.apache.asterix.lang.common.statement.CreateDataverseStatement; |
| import org.apache.asterix.lang.common.statement.CreateFeedPolicyStatement; |
| import org.apache.asterix.lang.common.statement.CreateFeedStatement; |
| import org.apache.asterix.lang.common.statement.CreateFunctionStatement; |
| import org.apache.asterix.lang.common.statement.CreateIndexStatement; |
| import org.apache.asterix.lang.common.statement.CreateLibraryStatement; |
| import org.apache.asterix.lang.common.statement.CreateSynonymStatement; |
| import org.apache.asterix.lang.common.statement.DatasetDecl; |
| import org.apache.asterix.lang.common.statement.DataverseDecl; |
| import org.apache.asterix.lang.common.statement.DataverseDropStatement; |
| import org.apache.asterix.lang.common.statement.DeleteStatement; |
| import org.apache.asterix.lang.common.statement.DisconnectFeedStatement; |
| import org.apache.asterix.lang.common.statement.DropDatasetStatement; |
| import org.apache.asterix.lang.common.statement.ExternalDetailsDecl; |
| import org.apache.asterix.lang.common.statement.FeedDropStatement; |
| import org.apache.asterix.lang.common.statement.FeedPolicyDropStatement; |
| import org.apache.asterix.lang.common.statement.FunctionDecl; |
| import org.apache.asterix.lang.common.statement.FunctionDropStatement; |
| import org.apache.asterix.lang.common.statement.IndexDropStatement; |
| import org.apache.asterix.lang.common.statement.InsertStatement; |
| import org.apache.asterix.lang.common.statement.InternalDetailsDecl; |
| import org.apache.asterix.lang.common.statement.LibraryDropStatement; |
| import org.apache.asterix.lang.common.statement.LoadStatement; |
| import org.apache.asterix.lang.common.statement.NodeGroupDropStatement; |
| import org.apache.asterix.lang.common.statement.NodegroupDecl; |
| import org.apache.asterix.lang.common.statement.Query; |
| import org.apache.asterix.lang.common.statement.RefreshExternalDatasetStatement; |
| import org.apache.asterix.lang.common.statement.SetStatement; |
| import org.apache.asterix.lang.common.statement.StartFeedStatement; |
| import org.apache.asterix.lang.common.statement.StopFeedStatement; |
| import org.apache.asterix.lang.common.statement.SynonymDropStatement; |
| import org.apache.asterix.lang.common.statement.TypeDecl; |
| import org.apache.asterix.lang.common.statement.TypeDropStatement; |
| import org.apache.asterix.lang.common.statement.WriteStatement; |
| import org.apache.asterix.lang.common.struct.Identifier; |
| import org.apache.asterix.lang.common.struct.VarIdentifier; |
| import org.apache.asterix.lang.common.util.FunctionUtil; |
| import org.apache.asterix.metadata.IDatasetDetails; |
| import org.apache.asterix.metadata.MetadataManager; |
| import org.apache.asterix.metadata.MetadataTransactionContext; |
| import org.apache.asterix.metadata.bootstrap.MetadataBuiltinEntities; |
| import org.apache.asterix.metadata.dataset.hints.DatasetHints; |
| import org.apache.asterix.metadata.dataset.hints.DatasetHints.DatasetNodegroupCardinalityHint; |
| import org.apache.asterix.metadata.declared.MetadataProvider; |
| import org.apache.asterix.metadata.entities.BuiltinTypeMap; |
| import org.apache.asterix.metadata.entities.CompactionPolicy; |
| import org.apache.asterix.metadata.entities.Dataset; |
| import org.apache.asterix.metadata.entities.DatasourceAdapter; |
| import org.apache.asterix.metadata.entities.Datatype; |
| import org.apache.asterix.metadata.entities.Dataverse; |
| import org.apache.asterix.metadata.entities.ExternalDatasetDetails; |
| import org.apache.asterix.metadata.entities.Feed; |
| import org.apache.asterix.metadata.entities.FeedConnection; |
| import org.apache.asterix.metadata.entities.FeedPolicyEntity; |
| import org.apache.asterix.metadata.entities.Function; |
| import org.apache.asterix.metadata.entities.Index; |
| import org.apache.asterix.metadata.entities.InternalDatasetDetails; |
| import org.apache.asterix.metadata.entities.Library; |
| import org.apache.asterix.metadata.entities.NodeGroup; |
| import org.apache.asterix.metadata.entities.Synonym; |
| import org.apache.asterix.metadata.feeds.FeedMetadataUtil; |
| import org.apache.asterix.metadata.functions.ExternalFunctionCompilerUtil; |
| import org.apache.asterix.metadata.lock.ExternalDatasetsRegistry; |
| import org.apache.asterix.metadata.utils.DatasetUtil; |
| import org.apache.asterix.metadata.utils.ExternalIndexingOperations; |
| import org.apache.asterix.metadata.utils.IndexUtil; |
| import org.apache.asterix.metadata.utils.KeyFieldTypeUtil; |
| import org.apache.asterix.metadata.utils.MetadataConstants; |
| import org.apache.asterix.metadata.utils.MetadataUtil; |
| import org.apache.asterix.metadata.utils.TypeUtil; |
| import org.apache.asterix.om.base.IAObject; |
| import org.apache.asterix.om.types.ARecordType; |
| import org.apache.asterix.om.types.ATypeTag; |
| import org.apache.asterix.om.types.BuiltinType; |
| import org.apache.asterix.om.types.IAType; |
| import org.apache.asterix.om.types.TypeSignature; |
| import org.apache.asterix.transaction.management.service.transaction.DatasetIdFactory; |
| import org.apache.asterix.translator.AbstractLangTranslator; |
| import org.apache.asterix.translator.ClientRequest; |
| import org.apache.asterix.translator.CompiledStatements.CompiledDeleteStatement; |
| import org.apache.asterix.translator.CompiledStatements.CompiledInsertStatement; |
| import org.apache.asterix.translator.CompiledStatements.CompiledLoadFromFileStatement; |
| import org.apache.asterix.translator.CompiledStatements.CompiledUpsertStatement; |
| import org.apache.asterix.translator.CompiledStatements.ICompiledDmlStatement; |
| import org.apache.asterix.translator.ExecutionPlans; |
| import org.apache.asterix.translator.ExecutionPlansHtmlPrintUtil; |
| import org.apache.asterix.translator.IRequestParameters; |
| import org.apache.asterix.translator.IStatementExecutor; |
| import org.apache.asterix.translator.SchedulableClientRequest; |
| import org.apache.asterix.translator.SessionConfig; |
| import org.apache.asterix.translator.SessionOutput; |
| import org.apache.asterix.translator.TypeTranslator; |
| import org.apache.asterix.translator.util.ValidateUtil; |
| import org.apache.asterix.utils.DataverseUtil; |
| import org.apache.asterix.utils.FeedOperations; |
| import org.apache.asterix.utils.FlushDatasetUtil; |
| import org.apache.commons.lang3.StringUtils; |
| import org.apache.commons.lang3.mutable.Mutable; |
| import org.apache.commons.lang3.mutable.MutableBoolean; |
| import org.apache.commons.lang3.mutable.MutableObject; |
| import org.apache.hyracks.algebricks.common.exceptions.AlgebricksException; |
| import org.apache.hyracks.algebricks.common.utils.Pair; |
| import org.apache.hyracks.algebricks.common.utils.Triple; |
| import org.apache.hyracks.algebricks.core.algebra.expressions.AbstractFunctionCallExpression.FunctionKind; |
| import org.apache.hyracks.algebricks.core.algebra.functions.FunctionIdentifier; |
| import org.apache.hyracks.algebricks.data.IAWriterFactory; |
| import org.apache.hyracks.algebricks.data.IResultSerializerFactoryProvider; |
| import org.apache.hyracks.algebricks.runtime.serializer.ResultSerializerFactoryProvider; |
| import org.apache.hyracks.algebricks.runtime.writers.PrinterBasedWriterFactory; |
| import org.apache.hyracks.api.client.IClusterInfoCollector; |
| import org.apache.hyracks.api.client.IHyracksClientConnection; |
| import org.apache.hyracks.api.exceptions.HyracksDataException; |
| import org.apache.hyracks.api.exceptions.IWarningCollector; |
| import org.apache.hyracks.api.exceptions.SourceLocation; |
| import org.apache.hyracks.api.exceptions.Warning; |
| import org.apache.hyracks.api.io.FileSplit; |
| import org.apache.hyracks.api.io.UnmanagedFileSplit; |
| import org.apache.hyracks.api.job.JobFlag; |
| import org.apache.hyracks.api.job.JobId; |
| import org.apache.hyracks.api.job.JobSpecification; |
| import org.apache.hyracks.api.result.IResultSet; |
| import org.apache.hyracks.api.result.ResultSetId; |
| import org.apache.hyracks.control.cc.ClusterControllerService; |
| import org.apache.hyracks.control.common.controllers.CCConfig; |
| import org.apache.hyracks.storage.am.common.dataflow.IndexDropOperatorDescriptor.DropOption; |
| import org.apache.hyracks.storage.am.lsm.common.api.ILSMMergePolicyFactory; |
| import org.apache.logging.log4j.Level; |
| import org.apache.logging.log4j.LogManager; |
| import org.apache.logging.log4j.Logger; |
| |
| /** |
| * Provides functionality for executing a batch of statements (including queries) |
| * sequentially. |
| */ |
| public class QueryTranslator extends AbstractLangTranslator implements IStatementExecutor { |
| |
| private static final Logger LOGGER = LogManager.getLogger(); |
| |
| public static final boolean IS_DEBUG_MODE = false; // set to true to enable debug-only diagnostics |
| protected final List<Statement> statements; |
| protected final ICcApplicationContext appCtx; |
| protected final SessionOutput sessionOutput; |
| protected final SessionConfig sessionConfig; |
| protected Dataverse activeDataverse; |
| protected final List<FunctionDecl> declaredFunctions; |
| protected final ILangCompilationProvider compilationProvider; |
| protected final APIFramework apiFramework; |
| protected final IRewriterFactory rewriterFactory; |
| protected final ExecutorService executorService; |
| protected final EnumSet<JobFlag> jobFlags = EnumSet.noneOf(JobFlag.class); |
| protected final IMetadataLockManager lockManager; |
| protected final IMetadataLockUtil lockUtil; |
| protected final IResponsePrinter responsePrinter; |
| protected final WarningCollector warningCollector; |
| |
| public QueryTranslator(ICcApplicationContext appCtx, List<Statement> statements, SessionOutput output, |
| ILangCompilationProvider compilationProvider, ExecutorService executorService, |
| IResponsePrinter responsePrinter) { |
| this.appCtx = appCtx; |
| this.lockManager = appCtx.getMetadataLockManager(); |
| this.lockUtil = appCtx.getMetadataLockUtil(); |
| this.statements = statements; |
| this.sessionOutput = output; |
| this.sessionConfig = output.config(); |
| this.compilationProvider = compilationProvider; |
| declaredFunctions = new ArrayList<>(); |
| apiFramework = new APIFramework(compilationProvider); |
| rewriterFactory = compilationProvider.getRewriterFactory(); |
| activeDataverse = MetadataBuiltinEntities.DEFAULT_DATAVERSE; |
| this.executorService = executorService; |
| this.responsePrinter = responsePrinter; |
| this.warningCollector = new WarningCollector(); |
| if (appCtx.getServiceContext().getAppConfig().getBoolean(CCConfig.Option.ENFORCE_FRAME_WRITER_PROTOCOL)) { |
| this.jobFlags.add(JobFlag.ENFORCE_CONTRACT); |
| } |
| } |
| |
| public SessionOutput getSessionOutput() { |
| return sessionOutput; |
| } |
| |
| public IWarningCollector getWarningCollector() { |
| return warningCollector; |
| } |
| |
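| /** |
| * Compiles and executes the statements of this batch sequentially. Each statement is |
| * rewritten, validated against the active dataverse, and dispatched to its handler |
| * based on its kind. The current thread is renamed with the request UUID for the |
| * duration of the request; non-async requests are marked complete when done. |
| */ |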
| @Override |
| public void compileAndExecute(IHyracksClientConnection hcc, IRequestParameters requestParameters) throws Exception { |
| validateStatements(requestParameters); |
| trackRequest(requestParameters); |
| int resultSetIdCounter = 0; |
| FileSplit outputFile = null; |
| IAWriterFactory writerFactory = PrinterBasedWriterFactory.INSTANCE; |
| IResultSerializerFactoryProvider resultSerializerFactoryProvider = ResultSerializerFactoryProvider.INSTANCE; |
| String threadName = Thread.currentThread().getName(); |
| Thread.currentThread().setName( |
| QueryTranslator.class.getSimpleName() + ":" + requestParameters.getRequestReference().getUuid()); |
| Map<String, String> config = new HashMap<>(); |
| final IResultSet resultSet = requestParameters.getResultSet(); |
| final ResultDelivery resultDelivery = requestParameters.getResultProperties().getDelivery(); |
| final long maxResultReads = requestParameters.getResultProperties().getMaxReads(); |
| final Stats stats = requestParameters.getStats(); |
| final StatementProperties statementProperties = requestParameters.getStatementProperties(); |
| final ResultMetadata outMetadata = requestParameters.getOutMetadata(); |
| final Map<String, IAObject> stmtParams = requestParameters.getStatementParameters(); |
| warningCollector.setMaxWarnings(sessionConfig.getMaxWarnings()); |
| try { |
| for (Statement stmt : statements) { |
| if (sessionConfig.is(SessionConfig.FORMAT_HTML)) { |
| sessionOutput.out().println(ApiServlet.HTML_STATEMENT_SEPARATOR); |
| } |
| validateOperation(appCtx, activeDataverse, stmt); |
| MetadataProvider metadataProvider = MetadataProvider.create(appCtx, activeDataverse); |
| metadataProvider.getConfig().putAll(config); |
| metadataProvider.setWriterFactory(writerFactory); |
| metadataProvider.setResultSerializerFactoryProvider(resultSerializerFactoryProvider); |
| metadataProvider.setOutputFile(outputFile); |
| IStatementRewriter stmtRewriter = rewriterFactory.createStatementRewriter(); |
| rewriteStatement(stmt, stmtRewriter, metadataProvider); // Rewrite the statement's AST. |
| Statement.Kind kind = stmt.getKind(); |
| statementProperties.setKind(kind); |
| switch (kind) { |
| case SET: |
| handleSetStatement(stmt, config); |
| break; |
| case DATAVERSE_DECL: |
| activeDataverse = handleUseDataverseStatement(metadataProvider, stmt); |
| break; |
| case CREATE_DATAVERSE: |
| handleCreateDataverseStatement(metadataProvider, stmt, requestParameters); |
| break; |
| case DATASET_DECL: |
| handleCreateDatasetStatement(metadataProvider, stmt, hcc, requestParameters); |
| break; |
| case CREATE_INDEX: |
| handleCreateIndexStatement(metadataProvider, stmt, hcc, requestParameters); |
| break; |
| case TYPE_DECL: |
| handleCreateTypeStatement(metadataProvider, stmt); |
| break; |
| case NODEGROUP_DECL: |
| handleCreateNodeGroupStatement(metadataProvider, stmt); |
| break; |
| case DATAVERSE_DROP: |
| handleDataverseDropStatement(metadataProvider, stmt, hcc, requestParameters); |
| break; |
| case DATASET_DROP: |
| handleDatasetDropStatement(metadataProvider, stmt, hcc, requestParameters); |
| break; |
| case INDEX_DROP: |
| handleIndexDropStatement(metadataProvider, stmt, hcc, requestParameters); |
| break; |
| case TYPE_DROP: |
| handleTypeDropStatement(metadataProvider, stmt); |
| break; |
| case NODEGROUP_DROP: |
| handleNodegroupDropStatement(metadataProvider, stmt); |
| break; |
| case CREATE_ADAPTER: |
| handleCreateAdapterStatement(metadataProvider, stmt); |
| break; |
| case ADAPTER_DROP: |
| handleAdapterDropStatement(metadataProvider, stmt); |
| break; |
| case CREATE_FUNCTION: |
| handleCreateFunctionStatement(metadataProvider, stmt, stmtRewriter, requestParameters); |
| break; |
| case FUNCTION_DROP: |
| handleFunctionDropStatement(metadataProvider, stmt, requestParameters); |
| break; |
| case CREATE_LIBRARY: |
| handleCreateLibraryStatement(metadataProvider, stmt, hcc, requestParameters); |
| break; |
| case LIBRARY_DROP: |
| handleLibraryDropStatement(metadataProvider, stmt, hcc, requestParameters); |
| break; |
| case CREATE_SYNONYM: |
| handleCreateSynonymStatement(metadataProvider, stmt); |
| break; |
| case SYNONYM_DROP: |
| handleDropSynonymStatement(metadataProvider, stmt); |
| break; |
| case LOAD: |
| handleLoadStatement(metadataProvider, stmt, hcc); |
| break; |
| case INSERT: |
| case UPSERT: |
| if (((InsertStatement) stmt).getReturnExpression() != null) { |
| metadataProvider.setResultSetId(new ResultSetId(resultSetIdCounter++)); |
| metadataProvider.setResultAsyncMode(resultDelivery == ResultDelivery.ASYNC |
| || resultDelivery == ResultDelivery.DEFERRED); |
| metadataProvider.setMaxResultReads(maxResultReads); |
| } |
| handleInsertUpsertStatement(metadataProvider, stmt, hcc, resultSet, resultDelivery, outMetadata, |
| stats, false, requestParameters, stmtParams, stmtRewriter); |
| break; |
| case DELETE: |
| handleDeleteStatement(metadataProvider, stmt, hcc, false, stmtParams, stmtRewriter); |
| break; |
| case CREATE_FEED: |
| handleCreateFeedStatement(metadataProvider, stmt); |
| break; |
| case DROP_FEED: |
| handleDropFeedStatement(metadataProvider, stmt, hcc); |
| break; |
| case DROP_FEED_POLICY: |
| handleDropFeedPolicyStatement(metadataProvider, stmt); |
| break; |
| case CONNECT_FEED: |
| handleConnectFeedStatement(metadataProvider, stmt); |
| break; |
| case DISCONNECT_FEED: |
| handleDisconnectFeedStatement(metadataProvider, stmt); |
| break; |
| case START_FEED: |
| handleStartFeedStatement(metadataProvider, stmt, hcc); |
| break; |
| case STOP_FEED: |
| handleStopFeedStatement(metadataProvider, stmt); |
| break; |
| case CREATE_FEED_POLICY: |
| handleCreateFeedPolicyStatement(metadataProvider, stmt); |
| break; |
| case QUERY: |
| metadataProvider.setResultSetId(new ResultSetId(resultSetIdCounter++)); |
| metadataProvider.setResultAsyncMode( |
| resultDelivery == ResultDelivery.ASYNC || resultDelivery == ResultDelivery.DEFERRED); |
| metadataProvider.setMaxResultReads(maxResultReads); |
| if (stats.getProfileType() == Stats.ProfileType.FULL) { |
| this.jobFlags.add(JobFlag.PROFILE_RUNTIME); |
| } |
| handleQuery(metadataProvider, (Query) stmt, hcc, resultSet, resultDelivery, outMetadata, stats, |
| requestParameters, stmtParams, stmtRewriter); |
| break; |
| case COMPACT: |
| handleCompactStatement(metadataProvider, stmt, hcc); |
| break; |
| case EXTERNAL_DATASET_REFRESH: |
| handleExternalDatasetRefreshStatement(metadataProvider, stmt, hcc); |
| break; |
| case WRITE: |
| Pair<IAWriterFactory, FileSplit> result = handleWriteStatement(stmt); |
| writerFactory = (result.first != null) ? result.first : writerFactory; |
| outputFile = result.second; |
| break; |
| case FUNCTION_DECL: |
| handleDeclareFunctionStatement(metadataProvider, stmt); |
| break; |
| case EXTENSION: |
| final ExtensionStatement extStmt = (ExtensionStatement) stmt; |
| statementProperties.setName(extStmt.getName()); |
| extStmt.handle(hcc, this, requestParameters, metadataProvider, resultSetIdCounter); |
| break; |
| default: |
| throw new CompilationException(ErrorCode.COMPILATION_ILLEGAL_STATE, stmt.getSourceLocation(), |
| "Unexpected statement: " + kind); |
| } |
| } |
| } finally { |
| // async queries are completed after their job completes |
| if (ResultDelivery.ASYNC != resultDelivery) { |
| appCtx.getRequestTracker().complete(requestParameters.getRequestReference().getUuid()); |
| } |
| Thread.currentThread().setName(threadName); |
| } |
| } |
| |
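| /** |
| * Handles a SET statement by recording the property in the per-request config map. |
| * Internal parameters (prefixed with APIFramework.PREFIX_INTERNAL_PARAMETERS) may not |
| * be set and cause a CompilationException. |
| */ |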
| protected void handleSetStatement(Statement stmt, Map<String, String> config) throws CompilationException { |
| SetStatement ss = (SetStatement) stmt; |
| String pname = ss.getPropName(); |
| String pvalue = ss.getPropValue(); |
| if (pname.startsWith(APIFramework.PREFIX_INTERNAL_PARAMETERS)) { |
| throw new CompilationException(ErrorCode.ILLEGAL_SET_PARAMETER, pname); |
| } |
| config.put(pname, pvalue); |
| } |
| |
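| /** |
| * Handles a WRITE statement: resolves the output file split on the target NC and, if |
| * a writer class name was given, instantiates the writer factory reflectively. |
| * Returns the (possibly null) writer factory together with the output file split. |
| */ |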
| protected Pair<IAWriterFactory, FileSplit> handleWriteStatement(Statement stmt) |
| throws InstantiationException, IllegalAccessException, ClassNotFoundException { |
| WriteStatement ws = (WriteStatement) stmt; |
| File f = new File(ws.getFileName()); |
| FileSplit outputFile = new UnmanagedFileSplit(ws.getNcName().getValue(), f.getPath()); |
| IAWriterFactory writerFactory = null; |
| if (ws.getWriterClassName() != null) { |
| writerFactory = (IAWriterFactory) Class.forName(ws.getWriterClassName()).newInstance(); |
| } |
| return new Pair<>(writerFactory, outputFile); |
| } |
| |
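| /** |
| * Handles a USE statement: validates the dataverse name, acquires a read lock on the |
| * dataverse, and returns the dataverse entity to become the active dataverse. |
| */ |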
| protected Dataverse handleUseDataverseStatement(MetadataProvider metadataProvider, Statement stmt) |
| throws Exception { |
| DataverseDecl dvd = (DataverseDecl) stmt; |
| DataverseName dvName = dvd.getDataverseName(); |
| metadataProvider.validateDataverseName(dvName, dvd.getSourceLocation()); |
| lockManager.acquireDataverseReadLock(metadataProvider.getLocks(), dvName); |
| try { |
| return doUseDataverseStatement(metadataProvider, dvd); |
| } finally { |
| metadataProvider.getLocks().unlock(); |
| } |
| } |
| |
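| /** |
| * Looks up the dataverse within a new metadata transaction; throws if it does not exist. |
| */ |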
| protected Dataverse doUseDataverseStatement(MetadataProvider metadataProvider, DataverseDecl stmtUseDataverse) |
| throws Exception { |
| MetadataTransactionContext mdTxnCtx = MetadataManager.INSTANCE.beginTransaction(); |
| metadataProvider.setMetadataTxnContext(mdTxnCtx); |
| try { |
| DataverseName dvName = stmtUseDataverse.getDataverseName(); |
| Dataverse dv = MetadataManager.INSTANCE.getDataverse(metadataProvider.getMetadataTxnContext(), dvName); |
| if (dv == null) { |
| throw new MetadataException(ErrorCode.UNKNOWN_DATAVERSE, stmtUseDataverse.getSourceLocation(), dvName); |
| } |
| MetadataManager.INSTANCE.commitTransaction(mdTxnCtx); |
| return dv; |
| } catch (Exception e) { |
| abort(e, e, mdTxnCtx); |
| throw e; |
| } |
| } |
| |
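| /** |
| * Handles CREATE DATAVERSE: validates the dataverse name, acquires the metadata locks |
| * required for dataverse creation, and delegates to doCreateDataverseStatement. |
| */ |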
| protected void handleCreateDataverseStatement(MetadataProvider metadataProvider, Statement stmt, |
| IRequestParameters requestParameters) throws Exception { |
| CreateDataverseStatement stmtCreateDataverse = (CreateDataverseStatement) stmt; |
| DataverseName dvName = stmtCreateDataverse.getDataverseName(); |
| metadataProvider.validateDataverseName(dvName, stmtCreateDataverse.getSourceLocation()); |
| lockUtil.createDataverseBegin(lockManager, metadataProvider.getLocks(), dvName); |
| try { |
| doCreateDataverseStatement(metadataProvider, stmtCreateDataverse, requestParameters); |
| } finally { |
| metadataProvider.getLocks().unlock(); |
| } |
| } |
| |
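| /** |
| * Creates the dataverse within a new metadata transaction. Returns false if the |
| * dataverse already exists and IF NOT EXISTS was specified, true once the dataverse |
| * has been added; throws if it already exists otherwise. |
| */ |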
| @SuppressWarnings("squid:S00112") |
| protected boolean doCreateDataverseStatement(MetadataProvider metadataProvider, |
| CreateDataverseStatement stmtCreateDataverse, IRequestParameters requestParameters) throws Exception { |
| MetadataTransactionContext mdTxnCtx = MetadataManager.INSTANCE.beginTransaction(); |
| metadataProvider.setMetadataTxnContext(mdTxnCtx); |
| try { |
| DataverseName dvName = stmtCreateDataverse.getDataverseName(); |
| Dataverse dv = MetadataManager.INSTANCE.getDataverse(metadataProvider.getMetadataTxnContext(), dvName); |
| if (dv != null) { |
| if (stmtCreateDataverse.getIfNotExists()) { |
| MetadataManager.INSTANCE.commitTransaction(mdTxnCtx); |
| return false; |
| } else { |
| throw new CompilationException(ErrorCode.DATAVERSE_EXISTS, stmtCreateDataverse.getSourceLocation(), |
| dvName); |
| } |
| } |
| MetadataManager.INSTANCE.addDataverse(metadataProvider.getMetadataTxnContext(), |
| new Dataverse(dvName, stmtCreateDataverse.getFormat(), MetadataUtil.PENDING_NO_OP)); |
| MetadataManager.INSTANCE.commitTransaction(mdTxnCtx); |
| return true; |
| } catch (Exception e) { |
| abort(e, e, mdTxnCtx); |
| throw e; |
| } |
| } |
| |
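| /** |
| * Verifies that the named compaction policy exists in the Metadata dataverse, that it |
| * is legal for the dataset kind (correlated-prefix is not allowed for external |
| * datasets), and that the supplied properties are exactly those declared by the policy |
| * factory (properties may only be omitted for the no-merge policy). |
| */ |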
| protected static void validateCompactionPolicy(String compactionPolicy, |
| Map<String, String> compactionPolicyProperties, MetadataTransactionContext mdTxnCtx, |
| boolean isExternalDataset, SourceLocation sourceLoc) throws CompilationException, Exception { |
| CompactionPolicy compactionPolicyEntity = MetadataManager.INSTANCE.getCompactionPolicy(mdTxnCtx, |
| MetadataConstants.METADATA_DATAVERSE_NAME, compactionPolicy); |
| if (compactionPolicyEntity == null) { |
| throw new CompilationException(ErrorCode.COMPILATION_ERROR, sourceLoc, |
| "Unknown compaction policy: " + compactionPolicy); |
| } |
| String compactionPolicyFactoryClassName = compactionPolicyEntity.getClassName(); |
| ILSMMergePolicyFactory mergePolicyFactory = |
| (ILSMMergePolicyFactory) Class.forName(compactionPolicyFactoryClassName).newInstance(); |
| if (isExternalDataset && "correlated-prefix".equals(mergePolicyFactory.getName())) { |
| throw new CompilationException(ErrorCode.COMPILATION_ERROR, sourceLoc, |
| "The correlated-prefix merge policy cannot be used with external " + dataset(PLURAL)); |
| } |
| if (compactionPolicyProperties == null) { |
| if (!"no-merge".equals(mergePolicyFactory.getName())) { |
| throw new CompilationException(ErrorCode.COMPILATION_ERROR, sourceLoc, |
| "Compaction policy properties are missing."); |
| } |
| } else { |
| for (Map.Entry<String, String> entry : compactionPolicyProperties.entrySet()) { |
| if (!mergePolicyFactory.getPropertiesNames().contains(entry.getKey())) { |
| throw new CompilationException(ErrorCode.COMPILATION_ERROR, sourceLoc, |
| "Invalid compaction policy property: " + entry.getKey()); |
| } |
| } |
| for (String p : mergePolicyFactory.getPropertiesNames()) { |
| if (!compactionPolicyProperties.containsKey(p)) { |
| throw new CompilationException(ErrorCode.COMPILATION_ERROR, sourceLoc, |
| "Missing compaction policy property: " + p); |
| } |
| } |
| } |
| } |
| |
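| /** |
| * Handles CREATE DATASET: resolves the item type (named reference or inline record |
| * type) and the optional meta type, acquires the metadata locks required for dataset |
| * creation, and delegates to doCreateDatasetStatement. |
| */ |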
| public void handleCreateDatasetStatement(MetadataProvider metadataProvider, Statement stmt, |
| IHyracksClientConnection hcc, IRequestParameters requestParameters) throws Exception { |
| DatasetDecl dd = (DatasetDecl) stmt; |
| String datasetName = dd.getName().getValue(); |
| metadataProvider.validateDatabaseObjectName(dd.getDataverse(), datasetName, stmt.getSourceLocation()); |
| DataverseName dataverseName = getActiveDataverseName(dd.getDataverse()); |
| TypeExpression itemTypeExpr = dd.getItemType(); |
| DataverseName itemTypeDataverseName; |
| String itemTypeName; |
| boolean itemTypeAnonymous; |
| switch (itemTypeExpr.getTypeKind()) { |
| case TYPEREFERENCE: |
| TypeReferenceExpression itemTypeRefExpr = (TypeReferenceExpression) itemTypeExpr; |
| Pair<DataverseName, Identifier> itemTypeIdent = itemTypeRefExpr.getIdent(); |
| itemTypeDataverseName = itemTypeIdent.first != null ? itemTypeIdent.first : dataverseName; |
| itemTypeName = itemTypeRefExpr.getIdent().second.getValue(); |
| itemTypeAnonymous = false; |
| break; |
| case RECORD: |
| itemTypeDataverseName = dataverseName; |
| itemTypeName = TypeUtil.createDatasetInlineTypeName(datasetName, false); |
| itemTypeAnonymous = true; |
| break; |
| default: |
| throw new CompilationException(ErrorCode.COMPILATION_ILLEGAL_STATE, stmt.getSourceLocation(), |
| String.valueOf(itemTypeExpr.getTypeKind())); |
| } |
| |
| TypeExpression metaItemTypeExpr = dd.getMetaItemType(); |
| DataverseName metaItemTypeDataverseName = null; |
| String metaItemTypeName = null; |
| boolean metaItemTypeAnonymous; |
| if (metaItemTypeExpr != null) { |
| switch (metaItemTypeExpr.getTypeKind()) { |
| case TYPEREFERENCE: |
| TypeReferenceExpression metaItemTypeRefExpr = (TypeReferenceExpression) metaItemTypeExpr; |
| Pair<DataverseName, Identifier> metaItemTypeIdent = metaItemTypeRefExpr.getIdent(); |
| metaItemTypeDataverseName = |
| metaItemTypeIdent.first != null ? metaItemTypeIdent.first : dataverseName; |
| metaItemTypeName = metaItemTypeRefExpr.getIdent().second.getValue(); |
| metaItemTypeAnonymous = false; |
| break; |
| case RECORD: |
| metaItemTypeDataverseName = dataverseName; |
| metaItemTypeName = TypeUtil.createDatasetInlineTypeName(datasetName, true); |
| metaItemTypeAnonymous = true; |
| break; |
| default: |
| throw new CompilationException(ErrorCode.COMPILATION_ILLEGAL_STATE, stmt.getSourceLocation(), |
| String.valueOf(metaItemTypeExpr.getTypeKind())); |
| } |
| } else { |
| metaItemTypeAnonymous = true; // value is irrelevant when there is no meta type |
| } |
| |
| String nodegroupName = dd.getNodegroupName(); |
| String compactionPolicy = dd.getCompactionPolicy(); |
| boolean defaultCompactionPolicy = compactionPolicy == null; |
| |
| lockUtil.createDatasetBegin(lockManager, metadataProvider.getLocks(), dataverseName, datasetName, |
| itemTypeDataverseName, itemTypeName, itemTypeAnonymous, metaItemTypeDataverseName, metaItemTypeName, |
| metaItemTypeAnonymous, nodegroupName, compactionPolicy, defaultCompactionPolicy, dd.getDatasetType(), |
| dd.getDatasetDetailsDecl()); |
| try { |
| doCreateDatasetStatement(metadataProvider, dd, dataverseName, datasetName, itemTypeDataverseName, |
| itemTypeExpr, itemTypeName, metaItemTypeExpr, metaItemTypeDataverseName, metaItemTypeName, hcc, |
| requestParameters); |
| } finally { |
| metadataProvider.getLocks().unlock(); |
| } |
| } |
| |
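| /** |
| * Creates the dataset using the pending-operation protocol: the dataset (and any |
| * inline types) is first committed to the metadata with PendingAddOp, the storage job |
| * is run for internal datasets, and the record is then replaced with PendingNoOp. If a |
| * failure occurs after the pending record was committed, compensation drops the |
| * dataset's storage and removes the pending metadata records. |
| */ |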
| protected void doCreateDatasetStatement(MetadataProvider metadataProvider, DatasetDecl dd, |
| DataverseName dataverseName, String datasetName, DataverseName itemTypeDataverseName, |
| TypeExpression itemTypeExpr, String itemTypeName, TypeExpression metaItemTypeExpr, |
| DataverseName metaItemTypeDataverseName, String metaItemTypeName, IHyracksClientConnection hcc, |
| IRequestParameters requestParameters) throws Exception { |
| MutableObject<ProgressState> progress = new MutableObject<>(ProgressState.NO_PROGRESS); |
| SourceLocation sourceLoc = dd.getSourceLocation(); |
| DatasetType dsType = dd.getDatasetType(); |
| String ngNameId = dd.getNodegroupName(); |
| String compactionPolicy = dd.getCompactionPolicy(); |
| Map<String, String> compactionPolicyProperties = dd.getCompactionPolicyProperties(); |
| String compressionScheme = metadataProvider.getCompressionManager() |
| .getDdlOrDefaultCompressionScheme(dd.getDatasetCompressionScheme()); |
| MetadataTransactionContext mdTxnCtx = MetadataManager.INSTANCE.beginTransaction(); |
| boolean bActiveTxn = true; |
| metadataProvider.setMetadataTxnContext(mdTxnCtx); |
| Dataset dataset = null; |
| Datatype itemTypeEntity = null, metaItemTypeEntity = null; |
| boolean itemTypeAdded = false, metaItemTypeAdded = false; |
| try { |
| // Check if the dataverse exists |
| Dataverse dv = MetadataManager.INSTANCE.getDataverse(mdTxnCtx, dataverseName); |
| if (dv == null) { |
| throw new CompilationException(ErrorCode.UNKNOWN_DATAVERSE, sourceLoc, dataverseName); |
| } |
| |
| IDatasetDetails datasetDetails; |
| Dataset ds = metadataProvider.findDataset(dataverseName, datasetName); |
| if (ds != null) { |
| if (dd.getIfNotExists()) { |
| MetadataManager.INSTANCE.commitTransaction(mdTxnCtx); |
| return; |
| } else { |
| throw new CompilationException(ErrorCode.DATASET_EXISTS, sourceLoc, datasetName, dataverseName); |
| } |
| } |
| |
| IAType itemType; |
| boolean itemTypeIsInline = false; |
| switch (itemTypeExpr.getTypeKind()) { |
| case TYPEREFERENCE: |
| itemTypeEntity = metadataProvider.findTypeEntity(itemTypeDataverseName, itemTypeName); |
| if (itemTypeEntity == null || itemTypeEntity.getIsAnonymous()) { |
| // anonymous types cannot be referenced from CREATE DATASET |
| throw new AsterixException(ErrorCode.UNKNOWN_TYPE, sourceLoc, |
| DatasetUtil.getFullyQualifiedDisplayName(itemTypeDataverseName, itemTypeName)); |
| } |
| itemType = itemTypeEntity.getDatatype(); |
| validateDatasetItemType(dsType, itemType, false, sourceLoc); |
| break; |
| case RECORD: |
| itemType = translateType(itemTypeDataverseName, itemTypeName, itemTypeExpr, mdTxnCtx); |
| validateDatasetItemType(dsType, itemType, false, sourceLoc); |
| itemTypeEntity = new Datatype(itemTypeDataverseName, itemTypeName, itemType, true); |
| itemTypeIsInline = true; |
| break; |
| default: |
| throw new CompilationException(ErrorCode.COMPILATION_ILLEGAL_STATE, sourceLoc, |
| String.valueOf(itemTypeExpr.getTypeKind())); |
| } |
| String ngName = ngNameId != null ? ngNameId |
| : configureNodegroupForDataset(appCtx, dd.getHints(), dataverseName, datasetName, metadataProvider, |
| sourceLoc); |
| |
| boolean userSpecifiedCompactionPolicy = compactionPolicy != null; |
| if (!userSpecifiedCompactionPolicy) { |
| compactionPolicy = StorageConstants.DEFAULT_COMPACTION_POLICY_NAME; |
| compactionPolicyProperties = StorageConstants.DEFAULT_COMPACTION_POLICY_PROPERTIES; |
| } else { |
| validateCompactionPolicy(compactionPolicy, compactionPolicyProperties, mdTxnCtx, false, sourceLoc); |
| } |
| |
| IAType metaItemType = null; |
| boolean metaItemTypeIsInline = false; |
| switch (dsType) { |
| case INTERNAL: |
| if (metaItemTypeExpr != null) { |
| switch (metaItemTypeExpr.getTypeKind()) { |
| case TYPEREFERENCE: |
| metaItemTypeEntity = |
| metadataProvider.findTypeEntity(metaItemTypeDataverseName, metaItemTypeName); |
| if (metaItemTypeEntity == null || metaItemTypeEntity.getIsAnonymous()) { |
| // anonymous types cannot be referenced from CREATE DATASET |
| throw new AsterixException(ErrorCode.UNKNOWN_TYPE, sourceLoc, DatasetUtil |
| .getFullyQualifiedDisplayName(metaItemTypeDataverseName, metaItemTypeName)); |
| } |
| metaItemType = metaItemTypeEntity.getDatatype(); |
| validateDatasetItemType(dsType, metaItemType, true, sourceLoc); |
| break; |
| case RECORD: |
| metaItemType = translateType(metaItemTypeDataverseName, metaItemTypeName, |
| metaItemTypeExpr, mdTxnCtx); |
| validateDatasetItemType(dsType, metaItemType, true, sourceLoc); |
| metaItemTypeEntity = |
| new Datatype(metaItemTypeDataverseName, metaItemTypeName, metaItemType, true); |
| metaItemTypeIsInline = true; |
| break; |
| default: |
| throw new CompilationException(ErrorCode.COMPILATION_ILLEGAL_STATE, sourceLoc, |
| String.valueOf(metaItemTypeExpr.getTypeKind())); |
| } |
| } |
| ARecordType metaRecType = (ARecordType) metaItemType; |
| |
| List<List<String>> partitioningExprs = |
| ((InternalDetailsDecl) dd.getDatasetDetailsDecl()).getPartitioningExprs(); |
| List<Integer> keySourceIndicators = |
| ((InternalDetailsDecl) dd.getDatasetDetailsDecl()).getKeySourceIndicators(); |
| boolean autogenerated = ((InternalDetailsDecl) dd.getDatasetDetailsDecl()).isAutogenerated(); |
| ARecordType aRecordType = (ARecordType) itemType; |
| List<IAType> partitioningTypes = ValidateUtil.validatePartitioningExpressions(aRecordType, |
| metaRecType, partitioningExprs, keySourceIndicators, autogenerated, sourceLoc); |
| |
| List<String> filterField = ((InternalDetailsDecl) dd.getDatasetDetailsDecl()).getFilterField(); |
| Integer filterSourceIndicator = |
| ((InternalDetailsDecl) dd.getDatasetDetailsDecl()).getFilterSourceIndicator(); |
| |
| if (filterField != null) { |
| ValidateUtil.validateFilterField(aRecordType, metaRecType, filterSourceIndicator, filterField, |
| sourceLoc); |
| } |
| if (!userSpecifiedCompactionPolicy && filterField != null) { |
| // If the dataset has a filter and the user didn't specify a merge policy, |
| // pick the correlated-prefix merge policy as the default. |
| compactionPolicy = StorageConstants.DEFAULT_FILTERED_DATASET_COMPACTION_POLICY_NAME; |
| compactionPolicyProperties = StorageConstants.DEFAULT_COMPACTION_POLICY_PROPERTIES; |
| } |
| datasetDetails = new InternalDatasetDetails(InternalDatasetDetails.FileStructure.BTREE, |
| InternalDatasetDetails.PartitioningStrategy.HASH, partitioningExprs, partitioningExprs, |
| keySourceIndicators, partitioningTypes, autogenerated, filterSourceIndicator, filterField); |
| break; |
| case EXTERNAL: |
| ExternalDetailsDecl externalDetails = (ExternalDetailsDecl) dd.getDatasetDetailsDecl(); |
| Map<String, String> properties = createExternalDatasetProperties(dataverseName, dd, itemTypeEntity, |
| metadataProvider, mdTxnCtx); |
| ExternalDataUtils.normalize(properties); |
| ExternalDataUtils.validate(properties); |
| validateExternalDatasetProperties(externalDetails, properties, dd.getSourceLocation(), mdTxnCtx); |
| datasetDetails = new ExternalDatasetDetails(externalDetails.getAdapter(), properties, new Date(), |
| TransactionState.COMMIT); |
| break; |
| default: |
| throw new CompilationException(ErrorCode.COMPILATION_UNKNOWN_DATASET_TYPE, |
| dsType.toString()); |
| } |
| |
| // #. initialize DatasetIdFactory if it is not initialized. |
| if (!DatasetIdFactory.isInitialized()) { |
| DatasetIdFactory.initialize(MetadataManager.INSTANCE.getMostRecentDatasetId()); |
| } |
| |
| // #. add a new dataset with PendingAddOp |
| dataset = new Dataset(dataverseName, datasetName, itemTypeDataverseName, itemTypeName, |
| metaItemTypeDataverseName, metaItemTypeName, ngName, compactionPolicy, compactionPolicyProperties, |
| datasetDetails, dd.getHints(), dsType, DatasetIdFactory.generateDatasetId(), |
| MetadataUtil.PENDING_ADD_OP, compressionScheme); |
| MetadataManager.INSTANCE.addDataset(metadataProvider.getMetadataTxnContext(), dataset); |
| |
| if (itemTypeIsInline) { |
| MetadataManager.INSTANCE.addDatatype(mdTxnCtx, itemTypeEntity); |
| itemTypeAdded = true; |
| } |
| if (metaItemTypeIsInline) { |
| MetadataManager.INSTANCE.addDatatype(mdTxnCtx, metaItemTypeEntity); |
| metaItemTypeAdded = true; |
| } |
| |
| if (dsType == DatasetType.INTERNAL) { |
| JobSpecification jobSpec = DatasetUtil.createDatasetJobSpec(dataset, metadataProvider); |
| |
| // #. commit the metadata transaction before calling runJob. |
| MetadataManager.INSTANCE.commitTransaction(mdTxnCtx); |
| bActiveTxn = false; |
| progress.setValue(ProgressState.ADDED_PENDINGOP_RECORD_TO_METADATA); |
| |
| // #. runJob |
| runJob(hcc, jobSpec); |
| |
| // #. begin new metadataTxn |
| mdTxnCtx = MetadataManager.INSTANCE.beginTransaction(); |
| bActiveTxn = true; |
| metadataProvider.setMetadataTxnContext(mdTxnCtx); |
| } |
| |
| // #. add a new dataset with PendingNoOp after deleting the dataset with |
| // PendingAddOp |
| MetadataManager.INSTANCE.dropDataset(metadataProvider.getMetadataTxnContext(), dataverseName, datasetName, |
| requestParameters.isForceDropDataset()); |
| dataset.setPendingOp(MetadataUtil.PENDING_NO_OP); |
| MetadataManager.INSTANCE.addDataset(metadataProvider.getMetadataTxnContext(), dataset); |
| MetadataManager.INSTANCE.commitTransaction(mdTxnCtx); |
| } catch (Exception e) { |
| if (bActiveTxn) { |
| abort(e, e, mdTxnCtx); |
| } |
| |
| if (progress.getValue() == ProgressState.ADDED_PENDINGOP_RECORD_TO_METADATA) { |
| |
| // #. execute compensation operations |
| // remove the index in NC |
| // [Notice] |
| // Since the metadata was updated (and committed), any effect of the job must be |
| // undone because an exception occurred during runJob. |
| mdTxnCtx = MetadataManager.INSTANCE.beginTransaction(); |
| bActiveTxn = true; |
| metadataProvider.setMetadataTxnContext(mdTxnCtx); |
| try { |
| JobSpecification jobSpec = |
| DatasetUtil.dropDatasetJobSpec(dataset, metadataProvider, EnumSet.of(DropOption.IF_EXISTS)); |
| MetadataManager.INSTANCE.commitTransaction(mdTxnCtx); |
| bActiveTxn = false; |
| runJob(hcc, jobSpec); |
| } catch (Exception e2) { |
| e.addSuppressed(e2); |
| if (bActiveTxn) { |
| abort(e, e2, mdTxnCtx); |
| } |
| } |
| |
| // remove the record from the metadata. |
| mdTxnCtx = MetadataManager.INSTANCE.beginTransaction(); |
| metadataProvider.setMetadataTxnContext(mdTxnCtx); |
| try { |
| MetadataManager.INSTANCE.dropDataset(mdTxnCtx, dataverseName, datasetName, |
| requestParameters.isForceDropDataset()); |
| if (itemTypeAdded) { |
| MetadataManager.INSTANCE.dropDatatype(mdTxnCtx, itemTypeEntity.getDataverseName(), |
| itemTypeEntity.getDatatypeName()); |
| } |
| if (metaItemTypeAdded) { |
| MetadataManager.INSTANCE.dropDatatype(mdTxnCtx, metaItemTypeEntity.getDataverseName(), |
| metaItemTypeEntity.getDatatypeName()); |
| } |
| MetadataManager.INSTANCE.commitTransaction(mdTxnCtx); |
| } catch (Exception e2) { |
| e.addSuppressed(e2); |
| abort(e, e2, mdTxnCtx); |
| throw new IllegalStateException("System is inconsistent state: pending dataset(" + dataverseName |
| + "." + datasetName + ") couldn't be removed from the metadata", e); |
| } |
| } |
| throw e; |
| } |
| } |
| |
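| /** |
| * Ensures that the dataset's item type (or meta item type) is a record type. |
| */ |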
| protected void validateDatasetItemType(DatasetType datasetType, IAType itemType, boolean isMetaItemType, |
| SourceLocation sourceLoc) throws AlgebricksException { |
| if (itemType.getTypeTag() != ATypeTag.OBJECT) { |
| throw new CompilationException(ErrorCode.COMPILATION_ERROR, sourceLoc, |
| String.format(StringUtils.capitalize(dataset()) + " %s has to be a record type.", |
| isMetaItemType ? "meta type" : "type")); |
| } |
| } |
| |
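| /** |
| * Builds the configuration properties of an external dataset from its details |
| * declaration. |
| */ |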
| protected Map<String, String> createExternalDatasetProperties(DataverseName dataverseName, DatasetDecl dd, |
| Datatype itemType, MetadataProvider metadataProvider, MetadataTransactionContext mdTxnCtx) |
| throws AlgebricksException { |
| ExternalDetailsDecl externalDetails = (ExternalDetailsDecl) dd.getDatasetDetailsDecl(); |
| return externalDetails.getProperties(); |
| } |
| |
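| /** |
| * Throws if any active entity (e.g. a running feed) is currently using the dataset. |
| */ |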
| protected static void validateIfResourceIsActiveInFeed(ICcApplicationContext appCtx, Dataset dataset, |
| SourceLocation sourceLoc) throws CompilationException { |
| ActiveNotificationHandler activeEventHandler = |
| (ActiveNotificationHandler) appCtx.getActiveNotificationHandler(); |
| IActiveEntityEventsListener[] listeners = activeEventHandler.getEventListeners(); |
| for (IActiveEntityEventsListener listener : listeners) { |
| if (listener.isEntityUsingDataset(dataset) && listener.isActive()) { |
| throw new CompilationException(ErrorCode.COMPILATION_CANT_DROP_ACTIVE_DATASET, sourceLoc, |
| DatasetUtil.getFullyQualifiedDisplayName(dataset), listener.getEntityId().toString()); |
| } |
| } |
| } |
| |
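| /** |
| * Selects the nodes of the dataset's node group: all participant nodes by default, or |
| * a random subset whose size is given by the nodegroup cardinality hint, then creates |
| * the node group and returns its name. |
| */ |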
| protected static String configureNodegroupForDataset(ICcApplicationContext appCtx, Map<String, String> hints, |
| DataverseName dataverseName, String datasetName, MetadataProvider metadataProvider, |
| SourceLocation sourceLoc) throws Exception { |
| IClusterStateManager csm = appCtx.getClusterStateManager(); |
| Set<String> allNodes = csm.getParticipantNodes(true); |
| Set<String> selectedNodes = new LinkedHashSet<>(); |
| String hintValue = hints.get(DatasetNodegroupCardinalityHint.NAME); |
| if (hintValue == null) { |
| selectedNodes.addAll(allNodes); |
| } else { |
| int nodegroupCardinality; |
| final Pair<Boolean, String> validation = DatasetHints.validate(appCtx, DatasetNodegroupCardinalityHint.NAME, |
| hints.get(DatasetNodegroupCardinalityHint.NAME)); |
| boolean valid = validation.first; |
| if (!valid) { |
| throw new CompilationException(ErrorCode.COMPILATION_ERROR, sourceLoc, |
| "Incorrect use of hint '" + DatasetNodegroupCardinalityHint.NAME + "': " + validation.second); |
| } else { |
| nodegroupCardinality = Integer.parseInt(hintValue); |
| } |
| List<String> allNodeList = new ArrayList<>(allNodes); |
| Collections.shuffle(allNodeList); |
| selectedNodes.addAll(allNodeList.subList(0, nodegroupCardinality)); |
| } |
| // Creates the associated node group for the dataset. |
| return DatasetUtil.createNodeGroupForNewDataset(dataverseName, datasetName, selectedNodes, metadataProvider); |
| } |
| |
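| /** |
| * Handles CREATE INDEX: validates the index name, acquires the metadata locks |
| * required for index creation, and delegates to doCreateIndex. |
| */ |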
| public void handleCreateIndexStatement(MetadataProvider metadataProvider, Statement stmt, |
| IHyracksClientConnection hcc, IRequestParameters requestParameters) throws Exception { |
| CreateIndexStatement stmtCreateIndex = (CreateIndexStatement) stmt; |
| String datasetName = stmtCreateIndex.getDatasetName().getValue(); |
| String indexName = stmtCreateIndex.getIndexName().getValue(); |
| metadataProvider.validateDatabaseObjectName(stmtCreateIndex.getDataverseName(), indexName, |
| stmt.getSourceLocation()); |
| DataverseName dataverseName = getActiveDataverseName(stmtCreateIndex.getDataverseName()); |
| lockUtil.createIndexBegin(lockManager, metadataProvider.getLocks(), dataverseName, datasetName); |
| try { |
| doCreateIndex(metadataProvider, stmtCreateIndex, dataverseName, datasetName, hcc, requestParameters); |
| } finally { |
| metadataProvider.getLocks().unlock(); |
| } |
| } |
| |
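| /** |
| * Validates the CREATE INDEX statement against the dataset and its (meta) record type |
| * (index kind, duplicate keys, typed/enforced key constraints), builds the Index |
| * metadata entity with PendingAddOp, and hands the current metadata transaction over |
| * to doCreateIndexImpl. |
| */ |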
| protected void doCreateIndex(MetadataProvider metadataProvider, CreateIndexStatement stmtCreateIndex, |
| DataverseName dataverseName, String datasetName, IHyracksClientConnection hcc, |
| IRequestParameters requestParameters) throws Exception { |
| SourceLocation sourceLoc = stmtCreateIndex.getSourceLocation(); |
| MetadataTransactionContext mdTxnCtx = MetadataManager.INSTANCE.beginTransaction(); |
| boolean bActiveTxn = true; |
| metadataProvider.setMetadataTxnContext(mdTxnCtx); |
| try { |
| // Check if the dataverse exists |
| Dataverse dv = MetadataManager.INSTANCE.getDataverse(mdTxnCtx, dataverseName); |
| if (dv == null) { |
| throw new CompilationException(ErrorCode.UNKNOWN_DATAVERSE, sourceLoc, dataverseName); |
| } |
| |
| Dataset ds = metadataProvider.findDataset(dataverseName, datasetName); |
| if (ds == null) { |
| throw new CompilationException(ErrorCode.UNKNOWN_DATASET_IN_DATAVERSE, sourceLoc, datasetName, |
| dataverseName); |
| } |
| |
| DatasetType datasetType = ds.getDatasetType(); |
| IndexType indexType = stmtCreateIndex.getIndexType(); |
| boolean isSecondaryPrimary = stmtCreateIndex.getFieldExprs().isEmpty(); |
| validateIndexType(datasetType, indexType, isSecondaryPrimary, sourceLoc); |
| |
| String indexName = stmtCreateIndex.getIndexName().getValue(); |
| Index index = MetadataManager.INSTANCE.getIndex(metadataProvider.getMetadataTxnContext(), dataverseName, |
| datasetName, indexName); |
| if (index != null) { |
| if (stmtCreateIndex.getIfNotExists()) { |
| MetadataManager.INSTANCE.commitTransaction(mdTxnCtx); |
| return; |
| } else { |
| throw new CompilationException(ErrorCode.INDEX_EXISTS, sourceLoc, indexName); |
| } |
| } |
| |
| List<Integer> keySourceIndicators; |
| if (isSecondaryPrimary && datasetType == DatasetType.INTERNAL) { |
| // find keySourceIndicators for secondary primary index since the parser isn't aware of them |
| keySourceIndicators = ((InternalDatasetDetails) ds.getDatasetDetails()).getKeySourceIndicator(); |
| } else { |
| keySourceIndicators = stmtCreateIndex.getFieldSourceIndicators(); |
| } |
| // disable creating an index on meta fields (fields with source indicator == 1 are meta fields) |
| if (keySourceIndicators.stream().anyMatch(fieldSource -> fieldSource == 1) && !isSecondaryPrimary) { |
| throw new AsterixException(ErrorCode.COMPILATION_ERROR, sourceLoc, |
| "Cannot create index on meta fields"); |
| } |
| Datatype dt = MetadataManager.INSTANCE.getDatatype(metadataProvider.getMetadataTxnContext(), |
| ds.getItemTypeDataverseName(), ds.getItemTypeName()); |
| ARecordType aRecordType = (ARecordType) dt.getDatatype(); |
| ARecordType metaRecordType = null; |
| if (ds.hasMetaPart()) { |
| Datatype metaDt = MetadataManager.INSTANCE.getDatatype(metadataProvider.getMetadataTxnContext(), |
| ds.getMetaItemTypeDataverseName(), ds.getMetaItemTypeName()); |
| metaRecordType = (ARecordType) metaDt.getDatatype(); |
| } |
| |
| List<List<String>> indexFields = new ArrayList<>(); |
| List<IAType> indexFieldTypes = new ArrayList<>(); |
| int keyIndex = 0; |
| boolean overridesFieldTypes = false; |
| |
| // this set is used to detect duplicates among the keys specified in the CREATE INDEX |
| // statement, e.g. CREATE INDEX someIdx ON dataset(id, id). |
| // Checking only the field names is not enough; the source indicators must also be |
| // checked for cases like: |
| // CREATE INDEX someIdx ON dataset(meta().id, id) |
| Set<Pair<List<String>, Integer>> indexKeysSet = new HashSet<>(); |
| |
| for (Pair<List<String>, IndexedTypeExpression> fieldExpr : stmtCreateIndex.getFieldExprs()) { |
| IAType fieldType = null; |
| ARecordType subType = |
| KeyFieldTypeUtil.chooseSource(keySourceIndicators, keyIndex, aRecordType, metaRecordType); |
| boolean isOpen = subType.isOpen(); |
| int i = 0; |
| if (fieldExpr.first.size() > 1 && !isOpen) { |
| while (i < fieldExpr.first.size() - 1 && !isOpen) { |
| subType = (ARecordType) subType.getFieldType(fieldExpr.first.get(i)); |
| i++; |
| isOpen = subType.isOpen(); |
| } |
| } |
| if (fieldExpr.second == null) { |
| fieldType = subType.getSubFieldType(fieldExpr.first.subList(i, fieldExpr.first.size())); |
| } else { |
| if (!stmtCreateIndex.isEnforced() && indexType != IndexType.BTREE) { |
| throw new AsterixException(ErrorCode.INDEX_ILLEGAL_NON_ENFORCED_TYPED, sourceLoc, indexType); |
| } |
| if (stmtCreateIndex.isEnforced() && !fieldExpr.second.isUnknownable()) { |
| throw new AsterixException(ErrorCode.INDEX_ILLEGAL_ENFORCED_NON_OPTIONAL, sourceLoc, |
| String.valueOf(fieldExpr.first)); |
| } |
| // don't allow creating an enforced index on a closed-type field, i.e. a field that |
| // is part of the schema. |
| // get the field type; if it's not null, then the field is closed-type |
| if (stmtCreateIndex.isEnforced() |
| && subType.getSubFieldType(fieldExpr.first.subList(i, fieldExpr.first.size())) != null) { |
| throw new AsterixException(ErrorCode.INDEX_ILLEGAL_ENFORCED_ON_CLOSED_FIELD, sourceLoc, |
| String.valueOf(fieldExpr.first)); |
| } |
| if (!isOpen) { |
| throw new CompilationException(ErrorCode.COMPILATION_ERROR, sourceLoc, "Typed index on \"" |
| + fieldExpr.first + "\" field could be created only for open datatype"); |
| } |
| if (stmtCreateIndex.hasMetaField()) { |
| throw new CompilationException(ErrorCode.COMPILATION_ERROR, sourceLoc, |
| "Typed open index can only be created on the record part"); |
| } |
| Map<TypeSignature, IAType> typeMap = TypeTranslator.computeTypes(dataverseName, indexName, |
| fieldExpr.second.getType(), dataverseName, mdTxnCtx); |
| TypeSignature typeSignature = new TypeSignature(dataverseName, indexName); |
| fieldType = typeMap.get(typeSignature); |
| overridesFieldTypes = true; |
| } |
| if (fieldType == null) { |
| throw new CompilationException(ErrorCode.UNKNOWN_TYPE, sourceLoc, fieldExpr.second == null |
| ? String.valueOf(fieldExpr.first) : String.valueOf(fieldExpr.second)); |
| } |
| |
| // try to add the key & its source to the set of keys, if key couldn't be added, |
| // there is a duplicate |
| if (!indexKeysSet |
| .add(new Pair<>(fieldExpr.first, stmtCreateIndex.getFieldSourceIndicators().get(keyIndex)))) { |
| throw new AsterixException(ErrorCode.INDEX_ILLEGAL_REPETITIVE_FIELD, sourceLoc, |
| String.valueOf(fieldExpr.first)); |
| } |
| |
| indexFields.add(fieldExpr.first); |
| indexFieldTypes.add(fieldType); |
| ++keyIndex; |
| } |
| |
| validateIndexKeyFields(stmtCreateIndex, keySourceIndicators, aRecordType, metaRecordType, indexFields, |
| indexFieldTypes); |
| |
| Index newIndex = new Index(dataverseName, datasetName, indexName, indexType, indexFields, |
| keySourceIndicators, indexFieldTypes, stmtCreateIndex.getGramLength(), overridesFieldTypes, |
| stmtCreateIndex.isEnforced(), false, MetadataUtil.PENDING_ADD_OP); |
| |
| bActiveTxn = false; // doCreateIndexImpl() takes over the current transaction |
| doCreateIndexImpl(hcc, metadataProvider, ds, newIndex, jobFlags, sourceLoc); |
| |
| } catch (Exception e) { |
| if (bActiveTxn) { |
| abort(e, e, mdTxnCtx); |
| } |
| throw e; |
| } |
| } |
| |
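| /** |
| * Performs the actual index creation. Internal datasets must be in a usable state; |
| * external datasets must be indexable, and the first index on an external dataset |
| * additionally snapshots the external file system and creates the files index. |
| * Enforced indexes are checked against existing enforced indexes on the same fields. |
| */ |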
| private void doCreateIndexImpl(IHyracksClientConnection hcc, MetadataProvider metadataProvider, Dataset ds, |
| Index index, EnumSet<JobFlag> jobFlags, SourceLocation sourceLoc) throws Exception { |
| ProgressState progress = ProgressState.NO_PROGRESS; |
| boolean bActiveTxn = true; |
| Index filesIndex = null; |
| boolean firstExternalDatasetIndex = false; |
| boolean datasetLocked = false; |
| List<ExternalFile> externalFilesSnapshot; |
| MetadataTransactionContext mdTxnCtx = metadataProvider.getMetadataTxnContext(); |
| JobSpecification spec; |
| boolean filesIndexReplicated = false; |
| try { |
| index.setPendingOp(MetadataUtil.PENDING_ADD_OP); |
| if (ds.getDatasetType() == DatasetType.INTERNAL) { |
| validateDatasetState(metadataProvider, ds, sourceLoc); |
| } else { |
| // External dataset |
| // Check if the dataset is indexable |
| if (!ExternalIndexingOperations.isIndexible((ExternalDatasetDetails) ds.getDatasetDetails())) { |
| throw new CompilationException(ErrorCode.COMPILATION_ERROR, sourceLoc, |
| dataset() + " using " + ((ExternalDatasetDetails) ds.getDatasetDetails()).getAdapter() |
| + " adapter can't be indexed"); |
| } |
| // Check if the name of the index is valid |
| if (!ExternalIndexingOperations.isValidIndexName(index.getDatasetName(), index.getIndexName())) { |
| throw new CompilationException(ErrorCode.COMPILATION_ERROR, sourceLoc, |
| "external " + dataset() + " index name is invalid"); |
| } |
| |
| // Check if the files index exists |
| filesIndex = MetadataManager.INSTANCE.getIndex(metadataProvider.getMetadataTxnContext(), |
| index.getDataverseName(), index.getDatasetName(), |
| IndexingConstants.getFilesIndexName(index.getDatasetName())); |
| firstExternalDatasetIndex = filesIndex == null; |
| // Lock external dataset |
| ExternalDatasetsRegistry.INSTANCE.buildIndexBegin(ds, firstExternalDatasetIndex); |
| datasetLocked = true; |
| if (firstExternalDatasetIndex) { |
| // Verify that no one created the files index while we were acquiring the lock |
| filesIndex = MetadataManager.INSTANCE.getIndex(metadataProvider.getMetadataTxnContext(), |
| index.getDataverseName(), index.getDatasetName(), |
| IndexingConstants.getFilesIndexName(index.getDatasetName())); |
| if (filesIndex != null) { |
| ExternalDatasetsRegistry.INSTANCE.buildIndexEnd(ds, firstExternalDatasetIndex); |
| firstExternalDatasetIndex = false; |
| ExternalDatasetsRegistry.INSTANCE.buildIndexBegin(ds, firstExternalDatasetIndex); |
| } |
| } |
| if (firstExternalDatasetIndex) { |
| // Get snapshot from External File System |
| externalFilesSnapshot = ExternalIndexingOperations.getSnapshotFromExternalFileSystem(ds); |
| // Add an entry for the files index |
| filesIndex = new Index(index.getDataverseName(), index.getDatasetName(), |
| IndexingConstants.getFilesIndexName(index.getDatasetName()), IndexType.BTREE, |
| ExternalIndexingOperations.FILE_INDEX_FIELD_NAMES, null, |
| ExternalIndexingOperations.FILE_INDEX_FIELD_TYPES, false, false, false, |
| MetadataUtil.PENDING_ADD_OP); |
| MetadataManager.INSTANCE.addIndex(metadataProvider.getMetadataTxnContext(), filesIndex); |
| // Add files to the external files index |
| for (ExternalFile file : externalFilesSnapshot) { |
| MetadataManager.INSTANCE.addExternalFile(mdTxnCtx, file); |
| } |
| // This is the first index for the external dataset; replicate the files index |
| spec = ExternalIndexingOperations.buildFilesIndexCreateJobSpec(ds, externalFilesSnapshot, |
| metadataProvider); |
| if (spec == null) { |
| throw new CompilationException(ErrorCode.COMPILATION_ERROR, sourceLoc, |
| "Failed to create job spec for replicating files index for external " + dataset()); |
| } |
| filesIndexReplicated = true; |
| runJob(hcc, spec, jobFlags); |
| } |
| } |
| |
| // check whether there exists another enforced index on the same field |
| if (index.isEnforced()) { |
| List<Index> indexes = MetadataManager.INSTANCE.getDatasetIndexes( |
| metadataProvider.getMetadataTxnContext(), index.getDataverseName(), index.getDatasetName()); |
| for (Index existingIndex : indexes) { |
| if (existingIndex.getKeyFieldNames().equals(index.getKeyFieldNames()) |
| && !existingIndex.getKeyFieldTypes().equals(index.getKeyFieldTypes()) |
| && existingIndex.isEnforced()) { |
| throw new CompilationException(ErrorCode.COMPILATION_ERROR, sourceLoc, "Cannot create index " |
| + index.getIndexName() + " , enforced index " + existingIndex.getIndexName() |
| + " on field \"" + StringUtils.join(index.getKeyFieldNames(), ',') |
| + "\" is already defined with type \"" + existingIndex.getKeyFieldTypes() + "\""); |
| } |
| } |
| } |
| // #. add a new index with PendingAddOp |
| MetadataManager.INSTANCE.addIndex(metadataProvider.getMetadataTxnContext(), index); |
| // #. prepare to create the index artifact in NC. |
| spec = IndexUtil.buildSecondaryIndexCreationJobSpec(ds, index, metadataProvider, sourceLoc); |
| if (spec == null) { |
| throw new CompilationException(ErrorCode.COMPILATION_ERROR, sourceLoc, |
| "Failed to create job spec for creating index '" + ds.getDatasetName() + "." |
| + index.getIndexName() + "'"); |
| } |
| MetadataManager.INSTANCE.commitTransaction(mdTxnCtx); |
| bActiveTxn = false; |
| progress = ProgressState.ADDED_PENDINGOP_RECORD_TO_METADATA; |
| // #. create the index artifact in NC. |
| runJob(hcc, spec, jobFlags); |
| |
| // #. flush the internal dataset |
| // We need this to guarantee the correctness of component Id acceleration for |
| // the secondary-to-primary index. Otherwise, the new secondary index component |
| // would correspond to a partial memory component of the primary index, which |
| // is incorrect. |
| if (ds.getDatasetType() == DatasetType.INTERNAL) { |
| FlushDatasetUtil.flushDataset(hcc, metadataProvider, index.getDataverseName(), index.getDatasetName()); |
| } |
| |
| mdTxnCtx = MetadataManager.INSTANCE.beginTransaction(); |
| bActiveTxn = true; |
| metadataProvider.setMetadataTxnContext(mdTxnCtx); |
| |
| // #. load data into the index in NC. |
| spec = IndexUtil.buildSecondaryIndexLoadingJobSpec(ds, index, metadataProvider, sourceLoc); |
| MetadataManager.INSTANCE.commitTransaction(mdTxnCtx); |
| bActiveTxn = false; |
| |
| runJob(hcc, spec, jobFlags); |
| |
| // #. begin new metadataTxn |
| mdTxnCtx = MetadataManager.INSTANCE.beginTransaction(); |
| bActiveTxn = true; |
| metadataProvider.setMetadataTxnContext(mdTxnCtx); |
| |
| // #. add another new index with PendingNoOp after deleting the index with |
| // PendingAddOp |
| MetadataManager.INSTANCE.dropIndex(metadataProvider.getMetadataTxnContext(), index.getDataverseName(), |
| index.getDatasetName(), index.getIndexName()); |
| index.setPendingOp(MetadataUtil.PENDING_NO_OP); |
| MetadataManager.INSTANCE.addIndex(metadataProvider.getMetadataTxnContext(), index); |
| // add another new files index with PendingNoOp after deleting the index with |
| // PendingAddOp |
| if (firstExternalDatasetIndex) { |
| MetadataManager.INSTANCE.dropIndex(metadataProvider.getMetadataTxnContext(), index.getDataverseName(), |
| index.getDatasetName(), filesIndex.getIndexName()); |
| filesIndex.setPendingOp(MetadataUtil.PENDING_NO_OP); |
| MetadataManager.INSTANCE.addIndex(metadataProvider.getMetadataTxnContext(), filesIndex); |
| // update transaction timestamp |
| ((ExternalDatasetDetails) ds.getDatasetDetails()).setRefreshTimestamp(new Date()); |
| MetadataManager.INSTANCE.updateDataset(mdTxnCtx, ds); |
| } |
| MetadataManager.INSTANCE.commitTransaction(mdTxnCtx); |
| } catch (Exception e) { |
| if (bActiveTxn) { |
| abort(e, e, mdTxnCtx); |
| } |
| // If the files index was replicated for an external dataset, it should be |
| // cleaned up on the NC side |
| if (filesIndexReplicated) { |
| mdTxnCtx = MetadataManager.INSTANCE.beginTransaction(); |
| bActiveTxn = true; |
| try { |
| JobSpecification jobSpec = |
| ExternalIndexingOperations.buildDropFilesIndexJobSpec(metadataProvider, ds); |
| MetadataManager.INSTANCE.commitTransaction(mdTxnCtx); |
| bActiveTxn = false; |
| runJob(hcc, jobSpec, jobFlags); |
| } catch (Exception e2) { |
| e.addSuppressed(e2); |
| if (bActiveTxn) { |
| abort(e, e2, mdTxnCtx); |
| } |
| } |
| } |
| |
| if (progress == ProgressState.ADDED_PENDINGOP_RECORD_TO_METADATA) { |
| // #. execute compensation operations |
| // remove the index in NC |
| mdTxnCtx = MetadataManager.INSTANCE.beginTransaction(); |
| bActiveTxn = true; |
| metadataProvider.setMetadataTxnContext(mdTxnCtx); |
| try { |
| JobSpecification jobSpec = IndexUtil.buildDropIndexJobSpec(index, metadataProvider, ds, sourceLoc); |
| MetadataManager.INSTANCE.commitTransaction(mdTxnCtx); |
| bActiveTxn = false; |
| runJob(hcc, jobSpec, jobFlags); |
| } catch (Exception e2) { |
| e.addSuppressed(e2); |
| if (bActiveTxn) { |
| abort(e, e2, mdTxnCtx); |
| } |
| } |
| |
| if (firstExternalDatasetIndex) { |
| mdTxnCtx = MetadataManager.INSTANCE.beginTransaction(); |
| metadataProvider.setMetadataTxnContext(mdTxnCtx); |
| try { |
| // Drop External Files from metadata |
| MetadataManager.INSTANCE.dropDatasetExternalFiles(mdTxnCtx, ds); |
| MetadataManager.INSTANCE.commitTransaction(mdTxnCtx); |
| } catch (Exception e2) { |
| e.addSuppressed(e2); |
| abort(e, e2, mdTxnCtx); |
| throw new IllegalStateException( |
| "System is inconsistent state: pending files for(" + index.getDataverseName() + "." |
| + index.getDatasetName() + ") couldn't be removed from the metadata", |
| e); |
| } |
| mdTxnCtx = MetadataManager.INSTANCE.beginTransaction(); |
| metadataProvider.setMetadataTxnContext(mdTxnCtx); |
| try { |
| // Drop the files index from metadata |
| MetadataManager.INSTANCE.dropIndex(metadataProvider.getMetadataTxnContext(), |
| index.getDataverseName(), index.getDatasetName(), |
| IndexingConstants.getFilesIndexName(index.getDatasetName())); |
| MetadataManager.INSTANCE.commitTransaction(mdTxnCtx); |
| } catch (Exception e2) { |
| e.addSuppressed(e2); |
| abort(e, e2, mdTxnCtx); |
| throw new IllegalStateException("System is inconsistent state: pending index(" |
| + index.getDataverseName() + "." + index.getDatasetName() + "." |
| + IndexingConstants.getFilesIndexName(index.getDatasetName()) |
| + ") couldn't be removed from the metadata", e); |
| } |
| } |
| // remove the record from the metadata. |
| mdTxnCtx = MetadataManager.INSTANCE.beginTransaction(); |
| metadataProvider.setMetadataTxnContext(mdTxnCtx); |
| try { |
| MetadataManager.INSTANCE.dropIndex(metadataProvider.getMetadataTxnContext(), |
| index.getDataverseName(), index.getDatasetName(), index.getIndexName()); |
| MetadataManager.INSTANCE.commitTransaction(mdTxnCtx); |
| } catch (Exception e2) { |
| e.addSuppressed(e2); |
| abort(e, e2, mdTxnCtx); |
| throw new IllegalStateException("System is in inconsistent state: pending index(" |
| + index.getDataverseName() + "." + index.getDatasetName() + "." + index.getIndexName() |
| + ") couldn't be removed from the metadata", e); |
| } |
| } |
| throw e; |
| } finally { |
| if (datasetLocked) { |
| ExternalDatasetsRegistry.INSTANCE.buildIndexEnd(ds, firstExternalDatasetIndex); |
| } |
| } |
| } |
| |
| protected void validateIndexType(DatasetType datasetType, IndexType indexType, boolean isSecondaryPrimaryIndex, |
| SourceLocation sourceLoc) throws AlgebricksException { |
| // disallow creating a secondary primary index on an external dataset |
| if (datasetType == DatasetType.EXTERNAL && isSecondaryPrimaryIndex) { |
| throw new CompilationException(ErrorCode.CANNOT_CREATE_SEC_PRIMARY_IDX_ON_EXT_DATASET); |
| } |
| } |
| |
| protected void validateIndexKeyFields(CreateIndexStatement stmtCreateIndex, List<Integer> keySourceIndicators, |
| ARecordType aRecordType, ARecordType metaRecordType, List<List<String>> indexFields, |
| List<IAType> indexFieldTypes) throws AlgebricksException { |
| ValidateUtil.validateKeyFields(aRecordType, metaRecordType, indexFields, keySourceIndicators, indexFieldTypes, |
| stmtCreateIndex.getIndexType(), stmtCreateIndex.getSourceLocation()); |
| } |
| |
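| // Handles CREATE TYPE. An illustrative SQL++ statement that reaches this handler |
| // (hypothetical names): |
| // CREATE TYPE myDataverse.myType AS { id: bigint, name: string? }; |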
| protected void handleCreateTypeStatement(MetadataProvider metadataProvider, Statement stmt) throws Exception { |
| TypeDecl stmtCreateType = (TypeDecl) stmt; |
| SourceLocation sourceLoc = stmtCreateType.getSourceLocation(); |
| String typeName = stmtCreateType.getIdent().getValue(); |
| metadataProvider.validateDatabaseObjectName(stmtCreateType.getDataverseName(), typeName, sourceLoc); |
| DataverseName dataverseName = getActiveDataverseName(stmtCreateType.getDataverseName()); |
| MetadataTransactionContext mdTxnCtx = MetadataManager.INSTANCE.beginTransaction(); |
| metadataProvider.setMetadataTxnContext(mdTxnCtx); |
| lockUtil.createTypeBegin(lockManager, metadataProvider.getLocks(), dataverseName, typeName); |
| try { |
| Dataverse dv = MetadataManager.INSTANCE.getDataverse(mdTxnCtx, dataverseName); |
| if (dv == null) { |
| throw new CompilationException(ErrorCode.UNKNOWN_DATAVERSE, sourceLoc, dataverseName); |
| } |
| Datatype dt = MetadataManager.INSTANCE.getDatatype(mdTxnCtx, dataverseName, typeName); |
| if (dt != null) { |
| if (!stmtCreateType.getIfNotExists()) { |
| throw new CompilationException(ErrorCode.TYPE_EXISTS, sourceLoc, typeName); |
| } |
| } else { |
| if (BuiltinTypeMap.getBuiltinType(typeName) != null) { |
| throw new CompilationException(ErrorCode.COMPILATION_ERROR, sourceLoc, |
| "Cannot redefine builtin type " + typeName + "."); |
| } else if (TypeUtil.isReservedInlineTypeName(typeName)) { |
| throw new CompilationException(ErrorCode.COMPILATION_ERROR, sourceLoc, |
| "Reserved type name " + typeName + "."); |
| } else { |
| IAType type = translateType(dataverseName, typeName, stmtCreateType.getTypeDef(), mdTxnCtx); |
| MetadataManager.INSTANCE.addDatatype(mdTxnCtx, new Datatype(dataverseName, typeName, type, false)); |
| } |
| } |
| MetadataManager.INSTANCE.commitTransaction(mdTxnCtx); |
| } catch (Exception e) { |
| abort(e, e, mdTxnCtx); |
| throw e; |
| } finally { |
| metadataProvider.getLocks().unlock(); |
| } |
| } |
| |
| private IAType translateType(DataverseName dataverseName, String typeName, TypeExpression typeDef, |
| MetadataTransactionContext mdTxnCtx) throws AlgebricksException { |
| Map<TypeSignature, IAType> typeMap = |
| TypeTranslator.computeTypes(dataverseName, typeName, typeDef, dataverseName, mdTxnCtx); |
| TypeSignature typeSignature = new TypeSignature(dataverseName, typeName); |
| return typeMap.get(typeSignature); |
| } |
| |
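| // Handles DROP DATAVERSE, e.g. (hypothetical name): |
| // DROP DATAVERSE myDataverse IF EXISTS; |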
| protected void handleDataverseDropStatement(MetadataProvider metadataProvider, Statement stmt, |
| IHyracksClientConnection hcc, IRequestParameters requestParameters) throws Exception { |
| DataverseDropStatement stmtDropDataverse = (DataverseDropStatement) stmt; |
| SourceLocation sourceLoc = stmtDropDataverse.getSourceLocation(); |
| DataverseName dataverseName = stmtDropDataverse.getDataverseName(); |
| metadataProvider.validateDataverseName(dataverseName, sourceLoc); |
| if (dataverseName.equals(MetadataBuiltinEntities.DEFAULT_DATAVERSE_NAME) |
| || dataverseName.equals(MetadataConstants.METADATA_DATAVERSE_NAME)) { |
| throw new CompilationException(ErrorCode.COMPILATION_ERROR, sourceLoc, |
| dataverseName + " " + dataverse() + " can't be dropped"); |
| } |
| lockUtil.dropDataverseBegin(lockManager, metadataProvider.getLocks(), dataverseName); |
| try { |
| doDropDataverse(stmtDropDataverse, metadataProvider, hcc, requestParameters); |
| } finally { |
| metadataProvider.getLocks().unlock(); |
| ExternalDatasetsRegistry.INSTANCE.releaseAcquiredLocks(metadataProvider); |
| } |
| } |
| |
| protected boolean doDropDataverse(DataverseDropStatement stmtDropDataverse, MetadataProvider metadataProvider, |
| IHyracksClientConnection hcc, IRequestParameters requestParameters) throws Exception { |
| SourceLocation sourceLoc = stmtDropDataverse.getSourceLocation(); |
| DataverseName dataverseName = stmtDropDataverse.getDataverseName(); |
| ProgressState progress = ProgressState.NO_PROGRESS; |
| MetadataTransactionContext mdTxnCtx = MetadataManager.INSTANCE.beginTransaction(); |
| boolean bActiveTxn = true; |
| metadataProvider.setMetadataTxnContext(mdTxnCtx); |
| List<FeedEventsListener> feedsToStop = new ArrayList<>(); |
| List<Dataset> externalDatasetsToDeregister = new ArrayList<>(); |
| List<JobSpecification> jobsToExecute = new ArrayList<>(); |
| try { |
| Dataverse dv = MetadataManager.INSTANCE.getDataverse(mdTxnCtx, dataverseName); |
| if (dv == null) { |
| if (stmtDropDataverse.getIfExists()) { |
| MetadataManager.INSTANCE.commitTransaction(mdTxnCtx); |
| return false; |
| } else { |
| throw new CompilationException(ErrorCode.UNKNOWN_DATAVERSE, sourceLoc, dataverseName); |
| } |
| } |
| |
| if (stmtDropDataverse.getIfEmpty() && isDataverseNotEmpty(dataverseName, mdTxnCtx)) { |
| MetadataManager.INSTANCE.commitTransaction(mdTxnCtx); |
| return false; |
| } |
| |
| validateDataverseStateBeforeDrop(metadataProvider, dv, sourceLoc); |
| |
| // #. prepare jobs which will drop corresponding feed storage |
| ActiveNotificationHandler activeEventHandler = |
| (ActiveNotificationHandler) appCtx.getActiveNotificationHandler(); |
| IActiveEntityEventsListener[] activeListeners = activeEventHandler.getEventListeners(); |
| for (IActiveEntityEventsListener listener : activeListeners) { |
| EntityId activeEntityId = listener.getEntityId(); |
| if (activeEntityId.getExtensionName().equals(Feed.EXTENSION_NAME) |
| && activeEntityId.getDataverseName().equals(dataverseName)) { |
| FeedEventsListener feedListener = (FeedEventsListener) listener; |
| feedsToStop.add(feedListener); |
| jobsToExecute |
| .add(FeedOperations.buildRemoveFeedStorageJob(metadataProvider, feedListener.getFeed())); |
| } |
| } |
| |
| // #. prepare jobs which will drop corresponding datasets with indexes. |
| List<Dataset> datasets = MetadataManager.INSTANCE.getDataverseDatasets(mdTxnCtx, dataverseName); |
| for (Dataset dataset : datasets) { |
| String datasetName = dataset.getDatasetName(); |
| List<Index> indexes = MetadataManager.INSTANCE.getDatasetIndexes(mdTxnCtx, dataverseName, datasetName); |
| DatasetType dsType = dataset.getDatasetType(); |
| if (dsType == DatasetType.INTERNAL) { |
| for (Index index : indexes) { |
| jobsToExecute.add(IndexUtil.buildDropIndexJobSpec(index, metadataProvider, dataset, sourceLoc)); |
| } |
| } else if (dsType == DatasetType.EXTERNAL) { |
| for (Index index : indexes) { |
| if (ExternalIndexingOperations.isFileIndex(index)) { |
| jobsToExecute.add( |
| ExternalIndexingOperations.buildDropFilesIndexJobSpec(metadataProvider, dataset)); |
| } else { |
| jobsToExecute |
| .add(IndexUtil.buildDropIndexJobSpec(index, metadataProvider, dataset, sourceLoc)); |
| } |
| } |
| externalDatasetsToDeregister.add(dataset); |
| } |
| } |
| |
| // #. prepare jobs which will drop corresponding libraries. |
| List<Library> libraries = MetadataManager.INSTANCE.getDataverseLibraries(mdTxnCtx, dataverseName); |
| for (Library library : libraries) { |
| jobsToExecute.add(ExternalLibraryJobUtils.buildDropLibraryJobSpec(dataverseName, library.getName(), |
| metadataProvider)); |
| } |
| |
| jobsToExecute.add(DataverseUtil.dropDataverseJobSpec(dv, metadataProvider)); |
| |
| // #. mark PendingDropOp on the dataverse record by |
| // first, deleting the dataverse record from the DATAVERSE_DATASET |
| // second, inserting the dataverse record with the PendingDropOp value into the DATAVERSE_DATASET |
| // Note: the delete operation fails if the dataverse cannot be deleted due to metadata dependencies |
| MetadataManager.INSTANCE.dropDataverse(mdTxnCtx, dataverseName); |
| MetadataManager.INSTANCE.addDataverse(mdTxnCtx, |
| new Dataverse(dataverseName, dv.getDataFormat(), MetadataUtil.PENDING_DROP_OP)); |
| |
| MetadataManager.INSTANCE.commitTransaction(mdTxnCtx); |
| bActiveTxn = false; |
| progress = ProgressState.ADDED_PENDINGOP_RECORD_TO_METADATA; |
| |
| for (Dataset externalDataset : externalDatasetsToDeregister) { |
| ExternalDatasetsRegistry.INSTANCE.removeDatasetInfo(externalDataset); |
| } |
| |
| for (FeedEventsListener feedListener : feedsToStop) { |
| if (feedListener.getState() != ActivityState.STOPPED) { |
| feedListener.stop(metadataProvider); |
| } |
| feedListener.unregister(); |
| } |
| |
| for (JobSpecification jobSpec : jobsToExecute) { |
| runJob(hcc, jobSpec); |
| } |
| |
| mdTxnCtx = MetadataManager.INSTANCE.beginTransaction(); |
| bActiveTxn = true; |
| metadataProvider.setMetadataTxnContext(mdTxnCtx); |
| |
| // #. finally, delete the dataverse. |
| MetadataManager.INSTANCE.dropDataverse(mdTxnCtx, dataverseName); |
| |
| // Drop all node groups that are no longer needed |
| for (Dataset dataset : datasets) { |
| String nodeGroup = dataset.getNodeGroupName(); |
| lockManager.acquireNodeGroupWriteLock(metadataProvider.getLocks(), nodeGroup); |
| if (MetadataManager.INSTANCE.getNodegroup(mdTxnCtx, nodeGroup) != null) { |
| MetadataManager.INSTANCE.dropNodegroup(mdTxnCtx, nodeGroup, true); |
| } |
| } |
| |
| if (activeDataverse.getDataverseName().equals(dataverseName)) { |
| activeDataverse = MetadataBuiltinEntities.DEFAULT_DATAVERSE; |
| } |
| MetadataManager.INSTANCE.commitTransaction(mdTxnCtx); |
| return true; |
| } catch (Exception e) { |
| if (bActiveTxn) { |
| abort(e, e, mdTxnCtx); |
| } |
| |
| if (progress == ProgressState.ADDED_PENDINGOP_RECORD_TO_METADATA) { |
| if (activeDataverse.getDataverseName().equals(dataverseName)) { |
| activeDataverse = MetadataBuiltinEntities.DEFAULT_DATAVERSE; |
| } |
| |
| // #. execute compensation operations |
| // remove all the artifacts in NC |
| try { |
| for (JobSpecification jobSpec : jobsToExecute) { |
| runJob(hcc, jobSpec); |
| } |
| } catch (Exception e2) { |
| // do not throw the exception since the metadata still needs to be compensated. |
| e.addSuppressed(e2); |
| } |
| |
| // remove the record from the metadata. |
| mdTxnCtx = MetadataManager.INSTANCE.beginTransaction(); |
| try { |
| MetadataManager.INSTANCE.dropDataverse(mdTxnCtx, dataverseName); |
| MetadataManager.INSTANCE.commitTransaction(mdTxnCtx); |
| } catch (Exception e2) { |
| e.addSuppressed(e2); |
| abort(e, e2, mdTxnCtx); |
| throw new IllegalStateException("System is inconsistent state: pending dataverse(" + dataverseName |
| + ") couldn't be removed from the metadata", e); |
| } |
| } |
| throw e; |
| } |
| } |
| |
| protected boolean isDataverseNotEmpty(DataverseName dataverseName, MetadataTransactionContext mdTxnCtx) |
| throws AlgebricksException { |
| return MetadataManager.INSTANCE.isDataverseNotEmpty(mdTxnCtx, dataverseName); |
| } |
| |
| protected void validateDataverseStateBeforeDrop(MetadataProvider metadataProvider, Dataverse dataverse, |
| SourceLocation sourceLoc) throws AlgebricksException { |
| // may be overridden by product extensions for additional checks before dropping the dataverse |
| } |
| |
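| // Handles DROP DATASET, e.g. (hypothetical names): |
| // DROP DATASET myDataverse.myDataset IF EXISTS; |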
| public void handleDatasetDropStatement(MetadataProvider metadataProvider, Statement stmt, |
| IHyracksClientConnection hcc, IRequestParameters requestParameters) throws Exception { |
| DropDatasetStatement stmtDelete = (DropDatasetStatement) stmt; |
| SourceLocation sourceLoc = stmtDelete.getSourceLocation(); |
| String datasetName = stmtDelete.getDatasetName().getValue(); |
| metadataProvider.validateDatabaseObjectName(stmtDelete.getDataverseName(), datasetName, sourceLoc); |
| DataverseName dataverseName = getActiveDataverseName(stmtDelete.getDataverseName()); |
| lockUtil.dropDatasetBegin(lockManager, metadataProvider.getLocks(), dataverseName, datasetName); |
| try { |
| doDropDataset(dataverseName, datasetName, metadataProvider, stmtDelete.getIfExists(), hcc, |
| requestParameters, true, sourceLoc); |
| } finally { |
| metadataProvider.getLocks().unlock(); |
| ExternalDatasetsRegistry.INSTANCE.releaseAcquiredLocks(metadataProvider); |
| } |
| } |
| |
| protected boolean doDropDataset(DataverseName dataverseName, String datasetName, MetadataProvider metadataProvider, |
| boolean ifExists, IHyracksClientConnection hcc, IRequestParameters requestParameters, |
| boolean dropCorrespondingNodeGroup, SourceLocation sourceLoc) throws Exception { |
| MutableObject<ProgressState> progress = new MutableObject<>(ProgressState.NO_PROGRESS); |
| MutableObject<MetadataTransactionContext> mdTxnCtx = |
| new MutableObject<>(MetadataManager.INSTANCE.beginTransaction()); |
| MutableBoolean bActiveTxn = new MutableBoolean(true); |
| metadataProvider.setMetadataTxnContext(mdTxnCtx.getValue()); |
| List<JobSpecification> jobsToExecute = new ArrayList<>(); |
| Dataset ds = null; |
| try { |
| // Check if the dataverse exists |
| Dataverse dv = MetadataManager.INSTANCE.getDataverse(mdTxnCtx.getValue(), dataverseName); |
| if (dv == null) { |
| if (ifExists) { |
| if (warningCollector.shouldWarn()) { |
| warningCollector.warn(Warning.of(sourceLoc, ErrorCode.UNKNOWN_DATAVERSE, dataverseName)); |
| } |
| MetadataManager.INSTANCE.commitTransaction(mdTxnCtx.getValue()); |
| return false; |
| } else { |
| throw new CompilationException(ErrorCode.UNKNOWN_DATAVERSE, sourceLoc, dataverseName); |
| } |
| } |
| ds = metadataProvider.findDataset(dataverseName, datasetName); |
| if (ds == null) { |
| if (ifExists) { |
| MetadataManager.INSTANCE.commitTransaction(mdTxnCtx.getValue()); |
| return false; |
| } else { |
| throw new CompilationException(ErrorCode.UNKNOWN_DATASET_IN_DATAVERSE, sourceLoc, datasetName, |
| dataverseName); |
| } |
| } |
| validateDatasetState(metadataProvider, ds, sourceLoc); |
| |
| ds.drop(metadataProvider, mdTxnCtx, jobsToExecute, bActiveTxn, progress, hcc, dropCorrespondingNodeGroup, |
| sourceLoc, Collections.emptySet(), requestParameters.isForceDropDataset()); |
| |
| MetadataManager.INSTANCE.commitTransaction(mdTxnCtx.getValue()); |
| return true; |
| } catch (Exception e) { |
| LOGGER.error("failed to drop dataset; executing compensating operations", e); |
| if (bActiveTxn.booleanValue()) { |
| abort(e, e, mdTxnCtx.getValue()); |
| } |
| |
| if (progress.getValue() == ProgressState.ADDED_PENDINGOP_RECORD_TO_METADATA) { |
| // #. execute compensation operations |
| // remove all the indexes in NC |
| try { |
| if (ds != null) { |
| jobsToExecute.clear(); |
| ds.drop(metadataProvider, mdTxnCtx, jobsToExecute, bActiveTxn, progress, hcc, |
| dropCorrespondingNodeGroup, sourceLoc, EnumSet.of(DropOption.IF_EXISTS), |
| requestParameters.isForceDropDataset()); |
| } |
| for (JobSpecification jobSpec : jobsToExecute) { |
| JobUtils.runJob(hcc, jobSpec, true); |
| } |
| } catch (Exception e2) { |
| // do not throw the exception since the metadata still needs to be compensated. |
| e.addSuppressed(e2); |
| } |
| |
| // remove the record from the metadata. |
| mdTxnCtx.setValue(MetadataManager.INSTANCE.beginTransaction()); |
| metadataProvider.setMetadataTxnContext(mdTxnCtx.getValue()); |
| try { |
| MetadataManager.INSTANCE.dropDataset(metadataProvider.getMetadataTxnContext(), dataverseName, |
| datasetName, requestParameters.isForceDropDataset()); |
| MetadataManager.INSTANCE.commitTransaction(mdTxnCtx.getValue()); |
| } catch (Exception e2) { |
| e.addSuppressed(e2); |
| abort(e, e2, mdTxnCtx.getValue()); |
| throw new IllegalStateException("System is inconsistent state: pending dataset(" + dataverseName |
| + "." + datasetName + ") couldn't be removed from the metadata", e); |
| } |
| } |
| throw e; |
| } |
| } |
| |
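| // Handles DROP INDEX; the index is addressed through its dataset, e.g. |
| // (hypothetical names): |
| // DROP INDEX myDataverse.myDataset.myIndex IF EXISTS; |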
| protected void handleIndexDropStatement(MetadataProvider metadataProvider, Statement stmt, |
| IHyracksClientConnection hcc, IRequestParameters requestParameters) throws Exception { |
| IndexDropStatement stmtIndexDrop = (IndexDropStatement) stmt; |
| metadataProvider.validateDatabaseObjectName(stmtIndexDrop.getDataverseName(), |
| stmtIndexDrop.getIndexName().getValue(), stmtIndexDrop.getSourceLocation()); |
| DataverseName dataverseName = getActiveDataverseName(stmtIndexDrop.getDataverseName()); |
| String datasetName = stmtIndexDrop.getDatasetName().getValue(); |
| lockUtil.dropIndexBegin(lockManager, metadataProvider.getLocks(), dataverseName, datasetName); |
| try { |
| doDropIndex(metadataProvider, stmtIndexDrop, dataverseName, datasetName, hcc, requestParameters); |
| } finally { |
| metadataProvider.getLocks().unlock(); |
| ExternalDatasetsRegistry.INSTANCE.releaseAcquiredLocks(metadataProvider); |
| } |
| } |
| |
| protected boolean doDropIndex(MetadataProvider metadataProvider, IndexDropStatement stmtIndexDrop, |
| DataverseName dataverseName, String datasetName, IHyracksClientConnection hcc, |
| IRequestParameters requestParameters) throws Exception { |
| SourceLocation sourceLoc = stmtIndexDrop.getSourceLocation(); |
| String indexName = stmtIndexDrop.getIndexName().getValue(); |
| ProgressState progress = ProgressState.NO_PROGRESS; |
| List<JobSpecification> jobsToExecute = new ArrayList<>(); |
| // For external index |
| boolean dropFilesIndex = false; |
| |
| MetadataTransactionContext mdTxnCtx = MetadataManager.INSTANCE.beginTransaction(); |
| boolean bActiveTxn = true; |
| metadataProvider.setMetadataTxnContext(mdTxnCtx); |
| try { |
| Dataset ds = metadataProvider.findDataset(dataverseName, datasetName); |
| if (ds == null) { |
| throw new CompilationException(ErrorCode.UNKNOWN_DATASET_IN_DATAVERSE, sourceLoc, datasetName, |
| dataverseName); |
| } |
| if (ds.getDatasetType() == DatasetType.INTERNAL) { |
| Index index = MetadataManager.INSTANCE.getIndex(mdTxnCtx, dataverseName, datasetName, indexName); |
| if (index == null) { |
| if (stmtIndexDrop.getIfExists()) { |
| MetadataManager.INSTANCE.commitTransaction(mdTxnCtx); |
| return false; |
| } else { |
| throw new CompilationException(ErrorCode.UNKNOWN_INDEX, sourceLoc, indexName); |
| } |
| } |
| ensureNonPrimaryIndexDrop(index, sourceLoc); |
| validateDatasetState(metadataProvider, ds, sourceLoc); |
| // #. prepare a job to drop the index in NC. |
| jobsToExecute.add(IndexUtil.buildDropIndexJobSpec(index, metadataProvider, ds, sourceLoc)); |
| |
| // #. mark PendingDropOp on the existing index |
| MetadataManager.INSTANCE.dropIndex(mdTxnCtx, dataverseName, datasetName, indexName); |
| MetadataManager.INSTANCE.addIndex(mdTxnCtx, |
| new Index(dataverseName, datasetName, indexName, index.getIndexType(), index.getKeyFieldNames(), |
| index.getKeyFieldSourceIndicators(), index.getKeyFieldTypes(), |
| index.isOverridingKeyFieldTypes(), index.isEnforced(), index.isPrimaryIndex(), |
| MetadataUtil.PENDING_DROP_OP)); |
| |
| // #. commit the existing transaction before calling runJob. |
| MetadataManager.INSTANCE.commitTransaction(mdTxnCtx); |
| bActiveTxn = false; |
| progress = ProgressState.ADDED_PENDINGOP_RECORD_TO_METADATA; |
| |
| for (JobSpecification jobSpec : jobsToExecute) { |
| runJob(hcc, jobSpec); |
| } |
| |
| // #. begin a new transaction |
| mdTxnCtx = MetadataManager.INSTANCE.beginTransaction(); |
| bActiveTxn = true; |
| metadataProvider.setMetadataTxnContext(mdTxnCtx); |
| |
| // #. finally, delete the existing index |
| MetadataManager.INSTANCE.dropIndex(mdTxnCtx, dataverseName, datasetName, indexName); |
| } else { |
| // External dataset |
| indexName = stmtIndexDrop.getIndexName().getValue(); |
| Index index = MetadataManager.INSTANCE.getIndex(mdTxnCtx, dataverseName, datasetName, indexName); |
| if (index == null) { |
| if (stmtIndexDrop.getIfExists()) { |
| MetadataManager.INSTANCE.commitTransaction(mdTxnCtx); |
| return false; |
| } else { |
| throw new CompilationException(ErrorCode.UNKNOWN_INDEX, sourceLoc, indexName); |
| } |
| } else if (ExternalIndexingOperations.isFileIndex(index)) { |
| throw new CompilationException(ErrorCode.COMPILATION_ERROR, sourceLoc, |
| "Dropping " + dataset() + " files index is not allowed."); |
| } |
| ensureNonPrimaryIndexDrop(index, sourceLoc); |
| // #. prepare a job to drop the index in NC. |
| jobsToExecute.add(IndexUtil.buildDropIndexJobSpec(index, metadataProvider, ds, sourceLoc)); |
| List<Index> datasetIndexes = |
| MetadataManager.INSTANCE.getDatasetIndexes(mdTxnCtx, dataverseName, datasetName); |
| if (datasetIndexes.size() == 2) { |
| dropFilesIndex = true; |
| // only one index remains besides the files index, so we need to delete both indexes |
| for (Index externalIndex : datasetIndexes) { |
| if (ExternalIndexingOperations.isFileIndex(externalIndex)) { |
| jobsToExecute |
| .add(ExternalIndexingOperations.buildDropFilesIndexJobSpec(metadataProvider, ds)); |
| // #. mark PendingDropOp on the existing files index |
| MetadataManager.INSTANCE.dropIndex(mdTxnCtx, dataverseName, datasetName, |
| externalIndex.getIndexName()); |
| MetadataManager.INSTANCE.addIndex(mdTxnCtx, |
| new Index(dataverseName, datasetName, externalIndex.getIndexName(), |
| externalIndex.getIndexType(), externalIndex.getKeyFieldNames(), |
| externalIndex.getKeyFieldSourceIndicators(), externalIndex.getKeyFieldTypes(), |
| externalIndex.isOverridingKeyFieldTypes(), externalIndex.isEnforced(), |
| externalIndex.isPrimaryIndex(), MetadataUtil.PENDING_DROP_OP)); |
| } |
| } |
| } |
| |
| // #. mark PendingDropOp on the existing index |
| MetadataManager.INSTANCE.dropIndex(mdTxnCtx, dataverseName, datasetName, indexName); |
| MetadataManager.INSTANCE.addIndex(mdTxnCtx, |
| new Index(dataverseName, datasetName, indexName, index.getIndexType(), index.getKeyFieldNames(), |
| index.getKeyFieldSourceIndicators(), index.getKeyFieldTypes(), |
| index.isOverridingKeyFieldTypes(), index.isEnforced(), index.isPrimaryIndex(), |
| MetadataUtil.PENDING_DROP_OP)); |
| |
| // #. commit the existing transaction before calling runJob. |
| MetadataManager.INSTANCE.commitTransaction(mdTxnCtx); |
| bActiveTxn = false; |
| progress = ProgressState.ADDED_PENDINGOP_RECORD_TO_METADATA; |
| |
| for (JobSpecification jobSpec : jobsToExecute) { |
| runJob(hcc, jobSpec); |
| } |
| |
| // #. begin a new transaction |
| mdTxnCtx = MetadataManager.INSTANCE.beginTransaction(); |
| bActiveTxn = true; |
| metadataProvider.setMetadataTxnContext(mdTxnCtx); |
| |
| // #. finally, delete the existing index |
| MetadataManager.INSTANCE.dropIndex(mdTxnCtx, dataverseName, datasetName, indexName); |
| if (dropFilesIndex) { |
| // delete the files index too |
| MetadataManager.INSTANCE.dropIndex(mdTxnCtx, dataverseName, datasetName, |
| IndexingConstants.getFilesIndexName(datasetName)); |
| MetadataManager.INSTANCE.dropDatasetExternalFiles(mdTxnCtx, ds); |
| ExternalDatasetsRegistry.INSTANCE.removeDatasetInfo(ds); |
| } |
| } |
| MetadataManager.INSTANCE.commitTransaction(mdTxnCtx); |
| return true; |
| } catch (Exception e) { |
| if (bActiveTxn) { |
| abort(e, e, mdTxnCtx); |
| } |
| |
| if (progress == ProgressState.ADDED_PENDINGOP_RECORD_TO_METADATA) { |
| // #. execute compensation operations |
| // remove all the indexes in NC |
| try { |
| for (JobSpecification jobSpec : jobsToExecute) { |
| runJob(hcc, jobSpec); |
| } |
| } catch (Exception e2) { |
| // do not throw the exception since the metadata still needs to be compensated. |
| e.addSuppressed(e2); |
| } |
| |
| // remove the record from the metadata. |
| mdTxnCtx = MetadataManager.INSTANCE.beginTransaction(); |
| metadataProvider.setMetadataTxnContext(mdTxnCtx); |
| try { |
| MetadataManager.INSTANCE.dropIndex(metadataProvider.getMetadataTxnContext(), dataverseName, |
| datasetName, indexName); |
| if (dropFilesIndex) { |
| MetadataManager.INSTANCE.dropIndex(metadataProvider.getMetadataTxnContext(), dataverseName, |
| datasetName, IndexingConstants.getFilesIndexName(datasetName)); |
| } |
| MetadataManager.INSTANCE.commitTransaction(mdTxnCtx); |
| } catch (Exception e2) { |
| e.addSuppressed(e2); |
| abort(e, e2, mdTxnCtx); |
| throw new IllegalStateException("System is inconsistent state: pending index(" + dataverseName + "." |
| + datasetName + "." + indexName + ") couldn't be removed from the metadata", e); |
| } |
| } |
| |
| throw e; |
| } |
| } |
| |
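| // Handles DROP TYPE, e.g. (hypothetical name): |
| // DROP TYPE myDataverse.myType IF EXISTS; |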
| protected void handleTypeDropStatement(MetadataProvider metadataProvider, Statement stmt) throws Exception { |
| TypeDropStatement stmtTypeDrop = (TypeDropStatement) stmt; |
| SourceLocation sourceLoc = stmtTypeDrop.getSourceLocation(); |
| String typeName = stmtTypeDrop.getTypeName().getValue(); |
| metadataProvider.validateDatabaseObjectName(stmtTypeDrop.getDataverseName(), typeName, sourceLoc); |
| DataverseName dataverseName = getActiveDataverseName(stmtTypeDrop.getDataverseName()); |
| MetadataTransactionContext mdTxnCtx = MetadataManager.INSTANCE.beginTransaction(); |
| metadataProvider.setMetadataTxnContext(mdTxnCtx); |
| lockUtil.dropTypeBegin(lockManager, metadataProvider.getLocks(), dataverseName, typeName); |
| try { |
| // Check if the dataverse exists |
| Dataverse dv = MetadataManager.INSTANCE.getDataverse(mdTxnCtx, dataverseName); |
| if (dv == null) { |
| if (stmtTypeDrop.getIfExists()) { |
| if (warningCollector.shouldWarn()) { |
| warningCollector.warn(Warning.of(sourceLoc, ErrorCode.UNKNOWN_DATAVERSE, dataverseName)); |
| } |
| MetadataManager.INSTANCE.commitTransaction(mdTxnCtx); |
| return; |
| } else { |
| throw new CompilationException(ErrorCode.UNKNOWN_DATAVERSE, sourceLoc, dataverseName); |
| } |
| } |
| |
| Datatype dt = MetadataManager.INSTANCE.getDatatype(mdTxnCtx, dataverseName, typeName); |
| if (dt == null) { |
| if (!stmtTypeDrop.getIfExists()) { |
| throw new CompilationException(ErrorCode.UNKNOWN_TYPE, sourceLoc, typeName); |
| } |
| } else { |
| MetadataManager.INSTANCE.dropDatatype(mdTxnCtx, dataverseName, typeName); |
| } |
| MetadataManager.INSTANCE.commitTransaction(mdTxnCtx); |
| } catch (Exception e) { |
| abort(e, e, mdTxnCtx); |
| throw e; |
| } finally { |
| metadataProvider.getLocks().unlock(); |
| } |
| } |
| |
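| // Handles DROP NODEGROUP, e.g. (hypothetical name): |
| // DROP NODEGROUP myNodeGroup IF EXISTS; |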
| protected void handleNodegroupDropStatement(MetadataProvider metadataProvider, Statement stmt) throws Exception { |
| NodeGroupDropStatement stmtDelete = (NodeGroupDropStatement) stmt; |
| SourceLocation sourceLoc = stmtDelete.getSourceLocation(); |
| String nodegroupName = stmtDelete.getNodeGroupName().getValue(); |
| metadataProvider.validateDatabaseObjectName(null, nodegroupName, sourceLoc); |
| MetadataTransactionContext mdTxnCtx = MetadataManager.INSTANCE.beginTransaction(); |
| metadataProvider.setMetadataTxnContext(mdTxnCtx); |
| lockManager.acquireNodeGroupWriteLock(metadataProvider.getLocks(), nodegroupName); |
| try { |
| NodeGroup ng = MetadataManager.INSTANCE.getNodegroup(mdTxnCtx, nodegroupName); |
| if (ng == null) { |
| if (!stmtDelete.getIfExists()) { |
| throw new CompilationException(ErrorCode.UNKNOWN_NODEGROUP, sourceLoc, nodegroupName); |
| } |
| } else { |
| MetadataManager.INSTANCE.dropNodegroup(mdTxnCtx, nodegroupName, false); |
| } |
| |
| MetadataManager.INSTANCE.commitTransaction(mdTxnCtx); |
| } catch (Exception e) { |
| abort(e, e, mdTxnCtx); |
| throw e; |
| } finally { |
| metadataProvider.getLocks().unlock(); |
| } |
| } |
| |
| protected void handleDeclareFunctionStatement(MetadataProvider metadataProvider, Statement stmt) throws Exception { |
| FunctionDecl fds = (FunctionDecl) stmt; |
| FunctionSignature signature = fds.getSignature(); |
| metadataProvider.validateDatabaseObjectName(signature.getDataverseName(), signature.getName(), |
| stmt.getSourceLocation()); |
| signature.setDataverseName(getActiveDataverseName(signature.getDataverseName())); |
| declaredFunctions.add(fds); |
| } |
| |
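| // Handles CREATE FUNCTION for both SQL++-bodied and external (library) functions, |
| // e.g. (hypothetical names): |
| // CREATE OR REPLACE FUNCTION myDataverse.twice(x) { x * 2 }; |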
| public void handleCreateFunctionStatement(MetadataProvider metadataProvider, Statement stmt, |
| IStatementRewriter stmtRewriter, IRequestParameters requestParameters) throws Exception { |
| CreateFunctionStatement cfs = (CreateFunctionStatement) stmt; |
| FunctionSignature signature = cfs.getFunctionSignature(); |
| metadataProvider.validateDatabaseObjectName(signature.getDataverseName(), signature.getName(), |
| stmt.getSourceLocation()); |
| DataverseName dataverseName = getActiveDataverseName(signature.getDataverseName()); |
| signature.setDataverseName(dataverseName); |
| DataverseName libraryDataverseName = null; |
| String libraryName = cfs.getLibraryName(); |
| if (libraryName != null) { |
| libraryDataverseName = cfs.getLibraryDataverseName(); |
| if (libraryDataverseName == null) { |
| libraryDataverseName = dataverseName; |
| } |
| } |
| |
| lockUtil.createFunctionBegin(lockManager, metadataProvider.getLocks(), dataverseName, signature.getName(), |
| libraryDataverseName, libraryName); |
| try { |
| doCreateFunction(metadataProvider, cfs, signature, stmtRewriter, requestParameters); |
| } finally { |
| metadataProvider.getLocks().unlock(); |
| metadataProvider.setDefaultDataverse(activeDataverse); |
| } |
| } |
| |
| protected void doCreateFunction(MetadataProvider metadataProvider, CreateFunctionStatement cfs, |
| FunctionSignature functionSignature, IStatementRewriter stmtRewriter, IRequestParameters requestParameters) |
| throws Exception { |
| DataverseName dataverseName = functionSignature.getDataverseName(); |
| SourceLocation sourceLoc = cfs.getSourceLocation(); |
| MetadataTransactionContext mdTxnCtx = MetadataManager.INSTANCE.beginTransaction(); |
| metadataProvider.setMetadataTxnContext(mdTxnCtx); |
| try { |
| Dataverse dv = MetadataManager.INSTANCE.getDataverse(mdTxnCtx, dataverseName); |
| if (dv == null) { |
| throw new CompilationException(ErrorCode.UNKNOWN_DATAVERSE, sourceLoc, dataverseName); |
| } |
| List<TypeSignature> existingInlineTypes; |
| Function existingFunction = MetadataManager.INSTANCE.getFunction(mdTxnCtx, functionSignature); |
| if (existingFunction != null) { |
| if (cfs.getIfNotExists()) { |
| MetadataManager.INSTANCE.commitTransaction(mdTxnCtx); |
| return; |
| } else if (!cfs.getReplaceIfExists()) { |
| throw new CompilationException(ErrorCode.FUNCTION_EXISTS, cfs.getSourceLocation(), |
| functionSignature.toString(false)); |
| } |
| existingInlineTypes = TypeUtil.getFunctionInlineTypes(existingFunction); |
| } else { |
| existingInlineTypes = Collections.emptyList(); |
| } |
| |
| Map<TypeSignature, Datatype> newInlineTypes; |
| Function function; |
| if (cfs.isExternal()) { |
| if (functionSignature.getArity() == FunctionIdentifier.VARARGS) { |
| throw new CompilationException(ErrorCode.COMPILATION_ERROR, cfs.getSourceLocation(), |
| "Variable number of parameters is not supported for external functions"); |
| } |
| List<Pair<VarIdentifier, TypeExpression>> paramList = cfs.getParameters(); |
| int paramCount = paramList.size(); |
| List<String> paramNames = new ArrayList<>(paramCount); |
| List<TypeSignature> paramTypes = new ArrayList<>(paramCount); |
| LinkedHashSet<TypeSignature> depTypes = new LinkedHashSet<>(); |
| newInlineTypes = new HashMap<>(); |
| |
| for (int i = 0; i < paramCount; i++) { |
| Pair<VarIdentifier, TypeExpression> paramPair = paramList.get(i); |
| TypeSignature paramTypeSignature; |
| TypeSignature paramDepTypeSignature; |
| Datatype paramInlineTypeEntity; |
| TypeExpression paramTypeExpr = paramPair.getSecond(); |
| if (paramTypeExpr != null) { |
| Triple<TypeSignature, TypeSignature, Datatype> paramTypeInfo = translateFunctionParameterType( |
| functionSignature, i, paramTypeExpr, sourceLoc, metadataProvider, mdTxnCtx); |
| paramTypeSignature = paramTypeInfo.first; |
| paramDepTypeSignature = paramTypeInfo.second; |
| paramInlineTypeEntity = paramTypeInfo.third; |
| } else { |
| paramTypeSignature = null; // == any |
| paramDepTypeSignature = null; |
| paramInlineTypeEntity = null; |
| } |
| paramTypes.add(paramTypeSignature); // null == any |
| if (paramDepTypeSignature != null) { |
| depTypes.add(paramDepTypeSignature); |
| } |
| if (paramInlineTypeEntity != null) { |
| newInlineTypes.put(paramTypeSignature, paramInlineTypeEntity); |
| } |
| VarIdentifier paramName = paramPair.getFirst(); |
| paramNames.add(stmtRewriter.toFunctionParameterName(paramName)); |
| } |
| |
| TypeSignature returnTypeSignature; |
| TypeSignature returnDepTypeSignature; |
| Datatype returnInlineTypeEntity; |
| TypeExpression returnTypeExpr = cfs.getReturnType(); |
| if (returnTypeExpr != null) { |
| Triple<TypeSignature, TypeSignature, Datatype> returnTypeInfo = translateFunctionParameterType( |
| functionSignature, -1, returnTypeExpr, sourceLoc, metadataProvider, mdTxnCtx); |
| returnTypeSignature = returnTypeInfo.first; |
| returnDepTypeSignature = returnTypeInfo.second; |
| returnInlineTypeEntity = returnTypeInfo.third; |
| } else { |
| returnTypeSignature = null; // == any |
| returnDepTypeSignature = null; |
| returnInlineTypeEntity = null; |
| } |
| if (returnDepTypeSignature != null) { |
| depTypes.add(returnDepTypeSignature); |
| } |
| if (returnInlineTypeEntity != null) { |
| newInlineTypes.put(returnTypeSignature, returnInlineTypeEntity); |
| } |
| |
| DataverseName libraryDataverseName = cfs.getLibraryDataverseName(); |
| if (libraryDataverseName == null) { |
| libraryDataverseName = dataverseName; |
| } |
| String libraryName = cfs.getLibraryName(); |
| Library library = MetadataManager.INSTANCE.getLibrary(mdTxnCtx, libraryDataverseName, libraryName); |
| if (library == null) { |
| throw new CompilationException(ErrorCode.UNKNOWN_LIBRARY, sourceLoc, libraryName); |
| } |
| |
| ExternalFunctionLanguage language = |
| ExternalFunctionCompilerUtil.getExternalFunctionLanguage(library.getLanguage()); |
| List<String> externalIdentifier = cfs.getExternalIdentifier(); |
| ExternalFunctionCompilerUtil.validateExternalIdentifier(externalIdentifier, language, |
| cfs.getSourceLocation()); |
| List<List<Triple<DataverseName, String, String>>> dependencies = |
| FunctionUtil.getExternalFunctionDependencies(depTypes); |
| |
| function = new Function(functionSignature, paramNames, paramTypes, returnTypeSignature, null, |
| FunctionKind.SCALAR.toString(), library.getLanguage(), libraryDataverseName, libraryName, |
| externalIdentifier, cfs.getNullCall(), cfs.getDeterministic(), cfs.getResources(), |
| dependencies); |
| } else { |
| List<Pair<VarIdentifier, TypeExpression>> paramList = cfs.getParameters(); |
| int paramCount = paramList.size(); |
| List<VarIdentifier> paramVars = new ArrayList<>(paramCount); |
| List<String> paramNames = new ArrayList<>(paramCount); |
| for (Pair<VarIdentifier, TypeExpression> paramPair : paramList) { |
| VarIdentifier paramName = paramPair.getFirst(); |
| paramVars.add(paramName); |
| paramNames.add(stmtRewriter.toFunctionParameterName(paramName)); |
| if (paramPair.getSecond() != null) { |
| throw new CompilationException(ErrorCode.COMPILATION_ILLEGAL_STATE, sourceLoc, |
| paramName.toString()); |
| } |
| } |
| |
| // Check whether the function is usable: |
| // create a function declaration for this function, and a query body that calls |
| // this function with each argument set to 'missing' |
| FunctionDecl fd = new FunctionDecl(functionSignature, paramVars, cfs.getFunctionBodyExpression(), true); |
| fd.setSourceLocation(sourceLoc); |
| CallExpr fcall = new CallExpr(functionSignature, |
| Collections.nCopies(paramVars.size(), new LiteralExpr(MissingLiteral.INSTANCE))); |
| fcall.setSourceLocation(sourceLoc); |
| metadataProvider.setDefaultDataverse(dv); |
| Query wrappedQuery = new Query(false); |
| wrappedQuery.setSourceLocation(sourceLoc); |
| wrappedQuery.setBody(fcall); |
| wrappedQuery.setTopLevel(false); |
| List<FunctionDecl> fdList = new ArrayList<>(declaredFunctions); |
| fdList.add(fd); |
| apiFramework.reWriteQuery(fdList, metadataProvider, wrappedQuery, sessionOutput, false, false, |
| Collections.emptyList(), warningCollector); |
| Expression fdNormBody = fd.getNormalizedFuncBody(); |
| if (fdNormBody == null) { |
| throw new CompilationException(ErrorCode.COMPILATION_ILLEGAL_STATE, sourceLoc, |
| functionSignature.toString()); |
| } |
| List<List<Triple<DataverseName, String, String>>> dependencies = |
| FunctionUtil.getFunctionDependencies(rewriterFactory.createQueryRewriter(), fdNormBody); |
| |
| newInlineTypes = Collections.emptyMap(); |
| function = new Function(functionSignature, paramNames, null, null, cfs.getFunctionBody(), |
| FunctionKind.SCALAR.toString(), compilationProvider.getParserFactory().getLanguage(), null, |
| null, null, null, null, null, dependencies); |
| } |
| |
| if (existingFunction == null) { |
| // add new function and its inline types |
| for (Datatype newInlineType : newInlineTypes.values()) { |
| MetadataManager.INSTANCE.addDatatype(mdTxnCtx, newInlineType); |
| } |
| MetadataManager.INSTANCE.addFunction(mdTxnCtx, function); |
| } else { |
| // replace existing function and its inline types |
| for (TypeSignature existingInlineType : existingInlineTypes) { |
| Datatype newInlineType = |
| newInlineTypes.isEmpty() ? null : newInlineTypes.remove(existingInlineType); |
| if (newInlineType == null) { |
| MetadataManager.INSTANCE.dropDatatype(mdTxnCtx, existingInlineType.getDataverseName(), |
| existingInlineType.getName()); |
| } else { |
| MetadataManager.INSTANCE.updateDatatype(mdTxnCtx, newInlineType); |
| } |
| } |
| for (Datatype inlineType : newInlineTypes.values()) { |
| MetadataManager.INSTANCE.addDatatype(mdTxnCtx, inlineType); |
| } |
| MetadataManager.INSTANCE.updateFunction(mdTxnCtx, function); |
| } |
| if (LOGGER.isInfoEnabled()) { |
| LOGGER.info("Installed function: " + functionSignature); |
| } |
| MetadataManager.INSTANCE.commitTransaction(mdTxnCtx); |
| } catch (Exception e) { |
| abort(e, e, mdTxnCtx); |
| throw e; |
| } |
| } |
| |
| private Triple<TypeSignature, TypeSignature, Datatype> translateFunctionParameterType( |
| FunctionSignature functionSignature, int paramIdx, TypeExpression paramTypeExpr, SourceLocation sourceLoc, |
| MetadataProvider metadataProvider, MetadataTransactionContext mdTxnCtx) throws AlgebricksException { |
| TypeSignature paramTypeSignature, depTypeSignature; |
| Datatype paramInlineTypeEntity = null; |
| switch (paramTypeExpr.getTypeKind()) { |
| case TYPEREFERENCE: |
| TypeReferenceExpression paramTypeRefExpr = (TypeReferenceExpression) paramTypeExpr; |
| String paramTypeName = paramTypeRefExpr.getIdent().second.getValue(); |
| BuiltinType builtinType = BuiltinTypeMap.getBuiltinType(paramTypeName); |
| if (builtinType != null) { |
| // built-in type |
| paramTypeSignature = new TypeSignature(builtinType); |
| depTypeSignature = null; |
| } else { |
| // user-defined type |
| DataverseName paramTypeDataverseName = paramTypeRefExpr.getIdent().first; |
| if (paramTypeDataverseName == null) { |
| paramTypeDataverseName = functionSignature.getDataverseName(); |
| } |
| Datatype paramTypeEntity = metadataProvider.findTypeEntity(paramTypeDataverseName, paramTypeName); |
| if (paramTypeEntity == null || paramTypeEntity.getIsAnonymous()) { |
| throw new CompilationException(ErrorCode.UNKNOWN_TYPE, sourceLoc, paramTypeName); |
| } |
| paramTypeSignature = depTypeSignature = new TypeSignature(paramTypeDataverseName, paramTypeName); |
| } |
| break; |
| case ORDEREDLIST: |
| case UNORDEREDLIST: |
| DataverseName paramTypeDataverseName = functionSignature.getDataverseName(); |
| paramTypeName = TypeUtil.createFunctionParameterTypeName(functionSignature.getName(), |
| functionSignature.getArity(), paramIdx); |
| IAType paramType = translateType(paramTypeDataverseName, paramTypeName, paramTypeExpr, mdTxnCtx); |
| paramTypeSignature = new TypeSignature(paramTypeDataverseName, paramTypeName); |
| depTypeSignature = |
| FunctionUtil.getTypeDependencyFromFunctionParameter(paramTypeExpr, paramTypeDataverseName); |
| paramInlineTypeEntity = new Datatype(paramTypeDataverseName, paramTypeName, paramType, true); |
| break; |
| default: |
| throw new CompilationException(ErrorCode.COMPILATION_ILLEGAL_STATE, sourceLoc); |
| } |
| |
| return new Triple<>(paramTypeSignature, depTypeSignature, paramInlineTypeEntity); |
| } |
| |
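| // Handles DROP FUNCTION; the signature carries the arity, e.g. (hypothetical names): |
| // DROP FUNCTION myDataverse.twice(x); |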
| protected void handleFunctionDropStatement(MetadataProvider metadataProvider, Statement stmt, |
| IRequestParameters requestParameters) throws Exception { |
| FunctionDropStatement stmtDropFunction = (FunctionDropStatement) stmt; |
| FunctionSignature signature = stmtDropFunction.getFunctionSignature(); |
| metadataProvider.validateDatabaseObjectName(signature.getDataverseName(), signature.getName(), |
| stmtDropFunction.getSourceLocation()); |
| DataverseName dataverseName = getActiveDataverseName(signature.getDataverseName()); |
| signature.setDataverseName(dataverseName); |
| lockUtil.dropFunctionBegin(lockManager, metadataProvider.getLocks(), dataverseName, signature.getName()); |
| try { |
| doDropFunction(metadataProvider, stmtDropFunction, signature, requestParameters); |
| } finally { |
| metadataProvider.getLocks().unlock(); |
| } |
| } |
| |
| protected boolean doDropFunction(MetadataProvider metadataProvider, FunctionDropStatement stmtDropFunction, |
| FunctionSignature signature, IRequestParameters requestParameters) throws Exception { |
| DataverseName dataverseName = signature.getDataverseName(); |
| SourceLocation sourceLoc = stmtDropFunction.getSourceLocation(); |
| MetadataTransactionContext mdTxnCtx = MetadataManager.INSTANCE.beginTransaction(); |
| metadataProvider.setMetadataTxnContext(mdTxnCtx); |
| try { |
| Dataverse dataverse = MetadataManager.INSTANCE.getDataverse(mdTxnCtx, dataverseName); |
| if (dataverse == null) { |
| if (stmtDropFunction.getIfExists()) { |
| MetadataManager.INSTANCE.commitTransaction(mdTxnCtx); |
| return false; |
| } else { |
| throw new CompilationException(ErrorCode.UNKNOWN_DATAVERSE, sourceLoc, dataverseName); |
| } |
| } |
| Function function = MetadataManager.INSTANCE.getFunction(mdTxnCtx, signature); |
| if (function == null) { |
| if (stmtDropFunction.getIfExists()) { |
| MetadataManager.INSTANCE.commitTransaction(mdTxnCtx); |
| return false; |
| } else { |
| throw new CompilationException(ErrorCode.UNKNOWN_FUNCTION, sourceLoc, signature.toString()); |
| } |
| } |
| |
| List<TypeSignature> inlineTypes = TypeUtil.getFunctionInlineTypes(function); |
| |
| MetadataManager.INSTANCE.dropFunction(mdTxnCtx, signature); |
| for (TypeSignature inlineType : inlineTypes) { |
| MetadataManager.INSTANCE.dropDatatype(mdTxnCtx, inlineType.getDataverseName(), inlineType.getName()); |
| } |
| MetadataManager.INSTANCE.commitTransaction(mdTxnCtx); |
| return true; |
| } catch (Exception e) { |
| abort(e, e, mdTxnCtx); |
| throw e; |
| } |
| } |
| |
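| // Handles CREATE ADAPTER (an external, library-backed datasource adapter). The |
| // exact surface syntax depends on the release; a hypothetical sketch: |
| // CREATE ADAPTER myAdapter AS "org.example.MyAdapterFactory" AT myLib; |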
| protected void handleCreateAdapterStatement(MetadataProvider metadataProvider, Statement stmt) throws Exception { |
| CreateAdapterStatement cas = (CreateAdapterStatement) stmt; |
| String adapterName = cas.getAdapterName(); |
| metadataProvider.validateDatabaseObjectName(cas.getDataverseName(), adapterName, cas.getSourceLocation()); |
| DataverseName dataverseName = getActiveDataverseName(cas.getDataverseName()); |
| DataverseName libraryDataverseName = cas.getLibraryDataverseName(); |
| if (libraryDataverseName == null) { |
| libraryDataverseName = dataverseName; |
| } |
| String libraryName = cas.getLibraryName(); |
| lockUtil.createAdapterBegin(lockManager, metadataProvider.getLocks(), dataverseName, adapterName, |
| libraryDataverseName, libraryName); |
| try { |
| doCreateAdapter(metadataProvider, cas); |
| } finally { |
| metadataProvider.getLocks().unlock(); |
| } |
| } |
| |
| protected void doCreateAdapter(MetadataProvider metadataProvider, CreateAdapterStatement cas) throws Exception { |
| SourceLocation sourceLoc = cas.getSourceLocation(); |
| MetadataTransactionContext mdTxnCtx = MetadataManager.INSTANCE.beginTransaction(); |
| metadataProvider.setMetadataTxnContext(mdTxnCtx); |
| try { |
| DataverseName dataverseName = getActiveDataverseName(cas.getDataverseName()); |
| Dataverse dv = MetadataManager.INSTANCE.getDataverse(mdTxnCtx, dataverseName); |
| if (dv == null) { |
| throw new CompilationException(ErrorCode.UNKNOWN_DATAVERSE, sourceLoc, dataverseName); |
| } |
| String adapterName = cas.getAdapterName(); |
| DatasourceAdapter adapter = MetadataManager.INSTANCE.getAdapter(mdTxnCtx, dataverseName, adapterName); |
| if (adapter != null) { |
| if (cas.getIfNotExists()) { |
| MetadataManager.INSTANCE.commitTransaction(mdTxnCtx); |
| return; |
| } |
| throw new CompilationException(ErrorCode.ADAPTER_EXISTS, sourceLoc, adapterName); |
| } |
| |
| DataverseName libraryDataverseName = cas.getLibraryDataverseName(); |
| if (libraryDataverseName == null) { |
| libraryDataverseName = dataverseName; |
| } |
| String libraryName = cas.getLibraryName(); |
| Library library = MetadataManager.INSTANCE.getLibrary(mdTxnCtx, libraryDataverseName, libraryName); |
| if (library == null) { |
| throw new CompilationException(ErrorCode.UNKNOWN_LIBRARY, sourceLoc, libraryName); |
| } |
| // Validate the external identifier and add the adapter |
| ExternalFunctionLanguage language = |
| ExternalFunctionCompilerUtil.getExternalFunctionLanguage(library.getLanguage()); |
| List<String> externalIdentifier = cas.getExternalIdentifier(); |
| ExternalFunctionCompilerUtil.validateExternalIdentifier(externalIdentifier, language, |
| cas.getSourceLocation()); |
| |
| if (language != ExternalFunctionLanguage.JAVA) { |
| throw new CompilationException(ErrorCode.UNSUPPORTED_ADAPTER_LANGUAGE, cas.getSourceLocation(), |
| language.name()); |
| } |
| String adapterFactoryClass = externalIdentifier.get(0); |
| |
| adapter = new DatasourceAdapter(new AdapterIdentifier(dataverseName, adapterName), |
| IDataSourceAdapter.AdapterType.EXTERNAL, adapterFactoryClass, libraryDataverseName, libraryName); |
| MetadataManager.INSTANCE.addAdapter(mdTxnCtx, adapter); |
| if (LOGGER.isInfoEnabled()) { |
| LOGGER.info("Installed adapter: " + adapterName); |
| } |
| MetadataManager.INSTANCE.commitTransaction(mdTxnCtx); |
| } catch (Exception e) { |
| abort(e, e, mdTxnCtx); |
| throw e; |
| } |
| } |
| |
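| // Handles DROP ADAPTER, e.g. (hypothetical name): |
| // DROP ADAPTER myAdapter IF EXISTS; |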
| protected void handleAdapterDropStatement(MetadataProvider metadataProvider, Statement stmt) throws Exception { |
| AdapterDropStatement stmtDropAdapter = (AdapterDropStatement) stmt; |
| SourceLocation sourceLoc = stmtDropAdapter.getSourceLocation(); |
| String adapterName = stmtDropAdapter.getAdapterName(); |
| metadataProvider.validateDatabaseObjectName(stmtDropAdapter.getDataverseName(), adapterName, sourceLoc); |
| DataverseName dataverseName = getActiveDataverseName(stmtDropAdapter.getDataverseName()); |
| lockUtil.dropAdapterBegin(lockManager, metadataProvider.getLocks(), dataverseName, adapterName); |
| try { |
| doDropAdapter(metadataProvider, stmtDropAdapter, dataverseName, adapterName); |
| } finally { |
| metadataProvider.getLocks().unlock(); |
| } |
| } |
| |
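| /** |
| * Drops an adapter's metadata entity under its own metadata transaction. |
| * |
| * @return true if the adapter was dropped; false if the dataverse or adapter did not exist |
| * and IF EXISTS was specified |
| */ |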
| protected boolean doDropAdapter(MetadataProvider metadataProvider, AdapterDropStatement stmtDropAdapter, |
| DataverseName dataverseName, String adapterName) throws Exception { |
| SourceLocation sourceLoc = stmtDropAdapter.getSourceLocation(); |
| MetadataTransactionContext mdTxnCtx = MetadataManager.INSTANCE.beginTransaction(); |
| metadataProvider.setMetadataTxnContext(mdTxnCtx); |
| try { |
| Dataverse dataverse = MetadataManager.INSTANCE.getDataverse(mdTxnCtx, dataverseName); |
| if (dataverse == null) { |
| if (stmtDropAdapter.getIfExists()) { |
| MetadataManager.INSTANCE.commitTransaction(mdTxnCtx); |
| return false; |
| } else { |
| throw new CompilationException(ErrorCode.UNKNOWN_DATAVERSE, sourceLoc, dataverseName); |
| } |
| } |
| DatasourceAdapter adapter = MetadataManager.INSTANCE.getAdapter(mdTxnCtx, dataverseName, adapterName); |
| if (adapter == null) { |
| if (stmtDropAdapter.getIfExists()) { |
| MetadataManager.INSTANCE.commitTransaction(mdTxnCtx); |
| return false; |
| } else { |
| throw new CompilationException(ErrorCode.UNKNOWN_ADAPTER, sourceLoc, adapterName); |
| } |
| } |
| |
| MetadataManager.INSTANCE.dropAdapter(mdTxnCtx, dataverseName, adapterName); |
| MetadataManager.INSTANCE.commitTransaction(mdTxnCtx); |
| return true; |
| } catch (Exception e) { |
| abort(e, e, mdTxnCtx); |
| throw e; |
| } |
| } |
| |
| protected void handleCreateLibraryStatement(MetadataProvider metadataProvider, Statement stmt, |
| IHyracksClientConnection hcc, IRequestParameters requestParameters) throws Exception { |
| CreateLibraryStatement cls = (CreateLibraryStatement) stmt; |
| metadataProvider.validateDatabaseObjectName(cls.getDataverseName(), cls.getLibraryName(), |
| cls.getSourceLocation()); |
| DataverseName dataverseName = getActiveDataverseName(cls.getDataverseName()); |
| String libraryName = cls.getLibraryName(); |
| String libraryHash = cls.getHash(); |
| lockUtil.createLibraryBegin(lockManager, metadataProvider.getLocks(), dataverseName, libraryName); |
| try { |
| doCreateLibrary(metadataProvider, dataverseName, libraryName, libraryHash, cls, hcc, requestParameters); |
| } finally { |
| metadataProvider.getLocks().unlock(); |
| } |
| } |
| |
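| /** |
| * Creates (or replaces) a library using a pending-op protocol: the library record is first |
| * written with PENDING_ADD_OP, the 'prepare' and 'commit' jobs deploy the library artifacts |
| * on the NCs, and a second metadata transaction then flips the record to PENDING_NO_OP. On |
| * failure, the 'abort' job (or a drop-library job, for a brand-new library) undoes the |
| * NC-side work, and the pending metadata record is reverted or removed. |
| */ |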
| protected void doCreateLibrary(MetadataProvider metadataProvider, DataverseName dataverseName, String libraryName, |
| String libraryHash, CreateLibraryStatement cls, IHyracksClientConnection hcc, |
| IRequestParameters requestParameters) throws Exception { |
| JobUtils.ProgressState progress = ProgressState.NO_PROGRESS; |
| boolean prepareJobSuccessful = false; |
| JobSpecification abortJobSpec = null; |
| Library existingLibrary = null; |
| MetadataTransactionContext mdTxnCtx = MetadataManager.INSTANCE.beginTransaction(); |
| boolean bActiveTxn = true; |
| metadataProvider.setMetadataTxnContext(mdTxnCtx); |
| try { |
| Dataverse dv = MetadataManager.INSTANCE.getDataverse(mdTxnCtx, dataverseName); |
| if (dv == null) { |
| throw new CompilationException(ErrorCode.UNKNOWN_DATAVERSE, cls.getSourceLocation(), dataverseName); |
| } |
| ExternalFunctionLanguage language = cls.getLang(); |
| existingLibrary = MetadataManager.INSTANCE.getLibrary(mdTxnCtx, dataverseName, libraryName); |
| if (existingLibrary != null && !cls.getReplaceIfExists()) { |
| throw new CompilationException(ErrorCode.COMPILATION_ERROR, |
| "A library with this name " + libraryName + " already exists."); |
| } |
| |
| // #. add/update library with PendingAddOp |
| Library libraryPendingAdd = |
| new Library(dataverseName, libraryName, language.name(), libraryHash, MetadataUtil.PENDING_ADD_OP); |
| if (existingLibrary == null) { |
| MetadataManager.INSTANCE.addLibrary(mdTxnCtx, libraryPendingAdd); |
| } else { |
| MetadataManager.INSTANCE.updateLibrary(mdTxnCtx, libraryPendingAdd); |
| } |
| |
| // #. prepare the jobs that create the library artifacts on the NCs. |
| Triple<JobSpecification, JobSpecification, JobSpecification> jobSpecs = |
| ExternalLibraryJobUtils.buildCreateLibraryJobSpec(dataverseName, libraryName, language, |
| cls.getLocation(), cls.getAuthToken(), metadataProvider); |
| JobSpecification prepareJobSpec = jobSpecs.first; |
| JobSpecification commitJobSpec = jobSpecs.second; |
| abortJobSpec = jobSpecs.third; |
| |
| MetadataManager.INSTANCE.commitTransaction(mdTxnCtx); |
| bActiveTxn = false; |
| progress = ProgressState.ADDED_PENDINGOP_RECORD_TO_METADATA; |
| |
| // #. create library artifacts in NCs. |
| runJob(hcc, prepareJobSpec, jobFlags); |
| prepareJobSuccessful = true; |
| runJob(hcc, commitJobSpec, jobFlags); |
| |
| // #. begin new metadataTxn |
| mdTxnCtx = MetadataManager.INSTANCE.beginTransaction(); |
| bActiveTxn = true; |
| metadataProvider.setMetadataTxnContext(mdTxnCtx); |
| |
| Library newLibrary = |
| new Library(dataverseName, libraryName, language.name(), libraryHash, MetadataUtil.PENDING_NO_OP); |
| MetadataManager.INSTANCE.updateLibrary(mdTxnCtx, newLibrary); |
| |
| MetadataManager.INSTANCE.commitTransaction(mdTxnCtx); |
| } catch (Exception e) { |
| if (bActiveTxn) { |
| abort(e, e, mdTxnCtx); |
| } |
| if (progress == ProgressState.ADDED_PENDINGOP_RECORD_TO_METADATA) { |
| boolean undoFailure = false; |
| if (!prepareJobSuccessful) { |
| // 'prepare' job failed -> try running 'abort' job |
| try { |
| runJob(hcc, abortJobSpec, jobFlags); |
| } catch (Exception e2) { |
| e.addSuppressed(e2); |
| undoFailure = true; |
| } |
| } else if (existingLibrary == null) { |
| // 'commit' job failed for a new library -> try removing the library |
| try { |
| JobSpecification dropLibraryJobSpec = ExternalLibraryJobUtils |
| .buildDropLibraryJobSpec(dataverseName, libraryName, metadataProvider); |
| runJob(hcc, dropLibraryJobSpec, jobFlags); |
| } catch (Exception e2) { |
| e.addSuppressed(e2); |
| undoFailure = true; |
| } |
| } else { |
| // 'commit' job failed for an existing library -> bad state |
| undoFailure = true; |
| } |
| |
| // revert/remove the record from the metadata. |
| mdTxnCtx = MetadataManager.INSTANCE.beginTransaction(); |
| try { |
| if (existingLibrary == null) { |
| MetadataManager.INSTANCE.dropLibrary(mdTxnCtx, dataverseName, libraryName); |
| } else { |
| MetadataManager.INSTANCE.updateLibrary(mdTxnCtx, existingLibrary); |
| } |
| MetadataManager.INSTANCE.commitTransaction(mdTxnCtx); |
| } catch (Exception e2) { |
| e.addSuppressed(e2); |
| abort(e, e2, mdTxnCtx); |
| throw new IllegalStateException("System is inconsistent state: pending library(" + libraryName |
| + ") couldn't be reverted/removed from the metadata", e); |
| } |
| |
| if (undoFailure) { |
| throw new IllegalStateException( |
| "System is in an inconsistent state: library (" + libraryName + ") could not be deployed", e); |
| } |
| } |
| throw e; |
| } |
| } |
| |
| protected void handleLibraryDropStatement(MetadataProvider metadataProvider, Statement stmt, |
| IHyracksClientConnection hcc, IRequestParameters requestParameters) throws Exception { |
| LibraryDropStatement stmtDropLibrary = (LibraryDropStatement) stmt; |
| String libraryName = stmtDropLibrary.getLibraryName(); |
| metadataProvider.validateDatabaseObjectName(stmtDropLibrary.getDataverseName(), libraryName, |
| stmtDropLibrary.getSourceLocation()); |
| DataverseName dataverseName = getActiveDataverseName(stmtDropLibrary.getDataverseName()); |
| lockUtil.dropLibraryBegin(lockManager, metadataProvider.getLocks(), dataverseName, libraryName); |
| try { |
| doDropLibrary(metadataProvider, stmtDropLibrary, dataverseName, libraryName, hcc, requestParameters); |
| } finally { |
| metadataProvider.getLocks().unlock(); |
| } |
| } |
| |
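| /** |
| * Drops a library using the same pending-op protocol as create: the record is replaced with |
| * a PENDING_DROP_OP copy (via drop+add, so that the drop fails fast if the library is still |
| * used by functions or adapters), a job removes the artifacts from the NCs, and a second |
| * metadata transaction finally removes the record itself. |
| */ |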
| protected boolean doDropLibrary(MetadataProvider metadataProvider, LibraryDropStatement stmtDropLibrary, |
| DataverseName dataverseName, String libraryName, IHyracksClientConnection hcc, |
| IRequestParameters requestParameters) throws Exception { |
| JobUtils.ProgressState progress = ProgressState.NO_PROGRESS; |
| MetadataTransactionContext mdTxnCtx = MetadataManager.INSTANCE.beginTransaction(); |
| boolean bActiveTxn = true; |
| metadataProvider.setMetadataTxnContext(mdTxnCtx); |
| try { |
| Dataverse dataverse = MetadataManager.INSTANCE.getDataverse(mdTxnCtx, dataverseName); |
| if (dataverse == null) { |
| if (stmtDropLibrary.getIfExists()) { |
| MetadataManager.INSTANCE.commitTransaction(mdTxnCtx); |
| return false; |
| } else { |
| throw new CompilationException(ErrorCode.UNKNOWN_DATAVERSE, stmtDropLibrary.getSourceLocation(), |
| dataverseName); |
| } |
| } |
| Library library = MetadataManager.INSTANCE.getLibrary(mdTxnCtx, dataverseName, libraryName); |
| if (library == null) { |
| if (stmtDropLibrary.getIfExists()) { |
| MetadataManager.INSTANCE.commitTransaction(mdTxnCtx); |
| return false; |
| } else { |
| throw new CompilationException(ErrorCode.UNKNOWN_LIBRARY, stmtDropLibrary.getSourceLocation(), |
| libraryName); |
| } |
| } |
| |
| // #. mark the existing library as PendingDropOp |
| // do drop instead of update because drop will fail if the library is used by functions/adapters |
| MetadataManager.INSTANCE.dropLibrary(mdTxnCtx, dataverseName, libraryName); |
| MetadataManager.INSTANCE.addLibrary(mdTxnCtx, new Library(dataverseName, libraryName, library.getLanguage(), |
| library.getHash(), MetadataUtil.PENDING_DROP_OP)); |
| |
| // #. prepare the job that drops the library artifacts on the NCs. |
| JobSpecification jobSpec = |
| ExternalLibraryJobUtils.buildDropLibraryJobSpec(dataverseName, libraryName, metadataProvider); |
| |
| MetadataManager.INSTANCE.commitTransaction(mdTxnCtx); |
| bActiveTxn = false; |
| progress = ProgressState.ADDED_PENDINGOP_RECORD_TO_METADATA; |
| |
| // #. drop library artifacts in NCs. |
| runJob(hcc, jobSpec, jobFlags); |
| |
| // #. begin new metadataTxn |
| mdTxnCtx = MetadataManager.INSTANCE.beginTransaction(); |
| bActiveTxn = true; |
| metadataProvider.setMetadataTxnContext(mdTxnCtx); |
| |
| // #. drop library |
| MetadataManager.INSTANCE.dropLibrary(mdTxnCtx, dataverseName, libraryName); |
| |
| MetadataManager.INSTANCE.commitTransaction(mdTxnCtx); |
| return true; |
| } catch (Exception e) { |
| if (bActiveTxn) { |
| abort(e, e, mdTxnCtx); |
| } |
| if (progress == ProgressState.ADDED_PENDINGOP_RECORD_TO_METADATA) { |
| // remove the record from the metadata. |
| mdTxnCtx = MetadataManager.INSTANCE.beginTransaction(); |
| try { |
| MetadataManager.INSTANCE.dropLibrary(mdTxnCtx, dataverseName, libraryName); |
| MetadataManager.INSTANCE.commitTransaction(mdTxnCtx); |
| } catch (Exception e2) { |
| e.addSuppressed(e2); |
| abort(e, e2, mdTxnCtx); |
| throw new IllegalStateException("System is inconsistent state: pending library(" + libraryName |
| + ") couldn't be removed from the metadata", e); |
| } |
| } |
| throw e; |
| } |
| } |
| |
| protected void handleCreateSynonymStatement(MetadataProvider metadataProvider, Statement stmt) throws Exception { |
| CreateSynonymStatement css = (CreateSynonymStatement) stmt; |
| metadataProvider.validateDatabaseObjectName(css.getDataverseName(), css.getSynonymName(), |
| css.getSourceLocation()); |
| DataverseName dataverseName = getActiveDataverseName(css.getDataverseName()); |
| String synonymName = css.getSynonymName(); |
| DataverseName objectDataverseName = |
| css.getObjectDataverseName() != null ? css.getObjectDataverseName() : dataverseName; |
| String objectName = css.getObjectName(); |
| lockUtil.createSynonymBegin(lockManager, metadataProvider.getLocks(), dataverseName, synonymName); |
| try { |
| doCreateSynonym(metadataProvider, css, dataverseName, synonymName, objectDataverseName, objectName); |
| } finally { |
| metadataProvider.getLocks().unlock(); |
| } |
| } |
| |
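| /** |
| * Adds a synonym that points at another database object, e.g. the illustrative statement |
| * {@code CREATE SYNONYM sales FOR shop.orders}. Only the synonym's own dataverse must |
| * exist; the target object is not resolved here. If the synonym already exists and |
| * IF NOT EXISTS was specified, a warning is emitted instead of an error. |
| */ |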
| protected void doCreateSynonym(MetadataProvider metadataProvider, CreateSynonymStatement css, |
| DataverseName dataverseName, String synonymName, DataverseName objectDataverseName, String objectName) |
| throws Exception { |
| MetadataTransactionContext mdTxnCtx = MetadataManager.INSTANCE.beginTransaction(); |
| metadataProvider.setMetadataTxnContext(mdTxnCtx); |
| try { |
| Dataverse dv = MetadataManager.INSTANCE.getDataverse(mdTxnCtx, dataverseName); |
| if (dv == null) { |
| throw new CompilationException(ErrorCode.UNKNOWN_DATAVERSE, css.getSourceLocation(), dataverseName); |
| } |
| Synonym synonym = MetadataManager.INSTANCE.getSynonym(metadataProvider.getMetadataTxnContext(), |
| dataverseName, synonymName); |
| if (synonym != null) { |
| if (css.getIfNotExists()) { |
| MetadataManager.INSTANCE.commitTransaction(mdTxnCtx); |
| if (warningCollector.shouldWarn()) { |
| warningCollector |
| .warn(Warning.of(css.getSourceLocation(), ErrorCode.SYNONYM_EXISTS, synonymName)); |
| } |
| return; |
| } |
| throw new CompilationException(ErrorCode.SYNONYM_EXISTS, css.getSourceLocation(), synonymName); |
| } |
| synonym = new Synonym(dataverseName, synonymName, objectDataverseName, objectName); |
| MetadataManager.INSTANCE.addSynonym(metadataProvider.getMetadataTxnContext(), synonym); |
| MetadataManager.INSTANCE.commitTransaction(mdTxnCtx); |
| } catch (Exception e) { |
| abort(e, e, mdTxnCtx); |
| throw e; |
| } |
| } |
| |
| protected void handleDropSynonymStatement(MetadataProvider metadataProvider, Statement stmt) throws Exception { |
| SynonymDropStatement stmtSynDrop = (SynonymDropStatement) stmt; |
| String synonymName = stmtSynDrop.getSynonymName(); |
| metadataProvider.validateDatabaseObjectName(stmtSynDrop.getDataverseName(), synonymName, |
| stmtSynDrop.getSourceLocation()); |
| DataverseName dataverseName = getActiveDataverseName(stmtSynDrop.getDataverseName()); |
| lockUtil.dropSynonymBegin(lockManager, metadataProvider.getLocks(), dataverseName, synonymName); |
| try { |
| doDropSynonym(metadataProvider, stmtSynDrop, dataverseName, synonymName); |
| } finally { |
| metadataProvider.getLocks().unlock(); |
| } |
| } |
| |
| protected boolean doDropSynonym(MetadataProvider metadataProvider, SynonymDropStatement stmtSynDrop, |
| DataverseName dataverseName, String synonymName) throws Exception { |
| MetadataTransactionContext mdTxnCtx = MetadataManager.INSTANCE.beginTransaction(); |
| metadataProvider.setMetadataTxnContext(mdTxnCtx); |
| try { |
| Synonym synonym = MetadataManager.INSTANCE.getSynonym(mdTxnCtx, dataverseName, synonymName); |
| if (synonym == null) { |
| if (stmtSynDrop.getIfExists()) { |
| MetadataManager.INSTANCE.commitTransaction(mdTxnCtx); |
| return false; |
| } |
| throw new CompilationException(ErrorCode.UNKNOWN_SYNONYM, stmtSynDrop.getSourceLocation(), synonymName); |
| } |
| MetadataManager.INSTANCE.dropSynonym(mdTxnCtx, dataverseName, synonymName); |
| MetadataManager.INSTANCE.commitTransaction(mdTxnCtx); |
| return true; |
| } catch (Exception e) { |
| abort(e, e, mdTxnCtx); |
| throw e; |
| } |
| } |
| |
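| /** |
| * Handles bulk-loading a dataset via an adapter, e.g. the illustrative statement |
| * {@code LOAD DATASET shop.orders USING localfs (("path"="host:///data/orders.adm"), ("format"="adm"))}. |
| * The adapter properties are normalized and validated, the statement is compiled into a |
| * Hyracks job under the metadata transaction, and the job runs after the commit. |
| */ |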
| protected void handleLoadStatement(MetadataProvider metadataProvider, Statement stmt, IHyracksClientConnection hcc) |
| throws Exception { |
| LoadStatement loadStmt = (LoadStatement) stmt; |
| String datasetName = loadStmt.getDatasetName(); |
| metadataProvider.validateDatabaseObjectName(loadStmt.getDataverseName(), datasetName, |
| loadStmt.getSourceLocation()); |
| DataverseName dataverseName = getActiveDataverseName(loadStmt.getDataverseName()); |
| MetadataTransactionContext mdTxnCtx = MetadataManager.INSTANCE.beginTransaction(); |
| boolean bActiveTxn = true; |
| metadataProvider.setMetadataTxnContext(mdTxnCtx); |
| lockUtil.modifyDatasetBegin(lockManager, metadataProvider.getLocks(), dataverseName, datasetName); |
| try { |
| Map<String, String> properties = loadStmt.getProperties(); |
| ExternalDataUtils.normalize(properties); |
| ExternalDataUtils.validate(properties); |
| CompiledLoadFromFileStatement cls = new CompiledLoadFromFileStatement(dataverseName, |
| loadStmt.getDatasetName(), loadStmt.getAdapter(), properties, loadStmt.dataIsAlreadySorted()); |
| cls.setSourceLocation(stmt.getSourceLocation()); |
| JobSpecification spec = apiFramework.compileQuery(hcc, metadataProvider, null, 0, null, sessionOutput, cls, |
| null, responsePrinter, warningCollector); |
| afterCompile(); |
| MetadataManager.INSTANCE.commitTransaction(mdTxnCtx); |
| bActiveTxn = false; |
| if (spec != null) { |
| runJob(hcc, spec); |
| } |
| } catch (Exception e) { |
| if (bActiveTxn) { |
| abort(e, e, mdTxnCtx); |
| } |
| throw e; |
| } finally { |
| metadataProvider.getLocks().unlock(); |
| } |
| } |
| |
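| /** |
| * Handles INSERT/UPSERT. Compilation happens under a metadata transaction that is committed |
| * before the job runs. Statements with a RETURNING expression execute through |
| * {@link #deliverResult} so their results are printed; otherwise the job is simply run. |
| * When compileOnly is set, only the compiled job specification is returned. |
| */ |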
| public JobSpecification handleInsertUpsertStatement(MetadataProvider metadataProvider, Statement stmt, |
| IHyracksClientConnection hcc, IResultSet resultSet, ResultDelivery resultDelivery, |
| ResultMetadata outMetadata, Stats stats, boolean compileOnly, IRequestParameters requestParameters, |
| Map<String, IAObject> stmtParams, IStatementRewriter stmtRewriter) throws Exception { |
| InsertStatement stmtInsertUpsert = (InsertStatement) stmt; |
| String datasetName = stmtInsertUpsert.getDatasetName(); |
| metadataProvider.validateDatabaseObjectName(stmtInsertUpsert.getDataverseName(), datasetName, |
| stmtInsertUpsert.getSourceLocation()); |
| DataverseName dataverseName = getActiveDataverseName(stmtInsertUpsert.getDataverseName()); |
| final IMetadataLocker locker = new IMetadataLocker() { |
| @Override |
| public void lock() throws AlgebricksException { |
| lockUtil.insertDeleteUpsertBegin(lockManager, metadataProvider.getLocks(), dataverseName, datasetName); |
| } |
| |
| @Override |
| public void unlock() { |
| metadataProvider.getLocks().unlock(); |
| } |
| }; |
| final IStatementCompiler compiler = () -> { |
| MetadataTransactionContext mdTxnCtx = MetadataManager.INSTANCE.beginTransaction(); |
| boolean bActiveTxn = true; |
| metadataProvider.setMetadataTxnContext(mdTxnCtx); |
| try { |
| metadataProvider.setWriteTransaction(true); |
| final JobSpecification jobSpec = |
| rewriteCompileInsertUpsert(hcc, metadataProvider, stmtInsertUpsert, stmtParams, stmtRewriter); |
| MetadataManager.INSTANCE.commitTransaction(mdTxnCtx); |
| bActiveTxn = false; |
| return jobSpec; |
| } catch (Exception e) { |
| if (bActiveTxn) { |
| abort(e, e, mdTxnCtx); |
| } |
| throw e; |
| } |
| }; |
| if (compileOnly) { |
| locker.lock(); |
| try { |
| return compiler.compile(); |
| } finally { |
| locker.unlock(); |
| } |
| } |
| |
| if (stmtInsertUpsert.getReturnExpression() != null) { |
| deliverResult(hcc, resultSet, compiler, metadataProvider, locker, resultDelivery, outMetadata, stats, |
| requestParameters, false); |
| } else { |
| locker.lock(); |
| try { |
| final JobSpecification jobSpec = compiler.compile(); |
| if (jobSpec == null) { |
| return null; |
| } |
| runJob(hcc, jobSpec); |
| } finally { |
| locker.unlock(); |
| } |
| } |
| return null; |
| } |
| |
| public JobSpecification handleDeleteStatement(MetadataProvider metadataProvider, Statement stmt, |
| IHyracksClientConnection hcc, boolean compileOnly, Map<String, IAObject> stmtParams, |
| IStatementRewriter stmtRewriter) throws Exception { |
| DeleteStatement stmtDelete = (DeleteStatement) stmt; |
| String datasetName = stmtDelete.getDatasetName(); |
| metadataProvider.validateDatabaseObjectName(stmtDelete.getDataverseName(), datasetName, |
| stmt.getSourceLocation()); |
| DataverseName dataverseName = getActiveDataverseName(stmtDelete.getDataverseName()); |
| MetadataTransactionContext mdTxnCtx = MetadataManager.INSTANCE.beginTransaction(); |
| boolean bActiveTxn = true; |
| metadataProvider.setMetadataTxnContext(mdTxnCtx); |
| lockUtil.insertDeleteUpsertBegin(lockManager, metadataProvider.getLocks(), dataverseName, datasetName); |
| try { |
| metadataProvider.setWriteTransaction(true); |
| CompiledDeleteStatement clfrqs = new CompiledDeleteStatement(stmtDelete.getVariableExpr(), dataverseName, |
| datasetName, stmtDelete.getCondition(), stmtDelete.getVarCounter(), stmtDelete.getQuery()); |
| clfrqs.setSourceLocation(stmt.getSourceLocation()); |
| JobSpecification jobSpec = |
| rewriteCompileQuery(hcc, metadataProvider, clfrqs.getQuery(), clfrqs, stmtParams, stmtRewriter); |
| afterCompile(); |
| |
| MetadataManager.INSTANCE.commitTransaction(mdTxnCtx); |
| bActiveTxn = false; |
| |
| if (jobSpec != null && !compileOnly) { |
| runJob(hcc, jobSpec); |
| } |
| return jobSpec; |
| } catch (Exception e) { |
| if (bActiveTxn) { |
| abort(e, e, mdTxnCtx); |
| } |
| throw e; |
| } finally { |
| metadataProvider.getLocks().unlock(); |
| } |
| } |
| |
| @Override |
| public JobSpecification rewriteCompileQuery(IClusterInfoCollector clusterInfoCollector, |
| MetadataProvider metadataProvider, Query query, ICompiledDmlStatement stmt, |
| Map<String, IAObject> stmtParams, IStatementRewriter stmtRewriter) |
| throws AlgebricksException, ACIDException { |
| |
| Map<VarIdentifier, IAObject> externalVars = createExternalVariables(stmtParams, stmtRewriter); |
| |
| // Query Rewriting (happens under the same ongoing metadata transaction) |
| Pair<IReturningStatement, Integer> rewrittenResult = apiFramework.reWriteQuery(declaredFunctions, |
| metadataProvider, query, sessionOutput, true, true, externalVars.keySet(), warningCollector); |
| |
| // Query Compilation (happens under the same ongoing metadata transaction) |
| return apiFramework.compileQuery(clusterInfoCollector, metadataProvider, (Query) rewrittenResult.first, |
| rewrittenResult.second, stmt == null ? null : stmt.getDatasetName(), sessionOutput, stmt, externalVars, |
| responsePrinter, warningCollector); |
| } |
| |
| private JobSpecification rewriteCompileInsertUpsert(IClusterInfoCollector clusterInfoCollector, |
| MetadataProvider metadataProvider, InsertStatement insertUpsert, Map<String, IAObject> stmtParams, |
| IStatementRewriter stmtRewriter) throws AlgebricksException, ACIDException { |
| SourceLocation sourceLoc = insertUpsert.getSourceLocation(); |
| |
| Map<VarIdentifier, IAObject> externalVars = createExternalVariables(stmtParams, stmtRewriter); |
| |
| // Insert/upsert statement rewriting (happens under the same ongoing metadata transaction) |
| Pair<IReturningStatement, Integer> rewrittenResult = apiFramework.reWriteQuery(declaredFunctions, |
| metadataProvider, insertUpsert, sessionOutput, true, true, externalVars.keySet(), warningCollector); |
| |
| InsertStatement rewrittenInsertUpsert = (InsertStatement) rewrittenResult.first; |
| DataverseName dataverseName = getActiveDataverseName(rewrittenInsertUpsert.getDataverseName()); |
| String datasetName = rewrittenInsertUpsert.getDatasetName(); |
| CompiledInsertStatement clfrqs; |
| switch (insertUpsert.getKind()) { |
| case INSERT: |
| clfrqs = new CompiledInsertStatement(dataverseName, datasetName, rewrittenInsertUpsert.getQuery(), |
| rewrittenInsertUpsert.getVarCounter(), rewrittenInsertUpsert.getVar(), |
| rewrittenInsertUpsert.getReturnExpression()); |
| clfrqs.setSourceLocation(insertUpsert.getSourceLocation()); |
| break; |
| case UPSERT: |
| clfrqs = new CompiledUpsertStatement(dataverseName, datasetName, rewrittenInsertUpsert.getQuery(), |
| rewrittenInsertUpsert.getVarCounter(), rewrittenInsertUpsert.getVar(), |
| rewrittenInsertUpsert.getReturnExpression()); |
| clfrqs.setSourceLocation(insertUpsert.getSourceLocation()); |
| break; |
| default: |
| throw new CompilationException(ErrorCode.COMPILATION_ERROR, sourceLoc, |
| "Unsupported statement type " + rewrittenInsertUpsert.getKind()); |
| } |
| // Insert/upsert statement compilation (happens under the same ongoing metadata |
| // transaction) |
| return apiFramework.compileQuery(clusterInfoCollector, metadataProvider, rewrittenInsertUpsert.getQuery(), |
| rewrittenResult.second, datasetName, sessionOutput, clfrqs, externalVars, responsePrinter, |
| warningCollector); |
| } |
| |
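| /** |
| * Creates a feed metadata entity from the statement's configuration map (normalized and |
| * validated first), e.g. the illustrative statement |
| * {@code CREATE FEED user_feed WITH { "adapter-name": "socket_adapter", "format": "adm" }}. |
| */ |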
| protected void handleCreateFeedStatement(MetadataProvider metadataProvider, Statement stmt) throws Exception { |
| CreateFeedStatement cfs = (CreateFeedStatement) stmt; |
| SourceLocation sourceLoc = cfs.getSourceLocation(); |
| String feedName = cfs.getFeedName().getValue(); |
| metadataProvider.validateDatabaseObjectName(cfs.getDataverseName(), feedName, sourceLoc); |
| DataverseName dataverseName = getActiveDataverseName(cfs.getDataverseName()); |
| MetadataTransactionContext mdTxnCtx = MetadataManager.INSTANCE.beginTransaction(); |
| metadataProvider.setMetadataTxnContext(mdTxnCtx); |
| lockUtil.createFeedBegin(lockManager, metadataProvider.getLocks(), dataverseName, feedName); |
| try { |
| Feed feed = |
| MetadataManager.INSTANCE.getFeed(metadataProvider.getMetadataTxnContext(), dataverseName, feedName); |
| if (feed != null) { |
| if (cfs.getIfNotExists()) { |
| MetadataManager.INSTANCE.commitTransaction(mdTxnCtx); |
| return; |
| } else { |
| throw new CompilationException(ErrorCode.COMPILATION_ERROR, sourceLoc, |
| "A feed with this name " + feedName + " already exists."); |
| } |
| } |
| Map<String, String> configuration = cfs.getConfiguration(); |
| ExternalDataUtils.normalize(configuration); |
| ExternalDataUtils.validate(configuration); |
| feed = new Feed(dataverseName, feedName, configuration); |
| FeedMetadataUtil.validateFeed(feed, mdTxnCtx, appCtx, warningCollector); |
| MetadataManager.INSTANCE.addFeed(metadataProvider.getMetadataTxnContext(), feed); |
| MetadataManager.INSTANCE.commitTransaction(mdTxnCtx); |
| } catch (Exception e) { |
| abort(e, e, mdTxnCtx); |
| throw e; |
| } finally { |
| metadataProvider.getLocks().unlock(); |
| } |
| } |
| |
| protected void handleCreateFeedPolicyStatement(MetadataProvider metadataProvider, Statement stmt) |
| throws AlgebricksException, HyracksDataException { |
| FeedPolicyEntity newPolicy; |
| MetadataTransactionContext mdTxnCtx = null; |
| CreateFeedPolicyStatement cfps = (CreateFeedPolicyStatement) stmt; |
| SourceLocation sourceLoc = cfps.getSourceLocation(); |
| String policyName = cfps.getPolicyName(); |
| metadataProvider.validateDatabaseObjectName(null, policyName, sourceLoc); |
| DataverseName dataverseName = getActiveDataverseName(null); |
| lockUtil.createFeedPolicyBegin(lockManager, metadataProvider.getLocks(), dataverseName, policyName); |
| try { |
| mdTxnCtx = MetadataManager.INSTANCE.beginTransaction(); |
| metadataProvider.setMetadataTxnContext(mdTxnCtx); |
| FeedPolicyEntity feedPolicy = MetadataManager.INSTANCE |
| .getFeedPolicy(metadataProvider.getMetadataTxnContext(), dataverseName, policyName); |
| if (feedPolicy != null) { |
| if (cfps.getIfNotExists()) { |
| MetadataManager.INSTANCE.commitTransaction(mdTxnCtx); |
| return; |
| } else { |
| throw new CompilationException(ErrorCode.COMPILATION_ERROR, sourceLoc, |
| "A policy with this name " + policyName + " already exists."); |
| } |
| } |
| boolean extendingExisting = cfps.getSourcePolicyName() != null; |
| String description = cfps.getDescription() == null ? "" : cfps.getDescription(); |
| if (extendingExisting) { |
| FeedPolicyEntity sourceFeedPolicy = MetadataManager.INSTANCE.getFeedPolicy( |
| metadataProvider.getMetadataTxnContext(), dataverseName, cfps.getSourcePolicyName()); |
| if (sourceFeedPolicy == null) { |
| sourceFeedPolicy = MetadataManager.INSTANCE.getFeedPolicy(metadataProvider.getMetadataTxnContext(), |
| MetadataConstants.METADATA_DATAVERSE_NAME, cfps.getSourcePolicyName()); |
| if (sourceFeedPolicy == null) { |
| throw new CompilationException(ErrorCode.COMPILATION_ERROR, sourceLoc, |
| "Unknown policy " + cfps.getSourcePolicyName()); |
| } |
| } |
| Map<String, String> policyProperties = sourceFeedPolicy.getProperties(); |
| policyProperties.putAll(cfps.getProperties()); |
| newPolicy = new FeedPolicyEntity(dataverseName, policyName, description, policyProperties); |
| } else { |
| Properties prop = new Properties(); |
| try (InputStream stream = new FileInputStream(cfps.getSourcePolicyFile())) { |
| prop.load(stream); |
| } catch (Exception e) { |
| throw new CompilationException(ErrorCode.COMPILATION_ERROR, sourceLoc, |
| "Unable to read policy file " + cfps.getSourcePolicyFile(), e); |
| } |
| Map<String, String> policyProperties = new HashMap<>(); |
| prop.forEach((key, value) -> policyProperties.put((String) key, (String) value)); |
| newPolicy = new FeedPolicyEntity(dataverseName, policyName, description, policyProperties); |
| } |
| MetadataManager.INSTANCE.addFeedPolicy(mdTxnCtx, newPolicy); |
| MetadataManager.INSTANCE.commitTransaction(mdTxnCtx); |
| } catch (RemoteException | ACIDException e) { |
| abort(e, e, mdTxnCtx); |
| throw HyracksDataException.create(e); |
| } finally { |
| metadataProvider.getLocks().unlock(); |
| } |
| } |
| |
| protected void handleDropFeedStatement(MetadataProvider metadataProvider, Statement stmt, |
| IHyracksClientConnection hcc) throws Exception { |
| FeedDropStatement stmtFeedDrop = (FeedDropStatement) stmt; |
| SourceLocation sourceLoc = stmtFeedDrop.getSourceLocation(); |
| String feedName = stmtFeedDrop.getFeedName().getValue(); |
| metadataProvider.validateDatabaseObjectName(stmtFeedDrop.getDataverseName(), feedName, sourceLoc); |
| DataverseName dataverseName = getActiveDataverseName(stmtFeedDrop.getDataverseName()); |
| MetadataTransactionContext mdTxnCtx = MetadataManager.INSTANCE.beginTransaction(); |
| metadataProvider.setMetadataTxnContext(mdTxnCtx); |
| lockUtil.dropFeedBegin(lockManager, metadataProvider.getLocks(), dataverseName, feedName); |
| try { |
| Feed feed = MetadataManager.INSTANCE.getFeed(mdTxnCtx, dataverseName, feedName); |
| if (feed == null) { |
| if (!stmtFeedDrop.getIfExists()) { |
| throw new CompilationException(ErrorCode.COMPILATION_ERROR, sourceLoc, |
| "There is no feed with this name " + feedName + "."); |
| } |
| MetadataManager.INSTANCE.commitTransaction(mdTxnCtx); |
| return; |
| } |
| doDropFeed(hcc, metadataProvider, feed, sourceLoc); |
| MetadataManager.INSTANCE.commitTransaction(mdTxnCtx); |
| } catch (Exception e) { |
| abort(e, e, mdTxnCtx); |
| throw e; |
| } finally { |
| metadataProvider.getLocks().unlock(); |
| } |
| } |
| |
| protected void doDropFeed(IHyracksClientConnection hcc, MetadataProvider metadataProvider, Feed feed, |
| SourceLocation sourceLoc) throws Exception { |
| MetadataTransactionContext mdTxnCtx = metadataProvider.getMetadataTxnContext(); |
| EntityId feedId = feed.getFeedId(); |
| ActiveNotificationHandler activeNotificationHandler = |
| (ActiveNotificationHandler) appCtx.getActiveNotificationHandler(); |
| ActiveEntityEventsListener listener = |
| (ActiveEntityEventsListener) activeNotificationHandler.getListener(feedId); |
| if (listener != null && listener.getState() != ActivityState.STOPPED) { |
| throw new CompilationException(ErrorCode.COMPILATION_ERROR, sourceLoc, |
| "Feed " + feedId + " is currently active and connected to the following " + dataset(PLURAL) + "\n" |
| + listener.toString()); |
| } else if (listener != null) { |
| listener.unregister(); |
| } |
| JobSpecification spec = FeedOperations.buildRemoveFeedStorageJob(metadataProvider, |
| MetadataManager.INSTANCE.getFeed(mdTxnCtx, feedId.getDataverseName(), feedId.getEntityName())); |
| runJob(hcc, spec); |
| MetadataManager.INSTANCE.dropFeed(mdTxnCtx, feed.getDataverseName(), feed.getFeedName()); |
| if (LOGGER.isInfoEnabled()) { |
| LOGGER.info("Removed feed " + feedId); |
| } |
| } |
| |
| protected void handleDropFeedPolicyStatement(MetadataProvider metadataProvider, Statement stmt) throws Exception { |
| FeedPolicyDropStatement stmtFeedPolicyDrop = (FeedPolicyDropStatement) stmt; |
| SourceLocation sourceLoc = stmtFeedPolicyDrop.getSourceLocation(); |
| String policyName = stmtFeedPolicyDrop.getPolicyName().getValue(); |
| metadataProvider.validateDatabaseObjectName(stmtFeedPolicyDrop.getDataverseName(), policyName, sourceLoc); |
| DataverseName dataverseName = getActiveDataverseName(stmtFeedPolicyDrop.getDataverseName()); |
| MetadataTransactionContext mdTxnCtx = MetadataManager.INSTANCE.beginTransaction(); |
| metadataProvider.setMetadataTxnContext(mdTxnCtx); |
| lockUtil.dropFeedPolicyBegin(lockManager, metadataProvider.getLocks(), dataverseName, policyName); |
| try { |
| FeedPolicyEntity feedPolicy = MetadataManager.INSTANCE.getFeedPolicy(mdTxnCtx, dataverseName, policyName); |
| if (feedPolicy == null) { |
| if (!stmtFeedPolicyDrop.getIfExists()) { |
| throw new CompilationException(ErrorCode.COMPILATION_ERROR, sourceLoc, |
| "Unknown policy " + policyName + " in " + dataverse() + " " + dataverseName); |
| } |
| MetadataManager.INSTANCE.commitTransaction(mdTxnCtx); |
| return; |
| } |
| MetadataManager.INSTANCE.dropFeedPolicy(mdTxnCtx, dataverseName, policyName); |
| MetadataManager.INSTANCE.commitTransaction(mdTxnCtx); |
| } catch (Exception e) { |
| abort(e, e, mdTxnCtx); |
| throw e; |
| } finally { |
| metadataProvider.getLocks().unlock(); |
| } |
| } |
| |
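| /** |
| * Starts a feed: verifies that the feed exists and has at least one connection, read-locks |
| * every connected dataset, registers a new {@link FeedEventsListener} if none exists yet, |
| * and starts the listener after committing the metadata transaction. |
| */ |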
| protected void handleStartFeedStatement(MetadataProvider metadataProvider, Statement stmt, |
| IHyracksClientConnection hcc) throws Exception { |
| StartFeedStatement sfs = (StartFeedStatement) stmt; |
| SourceLocation sourceLoc = sfs.getSourceLocation(); |
| DataverseName dataverseName = getActiveDataverseName(sfs.getDataverseName()); |
| String feedName = sfs.getFeedName().getValue(); |
| MetadataTransactionContext mdTxnCtx = MetadataManager.INSTANCE.beginTransaction(); |
| boolean committed = false; |
| lockUtil.startFeedBegin(lockManager, metadataProvider.getLocks(), dataverseName, feedName); |
| try { |
| metadataProvider.setMetadataTxnContext(mdTxnCtx); |
| // Runtime handler |
| EntityId entityId = new EntityId(Feed.EXTENSION_NAME, dataverseName, feedName); |
| // Feed & Feed Connections |
| Feed feed = FeedMetadataUtil.validateIfFeedExists(dataverseName, feedName, |
| metadataProvider.getMetadataTxnContext()); |
| List<FeedConnection> feedConnections = MetadataManager.INSTANCE |
| .getFeedConections(metadataProvider.getMetadataTxnContext(), dataverseName, feedName); |
| if (feedConnections.isEmpty()) { |
| throw new CompilationException(ErrorCode.FEED_START_FEED_WITHOUT_CONNECTION, sourceLoc, feedName); |
| } |
| for (FeedConnection feedConnection : feedConnections) { |
| // TODO: handle the case where the connected dataset is in a different dataverse |
| lockManager.acquireDatasetReadLock(metadataProvider.getLocks(), feedConnection.getDataverseName(), |
| feedConnection.getDatasetName()); |
| } |
| ActiveNotificationHandler activeEventHandler = |
| (ActiveNotificationHandler) appCtx.getActiveNotificationHandler(); |
| ActiveEntityEventsListener listener = (ActiveEntityEventsListener) activeEventHandler.getListener(entityId); |
| if (listener == null) { |
| // Prepare policy |
| List<Dataset> datasets = new ArrayList<>(); |
| for (FeedConnection connection : feedConnections) { |
| Dataset ds = |
| metadataProvider.findDataset(connection.getDataverseName(), connection.getDatasetName()); |
| datasets.add(ds); |
| } |
| listener = new FeedEventsListener(this, metadataProvider.getApplicationContext(), hcc, entityId, |
| datasets, null, FeedIntakeOperatorNodePushable.class.getSimpleName(), |
| NoRetryPolicyFactory.INSTANCE, feed, feedConnections, compilationProvider.getLanguage()); |
| } |
| MetadataManager.INSTANCE.commitTransaction(mdTxnCtx); |
| committed = true; |
| listener.start(metadataProvider); |
| } catch (Exception e) { |
| if (!committed) { |
| abort(e, e, mdTxnCtx); |
| } |
| throw e; |
| } finally { |
| metadataProvider.getLocks().unlock(); |
| } |
| } |
| |
| protected void handleStopFeedStatement(MetadataProvider metadataProvider, Statement stmt) throws Exception { |
| StopFeedStatement sfst = (StopFeedStatement) stmt; |
| SourceLocation sourceLoc = sfst.getSourceLocation(); |
| DataverseName dataverseName = getActiveDataverseName(sfst.getDataverseName()); |
| String feedName = sfst.getFeedName().getValue(); |
| EntityId entityId = new EntityId(Feed.EXTENSION_NAME, dataverseName, feedName); |
| ActiveNotificationHandler activeEventHandler = |
| (ActiveNotificationHandler) appCtx.getActiveNotificationHandler(); |
| // Obtain runtime info from ActiveListener |
| ActiveEntityEventsListener listener = (ActiveEntityEventsListener) activeEventHandler.getListener(entityId); |
| if (listener == null) { |
| throw new CompilationException(ErrorCode.COMPILATION_ERROR, sourceLoc, |
| "Feed " + feedName + " is not started."); |
| } |
| lockUtil.stopFeedBegin(lockManager, metadataProvider.getLocks(), dataverseName, feedName); |
| try { |
| listener.stop(metadataProvider); |
| } finally { |
| metadataProvider.getLocks().unlock(); |
| } |
| } |
| |
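| /** |
| * Connects a feed to a dataset, e.g. the illustrative statement |
| * {@code CONNECT FEED user_feed TO DATASET users}. Fails if the feed is currently active, |
| * if an applied function does not exist, or if the connection already exists; a registered |
| * listener, if any, is updated with the new connection. |
| */ |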
| private void handleConnectFeedStatement(MetadataProvider metadataProvider, Statement stmt) throws Exception { |
| FeedConnection fc; |
| ConnectFeedStatement cfs = (ConnectFeedStatement) stmt; |
| SourceLocation sourceLoc = cfs.getSourceLocation(); |
| DataverseName dataverseName = getActiveDataverseName(cfs.getDataverseName()); |
| String feedName = cfs.getFeedName(); |
| String datasetName = cfs.getDatasetName().getValue(); |
| String policyName = cfs.getPolicy(); |
| String whereClauseBody = cfs.getWhereClauseBody(); |
| MetadataTransactionContext mdTxnCtx = MetadataManager.INSTANCE.beginTransaction(); |
| metadataProvider.setMetadataTxnContext(mdTxnCtx); |
| // TODO: Check whether we are connecting a change feed to a non-meta dataset |
| // Check whether feed is alive |
| ActiveNotificationHandler activeEventHandler = |
| (ActiveNotificationHandler) appCtx.getActiveNotificationHandler(); |
| // Transaction handling |
| lockUtil.connectFeedBegin(lockManager, metadataProvider.getLocks(), dataverseName, datasetName, feedName); |
| try { |
| // validation |
| Dataset dataset = FeedMetadataUtil.validateIfDatasetExists(metadataProvider, dataverseName, datasetName); |
| Feed feed = FeedMetadataUtil.validateIfFeedExists(dataverseName, feedName, |
| metadataProvider.getMetadataTxnContext()); |
| FeedEventsListener listener = (FeedEventsListener) activeEventHandler.getListener(feed.getFeedId()); |
| if (listener != null && listener.isActive()) { |
| throw new CompilationException(ErrorCode.FEED_CHANGE_FEED_CONNECTIVITY_ON_ALIVE_FEED, sourceLoc, |
| feedName); |
| } |
| ARecordType outputType = FeedMetadataUtil.getOutputType(feed, |
| feed.getConfiguration().get(ExternalDataConstants.KEY_TYPE_NAME)); |
| List<FunctionSignature> appliedFunctions = cfs.getAppliedFunctions(); |
| for (FunctionSignature func : appliedFunctions) { |
| if (MetadataManager.INSTANCE.getFunction(mdTxnCtx, func) == null) { |
| throw new CompilationException(ErrorCode.FEED_CONNECT_FEED_APPLIED_INVALID_FUNCTION, sourceLoc, |
| func.getName()); |
| } |
| } |
| fc = MetadataManager.INSTANCE.getFeedConnection(metadataProvider.getMetadataTxnContext(), dataverseName, |
| feedName, datasetName); |
| if (fc != null) { |
| throw new CompilationException(ErrorCode.COMPILATION_ERROR, sourceLoc, |
| "Feed" + feedName + " is already connected to " + dataset() + " " + datasetName); |
| } |
| fc = new FeedConnection(dataverseName, feedName, datasetName, appliedFunctions, policyName, whereClauseBody, |
| outputType.getTypeName()); |
| MetadataManager.INSTANCE.addFeedConnection(metadataProvider.getMetadataTxnContext(), fc); |
| MetadataManager.INSTANCE.commitTransaction(mdTxnCtx); |
| if (listener != null) { |
| listener.add(dataset); |
| listener.addFeedConnection(fc); |
| } |
| } catch (Exception e) { |
| abort(e, e, mdTxnCtx); |
| throw e; |
| } finally { |
| metadataProvider.getLocks().unlock(); |
| } |
| } |
| |
| protected void handleDisconnectFeedStatement(MetadataProvider metadataProvider, Statement stmt) throws Exception { |
| DisconnectFeedStatement cfs = (DisconnectFeedStatement) stmt; |
| SourceLocation sourceLoc = cfs.getSourceLocation(); |
| DataverseName dataverseName = getActiveDataverseName(cfs.getDataverseName()); |
| String datasetName = cfs.getDatasetName().getValue(); |
| String feedName = cfs.getFeedName().getValue(); |
| MetadataTransactionContext mdTxnCtx = MetadataManager.INSTANCE.beginTransaction(); |
| metadataProvider.setMetadataTxnContext(mdTxnCtx); |
| lockUtil.disconnectFeedBegin(lockManager, metadataProvider.getLocks(), dataverseName, datasetName, feedName); |
| try { |
| ActiveNotificationHandler activeEventHandler = |
| (ActiveNotificationHandler) appCtx.getActiveNotificationHandler(); |
| // Check whether feed is alive |
| ActiveEntityEventsListener listener = (ActiveEntityEventsListener) activeEventHandler |
| .getListener(new EntityId(Feed.EXTENSION_NAME, dataverseName, feedName)); |
| if (listener != null && listener.isActive()) { |
| throw new CompilationException(ErrorCode.FEED_CHANGE_FEED_CONNECTIVITY_ON_ALIVE_FEED, sourceLoc, |
| feedName); |
| } |
| FeedMetadataUtil.validateIfDatasetExists(metadataProvider, dataverseName, datasetName); |
| FeedMetadataUtil.validateIfFeedExists(dataverseName, feedName, mdTxnCtx); |
| FeedConnection fc = MetadataManager.INSTANCE.getFeedConnection(metadataProvider.getMetadataTxnContext(), |
| dataverseName, feedName, datasetName); |
| Dataset ds = metadataProvider.findDataset(dataverseName, datasetName); |
| if (ds == null) { |
| throw new CompilationException(ErrorCode.UNKNOWN_DATASET_IN_DATAVERSE, sourceLoc, datasetName, |
| dataverseName); |
| } |
| if (fc == null) { |
| throw new CompilationException(ErrorCode.COMPILATION_ERROR, sourceLoc, "Feed " + feedName |
| + " is currently not connected to " + cfs.getDatasetName().getValue() + ". Invalid operation!"); |
| } |
| MetadataManager.INSTANCE.dropFeedConnection(mdTxnCtx, dataverseName, feedName, datasetName); |
| MetadataManager.INSTANCE.commitTransaction(mdTxnCtx); |
| if (listener != null) { |
| listener.remove(ds); |
| } |
| } catch (Exception e) { |
| abort(e, e, mdTxnCtx); |
| throw e; |
| } finally { |
| metadataProvider.getLocks().unlock(); |
| } |
| } |
| |
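| /** |
| * Handles compaction, e.g. the illustrative statement {@code COMPACT DATASET users}: builds |
| * one compaction job for the dataset plus one per secondary index (one per index for |
| * external datasets) and runs them after the metadata transaction commits. |
| */ |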
| protected void handleCompactStatement(MetadataProvider metadataProvider, Statement stmt, |
| IHyracksClientConnection hcc) throws Exception { |
| CompactStatement compactStatement = (CompactStatement) stmt; |
| SourceLocation sourceLoc = compactStatement.getSourceLocation(); |
| DataverseName dataverseName = getActiveDataverseName(compactStatement.getDataverseName()); |
| String datasetName = compactStatement.getDatasetName().getValue(); |
| MetadataTransactionContext mdTxnCtx = MetadataManager.INSTANCE.beginTransaction(); |
| boolean bActiveTxn = true; |
| metadataProvider.setMetadataTxnContext(mdTxnCtx); |
| List<JobSpecification> jobsToExecute = new ArrayList<>(); |
| lockUtil.compactBegin(lockManager, metadataProvider.getLocks(), dataverseName, datasetName); |
| try { |
| Dataset ds = metadataProvider.findDataset(dataverseName, datasetName); |
| if (ds == null) { |
| throw new CompilationException(ErrorCode.UNKNOWN_DATASET_IN_DATAVERSE, sourceLoc, datasetName, |
| dataverseName); |
| } |
| // Prepare jobs to compact the dataset and its indexes |
| List<Index> indexes = MetadataManager.INSTANCE.getDatasetIndexes(mdTxnCtx, dataverseName, datasetName); |
| if (indexes.isEmpty()) { |
| throw new CompilationException(ErrorCode.COMPILATION_ERROR, sourceLoc, |
| "Cannot compact the external " + dataset() + " " + datasetName + " because it has no indexes"); |
| } |
| Dataverse dataverse = |
| MetadataManager.INSTANCE.getDataverse(metadataProvider.getMetadataTxnContext(), dataverseName); |
| jobsToExecute.add(DatasetUtil.compactDatasetJobSpec(dataverse, datasetName, metadataProvider)); |
| |
| if (ds.getDatasetType() == DatasetType.INTERNAL) { |
| for (Index index : indexes) { |
| if (index.isSecondaryIndex()) { |
| jobsToExecute.add( |
| IndexUtil.buildSecondaryIndexCompactJobSpec(ds, index, metadataProvider, sourceLoc)); |
| } |
| } |
| } else { |
| prepareCompactJobsForExternalDataset(indexes, ds, jobsToExecute, metadataProvider, sourceLoc); |
| } |
| MetadataManager.INSTANCE.commitTransaction(mdTxnCtx); |
| bActiveTxn = false; |
| |
| // #. run the jobs |
| for (JobSpecification jobSpec : jobsToExecute) { |
| runJob(hcc, jobSpec); |
| } |
| } catch (Exception e) { |
| if (bActiveTxn) { |
| abort(e, e, mdTxnCtx); |
| } |
| throw e; |
| } finally { |
| metadataProvider.getLocks().unlock(); |
| ExternalDatasetsRegistry.INSTANCE.releaseAcquiredLocks(metadataProvider); |
| } |
| } |
| |
| protected void prepareCompactJobsForExternalDataset(List<Index> indexes, Dataset ds, |
| List<JobSpecification> jobsToExecute, MetadataProvider metadataProvider, SourceLocation sourceLoc) |
| throws AlgebricksException { |
| for (Index index : indexes) { |
| jobsToExecute.add(IndexUtil.buildSecondaryIndexCompactJobSpec(ds, index, metadataProvider, sourceLoc)); |
| } |
| } |
| |
| private interface IMetadataLocker { |
| void lock() throws AlgebricksException; |
| |
| void unlock() throws AlgebricksException; |
| } |
| |
| private interface IResultPrinter { |
| void print(JobId jobId) throws HyracksDataException, AlgebricksException; |
| } |
| |
| private interface IStatementCompiler { |
| JobSpecification compile() throws AlgebricksException, RemoteException, ACIDException; |
| } |
| |
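| /** |
| * Handles a standalone query. Locking is deferred to {@link #deliverResult}; the compiler |
| * callback compiles the query under a metadata transaction and yields no job to run for |
| * EXPLAIN queries or when query execution is disabled in the session config. |
| */ |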
| protected void handleQuery(MetadataProvider metadataProvider, Query query, IHyracksClientConnection hcc, |
| IResultSet resultSet, ResultDelivery resultDelivery, ResultMetadata outMetadata, Stats stats, |
| IRequestParameters requestParameters, Map<String, IAObject> stmtParams, IStatementRewriter stmtRewriter) |
| throws Exception { |
| final IMetadataLocker locker = new IMetadataLocker() { |
| @Override |
| public void lock() { |
| } |
| |
| @Override |
| public void unlock() { |
| metadataProvider.getLocks().unlock(); |
| // release external datasets' locks acquired during compilation of the query |
| ExternalDatasetsRegistry.INSTANCE.releaseAcquiredLocks(metadataProvider); |
| } |
| }; |
| final IStatementCompiler compiler = () -> { |
| MetadataTransactionContext mdTxnCtx = MetadataManager.INSTANCE.beginTransaction(); |
| boolean bActiveTxn = true; |
| metadataProvider.setMetadataTxnContext(mdTxnCtx); |
| try { |
| final JobSpecification jobSpec = |
| rewriteCompileQuery(hcc, metadataProvider, query, null, stmtParams, stmtRewriter); |
| // update stats with the count of compile-time warnings; needs to be adapted for multi-statement execution. |
| stats.updateTotalWarningsCount(warningCollector.getTotalWarningsCount()); |
| afterCompile(); |
| MetadataManager.INSTANCE.commitTransaction(mdTxnCtx); |
| bActiveTxn = false; |
| return query.isExplain() || !sessionConfig.isExecuteQuery() ? null : jobSpec; |
| } catch (Exception e) { |
| LOGGER.log(Level.INFO, e.getMessage(), e); |
| if (bActiveTxn) { |
| abort(e, e, mdTxnCtx); |
| } |
| throw e; |
| } |
| }; |
| deliverResult(hcc, resultSet, compiler, metadataProvider, locker, resultDelivery, outMetadata, stats, |
| requestParameters, true); |
| } |
| |
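| /** |
| * Runs the compiled job and delivers its results according to the requested mode: ASYNC |
| * prints a result handle from an executor thread (the caller blocks only until the handle |
| * is printed), IMMEDIATE prints the results inline once the job completes, and DEFERRED |
| * prints a result handle after the job completes. |
| */ |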
| private void deliverResult(IHyracksClientConnection hcc, IResultSet resultSet, IStatementCompiler compiler, |
| MetadataProvider metadataProvider, IMetadataLocker locker, ResultDelivery resultDelivery, |
| ResultMetadata outMetadata, Stats stats, IRequestParameters requestParameters, boolean cancellable) |
| throws Exception { |
| final ResultSetId resultSetId = metadataProvider.getResultSetId(); |
| switch (resultDelivery) { |
| case ASYNC: |
| MutableBoolean printed = new MutableBoolean(false); |
| executorService.submit(() -> asyncCreateAndRunJob(hcc, compiler, locker, resultDelivery, |
| requestParameters, cancellable, resultSetId, printed, metadataProvider)); |
| synchronized (printed) { |
| while (!printed.booleanValue()) { |
| printed.wait(); |
| } |
| } |
| break; |
| case IMMEDIATE: |
| createAndRunJob(hcc, jobFlags, null, compiler, locker, resultDelivery, id -> { |
| final ResultReader resultReader = new ResultReader(resultSet, id, resultSetId); |
| updateJobStats(id, stats, metadataProvider.getResultSetId()); |
| responsePrinter.addResultPrinter(new ResultsPrinter(appCtx, resultReader, |
| metadataProvider.findOutputRecordType(), stats, sessionOutput)); |
| responsePrinter.printResults(); |
| }, requestParameters, cancellable, appCtx, metadataProvider); |
| break; |
| case DEFERRED: |
| createAndRunJob(hcc, jobFlags, null, compiler, locker, resultDelivery, id -> { |
| updateJobStats(id, stats, metadataProvider.getResultSetId()); |
| responsePrinter.addResultPrinter( |
| new ResultHandlePrinter(sessionOutput, new ResultHandle(id, resultSetId))); |
| responsePrinter.printResults(); |
| if (outMetadata != null) { |
| outMetadata.getResultSets().add(org.apache.commons.lang3.tuple.Triple.of(id, resultSetId, |
| metadataProvider.findOutputRecordType())); |
| } |
| }, requestParameters, cancellable, appCtx, metadataProvider); |
| break; |
| default: |
| break; |
| } |
| } |
| |
| private void updateJobStats(JobId jobId, Stats stats, ResultSetId rsId) throws HyracksDataException { |
| final ClusterControllerService controllerService = |
| (ClusterControllerService) appCtx.getServiceContext().getControllerService(); |
| org.apache.asterix.api.common.ResultMetadata resultMetadata = |
| (org.apache.asterix.api.common.ResultMetadata) controllerService.getResultDirectoryService() |
| .getResultMetadata(jobId, rsId); |
| stats.setProcessedObjects(resultMetadata.getProcessedObjects()); |
| if (jobFlags.contains(JobFlag.PROFILE_RUNTIME)) { |
| stats.setJobProfile(resultMetadata.getJobProfile()); |
| } |
| stats.updateTotalWarningsCount(resultMetadata.getTotalWarningsCount()); |
| WarningUtil.mergeWarnings(resultMetadata.getWarnings(), warningCollector); |
| } |
| |
| private void asyncCreateAndRunJob(IHyracksClientConnection hcc, IStatementCompiler compiler, IMetadataLocker locker, |
| ResultDelivery resultDelivery, IRequestParameters requestParameters, boolean cancellable, |
| ResultSetId resultSetId, MutableBoolean printed, MetadataProvider metadataProvider) { |
| Mutable<JobId> jobId = new MutableObject<>(JobId.INVALID); |
| try { |
| createAndRunJob(hcc, jobFlags, jobId, compiler, locker, resultDelivery, id -> { |
| final ResultHandle handle = new ResultHandle(id, resultSetId); |
| responsePrinter.addResultPrinter(new StatusPrinter(AbstractQueryApiServlet.ResultStatus.RUNNING)); |
| responsePrinter.addResultPrinter(new ResultHandlePrinter(sessionOutput, handle)); |
| responsePrinter.printResults(); |
| synchronized (printed) { |
| printed.setTrue(); |
| printed.notify(); |
| } |
| }, requestParameters, cancellable, appCtx, metadataProvider); |
| } catch (Exception e) { |
| if (Objects.equals(JobId.INVALID, jobId.getValue())) { |
| // compilation failed |
| responsePrinter.addResultPrinter(new StatusPrinter(AbstractQueryApiServlet.ResultStatus.FAILED)); |
| responsePrinter.addResultPrinter(new ErrorsPrinter(Collections.singletonList(ExecutionError.of(e)))); |
| try { |
| responsePrinter.printResults(); |
| } catch (HyracksDataException ex) { |
| LOGGER.error("failed to print result", ex); |
| } |
| } else { |
| GlobalConfig.ASTERIX_LOGGER.log(Level.ERROR, |
| resultDelivery.name() + " job with id " + jobId.getValue() + " failed", e); |
| } |
| } finally { |
| synchronized (printed) { |
| if (printed.isFalse()) { |
| printed.setTrue(); |
| printed.notify(); |
| } |
| } |
| } |
| } |
| |
| private void runJob(IHyracksClientConnection hcc, JobSpecification jobSpec) throws Exception { |
| runJob(hcc, jobSpec, jobFlags); |
| } |
| |
| private static void runJob(IHyracksClientConnection hcc, JobSpecification jobSpec, EnumSet<JobFlag> jobFlags) |
| throws Exception { |
| JobUtils.runJob(hcc, jobSpec, jobFlags, true); |
| } |
| |
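| /** |
| * Compiles and runs a job while holding the supplied locker. The request is marked |
| * cancellable if requested and scheduled through the receptionist; for ASYNC delivery the |
| * handle is printed before waiting for completion. An InterruptedException root cause is |
| * surfaced as a REQUEST_CANCELLED error. |
| */ |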
| private static void createAndRunJob(IHyracksClientConnection hcc, EnumSet<JobFlag> jobFlags, Mutable<JobId> jId, |
| IStatementCompiler compiler, IMetadataLocker locker, ResultDelivery resultDelivery, IResultPrinter printer, |
| IRequestParameters requestParameters, boolean cancellable, ICcApplicationContext appCtx, |
| MetadataProvider metadataProvider) throws Exception { |
| final IRequestTracker requestTracker = appCtx.getRequestTracker(); |
| final ClientRequest clientRequest = |
| (ClientRequest) requestTracker.get(requestParameters.getRequestReference().getUuid()); |
| locker.lock(); |
| try { |
| final JobSpecification jobSpec = compiler.compile(); |
| if (jobSpec == null) { |
| return; |
| } |
| if (cancellable) { |
| clientRequest.markCancellable(); |
| } |
| final SchedulableClientRequest schedulableRequest = |
| SchedulableClientRequest.of(clientRequest, requestParameters, metadataProvider, jobSpec); |
| appCtx.getReceptionist().ensureSchedulable(schedulableRequest); |
| final JobId jobId = JobUtils.runJob(hcc, jobSpec, jobFlags, false); |
| clientRequest.setJobId(jobId); |
| if (jId != null) { |
| jId.setValue(jobId); |
| } |
| if (ResultDelivery.ASYNC == resultDelivery) { |
| printer.print(jobId); |
| hcc.waitForCompletion(jobId); |
| } else { |
| hcc.waitForCompletion(jobId); |
| ensureNotCancelled(clientRequest); |
| printer.print(jobId); |
| } |
| } catch (Exception e) { |
| if (ExceptionUtils.getRootCause(e) instanceof InterruptedException) { |
| Thread.currentThread().interrupt(); |
| throw new RuntimeDataException(ErrorCode.REQUEST_CANCELLED, clientRequest.getId()); |
| } |
| throw e; |
| } finally { |
| // for ASYNC delivery, mark the request complete only after its job completes |
| if (ResultDelivery.ASYNC == resultDelivery) { |
| requestTracker.complete(clientRequest.getId()); |
| } |
| locker.unlock(); |
| } |
| } |
| |
| protected void handleCreateNodeGroupStatement(MetadataProvider metadataProvider, Statement stmt) throws Exception { |
| NodegroupDecl stmtCreateNodegroup = (NodegroupDecl) stmt; |
| SourceLocation sourceLoc = stmtCreateNodegroup.getSourceLocation(); |
| String ngName = stmtCreateNodegroup.getNodegroupName().getValue(); |
| metadataProvider.validateDatabaseObjectName(null, ngName, sourceLoc); |
| |
| MetadataTransactionContext mdTxnCtx = MetadataManager.INSTANCE.beginTransaction(); |
| metadataProvider.setMetadataTxnContext(mdTxnCtx); |
| lockManager.acquireNodeGroupWriteLock(metadataProvider.getLocks(), ngName); |
| try { |
| NodeGroup ng = MetadataManager.INSTANCE.getNodegroup(mdTxnCtx, ngName); |
| if (ng != null) { |
| if (!stmtCreateNodegroup.getIfNotExists()) { |
| throw new CompilationException(ErrorCode.COMPILATION_ERROR, sourceLoc, |
| "A nodegroup with this name " + ngName + " already exists."); |
| } |
| } else { |
| List<Identifier> ncIdentifiers = stmtCreateNodegroup.getNodeControllerNames(); |
| List<String> ncNames = new ArrayList<>(ncIdentifiers.size()); |
| for (Identifier id : ncIdentifiers) { |
| ncNames.add(id.getValue()); |
| } |
| MetadataManager.INSTANCE.addNodegroup(mdTxnCtx, new NodeGroup(ngName, ncNames)); |
| } |
| MetadataManager.INSTANCE.commitTransaction(mdTxnCtx); |
| } catch (Exception e) { |
| abort(e, e, mdTxnCtx); |
| throw e; |
| } finally { |
| metadataProvider.getLocks().unlock(); |
| } |
| } |
| |
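| /** |
| * Refreshes an external dataset by diffing the current file-system state against the files |
| * recorded in metadata. If nothing changed, only the refresh timestamp is updated. |
| * Otherwise, a transaction dataset record tracks progress while jobs update the files index |
| * and every secondary index; the commit job and a final metadata transaction then make the |
| * new snapshot visible. |
| */ |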
| protected void handleExternalDatasetRefreshStatement(MetadataProvider metadataProvider, Statement stmt, |
| IHyracksClientConnection hcc) throws Exception { |
| RefreshExternalDatasetStatement stmtRefresh = (RefreshExternalDatasetStatement) stmt; |
| SourceLocation sourceLoc = stmtRefresh.getSourceLocation(); |
| DataverseName dataverseName = getActiveDataverseName(stmtRefresh.getDataverseName()); |
| String datasetName = stmtRefresh.getDatasetName().getValue(); |
| TransactionState transactionState = TransactionState.COMMIT; |
| MetadataTransactionContext mdTxnCtx = MetadataManager.INSTANCE.beginTransaction(); |
| boolean bActiveTxn = true; |
| metadataProvider.setMetadataTxnContext(mdTxnCtx); |
| JobSpecification spec = null; |
| Dataset ds = null; |
| List<ExternalFile> metadataFiles = null; |
| List<ExternalFile> deletedFiles = null; |
| List<ExternalFile> addedFiles = null; |
| List<ExternalFile> appendedFiles = null; |
| List<Index> indexes = null; |
| Dataset transactionDataset = null; |
| boolean lockAquired = false; |
| boolean success = false; |
| lockUtil.refreshDatasetBegin(lockManager, metadataProvider.getLocks(), dataverseName, datasetName); |
| try { |
| ds = metadataProvider.findDataset(dataverseName, datasetName); |
| // Does the dataset exist? |
| if (ds == null) { |
| throw new CompilationException(ErrorCode.UNKNOWN_DATASET_IN_DATAVERSE, sourceLoc, datasetName, |
| dataverseName); |
| } |
| // Is the dataset external? |
| if (ds.getDatasetType() != DatasetType.EXTERNAL) { |
| throw new CompilationException(ErrorCode.COMPILATION_ERROR, sourceLoc, dataset() + " " + datasetName |
| + " in " + dataverse() + " " + dataverseName + " is not an external " + dataset()); |
| } |
| // Does the dataset have indexes? |
| indexes = MetadataManager.INSTANCE.getDatasetIndexes(mdTxnCtx, dataverseName, datasetName); |
| if (indexes.isEmpty()) { |
| throw new CompilationException(ErrorCode.COMPILATION_ERROR, sourceLoc, "External " + dataset() + " " |
| + datasetName + " in " + dataverse() + " " + dataverseName + " doesn't have any index"); |
| } |
| |
| // Record transaction time |
| Date txnTime = new Date(); |
| |
| // acquire the refresh lock on the dataset |
| ExternalDatasetsRegistry.INSTANCE.refreshBegin(ds); |
| lockAcquired = true; |
| |
| // Get the files recorded in the metadata |
| metadataFiles = MetadataManager.INSTANCE.getDatasetExternalFiles(mdTxnCtx, ds); |
| deletedFiles = new ArrayList<>(); |
| addedFiles = new ArrayList<>(); |
| appendedFiles = new ArrayList<>(); |
| |
| // Compute the delta: compare the metadata snapshot with the external file system |
| if (ExternalIndexingOperations.isDatasetUptodate(ds, metadataFiles, addedFiles, deletedFiles, |
| appendedFiles)) { |
| ((ExternalDatasetDetails) ds.getDatasetDetails()).setRefreshTimestamp(txnTime); |
| MetadataManager.INSTANCE.updateDataset(mdTxnCtx, ds); |
| MetadataManager.INSTANCE.commitTransaction(mdTxnCtx); |
| // latch will be released in the finally clause |
| return; |
| } |
| |
| // At this point we know the data in the external file system has changed; |
| // record the refresh transaction in the metadata and start it |
| transactionDataset = ExternalIndexingOperations.createTransactionDataset(ds); |
| // Remove the old dataset record and replace it with the transaction dataset record |
| MetadataManager.INSTANCE.updateDataset(mdTxnCtx, transactionDataset); |
| |
| // Add delta files to the metadata |
| for (ExternalFile file : addedFiles) { |
| MetadataManager.INSTANCE.addExternalFile(mdTxnCtx, file); |
| } |
| for (ExternalFile file : appendedFiles) { |
| MetadataManager.INSTANCE.addExternalFile(mdTxnCtx, file); |
| } |
| for (ExternalFile file : deletedFiles) { |
| MetadataManager.INSTANCE.addExternalFile(mdTxnCtx, file); |
| } |
| |
| // Create the files index update job |
| spec = ExternalIndexingOperations.buildFilesIndexUpdateOp(ds, metadataFiles, addedFiles, appendedFiles, |
| metadataProvider); |
| |
| MetadataManager.INSTANCE.commitTransaction(mdTxnCtx); |
| bActiveTxn = false; |
| transactionState = TransactionState.BEGIN; |
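| // from this point until READY_TO_COMMIT is recorded, a failure takes the BEGIN branch of the |
| // catch block below: an abort job removes the transaction components and the delta files are dropped |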
| |
| // run the files update job |
| runJob(hcc, spec); |
| |
| for (Index index : indexes) { |
| if (!ExternalIndexingOperations.isFileIndex(index)) { |
| spec = ExternalIndexingOperations.buildIndexUpdateOp(ds, index, metadataFiles, addedFiles, |
| appendedFiles, metadataProvider, sourceLoc); |
| // run the secondary index update job |
| runJob(hcc, spec); |
| } |
| } |
| |
| // all index updates have completed successfully; record the transaction state |
| spec = ExternalIndexingOperations.buildCommitJob(ds, indexes, metadataProvider); |
| |
| // Acquire the write latch again -> start a transaction and record the decision to |
| // commit |
| mdTxnCtx = MetadataManager.INSTANCE.beginTransaction(); |
| metadataProvider.setMetadataTxnContext(mdTxnCtx); |
| bActiveTxn = true; |
| ((ExternalDatasetDetails) transactionDataset.getDatasetDetails()) |
| .setState(TransactionState.READY_TO_COMMIT); |
| ((ExternalDatasetDetails) transactionDataset.getDatasetDetails()).setRefreshTimestamp(txnTime); |
| MetadataManager.INSTANCE.updateDataset(mdTxnCtx, transactionDataset); |
| MetadataManager.INSTANCE.commitTransaction(mdTxnCtx); |
| bActiveTxn = false; |
| transactionState = TransactionState.READY_TO_COMMIT; |
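| // past this point the refresh can no longer be rolled back; a failure before the final |
| // commit is reported as an inconsistent-state error in the catch block below |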
| // We don't release the latch since this job is expected to be quick |
| runJob(hcc, spec); |
| // Start a new metadata transaction to record the final state of the transaction |
| mdTxnCtx = MetadataManager.INSTANCE.beginTransaction(); |
| metadataProvider.setMetadataTxnContext(mdTxnCtx); |
| bActiveTxn = true; |
| |
| for (ExternalFile file : metadataFiles) { |
| if (file.getPendingOp() == ExternalFilePendingOp.DROP_OP) { |
| MetadataManager.INSTANCE.dropExternalFile(mdTxnCtx, file); |
| } else if (file.getPendingOp() == ExternalFilePendingOp.NO_OP) { |
| Iterator<ExternalFile> iterator = appendedFiles.iterator(); |
| while (iterator.hasNext()) { |
| ExternalFile appendedFile = iterator.next(); |
| if (file.getFileName().equals(appendedFile.getFileName())) { |
| // delete existing file |
| MetadataManager.INSTANCE.dropExternalFile(mdTxnCtx, file); |
| // delete existing appended file |
| MetadataManager.INSTANCE.dropExternalFile(mdTxnCtx, appendedFile); |
| // add the original file with appended information |
| appendedFile.setFileNumber(file.getFileNumber()); |
| appendedFile.setPendingOp(ExternalFilePendingOp.NO_OP); |
| MetadataManager.INSTANCE.addExternalFile(mdTxnCtx, appendedFile); |
| iterator.remove(); |
| } |
| } |
| } |
| } |
| |
| // remove the deleted files delta |
| for (ExternalFile file : deletedFiles) { |
| MetadataManager.INSTANCE.dropExternalFile(mdTxnCtx, file); |
| } |
| |
| // insert new files |
| for (ExternalFile file : addedFiles) { |
| MetadataManager.INSTANCE.dropExternalFile(mdTxnCtx, file); |
| file.setPendingOp(ExternalFilePendingOp.NO_OP); |
| MetadataManager.INSTANCE.addExternalFile(mdTxnCtx, file); |
| } |
| |
| // mark the transaction as complete |
| ((ExternalDatasetDetails) transactionDataset.getDatasetDetails()).setState(TransactionState.COMMIT); |
| MetadataManager.INSTANCE.updateDataset(mdTxnCtx, transactionDataset); |
| |
| // commit metadata transaction |
| MetadataManager.INSTANCE.commitTransaction(mdTxnCtx); |
| success = true; |
| } catch (Exception e) { |
| if (bActiveTxn) { |
| abort(e, e, mdTxnCtx); |
| } |
| if (transactionState == TransactionState.READY_TO_COMMIT) { |
| throw new IllegalStateException("System is inconsistent state: commit of (" + dataverseName + "." |
| + datasetName + ") refresh couldn't carry out the commit phase", e); |
| } |
| if (transactionState == TransactionState.COMMIT) { |
| // Nothing to do; everything should be clean |
| throw e; |
| } |
| if (transactionState == TransactionState.BEGIN) { |
| // the transaction failed; clean the NCs by removing the transaction components |
| mdTxnCtx = MetadataManager.INSTANCE.beginTransaction(); |
| bActiveTxn = true; |
| metadataProvider.setMetadataTxnContext(mdTxnCtx); |
| spec = ExternalIndexingOperations.buildAbortOp(ds, indexes, metadataProvider); |
| MetadataManager.INSTANCE.commitTransaction(mdTxnCtx); |
| bActiveTxn = false; |
| try { |
| runJob(hcc, spec); |
| } catch (Exception e2) { |
| // this should never happen; if it does, the system is in an inconsistent state |
| e.addSuppressed(e2); |
| throw new IllegalStateException("System is in inconsistent state. Failed to abort refresh", e); |
| } |
| // remove the file delta and return the dataset to the committed state |
| try { |
| mdTxnCtx = MetadataManager.INSTANCE.beginTransaction(); |
| for (ExternalFile file : deletedFiles) { |
| MetadataManager.INSTANCE.dropExternalFile(mdTxnCtx, file); |
| } |
| for (ExternalFile file : addedFiles) { |
| MetadataManager.INSTANCE.dropExternalFile(mdTxnCtx, file); |
| } |
| for (ExternalFile file : appendedFiles) { |
| MetadataManager.INSTANCE.dropExternalFile(mdTxnCtx, file); |
| } |
| MetadataManager.INSTANCE.updateDataset(mdTxnCtx, ds); |
| // commit metadata transaction |
| MetadataManager.INSTANCE.commitTransaction(mdTxnCtx); |
| } catch (Exception e2) { |
| abort(e, e2, mdTxnCtx); |
| e.addSuppressed(e2); |
| throw new IllegalStateException("System is in an inconsistent state: failed to drop the delta files", e); |
| } |
| } |
| } finally { |
| if (lockAcquired) { |
| ExternalDatasetsRegistry.INSTANCE.refreshEnd(ds, success); |
| } |
| metadataProvider.getLocks().unlock(); |
| } |
| } |
| |
| @Override |
| public DataverseName getActiveDataverseName(DataverseName dataverseName) { |
| return dataverseName != null ? dataverseName : activeDataverse.getDataverseName(); |
| } |
| |
| @Override |
| public ExecutionPlans getExecutionPlans() { |
| return apiFramework.getExecutionPlans(); |
| } |
| |
| @Override |
| public IResponsePrinter getResponsePrinter() { |
| return responsePrinter; |
| } |
| |
| @Override |
| public void getWarnings(Collection<? super Warning> outWarnings, long maxWarnings) { |
| warningCollector.getWarnings(outWarnings, maxWarnings); |
| } |
| |
| /** |
| * Aborts the ongoing metadata transaction, logging the error cause when debug mode is enabled. |
| * |
| * @param rootE the exception that caused the abort |
| * @param parentE the exception to which any failure of the abort itself is added as a suppressed exception |
| * @param mdTxnCtx the metadata transaction context to abort (ignored if null) |
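| * |
| * <p>Illustrative usage (the pattern used throughout this class): |
| * <pre>{@code |
| * MetadataTransactionContext mdTxnCtx = MetadataManager.INSTANCE.beginTransaction(); |
| * metadataProvider.setMetadataTxnContext(mdTxnCtx); |
| * try { |
| *     // ... metadata operations ... |
| *     MetadataManager.INSTANCE.commitTransaction(mdTxnCtx); |
| * } catch (Exception e) { |
| *     abort(e, e, mdTxnCtx); |
| *     throw e; |
| * } |
| * }</pre> |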
| */ |
| public static void abort(Exception rootE, Exception parentE, MetadataTransactionContext mdTxnCtx) { |
| boolean interrupted = Thread.interrupted(); |
| try { |
| if (IS_DEBUG_MODE) { |
| LOGGER.log(Level.ERROR, rootE.getMessage(), rootE); |
| } |
| if (mdTxnCtx != null) { |
| MetadataManager.INSTANCE.abortTransaction(mdTxnCtx); |
| } |
| } catch (Exception e2) { |
| parentE.addSuppressed(e2); |
| throw new IllegalStateException(rootE); |
| } finally { |
| if (interrupted) { |
| Thread.currentThread().interrupt(); |
| } |
| } |
| } |
| |
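| /** |
| * Rewrites the given statement under a fresh metadata transaction, if the rewriter supports its kind. |
| * The transaction is committed on success and aborted (with the exception rethrown) on failure. |
| */ |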
| protected void rewriteStatement(Statement stmt, IStatementRewriter rewriter, MetadataProvider metadataProvider) |
| throws CompilationException, RemoteException { |
| if (!rewriter.isRewritable(stmt.getKind())) { |
| return; |
| } |
| MetadataTransactionContext mdTxnCtx = MetadataManager.INSTANCE.beginTransaction(); |
| metadataProvider.setMetadataTxnContext(mdTxnCtx); |
| try { |
| rewriter.rewrite(stmt, metadataProvider); |
| MetadataManager.INSTANCE.commitTransaction(mdTxnCtx); |
| } catch (Exception e) { |
| abort(e, e, mdTxnCtx); |
| throw e; |
| } |
| } |
| |
| private void ensureNonPrimaryIndexDrop(Index index, SourceLocation sourceLoc) throws AlgebricksException { |
| if (index.isPrimaryIndex()) { |
| throw new MetadataException(ErrorCode.CANNOT_DROP_INDEX, sourceLoc, index.getIndexName(), |
| index.getDatasetName()); |
| } |
| } |
| |
| protected void afterCompile() { |
| if (sessionOutput.config().is(SessionConfig.FORMAT_HTML)) { |
| ExecutionPlansHtmlPrintUtil.print(sessionOutput.out(), getExecutionPlans()); |
| } |
| } |
| |
| protected void trackRequest(IRequestParameters requestParameters) throws HyracksDataException { |
| final IClientRequest clientRequest = appCtx.getReceptionist().requestReceived(requestParameters); |
| appCtx.getRequestTracker().track(clientRequest); |
| } |
| |
| protected void validateStatements(IRequestParameters requestParameters) throws CompilationException { |
| validateStatements(statements, requestParameters.isMultiStatement(), |
| requestParameters.getStatementCategoryRestrictionMask()); |
| } |
| |
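| /** |
| * Validates the statement list against the request restrictions: unless multi-statement requests are |
| * allowed, at most one statement other than DATAVERSE_DECL, FUNCTION_DECL, SET, and WRITE may appear; |
| * and when a category restriction mask is set, every statement's category must intersect the mask. |
| */ |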
| public static void validateStatements(List<Statement> statements, boolean allowMultiStatement, |
| int stmtCategoryRestrictionMask) throws CompilationException { |
| if (!allowMultiStatement) { |
| if (statements.stream().filter(QueryTranslator::isNotAllowedMultiStatement).count() > 1) { |
| throw new CompilationException(ErrorCode.UNSUPPORTED_MULTIPLE_STATEMENTS); |
| } |
| } |
| if (stmtCategoryRestrictionMask != RequestParameters.NO_CATEGORY_RESTRICTION_MASK) { |
| for (Statement stmt : statements) { |
| if (isNotAllowedStatementCategory(stmt, stmtCategoryRestrictionMask)) { |
| throw new CompilationException(ErrorCode.PROHIBITED_STATEMENT_CATEGORY, stmt.getSourceLocation(), |
| stmt.getKind()); |
| } |
| } |
| } |
| } |
| |
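| // statement kinds that may appear any number of times in a single-statement request; |
| // all other kinds count toward the one-statement limit |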
| protected static boolean isNotAllowedMultiStatement(Statement statement) { |
| switch (statement.getKind()) { |
| case DATAVERSE_DECL: |
| case FUNCTION_DECL: |
| case SET: |
| case WRITE: |
| return false; |
| default: |
| return true; |
| } |
| } |
| |
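| // a statement is allowed iff its category shares at least one bit with the restriction mask; |
| // e.g. (hypothetical values) category 0b0010 is allowed under mask 0b0110 but prohibited under 0b0100 |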
| private static boolean isNotAllowedStatementCategory(Statement statement, int categoryRestrictionMask) { |
| int category = statement.getCategory(); |
| if (category <= 0) { |
| throw new IllegalArgumentException(String.valueOf(category)); |
| } |
| int i = category & categoryRestrictionMask; |
| return i == 0; |
| } |
| |
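| /** |
| * Maps named statement parameters to the external variables understood by the compiler, using the |
| * rewriter's naming scheme; parameters that have no external variable form are silently skipped. |
| */ |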
| private Map<VarIdentifier, IAObject> createExternalVariables(Map<String, IAObject> stmtParams, |
| IStatementRewriter stmtRewriter) { |
| if (stmtParams == null || stmtParams.isEmpty()) { |
| return Collections.emptyMap(); |
| } |
| Map<VarIdentifier, IAObject> m = new HashMap<>(); |
| for (Map.Entry<String, IAObject> me : stmtParams.entrySet()) { |
| String paramName = me.getKey(); |
| String extVarName = stmtRewriter.toExternalVariableName(paramName); |
| if (extVarName != null) { |
| m.put(new VarIdentifier(extVarName), me.getValue()); |
| } |
| } |
| return m; |
| } |
| |
| protected void validateDatasetState(MetadataProvider metadataProvider, Dataset dataset, SourceLocation sourceLoc) |
| throws Exception { |
| validateIfResourceIsActiveInFeed(metadataProvider.getApplicationContext(), dataset, sourceLoc); |
| } |
| |
| private static void ensureNotCancelled(ClientRequest clientRequest) throws RuntimeDataException { |
| if (clientRequest.isCancelled()) { |
| throw new RuntimeDataException(ErrorCode.REQUEST_CANCELLED, clientRequest.getId()); |
| } |
| } |
| |
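| /** |
| * Validates adapter-specific properties of an external dataset; the adapter name is merged into the |
| * property map under KEY_EXTERNAL_SOURCE_TYPE before delegating to the generic validator. |
| */ |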
| protected void validateExternalDatasetProperties(ExternalDetailsDecl externalDetails, |
| Map<String, String> properties, SourceLocation srcLoc, MetadataTransactionContext mdTxnCtx) |
| throws AlgebricksException, HyracksDataException { |
| // Validate adapter specific properties |
| String adapter = externalDetails.getAdapter(); |
| Map<String, String> details = new HashMap<>(properties); |
| details.put(ExternalDataConstants.KEY_EXTERNAL_SOURCE_TYPE, adapter); |
| validateAdapterSpecificProperties(details, srcLoc); |
| } |
| |
| /** |
| * Ensures that the external source container is present, along with any other adapter-specific checks. |
| * |
| * @param configuration external source properties |
| * @param srcLoc source location used for error reporting |
| */ |
| protected void validateAdapterSpecificProperties(Map<String, String> configuration, SourceLocation srcLoc) |
| throws CompilationException { |
| ExternalDataUtils.validateAdapterSpecificProperties(configuration, srcLoc, warningCollector); |
| } |
| } |