| // Licensed to the Apache Software Foundation (ASF) under one |
| // or more contributor license agreements. See the NOTICE file |
| // distributed with this work for additional information |
| // regarding copyright ownership. The ASF licenses this file |
| // to you under the Apache License, Version 2.0 (the |
| // "License"); you may not use this file except in compliance |
| // with the License. You may obtain a copy of the License at |
| // |
| // http://www.apache.org/licenses/LICENSE-2.0 |
| // |
| // Unless required by applicable law or agreed to in writing, |
| // software distributed under the License is distributed on an |
| // "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY |
| // KIND, either express or implied. See the License for the |
| // specific language governing permissions and limitations |
| // under the License. |
| |
| package org.apache.doris.load.routineload; |
| |
| import org.apache.doris.analysis.ColumnSeparator; |
| import org.apache.doris.analysis.CreateRoutineLoadStmt; |
| import org.apache.doris.analysis.ImportSequenceStmt; |
| import org.apache.doris.analysis.LabelName; |
| import org.apache.doris.analysis.ParseNode; |
| import org.apache.doris.analysis.PartitionNames; |
| import org.apache.doris.catalog.Catalog; |
| import org.apache.doris.catalog.Database; |
| import org.apache.doris.catalog.OlapTable; |
| import org.apache.doris.catalog.Table; |
| import org.apache.doris.common.AnalysisException; |
| import org.apache.doris.common.LabelAlreadyUsedException; |
| import org.apache.doris.common.LoadException; |
| import org.apache.doris.common.MetaNotFoundException; |
| import org.apache.doris.common.Pair; |
| import org.apache.doris.common.UserException; |
| import org.apache.doris.common.jmockit.Deencapsulation; |
| import org.apache.doris.common.util.KafkaUtil; |
| import org.apache.doris.load.RoutineLoadDesc; |
| import org.apache.doris.load.loadv2.LoadTask; |
| import org.apache.doris.qe.ConnectContext; |
| import org.apache.doris.system.SystemInfoService; |
| import org.apache.doris.thrift.TResourceInfo; |
| import org.apache.doris.transaction.BeginTransactionException; |
| import org.apache.doris.transaction.GlobalTransactionMgr; |
| |
| import org.apache.kafka.common.PartitionInfo; |
| import org.apache.logging.log4j.LogManager; |
| import org.apache.logging.log4j.Logger; |
| import org.junit.Assert; |
| import org.junit.Before; |
| import org.junit.Test; |
| |
| import com.google.common.base.Joiner; |
| import com.google.common.collect.Lists; |
| import com.google.common.collect.Maps; |
| |
| import java.util.ArrayList; |
| import java.util.Arrays; |
| import java.util.List; |
| import java.util.Map; |
| import java.util.UUID; |
| |
| import mockit.Expectations; |
| import mockit.Injectable; |
| import mockit.Mock; |
| import mockit.MockUp; |
| import mockit.Mocked; |
| import mockit.Verifications; |
| |
| public class KafkaRoutineLoadJobTest { |
| private static final Logger LOG = LogManager.getLogger(KafkaRoutineLoadJobTest.class); |
| |
| private String jobName = "job1"; |
| private String dbName = "db1"; |
| private LabelName labelName = new LabelName(dbName, jobName); |
| private String tableNameString = "table1"; |
| private String topicName = "topic1"; |
| private String serverAddress = "http://127.0.0.1:8080"; |
| private String kafkaPartitionString = "1,2,3"; |
| |
| private PartitionNames partitionNames; |
| |
| private ColumnSeparator columnSeparator = new ColumnSeparator(","); |
| |
| private ImportSequenceStmt sequenceStmt = new ImportSequenceStmt("source_sequence"); |
| |
| @Mocked |
| ConnectContext connectContext; |
| @Mocked |
| TResourceInfo tResourceInfo; |
| |
| @Before |
| public void init() { |
| List<String> partitionNameList = Lists.newArrayList(); |
| partitionNameList.add("p1"); |
| partitionNames = new PartitionNames(false, partitionNameList); |
| } |
| |
| @Test |
| public void testBeNumMin(@Injectable PartitionInfo partitionInfo1, |
| @Injectable PartitionInfo partitionInfo2, |
| @Mocked Catalog catalog, |
| @Mocked SystemInfoService systemInfoService, |
| @Mocked Database database, |
| @Mocked RoutineLoadDesc routineLoadDesc) throws MetaNotFoundException { |
| List<Integer> partitionList1 = Lists.newArrayList(1, 2); |
| List<Integer> partitionList2 = Lists.newArrayList(1, 2, 3); |
| List<Integer> partitionList3 = Lists.newArrayList(1, 2, 3, 4); |
| List<Integer> partitionList4 = Lists.newArrayList(1, 2, 3, 4, 5, 6, 7); |
| List<Long> beIds1 = Lists.newArrayList(1L); |
| List<Long> beIds2 = Lists.newArrayList(1L, 2L, 3L, 4L); |
| |
| String clusterName1 = "default1"; |
| String clusterName2 = "default2"; |
| |
| new Expectations() { |
| { |
| Catalog.getCurrentSystemInfo(); |
| minTimes = 0; |
| result = systemInfoService; |
| systemInfoService.getClusterBackendIds(clusterName1, true); |
| minTimes = 0; |
| result = beIds1; |
| systemInfoService.getClusterBackendIds(clusterName2, true); |
| result = beIds2; |
| minTimes = 0; |
| } |
| }; |
| |
| // 2 partitions, 1 be |
| RoutineLoadJob routineLoadJob = |
| new KafkaRoutineLoadJob(1L, "kafka_routine_load_job", clusterName1, 1L, |
| 1L, "127.0.0.1:9020", "topic1"); |
| Deencapsulation.setField(routineLoadJob, "currentKafkaPartitions", partitionList1); |
| Assert.assertEquals(1, routineLoadJob.calculateCurrentConcurrentTaskNum()); |
| |
| // 3 partitions, 4 be |
| routineLoadJob = new KafkaRoutineLoadJob(1L, "kafka_routine_load_job", clusterName2, 1L, |
| 1L, "127.0.0.1:9020", "topic1"); |
| Deencapsulation.setField(routineLoadJob, "currentKafkaPartitions", partitionList2); |
| Assert.assertEquals(3, routineLoadJob.calculateCurrentConcurrentTaskNum()); |
| |
| // 4 partitions, 4 be |
| routineLoadJob = new KafkaRoutineLoadJob(1L, "kafka_routine_load_job", clusterName2, 1L, |
| 1L, "127.0.0.1:9020", "topic1"); |
| Deencapsulation.setField(routineLoadJob, "currentKafkaPartitions", partitionList3); |
| Assert.assertEquals(4, routineLoadJob.calculateCurrentConcurrentTaskNum()); |
| |
| // 7 partitions, 4 be |
| routineLoadJob = new KafkaRoutineLoadJob(1L, "kafka_routine_load_job", clusterName2, 1L, |
| 1L, "127.0.0.1:9020", "topic1"); |
| Deencapsulation.setField(routineLoadJob, "currentKafkaPartitions", partitionList4); |
| Assert.assertEquals(4, routineLoadJob.calculateCurrentConcurrentTaskNum()); |
| } |
| |
| |
| @Test |
| public void testDivideRoutineLoadJob(@Injectable RoutineLoadManager routineLoadManager, |
| @Mocked RoutineLoadDesc routineLoadDesc) |
| throws UserException { |
| |
| Catalog catalog = Deencapsulation.newInstance(Catalog.class); |
| |
| RoutineLoadJob routineLoadJob = |
| new KafkaRoutineLoadJob(1L, "kafka_routine_load_job", "default", 1L, |
| 1L, "127.0.0.1:9020", "topic1"); |
| |
| new Expectations(catalog) { |
| { |
| catalog.getRoutineLoadManager(); |
| minTimes = 0; |
| result = routineLoadManager; |
| } |
| }; |
| |
| RoutineLoadTaskScheduler routineLoadTaskScheduler = new RoutineLoadTaskScheduler(routineLoadManager); |
| Deencapsulation.setField(catalog, "routineLoadTaskScheduler", routineLoadTaskScheduler); |
| |
| Deencapsulation.setField(routineLoadJob, "currentKafkaPartitions", Arrays.asList(1, 4, 6)); |
| |
| routineLoadJob.divideRoutineLoadJob(2); |
| |
| // todo(ml): assert |
| List<RoutineLoadTaskInfo> routineLoadTaskInfoList = Deencapsulation.getField(routineLoadJob, "routineLoadTaskInfoList"); |
| Assert.assertEquals(2, routineLoadTaskInfoList.size()); |
| for (RoutineLoadTaskInfo routineLoadTaskInfo : routineLoadTaskInfoList) { |
| KafkaTaskInfo kafkaTaskInfo = (KafkaTaskInfo) routineLoadTaskInfo; |
| Assert.assertEquals(false, kafkaTaskInfo.isRunning()); |
| if (kafkaTaskInfo.getPartitions().size() == 2) { |
| Assert.assertTrue(kafkaTaskInfo.getPartitions().contains(1)); |
| Assert.assertTrue(kafkaTaskInfo.getPartitions().contains(6)); |
| } else if (kafkaTaskInfo.getPartitions().size() == 1) { |
| Assert.assertTrue(kafkaTaskInfo.getPartitions().contains(4)); |
| } else { |
| Assert.fail(); |
| } |
| } |
| } |
| |
| @Test |
| public void testProcessTimeOutTasks(@Injectable GlobalTransactionMgr globalTransactionMgr, |
| @Injectable RoutineLoadManager routineLoadManager, |
| @Mocked RoutineLoadDesc routineLoadDesc) |
| throws AnalysisException, LabelAlreadyUsedException, |
| BeginTransactionException { |
| |
| Catalog catalog = Deencapsulation.newInstance(Catalog.class); |
| |
| RoutineLoadJob routineLoadJob = |
| new KafkaRoutineLoadJob(1L, "kafka_routine_load_job", "default", 1L, |
| 1L, "127.0.0.1:9020", "topic1"); |
| long maxBatchIntervalS = 10; |
| Deencapsulation.setField(routineLoadJob, "maxBatchIntervalS", maxBatchIntervalS); |
| new Expectations() { |
| { |
| catalog.getRoutineLoadManager(); |
| minTimes = 0; |
| result = routineLoadManager; |
| } |
| }; |
| |
| List<RoutineLoadTaskInfo> routineLoadTaskInfoList = new ArrayList<>(); |
| Map<Integer, Long> partitionIdsToOffset = Maps.newHashMap(); |
| partitionIdsToOffset.put(100, 0L); |
| KafkaTaskInfo kafkaTaskInfo = new KafkaTaskInfo(new UUID(1, 1), 1L, "default_cluster", |
| maxBatchIntervalS * 2 * 1000, partitionIdsToOffset); |
| kafkaTaskInfo.setExecuteStartTimeMs(System.currentTimeMillis() - maxBatchIntervalS * 2 * 1000 - 1); |
| routineLoadTaskInfoList.add(kafkaTaskInfo); |
| |
| Deencapsulation.setField(routineLoadJob, "routineLoadTaskInfoList", routineLoadTaskInfoList); |
| |
| routineLoadJob.processTimeoutTasks(); |
| new Verifications() { |
| { |
| List<RoutineLoadTaskInfo> idToRoutineLoadTask = |
| Deencapsulation.getField(routineLoadJob, "routineLoadTaskInfoList"); |
| Assert.assertNotEquals("1", idToRoutineLoadTask.get(0).getId()); |
| Assert.assertEquals(1, idToRoutineLoadTask.size()); |
| } |
| }; |
| } |
| |
| @Test |
| public void testFromCreateStmtWithErrorTable(@Mocked Catalog catalog, |
| @Injectable Database database) throws LoadException { |
| CreateRoutineLoadStmt createRoutineLoadStmt = initCreateRoutineLoadStmt(); |
| RoutineLoadDesc routineLoadDesc = new RoutineLoadDesc(columnSeparator, null, null, null, |
| partitionNames, null, LoadTask.MergeType.APPEND, null); |
| Deencapsulation.setField(createRoutineLoadStmt, "routineLoadDesc", routineLoadDesc); |
| |
| new Expectations() { |
| { |
| database.getTable(tableNameString); |
| minTimes = 0; |
| result = null; |
| } |
| }; |
| |
| try { |
| KafkaRoutineLoadJob kafkaRoutineLoadJob = KafkaRoutineLoadJob.fromCreateStmt(createRoutineLoadStmt); |
| Assert.fail(); |
| } catch (UserException e) { |
| LOG.info(e.getMessage()); |
| } |
| } |
| |
| @Test |
| public void testFromCreateStmt(@Mocked Catalog catalog, |
| @Injectable Database database, |
| @Injectable OlapTable table) throws UserException { |
| CreateRoutineLoadStmt createRoutineLoadStmt = initCreateRoutineLoadStmt(); |
| RoutineLoadDesc routineLoadDesc = new RoutineLoadDesc(columnSeparator, null, null, null, partitionNames, null, |
| LoadTask.MergeType.APPEND, sequenceStmt.getSequenceColName()); |
| Deencapsulation.setField(createRoutineLoadStmt, "routineLoadDesc", routineLoadDesc); |
| List<Pair<Integer, Long>> partitionIdToOffset = Lists.newArrayList(); |
| List<PartitionInfo> kafkaPartitionInfoList = Lists.newArrayList(); |
| for (String s : kafkaPartitionString.split(",")) { |
| partitionIdToOffset.add(new Pair<>(Integer.valueOf(s), 0l)); |
| PartitionInfo partitionInfo = new PartitionInfo(topicName, Integer.valueOf(s), null, null, null); |
| kafkaPartitionInfoList.add(partitionInfo); |
| } |
| Deencapsulation.setField(createRoutineLoadStmt, "kafkaPartitionOffsets", partitionIdToOffset); |
| Deencapsulation.setField(createRoutineLoadStmt, "kafkaBrokerList", serverAddress); |
| Deencapsulation.setField(createRoutineLoadStmt, "kafkaTopic", topicName); |
| long dbId = 1l; |
| long tableId = 2L; |
| |
| new Expectations() { |
| { |
| database.getTable(tableNameString); |
| minTimes = 0; |
| result = table; |
| database.getId(); |
| minTimes = 0; |
| result = dbId; |
| table.getId(); |
| minTimes = 0; |
| result = tableId; |
| table.getType(); |
| minTimes = 0; |
| result = Table.TableType.OLAP; |
| } |
| }; |
| |
| new MockUp<KafkaUtil>() { |
| @Mock |
| public List<Integer> getAllKafkaPartitions(String brokerList, String topic, |
| Map<String, String> convertedCustomProperties) throws UserException { |
| return Lists.newArrayList(1, 2, 3); |
| } |
| }; |
| |
| KafkaRoutineLoadJob kafkaRoutineLoadJob = KafkaRoutineLoadJob.fromCreateStmt(createRoutineLoadStmt); |
| Assert.assertEquals(jobName, kafkaRoutineLoadJob.getName()); |
| Assert.assertEquals(dbId, kafkaRoutineLoadJob.getDbId()); |
| Assert.assertEquals(tableId, kafkaRoutineLoadJob.getTableId()); |
| Assert.assertEquals(serverAddress, Deencapsulation.getField(kafkaRoutineLoadJob, "brokerList")); |
| Assert.assertEquals(topicName, Deencapsulation.getField(kafkaRoutineLoadJob, "topic")); |
| List<Integer> kafkaPartitionResult = Deencapsulation.getField(kafkaRoutineLoadJob, "customKafkaPartitions"); |
| Assert.assertEquals(kafkaPartitionString, Joiner.on(",").join(kafkaPartitionResult)); |
| Assert.assertEquals(sequenceStmt.getSequenceColName(), kafkaRoutineLoadJob.getSequenceCol()); |
| } |
| |
| private CreateRoutineLoadStmt initCreateRoutineLoadStmt() { |
| List<ParseNode> loadPropertyList = new ArrayList<>(); |
| loadPropertyList.add(columnSeparator); |
| loadPropertyList.add(partitionNames); |
| Map<String, String> properties = Maps.newHashMap(); |
| properties.put(CreateRoutineLoadStmt.DESIRED_CONCURRENT_NUMBER_PROPERTY, "2"); |
| String typeName = LoadDataSourceType.KAFKA.name(); |
| Map<String, String> customProperties = Maps.newHashMap(); |
| |
| customProperties.put(CreateRoutineLoadStmt.KAFKA_TOPIC_PROPERTY, topicName); |
| customProperties.put(CreateRoutineLoadStmt.KAFKA_BROKER_LIST_PROPERTY, serverAddress); |
| customProperties.put(CreateRoutineLoadStmt.KAFKA_PARTITIONS_PROPERTY, kafkaPartitionString); |
| |
| CreateRoutineLoadStmt createRoutineLoadStmt = new CreateRoutineLoadStmt(labelName, tableNameString, |
| loadPropertyList, properties, |
| typeName, customProperties, |
| LoadTask.MergeType.APPEND); |
| Deencapsulation.setField(createRoutineLoadStmt, "name", jobName); |
| return createRoutineLoadStmt; |
| } |
| } |