blob: 456387681c91d423a22f166f06bed26a3c50708c [file] [log] [blame]
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
package org.apache.doris.flink.sink.schema;
import org.apache.doris.flink.catalog.doris.FieldSchema;
import org.apache.doris.flink.cfg.DorisOptions;
import org.apache.doris.flink.exception.IllegalArgumentException;
import org.apache.doris.flink.sink.BackendUtil;
import org.apache.doris.flink.sink.HttpEntityMock;
import org.apache.doris.flink.sink.OptionUtils;
import org.apache.http.ProtocolVersion;
import org.apache.http.StatusLine;
import org.apache.http.client.methods.CloseableHttpResponse;
import org.apache.http.impl.client.CloseableHttpClient;
import org.apache.http.impl.client.HttpClients;
import org.apache.http.message.BasicStatusLine;
import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
import org.mockito.MockedStatic;
import java.io.IOException;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.mockStatic;
import static org.mockito.Mockito.when;
public class SchemaManagerTest {
static String queryResponse =
"{\n"
+ " \"data\": {\n"
+ " \"type\": \"result_set\",\n"
+ " \"meta\": [{\"name\":\"COLUMN_NAME\",\"type\":\"CHAR\"}],\n"
+ " \"data\": [\n"
+ " [\"age\"]\n"
+ " ],\n"
+ " \"time\": 15\n"
+ " },\n"
+ " \"msg\": \"success\",\n"
+ " \"code\": 0\n"
+ "}";
static String queryNoExistsResponse =
"{\n"
+ " \"data\": {\n"
+ " \"type\": \"result_set\",\n"
+ " \"meta\": [{\"name\":\"COLUMN_NAME\",\"type\":\"CHAR\"}],\n"
+ " \"data\": [],\n"
+ " \"time\": 0\n"
+ " },\n"
+ " \"msg\": \"success\",\n"
+ " \"code\": 0\n"
+ "}";
HttpEntityMock entityMock;
SchemaChangeManager schemaChangeManager;
static MockedStatic<HttpClients> httpClientMockedStatic;
static MockedStatic<BackendUtil> backendUtilMockedStatic;
@Before
public void setUp() throws IOException {
DorisOptions dorisOptions = OptionUtils.buildDorisOptions();
schemaChangeManager = new SchemaChangeManager(dorisOptions);
CloseableHttpClient httpClient = mock(CloseableHttpClient.class);
entityMock = new HttpEntityMock();
CloseableHttpResponse httpResponse = mock(CloseableHttpResponse.class);
StatusLine normalLine = new BasicStatusLine(new ProtocolVersion("http", 1, 0), 200, "");
when(httpClient.execute(any())).thenReturn(httpResponse);
when(httpResponse.getStatusLine()).thenReturn(normalLine);
when(httpResponse.getEntity()).thenReturn(entityMock);
when(httpClient.execute(any())).thenReturn(httpResponse);
when(httpResponse.getStatusLine()).thenReturn(normalLine);
when(httpResponse.getEntity()).thenReturn(entityMock);
httpClientMockedStatic = mockStatic(HttpClients.class);
httpClientMockedStatic.when(() -> HttpClients.createDefault()).thenReturn(httpClient);
backendUtilMockedStatic = mockStatic(BackendUtil.class);
backendUtilMockedStatic.when(() -> BackendUtil.tryHttpConnection(any())).thenReturn(true);
}
@Test
public void testColumnExists() throws IOException, IllegalArgumentException {
entityMock.setValue(queryResponse);
boolean columnExists = schemaChangeManager.checkColumnExists("test", "test_flink", "age");
Assert.assertEquals(true, columnExists);
}
@Test
public void testColumnNotExists() throws IOException, IllegalArgumentException {
entityMock.setValue(queryNoExistsResponse);
boolean columnExists = schemaChangeManager.checkColumnExists("test", "test_flink", "age1");
Assert.assertEquals(false, columnExists);
}
@Test
public void testAddColumn() {
FieldSchema field = new FieldSchema("col", "int", "comment \"'sdf'");
String addColumnDDL = SchemaChangeHelper.buildAddColumnDDL("test.test_flink", field);
Assert.assertEquals(
"ALTER TABLE `test`.`test_flink` ADD COLUMN `col` int COMMENT 'comment \"\\'sdf\\''",
addColumnDDL);
field = new FieldSchema("col", "int", "10", "comment \"'sdf'");
addColumnDDL = SchemaChangeHelper.buildAddColumnDDL("test.test_flink", field);
Assert.assertEquals(
"ALTER TABLE `test`.`test_flink` ADD COLUMN `col` int DEFAULT '10' COMMENT 'comment \"\\'sdf\\''",
addColumnDDL);
field = new FieldSchema("col", "int", "current_timestamp", "comment \"'sdf'");
addColumnDDL = SchemaChangeHelper.buildAddColumnDDL("test.test_flink", field);
Assert.assertEquals(
"ALTER TABLE `test`.`test_flink` ADD COLUMN `col` int DEFAULT current_timestamp COMMENT 'comment \"\\'sdf\\''",
addColumnDDL);
}
@Test
public void testDropColumn() {
String dropColumnDDL = SchemaChangeHelper.buildDropColumnDDL("test.test_flink", "col");
Assert.assertEquals("ALTER TABLE `test`.`test_flink` DROP COLUMN `col`", dropColumnDDL);
}
@Test
public void testRenameColumn() {
String renameColumnDDL =
SchemaChangeHelper.buildRenameColumnDDL("test.test_flink", "col", "col_new");
Assert.assertEquals(
"ALTER TABLE `test`.`test_flink` RENAME COLUMN `col` `col_new`", renameColumnDDL);
}
@After
public void after() {
httpClientMockedStatic.close();
backendUtilMockedStatic.close();
}
}