blob: 0c10496b461c2acc93dfa409645d3183a59fe353 [file] [log] [blame]
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
package org.apache.doris.backup;
import org.apache.doris.analysis.BackupStmt;
import org.apache.doris.analysis.StorageBackend;
import org.apache.doris.analysis.TableName;
import org.apache.doris.analysis.TableRef;
import org.apache.doris.backup.BackupJob.BackupJobState;
import org.apache.doris.catalog.Catalog;
import org.apache.doris.catalog.Database;
import org.apache.doris.catalog.FsBroker;
import org.apache.doris.catalog.OlapTable;
import org.apache.doris.common.Config;
import org.apache.doris.common.FeConstants;
import org.apache.doris.common.jmockit.Deencapsulation;
import org.apache.doris.common.util.UnitTestUtil;
import org.apache.doris.persist.EditLog;
import org.apache.doris.task.AgentBatchTask;
import org.apache.doris.task.AgentTask;
import org.apache.doris.task.AgentTaskExecutor;
import org.apache.doris.task.AgentTaskQueue;
import org.apache.doris.task.SnapshotTask;
import org.apache.doris.task.UploadTask;
import org.apache.doris.thrift.TBackend;
import org.apache.doris.thrift.TFinishTaskRequest;
import org.apache.doris.thrift.TStatus;
import org.apache.doris.thrift.TStatusCode;
import org.apache.doris.thrift.TTaskType;
import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
import java.io.File;
import java.io.IOException;
import java.nio.file.FileVisitOption;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.Comparator;
import java.util.List;
import java.util.Map;
import java.util.concurrent.atomic.AtomicLong;
import mockit.Delegate;
import mockit.Expectations;
import mockit.Mock;
import mockit.MockUp;
import mockit.Mocked;
import org.junit.AfterClass;
import org.junit.Assert;
import org.junit.Before;
import org.junit.BeforeClass;
import org.junit.Test;
public class BackupJobTest {
private BackupJob job;
private Database db;
private long dbId = 1;
private long tblId = 2;
private long partId = 3;
private long idxId = 4;
private long tabletId = 5;
private long backendId = 10000;
private long version = 6;
private long versionHash = 7;
private long repoId = 20000;
private AtomicLong id = new AtomicLong(50000);
@Mocked
private Catalog catalog;
private MockBackupHandler backupHandler;
private MockRepositoryMgr repoMgr;
// Thread is not mockable in Jmockit, use subclass instead
private final class MockBackupHandler extends BackupHandler {
public MockBackupHandler(Catalog catalog) {
super(catalog);
}
@Override
public RepositoryMgr getRepoMgr() {
return repoMgr;
}
}
// Thread is not mockable in Jmockit, use subclass instead
private final class MockRepositoryMgr extends RepositoryMgr {
public MockRepositoryMgr() {
super();
}
@Override
public Repository getRepo(long repoId) {
return repo;
}
}
@Mocked
private EditLog editLog;
private Repository repo = new Repository(repoId, "repo", false, "my_repo",
BlobStorage.create("broker", StorageBackend.StorageType.BROKER, Maps.newHashMap()));
@BeforeClass
public static void start() {
Config.tmp_dir = "./";
File backupDir = new File(BackupHandler.BACKUP_ROOT_DIR.toString());
backupDir.mkdirs();
}
@AfterClass
public static void end() throws IOException {
Config.tmp_dir = "./";
File backupDir = new File(BackupHandler.BACKUP_ROOT_DIR.toString());
if (backupDir.exists()) {
Files.walk(BackupHandler.BACKUP_ROOT_DIR,
FileVisitOption.FOLLOW_LINKS).sorted(Comparator.reverseOrder()).map(Path::toFile).forEach(File::delete);
}
}
@Before
public void setUp() {
repoMgr = new MockRepositoryMgr();
backupHandler = new MockBackupHandler(catalog);
// Thread is unmockable after Jmockit version 1.48, so use reflection to set field instead.
Deencapsulation.setField(catalog, "backupHandler", backupHandler);
db = UnitTestUtil.createDb(dbId, tblId, partId, idxId, tabletId, backendId, version, versionHash);
new Expectations(catalog) {
{
catalog.getDb(anyLong);
minTimes = 0;
result = db;
Catalog.getCurrentCatalogJournalVersion();
minTimes = 0;
result = FeConstants.meta_version;
catalog.getNextId();
minTimes = 0;
result = id.getAndIncrement();
catalog.getEditLog();
minTimes = 0;
result = editLog;
}
};
new Expectations() {
{
editLog.logBackupJob((BackupJob) any);
minTimes = 0;
result = new Delegate() {
public void logBackupJob(BackupJob job) {
System.out.println("log backup job: " + job);
}
};
}
};
new MockUp<AgentTaskExecutor>() {
@Mock
public void submit(AgentBatchTask task) {
}
};
new MockUp<Repository>() {
@Mock
Status upload(String localFilePath, String remoteFilePath) {
return Status.OK;
}
@Mock
Status getBrokerAddress(Long beId, Catalog catalog, List<FsBroker> brokerAddrs) {
brokerAddrs.add(new FsBroker());
return Status.OK;
}
};
List<TableRef> tableRefs = Lists.newArrayList();
tableRefs.add(new TableRef(new TableName(UnitTestUtil.DB_NAME, UnitTestUtil.TABLE_NAME), null));
job = new BackupJob("label", dbId, UnitTestUtil.DB_NAME, tableRefs, 13600 * 1000, BackupStmt.BackupContent.ALL,
catalog, repo.getId());
}
@Test
public void testRunNormal() {
// 1.pending
Assert.assertEquals(BackupJobState.PENDING, job.getState());
job.run();
Assert.assertEquals(Status.OK, job.getStatus());
Assert.assertEquals(BackupJobState.SNAPSHOTING, job.getState());
BackupMeta backupMeta = job.getBackupMeta();
Assert.assertEquals(1, backupMeta.getTables().size());
OlapTable backupTbl = (OlapTable) backupMeta.getTable(UnitTestUtil.TABLE_NAME);
List<String> partNames = Lists.newArrayList(backupTbl.getPartitionNames());
Assert.assertNotNull(backupTbl);
Assert.assertEquals(backupTbl.getSignature(BackupHandler.SIGNATURE_VERSION, partNames),
((OlapTable) db.getTable(tblId)).getSignature(BackupHandler.SIGNATURE_VERSION, partNames));
Assert.assertEquals(1, AgentTaskQueue.getTaskNum());
AgentTask task = AgentTaskQueue.getTask(backendId, TTaskType.MAKE_SNAPSHOT, tabletId);
Assert.assertTrue(task instanceof SnapshotTask);
SnapshotTask snapshotTask = (SnapshotTask) task;
// 2. snapshoting
job.run();
Assert.assertEquals(Status.OK, job.getStatus());
Assert.assertEquals(BackupJobState.SNAPSHOTING, job.getState());
// 3. snapshot finished
String snapshotPath = "/path/to/snapshot";
List<String> snapshotFiles = Lists.newArrayList();
snapshotFiles.add("1.dat");
snapshotFiles.add("1.idx");
snapshotFiles.add("1.hdr");
TStatus task_status = new TStatus(TStatusCode.OK);
TBackend tBackend = new TBackend("", 0, 1);
TFinishTaskRequest request = new TFinishTaskRequest(tBackend, TTaskType.MAKE_SNAPSHOT,
snapshotTask.getSignature(), task_status);
request.setSnapshotFiles(snapshotFiles);
request.setSnapshotPath(snapshotPath);
Assert.assertTrue(job.finishTabletSnapshotTask(snapshotTask, request));
job.run();
Assert.assertEquals(Status.OK, job.getStatus());
Assert.assertEquals(BackupJobState.UPLOAD_SNAPSHOT, job.getState());
// 4. upload snapshots
AgentTaskQueue.clearAllTasks();
job.run();
Assert.assertEquals(Status.OK, job.getStatus());
Assert.assertEquals(BackupJobState.UPLOADING, job.getState());
Assert.assertEquals(1, AgentTaskQueue.getTaskNum());
task = AgentTaskQueue.getTask(backendId, TTaskType.UPLOAD, id.get() - 1);
Assert.assertTrue(task instanceof UploadTask);
UploadTask upTask = (UploadTask) task;
Assert.assertEquals(job.getJobId(), upTask.getJobId());
Map<String, String> srcToDest = upTask.getSrcToDestPath();
Assert.assertEquals(1, srcToDest.size());
System.out.println(srcToDest);
String dest = srcToDest.get(snapshotPath + "/" + tabletId + "/" + 0);
Assert.assertNotNull(dest);
// 5. uploading
job.run();
Assert.assertEquals(Status.OK, job.getStatus());
Assert.assertEquals(BackupJobState.UPLOADING, job.getState());
Map<Long, List<String>> tabletFileMap = Maps.newHashMap();
request = new TFinishTaskRequest(tBackend, TTaskType.UPLOAD,
upTask.getSignature(), task_status);
request.setTabletFiles(tabletFileMap);
Assert.assertFalse(job.finishSnapshotUploadTask(upTask, request));
List<String> tabletFiles = Lists.newArrayList();
tabletFileMap.put(tabletId, tabletFiles);
Assert.assertFalse(job.finishSnapshotUploadTask(upTask, request));
tabletFiles.add("1.dat.4f158689243a3d6030352fec3cfd3798");
tabletFiles.add("wrong_files.idx.4f158689243a3d6030352fec3cfd3798");
tabletFiles.add("wrong_files.hdr.4f158689243a3d6030352fec3cfd3798");
Assert.assertFalse(job.finishSnapshotUploadTask(upTask, request));
tabletFiles.clear();
tabletFiles.add("1.dat.4f158689243a3d6030352fec3cfd3798");
tabletFiles.add("1.idx.4f158689243a3d6030352fec3cfd3798");
tabletFiles.add("1.hdr.4f158689243a3d6030352fec3cfd3798");
Assert.assertTrue(job.finishSnapshotUploadTask(upTask, request));
job.run();
Assert.assertEquals(Status.OK, job.getStatus());
Assert.assertEquals(BackupJobState.SAVE_META, job.getState());
// 6. save meta
job.run();
Assert.assertEquals(Status.OK, job.getStatus());
Assert.assertEquals(BackupJobState.UPLOAD_INFO, job.getState());
File metaInfo = new File(job.getLocalMetaInfoFilePath());
Assert.assertTrue(metaInfo.exists());
File jobInfo = new File(job.getLocalJobInfoFilePath());
Assert.assertTrue(jobInfo.exists());
BackupMeta restoreMetaInfo = null;
BackupJobInfo restoreJobInfo = null;
try {
restoreMetaInfo = BackupMeta.fromFile(job.getLocalMetaInfoFilePath(), -1);
Assert.assertEquals(1, restoreMetaInfo.getTables().size());
OlapTable olapTable = (OlapTable) restoreMetaInfo.getTable(tblId);
Assert.assertNotNull(olapTable);
Assert.assertNotNull(restoreMetaInfo.getTable(UnitTestUtil.TABLE_NAME));
List<String> names = Lists.newArrayList(olapTable.getPartitionNames());
Assert.assertEquals(((OlapTable) db.getTable(tblId)).getSignature(BackupHandler.SIGNATURE_VERSION, names),
olapTable.getSignature(BackupHandler.SIGNATURE_VERSION, names));
restoreJobInfo = BackupJobInfo.fromFile(job.getLocalJobInfoFilePath());
Assert.assertEquals(UnitTestUtil.DB_NAME, restoreJobInfo.dbName);
Assert.assertEquals(job.getLabel(), restoreJobInfo.name);
Assert.assertEquals(1, restoreJobInfo.backupOlapTableObjects.values().size());
} catch (IOException e) {
e.printStackTrace();
Assert.fail();
}
Assert.assertNull(job.getBackupMeta());
Assert.assertNull(job.getJobInfo());
// 7. upload_info
job.run();
Assert.assertEquals(Status.OK, job.getStatus());
Assert.assertEquals(BackupJobState.FINISHED, job.getState());
}
@Test
public void testRunAbnormal() {
// 1.pending
AgentTaskQueue.clearAllTasks();
List<TableRef> tableRefs = Lists.newArrayList();
tableRefs.add(new TableRef(new TableName(UnitTestUtil.DB_NAME, "unknown_tbl"), null));
job = new BackupJob("label", dbId, UnitTestUtil.DB_NAME, tableRefs, 13600 * 1000, BackupStmt.BackupContent.ALL,
catalog, repo.getId());
job.run();
Assert.assertEquals(Status.ErrCode.NOT_FOUND, job.getStatus().getErrCode());
Assert.assertEquals(BackupJobState.CANCELLED, job.getState());
}
}