| /* |
| * Licensed to the Apache Software Foundation (ASF) under one or more |
| * contributor license agreements. See the NOTICE file distributed with |
| * this work for additional information regarding copyright ownership. |
| * The ASF licenses this file to You under the Apache License, Version 2.0 |
| * (the "License"); you may not use this file except in compliance with |
| * the License. You may obtain a copy of the License at |
| * |
| * http://www.apache.org/licenses/LICENSE-2.0 |
| * |
| * Unless required by applicable law or agreed to in writing, software |
| * distributed under the License is distributed on an "AS IS" BASIS, |
| * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| * See the License for the specific language governing permissions and |
| * limitations under the License. |
| */ |
| |
| package org.apache.rocketmq.connect.runtime.rest; |
| |
| import com.alibaba.fastjson.JSON; |
| import com.alibaba.fastjson.JSONObject; |
| import io.openmessaging.connector.api.component.connector.Connector; |
| import io.openmessaging.connector.api.component.task.source.SourceTask; |
| import io.openmessaging.connector.api.data.ConnectRecord; |
| import io.openmessaging.connector.api.data.RecordConverter; |
| import io.openmessaging.internal.DefaultKeyValue; |
| import java.net.URI; |
| import java.nio.ByteBuffer; |
| import java.util.ArrayList; |
| import java.util.HashMap; |
| import java.util.HashSet; |
| import java.util.List; |
| import java.util.Map; |
| import java.util.Set; |
| import java.util.concurrent.atomic.AtomicReference; |
| import java.util.stream.Collectors; |
| import org.apache.http.HttpResponse; |
| import org.apache.http.client.HttpClient; |
| import org.apache.http.client.methods.HttpGet; |
| import org.apache.http.client.utils.URIBuilder; |
| import org.apache.http.impl.client.HttpClientBuilder; |
| import org.apache.http.util.EntityUtils; |
| import org.apache.rocketmq.client.producer.DefaultMQProducer; |
| import org.apache.rocketmq.connect.runtime.controller.distributed.DistributedConnectController; |
| import org.apache.rocketmq.connect.runtime.common.ConnectKeyValue; |
| import org.apache.rocketmq.connect.runtime.config.WorkerConfig; |
| import org.apache.rocketmq.connect.runtime.config.ConnectorConfig; |
| import org.apache.rocketmq.connect.runtime.connectorwrapper.TransformChain; |
| import org.apache.rocketmq.connect.runtime.connectorwrapper.Worker; |
| import org.apache.rocketmq.connect.runtime.connectorwrapper.WorkerConnector; |
| import org.apache.rocketmq.connect.runtime.connectorwrapper.WorkerSourceTask; |
| import org.apache.rocketmq.connect.runtime.connectorwrapper.WorkerState; |
| import org.apache.rocketmq.connect.runtime.errors.ReporterManagerUtil; |
| import org.apache.rocketmq.connect.runtime.errors.RetryWithToleranceOperator; |
| import org.apache.rocketmq.connect.runtime.rest.entities.PluginInfo; |
| import org.apache.rocketmq.connect.runtime.service.ClusterManagementService; |
| import org.apache.rocketmq.connect.runtime.service.ConfigManagementService; |
| import org.apache.rocketmq.connect.runtime.service.DefaultConnectorContext; |
| import org.apache.rocketmq.connect.runtime.service.PositionManagementServiceImpl; |
| import org.apache.rocketmq.connect.runtime.stats.ConnectStatsManager; |
| import org.apache.rocketmq.connect.runtime.stats.ConnectStatsService; |
| import org.apache.rocketmq.connect.runtime.utils.ConnectorTaskId; |
| import org.apache.rocketmq.connect.runtime.controller.isolation.Plugin; |
| import org.junit.Assert; |
| import org.junit.Before; |
| import org.junit.Test; |
| import org.junit.runner.RunWith; |
| import org.mockito.Mock; |
| import org.mockito.junit.MockitoJUnitRunner; |
| |
| import static org.junit.Assert.assertEquals; |
| import static org.mockito.Mockito.any; |
| import static org.mockito.Mockito.anyString; |
| import static org.mockito.Mockito.when; |
| |
| @RunWith(MockitoJUnitRunner.class) |
| public class RestHandlerTest { |
| |
| @Mock |
| private DistributedConnectController connectController; |
| |
| @Mock |
| private ConfigManagementService configManagementService; |
| |
| @Mock |
| private ClusterManagementService clusterManagementService; |
| |
| @Mock |
| private Worker worker; |
| |
| @Mock |
| private DefaultMQProducer producer; |
| |
| private RestHandler restHandler; |
| |
| @Mock |
| private WorkerConfig connectConfig; |
| |
| @Mock |
| private SourceTask sourceTask; |
| |
| @Mock |
| private RecordConverter converter; |
| |
| @Mock |
| private PositionManagementServiceImpl positionManagementServiceImpl; |
| |
| @Mock |
| private Connector connector; |
| |
| private byte[] sourcePartition; |
| |
| private byte[] sourcePosition; |
| |
| private Map<ByteBuffer, ByteBuffer> positions; |
| |
| private static final String CREATE_CONNECTOR_URL = "http://localhost:8081/connectors/%s"; |
| |
| private static final String STOP_CONNECTOR_URL = "http://localhost:8081/connectors/%s/stop"; |
| |
| private static final String GET_CLUSTER_INFO_URL = "http://localhost:8081/getClusterInfo"; |
| |
| private static final String GET_CONFIG_INFO_URL = "http://localhost:8081/getConfigInfo"; |
| |
| private static final String GET_POSITION_INFO_URL = "http://localhost:8081/getPositionInfo"; |
| |
| private static final String GET_ALLOCATED_CONNECTORS_URL = "http://localhost:8081/getAllocatedConnectors"; |
| |
| private static final String GET_ALLOCATED_TASKS_URL = "http://localhost:8081/getAllocatedTasks"; |
| |
| private static final String QUERY_CONNECTOR_CONFIG_URL = "http://localhost:8081/connectors/testConnector/config"; |
| |
| private static final String QUERY_CONNECTOR_STATUS_URL = "http://localhost:8081/connectors/testConnector/status"; |
| |
| private static final String STOP_ALL_CONNECTOR_URL = "http://localhost:8081/connectors/stopAll"; |
| |
| private static final String PLUGIN_LIST_URL = "http://localhost:8081/plugin/list"; |
| |
| private static final String PLUGIN_RELOAD_URL = "http://localhost:8081/plugin/reload"; |
| |
| private HttpClient httpClient; |
| |
| private List<String> aliveWorker; |
| |
| private Map<String, ConnectKeyValue> connectorConfigs; |
| |
| private Map<String, List<ConnectKeyValue>> taskConfigs; |
| |
| private Set<WorkerConnector> workerConnectors; |
| |
| private Set<Runnable> workerTasks; |
| |
| private AtomicReference<WorkerState> workerState; |
| |
| @Mock |
| private ConnectStatsManager connectStatsManager; |
| |
| @Mock |
| private ConnectStatsService connectStatsService; |
| |
| @Before |
| public void init() throws Exception { |
| workerState = new AtomicReference<>(WorkerState.STARTED); |
| |
| when(connectController.getConnectConfig()).thenReturn(connectConfig); |
| when(connectConfig.getHttpPort()).thenReturn(8081); |
| when(connectController.getConfigManagementService()).thenReturn(configManagementService); |
| when(configManagementService.putConnectorConfig(anyString(), any(ConnectKeyValue.class))).thenReturn(""); |
| |
| String connectName = "testConnector"; |
| ConnectKeyValue connectKeyValue = new ConnectKeyValue(); |
| connectKeyValue.put(ConnectorConfig.CONNECTOR_CLASS, "org.apache.rocketmq.connect.runtime.service.TestConnector"); |
| connectKeyValue.put(ConnectorConfig.VALUE_CONVERTER, "source-record-converter"); |
| |
| ConnectKeyValue connectKeyValue1 = new ConnectKeyValue(); |
| connectKeyValue1.put(ConnectorConfig.CONNECTOR_CLASS, "org.apache.rocketmq.connect.runtime.service.TestConnector"); |
| connectKeyValue1.put(ConnectorConfig.VALUE_CONVERTER, "source-record-converter1"); |
| |
| List<ConnectKeyValue> connectKeyValues = new ArrayList<ConnectKeyValue>(8) { |
| { |
| add(connectKeyValue); |
| } |
| }; |
| connectorConfigs = new HashMap<String, ConnectKeyValue>() { |
| { |
| put(connectName, connectKeyValue); |
| } |
| }; |
| taskConfigs = new HashMap<String, List<ConnectKeyValue>>() { |
| { |
| put(connectName, connectKeyValues); |
| } |
| }; |
| when(configManagementService.getConnectorConfigs()).thenReturn(connectorConfigs); |
| when(configManagementService.getTaskConfigs()).thenReturn(taskConfigs); |
| |
| aliveWorker = new ArrayList<String>() { |
| { |
| add("workerId1"); |
| add("workerId2"); |
| } |
| }; |
| |
| when(connectController.getClusterManagementService()).thenReturn(clusterManagementService); |
| when(clusterManagementService.getAllAliveWorkers()).thenReturn(aliveWorker); |
| |
| sourcePartition = "127.0.0.13306".getBytes("UTF-8"); |
| JSONObject jsonObject = new JSONObject(); |
| // jsonObject.put(MysqlConstants.BINLOG_FILENAME, "binlogFilename"); |
| // jsonObject.put(MysqlConstants.NEXT_POSITION, "100"); |
| sourcePosition = jsonObject.toJSONString().getBytes(); |
| positions = new HashMap<ByteBuffer, ByteBuffer>() { |
| { |
| put(ByteBuffer.wrap(sourcePartition), ByteBuffer.wrap(sourcePosition)); |
| } |
| }; |
| |
| WorkerConnector workerConnector1 = new WorkerConnector("testConnectorName1", connector, connectKeyValue, new DefaultConnectorContext("testConnectorName1", connectController) , null, null); |
| WorkerConnector workerConnector2 = new WorkerConnector("testConnectorName2", connector, connectKeyValue1, new DefaultConnectorContext("testConnectorName2", connectController), null, null); |
| workerConnectors = new HashSet<WorkerConnector>() { |
| { |
| add(workerConnector1); |
| add(workerConnector2); |
| } |
| }; |
| TransformChain<ConnectRecord> transformChain = new TransformChain<ConnectRecord>(new DefaultKeyValue(), new Plugin(new ArrayList<>())); |
| // create retry operator |
| RetryWithToleranceOperator retryWithToleranceOperator = ReporterManagerUtil.createRetryWithToleranceOperator(connectKeyValue); |
| retryWithToleranceOperator.reporters(ReporterManagerUtil.sourceTaskReporters("testConnectorName1", connectKeyValue)); |
| |
| WorkerSourceTask workerSourceTask1 = new WorkerSourceTask(new WorkerConfig(),new ConnectorTaskId("testConnectorName1", 1), sourceTask,null, connectKeyValue, positionManagementServiceImpl, converter, converter, producer, workerState, connectStatsManager, connectStatsService, transformChain,retryWithToleranceOperator, null); |
| |
| // create retry operator |
| RetryWithToleranceOperator retryWithToleranceOperator02 = ReporterManagerUtil.createRetryWithToleranceOperator(connectKeyValue); |
| retryWithToleranceOperator02.reporters(ReporterManagerUtil.sourceTaskReporters("testConnectorName2", connectKeyValue)); |
| |
| WorkerSourceTask workerSourceTask2 = new WorkerSourceTask(new WorkerConfig(), new ConnectorTaskId("testConnectorName2", 1), sourceTask,null, connectKeyValue1, positionManagementServiceImpl, converter, converter, producer, workerState, connectStatsManager, connectStatsService, transformChain,retryWithToleranceOperator02, null); |
| workerTasks = new HashSet<Runnable>() { |
| { |
| add(workerSourceTask1); |
| add(workerSourceTask2); |
| } |
| }; |
| when(connectController.getWorker()).thenReturn(worker); |
| when(worker.getWorkingConnectors()).thenReturn(workerConnectors); |
| |
| List<String> pluginPaths = new ArrayList<>(); |
| pluginPaths.add("src/test/java/org/apache/rocketmq/connect/runtime"); |
| Plugin plugin = new Plugin(pluginPaths); |
| when(connectController.plugin()).thenReturn(plugin); |
| when(configManagementService.getPlugin()).thenReturn(plugin); |
| restHandler = new RestHandler(connectController); |
| |
| httpClient = HttpClientBuilder.create().build(); |
| } |
| |
| @Test |
| public void testRESTful() throws Exception { |
| URIBuilder uriBuilder = new URIBuilder(String.format(CREATE_CONNECTOR_URL, "testConnectorName")); |
| uriBuilder.setParameter("config", "{\"connector-class\": \"org.apache.rocketmq.connect.runtime.connectorwrapper.testimpl.TestConnector\",\"mysqlAddr\": \"112.74.179.68\",\"mysqlPort\": \"3306\",\"mysqlUsername\": \"canal\",\"mysqlPassword\": \"canal\",\"source-record-converter\":\"org.apache.rocketmq.connect.runtime.converter.JsonConverter\"}"); |
| URI uri = uriBuilder.build(); |
| HttpGet httpGet = new HttpGet(uri); |
| HttpResponse httpResponse = httpClient.execute(httpGet); |
| assertEquals(200, httpResponse.getStatusLine().getStatusCode()); |
| assertEquals("success", EntityUtils.toString(httpResponse.getEntity(), "UTF-8")); |
| |
| URIBuilder uriBuilder1 = new URIBuilder(String.format(STOP_CONNECTOR_URL, "testConnectorName")); |
| URI uri1 = uriBuilder1.build(); |
| HttpGet httpGet1 = new HttpGet(uri1); |
| HttpResponse httpResponse1 = httpClient.execute(httpGet1); |
| assertEquals(200, httpResponse1.getStatusLine().getStatusCode()); |
| assertEquals("success", EntityUtils.toString(httpResponse1.getEntity(), "UTF-8")); |
| |
| URIBuilder uriBuilder2 = new URIBuilder(GET_CLUSTER_INFO_URL); |
| URI uri2 = uriBuilder2.build(); |
| HttpGet httpGet2 = new HttpGet(uri2); |
| HttpResponse httpResponse2 = httpClient.execute(httpGet2); |
| assertEquals(200, httpResponse2.getStatusLine().getStatusCode()); |
| assertEquals(JSON.toJSONString(aliveWorker), EntityUtils.toString(httpResponse2.getEntity(), "UTF-8")); |
| |
| URIBuilder uriBuilder3 = new URIBuilder(GET_CONFIG_INFO_URL); |
| URI uri3 = uriBuilder3.build(); |
| HttpGet httpGet3 = new HttpGet(uri3); |
| HttpResponse httpResponse3 = httpClient.execute(httpGet3); |
| assertEquals(200, httpResponse3.getStatusLine().getStatusCode()); |
| Map<String, Map> formatter = new HashMap<>(); |
| formatter.put("connectorConfigs", connectorConfigs); |
| formatter.put("taskConfigs", taskConfigs); |
| assertEquals(JSON.toJSONString(formatter), EntityUtils.toString(httpResponse3.getEntity(), "UTF-8")); |
| |
| URIBuilder uriBuilder4 = new URIBuilder(GET_ALLOCATED_CONNECTORS_URL); |
| URI uri4 = uriBuilder4.build(); |
| HttpGet httpGet4 = new HttpGet(uri4); |
| HttpResponse httpResponse4 = httpClient.execute(httpGet4); |
| assertEquals(200, httpResponse4.getStatusLine().getStatusCode()); |
| Map<String, ConnectKeyValue> connectors = new HashMap<>(); |
| for (WorkerConnector workerConnector : workerConnectors) { |
| connectors.put(workerConnector.getConnectorName(), workerConnector.getKeyValue()); |
| } |
| assertEquals(JSON.toJSONString(connectors), EntityUtils.toString(httpResponse4.getEntity(), "UTF-8")); |
| |
| URIBuilder uriBuilder5 = new URIBuilder(GET_ALLOCATED_TASKS_URL); |
| URI uri5 = uriBuilder5.build(); |
| HttpGet httpGet5 = new HttpGet(uri5); |
| HttpResponse httpResponse5 = httpClient.execute(httpGet5); |
| assertEquals(200, httpResponse5.getStatusLine().getStatusCode()); |
| Map<String, Object> formatter2 = new HashMap<>(); |
| formatter2.put("pendingTasks", new HashSet<>()); |
| formatter2.put("runningTasks", new HashSet<>()); |
| formatter2.put("stoppingTasks", new HashSet<>()); |
| formatter2.put("stoppedTasks", new HashSet<>()); |
| formatter2.put("errorTasks", new HashSet<>()); |
| assertEquals(JSON.toJSONString(formatter2), EntityUtils.toString(httpResponse5.getEntity(), "UTF-8")); |
| |
| URIBuilder uriBuilder6 = new URIBuilder(QUERY_CONNECTOR_CONFIG_URL); |
| URI uri6 = uriBuilder6.build(); |
| HttpGet httpGet6 = new HttpGet(uri6); |
| HttpResponse httpResponse6 = httpClient.execute(httpGet6); |
| assertEquals(200, httpResponse6.getStatusLine().getStatusCode()); |
| String connectorName = "testConnector"; |
| Map<String, ConnectKeyValue> connectorConfigs = connectController.getConfigManagementService().getConnectorConfigs(); |
| Map<String, List<ConnectKeyValue>> taskConfigs = connectController.getConfigManagementService().getTaskConfigs(); |
| StringBuilder sb = new StringBuilder(); |
| sb.append("ConnectorConfigs:") |
| .append(JSON.toJSONString(connectorConfigs.get(connectorName))) |
| .append("\n") |
| .append("TaskConfigs:") |
| .append(JSON.toJSONString(taskConfigs.get(connectorName))); |
| assertEquals(sb.toString(), EntityUtils.toString(httpResponse6.getEntity(), "UTF-8")); |
| |
| URIBuilder uriBuilder7 = new URIBuilder(QUERY_CONNECTOR_STATUS_URL); |
| URI uri7 = uriBuilder7.build(); |
| HttpGet httpGet7 = new HttpGet(uri7); |
| HttpResponse httpResponse7 = httpClient.execute(httpGet7); |
| assertEquals(200, httpResponse7.getStatusLine().getStatusCode()); |
| assertEquals("running", EntityUtils.toString(httpResponse7.getEntity(), "UTF-8")); |
| |
| URIBuilder uriBuilder8 = new URIBuilder(STOP_ALL_CONNECTOR_URL); |
| URI uri8 = uriBuilder8.build(); |
| HttpGet httpGet8 = new HttpGet(uri8); |
| HttpResponse httpResponse8 = httpClient.execute(httpGet8); |
| assertEquals(200, httpResponse8.getStatusLine().getStatusCode()); |
| assertEquals("success", EntityUtils.toString(httpResponse8.getEntity(), "UTF-8")); |
| |
| URIBuilder uriBuilder9 = new URIBuilder(PLUGIN_LIST_URL); |
| URI uri9 = uriBuilder9.build(); |
| HttpGet httpGet9 = new HttpGet(uri9); |
| HttpResponse httpResponse9 = httpClient.execute(httpGet9); |
| assertEquals(200, httpResponse9.getStatusLine().getStatusCode()); |
| List<PluginInfo> connectorPlugins = JSON.parseArray(EntityUtils.toString(httpResponse9.getEntity(), "UTF-8"), PluginInfo.class); |
| final Map<String, PluginInfo> connectorPluginMap = connectorPlugins.stream().collect(Collectors.toMap(PluginInfo::getClassName, item -> item, (k1, k2) -> k1)); |
| Assert.assertTrue(connectorPluginMap.containsKey("org.apache.rocketmq.connect.runtime.connectorwrapper.TestTransform")); |
| Assert.assertTrue(connectorPluginMap.containsKey("org.apache.rocketmq.connect.runtime.connectorwrapper.testimpl.TestConverter")); |
| |
| URIBuilder uriBuilder10 = new URIBuilder(PLUGIN_RELOAD_URL); |
| URI uri10 = uriBuilder10.build(); |
| HttpGet httpGet10 = new HttpGet(uri10); |
| HttpResponse httpResponse10 = httpClient.execute(httpGet10); |
| assertEquals(200, httpResponse10.getStatusLine().getStatusCode()); |
| assertEquals("success", EntityUtils.toString(httpResponse10.getEntity(), "UTF-8")); |
| |
| } |
| |
| } |