// blob: bf7fdf6c2e06c8c66a4ba3262dc5571663cea983 [file] [log] [blame]
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
package org.apache.doris.backup;
import org.apache.doris.analysis.StorageBackend;
import org.apache.doris.backup.BackupJobInfo.BackupIndexInfo;
import org.apache.doris.backup.BackupJobInfo.BackupOlapTableInfo;
import org.apache.doris.backup.BackupJobInfo.BackupPartitionInfo;
import org.apache.doris.backup.BackupJobInfo.BackupTabletInfo;
import org.apache.doris.backup.RestoreJob.RestoreJobState;
import org.apache.doris.catalog.Catalog;
import org.apache.doris.catalog.Database;
import org.apache.doris.catalog.MaterializedIndex;
import org.apache.doris.catalog.MaterializedIndex.IndexExtState;
import org.apache.doris.catalog.OlapTable;
import org.apache.doris.catalog.Partition;
import org.apache.doris.catalog.Resource;
import org.apache.doris.catalog.Table;
import org.apache.doris.catalog.Tablet;
import org.apache.doris.common.AnalysisException;
import org.apache.doris.common.FeConstants;
import org.apache.doris.common.MarkedCountDownLatch;
import org.apache.doris.common.jmockit.Deencapsulation;
import org.apache.doris.persist.EditLog;
import org.apache.doris.system.SystemInfoService;
import org.apache.doris.task.AgentTask;
import org.apache.doris.task.AgentTaskQueue;
import org.apache.doris.task.DirMoveTask;
import org.apache.doris.task.DownloadTask;
import org.apache.doris.task.SnapshotTask;
import org.apache.doris.thrift.TBackend;
import org.apache.doris.thrift.TFinishTaskRequest;
import org.apache.doris.thrift.TStatus;
import org.apache.doris.thrift.TStatusCode;
import org.apache.doris.thrift.TTaskType;
import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Ignore;
import org.junit.Test;
import java.nio.charset.StandardCharsets;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;
import java.util.zip.Adler32;
import mockit.Delegate;
import mockit.Expectations;
import mockit.Injectable;
import mockit.Mock;
import mockit.MockUp;
import mockit.Mocked;
public class RestoreJobTest {
private Database db;
private BackupJobInfo jobInfo;
private RestoreJob job;
private String label = "test_label";
private AtomicLong id = new AtomicLong(50000);
private OlapTable expectedRestoreTbl;
private long repoId = 20000;
@Mocked
private Catalog catalog;
private MockBackupHandler backupHandler;
private MockRepositoryMgr repoMgr;
// Thread is not mockable in Jmockit, use subclass instead
private final class MockBackupHandler extends BackupHandler {
public MockBackupHandler(Catalog catalog) {
super(catalog);
}
@Override
public RepositoryMgr getRepoMgr() {
return repoMgr;
}
}
// Thread is not mockable in Jmockit, use subclass instead
private final class MockRepositoryMgr extends RepositoryMgr {
public MockRepositoryMgr() {
super();
}
@Override
public Repository getRepo(long repoId) {
return repo;
}
}
@Mocked
private EditLog editLog;
@Mocked
private SystemInfoService systemInfoService;
@Injectable
private Repository repo = new Repository(repoId, "repo", false, "bos://my_repo",
BlobStorage.create("broker", StorageBackend.StorageType.BROKER, Maps.newHashMap()));
private BackupMeta backupMeta;
@Before
public void setUp() throws AnalysisException {
db = CatalogMocker.mockDb();
backupHandler = new MockBackupHandler(catalog);
repoMgr = new MockRepositoryMgr();
Deencapsulation.setField(catalog, "backupHandler", backupHandler);
new Expectations() {
{
catalog.getDb(anyLong);
minTimes = 0;
result = db;
Catalog.getCurrentCatalogJournalVersion();
minTimes = 0;
result = FeConstants.meta_version;
catalog.getNextId();
minTimes = 0;
result = id.getAndIncrement();
catalog.getEditLog();
minTimes = 0;
result = editLog;
Catalog.getCurrentSystemInfo();
minTimes = 0;
result = systemInfoService;
}
};
new Expectations() {
{
systemInfoService.seqChooseBackendIds(anyInt, anyBoolean, anyBoolean, anyString);
minTimes = 0;
result = new Delegate() {
public synchronized List<Long> seqChooseBackendIds(int backendNum, boolean needAlive,
boolean isCreate, String clusterName) {
List<Long> beIds = Lists.newArrayList();
beIds.add(CatalogMocker.BACKEND1_ID);
beIds.add(CatalogMocker.BACKEND2_ID);
beIds.add(CatalogMocker.BACKEND3_ID);
return beIds;
}
};
}
};
new Expectations() {
{
editLog.logBackupJob((BackupJob) any);
minTimes = 0;
result = new Delegate() {
public void logBackupJob(BackupJob job) {
System.out.println("log backup job: " + job);
}
};
}
};
new Expectations() {
{
repo.upload(anyString, anyString);
result = Status.OK;
minTimes = 0;
List<BackupMeta> backupMetas = Lists.newArrayList();
repo.getSnapshotMetaFile(label, backupMetas, -1);
minTimes = 0;
result = new Delegate() {
public Status getSnapshotMetaFile(String label, List<BackupMeta> backupMetas) {
backupMetas.add(backupMeta);
return Status.OK;
}
};
}
};
new MockUp<MarkedCountDownLatch>() {
@Mock
boolean await(long timeout, TimeUnit unit) {
return true;
}
};
// gen BackupJobInfo
jobInfo = new BackupJobInfo();
jobInfo.backupTime = System.currentTimeMillis();
jobInfo.dbId = CatalogMocker.TEST_DB_ID;
jobInfo.dbName = CatalogMocker.TEST_DB_NAME;
jobInfo.name = label;
jobInfo.success = true;
expectedRestoreTbl = (OlapTable) db.getTable(CatalogMocker.TEST_TBL2_ID);
BackupOlapTableInfo tblInfo = new BackupOlapTableInfo();
tblInfo.id = CatalogMocker.TEST_TBL2_ID;
jobInfo.backupOlapTableObjects.put(CatalogMocker.TEST_TBL2_NAME, tblInfo);
for (Partition partition : expectedRestoreTbl.getPartitions()) {
BackupPartitionInfo partInfo = new BackupPartitionInfo();
partInfo.id = partition.getId();
tblInfo.partitions.put(partition.getName(), partInfo);
for (MaterializedIndex index : partition.getMaterializedIndices(IndexExtState.VISIBLE)) {
BackupIndexInfo idxInfo = new BackupIndexInfo();
idxInfo.id = index.getId();
idxInfo.schemaHash = expectedRestoreTbl.getSchemaHashByIndexId(index.getId());
partInfo.indexes.put(expectedRestoreTbl.getIndexNameById(index.getId()), idxInfo);
for (Tablet tablet : index.getTablets()) {
List<String> files = Lists.newArrayList(tablet.getId() + ".dat",
tablet.getId()+ ".idx", tablet.getId()+".hdr");
BackupTabletInfo tabletInfo = new BackupTabletInfo(tablet.getId(), files);
idxInfo.sortedTabletInfoList.add(tabletInfo);
}
}
}
// drop this table, cause we want to try restoring this table
db.dropTable(expectedRestoreTbl.getName());
job = new RestoreJob(label, "2018-01-01 01:01:01", db.getId(), db.getFullName(),
jobInfo, false, 3, 100000, -1, catalog, repo.getId());
List<Table> tbls = Lists.newArrayList();
List<Resource> resources = Lists.newArrayList();
tbls.add(expectedRestoreTbl);
backupMeta = new BackupMeta(tbls, resources);
}
@Ignore
@Test
public void testRun() {
// pending
job.run();
Assert.assertEquals(Status.OK, job.getStatus());
Assert.assertEquals(RestoreJobState.SNAPSHOTING, job.getState());
Assert.assertEquals(12, job.getFileMapping().getMapping().size());
// 2. snapshoting
job.run();
Assert.assertEquals(Status.OK, job.getStatus());
Assert.assertEquals(RestoreJobState.SNAPSHOTING, job.getState());
Assert.assertEquals(12 * 2, AgentTaskQueue.getTaskNum());
// 3. snapshot finished
List<AgentTask> agentTasks = Lists.newArrayList();
Map<TTaskType, Set<Long>> runningTasks = Maps.newHashMap();
agentTasks.addAll(AgentTaskQueue.getDiffTasks(CatalogMocker.BACKEND1_ID, runningTasks));
agentTasks.addAll(AgentTaskQueue.getDiffTasks(CatalogMocker.BACKEND2_ID, runningTasks));
agentTasks.addAll(AgentTaskQueue.getDiffTasks(CatalogMocker.BACKEND3_ID, runningTasks));
Assert.assertEquals(12 * 2, agentTasks.size());
for (AgentTask agentTask : agentTasks) {
if (agentTask.getTaskType() != TTaskType.MAKE_SNAPSHOT) {
continue;
}
SnapshotTask task = (SnapshotTask) agentTask;
String snapshotPath = "/path/to/snapshot/" + System.currentTimeMillis();
TStatus taskStatus = new TStatus(TStatusCode.OK);
TBackend tBackend = new TBackend("", 0, 1);
TFinishTaskRequest request = new TFinishTaskRequest(tBackend, TTaskType.MAKE_SNAPSHOT,
task.getSignature(), taskStatus);
request.setSnapshotPath(snapshotPath);
Assert.assertTrue(job.finishTabletSnapshotTask(task, request));
}
job.run();
Assert.assertEquals(Status.OK, job.getStatus());
Assert.assertEquals(RestoreJobState.DOWNLOAD, job.getState());
// download
AgentTaskQueue.clearAllTasks();
job.run();
Assert.assertEquals(Status.OK, job.getStatus());
Assert.assertEquals(RestoreJobState.DOWNLOADING, job.getState());
Assert.assertEquals(9, AgentTaskQueue.getTaskNum());
// downloading
job.run();
Assert.assertEquals(Status.OK, job.getStatus());
Assert.assertEquals(RestoreJobState.DOWNLOADING, job.getState());
List<AgentTask> downloadTasks = Lists.newArrayList();
runningTasks = Maps.newHashMap();
downloadTasks.addAll(AgentTaskQueue.getDiffTasks(CatalogMocker.BACKEND1_ID, runningTasks));
downloadTasks.addAll(AgentTaskQueue.getDiffTasks(CatalogMocker.BACKEND2_ID, runningTasks));
downloadTasks.addAll(AgentTaskQueue.getDiffTasks(CatalogMocker.BACKEND3_ID, runningTasks));
Assert.assertEquals(9, downloadTasks.size());
List<Long> downloadedTabletIds = Lists.newArrayList();
for (AgentTask agentTask : downloadTasks) {
TStatus taskStatus = new TStatus(TStatusCode.OK);
TBackend tBackend = new TBackend("", 0, 1);
TFinishTaskRequest request = new TFinishTaskRequest(tBackend, TTaskType.MAKE_SNAPSHOT,
agentTask.getSignature(), taskStatus);
request.setDownloadedTabletIds(downloadedTabletIds);
Assert.assertTrue(job.finishTabletDownloadTask((DownloadTask) agentTask, request));
}
job.run();
Assert.assertEquals(Status.OK, job.getStatus());
Assert.assertEquals(RestoreJobState.COMMIT, job.getState());
// commit
AgentTaskQueue.clearAllTasks();
job.run();
Assert.assertEquals(Status.OK, job.getStatus());
Assert.assertEquals(RestoreJobState.COMMITTING, job.getState());
Assert.assertEquals(12, AgentTaskQueue.getTaskNum());
// committing
job.run();
Assert.assertEquals(Status.OK, job.getStatus());
Assert.assertEquals(RestoreJobState.COMMITTING, job.getState());
List<AgentTask> dirMoveTasks = Lists.newArrayList();
runningTasks = Maps.newHashMap();
dirMoveTasks.addAll(AgentTaskQueue.getDiffTasks(CatalogMocker.BACKEND1_ID, runningTasks));
dirMoveTasks.addAll(AgentTaskQueue.getDiffTasks(CatalogMocker.BACKEND2_ID, runningTasks));
dirMoveTasks.addAll(AgentTaskQueue.getDiffTasks(CatalogMocker.BACKEND3_ID, runningTasks));
Assert.assertEquals(12, dirMoveTasks.size());
for (AgentTask agentTask : dirMoveTasks) {
TStatus taskStatus = new TStatus(TStatusCode.OK);
TBackend tBackend = new TBackend("", 0, 1);
TFinishTaskRequest request = new TFinishTaskRequest(tBackend, TTaskType.MAKE_SNAPSHOT,
agentTask.getSignature(), taskStatus);
job.finishDirMoveTask((DirMoveTask) agentTask, request);
}
job.run();
Assert.assertEquals(Status.OK, job.getStatus());
Assert.assertEquals(RestoreJobState.FINISHED, job.getState());
}
@Test
public void testSignature() {
Adler32 sig1 = new Adler32();
sig1.update("name1".getBytes());
sig1.update("name2".getBytes());
System.out.println("sig1: " + Math.abs((int) sig1.getValue()));
Adler32 sig2 = new Adler32();
sig2.update("name2".getBytes());
sig2.update("name1".getBytes());
System.out.println("sig2: " + Math.abs((int) sig2.getValue()));
OlapTable tbl = (OlapTable) db.getTable(CatalogMocker.TEST_TBL_NAME);
List<String> partNames = Lists.newArrayList(tbl.getPartitionNames());
System.out.println(partNames);
System.out.println("tbl signature: " + tbl.getSignature(BackupHandler.SIGNATURE_VERSION, partNames));
tbl.setName("newName");
partNames = Lists.newArrayList(tbl.getPartitionNames());
System.out.println("tbl signature: " + tbl.getSignature(BackupHandler.SIGNATURE_VERSION, partNames));
}
}