blob: 13a2412414b0658bf67c810791f2470afb0a1112 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.uniffle.common.netty.protocol;
import java.io.IOException;
import io.netty.buffer.ByteBuf;
import org.roaringbitmap.longlong.Roaring64NavigableMap;
import org.apache.uniffle.common.netty.DecodeException;
import org.apache.uniffle.common.netty.EncodeException;
import org.apache.uniffle.common.util.ByteBufUtils;
import org.apache.uniffle.common.util.RssUtils;
public class GetMemoryShuffleDataRequest extends RequestMessage {
private String appId;
private int shuffleId;
private int partitionId;
private long lastBlockId;
private int readBufferSize;
private long timestamp;
private Roaring64NavigableMap expectedTaskIdsBitmap;
public GetMemoryShuffleDataRequest(
long requestId,
String appId,
int shuffleId,
int partitionId,
long lastBlockId,
int readBufferSize,
long timestamp,
Roaring64NavigableMap expectedTaskIdsBitmap) {
super(requestId);
this.appId = appId;
this.shuffleId = shuffleId;
this.partitionId = partitionId;
this.lastBlockId = lastBlockId;
this.readBufferSize = readBufferSize;
this.timestamp = timestamp;
this.expectedTaskIdsBitmap = expectedTaskIdsBitmap;
}
@Override
public Type type() {
return Type.GET_MEMORY_SHUFFLE_DATA_REQUEST;
}
@Override
public int encodedLength() {
return (int)
(REQUEST_ID_ENCODE_LENGTH
+ ByteBufUtils.encodedLength(appId)
+ 4 * Integer.BYTES
+ 2 * Long.BYTES
+ (expectedTaskIdsBitmap == null ? 0L : expectedTaskIdsBitmap.serializedSizeInBytes()));
}
@Override
public void encode(ByteBuf buf) throws EncodeException {
buf.writeLong(getRequestId());
ByteBufUtils.writeLengthAndString(buf, appId);
buf.writeInt(shuffleId);
buf.writeInt(partitionId);
buf.writeLong(lastBlockId);
buf.writeInt(readBufferSize);
buf.writeLong(timestamp);
try {
if (expectedTaskIdsBitmap != null) {
buf.writeInt((int) expectedTaskIdsBitmap.serializedSizeInBytes());
buf.writeBytes(RssUtils.serializeBitMap(expectedTaskIdsBitmap));
} else {
buf.writeInt(-1);
}
} catch (IOException ioException) {
throw new EncodeException(
"serializeBitMap failed while encode GetMemoryShuffleDataRequest!", ioException);
}
}
public static GetMemoryShuffleDataRequest decode(ByteBuf byteBuf) throws DecodeException {
long requestId = byteBuf.readLong();
String appId = ByteBufUtils.readLengthAndString(byteBuf);
int shuffleId = byteBuf.readInt();
int partitionId = byteBuf.readInt();
long lastBlockId = byteBuf.readLong();
int readBufferSize = byteBuf.readInt();
long timestamp = byteBuf.readLong();
byte[] bytes = ByteBufUtils.readByteArray(byteBuf);
Roaring64NavigableMap expectedTaskIdsBitmap = null;
try {
if (bytes != null) {
expectedTaskIdsBitmap = RssUtils.deserializeBitMap(bytes);
}
} catch (IOException ioException) {
throw new DecodeException(
"serializeBitMap failed while decode GetMemoryShuffleDataRequest!", ioException);
}
return new GetMemoryShuffleDataRequest(
requestId,
appId,
shuffleId,
partitionId,
lastBlockId,
readBufferSize,
timestamp,
expectedTaskIdsBitmap);
}
public String getAppId() {
return appId;
}
public int getShuffleId() {
return shuffleId;
}
public int getPartitionId() {
return partitionId;
}
public long getLastBlockId() {
return lastBlockId;
}
public int getReadBufferSize() {
return readBufferSize;
}
public long getTimestamp() {
return timestamp;
}
public Roaring64NavigableMap getExpectedTaskIdsBitmap() {
return expectedTaskIdsBitmap;
}
@Override
public String getOperationType() {
return "getMemoryShuffleData";
}
}