blob: 1977c32d338fff20f30e36dd343780143133f285 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.iotdb.commons.pipe.connector.protocol;
import org.apache.iotdb.common.rpc.thrift.TSStatus;
import org.apache.iotdb.commons.pipe.config.PipeConfig;
import org.apache.iotdb.commons.pipe.connector.payload.airgap.AirGapELanguageConstant;
import org.apache.iotdb.commons.pipe.connector.payload.airgap.AirGapOneByteResponse;
import org.apache.iotdb.pipe.api.customizer.configuration.PipeConnectorRuntimeConfiguration;
import org.apache.iotdb.pipe.api.customizer.parameter.PipeParameters;
import org.apache.iotdb.pipe.api.exception.PipeConnectionException;
import org.apache.iotdb.pipe.api.exception.PipeException;
import org.apache.iotdb.rpc.TSStatusCode;
import org.apache.tsfile.utils.BytesUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.BufferedOutputStream;
import java.io.File;
import java.io.IOException;
import java.io.RandomAccessFile;
import java.net.InetSocketAddress;
import java.net.Socket;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.zip.CRC32;
import static org.apache.iotdb.commons.pipe.config.constant.PipeConnectorConstant.CONNECTOR_AIR_GAP_E_LANGUAGE_ENABLE_DEFAULT_VALUE;
import static org.apache.iotdb.commons.pipe.config.constant.PipeConnectorConstant.CONNECTOR_AIR_GAP_E_LANGUAGE_ENABLE_KEY;
import static org.apache.iotdb.commons.pipe.config.constant.PipeConnectorConstant.CONNECTOR_AIR_GAP_HANDSHAKE_TIMEOUT_MS_DEFAULT_VALUE;
import static org.apache.iotdb.commons.pipe.config.constant.PipeConnectorConstant.CONNECTOR_AIR_GAP_HANDSHAKE_TIMEOUT_MS_KEY;
import static org.apache.iotdb.commons.pipe.config.constant.PipeConnectorConstant.CONNECTOR_LOAD_BALANCE_PRIORITY_STRATEGY;
import static org.apache.iotdb.commons.pipe.config.constant.PipeConnectorConstant.CONNECTOR_LOAD_BALANCE_RANDOM_STRATEGY;
import static org.apache.iotdb.commons.pipe.config.constant.PipeConnectorConstant.CONNECTOR_LOAD_BALANCE_ROUND_ROBIN_STRATEGY;
import static org.apache.iotdb.commons.pipe.config.constant.PipeConnectorConstant.SINK_AIR_GAP_E_LANGUAGE_ENABLE_KEY;
import static org.apache.iotdb.commons.pipe.config.constant.PipeConnectorConstant.SINK_AIR_GAP_HANDSHAKE_TIMEOUT_MS_KEY;
import static org.apache.iotdb.commons.utils.BasicStructureSerDeUtil.LONG_LEN;
/**
 * Base class for pipe connectors that talk to their receivers over plain TCP sockets.
 *
 * <p>One socket slot is kept per entry of {@code nodeUrls}. {@link #handshake()} (re)connects the
 * slots that are not marked alive, {@link #nextSocketIndex()} picks an alive slot according to the
 * configured load balance strategy, and {@link #send(Socket, byte[])} frames a payload (length,
 * length again, CRC32, payload, optionally wrapped by the E-Language prefix/suffix) and waits for a
 * one-byte response.
 *
 * <p>NOTE(review): {@code sockets}, {@code isSocketAlive} and the round-robin counter are plain,
 * unsynchronized state; this class does not itself guard against concurrent use.
 */
public abstract class IoTDBAirGapConnector extends IoTDBConnector {

  private static final Logger LOGGER = LoggerFactory.getLogger(IoTDBAirGapConnector.class);

  protected static final PipeConfig PIPE_CONFIG = PipeConfig.getInstance();

  /** Socket per target node, index-aligned with {@code nodeUrls}; {@code null} if never opened. */
  protected final List<Socket> sockets = new ArrayList<>();

  /** Liveness flag per target node, index-aligned with {@link #sockets}. */
  protected final List<Boolean> isSocketAlive = new ArrayList<>();

  private LoadBalancer loadBalancer;

  /** Monotonic counter used by {@link RoundRobinLoadBalancer}. */
  private long currentClientIndex = 0;

  private int handshakeTimeoutMs;
  private boolean eLanguageEnable;

  // The air gap connector does not use clientManager thus we put handshake type here
  protected boolean supportModsIfIsDataNodeReceiver = true;

  /**
   * Reads the air-gap specific parameters and prepares one (not yet connected) socket slot per
   * target node.
   *
   * @param parameters user supplied connector parameters
   * @param configuration runtime configuration, forwarded to the super class
   * @throws Exception if the super class customization fails
   */
  @Override
  public void customize(PipeParameters parameters, PipeConnectorRuntimeConfiguration configuration)
      throws Exception {
    super.customize(parameters, configuration);

    if (isTabletBatchModeEnabled) {
      // NOTE(review): only a warning is emitted here; this method does not reset the flag itself.
      LOGGER.warn(
          "Batch mode is enabled by the given parameters. "
              + "IoTDBAirGapConnector does not support batch mode. "
              + "Disable batch mode.");
    }

    for (int i = 0; i < nodeUrls.size(); i++) {
      isSocketAlive.add(false);
      sockets.add(null);
    }

    switch (loadBalanceStrategy) {
      case CONNECTOR_LOAD_BALANCE_ROUND_ROBIN_STRATEGY:
        loadBalancer = new RoundRobinLoadBalancer();
        break;
      case CONNECTOR_LOAD_BALANCE_RANDOM_STRATEGY:
        loadBalancer = new RandomLoadBalancer();
        break;
      case CONNECTOR_LOAD_BALANCE_PRIORITY_STRATEGY:
        loadBalancer = new PriorityLoadBalancer();
        break;
      default:
        LOGGER.warn(
            "Unknown load balance strategy: {}, use round-robin strategy instead.",
            loadBalanceStrategy);
        loadBalancer = new RoundRobinLoadBalancer();
    }

    handshakeTimeoutMs =
        parameters.getIntOrDefault(
            Arrays.asList(
                CONNECTOR_AIR_GAP_HANDSHAKE_TIMEOUT_MS_KEY, SINK_AIR_GAP_HANDSHAKE_TIMEOUT_MS_KEY),
            CONNECTOR_AIR_GAP_HANDSHAKE_TIMEOUT_MS_DEFAULT_VALUE);
    LOGGER.info(
        "IoTDBAirGapConnector is customized with handshakeTimeoutMs: {}.", handshakeTimeoutMs);

    eLanguageEnable =
        parameters.getBooleanOrDefault(
            Arrays.asList(
                CONNECTOR_AIR_GAP_E_LANGUAGE_ENABLE_KEY, SINK_AIR_GAP_E_LANGUAGE_ENABLE_KEY),
            CONNECTOR_AIR_GAP_E_LANGUAGE_ENABLE_DEFAULT_VALUE);
    LOGGER.info("IoTDBAirGapConnector is customized with eLanguageEnable: {}.", eLanguageEnable);
  }

  /**
   * (Re)connects every socket slot that is not marked alive and performs the handshake on it.
   *
   * <p>A node that cannot be connected is logged and skipped. An exception raised by {@link
   * #sendHandshakeReq(Socket)} is propagated and stops the loop, so later nodes are not tried in
   * that pass.
   *
   * @throws PipeConnectionException if no slot is alive after the pass, or if a handshake request
   *     is rejected
   * @throws Exception any other failure raised while sending the handshake request
   */
  @Override
  public void handshake() throws Exception {
    for (int i = 0; i < sockets.size(); i++) {
      if (Boolean.TRUE.equals(isSocketAlive.get(i))) {
        continue;
      }

      final String ip = nodeUrls.get(i).getIp();
      final int port = nodeUrls.get(i).getPort();

      // Close the socket if necessary
      final Socket staleSocket = sockets.set(i, null);
      if (staleSocket != null) {
        closeSocketIgnoringFailure(staleSocket, ip, port);
      }

      final Socket socket = new Socket();

      try {
        socket.connect(new InetSocketAddress(ip, port), handshakeTimeoutMs);
        socket.setKeepAlive(true);
        sockets.set(i, socket);
        LOGGER.info("Successfully connected to target server ip: {}, port: {}.", ip, port);
      } catch (Exception e) {
        LOGGER.warn(
            "Failed to connect to target server ip: {}, port: {}, because: {}. Ignore it.",
            ip,
            port,
            e.getMessage());
        // The socket may already be connected (e.g. setKeepAlive failed after connect succeeded)
        // and is not tracked in the sockets list in that case, so release it here to avoid a leak.
        closeSocketIgnoringFailure(socket, ip, port);
        continue;
      }

      sendHandshakeReq(socket);
      isSocketAlive.set(i, true);
    }

    for (int i = 0; i < sockets.size(); i++) {
      if (Boolean.TRUE.equals(isSocketAlive.get(i))) {
        return;
      }
    }
    throw new PipeConnectionException(
        String.format("All target servers %s are not available.", nodeUrls));
  }

  /** Closes the given socket; a failure to close is logged and otherwise ignored. */
  private void closeSocketIgnoringFailure(final Socket socket, final String ip, final int port) {
    try {
      socket.close();
    } catch (Exception e) {
      LOGGER.warn(
          "Failed to close socket with target server ip: {}, port: {}, because: {}. Ignore it.",
          ip,
          port,
          e.getMessage());
    }
  }

  /**
   * Performs the handshake on an already connected socket.
   *
   * <p>The read timeout is set to the handshake timeout for the duration of the handshake and is
   * switched to the transfer timeout from {@link #PIPE_CONFIG} once the handshake succeeded.
   *
   * @param socket the connected socket to handshake on
   * @throws PipeConnectionException if neither the V2 nor the V1 handshake is acknowledged
   * @throws IOException if writing the request or reading the response fails
   */
  protected void sendHandshakeReq(Socket socket) throws IOException {
    socket.setSoTimeout(handshakeTimeoutMs);

    // Try to handshake by PipeTransferHandshakeV2Req. If failed, retry to handshake by
    // PipeTransferHandshakeV1Req. If failed again, throw PipeConnectionException.
    if (!send(socket, generateHandShakeV2Payload())) {
      supportModsIfIsDataNodeReceiver = false;
      if (!send(socket, generateHandShakeV1Payload())) {
        throw new PipeConnectionException("Handshake error with target server, socket: " + socket);
      }
    } else {
      supportModsIfIsDataNodeReceiver = true;
    }

    socket.setSoTimeout((int) PIPE_CONFIG.getPipeConnectorTransferTimeoutMs());
    LOGGER.info("Handshake success. Socket: {}", socket);
  }

  /** Builds the (unframed) payload of the V1 handshake request. */
  protected abstract byte[] generateHandShakeV1Payload() throws IOException;

  /** Builds the (unframed) payload of the V2 handshake request. */
  protected abstract byte[] generateHandShakeV2Payload() throws IOException;

  /** Tries to reconnect the dead sockets; any failure is logged and not propagated. */
  @Override
  public void heartbeat() {
    try {
      handshake();
    } catch (Exception e) {
      LOGGER.warn(
          "Failed to reconnect to target server, because: {}. Try to reconnect later.",
          e.getMessage(),
          e);
    }
  }

  /**
   * Sends the content of {@code file} over {@code socket} in chunks of the configured read buffer
   * size.
   *
   * @param file the file to transfer
   * @param socket the socket to send the pieces on
   * @param isMultiFile whether to build the pieces with {@link #getTransferMultiFilePieceBytes}
   *     instead of {@link #getTransferSingleFilePieceBytes}
   * @throws PipeException if raised by the receiver status handler or a re-handshake
   * @throws IOException if reading the file or using the socket fails
   */
  protected void transferFilePieces(File file, Socket socket, boolean isMultiFile)
      throws PipeException, IOException {
    final int readFileBufferSize = PipeConfig.getInstance().getPipeConnectorReadFileBufferSize();
    final byte[] readBuffer = new byte[readFileBufferSize];
    long position = 0;

    try (final RandomAccessFile reader = new RandomAccessFile(file, "r")) {
      while (true) {
        final int readLength = reader.read(readBuffer);
        if (readLength == -1) {
          break;
        }

        // Avoid a copy when the buffer was filled completely.
        final byte[] payload =
            readLength == readFileBufferSize
                ? readBuffer
                : Arrays.copyOfRange(readBuffer, 0, readLength);

        final byte[] pieceBytes =
            isMultiFile
                ? getTransferMultiFilePieceBytes(file.getName(), position, payload)
                : getTransferSingleFilePieceBytes(file.getName(), position, payload);

        if (send(socket, pieceBytes)) {
          position += readLength;
          continue;
        }

        final String errorMessage =
            String.format("Transfer file %s error. Socket %s.", file, socket);
        if (mayNeedHandshakeWhenFail()) {
          // Send handshake because we don't know whether the receiver side configNode
          // has set up a new one
          sendHandshakeReq(socket);
        }
        // NOTE(review): if handle(...) returns normally, the loop goes on with the reader already
        // advanced past this chunk while `position` is unchanged, so the next piece is sent with a
        // stale offset. Confirm that the handler's behavior makes this unreachable in practice.
        receiverStatusHandler.handle(
            new TSStatus(TSStatusCode.PIPE_RECEIVER_USER_CONFLICT_EXCEPTION.getStatusCode())
                .setMessage(errorMessage),
            errorMessage,
            file.toString());
      }
    }
  }

  /** Whether a failed file piece transfer should be followed by a new handshake request. */
  protected abstract boolean mayNeedHandshakeWhenFail();

  /** Builds the (unframed) request bytes for one piece of a single-file transfer. */
  protected abstract byte[] getTransferSingleFilePieceBytes(
      String fileName, long position, byte[] payLoad) throws IOException;

  /** Builds the (unframed) request bytes for one piece of a multi-file transfer. */
  protected abstract byte[] getTransferMultiFilePieceBytes(
      String fileName, long position, byte[] payLoad) throws IOException;

  /**
   * Picks the index of an alive socket according to the configured load balance strategy.
   *
   * @return an index into {@link #sockets}
   * @throws PipeConnectionException if no socket is marked alive
   */
  protected int nextSocketIndex() {
    return loadBalancer.nextSocketIndex();
  }

  /**
   * Frames {@code bytes}, writes them to the socket and waits for the one-byte response.
   *
   * @param socket the socket to send on
   * @param bytes the unframed payload
   * @return {@code true} iff a response byte was read and it equals {@link
   *     AirGapOneByteResponse#OK}; {@code false} also when the socket was never connected
   * @throws IOException if writing or reading on the socket fails
   */
  protected boolean send(Socket socket, byte[] bytes) throws IOException {
    if (!socket.isConnected()) {
      return false;
    }

    final BufferedOutputStream outputStream = new BufferedOutputStream(socket.getOutputStream());
    bytes = enrichWithLengthAndChecksum(bytes);
    outputStream.write(eLanguageEnable ? enrichWithELanguage(bytes) : bytes);
    outputStream.flush();

    final byte[] response = new byte[1];
    final int size = socket.getInputStream().read(response);
    return size > 0 && Arrays.equals(AirGapOneByteResponse.OK, response);
  }

  /**
   * Prepends the frame header to the payload: the length (payload length plus {@code LONG_LEN}),
   * the same length a second time, and the CRC32 of the payload as a long.
   */
  private byte[] enrichWithLengthAndChecksum(byte[] bytes) {
    // Length of checksum and bytes payload
    final byte[] length = BytesUtils.intToBytes(bytes.length + LONG_LEN);

    final CRC32 crc32 = new CRC32();
    crc32.update(bytes, 0, bytes.length);

    // Double length as simple checksum
    return BytesUtils.concatByteArrayList(
        Arrays.asList(length, length, BytesUtils.longToBytes(crc32.getValue()), bytes));
  }

  /** Wraps the framed bytes between the E-Language prefix and suffix constants. */
  private byte[] enrichWithELanguage(byte[] bytes) {
    return BytesUtils.concatByteArrayList(
        Arrays.asList(
            AirGapELanguageConstant.E_LANGUAGE_PREFIX,
            bytes,
            AirGapELanguageConstant.E_LANGUAGE_SUFFIX));
  }

  /** Closes every open socket and marks all slots as dead; close failures are only logged. */
  @Override
  public void close() {
    for (int i = 0; i < sockets.size(); ++i) {
      try {
        if (sockets.get(i) != null) {
          sockets.set(i, null).close();
        }
      } catch (Exception e) {
        LOGGER.warn("Failed to close client {}.", i, e);
      } finally {
        isSocketAlive.set(i, false);
      }
    }
  }

  /////////////////////// Strategies for load balance //////////////////////////

  /** Strategy that selects the index of an alive socket. */
  private interface LoadBalancer {
    /**
     * @return the index of a socket that is marked alive
     * @throws PipeConnectionException if no socket is marked alive
     */
    int nextSocketIndex();
  }

  /** Walks the slots in order, continuing from where the previous call stopped. */
  private class RoundRobinLoadBalancer implements LoadBalancer {
    @Override
    public int nextSocketIndex() {
      final int socketSize = sockets.size();
      // Round-robin, find the next alive client
      for (int tryCount = 0; tryCount < socketSize; ++tryCount) {
        // floorMod keeps the index non-negative even if the counter ever wraps around.
        final int clientIndex = (int) Math.floorMod(currentClientIndex++, (long) socketSize);
        if (Boolean.TRUE.equals(isSocketAlive.get(clientIndex))) {
          return clientIndex;
        }
      }
      throw new PipeConnectionException(
          "All sockets are dead, please check the connection to the receiver.");
    }
  }

  /** Starts at a random slot and scans forward (wrapping around) for an alive one. */
  private class RandomLoadBalancer implements LoadBalancer {
    @Override
    public int nextSocketIndex() {
      final int socketSize = sockets.size();
      if (socketSize == 0) {
        // Without this guard the lookup below would fail with an IndexOutOfBoundsException
        // instead of the exception the other strategies raise.
        throw new PipeConnectionException(
            "All sockets are dead, please check the connection to the receiver.");
      }

      final int startIndex = (int) (Math.random() * socketSize);
      // Random, find the next alive client
      for (int offset = 0; offset < socketSize; ++offset) {
        final int candidateIndex = (startIndex + offset) % socketSize;
        if (Boolean.TRUE.equals(isSocketAlive.get(candidateIndex))) {
          return candidateIndex;
        }
      }
      throw new PipeConnectionException(
          "All sockets are dead, please check the connection to the receiver.");
    }
  }

  /** Always prefers the alive slot with the lowest index. */
  private class PriorityLoadBalancer implements LoadBalancer {
    @Override
    public int nextSocketIndex() {
      // Priority, find the first alive client
      final int socketSize = sockets.size();
      for (int i = 0; i < socketSize; ++i) {
        if (Boolean.TRUE.equals(isSocketAlive.get(i))) {
          return i;
        }
      }
      throw new PipeConnectionException(
          "All sockets are dead, please check the connection to the receiver.");
    }
  }
}