blob: 69a45806079fa984cf242187eb1df46c82475625 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.iotdb.session;
import org.apache.iotdb.common.rpc.thrift.TEndPoint;
import org.apache.iotdb.isession.SessionConfig;
import org.apache.iotdb.rpc.IoTDBConnectionException;
import org.apache.iotdb.rpc.RedirectException;
import org.apache.iotdb.rpc.StatementExecutionException;
import org.apache.iotdb.service.rpc.thrift.TSInsertRecordReq;
import org.apache.iotdb.service.rpc.thrift.TSInsertRecordsOfOneDeviceReq;
import org.apache.iotdb.service.rpc.thrift.TSInsertRecordsReq;
import org.apache.iotdb.service.rpc.thrift.TSInsertStringRecordReq;
import org.apache.iotdb.service.rpc.thrift.TSInsertStringRecordsReq;
import org.apache.iotdb.service.rpc.thrift.TSInsertTabletReq;
import org.apache.iotdb.service.rpc.thrift.TSInsertTabletsReq;
import org.apache.tsfile.enums.TSDataType;
import org.apache.tsfile.write.record.Tablet;
import org.apache.tsfile.write.schema.MeasurementSchema;
import org.junit.Assert;
import org.junit.Test;
import java.time.ZoneId;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Random;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertNull;
import static org.junit.Assert.fail;
public class SessionCacheLeaderTest {
private static final List<TEndPoint> endpoints =
new ArrayList<TEndPoint>() {
{
add(new TEndPoint("127.0.0.1", 55560)); // default endpoint
add(new TEndPoint("127.0.0.1", 55561)); // meta leader endpoint
add(new TEndPoint("127.0.0.1", 55562));
add(new TEndPoint("127.0.0.1", 55563));
}
};
private Session session;
// just for simulation
public static TEndPoint getDeviceIdBelongedEndpoint(String deviceId) {
if (deviceId.startsWith("root.sg1")) {
return endpoints.get(0);
} else if (deviceId.startsWith("root.sg2")) {
return endpoints.get(1);
} else if (deviceId.startsWith("root.sg3")) {
return endpoints.get(2);
} else if (deviceId.startsWith("root.sg4")) {
return endpoints.get(3);
}
return endpoints.get(deviceId.hashCode() % endpoints.size());
}
@Test
public void testInsertRecord() throws IoTDBConnectionException, StatementExecutionException {
// without leader cache
session = new MockSession("127.0.0.1", 55560, false);
session.open();
assertNull(session.deviceIdToEndpoint);
assertNull(session.endPointToSessionConnection);
String deviceId = "root.sg2.d1";
List<String> measurements = new ArrayList<>();
List<TSDataType> types = new ArrayList<>();
measurements.add("s1");
measurements.add("s2");
measurements.add("s3");
types.add(TSDataType.INT64);
types.add(TSDataType.INT64);
types.add(TSDataType.INT64);
for (long time = 0; time < 100; time++) {
List<Object> values = new ArrayList<>();
values.add(1L);
values.add(2L);
values.add(3L);
session.insertRecord(deviceId, time, measurements, types, values);
}
assertNull(session.deviceIdToEndpoint);
assertNull(session.endPointToSessionConnection);
session.close();
// with leader cache
session = new MockSession("127.0.0.1", 55560, true);
session.open();
assertEquals(0, session.deviceIdToEndpoint.size());
assertEquals(1, session.endPointToSessionConnection.size());
for (long time = 0; time < 100; time++) {
List<Object> values = new ArrayList<>();
values.add(1L);
values.add(2L);
values.add(3L);
session.insertRecord(deviceId, time, measurements, types, values);
}
assertEquals(1, session.deviceIdToEndpoint.size());
assertEquals(getDeviceIdBelongedEndpoint(deviceId), session.deviceIdToEndpoint.get(deviceId));
assertEquals(2, session.endPointToSessionConnection.size());
session.close();
}
@Test
public void testInsertStringRecord()
throws IoTDBConnectionException, StatementExecutionException {
// without leader cache
session = new MockSession("127.0.0.1", 55560, false);
session.open();
assertNull(session.deviceIdToEndpoint);
assertNull(session.endPointToSessionConnection);
String deviceId = "root.sg2.d1";
List<String> measurements = new ArrayList<>();
measurements.add("s1");
measurements.add("s2");
measurements.add("s3");
for (long time = 0; time < 10; time++) {
List<String> values = new ArrayList<>();
values.add("1");
values.add("2");
values.add("3");
session.insertRecord(deviceId, time, measurements, values);
}
assertNull(session.deviceIdToEndpoint);
assertNull(session.endPointToSessionConnection);
session.close();
// with leader cache
session = new MockSession("127.0.0.1", 55560, true);
session.open();
assertEquals(0, session.deviceIdToEndpoint.size());
assertEquals(1, session.endPointToSessionConnection.size());
for (long time = 0; time < 10; time++) {
List<String> values = new ArrayList<>();
values.add("1");
values.add("2");
values.add("3");
session.insertRecord(deviceId, time, measurements, values);
}
assertEquals(1, session.deviceIdToEndpoint.size());
assertEquals(getDeviceIdBelongedEndpoint(deviceId), session.deviceIdToEndpoint.get(deviceId));
assertEquals(2, session.endPointToSessionConnection.size());
session.close();
}
@Test
public void testInsertRecords() throws IoTDBConnectionException, StatementExecutionException {
// without leader cache
session = new MockSession("127.0.0.1", 55560, false);
session.open();
assertNull(session.deviceIdToEndpoint);
assertNull(session.endPointToSessionConnection);
List<String> allDeviceIds =
new ArrayList<String>() {
{
add("root.sg1.d1");
add("root.sg2.d1");
add("root.sg3.d1");
add("root.sg4.d1");
}
};
List<String> measurements = new ArrayList<>();
measurements.add("s1");
measurements.add("s2");
measurements.add("s3");
List<String> deviceIds = new ArrayList<>();
List<List<String>> measurementsList = new ArrayList<>();
List<List<Object>> valuesList = new ArrayList<>();
List<Long> timestamps = new ArrayList<>();
List<List<TSDataType>> typesList = new ArrayList<>();
for (long time = 0; time < 500; time++) {
List<Object> values = new ArrayList<>();
List<TSDataType> types = new ArrayList<>();
values.add(1L);
values.add(2L);
values.add(3L);
types.add(TSDataType.INT64);
types.add(TSDataType.INT64);
types.add(TSDataType.INT64);
deviceIds.add(allDeviceIds.get((int) (time % allDeviceIds.size())));
measurementsList.add(measurements);
valuesList.add(values);
typesList.add(types);
timestamps.add(time);
if (time != 0 && time % 100 == 0) {
session.insertRecords(deviceIds, timestamps, measurementsList, typesList, valuesList);
deviceIds.clear();
measurementsList.clear();
valuesList.clear();
timestamps.clear();
}
}
session.insertRecords(deviceIds, timestamps, measurementsList, typesList, valuesList);
deviceIds.clear();
measurementsList.clear();
valuesList.clear();
timestamps.clear();
assertNull(session.deviceIdToEndpoint);
assertNull(session.endPointToSessionConnection);
session.close();
// with leader cache
session = new MockSession("127.0.0.1", 55560, true);
session.open();
assertEquals(0, session.deviceIdToEndpoint.size());
assertEquals(1, session.endPointToSessionConnection.size());
for (long time = 0; time < 500; time++) {
List<Object> values = new ArrayList<>();
List<TSDataType> types = new ArrayList<>();
values.add(1L);
values.add(2L);
values.add(3L);
types.add(TSDataType.INT64);
types.add(TSDataType.INT64);
types.add(TSDataType.INT64);
deviceIds.add(allDeviceIds.get((int) (time % allDeviceIds.size())));
measurementsList.add(measurements);
valuesList.add(values);
typesList.add(types);
timestamps.add(time);
if (time != 0 && time % 100 == 0) {
session.insertRecords(deviceIds, timestamps, measurementsList, typesList, valuesList);
deviceIds.clear();
measurementsList.clear();
valuesList.clear();
timestamps.clear();
}
}
session.insertRecords(deviceIds, timestamps, measurementsList, typesList, valuesList);
assertEquals(4, session.deviceIdToEndpoint.size());
for (String deviceId : allDeviceIds) {
assertEquals(getDeviceIdBelongedEndpoint(deviceId), session.deviceIdToEndpoint.get(deviceId));
}
assertEquals(4, session.endPointToSessionConnection.size());
session.close();
}
@Test
public void testInsertStringRecords()
throws IoTDBConnectionException, StatementExecutionException {
// without leader cache
session = new MockSession("127.0.0.1", 55560, false);
session.open();
assertNull(session.deviceIdToEndpoint);
assertNull(session.endPointToSessionConnection);
List<String> allDeviceIds =
new ArrayList<String>() {
{
add("root.sg1.d1");
add("root.sg2.d1");
add("root.sg3.d1");
add("root.sg4.d1");
}
};
List<String> measurements = new ArrayList<>();
measurements.add("s1");
measurements.add("s2");
measurements.add("s3");
List<String> deviceIds = new ArrayList<>();
List<List<String>> measurementsList = new ArrayList<>();
List<List<String>> valuesList = new ArrayList<>();
List<Long> timestamps = new ArrayList<>();
for (long time = 0; time < 500; time++) {
List<String> values = new ArrayList<>();
values.add("1");
values.add("2");
values.add("3");
deviceIds.add(allDeviceIds.get((int) (time % allDeviceIds.size())));
measurementsList.add(measurements);
valuesList.add(values);
timestamps.add(time);
if (time != 0 && time % 100 == 0) {
session.insertRecords(deviceIds, timestamps, measurementsList, valuesList);
deviceIds.clear();
measurementsList.clear();
valuesList.clear();
timestamps.clear();
}
}
session.insertRecords(deviceIds, timestamps, measurementsList, valuesList);
deviceIds.clear();
measurementsList.clear();
valuesList.clear();
timestamps.clear();
assertNull(session.deviceIdToEndpoint);
assertNull(session.endPointToSessionConnection);
session.close();
// with leader cache
session = new MockSession("127.0.0.1", 55560, true);
session.open();
assertEquals(0, session.deviceIdToEndpoint.size());
assertEquals(1, session.endPointToSessionConnection.size());
for (long time = 0; time < 500; time++) {
List<String> values = new ArrayList<>();
values.add("1");
values.add("2");
values.add("3");
deviceIds.add(allDeviceIds.get((int) (time % allDeviceIds.size())));
measurementsList.add(measurements);
valuesList.add(values);
timestamps.add(time);
if (time != 0 && time % 100 == 0) {
session.insertRecords(deviceIds, timestamps, measurementsList, valuesList);
deviceIds.clear();
measurementsList.clear();
valuesList.clear();
timestamps.clear();
}
}
session.insertRecords(deviceIds, timestamps, measurementsList, valuesList);
assertEquals(4, session.deviceIdToEndpoint.size());
for (String deviceId : allDeviceIds) {
assertEquals(getDeviceIdBelongedEndpoint(deviceId), session.deviceIdToEndpoint.get(deviceId));
}
assertEquals(4, session.endPointToSessionConnection.size());
session.close();
}
@Test
public void testInsertRecordsOfOneDevice()
throws IoTDBConnectionException, StatementExecutionException {
// without leader cache
session = new MockSession("127.0.0.1", 55560, false);
session.open();
assertNull(session.deviceIdToEndpoint);
assertNull(session.endPointToSessionConnection);
String deviceId = "root.sg2.d2";
List<Long> times = new ArrayList<>();
List<List<String>> measurements = new ArrayList<>();
List<List<TSDataType>> datatypes = new ArrayList<>();
List<List<Object>> values = new ArrayList<>();
addLine(
times,
measurements,
datatypes,
values,
3L,
"s1",
"s2",
TSDataType.INT32,
TSDataType.INT32,
1,
2);
addLine(
times,
measurements,
datatypes,
values,
2L,
"s2",
"s3",
TSDataType.INT32,
TSDataType.INT64,
3,
4L);
addLine(
times,
measurements,
datatypes,
values,
1L,
"s4",
"s5",
TSDataType.FLOAT,
TSDataType.BOOLEAN,
5.0f,
Boolean.TRUE);
session.insertRecordsOfOneDevice(deviceId, times, measurements, datatypes, values);
assertNull(session.deviceIdToEndpoint);
assertNull(session.endPointToSessionConnection);
session.close();
// with leader cache
session = new MockSession("127.0.0.1", 55560, true);
session.open();
assertEquals(0, session.deviceIdToEndpoint.size());
assertEquals(1, session.endPointToSessionConnection.size());
session.insertRecordsOfOneDevice(deviceId, times, measurements, datatypes, values);
assertEquals(1, session.deviceIdToEndpoint.size());
assertEquals(2, session.endPointToSessionConnection.size());
session.close();
}
@Test
public void testInsertTablet() throws IoTDBConnectionException, StatementExecutionException {
// without leader cache
session = new MockSession("127.0.0.1", 55560, false);
session.open();
assertNull(session.deviceIdToEndpoint);
assertNull(session.endPointToSessionConnection);
String deviceId = "root.sg2.d2";
List<MeasurementSchema> schemaList = new ArrayList<>();
schemaList.add(new MeasurementSchema("s1", TSDataType.INT64));
schemaList.add(new MeasurementSchema("s2", TSDataType.INT64));
schemaList.add(new MeasurementSchema("s3", TSDataType.INT64));
Tablet tablet = new Tablet(deviceId, schemaList, 100);
long timestamp = System.currentTimeMillis();
for (long row = 0; row < 100; row++) {
int rowIndex = tablet.rowSize++;
tablet.addTimestamp(rowIndex, timestamp);
for (int s = 0; s < 3; s++) {
long value = new Random().nextLong();
tablet.addValue(schemaList.get(s).getMeasurementId(), rowIndex, value);
}
if (tablet.rowSize == tablet.getMaxRowNumber()) {
session.insertTablet(tablet, true);
tablet.reset();
}
timestamp++;
}
if (tablet.rowSize != 0) {
session.insertTablet(tablet);
tablet.reset();
}
assertNull(session.deviceIdToEndpoint);
assertNull(session.endPointToSessionConnection);
session.close();
// with leader cache
session = new MockSession("127.0.0.1", 55560, true);
session.open();
assertEquals(0, session.deviceIdToEndpoint.size());
assertEquals(1, session.endPointToSessionConnection.size());
for (long row = 0; row < 100; row++) {
int rowIndex = tablet.rowSize++;
tablet.addTimestamp(rowIndex, timestamp);
for (int s = 0; s < 3; s++) {
long value = new Random().nextLong();
tablet.addValue(schemaList.get(s).getMeasurementId(), rowIndex, value);
}
if (tablet.rowSize == tablet.getMaxRowNumber()) {
session.insertTablet(tablet, true);
tablet.reset();
}
timestamp++;
}
if (tablet.rowSize != 0) {
session.insertTablet(tablet);
tablet.reset();
}
assertEquals(1, session.deviceIdToEndpoint.size());
assertEquals(2, session.endPointToSessionConnection.size());
session.close();
}
@Test
public void testInsertTablets() throws IoTDBConnectionException, StatementExecutionException {
// without leader cache
session = new MockSession("127.0.0.1", 55560, false);
session.open();
assertNull(session.deviceIdToEndpoint);
assertNull(session.endPointToSessionConnection);
List<String> allDeviceIds =
new ArrayList<String>() {
{
add("root.sg1.d1");
add("root.sg2.d1");
add("root.sg3.d1");
add("root.sg4.d1");
}
};
List<MeasurementSchema> schemaList = new ArrayList<>();
schemaList.add(new MeasurementSchema("s1", TSDataType.INT64));
schemaList.add(new MeasurementSchema("s2", TSDataType.INT64));
schemaList.add(new MeasurementSchema("s3", TSDataType.INT64));
Tablet tablet1 = new Tablet(allDeviceIds.get(1), schemaList, 100);
Tablet tablet2 = new Tablet(allDeviceIds.get(2), schemaList, 100);
Tablet tablet3 = new Tablet(allDeviceIds.get(3), schemaList, 100);
Map<String, Tablet> tabletMap = new HashMap<>();
tabletMap.put(allDeviceIds.get(1), tablet1);
tabletMap.put(allDeviceIds.get(2), tablet2);
tabletMap.put(allDeviceIds.get(3), tablet3);
long timestamp = System.currentTimeMillis();
for (long row = 0; row < 100; row++) {
int row1 = tablet1.rowSize++;
int row2 = tablet2.rowSize++;
int row3 = tablet3.rowSize++;
tablet1.addTimestamp(row1, timestamp);
tablet2.addTimestamp(row2, timestamp);
tablet3.addTimestamp(row3, timestamp);
for (int i = 0; i < 3; i++) {
long value = new Random().nextLong();
tablet1.addValue(schemaList.get(i).getMeasurementId(), row1, value);
tablet2.addValue(schemaList.get(i).getMeasurementId(), row2, value);
tablet3.addValue(schemaList.get(i).getMeasurementId(), row3, value);
}
if (tablet1.rowSize == tablet1.getMaxRowNumber()) {
session.insertTablets(tabletMap, true);
tablet1.reset();
tablet2.reset();
tablet3.reset();
}
timestamp++;
}
if (tablet1.rowSize != 0) {
session.insertTablets(tabletMap, true);
tablet1.reset();
tablet2.reset();
tablet3.reset();
}
assertNull(session.deviceIdToEndpoint);
assertNull(session.endPointToSessionConnection);
session.close();
// with leader cache
session = new MockSession("127.0.0.1", 55560, true);
session.open();
assertEquals(0, session.deviceIdToEndpoint.size());
assertEquals(1, session.endPointToSessionConnection.size());
for (long row = 0; row < 100; row++) {
int row1 = tablet1.rowSize++;
int row2 = tablet2.rowSize++;
int row3 = tablet3.rowSize++;
tablet1.addTimestamp(row1, timestamp);
tablet2.addTimestamp(row2, timestamp);
tablet3.addTimestamp(row3, timestamp);
for (int i = 0; i < 3; i++) {
long value = new Random().nextLong();
tablet1.addValue(schemaList.get(i).getMeasurementId(), row1, value);
tablet2.addValue(schemaList.get(i).getMeasurementId(), row2, value);
tablet3.addValue(schemaList.get(i).getMeasurementId(), row3, value);
}
if (tablet1.rowSize == tablet1.getMaxRowNumber()) {
session.insertTablets(tabletMap, true);
tablet1.reset();
tablet2.reset();
tablet3.reset();
}
timestamp++;
}
if (tablet1.rowSize != 0) {
session.insertTablets(tabletMap, true);
tablet1.reset();
tablet2.reset();
tablet3.reset();
}
assertEquals(3, session.deviceIdToEndpoint.size());
for (String deviceId : allDeviceIds.subList(1, allDeviceIds.size())) {
assertEquals(getDeviceIdBelongedEndpoint(deviceId), session.deviceIdToEndpoint.get(deviceId));
}
assertEquals(4, session.endPointToSessionConnection.size());
session.close();
}
@Test
public void testInsertRecordsWithSessionBroken() throws StatementExecutionException {
// without leader cache
session = new MockSession("127.0.0.1", 55560, false);
try {
session.open();
} catch (IoTDBConnectionException e) {
fail(e.getMessage());
}
assertNull(session.deviceIdToEndpoint);
assertNull(session.endPointToSessionConnection);
((MockSession) session).getLastConstructedSessionConnection().setConnectionBroken(true);
List<String> allDeviceIds =
new ArrayList<String>() {
{
add("root.sg1.d1");
add("root.sg2.d1");
add("root.sg3.d1");
add("root.sg4.d1");
}
};
List<String> measurements = new ArrayList<>();
measurements.add("s1");
measurements.add("s2");
measurements.add("s3");
List<String> deviceIds = new ArrayList<>();
List<List<String>> measurementsList = new ArrayList<>();
List<List<Object>> valuesList = new ArrayList<>();
List<Long> timestamps = new ArrayList<>();
List<List<TSDataType>> typesList = new ArrayList<>();
for (long time = 0; time < 500; time++) {
List<Object> values = new ArrayList<>();
List<TSDataType> types = new ArrayList<>();
values.add(1L);
values.add(2L);
values.add(3L);
types.add(TSDataType.INT64);
types.add(TSDataType.INT64);
types.add(TSDataType.INT64);
deviceIds.add(allDeviceIds.get((int) (time % allDeviceIds.size())));
measurementsList.add(measurements);
valuesList.add(values);
typesList.add(types);
timestamps.add(time);
if (time != 0 && time % 100 == 0) {
try {
session.insertRecords(deviceIds, timestamps, measurementsList, typesList, valuesList);
} catch (IoTDBConnectionException e) {
Assert.assertEquals(
"the session connection = TEndPoint(ip:127.0.0.1, port:55560) is broken",
e.getMessage());
}
deviceIds.clear();
measurementsList.clear();
valuesList.clear();
timestamps.clear();
}
}
try {
session.insertRecords(deviceIds, timestamps, measurementsList, typesList, valuesList);
} catch (IoTDBConnectionException e) {
Assert.assertEquals(
"the session connection = TEndPoint(ip:127.0.0.1, port:55560) is broken", e.getMessage());
}
deviceIds.clear();
measurementsList.clear();
valuesList.clear();
timestamps.clear();
assertNull(session.deviceIdToEndpoint);
assertNull(session.endPointToSessionConnection);
try {
session.close();
} catch (IoTDBConnectionException e) {
Assert.assertEquals(
"the session connection = TEndPoint(ip:127.0.0.1, port:55560) is broken", e.getMessage());
}
// with leader cache
// reset connection
session = new MockSession("127.0.0.1", 55560, true);
try {
session.open();
} catch (IoTDBConnectionException e) {
Assert.fail(e.getMessage());
}
assertEquals(0, session.deviceIdToEndpoint.size());
assertEquals(1, session.endPointToSessionConnection.size());
for (long time = 0; time < 500; time++) {
List<Object> values = new ArrayList<>();
List<TSDataType> types = new ArrayList<>();
values.add(1L);
values.add(2L);
values.add(3L);
types.add(TSDataType.INT64);
types.add(TSDataType.INT64);
types.add(TSDataType.INT64);
deviceIds.add(allDeviceIds.get((int) (time % allDeviceIds.size())));
measurementsList.add(measurements);
valuesList.add(values);
typesList.add(types);
timestamps.add(time);
if (time != 0 && time % 100 == 0) {
try {
session.insertRecords(deviceIds, timestamps, measurementsList, typesList, valuesList);
} catch (IoTDBConnectionException e) {
Assert.fail(e.getMessage());
}
deviceIds.clear();
measurementsList.clear();
valuesList.clear();
timestamps.clear();
}
}
// set connection as broken, due to we enable the cache leader, when we called
// ((MockSession) session).getLastConstructedSessionConnection(), the session's endpoint has
// been changed to TEndPoint(ip:127.0.0.1, port:55561)
Assert.assertEquals(
"MockSessionConnection{ endPoint=TEndPoint(ip:127.0.0.1, port:55561)}",
((MockSession) session).getLastConstructedSessionConnection().toString());
((MockSession) session).getLastConstructedSessionConnection().setConnectionBroken(true);
try {
session.insertRecords(deviceIds, timestamps, measurementsList, typesList, valuesList);
} catch (IoTDBConnectionException e) {
Assert.assertEquals(
"the session connection = TEndPoint(ip:127.0.0.1, port:55561) is broken", e.getMessage());
}
assertEquals(3, session.deviceIdToEndpoint.size());
for (Map.Entry<String, TEndPoint> endPointMap : session.deviceIdToEndpoint.entrySet()) {
assertEquals(getDeviceIdBelongedEndpoint(endPointMap.getKey()), endPointMap.getValue());
}
assertEquals(3, session.endPointToSessionConnection.size());
try {
session.close();
} catch (IoTDBConnectionException e) {
Assert.fail(e.getMessage());
}
}
@Test
public void testInsertTabletsWithSessionBroken() throws StatementExecutionException {
// without leader cache
session = new MockSession("127.0.0.1", 55560, false);
try {
session.open();
} catch (IoTDBConnectionException e) {
Assert.fail(e.getMessage());
}
assertNull(session.deviceIdToEndpoint);
assertNull(session.endPointToSessionConnection);
// set the session connection as broken
((MockSession) session).getLastConstructedSessionConnection().setConnectionBroken(true);
List<String> allDeviceIds =
new ArrayList<String>() {
{
add("root.sg1.d1");
add("root.sg2.d1");
add("root.sg3.d1");
add("root.sg4.d1");
}
};
List<MeasurementSchema> schemaList = new ArrayList<>();
schemaList.add(new MeasurementSchema("s1", TSDataType.INT64));
schemaList.add(new MeasurementSchema("s2", TSDataType.INT64));
schemaList.add(new MeasurementSchema("s3", TSDataType.INT64));
Tablet tablet1 = new Tablet(allDeviceIds.get(1), schemaList, 100);
Tablet tablet2 = new Tablet(allDeviceIds.get(2), schemaList, 100);
Tablet tablet3 = new Tablet(allDeviceIds.get(3), schemaList, 100);
Map<String, Tablet> tabletMap = new HashMap<>();
tabletMap.put(allDeviceIds.get(1), tablet1);
tabletMap.put(allDeviceIds.get(2), tablet2);
tabletMap.put(allDeviceIds.get(3), tablet3);
long timestamp = System.currentTimeMillis();
for (long row = 0; row < 100; row++) {
int row1 = tablet1.rowSize++;
int row2 = tablet2.rowSize++;
int row3 = tablet3.rowSize++;
tablet1.addTimestamp(row1, timestamp);
tablet2.addTimestamp(row2, timestamp);
tablet3.addTimestamp(row3, timestamp);
for (int i = 0; i < 3; i++) {
long value = new Random().nextLong();
tablet1.addValue(schemaList.get(i).getMeasurementId(), row1, value);
tablet2.addValue(schemaList.get(i).getMeasurementId(), row2, value);
tablet3.addValue(schemaList.get(i).getMeasurementId(), row3, value);
}
if (tablet1.rowSize == tablet1.getMaxRowNumber()) {
try {
session.insertTablets(tabletMap, true);
} catch (IoTDBConnectionException e) {
assertEquals(
"the session connection = TEndPoint(ip:127.0.0.1, port:55560) is broken",
e.getMessage());
}
tablet1.reset();
tablet2.reset();
tablet3.reset();
}
timestamp++;
}
if (tablet1.rowSize != 0) {
try {
session.insertTablets(tabletMap, true);
} catch (IoTDBConnectionException e) {
Assert.fail(e.getMessage());
}
tablet1.reset();
tablet2.reset();
tablet3.reset();
}
assertNull(session.deviceIdToEndpoint);
assertNull(session.endPointToSessionConnection);
try {
session.close();
} catch (IoTDBConnectionException e) {
Assert.fail(e.getMessage());
}
// with leader cache
// rest the session connection
session = new MockSession("127.0.0.1", 55560, true);
try {
session.open();
} catch (IoTDBConnectionException e) {
Assert.fail(e.getMessage());
}
assertEquals(0, session.deviceIdToEndpoint.size());
assertEquals(1, session.endPointToSessionConnection.size());
for (long row = 0; row < 100; row++) {
int row1 = tablet1.rowSize++;
int row2 = tablet2.rowSize++;
int row3 = tablet3.rowSize++;
tablet1.addTimestamp(row1, timestamp);
tablet2.addTimestamp(row2, timestamp);
tablet3.addTimestamp(row3, timestamp);
for (int i = 0; i < 3; i++) {
long value = new Random().nextLong();
tablet1.addValue(schemaList.get(i).getMeasurementId(), row1, value);
tablet2.addValue(schemaList.get(i).getMeasurementId(), row2, value);
tablet3.addValue(schemaList.get(i).getMeasurementId(), row3, value);
}
if (tablet1.rowSize == tablet1.getMaxRowNumber()) {
try {
session.insertTablets(tabletMap, true);
} catch (IoTDBConnectionException e) {
Assert.fail(e.getMessage());
}
tablet1.reset();
tablet2.reset();
tablet3.reset();
}
timestamp++;
}
// set the session connection as broken
((MockSession) session).getLastConstructedSessionConnection().setConnectionBroken(true);
// set connection as broken, due to we enable the cache leader, when we called
// ((MockSession) session).getLastConstructedSessionConnection(), the session's endpoint has
// been changed to EndPoint(ip:127.0.0.1, port:55562)
Assert.assertEquals(
"MockSessionConnection{ endPoint=TEndPoint(ip:127.0.0.1, port:55562)}",
((MockSession) session).getLastConstructedSessionConnection().toString());
for (long row = 0; row < 10; row++) {
int row1 = tablet1.rowSize++;
int row2 = tablet2.rowSize++;
int row3 = tablet3.rowSize++;
tablet1.addTimestamp(row1, timestamp);
tablet2.addTimestamp(row2, timestamp);
tablet3.addTimestamp(row3, timestamp);
for (int i = 0; i < 3; i++) {
long value = new Random().nextLong();
tablet1.addValue(schemaList.get(i).getMeasurementId(), row1, value);
tablet2.addValue(schemaList.get(i).getMeasurementId(), row2, value);
tablet3.addValue(schemaList.get(i).getMeasurementId(), row3, value);
}
if (tablet1.rowSize == tablet1.getMaxRowNumber()) {
try {
session.insertTablets(tabletMap, true);
} catch (IoTDBConnectionException e) {
Assert.fail(e.getMessage());
}
tablet1.reset();
tablet2.reset();
tablet3.reset();
}
timestamp++;
}
try {
session.insertTablets(tabletMap, true);
} catch (IoTDBConnectionException e) {
Assert.assertEquals(
"the session connection = TEndPoint(ip:127.0.0.1, port:55562) is broken", e.getMessage());
}
tablet1.reset();
tablet2.reset();
tablet3.reset();
assertEquals(2, session.deviceIdToEndpoint.size());
for (Map.Entry<String, TEndPoint> endPointEntry : session.deviceIdToEndpoint.entrySet()) {
assertEquals(getDeviceIdBelongedEndpoint(endPointEntry.getKey()), endPointEntry.getValue());
}
assertEquals(3, session.endPointToSessionConnection.size());
try {
session.close();
} catch (IoTDBConnectionException e) {
Assert.fail(e.getMessage());
}
}
private void addLine(
List<Long> times,
List<List<String>> measurements,
List<List<TSDataType>> datatypes,
List<List<Object>> values,
long time,
String s1,
String s2,
TSDataType s1type,
TSDataType s2type,
Object value1,
Object value2) {
List<String> tmpMeasurements = new ArrayList<>();
List<TSDataType> tmpDataTypes = new ArrayList<>();
List<Object> tmpValues = new ArrayList<>();
tmpMeasurements.add(s1);
tmpMeasurements.add(s2);
tmpDataTypes.add(s1type);
tmpDataTypes.add(s2type);
tmpValues.add(value1);
tmpValues.add(value2);
times.add(time);
measurements.add(tmpMeasurements);
datatypes.add(tmpDataTypes);
values.add(tmpValues);
}
static class MockSession extends Session {
private MockSessionConnection lastConstructedSessionConnection;
public MockSession(String host, int rpcPort, boolean enableRedirection) {
super(
host,
rpcPort,
SessionConfig.DEFAULT_USER,
SessionConfig.DEFAULT_PASSWORD,
SessionConfig.DEFAULT_FETCH_SIZE,
null,
SessionConfig.DEFAULT_INITIAL_BUFFER_CAPACITY,
SessionConfig.DEFAULT_MAX_FRAME_SIZE,
enableRedirection,
SessionConfig.DEFAULT_VERSION);
this.enableAutoFetch = false;
}
@Override
public SessionConnection constructSessionConnection(
Session session, TEndPoint endpoint, ZoneId zoneId) {
lastConstructedSessionConnection = new MockSessionConnection(session, endpoint, zoneId);
return lastConstructedSessionConnection;
}
public MockSessionConnection getLastConstructedSessionConnection() {
return lastConstructedSessionConnection;
}
}
static class MockSessionConnection extends SessionConnection {
private TEndPoint endPoint;
private boolean connectionBroken;
private IoTDBConnectionException ioTDBConnectionException;
public MockSessionConnection(Session session, TEndPoint endPoint, ZoneId zoneId) {
super();
this.endPoint = endPoint;
ioTDBConnectionException =
new IoTDBConnectionException(
String.format("the session connection = %s is broken", endPoint.toString()));
}
@Override
public void close() {}
@Override
protected void insertRecord(TSInsertRecordReq request)
throws RedirectException, IoTDBConnectionException {
if (isConnectionBroken()) {
throw ioTDBConnectionException;
}
throw new RedirectException(getDeviceIdBelongedEndpoint(request.prefixPath));
}
@Override
protected void insertRecord(TSInsertStringRecordReq request)
throws RedirectException, IoTDBConnectionException {
if (isConnectionBroken()) {
throw ioTDBConnectionException;
}
throw new RedirectException(getDeviceIdBelongedEndpoint(request.prefixPath));
}
@Override
protected void insertRecords(TSInsertRecordsReq request)
throws RedirectException, IoTDBConnectionException {
if (isConnectionBroken()) {
throw ioTDBConnectionException;
}
throw getRedirectException(request.getPrefixPaths());
}
@Override
protected void insertRecords(TSInsertStringRecordsReq request)
throws RedirectException, IoTDBConnectionException {
if (isConnectionBroken()) {
throw ioTDBConnectionException;
}
throw getRedirectException(request.getPrefixPaths());
}
@Override
protected void insertRecordsOfOneDevice(TSInsertRecordsOfOneDeviceReq request)
throws RedirectException, IoTDBConnectionException {
if (isConnectionBroken()) {
throw ioTDBConnectionException;
}
throw new RedirectException(getDeviceIdBelongedEndpoint(request.prefixPath));
}
@Override
protected void insertTablet(TSInsertTabletReq request)
throws RedirectException, IoTDBConnectionException {
if (isConnectionBroken()) {
throw ioTDBConnectionException;
}
throw new RedirectException(getDeviceIdBelongedEndpoint(request.prefixPath));
}
@Override
protected void insertTablets(TSInsertTabletsReq request)
throws RedirectException, IoTDBConnectionException {
if (isConnectionBroken()) {
throw ioTDBConnectionException;
}
throw getRedirectException(request.getPrefixPaths());
}
private RedirectException getRedirectException(List<String> deviceIds) {
Map<String, TEndPoint> deviceEndPointMap = new HashMap<>();
for (String deviceId : deviceIds) {
deviceEndPointMap.put(deviceId, getDeviceIdBelongedEndpoint(deviceId));
}
return new RedirectException(deviceEndPointMap);
}
public boolean isConnectionBroken() {
return connectionBroken;
}
public void setConnectionBroken(boolean connectionBroken) {
this.connectionBroken = connectionBroken;
}
@Override
public String toString() {
return "MockSessionConnection{" + " endPoint=" + endPoint + "}";
}
}
}