blob: 4246b305861043bf4135caff5bf3d87a9f7660ef [file] [log] [blame]
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
package org.apache.doris.analysis;
import org.apache.doris.common.AnalysisException;
import org.apache.doris.common.UserException;
import org.apache.doris.mysql.privilege.PaloAuth;
import org.apache.doris.mysql.privilege.PrivPredicate;
import org.apache.doris.qe.ConnectContext;
import com.google.common.collect.Maps;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
import java.util.Map;
import mockit.Expectations;
import mockit.Mocked;
/*
* Author: Chenmingyu
* Date: Jul 20, 2020
*/
public class AlterRoutineLoadStmtTest {
private Analyzer analyzer;
@Mocked
private PaloAuth auth;
@Before
public void setUp() {
analyzer = AccessTestUtil.fetchAdminAnalyzer(false);
new Expectations() {
{
auth.checkGlobalPriv((ConnectContext) any, (PrivPredicate) any);
minTimes = 0;
result = true;
auth.checkDbPriv((ConnectContext) any, anyString, (PrivPredicate) any);
minTimes = 0;
result = true;
auth.checkTblPriv((ConnectContext) any, anyString, anyString, (PrivPredicate) any);
minTimes = 0;
result = true;
}
};
}
@Test
public void testNormal() {
{
Map<String, String> jobProperties = Maps.newHashMap();
jobProperties.put(CreateRoutineLoadStmt.MAX_ERROR_NUMBER_PROPERTY, "100");
jobProperties.put(CreateRoutineLoadStmt.MAX_BATCH_ROWS_PROPERTY, "200000");
String typeName = "kafka";
Map<String, String> dataSourceProperties = Maps.newHashMap();
dataSourceProperties.put("property.client.id", "101");
dataSourceProperties.put("property.group.id", "mygroup");
dataSourceProperties.put(CreateRoutineLoadStmt.KAFKA_PARTITIONS_PROPERTY, "1,2,3");
dataSourceProperties.put(CreateRoutineLoadStmt.KAFKA_OFFSETS_PROPERTY, "10000, 20000, 30000");
RoutineLoadDataSourceProperties routineLoadDataSourceProperties = new RoutineLoadDataSourceProperties(
typeName, dataSourceProperties);
AlterRoutineLoadStmt stmt = new AlterRoutineLoadStmt(new LabelName("db1", "label1"),
jobProperties, routineLoadDataSourceProperties);
try {
stmt.analyze(analyzer);
} catch (UserException e) {
Assert.fail();
}
Assert.assertEquals(2, stmt.getAnalyzedJobProperties().size());
Assert.assertTrue(stmt.getAnalyzedJobProperties().containsKey(CreateRoutineLoadStmt.MAX_ERROR_NUMBER_PROPERTY));
Assert.assertTrue(stmt.getAnalyzedJobProperties().containsKey(CreateRoutineLoadStmt.MAX_BATCH_ROWS_PROPERTY));
Assert.assertTrue(stmt.hasDataSourceProperty());
Assert.assertEquals(2, stmt.getDataSourceProperties().getCustomKafkaProperties().size());
Assert.assertTrue(stmt.getDataSourceProperties().getCustomKafkaProperties().containsKey("group.id"));
Assert.assertTrue(stmt.getDataSourceProperties().getCustomKafkaProperties().containsKey("client.id"));
Assert.assertEquals(3, stmt.getDataSourceProperties().getKafkaPartitionOffsets().size());
}
}
@Test(expected = AnalysisException.class)
public void testNoProperties() throws AnalysisException, UserException {
AlterRoutineLoadStmt stmt = new AlterRoutineLoadStmt(new LabelName("db1", "label1"),
Maps.newHashMap(), new RoutineLoadDataSourceProperties());
stmt.analyze(analyzer);
}
@Test
public void testUnsupportedProperties() {
{
Map<String, String> jobProperties = Maps.newHashMap();
jobProperties.put(CreateRoutineLoadStmt.FORMAT, "csv");
AlterRoutineLoadStmt stmt = new AlterRoutineLoadStmt(new LabelName("db1", "label1"),
jobProperties, new RoutineLoadDataSourceProperties());
try {
stmt.analyze(analyzer);
Assert.fail();
} catch (AnalysisException e) {
Assert.assertTrue(e.getMessage().contains("format is invalid property"));
} catch (UserException e) {
Assert.fail();
}
}
{
Map<String, String> jobProperties = Maps.newHashMap();
jobProperties.put(CreateRoutineLoadStmt.MAX_ERROR_NUMBER_PROPERTY, "100");
String typeName = "kafka";
Map<String, String> dataSourceProperties = Maps.newHashMap();
dataSourceProperties.put(CreateRoutineLoadStmt.KAFKA_TOPIC_PROPERTY, "new_topic");
RoutineLoadDataSourceProperties routineLoadDataSourceProperties = new RoutineLoadDataSourceProperties(
typeName, dataSourceProperties);
AlterRoutineLoadStmt stmt = new AlterRoutineLoadStmt(new LabelName("db1", "label1"),
jobProperties, routineLoadDataSourceProperties);
try {
stmt.analyze(analyzer);
Assert.fail();
} catch (AnalysisException e) {
Assert.assertTrue(e.getMessage().contains("kafka_topic is invalid kafka custom property"));
} catch (UserException e) {
Assert.fail();
}
}
{
Map<String, String> jobProperties = Maps.newHashMap();
jobProperties.put(CreateRoutineLoadStmt.MAX_ERROR_NUMBER_PROPERTY, "100");
String typeName = "kafka";
Map<String, String> dataSourceProperties = Maps.newHashMap();
dataSourceProperties.put(CreateRoutineLoadStmt.KAFKA_PARTITIONS_PROPERTY, "1,2,3");
RoutineLoadDataSourceProperties routineLoadDataSourceProperties = new RoutineLoadDataSourceProperties(
typeName, dataSourceProperties);
AlterRoutineLoadStmt stmt = new AlterRoutineLoadStmt(new LabelName("db1", "label1"),
jobProperties, routineLoadDataSourceProperties);
try {
stmt.analyze(analyzer);
Assert.fail();
} catch (AnalysisException e) {
Assert.assertTrue(e.getMessage().contains("Partition and offset must be specified at the same time"));
} catch (UserException e) {
Assert.fail();
}
}
{
Map<String, String> jobProperties = Maps.newHashMap();
jobProperties.put(CreateRoutineLoadStmt.MAX_ERROR_NUMBER_PROPERTY, "100");
String typeName = "kafka";
Map<String, String> dataSourceProperties = Maps.newHashMap();
dataSourceProperties.put(CreateRoutineLoadStmt.KAFKA_PARTITIONS_PROPERTY, "1,2,3");
dataSourceProperties.put(CreateRoutineLoadStmt.KAFKA_OFFSETS_PROPERTY, "1000, 2000");
RoutineLoadDataSourceProperties routineLoadDataSourceProperties = new RoutineLoadDataSourceProperties(
typeName, dataSourceProperties);
AlterRoutineLoadStmt stmt = new AlterRoutineLoadStmt(new LabelName("db1", "label1"),
jobProperties, routineLoadDataSourceProperties);
try {
stmt.analyze(analyzer);
Assert.fail();
} catch (AnalysisException e) {
Assert.assertTrue(e.getMessage().contains("Partitions number should be equals to offsets number"));
} catch (UserException e) {
Assert.fail();
}
}
{
Map<String, String> jobProperties = Maps.newHashMap();
jobProperties.put(CreateRoutineLoadStmt.MAX_ERROR_NUMBER_PROPERTY, "100");
String typeName = "kafka";
Map<String, String> dataSourceProperties = Maps.newHashMap();
dataSourceProperties.put(CreateRoutineLoadStmt.KAFKA_OFFSETS_PROPERTY, "1000, 2000, 3000");
RoutineLoadDataSourceProperties routineLoadDataSourceProperties = new RoutineLoadDataSourceProperties(
typeName, dataSourceProperties);
AlterRoutineLoadStmt stmt = new AlterRoutineLoadStmt(new LabelName("db1", "label1"),
jobProperties, routineLoadDataSourceProperties);
try {
stmt.analyze(analyzer);
Assert.fail();
} catch (AnalysisException e) {
Assert.assertTrue(e.getMessage().contains("Missing kafka partition info"));
} catch (UserException e) {
Assert.fail();
}
}
{
Map<String, String> jobProperties = Maps.newHashMap();
jobProperties.put(CreateRoutineLoadStmt.MAX_ERROR_NUMBER_PROPERTY, "100");
jobProperties.put(CreateRoutineLoadStmt.MAX_BATCH_SIZE_PROPERTY, "200000");
String typeName = "kafka";
Map<String, String> dataSourceProperties = Maps.newHashMap();
dataSourceProperties.put("property.client.id", "101");
dataSourceProperties.put("property.group.id", "mygroup");
dataSourceProperties.put(CreateRoutineLoadStmt.KAFKA_PARTITIONS_PROPERTY, "1,2,3");
dataSourceProperties.put(CreateRoutineLoadStmt.KAFKA_OFFSETS_PROPERTY, "10000, 20000, 30000");
RoutineLoadDataSourceProperties routineLoadDataSourceProperties = new RoutineLoadDataSourceProperties(
typeName, dataSourceProperties);
AlterRoutineLoadStmt stmt = new AlterRoutineLoadStmt(new LabelName("db1", "label1"),
jobProperties, routineLoadDataSourceProperties);
try {
stmt.analyze(analyzer);
Assert.fail();
} catch (AnalysisException e) {
Assert.assertTrue(e.getMessage().contains("max_batch_size should between 100MB and 1GB"));
} catch (UserException e) {
Assert.fail();
}
}
}
}