| /* |
| * Licensed to the Apache Software Foundation (ASF) under one or more |
| * contributor license agreements. See the NOTICE file distributed with |
| * this work for additional information regarding copyright ownership. |
| * The ASF licenses this file to You under the Apache License, Version 2.0 |
| * (the "License"); you may not use this file except in compliance with |
| * the License. You may obtain a copy of the License at |
| * |
| * http://www.apache.org/licenses/LICENSE-2.0 |
| * |
| * Unless required by applicable law or agreed to in writing, software |
| * distributed under the License is distributed on an "AS IS" BASIS, |
| * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| * See the License for the specific language governing permissions and |
| * limitations under the License. |
| */ |
| |
| package org.apache.ignite.internal.processors.query.h2; |
| |
| import java.util.ArrayList; |
| import java.util.Collections; |
| import java.util.HashMap; |
| import java.util.HashSet; |
| import java.util.LinkedHashMap; |
| import java.util.List; |
| import java.util.Map; |
| import java.util.Objects; |
| import java.util.Set; |
| import java.util.UUID; |
| import org.apache.ignite.IgniteCheckedException; |
| import org.apache.ignite.IgniteDataStreamer; |
| import org.apache.ignite.IgniteSystemProperties; |
| import org.apache.ignite.cache.QueryEntity; |
| import org.apache.ignite.cache.QueryIndex; |
| import org.apache.ignite.cache.query.BulkLoadContextCursor; |
| import org.apache.ignite.cache.query.FieldsQueryCursor; |
| import org.apache.ignite.configuration.CacheConfiguration; |
| import org.apache.ignite.internal.GridKernalContext; |
| import org.apache.ignite.internal.IgniteInternalFuture; |
| import org.apache.ignite.internal.processors.bulkload.BulkLoadAckClientParameters; |
| import org.apache.ignite.internal.processors.bulkload.BulkLoadCacheWriter; |
| import org.apache.ignite.internal.processors.bulkload.BulkLoadParser; |
| import org.apache.ignite.internal.processors.bulkload.BulkLoadProcessor; |
| import org.apache.ignite.internal.processors.bulkload.BulkLoadStreamerWriter; |
| import org.apache.ignite.internal.processors.cache.GridCacheContext; |
| import org.apache.ignite.internal.processors.cache.distributed.near.GridNearTxLocal; |
| import org.apache.ignite.internal.processors.cache.query.IgniteQueryErrorCode; |
| import org.apache.ignite.internal.processors.query.GridQueryProperty; |
| import org.apache.ignite.internal.processors.query.GridQueryTypeDescriptor; |
| import org.apache.ignite.internal.processors.query.IgniteSQLException; |
| import org.apache.ignite.internal.processors.query.NestedTxMode; |
| import org.apache.ignite.internal.processors.query.QueryEntityEx; |
| import org.apache.ignite.internal.processors.query.QueryField; |
| import org.apache.ignite.internal.processors.query.QueryUtils; |
| import org.apache.ignite.internal.processors.query.SqlClientContext; |
| import org.apache.ignite.internal.processors.query.h2.dml.DmlBulkLoadDataConverter; |
| import org.apache.ignite.internal.processors.query.h2.dml.UpdatePlan; |
| import org.apache.ignite.internal.processors.query.h2.dml.UpdatePlanBuilder; |
| import org.apache.ignite.internal.processors.query.h2.opt.GridH2Table; |
| import org.apache.ignite.internal.processors.query.h2.sql.GridSqlAlterTableAddColumn; |
| import org.apache.ignite.internal.processors.query.h2.sql.GridSqlAlterTableDropColumn; |
| import org.apache.ignite.internal.processors.query.h2.sql.GridSqlColumn; |
| import org.apache.ignite.internal.processors.query.h2.sql.GridSqlCreateIndex; |
| import org.apache.ignite.internal.processors.query.h2.sql.GridSqlCreateTable; |
| import org.apache.ignite.internal.processors.query.h2.sql.GridSqlDropIndex; |
| import org.apache.ignite.internal.processors.query.h2.sql.GridSqlDropTable; |
| import org.apache.ignite.internal.processors.query.h2.sql.GridSqlStatement; |
| import org.apache.ignite.internal.processors.query.schema.SchemaOperationException; |
| import org.apache.ignite.internal.processors.query.stat.StatisticsKey; |
| import org.apache.ignite.internal.processors.query.stat.StatisticsTarget; |
| import org.apache.ignite.internal.processors.query.stat.config.StatisticsObjectConfiguration; |
| import org.apache.ignite.internal.sql.SqlCommandProcessor; |
| import org.apache.ignite.internal.sql.command.SqlAnalyzeCommand; |
| import org.apache.ignite.internal.sql.command.SqlBeginTransactionCommand; |
| import org.apache.ignite.internal.sql.command.SqlBulkLoadCommand; |
| import org.apache.ignite.internal.sql.command.SqlCommand; |
| import org.apache.ignite.internal.sql.command.SqlCommitTransactionCommand; |
| import org.apache.ignite.internal.sql.command.SqlDropStatisticsCommand; |
| import org.apache.ignite.internal.sql.command.SqlKillQueryCommand; |
| import org.apache.ignite.internal.sql.command.SqlRefreshStatitsicsCommand; |
| import org.apache.ignite.internal.sql.command.SqlRollbackTransactionCommand; |
| import org.apache.ignite.internal.sql.command.SqlSetStreamingCommand; |
| import org.apache.ignite.internal.util.future.GridFinishedFuture; |
| import org.apache.ignite.internal.util.lang.IgniteClosureX; |
| import org.apache.ignite.internal.util.typedef.F; |
| import org.apache.ignite.internal.util.typedef.internal.U; |
| import org.apache.ignite.lang.IgniteBiTuple; |
| import org.apache.ignite.plugin.security.SecurityPermission; |
| import org.h2.command.Prepared; |
| import org.h2.command.ddl.AlterTableAlterColumn; |
| import org.h2.command.ddl.CreateIndex; |
| import org.h2.command.ddl.CreateTable; |
| import org.h2.command.ddl.DropIndex; |
| import org.h2.command.ddl.DropTable; |
| import org.h2.command.dml.NoOperation; |
| import org.h2.table.Column; |
| import org.h2.value.DataType; |
| import org.h2.value.Value; |
| import org.jetbrains.annotations.NotNull; |
| import org.jetbrains.annotations.Nullable; |
| |
| import static org.apache.ignite.internal.processors.cache.mvcc.MvccUtils.mvccEnabled; |
| import static org.apache.ignite.internal.processors.cache.mvcc.MvccUtils.tx; |
| import static org.apache.ignite.internal.processors.cache.mvcc.MvccUtils.txStart; |
| import static org.apache.ignite.internal.processors.query.QueryUtils.convert; |
| import static org.apache.ignite.internal.processors.query.QueryUtils.isDdlOnSchemaSupported; |
| import static org.apache.ignite.internal.processors.query.h2.sql.GridSqlQueryParser.PARAM_WRAP_VALUE; |
| |
| /** |
| * Processor responsible for execution of all non-SELECT and non-DML commands. |
| */ |
| public class CommandProcessor extends SqlCommandProcessor { |
| /** Schema manager. */ |
| private final SchemaManager schemaMgr; |
| |
| /** H2 Indexing. */ |
| private final IgniteH2Indexing idx; |
| |
| /** Is backward compatible handling of UUID through DDL enabled. */ |
| private static final boolean handleUuidAsByte = |
| IgniteSystemProperties.getBoolean(IgniteSystemProperties.IGNITE_SQL_UUID_DDL_BYTE_FORMAT, false); |
| |
/**
 * Creates the command processor.
 *
 * @param ctx Kernal context.
 * @param schemaMgr Schema manager.
 * @param idx H2 indexing.
 */
public CommandProcessor(GridKernalContext ctx, SchemaManager schemaMgr, IgniteH2Indexing idx) {
    super(ctx, schemaMgr);

    this.idx = idx;
    this.schemaMgr = schemaMgr;
}
| |
| /** |
| * Execute command. |
| * |
| * @param sql SQL. |
| * @param cmdNative Native command (if any). |
| * @param cmdH2 H2 command (if any). |
| * @param params Parameters. |
| * @param cliCtx Client context. |
| * @param qryId Running query ID. |
| * @return Result. |
| */ |
| public CommandResult runCommand(String sql, SqlCommand cmdNative, GridSqlStatement cmdH2, |
| QueryParameters params, @Nullable SqlClientContext cliCtx, long qryId) throws IgniteCheckedException { |
| assert cmdNative != null || cmdH2 != null; |
| |
| // Do execute. |
| FieldsQueryCursor<List<?>> res = H2Utils.zeroCursor(); |
| boolean unregister = true; |
| |
| if (cmdNative != null) { |
| assert cmdH2 == null; |
| |
| if (isCommandSupported(cmdNative)) { |
| FieldsQueryCursor<List<?>> resNative = runNativeCommand(sql, cmdNative, params, cliCtx, qryId); |
| |
| if (resNative != null) { |
| res = resNative; |
| unregister = !(cmdNative instanceof SqlBulkLoadCommand); |
| } |
| } |
| else |
| throw new UnsupportedOperationException("Unsupported command: " + cmdNative); |
| } |
| else { |
| assert cmdH2 != null; |
| |
| runCommandH2(sql, cmdH2); |
| } |
| |
| return new CommandResult(res, unregister); |
| } |
| |
| /** |
| * Execute native command. |
| * |
| * @param sql SQL. |
| * @param cmdNative Native command. |
| * @param params Parameters. |
| * @param cliCtx Client context. |
| * @param qryId Running query ID. |
| * @return Result. |
| */ |
| public FieldsQueryCursor<List<?>> runNativeCommand(String sql, SqlCommand cmdNative, |
| QueryParameters params, @Nullable SqlClientContext cliCtx, Long qryId) throws IgniteCheckedException { |
| if (super.isCommandSupported(cmdNative)) |
| return runCommand(cmdNative); |
| |
| if (cmdNative instanceof SqlBulkLoadCommand) |
| return processBulkLoadCommand((SqlBulkLoadCommand)cmdNative, qryId); |
| else if (cmdNative instanceof SqlSetStreamingCommand) |
| processSetStreamingCommand((SqlSetStreamingCommand)cmdNative, cliCtx); |
| else if (cmdNative instanceof SqlAnalyzeCommand) |
| processAnalyzeCommand((SqlAnalyzeCommand)cmdNative); |
| else if (cmdNative instanceof SqlRefreshStatitsicsCommand) |
| processRefreshStatisticsCommand((SqlRefreshStatitsicsCommand)cmdNative); |
| else if (cmdNative instanceof SqlDropStatisticsCommand) |
| processDropStatisticsCommand((SqlDropStatisticsCommand)cmdNative); |
| else |
| processTxCommand(cmdNative, params); |
| |
| return null; |
| } |
| |
| /** {@inheritDoc} */ |
| @Override public boolean isCommandSupported(SqlCommand cmd) { |
| return super.isCommandSupported(cmd) |
| || cmd instanceof SqlBeginTransactionCommand |
| || cmd instanceof SqlCommitTransactionCommand |
| || cmd instanceof SqlRollbackTransactionCommand |
| || cmd instanceof SqlBulkLoadCommand |
| || cmd instanceof SqlSetStreamingCommand; |
| } |
| |
| /** |
| * Process kill query command |
| * |
| * @param cmd Command. |
| */ |
| private void processKillQueryCommand(SqlKillQueryCommand cmd) { |
| idx.runningQueryManager().cancelQuery(cmd.nodeQueryId(), cmd.nodeId(), cmd.async()); |
| } |
| |
| /** |
| * Process analyze command. |
| * |
| * @param cmd Sql analyze command. |
| */ |
| private void processAnalyzeCommand(SqlAnalyzeCommand cmd) throws IgniteCheckedException { |
| ctx.security().authorize(SecurityPermission.CHANGE_STATISTICS); |
| |
| IgniteH2Indexing indexing = (IgniteH2Indexing)ctx.query().getIndexing(); |
| |
| StatisticsObjectConfiguration objCfgs[] = cmd.configurations().stream() |
| .map(t -> { |
| if (t.key().schema() == null) { |
| StatisticsKey key = new StatisticsKey(cmd.schemaName(), t.key().obj()); |
| |
| return new StatisticsObjectConfiguration(key, t.columns().values(), |
| t.maxPartitionObsolescencePercent()); |
| } |
| else |
| return t; |
| }).toArray(StatisticsObjectConfiguration[]::new); |
| |
| indexing.statsManager().collectStatistics(objCfgs); |
| } |
| |
| /** |
| * Process refresh statistics command. |
| * |
| * @param cmd Refresh statistics command. |
| */ |
| private void processRefreshStatisticsCommand(SqlRefreshStatitsicsCommand cmd) throws IgniteCheckedException { |
| ctx.security().authorize(SecurityPermission.REFRESH_STATISTICS); |
| |
| IgniteH2Indexing indexing = (IgniteH2Indexing)ctx.query().getIndexing(); |
| |
| StatisticsTarget[] targets = cmd.targets().stream() |
| .map(t -> (t.schema() == null) ? new StatisticsTarget(cmd.schemaName(), t.obj(), t.columns()) : t) |
| .toArray(StatisticsTarget[]::new); |
| |
| indexing.statsManager().refreshStatistics(targets); |
| } |
| |
| /** |
| * Process drop statistics command. |
| * |
| * @param cmd Drop statistics command. |
| */ |
| private void processDropStatisticsCommand(SqlDropStatisticsCommand cmd) throws IgniteCheckedException { |
| ctx.security().authorize(SecurityPermission.CHANGE_STATISTICS); |
| |
| IgniteH2Indexing indexing = (IgniteH2Indexing)ctx.query().getIndexing(); |
| |
| StatisticsTarget[] targets = cmd.targets().stream() |
| .map(t -> (t.schema() == null) ? new StatisticsTarget(cmd.schemaName(), t.obj(), t.columns()) : t) |
| .toArray(StatisticsTarget[]::new); |
| |
| indexing.statsManager().dropStatistics(targets); |
| } |
| |
| /** |
| * Execute DDL statement. |
| * |
| * @param sql SQL. |
| * @param cmdH2 Command. |
| */ |
| private void runCommandH2(String sql, GridSqlStatement cmdH2) { |
| IgniteInternalFuture fut = null; |
| |
| try { |
| finishActiveTxIfNecessary(); |
| |
| if (cmdH2 instanceof GridSqlCreateIndex) { |
| GridSqlCreateIndex cmd = (GridSqlCreateIndex)cmdH2; |
| |
| isDdlOnSchemaSupported(cmd.schemaName()); |
| |
| GridH2Table tbl = schemaMgr.dataTable(cmd.schemaName(), cmd.tableName()); |
| |
| if (tbl == null) |
| throw new SchemaOperationException(SchemaOperationException.CODE_TABLE_NOT_FOUND, cmd.tableName()); |
| |
| assert tbl.rowDescriptor() != null; |
| |
| QueryIndex newIdx = new QueryIndex(); |
| |
| newIdx.setName(cmd.index().getName()); |
| |
| newIdx.setIndexType(cmd.index().getIndexType()); |
| |
| LinkedHashMap<String, Boolean> flds = new LinkedHashMap<>(); |
| |
| // Let's replace H2's table and property names by those operated by GridQueryProcessor. |
| GridQueryTypeDescriptor typeDesc = tbl.rowDescriptor().type(); |
| |
| for (Map.Entry<String, Boolean> e : cmd.index().getFields().entrySet()) { |
| GridQueryProperty prop = typeDesc.property(e.getKey()); |
| |
| if (prop == null) |
| throw new SchemaOperationException(SchemaOperationException.CODE_COLUMN_NOT_FOUND, e.getKey()); |
| |
| flds.put(prop.name(), e.getValue()); |
| } |
| |
| newIdx.setFields(flds); |
| |
| fut = ctx.query().dynamicIndexCreate(tbl.cacheName(), cmd.schemaName(), typeDesc.tableName(), |
| newIdx, cmd.ifNotExists(), 0); |
| } |
| else if (cmdH2 instanceof GridSqlDropIndex) { |
| GridSqlDropIndex cmd = (GridSqlDropIndex)cmdH2; |
| |
| isDdlOnSchemaSupported(cmd.schemaName()); |
| |
| GridH2Table tbl = schemaMgr.dataTableForIndex(cmd.schemaName(), cmd.indexName()); |
| |
| if (tbl != null) { |
| fut = ctx.query().dynamicIndexDrop(tbl.cacheName(), cmd.schemaName(), cmd.indexName(), |
| cmd.ifExists()); |
| } |
| else { |
| if (cmd.ifExists()) |
| fut = new GridFinishedFuture(); |
| else |
| throw new SchemaOperationException(SchemaOperationException.CODE_INDEX_NOT_FOUND, |
| cmd.indexName()); |
| } |
| } |
| else if (cmdH2 instanceof GridSqlCreateTable) { |
| GridSqlCreateTable cmd = (GridSqlCreateTable)cmdH2; |
| |
| ctx.security().authorize(cmd.cacheName(), SecurityPermission.CACHE_CREATE); |
| |
| isDdlOnSchemaSupported(cmd.schemaName()); |
| |
| GridH2Table tbl = schemaMgr.dataTable(cmd.schemaName(), cmd.tableName()); |
| |
| if (tbl != null) { |
| if (!cmd.ifNotExists()) |
| throw new SchemaOperationException(SchemaOperationException.CODE_TABLE_EXISTS, |
| cmd.tableName()); |
| } |
| else { |
| QueryEntity e = toQueryEntity(cmd); |
| |
| CacheConfiguration<?, ?> ccfg = new CacheConfiguration<>(cmd.tableName()); |
| |
| ccfg.setQueryEntities(Collections.singleton(e)); |
| ccfg.setSqlSchema(cmd.schemaName()); |
| |
| SchemaOperationException err = |
| QueryUtils.checkQueryEntityConflicts(ccfg, ctx.cache().cacheDescriptors().values()); |
| |
| if (err != null) |
| throw err; |
| |
| if (!F.isEmpty(cmd.cacheName()) && ctx.cache().cacheDescriptor(cmd.cacheName()) != null) { |
| ctx.query().dynamicAddQueryEntity( |
| cmd.cacheName(), |
| cmd.schemaName(), |
| e, |
| cmd.parallelism(), |
| true |
| ).get(); |
| } |
| else { |
| ctx.query().dynamicTableCreate( |
| cmd.schemaName(), |
| e, |
| cmd.templateName(), |
| cmd.cacheName(), |
| cmd.cacheGroup(), |
| cmd.dataRegionName(), |
| cmd.affinityKey(), |
| cmd.atomicityMode(), |
| cmd.writeSynchronizationMode(), |
| cmd.backups(), |
| cmd.ifNotExists(), |
| cmd.encrypted(), |
| cmd.parallelism() |
| ); |
| } |
| } |
| } |
| else if (cmdH2 instanceof GridSqlDropTable) { |
| GridSqlDropTable cmd = (GridSqlDropTable)cmdH2; |
| |
| isDdlOnSchemaSupported(cmd.schemaName()); |
| |
| GridH2Table tbl = schemaMgr.dataTable(cmd.schemaName(), cmd.tableName()); |
| |
| if (tbl == null) { |
| if (!cmd.ifExists()) |
| throw new SchemaOperationException(SchemaOperationException.CODE_TABLE_NOT_FOUND, |
| cmd.tableName()); |
| } |
| else { |
| ctx.security().authorize(tbl.cacheName(), SecurityPermission.CACHE_DESTROY); |
| |
| ctx.query().dynamicTableDrop(tbl.cacheName(), cmd.tableName(), cmd.ifExists()); |
| } |
| } |
| else if (cmdH2 instanceof GridSqlAlterTableAddColumn) { |
| GridSqlAlterTableAddColumn cmd = (GridSqlAlterTableAddColumn)cmdH2; |
| |
| isDdlOnSchemaSupported(cmd.schemaName()); |
| |
| GridH2Table tbl = schemaMgr.dataTable(cmd.schemaName(), cmd.tableName()); |
| |
| if (tbl == null) { |
| if (!cmd.ifTableExists()) |
| throw new SchemaOperationException(SchemaOperationException.CODE_TABLE_NOT_FOUND, |
| cmd.tableName()); |
| } |
| else { |
| if (QueryUtils.isSqlType(tbl.rowDescriptor().type().valueClass())) |
| throw new SchemaOperationException("Cannot add column(s) because table was created " + |
| "with " + PARAM_WRAP_VALUE + "=false option."); |
| |
| List<QueryField> cols = new ArrayList<>(cmd.columns().length); |
| |
| boolean allFieldsNullable = true; |
| |
| for (GridSqlColumn col : cmd.columns()) { |
| if (tbl.doesColumnExist(col.columnName())) { |
| if ((!cmd.ifNotExists() || cmd.columns().length != 1)) { |
| throw new SchemaOperationException(SchemaOperationException.CODE_COLUMN_EXISTS, |
| col.columnName()); |
| } |
| else { |
| cols = null; |
| |
| break; |
| } |
| } |
| |
| QueryField field = new QueryField(col.columnName(), |
| getTypeClassName(col), |
| col.column().isNullable(), col.defaultValue(), |
| col.precision(), col.scale()); |
| |
| cols.add(field); |
| |
| allFieldsNullable &= field.isNullable(); |
| } |
| |
| if (cols != null) { |
| assert tbl.rowDescriptor() != null; |
| |
| if (!allFieldsNullable) |
| QueryUtils.checkNotNullAllowed(tbl.cacheInfo().config()); |
| |
| fut = ctx.query().dynamicColumnAdd(tbl.cacheName(), cmd.schemaName(), |
| tbl.rowDescriptor().type().tableName(), cols, cmd.ifTableExists(), cmd.ifNotExists()); |
| } |
| } |
| } |
| else if (cmdH2 instanceof GridSqlAlterTableDropColumn) { |
| GridSqlAlterTableDropColumn cmd = (GridSqlAlterTableDropColumn)cmdH2; |
| |
| isDdlOnSchemaSupported(cmd.schemaName()); |
| |
| GridH2Table tbl = schemaMgr.dataTable(cmd.schemaName(), cmd.tableName()); |
| |
| if (tbl == null) { |
| if (!cmd.ifTableExists()) |
| throw new SchemaOperationException(SchemaOperationException.CODE_TABLE_NOT_FOUND, |
| cmd.tableName()); |
| } |
| else { |
| assert tbl.rowDescriptor() != null; |
| |
| GridCacheContext cctx = tbl.cacheContext(); |
| |
| assert cctx != null; |
| |
| if (cctx.mvccEnabled()) |
| throw new IgniteSQLException("Cannot drop column(s) with enabled MVCC. " + |
| "Operation is unsupported at the moment.", IgniteQueryErrorCode.UNSUPPORTED_OPERATION); |
| |
| if (QueryUtils.isSqlType(tbl.rowDescriptor().type().valueClass())) |
| throw new SchemaOperationException("Cannot drop column(s) because table was created " + |
| "with " + PARAM_WRAP_VALUE + "=false option."); |
| |
| List<String> cols = new ArrayList<>(cmd.columns().length); |
| |
| GridQueryTypeDescriptor type = tbl.rowDescriptor().type(); |
| |
| for (String colName : cmd.columns()) { |
| if (!tbl.doesColumnExist(colName)) { |
| if ((!cmd.ifExists() || cmd.columns().length != 1)) { |
| throw new SchemaOperationException(SchemaOperationException.CODE_COLUMN_NOT_FOUND, |
| colName); |
| } |
| else { |
| cols = null; |
| |
| break; |
| } |
| } |
| |
| SchemaOperationException err = QueryUtils.validateDropColumn(type, colName); |
| |
| if (err != null) |
| throw err; |
| |
| cols.add(colName); |
| } |
| |
| if (cols != null) { |
| fut = ctx.query().dynamicColumnRemove(tbl.cacheName(), cmd.schemaName(), |
| type.tableName(), cols, cmd.ifTableExists(), cmd.ifExists()); |
| } |
| } |
| } |
| else |
| throw new IgniteSQLException("Unsupported DDL operation: " + sql, |
| IgniteQueryErrorCode.UNSUPPORTED_OPERATION); |
| |
| if (fut != null) |
| fut.get(); |
| } |
| catch (SchemaOperationException e) { |
| U.error(null, "DDL operation failure", e); |
| throw convert(e); |
| } |
| catch (IgniteSQLException e) { |
| throw e; |
| } |
| catch (Exception e) { |
| throw new IgniteSQLException(e.getMessage(), e); |
| } |
| } |
| |
| /** |
| * Convert this statement to query entity and do Ignite specific sanity checks on the way. |
| * @return Query entity mimicking this SQL statement. |
| */ |
| private static QueryEntity toQueryEntity(GridSqlCreateTable createTbl) { |
| QueryEntityEx res = new QueryEntityEx(); |
| |
| res.setTableName(createTbl.tableName()); |
| |
| Set<String> notNullFields = null; |
| |
| HashMap<String, Object> dfltValues = new HashMap<>(); |
| |
| Map<String, Integer> precision = new HashMap<>(); |
| Map<String, Integer> scale = new HashMap<>(); |
| |
| for (Map.Entry<String, GridSqlColumn> e : createTbl.columns().entrySet()) { |
| GridSqlColumn gridCol = e.getValue(); |
| |
| Column col = gridCol.column(); |
| |
| res.addQueryField(e.getKey(), getTypeClassName(gridCol), null); |
| |
| if (!col.isNullable()) { |
| if (notNullFields == null) |
| notNullFields = new HashSet<>(); |
| |
| notNullFields.add(e.getKey()); |
| } |
| |
| Object dfltVal = gridCol.defaultValue(); |
| |
| if (dfltVal != null) |
| dfltValues.put(e.getKey(), dfltVal); |
| |
| if (col.getType() == Value.DECIMAL) { |
| if (col.getPrecision() < H2Utils.DECIMAL_DEFAULT_PRECISION) |
| precision.put(e.getKey(), (int)col.getPrecision()); |
| |
| if (col.getScale() < H2Utils.DECIMAL_DEFAULT_SCALE) |
| scale.put(e.getKey(), col.getScale()); |
| } |
| |
| if (col.getType() == Value.STRING || |
| col.getType() == Value.STRING_FIXED || |
| col.getType() == Value.STRING_IGNORECASE || |
| col.getType() == Value.BYTES) |
| if (col.getPrecision() < H2Utils.STRING_DEFAULT_PRECISION) |
| precision.put(e.getKey(), (int)col.getPrecision()); |
| } |
| |
| if (!F.isEmpty(dfltValues)) |
| res.setDefaultFieldValues(dfltValues); |
| |
| if (!F.isEmpty(precision)) |
| res.setFieldsPrecision(precision); |
| |
| if (!F.isEmpty(scale)) |
| res.setFieldsScale(scale); |
| |
| String valTypeName = QueryUtils.createTableValueTypeName(createTbl.schemaName(), createTbl.tableName()); |
| String keyTypeName = QueryUtils.createTableKeyTypeName(valTypeName); |
| |
| if (!F.isEmpty(createTbl.keyTypeName())) |
| keyTypeName = createTbl.keyTypeName(); |
| |
| if (!F.isEmpty(createTbl.valueTypeName())) |
| valTypeName = createTbl.valueTypeName(); |
| |
| assert createTbl.wrapKey() != null; |
| assert createTbl.wrapValue() != null; |
| |
| if (!createTbl.wrapKey()) { |
| GridSqlColumn pkCol = createTbl.columns().get(createTbl.primaryKeyColumns().iterator().next()); |
| |
| keyTypeName = getTypeClassName(pkCol); |
| |
| res.setKeyFieldName(pkCol.columnName()); |
| } |
| else { |
| res.setKeyFields(createTbl.primaryKeyColumns()); |
| |
| res.setPreserveKeysOrder(true); |
| } |
| |
| if (!createTbl.wrapValue()) { |
| GridSqlColumn valCol = null; |
| |
| for (Map.Entry<String, GridSqlColumn> e : createTbl.columns().entrySet()) { |
| if (!createTbl.primaryKeyColumns().contains(e.getKey())) { |
| valCol = e.getValue(); |
| |
| break; |
| } |
| } |
| |
| assert valCol != null; |
| |
| valTypeName = getTypeClassName(valCol); |
| |
| res.setValueFieldName(valCol.columnName()); |
| } |
| |
| res.setValueType(valTypeName); |
| res.setKeyType(keyTypeName); |
| |
| if (!F.isEmpty(notNullFields)) |
| res.setNotNullFields(notNullFields); |
| |
| // Fill key object with all fields for new tables. |
| res.fillAbsentPKsWithDefaults(true); |
| |
| if (Objects.nonNull(createTbl.primaryKeyInlineSize())) |
| res.setPrimaryKeyInlineSize(createTbl.primaryKeyInlineSize()); |
| |
| if (Objects.nonNull(createTbl.affinityKeyInlineSize())) |
| res.setAffinityKeyInlineSize(createTbl.affinityKeyInlineSize()); |
| |
| return res; |
| } |
| |
| /** |
| * @param cmd Statement. |
| * @return Whether {@code cmd} is a DDL statement we're able to handle. |
| */ |
| public static boolean isCommand(Prepared cmd) { |
| return cmd instanceof CreateIndex || cmd instanceof DropIndex || cmd instanceof CreateTable || |
| cmd instanceof DropTable || cmd instanceof AlterTableAlterColumn; |
| } |
| |
| /** |
| * @param cmd Statement. |
| * @return Whether {@code cmd} is a no-op. |
| */ |
| public static boolean isCommandNoOp(Prepared cmd) { |
| return cmd instanceof NoOperation; |
| } |
| |
| /** |
| * Helper function for obtaining type class name for H2. |
| * |
| * @param col Column. |
| * @return Type class name. |
| */ |
| private static String getTypeClassName(GridSqlColumn col) { |
| int type = col.column().getType(); |
| |
| switch (type) { |
| case Value.UUID : |
| if (!handleUuidAsByte) |
| return UUID.class.getName(); |
| |
| default: |
| return DataType.getTypeClassName(type); |
| } |
| } |
| |
| /** |
| * Process transactional command. |
| * @param cmd Command. |
| * @param params Parameters. |
| * @throws IgniteCheckedException if failed. |
| */ |
| private void processTxCommand(SqlCommand cmd, QueryParameters params) |
| throws IgniteCheckedException { |
| NestedTxMode nestedTxMode = params.nestedTxMode(); |
| |
| GridNearTxLocal tx = tx(ctx); |
| |
| if (cmd instanceof SqlBeginTransactionCommand) { |
| if (!mvccEnabled(ctx)) |
| throw new IgniteSQLException("MVCC must be enabled in order to start transaction.", |
| IgniteQueryErrorCode.MVCC_DISABLED); |
| |
| if (tx != null) { |
| if (nestedTxMode == null) |
| nestedTxMode = NestedTxMode.DEFAULT; |
| |
| switch (nestedTxMode) { |
| case COMMIT: |
| doCommit(tx); |
| |
| txStart(ctx, params.timeout()); |
| |
| break; |
| |
| case IGNORE: |
| log.warning("Transaction has already been started, ignoring BEGIN command."); |
| |
| break; |
| |
| case ERROR: |
| throw new IgniteSQLException("Transaction has already been started.", |
| IgniteQueryErrorCode.TRANSACTION_EXISTS); |
| |
| default: |
| throw new IgniteSQLException("Unexpected nested transaction handling mode: " + |
| nestedTxMode.name()); |
| } |
| } |
| else |
| txStart(ctx, params.timeout()); |
| } |
| else if (cmd instanceof SqlCommitTransactionCommand) { |
| // Do nothing if there's no transaction. |
| if (tx != null) |
| doCommit(tx); |
| } |
| else { |
| assert cmd instanceof SqlRollbackTransactionCommand; |
| |
| // Do nothing if there's no transaction. |
| if (tx != null) |
| doRollback(tx); |
| } |
| } |
| |
| /** |
| * Commit and properly close transaction. |
| * @param tx Transaction. |
| * @throws IgniteCheckedException if failed. |
| */ |
| private void doCommit(@NotNull GridNearTxLocal tx) throws IgniteCheckedException { |
| try { |
| tx.commit(); |
| } |
| finally { |
| closeTx(tx); |
| } |
| } |
| |
| /** |
| * Rollback and properly close transaction. |
| * @param tx Transaction. |
| * @throws IgniteCheckedException if failed. |
| */ |
| public void doRollback(@NotNull GridNearTxLocal tx) throws IgniteCheckedException { |
| try { |
| tx.rollback(); |
| } |
| finally { |
| closeTx(tx); |
| } |
| } |
| |
| /** |
| * Properly close transaction. |
| * @param tx Transaction. |
| * @throws IgniteCheckedException if failed. |
| */ |
| private void closeTx(@NotNull GridNearTxLocal tx) throws IgniteCheckedException { |
| try { |
| tx.close(); |
| } |
| finally { |
| ctx.cache().context().tm().resetContext(); |
| } |
| } |
| |
| /** |
| * Process SET STREAMING command. |
| * |
| * @param cmd Command. |
| * @param cliCtx Client context. |
| */ |
| private void processSetStreamingCommand(SqlSetStreamingCommand cmd, |
| @Nullable SqlClientContext cliCtx) { |
| if (cliCtx == null) |
| throw new IgniteSQLException("SET STREAMING command can only be executed from JDBC or ODBC driver."); |
| |
| if (cmd.isTurnOn()) |
| cliCtx.enableStreaming( |
| cmd.allowOverwrite(), |
| cmd.flushFrequency(), |
| cmd.perNodeBufferSize(), |
| cmd.perNodeParallelOperations(), |
| cmd.isOrdered() |
| ); |
| else |
| cliCtx.disableStreaming(); |
| } |
| |
| /** |
| * Process bulk load COPY command. |
| * |
| * @param cmd The command. |
| * @param qryId Query id. |
| * @return The context (which is the result of the first request/response). |
| * @throws IgniteCheckedException If something failed. |
| */ |
| private FieldsQueryCursor<List<?>> processBulkLoadCommand(SqlBulkLoadCommand cmd, long qryId) |
| throws IgniteCheckedException { |
| if (cmd.packetSize() == null) |
| cmd.packetSize(BulkLoadAckClientParameters.DFLT_PACKET_SIZE); |
| |
| GridH2Table tbl = schemaMgr.dataTable(cmd.schemaName(), cmd.tableName()); |
| |
| if (tbl == null) { |
| throw new IgniteSQLException("Table does not exist: " + cmd.tableName(), |
| IgniteQueryErrorCode.TABLE_NOT_FOUND); |
| } |
| |
| H2Utils.checkAndStartNotStartedCache(ctx, tbl); |
| |
| UpdatePlan plan = UpdatePlanBuilder.planForBulkLoad(cmd, tbl); |
| |
| IgniteClosureX<List<?>, IgniteBiTuple<?, ?>> dataConverter = new DmlBulkLoadDataConverter(plan); |
| |
| IgniteDataStreamer<Object, Object> streamer = ctx.grid().dataStreamer(tbl.cacheName()); |
| |
| BulkLoadCacheWriter outputWriter = new BulkLoadStreamerWriter(streamer); |
| |
| BulkLoadParser inputParser = BulkLoadParser.createParser(cmd.inputFormat()); |
| |
| BulkLoadProcessor processor = new BulkLoadProcessor(inputParser, dataConverter, outputWriter, |
| idx.runningQueryManager(), qryId, ctx.tracing()); |
| |
| BulkLoadAckClientParameters params = new BulkLoadAckClientParameters(cmd.localFileName(), cmd.packetSize()); |
| |
| return new BulkLoadContextCursor(processor, params); |
| } |
| } |