/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.iotdb.confignode.client.async.datanode;
import org.apache.iotdb.common.rpc.thrift.TConfigNodeLocation;
import org.apache.iotdb.common.rpc.thrift.TConsensusGroupId;
import org.apache.iotdb.common.rpc.thrift.TDataNodeLocation;
import org.apache.iotdb.common.rpc.thrift.TEndPoint;
import org.apache.iotdb.common.rpc.thrift.TFlushReq;
import org.apache.iotdb.common.rpc.thrift.TRegionReplicaSet;
import org.apache.iotdb.common.rpc.thrift.TSStatus;
import org.apache.iotdb.common.rpc.thrift.TSetTTLReq;
import org.apache.iotdb.commons.client.ClientPoolFactory;
import org.apache.iotdb.commons.client.IClientManager;
import org.apache.iotdb.commons.client.async.AsyncDataNodeInternalServiceClient;
import org.apache.iotdb.confignode.client.DataNodeRequestType;
import org.apache.iotdb.confignode.client.async.handlers.AbstractRetryHandler;
import org.apache.iotdb.confignode.client.async.handlers.ClearCacheHandler;
import org.apache.iotdb.confignode.client.async.handlers.ConstructSchemaBlackListHandler;
import org.apache.iotdb.confignode.client.async.handlers.CreateRegionHandler;
import org.apache.iotdb.confignode.client.async.handlers.DeleteDataForDeleteTimeSeriesHandler;
import org.apache.iotdb.confignode.client.async.handlers.DeleteTimeSeriesHandler;
import org.apache.iotdb.confignode.client.async.handlers.FetchSchemaBlackLsitHandler;
import org.apache.iotdb.confignode.client.async.handlers.FlushHandler;
import org.apache.iotdb.confignode.client.async.handlers.FunctionManagementHandler;
import org.apache.iotdb.confignode.client.async.handlers.InvalidateMatchedSchemaCacheHandler;
import org.apache.iotdb.confignode.client.async.handlers.LoadConfigurationHandler;
import org.apache.iotdb.confignode.client.async.handlers.MergeHandler;
import org.apache.iotdb.confignode.client.async.handlers.RollbackSchemaBlackListHandler;
import org.apache.iotdb.confignode.client.async.handlers.SetSystemStatusHandler;
import org.apache.iotdb.confignode.client.async.handlers.SetTTLHandler;
import org.apache.iotdb.confignode.client.async.handlers.UpdateConfigNodeGroupHandler;
import org.apache.iotdb.confignode.client.async.handlers.UpdateRegionRouteMapHandler;
import org.apache.iotdb.confignode.client.async.task.AbstractDataNodeTask;
import org.apache.iotdb.confignode.consensus.request.write.region.CreateRegionGroupsPlan;
import org.apache.iotdb.mpp.rpc.thrift.TConstructSchemaBlackListReq;
import org.apache.iotdb.mpp.rpc.thrift.TCreateDataRegionReq;
import org.apache.iotdb.mpp.rpc.thrift.TCreateFunctionRequest;
import org.apache.iotdb.mpp.rpc.thrift.TCreateSchemaRegionReq;
import org.apache.iotdb.mpp.rpc.thrift.TDeleteDataForDeleteTimeSeriesReq;
import org.apache.iotdb.mpp.rpc.thrift.TDeleteTimeSeriesReq;
import org.apache.iotdb.mpp.rpc.thrift.TDropFunctionRequest;
import org.apache.iotdb.mpp.rpc.thrift.TFetchSchemaBlackListReq;
import org.apache.iotdb.mpp.rpc.thrift.TInvalidateMatchedSchemaCacheReq;
import org.apache.iotdb.mpp.rpc.thrift.TRegionRouteReq;
import org.apache.iotdb.mpp.rpc.thrift.TRollbackSchemaBlackListReq;
import org.apache.iotdb.mpp.rpc.thrift.TUpdateConfigNodeGroupReq;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CountDownLatch;
/** Asynchronously send RPC requests to DataNodes. See mpp.thrift for more details. */
public class AsyncDataNodeClientPool {
private static final Logger LOGGER = LoggerFactory.getLogger(AsyncDataNodeClientPool.class);
private final IClientManager<TEndPoint, AsyncDataNodeInternalServiceClient> clientManager;
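// Maximum number of rounds a request is (re)sent to the DataNodes that have not yet acknowledged it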
private static final int MAX_RETRY_NUM = 6;
private AsyncDataNodeClientPool() {
clientManager =
new IClientManager.Factory<TEndPoint, AsyncDataNodeInternalServiceClient>()
.createClientManager(
new ClientPoolFactory.AsyncDataNodeInternalServiceClientPoolFactory());
}
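// A minimal usage sketch (illustrative only; the target map and response list below are assumptions,
// not part of this class):
//
//   Map<Integer, TDataNodeLocation> targets = new ConcurrentHashMap<>(registeredDataNodeLocations);
//   List<TSStatus> responses = new ArrayList<>();
//   AsyncDataNodeClientPool.getInstance()
//       .sendAsyncRequestToDataNodeWithRetry(
//           new TFlushReq(), targets, DataNodeRequestType.FLUSH, responses);
//   // "responses" then collects the TSStatus returned by each DataNode that handled the flush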
/**
 * Send asynchronous requests to the specified DataNodes, and retry on the DataNodes that fail to
 * receive the requests.
 *
 * @param req the request to send
 * @param dataNodeLocationMap Map<DataNodeId, TDataNodeLocation> of the target DataNodes
 * @param requestType the DataNodeRequestType of the request
 * @param dataNodeResponseStatus response list used by handlers that collect per-DataNode status
 *     (e.g. CREATE_FUNCTION, DROP_FUNCTION, FLUSH); may be null for request types whose handlers
 *     do not record responses
 */
public void sendAsyncRequestToDataNodeWithRetry(
Object req,
Map<Integer, TDataNodeLocation> dataNodeLocationMap,
DataNodeRequestType requestType,
List<TSStatus> dataNodeResponseStatus) {
if (dataNodeLocationMap.isEmpty()) {
return;
}
for (int retry = 0; retry < MAX_RETRY_NUM; retry++) {
CountDownLatch countDownLatch = new CountDownLatch(dataNodeLocationMap.size());
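// On success each handler removes its DataNode from dataNodeLocationMap, so any entries that
// remain once the latch is released will be retried in the next round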
for (TDataNodeLocation targetDataNode : dataNodeLocationMap.values()) {
AbstractRetryHandler handler;
switch (requestType) {
case SET_TTL:
handler =
new SetTTLHandler(countDownLatch, requestType, targetDataNode, dataNodeLocationMap);
break;
case CREATE_FUNCTION:
case DROP_FUNCTION:
handler =
new FunctionManagementHandler(
countDownLatch,
requestType,
targetDataNode,
dataNodeLocationMap,
dataNodeResponseStatus);
break;
case FULL_MERGE:
case MERGE:
handler =
new MergeHandler(
countDownLatch,
requestType,
targetDataNode,
dataNodeLocationMap,
dataNodeResponseStatus);
break;
case FLUSH:
handler =
new FlushHandler(
countDownLatch,
requestType,
targetDataNode,
dataNodeLocationMap,
dataNodeResponseStatus);
break;
case CLEAR_CACHE:
handler =
new ClearCacheHandler(
countDownLatch,
requestType,
targetDataNode,
dataNodeLocationMap,
dataNodeResponseStatus);
break;
case LOAD_CONFIGURATION:
handler =
new LoadConfigurationHandler(
countDownLatch,
requestType,
targetDataNode,
dataNodeLocationMap,
dataNodeResponseStatus);
break;
case SET_SYSTEM_STATUS:
handler =
new SetSystemStatusHandler(
countDownLatch,
requestType,
targetDataNode,
dataNodeLocationMap,
dataNodeResponseStatus);
break;
case UPDATE_REGION_ROUTE_MAP:
handler =
new UpdateRegionRouteMapHandler(
countDownLatch, requestType, targetDataNode, dataNodeLocationMap);
break;
case BROADCAST_LATEST_CONFIG_NODE_GROUP:
handler =
new UpdateConfigNodeGroupHandler(
countDownLatch, requestType, targetDataNode, dataNodeLocationMap);
break;
case CONSTRUCT_SCHEMA_BLACK_LIST:
handler =
new ConstructSchemaBlackListHandler(
countDownLatch, targetDataNode, dataNodeLocationMap);
break;
case ROLLBACK_SCHEMA_BLACK_LIST:
handler =
new RollbackSchemaBlackListHandler(
countDownLatch, targetDataNode, dataNodeLocationMap);
break;
case FETCH_SCHEMA_BLACK_LIST:
handler =
new FetchSchemaBlackLsitHandler(
countDownLatch, targetDataNode, dataNodeLocationMap);
break;
case INVALIDATE_MATCHED_SCHEMA_CACHE:
handler =
new InvalidateMatchedSchemaCacheHandler(
countDownLatch, targetDataNode, dataNodeLocationMap);
break;
case DELETE_DATA_FOR_DELETE_TIMESERIES:
handler =
new DeleteDataForDeleteTimeSeriesHandler(
countDownLatch, targetDataNode, dataNodeLocationMap);
break;
case DELETE_TIMESERIES:
handler =
new DeleteTimeSeriesHandler(countDownLatch, targetDataNode, dataNodeLocationMap);
break;
default:
LOGGER.error(
"Unexpected DataNode Request Type: {} when sendAsyncRequestToDataNodeWithRetry",
requestType);
return;
}
sendAsyncRequestToDataNode(targetDataNode, req, handler, retry);
}
try {
countDownLatch.await();
} catch (InterruptedException e) {
LOGGER.error("Interrupted during {} on ConfigNode", requestType, e);
Thread.currentThread().interrupt();
}
// Retry on the DataNodes that failed to receive the request; stop once every DataNode has succeeded
if (dataNodeLocationMap.isEmpty()) {
break;
}
}
}
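/**
 * Send asynchronous requests to the DataNodes recorded in the given task, and retry on the
 * DataNodes that fail to receive the requests.
 *
 * @param req the request to send
 * @param dataNodeTask the task that provides the target DataNodes and the per-request handler
 */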
public void sendAsyncRequestToDataNodeWithRetry(
Object req, AbstractDataNodeTask<?> dataNodeTask) {
Map<Integer, TDataNodeLocation> dataNodeLocationMap = dataNodeTask.getDataNodeLocationMap();
if (dataNodeLocationMap.isEmpty()) {
return;
}
for (int retry = 0; retry < MAX_RETRY_NUM; retry++) {
CountDownLatch countDownLatch = new CountDownLatch(dataNodeLocationMap.size());
for (TDataNodeLocation targetDataNode : dataNodeLocationMap.values()) {
AbstractRetryHandler handler = dataNodeTask.getSingleRequestHandler();
handler.setCountDownLatch(countDownLatch);
handler.setTargetDataNode(targetDataNode);
sendAsyncRequestToDataNode(targetDataNode, req, handler, retry);
}
try {
countDownLatch.await();
} catch (InterruptedException e) {
LOGGER.error("Interrupted during {} on ConfigNode", dataNodeTask.getDataNodeRequestType(), e);
Thread.currentThread().interrupt();
}
// Retry on the DataNodes that failed to receive the request; stop once every DataNode has succeeded
if (dataNodeLocationMap.isEmpty()) {
break;
}
}
}
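/**
 * Send a single asynchronous request to one DataNode; the handler is invoked as the callback
 * when the RPC completes or fails.
 *
 * @param dataNodeLocation the target DataNode
 * @param req the request to send
 * @param handler the async handler matching the request type
 * @param retryCount the current retry round, used for logging only
 */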
public void sendAsyncRequestToDataNode(
TDataNodeLocation dataNodeLocation,
Object req,
AbstractRetryHandler handler,
int retryCount) {
AsyncDataNodeInternalServiceClient client;
try {
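// Borrow an async client bound to the target DataNode's internal endpoint; the handler serves
// as the Thrift async callback for the request issued below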
client = clientManager.borrowClient(dataNodeLocation.getInternalEndPoint());
switch (handler.getDataNodeRequestType()) {
case SET_TTL:
client.setTTL((TSetTTLReq) req, (SetTTLHandler) handler);
break;
case CREATE_DATA_REGION:
client.createDataRegion((TCreateDataRegionReq) req, (CreateRegionHandler) handler);
break;
case CREATE_SCHEMA_REGION:
client.createSchemaRegion((TCreateSchemaRegionReq) req, (CreateRegionHandler) handler);
break;
case CREATE_FUNCTION:
client.createFunction((TCreateFunctionRequest) req, (FunctionManagementHandler) handler);
break;
case DROP_FUNCTION:
client.dropFunction((TDropFunctionRequest) req, (FunctionManagementHandler) handler);
break;
case MERGE:
case FULL_MERGE:
client.merge((MergeHandler) handler);
break;
case FLUSH:
client.flush((TFlushReq) req, (FlushHandler) handler);
break;
case CLEAR_CACHE:
client.clearCache((ClearCacheHandler) handler);
break;
case LOAD_CONFIGURATION:
client.loadConfiguration((LoadConfigurationHandler) handler);
break;
case SET_SYSTEM_STATUS:
client.setSystemStatus((String) req, (SetSystemStatusHandler) handler);
break;
case UPDATE_REGION_ROUTE_MAP:
client.updateRegionCache((TRegionRouteReq) req, (UpdateRegionRouteMapHandler) handler);
break;
case BROADCAST_LATEST_CONFIG_NODE_GROUP:
client.updateConfigNodeGroup(
(TUpdateConfigNodeGroupReq) req, (UpdateConfigNodeGroupHandler) handler);
break;
case CONSTRUCT_SCHEMA_BLACK_LIST:
client.constructSchemaBlackList(
(TConstructSchemaBlackListReq) req, (ConstructSchemaBlackListHandler) handler);
break;
case ROLLBACK_SCHEMA_BLACK_LIST:
client.rollbackSchemaBlackList(
(TRollbackSchemaBlackListReq) req, (RollbackSchemaBlackListHandler) handler);
break;
case FETCH_SCHEMA_BLACK_LIST:
client.fetchSchemaBlackList(
(TFetchSchemaBlackListReq) req, (FetchSchemaBlackLsitHandler) handler);
break;
case INVALIDATE_MATCHED_SCHEMA_CACHE:
client.invalidateMatchedSchemaCache(
(TInvalidateMatchedSchemaCacheReq) req,
(InvalidateMatchedSchemaCacheHandler) handler);
break;
case DELETE_DATA_FOR_DELETE_TIMESERIES:
client.deleteDataForDeleteTimeSeries(
(TDeleteDataForDeleteTimeSeriesReq) req,
(DeleteDataForDeleteTimeSeriesHandler) handler);
break;
case DELETE_TIMESERIES:
client.deleteTimeSeries((TDeleteTimeSeriesReq) req, (DeleteTimeSeriesHandler) handler);
break;
default:
LOGGER.error(
"Unexpected DataNode Request Type: {} when sendAsyncRequestToDataNode",
handler.getDataNodeRequestType());
}
} catch (Exception e) {
LOGGER.warn(
"{} failed on ConfigNode {}, because {}, retrying {}...",
handler.getDataNodeRequestType(),
dataNodeLocation.getInternalEndPoint(),
e.getMessage(),
retryCount);
}
}
/**
 * Execute a CreateRegionGroupsPlan asynchronously
 *
 * @param createRegionGroupsPlan the plan describing the RegionGroups to create
 * @param ttlMap Map<StorageGroupName, TTL>
 * @return the RegionReplicaSets that failed to be created
 */
public Map<TConsensusGroupId, TRegionReplicaSet> createRegionGroups(
CreateRegionGroupsPlan createRegionGroupsPlan, Map<String, Long> ttlMap) {
// Different requests may be sent to the same DataNode when creating Regions,
// so createRegionGroups keys the map by enumeration index (Map<index, TDataNodeLocation>)
// instead of by DataNodeId
Map<Integer, TDataNodeLocation> dataNodeLocationMap = new ConcurrentHashMap<>();
int index = 0;
// Collect every replica location the creation requests will be sent to
for (List<TRegionReplicaSet> regionReplicaSets :
createRegionGroupsPlan.getRegionGroupMap().values()) {
for (TRegionReplicaSet regionReplicaSet : regionReplicaSets) {
for (TDataNodeLocation dataNodeLocation : regionReplicaSet.getDataNodeLocations()) {
dataNodeLocationMap.put(index++, dataNodeLocation);
}
}
}
if (dataNodeLocationMap.isEmpty()) {
return new HashMap<>();
}
for (int retry = 0; retry < MAX_RETRY_NUM; retry++) {
index = 0;
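// Re-enumerate the plan in the same order as above so that every replica keeps the index it was
// assigned when dataNodeLocationMap was populated; only indices still present in the map are (re)sent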
CountDownLatch countDownLatch = new CountDownLatch(dataNodeLocationMap.size());
for (Map.Entry<String, List<TRegionReplicaSet>> entry :
createRegionGroupsPlan.getRegionGroupMap().entrySet()) {
// Enumerate each RegionReplicaSet
for (TRegionReplicaSet regionReplicaSet : entry.getValue()) {
// Enumerate each replica location
for (TDataNodeLocation targetDataNode : regionReplicaSet.getDataNodeLocations()) {
if (dataNodeLocationMap.containsKey(index)) {
AbstractRetryHandler handler;
switch (regionReplicaSet.getRegionId().getType()) {
case SchemaRegion:
handler =
new CreateRegionHandler(
countDownLatch,
DataNodeRequestType.CREATE_SCHEMA_REGION,
regionReplicaSet.regionId,
targetDataNode,
dataNodeLocationMap,
index++);
sendAsyncRequestToDataNode(
targetDataNode,
genCreateSchemaRegionReq(entry.getKey(), regionReplicaSet),
handler,
retry);
break;
case DataRegion:
handler =
new CreateRegionHandler(
countDownLatch,
DataNodeRequestType.CREATE_DATA_REGION,
regionReplicaSet.regionId,
targetDataNode,
dataNodeLocationMap,
index++);
sendAsyncRequestToDataNode(
targetDataNode,
genCreateDataRegionReq(
entry.getKey(), regionReplicaSet, ttlMap.get(entry.getKey())),
handler,
retry);
break;
default:
break;
}
} else {
index++;
}
}
}
}
try {
countDownLatch.await();
} catch (InterruptedException e) {
LOGGER.error("Interrupted during createRegions on ConfigNode", e);
Thread.currentThread().interrupt();
}
// Retry on the DataNodes that failed to receive the request; stop once every replica has been created
if (dataNodeLocationMap.isEmpty()) {
break;
}
}
// Filter RegionGroups that weren't created successfully
index = 0;
Map<TConsensusGroupId, TRegionReplicaSet> failedRegions = new HashMap<>();
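// Walk the plan once more in the same order; any index still present in dataNodeLocationMap
// marks a replica whose creation request never succeeded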
for (List<TRegionReplicaSet> regionReplicaSets :
createRegionGroupsPlan.getRegionGroupMap().values()) {
for (TRegionReplicaSet regionReplicaSet : regionReplicaSets) {
for (TDataNodeLocation dataNodeLocation : regionReplicaSet.getDataNodeLocations()) {
if (dataNodeLocationMap.containsKey(index)) {
failedRegions
.computeIfAbsent(
regionReplicaSet.getRegionId(),
empty -> new TRegionReplicaSet().setRegionId(regionReplicaSet.getRegionId()))
.addToDataNodeLocations(dataNodeLocation);
}
index += 1;
}
}
}
return failedRegions;
}
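// Illustrative follow-up for callers (a sketch; how the failed replicas are handled is an assumption):
//
//   Map<TConsensusGroupId, TRegionReplicaSet> failedRegions =
//       AsyncDataNodeClientPool.getInstance().createRegionGroups(plan, ttlMap);
//   if (!failedRegions.isEmpty()) {
//     // e.g. roll back the partially created RegionGroups or schedule them for re-creation
//   }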
private TCreateSchemaRegionReq genCreateSchemaRegionReq(
String storageGroup, TRegionReplicaSet regionReplicaSet) {
TCreateSchemaRegionReq req = new TCreateSchemaRegionReq();
req.setStorageGroup(storageGroup);
req.setRegionReplicaSet(regionReplicaSet);
return req;
}
private TCreateDataRegionReq genCreateDataRegionReq(
String storageGroup, TRegionReplicaSet regionReplicaSet, long ttl) {
TCreateDataRegionReq req = new TCreateDataRegionReq();
req.setStorageGroup(storageGroup);
req.setRegionReplicaSet(regionReplicaSet);
req.setTtl(ttl);
return req;
}
/**
 * Notify all DataNodes when the ConfigNodeGroup is expanded or shrunk
 *
 * @param registeredDataNodeLocationMap Map<Integer, TDataNodeLocation> of the registered DataNodes
 * @param registeredConfigNodes the latest List<TConfigNodeLocation> of registered ConfigNodes
 */
public void broadCastTheLatestConfigNodeGroup(
Map<Integer, TDataNodeLocation> registeredDataNodeLocationMap,
List<TConfigNodeLocation> registeredConfigNodes) {
if (registeredDataNodeLocationMap != null) {
TUpdateConfigNodeGroupReq updateConfigNodeGroupReq =
new TUpdateConfigNodeGroupReq(registeredConfigNodes);
LOGGER.info("Begin to broadcast the latest configNodeGroup: {}", registeredConfigNodes);
sendAsyncRequestToDataNodeWithRetry(
updateConfigNodeGroupReq,
registeredDataNodeLocationMap,
DataNodeRequestType.BROADCAST_LATEST_CONFIG_NODE_GROUP,
null);
LOGGER.info("Broadcast the latest configNodeGroup finished.");
}
}
/**
 * Always call this interface when a DataNode is restarted or removed
 *
 * @param endPoint the internal endpoint of the DataNode whose pooled clients should be cleared
 */
public void resetClient(TEndPoint endPoint) {
clientManager.clear(endPoint);
}
// TODO: Does the ClientPool have to be a singleton?
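// Initialization-on-demand holder idiom: INSTANCE is created lazily and thread-safely the first
// time getInstance() is called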
private static class ClientPoolHolder {
private static final AsyncDataNodeClientPool INSTANCE = new AsyncDataNodeClientPool();
private ClientPoolHolder() {
// Empty constructor
}
}
public static AsyncDataNodeClientPool getInstance() {
return ClientPoolHolder.INSTANCE;
}
}