blob: 2b5ea8f7a92a50f4b240781e584354c9875364af [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.uniffle.storage.handler.impl;
import com.google.common.annotations.VisibleForTesting;
import org.roaringbitmap.longlong.Roaring64NavigableMap;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.apache.uniffle.client.api.ShuffleServerClient;
import org.apache.uniffle.client.request.RssGetShuffleDataRequest;
import org.apache.uniffle.client.request.RssGetShuffleIndexRequest;
import org.apache.uniffle.client.response.RssGetShuffleDataResponse;
import org.apache.uniffle.common.ShuffleDataDistributionType;
import org.apache.uniffle.common.ShuffleDataResult;
import org.apache.uniffle.common.ShuffleDataSegment;
import org.apache.uniffle.common.ShuffleIndexResult;
import org.apache.uniffle.common.exception.RssException;
import org.apache.uniffle.common.exception.RssFetchFailedException;
public class LocalFileClientReadHandler extends DataSkippableReadHandler {
private static final Logger LOG = LoggerFactory.getLogger(LocalFileClientReadHandler.class);
private final int partitionNumPerRange;
private final int partitionNum;
private ShuffleServerClient shuffleServerClient;
private int retryMax;
private long retryIntervalMax;
public LocalFileClientReadHandler(
String appId,
int shuffleId,
int partitionId,
int indexReadLimit,
int partitionNumPerRange,
int partitionNum,
int readBufferSize,
Roaring64NavigableMap expectBlockIds,
Roaring64NavigableMap processBlockIds,
ShuffleServerClient shuffleServerClient,
ShuffleDataDistributionType distributionType,
Roaring64NavigableMap expectTaskIds,
int retryMax,
long retryIntervalMax) {
super(
appId,
shuffleId,
partitionId,
readBufferSize,
expectBlockIds,
processBlockIds,
distributionType,
expectTaskIds);
this.shuffleServerClient = shuffleServerClient;
this.partitionNumPerRange = partitionNumPerRange;
this.partitionNum = partitionNum;
this.retryMax = retryMax;
this.retryIntervalMax = retryIntervalMax;
}
@VisibleForTesting
public LocalFileClientReadHandler(
String appId,
int shuffleId,
int partitionId,
int indexReadLimit,
int partitionNumPerRange,
int partitionNum,
int readBufferSize,
Roaring64NavigableMap expectBlockIds,
Roaring64NavigableMap processBlockIds,
ShuffleServerClient shuffleServerClient) {
this(
appId,
shuffleId,
partitionId,
indexReadLimit,
partitionNumPerRange,
partitionNum,
readBufferSize,
expectBlockIds,
processBlockIds,
shuffleServerClient,
ShuffleDataDistributionType.NORMAL,
Roaring64NavigableMap.bitmapOf(),
1,
0);
}
@Override
public ShuffleIndexResult readShuffleIndex() {
ShuffleIndexResult shuffleIndexResult = null;
RssGetShuffleIndexRequest request =
new RssGetShuffleIndexRequest(
appId,
shuffleId,
partitionId,
partitionNumPerRange,
partitionNum,
retryMax,
retryIntervalMax);
try {
shuffleIndexResult = shuffleServerClient.getShuffleIndex(request).getShuffleIndexResult();
} catch (RssFetchFailedException e) {
throw e;
} catch (Exception e) {
throw new RssFetchFailedException(
"Failed to read shuffle index for appId["
+ appId
+ "], shuffleId["
+ shuffleId
+ "], partitionId["
+ partitionId
+ "]",
e);
}
return shuffleIndexResult;
}
@Override
public ShuffleDataResult readShuffleData(ShuffleDataSegment shuffleDataSegment) {
ShuffleDataResult result = null;
int expectedLength = shuffleDataSegment.getLength();
if (expectedLength <= 0) {
throw new RssException(
"Failed to read shuffle data for appId["
+ appId
+ "], shuffleId["
+ shuffleId
+ "], partitionId["
+ partitionId
+ "], "
+ "the length field in the index segment is "
+ expectedLength
+ " <= 0!");
}
RssGetShuffleDataRequest request =
new RssGetShuffleDataRequest(
appId,
shuffleId,
partitionId,
partitionNumPerRange,
partitionNum,
shuffleDataSegment.getOffset(),
expectedLength,
retryMax,
retryIntervalMax);
try {
RssGetShuffleDataResponse response = shuffleServerClient.getShuffleData(request);
result =
new ShuffleDataResult(response.getShuffleData(), shuffleDataSegment.getBufferSegments());
} catch (Exception e) {
throw new RssException(
"Failed to read shuffle data with " + shuffleServerClient.getClientInfo(), e);
}
if (result.getDataBuffer().remaining() != expectedLength) {
throw new RssException(
"Wrong data length expect "
+ result.getDataBuffer().remaining()
+ " but actual is "
+ expectedLength);
}
return result;
}
}