// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
package org.apache.doris.load;

import org.apache.doris.analysis.LoadStmt;
import org.apache.doris.catalog.Catalog;
import org.apache.doris.catalog.FakeCatalog;
import org.apache.doris.common.FeConstants;
import org.apache.doris.common.FeMetaVersion;
import org.apache.doris.common.LoadException;

import com.google.common.collect.Maps;
import mockit.Expectations;
import mockit.Mocked;
import org.junit.Assert;
import org.junit.Test;

import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.File;
import java.io.FileInputStream;
import java.io.FileOutputStream;
import java.util.Map;
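
/**
 * Unit tests for {@link DppConfig}: creating a config from load properties,
 * updating hadoop configs in place, mapping BOS parameters, clearing, and
 * serialization via write()/readFields().
 */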
public class DppConfigTest {
    private FakeCatalog fakeCatalog;

    @Test
    public void testNormal(@Mocked Catalog catalog) throws LoadException {
        // mock catalog
        int clusterId = 10;
        fakeCatalog = new FakeCatalog();
        FakeCatalog.setCatalog(catalog);
        new Expectations() {
            {
                catalog.getClusterId();
                minTimes = 0;
                result = clusterId;
            }
        };
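
        // build a DppConfig from the user-specified load properties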
        Map<String, String> configMap = Maps.newHashMap();
        configMap.put("hadoop_palo_path", "/user/palo2");
        configMap.put("hadoop_http_port", "1234");
        configMap.put("hadoop_configs", "mapred.job.tracker=127.0.0.1:111;fs.default.name=hdfs://127.0.0.1:112;"
                + "hadoop.job.ugi=user,password;mapred.job.priority=NORMAL");
        DppConfig dppConfig = DppConfig.create(configMap);
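        // applications and output paths are scoped by cluster id under hadoop_palo_path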
        Assert.assertEquals(String.format("/user/palo2/%d/applications/%s", clusterId, FeConstants.dpp_version),
                dppConfig.getApplicationsPath());
        Assert.assertEquals(String.format("/user/palo2/%d/output", clusterId), dppConfig.getOutputPath());
        Assert.assertEquals("hdfs://127.0.0.1:112", dppConfig.getFsDefaultName());
        Assert.assertEquals("user,password", dppConfig.getHadoopJobUgiStr());
        Assert.assertEquals("127.0.0.1", dppConfig.getNameNodeHost());
        Assert.assertEquals(1234, dppConfig.getHttpPort());
        Map<String, String> hadoopConfigs = dppConfig.getHadoopConfigs();
        Assert.assertEquals(4, hadoopConfigs.size());
        Assert.assertEquals("NORMAL", hadoopConfigs.get("mapred.job.priority"));

        // update: merging a new config overwrites matching hadoop configs in place
        Map<String, String> newConfigMap = Maps.newHashMap();
        newConfigMap.put("hadoop_configs", "mapred.job.priority=VERY_HIGH");
        DppConfig newDppConfig = DppConfig.create(newConfigMap);
        dppConfig.update(newDppConfig);
        Assert.assertEquals(4, hadoopConfigs.size());
        Assert.assertEquals("VERY_HIGH", hadoopConfigs.get("mapred.job.priority"));
        newConfigMap.clear();
        newConfigMap.put(LoadStmt.BOS_ENDPOINT, "http://127.0.0.2:1234");
        newConfigMap.put(LoadStmt.BOS_ACCESSKEY, "123");
        newConfigMap.put(LoadStmt.BOS_SECRET_ACCESSKEY, "456");
        dppConfig.updateHadoopConfigs(newConfigMap);
        Assert.assertEquals(7, hadoopConfigs.size());
        Assert.assertEquals("http://127.0.0.2:1234", hadoopConfigs.get("fs.bos.endpoint"));
        Assert.assertEquals("123", hadoopConfigs.get("fs.bos.access.key"));
        Assert.assertEquals("456", hadoopConfigs.get("fs.bos.secret.access.key"));

        // clear: extra configs such as mapred.job.priority are dropped,
        // while core configs like fs.default.name remain
        dppConfig.clear();
        Assert.assertEquals(3, hadoopConfigs.size());
        Assert.assertFalse(hadoopConfigs.containsKey("mapred.job.priority"));
        Assert.assertTrue(hadoopConfigs.containsKey("fs.default.name"));
    }
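
    // check() should fail when a required hadoop config (here hadoop.job.ugi) is missing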
    @Test(expected = LoadException.class)
    public void testHadoopConfigsNotEnough() throws LoadException {
        Map<String, String> configMap = Maps.newHashMap();
        configMap.put("hadoop_palo_path", "/user/palo2");
        configMap.put("hadoop_http_port", "1234");
        configMap.put("hadoop_configs", "mapred.job.tracker=127.0.0.1:111;fs.default.name=hdfs://127.0.0.1:112;");
        DppConfig dppConfig = DppConfig.create(configMap);
        dppConfig.check();
    }
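
    // updateHadoopConfigs() should fail when only part of the BOS credentials are
    // given (LoadStmt.BOS_SECRET_ACCESSKEY is missing here)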
    @Test(expected = LoadException.class)
    public void testBosParamsNotEnough() throws LoadException {
        Map<String, String> configMap = Maps.newHashMap();
        configMap.put(LoadStmt.BOS_ENDPOINT, "http://127.0.0.2:1234");
        configMap.put(LoadStmt.BOS_ACCESSKEY, "123");
        DppConfig dppConfig = new DppConfig();
        dppConfig.updateHadoopConfigs(configMap);
    }
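
    // round-trip a DppConfig through write()/readFields() and verify the fields survive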
    @Test
    public void testSerialization() throws Exception {
        // mock catalog
        fakeCatalog = new FakeCatalog();
        FakeCatalog.setMetaVersion(FeMetaVersion.VERSION_12);

        Map<String, String> configMap = Maps.newHashMap();
        configMap.put("hadoop_palo_path", "/user/palo2");
        configMap.put("hadoop_http_port", "1234");
        configMap.put("hadoop_configs", "mapred.job.tracker=127.0.0.1:111;fs.default.name=hdfs://127.0.0.1:112;"
                + "hadoop.job.ugi=user,password;mapred.job.priority=NORMAL");
        DppConfig dppConfig = DppConfig.create(configMap);

        // write the config out to a temporary file
        File file = new File("./dppConfigTest");
        file.createNewFile();
        DataOutputStream dos = new DataOutputStream(new FileOutputStream(file));
        dppConfig.write(dos);
        dos.flush();
        dos.close();

        // read it back into a fresh instance
        DataInputStream dis = new DataInputStream(new FileInputStream(file));
        DppConfig newDppConfig = new DppConfig();
        newDppConfig.readFields(dis);
        dis.close();
        file.delete();

        Assert.assertEquals("/user/palo2", newDppConfig.getPaloPath());
        Assert.assertEquals(1234, newDppConfig.getHttpPort());
        Assert.assertEquals(4, newDppConfig.getHadoopConfigs().size());
    }
}