// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.

package org.apache.doris.load.loadv2;

import mockit.Expectations;
import mockit.Injectable;
import mockit.Mock;
import mockit.MockUp;
import mockit.Mocked;

import org.apache.doris.analysis.BrokerDesc;
import org.apache.doris.analysis.DataDescription;
import org.apache.doris.analysis.PartitionKeyDesc;
import org.apache.doris.analysis.PartitionValue;
import org.apache.doris.analysis.SingleRangePartitionDesc;
import org.apache.doris.catalog.AggregateType;
import org.apache.doris.catalog.Catalog;
import org.apache.doris.catalog.Column;
import org.apache.doris.catalog.Database;
import org.apache.doris.catalog.DistributionInfo;
import org.apache.doris.catalog.HashDistributionInfo;
import org.apache.doris.catalog.KeysType;
import org.apache.doris.catalog.OlapTable;
import org.apache.doris.catalog.Partition;
import org.apache.doris.catalog.PartitionInfo;
import org.apache.doris.catalog.RangePartitionInfo;
import org.apache.doris.catalog.ScalarType;
import org.apache.doris.catalog.SinglePartitionInfo;
import org.apache.doris.catalog.SparkResource;
import org.apache.doris.catalog.Type;
import org.apache.doris.common.AnalysisException;
import org.apache.doris.common.DdlException;
import org.apache.doris.common.LoadException;
import org.apache.doris.common.jmockit.Deencapsulation;
import org.apache.doris.load.BrokerFileGroup;
import org.apache.doris.load.BrokerFileGroupAggInfo;
import org.apache.doris.load.loadv2.etl.EtlJobConfig;
import org.apache.doris.load.loadv2.etl.EtlJobConfig.EtlFileGroup;
import org.apache.doris.load.loadv2.etl.EtlJobConfig.EtlIndex;
import org.apache.doris.load.loadv2.etl.EtlJobConfig.EtlPartition;
import org.apache.doris.load.loadv2.etl.EtlJobConfig.EtlPartitionInfo;
import org.apache.doris.load.loadv2.etl.EtlJobConfig.EtlTable;

import com.google.common.collect.Lists;
import com.google.common.collect.Maps;

import org.junit.Assert;
import org.junit.Test;

import java.util.List;
import java.util.Map;

public class SparkLoadPendingTaskTest {
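    // normal case: init() builds the etl job config from the mocked table meta,
    // and executeTask() submits the etl job and records the app id in the pending task attachment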
    @Test
    public void testExecuteTask(@Injectable SparkLoadJob sparkLoadJob,
                                @Injectable SparkResource resource,
                                @Injectable BrokerDesc brokerDesc,
                                @Mocked Catalog catalog, @Injectable SparkLoadAppHandle handle,
                                @Injectable Database database,
                                @Injectable OlapTable table) throws LoadException {
        long dbId = 0L;
        long tableId = 1L;

        // columns
        List<Column> columns = Lists.newArrayList();
        columns.add(new Column("c1", Type.BIGINT, true, null, false, null, ""));

        // indexes
        Map<Long, List<Column>> indexIdToSchema = Maps.newHashMap();
        long indexId = 3L;
        indexIdToSchema.put(indexId, columns);

        // partition and distribution infos
        long partitionId = 2L;
        DistributionInfo distributionInfo = new HashDistributionInfo(2, Lists.newArrayList(columns.get(0)));
        PartitionInfo partitionInfo = new SinglePartitionInfo();
        Partition partition = new Partition(partitionId, "p1", null, distributionInfo);
        List<Partition> partitions = Lists.newArrayList(partition);

        // file group
        Map<BrokerFileGroupAggInfo.FileGroupAggKey, List<BrokerFileGroup>> aggKeyToFileGroups = Maps.newHashMap();
        List<BrokerFileGroup> brokerFileGroups = Lists.newArrayList();
        DataDescription desc = new DataDescription("testTable", null, Lists.newArrayList("abc.txt"),
                null, null, null, false, null);
        BrokerFileGroup brokerFileGroup = new BrokerFileGroup(desc);
        brokerFileGroups.add(brokerFileGroup);
        BrokerFileGroupAggInfo.FileGroupAggKey aggKey = new BrokerFileGroupAggInfo.FileGroupAggKey(tableId, null);
        aggKeyToFileGroups.put(aggKey, brokerFileGroups);

        new Expectations() {
            {
                catalog.getDb(dbId);
                result = database;
                sparkLoadJob.getHandle();
                result = handle;
                database.getTable(tableId);
                result = table;
                table.getPartitions();
                result = partitions;
                table.getIndexIdToSchema();
                result = indexIdToSchema;
                table.getDefaultDistributionInfo();
                result = distributionInfo;
                table.getSchemaHashByIndexId(indexId);
                result = 123;
                table.getPartitionInfo();
                result = partitionInfo;
                table.getPartition(partitionId);
                result = partition;
                table.getKeysTypeByIndexId(indexId);
                result = KeysType.DUP_KEYS;
                table.getBaseIndexId();
                result = indexId;
            }
        };
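
        // stub SparkEtlJobHandler so no real spark job is submitted;
        // the stub only records the app id on the task attachment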
        String appId = "application_15888888888_0088";
        new MockUp<SparkEtlJobHandler>() {
            @Mock
            public void submitEtlJob(long loadJobId, String loadLabel, EtlJobConfig etlJobConfig,
                                     SparkResource resource, BrokerDesc brokerDesc, SparkLoadAppHandle handle,
                                     SparkPendingTaskAttachment attachment) throws LoadException {
                attachment.setAppId(appId);
            }
        };

        SparkLoadPendingTask task = new SparkLoadPendingTask(sparkLoadJob, aggKeyToFileGroups, resource, brokerDesc);
        task.init();
        SparkPendingTaskAttachment attachment = Deencapsulation.getField(task, "attachment");
        Assert.assertNull(attachment.getAppId());
        task.executeTask();
        Assert.assertEquals(appId, attachment.getAppId());
    }
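
    // init() should throw LoadException when the database of the load job no longer exists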
    @Test(expected = LoadException.class)
    public void testNoDb(@Injectable SparkLoadJob sparkLoadJob,
                         @Injectable SparkResource resource,
                         @Injectable BrokerDesc brokerDesc,
                         @Mocked Catalog catalog) throws LoadException {
        long dbId = 0L;

        new Expectations() {
            {
                catalog.getDb(dbId);
                result = null;
            }
        };

        SparkLoadPendingTask task = new SparkLoadPendingTask(sparkLoadJob, null, resource, brokerDesc);
        task.init();
    }
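
    // init() should throw LoadException when the target table cannot be found in the database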
    @Test(expected = LoadException.class)
    public void testNoTable(@Injectable SparkLoadJob sparkLoadJob,
                            @Injectable SparkResource resource,
                            @Injectable BrokerDesc brokerDesc,
                            @Mocked Catalog catalog,
                            @Injectable Database database) throws LoadException {
        long dbId = 0L;
        long tableId = 1L;

        Map<BrokerFileGroupAggInfo.FileGroupAggKey, List<BrokerFileGroup>> aggKeyToFileGroups = Maps.newHashMap();
        List<BrokerFileGroup> brokerFileGroups = Lists.newArrayList();
        DataDescription desc = new DataDescription("testTable", null, Lists.newArrayList("abc.txt"),
                null, null, null, false, null);
        BrokerFileGroup brokerFileGroup = new BrokerFileGroup(desc);
        brokerFileGroups.add(brokerFileGroup);
        BrokerFileGroupAggInfo.FileGroupAggKey aggKey = new BrokerFileGroupAggInfo.FileGroupAggKey(tableId, null);
        aggKeyToFileGroups.put(aggKey, brokerFileGroups);

        new Expectations() {
            {
                catalog.getDb(dbId);
                result = database;
                database.getTable(tableId);
                result = null;
            }
        };

        SparkLoadPendingTask task = new SparkLoadPendingTask(sparkLoadJob, aggKeyToFileGroups, resource, brokerDesc);
        task.init();
    }
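
    // init() should convert a range-partitioned, hash-distributed table with two indexes
    // into the expected EtlJobConfig (table id, indexes, partition info and file groups)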
    @Test
    public void testRangePartitionHashDistribution(@Injectable SparkLoadJob sparkLoadJob,
                                                   @Injectable SparkResource resource,
                                                   @Injectable BrokerDesc brokerDesc,
                                                   @Mocked Catalog catalog,
                                                   @Injectable Database database,
                                                   @Injectable OlapTable table)
            throws LoadException, DdlException, AnalysisException {
        long dbId = 0L;
        long tableId = 1L;

        // c1 is partition column, c2 is distribution column
        List<Column> columns = Lists.newArrayList();
        columns.add(new Column("c1", Type.INT, true, null, false, null, ""));
        columns.add(new Column("c2", ScalarType.createVarchar(10), true, null, false, null, ""));
        columns.add(new Column("c3", Type.INT, false, AggregateType.SUM, false, null, ""));

        // indexes
        Map<Long, List<Column>> indexIdToSchema = Maps.newHashMap();
        long index1Id = 3L;
        indexIdToSchema.put(index1Id, columns);
        long index2Id = 4L;
        indexIdToSchema.put(index2Id, Lists.newArrayList(columns.get(0), columns.get(2)));

        // partition and distribution info
        long partition1Id = 2L;
        long partition2Id = 5L;
        int distributionColumnIndex = 1;
        DistributionInfo distributionInfo = new HashDistributionInfo(3,
                Lists.newArrayList(columns.get(distributionColumnIndex)));
        Partition partition1 = new Partition(partition1Id, "p1", null, distributionInfo);
        Partition partition2 = new Partition(partition2Id, "p2", null,
                new HashDistributionInfo(4, Lists.newArrayList(columns.get(distributionColumnIndex))));
        int partitionColumnIndex = 0;
        List<Partition> partitions = Lists.newArrayList(partition1, partition2);
        RangePartitionInfo partitionInfo = new RangePartitionInfo(
                Lists.newArrayList(columns.get(partitionColumnIndex)));
        PartitionKeyDesc partitionKeyDesc1 = new PartitionKeyDesc(Lists.newArrayList(new PartitionValue("10")));
        SingleRangePartitionDesc partitionDesc1 = new SingleRangePartitionDesc(false, "p1", partitionKeyDesc1, null);
        partitionDesc1.analyze(1, null);
        partitionInfo.handleNewSinglePartitionDesc(partitionDesc1, partition1Id, false);
        PartitionKeyDesc partitionKeyDesc2 = new PartitionKeyDesc(Lists.newArrayList(new PartitionValue("20")));
        SingleRangePartitionDesc partitionDesc2 = new SingleRangePartitionDesc(false, "p2", partitionKeyDesc2, null);
        partitionDesc2.analyze(1, null);
        partitionInfo.handleNewSinglePartitionDesc(partitionDesc2, partition2Id, false);

        // file group
        Map<BrokerFileGroupAggInfo.FileGroupAggKey, List<BrokerFileGroup>> aggKeyToFileGroups = Maps.newHashMap();
        List<BrokerFileGroup> brokerFileGroups = Lists.newArrayList();
        DataDescription desc = new DataDescription("testTable", null, Lists.newArrayList("abc.txt"),
                null, null, null, false, null);
        BrokerFileGroup brokerFileGroup = new BrokerFileGroup(desc);
        brokerFileGroups.add(brokerFileGroup);
        BrokerFileGroupAggInfo.FileGroupAggKey aggKey = new BrokerFileGroupAggInfo.FileGroupAggKey(tableId, null);
        aggKeyToFileGroups.put(aggKey, brokerFileGroups);

        new Expectations() {
            {
                catalog.getDb(dbId);
                result = database;
                database.getTable(tableId);
                result = table;
                table.getPartitions();
                result = partitions;
                table.getIndexIdToSchema();
                result = indexIdToSchema;
                table.getDefaultDistributionInfo();
                result = distributionInfo;
                table.getSchemaHashByIndexId(index1Id);
                result = 123;
                table.getSchemaHashByIndexId(index2Id);
                result = 234;
                table.getPartitionInfo();
                result = partitionInfo;
                table.getPartition(partition1Id);
                result = partition1;
                table.getPartition(partition2Id);
                result = partition2;
                table.getKeysTypeByIndexId(index1Id);
                result = KeysType.AGG_KEYS;
                table.getKeysTypeByIndexId(index2Id);
                result = KeysType.AGG_KEYS;
                table.getBaseIndexId();
                result = index1Id;
            }
        };
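
        // the etl job config is built by init(); it must be unset before and populated after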
        SparkLoadPendingTask task = new SparkLoadPendingTask(sparkLoadJob, aggKeyToFileGroups, resource, brokerDesc);
        EtlJobConfig etlJobConfig = Deencapsulation.getField(task, "etlJobConfig");
        Assert.assertNull(etlJobConfig);
        task.init();
        etlJobConfig = Deencapsulation.getField(task, "etlJobConfig");
        Assert.assertNotNull(etlJobConfig);

        // check table id
        Map<Long, EtlTable> idToEtlTable = etlJobConfig.tables;
        Assert.assertEquals(1, idToEtlTable.size());
        Assert.assertTrue(idToEtlTable.containsKey(tableId));

        // check indexes
        EtlTable etlTable = idToEtlTable.get(tableId);
        List<EtlIndex> etlIndexes = etlTable.indexes;
        Assert.assertEquals(2, etlIndexes.size());
        Assert.assertEquals(index1Id, etlIndexes.get(0).indexId);
        Assert.assertEquals(index2Id, etlIndexes.get(1).indexId);

        // check base index columns
        EtlIndex baseIndex = etlIndexes.get(0);
        Assert.assertTrue(baseIndex.isBaseIndex);
        Assert.assertEquals(3, baseIndex.columns.size());
        for (int i = 0; i < columns.size(); i++) {
            Assert.assertEquals(columns.get(i).getName(), baseIndex.columns.get(i).columnName);
        }
        Assert.assertEquals("AGGREGATE", baseIndex.indexType);

        // check partitions
        EtlPartitionInfo etlPartitionInfo = etlTable.partitionInfo;
        Assert.assertEquals("RANGE", etlPartitionInfo.partitionType);
        List<String> partitionColumns = etlPartitionInfo.partitionColumnRefs;
        Assert.assertEquals(1, partitionColumns.size());
        Assert.assertEquals(columns.get(partitionColumnIndex).getName(), partitionColumns.get(0));
        List<String> distributionColumns = etlPartitionInfo.distributionColumnRefs;
        Assert.assertEquals(1, distributionColumns.size());
        Assert.assertEquals(columns.get(distributionColumnIndex).getName(), distributionColumns.get(0));
        List<EtlPartition> etlPartitions = etlPartitionInfo.partitions;
        Assert.assertEquals(2, etlPartitions.size());

        // check file group
        List<EtlFileGroup> etlFileGroups = etlTable.fileGroups;
        Assert.assertEquals(1, etlFileGroups.size());
    }
}