blob: 1b28317a3688710ddf8df9484d86f93752da1250 [file] [log] [blame]
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
package org.apache.doris.alter;
import org.apache.doris.analysis.AlterTableStmt;
import org.apache.doris.analysis.CreateDbStmt;
import org.apache.doris.analysis.CreateTableStmt;
import org.apache.doris.analysis.ShowAlterStmt;
import org.apache.doris.catalog.Catalog;
import org.apache.doris.catalog.Database;
import org.apache.doris.catalog.OlapTable;
import org.apache.doris.common.Config;
import org.apache.doris.common.DdlException;
import org.apache.doris.common.FeConstants;
import org.apache.doris.qe.ConnectContext;
import org.apache.doris.qe.ShowExecutor;
import org.apache.doris.qe.ShowResultSet;
import org.apache.doris.thrift.TStorageFormat;
import org.apache.doris.utframe.UtFrameUtils;
import org.junit.AfterClass;
import org.junit.Assert;
import org.junit.BeforeClass;
import org.junit.Test;
import java.util.Map;
import java.util.UUID;
/**
 * Tests for V2 alter jobs (schema change, rollup and storage-format change).
 *
 * <p>All tests share one mocked minimal Doris cluster, one database ({@code test}) and one
 * {@link ConnectContext}, all of which are created once in {@link #beforeClass()}.
 */
public class AlterJobV2Test {
    // use a unique dir so that it won't conflict with other unit tests which
    // may also start a Mocked Frontend
    private static String runningDir = "fe/mocked/AlterJobV2Test/" + UUID.randomUUID().toString() + "/";
    private static ConnectContext connectContext;

    // Upper bound for each polling loop in waitAlterJobDone(), so that a stuck job
    // fails the test instead of hanging the whole test run.
    private static final long ALTER_JOB_TIMEOUT_MS = 120 * 1000L;
    // Interval between two polls of the job / table state.
    private static final long POLL_INTERVAL_MS = 1000L;

    /**
     * Starts the mocked cluster under {@code runningDir}, then creates the connect context,
     * the {@code test} database and the two tables used by the tests.
     *
     * @throws Exception if the cluster cannot be started or any statement fails
     */
    @BeforeClass
    public static void beforeClass() throws Exception {
        FeConstants.default_scheduler_interval_millisecond = 1000;
        FeConstants.runningUnitTest = true;

        UtFrameUtils.createMinDorisCluster(runningDir);
        // NOTE(review): presumably required so that the 'storage_format' = 'v1' table below
        // can be created - confirm against Config.
        Config.enable_alpha_rowset = true;

        // create connect context
        connectContext = UtFrameUtils.createDefaultCtx();

        // create database
        String createDbStmtStr = "create database test;";
        CreateDbStmt createDbStmt = (CreateDbStmt) UtFrameUtils.parseAndAnalyzeStmt(createDbStmtStr, connectContext);
        Catalog.getCurrentCatalog().createDb(createDbStmt);

        createTable("CREATE TABLE test.schema_change_test(k1 int, k2 int, k3 int) distributed by hash(k1) buckets 3 properties('replication_num' = '1');");
        createTable("CREATE TABLE test.segmentv2(k1 int, k2 int, v1 int sum) distributed by hash(k1) buckets 3 properties('replication_num' = '1', 'storage_format' = 'v1');");
    }

    /** Removes the working directory of the mocked cluster. */
    @AfterClass
    public static void tearDown() {
        UtFrameUtils.cleanDorisFeDir(runningDir);
    }

    /**
     * Parses and analyzes {@code sql} as a CREATE TABLE statement and executes it on the
     * current catalog.
     *
     * @param sql a complete CREATE TABLE statement
     * @throws Exception if parsing, analysis or table creation fails
     */
    private static void createTable(String sql) throws Exception {
        CreateTableStmt createTableStmt = (CreateTableStmt) UtFrameUtils.parseAndAnalyzeStmt(sql, connectContext);
        Catalog.getCurrentCatalog().createTable(createTableStmt);
    }

    /**
     * Adds a column to {@code test.schema_change_test}, waits for the resulting schema change
     * job to finish and prints the output of SHOW ALTER TABLE COLUMN.
     */
    @Test
    public void testSchemaChange() throws Exception {
        // 1. process a schema change job
        String alterStmtStr = "alter table test.schema_change_test add column k4 int default '1'";
        AlterTableStmt alterTableStmt = (AlterTableStmt) UtFrameUtils.parseAndAnalyzeStmt(alterStmtStr, connectContext);
        Catalog.getCurrentCatalog().getAlterInstance().processAlterTable(alterTableStmt);

        // 2. check alter job
        // NOTE(review): testAlterSegmentV2 also waits on jobs from the schema change handler, so
        // this count presumably relies on that test not having run before this one - confirm
        // the test execution order.
        Map<Long, AlterJobV2> alterJobs = Catalog.getCurrentCatalog().getSchemaChangeHandler().getAlterJobsV2();
        Assert.assertEquals(1, alterJobs.size());
        waitAlterJobDone(alterJobs);

        // 3. check show alter table column
        String showAlterStmtStr = "show alter table column from test;";
        ShowAlterStmt showAlterStmt = (ShowAlterStmt) UtFrameUtils.parseAndAnalyzeStmt(showAlterStmtStr, connectContext);
        ShowExecutor showExecutor = new ShowExecutor(connectContext, showAlterStmt);
        ShowResultSet showResultSet = showExecutor.execute();
        System.out.println(showResultSet.getMetaData());
        System.out.println(showResultSet.getResultRows());
    }

    /**
     * Blocks until every job in {@code alterJobs} has reached a final state and the table it
     * belongs to is back in the NORMAL state, asserting that each job ended as FINISHED.
     *
     * <p>Each of the two polling loops is bounded by {@link #ALTER_JOB_TIMEOUT_MS}; exceeding
     * it fails the test rather than waiting forever.
     *
     * @param alterJobs the jobs to wait for, as returned by an alter handler
     * @throws InterruptedException if the thread is interrupted while sleeping between polls
     */
    private void waitAlterJobDone(Map<Long, AlterJobV2> alterJobs) throws InterruptedException {
        for (AlterJobV2 alterJobV2 : alterJobs.values()) {
            // Identify the job by the ids that are actually printed, so the log is not misleading.
            String jobDesc = "alter job (db " + alterJobV2.getDbId() + ", table " + alterJobV2.getTableId() + ")";

            long deadline = System.currentTimeMillis() + ALTER_JOB_TIMEOUT_MS;
            while (!alterJobV2.getJobState().isFinalState()) {
                if (System.currentTimeMillis() > deadline) {
                    Assert.fail(jobDesc + " did not reach a final state within " + ALTER_JOB_TIMEOUT_MS
                            + " ms. state: " + alterJobV2.getJobState());
                }
                System.out.println(jobDesc + " is running. state: " + alterJobV2.getJobState());
                Thread.sleep(POLL_INTERVAL_MS);
            }
            System.out.println(jobDesc + " is done. state: " + alterJobV2.getJobState());
            Assert.assertEquals(AlterJobV2.JobState.FINISHED, alterJobV2.getJobState());

            // The job being final does not imply the table state has been reset yet,
            // so wait for the table separately before letting the caller continue.
            Database db = Catalog.getCurrentCatalog().getDb(alterJobV2.getDbId());
            OlapTable tbl = (OlapTable) db.getTable(alterJobV2.getTableId());
            deadline = System.currentTimeMillis() + ALTER_JOB_TIMEOUT_MS;
            while (tbl.getState() != OlapTable.OlapTableState.NORMAL) {
                if (System.currentTimeMillis() > deadline) {
                    Assert.fail("table of " + jobDesc + " did not return to NORMAL within "
                            + ALTER_JOB_TIMEOUT_MS + " ms. state: " + tbl.getState());
                }
                Thread.sleep(POLL_INTERVAL_MS);
            }
        }
    }

    /**
     * Adds a rollup to {@code test.schema_change_test}, waits for the rollup job(s) to finish
     * and prints the output of SHOW ALTER TABLE ROLLUP.
     */
    @Test
    public void testRollup() throws Exception {
        // 1. process a rollup job
        String alterStmtStr = "alter table test.schema_change_test add rollup test_rollup(k1, k2);";
        AlterTableStmt alterTableStmt = (AlterTableStmt) UtFrameUtils.parseAndAnalyzeStmt(alterStmtStr, connectContext);
        Catalog.getCurrentCatalog().getAlterInstance().processAlterTable(alterTableStmt);

        // 2. check alter job
        Map<Long, AlterJobV2> alterJobs = Catalog.getCurrentCatalog().getRollupHandler().getAlterJobsV2();
        waitAlterJobDone(alterJobs);

        // 3. check show alter table rollup
        String showAlterStmtStr = "show alter table rollup from test;";
        ShowAlterStmt showAlterStmt = (ShowAlterStmt) UtFrameUtils.parseAndAnalyzeStmt(showAlterStmtStr, connectContext);
        ShowExecutor showExecutor = new ShowExecutor(connectContext, showAlterStmt);
        ShowResultSet showResultSet = showExecutor.execute();
        System.out.println(showResultSet.getMetaData());
        System.out.println(showResultSet.getResultRows());
    }

    /**
     * Exercises the migration of {@code test.segmentv2} from storage format V1 to V2: a plain
     * rollup, a rollup created with {@code 'storage_format' = 'v2'}, rollup selection with and
     * without the {@code use_v2_rollup} session variable, and finally the table-level format
     * change (which must be rejected when repeated).
     */
    @Test
    @Deprecated
    public void testAlterSegmentV2() throws Exception {
        // TODO this test should remove after we disable segment v1 completely
        Database db = Catalog.getCurrentCatalog().getDb("default_cluster:test");
        Assert.assertNotNull(db);
        OlapTable tbl = (OlapTable) db.getTable("segmentv2");
        Assert.assertNotNull(tbl);
        Assert.assertEquals(TStorageFormat.V1, tbl.getTableProperty().getStorageFormat());

        // 1. create a rollup r1
        String alterStmtStr = "alter table test.segmentv2 add rollup r1(k2, v1)";
        AlterTableStmt alterTableStmt = (AlterTableStmt) UtFrameUtils.parseAndAnalyzeStmt(alterStmtStr, connectContext);
        Catalog.getCurrentCatalog().getAlterInstance().processAlterTable(alterTableStmt);
        Map<Long, AlterJobV2> alterJobs = Catalog.getCurrentCatalog().getRollupHandler().getAlterJobsV2();
        waitAlterJobDone(alterJobs);

        String sql = "select k2, sum(v1) from test.segmentv2 group by k2";
        String explainString = UtFrameUtils.getSQLPlanOrErrorMsg(connectContext, "explain " + sql);
        Assert.assertTrue(explainString.contains("rollup: r1"));

        // 2. create a rollup with segment v2
        alterStmtStr = "alter table test.segmentv2 add rollup segmentv2(k2, v1) properties('storage_format' = 'v2')";
        alterTableStmt = (AlterTableStmt) UtFrameUtils.parseAndAnalyzeStmt(alterStmtStr, connectContext);
        Catalog.getCurrentCatalog().getAlterInstance().processAlterTable(alterTableStmt);
        alterJobs = Catalog.getCurrentCatalog().getRollupHandler().getAlterJobsV2();
        waitAlterJobDone(alterJobs);

        // without use_v2_rollup the original rollup is still the one selected
        explainString = UtFrameUtils.getSQLPlanOrErrorMsg(connectContext, "explain " + sql);
        Assert.assertTrue(explainString.contains("rollup: r1"));

        // set use_v2_rollup = true;
        connectContext.getSessionVariable().setUseV2Rollup(true);
        try {
            explainString = UtFrameUtils.getSQLPlanOrErrorMsg(connectContext, "explain " + sql);
            Assert.assertTrue(explainString.contains("rollup: __v2_segmentv2"));

            // 3. process alter segment v2
            alterStmtStr = "alter table test.segmentv2 set ('storage_format' = 'v2');";
            alterTableStmt = (AlterTableStmt) UtFrameUtils.parseAndAnalyzeStmt(alterStmtStr, connectContext);
            Catalog.getCurrentCatalog().getAlterInstance().processAlterTable(alterTableStmt);

            // 4. check alter job
            alterJobs = Catalog.getCurrentCatalog().getSchemaChangeHandler().getAlterJobsV2();
            waitAlterJobDone(alterJobs);

            // 5. check storage format of table
            Assert.assertEquals(TStorageFormat.V2, tbl.getTableProperty().getStorageFormat());

            // 6. alter again, that no job will be created
            try {
                Catalog.getCurrentCatalog().getAlterInstance().processAlterTable(alterTableStmt);
                Assert.fail("expected a DdlException because the storage format is already v2");
            } catch (DdlException e) {
                Assert.assertTrue(e.getMessage().contains("Nothing is changed"));
            }
        } finally {
            // connectContext is shared by every test in this class; put the session variable
            // back to the value the assertions above ran under so it does not leak into them.
            connectContext.getSessionVariable().setUseV2Rollup(false);
        }
    }
}