| // Licensed to the Apache Software Foundation (ASF) under one |
| // or more contributor license agreements. See the NOTICE file |
| // distributed with this work for additional information |
| // regarding copyright ownership. The ASF licenses this file |
| // to you under the Apache License, Version 2.0 (the |
| // "License"); you may not use this file except in compliance |
| // with the License. You may obtain a copy of the License at |
| // |
| // http://www.apache.org/licenses/LICENSE-2.0 |
| // |
| // Unless required by applicable law or agreed to in writing, |
| // software distributed under the License is distributed on an |
| // "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY |
| // KIND, either express or implied. See the License for the |
| // specific language governing permissions and limitations |
| // under the License. |
| |
| package org.apache.doris.backup; |
| |
| import org.apache.doris.analysis.StorageBackend; |
| import org.apache.doris.backup.BackupJobInfo.BackupIndexInfo; |
| import org.apache.doris.backup.BackupJobInfo.BackupPartitionInfo; |
| import org.apache.doris.backup.BackupJobInfo.BackupOlapTableInfo; |
| import org.apache.doris.backup.BackupJobInfo.BackupTabletInfo; |
| import org.apache.doris.backup.RestoreJob.RestoreJobState; |
| import org.apache.doris.catalog.Catalog; |
| import org.apache.doris.catalog.Database; |
| import org.apache.doris.catalog.MaterializedIndex; |
| import org.apache.doris.catalog.MaterializedIndex.IndexExtState; |
| import org.apache.doris.catalog.OlapTable; |
| import org.apache.doris.catalog.Partition; |
| import org.apache.doris.catalog.Resource; |
| import org.apache.doris.catalog.Table; |
| import org.apache.doris.catalog.Tablet; |
| import org.apache.doris.common.AnalysisException; |
| import org.apache.doris.common.FeConstants; |
| import org.apache.doris.common.MarkedCountDownLatch; |
| import org.apache.doris.common.jmockit.Deencapsulation; |
| import org.apache.doris.persist.EditLog; |
| import org.apache.doris.system.SystemInfoService; |
| import org.apache.doris.task.AgentTask; |
| import org.apache.doris.task.AgentTaskQueue; |
| import org.apache.doris.task.DirMoveTask; |
| import org.apache.doris.task.DownloadTask; |
| import org.apache.doris.task.SnapshotTask; |
| import org.apache.doris.thrift.TBackend; |
| import org.apache.doris.thrift.TFinishTaskRequest; |
| import org.apache.doris.thrift.TStatus; |
| import org.apache.doris.thrift.TStatusCode; |
| import org.apache.doris.thrift.TTaskType; |
| |
| import com.google.common.collect.Lists; |
| import com.google.common.collect.Maps; |
| |
| import org.junit.Assert; |
| import org.junit.Before; |
| import org.junit.Ignore; |
| import org.junit.Test; |
| |
| import java.util.List; |
| import java.util.Map; |
| import java.util.Set; |
| import java.util.concurrent.TimeUnit; |
| import java.util.concurrent.atomic.AtomicLong; |
| import java.util.zip.Adler32; |
| |
| import mockit.Delegate; |
| import mockit.Expectations; |
| import mockit.Injectable; |
| import mockit.Mock; |
| import mockit.MockUp; |
| import mockit.Mocked; |
| |
| public class RestoreJobTest { |
| |
| private Database db; |
| private BackupJobInfo jobInfo; |
| private RestoreJob job; |
| private String label = "test_label"; |
| |
| private AtomicLong id = new AtomicLong(50000); |
| |
| private OlapTable expectedRestoreTbl; |
| |
| private long repoId = 20000; |
| |
| @Mocked |
| private Catalog catalog; |
| |
| private MockBackupHandler backupHandler; |
| |
| private MockRepositoryMgr repoMgr; |
| |
| // Thread is not mockable in Jmockit, use subclass instead |
| private final class MockBackupHandler extends BackupHandler { |
| public MockBackupHandler(Catalog catalog) { |
| super(catalog); |
| } |
| @Override |
| public RepositoryMgr getRepoMgr() { |
| return repoMgr; |
| } |
| } |
| |
| // Thread is not mockable in Jmockit, use subclass instead |
| private final class MockRepositoryMgr extends RepositoryMgr { |
| public MockRepositoryMgr() { |
| super(); |
| } |
| @Override |
| public Repository getRepo(long repoId) { |
| return repo; |
| } |
| } |
| |
| @Mocked |
| private EditLog editLog; |
| @Mocked |
| private SystemInfoService systemInfoService; |
| |
| @Injectable |
| private Repository repo = new Repository(repoId, "repo", false, "bos://my_repo", |
| BlobStorage.create("broker", StorageBackend.StorageType.BROKER, Maps.newHashMap())); |
| |
| private BackupMeta backupMeta; |
| |
| @Before |
| public void setUp() throws AnalysisException { |
| db = CatalogMocker.mockDb(); |
| backupHandler = new MockBackupHandler(catalog); |
| repoMgr = new MockRepositoryMgr(); |
| |
| Deencapsulation.setField(catalog, "backupHandler", backupHandler); |
| |
| new Expectations() { |
| { |
| catalog.getDb(anyLong); |
| minTimes = 0; |
| result = db; |
| |
| Catalog.getCurrentCatalogJournalVersion(); |
| minTimes = 0; |
| result = FeConstants.meta_version; |
| |
| catalog.getNextId(); |
| minTimes = 0; |
| result = id.getAndIncrement(); |
| |
| catalog.getEditLog(); |
| minTimes = 0; |
| result = editLog; |
| |
| Catalog.getCurrentSystemInfo(); |
| minTimes = 0; |
| result = systemInfoService; |
| } |
| }; |
| |
| new Expectations() { |
| { |
| systemInfoService.seqChooseBackendIds(anyInt, anyBoolean, anyBoolean, anyString); |
| minTimes = 0; |
| result = new Delegate() { |
| public synchronized List<Long> seqChooseBackendIds(int backendNum, boolean needAlive, |
| boolean isCreate, String clusterName) { |
| List<Long> beIds = Lists.newArrayList(); |
| beIds.add(CatalogMocker.BACKEND1_ID); |
| beIds.add(CatalogMocker.BACKEND2_ID); |
| beIds.add(CatalogMocker.BACKEND3_ID); |
| return beIds; |
| } |
| }; |
| } |
| }; |
| |
| new Expectations() { |
| { |
| editLog.logBackupJob((BackupJob) any); |
| minTimes = 0; |
| result = new Delegate() { |
| public void logBackupJob(BackupJob job) { |
| System.out.println("log backup job: " + job); |
| } |
| }; |
| } |
| }; |
| |
| new Expectations() { |
| { |
| repo.upload(anyString, anyString); |
| result = Status.OK; |
| minTimes = 0; |
| |
| List<BackupMeta> backupMetas = Lists.newArrayList(); |
| repo.getSnapshotMetaFile(label, backupMetas, -1); |
| minTimes = 0; |
| result = new Delegate() { |
| public Status getSnapshotMetaFile(String label, List<BackupMeta> backupMetas) { |
| backupMetas.add(backupMeta); |
| return Status.OK; |
| } |
| }; |
| } |
| }; |
| |
| new MockUp<MarkedCountDownLatch>() { |
| @Mock |
| boolean await(long timeout, TimeUnit unit) { |
| return true; |
| } |
| }; |
| |
| // gen BackupJobInfo |
| jobInfo = new BackupJobInfo(); |
| jobInfo.backupTime = System.currentTimeMillis(); |
| jobInfo.dbId = CatalogMocker.TEST_DB_ID; |
| jobInfo.dbName = CatalogMocker.TEST_DB_NAME; |
| jobInfo.name = label; |
| jobInfo.success = true; |
| |
| expectedRestoreTbl = (OlapTable) db.getTable(CatalogMocker.TEST_TBL2_ID); |
| BackupOlapTableInfo tblInfo = new BackupOlapTableInfo(); |
| tblInfo.id = CatalogMocker.TEST_TBL2_ID; |
| jobInfo.backupOlapTableObjects.put(CatalogMocker.TEST_TBL2_NAME, tblInfo); |
| |
| for (Partition partition : expectedRestoreTbl.getPartitions()) { |
| BackupPartitionInfo partInfo = new BackupPartitionInfo(); |
| partInfo.id = partition.getId(); |
| tblInfo.partitions.put(partition.getName(), partInfo); |
| |
| for (MaterializedIndex index : partition.getMaterializedIndices(IndexExtState.VISIBLE)) { |
| BackupIndexInfo idxInfo = new BackupIndexInfo(); |
| idxInfo.id = index.getId(); |
| idxInfo.schemaHash = expectedRestoreTbl.getSchemaHashByIndexId(index.getId()); |
| partInfo.indexes.put(expectedRestoreTbl.getIndexNameById(index.getId()), idxInfo); |
| |
| for (Tablet tablet : index.getTablets()) { |
| List<String> files = Lists.newArrayList(tablet.getId() + ".dat", |
| tablet.getId()+ ".idx", tablet.getId()+".hdr"); |
| BackupTabletInfo tabletInfo = new BackupTabletInfo(tablet.getId(), files); |
| idxInfo.sortedTabletInfoList.add(tabletInfo); |
| } |
| } |
| } |
| |
| // drop this table, cause we want to try restoring this table |
| db.dropTable(expectedRestoreTbl.getName()); |
| |
| job = new RestoreJob(label, "2018-01-01 01:01:01", db.getId(), db.getFullName(), |
| jobInfo, false, 3, 100000, -1, catalog, repo.getId()); |
| |
| List<Table> tbls = Lists.newArrayList(); |
| List<Resource> resources = Lists.newArrayList(); |
| tbls.add(expectedRestoreTbl); |
| backupMeta = new BackupMeta(tbls, resources); |
| } |
| |
| @Ignore |
| @Test |
| public void testRun() { |
| // pending |
| job.run(); |
| Assert.assertEquals(Status.OK, job.getStatus()); |
| Assert.assertEquals(RestoreJobState.SNAPSHOTING, job.getState()); |
| Assert.assertEquals(12, job.getFileMapping().getMapping().size()); |
| |
| // 2. snapshoting |
| job.run(); |
| Assert.assertEquals(Status.OK, job.getStatus()); |
| Assert.assertEquals(RestoreJobState.SNAPSHOTING, job.getState()); |
| Assert.assertEquals(12 * 2, AgentTaskQueue.getTaskNum()); |
| |
| // 3. snapshot finished |
| List<AgentTask> agentTasks = Lists.newArrayList(); |
| Map<TTaskType, Set<Long>> runningTasks = Maps.newHashMap(); |
| agentTasks.addAll(AgentTaskQueue.getDiffTasks(CatalogMocker.BACKEND1_ID, runningTasks)); |
| agentTasks.addAll(AgentTaskQueue.getDiffTasks(CatalogMocker.BACKEND2_ID, runningTasks)); |
| agentTasks.addAll(AgentTaskQueue.getDiffTasks(CatalogMocker.BACKEND3_ID, runningTasks)); |
| Assert.assertEquals(12 * 2, agentTasks.size()); |
| |
| for (AgentTask agentTask : agentTasks) { |
| if (agentTask.getTaskType() != TTaskType.MAKE_SNAPSHOT) { |
| continue; |
| } |
| |
| SnapshotTask task = (SnapshotTask) agentTask; |
| String snapshotPath = "/path/to/snapshot/" + System.currentTimeMillis(); |
| TStatus taskStatus = new TStatus(TStatusCode.OK); |
| TBackend tBackend = new TBackend("", 0, 1); |
| TFinishTaskRequest request = new TFinishTaskRequest(tBackend, TTaskType.MAKE_SNAPSHOT, |
| task.getSignature(), taskStatus); |
| request.setSnapshotPath(snapshotPath); |
| Assert.assertTrue(job.finishTabletSnapshotTask(task, request)); |
| } |
| |
| job.run(); |
| Assert.assertEquals(Status.OK, job.getStatus()); |
| Assert.assertEquals(RestoreJobState.DOWNLOAD, job.getState()); |
| |
| // download |
| AgentTaskQueue.clearAllTasks(); |
| job.run(); |
| Assert.assertEquals(Status.OK, job.getStatus()); |
| Assert.assertEquals(RestoreJobState.DOWNLOADING, job.getState()); |
| Assert.assertEquals(9, AgentTaskQueue.getTaskNum()); |
| |
| // downloading |
| job.run(); |
| Assert.assertEquals(Status.OK, job.getStatus()); |
| Assert.assertEquals(RestoreJobState.DOWNLOADING, job.getState()); |
| |
| List<AgentTask> downloadTasks = Lists.newArrayList(); |
| runningTasks = Maps.newHashMap(); |
| downloadTasks.addAll(AgentTaskQueue.getDiffTasks(CatalogMocker.BACKEND1_ID, runningTasks)); |
| downloadTasks.addAll(AgentTaskQueue.getDiffTasks(CatalogMocker.BACKEND2_ID, runningTasks)); |
| downloadTasks.addAll(AgentTaskQueue.getDiffTasks(CatalogMocker.BACKEND3_ID, runningTasks)); |
| Assert.assertEquals(9, downloadTasks.size()); |
| |
| List<Long> downloadedTabletIds = Lists.newArrayList(); |
| for (AgentTask agentTask : downloadTasks) { |
| TStatus taskStatus = new TStatus(TStatusCode.OK); |
| TBackend tBackend = new TBackend("", 0, 1); |
| TFinishTaskRequest request = new TFinishTaskRequest(tBackend, TTaskType.MAKE_SNAPSHOT, |
| agentTask.getSignature(), taskStatus); |
| request.setDownloadedTabletIds(downloadedTabletIds); |
| Assert.assertTrue(job.finishTabletDownloadTask((DownloadTask) agentTask, request)); |
| } |
| |
| job.run(); |
| Assert.assertEquals(Status.OK, job.getStatus()); |
| Assert.assertEquals(RestoreJobState.COMMIT, job.getState()); |
| |
| // commit |
| AgentTaskQueue.clearAllTasks(); |
| job.run(); |
| Assert.assertEquals(Status.OK, job.getStatus()); |
| Assert.assertEquals(RestoreJobState.COMMITTING, job.getState()); |
| Assert.assertEquals(12, AgentTaskQueue.getTaskNum()); |
| |
| // committing |
| job.run(); |
| Assert.assertEquals(Status.OK, job.getStatus()); |
| Assert.assertEquals(RestoreJobState.COMMITTING, job.getState()); |
| |
| List<AgentTask> dirMoveTasks = Lists.newArrayList(); |
| runningTasks = Maps.newHashMap(); |
| dirMoveTasks.addAll(AgentTaskQueue.getDiffTasks(CatalogMocker.BACKEND1_ID, runningTasks)); |
| dirMoveTasks.addAll(AgentTaskQueue.getDiffTasks(CatalogMocker.BACKEND2_ID, runningTasks)); |
| dirMoveTasks.addAll(AgentTaskQueue.getDiffTasks(CatalogMocker.BACKEND3_ID, runningTasks)); |
| Assert.assertEquals(12, dirMoveTasks.size()); |
| |
| for (AgentTask agentTask : dirMoveTasks) { |
| TStatus taskStatus = new TStatus(TStatusCode.OK); |
| TBackend tBackend = new TBackend("", 0, 1); |
| TFinishTaskRequest request = new TFinishTaskRequest(tBackend, TTaskType.MAKE_SNAPSHOT, |
| agentTask.getSignature(), taskStatus); |
| job.finishDirMoveTask((DirMoveTask) agentTask, request); |
| } |
| |
| job.run(); |
| Assert.assertEquals(Status.OK, job.getStatus()); |
| Assert.assertEquals(RestoreJobState.FINISHED, job.getState()); |
| } |
| |
| @Test |
| public void testSignature() { |
| Adler32 sig1 = new Adler32(); |
| sig1.update("name1".getBytes()); |
| sig1.update("name2".getBytes()); |
| System.out.println("sig1: " + Math.abs((int) sig1.getValue())); |
| |
| Adler32 sig2 = new Adler32(); |
| sig2.update("name2".getBytes()); |
| sig2.update("name1".getBytes()); |
| System.out.println("sig2: " + Math.abs((int) sig2.getValue())); |
| |
| OlapTable tbl = (OlapTable) db.getTable(CatalogMocker.TEST_TBL_NAME); |
| List<String> partNames = Lists.newArrayList(tbl.getPartitionNames()); |
| System.out.println(partNames); |
| System.out.println("tbl signature: " + tbl.getSignature(BackupHandler.SIGNATURE_VERSION, partNames)); |
| tbl.setName("newName"); |
| partNames = Lists.newArrayList(tbl.getPartitionNames()); |
| System.out.println("tbl signature: " + tbl.getSignature(BackupHandler.SIGNATURE_VERSION, partNames)); |
| } |
| |
| } |
| |