blob: eac9584e233586b1f03d916aaea21af899f5ba97 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.uniffle.storage.factory;
import java.util.ArrayList;
import java.util.List;
import java.util.function.Supplier;
import com.google.common.collect.Lists;
import org.apache.commons.collections.CollectionUtils;
import org.roaringbitmap.longlong.Roaring64NavigableMap;
import org.apache.uniffle.client.api.ShuffleServerClient;
import org.apache.uniffle.client.factory.ShuffleServerClientFactory;
import org.apache.uniffle.common.ShuffleServerInfo;
import org.apache.uniffle.common.exception.RssException;
import org.apache.uniffle.common.util.RssUtils;
import org.apache.uniffle.storage.handler.api.ClientReadHandler;
import org.apache.uniffle.storage.handler.api.ShuffleDeleteHandler;
import org.apache.uniffle.storage.handler.impl.ComposedClientReadHandler;
import org.apache.uniffle.storage.handler.impl.HadoopClientReadHandler;
import org.apache.uniffle.storage.handler.impl.HadoopShuffleDeleteHandler;
import org.apache.uniffle.storage.handler.impl.LocalFileClientReadHandler;
import org.apache.uniffle.storage.handler.impl.LocalFileDeleteHandler;
import org.apache.uniffle.storage.handler.impl.MemoryClientReadHandler;
import org.apache.uniffle.storage.handler.impl.MultiReplicaClientReadHandler;
import org.apache.uniffle.storage.request.CreateShuffleDeleteHandlerRequest;
import org.apache.uniffle.storage.request.CreateShuffleReadHandlerRequest;
import org.apache.uniffle.storage.util.StorageType;
public class ShuffleHandlerFactory {
private static ShuffleHandlerFactory INSTANCE;
private ShuffleHandlerFactory() {}
public static synchronized ShuffleHandlerFactory getInstance() {
if (INSTANCE == null) {
INSTANCE = new ShuffleHandlerFactory();
}
return INSTANCE;
}
public ClientReadHandler createShuffleReadHandler(CreateShuffleReadHandlerRequest request) {
if (CollectionUtils.isEmpty(request.getShuffleServerInfoList())) {
throw new RssException("Shuffle servers should not be empty!");
}
if (request.getShuffleServerInfoList().size() > 1) {
List<ClientReadHandler> handlers = Lists.newArrayList();
request
.getShuffleServerInfoList()
.forEach(
(ssi) -> {
handlers.add(
ShuffleHandlerFactory.getInstance()
.createSingleReplicaClientReadHandler(request, ssi));
});
return new MultiReplicaClientReadHandler(
handlers,
request.getShuffleServerInfoList(),
request.getExpectBlockIds(),
request.getProcessBlockIds());
} else {
ShuffleServerInfo serverInfo = request.getShuffleServerInfoList().get(0);
return createSingleReplicaClientReadHandler(request, serverInfo);
}
}
public ClientReadHandler createSingleReplicaClientReadHandler(
CreateShuffleReadHandlerRequest request, ShuffleServerInfo serverInfo) {
String storageType = request.getStorageType();
StorageType type = StorageType.valueOf(storageType);
if (StorageType.MEMORY == type) {
throw new UnsupportedOperationException(
"Doesn't support storage type for client read :" + storageType);
}
if (StorageType.HDFS == type) {
return getHadoopClientReadHandler(request, serverInfo);
}
if (StorageType.LOCALFILE == type) {
return getLocalfileClientReaderHandler(request, serverInfo);
}
List<Supplier<ClientReadHandler>> handlers = new ArrayList<>();
if (StorageType.withMemory(type)) {
handlers.add(() -> getMemoryClientReadHandler(request, serverInfo));
}
if (StorageType.withLocalfile(type)) {
handlers.add(() -> getLocalfileClientReaderHandler(request, serverInfo));
}
if (StorageType.withHadoop(type)) {
handlers.add(() -> getHadoopClientReadHandler(request, serverInfo));
}
if (handlers.isEmpty()) {
throw new RssException(
"This should not happen due to the unknown storage type: " + storageType);
}
return new ComposedClientReadHandler(serverInfo, handlers);
}
private ClientReadHandler getMemoryClientReadHandler(
CreateShuffleReadHandlerRequest request, ShuffleServerInfo ssi) {
ShuffleServerClient shuffleServerClient =
ShuffleServerClientFactory.getInstance()
.getShuffleServerClient(request.getClientType().name(), ssi, request.getClientConf());
Roaring64NavigableMap expectTaskIds = null;
if (request.isExpectedTaskIdsBitmapFilterEnable()) {
Roaring64NavigableMap realExceptBlockIds = RssUtils.cloneBitMap(request.getExpectBlockIds());
realExceptBlockIds.xor(request.getProcessBlockIds());
expectTaskIds = RssUtils.generateTaskIdBitMap(realExceptBlockIds, request.getIdHelper());
}
ClientReadHandler memoryClientReadHandler =
new MemoryClientReadHandler(
request.getAppId(),
request.getShuffleId(),
request.getPartitionId(),
request.getReadBufferSize(),
shuffleServerClient,
expectTaskIds,
request.getRetryMax(),
request.getRetryIntervalMax());
return memoryClientReadHandler;
}
private ClientReadHandler getLocalfileClientReaderHandler(
CreateShuffleReadHandlerRequest request, ShuffleServerInfo ssi) {
ShuffleServerClient shuffleServerClient =
ShuffleServerClientFactory.getInstance()
.getShuffleServerClient(request.getClientType().name(), ssi, request.getClientConf());
return new LocalFileClientReadHandler(
request.getAppId(),
request.getShuffleId(),
request.getPartitionId(),
request.getIndexReadLimit(),
request.getPartitionNumPerRange(),
request.getPartitionNum(),
request.getReadBufferSize(),
request.getExpectBlockIds(),
request.getProcessBlockIds(),
shuffleServerClient,
request.getDistributionType(),
request.getExpectTaskIds(),
request.getRetryMax(),
request.getRetryIntervalMax());
}
private ClientReadHandler getHadoopClientReadHandler(
CreateShuffleReadHandlerRequest request, ShuffleServerInfo ssi) {
return new HadoopClientReadHandler(
request.getAppId(),
request.getShuffleId(),
request.getPartitionId(),
request.getIndexReadLimit(),
request.getPartitionNumPerRange(),
request.getPartitionNum(),
request.getReadBufferSize(),
request.getExpectBlockIds(),
request.getProcessBlockIds(),
request.getStorageBasePath(),
request.getHadoopConf(),
request.getDistributionType(),
request.getExpectTaskIds(),
ssi.getId(),
request.isOffHeapEnabled());
}
public ShuffleDeleteHandler createShuffleDeleteHandler(
CreateShuffleDeleteHandlerRequest request) {
if (StorageType.HDFS.name().equals(request.getStorageType())) {
return new HadoopShuffleDeleteHandler(request.getConf(), request.getShuffleServerId());
} else if (StorageType.LOCALFILE.name().equals(request.getStorageType())) {
return new LocalFileDeleteHandler();
} else {
throw new UnsupportedOperationException(
"Doesn't support storage type for shuffle delete handler:" + request.getStorageType());
}
}
}