// blob: 2f2317c8c66d8515be0ccbead2f129a3188384bc [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.iotdb.pipe.it.manual;
import org.apache.iotdb.common.rpc.thrift.TSStatus;
import org.apache.iotdb.commons.client.sync.SyncConfigNodeIServiceClient;
import org.apache.iotdb.confignode.rpc.thrift.TCreatePipeReq;
import org.apache.iotdb.db.it.utils.TestUtils;
import org.apache.iotdb.it.env.cluster.node.DataNodeWrapper;
import org.apache.iotdb.it.framework.IoTDBTestRunner;
import org.apache.iotdb.itbase.category.MultiClusterIT2ManualCreateSchema;
import org.apache.iotdb.rpc.TSStatusCode;
import org.junit.Assert;
import org.junit.Test;
import org.junit.experimental.categories.Category;
import org.junit.runner.RunWith;
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
/**
 * Integration tests that create and start a pipe from the sender cluster to the receiver cluster,
 * execute metadata statements on the sender before and after restarting both clusters, and then
 * assert that the receiver eventually reports as many objects as statements that succeeded on the
 * sender.
 *
 * <p>NOTE(review): judging by the test names, the intent is to verify that the pipe's schema /
 * config tasks resume on their own after a restart; the resume mechanism itself is not visible in
 * this file.
 */
@RunWith(IoTDBTestRunner.class)
@Category({MultiClusterIT2ManualCreateSchema.class})
public class IoTDBPipeMetaRestartIT extends AbstractPipeDualManualIT {

  /** Name of the pipe created on the sender by every test in this class. */
  private static final String PIPE_NAME = "testPipe";

  /** Number of statements executed on the sender before the clusters are restarted. */
  private static final int STATEMENTS_BEFORE_RESTART = 10;

  /** Total number of statements executed on the sender (before plus after the restart). */
  private static final int STATEMENTS_TOTAL = 20;

  /**
   * Creates time series on the sender around a restart of both clusters and checks that
   * {@code count timeseries} on the receiver eventually matches the number of successful creations.
   *
   * @throws Exception if pipe creation, a cluster restart or the final assertion fails
   */
  @Test
  public void testAutoRestartSchemaTask() throws Exception {
    assertMetaSyncedAcrossRestart(
        "create timeseries root.ln.wf01.GPS.status%s with datatype=BOOLEAN,encoding=PLAIN",
        "count timeseries",
        "count(timeseries),");
  }

  /**
   * Creates databases on the sender around a restart of both clusters and checks that
   * {@code count databases} on the receiver eventually matches the number of successful creations.
   *
   * @throws Exception if pipe creation, a cluster restart or the final assertion fails
   */
  @Test
  public void testAutoRestartConfigTask() throws Exception {
    assertMetaSyncedAcrossRestart("create database root.ln%s", "count databases", "count,");
  }

  /**
   * Shared scenario of both tests: start the pipe, run the first batch of statements, restart the
   * sender and then the receiver, run the second batch, and finally wait for the receiver to
   * report the expected count.
   *
   * @param sqlTemplate {@link String#format} template with a single {@code %s} placeholder that
   *     receives the statement index
   * @param countSql query executed on the receiver to count the synchronized objects
   * @param expectedHeader header line expected for {@code countSql}
   * @throws Exception if any step of the scenario fails
   */
  private void assertMetaSyncedAcrossRestart(
      String sqlTemplate, String countSql, String expectedHeader) throws Exception {
    createAndStartTestPipe();

    int successCount = countSuccessfulStatements(sqlTemplate, 0, STATEMENTS_BEFORE_RESTART);

    TestUtils.restartCluster(senderEnv);
    TestUtils.restartCluster(receiverEnv);

    successCount +=
        countSuccessfulStatements(sqlTemplate, STATEMENTS_BEFORE_RESTART, STATEMENTS_TOTAL);

    // Only statements that actually succeeded on the sender are expected on the receiver.
    TestUtils.assertDataEventuallyOnEnv(
        receiverEnv,
        countSql,
        expectedHeader,
        Collections.singleton(String.format("%d,", successCount)));
  }

  /**
   * Creates {@link #PIPE_NAME} on the sender's leader config node, pointing at the first data node
   * of the receiver, and starts it. Both the create and the start call must return
   * {@code SUCCESS_STATUS}.
   *
   * @throws Exception if the config node client call fails
   */
  private void createAndStartTestPipe() throws Exception {
    DataNodeWrapper receiverDataNode = receiverEnv.getDataNodeWrapper(0);
    String receiverIp = receiverDataNode.getIp();
    int receiverPort = receiverDataNode.getPort();

    try (SyncConfigNodeIServiceClient client =
        (SyncConfigNodeIServiceClient) senderEnv.getLeaderConfigNodeConnection()) {
      Map<String, String> extractorAttributes = new HashMap<>();
      extractorAttributes.put("extractor.inclusion", "all");
      extractorAttributes.put("extractor.inclusion.exclusion", "");
      extractorAttributes.put("extractor.forwarding-pipe-requests", "false");

      // Intentionally left empty: no processor attributes are configured for this pipe.
      Map<String, String> processorAttributes = new HashMap<>();

      Map<String, String> connectorAttributes = new HashMap<>();
      connectorAttributes.put("connector", "iotdb-thrift-connector");
      connectorAttributes.put("connector.ip", receiverIp);
      connectorAttributes.put("connector.port", Integer.toString(receiverPort));
      // NOTE(review): "retry" with max time "-1" presumably means retry without a time limit
      // on conflicts; confirm against the connector documentation.
      connectorAttributes.put("connector.exception.conflict.resolve-strategy", "retry");
      connectorAttributes.put("connector.exception.conflict.retry-max-time-seconds", "-1");

      TSStatus status =
          client.createPipe(
              new TCreatePipeReq(PIPE_NAME, connectorAttributes)
                  .setExtractorAttributes(extractorAttributes)
                  .setProcessorAttributes(processorAttributes));
      Assert.assertEquals(TSStatusCode.SUCCESS_STATUS.getStatusCode(), status.getCode());

      Assert.assertEquals(
          TSStatusCode.SUCCESS_STATUS.getStatusCode(), client.startPipe(PIPE_NAME).getCode());
    }
  }

  /**
   * Executes {@code sqlTemplate} on the sender once for every index in
   * {@code [fromInclusive, toExclusive)} and counts the executions that reported success.
   *
   * @param sqlTemplate {@link String#format} template with a single {@code %s} placeholder
   * @param fromInclusive first index substituted into the template
   * @param toExclusive index at which to stop (not executed)
   * @return number of statements for which {@code tryExecuteNonQueryWithRetry} returned
   *     {@code true}
   * @throws Exception if statement execution throws
   */
  private int countSuccessfulStatements(String sqlTemplate, int fromInclusive, int toExclusive)
      throws Exception {
    int succeeded = 0;
    for (int index = fromInclusive; index < toExclusive; ++index) {
      if (TestUtils.tryExecuteNonQueryWithRetry(senderEnv, String.format(sqlTemplate, index))) {
        ++succeeded;
      }
    }
    return succeeded;
  }
}