// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
package org.apache.doris.load.loadv2;

import mockit.Expectations;
import mockit.Injectable;
import mockit.Mock;
import mockit.MockUp;
import mockit.Mocked;

import org.apache.doris.analysis.Analyzer;
import org.apache.doris.analysis.BrokerDesc;
import org.apache.doris.analysis.DataDescription;
import org.apache.doris.analysis.LabelName;
import org.apache.doris.analysis.LoadStmt;
import org.apache.doris.analysis.UserIdentity;
import org.apache.doris.catalog.BrokerTable;
import org.apache.doris.catalog.Catalog;
import org.apache.doris.catalog.Column;
import org.apache.doris.catalog.Database;
import org.apache.doris.catalog.OlapTable;
import org.apache.doris.catalog.PrimitiveType;
import org.apache.doris.catalog.Table;
import org.apache.doris.common.DdlException;
import org.apache.doris.common.MetaNotFoundException;
import org.apache.doris.common.jmockit.Deencapsulation;
import org.apache.doris.common.util.RuntimeProfile;
import org.apache.doris.load.BrokerFileGroup;
import org.apache.doris.load.BrokerFileGroupAggInfo;
import org.apache.doris.load.BrokerFileGroupAggInfo.FileGroupAggKey;
import org.apache.doris.load.EtlJobType;
import org.apache.doris.load.EtlStatus;
import org.apache.doris.load.Load;
import org.apache.doris.load.Source;
import org.apache.doris.metric.MetricRepo;
import org.apache.doris.planner.BrokerScanNode;
import org.apache.doris.planner.OlapTableSink;
import org.apache.doris.planner.PlanFragment;
import org.apache.doris.task.MasterTaskExecutor;
import org.apache.doris.thrift.TUniqueId;
import org.apache.doris.transaction.TransactionState;

import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
import com.google.common.collect.Sets;

import org.junit.Assert;
import org.junit.BeforeClass;
import org.junit.Test;

import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.UUID;
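
/**
 * Unit tests for {@link BrokerLoadJob}: creating a job from a load statement, scheduling tasks,
 * handling pending/loading task callbacks, and replaying transaction attachments.
 */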
public class BrokerLoadJobTest {

    @BeforeClass
    public static void start() {
        MetricRepo.init();
    }
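
    // A LoadStmt whose data description points at a table the database cannot resolve
    // should be rejected with a DdlException.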
    @Test
    public void testFromLoadStmt(@Injectable LoadStmt loadStmt,
            @Injectable LabelName labelName,
            @Injectable DataDescription dataDescription,
            @Mocked Catalog catalog,
            @Injectable Database database) {
        List<DataDescription> dataDescriptionList = Lists.newArrayList();
        dataDescriptionList.add(dataDescription);
        String tableName = "table";
        String databaseName = "database";
        new Expectations() {
            {
                loadStmt.getLabel();
                minTimes = 0;
                result = labelName;
                labelName.getDbName();
                minTimes = 0;
                result = databaseName;
                catalog.getDb(databaseName);
                minTimes = 0;
                result = database;
                loadStmt.getDataDescriptions();
                minTimes = 0;
                result = dataDescriptionList;
                dataDescription.getTableName();
                minTimes = 0;
                result = tableName;
                database.getTable(tableName);
                minTimes = 0;
                result = null;
            }
        };
        try {
            BulkLoadJob.fromLoadStmt(loadStmt);
            Assert.fail();
        } catch (DdlException e) {
            System.out.println("could not find table named " + tableName);
        }
    }
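
    // A well-formed broker LoadStmt should produce a BrokerLoadJob in PENDING state that carries
    // the statement's database id, label, and BROKER job type.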
    @Test
    public void testFromLoadStmt2(@Injectable LoadStmt loadStmt,
            @Injectable DataDescription dataDescription,
            @Injectable LabelName labelName,
            @Injectable Database database,
            @Injectable OlapTable olapTable,
            @Mocked Catalog catalog) {
        String label = "label";
        long dbId = 1;
        String tableName = "table";
        String databaseName = "database";
        List<DataDescription> dataDescriptionList = Lists.newArrayList();
        dataDescriptionList.add(dataDescription);
        BrokerDesc brokerDesc = new BrokerDesc("broker0", Maps.newHashMap());
        new Expectations() {
            {
                loadStmt.getLabel();
                minTimes = 0;
                result = labelName;
                labelName.getDbName();
                minTimes = 0;
                result = databaseName;
                labelName.getLabelName();
                minTimes = 0;
                result = label;
                catalog.getDb(databaseName);
                minTimes = 0;
                result = database;
                loadStmt.getDataDescriptions();
                minTimes = 0;
                result = dataDescriptionList;
                dataDescription.getTableName();
                minTimes = 0;
                result = tableName;
                database.getTable(tableName);
                minTimes = 0;
                result = olapTable;
                dataDescription.getPartitionNames();
                minTimes = 0;
                result = null;
                database.getId();
                minTimes = 0;
                result = dbId;
                loadStmt.getBrokerDesc();
                minTimes = 0;
                result = brokerDesc;
                loadStmt.getEtlJobType();
                minTimes = 0;
                result = EtlJobType.BROKER;
            }
        };
        new MockUp<Load>() {
            @Mock
            public void checkAndCreateSource(Database db, DataDescription dataDescription,
                    Map<Long, Map<Long, List<Source>>> tableToPartitionSources, EtlJobType jobType) {
            }
        };
        try {
            BrokerLoadJob brokerLoadJob = (BrokerLoadJob) BulkLoadJob.fromLoadStmt(loadStmt);
            Assert.assertEquals(Long.valueOf(dbId), Deencapsulation.getField(brokerLoadJob, "dbId"));
            Assert.assertEquals(label, Deencapsulation.getField(brokerLoadJob, "label"));
            Assert.assertEquals(JobState.PENDING, Deencapsulation.getField(brokerLoadJob, "state"));
            Assert.assertEquals(EtlJobType.BROKER, Deencapsulation.getField(brokerLoadJob, "jobType"));
        } catch (DdlException e) {
            Assert.fail(e.getMessage());
        }
    }
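
    // With a single file group aggregated under table id 1, getTableNamesForShow() should
    // resolve the id through the catalog and return that table's name.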
    @Test
    public void testGetTableNames(@Injectable BrokerFileGroupAggInfo fileGroupAggInfo,
            @Injectable BrokerFileGroup brokerFileGroup,
            @Mocked Catalog catalog,
            @Injectable Database database,
            @Injectable Table table) throws MetaNotFoundException {
        List<BrokerFileGroup> brokerFileGroups = Lists.newArrayList();
        brokerFileGroups.add(brokerFileGroup);
        Map<FileGroupAggKey, List<BrokerFileGroup>> aggKeyToFileGroups = Maps.newHashMap();
        FileGroupAggKey aggKey = new FileGroupAggKey(1L, null);
        aggKeyToFileGroups.put(aggKey, brokerFileGroups);
        BrokerLoadJob brokerLoadJob = new BrokerLoadJob();
        Deencapsulation.setField(brokerLoadJob, "fileGroupAggInfo", fileGroupAggInfo);
        String tableName = "table";
        new Expectations() {
            {
                fileGroupAggInfo.getAggKeyToFileGroups();
                minTimes = 0;
                result = aggKeyToFileGroups;
                fileGroupAggInfo.getAllTableIds();
                minTimes = 0;
                result = Sets.newHashSet(1L);
                catalog.getDb(anyLong);
                minTimes = 0;
                result = database;
                database.getTable(1L);
                minTimes = 0;
                result = table;
                table.getName();
                minTimes = 0;
                result = tableName;
            }
        };
        Assert.assertEquals(1, brokerLoadJob.getTableNamesForShow().size());
        Assert.assertTrue(brokerLoadJob.getTableNamesForShow().contains(tableName));
    }
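
    // Executing a newly created job should schedule exactly one pending load task.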
    @Test
    public void testExecuteJob(@Mocked MasterTaskExecutor masterTaskExecutor) {
        BrokerLoadJob brokerLoadJob = new BrokerLoadJob();
        brokerLoadJob.unprotectedExecuteJob();
        Map<Long, LoadTask> idToTasks = Deencapsulation.getField(brokerLoadJob, "idToTasks");
        Assert.assertEquals(1, idToTasks.size());
    }
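
    // A pending task attachment arriving after the job has been cancelled should be ignored.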
    @Test
    public void testPendingTaskOnFinishedWithJobCancelled(@Injectable BrokerPendingTaskAttachment attachment) {
        BrokerLoadJob brokerLoadJob = new BrokerLoadJob();
        Deencapsulation.setField(brokerLoadJob, "state", JobState.CANCELLED);
        brokerLoadJob.onTaskFinished(attachment);
        Set<Long> finishedTaskIds = Deencapsulation.getField(brokerLoadJob, "finishedTaskIds");
        Assert.assertEquals(0, finishedTaskIds.size());
    }
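
    // An attachment whose task id was already recorded as finished should not create any new loading tasks.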
    @Test
    public void testPendingTaskOnFinishedWithDuplicated(@Injectable BrokerPendingTaskAttachment attachment) {
        BrokerLoadJob brokerLoadJob = new BrokerLoadJob();
        Deencapsulation.setField(brokerLoadJob, "state", JobState.LOADING);
        Set<Long> finishedTaskIds = Sets.newHashSet();
        long taskId = 1L;
        finishedTaskIds.add(taskId);
        Deencapsulation.setField(brokerLoadJob, "finishedTaskIds", finishedTaskIds);
        new Expectations() {
            {
                attachment.getTaskId();
                minTimes = 0;
                result = taskId;
            }
        };
        brokerLoadJob.onTaskFinished(attachment);
        Map<Long, LoadTask> idToTasks = Deencapsulation.getField(brokerLoadJob, "idToTasks");
        Assert.assertEquals(0, idToTasks.size());
    }
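
    // When the pending task finishes, the job should record it as done and create one loading task
    // per file group aggregation key (three keys are prepared here).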
    @Test
    public void testPendingTaskOnFinished(@Injectable BrokerPendingTaskAttachment attachment,
            @Mocked Catalog catalog,
            @Injectable Database database,
            @Injectable BrokerFileGroupAggInfo fileGroupAggInfo,
            @Injectable BrokerFileGroup brokerFileGroup1,
            @Injectable BrokerFileGroup brokerFileGroup2,
            @Injectable BrokerFileGroup brokerFileGroup3,
            @Mocked MasterTaskExecutor masterTaskExecutor,
            @Injectable OlapTable olapTable,
            @Mocked LoadingTaskPlanner loadingTaskPlanner) {
        BrokerLoadJob brokerLoadJob = new BrokerLoadJob();
        Deencapsulation.setField(brokerLoadJob, "state", JobState.LOADING);
        long taskId = 1L;
        long tableId1 = 1L;
        long tableId2 = 2L;
        long partitionId1 = 3L;
        long partitionId2 = 4L;
        Map<FileGroupAggKey, List<BrokerFileGroup>> aggKeyToFileGroups = Maps.newHashMap();
        List<BrokerFileGroup> fileGroups1 = Lists.newArrayList();
        fileGroups1.add(brokerFileGroup1);
        aggKeyToFileGroups.put(new FileGroupAggKey(tableId1, null), fileGroups1);
        List<BrokerFileGroup> fileGroups2 = Lists.newArrayList();
        fileGroups2.add(brokerFileGroup2);
        fileGroups2.add(brokerFileGroup3);
        aggKeyToFileGroups.put(new FileGroupAggKey(tableId2, Lists.newArrayList(partitionId1)), fileGroups2);
        // add another file group with a different partition id
        aggKeyToFileGroups.put(new FileGroupAggKey(tableId2, Lists.newArrayList(partitionId2)), fileGroups2);
        Deencapsulation.setField(brokerLoadJob, "fileGroupAggInfo", fileGroupAggInfo);
        new Expectations() {
            {
                attachment.getTaskId();
                minTimes = 0;
                result = taskId;
                catalog.getDb(anyLong);
                minTimes = 0;
                result = database;
                fileGroupAggInfo.getAggKeyToFileGroups();
                minTimes = 0;
                result = aggKeyToFileGroups;
                database.getTable(anyLong);
                minTimes = 0;
                result = olapTable;
                catalog.getNextId();
                minTimes = 0;
                result = 1L;
                result = 2L;
                result = 3L;
            }
        };
        brokerLoadJob.onTaskFinished(attachment);
        Set<Long> finishedTaskIds = Deencapsulation.getField(brokerLoadJob, "finishedTaskIds");
        Assert.assertEquals(1, finishedTaskIds.size());
        Assert.assertTrue(finishedTaskIds.contains(taskId));
        Map<Long, LoadTask> idToTasks = Deencapsulation.getField(brokerLoadJob, "idToTasks");
        Assert.assertEquals(3, idToTasks.size());
    }
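
    // Initializing a LoadLoadingTask with an explicit user identity should build a planner
    // whose analyzer disallows UDFs.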
    @Test
    public void testPendingTaskOnFinishedWithUserInfo(@Mocked BrokerPendingTaskAttachment attachment,
            @Mocked Catalog catalog,
            @Injectable BrokerDesc brokerDesc,
            @Injectable LoadTaskCallback callback,
            @Injectable Database database,
            @Injectable FileGroupAggKey aggKey,
            @Mocked OlapTable olapTable,
            @Mocked PlanFragment sinkFragment,
            @Mocked OlapTableSink olapTableSink,
            @Mocked BrokerScanNode scanNode) throws Exception {
        List<Column> schema = new ArrayList<>();
        schema.add(new Column("a", PrimitiveType.BIGINT));
        Map<String, String> properties = new HashMap<>();
        properties.put("broker_name", "test");
        properties.put("path", "hdfs://www.test.com");
        BrokerTable brokerTable = new BrokerTable(123L, "test", schema, properties);
        BrokerFileGroup brokerFileGroup = new BrokerFileGroup(brokerTable);
        List<Long> partitionIds = new ArrayList<>();
        partitionIds.add(123L);
        Deencapsulation.setField(brokerFileGroup, "partitionIds", partitionIds);
        List<BrokerFileGroup> fileGroups = Lists.newArrayList();
        fileGroups.add(brokerFileGroup);
        UUID uuid = UUID.randomUUID();
        TUniqueId loadId = new TUniqueId(uuid.getMostSignificantBits(), uuid.getLeastSignificantBits());
        RuntimeProfile jobProfile = new RuntimeProfile("test");
        LoadLoadingTask task = new LoadLoadingTask(database, olapTable, brokerDesc, fileGroups,
                100, 100, false, 100, callback, "", 100, 1, jobProfile);
        try {
            UserIdentity userInfo = new UserIdentity("root", "localhost");
            userInfo.setIsAnalyzed();
            task.init(loadId,
                    attachment.getFileStatusByTable(aggKey),
                    attachment.getFileNumByTable(aggKey),
                    userInfo);
            LoadingTaskPlanner planner = Deencapsulation.getField(task, "planner");
            Analyzer al = Deencapsulation.getField(planner, "analyzer");
            Assert.assertFalse(al.isUDFAllowed());
        } catch (Exception e) {
            Assert.fail(e.getMessage());
        }
    }
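
    // With only one of two loading tasks finished, counters should be accumulated and progress should be 50.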
    @Test
    public void testLoadingTaskOnFinishedWithUnfinishedTask(@Injectable BrokerLoadingTaskAttachment attachment,
            @Injectable LoadTask loadTask1,
            @Injectable LoadTask loadTask2) {
        BrokerLoadJob brokerLoadJob = new BrokerLoadJob();
        Deencapsulation.setField(brokerLoadJob, "state", JobState.LOADING);
        Map<Long, LoadTask> idToTasks = Maps.newHashMap();
        idToTasks.put(1L, loadTask1);
        idToTasks.put(2L, loadTask2);
        Deencapsulation.setField(brokerLoadJob, "idToTasks", idToTasks);
        new Expectations() {
            {
                attachment.getCounter(BrokerLoadJob.DPP_NORMAL_ALL);
                minTimes = 0;
                result = 10;
                attachment.getCounter(BrokerLoadJob.DPP_ABNORMAL_ALL);
                minTimes = 0;
                result = 1;
                attachment.getTaskId();
                minTimes = 0;
                result = 1L;
            }
        };
        brokerLoadJob.onTaskFinished(attachment);
        Set<Long> finishedTaskIds = Deencapsulation.getField(brokerLoadJob, "finishedTaskIds");
        Assert.assertEquals(1, finishedTaskIds.size());
        EtlStatus loadingStatus = Deencapsulation.getField(brokerLoadJob, "loadingStatus");
        Assert.assertEquals("10", loadingStatus.getCounters().get(BrokerLoadJob.DPP_NORMAL_ALL));
        Assert.assertEquals("1", loadingStatus.getCounters().get(BrokerLoadJob.DPP_ABNORMAL_ALL));
        int progress = Deencapsulation.getField(brokerLoadJob, "progress");
        Assert.assertEquals(50, progress);
    }
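
    // When every loading task has finished but the filtered (abnormal) rows exceed the allowed
    // limit, the job should be cancelled.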
    @Test
    public void testLoadingTaskOnFinishedWithErrorNum(@Injectable BrokerLoadingTaskAttachment attachment1,
            @Injectable BrokerLoadingTaskAttachment attachment2,
            @Injectable LoadTask loadTask1,
            @Injectable LoadTask loadTask2,
            @Mocked Catalog catalog) {
        BrokerLoadJob brokerLoadJob = new BrokerLoadJob();
        Deencapsulation.setField(brokerLoadJob, "state", JobState.LOADING);
        Map<Long, LoadTask> idToTasks = Maps.newHashMap();
        idToTasks.put(1L, loadTask1);
        idToTasks.put(2L, loadTask2);
        Deencapsulation.setField(brokerLoadJob, "idToTasks", idToTasks);
        new Expectations() {
            {
                attachment1.getCounter(BrokerLoadJob.DPP_NORMAL_ALL);
                minTimes = 0;
                result = 10;
                attachment2.getCounter(BrokerLoadJob.DPP_NORMAL_ALL);
                minTimes = 0;
                result = 20;
                attachment1.getCounter(BrokerLoadJob.DPP_ABNORMAL_ALL);
                minTimes = 0;
                result = 1;
                attachment2.getCounter(BrokerLoadJob.DPP_ABNORMAL_ALL);
                minTimes = 0;
                result = 2;
                attachment1.getTaskId();
                minTimes = 0;
                result = 1L;
                attachment2.getTaskId();
                minTimes = 0;
                result = 2L;
            }
        };
        brokerLoadJob.onTaskFinished(attachment1);
        brokerLoadJob.onTaskFinished(attachment2);
        Set<Long> finishedTaskIds = Deencapsulation.getField(brokerLoadJob, "finishedTaskIds");
        Assert.assertEquals(2, finishedTaskIds.size());
        EtlStatus loadingStatus = Deencapsulation.getField(brokerLoadJob, "loadingStatus");
        Assert.assertEquals("30", loadingStatus.getCounters().get(BrokerLoadJob.DPP_NORMAL_ALL));
        Assert.assertEquals("3", loadingStatus.getCounters().get(BrokerLoadJob.DPP_ABNORMAL_ALL));
        int progress = Deencapsulation.getField(brokerLoadJob, "progress");
        Assert.assertEquals(99, progress);
        Assert.assertEquals(JobState.CANCELLED, Deencapsulation.getField(brokerLoadJob, "state"));
    }
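
    // When the only loading task finishes without filtered rows, counters are recorded and progress reaches 99.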
    @Test
    public void testLoadingTaskOnFinished(@Injectable BrokerLoadingTaskAttachment attachment1,
            @Injectable LoadTask loadTask1,
            @Mocked Catalog catalog,
            @Injectable Database database) {
        BrokerLoadJob brokerLoadJob = new BrokerLoadJob();
        Deencapsulation.setField(brokerLoadJob, "state", JobState.LOADING);
        Map<Long, LoadTask> idToTasks = Maps.newHashMap();
        idToTasks.put(1L, loadTask1);
        Deencapsulation.setField(brokerLoadJob, "idToTasks", idToTasks);
        new Expectations() {
            {
                attachment1.getCounter(BrokerLoadJob.DPP_NORMAL_ALL);
                minTimes = 0;
                result = 10;
                attachment1.getCounter(BrokerLoadJob.DPP_ABNORMAL_ALL);
                minTimes = 0;
                result = 0;
                attachment1.getTaskId();
                minTimes = 0;
                result = 1L;
                catalog.getDb(anyLong);
                minTimes = 0;
                result = database;
            }
        };
        brokerLoadJob.onTaskFinished(attachment1);
        Set<Long> finishedTaskIds = Deencapsulation.getField(brokerLoadJob, "finishedTaskIds");
        Assert.assertEquals(1, finishedTaskIds.size());
        EtlStatus loadingStatus = Deencapsulation.getField(brokerLoadJob, "loadingStatus");
        Assert.assertEquals("10", loadingStatus.getCounters().get(BrokerLoadJob.DPP_NORMAL_ALL));
        Assert.assertEquals("0", loadingStatus.getCounters().get(BrokerLoadJob.DPP_ABNORMAL_ALL));
        int progress = Deencapsulation.getField(brokerLoadJob, "progress");
        Assert.assertEquals(99, progress);
    }
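
    // Replaying a transaction attachment from an aborted (cancelled) job should restore the
    // job's progress, finish timestamp, and CANCELLED state.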
    @Test
    public void testExecuteReplayOnAborted(@Injectable TransactionState txnState,
            @Injectable LoadJobFinalOperation attachment,
            @Injectable EtlStatus etlStatus) {
        BrokerLoadJob brokerLoadJob = new BrokerLoadJob();
        new Expectations() {
            {
                txnState.getTxnCommitAttachment();
                minTimes = 0;
                result = attachment;
                attachment.getLoadingStatus();
                minTimes = 0;
                result = etlStatus;
                attachment.getProgress();
                minTimes = 0;
                result = 99;
                attachment.getFinishTimestamp();
                minTimes = 0;
                result = 1;
                attachment.getJobState();
                minTimes = 0;
                result = JobState.CANCELLED;
            }
        };
        brokerLoadJob.replayTxnAttachment(txnState);
        Assert.assertEquals(99, (int) Deencapsulation.getField(brokerLoadJob, "progress"));
        Assert.assertEquals(1, brokerLoadJob.getFinishTimestamp());
        Assert.assertEquals(JobState.CANCELLED, brokerLoadJob.getState());
    }
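
    // Replaying a transaction attachment recorded at visible time should restore the job's
    // progress, finish timestamp, and LOADING state.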
    @Test
    public void testExecuteReplayOnVisible(@Injectable TransactionState txnState,
            @Injectable LoadJobFinalOperation attachment,
            @Injectable EtlStatus etlStatus) {
        BrokerLoadJob brokerLoadJob = new BrokerLoadJob();
        new Expectations() {
            {
                txnState.getTxnCommitAttachment();
                minTimes = 0;
                result = attachment;
                attachment.getLoadingStatus();
                minTimes = 0;
                result = etlStatus;
                attachment.getProgress();
                minTimes = 0;
                result = 99;
                attachment.getFinishTimestamp();
                minTimes = 0;
                result = 1;
                attachment.getJobState();
                minTimes = 0;
                result = JobState.LOADING;
            }
        };
        brokerLoadJob.replayTxnAttachment(txnState);
        Assert.assertEquals(99, (int) Deencapsulation.getField(brokerLoadJob, "progress"));
        Assert.assertEquals(1, brokerLoadJob.getFinishTimestamp());
        Assert.assertEquals(JobState.LOADING, brokerLoadJob.getState());
    }
}