blob: d8d384f35afa99ce3e9d09ab9504f7df774775a8 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
syntax = "proto3";
import "google/protobuf/empty.proto";
import "google/protobuf/wrappers.proto";
option java_package = "org.apache.uniffle.proto";
option java_outer_classname = "RssProtos";
option java_generate_equals_and_hash = true;
package rss.common;
service ShuffleServer {
rpc registerShuffle (ShuffleRegisterRequest) returns (ShuffleRegisterResponse);
rpc unregisterShuffle(ShuffleUnregisterRequest) returns (ShuffleUnregisterResponse);
rpc unregisterShuffleByAppId(ShuffleUnregisterByAppIdRequest) returns (ShuffleUnregisterByAppIdResponse);
rpc sendShuffleData (SendShuffleDataRequest) returns (SendShuffleDataResponse);
rpc getLocalShuffleIndex (GetLocalShuffleIndexRequest) returns (GetLocalShuffleIndexResponse);
rpc getLocalShuffleData (GetLocalShuffleDataRequest) returns (GetLocalShuffleDataResponse);
rpc getMemoryShuffleData (GetMemoryShuffleDataRequest) returns (GetMemoryShuffleDataResponse);
rpc commitShuffleTask (ShuffleCommitRequest) returns (ShuffleCommitResponse);
rpc reportShuffleResult (ReportShuffleResultRequest) returns (ReportShuffleResultResponse);
rpc getShuffleResult (GetShuffleResultRequest) returns (GetShuffleResultResponse);
rpc getShuffleResultForMultiPart (GetShuffleResultForMultiPartRequest) returns (GetShuffleResultForMultiPartResponse);
rpc finishShuffle (FinishShuffleRequest) returns (FinishShuffleResponse);
rpc requireBuffer (RequireBufferRequest) returns (RequireBufferResponse);
rpc appHeartbeat(AppHeartBeatRequest) returns (AppHeartBeatResponse);
}
message FinishShuffleRequest {
string appId = 1;
int32 shuffleId = 2;
}
message FinishShuffleResponse {
StatusCode status = 1;
string retMsg = 2;
}
message RequireBufferRequest {
int32 requireSize = 1;
string appId = 2;
int32 shuffleId = 3;
repeated int32 partitionIds = 4;
}
message RequireBufferResponse {
int64 requireBufferId = 1;
StatusCode status = 2;
string retMsg = 3;
}
message ShuffleDataBlockSegment {
int64 blockId = 1;
int64 offset = 2;
int32 length = 3;
int32 uncompressLength = 4;
int64 crc = 5;
int64 taskAttemptId = 6;
}
message GetLocalShuffleDataRequest {
string appId = 1;
int32 shuffleId = 2;
int32 partitionId = 3;
int32 partitionNumPerRange = 4;
int32 partitionNum = 5;
int64 offset = 6;
int32 length = 7;
int64 timestamp = 8;
}
message GetLocalShuffleDataResponse {
bytes data = 1;
StatusCode status = 2;
string retMsg = 3;
}
message GetMemoryShuffleDataRequest {
string appId = 1;
int32 shuffleId = 2;
int32 partitionId = 3;
int64 lastBlockId = 4;
int32 readBufferSize = 5;
int64 timestamp = 6;
optional bytes serializedExpectedTaskIdsBitmap = 7;
}
message GetMemoryShuffleDataResponse {
repeated ShuffleDataBlockSegment shuffleDataBlockSegments = 1;
bytes data = 2;
StatusCode status = 3;
string retMsg = 4;
}
message GetLocalShuffleIndexRequest {
string appId = 1;
int32 shuffleId = 2;
int32 partitionId = 3;
int32 partitionNumPerRange = 4;
int32 partitionNum = 5;
}
message GetLocalShuffleIndexResponse {
bytes indexData = 1;
StatusCode status = 2;
string retMsg = 3;
int64 dataFileLen = 4;
}
message ReportShuffleResultRequest {
string appId = 1;
int32 shuffleId = 2;
int64 taskAttemptId = 3;
int32 bitmapNum = 4;
repeated PartitionToBlockIds partitionToBlockIds = 5;
}
message PartitionToBlockIds {
int32 partitionId = 1;
repeated int64 blockIds = 2;
}
message ReportShuffleResultResponse {
StatusCode status = 1;
string retMsg = 2;
}
message GetShuffleResultRequest {
string appId = 1;
int32 shuffleId = 2;
int32 partitionId = 3;
BlockIdLayout blockIdLayout = 4;
}
message BlockIdLayout {
int32 sequenceNoBits = 1;
int32 partitionIdBits = 2;
int32 taskAttemptIdBits = 3;
}
message GetShuffleResultResponse {
StatusCode status = 1;
string retMsg = 2;
bytes serializedBitmap = 3;
}
message GetShuffleResultForMultiPartRequest {
string appId = 1;
int32 shuffleId = 2;
repeated int32 partitions = 3;
BlockIdLayout blockIdLayout = 4;
}
message GetShuffleResultForMultiPartResponse {
StatusCode status = 1;
string retMsg = 2;
bytes serializedBitmap = 3;
}
message ShufflePartitionRange {
int32 start = 1;
int32 end = 2;
}
message ShuffleRegisterRequest {
string appId = 1;
int32 shuffleId = 2;
repeated ShufflePartitionRange partitionRanges = 3;
RemoteStorage remoteStorage = 4;
string user = 5;
DataDistribution shuffleDataDistribution = 6;
int32 maxConcurrencyPerPartitionToWrite = 7;
}
enum DataDistribution {
NORMAL = 0;
LOCAL_ORDER = 1;
}
message ShuffleUnregisterRequest {
string appId = 1;
int32 shuffleId = 2;
}
message ShuffleUnregisterResponse {
StatusCode status = 1;
string retMsg = 2;
}
message ShuffleRegisterResponse {
StatusCode status = 1;
string retMsg = 2;
}
message ShuffleUnregisterByAppIdRequest {
string appId = 1;
}
message ShuffleUnregisterByAppIdResponse {
StatusCode status = 1;
string retMsg = 2;
}
message SendShuffleDataRequest {
string appId = 1;
int32 shuffleId = 2;
int64 requireBufferId = 3;
repeated ShuffleData shuffleData = 4;
int64 timestamp = 5;
}
message SendShuffleDataResponse {
StatusCode status = 1;
string retMsg = 2;
}
message ShuffleData {
int32 partitionId = 1;
repeated ShuffleBlock block = 2;
}
message ShuffleBlock {
int64 blockId = 1;
int32 length = 2;
int32 uncompressLength = 3;
int64 crc = 4;
bytes data = 5;
int64 taskAttemptId = 6;
}
message ShuffleCommitRequest {
string appId = 1;
int32 shuffleId = 2;
}
message ShuffleCommitResponse {
int32 commitCount = 1;
StatusCode status = 2;
string retMsg = 3;
}
enum ServerStatus {
ACTIVE = 0;
DECOMMISSIONING = 1;
DECOMMISSIONED = 2;
LOST = 3;
UNHEALTHY = 4;
EXCLUDED = 5;
// todo: more status, such as UPGRADING
}
message ShuffleServerHeartBeatRequest {
ShuffleServerId serverId = 1;
int64 usedMemory = 2;
int64 preAllocatedMemory = 3;
int64 availableMemory = 4;
int32 eventNumInFlush = 5;
repeated string tags = 6;
google.protobuf.BoolValue isHealthy = 7;
optional ServerStatus status = 8;
map<string, StorageInfo> storageInfo = 21; // mount point to storage info mapping.
}
message ShuffleServerHeartBeatResponse {
StatusCode status = 1;
string retMsg = 2;
}
message ShuffleServerId {
string id = 1;
string ip = 2;
int32 port = 3;
int32 netty_port = 4;
}
message ShuffleServerResult {
StatusCode status = 1;
string retMsg = 2;
}
/** Status code to identify the status of response */
enum StatusCode {
SUCCESS = 0;
DOUBLE_REGISTER = 1;
NO_BUFFER = 2;
INVALID_STORAGE = 3;
NO_REGISTER = 4;
NO_PARTITION = 5;
INTERNAL_ERROR = 6;
TIMEOUT = 7;
ACCESS_DENIED = 8;
INVALID_REQUEST = 9;
NO_BUFFER_FOR_HUGE_PARTITION = 10;
// add more status
}
message StorageInfo {
enum StorageMedia {
STORAGE_TYPE_UNKNOWN = 0;
HDD = 1;
SSD = 2;
HDFS = 3;
OBJECT_STORE = 4;
// possible other types, such as cloud-ssd.
}
enum StorageStatus {
STORAGE_STATUS_UNKNOWN = 0;
NORMAL = 1;
UNHEALTHY = 2;
OVERUSED = 3; // indicate current disk/storage is overused.
}
string mountPoint = 1;
StorageMedia storageMedia = 2;
int64 capacity = 3;
int64 usedBytes = 4;
int64 writingSpeed1M = 5; // writing speed of last minute
int64 writingSpeed5M = 6; // writing speed of last 5 minutes
int64 writingSpeed1H = 7; // writing speed of last hour
int64 numOfWritingFailures = 8; // number of writing failures since start up.
StorageStatus status = 9;
}
service CoordinatorServer {
// Get Shuffle Server list
rpc getShuffleServerList(google.protobuf.Empty) returns (GetShuffleServerListResponse);
// Count Shuffle Server number
rpc getShuffleServerNum(google.protobuf.Empty) returns (GetShuffleServerNumResponse);
// Ask for suitable Shuffle Servers with partitions
rpc getShuffleAssignments(GetShuffleServerRequest) returns (GetShuffleAssignmentsResponse);
// Heartbeat between Shuffle Server and Coordinator Server
rpc heartbeat(ShuffleServerHeartBeatRequest) returns (ShuffleServerHeartBeatResponse);
// Get the global configuration of this Rss-cluster, i.e., data storage info
rpc getShuffleDataStorageInfo(google.protobuf.Empty) returns (GetShuffleDataStorageInfoResponse);
rpc checkServiceAvailable(google.protobuf.Empty) returns (CheckServiceAvailableResponse);
// Heartbeat between Shuffle Application and Coordinator Server
rpc appHeartbeat(AppHeartBeatRequest) returns (AppHeartBeatResponse);
// Report a client operation's result to coordinator server
rpc reportClientOperation(ReportShuffleClientOpRequest) returns (ReportShuffleClientOpResponse);
// Report a application info to Coordinator Server
rpc registerApplicationInfo(ApplicationInfoRequest) returns (ApplicationInfoResponse);
// Access to the remote shuffle service cluster
rpc accessCluster(AccessClusterRequest) returns (AccessClusterResponse);
// Get basic client conf from coordinator
rpc fetchClientConf(google.protobuf.Empty) returns (FetchClientConfResponse);
// Get remote storage from coordinator
rpc fetchRemoteStorage(FetchRemoteStorageRequest) returns (FetchRemoteStorageResponse);
}
message AppHeartBeatRequest {
string appId = 1;
}
message AppHeartBeatResponse {
StatusCode status = 1;
string retMsg = 2;
}
message ApplicationInfoRequest {
string appId = 1;
string user = 2;
}
message ApplicationInfoResponse {
StatusCode status = 1;
string retMsg = 2;
}
message GetShuffleServerListResponse {
repeated ShuffleServerId servers = 1;
}
message GetShuffleServerNumResponse {
int32 num = 1;
}
message GetShuffleServerRequest {
string clientHost = 1;
string clientPort = 2;
string clientProperty = 3;
string applicationId = 4;
int32 shuffleId = 5;
int32 partitionNum = 6;
int32 partitionNumPerRange = 7;
int32 dataReplica = 8;
repeated string requireTags = 9;
int32 assignmentShuffleServerNumber = 10;
int32 estimateTaskConcurrency = 11;
repeated string faultyServerIds = 12;
}
message PartitionRangeAssignment {
int32 startPartition = 1;
int32 endPartition = 2;
// replica
repeated ShuffleServerId server = 3;
}
message GetShuffleAssignmentsResponse {
StatusCode status = 1;
repeated PartitionRangeAssignment assignments = 2;
string retMsg = 3;
}
message ReportShuffleClientOpRequest {
string clientHost = 1;
int32 clientPort = 2;
ShuffleServerId server = 3;
string operation = 4;
}
message ReportShuffleClientOpResponse {
StatusCode status = 1;
string retMsg = 2;
}
message GetShuffleDataStorageInfoResponse {
string storage = 1;
string storagePath = 2;
string storagePattern = 3;
}
message CheckServiceAvailableResponse {
StatusCode status = 1;
bool available = 2;
}
message AccessClusterRequest {
string accessId = 1;
repeated string tags = 2;
map<string, string> extraProperties = 3;
string user=4;
}
message AccessClusterResponse {
StatusCode status = 1;
string retMsg = 2;
string uuid = 3;
}
message FetchClientConfResponse {
StatusCode status = 1;
string retMsg = 2;
repeated ClientConfItem clientConf = 3;
}
message ClientConfItem {
string key = 1;
string value = 2;
}
message FetchRemoteStorageRequest {
string appId = 1;
}
message RemoteStorageConfItem {
string key = 1;
string value = 2;
}
message RemoteStorage {
string path = 1;
repeated RemoteStorageConfItem remoteStorageConf = 2;
}
message FetchRemoteStorageResponse {
StatusCode status = 1;
RemoteStorage remoteStorage = 2;
}
service ShuffleServerInternal {
rpc decommission(DecommissionRequest) returns (DecommissionResponse);
rpc cancelDecommission(CancelDecommissionRequest) returns (CancelDecommissionResponse);
}
message DecommissionRequest {
}
message DecommissionResponse {
StatusCode status = 1;
string retMsg = 2;
}
message CancelDecommissionRequest {
}
message CancelDecommissionResponse {
StatusCode status = 1;
string retMsg = 2;
}
// ShuffleManager service lives inside of compute-engine's application master, which handles rss shuffle specific logic
// per application.
service ShuffleManager {
rpc reportShuffleFetchFailure (ReportShuffleFetchFailureRequest) returns (ReportShuffleFetchFailureResponse);
// Gets the mapping between partitions and ShuffleServer from the ShuffleManager server
rpc getPartitionToShufflerServer(PartitionToShuffleServerRequest) returns (PartitionToShuffleServerResponse);
// Report write failures to ShuffleManager
rpc reportShuffleWriteFailure (ReportShuffleWriteFailureRequest) returns (ReportShuffleWriteFailureResponse);
// Reassign the RPC interface of the ShuffleServer list
rpc reassignShuffleServers(ReassignServersRequest) returns (ReassignServersReponse);
// Reassign on block send failure that occurs in writer
rpc reassignOnBlockSendFailure(RssReassignOnBlockSendFailureRequest) returns (RssReassignOnBlockSendFailureResponse);
rpc reportShuffleResult (ReportShuffleResultRequest) returns (ReportShuffleResultResponse);
rpc getShuffleResult (GetShuffleResultRequest) returns (GetShuffleResultResponse);
rpc getShuffleResultForMultiPart (GetShuffleResultForMultiPartRequest) returns (GetShuffleResultForMultiPartResponse);
}
message ReportShuffleFetchFailureRequest {
// appId normally should be omitted, it's used to avoid wrongly request issued from remaining executors of another
// app which accidentally has the same shuffle manager port with this app.
string appId = 1;
int32 shuffleId = 2;
int32 stageAttemptId = 3;
int32 partitionId = 4;
string exception = 5;
// todo: report ShuffleServerId if needed
// ShuffleServerId serverId = 6;
}
message ReportShuffleFetchFailureResponse {
StatusCode status = 1;
bool reSubmitWholeStage = 2;
string msg = 3;
}
message PartitionToShuffleServerRequest {
int32 shuffleId = 2;
}
message PartitionToShuffleServerResponse {
StatusCode status = 1;
string msg = 2;
MutableShuffleHandleInfo shuffleHandleInfo = 3;
}
message MutableShuffleHandleInfo {
int32 shuffleId = 1;
map<int32, PartitionReplicaServers> partitionToServers = 2;
RemoteStorageInfo remoteStorageInfo = 3;
}
message PartitionReplicaServers {
map<int32, ReplicaServersItem> replicaServers = 1;
}
message ReplicaServersItem {
repeated ShuffleServerId serverId = 1;
}
message RemoteStorageInfo{
string path = 1;
map<string, string> confItems = 2;
}
message ReportShuffleWriteFailureRequest {
string appId = 1;
int32 shuffleId = 2;
int32 stageAttemptNumber = 3;
repeated ShuffleServerId shuffleServerIds= 5;
string exception = 6;
}
message ReportShuffleWriteFailureResponse {
StatusCode status = 1;
bool reSubmitWholeStage = 2;
string msg = 3;
}
message ReassignServersRequest{
int32 stageId = 1;
int32 stageAttemptNumber = 2;
int32 shuffleId = 3;
int32 numPartitions = 4;
}
message ReassignServersReponse{
StatusCode status = 1;
bool needReassign = 2;
string msg = 3;
}
message RssReassignOnBlockSendFailureRequest{
int32 shuffleId = 1;
map<int32, ReceivingFailureServers> failurePartitionToServerIds = 2;
}
message ReceivingFailureServers {
repeated ReceivingFailureServer server = 1;
}
message ReceivingFailureServer {
string serverId = 1;
StatusCode statusCode = 2;
}
message RssReassignOnBlockSendFailureResponse {
StatusCode status = 1;
string msg = 2;
MutableShuffleHandleInfo handle = 3;
}