blob: 7293740e4fa85a5766b3b053523244c554510b6a [file] [log] [blame]
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
package org.apache.doris.alter;
import static org.junit.Assert.assertEquals;
import org.apache.doris.alter.AlterJobV2.JobState;
import org.apache.doris.analysis.AccessTestUtil;
import org.apache.doris.analysis.AddRollupClause;
import org.apache.doris.analysis.AlterClause;
import org.apache.doris.analysis.Analyzer;
import org.apache.doris.analysis.CreateMaterializedViewStmt;
import org.apache.doris.analysis.Expr;
import org.apache.doris.analysis.FunctionCallExpr;
import org.apache.doris.analysis.FunctionName;
import org.apache.doris.analysis.MVColumnItem;
import org.apache.doris.analysis.SlotRef;
import org.apache.doris.analysis.TableName;
import org.apache.doris.catalog.AggregateType;
import org.apache.doris.catalog.Catalog;
import org.apache.doris.catalog.CatalogTestUtil;
import org.apache.doris.catalog.Column;
import org.apache.doris.catalog.Database;
import org.apache.doris.catalog.FakeCatalog;
import org.apache.doris.catalog.FakeEditLog;
import org.apache.doris.catalog.KeysType;
import org.apache.doris.catalog.MaterializedIndex;
import org.apache.doris.catalog.MaterializedIndex.IndexExtState;
import org.apache.doris.catalog.OlapTable;
import org.apache.doris.catalog.OlapTable.OlapTableState;
import org.apache.doris.catalog.Partition;
import org.apache.doris.catalog.Replica;
import org.apache.doris.catalog.Tablet;
import org.apache.doris.catalog.Type;
import org.apache.doris.common.AnalysisException;
import org.apache.doris.common.Config;
import org.apache.doris.common.FeConstants;
import org.apache.doris.common.FeMetaVersion;
import org.apache.doris.common.UserException;
import org.apache.doris.common.jmockit.Deencapsulation;
import org.apache.doris.meta.MetaContext;
import org.apache.doris.qe.OriginStatement;
import org.apache.doris.task.AgentTask;
import org.apache.doris.task.AgentTaskQueue;
import org.apache.doris.thrift.TStorageFormat;
import org.apache.doris.thrift.TTaskType;
import org.apache.doris.transaction.FakeTransactionIDGenerator;
import org.apache.doris.transaction.GlobalTransactionMgr;
import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.File;
import java.io.FileInputStream;
import java.io.FileOutputStream;
import java.io.IOException;
import java.lang.reflect.InvocationTargetException;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import mockit.Expectations;
import mockit.Mock;
import mockit.MockUp;
import mockit.Mocked;
public class RollupJobV2Test {
private static String fileName = "./RollupJobV2Test";
private static FakeTransactionIDGenerator fakeTransactionIDGenerator;
private static GlobalTransactionMgr masterTransMgr;
private static GlobalTransactionMgr slaveTransMgr;
private static Catalog masterCatalog;
private static Catalog slaveCatalog;
private static String transactionSource = "localfe";
private static Analyzer analyzer;
private static AddRollupClause clause;
private static AddRollupClause clause2;
private FakeCatalog fakeCatalog;
private FakeEditLog fakeEditLog;
@Before
public void setUp() throws InstantiationException, IllegalAccessException, IllegalArgumentException,
InvocationTargetException, NoSuchMethodException, SecurityException, AnalysisException {
fakeCatalog = new FakeCatalog();
fakeEditLog = new FakeEditLog();
fakeTransactionIDGenerator = new FakeTransactionIDGenerator();
masterCatalog = CatalogTestUtil.createTestCatalog();
slaveCatalog = CatalogTestUtil.createTestCatalog();
MetaContext metaContext = new MetaContext();
metaContext.setMetaVersion(FeMetaVersion.VERSION_61);
metaContext.setThreadLocalInfo();
// masterCatalog.setJournalVersion(FeMetaVersion.VERSION_40);
// slaveCatalog.setJournalVersion(FeMetaVersion.VERSION_40);
masterTransMgr = masterCatalog.getGlobalTransactionMgr();
masterTransMgr.setEditLog(masterCatalog.getEditLog());
slaveTransMgr = slaveCatalog.getGlobalTransactionMgr();
slaveTransMgr.setEditLog(slaveCatalog.getEditLog());
analyzer = AccessTestUtil.fetchAdminAnalyzer(false);
clause = new AddRollupClause(CatalogTestUtil.testRollupIndex2, Lists.newArrayList("k1", "v"), null,
CatalogTestUtil.testIndex1, null);
clause.analyze(analyzer);
clause2 = new AddRollupClause(CatalogTestUtil.testRollupIndex3, Lists.newArrayList("k1", "v"), null,
CatalogTestUtil.testIndex1, null);
clause2.analyze(analyzer);
FeConstants.runningUnitTest = true;
AgentTaskQueue.clearAllTasks();
new MockUp<Catalog>() {
@Mock
public Catalog getCurrentCatalog() {
return masterCatalog;
}
};
}
@After
public void tearDown() {
File file = new File(fileName);
file.delete();
}
@Test
public void testRunRollupJobConcurrentLimit() throws UserException {
fakeCatalog = new FakeCatalog();
fakeEditLog = new FakeEditLog();
FakeCatalog.setCatalog(masterCatalog);
MaterializedViewHandler materializedViewHandler = Catalog.getCurrentCatalog().getRollupHandler();
ArrayList<AlterClause> alterClauses = new ArrayList<>();
alterClauses.add(clause);
alterClauses.add(clause2);
Database db = masterCatalog.getDb(CatalogTestUtil.testDbId1);
OlapTable olapTable = (OlapTable) db.getTable(CatalogTestUtil.testTableId1);
materializedViewHandler.process(alterClauses, db.getClusterName(), db, olapTable);
Map<Long, AlterJobV2> alterJobsV2 = materializedViewHandler.getAlterJobsV2();
materializedViewHandler.runAfterCatalogReady();
Assert.assertEquals(Config.max_running_rollup_job_num_per_table, materializedViewHandler.getTableRunningJobMap().get(CatalogTestUtil.testTableId1).size());
Assert.assertEquals(2, alterJobsV2.size());
Assert.assertEquals(OlapTableState.ROLLUP, olapTable.getState());
}
@Test
public void testAddSchemaChange() throws UserException {
fakeCatalog = new FakeCatalog();
fakeEditLog = new FakeEditLog();
FakeCatalog.setCatalog(masterCatalog);
MaterializedViewHandler materializedViewHandler = Catalog.getCurrentCatalog().getRollupHandler();
ArrayList<AlterClause> alterClauses = new ArrayList<>();
alterClauses.add(clause);
Database db = masterCatalog.getDb(CatalogTestUtil.testDbId1);
OlapTable olapTable = (OlapTable) db.getTable(CatalogTestUtil.testTableId1);
materializedViewHandler.process(alterClauses, db.getClusterName(), db, olapTable);
Map<Long, AlterJobV2> alterJobsV2 = materializedViewHandler.getAlterJobsV2();
Assert.assertEquals(1, alterJobsV2.size());
Assert.assertEquals(OlapTableState.ROLLUP, olapTable.getState());
}
// start a schema change, then finished
@Test
public void testSchemaChange1() throws Exception {
fakeCatalog = new FakeCatalog();
fakeEditLog = new FakeEditLog();
FakeCatalog.setCatalog(masterCatalog);
MaterializedViewHandler materializedViewHandler = Catalog.getCurrentCatalog().getRollupHandler();
// add a rollup job
ArrayList<AlterClause> alterClauses = new ArrayList<>();
alterClauses.add(clause);
Database db = masterCatalog.getDb(CatalogTestUtil.testDbId1);
OlapTable olapTable = (OlapTable) db.getTable(CatalogTestUtil.testTableId1);
Partition testPartition = olapTable.getPartition(CatalogTestUtil.testPartitionId1);
materializedViewHandler.process(alterClauses, db.getClusterName(), db, olapTable);
Map<Long, AlterJobV2> alterJobsV2 = materializedViewHandler.getAlterJobsV2();
Assert.assertEquals(1, alterJobsV2.size());
RollupJobV2 rollupJob = (RollupJobV2) alterJobsV2.values().stream().findAny().get();
// runPendingJob
materializedViewHandler.runAfterCatalogReady();
Assert.assertEquals(JobState.WAITING_TXN, rollupJob.getJobState());
Assert.assertEquals(2, testPartition.getMaterializedIndices(IndexExtState.ALL).size());
Assert.assertEquals(1, testPartition.getMaterializedIndices(IndexExtState.VISIBLE).size());
Assert.assertEquals(1, testPartition.getMaterializedIndices(IndexExtState.SHADOW).size());
// runWaitingTxnJob
materializedViewHandler.runAfterCatalogReady();
Assert.assertEquals(JobState.RUNNING, rollupJob.getJobState());
// runWaitingTxnJob, task not finished
materializedViewHandler.runAfterCatalogReady();
Assert.assertEquals(JobState.RUNNING, rollupJob.getJobState());
// finish all tasks
List<AgentTask> tasks = AgentTaskQueue.getTask(TTaskType.ALTER);
Assert.assertEquals(3, tasks.size());
for (AgentTask agentTask : tasks) {
agentTask.setFinished(true);
}
MaterializedIndex shadowIndex = testPartition.getMaterializedIndices(IndexExtState.SHADOW).get(0);
for (Tablet shadowTablet : shadowIndex.getTablets()) {
for (Replica shadowReplica : shadowTablet.getReplicas()) {
shadowReplica.updateVersionInfo(testPartition.getVisibleVersion(),
testPartition.getVisibleVersionHash(), shadowReplica.getDataSize(),
shadowReplica.getRowCount());
}
}
materializedViewHandler.runAfterCatalogReady();
Assert.assertEquals(JobState.FINISHED, rollupJob.getJobState());
/*
Assert.assertEquals(CatalogTestUtil.testIndexId1, rollupJob.getBaseIndexId());
Assert.assertEquals(CatalogTestUtil.testRollupIndex2, rollupJob.getRollupIndexName());
MaterializedIndex rollupIndex = rollupJob.getRollupIndex(CatalogTestUtil.testPartitionId1);
MaterializedIndex baseIndex = testPartition.getBaseIndex();
assertEquals(IndexState.ROLLUP, rollupIndex.getState());
assertEquals(IndexState.NORMAL, baseIndex.getState());
assertEquals(OlapTableState.ROLLUP, olapTable.getState());
assertEquals(PartitionState.ROLLUP, testPartition.getState());
Tablet rollupTablet = rollupIndex.getTablets().get(0);
List<Replica> replicas = rollupTablet.getReplicas();
Replica rollupReplica1 = replicas.get(0);
Replica rollupReplica2 = replicas.get(1);
Replica rollupReplica3 = replicas.get(2);
assertEquals(-1, rollupReplica1.getVersion());
assertEquals(-1, rollupReplica2.getVersion());
assertEquals(-1, rollupReplica3.getVersion());
assertEquals(CatalogTestUtil.testStartVersion, rollupReplica1.getLastFailedVersion());
assertEquals(CatalogTestUtil.testStartVersion, rollupReplica2.getLastFailedVersion());
assertEquals(CatalogTestUtil.testStartVersion, rollupReplica3.getLastFailedVersion());
assertEquals(-1, rollupReplica1.getLastSuccessVersion());
assertEquals(-1, rollupReplica2.getLastSuccessVersion());
assertEquals(-1, rollupReplica3.getLastSuccessVersion());
// rollup handler run one cycle, agent task is generated and send tasks
rollupHandler.runOneCycle();
AgentTask task1 = AgentTaskQueue.getTask(rollupReplica1.getBackendId(), TTaskType.ROLLUP, rollupTablet.getId());
AgentTask task2 = AgentTaskQueue.getTask(rollupReplica2.getBackendId(), TTaskType.ROLLUP, rollupTablet.getId());
AgentTask task3 = AgentTaskQueue.getTask(rollupReplica3.getBackendId(), TTaskType.ROLLUP, rollupTablet.getId());
// be report finishe rollup success
TTabletInfo tTabletInfo = new TTabletInfo(rollupTablet.getId(), CatalogTestUtil.testSchemaHash1,
CatalogTestUtil.testStartVersion, CatalogTestUtil.testStartVersionHash, 0, 0);
rollupHandler.handleFinishedReplica(task1, tTabletInfo, -1);
rollupHandler.handleFinishedReplica(task2, tTabletInfo, -1);
rollupHandler.handleFinishedReplica(task3, tTabletInfo, -1);
// rollup hander run one cycle again, the rollup job is finishing
rollupHandler.runOneCycle();
Assert.assertEquals(JobState.FINISHING, rollupJob.getState());
assertEquals(CatalogTestUtil.testStartVersion, rollupReplica1.getVersion());
assertEquals(CatalogTestUtil.testStartVersion, rollupReplica2.getVersion());
assertEquals(CatalogTestUtil.testStartVersion, rollupReplica3.getVersion());
assertEquals(-1, rollupReplica1.getLastFailedVersion());
assertEquals(-1, rollupReplica2.getLastFailedVersion());
assertEquals(-1, rollupReplica3.getLastFailedVersion());
assertEquals(CatalogTestUtil.testStartVersion, rollupReplica1.getLastSuccessVersion());
assertEquals(CatalogTestUtil.testStartVersion, rollupReplica1.getLastSuccessVersion());
assertEquals(CatalogTestUtil.testStartVersion, rollupReplica1.getLastSuccessVersion());
*/
}
@Test
public void testSchemaChangeWhileTabletNotStable() throws Exception {
fakeCatalog = new FakeCatalog();
fakeEditLog = new FakeEditLog();
FakeCatalog.setCatalog(masterCatalog);
MaterializedViewHandler materializedViewHandler = Catalog.getCurrentCatalog().getRollupHandler();
// add a rollup job
ArrayList<AlterClause> alterClauses = new ArrayList<>();
alterClauses.add(clause);
Database db = masterCatalog.getDb(CatalogTestUtil.testDbId1);
OlapTable olapTable = (OlapTable) db.getTable(CatalogTestUtil.testTableId1);
Partition testPartition = olapTable.getPartition(CatalogTestUtil.testPartitionId1);
materializedViewHandler.process(alterClauses, db.getClusterName(), db, olapTable);
Map<Long, AlterJobV2> alterJobsV2 = materializedViewHandler.getAlterJobsV2();
Assert.assertEquals(1, alterJobsV2.size());
RollupJobV2 rollupJob = (RollupJobV2) alterJobsV2.values().stream().findAny().get();
MaterializedIndex baseIndex = testPartition.getBaseIndex();
assertEquals(MaterializedIndex.IndexState.NORMAL, baseIndex.getState());
assertEquals(Partition.PartitionState.NORMAL, testPartition.getState());
assertEquals(OlapTableState.ROLLUP, olapTable.getState());
Tablet baseTablet = baseIndex.getTablets().get(0);
List<Replica> replicas = baseTablet.getReplicas();
Replica replica1 = replicas.get(0);
Replica replica2 = replicas.get(1);
Replica replica3 = replicas.get(2);
assertEquals(CatalogTestUtil.testStartVersion, replica1.getVersion());
assertEquals(CatalogTestUtil.testStartVersion, replica2.getVersion());
assertEquals(CatalogTestUtil.testStartVersion, replica3.getVersion());
assertEquals(-1, replica1.getLastFailedVersion());
assertEquals(-1, replica2.getLastFailedVersion());
assertEquals(-1, replica3.getLastFailedVersion());
assertEquals(CatalogTestUtil.testStartVersion, replica1.getLastSuccessVersion());
assertEquals(CatalogTestUtil.testStartVersion, replica2.getLastSuccessVersion());
assertEquals(CatalogTestUtil.testStartVersion, replica3.getLastSuccessVersion());
// runPendingJob
replica1.setState(Replica.ReplicaState.DECOMMISSION);
materializedViewHandler.runAfterCatalogReady();
Assert.assertEquals(JobState.PENDING, rollupJob.getJobState());
// table is stable, runPendingJob again
replica1.setState(Replica.ReplicaState.NORMAL);
materializedViewHandler.runAfterCatalogReady();
Assert.assertEquals(JobState.WAITING_TXN, rollupJob.getJobState());
Assert.assertEquals(2, testPartition.getMaterializedIndices(IndexExtState.ALL).size());
Assert.assertEquals(1, testPartition.getMaterializedIndices(IndexExtState.VISIBLE).size());
Assert.assertEquals(1, testPartition.getMaterializedIndices(IndexExtState.SHADOW).size());
// runWaitingTxnJob
materializedViewHandler.runAfterCatalogReady();
Assert.assertEquals(JobState.RUNNING, rollupJob.getJobState());
// runWaitingTxnJob, task not finished
materializedViewHandler.runAfterCatalogReady();
Assert.assertEquals(JobState.RUNNING, rollupJob.getJobState());
// finish all tasks
List<AgentTask> tasks = AgentTaskQueue.getTask(TTaskType.ALTER);
Assert.assertEquals(3, tasks.size());
for (AgentTask agentTask : tasks) {
agentTask.setFinished(true);
}
MaterializedIndex shadowIndex = testPartition.getMaterializedIndices(IndexExtState.SHADOW).get(0);
for (Tablet shadowTablet : shadowIndex.getTablets()) {
for (Replica shadowReplica : shadowTablet.getReplicas()) {
shadowReplica.updateVersionInfo(testPartition.getVisibleVersion(),
testPartition.getVisibleVersionHash(), shadowReplica.getDataSize(),
shadowReplica.getRowCount());
}
}
materializedViewHandler.runAfterCatalogReady();
Assert.assertEquals(JobState.FINISHED, rollupJob.getJobState());
}
@Test
public void testSerializeOfRollupJob(@Mocked CreateMaterializedViewStmt stmt) throws IOException,
AnalysisException {
Config.enable_materialized_view = true;
// prepare file
File file = new File(fileName);
file.createNewFile();
DataOutputStream out = new DataOutputStream(new FileOutputStream(file));
short keysCount = 1;
List<Column> columns = Lists.newArrayList();
String mvColumnName = CreateMaterializedViewStmt.MATERIALIZED_VIEW_NAME_PREFIX + "to_bitmap_" + "c1";
Column column = new Column(mvColumnName, Type.BITMAP, false, AggregateType.BITMAP_UNION, false, "1", "");
columns.add(column);
RollupJobV2 rollupJobV2 = new RollupJobV2(1, 1, 1, "test", 1, 1, 1, "test", "rollup", columns, 1, 1,
KeysType.AGG_KEYS, keysCount,
new OriginStatement("create materialized view rollup as select bitmap_union(to_bitmap(c1)) from test",
0));
rollupJobV2.setStorageFormat(TStorageFormat.V2);
// write rollup job
rollupJobV2.write(out);
out.flush();
out.close();
List<Expr> params = Lists.newArrayList();
SlotRef param1 = new SlotRef(new TableName(null, "test"), "c1");
params.add(param1);
MVColumnItem mvColumnItem = new MVColumnItem(mvColumnName, Type.BITMAP);
mvColumnItem.setDefineExpr(new FunctionCallExpr(new FunctionName("to_bitmap"), params));
List<MVColumnItem> mvColumnItemList = Lists.newArrayList();
mvColumnItemList.add(mvColumnItem);
new Expectations() {
{
stmt.getMVColumnItemList();
result = mvColumnItemList;
}
};
// read objects from file
MetaContext metaContext = new MetaContext();
metaContext.setMetaVersion(FeMetaVersion.VERSION_86);
metaContext.setThreadLocalInfo();
DataInputStream in = new DataInputStream(new FileInputStream(file));
RollupJobV2 result = (RollupJobV2) AlterJobV2.read(in);
Assert.assertEquals(TStorageFormat.V2, Deencapsulation.getField(result, "storageFormat"));
List<Column> resultColumns = Deencapsulation.getField(result, "rollupSchema");
Assert.assertEquals(1, resultColumns.size());
Column resultColumn1 = resultColumns.get(0);
Assert.assertEquals(mvColumnName,
resultColumn1.getName());
Assert.assertTrue(resultColumn1.getDefineExpr() instanceof FunctionCallExpr);
FunctionCallExpr resultFunctionCall = (FunctionCallExpr) resultColumn1.getDefineExpr();
Assert.assertEquals("to_bitmap", resultFunctionCall.getFnName().getFunction());
}
}