blob: d07f180763cf13fc9ea50a65269cfc08e7d2b426 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.iotdb.db.pipe.receiver.protocol.airgap;
import org.apache.iotdb.common.rpc.thrift.TSStatus;
import org.apache.iotdb.commons.concurrent.WrappedRunnable;
import org.apache.iotdb.commons.pipe.config.PipeConfig;
import org.apache.iotdb.commons.pipe.connector.payload.airgap.AirGapELanguageConstant;
import org.apache.iotdb.commons.pipe.connector.payload.airgap.AirGapOneByteResponse;
import org.apache.iotdb.commons.pipe.connector.payload.airgap.AirGapPseudoTPipeTransferRequest;
import org.apache.iotdb.db.pipe.agent.PipeAgent;
import org.apache.iotdb.db.pipe.receiver.protocol.thrift.IoTDBDataNodeReceiverAgent;
import org.apache.iotdb.pipe.api.exception.PipeConnectionException;
import org.apache.iotdb.rpc.TSStatusCode;
import org.apache.iotdb.service.rpc.thrift.TPipeTransferResp;
import org.apache.tsfile.utils.BytesUtils;
import org.apache.tsfile.utils.ReadWriteIOUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.BufferedInputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.net.Socket;
import java.nio.ByteBuffer;
import java.util.Arrays;
import java.util.zip.CRC32;
import static org.apache.iotdb.commons.utils.BasicStructureSerDeUtil.INT_LEN;
import static org.apache.iotdb.commons.utils.BasicStructureSerDeUtil.LONG_LEN;
public class IoTDBAirGapReceiver extends WrappedRunnable {
private static final Logger LOGGER = LoggerFactory.getLogger(IoTDBAirGapReceiver.class);
private final Socket socket;
private final long receiverId;
private final IoTDBDataNodeReceiverAgent agent;
private boolean isELanguagePayload;
public IoTDBAirGapReceiver(Socket socket, long receiverId) {
this.socket = socket;
this.receiverId = receiverId;
agent = PipeAgent.receiver().thrift();
}
@Override
public void runMayThrow() throws Throwable {
socket.setSoTimeout((int) PipeConfig.getInstance().getPipeConnectorTransferTimeoutMs());
socket.setKeepAlive(true);
LOGGER.info("Pipe air gap receiver {} started. Socket: {}", receiverId, socket);
try {
while (!socket.isClosed()) {
isELanguagePayload = false;
receive();
}
LOGGER.info(
"Pipe air gap receiver {} closed because socket is closed. Socket: {}",
receiverId,
socket);
} catch (Exception e) {
LOGGER.warn(
"Pipe air gap receiver {} closed because of exception. Socket: {}",
receiverId,
socket,
e);
throw e;
} finally {
PipeAgent.receiver().thrift().handleClientExit();
socket.close();
}
}
private void receive() throws IOException {
final InputStream inputStream = new BufferedInputStream(socket.getInputStream());
try {
final byte[] data = readData(inputStream);
if (!checkSum(data)) {
LOGGER.warn("Checksum failed, receiverId: {}", receiverId);
fail();
return;
}
// Removed the used checksum
final ByteBuffer byteBuffer = ByteBuffer.wrap(data, LONG_LEN, data.length - LONG_LEN);
// Pseudo request, to reuse logic in IoTDBThriftReceiverAgent
final AirGapPseudoTPipeTransferRequest req =
(AirGapPseudoTPipeTransferRequest)
new AirGapPseudoTPipeTransferRequest()
.setVersion(ReadWriteIOUtils.readByte(byteBuffer))
.setType(ReadWriteIOUtils.readShort(byteBuffer))
.setBody(byteBuffer.slice());
final TPipeTransferResp resp = agent.receive(req);
final TSStatus status = resp.getStatus();
if (status.getCode() == TSStatusCode.SUCCESS_STATUS.getStatusCode()) {
ok();
} else if (status.getCode() == TSStatusCode.REDIRECTION_RECOMMEND.getStatusCode()
|| status.getCode()
== TSStatusCode.PIPE_RECEIVER_IDEMPOTENT_CONFLICT_EXCEPTION.getStatusCode()) {
LOGGER.info(
"TSStatus:{} is encountered at the air gap receiver, will ignore.", resp.getStatus());
ok();
} else {
LOGGER.warn(
"Handle data failed, receiverId: {}, status: {}, req: {}",
receiverId,
resp.getStatus(),
req);
fail();
}
} catch (final PipeConnectionException e) {
LOGGER.info("Socket closed when listening to data. Because: {}", e.getMessage());
} catch (final Exception e) {
LOGGER.warn("Exception during handling receiving, receiverId: {}", receiverId, e);
fail();
}
}
private void ok() throws IOException {
final OutputStream outputStream = socket.getOutputStream();
outputStream.write(AirGapOneByteResponse.OK);
outputStream.flush();
}
private void fail() throws IOException {
final OutputStream outputStream = socket.getOutputStream();
outputStream.write(AirGapOneByteResponse.FAIL);
outputStream.flush();
}
private boolean checkSum(byte[] bytes) {
try {
final CRC32 crc32 = new CRC32();
crc32.update(bytes, LONG_LEN, bytes.length - LONG_LEN);
return BytesUtils.bytesToLong(BytesUtils.subBytes(bytes, 0, LONG_LEN)) == crc32.getValue();
} catch (Exception e) {
// ArrayIndexOutOfBoundsException when bytes.length < LONG_LEN
return false;
}
}
private byte[] readData(InputStream inputStream) throws IOException {
final int length = readLength(inputStream);
if (length <= 0) {
// Will fail() after checkSum()
return new byte[0];
}
final byte[] resultBuffer = new byte[length];
readTillFull(inputStream, resultBuffer);
if (isELanguagePayload) {
skipTillEnough(inputStream, AirGapELanguageConstant.E_LANGUAGE_SUFFIX.length);
}
return resultBuffer;
}
/**
* Read the length of the following data. The thread may typically block here when there is no
* data to read.
*/
private int readLength(InputStream inputStream) throws IOException {
final byte[] doubleIntLengthBytes = new byte[2 * INT_LEN];
readTillFull(inputStream, doubleIntLengthBytes);
// Check the header of the request, if it is an E-Language request, skip the E-Language header.
// We assert AirGapELanguageConstant.E_LANGUAGE_PREFIX.length > 2 * INT_LEN here.
if (Arrays.equals(
doubleIntLengthBytes,
BytesUtils.subBytes(AirGapELanguageConstant.E_LANGUAGE_PREFIX, 0, 2 * INT_LEN))) {
isELanguagePayload = true;
skipTillEnough(
inputStream, (long) AirGapELanguageConstant.E_LANGUAGE_PREFIX.length - 2 * INT_LEN);
return readLength(inputStream);
}
final byte[] dataLengthBytes = BytesUtils.subBytes(doubleIntLengthBytes, 0, INT_LEN);
// For double check
return Arrays.equals(
dataLengthBytes, BytesUtils.subBytes(doubleIntLengthBytes, INT_LEN, INT_LEN))
? BytesUtils.bytesToInt(dataLengthBytes)
: 0;
}
/**
* Read to the buffer until it is full.
*
* @param inputStream the input socket stream
* @param readBuffer the buffer to read into
* @throws IOException if any IOException occurs
* @throws PipeConnectionException if the socket is closed during listening
*/
private void readTillFull(final InputStream inputStream, final byte[] readBuffer)
throws IOException, PipeConnectionException {
int alreadyReadBytes = 0;
while (alreadyReadBytes < readBuffer.length) {
final int readBytes =
inputStream.read(readBuffer, alreadyReadBytes, readBuffer.length - alreadyReadBytes);
// In socket input stream readBytes == -1 indicates EOF, namely the
// socket is closed
if (readBytes == -1) {
throw new PipeConnectionException("Socket closed when executing readTillFull.");
}
alreadyReadBytes += readBytes;
}
}
/**
* Skip given number of bytes of the buffer until enough bytes is skipped.
*
* @param inputStream the input socket stream
* @param length the length to skip
* @throws IOException if any IOException occurs
* @throws PipeConnectionException if the socket is closed during skipping
*/
private void skipTillEnough(final InputStream inputStream, final long length)
throws IOException, PipeConnectionException {
long currentSkippedBytes = 0;
while (currentSkippedBytes < length) {
final long skippedBytes = inputStream.skip(length - currentSkippedBytes);
// In socket input stream skippedBytes == 0 indicates EOF, namely the
// socket is closed
if (skippedBytes == 0) {
throw new PipeConnectionException("Socket closed when executing skipTillEnough.");
}
currentSkippedBytes += skippedBytes;
}
}
}