blob: 3065a1fdad64a3ced611ab981854308fa52e6cdf [file] [log] [blame]
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
package org.apache.doris.task;
import org.apache.doris.analysis.PartitionValue;
import org.apache.doris.catalog.AggregateType;
import org.apache.doris.catalog.Column;
import org.apache.doris.catalog.KeysType;
import org.apache.doris.catalog.PartitionKey;
import org.apache.doris.catalog.PrimitiveType;
import org.apache.doris.catalog.ScalarType;
import org.apache.doris.common.AnalysisException;
import org.apache.doris.common.MarkedCountDownLatch;
import org.apache.doris.thrift.TAgentTaskRequest;
import org.apache.doris.thrift.TBackend;
import org.apache.doris.thrift.TKeysType;
import org.apache.doris.thrift.TPriority;
import org.apache.doris.thrift.TPushType;
import org.apache.doris.thrift.TStorageMedium;
import org.apache.doris.thrift.TStorageType;
import org.apache.doris.thrift.TTabletType;
import org.apache.doris.thrift.TTaskType;
import com.google.common.collect.Range;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
import java.lang.reflect.Method;
import java.util.Arrays;
import java.util.HashMap;
import java.util.HashSet;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.Set;
public class AgentTaskTest {
private AgentBatchTask agentBatchTask;
private long backendId1 = 1000L;
private long backendId2 = 1001L;
private long dbId = 10000L;
private long tableId = 20000L;
private long partitionId = 20000L;
private long indexId1 = 30000L;
private long indexId2 = 30001L;
private long tabletId1 = 40000L;
private long tabletId2 = 40001L;
private long replicaId1 = 50000L;
private long replicaId2 = 50001L;
private short shortKeyNum = (short) 2;
private int schemaHash1 = 60000;
private int schemaHash2 = 60001;
private long version = 1L;
private long versionHash = 70000L;
private TStorageType storageType = TStorageType.COLUMN;
private List<Column> columns;
private MarkedCountDownLatch<Long, Long> latch = new MarkedCountDownLatch<Long, Long>(3);
private Range<PartitionKey> range1;
private Range<PartitionKey> range2;
private AgentTask createReplicaTask;
private AgentTask dropTask;
private AgentTask pushTask;
private AgentTask cloneTask;
private AgentTask rollupTask;
private AgentTask schemaChangeTask;
private AgentTask cancelDeleteTask;
@Before
public void setUp() throws AnalysisException {
agentBatchTask = new AgentBatchTask();
columns = new LinkedList<Column>();
columns.add(new Column("k1", ScalarType.createType(PrimitiveType.INT), false, null, "1", ""));
columns.add(new Column("v1", ScalarType.createType(PrimitiveType.INT), false, AggregateType.SUM, "1", ""));
PartitionKey pk1 = PartitionKey.createInfinityPartitionKey(Arrays.asList(columns.get(0)), false);
PartitionKey pk2 = PartitionKey.createPartitionKey(Arrays.asList(new PartitionValue("10")), Arrays.asList(columns.get(0)));
range1 = Range.closedOpen(pk1, pk2);
PartitionKey pk3 = PartitionKey.createInfinityPartitionKey(Arrays.asList(columns.get(0)), true);
range2 = Range.closedOpen(pk2, pk3);
// create tasks
// create
createReplicaTask = new CreateReplicaTask(backendId1, dbId, tableId, partitionId,
indexId1, tabletId1, shortKeyNum, schemaHash1,
version, versionHash, KeysType.AGG_KEYS,
storageType, TStorageMedium.SSD,
columns, null, 0, latch, null,
false, TTabletType.TABLET_TYPE_DISK);
// drop
dropTask = new DropReplicaTask(backendId1, tabletId1, schemaHash1);
// push
pushTask =
new PushTask(null, backendId1, dbId, tableId, partitionId, indexId1, tabletId1,
replicaId1, schemaHash1, version, versionHash, "/home/a", 10L, 200, 80000L,
TPushType.LOAD, null, false, TPriority.NORMAL);
// clone
cloneTask =
new CloneTask(backendId1, dbId, tableId, partitionId, indexId1, tabletId1, schemaHash1,
Arrays.asList(new TBackend("host1", 8290, 8390)), TStorageMedium.HDD, -1, -1, 3600);
// rollup
rollupTask =
new CreateRollupTask(null, backendId1, dbId, tableId, partitionId, indexId2, indexId1,
tabletId2, tabletId1, replicaId2, shortKeyNum, schemaHash2, schemaHash1,
storageType, columns, null, 0, TKeysType.AGG_KEYS);
// schemaChange
schemaChangeTask =
new SchemaChangeTask(null, backendId1, dbId, tableId, partitionId, indexId1,
tabletId1, replicaId1, columns, schemaHash2, schemaHash1,
shortKeyNum, storageType, null, 0, TKeysType.AGG_KEYS);
}
@Test
public void addTaskTest() {
// add null
agentBatchTask.addTask(null);
Assert.assertEquals(0, agentBatchTask.getTaskNum());
// normal
agentBatchTask.addTask(createReplicaTask);
Assert.assertEquals(1, agentBatchTask.getTaskNum());
agentBatchTask.addTask(rollupTask);
Assert.assertEquals(2, agentBatchTask.getTaskNum());
List<AgentTask> allTasks = agentBatchTask.getAllTasks();
Assert.assertEquals(2, allTasks.size());
for (AgentTask agentTask : allTasks) {
if (agentTask instanceof CreateReplicaTask) {
Assert.assertEquals(createReplicaTask, agentTask);
} else if (agentTask instanceof CreateRollupTask) {
Assert.assertEquals(rollupTask, agentTask);
} else {
Assert.fail();
}
}
}
@Test
public void toThriftTest() throws Exception {
Class<? extends AgentBatchTask> agentBatchTaskClass = agentBatchTask.getClass();
Class[] typeParams = new Class[] { AgentTask.class };
Method toAgentTaskRequest = agentBatchTaskClass.getDeclaredMethod("toAgentTaskRequest", typeParams);
toAgentTaskRequest.setAccessible(true);
// create
TAgentTaskRequest request = (TAgentTaskRequest) toAgentTaskRequest.invoke(agentBatchTask, createReplicaTask);
Assert.assertEquals(TTaskType.CREATE, request.getTaskType());
Assert.assertEquals(createReplicaTask.getSignature(), request.getSignature());
Assert.assertNotNull(request.getCreateTabletReq());
// drop
TAgentTaskRequest request2 = (TAgentTaskRequest) toAgentTaskRequest.invoke(agentBatchTask, dropTask);
Assert.assertEquals(TTaskType.DROP, request2.getTaskType());
Assert.assertEquals(dropTask.getSignature(), request2.getSignature());
Assert.assertNotNull(request2.getDropTabletReq());
// push
TAgentTaskRequest request3 = (TAgentTaskRequest) toAgentTaskRequest.invoke(agentBatchTask, pushTask);
Assert.assertEquals(TTaskType.PUSH, request3.getTaskType());
Assert.assertEquals(pushTask.getSignature(), request3.getSignature());
Assert.assertNotNull(request3.getPushReq());
// clone
TAgentTaskRequest request4 = (TAgentTaskRequest) toAgentTaskRequest.invoke(agentBatchTask, cloneTask);
Assert.assertEquals(TTaskType.CLONE, request4.getTaskType());
Assert.assertEquals(cloneTask.getSignature(), request4.getSignature());
Assert.assertNotNull(request4.getCloneReq());
// rollup
TAgentTaskRequest request5 = (TAgentTaskRequest) toAgentTaskRequest.invoke(agentBatchTask, rollupTask);
Assert.assertEquals(TTaskType.ROLLUP, request5.getTaskType());
Assert.assertEquals(rollupTask.getSignature(), request5.getSignature());
Assert.assertNotNull(request5.getAlterTabletReq());
// schemaChange
TAgentTaskRequest request6 = (TAgentTaskRequest) toAgentTaskRequest.invoke(agentBatchTask, schemaChangeTask);
Assert.assertEquals(TTaskType.SCHEMA_CHANGE, request6.getTaskType());
Assert.assertEquals(schemaChangeTask.getSignature(), request6.getSignature());
Assert.assertNotNull(request6.getAlterTabletReq());
}
@Test
public void agentTaskQueueTest() {
AgentTaskQueue.clearAllTasks();
Assert.assertEquals(0, AgentTaskQueue.getTaskNum());
// add
AgentTaskQueue.addTask(createReplicaTask);
Assert.assertEquals(1, AgentTaskQueue.getTaskNum());
Assert.assertFalse(AgentTaskQueue.addTask(createReplicaTask));
// get
AgentTask task = AgentTaskQueue.getTask(backendId1, TTaskType.CREATE, createReplicaTask.getSignature());
Assert.assertEquals(createReplicaTask, task);
// diff
AgentTaskQueue.addTask(rollupTask);
Map<TTaskType, Set<Long>> runningTasks = new HashMap<TTaskType, Set<Long>>();
List<AgentTask> diffTasks = AgentTaskQueue.getDiffTasks(backendId1, runningTasks);
Assert.assertEquals(2, diffTasks.size());
Set<Long> set = new HashSet<Long>();
set.add(createReplicaTask.getSignature());
runningTasks.put(TTaskType.CREATE, set);
diffTasks = AgentTaskQueue.getDiffTasks(backendId1, runningTasks);
Assert.assertEquals(1, diffTasks.size());
Assert.assertEquals(rollupTask, diffTasks.get(0));
// remove
AgentTaskQueue.removeTask(backendId1, TTaskType.CREATE, createReplicaTask.getSignature());
Assert.assertEquals(1, AgentTaskQueue.getTaskNum());
AgentTaskQueue.removeTask(backendId1, TTaskType.ROLLUP, rollupTask.getSignature());
Assert.assertEquals(0, AgentTaskQueue.getTaskNum());
}
@Test
public void failedAgentTaskTest() {
AgentTaskQueue.clearAllTasks();
AgentTaskQueue.addTask(dropTask);
Assert.assertEquals(0, dropTask.getFailedTimes());
dropTask.failed();
Assert.assertEquals(1, dropTask.getFailedTimes());
Assert.assertEquals(1, AgentTaskQueue.getTaskNum());
Assert.assertEquals(1, AgentTaskQueue.getTaskNum(backendId1, TTaskType.DROP, false));
Assert.assertEquals(1, AgentTaskQueue.getTaskNum(-1, TTaskType.DROP, false));
Assert.assertEquals(1, AgentTaskQueue.getTaskNum(backendId1, TTaskType.DROP, true));
dropTask.failed();
DropReplicaTask dropTask2 = new DropReplicaTask(backendId2, tabletId1, schemaHash1);
AgentTaskQueue.addTask(dropTask2);
dropTask2.failed();
Assert.assertEquals(1, AgentTaskQueue.getTaskNum(backendId1, TTaskType.DROP, true));
Assert.assertEquals(2, AgentTaskQueue.getTaskNum(-1, TTaskType.DROP, true));
}
}