// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.

package org.apache.doris.catalog;

import org.apache.doris.analysis.AccessTestUtil;
import org.apache.doris.analysis.Analyzer;
import org.apache.doris.analysis.CreateResourceStmt;
import org.apache.doris.analysis.ResourceDesc;
import org.apache.doris.common.DdlException;
import org.apache.doris.common.UserException;
import org.apache.doris.common.proc.BaseProcResult;
import org.apache.doris.mysql.privilege.PaloAuth;
import org.apache.doris.mysql.privilege.PrivPredicate;
import org.apache.doris.qe.ConnectContext;

import com.google.common.collect.Maps;
import mockit.Expectations;
import mockit.Injectable;
import mockit.Mocked;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;

import java.util.Map;
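
/**
 * Unit tests for {@link SparkResource}.
 *
 * Covers building a resource from a CREATE RESOURCE statement
 * ({@code Resource.fromStmt}), updating the Spark configs of a copied
 * resource, and rejecting a statement that references an unknown broker.
 */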
public class SparkResourceTest {
    private String name;
    private String type;
    private String master;
    private String workingDir;
    private String broker;
    private Map<String, String> properties;
    private Analyzer analyzer;

    @Before
    public void setUp() {
        name = "spark0";
        type = "spark";
        master = "spark://127.0.0.1:7077";
        workingDir = "hdfs://127.0.0.1/tmp/doris";
        broker = "broker0";
        properties = Maps.newHashMap();
        properties.put("type", type);
        properties.put("spark.master", master);
        properties.put("spark.submit.deployMode", "cluster");
        properties.put("working_dir", workingDir);
        properties.put("broker", broker);
        analyzer = AccessTestUtil.fetchAdminAnalyzer(true);
    }
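
    // The properties above roughly correspond to a statement like the
    // following (illustrative sketch only; the exact SQL accepted is
    // defined by CreateResourceStmt and the parser):
    //
    //   CREATE EXTERNAL RESOURCE "spark0"
    //   PROPERTIES (
    //       "type" = "spark",
    //       "spark.master" = "spark://127.0.0.1:7077",
    //       "spark.submit.deployMode" = "cluster",
    //       "working_dir" = "hdfs://127.0.0.1/tmp/doris",
    //       "broker" = "broker0"
    //   );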
    @Test
    public void testFromStmt(@Injectable BrokerMgr brokerMgr, @Mocked Catalog catalog, @Injectable PaloAuth auth)
            throws UserException {
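        // Mock the catalog so that broker "broker0" is registered and the
        // current user passes the ADMIN privilege check during analysis.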
        new Expectations() {
            {
                catalog.getBrokerMgr();
                result = brokerMgr;
                brokerMgr.containsBroker(broker);
                result = true;
                catalog.getAuth();
                result = auth;
                auth.checkGlobalPriv((ConnectContext) any, PrivPredicate.ADMIN);
                result = true;
            }
        };

        // master: spark, deploy_mode: cluster
        CreateResourceStmt stmt = new CreateResourceStmt(true, name, properties);
        stmt.analyze(analyzer);
        SparkResource resource = (SparkResource) Resource.fromStmt(stmt);
        Assert.assertEquals(name, resource.getName());
        Assert.assertEquals(type, resource.getType().name().toLowerCase());
        Assert.assertEquals(master, resource.getMaster());
        Assert.assertEquals("cluster", resource.getDeployMode().name().toLowerCase());
        Assert.assertEquals(workingDir, resource.getWorkingDir());
        Assert.assertEquals(broker, resource.getBroker());
        Assert.assertEquals(2, resource.getSparkConfigs().size());
        Assert.assertFalse(resource.isYarnMaster());

        // master: spark, deploy_mode: client
        properties.put("spark.submit.deployMode", "client");
        stmt = new CreateResourceStmt(true, name, properties);
        stmt.analyze(analyzer);
        resource = (SparkResource) Resource.fromStmt(stmt);
        Assert.assertEquals("client", resource.getDeployMode().name().toLowerCase());

        // master: yarn, deploy_mode: cluster
        properties.put("spark.master", "yarn");
        properties.put("spark.submit.deployMode", "cluster");
        properties.put("spark.jars", "xxx.jar,yyy.jar");
        properties.put("spark.files", "/tmp/aaa,/tmp/bbb");
        properties.put("spark.driver.memory", "1g");
        properties.put("spark.hadoop.yarn.resourcemanager.address", "127.0.0.1:9999");
        properties.put("spark.hadoop.fs.defaultFS", "hdfs://127.0.0.1:10000");
        stmt = new CreateResourceStmt(true, name, properties);
        stmt.analyze(analyzer);
        resource = (SparkResource) Resource.fromStmt(stmt);
        Assert.assertTrue(resource.isYarnMaster());
        Map<String, String> map = resource.getSparkConfigs();
        Assert.assertEquals(7, map.size());

        // test getProcNodeData
        BaseProcResult result = new BaseProcResult();
        resource.getProcNodeData(result);
        Assert.assertEquals(9, result.getRows().size());
    }
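
    // update() must only modify the resource it is called on: a copy obtained
    // via getCopiedResource() can be changed without affecting the original.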
    @Test
    public void testUpdate(@Injectable BrokerMgr brokerMgr, @Mocked Catalog catalog, @Injectable PaloAuth auth)
            throws UserException {
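        // Same mocks as testFromStmt: the broker exists and the user has ADMIN privilege.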
        new Expectations() {
            {
                catalog.getBrokerMgr();
                result = brokerMgr;
                brokerMgr.containsBroker(broker);
                result = true;
                catalog.getAuth();
                result = auth;
                auth.checkGlobalPriv((ConnectContext) any, PrivPredicate.ADMIN);
                result = true;
            }
        };

        properties.put("spark.master", "yarn");
        properties.put("spark.submit.deployMode", "cluster");
        properties.put("spark.driver.memory", "1g");
        properties.put("spark.hadoop.yarn.resourcemanager.address", "127.0.0.1:9999");
        properties.put("spark.hadoop.fs.defaultFS", "hdfs://127.0.0.1:10000");
        CreateResourceStmt stmt = new CreateResourceStmt(true, name, properties);
        stmt.analyze(analyzer);
        SparkResource resource = (SparkResource) Resource.fromStmt(stmt);
        SparkResource copiedResource = resource.getCopiedResource();
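
        // Update only the copy: it should gain spark.executor.memory and the
        // overridden spark.driver.memory, while the original keeps its five
        // configs untouched.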
        Map<String, String> newProperties = Maps.newHashMap();
        newProperties.put("spark.executor.memory", "1g");
        newProperties.put("spark.driver.memory", "2g");
        ResourceDesc resourceDesc = new ResourceDesc(name, newProperties);
        copiedResource.update(resourceDesc);
        Map<String, String> map = copiedResource.getSparkConfigs();
        Assert.assertEquals(5, resource.getSparkConfigs().size());
        Assert.assertEquals("1g", resource.getSparkConfigs().get("spark.driver.memory"));
        Assert.assertEquals(6, map.size());
        Assert.assertEquals("2g", copiedResource.getSparkConfigs().get("spark.driver.memory"));
    }
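
    // Creating a resource whose "broker" property names an unregistered broker
    // should fail with DdlException.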
    @Test(expected = DdlException.class)
    public void testNoBroker(@Injectable BrokerMgr brokerMgr, @Mocked Catalog catalog, @Injectable PaloAuth auth)
            throws UserException {
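        // Unlike the tests above, the broker lookup reports "broker0" as missing.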
        new Expectations() {
            {
                catalog.getBrokerMgr();
                result = brokerMgr;
                brokerMgr.containsBroker(broker);
                result = false;
                catalog.getAuth();
                result = auth;
                auth.checkGlobalPriv((ConnectContext) any, PrivPredicate.ADMIN);
                result = true;
            }
        };

        CreateResourceStmt stmt = new CreateResourceStmt(true, name, properties);
        stmt.analyze(analyzer);
        Resource.fromStmt(stmt);
    }
}