/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/

package org.apache.iotdb.db.pipe.processor.twostage.exchange.sender;

import org.apache.iotdb.common.rpc.thrift.TEndPoint;
import org.apache.iotdb.commons.client.exception.ClientManagerException;
import org.apache.iotdb.commons.client.property.ThriftClientProperty;
import org.apache.iotdb.commons.pipe.config.PipeConfig;
import org.apache.iotdb.commons.pipe.connector.client.IoTDBSyncClient;
import org.apache.iotdb.confignode.rpc.thrift.TDataNodeInfo;
import org.apache.iotdb.confignode.rpc.thrift.TShowDataNodesResp;
import org.apache.iotdb.db.pipe.processor.twostage.combiner.PipeCombineHandlerManager;
import org.apache.iotdb.db.protocol.client.ConfigNodeClient;
import org.apache.iotdb.db.protocol.client.ConfigNodeClientManager;
import org.apache.iotdb.db.protocol.client.ConfigNodeInfo;
import org.apache.iotdb.pipe.api.exception.PipeException;
import org.apache.iotdb.service.rpc.thrift.TPipeTransferReq;
import org.apache.iotdb.service.rpc.thrift.TPipeTransferResp;

import org.apache.thrift.TException;
import org.apache.thrift.transport.TTransportException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Objects;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.atomic.AtomicReference;
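
/**
* Sender side of the two-stage aggregate processor's exchange: it forwards {@link TPipeTransferReq}s
* to the data nodes expected by {@link PipeCombineHandlerManager}, choosing the target node by
* watermark so that, for a given endpoint list, requests with the same watermark reach the same node.
*
* <p>A rough usage sketch (the pipe name, creation time, watermark and request are assumed to be
* supplied by the two-stage aggregate processor):
*
* <pre>{@code
* try (TwoStageAggregateSender sender = new TwoStageAggregateSender(pipeName, creationTime)) {
*   sender.request(watermark, req);
* }
* }</pre>
*/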
public class TwoStageAggregateSender implements AutoCloseable {
private static final Logger LOGGER = LoggerFactory.getLogger(TwoStageAggregateSender.class);
private static final PipeConfig PIPE_CONFIG = PipeConfig.getInstance();
private final String pipeName;
private final long creationTime;
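
// Cluster-wide dataNodeId -> endpoint cache shared by all sender instances,
// refreshed at most once per configured cache interval.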
private static final AtomicLong DATANODE_ID_2_END_POINTS_LAST_UPDATE_TIME = new AtomicLong(0);
private static final AtomicReference<Map<Integer, TEndPoint>> DATANODE_ID_2_END_POINTS =
new AtomicReference<>();
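
// Endpoints currently used by this sender and the Thrift clients connected to them.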
private TEndPoint[] endPoints;
private final Map<TEndPoint, IoTDBSyncClient> endPointIoTDBSyncClientMap =
new ConcurrentHashMap<>();
public TwoStageAggregateSender(String pipeName, long creationTime) {
this.pipeName = pipeName;
this.creationTime = creationTime;
}
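
/**
* Sends {@code req} to the data node selected for {@code watermark}. The shared endpoint cache is
* refreshed and the client map rebuilt first if necessary; if sending fails, the client for the
* chosen endpoint is reconstructed on a best-effort basis and the original exception is rethrown.
*/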
public synchronized TPipeTransferResp request(long watermark, TPipeTransferReq req)
throws TException {
final boolean endPointsChanged = tryFetchEndPointsIfNecessary();
tryConstructClients(endPointsChanged);
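// Route by watermark: for a fixed endpoint list, requests with the same watermark
// always go to the same data node.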
final TEndPoint endPoint = endPoints[(int) (watermark % endPoints.length)];
IoTDBSyncClient client = endPointIoTDBSyncClientMap.get(endPoint);
if (client == null) {
client = reconstructIoTDBSyncClient(endPoint);
}
LOGGER.info("Sending request {} (watermark = {}) to {}", req, watermark, endPoint);
try {
return client.pipeTransfer(req);
} catch (Exception e) {
LOGGER.warn("Failed to send request {} (watermark = {}) to {}", req, watermark, endPoint, e);
try {
reconstructIoTDBSyncClient(endPoint);
} catch (Exception ex) {
LOGGER.warn(
"Failed to reconstruct IoTDBSyncClient {} after failure to send request {} (watermark = {})",
endPoint,
req,
watermark,
ex);
}
throw e;
}
}
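
/**
* Refreshes the shared dataNodeId -> endpoint cache from the config node when it is absent or
* older than the configured cache interval.
*
* @return {@code true} if the cache was refreshed, {@code false} if the cached value is still valid
*/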
private static boolean tryFetchEndPointsIfNecessary() {
final long currentTime = System.currentTimeMillis();
if (DATANODE_ID_2_END_POINTS.get() != null
&& currentTime - DATANODE_ID_2_END_POINTS_LAST_UPDATE_TIME.get()
< PIPE_CONFIG.getTwoStageAggregateSenderEndPointsCacheInMs()) {
return false;
}
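// Double-checked locking: re-validate under the lock so that only one thread
// actually refreshes the cache.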
synchronized (DATANODE_ID_2_END_POINTS) {
if (DATANODE_ID_2_END_POINTS.get() != null
&& currentTime - DATANODE_ID_2_END_POINTS_LAST_UPDATE_TIME.get()
< PIPE_CONFIG.getTwoStageAggregateSenderEndPointsCacheInMs()) {
return false;
}
final Map<Integer, TEndPoint> dataNodeId2EndPointMap = new HashMap<>();
try (final ConfigNodeClient configNodeClient =
ConfigNodeClientManager.getInstance().borrowClient(ConfigNodeInfo.CONFIG_REGION_ID)) {
final TShowDataNodesResp showDataNodesResp = configNodeClient.showDataNodes();
if (showDataNodesResp == null || showDataNodesResp.getDataNodesInfoList() == null) {
throw new PipeException("Failed to fetch data nodes");
}
for (final TDataNodeInfo dataNodeInfo : showDataNodesResp.getDataNodesInfoList()) {
dataNodeId2EndPointMap.put(
dataNodeInfo.getDataNodeId(),
new TEndPoint(dataNodeInfo.getRpcAddresss(), dataNodeInfo.getRpcPort()));
}
} catch (ClientManagerException | TException e) {
throw new PipeException("Failed to fetch data nodes", e);
}
if (dataNodeId2EndPointMap.isEmpty()) {
throw new PipeException("No data nodes' endpoints fetched");
}
DATANODE_ID_2_END_POINTS.set(dataNodeId2EndPointMap);
DATANODE_ID_2_END_POINTS_LAST_UPDATE_TIME.set(currentTime);
}
LOGGER.info("Data nodes' endpoints for two-stage aggregation: {}", DATANODE_ID_2_END_POINTS);
return true;
}
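
/**
* Rebuilds the endpoint array and the endpoint-to-client map when the endpoints have changed or
* have not been initialized yet. Only the data nodes expected by {@link PipeCombineHandlerManager}
* for this pipe are kept, and clients whose endpoints are no longer present in the cluster are
* closed and removed.
*/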
private void tryConstructClients(boolean endPointsChanged) {
if (Objects.nonNull(endPoints) && !endPointsChanged) {
return;
}
final Set<Integer> expectedDataNodeIdSet =
PipeCombineHandlerManager.getInstance().getExpectedDataNodeIdSet(pipeName, creationTime);
if (expectedDataNodeIdSet.isEmpty()) {
throw new PipeException("No expected region id set fetched");
}
endPoints =
DATANODE_ID_2_END_POINTS.get().entrySet().stream()
.filter(entry -> expectedDataNodeIdSet.contains(entry.getKey()))
.map(Map.Entry::getValue)
.toArray(TEndPoint[]::new);
LOGGER.info(
"End points for two-stage aggregation pipe (pipeName={}, creationTime={}) were updated to {}",
pipeName,
creationTime,
endPoints);
for (final TEndPoint endPoint : endPoints) {
if (endPointIoTDBSyncClientMap.containsKey(endPoint)) {
continue;
}
try {
endPointIoTDBSyncClientMap.put(endPoint, constructIoTDBSyncClient(endPoint));
} catch (TTransportException e) {
LOGGER.warn("Failed to construct IoTDBSyncClient", e);
}
}
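// Close and drop clients whose endpoints are no longer part of the fetched cluster view.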
for (final TEndPoint endPoint : new HashSet<>(endPointIoTDBSyncClientMap.keySet())) {
if (!DATANODE_ID_2_END_POINTS.get().containsValue(endPoint)) {
try {
endPointIoTDBSyncClientMap.remove(endPoint).close();
} catch (Exception e) {
LOGGER.warn("Failed to close IoTDBSyncClient", e);
}
}
}
}
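
/**
* Replaces the client for {@code endPoint}: the old client, if any, is closed (best effort) and a
* newly constructed client is registered and returned.
*/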
private IoTDBSyncClient reconstructIoTDBSyncClient(TEndPoint endPoint)
throws TTransportException {
final IoTDBSyncClient oldClient = endPointIoTDBSyncClientMap.remove(endPoint);
if (oldClient != null) {
try {
oldClient.close();
} catch (Exception e) {
LOGGER.warn("Failed to close old IoTDBSyncClient", e);
}
}
final IoTDBSyncClient newClient = constructIoTDBSyncClient(endPoint);
endPointIoTDBSyncClientMap.put(endPoint, newClient);
return newClient;
}
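
/**
* Creates a new {@link IoTDBSyncClient} for {@code endPoint}, using the pipe connector's handshake
* timeout and Thrift compression settings from {@link PipeConfig}.
*/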
private IoTDBSyncClient constructIoTDBSyncClient(TEndPoint endPoint) throws TTransportException {
return new IoTDBSyncClient(
new ThriftClientProperty.Builder()
.setConnectionTimeoutMs(PIPE_CONFIG.getPipeConnectorHandshakeTimeoutMs())
.setRpcThriftCompressionEnabled(
PIPE_CONFIG.isPipeConnectorRPCThriftCompressionEnabled())
.build(),
endPoint.getIp(),
endPoint.getPort(),
false,
null,
null);
}
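
/** Closes all clients held by this sender and clears its endpoint state. */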
@Override
public void close() {
for (final IoTDBSyncClient client : endPointIoTDBSyncClientMap.values()) {
try {
client.close();
} catch (Exception e) {
LOGGER.warn("Failed to close IoTDBSyncClient", e);
}
}
endPointIoTDBSyncClientMap.clear();
endPoints = null;
}
}