| // Licensed to the Apache Software Foundation (ASF) under one |
| // or more contributor license agreements. See the NOTICE file |
| // distributed with this work for additional information |
| // regarding copyright ownership. The ASF licenses this file |
| // to you under the Apache License, Version 2.0 (the |
| // "License"); you may not use this file except in compliance |
| // with the License. You may obtain a copy of the License at |
| // |
| // http://www.apache.org/licenses/LICENSE-2.0 |
| // |
| // Unless required by applicable law or agreed to in writing, |
| // software distributed under the License is distributed on an |
| // "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY |
| // KIND, either express or implied. See the License for the |
| // specific language governing permissions and limitations |
| // under the License. |
| |
| package org.apache.doris.alter; |
| |
| import org.apache.doris.analysis.AlterTableStmt; |
| import org.apache.doris.analysis.CreateDbStmt; |
| import org.apache.doris.analysis.CreateTableStmt; |
| import org.apache.doris.analysis.ShowAlterStmt; |
| import org.apache.doris.catalog.Catalog; |
| import org.apache.doris.catalog.Database; |
| import org.apache.doris.catalog.OlapTable; |
| import org.apache.doris.common.Config; |
| import org.apache.doris.common.DdlException; |
| import org.apache.doris.common.FeConstants; |
| import org.apache.doris.qe.ConnectContext; |
| import org.apache.doris.qe.ShowExecutor; |
| import org.apache.doris.qe.ShowResultSet; |
| import org.apache.doris.thrift.TStorageFormat; |
| import org.apache.doris.utframe.UtFrameUtils; |
| |
| import org.junit.AfterClass; |
| import org.junit.Assert; |
| import org.junit.BeforeClass; |
| import org.junit.Test; |
| |
| import java.util.Map; |
| import java.util.UUID; |
| |
| public class AlterJobV2Test { |
| // use a unique dir so that it won't be conflict with other unit test which |
| // may also start a Mocked Frontend |
| private static String runningDir = "fe/mocked/AlterJobV2Test/" + UUID.randomUUID().toString() + "/"; |
| |
| private static ConnectContext connectContext; |
| |
| @BeforeClass |
| public static void beforeClass() throws Exception { |
| FeConstants.default_scheduler_interval_millisecond = 1000; |
| FeConstants.runningUnitTest = true; |
| |
| UtFrameUtils.createMinDorisCluster(runningDir); |
| Config.enable_alpha_rowset = true; |
| |
| // create connect context |
| connectContext = UtFrameUtils.createDefaultCtx(); |
| // create database |
| String createDbStmtStr = "create database test;"; |
| CreateDbStmt createDbStmt = (CreateDbStmt) UtFrameUtils.parseAndAnalyzeStmt(createDbStmtStr, connectContext); |
| Catalog.getCurrentCatalog().createDb(createDbStmt); |
| |
| createTable("CREATE TABLE test.schema_change_test(k1 int, k2 int, k3 int) distributed by hash(k1) buckets 3 properties('replication_num' = '1');"); |
| |
| createTable("CREATE TABLE test.segmentv2(k1 int, k2 int, v1 int sum) distributed by hash(k1) buckets 3 properties('replication_num' = '1', 'storage_format' = 'v1');"); |
| } |
| |
| @AfterClass |
| public static void tearDown() { |
| UtFrameUtils.cleanDorisFeDir(runningDir); |
| } |
| |
| private static void createTable(String sql) throws Exception { |
| CreateTableStmt createTableStmt = (CreateTableStmt) UtFrameUtils.parseAndAnalyzeStmt(sql, connectContext); |
| Catalog.getCurrentCatalog().createTable(createTableStmt); |
| } |
| |
| @Test |
| public void testSchemaChange() throws Exception { |
| // 1. process a schema change job |
| String alterStmtStr = "alter table test.schema_change_test add column k4 int default '1'"; |
| AlterTableStmt alterTableStmt = (AlterTableStmt) UtFrameUtils.parseAndAnalyzeStmt(alterStmtStr, connectContext); |
| Catalog.getCurrentCatalog().getAlterInstance().processAlterTable(alterTableStmt); |
| // 2. check alter job |
| Map<Long, AlterJobV2> alterJobs = Catalog.getCurrentCatalog().getSchemaChangeHandler().getAlterJobsV2(); |
| Assert.assertEquals(1, alterJobs.size()); |
| waitAlterJobDone(alterJobs); |
| // 3. check show alter table column |
| String showAlterStmtStr = "show alter table column from test;"; |
| ShowAlterStmt showAlterStmt = (ShowAlterStmt) UtFrameUtils.parseAndAnalyzeStmt(showAlterStmtStr, connectContext); |
| ShowExecutor showExecutor = new ShowExecutor(connectContext, showAlterStmt); |
| ShowResultSet showResultSet = showExecutor.execute(); |
| System.out.println(showResultSet.getMetaData()); |
| System.out.println(showResultSet.getResultRows()); |
| } |
| |
| private void waitAlterJobDone(Map<Long, AlterJobV2> alterJobs) throws InterruptedException { |
| for (AlterJobV2 alterJobV2 : alterJobs.values()) { |
| while (!alterJobV2.getJobState().isFinalState()) { |
| System.out.println("alter job " + alterJobV2.getDbId() + " is running. state: " + alterJobV2.getJobState()); |
| Thread.sleep(1000); |
| } |
| System.out.println("alter job " + alterJobV2.getDbId() + " is done. state: " + alterJobV2.getJobState()); |
| Assert.assertEquals(AlterJobV2.JobState.FINISHED, alterJobV2.getJobState()); |
| |
| Database db = Catalog.getCurrentCatalog().getDb(alterJobV2.getDbId()); |
| OlapTable tbl = (OlapTable) db.getTable(alterJobV2.getTableId()); |
| while (tbl.getState() != OlapTable.OlapTableState.NORMAL) { |
| Thread.sleep(1000); |
| } |
| } |
| } |
| |
| @Test |
| public void testRollup() throws Exception { |
| // 1. process a rollup job |
| String alterStmtStr = "alter table test.schema_change_test add rollup test_rollup(k1, k2);"; |
| AlterTableStmt alterTableStmt = (AlterTableStmt) UtFrameUtils.parseAndAnalyzeStmt(alterStmtStr, connectContext); |
| Catalog.getCurrentCatalog().getAlterInstance().processAlterTable(alterTableStmt); |
| // 2. check alter job |
| Map<Long, AlterJobV2> alterJobs = Catalog.getCurrentCatalog().getRollupHandler().getAlterJobsV2(); |
| waitAlterJobDone(alterJobs); |
| // 3. check show alter table column |
| String showAlterStmtStr = "show alter table rollup from test;"; |
| ShowAlterStmt showAlterStmt = (ShowAlterStmt) UtFrameUtils.parseAndAnalyzeStmt(showAlterStmtStr, connectContext); |
| ShowExecutor showExecutor = new ShowExecutor(connectContext, showAlterStmt); |
| ShowResultSet showResultSet = showExecutor.execute(); |
| System.out.println(showResultSet.getMetaData()); |
| System.out.println(showResultSet.getResultRows()); |
| } |
| |
| @Test |
| @Deprecated |
| public void testAlterSegmentV2() throws Exception { |
| // TODO this test should remove after we disable segment v1 completely |
| Database db = Catalog.getCurrentCatalog().getDb("default_cluster:test"); |
| Assert.assertNotNull(db); |
| OlapTable tbl = (OlapTable) db.getTable("segmentv2"); |
| Assert.assertNotNull(tbl); |
| Assert.assertEquals(TStorageFormat.V1, tbl.getTableProperty().getStorageFormat()); |
| |
| // 1. create a rollup r1 |
| String alterStmtStr = "alter table test.segmentv2 add rollup r1(k2, v1)"; |
| AlterTableStmt alterTableStmt = (AlterTableStmt) UtFrameUtils.parseAndAnalyzeStmt(alterStmtStr, connectContext); |
| Catalog.getCurrentCatalog().getAlterInstance().processAlterTable(alterTableStmt); |
| Map<Long, AlterJobV2> alterJobs = Catalog.getCurrentCatalog().getRollupHandler().getAlterJobsV2(); |
| waitAlterJobDone(alterJobs); |
| |
| String sql = "select k2, sum(v1) from test.segmentv2 group by k2"; |
| String explainString = UtFrameUtils.getSQLPlanOrErrorMsg(connectContext, "explain " + sql); |
| Assert.assertTrue(explainString.contains("rollup: r1")); |
| |
| // 2. create a rollup with segment v2 |
| alterStmtStr = "alter table test.segmentv2 add rollup segmentv2(k2, v1) properties('storage_format' = 'v2')"; |
| alterTableStmt = (AlterTableStmt) UtFrameUtils.parseAndAnalyzeStmt(alterStmtStr, connectContext); |
| Catalog.getCurrentCatalog().getAlterInstance().processAlterTable(alterTableStmt); |
| alterJobs = Catalog.getCurrentCatalog().getRollupHandler().getAlterJobsV2(); |
| waitAlterJobDone(alterJobs); |
| |
| explainString = UtFrameUtils.getSQLPlanOrErrorMsg(connectContext, "explain " + sql); |
| Assert.assertTrue(explainString.contains("rollup: r1")); |
| |
| // set use_v2_rollup = true; |
| connectContext.getSessionVariable().setUseV2Rollup(true); |
| explainString = UtFrameUtils.getSQLPlanOrErrorMsg(connectContext, "explain " + sql); |
| Assert.assertTrue(explainString.contains("rollup: __v2_segmentv2")); |
| |
| // 3. process alter segment v2 |
| alterStmtStr = "alter table test.segmentv2 set ('storage_format' = 'v2');"; |
| alterTableStmt = (AlterTableStmt) UtFrameUtils.parseAndAnalyzeStmt(alterStmtStr, connectContext); |
| Catalog.getCurrentCatalog().getAlterInstance().processAlterTable(alterTableStmt); |
| // 4. check alter job |
| alterJobs = Catalog.getCurrentCatalog().getSchemaChangeHandler().getAlterJobsV2(); |
| waitAlterJobDone(alterJobs); |
| // 5. check storage format of table |
| Assert.assertEquals(TStorageFormat.V2, tbl.getTableProperty().getStorageFormat()); |
| |
| // 6. alter again, that no job will be created |
| try { |
| Catalog.getCurrentCatalog().getAlterInstance().processAlterTable(alterTableStmt); |
| Assert.fail(); |
| } catch (DdlException e) { |
| Assert.assertTrue(e.getMessage().contains("Nothing is changed")); |
| } |
| } |
| } |