blob: d882dc67c6b1b8679d06104a0c7bc0f796864d9f [file] [log] [blame]
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
package org.apache.doris.task;
import mockit.Expectations;
import mockit.Mocked;
import org.apache.doris.catalog.Catalog;
import org.apache.doris.catalog.Database;
import org.apache.doris.catalog.OlapTable;
import org.apache.doris.catalog.Partition;
import org.apache.doris.common.Config;
import org.apache.doris.common.util.UnitTestUtil;
import org.apache.doris.load.DppConfig;
import org.apache.doris.load.DppScheduler;
import org.apache.doris.load.EtlSubmitResult;
import org.apache.doris.load.Load;
import org.apache.doris.load.LoadJob;
import org.apache.doris.load.LoadJob.JobState;
import org.apache.doris.load.PartitionLoadInfo;
import org.apache.doris.load.Source;
import org.apache.doris.load.TableLoadInfo;
import org.apache.doris.persist.EditLog;
import org.apache.doris.thrift.TStatus;
import org.apache.doris.thrift.TStatusCode;
import org.apache.doris.transaction.GlobalTransactionMgr;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
public class LoadPendingTaskTest {
private long dbId;
private long tableId;
private long partitionId;
private long indexId;
private long tabletId;
private long backendId;
private String label;
@Mocked
private Catalog catalog;
@Mocked
private EditLog editLog;
@Mocked
private Load load;
@Mocked
private DppScheduler dppScheduler;
private Database db;
@Before
public void setUp() {
dbId = 0L;
tableId = 0L;
partitionId = 0L;
indexId = 0L;
tabletId = 0L;
backendId = 0L;
label = "test_label";
UnitTestUtil.initDppConfig();
}
@Test
public void testRunPendingTask() throws Exception {
// mock catalog
db = UnitTestUtil.createDb(dbId, tableId, partitionId, indexId, tabletId, backendId, 1L, 0L);
GlobalTransactionMgr globalTransactionMgr = new GlobalTransactionMgr(catalog);
globalTransactionMgr.setEditLog(editLog);
globalTransactionMgr.addDatabaseTransactionMgr(db.getId());
// mock catalog
new Expectations(catalog) {
{
catalog.getDb(dbId);
minTimes = 0;
result = db;
catalog.getDb(db.getFullName());
minTimes = 0;
result = db;
catalog.getEditLog();
minTimes = 0;
result = editLog;
Catalog.getCurrentCatalog();
minTimes = 0;
result = catalog;
Catalog.getCurrentGlobalTransactionMgr();
minTimes = 0;
result = globalTransactionMgr;
}
};
// create job
LoadJob job = new LoadJob(label);
job.setState(JobState.PENDING);
job.setDbId(dbId);
String cluster = Config.dpp_default_cluster;
job.setClusterInfo(cluster, Load.clusterToDppConfig.get(cluster));
// set partition load infos
OlapTable table = (OlapTable) db.getTable(tableId);
table.setBaseIndexId(0L);
Partition partition = table.getPartition(partitionId);
Source source = new Source(new ArrayList<String>());
List<Source> sources = new ArrayList<Source>();
sources.add(source);
PartitionLoadInfo partitionLoadInfo = new PartitionLoadInfo(sources);
Map<Long, PartitionLoadInfo> idToPartitionLoadInfo = new HashMap<Long, PartitionLoadInfo>();
idToPartitionLoadInfo.put(partitionId, partitionLoadInfo);
TableLoadInfo tableLoadInfo = new TableLoadInfo(idToPartitionLoadInfo);
tableLoadInfo.addIndexSchemaHash(partition.getBaseIndex().getId(), 0);
Map<Long, TableLoadInfo> idToTableLoadInfo = new HashMap<Long, TableLoadInfo>();
idToTableLoadInfo.put(tableId, tableLoadInfo);
job.setIdToTableLoadInfo(idToTableLoadInfo);
job.setTimeoutSecond(10);
// mock
new Expectations() {
{
load.updateLoadJobState(job, JobState.ETL);
times = 1;
result = true;
load.getLoadErrorHubInfo();
times = 1;
result = null;
catalog.getLoadInstance();
times = 1;
result = load;
dppScheduler.submitEtlJob(anyLong, anyString, anyString, anyString, (Map) any, anyInt);
times = 1;
result = new EtlSubmitResult(new TStatus(TStatusCode.OK), "job_123456");
editLog.logSaveTransactionId(anyLong);
minTimes = 0;
}
};
HadoopLoadPendingTask loadPendingTask = new HadoopLoadPendingTask(job);
loadPendingTask.exec();
Assert.assertEquals(job.getId(), loadPendingTask.getSignature());
Assert.assertEquals(JobState.PENDING, job.getState());
}
}