blob: 00f8522c4af44e68d2bb917313280c1a7f9f9351 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.kylin.rest.controller;
import java.sql.Connection;
import java.sql.DriverManager;
import java.util.Collections;
import java.util.List;
import java.util.Locale;
import java.util.stream.Collectors;
import org.apache.hadoop.util.Shell;
import org.apache.kylin.common.KylinConfig;
import org.apache.kylin.common.util.RandomUtil;
import org.apache.kylin.common.util.TempMetadataBuilder;
import org.apache.kylin.engine.spark.ExecutableUtils;
import org.apache.kylin.engine.spark.job.NSparkCubingJob;
import org.apache.kylin.engine.spark.merger.AfterBuildResourceMerger;
import org.apache.kylin.guava30.shaded.common.base.Preconditions;
import org.apache.kylin.guava30.shaded.common.collect.Lists;
import org.apache.kylin.guava30.shaded.common.collect.Sets;
import org.apache.kylin.job.engine.JobEngineConfig;
import org.apache.kylin.job.execution.ExecutableState;
import org.apache.kylin.job.execution.NExecutableManager;
import org.apache.kylin.job.impl.threadpool.NDefaultScheduler;
import org.apache.kylin.metadata.cube.model.IndexEntity;
import org.apache.kylin.metadata.cube.model.LayoutEntity;
import org.apache.kylin.metadata.cube.model.NDataSegment;
import org.apache.kylin.metadata.cube.model.NDataflowManager;
import org.apache.kylin.metadata.cube.model.NDataflowUpdate;
import org.apache.kylin.metadata.cube.model.NIndexPlanManager;
import org.apache.kylin.metadata.model.ColumnDesc;
import org.apache.kylin.metadata.model.NTableMetadataManager;
import org.apache.kylin.metadata.model.SegmentRange;
import org.apache.kylin.metadata.model.TableDesc;
import org.apache.kylin.metadata.project.NProjectManager;
import org.apache.kylin.metadata.project.ProjectInstance;
import org.apache.kylin.metadata.user.ManagedUser;
import org.apache.kylin.rest.constant.Constant;
import org.apache.kylin.rest.request.SQLRequest;
import org.apache.kylin.rest.service.QueryService;
import org.apache.kylin.rest.service.UserGrantedAuthority;
import org.apache.kylin.rest.service.UserService;
import org.apache.kylin.server.AbstractMVCIntegrationTestCase;
import org.apache.kylin.source.jdbc.H2Database;
import org.apache.kylin.util.JobFinishHelper;
import org.apache.spark.SparkConf;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparderEnv;
import org.apache.spark.sql.SparkSession;
import org.apache.spark.sql.internal.StaticSQLConf;
import org.apache.spark.sql.types.DataType;
import org.apache.spark.sql.types.DataTypes;
import org.apache.spark.sql.types.StructType;
import org.junit.After;
import org.junit.AfterClass;
import org.junit.Assert;
import org.junit.Before;
import org.junit.BeforeClass;
import org.junit.Test;
import org.mockito.Mockito;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.security.authentication.TestingAuthenticationToken;
import org.springframework.security.core.context.SecurityContextHolder;
import lombok.val;
import lombok.var;
import lombok.extern.slf4j.Slf4j;
@Slf4j
public class NBuildAndQueryMetricsTest extends AbstractMVCIntegrationTestCase {
private static final String CSV_TABLE_DIR = TempMetadataBuilder.TEMP_TEST_METADATA + "/data/%s.csv";
protected static SparkConf sparkConf;
protected static SparkSession ss;
@Autowired
protected UserService userService;
@Autowired
QueryService queryService;
@BeforeClass
public static void beforeClass() {
if (Shell.MAC)
overwriteSystemPropBeforeClass("org.xerial.snappy.lib.name", "libsnappyjava.jnilib");//for snappy
sparkConf = new SparkConf().setAppName(RandomUtil.randomUUIDStr()).setMaster("local[4]");
sparkConf.set("spark.serializer", "org.apache.spark.serializer.JavaSerializer");
sparkConf.set(StaticSQLConf.CATALOG_IMPLEMENTATION().key(), "in-memory");
sparkConf.set("spark.sql.shuffle.partitions", "1");
sparkConf.set(StaticSQLConf.WAREHOUSE_PATH().key(),
TempMetadataBuilder.TEMP_TEST_METADATA + "/spark-warehouse");
ss = SparkSession.builder().config(sparkConf).getOrCreate();
SparderEnv.setSparkSession(ss);
}
@AfterClass
public static void afterClass() {
ss.close();
}
private static DataType convertType(org.apache.kylin.metadata.datatype.DataType type) {
if (type.isTimeFamily())
return DataTypes.TimestampType;
if (type.isDateTimeFamily())
return DataTypes.DateType;
if (type.isIntegerFamily())
switch (type.getName()) {
case "tinyint":
return DataTypes.ByteType;
case "smallint":
return DataTypes.ShortType;
case "integer":
case "int4":
return DataTypes.IntegerType;
default:
return DataTypes.LongType;
}
if (type.isNumberFamily())
switch (type.getName()) {
case "float":
return DataTypes.FloatType;
case "double":
return DataTypes.DoubleType;
default:
if (type.getPrecision() == -1 || type.getScale() == -1) {
return DataTypes.createDecimalType(19, 4);
} else {
return DataTypes.createDecimalType(type.getPrecision(), type.getScale());
}
}
if (type.isStringFamily())
return DataTypes.StringType;
if (type.isBoolean())
return DataTypes.BooleanType;
throw new IllegalArgumentException("KAP data type: " + type + " can not be converted to spark's type.");
}
@Before
public void setup() throws Exception {
overwriteSystemProp("kylin.engine.spark.build-job-progress-reporter", //
"org.apache.kylin.engine.spark.job.MockJobProgressReport");
overwriteSystemProp("kylin.build.resource.consecutive-idle-state-num", "1");
overwriteSystemProp("kylin.build.resource.state-check-interval-seconds", "1s");
setupPushdownEnv();
SecurityContextHolder.getContext()
.setAuthentication(new TestingAuthenticationToken("ADMIN", "ADMIN", Constant.ROLE_ADMIN));
KylinConfig kylinConfig = KylinConfig.getInstanceFromEnv();
NProjectManager projectManager = NProjectManager.getInstance(kylinConfig);
ProjectInstance projectInstance = projectManager.getProject(getProject());
val overrideKylinProps = projectInstance.getOverrideKylinProps();
ProjectInstance projectInstanceUpdate = ProjectInstance.create(projectInstance.getName(),
projectInstance.getOwner(), projectInstance.getDescription(), overrideKylinProps);
projectManager.updateProject(projectInstance, projectInstanceUpdate.getName(),
projectInstanceUpdate.getDescription(), projectInstanceUpdate.getOverrideKylinProps());
Preconditions.checkNotNull(projectInstance);
for (String table : projectInstance.getTables()) {
if (!"DEFAULT.TEST_KYLIN_FACT".equals(table) && !"DEFAULT.TEST_ACCOUNT".equals(table)) {
continue;
}
TableDesc tableDesc = NTableMetadataManager.getInstance(kylinConfig, getProject()).getTableDesc(table);
ColumnDesc[] columns = tableDesc.getColumns();
StructType schema = new StructType();
for (ColumnDesc column : columns) {
schema = schema.add(column.getName(), convertType(column.getType()), false);
}
Dataset<Row> ret = ss.read().schema(schema).csv(String.format(Locale.ROOT, CSV_TABLE_DIR, table));
ret.createOrReplaceTempView(tableDesc.getName());
}
val scheduler = NDefaultScheduler.getInstance(getProject());
scheduler.init(new JobEngineConfig(kylinConfig));
NExecutableManager originExecutableManager = NExecutableManager.getInstance(getTestConfig(), getProject());
NExecutableManager executableManager = Mockito.spy(originExecutableManager);
val dsMgr = NDataflowManager.getInstance(kylinConfig, getProject());
// ready dataflow, segment, cuboid layout
var df = dsMgr.getDataflowByModelAlias("test_cube_01_sum_expr_with_count_distinct_expr");
// cleanup all segments first
val update = new NDataflowUpdate(df.getUuid());
update.setToRemoveSegsWithArray(df.getSegments().toArray(new NDataSegment[0]));
dsMgr.updateDataflow(update);
df = dsMgr.getDataflowByModelAlias("test_cube_01_sum_expr_with_count_distinct_expr");
val layouts = df.getIndexPlan().getAllLayouts();
val round1 = Lists.newArrayList(layouts);
val segmentRange = SegmentRange.TimePartitionedSegmentRange.createInfinite();
val toBuildLayouts = Sets.newLinkedHashSet(round1);
val execMgr = NExecutableManager.getInstance(kylinConfig, getProject());
// ready dataflow, segment, cuboid layout
val oneSeg = dsMgr.appendSegment(df, segmentRange);
val job = NSparkCubingJob.create(Sets.newHashSet(oneSeg), toBuildLayouts, "ADMIN", null);
// launch the job
execMgr.addJob(job);
JobFinishHelper.waitJobFinish(kylinConfig, getProject(), job.getId(), 600 * 1000);
Preconditions.checkArgument(executableManager.getJob(job.getId()).getStatus() == ExecutableState.SUCCEED);
val buildStore = ExecutableUtils.getRemoteStore(kylinConfig, job.getSparkCubingStep());
val merger = new AfterBuildResourceMerger(kylinConfig, getProject());
val layoutIds = toBuildLayouts.stream().map(LayoutEntity::getId).collect(Collectors.toSet());
merger.mergeAfterIncrement(df.getUuid(), oneSeg.getId(), layoutIds, buildStore);
val indexManager = NIndexPlanManager.getInstance(getTestConfig(), getProject());
indexManager.updateIndexPlan("73e06974-e642-6b91-e7a0-5cd7f02ec4f2", copyForWrite -> {
List<IndexEntity> indexes = copyForWrite.getIndexes().stream().peek(i -> {
if (i.getId() == 0) {
i.setLayouts(Lists.newArrayList(i.getLayouts().get(0)));
}
}).collect(Collectors.toList());
copyForWrite.setIndexes(indexes);
});
userService.createUser(new ManagedUser("ADMIN", "KYLIN", false,
Collections.singletonList(new UserGrantedAuthority("ROLE_ADMIN"))));
}
@After
public void teardown() throws Exception {
cleanPushdownEnv();
NDefaultScheduler.destroyInstance();
}
@Test
public void testMetricsScanForPushDown() throws Exception {
String sql = "select account_id from test_account limit 30";
assertMetric(sql, 30);
}
@Test
public void testSplitFunction() throws Exception {
String sql = "select split(account_id, '-')[0] from test_account limit 30";
assertMetric(sql, 30);
}
@Test
public void testMetricsScanForTableIndex() throws Exception {
String sql = "select count(distinct case when trans_id > 100 then order_id else 0 end),"
+ "sum(case when trans_id > 100 then price else 0 end), price from test_kylin_fact group by price limit 20";
assertMetric(sql, 10000);
}
@Test
public void testMetricsScanForTableIndex2() throws Exception {
String sql = "select trans_id from test_kylin_fact limit 20";
assertMetric(sql, 4096);
}
@Test
public void testMetricsScanForAggIndex() throws Throwable {
String sql = "select trans_id from test_kylin_fact group by trans_id limit 20";
assertMetric(sql, 10000);
}
private void assertMetric(String sql, long scanRowsExpect) throws Exception {
val req = new SQLRequest();
req.setSql(sql);
req.setProject(getProject());
req.setUsername("ADMIN");
val response = queryService.query(req);
long scanRows = response.getScanRows().get(0);
long scanBytes = response.getScanBytes().get(0);
Assert.assertEquals(scanRowsExpect, scanRows);
Assert.assertTrue(scanBytes > 0);
}
private void setupPushdownEnv() throws Exception {
getTestConfig().setProperty("kylin.query.pushdown.runner-class-name",
"org.apache.kylin.query.pushdown.PushDownRunnerSparkImpl");
getTestConfig().setProperty("kylin.query.pushdown-enabled", "true");
// Load H2 Tables (inner join)
Connection h2Connection = DriverManager.getConnection("jdbc:h2:mem:db_default;DB_CLOSE_DELAY=-1", "sa", "");
H2Database h2DB = new H2Database(h2Connection, getTestConfig(), "default");
h2DB.loadAllTables();
overwriteSystemProp("kylin.query.pushdown.jdbc.url", "jdbc:h2:mem:db_default;SCHEMA=DEFAULT");
overwriteSystemProp("kylin.query.pushdown.jdbc.driver", "org.h2.Driver");
overwriteSystemProp("kylin.query.pushdown.jdbc.username", "sa");
overwriteSystemProp("kylin.query.pushdown.jdbc.password", "");
}
private void cleanPushdownEnv() throws Exception {
getTestConfig().setProperty("kylin.query.pushdown-enabled", "false");
// Load H2 Tables (inner join)
Connection h2Connection = DriverManager.getConnection("jdbc:h2:mem:db_default", "sa", "");
h2Connection.close();
}
protected String getProject() {
return "sum_expr_with_count_distinct";
}
}