blob: 819385b847ad51cedf093e6fe3c56b507c21273d [file] [log] [blame]
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
package org.apache.doris.datasource.property;
import org.apache.doris.analysis.CreateCatalogStmt;
import org.apache.doris.analysis.CreateRepositoryStmt;
import org.apache.doris.analysis.CreateResourceStmt;
import org.apache.doris.analysis.DropCatalogStmt;
import org.apache.doris.analysis.OutFileClause;
import org.apache.doris.analysis.QueryStmt;
import org.apache.doris.analysis.SelectStmt;
import org.apache.doris.analysis.TableValuedFunctionRef;
import org.apache.doris.backup.Repository;
import org.apache.doris.catalog.Env;
import org.apache.doris.catalog.Resource;
import org.apache.doris.common.DdlException;
import org.apache.doris.common.FeConstants;
import org.apache.doris.common.FeMetaVersion;
import org.apache.doris.common.Pair;
import org.apache.doris.common.UserException;
import org.apache.doris.common.jmockit.Deencapsulation;
import org.apache.doris.datasource.HMSExternalCatalog;
import org.apache.doris.datasource.property.constants.CosProperties;
import org.apache.doris.datasource.property.constants.MinioProperties;
import org.apache.doris.datasource.property.constants.ObsProperties;
import org.apache.doris.datasource.property.constants.OssProperties;
import org.apache.doris.datasource.property.constants.S3Properties;
import org.apache.doris.meta.MetaContext;
import org.apache.doris.tablefunction.S3TableValuedFunction;
import org.apache.doris.thrift.TFileFormatType;
import org.apache.doris.utframe.TestWithFeService;
import com.google.common.collect.ImmutableList;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.Test;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.TreeMap;
public class PropertyConverterTest extends TestWithFeService {
private final Set<String> checkSet = new HashSet<>();
private final Map<String, String> expectedCredential = new TreeMap<>(String.CASE_INSENSITIVE_ORDER);
@Override
protected void runBeforeAll() throws Exception {
createDorisCluster();
createDatabase("mock_db");
useDatabase("mock_db");
createTable("create table mock_tbl1 \n" + "(k1 int, k2 int) distributed by hash(k1) buckets 1\n"
+ "properties('replication_num' = '1');");
List<String> withoutPrefix = ImmutableList.of("endpoint", "access_key", "secret_key");
checkSet.addAll(withoutPrefix);
checkSet.addAll(S3Properties.Env.REQUIRED_FIELDS);
expectedCredential.put("access_key", "akk");
expectedCredential.put("secret_key", "skk");
}
@Test
public void testOutFileS3PropertiesConverter() throws Exception {
String query = "select * from mock_tbl1 \n"
+ "into outfile 's3://bucket/mock_dir'\n"
+ "format as csv\n"
+ "properties(\n"
+ " 'AWS_ENDPOINT' = 'http://127.0.0.1:9000',\n"
+ " 'AWS_ACCESS_KEY' = 'akk',\n"
+ " 'AWS_SECRET_KEY'='akk',\n"
+ " 'AWS_REGION' = 'mock',\n"
+ " 'use_path_style' = 'true'\n"
+ ");";
QueryStmt analyzedOutStmt = createStmt(query);
Assertions.assertTrue(analyzedOutStmt.hasOutFileClause());
OutFileClause outFileClause = analyzedOutStmt.getOutFileClause();
boolean isOutFileClauseAnalyzed = Deencapsulation.getField(outFileClause, "isAnalyzed");
Assertions.assertTrue(isOutFileClauseAnalyzed);
Assertions.assertEquals(outFileClause.getFileFormatType(), TFileFormatType.FORMAT_CSV_PLAIN);
String queryNew = "select * from mock_tbl1 \n"
+ "into outfile 's3://bucket/mock_dir'\n"
+ "format as csv\n"
+ "properties(\n"
+ " 's3.endpoint' = 'http://127.0.0.1:9000',\n"
+ " 's3.access_key' = 'akk',\n"
+ " 's3.secret_key'='akk',\n"
+ " 'use_path_style' = 'true'\n"
+ ");";
QueryStmt analyzedOutStmtNew = createStmt(queryNew);
Assertions.assertTrue(analyzedOutStmtNew.hasOutFileClause());
OutFileClause outFileClauseNew = analyzedOutStmtNew.getOutFileClause();
boolean isNewAnalyzed = Deencapsulation.getField(outFileClauseNew, "isAnalyzed");
Assertions.assertTrue(isNewAnalyzed);
}
@Test
public void testS3SourcePropertiesConverter() throws Exception {
String queryOld = "CREATE RESOURCE 'remote_s3'\n"
+ "PROPERTIES\n"
+ "(\n"
+ " 'type' = 's3',\n"
+ " 'AWS_ENDPOINT' = 's3.us-east-1.amazonaws.com',\n"
+ " 'AWS_REGION' = 'us-east-1',\n"
+ " 'AWS_ACCESS_KEY' = 'akk',\n"
+ " 'AWS_SECRET_KEY' = 'skk',\n"
+ " 'AWS_ROOT_PATH' = '/',\n"
+ " 'AWS_BUCKET' = 'bucket',\n"
+ " 's3_validity_check' = 'false'"
+ ");";
CreateResourceStmt analyzedResourceStmt = createStmt(queryOld);
Assertions.assertEquals(analyzedResourceStmt.getProperties().size(), 8);
Resource resource = Resource.fromStmt(analyzedResourceStmt);
// will add converted properties
Assertions.assertEquals(resource.getCopiedProperties().size(), 20);
String queryNew = "CREATE RESOURCE 'remote_new_s3'\n"
+ "PROPERTIES\n"
+ "(\n"
+ " 'type' = 's3',\n"
+ " 's3.endpoint' = 'http://s3.us-east-1.amazonaws.com',\n"
+ " 's3.region' = 'us-east-1',\n"
+ " 's3.access_key' = 'akk',\n"
+ " 's3.secret_key' = 'skk',\n"
+ " 's3.root.path' = '/',\n"
+ " 's3.bucket' = 'bucket',\n"
+ " 's3_validity_check' = 'false'"
+ ");";
CreateResourceStmt analyzedResourceStmtNew = createStmt(queryNew);
Assertions.assertEquals(analyzedResourceStmtNew.getProperties().size(), 8);
Resource newResource = Resource.fromStmt(analyzedResourceStmtNew);
// will add converted properties
Assertions.assertEquals(newResource.getCopiedProperties().size(), 14);
}
@Test
public void testS3RepositoryPropertiesConverter() throws Exception {
FeConstants.runningUnitTest = true;
String s3Repo = "CREATE REPOSITORY `s3_repo`\n"
+ "WITH S3\n"
+ "ON LOCATION 's3://s3-repo'\n"
+ "PROPERTIES\n"
+ "(\n"
+ " 'AWS_ENDPOINT' = 'http://s3.us-east-1.amazonaws.com',\n"
+ " 'AWS_ACCESS_KEY' = 'akk',\n"
+ " 'AWS_SECRET_KEY'='skk',\n"
+ " 'AWS_REGION' = 'us-east-1'\n"
+ ");";
CreateRepositoryStmt analyzedStmt = createStmt(s3Repo);
Assertions.assertEquals(analyzedStmt.getProperties().size(), 4);
Repository repository = getRepository(analyzedStmt, "s3_repo");
Assertions.assertEquals(repository.getRemoteFileSystem().getProperties().size(), 5);
String s3RepoNew = "CREATE REPOSITORY `s3_repo_new`\n"
+ "WITH S3\n"
+ "ON LOCATION 's3://s3-repo'\n"
+ "PROPERTIES\n"
+ "(\n"
+ " 's3.endpoint' = 'http://s3.us-east-1.amazonaws.com',\n"
+ " 's3.access_key' = 'akk',\n"
+ " 's3.secret_key' = 'skk'\n"
+ ");";
CreateRepositoryStmt analyzedStmtNew = createStmt(s3RepoNew);
Assertions.assertEquals(analyzedStmtNew.getProperties().size(), 3);
Repository repositoryNew = getRepository(analyzedStmtNew, "s3_repo_new");
Assertions.assertEquals(repositoryNew.getRemoteFileSystem().getProperties().size(), 4);
}
private static Repository getRepository(CreateRepositoryStmt analyzedStmt, String name) throws DdlException {
Env.getCurrentEnv().getBackupHandler().createRepository(analyzedStmt);
return Env.getCurrentEnv().getBackupHandler().getRepoMgr().getRepo(name);
}
@Test
public void testBosBrokerRepositoryPropertiesConverter() throws Exception {
FeConstants.runningUnitTest = true;
String bosBroker = "CREATE REPOSITORY `bos_broker_repo`\n"
+ "WITH BROKER `bos_broker`\n"
+ "ON LOCATION 'bos://backup'\n"
+ "PROPERTIES\n"
+ "(\n"
+ " 'bos_endpoint' = 'http://gz.bcebos.com',\n"
+ " 'bos_accesskey' = 'akk',\n"
+ " 'bos_secret_accesskey'='skk'\n"
+ ");";
CreateRepositoryStmt analyzedStmt = createStmt(bosBroker);
analyzedStmt.getProperties();
Assertions.assertEquals(analyzedStmt.getProperties().size(), 3);
List<Pair<String, Integer>> brokers = ImmutableList.of(Pair.of("127.0.0.1", 9999));
Env.getCurrentEnv().getBrokerMgr().addBrokers("bos_broker", brokers);
Repository repositoryNew = getRepository(analyzedStmt, "bos_broker_repo");
Assertions.assertEquals(repositoryNew.getRemoteFileSystem().getProperties().size(), 4);
}
@Test
public void testS3TVFPropertiesConverter() throws Exception {
FeConstants.runningUnitTest = true;
String queryOld = "select * from s3(\n"
+ " 'uri' = 'http://s3.us-east-1.amazonaws.com/test.parquet',\n"
+ " 'access_key' = 'akk',\n"
+ " 'secret_key' = 'skk',\n"
+ " 'region' = 'us-east-1',\n"
+ " 'format' = 'parquet',\n"
+ " 'use_path_style' = 'true'\n"
+ ") limit 10;";
SelectStmt analyzedStmt = createStmt(queryOld);
Assertions.assertEquals(analyzedStmt.getTableRefs().size(), 1);
TableValuedFunctionRef oldFuncTable = (TableValuedFunctionRef) analyzedStmt.getTableRefs().get(0);
S3TableValuedFunction s3Tvf = (S3TableValuedFunction) oldFuncTable.getTableFunction();
Assertions.assertEquals(s3Tvf.getBrokerDesc().getProperties().size(), 9);
String queryNew = "select * from s3(\n"
+ " 'uri' = 'http://s3.us-east-1.amazonaws.com/test.parquet',\n"
+ " 's3.access_key' = 'akk',\n"
+ " 's3.secret_key' = 'skk',\n"
+ " 'format' = 'parquet',\n"
+ " 'use_path_style' = 'true'\n"
+ ") limit 10;";
SelectStmt analyzedStmtNew = createStmt(queryNew);
Assertions.assertEquals(analyzedStmtNew.getTableRefs().size(), 1);
TableValuedFunctionRef newFuncTable = (TableValuedFunctionRef) analyzedStmt.getTableRefs().get(0);
S3TableValuedFunction newS3Tvf = (S3TableValuedFunction) newFuncTable.getTableFunction();
Assertions.assertEquals(newS3Tvf.getBrokerDesc().getProperties().size(), 9);
}
@Test
public void testAWSOldCatalogPropertiesConverter() throws Exception {
String queryOld = "create catalog hms_s3_old properties (\n"
+ " 'type'='hms',\n"
+ " 'hive.metastore.uris' = 'thrift://172.21.0.44:7004',\n"
+ " 'AWS_ENDPOINT' = 's3.us-east-1.amazonaws.com',\n"
+ " 'AWS_REGION' = 'us-east-1',\n"
+ " 'AWS_ACCESS_KEY' = 'akk',\n"
+ " 'AWS_SECRET_KEY' = 'skk'\n"
+ ");";
CreateCatalogStmt analyzedStmt = createStmt(queryOld);
HMSExternalCatalog catalog = createAndGetCatalog(analyzedStmt, "hms_s3_old");
Map<String, String> properties = catalog.getCatalogProperty().getProperties();
Assertions.assertEquals(properties.size(), 12);
Map<String, String> hdProps = catalog.getCatalogProperty().getHadoopProperties();
Assertions.assertEquals(hdProps.size(), 21);
}
@Test
public void testS3CatalogPropertiesConverter() throws Exception {
String query = "create catalog hms_s3 properties (\n"
+ " 'type'='hms',\n"
+ " 'hive.metastore.uris' = 'thrift://172.21.0.1:7004',\n"
+ " 's3.endpoint' = 's3.us-east-1.amazonaws.com',\n"
+ " 's3.access_key' = 'akk',\n"
+ " 's3.secret_key' = 'skk'\n"
+ ");";
CreateCatalogStmt analyzedStmt = createStmt(query);
HMSExternalCatalog catalog = createAndGetCatalog(analyzedStmt, "hms_s3");
Map<String, String> properties = catalog.getCatalogProperty().getProperties();
Assertions.assertEquals(properties.size(), 11);
Map<String, String> hdProps = catalog.getCatalogProperty().getHadoopProperties();
Assertions.assertEquals(hdProps.size(), 20);
}
@Test
public void testGlueCatalogPropertiesConverter() throws Exception {
String queryOld = "create catalog hms_glue_old properties (\n"
+ " 'type'='hms',\n"
+ " 'hive.metastore.type'='glue',\n"
+ " 'hive.metastore.uris' = 'thrift://172.21.0.1:7004',\n"
+ " 'aws.glue.endpoint' = 'glue.us-east-1.amazonaws.com',\n"
+ " 'aws.glue.access-key' = 'akk',\n"
+ " 'aws.glue.secret-key' = 'skk',\n"
+ " 'aws.region' = 'us-east-1'\n"
+ ");";
String catalogName = "hms_glue_old";
CreateCatalogStmt analyzedStmt = createStmt(queryOld);
HMSExternalCatalog catalog = createAndGetCatalog(analyzedStmt, catalogName);
Map<String, String> properties = catalog.getCatalogProperty().getProperties();
Assertions.assertEquals(properties.size(), 20);
Map<String, String> hdProps = catalog.getCatalogProperty().getHadoopProperties();
Assertions.assertEquals(hdProps.size(), 29);
String query = "create catalog hms_glue properties (\n"
+ " 'type'='hms',\n"
+ " 'hive.metastore.type'='glue',\n"
+ " 'hive.metastore.uris' = 'thrift://172.21.0.1:7004',\n"
+ " 'glue.endpoint' = 'glue.us-east-1.amazonaws.com',\n"
+ " 'glue.access_key' = 'akk',\n"
+ " 'glue.secret_key' = 'skk'\n"
+ ");";
catalogName = "hms_glue";
CreateCatalogStmt analyzedStmtNew = createStmt(query);
HMSExternalCatalog catalogNew = createAndGetCatalog(analyzedStmtNew, catalogName);
Map<String, String> propertiesNew = catalogNew.getCatalogProperty().getProperties();
Assertions.assertEquals(propertiesNew.size(), 20);
Map<String, String> hdPropsNew = catalogNew.getCatalogProperty().getHadoopProperties();
Assertions.assertEquals(hdPropsNew.size(), 29);
}
@Test
public void testOBSCatalogPropertiesConverter() throws Exception {
String query = "create catalog hms_obs properties (\n"
+ " 'type'='hms',\n"
+ " 'hive.metastore.uris' = 'thrift://172.21.0.1:7004',\n"
+ " 'obs.endpoint' = 'obs.cn-north-4.myhuaweicloud.com',\n"
+ " 'obs.access_key' = 'akk',\n"
+ " 'obs.secret_key' = 'skk'\n"
+ ");";
CreateCatalogStmt analyzedStmt = createStmt(query);
HMSExternalCatalog catalog = createAndGetCatalog(analyzedStmt, "hms_obs");
Map<String, String> properties = catalog.getCatalogProperty().getProperties();
Assertions.assertEquals(properties.size(), 11);
Map<String, String> hdProps = catalog.getCatalogProperty().getHadoopProperties();
Assertions.assertEquals(hdProps.size(), 16);
Map<String, String> expectedMetaProperties = new HashMap<>();
expectedMetaProperties.put("endpoint", "obs.cn-north-4.myhuaweicloud.com");
expectedMetaProperties.put("AWS_ENDPOINT", "obs.cn-north-4.myhuaweicloud.com");
expectedMetaProperties.putAll(expectedCredential);
checkExpectedProperties(ObsProperties.OBS_PREFIX, properties, expectedMetaProperties);
}
@Test
public void testS3CompatibleCatalogPropertiesConverter() throws Exception {
String catalogName0 = "hms_cos";
String query0 = "create catalog " + catalogName0 + " properties (\n"
+ " 'type'='hms',\n"
+ " 'hive.metastore.uris' = 'thrift://172.21.0.1:7004',\n"
+ " 'cos.endpoint' = 'cos.ap-beijing.myqcloud.com',\n"
+ " 'cos.access_key' = 'akk',\n"
+ " 'cos.secret_key' = 'skk'\n"
+ ");";
testS3CompatibleCatalogProperties(catalogName0, CosProperties.COS_PREFIX,
"cos.ap-beijing.myqcloud.com", query0);
String catalogName1 = "hms_oss";
String query1 = "create catalog " + catalogName1 + " properties (\n"
+ " 'type'='hms',\n"
+ " 'hive.metastore.uris' = 'thrift://172.21.0.1:7004',\n"
+ " 'oss.endpoint' = 'oss.oss-cn-beijing.aliyuncs.com',\n"
+ " 'oss.access_key' = 'akk',\n"
+ " 'oss.secret_key' = 'skk'\n"
+ ");";
testS3CompatibleCatalogProperties(catalogName1, OssProperties.OSS_PREFIX,
"oss.oss-cn-beijing.aliyuncs.com", query1);
String catalogName2 = "hms_minio";
String query2 = "create catalog " + catalogName2 + " properties (\n"
+ " 'type'='hms',\n"
+ " 'hive.metastore.uris' = 'thrift://172.21.0.1:7004',\n"
+ " 'minio.endpoint' = 'http://127.0.0.1',\n"
+ " 'minio.access_key' = 'akk',\n"
+ " 'minio.secret_key' = 'skk'\n"
+ ");";
testS3CompatibleCatalogProperties(catalogName2, MinioProperties.MINIO_PREFIX,
"http://127.0.0.1", query2);
}
private void testS3CompatibleCatalogProperties(String catalogName, String prefix,
String endpoint, String sql) throws Exception {
Env.getCurrentEnv().getCatalogMgr().dropCatalog(new DropCatalogStmt(true, catalogName));
CreateCatalogStmt analyzedStmt = createStmt(sql);
HMSExternalCatalog catalog = createAndGetCatalog(analyzedStmt, catalogName);
Map<String, String> properties = catalog.getCatalogProperty().getProperties();
Assertions.assertEquals(properties.size(), 11);
Map<String, String> hdProps = catalog.getCatalogProperty().getHadoopProperties();
Assertions.assertEquals(hdProps.size(), 20);
Map<String, String> expectedMetaProperties = new HashMap<>();
expectedMetaProperties.put("endpoint", endpoint);
expectedMetaProperties.put("AWS_ENDPOINT", endpoint);
expectedMetaProperties.putAll(expectedCredential);
checkExpectedProperties(prefix, properties, expectedMetaProperties);
}
private void checkExpectedProperties(String prefix, Map<String, String> properties,
Map<String, String> expectedProperties) {
properties.forEach((key, value) -> {
if (key.startsWith(prefix)) {
String keyToCheck = key.replace(prefix, "");
if (checkSet.contains(keyToCheck)) {
Assertions.assertEquals(value, expectedProperties.get(keyToCheck));
}
}
});
}
private static HMSExternalCatalog createAndGetCatalog(CreateCatalogStmt analyzedStmt, String name)
throws UserException {
Env.getCurrentEnv().getCatalogMgr().createCatalog(analyzedStmt);
return (HMSExternalCatalog) Env.getCurrentEnv().getCatalogMgr().getCatalog(name);
}
@Test
public void testSerialization() throws Exception {
MetaContext metaContext = new MetaContext();
metaContext.setMetaVersion(FeMetaVersion.VERSION_CURRENT);
metaContext.setThreadLocalInfo();
}
}