blob: 0eed01ea879497cff723a416d6c8d8d2cb40b506 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.uniffle.client.factory;
import java.util.List;
import org.apache.hadoop.conf.Configuration;
import org.roaringbitmap.longlong.Roaring64NavigableMap;
import org.apache.uniffle.client.api.ShuffleReadClient;
import org.apache.uniffle.client.api.ShuffleWriteClient;
import org.apache.uniffle.client.impl.ShuffleReadClientImpl;
import org.apache.uniffle.client.impl.ShuffleWriteClientImpl;
import org.apache.uniffle.common.ClientType;
import org.apache.uniffle.common.ShuffleDataDistributionType;
import org.apache.uniffle.common.ShuffleServerInfo;
import org.apache.uniffle.common.config.RssConf;
import org.apache.uniffle.common.util.IdHelper;
public class ShuffleClientFactory {
private static final ShuffleClientFactory INSTANCE = new ShuffleClientFactory();
public static ShuffleClientFactory getInstance() {
return INSTANCE;
}
public ShuffleWriteClient createShuffleWriteClient(WriteClientBuilder builder) {
if (builder.isReplicaSkipEnabled() && builder.getReplica() > builder.getReplicaWrite()) {
builder.retryMax(builder.getRetryMax() / 2);
}
return builder.build();
}
public ShuffleReadClient createShuffleReadClient(ReadClientBuilder builder) {
return builder.build();
}
public static class WriteClientBuilder<T extends WriteClientBuilder> {
private String clientType;
private int retryMax;
private long retryIntervalMax;
private int heartBeatThreadNum;
private int replica;
private int replicaWrite;
private int replicaRead;
private boolean replicaSkipEnabled;
private int dataTransferPoolSize;
private int dataCommitPoolSize;
private int unregisterThreadPoolSize;
private int unregisterRequestTimeSec;
private RssConf rssConf;
public String getClientType() {
return clientType;
}
public int getRetryMax() {
return retryMax;
}
public long getRetryIntervalMax() {
return retryIntervalMax;
}
public int getHeartBeatThreadNum() {
return heartBeatThreadNum;
}
public int getReplica() {
return replica;
}
public int getReplicaWrite() {
return replicaWrite;
}
public int getReplicaRead() {
return replicaRead;
}
public boolean isReplicaSkipEnabled() {
return replicaSkipEnabled;
}
public int getDataTransferPoolSize() {
return dataTransferPoolSize;
}
public int getDataCommitPoolSize() {
return dataCommitPoolSize;
}
public int getUnregisterThreadPoolSize() {
return unregisterThreadPoolSize;
}
public int getUnregisterRequestTimeSec() {
return unregisterRequestTimeSec;
}
public RssConf getRssConf() {
return rssConf;
}
protected T self() {
return (T) this;
}
public T clientType(String clientType) {
this.clientType = clientType;
return self();
}
public T retryMax(int retryMax) {
this.retryMax = retryMax;
return self();
}
public T retryIntervalMax(long retryIntervalMax) {
this.retryIntervalMax = retryIntervalMax;
return self();
}
public T heartBeatThreadNum(int heartBeatThreadNum) {
this.heartBeatThreadNum = heartBeatThreadNum;
return self();
}
public T replica(int replica) {
this.replica = replica;
return self();
}
public T replicaWrite(int replicaWrite) {
this.replicaWrite = replicaWrite;
return self();
}
public T replicaRead(int replicaRead) {
this.replicaRead = replicaRead;
return self();
}
public T replicaSkipEnabled(boolean replicaSkipEnabled) {
this.replicaSkipEnabled = replicaSkipEnabled;
return self();
}
public T dataTransferPoolSize(int dataTransferPoolSize) {
this.dataTransferPoolSize = dataTransferPoolSize;
return self();
}
public T dataCommitPoolSize(int dataCommitPoolSize) {
this.dataCommitPoolSize = dataCommitPoolSize;
return self();
}
public T unregisterThreadPoolSize(int unregisterThreadPoolSize) {
this.unregisterThreadPoolSize = unregisterThreadPoolSize;
return self();
}
public T unregisterRequestTimeSec(int unregisterRequestTimeSec) {
this.unregisterRequestTimeSec = unregisterRequestTimeSec;
return self();
}
public T rssConf(RssConf rssConf) {
this.rssConf = rssConf;
return self();
}
public ShuffleWriteClientImpl build() {
return new ShuffleWriteClientImpl(this);
}
}
public static class ReadClientBuilder {
private String appId;
private int shuffleId;
private int partitionId;
private String basePath;
private int partitionNumPerRange;
private int partitionNum;
private Roaring64NavigableMap blockIdBitmap;
private Roaring64NavigableMap taskIdBitmap;
private List<ShuffleServerInfo> shuffleServerInfoList;
private Configuration hadoopConf;
private IdHelper idHelper;
private ShuffleDataDistributionType shuffleDataDistributionType;
private boolean expectedTaskIdsBitmapFilterEnable;
private RssConf rssConf;
private boolean offHeapEnable;
private String storageType;
private int indexReadLimit;
private long readBufferSize;
private ClientType clientType;
private int retryMax;
private long retryIntervalMax;
public ReadClientBuilder appId(String appId) {
this.appId = appId;
return this;
}
public ReadClientBuilder shuffleId(int shuffleId) {
this.shuffleId = shuffleId;
return this;
}
public ReadClientBuilder partitionId(int partitionId) {
this.partitionId = partitionId;
return this;
}
public ReadClientBuilder basePath(String basePath) {
this.basePath = basePath;
return this;
}
public ReadClientBuilder partitionNumPerRange(int partitionNumPerRange) {
this.partitionNumPerRange = partitionNumPerRange;
return this;
}
public ReadClientBuilder partitionNum(int partitionNum) {
this.partitionNum = partitionNum;
return this;
}
public ReadClientBuilder blockIdBitmap(Roaring64NavigableMap blockIdBitmap) {
this.blockIdBitmap = blockIdBitmap;
return this;
}
public ReadClientBuilder taskIdBitmap(Roaring64NavigableMap taskIdBitmap) {
this.taskIdBitmap = taskIdBitmap;
return this;
}
public ReadClientBuilder shuffleServerInfoList(List<ShuffleServerInfo> shuffleServerInfoList) {
this.shuffleServerInfoList = shuffleServerInfoList;
return this;
}
public ReadClientBuilder hadoopConf(Configuration hadoopConf) {
this.hadoopConf = hadoopConf;
return this;
}
public ReadClientBuilder idHelper(IdHelper idHelper) {
this.idHelper = idHelper;
return this;
}
public ReadClientBuilder shuffleDataDistributionType(
ShuffleDataDistributionType shuffleDataDistributionType) {
this.shuffleDataDistributionType = shuffleDataDistributionType;
return this;
}
public ReadClientBuilder expectedTaskIdsBitmapFilterEnable(
boolean expectedTaskIdsBitmapFilterEnable) {
this.expectedTaskIdsBitmapFilterEnable = expectedTaskIdsBitmapFilterEnable;
return this;
}
public ReadClientBuilder rssConf(RssConf rssConf) {
this.rssConf = rssConf;
return this;
}
public ReadClientBuilder offHeapEnable(boolean offHeapEnable) {
this.offHeapEnable = offHeapEnable;
return this;
}
public ReadClientBuilder storageType(String storageType) {
this.storageType = storageType;
return this;
}
public ReadClientBuilder indexReadLimit(int indexReadLimit) {
this.indexReadLimit = indexReadLimit;
return this;
}
public ReadClientBuilder readBufferSize(long readBufferSize) {
this.readBufferSize = readBufferSize;
return this;
}
public ReadClientBuilder clientType(ClientType clientType) {
this.clientType = clientType;
return this;
}
public ReadClientBuilder retryMax(int retryMax) {
this.retryMax = retryMax;
return this;
}
public ReadClientBuilder retryIntervalMax(long retryIntervalMax) {
this.retryIntervalMax = retryIntervalMax;
return this;
}
public ReadClientBuilder() {}
public String getAppId() {
return appId;
}
public int getShuffleId() {
return shuffleId;
}
public int getPartitionId() {
return partitionId;
}
public int getPartitionNumPerRange() {
return partitionNumPerRange;
}
public int getPartitionNum() {
return partitionNum;
}
public String getBasePath() {
return basePath;
}
public Roaring64NavigableMap getBlockIdBitmap() {
return blockIdBitmap;
}
public Roaring64NavigableMap getTaskIdBitmap() {
return taskIdBitmap;
}
public List<ShuffleServerInfo> getShuffleServerInfoList() {
return shuffleServerInfoList;
}
public Configuration getHadoopConf() {
return hadoopConf;
}
public IdHelper getIdHelper() {
return idHelper;
}
public ShuffleDataDistributionType getShuffleDataDistributionType() {
return shuffleDataDistributionType;
}
public boolean isExpectedTaskIdsBitmapFilterEnable() {
return expectedTaskIdsBitmapFilterEnable;
}
public RssConf getRssConf() {
return rssConf;
}
public boolean isOffHeapEnable() {
return offHeapEnable;
}
public String getStorageType() {
return storageType;
}
public int getIndexReadLimit() {
return indexReadLimit;
}
public long getReadBufferSize() {
return readBufferSize;
}
public ClientType getClientType() {
return clientType;
}
public int getRetryMax() {
return retryMax;
}
public long getRetryIntervalMax() {
return retryIntervalMax;
}
public ShuffleReadClientImpl build() {
return new ShuffleReadClientImpl(this);
}
}
public static WriteClientBuilder newWriteBuilder() {
return new WriteClientBuilder();
}
public static ReadClientBuilder newReadBuilder() {
return new ReadClientBuilder();
}
}