| // Licensed to the Apache Software Foundation (ASF) under one |
| // or more contributor license agreements. See the NOTICE file |
| // distributed with this work for additional information |
| // regarding copyright ownership. The ASF licenses this file |
| // to you under the Apache License, Version 2.0 (the |
| // "License"); you may not use this file except in compliance |
| // with the License. You may obtain a copy of the License at |
| // |
| // http://www.apache.org/licenses/LICENSE-2.0 |
| // |
| // Unless required by applicable law or agreed to in writing, |
| // software distributed under the License is distributed on an |
| // "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY |
| // KIND, either express or implied. See the License for the |
| // specific language governing permissions and limitations |
| // under the License. |
| |
| package org.apache.doris.load.routineload; |
| |
| import org.apache.doris.analysis.ColumnSeparator; |
| import org.apache.doris.analysis.CreateRoutineLoadStmt; |
| import org.apache.doris.analysis.LabelName; |
| import org.apache.doris.analysis.ParseNode; |
| import org.apache.doris.analysis.PauseRoutineLoadStmt; |
| import org.apache.doris.analysis.ResumeRoutineLoadStmt; |
| import org.apache.doris.analysis.StopRoutineLoadStmt; |
| import org.apache.doris.analysis.TableName; |
| import org.apache.doris.catalog.Catalog; |
| import org.apache.doris.catalog.Database; |
| import org.apache.doris.common.AnalysisException; |
| import org.apache.doris.common.Config; |
| import org.apache.doris.common.DdlException; |
| import org.apache.doris.common.InternalErrorCode; |
| import org.apache.doris.common.LoadException; |
| import org.apache.doris.common.MetaNotFoundException; |
| import org.apache.doris.common.UserException; |
| import org.apache.doris.common.jmockit.Deencapsulation; |
| import org.apache.doris.load.loadv2.LoadTask; |
| import org.apache.doris.mysql.privilege.PaloAuth; |
| import org.apache.doris.mysql.privilege.PrivPredicate; |
| import org.apache.doris.persist.EditLog; |
| import org.apache.doris.persist.RoutineLoadOperation; |
| import org.apache.doris.qe.ConnectContext; |
| import org.apache.doris.qe.OriginStatement; |
| import org.apache.doris.system.SystemInfoService; |
| import org.apache.doris.thrift.TResourceInfo; |
| |
| import com.google.common.collect.Lists; |
| import com.google.common.collect.Maps; |
| |
| import org.apache.logging.log4j.LogManager; |
| import org.apache.logging.log4j.Logger; |
| import org.junit.Assert; |
| import org.junit.Test; |
| |
| import java.util.ArrayList; |
| import java.util.List; |
| import java.util.Map; |
| import java.util.UUID; |
| |
| import mockit.Expectations; |
| import mockit.Injectable; |
| import mockit.Mock; |
| import mockit.MockUp; |
| import mockit.Mocked; |
| |
| public class RoutineLoadManagerTest { |
| |
| private static final Logger LOG = LogManager.getLogger(RoutineLoadManagerTest.class); |
| |
| @Mocked |
| private SystemInfoService systemInfoService; |
| |
| @Test |
| public void testAddJobByStmt(@Injectable PaloAuth paloAuth, |
| @Injectable TResourceInfo tResourceInfo, |
| @Mocked ConnectContext connectContext, |
| @Mocked Catalog catalog) throws UserException { |
| String jobName = "job1"; |
| String dbName = "db1"; |
| LabelName labelName = new LabelName(dbName, jobName); |
| String tableNameString = "table1"; |
| TableName tableName = new TableName(dbName, tableNameString); |
| List<ParseNode> loadPropertyList = new ArrayList<>(); |
| ColumnSeparator columnSeparator = new ColumnSeparator(","); |
| loadPropertyList.add(columnSeparator); |
| Map<String, String> properties = Maps.newHashMap(); |
| properties.put(CreateRoutineLoadStmt.DESIRED_CONCURRENT_NUMBER_PROPERTY, "2"); |
| String typeName = LoadDataSourceType.KAFKA.name(); |
| Map<String, String> customProperties = Maps.newHashMap(); |
| String topicName = "topic1"; |
| customProperties.put(CreateRoutineLoadStmt.KAFKA_TOPIC_PROPERTY, topicName); |
| String serverAddress = "http://127.0.0.1:8080"; |
| customProperties.put(CreateRoutineLoadStmt.KAFKA_BROKER_LIST_PROPERTY, serverAddress); |
| CreateRoutineLoadStmt createRoutineLoadStmt = new CreateRoutineLoadStmt(labelName, tableNameString, |
| loadPropertyList, properties, |
| typeName, customProperties, |
| LoadTask.MergeType.APPEND); |
| createRoutineLoadStmt.setOrigStmt(new OriginStatement("dummy", 0)); |
| |
| KafkaRoutineLoadJob kafkaRoutineLoadJob = new KafkaRoutineLoadJob(1L, jobName, "default_cluster", 1L, 1L, |
| serverAddress, topicName); |
| |
| new MockUp<KafkaRoutineLoadJob>() { |
| @Mock |
| public KafkaRoutineLoadJob fromCreateStmt(CreateRoutineLoadStmt stmt) { |
| return kafkaRoutineLoadJob; |
| } |
| }; |
| |
| new Expectations() { |
| { |
| catalog.getAuth(); |
| minTimes = 0; |
| result = paloAuth; |
| paloAuth.checkTblPriv((ConnectContext) any, anyString, anyString, PrivPredicate.LOAD); |
| minTimes = 0; |
| result = true; |
| } |
| }; |
| RoutineLoadManager routineLoadManager = new RoutineLoadManager(); |
| routineLoadManager.createRoutineLoadJob(createRoutineLoadStmt); |
| |
| Map<String, RoutineLoadJob> idToRoutineLoadJob = |
| Deencapsulation.getField(routineLoadManager, "idToRoutineLoadJob"); |
| Assert.assertEquals(1, idToRoutineLoadJob.size()); |
| RoutineLoadJob routineLoadJob = idToRoutineLoadJob.values().iterator().next(); |
| Assert.assertEquals(1L, routineLoadJob.getDbId()); |
| Assert.assertEquals(jobName, routineLoadJob.getName()); |
| Assert.assertEquals(1L, routineLoadJob.getTableId()); |
| Assert.assertEquals(RoutineLoadJob.JobState.NEED_SCHEDULE, routineLoadJob.getState()); |
| Assert.assertEquals(true, routineLoadJob instanceof KafkaRoutineLoadJob); |
| |
| Map<Long, Map<String, List<RoutineLoadJob>>> dbToNameToRoutineLoadJob = |
| Deencapsulation.getField(routineLoadManager, "dbToNameToRoutineLoadJob"); |
| Assert.assertEquals(1, dbToNameToRoutineLoadJob.size()); |
| Assert.assertEquals(Long.valueOf(1L), dbToNameToRoutineLoadJob.keySet().iterator().next()); |
| Map<String, List<RoutineLoadJob>> nameToRoutineLoadJob = dbToNameToRoutineLoadJob.get(1L); |
| Assert.assertEquals(jobName, nameToRoutineLoadJob.keySet().iterator().next()); |
| Assert.assertEquals(1, nameToRoutineLoadJob.values().size()); |
| Assert.assertEquals(routineLoadJob, nameToRoutineLoadJob.values().iterator().next().get(0)); |
| } |
| |
| @Test |
| public void testCreateJobAuthDeny(@Injectable PaloAuth paloAuth, |
| @Injectable TResourceInfo tResourceInfo, |
| @Mocked ConnectContext connectContext, |
| @Mocked Catalog catalog) { |
| String jobName = "job1"; |
| String dbName = "db1"; |
| LabelName labelName = new LabelName(dbName, jobName); |
| String tableNameString = "table1"; |
| TableName tableName = new TableName(dbName, tableNameString); |
| List<ParseNode> loadPropertyList = new ArrayList<>(); |
| ColumnSeparator columnSeparator = new ColumnSeparator(","); |
| loadPropertyList.add(columnSeparator); |
| Map<String, String> properties = Maps.newHashMap(); |
| properties.put(CreateRoutineLoadStmt.DESIRED_CONCURRENT_NUMBER_PROPERTY, "2"); |
| String typeName = LoadDataSourceType.KAFKA.name(); |
| Map<String, String> customProperties = Maps.newHashMap(); |
| String topicName = "topic1"; |
| customProperties.put(CreateRoutineLoadStmt.KAFKA_TOPIC_PROPERTY, topicName); |
| String serverAddress = "http://127.0.0.1:8080"; |
| customProperties.put(CreateRoutineLoadStmt.KAFKA_BROKER_LIST_PROPERTY, serverAddress); |
| CreateRoutineLoadStmt createRoutineLoadStmt = new CreateRoutineLoadStmt(labelName, tableNameString, |
| loadPropertyList, properties, |
| typeName, customProperties, |
| LoadTask.MergeType.APPEND); |
| createRoutineLoadStmt.setOrigStmt(new OriginStatement("dummy", 0)); |
| |
| |
| new Expectations() { |
| { |
| catalog.getAuth(); |
| minTimes = 0; |
| result = paloAuth; |
| paloAuth.checkTblPriv((ConnectContext) any, anyString, anyString, PrivPredicate.LOAD); |
| minTimes = 0; |
| result = false; |
| } |
| }; |
| RoutineLoadManager routineLoadManager = new RoutineLoadManager(); |
| try { |
| routineLoadManager.createRoutineLoadJob(createRoutineLoadStmt); |
| Assert.fail(); |
| } catch (LoadException | DdlException e) { |
| Assert.fail(); |
| } catch (AnalysisException e) { |
| LOG.info("Access deny"); |
| } catch (UserException e) { |
| e.printStackTrace(); |
| } |
| } |
| |
| @Test |
| public void testCreateWithSameName(@Mocked ConnectContext connectContext) { |
| String jobName = "job1"; |
| String topicName = "topic1"; |
| String serverAddress = "http://127.0.0.1:8080"; |
| KafkaRoutineLoadJob kafkaRoutineLoadJob = new KafkaRoutineLoadJob(1L, jobName, "default_cluster", 1L, 1L, |
| serverAddress, topicName); |
| |
| RoutineLoadManager routineLoadManager = new RoutineLoadManager(); |
| |
| Map<Long, Map<String, List<RoutineLoadJob>>> dbToNameToRoutineLoadJob = Maps.newConcurrentMap(); |
| Map<String, List<RoutineLoadJob>> nameToRoutineLoadJob = Maps.newConcurrentMap(); |
| List<RoutineLoadJob> routineLoadJobList = Lists.newArrayList(); |
| KafkaRoutineLoadJob kafkaRoutineLoadJobWithSameName = new KafkaRoutineLoadJob(1L, jobName, "default_cluster", |
| 1L, 1L, serverAddress, topicName); |
| routineLoadJobList.add(kafkaRoutineLoadJobWithSameName); |
| nameToRoutineLoadJob.put(jobName, routineLoadJobList); |
| dbToNameToRoutineLoadJob.put(1L, nameToRoutineLoadJob); |
| |
| Deencapsulation.setField(routineLoadManager, "dbToNameToRoutineLoadJob", dbToNameToRoutineLoadJob); |
| try { |
| routineLoadManager.addRoutineLoadJob(kafkaRoutineLoadJob, "db"); |
| Assert.fail(); |
| } catch (DdlException e) { |
| LOG.info(e.getMessage()); |
| } |
| } |
| |
| @Test |
| public void testCreateWithSameNameOfStoppedJob(@Mocked ConnectContext connectContext, |
| @Mocked Catalog catalog, |
| @Mocked EditLog editLog) throws DdlException { |
| String jobName = "job1"; |
| String topicName = "topic1"; |
| String serverAddress = "http://127.0.0.1:8080"; |
| KafkaRoutineLoadJob kafkaRoutineLoadJob = new KafkaRoutineLoadJob(1L, jobName, "default_cluster", 1L, 1L, |
| serverAddress, topicName); |
| |
| RoutineLoadManager routineLoadManager = new RoutineLoadManager(); |
| |
| new Expectations() { |
| { |
| catalog.getEditLog(); |
| minTimes = 0; |
| result = editLog; |
| } |
| }; |
| |
| Map<Long, Map<String, List<RoutineLoadJob>>> dbToNameToRoutineLoadJob = Maps.newConcurrentMap(); |
| Map<String, List<RoutineLoadJob>> nameToRoutineLoadJob = Maps.newConcurrentMap(); |
| List<RoutineLoadJob> routineLoadJobList = Lists.newArrayList(); |
| KafkaRoutineLoadJob kafkaRoutineLoadJobWithSameName = new KafkaRoutineLoadJob(1L, jobName, "default_cluster", |
| 1L, 1L, serverAddress, topicName); |
| Deencapsulation.setField(kafkaRoutineLoadJobWithSameName, "state", RoutineLoadJob.JobState.STOPPED); |
| routineLoadJobList.add(kafkaRoutineLoadJobWithSameName); |
| nameToRoutineLoadJob.put(jobName, routineLoadJobList); |
| dbToNameToRoutineLoadJob.put(1L, nameToRoutineLoadJob); |
| Map<String, RoutineLoadJob> idToRoutineLoadJob = Maps.newConcurrentMap(); |
| idToRoutineLoadJob.put(UUID.randomUUID().toString(), kafkaRoutineLoadJobWithSameName); |
| |
| Deencapsulation.setField(routineLoadManager, "dbToNameToRoutineLoadJob", dbToNameToRoutineLoadJob); |
| Deencapsulation.setField(routineLoadManager, "idToRoutineLoadJob", idToRoutineLoadJob); |
| routineLoadManager.addRoutineLoadJob(kafkaRoutineLoadJob, "db"); |
| |
| Map<Long, Map<String, List<RoutineLoadJob>>> result = |
| Deencapsulation.getField(routineLoadManager, "dbToNameToRoutineLoadJob"); |
| Map<String, RoutineLoadJob> result1 = Deencapsulation.getField(routineLoadManager, "idToRoutineLoadJob"); |
| Assert.assertEquals(1, result.size()); |
| Assert.assertEquals(Long.valueOf(1L), result.keySet().iterator().next()); |
| Map<String, List<RoutineLoadJob>> resultNameToRoutineLoadJob = result.get(1L); |
| Assert.assertEquals(jobName, resultNameToRoutineLoadJob.keySet().iterator().next()); |
| Assert.assertEquals(2, resultNameToRoutineLoadJob.values().iterator().next().size()); |
| Assert.assertEquals(2, result1.values().size()); |
| } |
| |
| @Test |
| public void testGetMinTaskBeId(@Injectable RoutineLoadJob routineLoadJob) throws LoadException { |
| List<Long> beIds = Lists.newArrayList(); |
| beIds.add(1L); |
| beIds.add(2L); |
| |
| new Expectations() { |
| { |
| systemInfoService.getClusterBackendIds(anyString, true); |
| minTimes = 0; |
| result = beIds; |
| systemInfoService.getBackendIds(true); |
| minTimes = 0; |
| result = beIds; |
| } |
| }; |
| |
| new MockUp<Catalog>() { |
| SystemInfoService getCurrentSystemInfo() { |
| return systemInfoService; |
| } |
| }; |
| |
| Map<Long, RoutineLoadJob> idToRoutineLoadJob = Maps.newConcurrentMap(); |
| idToRoutineLoadJob.put(1L, routineLoadJob); |
| RoutineLoadManager routineLoadManager = new RoutineLoadManager(); |
| Map<Long, Integer> beIdToConcurrentTaskMap = Maps.newHashMap(); |
| beIdToConcurrentTaskMap.put(1L, 1); |
| |
| new Expectations(routineLoadJob) { |
| { |
| routineLoadJob.getBeCurrentTasksNumMap(); |
| result = beIdToConcurrentTaskMap; |
| routineLoadJob.getState(); |
| result = RoutineLoadJob.JobState.RUNNING; |
| } |
| }; |
| |
| Deencapsulation.setField(routineLoadManager, "idToRoutineLoadJob", idToRoutineLoadJob); |
| |
| Assert.assertEquals(2L, routineLoadManager.getMinTaskBeId("default")); |
| } |
| |
| @Test |
| public void testGetMinTaskBeIdWhileClusterDeleted() { |
| new Expectations() { |
| { |
| systemInfoService.getClusterBackendIds(anyString, true); |
| minTimes = 0; |
| result = null; |
| } |
| }; |
| |
| new MockUp<Catalog>() { |
| SystemInfoService getCurrentSystemInfo() { |
| return systemInfoService; |
| } |
| }; |
| |
| RoutineLoadManager routineLoadManager = new RoutineLoadManager(); |
| try { |
| routineLoadManager.getMinTaskBeId("default"); |
| Assert.fail(); |
| } catch (LoadException e) { |
| // do nothing |
| } |
| |
| } |
| |
| @Test |
| public void testGetMinTaskBeIdWhileNoSlot(@Injectable RoutineLoadJob routineLoadJob) { |
| List<Long> beIds = Lists.newArrayList(); |
| beIds.add(1L); |
| Map<Long, Integer> beIdToConcurrentTaskMap = Maps.newHashMap(); |
| beIdToConcurrentTaskMap.put(1L, 11); |
| |
| new Expectations() { |
| { |
| systemInfoService.getClusterBackendIds(anyString, true); |
| minTimes = 0; |
| result = beIds; |
| systemInfoService.getBackendIds(true); |
| minTimes = 0; |
| result = beIds; |
| routineLoadJob.getBeCurrentTasksNumMap(); |
| minTimes = 0; |
| result = beIdToConcurrentTaskMap; |
| routineLoadJob.getState(); |
| minTimes = 0; |
| result = RoutineLoadJob.JobState.RUNNING; |
| } |
| }; |
| |
| new MockUp<Catalog>() { |
| SystemInfoService getCurrentSystemInfo() { |
| return systemInfoService; |
| } |
| }; |
| |
| RoutineLoadManager routineLoadManager = new RoutineLoadManager(); |
| Config.max_routine_load_task_num_per_be = 0; |
| Map<Long, RoutineLoadJob> routineLoadJobMap = Maps.newHashMap(); |
| routineLoadJobMap.put(1l, routineLoadJob); |
| Deencapsulation.setField(routineLoadManager, "idToRoutineLoadJob", routineLoadJobMap); |
| |
| |
| try { |
| Assert.assertEquals(-1, routineLoadManager.getMinTaskBeId("default")); |
| } catch (LoadException e) { |
| e.printStackTrace(); |
| Assert.fail(); |
| } |
| } |
| |
| @Test |
| public void testGetTotalIdleTaskNum(@Injectable RoutineLoadJob routineLoadJob) { |
| List<Long> beIds = Lists.newArrayList(); |
| beIds.add(1L); |
| beIds.add(2L); |
| |
| new Expectations() { |
| { |
| systemInfoService.getBackendIds(true); |
| minTimes = 0; |
| result = beIds; |
| } |
| }; |
| |
| new MockUp<Catalog>() { |
| SystemInfoService getCurrentSystemInfo() { |
| return systemInfoService; |
| } |
| }; |
| |
| Map<Long, RoutineLoadJob> idToRoutineLoadJob = Maps.newConcurrentMap(); |
| idToRoutineLoadJob.put(1L, routineLoadJob); |
| RoutineLoadManager routineLoadManager = new RoutineLoadManager(); |
| Map<Long, Integer> beIdToConcurrentTaskMap = Maps.newHashMap(); |
| beIdToConcurrentTaskMap.put(1L, 1); |
| |
| new Expectations(routineLoadJob) { |
| { |
| routineLoadJob.getBeCurrentTasksNumMap(); |
| result = beIdToConcurrentTaskMap; |
| routineLoadJob.getState(); |
| result = RoutineLoadJob.JobState.RUNNING; |
| } |
| }; |
| |
| Deencapsulation.setField(routineLoadManager, "idToRoutineLoadJob", idToRoutineLoadJob); |
| |
| Assert.assertEquals(Config.max_routine_load_task_num_per_be * 2 - 1, |
| routineLoadManager.getClusterIdleSlotNum()); |
| } |
| |
| @Test |
| public void testUpdateBeIdTaskMaps() { |
| List<Long> oldBeIds = Lists.newArrayList(); |
| oldBeIds.add(1L); |
| oldBeIds.add(2L); |
| |
| List<Long> newBeIds = Lists.newArrayList(); |
| newBeIds.add(1L); |
| newBeIds.add(3L); |
| |
| new Expectations() { |
| { |
| systemInfoService.getBackendIds(true); |
| minTimes = 0; |
| returns(oldBeIds, newBeIds); |
| } |
| }; |
| |
| new MockUp<Catalog>() { |
| SystemInfoService getCurrentSystemInfo() { |
| return systemInfoService; |
| } |
| }; |
| |
| RoutineLoadManager routineLoadManager = new RoutineLoadManager(); |
| routineLoadManager.updateBeIdToMaxConcurrentTasks(); |
| } |
| |
| @Test |
| public void testGetJobByName(@Injectable RoutineLoadJob routineLoadJob1, |
| @Injectable RoutineLoadJob routineLoadJob2, |
| @Injectable RoutineLoadJob routineLoadJob3) { |
| String jobName = "ilovedoris"; |
| List<RoutineLoadJob> routineLoadJobList1 = Lists.newArrayList(); |
| routineLoadJobList1.add(routineLoadJob1); |
| routineLoadJobList1.add(routineLoadJob2); |
| Map<String, List<RoutineLoadJob>> nameToRoutineLoadList1 = Maps.newHashMap(); |
| nameToRoutineLoadList1.put(jobName, routineLoadJobList1); |
| |
| List<RoutineLoadJob> routineLoadJobList2 = Lists.newArrayList(); |
| routineLoadJobList2.add(routineLoadJob3); |
| Map<String, List<RoutineLoadJob>> nameToRoutineLoadList2 = Maps.newHashMap(); |
| nameToRoutineLoadList2.put(jobName, routineLoadJobList2); |
| |
| Map<String, Map<String, List<RoutineLoadJob>>> dbToNameRoutineLoadList = Maps.newHashMap(); |
| dbToNameRoutineLoadList.put("db1", nameToRoutineLoadList1); |
| dbToNameRoutineLoadList.put("db2", nameToRoutineLoadList2); |
| |
| new Expectations() { |
| { |
| routineLoadJob1.isFinal(); |
| minTimes = 0; |
| result = true; |
| routineLoadJob2.isFinal(); |
| minTimes = 0; |
| result = false; |
| } |
| }; |
| |
| RoutineLoadManager routineLoadManager = new RoutineLoadManager(); |
| Deencapsulation.setField(routineLoadManager, "dbToNameToRoutineLoadJob", dbToNameRoutineLoadList); |
| List<RoutineLoadJob> result = routineLoadManager.getJobByName(jobName); |
| |
| Assert.assertEquals(3, result.size()); |
| Assert.assertEquals(routineLoadJob2, result.get(0)); |
| Assert.assertEquals(routineLoadJob1, result.get(1)); |
| Assert.assertEquals(routineLoadJob3, result.get(2)); |
| |
| } |
| |
| @Test |
| public void testGetJob(@Injectable RoutineLoadJob routineLoadJob1, |
| @Injectable RoutineLoadJob routineLoadJob2, |
| @Injectable RoutineLoadJob routineLoadJob3) throws MetaNotFoundException { |
| |
| new Expectations() { |
| { |
| routineLoadJob1.isFinal(); |
| minTimes = 0; |
| result = true; |
| routineLoadJob2.isFinal(); |
| minTimes = 0; |
| result = false; |
| routineLoadJob3.isFinal(); |
| minTimes = 0; |
| result = true; |
| } |
| }; |
| |
| RoutineLoadManager routineLoadManager = new RoutineLoadManager(); |
| Map<Long, RoutineLoadJob> idToRoutineLoadJob = Maps.newHashMap(); |
| idToRoutineLoadJob.put(1L, routineLoadJob1); |
| idToRoutineLoadJob.put(2L, routineLoadJob2); |
| idToRoutineLoadJob.put(3L, routineLoadJob3); |
| Deencapsulation.setField(routineLoadManager, "idToRoutineLoadJob", idToRoutineLoadJob); |
| List<RoutineLoadJob> result = routineLoadManager.getJob(null, null, true); |
| |
| Assert.assertEquals(3, result.size()); |
| Assert.assertEquals(routineLoadJob2, result.get(0)); |
| Assert.assertEquals(routineLoadJob1, result.get(1)); |
| Assert.assertEquals(routineLoadJob3, result.get(2)); |
| } |
| |
| @Test |
| public void testGetJobIncludeHistory(@Injectable RoutineLoadJob routineLoadJob1, |
| @Injectable RoutineLoadJob routineLoadJob2, |
| @Injectable RoutineLoadJob routineLoadJob3, |
| @Mocked Catalog catalog, |
| @Mocked Database database) throws MetaNotFoundException { |
| new Expectations() { |
| { |
| routineLoadJob1.isFinal(); |
| minTimes = 0; |
| result = true; |
| routineLoadJob2.isFinal(); |
| minTimes = 0; |
| result = false; |
| routineLoadJob3.isFinal(); |
| minTimes = 0; |
| result = true; |
| catalog.getDb(anyString); |
| minTimes = 0; |
| result = database; |
| database.getId(); |
| minTimes = 0; |
| result = 1L; |
| } |
| }; |
| |
| RoutineLoadManager routineLoadManager = new RoutineLoadManager(); |
| Map<Long, Map<String, List<RoutineLoadJob>>> dbToNameToRoutineLoadJob = Maps.newHashMap(); |
| Map<String, List<RoutineLoadJob>> nameToRoutineLoadJob = Maps.newHashMap(); |
| List<RoutineLoadJob> routineLoadJobList = Lists.newArrayList(); |
| routineLoadJobList.add(routineLoadJob1); |
| routineLoadJobList.add(routineLoadJob2); |
| routineLoadJobList.add(routineLoadJob3); |
| nameToRoutineLoadJob.put("", routineLoadJobList); |
| dbToNameToRoutineLoadJob.put(1L, nameToRoutineLoadJob); |
| Deencapsulation.setField(routineLoadManager, "dbToNameToRoutineLoadJob", dbToNameToRoutineLoadJob); |
| List<RoutineLoadJob> result = routineLoadManager.getJob("", "", true); |
| |
| Assert.assertEquals(3, result.size()); |
| Assert.assertEquals(routineLoadJob2, result.get(0)); |
| Assert.assertEquals(routineLoadJob1, result.get(1)); |
| Assert.assertEquals(routineLoadJob3, result.get(2)); |
| } |
| |
| @Test |
| public void testPauseRoutineLoadJob(@Injectable PauseRoutineLoadStmt pauseRoutineLoadStmt, |
| @Mocked Catalog catalog, |
| @Mocked Database database, |
| @Mocked PaloAuth paloAuth, |
| @Mocked ConnectContext connectContext) throws UserException { |
| RoutineLoadManager routineLoadManager = new RoutineLoadManager(); |
| Map<Long, Map<String, List<RoutineLoadJob>>> dbToNameToRoutineLoadJob = Maps.newHashMap(); |
| Map<String, List<RoutineLoadJob>> nameToRoutineLoadJob = Maps.newHashMap(); |
| List<RoutineLoadJob> routineLoadJobList = Lists.newArrayList(); |
| RoutineLoadJob routineLoadJob = new KafkaRoutineLoadJob(); |
| routineLoadJobList.add(routineLoadJob); |
| nameToRoutineLoadJob.put("", routineLoadJobList); |
| dbToNameToRoutineLoadJob.put(1L, nameToRoutineLoadJob); |
| Deencapsulation.setField(routineLoadManager, "dbToNameToRoutineLoadJob", dbToNameToRoutineLoadJob); |
| |
| Map<Long, RoutineLoadJob> idToRoutineLoadJob = Maps.newConcurrentMap(); |
| idToRoutineLoadJob.put(routineLoadJob.getId(), routineLoadJob); |
| Deencapsulation.setField(routineLoadManager, "idToRoutineLoadJob", idToRoutineLoadJob); |
| |
| new Expectations() { |
| { |
| pauseRoutineLoadStmt.getDbFullName(); |
| minTimes = 0; |
| result = ""; |
| pauseRoutineLoadStmt.getName(); |
| minTimes = 0; |
| result = ""; |
| catalog.getDb(""); |
| minTimes = 0; |
| result = database; |
| database.getId(); |
| minTimes = 0; |
| result = 1L; |
| catalog.getAuth(); |
| minTimes = 0; |
| result = paloAuth; |
| paloAuth.checkTblPriv((ConnectContext) any, anyString, anyString, (PrivPredicate) any); |
| minTimes = 0; |
| result = true; |
| } |
| }; |
| |
| routineLoadManager.pauseRoutineLoadJob(pauseRoutineLoadStmt); |
| |
| Assert.assertEquals(RoutineLoadJob.JobState.PAUSED, routineLoadJob.getState()); |
| |
| // 第一次自动恢复 |
| for (int i = 0; i < 3; i++) { |
| Deencapsulation.setField(routineLoadJob, "pauseReason", |
| new ErrorReason(InternalErrorCode.REPLICA_FEW_ERR, "")); |
| routineLoadManager.updateRoutineLoadJob(); |
| Assert.assertEquals(RoutineLoadJob.JobState.NEED_SCHEDULE, routineLoadJob.getState()); |
| Deencapsulation.setField(routineLoadJob, "state", RoutineLoadJob.JobState.PAUSED); |
| boolean autoResumeLock = Deencapsulation.getField(routineLoadJob, "autoResumeLock"); |
| Assert.assertEquals(autoResumeLock, false); |
| } |
| // 第四次自动恢复 就会锁定 |
| routineLoadManager.updateRoutineLoadJob(); |
| Assert.assertEquals(RoutineLoadJob.JobState.PAUSED, routineLoadJob.getState()); |
| boolean autoResumeLock = Deencapsulation.getField(routineLoadJob, "autoResumeLock"); |
| Assert.assertEquals(autoResumeLock, true); |
| } |
| |
| @Test |
| public void testResumeRoutineLoadJob(@Injectable ResumeRoutineLoadStmt resumeRoutineLoadStmt, |
| @Mocked Catalog catalog, |
| @Mocked Database database, |
| @Mocked PaloAuth paloAuth, |
| @Mocked ConnectContext connectContext) throws UserException { |
| RoutineLoadManager routineLoadManager = new RoutineLoadManager(); |
| Map<Long, Map<String, List<RoutineLoadJob>>> dbToNameToRoutineLoadJob = Maps.newHashMap(); |
| Map<String, List<RoutineLoadJob>> nameToRoutineLoadJob = Maps.newHashMap(); |
| List<RoutineLoadJob> routineLoadJobList = Lists.newArrayList(); |
| RoutineLoadJob routineLoadJob = new KafkaRoutineLoadJob(); |
| routineLoadJobList.add(routineLoadJob); |
| nameToRoutineLoadJob.put("", routineLoadJobList); |
| dbToNameToRoutineLoadJob.put(1L, nameToRoutineLoadJob); |
| Deencapsulation.setField(routineLoadManager, "dbToNameToRoutineLoadJob", dbToNameToRoutineLoadJob); |
| |
| new Expectations() { |
| { |
| resumeRoutineLoadStmt.getDbFullName(); |
| minTimes = 0; |
| result = ""; |
| resumeRoutineLoadStmt.getName(); |
| minTimes = 0; |
| result = ""; |
| catalog.getDb(""); |
| minTimes = 0; |
| result = database; |
| database.getId(); |
| minTimes = 0; |
| result = 1L; |
| catalog.getAuth(); |
| minTimes = 0; |
| result = paloAuth; |
| paloAuth.checkTblPriv((ConnectContext) any, anyString, anyString, (PrivPredicate) any); |
| minTimes = 0; |
| result = true; |
| } |
| }; |
| |
| routineLoadManager.resumeRoutineLoadJob(resumeRoutineLoadStmt); |
| |
| Assert.assertEquals(RoutineLoadJob.JobState.NEED_SCHEDULE, routineLoadJob.getState()); |
| } |
| |
| @Test |
| public void testStopRoutineLoadJob(@Injectable StopRoutineLoadStmt stopRoutineLoadStmt, |
| @Mocked Catalog catalog, |
| @Mocked Database database, |
| @Mocked PaloAuth paloAuth, |
| @Mocked ConnectContext connectContext) throws UserException { |
| RoutineLoadManager routineLoadManager = new RoutineLoadManager(); |
| Map<Long, Map<String, List<RoutineLoadJob>>> dbToNameToRoutineLoadJob = Maps.newHashMap(); |
| Map<String, List<RoutineLoadJob>> nameToRoutineLoadJob = Maps.newHashMap(); |
| List<RoutineLoadJob> routineLoadJobList = Lists.newArrayList(); |
| RoutineLoadJob routineLoadJob = new KafkaRoutineLoadJob(); |
| routineLoadJobList.add(routineLoadJob); |
| nameToRoutineLoadJob.put("", routineLoadJobList); |
| dbToNameToRoutineLoadJob.put(1L, nameToRoutineLoadJob); |
| Deencapsulation.setField(routineLoadManager, "dbToNameToRoutineLoadJob", dbToNameToRoutineLoadJob); |
| |
| new Expectations() { |
| { |
| stopRoutineLoadStmt.getDbFullName(); |
| minTimes = 0; |
| result = ""; |
| stopRoutineLoadStmt.getName(); |
| minTimes = 0; |
| result = ""; |
| catalog.getDb(""); |
| minTimes = 0; |
| result = database; |
| database.getId(); |
| minTimes = 0; |
| result = 1L; |
| catalog.getAuth(); |
| minTimes = 0; |
| result = paloAuth; |
| paloAuth.checkTblPriv((ConnectContext) any, anyString, anyString, (PrivPredicate) any); |
| minTimes = 0; |
| result = true; |
| } |
| }; |
| |
| routineLoadManager.stopRoutineLoadJob(stopRoutineLoadStmt); |
| |
| Assert.assertEquals(RoutineLoadJob.JobState.STOPPED, routineLoadJob.getState()); |
| } |
| |
| @Test |
| public void testCheckBeToTask(@Mocked Catalog catalog, |
| @Mocked SystemInfoService systemInfoService) throws LoadException { |
| List<Long> beIdsInCluster = Lists.newArrayList(); |
| beIdsInCluster.add(1L); |
| Map<Long, Integer> beIdToMaxConcurrentTasks = Maps.newHashMap(); |
| beIdToMaxConcurrentTasks.put(1L, 10); |
| new Expectations() { |
| { |
| systemInfoService.getClusterBackendIds("default", true); |
| minTimes = 0; |
| result = beIdsInCluster; |
| } |
| }; |
| |
| RoutineLoadManager routineLoadManager = new RoutineLoadManager(); |
| Config.max_routine_load_task_num_per_be = 10; |
| Deencapsulation.setField(routineLoadManager, "beIdToMaxConcurrentTasks", beIdToMaxConcurrentTasks); |
| Assert.assertEquals(true, routineLoadManager.checkBeToTask(1L, "default")); |
| } |
| |
| @Test |
| public void testCleanOldRoutineLoadJobs(@Injectable RoutineLoadJob routineLoadJob, |
| @Mocked Catalog catalog, |
| @Mocked EditLog editLog) { |
| RoutineLoadManager routineLoadManager = new RoutineLoadManager(); |
| Map<Long, Map<String, List<RoutineLoadJob>>> dbToNameToRoutineLoadJob = Maps.newHashMap(); |
| Map<String, List<RoutineLoadJob>> nameToRoutineLoadJob = Maps.newHashMap(); |
| List<RoutineLoadJob> routineLoadJobList = Lists.newArrayList(); |
| routineLoadJobList.add(routineLoadJob); |
| nameToRoutineLoadJob.put("", routineLoadJobList); |
| dbToNameToRoutineLoadJob.put(1L, nameToRoutineLoadJob); |
| Map<Long, RoutineLoadJob> idToRoutineLoadJob = Maps.newHashMap(); |
| idToRoutineLoadJob.put(1L, routineLoadJob); |
| Deencapsulation.setField(routineLoadManager, "idToRoutineLoadJob", idToRoutineLoadJob); |
| Deencapsulation.setField(routineLoadManager, "dbToNameToRoutineLoadJob", dbToNameToRoutineLoadJob); |
| |
| new Expectations() { |
| { |
| routineLoadJob.needRemove(); |
| minTimes = 0; |
| result = true; |
| routineLoadJob.getDbId(); |
| minTimes = 0; |
| result = 1L; |
| routineLoadJob.getName(); |
| minTimes = 0; |
| result = ""; |
| catalog.getEditLog(); |
| minTimes = 0; |
| result = editLog; |
| } |
| }; |
| routineLoadManager.cleanOldRoutineLoadJobs(); |
| |
| Assert.assertEquals(0, dbToNameToRoutineLoadJob.size()); |
| Assert.assertEquals(0, idToRoutineLoadJob.size()); |
| } |
| |
| @Test |
| public void testGetBeIdConcurrentTaskMaps(@Injectable RoutineLoadJob routineLoadJob) { |
| RoutineLoadManager routineLoadManager = new RoutineLoadManager(); |
| Map<Long, RoutineLoadJob> idToRoutineLoadJob = Maps.newHashMap(); |
| idToRoutineLoadJob.put(1L, routineLoadJob); |
| Deencapsulation.setField(routineLoadManager, "idToRoutineLoadJob", idToRoutineLoadJob); |
| Map<Long, Integer> beIdToConcurrenTaskNum = Maps.newHashMap(); |
| beIdToConcurrenTaskNum.put(1L, 1); |
| |
| new Expectations() { |
| { |
| routineLoadJob.getState(); |
| minTimes = 0; |
| result = RoutineLoadJob.JobState.RUNNING; |
| routineLoadJob.getBeCurrentTasksNumMap(); |
| minTimes = 0; |
| result = beIdToConcurrenTaskNum; |
| } |
| }; |
| |
| Map<Long, Integer> result = Deencapsulation.invoke(routineLoadManager, "getBeCurrentTasksNumMap"); |
| Assert.assertEquals(1, (int) result.get(1l)); |
| |
| } |
| |
| @Test |
| public void testReplayRemoveOldRoutineLoad(@Injectable RoutineLoadOperation operation, |
| @Injectable RoutineLoadJob routineLoadJob) { |
| RoutineLoadManager routineLoadManager = new RoutineLoadManager(); |
| Map<Long, RoutineLoadJob> idToRoutineLoadJob = Maps.newHashMap(); |
| idToRoutineLoadJob.put(1L, routineLoadJob); |
| Deencapsulation.setField(routineLoadManager, "idToRoutineLoadJob", idToRoutineLoadJob); |
| Map<Long, Map<String, List<RoutineLoadJob>>> dbToNameToRoutineLoadJob = Maps.newHashMap(); |
| Map<String, List<RoutineLoadJob>> nameToRoutineLoadJob = Maps.newHashMap(); |
| List<RoutineLoadJob> routineLoadJobList = Lists.newArrayList(); |
| routineLoadJobList.add(routineLoadJob); |
| nameToRoutineLoadJob.put("", routineLoadJobList); |
| dbToNameToRoutineLoadJob.put(1L, nameToRoutineLoadJob); |
| Deencapsulation.setField(routineLoadManager, "dbToNameToRoutineLoadJob", dbToNameToRoutineLoadJob); |
| |
| new Expectations() { |
| { |
| routineLoadJob.getName(); |
| minTimes = 0; |
| result = ""; |
| routineLoadJob.getDbId(); |
| minTimes = 0; |
| result = 1L; |
| operation.getId(); |
| minTimes = 0; |
| result = 1L; |
| } |
| }; |
| |
| routineLoadManager.replayRemoveOldRoutineLoad(operation); |
| Assert.assertEquals(0, idToRoutineLoadJob.size()); |
| } |
| |
| @Test |
| public void testReplayChangeRoutineLoadJob(@Injectable RoutineLoadOperation operation) { |
| RoutineLoadManager routineLoadManager = new RoutineLoadManager(); |
| RoutineLoadJob routineLoadJob = new KafkaRoutineLoadJob(); |
| Deencapsulation.setField(routineLoadJob, "name", ""); |
| Deencapsulation.setField(routineLoadJob, "dbId", 1L); |
| Map<Long, RoutineLoadJob> idToRoutineLoadJob = Maps.newHashMap(); |
| idToRoutineLoadJob.put(1L, routineLoadJob); |
| Deencapsulation.setField(routineLoadManager, "idToRoutineLoadJob", idToRoutineLoadJob); |
| Map<Long, Map<String, List<RoutineLoadJob>>> dbToNameToRoutineLoadJob = Maps.newHashMap(); |
| Map<String, List<RoutineLoadJob>> nameToRoutineLoadJob = Maps.newHashMap(); |
| List<RoutineLoadJob> routineLoadJobList = Lists.newArrayList(); |
| routineLoadJobList.add(routineLoadJob); |
| nameToRoutineLoadJob.put("", routineLoadJobList); |
| dbToNameToRoutineLoadJob.put(1L, nameToRoutineLoadJob); |
| Deencapsulation.setField(routineLoadManager, "dbToNameToRoutineLoadJob", dbToNameToRoutineLoadJob); |
| |
| new Expectations() { |
| { |
| operation.getId(); |
| minTimes = 0; |
| result = 1L; |
| operation.getJobState(); |
| minTimes = 0; |
| result = RoutineLoadJob.JobState.PAUSED; |
| } |
| }; |
| |
| routineLoadManager.replayChangeRoutineLoadJob(operation); |
| Assert.assertEquals(RoutineLoadJob.JobState.PAUSED, routineLoadJob.getState()); |
| } |
| |
| @Test |
| public void testAlterRoutineLoadJob(@Injectable StopRoutineLoadStmt stopRoutineLoadStmt, |
| @Mocked Catalog catalog, |
| @Mocked Database database, |
| @Mocked PaloAuth paloAuth, |
| @Mocked ConnectContext connectContext) throws UserException { |
| RoutineLoadManager routineLoadManager = new RoutineLoadManager(); |
| Map<Long, Map<String, List<RoutineLoadJob>>> dbToNameToRoutineLoadJob = Maps.newHashMap(); |
| Map<String, List<RoutineLoadJob>> nameToRoutineLoadJob = Maps.newHashMap(); |
| List<RoutineLoadJob> routineLoadJobList = Lists.newArrayList(); |
| RoutineLoadJob routineLoadJob = new KafkaRoutineLoadJob(); |
| routineLoadJobList.add(routineLoadJob); |
| nameToRoutineLoadJob.put("", routineLoadJobList); |
| dbToNameToRoutineLoadJob.put(1L, nameToRoutineLoadJob); |
| Deencapsulation.setField(routineLoadManager, "dbToNameToRoutineLoadJob", dbToNameToRoutineLoadJob); |
| |
| new Expectations() { |
| { |
| stopRoutineLoadStmt.getDbFullName(); |
| minTimes = 0; |
| result = ""; |
| stopRoutineLoadStmt.getName(); |
| minTimes = 0; |
| result = ""; |
| catalog.getDb(""); |
| minTimes = 0; |
| result = database; |
| database.getId(); |
| minTimes = 0; |
| result = 1L; |
| catalog.getAuth(); |
| minTimes = 0; |
| result = paloAuth; |
| paloAuth.checkTblPriv((ConnectContext) any, anyString, anyString, (PrivPredicate) any); |
| minTimes = 0; |
| result = true; |
| } |
| }; |
| |
| routineLoadManager.stopRoutineLoadJob(stopRoutineLoadStmt); |
| |
| Assert.assertEquals(RoutineLoadJob.JobState.STOPPED, routineLoadJob.getState()); |
| } |
| } |